Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Expose Kafka Consumer & Producer metrics to Metrics Endpoint. Introduce custom error counters for Invalid Txn Payload & Processing Failures #137

Merged
merged 6 commits into from
Mar 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
102 changes: 102 additions & 0 deletions src/main/java/hlf/java/rest/client/config/BaseKafkaConfig.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
package hlf.java.rest.client.config;

import hlf.java.rest.client.util.FabricClientConstants;
import io.micrometer.core.instrument.Gauge;
import io.micrometer.core.instrument.MeterRegistry;
import java.sql.Timestamp;
import java.time.Instant;
import java.util.Map;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.BooleanUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.common.config.SslConfigs;

/*
 * Shared base class for Kafka producer/consumer Spring configurations. Centralises the
 * SASL and SSL client-property wiring plus registration of cert-expiry gauges, so concrete
 * configs only declare whether they describe a PRODUCER or a CONSUMER.
 */
@Slf4j
public abstract class BaseKafkaConfig {

  /** Distinguishes the two kinds of Kafka client a subclass can configure. */
  enum ConfigType {
    PRODUCER,
    CONSUMER
  }

  /**
   * Adds SASL security settings to {@code props} when a JAAS config string is supplied
   * (e.g. for Azure Event Hub). A null/empty JAAS config leaves the map untouched.
   *
   * @param props mutable Kafka client property map to enrich
   * @param saslJaasConfig the JAAS configuration string; may be null or empty
   */
  protected void configureSaslProperties(Map<String, Object> props, String saslJaasConfig) {
    if (StringUtils.isNotEmpty(saslJaasConfig)) {
      props.put(
          FabricClientConstants.KAFKA_SECURITY_PROTOCOL_KEY,
          FabricClientConstants.KAFKA_SECURITY_PROTOCOL_VALUE);
      props.put(
          FabricClientConstants.KAFKA_SASL_MECHANISM_KEY,
          FabricClientConstants.KAFKA_SASL_MECHANISM_VALUE);
      props.put(FabricClientConstants.KAFKA_SASL_JASS_ENDPOINT_KEY, saslJaasConfig);
    }
  }

  /**
   * Adds keystore/truststore settings to {@code props} when SSL auth is required, and
   * registers gauges exposing each store's certificate expiry timestamp (epoch millis) and
   * an expired flag (0/1), named {@code <consumer.|producer.><topic>.<key|trust>store.*}.
   *
   * <p>Gauge registration is best-effort: a failure to read the stores is logged (with its
   * cause) and the SSL client properties already added remain in effect.
   *
   * @param props mutable Kafka client property map to enrich
   * @param sslProperties source of keystore/truststore locations and passwords
   * @param topicName topic name used to namespace the gauges
   * @param sslMetricsRegistry registry that receives the cert-expiry gauges
   */
  protected void configureSSLProperties(
      Map<String, Object> props,
      KafkaProperties.SSLProperties sslProperties,
      String topicName,
      MeterRegistry sslMetricsRegistry) {

    if (!sslProperties.isSslAuthRequired()) {
      return; // Cluster is not SSL-secured; nothing to configure.
    }

    SSLAuthFilesHelper.createSSLAuthFiles(sslProperties);
    props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, sslProperties.getSecurityProtocol());
    props.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, sslProperties.getSslKeystoreLocation());
    props.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, sslProperties.getSslKeystorePassword());
    props.put(
        SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, sslProperties.getSslTruststoreLocation());
    props.put(
        SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, sslProperties.getSslTruststorePassword());
    props.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, sslProperties.getSslKeyPassword());

    try {
      Timestamp keyStoreCertExpiryTimestamp =
          SSLAuthFilesHelper.getExpiryTimestampForKeyStore(
              sslProperties.getSslKeystoreLocation(), sslProperties.getSslKeystorePassword());
      Timestamp trustStoreCertExpiryTimestamp =
          SSLAuthFilesHelper.getExpiryTimestampForKeyStore(
              sslProperties.getSslTruststoreLocation(), sslProperties.getSslTruststorePassword());

      // Enum identity comparison is NPE-safe and idiomatic ('guage' typo fixed locally;
      // the emitted metric-name prefixes are unchanged).
      String gaugePrefix = getConfigType() == ConfigType.CONSUMER ? "consumer." : "producer.";

      Gauge.builder(
              gaugePrefix + topicName + ".keystore.expiryTs",
              keyStoreCertExpiryTimestamp::getTime)
          .strongReference(true)
          .register(sslMetricsRegistry);

      Gauge.builder(
              gaugePrefix + topicName + ".truststore.expiryTs",
              trustStoreCertExpiryTimestamp::getTime)
          .strongReference(true)
          .register(sslMetricsRegistry);

      // Snapshot "now" once so both expiry checks compare against the same instant.
      Timestamp now = Timestamp.from(Instant.now());
      boolean hasKeyStoreCertExpired = keyStoreCertExpiryTimestamp.before(now);
      boolean hasTrustStoreCertExpired = trustStoreCertExpiryTimestamp.before(now);

      Gauge.builder(
              gaugePrefix + topicName + ".keystore.hasExpired",
              hasKeyStoreCertExpired,
              BooleanUtils::toInteger)
          .strongReference(true)
          .register(sslMetricsRegistry);

      Gauge.builder(
              gaugePrefix + topicName + ".truststore.hasExpired",
              hasTrustStoreCertExpired,
              BooleanUtils::toInteger)
          .strongReference(true)
          .register(sslMetricsRegistry);

    } catch (Exception e) {
      // Log the cause instead of swallowing it so the failure is diagnosable.
      log.error(
          "Failed to extract expiry details of SSL Certs. Metrics for SSL cert-expiry will not be available.",
          e);
    }
  }

  /**
   * Returns whether the concrete subclass configures a PRODUCER or CONSUMER client; used to
   * prefix the cert-expiry gauge names.
   */
  protected abstract ConfigType getConfigType();
}

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package hlf.java.rest.client.config;

