Skip to content

Commit

Permalink
Introduce dedupe for outbound events (#148)
Browse files Browse the repository at this point in the history
* Update application.template with dedupe config details

Signed-off-by: Nithin <nithin.pankaj@walmart.com>

* Enable configurations for Dedupe and Producer idempotence

Signed-off-by: Nithin <nithin.pankaj@walmart.com>

* Service and impl for RecencyTransactionContext

Signed-off-by: Nithin <nithin.pankaj@walmart.com>

* Setting transaction context prior to submitting proposal

Signed-off-by: Nithin <nithin.pankaj@walmart.com>

* Validate for duplicate events prior to kafka submission

Signed-off-by: Nithin <nithin.pankaj@walmart.com>

* Support optional Transaction Id filter for event replay

Signed-off-by: Nithin <nithin.pankaj@walmart.com>

* Add dependency for Guava utils

Signed-off-by: Nithin <nithin.pankaj@walmart.com>

* Test fix for producer idempotence

Signed-off-by: Nithin <nithin.pankaj@walmart.com>

---------

Signed-off-by: Nithin <nithin.pankaj@walmart.com>
  • Loading branch information
nithin-pankaj authored May 30, 2024
1 parent cb8a11c commit 96b353f
Show file tree
Hide file tree
Showing 18 changed files with 203 additions and 18 deletions.
9 changes: 9 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,15 @@
<scope>runtime</scope>
<optional>true</optional>
</dependency>

<!-- https://mvnrepository.com/artifact/com.google.guava/guava -->
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>33.2.0-jre</version>
</dependency>


<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-configuration-processor</artifactId>
Expand Down
47 changes: 47 additions & 0 deletions src/main/java/hlf/java/rest/client/config/EventDedupeConfig.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package hlf.java.rest.client.config;

import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import hlf.java.rest.client.service.RecencyTransactionContext;
import hlf.java.rest.client.service.impl.DefaultCacheBasedRecencyTransactionContext;
import hlf.java.rest.client.service.impl.NoOpRecencyTransactionContext;
import java.util.concurrent.TimeUnit;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Data
@Configuration
@ConfigurationProperties(prefix = "dedupe")
@Slf4j
public class EventDedupeConfig {

private boolean enable;
private int recencyWindowSize;
private int recencyWindowExpiryInMinutes;

@Bean
public RecencyTransactionContext recencyTransactionContext() {

if (!this.isEnable()) {
log.info(
"Dedupe config is disabled. Events wont be validated prior to submission to publisher topic");
return new NoOpRecencyTransactionContext();
}

Cache<String, Object> recencyCache =
CacheBuilder.newBuilder()
.maximumSize(this.getRecencyWindowSize())
.expireAfterAccess(this.getRecencyWindowExpiryInMinutes(), TimeUnit.MINUTES)
.build();

log.info(
"Enabling recency check with cache size {} and TTL {} minutes",
recencyWindowSize,
recencyWindowExpiryInMinutes);

return new DefaultCacheBasedRecencyTransactionContext(recencyCache);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@ public ProducerFactory<String, String> eventProducerFactory(
props.put(
ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
org.apache.kafka.common.serialization.StringSerializer.class);
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, Boolean.FALSE);
props.put(
ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, kafkaProducerProperties.getEnableIdempotence());

// Azure event-hub config
configureSaslProperties(props, kafkaProducerProperties.getSaslJaasConfig());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ public static class Producer extends SSLProperties {
private String brokerHost;
private String topic;
private String saslJaasConfig;
private Boolean enableIdempotence;

@Override
public String toString() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ public class EventController {
public ResponseEntity<ClientResponseModel> replayEvents(
@RequestParam("block-number-start") Long startBlockNumber,
@RequestParam("block-number-end") Long endBlockNumber,
@RequestParam(value = "transaction-id", required = false) String transactionId,
@RequestParam("channel") @Validated String networkName,
@RequestParam("eventType") @Validated String eventType) {
log.info(
Expand All @@ -41,6 +42,7 @@ public ResponseEntity<ClientResponseModel> replayEvents(
endBlockNumber,
networkName,
eventType);
return eventFulfillment.replayEvents(startBlockNumber, endBlockNumber, networkName, eventType);
return eventFulfillment.replayEvents(
startBlockNumber, endBlockNumber, transactionId, networkName, eventType);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import hlf.java.rest.client.model.EventType;
import hlf.java.rest.client.sdk.StandardCCEvent;
import hlf.java.rest.client.service.EventPublishService;
import hlf.java.rest.client.service.RecencyTransactionContext;
import hlf.java.rest.client.util.FabricClientConstants;
import hlf.java.rest.client.util.FabricEventParseUtil;
import java.nio.charset.StandardCharsets;
Expand All @@ -29,6 +30,8 @@ public class ChaincodeEventListener {

@Autowired private FabricProperties fabricProperties;

@Autowired private RecencyTransactionContext recencyTransactionContext;

private static String eventTxnId = FabricClientConstants.FABRIC_TRANSACTION_ID;

public void chaincodeEventListener(ContractEvent contractEvent) {
Expand All @@ -43,7 +46,16 @@ public void chaincodeEventListener(ContractEvent contractEvent) {
? new String(contractEvent.getPayload().get(), StandardCharsets.UTF_8)
: StringUtils.EMPTY;

publishChaincodeEvent(txId, chaincodeId, eventName, payload, channelName, blockNumber);
if (recencyTransactionContext.validateAndRemoveTransactionContext(txId)) {
publishChaincodeEvent(txId, chaincodeId, eventName, payload, channelName, blockNumber);
return;
}

log.info(
"TxnID {} for Block Number {} qualifies as a duplicate event.. Discarding the payload {} from being published.",
txId,
blockNumber,
payload);
}

@Deprecated
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,5 +18,9 @@ public interface EventFulfillment {
* @return responseEntity ResponseEntity Transaction Response
*/
ResponseEntity<ClientResponseModel> replayEvents(
Long startBlockNumber, Long endBlockNumber, String networkName, String eventType);
Long startBlockNumber,
Long endBlockNumber,
String transactionId,
String networkName,
String eventType);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package hlf.java.rest.client.service;

public interface RecencyTransactionContext {

void setTransactionContext(String transactionId);

boolean validateAndRemoveTransactionContext(String transactionId);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package hlf.java.rest.client.service.impl;

import com.google.common.cache.Cache;
import hlf.java.rest.client.service.RecencyTransactionContext;

public class DefaultCacheBasedRecencyTransactionContext implements RecencyTransactionContext {

private Cache<String, Object> recencyCache;

public DefaultCacheBasedRecencyTransactionContext(Cache<String, Object> recencyCache) {
this.recencyCache = recencyCache;
}

@Override
public void setTransactionContext(String transactionId) {
recencyCache.put(transactionId, 1);
}

@Override
public boolean validateAndRemoveTransactionContext(String transactionId) {
synchronized (this) {
if (recencyCache.getIfPresent(transactionId) == null) {
return false;
}
recencyCache.invalidate(transactionId);
return true;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,11 @@ public class EventFulfillmentImpl implements EventFulfillment {
*/
@Override
public ResponseEntity<ClientResponseModel> replayEvents(
Long startBlockNumber, Long endBlockNumber, String networkName, String eventType) {
Long startBlockNumber,
Long endBlockNumber,
String transactionId,
String networkName,
String eventType) {
log.info(
"Initiate the replay of events since {} until {} on channel {} and type {}",
startBlockNumber,
Expand Down Expand Up @@ -87,8 +91,17 @@ public ResponseEntity<ClientResponseModel> replayEvents(
.forEach(
transactionActionInfo -> {
ChaincodeEvent chaincodeEvent = transactionActionInfo.getEvent();
chaincodeEventListener.listener(
StringUtils.EMPTY, blockInfo, chaincodeEvent, networkName);

if (Objects.isNull(transactionId)
|| chaincodeEvent.getTxId().equals(transactionId)) {
chaincodeEventListener.listener(
StringUtils.EMPTY, blockInfo, chaincodeEvent, networkName);
} else {
log.info(
"Event TransactionID {} does not match the provided TransactionID filter {}. Skipping event.",
chaincodeEvent.getTxId(),
transactionId);
}
});
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,16 +58,18 @@ public boolean sendMessage(String msg, String fabricTxId, String eventName, Stri
@Override
public void onSuccess(SendResult<String, String> result) {
log.info(
"Sent message=["
+ msg
+ "] with offset=["
+ result.getRecordMetadata().offset()
+ "]");
"Sent message '{}' to partition {} for offset {}",
msg,
result.getRecordMetadata().partition(),
result.getRecordMetadata().offset());
}

@Override
public void onFailure(Throwable ex) {
log.warn("Unable to send message=[" + msg + "] due to : " + ex.getMessage());
log.error(
"Failed to send message event for Transaction ID {} due to {}",
fabricTxId,
ex.getMessage());
}
});

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package hlf.java.rest.client.service.impl;

import hlf.java.rest.client.service.RecencyTransactionContext;

public class NoOpRecencyTransactionContext implements RecencyTransactionContext {
@Override
public void setTransactionContext(String transactionId) {
// NO-OP
}

@Override
public boolean validateAndRemoveTransactionContext(String transactionId) {
return true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import hlf.java.rest.client.model.MultiDataTransactionPayload;
import hlf.java.rest.client.model.MultiPrivateDataTransactionPayloadValidator;
import hlf.java.rest.client.service.HFClientWrapper;
import hlf.java.rest.client.service.RecencyTransactionContext;
import hlf.java.rest.client.service.TransactionFulfillment;
import hlf.java.rest.client.util.FabricEventParseUtil;
import java.io.IOException;
Expand Down Expand Up @@ -84,6 +85,8 @@ public class TransactionFulfillmentImpl implements TransactionFulfillment {

@Autowired private HFClientWrapper hfClientWrapper;

@Autowired private RecencyTransactionContext recencyTransactionContext;

@Override
public ResponseEntity<ClientResponseModel> initSmartContract(
String networkName,
Expand Down Expand Up @@ -218,6 +221,11 @@ public ResponseEntity<ClientResponseModel> writeTransactionToLedger(
}
}

recencyTransactionContext.setTransactionContext(fabricTransaction.getTransactionId());

log.info(
"Performing Write Transaction to Ledger with Tx ID {}",
fabricTransaction.getTransactionId());
byte[] result = fabricTransaction.submit(transactionParams);
resultString = new String(result, StandardCharsets.UTF_8);
log.info("Transaction Successfully Submitted - Response: " + resultString);
Expand Down Expand Up @@ -283,6 +291,12 @@ public ResponseEntity<ClientResponseModel> writePrivateTransactionToLedger(

transientParam.put(transientKey, jsonPayload.getBytes());
fabricTransaction.setTransient(transientParam);

recencyTransactionContext.setTransactionContext(fabricTransaction.getTransactionId());

log.info(
"Performing Write Transaction to Ledger with Tx ID {}",
fabricTransaction.getTransactionId());
byte[] result = fabricTransaction.submit(collection, transientKey);
resultString = new String(result, StandardCharsets.UTF_8);
log.info("Transaction Successfully Submitted - Response: " + resultString);
Expand Down Expand Up @@ -513,6 +527,11 @@ public ResponseEntity<ClientResponseModel> writeMultiDataTransactionToLedger(
// Map to String Array for dispatching via SDK method
String[] publicDataArgs = publicParamsList.toArray(new String[publicParamsList.size()]);

recencyTransactionContext.setTransactionContext(fabricTransaction.getTransactionId());

log.info(
"Performing Write Transaction to Ledger with Tx ID {}",
fabricTransaction.getTransactionId());
byte[] result = fabricTransaction.submit(publicDataArgs);

resultString = new String(result, StandardCharsets.UTF_8);
Expand Down
5 changes: 5 additions & 0 deletions src/main/resources/application.template
Original file line number Diff line number Diff line change
Expand Up @@ -55,12 +55,17 @@ kafka:
ssl-enabled: boolean
security-protocol: <Only supports SSL>
listenToFailedMessages: boolean <set as true if you wish to recieve errored Transaction records back to this topic>
enableIdempotence: boolean, enable strict Kafka producer idempotence

failed-message-listener: <Note, if you wish to receive errored Transactions to a dedicated topic, these details should be filled up>
brokerHost: <Comma separated list of boostrap servers>
topic: <topic to publish errored Records>
ssl-enabled: boolean
security-protocol: <Only supports SSL>
dedupe:
enable: boolean, if enabled, the runtime instance of Connector utilises an in-memory recency cache that would validate a recent submission of Transaction prior to emitting an event with the matching Transaction ID.
recency-window-size: applicable only if dedupe is enabled, defines the recency cache size.
recency-window-expiry-in-minutes: applicable only if dedupe is enabled, defines the recency cache TTL in minites
---
spring:
profiles: container
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import hlf.java.rest.client.exception.FabricTransactionException;
import hlf.java.rest.client.exception.ServiceException;
import hlf.java.rest.client.model.ClientResponseModel;
import hlf.java.rest.client.service.RecencyTransactionContext;
import java.io.IOException;
import java.util.List;
import java.util.Optional;
Expand Down Expand Up @@ -42,6 +43,7 @@ public class TransactionFulfillmentImplTest {
@Mock Network network;
@Mock Contract contract;
@Mock Transaction transaction;
@Mock RecencyTransactionContext recencyTransactionContext;

private String testNetworkString = "some string";
private String testContractString = "some string";
Expand Down Expand Up @@ -69,7 +71,9 @@ public void writeTransactionToLedgerTest()
Mockito.when(gatewayConnetion.getNetwork(Mockito.anyString())).thenReturn(network);
Mockito.when(network.getContract(Mockito.anyString())).thenReturn(contract);
Mockito.when(contract.createTransaction(Mockito.anyString())).thenReturn(transaction);
Mockito.when(transaction.getTransactionId()).thenReturn("TX-1234");
Mockito.when(transaction.submit(Mockito.<String>any())).thenReturn(byteArrayResponse);
Mockito.doNothing().when(recencyTransactionContext).setTransactionContext(Mockito.anyString());
/*
* Mockito.when(contract.submitTransaction(Mockito.anyString(),
* Mockito.<String>any())) .thenReturn(byteArrayResponse);
Expand All @@ -92,7 +96,9 @@ public void writeTransactionToLedgerContractExceptionTest()
Mockito.when(gatewayConnetion.getNetwork(Mockito.anyString())).thenReturn(network);
Mockito.when(network.getContract(Mockito.anyString())).thenReturn(contract);
Mockito.when(contract.createTransaction(Mockito.anyString())).thenReturn(transaction);
Mockito.when(transaction.getTransactionId()).thenReturn("TX-1234");
Mockito.when(transaction.submit(Mockito.<String>any())).thenThrow(new ContractException(""));
Mockito.doNothing().when(recencyTransactionContext).setTransactionContext(Mockito.anyString());
/*
* Mockito.when(contract.submitTransaction(Mockito.anyString(),
* Mockito.<String>any())) .thenThrow(new ContractException(""));
Expand All @@ -115,7 +121,9 @@ public void writeTransactionToLedgerTimeoutExceptionTest()
Mockito.when(gatewayConnetion.getNetwork(Mockito.anyString())).thenReturn(network);
Mockito.when(network.getContract(Mockito.anyString())).thenReturn(contract);
Mockito.when(contract.createTransaction(Mockito.anyString())).thenReturn(transaction);
Mockito.when(transaction.getTransactionId()).thenReturn("TX-1234");
Mockito.when(transaction.submit(Mockito.<String>any())).thenThrow(new TimeoutException());
Mockito.doNothing().when(recencyTransactionContext).setTransactionContext(Mockito.anyString());
/*
* Mockito.when(contract.submitTransaction(Mockito.anyString(),
* Mockito.<String>any())) .thenThrow(new TimeoutException());
Expand All @@ -138,7 +146,9 @@ public void writeTransactionToLedgerInterruptExceptionTest()
Mockito.when(gatewayConnetion.getNetwork(Mockito.anyString())).thenReturn(network);
Mockito.when(network.getContract(Mockito.anyString())).thenReturn(contract);
Mockito.when(contract.createTransaction(Mockito.anyString())).thenReturn(transaction);
Mockito.when(transaction.getTransactionId()).thenReturn("TX-1234");
Mockito.when(transaction.submit(Mockito.<String>any())).thenThrow(new InterruptedException());
Mockito.doNothing().when(recencyTransactionContext).setTransactionContext(Mockito.anyString());
/*
* Mockito.when(contract.submitTransaction(Mockito.anyString(),
* Mockito.<String>any())) .thenThrow(new InterruptedException());
Expand Down
Loading

0 comments on commit 96b353f

Please sign in to comment.