Skip to content

Commit

Permalink
Remove explicit publish of failed message to EventPublishService
Browse files Browse the repository at this point in the history
Signed-off-by: “Nithin <nithin.pankaj@walmartlabs.com>
  • Loading branch information
“Nithin committed Nov 16, 2023
1 parent d0d1741 commit f2e14bb
Show file tree
Hide file tree
Showing 3 changed files with 14 additions and 96 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
import hlf.java.rest.client.exception.ServiceException;
import hlf.java.rest.client.metrics.EmitKafkaCustomMetrics;
import hlf.java.rest.client.model.MultiDataTransactionPayload;
import hlf.java.rest.client.service.EventPublishService;
import hlf.java.rest.client.service.TransactionFulfillment;
import hlf.java.rest.client.util.FabricClientConstants;
import java.nio.charset.StandardCharsets;
Expand All @@ -30,11 +29,8 @@ public class TransactionConsumer {
private static final String PAYLOAD_KIND = "payload_kind";
private static final String PL_KIND_MULTI_DATA = "multi_data";

@Autowired TransactionFulfillment transactionFulfillment;
@Autowired ObjectMapper objectMapper;

@Autowired(required = false)
EventPublishService eventPublishServiceImpl;
@Autowired private TransactionFulfillment transactionFulfillment;
@Autowired private ObjectMapper objectMapper;

/**
* This method routes the kafka messages to appropriate methods and acknowledges once processing
Expand Down Expand Up @@ -125,9 +121,9 @@ public void listen(ConsumerRecord<String, String> message) {
if (isIdentifiableFunction(networkName, contractName, transactionFunctionName)
&& !transactionParams.isEmpty()) {

if (null != peerNames && !peerNames.isEmpty()) {
if (!peerNames.isEmpty()) {
List<String> lstPeerNames = Arrays.asList(peerNames.split(","));
if (null != lstPeerNames && !lstPeerNames.isEmpty()) {
if (!lstPeerNames.isEmpty()) {
if (StringUtils.isNotBlank(collections) && StringUtils.isNotBlank(transientKey)) {
transactionFulfillment.writePrivateTransactionToLedger(
networkName,
Expand Down Expand Up @@ -166,13 +162,19 @@ public void listen(ConsumerRecord<String, String> message) {
}

} else {
log.info("Incorrect Transaction Payload");
log.error("Incorrect Transaction Payload");
throw new ServiceException(
ErrorCode.VALIDATION_FAILED,
"Inbound transaction format is incorrect or doesn't contain valid parameters.");
}

} catch (FabricTransactionException fte) {
eventPublishServiceImpl.publishTransactionFailureEvent(
fte.getMessage(), networkName, contractName, transactionFunctionName, transactionParams);
log.error("Error in Submitting Transaction - Exception - " + fte.getMessage());
/*
If the error handler has dead letter publish enabled, the errored Record header will be enriched by extracting
the error cause and message from the thrown exception.
*/
throw fte;
} catch (Exception ex) {
log.error("Error in Kafka Listener - Message Format exception - " + ex.getMessage());
throw new ServiceException(ErrorCode.HYPERLEDGER_FABRIC_TRANSACTION_ERROR, ex.getMessage());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

/**
* The EventPublishService is a service class, which include the kafka template. It sends the
* Message to the the Event Kafka message topic
* Message to the Event Kafka message topic
*/
@ConditionalOnProperty("kafka.event-listener.brokerHost")
public interface EventPublishService {
Expand Down Expand Up @@ -49,19 +49,4 @@ boolean publishBlockEvents(
String chaincodeName,
String functionName,
Boolean isPrivateDataPresent);

/**
* @param errorMsg contents of the error message
* @param networkName channel name in Fabric
* @param contractName chaincode name in the Fabric
* @param functionName function name in a given chaincode.
* @param parameters parameters sent to the chaincode
* @return status of the published message to Kafka, successful or not
*/
boolean publishTransactionFailureEvent(
String errorMsg,
String networkName,
String contractName,
String functionName,
String parameters);
}
Original file line number Diff line number Diff line change
Expand Up @@ -222,73 +222,4 @@ public void onFailure(Throwable ex) {

return status;
}

@Override
public boolean publishTransactionFailureEvent(
String errorMsg,
String networkName,
String contractName,
String functionName,
String parameters) {
boolean status = true;

try {

ProducerRecord<String, String> producerRecord =
new ProducerRecord<String, String>(topicName, functionName, parameters);

producerRecord
.headers()
.add(new RecordHeader(FabricClientConstants.ERROR_MSG, errorMsg.getBytes()));

producerRecord
.headers()
.add(
new RecordHeader(
FabricClientConstants.FABRIC_CHAINCODE_NAME, contractName.getBytes()));
producerRecord
.headers()
.add(new RecordHeader(FabricClientConstants.FABRIC_CHANNEL_NAME, networkName.getBytes()));

producerRecord
.headers()
.add(
new RecordHeader(
FabricClientConstants.FABRIC_EVENT_TYPE,
FabricClientConstants.FABRIC_EVENT_TYPE_ERROR.getBytes()));

producerRecord
.headers()
.add(
new RecordHeader(
FabricClientConstants.FABRIC_EVENT_FUNC_NAME, functionName.getBytes()));

ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(producerRecord);

future.addCallback(
new ListenableFutureCallback<SendResult<String, String>>() {

@Override
public void onSuccess(SendResult<String, String> result) {
log.info(
"Sent message=["
+ parameters
+ "] with offset=["
+ result.getRecordMetadata().offset()
+ "]");
}

@Override
public void onFailure(Throwable ex) {
log.error("Unable to send message=[" + parameters + "] due to : " + ex.getMessage());
}
});

} catch (Exception ex) {
status = false;
log.error("Error sending message - " + ex.getMessage());
}

return status;
}
}

0 comments on commit f2e14bb

Please sign in to comment.