diff --git a/src/main/java/hlf/java/rest/client/config/FabricProperties.java b/src/main/java/hlf/java/rest/client/config/FabricProperties.java index cb34d65..0b0f5d4 100644 --- a/src/main/java/hlf/java/rest/client/config/FabricProperties.java +++ b/src/main/java/hlf/java/rest/client/config/FabricProperties.java @@ -68,14 +68,21 @@ public static class Events { // preferred for providing Chaincode details for Event subscription private List chaincode; private boolean standardCCEventEnabled; - private List block; + private List blockDetails; private List chaincodeDetails; } + @Data + public static class BlockDetails { + private String channelName; + private List listenerTopics; + } + @Data public static class ChaincodeDetails { private String channelName; private String chaincodeId; + private List listenerTopics; } /** diff --git a/src/main/java/hlf/java/rest/client/config/KafkaConsumerErrorHandler.java b/src/main/java/hlf/java/rest/client/config/KafkaConsumerErrorHandler.java index 83ca30a..c155186 100644 --- a/src/main/java/hlf/java/rest/client/config/KafkaConsumerErrorHandler.java +++ b/src/main/java/hlf/java/rest/client/config/KafkaConsumerErrorHandler.java @@ -7,6 +7,7 @@ import java.util.Arrays; import java.util.List; import java.util.Objects; +import java.util.Optional; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import org.apache.kafka.clients.consumer.Consumer; @@ -22,6 +23,7 @@ import org.springframework.kafka.listener.ConsumerAwareRecordRecoverer; import org.springframework.kafka.listener.DeadLetterPublishingRecoverer; import org.springframework.kafka.listener.DefaultErrorHandler; +import org.springframework.util.CollectionUtils; import org.springframework.util.backoff.FixedBackOff; @Configuration @@ -59,11 +61,16 @@ public CommonErrorHandler topicTransactionErrorHandler() { deadLetterPublishingRecoverer = generateRecordRecovererWithPublisher(kafkaProperties.getFailedMessageListener()); - } else if (Objects.nonNull(kafkaProperties.getEventListener()) - && kafkaProperties.getEventListener().isListenToFailedMessages()) { - - deadLetterPublishingRecoverer = - generateRecordRecovererWithPublisher(kafkaProperties.getEventListener()); + } else if (!CollectionUtils.isEmpty(kafkaProperties.getEventListeners())) { + + Optional eventProducerOptional = + kafkaProperties.getEventListeners().stream() + .filter(KafkaProperties.EventProducer::isListenToFailedMessages) + .findAny(); + if (eventProducerOptional.isPresent()) { + deadLetterPublishingRecoverer = + generateRecordRecovererWithPublisher(eventProducerOptional.get()); + } } /* @@ -103,7 +110,7 @@ public void accept( private DeadLetterPublishingRecoverer generateRecordRecovererWithPublisher( KafkaProperties.Producer destination) { - KafkaTemplate deadLetterPublisherTemplate = + KafkaTemplate deadLetterPublisherTemplate = new KafkaTemplate<>(kafkaProducerConfig.eventProducerFactory(destination)); deadLetterPublisherTemplate.setDefaultTopic(destination.getTopic()); diff --git a/src/main/java/hlf/java/rest/client/config/KafkaProducerConfig.java b/src/main/java/hlf/java/rest/client/config/KafkaProducerConfig.java index 7e202df..5a9d3f1 100644 --- a/src/main/java/hlf/java/rest/client/config/KafkaProducerConfig.java +++ b/src/main/java/hlf/java/rest/client/config/KafkaProducerConfig.java @@ -2,7 +2,9 @@ import io.micrometer.core.instrument.MeterRegistry; import java.util.HashMap; +import java.util.LinkedHashMap; import java.util.Map; +import java.util.regex.Pattern; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.producer.ProducerConfig; import org.springframework.beans.factory.annotation.Autowired; @@ -10,26 +12,26 @@ import org.springframework.cloud.context.config.annotation.RefreshScope; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; +import org.springframework.context.support.GenericApplicationContext; import org.springframework.kafka.core.DefaultKafkaProducerFactory; -import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.core.MicrometerProducerListener; import org.springframework.kafka.core.ProducerFactory; +import org.springframework.kafka.core.RoutingKafkaTemplate; /** This class is the configuration class for sending to Chaincode event to eventHub/Kafka Topic. */ @Slf4j @Configuration -@ConditionalOnProperty("kafka.event-listener.brokerHost") +@ConditionalOnProperty("kafka.event-listeners[0].brokerHost") @RefreshScope public class KafkaProducerConfig extends BaseKafkaConfig { - private static final String PRODUCER_ALL_ACKS = "all"; private static final int RETRIES_CONFIG_FOR_AT_MOST_ONCE = 0; @Autowired private KafkaProperties kafkaProperties; @Autowired private MeterRegistry meterRegistry; - public ProducerFactory eventProducerFactory( + public ProducerFactory eventProducerFactory( KafkaProperties.Producer kafkaProducerProperties) { Map props = new HashMap<>(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProducerProperties.getBrokerHost()); @@ -60,7 +62,7 @@ public ProducerFactory eventProducerFactory( log.info("Generating Kafka producer factory.."); - DefaultKafkaProducerFactory defaultKafkaProducerFactory = + DefaultKafkaProducerFactory defaultKafkaProducerFactory = new DefaultKafkaProducerFactory<>(props); defaultKafkaProducerFactory.addListener(new MicrometerProducerListener<>(meterRegistry)); @@ -69,8 +71,19 @@ public ProducerFactory eventProducerFactory( @Bean @RefreshScope - public KafkaTemplate kafkaTemplate() { - return new KafkaTemplate<>(eventProducerFactory(kafkaProperties.getEventListener())); + public RoutingKafkaTemplate routingTemplate(GenericApplicationContext context) { + + Map> map = new LinkedHashMap<>(); + for (KafkaProperties.EventProducer eventProducer : kafkaProperties.getEventListeners()) { + ProducerFactory defaultKafkaProducerFactory = + eventProducerFactory(eventProducer); + context.registerBean( + eventProducer.getTopic() + "PF", + ProducerFactory.class, + () -> defaultKafkaProducerFactory); + map.put(Pattern.compile(eventProducer.getTopic()), defaultKafkaProducerFactory); + } + return new RoutingKafkaTemplate(map); } @Override diff --git a/src/main/java/hlf/java/rest/client/config/KafkaProperties.java b/src/main/java/hlf/java/rest/client/config/KafkaProperties.java index d909834..06bedb0 100644 --- a/src/main/java/hlf/java/rest/client/config/KafkaProperties.java +++ b/src/main/java/hlf/java/rest/client/config/KafkaProperties.java @@ -20,7 +20,7 @@ public class KafkaProperties { private List integrationPoints; - private EventProducer eventListener; + private List eventListeners; private Producer failedMessageListener; @Getter diff --git a/src/main/java/hlf/java/rest/client/listener/FabricEventListener.java b/src/main/java/hlf/java/rest/client/listener/FabricEventListener.java index c62296e..b01f1d0 100644 --- a/src/main/java/hlf/java/rest/client/listener/FabricEventListener.java +++ b/src/main/java/hlf/java/rest/client/listener/FabricEventListener.java @@ -47,13 +47,15 @@ public void onRefresh(RefreshScopeRefreshedEvent event) { startEventListener(); } - public void startEventListener() { + private void startEventListener() { try { - List blockChannelNames = fabricProperties.getEvents().getBlock(); - if (!CollectionUtils.isEmpty(blockChannelNames)) { + List blockDetailsList = + fabricProperties.getEvents().getBlockDetails(); + if (!CollectionUtils.isEmpty(blockDetailsList)) { - for (String channelName : blockChannelNames) { + for (FabricProperties.BlockDetails blockDetails : blockDetailsList) { + String channelName = blockDetails.getChannelName(); log.info("channel names {}", channelName); Network network = gateway.getNetwork(channelName); diff --git a/src/main/java/hlf/java/rest/client/service/EventPublishService.java b/src/main/java/hlf/java/rest/client/service/EventPublishService.java index 554b807..99c0d75 100644 --- a/src/main/java/hlf/java/rest/client/service/EventPublishService.java +++ b/src/main/java/hlf/java/rest/client/service/EventPublishService.java @@ -6,7 +6,7 @@ * The EventPublishService is a service class, which include the kafka template. It sends the * Message to the Event Kafka message topic */ -@ConditionalOnProperty("kafka.event-listener.brokerHost") +@ConditionalOnProperty("kafka.event-listeners[0].brokerHost") public interface EventPublishService { /** @@ -25,9 +25,8 @@ boolean sendMessage( * @param eventName String chaincode event-name * @param channelName String Name of the channel where the event was generated. * @param messageKey associated key for the payload. - * @return status boolean status of msg sent */ - boolean publishChaincodeEvents( + void publishChaincodeEvents( final String payload, String chaincodeName, String fabricTxId, @@ -42,9 +41,8 @@ boolean publishChaincodeEvents( * @param channelName String Name of the channel where the event was generated. * @param functionName String Name of the function name. * @param isPrivateDataPresent boolean flag to check if privateData present in payload - * @return status boolean status of msg sent */ - boolean publishBlockEvents( + void publishBlockEvents( final String payload, String fabricTxId, String channelName, diff --git a/src/main/java/hlf/java/rest/client/service/impl/EventPublishServiceImpl.java b/src/main/java/hlf/java/rest/client/service/impl/EventPublishServiceImpl.java index 3639069..4b912ac 100644 --- a/src/main/java/hlf/java/rest/client/service/impl/EventPublishServiceImpl.java +++ b/src/main/java/hlf/java/rest/client/service/impl/EventPublishServiceImpl.java @@ -4,27 +4,29 @@ import hlf.java.rest.client.config.KafkaProperties; import hlf.java.rest.client.service.EventPublishService; import hlf.java.rest.client.util.FabricClientConstants; +import java.util.Optional; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.header.internals.RecordHeader; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; -import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.kafka.core.RoutingKafkaTemplate; import org.springframework.kafka.support.SendResult; import org.springframework.stereotype.Service; +import org.springframework.util.CollectionUtils; import org.springframework.util.concurrent.ListenableFuture; import org.springframework.util.concurrent.ListenableFutureCallback; @Slf4j @Service("eventPublishService") -@ConditionalOnProperty("kafka.event-listener.topic") +@ConditionalOnProperty("kafka.event-listeners[0].topic") public class EventPublishServiceImpl implements EventPublishService { @Autowired private KafkaProperties kafkaProperties; @Autowired private FabricProperties fabricProperties; - @Autowired private KafkaTemplate kafkaTemplate; + @Autowired private RoutingKafkaTemplate routingKafkaTemplate; @Override public boolean sendMessage(String msg, String fabricTxId, String eventName, String channelName) { @@ -35,9 +37,11 @@ public boolean sendMessage(String msg, String fabricTxId, String eventName, Stri try { - ProducerRecord producerRecord = - new ProducerRecord( - kafkaProperties.getEventListener().getTopic(), String.valueOf(msg.hashCode()), msg); + ProducerRecord producerRecord = + new ProducerRecord<>( + kafkaProperties.getEventListeners().get(0).getTopic(), + String.valueOf(msg.hashCode()), + msg); producerRecord .headers() @@ -50,13 +54,14 @@ public boolean sendMessage(String msg, String fabricTxId, String eventName, Stri .headers() .add(new RecordHeader(FabricClientConstants.FABRIC_CHANNEL_NAME, channelName.getBytes())); - ListenableFuture> future = kafkaTemplate.send(producerRecord); + ListenableFuture> future = + routingKafkaTemplate.send(producerRecord); future.addCallback( - new ListenableFutureCallback>() { + new ListenableFutureCallback>() { @Override - public void onSuccess(SendResult result) { + public void onSuccess(SendResult result) { log.info( "Sent message '{}' to partition {} for offset {}", msg, @@ -82,18 +87,51 @@ public void onFailure(Throwable ex) { } @Override - public boolean publishChaincodeEvents( + public void publishChaincodeEvents( String payload, String chaincodeName, String fabricTxId, String eventName, String channelName, String messageKey) { - boolean status = true; + Optional optionalChaincodeDetails = + fabricProperties.getEvents().getChaincodeDetails().stream() + .filter( + chaincodeDetail -> + chaincodeDetail.getChannelName().equals(channelName) + && chaincodeDetail.getChaincodeId().equals(chaincodeName)) + .findAny(); + + if (!optionalChaincodeDetails.isPresent() + || CollectionUtils.isEmpty(optionalChaincodeDetails.get().getListenerTopics())) { + sendMessage( + kafkaProperties.getEventListeners().get(0).getTopic(), + payload, + chaincodeName, + fabricTxId, + eventName, + channelName, + messageKey); + return; + } + + for (String topic : optionalChaincodeDetails.get().getListenerTopics()) { + sendMessage(topic, payload, chaincodeName, fabricTxId, eventName, channelName, messageKey); + } + } + + private void sendMessage( + String topic, + String payload, + String chaincodeName, + String fabricTxId, + String eventName, + String channelName, + String messageKey) { try { - ProducerRecord producerRecord = - new ProducerRecord<>(kafkaProperties.getEventListener().getTopic(), messageKey, payload); + ProducerRecord producerRecord = + new ProducerRecord<>(topic, messageKey, payload); producerRecord .headers() @@ -118,17 +156,16 @@ public boolean publishChaincodeEvents( FabricClientConstants.FABRIC_EVENT_TYPE, FabricClientConstants.FABRIC_EVENT_TYPE_CHAINCODE.getBytes())); - log.info( - "Publishing Chaincode event to outbound topic {}", - kafkaProperties.getEventListener().getTopic()); + log.info("Publishing Chaincode event to outbound topic {}", topic); - ListenableFuture> future = kafkaTemplate.send(producerRecord); + ListenableFuture> future = + routingKafkaTemplate.send(producerRecord); future.addCallback( - new ListenableFutureCallback>() { + new ListenableFutureCallback>() { @Override - public void onSuccess(SendResult result) { + public void onSuccess(SendResult result) { log.info( "Sent message '{}' to partition {} for offset {}", payload, @@ -146,30 +183,61 @@ public void onFailure(Throwable ex) { }); } catch (Exception ex) { - status = false; log.error("Error sending message - " + ex.getMessage()); } - - return status; } @Override - public boolean publishBlockEvents( + public void publishBlockEvents( String payload, String fabricTxId, String channelName, String chaincodeName, String functionName, Boolean isPrivateDataPresent) { - boolean status = true; + Optional optionalBlockDetails = + fabricProperties.getEvents().getBlockDetails().stream() + .filter(blockDetails -> blockDetails.getChannelName().equals(channelName)) + .findAny(); + + if (!optionalBlockDetails.isPresent() + || CollectionUtils.isEmpty(optionalBlockDetails.get().getListenerTopics())) { + sendMessage( + kafkaProperties.getEventListeners().get(0).getTopic(), + payload, + fabricTxId, + channelName, + chaincodeName, + functionName, + isPrivateDataPresent); + return; + } + + for (String topic : optionalBlockDetails.get().getListenerTopics()) { + sendMessage( + topic, + payload, + fabricTxId, + channelName, + chaincodeName, + functionName, + isPrivateDataPresent); + } + } + + private void sendMessage( + String topic, + String payload, + String fabricTxId, + String channelName, + String chaincodeName, + String functionName, + Boolean isPrivateDataPresent) { try { - ProducerRecord producerRecord = - new ProducerRecord<>( - kafkaProperties.getEventListener().getTopic(), - String.valueOf(payload.hashCode()), - payload); + ProducerRecord producerRecord = + new ProducerRecord<>(topic, String.valueOf(payload.hashCode()), payload); producerRecord .headers() @@ -204,17 +272,16 @@ public boolean publishBlockEvents( FabricClientConstants.IS_PRIVATE_DATA_PRESENT, isPrivateDataPresent.toString().getBytes())); - log.info( - "Publishing Block event to outbound topic {}", - kafkaProperties.getEventListener().getTopic()); + log.info("Publishing Block event to outbound topic {}", topic); - ListenableFuture> future = kafkaTemplate.send(producerRecord); + ListenableFuture> future = + routingKafkaTemplate.send(producerRecord); future.addCallback( - new ListenableFutureCallback>() { + new ListenableFutureCallback>() { @Override - public void onSuccess(SendResult result) { + public void onSuccess(SendResult result) { log.info( "Sent message '{}' to partition {} for offset {}", payload, @@ -232,10 +299,7 @@ public void onFailure(Throwable ex) { }); } catch (Exception ex) { - status = false; log.error("Error sending message - " + ex.getMessage()); } - - return status; } } diff --git a/src/main/resources/application.template b/src/main/resources/application.template index 7f70104..140b17e 100644 --- a/src/main/resources/application.template +++ b/src/main/resources/application.template @@ -32,13 +32,15 @@ fabric: enable: true standardCCEventEnabled: boolean (if set to true then the chaincode event is attempted at deserializing in the connector) chaincode: (Note : Will soon be deprecated / removed) - block: + blockDetails: + - channelName: Name of the Channel + listenerTopics: topics to which event messages will be sent chaincodeDetails: - - - channelName: Name of the Channel - chaincodeId: chaincode-id of the deployed chaincode in this Channel + - channelName: Name of the Channel + chaincodeId: chaincode-id of the deployed chaincode in this Channel + listenerTopics: topics to which event messages will be sent kafka: - integration-points: - groupId: test_group_id enableParallelListenerCapabilities: boolean topicPartitions: @@ -47,14 +49,14 @@ kafka: ssl-enabled: boolean security-protocol: ssl-keystore-base64: - ssl-truststore-base64: + ssl-truststore-base64: offsetResetPolicy: if not provided, default will be latest. - event-listener: + event-listeners: brokerHost: - topic: + topic: ssl-enabled: boolean security-protocol: - listenToFailedMessages: boolean + listenToFailedMessages: boolean enableIdempotence: boolean, enable strict Kafka producer idempotence failed-message-listener: @@ -65,7 +67,7 @@ kafka: dedupe: enable: boolean, if enabled, the runtime instance of Connector utilises an in-memory recency cache that would validate a recent submission of Transaction prior to emitting an event with the matching Transaction ID. recency-window-size: applicable only if dedupe is enabled, defines the recency cache size. - recency-window-expiry-in-minutes: applicable only if dedupe is enabled, defines the recency cache TTL in minites + recency-window-expiry-in-minutes: applicable only if dedupe is enabled, defines the recency cache TTL in minutes --- spring: profiles: container diff --git a/src/test/java/hlf/java/rest/client/integration/ConfigHandlerControllerIntegrationTest.java b/src/test/java/hlf/java/rest/client/integration/ConfigHandlerControllerIntegrationTest.java index dac61a3..3fb774d 100644 --- a/src/test/java/hlf/java/rest/client/integration/ConfigHandlerControllerIntegrationTest.java +++ b/src/test/java/hlf/java/rest/client/integration/ConfigHandlerControllerIntegrationTest.java @@ -54,7 +54,8 @@ void testContextRefresh() throws Exception { Assertions.assertEquals( TestConfiguration.FABRIC_PROPERTIES_EVENTS, fabricProperties.getEvents().toString()); Assertions.assertEquals( - TestConfiguration.KAFKA_PROPERTIES_PRODUCER, kafkaProperties.getEventListener().toString()); + TestConfiguration.KAFKA_PROPERTIES_PRODUCER, + kafkaProperties.getEventListeners().get(0).toString()); Assertions.assertEquals( TestConfiguration.KAFKA_CONSUMER_PROPERTIES, kafkaProperties.getIntegrationPoints().toString()); @@ -81,7 +82,7 @@ private static class TestConfiguration { static String FABRIC_PROPERTIES_CLIENT = "FabricProperties.Client(rest=FabricProperties.Client.Rest(apikey=expected-key))"; static String FABRIC_PROPERTIES_EVENTS = - "FabricProperties.Events(enable=true, chaincode=[chaincode12, chaincode2], standardCCEventEnabled=false, block=[block111, block2], chaincodeDetails=null)"; + "FabricProperties.Events(enable=true, chaincode=[chaincode12, chaincode2], standardCCEventEnabled=false, blockDetails=[FabricProperties.BlockDetails(channelName=block111, listenerTopics=[topic-1])], chaincodeDetails=null)"; static String KAFKA_PROPERTIES_PRODUCER = "Producer{brokerHost='localhost:8087', topic='hlf-offchain-topic1', saslJaasConfig='null'}"; static String KAFKA_CONSUMER_PROPERTIES = diff --git a/src/test/resources/application-default-consumer.yml b/src/test/resources/application-default-consumer.yml index f447865..ee4eb12 100644 --- a/src/test/resources/application-default-consumer.yml +++ b/src/test/resources/application-default-consumer.yml @@ -15,7 +15,7 @@ fabric: enable: true chaincode: [] chaincodeDetails: [] - block: [] + blockDetails: [] client: rest: apikey: 6uoIAnR @@ -26,12 +26,12 @@ kafka: topic: test-consumer-inbound-topic config-id: 892019 ssl-enabled: false - event-listener: - ssl-enabled: false - brokerHost: localhost:9092 - topic: test-publisher-event-topice - enable-idempotence: true - enable-at-most-once-semantics: true + event-listeners: + - ssl-enabled: false + brokerHost: localhost:9092 + topic: test-publisher-event-topice + enable-idempotence: true + enable-at-most-once-semantics: true management: endpoints: web: diff --git a/src/test/resources/application-per-partition-consumer.yml b/src/test/resources/application-per-partition-consumer.yml index 7f0f77b..ff3d5e7 100644 --- a/src/test/resources/application-per-partition-consumer.yml +++ b/src/test/resources/application-per-partition-consumer.yml @@ -15,7 +15,7 @@ fabric: enable: true chaincode: [] chaincodeDetails: [] - block: [] + blockDetails: [] client: rest: apikey: 6uoIAnR @@ -27,11 +27,11 @@ kafka: config-id: 892019 ssl-enabled: false topicPartitions: 12 - event-listener: - ssl-enabled: false - brokerHost: localhost:9092 - topic: test-publisher-event-topic - enable-idempotence: true + event-listeners: + - ssl-enabled: false + brokerHost: localhost:9092 + topic: test-publisher-event-topic + enable-idempotence: true failed-message-listener: brokerHost: localhost:9092 topic: test-consumer-dlt diff --git a/src/test/resources/application.yml b/src/test/resources/application.yml index 92d67c1..212839f 100644 --- a/src/test/resources/application.yml +++ b/src/test/resources/application.yml @@ -43,11 +43,11 @@ kafka: brokerHost: localhost:9094 groupId: fabric-consumer topic: hlf-integration-topic2 - event-listener: - brokerHost: localhost:9093 - topic: hlf-offchain-topic - enable-idempotence: true - enable-at-most-once-semantics: true + event-listeners: + - brokerHost: localhost:9093 + topic: hlf-offchain-topic + enable-idempotence: true + enable-at-most-once-semantics: true dedupe: enable: false --- @@ -67,7 +67,9 @@ fabric: events: enable: false chaincode: channel1 - block: channel1 + blockDetails: + - channelName: channel1 + listenerTopics: topic-1 client: rest: apikey: abc diff --git a/src/test/resources/integration/sample-application.yml b/src/test/resources/integration/sample-application.yml index 7544a1e..c4b8f0c 100644 --- a/src/test/resources/integration/sample-application.yml +++ b/src/test/resources/integration/sample-application.yml @@ -38,7 +38,9 @@ fabric: enable: true standardCCEventEnabled: false chaincode: chaincode12, chaincode2 - block: block111, block2 + blockDetails: + - channelName: block111 + listenerTopics: topic-1 kafka: integration-points: - @@ -49,9 +51,9 @@ kafka: brokerHost: localhost:8087 groupId: fabric-consumer1 topic: hlf-integration-topic21 - event-listener: - brokerHost: localhost:8087 - topic: hlf-offchain-topic1 + event-listeners: + - brokerHost: localhost:8087 + topic: hlf-offchain-topic1 ---