import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.MeterRegistry;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * Registers the custom counters used by the inbound transaction listener. Active only when a
 * Kafka integration-point broker host is configured.
 */
@Configuration
@ConditionalOnProperty("kafka.integration-points[0].brokerHost")
public class CustomTxnListenerMetricsConfig {

  /** Counter registered under "kafka.messages.processed.messages". */
  @Bean
  public Counter customKafkaSuccessCounter(MeterRegistry meterRegistry) {
    return newCounter(meterRegistry, "kafka.messages.processed.messages");
  }

  /** Counter registered under "transaction.messages.unrecognized.failures". */
  @Bean
  public Counter invalidInboundTransactionMessageCounter(MeterRegistry meterRegistry) {
    return newCounter(meterRegistry, "transaction.messages.unrecognized.failures");
  }

  /** Counter registered under "transaction.messages.process.failures". */
  @Bean
  public Counter inboundTxnProcessingFailureCounter(MeterRegistry meterRegistry) {
    return newCounter(meterRegistry, "transaction.messages.process.failures");
  }

  /** Counter registered under "transaction.messages.contract.failures". */
  @Bean
  public Counter inboundTxnContractExceptionCounter(MeterRegistry meterRegistry) {
    return newCounter(meterRegistry, "transaction.messages.contract.failures");
  }

  // Single place to create (or fetch, if already present) a counter by metric name.
  private static Counter newCounter(MeterRegistry registry, String metricName) {
    return registry.counter(metricName);
  }
}
Original file line number Diff line number Diff line change
@@ -1,18 +1,19 @@
package hlf.java.rest.client.config;

import hlf.java.rest.client.util.FabricClientConstants;
import io.micrometer.core.instrument.MeterRegistry;
import java.util.HashMap;
import java.util.Map;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.config.SslConfigs;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.cloud.context.config.annotation.RefreshScope;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.MicrometerConsumerListener;

