From 41078ec072aa7e4d0b18d1c82234a9952e10b4d5 Mon Sep 17 00:00:00 2001 From: Nithin Pankaj Date: Sun, 2 Jun 2024 15:52:34 +0530 Subject: [PATCH] Provide configuration backed support for atmost once delivery (#150) Signed-off-by: Nithin --- .../client/config/KafkaProducerConfig.java | 12 ++++++++ .../rest/client/config/KafkaProperties.java | 1 + .../service/impl/EventPublishServiceImpl.java | 28 +++++++++++-------- .../application-default-consumer.yml | 1 + .../application-per-partition-consumer.yml | 1 + src/test/resources/application.yml | 1 + 6 files changed, 32 insertions(+), 12 deletions(-) diff --git a/src/main/java/hlf/java/rest/client/config/KafkaProducerConfig.java b/src/main/java/hlf/java/rest/client/config/KafkaProducerConfig.java index e86dfa4..7e202df 100644 --- a/src/main/java/hlf/java/rest/client/config/KafkaProducerConfig.java +++ b/src/main/java/hlf/java/rest/client/config/KafkaProducerConfig.java @@ -22,6 +22,9 @@ @RefreshScope public class KafkaProducerConfig extends BaseKafkaConfig { + private static final String PRODUCER_ALL_ACKS = "all"; + private static final int RETRIES_CONFIG_FOR_AT_MOST_ONCE = 0; + @Autowired private KafkaProperties kafkaProperties; @Autowired private MeterRegistry meterRegistry; @@ -39,6 +42,15 @@ public ProducerFactory eventProducerFactory( props.put( ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, kafkaProducerProperties.getEnableIdempotence()); + if (kafkaProducerProperties.getEnableAtMostOnceSemantics()) { + // at-most once requires retries to be set as zero since the client wouldn't re-attempt a + // publish in case of Broker failure. + props.put(ProducerConfig.RETRIES_CONFIG, RETRIES_CONFIG_FOR_AT_MOST_ONCE); + props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, false); + + log.info("Kafka producer will be initialised with at-most once behaviour"); + } + // Azure event-hub config configureSaslProperties(props, kafkaProducerProperties.getSaslJaasConfig()); diff --git a/src/main/java/hlf/java/rest/client/config/KafkaProperties.java b/src/main/java/hlf/java/rest/client/config/KafkaProperties.java index f5df694..d909834 100644 --- a/src/main/java/hlf/java/rest/client/config/KafkaProperties.java +++ b/src/main/java/hlf/java/rest/client/config/KafkaProperties.java @@ -30,6 +30,7 @@ public static class Producer extends SSLProperties { private String topic; private String saslJaasConfig; private Boolean enableIdempotence; + private Boolean enableAtMostOnceSemantics; @Override public String toString() { diff --git a/src/main/java/hlf/java/rest/client/service/impl/EventPublishServiceImpl.java b/src/main/java/hlf/java/rest/client/service/impl/EventPublishServiceImpl.java index 5598531..3639069 100644 --- a/src/main/java/hlf/java/rest/client/service/impl/EventPublishServiceImpl.java +++ b/src/main/java/hlf/java/rest/client/service/impl/EventPublishServiceImpl.java @@ -130,16 +130,18 @@ public boolean publishChaincodeEvents( @Override public void onSuccess(SendResult result) { log.info( - "Sent message=[" - + payload - + "] with offset=[" - + result.getRecordMetadata().offset() - + "]"); + "Sent message '{}' to partition {} for offset {}", + payload, + result.getRecordMetadata().partition(), + result.getRecordMetadata().offset()); } @Override public void onFailure(Throwable ex) { - log.error("Unable to send message=[" + payload + "] due to : " + ex.getMessage()); + log.error( + "Failed to send message event for Transaction ID {} due to {}", + fabricTxId, + ex.getMessage()); } }); @@ -214,16 +216,18 @@ public boolean publishBlockEvents( @Override public void onSuccess(SendResult result) { log.info( - "Sent message=[" - + payload - + "] with offset=[" - + result.getRecordMetadata().offset() - + "]"); + "Sent message '{}' to partition {} for offset {}", + payload, + result.getRecordMetadata().partition(), + result.getRecordMetadata().offset()); } @Override public void onFailure(Throwable ex) { - log.error("Unable to send message=[" + payload + "] due to : " + ex.getMessage()); + log.error( + "Failed to send message event for Transaction ID {} due to {}", + fabricTxId, + ex.getMessage()); } }); diff --git a/src/test/resources/application-default-consumer.yml b/src/test/resources/application-default-consumer.yml index 2ce465a..f447865 100644 --- a/src/test/resources/application-default-consumer.yml +++ b/src/test/resources/application-default-consumer.yml @@ -31,6 +31,7 @@ kafka: brokerHost: localhost:9092 topic: test-publisher-event-topice enable-idempotence: true + enable-at-most-once-semantics: true management: endpoints: web: diff --git a/src/test/resources/application-per-partition-consumer.yml b/src/test/resources/application-per-partition-consumer.yml index d9c0eaa..7f0f77b 100644 --- a/src/test/resources/application-per-partition-consumer.yml +++ b/src/test/resources/application-per-partition-consumer.yml @@ -37,6 +37,7 @@ kafka: topic: test-consumer-dlt ssl-enabled: false enable-idempotence: true + enable-at-most-once-semantics: true management: endpoints: web: diff --git a/src/test/resources/application.yml b/src/test/resources/application.yml index c594a47..92d67c1 100644 --- a/src/test/resources/application.yml +++ b/src/test/resources/application.yml @@ -47,6 +47,7 @@ kafka: brokerHost: localhost:9093 topic: hlf-offchain-topic enable-idempotence: true + enable-at-most-once-semantics: true dedupe: enable: false ---