Skip to content

Commit

Permalink
Provide configuration backed support for atmost once delivery (#150)
Browse files Browse the repository at this point in the history
Signed-off-by: Nithin <nithin.pankaj@walmart.com>
  • Loading branch information
nithin-pankaj authored Jun 2, 2024
1 parent bee61b2 commit 41078ec
Show file tree
Hide file tree
Showing 6 changed files with 32 additions and 12 deletions.
12 changes: 12 additions & 0 deletions src/main/java/hlf/java/rest/client/config/KafkaProducerConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@
@RefreshScope
public class KafkaProducerConfig extends BaseKafkaConfig {

private static final String PRODUCER_ALL_ACKS = "all";
private static final int RETRIES_CONFIG_FOR_AT_MOST_ONCE = 0;

@Autowired private KafkaProperties kafkaProperties;

@Autowired private MeterRegistry meterRegistry;
Expand All @@ -39,6 +42,15 @@ public ProducerFactory<String, String> eventProducerFactory(
props.put(
ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, kafkaProducerProperties.getEnableIdempotence());

if (kafkaProducerProperties.getEnableAtMostOnceSemantics()) {
// at-most once requires retries to be set as zero since the client wouldn't re-attempt a
// publish in case of Broker failure.
props.put(ProducerConfig.RETRIES_CONFIG, RETRIES_CONFIG_FOR_AT_MOST_ONCE);
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, false);

log.info("Kafka producer will be initialised with at-most once behaviour");
}

// Azure event-hub config
configureSaslProperties(props, kafkaProducerProperties.getSaslJaasConfig());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ public static class Producer extends SSLProperties {
private String topic;
private String saslJaasConfig;
private Boolean enableIdempotence;
private Boolean enableAtMostOnceSemantics;

@Override
public String toString() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,16 +130,18 @@ public boolean publishChaincodeEvents(
@Override
public void onSuccess(SendResult<String, String> result) {
log.info(
"Sent message=["
+ payload
+ "] with offset=["
+ result.getRecordMetadata().offset()
+ "]");
"Sent message '{}' to partition {} for offset {}",
payload,
result.getRecordMetadata().partition(),
result.getRecordMetadata().offset());
}

@Override
public void onFailure(Throwable ex) {
log.error("Unable to send message=[" + payload + "] due to : " + ex.getMessage());
log.error(
"Failed to send message event for Transaction ID {} due to {}",
fabricTxId,
ex.getMessage());
}
});

Expand Down Expand Up @@ -214,16 +216,18 @@ public boolean publishBlockEvents(
@Override
public void onSuccess(SendResult<String, String> result) {
log.info(
"Sent message=["
+ payload
+ "] with offset=["
+ result.getRecordMetadata().offset()
+ "]");
"Sent message '{}' to partition {} for offset {}",
payload,
result.getRecordMetadata().partition(),
result.getRecordMetadata().offset());
}

@Override
public void onFailure(Throwable ex) {
log.error("Unable to send message=[" + payload + "] due to : " + ex.getMessage());
log.error(
"Failed to send message event for Transaction ID {} due to {}",
fabricTxId,
ex.getMessage());
}
});

Expand Down
1 change: 1 addition & 0 deletions src/test/resources/application-default-consumer.yml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ kafka:
brokerHost: localhost:9092
topic: test-publisher-event-topice
enable-idempotence: true
enable-at-most-once-semantics: true
management:
endpoints:
web:
Expand Down
1 change: 1 addition & 0 deletions src/test/resources/application-per-partition-consumer.yml
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ kafka:
topic: test-consumer-dlt
ssl-enabled: false
enable-idempotence: true
enable-at-most-once-semantics: true
management:
endpoints:
web:
Expand Down
1 change: 1 addition & 0 deletions src/test/resources/application.yml
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ kafka:
brokerHost: localhost:9093
topic: hlf-offchain-topic
enable-idempotence: true
enable-at-most-once-semantics: true
dedupe:
enable: false
---
Expand Down

0 comments on commit 41078ec

Please sign in to comment.