Skip to content

Commit

Permalink
Update consumer-shared (change in topic naming and retetion checks)
Browse files Browse the repository at this point in the history
  • Loading branch information
trondsevre committed Sep 3, 2024
1 parent 751986c commit 8ff936e
Show file tree
Hide file tree
Showing 7 changed files with 25 additions and 21 deletions.
14 changes: 7 additions & 7 deletions build.gradle
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
plugins {
id 'org.springframework.boot' version '3.3.1'
id 'io.spring.dependency-management' version '1.1.5'
id 'org.springframework.boot' version '3.3.3'
id 'io.spring.dependency-management' version '1.1.6'
id 'java'
id 'groovy'
id 'com.github.ben-manes.versions' version '0.51.0'
Expand All @@ -26,12 +26,12 @@ dependencies {
implementation 'org.springframework.boot:spring-boot-starter-webflux'

implementation "no.fint:fint-administrasjon-resource-model-java:${apiVersion}"
implementation 'no.fintlabs:fint-core-resource-server-security:2.0.0'
implementation 'no.fintlabs:fint-core-consumer-shared:2.4.1'
implementation 'no.fintlabs:fint-core-resource-server-security:2.1.0'
implementation 'no.fintlabs:fint-core-consumer-shared:2.5.0'

implementation 'com.google.guava:guava:33.2.1-jre'
implementation 'org.apache.commons:commons-lang3:3.14.0'
implementation 'io.netty:netty-resolver-dns-native-macos:4.1.89.Final:osx-aarch_64'
implementation 'com.google.guava:guava:33.3.0-jre'
implementation 'org.apache.commons:commons-lang3:3.17.0'
implementation 'io.netty:netty-resolver-dns-native-macos:4.2.0.Alpha3:osx-aarch_64'

compileOnly 'org.projectlombok:lombok'
runtimeOnly 'io.micrometer:micrometer-registry-prometheus'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,12 @@
import no.fint.model.resource.administrasjon.organisasjon.ArbeidslokasjonResource;
import no.fintlabs.core.consumer.shared.resource.event.EventRequestKafkaConsumer;
import no.fintlabs.kafka.event.EventConsumerFactoryService;
import no.fintlabs.kafka.event.topic.EventTopicService;
import org.springframework.stereotype.Service;

@Service
public class ArbeidslokasjonRequestKafkaConsumer extends EventRequestKafkaConsumer<ArbeidslokasjonResource> {
public ArbeidslokasjonRequestKafkaConsumer(EventConsumerFactoryService eventConsumerFactoryService, ArbeidslokasjonConfig arbeidslokasjonConfig) {
super(eventConsumerFactoryService, arbeidslokasjonConfig);
public ArbeidslokasjonRequestKafkaConsumer(EventConsumerFactoryService eventConsumerFactoryService, ArbeidslokasjonConfig arbeidslokasjonConfig, EventTopicService eventTopicService) {
super(eventConsumerFactoryService, arbeidslokasjonConfig, eventTopicService);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,14 @@
import no.fint.model.resource.administrasjon.organisasjon.ArbeidslokasjonResource;
import no.fintlabs.core.consumer.shared.resource.event.EventResponseKafkaConsumer;
import no.fintlabs.kafka.event.EventConsumerFactoryService;
import no.fintlabs.kafka.event.topic.EventTopicService;
import org.springframework.stereotype.Service;

@Service
public class ArbeidslokasjonResponseKafkaConsumer extends EventResponseKafkaConsumer<ArbeidslokasjonResource> {

public ArbeidslokasjonResponseKafkaConsumer(EventConsumerFactoryService eventConsumerFactoryService, ArbeidslokasjonConfig arbeidslokasjonConfig, ArbeidslokasjonLinker arbeidslokasjonLinker) {
super(eventConsumerFactoryService, arbeidslokasjonConfig, arbeidslokasjonLinker);
public ArbeidslokasjonResponseKafkaConsumer(EventConsumerFactoryService eventConsumerFactoryService, ArbeidslokasjonConfig arbeidslokasjonConfig, ArbeidslokasjonLinker arbeidslokasjonLinker, EventTopicService eventTopicService) {
super(eventConsumerFactoryService, arbeidslokasjonConfig, arbeidslokasjonLinker, eventTopicService);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -44,19 +44,19 @@ protected Cache<ArbeidslokasjonResource> initializeCache(CacheManager cacheManag

@PostConstruct
private void registerKafkaListener() {
long retention = entityKafkaConsumer.registerListener(ArbeidslokasjonResource.class, this::addResourceToCache);
getCache().setRetentionPeriodInMs(retention);
entityKafkaConsumer.registerListener(ArbeidslokasjonResource.class, this::addResourceToCache);
}

private void addResourceToCache(ConsumerRecord<String, ArbeidslokasjonResource> consumerRecord) {
updateRetensionTime(consumerRecord.headers().lastHeader("topic-retension-time"));
this.eventLogger.logDataRecieved();
ArbeidslokasjonResource resource = consumerRecord.value();
if (resource == null) {
getCache().remove(consumerRecord.key());
} else {
linker.mapLinks(resource);
this.getCache().put(consumerRecord.key(), resource, linker.hashCodes(resource));
if (consumerRecord.headers().lastHeader("event-corr-id") != null){
if (consumerRecord.headers().lastHeader("event-corr-id") != null) {
String corrId = new String(consumerRecord.headers().lastHeader("event-corr-id").value(), StandardCharsets.UTF_8);
log.debug("Adding corrId to EntityResponseCache: {}", corrId);
arbeidslokasjonResponseKafkaConsumer.getEntityCache().add(corrId, resource);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,12 @@
import no.fint.model.resource.administrasjon.organisasjon.OrganisasjonselementResource;
import no.fintlabs.core.consumer.shared.resource.event.EventRequestKafkaConsumer;
import no.fintlabs.kafka.event.EventConsumerFactoryService;
import no.fintlabs.kafka.event.topic.EventTopicService;
import org.springframework.stereotype.Service;

@Service
public class OrganisasjonselementRequestKafkaConsumer extends EventRequestKafkaConsumer<OrganisasjonselementResource> {
public OrganisasjonselementRequestKafkaConsumer(EventConsumerFactoryService eventConsumerFactoryService, OrganisasjonselementConfig organisasjonselementConfig) {
super(eventConsumerFactoryService, organisasjonselementConfig);
public OrganisasjonselementRequestKafkaConsumer(EventConsumerFactoryService eventConsumerFactoryService, OrganisasjonselementConfig organisasjonselementConfig, EventTopicService eventTopicService) {
super(eventConsumerFactoryService, organisasjonselementConfig, eventTopicService);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,14 @@
import no.fint.model.resource.administrasjon.organisasjon.OrganisasjonselementResource;
import no.fintlabs.core.consumer.shared.resource.event.EventResponseKafkaConsumer;
import no.fintlabs.kafka.event.EventConsumerFactoryService;
import no.fintlabs.kafka.event.topic.EventTopicService;
import org.springframework.stereotype.Service;

@Service
public class OrganisasjonselementResponseKafkaConsumer extends EventResponseKafkaConsumer<OrganisasjonselementResource> {

public OrganisasjonselementResponseKafkaConsumer(EventConsumerFactoryService eventConsumerFactoryService, OrganisasjonselementConfig organisasjonselementConfig, OrganisasjonselementLinker organisasjonselementLinker) {
super(eventConsumerFactoryService, organisasjonselementConfig, organisasjonselementLinker);
public OrganisasjonselementResponseKafkaConsumer(EventConsumerFactoryService eventConsumerFactoryService, OrganisasjonselementConfig organisasjonselementConfig, OrganisasjonselementLinker organisasjonselementLinker, EventTopicService eventTopicService) {
super(eventConsumerFactoryService, organisasjonselementConfig, organisasjonselementLinker, eventTopicService);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -44,19 +44,19 @@ protected Cache<OrganisasjonselementResource> initializeCache(CacheManager cache

@PostConstruct
private void registerKafkaListener() {
long retention = entityKafkaConsumer.registerListener(OrganisasjonselementResource.class, this::addResourceToCache);
getCache().setRetentionPeriodInMs(retention);
entityKafkaConsumer.registerListener(OrganisasjonselementResource.class, this::addResourceToCache);
}

private void addResourceToCache(ConsumerRecord<String, OrganisasjonselementResource> consumerRecord) {
updateRetensionTime(consumerRecord.headers().lastHeader("topic-retension-time"));
this.eventLogger.logDataRecieved();
OrganisasjonselementResource resource = consumerRecord.value();
if (resource == null) {
getCache().remove(consumerRecord.key());
} else {
linker.mapLinks(resource);
this.getCache().put(consumerRecord.key(), resource, linker.hashCodes(resource));
if (consumerRecord.headers().lastHeader("event-corr-id") != null){
if (consumerRecord.headers().lastHeader("event-corr-id") != null) {
String corrId = new String(consumerRecord.headers().lastHeader("event-corr-id").value(), StandardCharsets.UTF_8);
log.debug("Adding corrId to EntityResponseCache: {}", corrId);
organisasjonselementResponseKafkaConsumer.getEntityCache().add(corrId, resource);
Expand Down

0 comments on commit 8ff936e

Please sign in to comment.