/*
* This class is the configuration class for setting the properties for the kafka consumers.
Expand All @@ -22,7 +23,9 @@
@Configuration
@ConditionalOnProperty("kafka.integration-points[0].brokerHost")
@RefreshScope
public class KafkaConsumerConfig {
public class KafkaConsumerConfig extends BaseKafkaConfig {

@Autowired private MeterRegistry meterRegistry;

public DefaultKafkaConsumerFactory<String, String> consumerFactory(
KafkaProperties.Consumer kafkaConsumerProperties) {
Expand All @@ -39,48 +42,30 @@ public DefaultKafkaConsumerFactory<String, String> consumerFactory(
FabricClientConstants.KAFKA_INTG_MAX_POLL_INTERVAL);
props.put(
ConsumerConfig.MAX_POLL_RECORDS_CONFIG, FabricClientConstants.KAFKA_INTG_MAX_POLL_RECORDS);

// Azure event-hub config
if (StringUtils.isNotEmpty(kafkaConsumerProperties.getSaslJaasConfig())) {
props.put(
FabricClientConstants.KAFKA_SECURITY_PROTOCOL_KEY,
FabricClientConstants.KAFKA_SECURITY_PROTOCOL_VALUE);
props.put(
FabricClientConstants.KAFKA_SASL_MECHANISM_KEY,
FabricClientConstants.KAFKA_SASL_MECHANISM_VALUE);
props.put(
FabricClientConstants.KAFKA_SASL_JASS_ENDPOINT_KEY,
kafkaConsumerProperties.getSaslJaasConfig());
}
configureSaslProperties(props, kafkaConsumerProperties.getSaslJaasConfig());

if (StringUtils.isNotBlank(kafkaConsumerProperties.getOffsetResetPolicy())) {
props.put(
ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, kafkaConsumerProperties.getOffsetResetPolicy());
}

// Adding SSL configuration if Kafka Cluster is SSL secured
if (kafkaConsumerProperties.isSslAuthRequired()) {
configureSSLProperties(
props, kafkaConsumerProperties, kafkaConsumerProperties.getTopic(), meterRegistry);

SSLAuthFilesCreationHelper.createSSLAuthFiles(kafkaConsumerProperties);
log.info("Generating Kafka consumer factory..");

props.put(
CommonClientConfigs.SECURITY_PROTOCOL_CONFIG,
kafkaConsumerProperties.getSecurityProtocol());
props.put(
SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG,
kafkaConsumerProperties.getSslKeystoreLocation());
props.put(
SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG,
kafkaConsumerProperties.getSslKeystorePassword());
props.put(
SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG,
kafkaConsumerProperties.getSslTruststoreLocation());
props.put(
SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG,
kafkaConsumerProperties.getSslTruststorePassword());
props.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, kafkaConsumerProperties.getSslKeyPassword());
}
DefaultKafkaConsumerFactory<String, String> defaultKafkaConsumerFactory =
new DefaultKafkaConsumerFactory<>(props);
defaultKafkaConsumerFactory.addListener(new MicrometerConsumerListener<>(meterRegistry));

return defaultKafkaConsumerFactory;
}

log.info("Created kafka consumer factory");
return new DefaultKafkaConsumerFactory<>(props);
@Override
protected ConfigType getConfigType() {
return ConfigType.CONSUMER;
}
}
Original file line number Diff line number Diff line change
@@ -1,31 +1,31 @@
package hlf.java.rest.client.config;

import hlf.java.rest.client.util.FabricClientConstants;
import io.micrometer.core.instrument.MeterRegistry;
import java.util.HashMap;
import java.util.Map;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.config.SslConfigs;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.cloud.context.config.annotation.RefreshScope;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.MicrometerProducerListener;
import org.springframework.kafka.core.ProducerFactory;

/** This class is the configuration class for sending to Chaincode event to eventHub/Kafka Topic. */
@Slf4j
@Configuration
@ConditionalOnProperty("kafka.event-listener.brokerHost")
@RefreshScope
public class KafkaProducerConfig {
public class KafkaProducerConfig extends BaseKafkaConfig {

@Autowired private KafkaProperties kafkaProperties;

@Autowired private MeterRegistry meterRegistry;

public ProducerFactory<String, String> eventProducerFactory(
KafkaProperties.Producer kafkaProducerProperties) {
Map<String, Object> props = new HashMap<>();
Expand All @@ -37,49 +37,31 @@ public ProducerFactory<String, String> eventProducerFactory(
ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
org.apache.kafka.common.serialization.StringSerializer.class);
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, Boolean.FALSE);

// Azure event-hub config
if (StringUtils.isNotEmpty(kafkaProducerProperties.getSaslJaasConfig())) {
props.put(
FabricClientConstants.KAFKA_SECURITY_PROTOCOL_KEY,
FabricClientConstants.KAFKA_SECURITY_PROTOCOL_VALUE);
props.put(
FabricClientConstants.KAFKA_SASL_MECHANISM_KEY,
FabricClientConstants.KAFKA_SASL_MECHANISM_VALUE);
props.put(
FabricClientConstants.KAFKA_SASL_JASS_ENDPOINT_KEY,
kafkaProducerProperties.getSaslJaasConfig());
}
configureSaslProperties(props, kafkaProducerProperties.getSaslJaasConfig());

// Adding SSL configuration if Kafka Cluster is SSL secured
if (kafkaProducerProperties.isSslAuthRequired()) {
configureSSLProperties(
props, kafkaProducerProperties, kafkaProducerProperties.getTopic(), meterRegistry);

SSLAuthFilesCreationHelper.createSSLAuthFiles(kafkaProducerProperties);
log.info("Generating Kafka producer factory..");

props.put(
CommonClientConfigs.SECURITY_PROTOCOL_CONFIG,
kafkaProducerProperties.getSecurityProtocol());
props.put(
SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG,
kafkaProducerProperties.getSslKeystoreLocation());
props.put(
SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG,
kafkaProducerProperties.getSslKeystorePassword());
props.put(
SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG,
kafkaProducerProperties.getSslTruststoreLocation());
props.put(
SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG,
kafkaProducerProperties.getSslTruststorePassword());
props.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, kafkaProducerProperties.getSslKeyPassword());
}
DefaultKafkaProducerFactory<String, String> defaultKafkaProducerFactory =
new DefaultKafkaProducerFactory<>(props);
defaultKafkaProducerFactory.addListener(new MicrometerProducerListener<>(meterRegistry));

log.info("Created kafka producer factory");
return new DefaultKafkaProducerFactory<>(props);
return defaultKafkaProducerFactory;
}

@Bean
@RefreshScope
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<>(eventProducerFactory(kafkaProperties.getEventListener()));
}

@Override
protected ConfigType getConfigType() {
return ConfigType.PRODUCER;
}
}
Loading
Loading