From 8ff936e4116de950535ec91f8f3082b5c7111ec9 Mon Sep 17 00:00:00 2001 From: Trond Magnus Sevre Date: Tue, 3 Sep 2024 14:32:23 +0200 Subject: [PATCH] Update consumer-shared (change in topic naming and retetion checks) --- build.gradle | 14 +++++++------- .../ArbeidslokasjonRequestKafkaConsumer.java | 5 +++-- .../ArbeidslokasjonResponseKafkaConsumer.java | 5 +++-- .../arbeidslokasjon/ArbeidslokasjonService.java | 6 +++--- .../OrganisasjonselementRequestKafkaConsumer.java | 5 +++-- .../OrganisasjonselementResponseKafkaConsumer.java | 5 +++-- .../OrganisasjonselementService.java | 6 +++--- 7 files changed, 25 insertions(+), 21 deletions(-) diff --git a/build.gradle b/build.gradle index 3c0d400..d4b7765 100644 --- a/build.gradle +++ b/build.gradle @@ -1,6 +1,6 @@ plugins { - id 'org.springframework.boot' version '3.3.1' - id 'io.spring.dependency-management' version '1.1.5' + id 'org.springframework.boot' version '3.3.3' + id 'io.spring.dependency-management' version '1.1.6' id 'java' id 'groovy' id 'com.github.ben-manes.versions' version '0.51.0' @@ -26,12 +26,12 @@ dependencies { implementation 'org.springframework.boot:spring-boot-starter-webflux' implementation "no.fint:fint-administrasjon-resource-model-java:${apiVersion}" - implementation 'no.fintlabs:fint-core-resource-server-security:2.0.0' - implementation 'no.fintlabs:fint-core-consumer-shared:2.4.1' + implementation 'no.fintlabs:fint-core-resource-server-security:2.1.0' + implementation 'no.fintlabs:fint-core-consumer-shared:2.5.0' - implementation 'com.google.guava:guava:33.2.1-jre' - implementation 'org.apache.commons:commons-lang3:3.14.0' - implementation 'io.netty:netty-resolver-dns-native-macos:4.1.89.Final:osx-aarch_64' + implementation 'com.google.guava:guava:33.3.0-jre' + implementation 'org.apache.commons:commons-lang3:3.17.0' + implementation 'io.netty:netty-resolver-dns-native-macos:4.2.0.Alpha3:osx-aarch_64' compileOnly 'org.projectlombok:lombok' runtimeOnly 'io.micrometer:micrometer-registry-prometheus' diff --git a/src/main/java/no/fintlabs/consumer/model/arbeidslokasjon/ArbeidslokasjonRequestKafkaConsumer.java b/src/main/java/no/fintlabs/consumer/model/arbeidslokasjon/ArbeidslokasjonRequestKafkaConsumer.java index 296c6cc..313eddf 100644 --- a/src/main/java/no/fintlabs/consumer/model/arbeidslokasjon/ArbeidslokasjonRequestKafkaConsumer.java +++ b/src/main/java/no/fintlabs/consumer/model/arbeidslokasjon/ArbeidslokasjonRequestKafkaConsumer.java @@ -3,11 +3,12 @@ import no.fint.model.resource.administrasjon.organisasjon.ArbeidslokasjonResource; import no.fintlabs.core.consumer.shared.resource.event.EventRequestKafkaConsumer; import no.fintlabs.kafka.event.EventConsumerFactoryService; +import no.fintlabs.kafka.event.topic.EventTopicService; import org.springframework.stereotype.Service; @Service public class ArbeidslokasjonRequestKafkaConsumer extends EventRequestKafkaConsumer { - public ArbeidslokasjonRequestKafkaConsumer(EventConsumerFactoryService eventConsumerFactoryService, ArbeidslokasjonConfig arbeidslokasjonConfig) { - super(eventConsumerFactoryService, arbeidslokasjonConfig); + public ArbeidslokasjonRequestKafkaConsumer(EventConsumerFactoryService eventConsumerFactoryService, ArbeidslokasjonConfig arbeidslokasjonConfig, EventTopicService eventTopicService) { + super(eventConsumerFactoryService, arbeidslokasjonConfig, eventTopicService); } } diff --git a/src/main/java/no/fintlabs/consumer/model/arbeidslokasjon/ArbeidslokasjonResponseKafkaConsumer.java b/src/main/java/no/fintlabs/consumer/model/arbeidslokasjon/ArbeidslokasjonResponseKafkaConsumer.java index cbe7313..844f491 100644 --- a/src/main/java/no/fintlabs/consumer/model/arbeidslokasjon/ArbeidslokasjonResponseKafkaConsumer.java +++ b/src/main/java/no/fintlabs/consumer/model/arbeidslokasjon/ArbeidslokasjonResponseKafkaConsumer.java @@ -3,13 +3,14 @@ import no.fint.model.resource.administrasjon.organisasjon.ArbeidslokasjonResource; import no.fintlabs.core.consumer.shared.resource.event.EventResponseKafkaConsumer; import no.fintlabs.kafka.event.EventConsumerFactoryService; +import no.fintlabs.kafka.event.topic.EventTopicService; import org.springframework.stereotype.Service; @Service public class ArbeidslokasjonResponseKafkaConsumer extends EventResponseKafkaConsumer { - public ArbeidslokasjonResponseKafkaConsumer(EventConsumerFactoryService eventConsumerFactoryService, ArbeidslokasjonConfig arbeidslokasjonConfig, ArbeidslokasjonLinker arbeidslokasjonLinker) { - super(eventConsumerFactoryService, arbeidslokasjonConfig, arbeidslokasjonLinker); + public ArbeidslokasjonResponseKafkaConsumer(EventConsumerFactoryService eventConsumerFactoryService, ArbeidslokasjonConfig arbeidslokasjonConfig, ArbeidslokasjonLinker arbeidslokasjonLinker, EventTopicService eventTopicService) { + super(eventConsumerFactoryService, arbeidslokasjonConfig, arbeidslokasjonLinker, eventTopicService); } } diff --git a/src/main/java/no/fintlabs/consumer/model/arbeidslokasjon/ArbeidslokasjonService.java b/src/main/java/no/fintlabs/consumer/model/arbeidslokasjon/ArbeidslokasjonService.java index 684c079..b9ddfa2 100644 --- a/src/main/java/no/fintlabs/consumer/model/arbeidslokasjon/ArbeidslokasjonService.java +++ b/src/main/java/no/fintlabs/consumer/model/arbeidslokasjon/ArbeidslokasjonService.java @@ -44,11 +44,11 @@ protected Cache initializeCache(CacheManager cacheManag @PostConstruct private void registerKafkaListener() { - long retention = entityKafkaConsumer.registerListener(ArbeidslokasjonResource.class, this::addResourceToCache); - getCache().setRetentionPeriodInMs(retention); + entityKafkaConsumer.registerListener(ArbeidslokasjonResource.class, this::addResourceToCache); } private void addResourceToCache(ConsumerRecord consumerRecord) { + updateRetensionTime(consumerRecord.headers().lastHeader("topic-retension-time")); this.eventLogger.logDataRecieved(); ArbeidslokasjonResource resource = consumerRecord.value(); if (resource == null) { @@ -56,7 +56,7 @@ private void addResourceToCache(ConsumerRecord } else { linker.mapLinks(resource); this.getCache().put(consumerRecord.key(), resource, linker.hashCodes(resource)); - if (consumerRecord.headers().lastHeader("event-corr-id") != null){ + if (consumerRecord.headers().lastHeader("event-corr-id") != null) { String corrId = new String(consumerRecord.headers().lastHeader("event-corr-id").value(), StandardCharsets.UTF_8); log.debug("Adding corrId to EntityResponseCache: {}", corrId); arbeidslokasjonResponseKafkaConsumer.getEntityCache().add(corrId, resource); diff --git a/src/main/java/no/fintlabs/consumer/model/organisasjonselement/OrganisasjonselementRequestKafkaConsumer.java b/src/main/java/no/fintlabs/consumer/model/organisasjonselement/OrganisasjonselementRequestKafkaConsumer.java index f57d20e..c8cc701 100644 --- a/src/main/java/no/fintlabs/consumer/model/organisasjonselement/OrganisasjonselementRequestKafkaConsumer.java +++ b/src/main/java/no/fintlabs/consumer/model/organisasjonselement/OrganisasjonselementRequestKafkaConsumer.java @@ -3,11 +3,12 @@ import no.fint.model.resource.administrasjon.organisasjon.OrganisasjonselementResource; import no.fintlabs.core.consumer.shared.resource.event.EventRequestKafkaConsumer; import no.fintlabs.kafka.event.EventConsumerFactoryService; +import no.fintlabs.kafka.event.topic.EventTopicService; import org.springframework.stereotype.Service; @Service public class OrganisasjonselementRequestKafkaConsumer extends EventRequestKafkaConsumer { - public OrganisasjonselementRequestKafkaConsumer(EventConsumerFactoryService eventConsumerFactoryService, OrganisasjonselementConfig organisasjonselementConfig) { - super(eventConsumerFactoryService, organisasjonselementConfig); + public OrganisasjonselementRequestKafkaConsumer(EventConsumerFactoryService eventConsumerFactoryService, OrganisasjonselementConfig organisasjonselementConfig, EventTopicService eventTopicService) { + super(eventConsumerFactoryService, organisasjonselementConfig, eventTopicService); } } diff --git a/src/main/java/no/fintlabs/consumer/model/organisasjonselement/OrganisasjonselementResponseKafkaConsumer.java b/src/main/java/no/fintlabs/consumer/model/organisasjonselement/OrganisasjonselementResponseKafkaConsumer.java index 49c33ef..f727ee2 100644 --- a/src/main/java/no/fintlabs/consumer/model/organisasjonselement/OrganisasjonselementResponseKafkaConsumer.java +++ b/src/main/java/no/fintlabs/consumer/model/organisasjonselement/OrganisasjonselementResponseKafkaConsumer.java @@ -3,13 +3,14 @@ import no.fint.model.resource.administrasjon.organisasjon.OrganisasjonselementResource; import no.fintlabs.core.consumer.shared.resource.event.EventResponseKafkaConsumer; import no.fintlabs.kafka.event.EventConsumerFactoryService; +import no.fintlabs.kafka.event.topic.EventTopicService; import org.springframework.stereotype.Service; @Service public class OrganisasjonselementResponseKafkaConsumer extends EventResponseKafkaConsumer { - public OrganisasjonselementResponseKafkaConsumer(EventConsumerFactoryService eventConsumerFactoryService, OrganisasjonselementConfig organisasjonselementConfig, OrganisasjonselementLinker organisasjonselementLinker) { - super(eventConsumerFactoryService, organisasjonselementConfig, organisasjonselementLinker); + public OrganisasjonselementResponseKafkaConsumer(EventConsumerFactoryService eventConsumerFactoryService, OrganisasjonselementConfig organisasjonselementConfig, OrganisasjonselementLinker organisasjonselementLinker, EventTopicService eventTopicService) { + super(eventConsumerFactoryService, organisasjonselementConfig, organisasjonselementLinker, eventTopicService); } } diff --git a/src/main/java/no/fintlabs/consumer/model/organisasjonselement/OrganisasjonselementService.java b/src/main/java/no/fintlabs/consumer/model/organisasjonselement/OrganisasjonselementService.java index 2fde54d..88359d2 100644 --- a/src/main/java/no/fintlabs/consumer/model/organisasjonselement/OrganisasjonselementService.java +++ b/src/main/java/no/fintlabs/consumer/model/organisasjonselement/OrganisasjonselementService.java @@ -44,11 +44,11 @@ protected Cache initializeCache(CacheManager cache @PostConstruct private void registerKafkaListener() { - long retention = entityKafkaConsumer.registerListener(OrganisasjonselementResource.class, this::addResourceToCache); - getCache().setRetentionPeriodInMs(retention); + entityKafkaConsumer.registerListener(OrganisasjonselementResource.class, this::addResourceToCache); } private void addResourceToCache(ConsumerRecord consumerRecord) { + updateRetensionTime(consumerRecord.headers().lastHeader("topic-retension-time")); this.eventLogger.logDataRecieved(); OrganisasjonselementResource resource = consumerRecord.value(); if (resource == null) { @@ -56,7 +56,7 @@ private void addResourceToCache(ConsumerRecord