From 2efde819c7ce4b762f4e2f9bf7c7071f17250ba4 Mon Sep 17 00:00:00 2001 From: Ljupco Vangelski Date: Mon, 6 May 2024 13:25:24 +0200 Subject: [PATCH] [#4167] Kafka certificate authentication (#4168) --- .../helm/templates/backend/deployment.yaml | 10 ++++++ .../contacts/helm/templates/deployment.yaml | 10 ++++++ .../facebook/helm/templates/deployments.yaml | 20 +++++++++++ .../google/helm/templates/deployments.yaml | 20 +++++++++++ .../helm/templates/deployment.yaml | 10 ++++++ .../helm/templates/deployment.yaml | 10 ++++++ .../streams/helm/templates/deployment.yaml | 10 ++++++ .../twilio/helm/templates/deployments.yaml | 10 ++++++ .../viber/helm/templates/deployments.yaml | 10 ++++++ .../webhook/helm/templates/deployments.yaml | 10 ++++++ .../whatsapp/helm/templates/deployments.yaml | 10 ++++++ .../docs/getting-started/installation/helm.md | 33 +++++++++++++++++++ .../components/api-admin/deployment.yaml | 10 ++++++ .../api-communication/deployment.yaml | 10 ++++++ .../api-components-installer/deployment.yaml | 11 ++++++- .../components/api-websocket/deployment.yaml | 10 ++++++ .../components/unread-counter/deployment.yaml | 10 ++++++ .../helm-chart/templates/config/kafka.yaml | 1 + infrastructure/helm-chart/values.yaml | 1 + .../airy/kafka/core/KafkaConsumerWrapper.java | 16 ++++++++- .../kafka/streams/KafkaStreamsWrapper.java | 19 ++++++++++- .../spring/kafka/core/KafkaCoreConfig.java | 21 ++++++++++-- .../kafka/streams/KafkaStreamsConfig.java | 5 ++- 23 files changed, 270 insertions(+), 7 deletions(-) diff --git a/backend/components/chat-plugin/helm/templates/backend/deployment.yaml b/backend/components/chat-plugin/helm/templates/backend/deployment.yaml index 1f8995b817..a82f25fd2f 100644 --- a/backend/components/chat-plugin/helm/templates/backend/deployment.yaml +++ b/backend/components/chat-plugin/helm/templates/backend/deployment.yaml @@ -51,6 +51,11 @@ spec: initialDelaySeconds: 120 periodSeconds: 10 failureThreshold: 3 +{{ if .Values.global.kafkaCertAuth }} + volumeMounts: + - name: kafka-config-certs + mountPath: /opt/kafka/certs +{{ end }} resources: {{ toYaml .Values.backend.resources | indent 10 }} initContainers: @@ -68,3 +73,8 @@ spec: - name: provisioning-scripts configMap: name: provisioning-scripts +{{ if .Values.global.kafkaCertAuth }} + - name: kafka-config-certs + configMap: + name: kafka-config-certs +{{ end }} \ No newline at end of file diff --git a/backend/components/contacts/helm/templates/deployment.yaml b/backend/components/contacts/helm/templates/deployment.yaml index 5e4c6846a8..b0d2e68d9b 100644 --- a/backend/components/contacts/helm/templates/deployment.yaml +++ b/backend/components/contacts/helm/templates/deployment.yaml @@ -45,6 +45,11 @@ spec: initialDelaySeconds: 120 periodSeconds: 10 failureThreshold: 3 +{{ if .Values.global.kafkaCertAuth }} + volumeMounts: + - name: kafka-config-certs + mountPath: /opt/kafka/certs +{{ end }} resources: {{ toYaml .Values.resources | indent 10 }} initContainers: @@ -62,3 +67,8 @@ spec: - name: provisioning-scripts configMap: name: provisioning-scripts +{{ if .Values.global.kafkaCertAuth }} + - name: kafka-config-certs + configMap: + name: kafka-config-certs +{{ end }} \ No newline at end of file diff --git a/backend/components/facebook/helm/templates/deployments.yaml b/backend/components/facebook/helm/templates/deployments.yaml index 4d72ef5174..a530a8122f 100644 --- a/backend/components/facebook/helm/templates/deployments.yaml +++ b/backend/components/facebook/helm/templates/deployments.yaml @@ -59,6 +59,11 @@ spec: - name: Health-Check value: health-check initialDelaySeconds: 120 +{{ if .Values.global.kafkaCertAuth }} + volumeMounts: + - name: kafka-config-certs + mountPath: /opt/kafka/certs +{{ end }} resources: {{ toYaml .Values.connector.resources | indent 12 }} initContainers: @@ -76,6 +81,11 @@ spec: - name: provisioning-scripts configMap: name: provisioning-scripts +{{ if .Values.global.kafkaCertAuth }} + - name: kafka-config-certs + configMap: + name: kafka-config-certs +{{ end }} --- apiVersion: apps/v1 kind: Deployment @@ -124,6 +134,11 @@ spec: initialDelaySeconds: 120 periodSeconds: 10 failureThreshold: 3 +{{ if .Values.global.kafkaCertAuth }} + volumeMounts: + - name: kafka-config-certs + mountPath: /opt/kafka/certs +{{ end }} resources: {{ toYaml .Values.eventsRouter.resources | indent 10 }} initContainers: @@ -141,3 +156,8 @@ spec: - name: provisioning-scripts configMap: name: provisioning-scripts +{{ if .Values.global.kafkaCertAuth }} + - name: kafka-config-certs + configMap: + name: kafka-config-certs +{{ end }} \ No newline at end of file diff --git a/backend/components/google/helm/templates/deployments.yaml b/backend/components/google/helm/templates/deployments.yaml index 7af611787e..80c2af2376 100644 --- a/backend/components/google/helm/templates/deployments.yaml +++ b/backend/components/google/helm/templates/deployments.yaml @@ -54,6 +54,11 @@ spec: - name: Health-Check value: health-check initialDelaySeconds: 120 +{{ if .Values.global.kafkaCertAuth }} + volumeMounts: + - name: kafka-config-certs + mountPath: /opt/kafka/certs +{{ end }} resources: {{ toYaml .Values.connector.resources | indent 12 }} initContainers: @@ -71,6 +76,11 @@ spec: - name: provisioning-scripts configMap: name: provisioning-scripts +{{ if .Values.global.kafkaCertAuth }} + - name: kafka-config-certs + configMap: + name: kafka-config-certs +{{ end }} --- apiVersion: apps/v1 kind: Deployment @@ -122,6 +132,11 @@ spec: initialDelaySeconds: 120 periodSeconds: 10 failureThreshold: 3 +{{ if .Values.global.kafkaCertAuth }} + volumeMounts: + - name: kafka-config-certs + mountPath: /opt/kafka/certs +{{ end }} resources: {{ toYaml .Values.eventsRouter.resources | indent 10 }} initContainers: @@ -139,3 +154,8 @@ spec: - name: provisioning-scripts configMap: name: provisioning-scripts +{{ if .Values.global.kafkaCertAuth }} + - name: kafka-config-certs + configMap: + name: kafka-config-certs +{{ end }} \ No newline at end of file diff --git a/backend/components/media-resolver/helm/templates/deployment.yaml b/backend/components/media-resolver/helm/templates/deployment.yaml index b31af97ffd..3484e0b71f 100644 --- a/backend/components/media-resolver/helm/templates/deployment.yaml +++ b/backend/components/media-resolver/helm/templates/deployment.yaml @@ -45,6 +45,11 @@ spec: initialDelaySeconds: 120 periodSeconds: 10 failureThreshold: 3 +{{ if .Values.global.kafkaCertAuth }} + volumeMounts: + - name: kafka-config-certs + mountPath: /opt/kafka/certs +{{ end }} resources: {{ toYaml .Values.resources | indent 12 }} initContainers: @@ -62,3 +67,8 @@ spec: - name: provisioning-scripts configMap: name: provisioning-scripts +{{ if .Values.global.kafkaCertAuth }} + - name: kafka-config-certs + configMap: + name: kafka-config-certs +{{ end }} \ No newline at end of file diff --git a/backend/components/sources-api/helm/templates/deployment.yaml b/backend/components/sources-api/helm/templates/deployment.yaml index 9ce7cda665..4d23b16dea 100644 --- a/backend/components/sources-api/helm/templates/deployment.yaml +++ b/backend/components/sources-api/helm/templates/deployment.yaml @@ -50,6 +50,11 @@ spec: initialDelaySeconds: 120 periodSeconds: 10 failureThreshold: 3 +{{ if .Values.global.kafkaCertAuth }} + volumeMounts: + - name: kafka-config-certs + mountPath: /opt/kafka/certs +{{ end }} resources: {{ toYaml .Values.resources | indent 10 }} initContainers: @@ -67,3 +72,8 @@ spec: - name: provisioning-scripts configMap: name: provisioning-scripts +{{ if .Values.global.kafkaCertAuth }} + - name: kafka-config-certs + configMap: + name: kafka-config-certs +{{ end }} \ No newline at end of file diff --git a/backend/components/streams/helm/templates/deployment.yaml b/backend/components/streams/helm/templates/deployment.yaml index 067e9bf6c9..b0cd1dfb3f 100644 --- a/backend/components/streams/helm/templates/deployment.yaml +++ b/backend/components/streams/helm/templates/deployment.yaml @@ -48,6 +48,11 @@ spec: initialDelaySeconds: 120 periodSeconds: 10 failureThreshold: 3 +{{ if .Values.global.kafkaCertAuth }} + volumeMounts: + - name: kafka-config-certs + mountPath: /opt/kafka/certs +{{ end }} resources: {{ toYaml .Values.resources | indent 10 }} initContainers: @@ -67,3 +72,8 @@ spec: - name: provisioning-scripts configMap: name: provisioning-scripts +{{ if .Values.global.kafkaCertAuth }} + - name: kafka-config-certs + configMap: + name: kafka-config-certs +{{ end }} \ No newline at end of file diff --git a/backend/components/twilio/helm/templates/deployments.yaml b/backend/components/twilio/helm/templates/deployments.yaml index 0e3a6a5706..461dd6c30d 100644 --- a/backend/components/twilio/helm/templates/deployments.yaml +++ b/backend/components/twilio/helm/templates/deployments.yaml @@ -54,6 +54,11 @@ spec: - name: Health-Check value: health-check initialDelaySeconds: 120 +{{ if .Values.global.kafkaCertAuth }} + volumeMounts: + - name: kafka-config-certs + mountPath: /opt/kafka/certs +{{ end }} resources: {{ toYaml .Values.connector.resources | indent 12 }} initContainers: @@ -141,3 +146,8 @@ spec: - name: provisioning-scripts configMap: name: provisioning-scripts +{{ if .Values.global.kafkaCertAuth }} + - name: kafka-config-certs + configMap: + name: kafka-config-certs +{{ end }} \ No newline at end of file diff --git a/backend/components/viber/helm/templates/deployments.yaml b/backend/components/viber/helm/templates/deployments.yaml index ebb0d5254b..55fbb10d72 100644 --- a/backend/components/viber/helm/templates/deployments.yaml +++ b/backend/components/viber/helm/templates/deployments.yaml @@ -49,6 +49,11 @@ spec: - name: Health-Check value: health-check initialDelaySeconds: 120 +{{ if .Values.global.kafkaCertAuth }} + volumeMounts: + - name: kafka-config-certs + mountPath: /opt/kafka/certs +{{ end }} resources: {{ toYaml .Values.connector.resources | indent 12 }} initContainers: @@ -66,3 +71,8 @@ spec: - name: provisioning-scripts configMap: name: provisioning-scripts +{{ if .Values.global.kafkaCertAuth }} + - name: kafka-config-certs + configMap: + name: kafka-config-certs +{{ end }} \ No newline at end of file diff --git a/backend/components/webhook/helm/templates/deployments.yaml b/backend/components/webhook/helm/templates/deployments.yaml index 814a41c9ae..6c6cb5b1b1 100644 --- a/backend/components/webhook/helm/templates/deployments.yaml +++ b/backend/components/webhook/helm/templates/deployments.yaml @@ -55,6 +55,11 @@ spec: - name: Health-Check value: health-check initialDelaySeconds: 120 +{{ if .Values.global.kafkaCertAuth }} + volumeMounts: + - name: kafka-config-certs + mountPath: /opt/kafka/certs +{{ end }} resources: {{ toYaml .Values.consumer.resources | indent 10 }} initContainers: @@ -157,3 +162,8 @@ spec: - name: provisioning-scripts configMap: name: provisioning-scripts +{{ if .Values.global.kafkaCertAuth }} + - name: kafka-config-certs + configMap: + name: kafka-config-certs +{{ end }} \ No newline at end of file diff --git a/backend/components/whatsapp/helm/templates/deployments.yaml b/backend/components/whatsapp/helm/templates/deployments.yaml index 1fcbff4b11..4c5e04b48a 100644 --- a/backend/components/whatsapp/helm/templates/deployments.yaml +++ b/backend/components/whatsapp/helm/templates/deployments.yaml @@ -113,6 +113,11 @@ spec: initialDelaySeconds: 120 periodSeconds: 10 failureThreshold: 3 +{{ if .Values.global.kafkaCertAuth }} + volumeMounts: + - name: kafka-config-certs + mountPath: /opt/kafka/certs +{{ end }} resources: {{ toYaml .Values.eventsRouter.resources | indent 10 }} initContainers: @@ -130,3 +135,8 @@ spec: - name: provisioning-scripts configMap: name: provisioning-scripts +{{ if .Values.global.kafkaCertAuth }} + - name: kafka-config-certs + configMap: + name: kafka-config-certs +{{ end }} \ No newline at end of file diff --git a/docs/docs/getting-started/installation/helm.md b/docs/docs/getting-started/installation/helm.md index e47ed9b447..4d3738cc0f 100644 --- a/docs/docs/getting-started/installation/helm.md +++ b/docs/docs/getting-started/installation/helm.md @@ -290,6 +290,39 @@ Run the following command to create the `Airy` platform without the bundled inst helm install airy airy/airy --timeout 10m --set prerequisites.kafka.enabled=false --values ./airy.yaml ``` +#### Confluent + +To connect to a Kafka instance in Confluent cloud, settings the `config.kafka.brokers` and `config.kafka.aurhJaas` is enough, prior to deploying the Helm chart. + +#### Aiven + +Aiven cloud uses a keystore and truststore certificates that need to be loaded on the workloads that are connecting to Kafka. Get the necessary certificates and connection files from Aiven using the `avn` CLI and place them in a separate directory. + +``` +avn service user-kafka-java-creds {KAFKA_INSTANCE} --username {USERNAME} -d ./aiven/ --password {PASSWORD} +``` + +Create a Kubernetes ConfigMap that contains the contents of the created directory: + +``` +kubectl create configmap kafka-config-certs --from-file aiven/ +``` + +Set the connection appropriate parameters in your `airy.yaml` file: + +```yaml +config: + kafka: + brokers: "the-aiven-kafka-broker-url" + keyTrustSecret: "the-key-trust-secret" +``` + +Then install Airy with the following command: + +```sh +helm install airy airy/airy --timeout 10m --set prerequisites.kafka.enabled=false --set global.kafkaCertAuth=true --values ./airy.yaml +``` + ### Kafka partitions per topic Currently all the default topics in the Airy instance are created with 10 partitions. To create these topics with a different number of partitions, add the following to your `airy.yaml` file before running `helm install` (before the initial creation of the topics): diff --git a/infrastructure/helm-chart/templates/components/api-admin/deployment.yaml b/infrastructure/helm-chart/templates/components/api-admin/deployment.yaml index 75cab9b8dc..3af2e13445 100644 --- a/infrastructure/helm-chart/templates/components/api-admin/deployment.yaml +++ b/infrastructure/helm-chart/templates/components/api-admin/deployment.yaml @@ -60,6 +60,11 @@ spec: initialDelaySeconds: 120 periodSeconds: 10 failureThreshold: 3 +{{ if .Values.global.kafkaCertAuth }} + volumeMounts: + - name: kafka-config-certs + mountPath: /opt/kafka/certs +{{ end }} resources: {{ toYaml .Values.components.api.admin.resources | indent 10 }} initContainers: @@ -77,3 +82,8 @@ spec: - name: provisioning-scripts configMap: name: provisioning-scripts +{{ if .Values.global.kafkaCertAuth }} + - name: kafka-config-certs + configMap: + name: kafka-config-certs +{{ end }} \ No newline at end of file diff --git a/infrastructure/helm-chart/templates/components/api-communication/deployment.yaml b/infrastructure/helm-chart/templates/components/api-communication/deployment.yaml index 362428ee8b..a461864d1d 100644 --- a/infrastructure/helm-chart/templates/components/api-communication/deployment.yaml +++ b/infrastructure/helm-chart/templates/components/api-communication/deployment.yaml @@ -45,6 +45,11 @@ spec: initialDelaySeconds: 120 periodSeconds: 10 failureThreshold: 3 +{{ if .Values.global.kafkaCertAuth }} + volumeMounts: + - name: kafka-config-certs + mountPath: /opt/kafka/certs +{{ end }} resources: {{ toYaml .Values.components.api.communication.resources | indent 10 }} initContainers: @@ -62,3 +67,8 @@ spec: - name: provisioning-scripts configMap: name: provisioning-scripts +{{ if .Values.global.kafkaCertAuth }} + - name: kafka-config-certs + configMap: + name: kafka-config-certs +{{ end }} \ No newline at end of file diff --git a/infrastructure/helm-chart/templates/components/api-components-installer/deployment.yaml b/infrastructure/helm-chart/templates/components/api-components-installer/deployment.yaml index fd865b890b..363863fafc 100644 --- a/infrastructure/helm-chart/templates/components/api-components-installer/deployment.yaml +++ b/infrastructure/helm-chart/templates/components/api-components-installer/deployment.yaml @@ -81,6 +81,11 @@ spec: initialDelaySeconds: 60 periodSeconds: 10 failureThreshold: 3 +{{ if .Values.global.kafkaCertAuth }} + volumeMounts: + - name: kafka-config-certs + mountPath: /opt/kafka/certs +{{ end }} resources: {{ toYaml .Values.components.api.components.installer.resources | indent 10 }} initContainers: @@ -102,4 +107,8 @@ spec: - name: provisioning-scripts configMap: name: provisioning-scripts - +{{ if .Values.global.kafkaCertAuth }} + - name: kafka-config-certs + configMap: + name: kafka-config-certs +{{ end }} \ No newline at end of file diff --git a/infrastructure/helm-chart/templates/components/api-websocket/deployment.yaml b/infrastructure/helm-chart/templates/components/api-websocket/deployment.yaml index 6739cfe3c0..c4d95e0c57 100644 --- a/infrastructure/helm-chart/templates/components/api-websocket/deployment.yaml +++ b/infrastructure/helm-chart/templates/components/api-websocket/deployment.yaml @@ -45,6 +45,11 @@ spec: initialDelaySeconds: 120 periodSeconds: 10 failureThreshold: 3 +{{ if .Values.global.kafkaCertAuth }} + volumeMounts: + - name: kafka-config-certs + mountPath: /opt/kafka/certs +{{ end }} resources: {{ toYaml .Values.components.api.websocket.resources | indent 10 }} initContainers: @@ -62,3 +67,8 @@ spec: - name: provisioning-scripts configMap: name: provisioning-scripts +{{ if .Values.global.kafkaCertAuth }} + - name: kafka-config-certs + configMap: + name: kafka-config-certs +{{ end }} \ No newline at end of file diff --git a/infrastructure/helm-chart/templates/components/unread-counter/deployment.yaml b/infrastructure/helm-chart/templates/components/unread-counter/deployment.yaml index c8650099f1..4b44212560 100644 --- a/infrastructure/helm-chart/templates/components/unread-counter/deployment.yaml +++ b/infrastructure/helm-chart/templates/components/unread-counter/deployment.yaml @@ -45,6 +45,11 @@ spec: initialDelaySeconds: 120 periodSeconds: 10 failureThreshold: 3 +{{ if .Values.global.kafkaCertAuth }} + volumeMounts: + - name: kafka-config-certs + mountPath: /opt/kafka/certs +{{ end }} resources: {{ toYaml .Values.components.api.unread_counter.resources | indent 10 }} initContainers: @@ -62,3 +67,8 @@ spec: - name: provisioning-scripts configMap: name: provisioning-scripts +{{ if .Values.global.kafkaCertAuth }} + - name: kafka-config-certs + configMap: + name: kafka-config-certs +{{ end }} \ No newline at end of file diff --git a/infrastructure/helm-chart/templates/config/kafka.yaml b/infrastructure/helm-chart/templates/config/kafka.yaml index a00933d800..23897e7214 100644 --- a/infrastructure/helm-chart/templates/config/kafka.yaml +++ b/infrastructure/helm-chart/templates/config/kafka.yaml @@ -13,3 +13,4 @@ data: {{- end }} KAFKA_SCHEMA_REGISTRY_URL: {{ .Values.config.kafka.schemaRegistryUrl }} KAFKA_COMMIT_INTERVAL_MS: "{{ .Values.config.kafka.commitInterval }}" + KAFKA_KEY_TRUST_SECRET: {{ .Values.config.kafka.keyTrustSecret }} diff --git a/infrastructure/helm-chart/values.yaml b/infrastructure/helm-chart/values.yaml index e4f6afa182..982a736b54 100644 --- a/infrastructure/helm-chart/values.yaml +++ b/infrastructure/helm-chart/values.yaml @@ -13,6 +13,7 @@ config: brokers: "kafka-headless:9092" zookeepers: "zookeeper:2181" authJaas: "" + keyTrustSecret: "" minimumReplicas: 1 schemaRegistryUrl: "http://schema-registry:8081" commitInterval: 1000 diff --git a/lib/java/kafka/core/src/main/java/co/airy/kafka/core/KafkaConsumerWrapper.java b/lib/java/kafka/core/src/main/java/co/airy/kafka/core/KafkaConsumerWrapper.java index b5739d9503..ca75dc902b 100644 --- a/lib/java/kafka/core/src/main/java/co/airy/kafka/core/KafkaConsumerWrapper.java +++ b/lib/java/kafka/core/src/main/java/co/airy/kafka/core/KafkaConsumerWrapper.java @@ -12,6 +12,10 @@ import java.time.temporal.ChronoUnit; import java.util.Collection; import java.util.Properties; +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.common.config.SslConfigs; +import java.util.HashMap; + public class KafkaConsumerWrapper { @@ -22,6 +26,7 @@ public class KafkaConsumerWrapper { private KafkaConsumer consumer; private String jaasConfig; + private String kafkaKeyTrustSecret; public KafkaConsumerWrapper(final String brokers, final String schemaRegistryUrl) { props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers); @@ -33,13 +38,22 @@ public KafkaConsumerWrapper(final String brokers, final String schemaRegistryUrl props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true); } - public KafkaConsumerWrapper withAuthJaas(String jaasConfig) { + public KafkaConsumerWrapper withAuthJaas(String jaasConfig, String kafkaKeyTrustSecret) { this.jaasConfig = jaasConfig; if(jaasConfig != null) { props.put("security.protocol", "SASL_SSL"); props.put("sasl.mechanism", "PLAIN"); props.put("sasl.jaas.config", jaasConfig); } + if (kafkaKeyTrustSecret != null) { + props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL"); + props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, "/opt/kafka/certs/client.truststore.jks"); + props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, kafkaKeyTrustSecret); + props.put(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, "PKCS12"); + props.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, "/opt/kafka/certs/client.keystore.p12"); + props.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, kafkaKeyTrustSecret); + props.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, kafkaKeyTrustSecret); + } return this; } diff --git a/lib/java/kafka/streams/src/main/java/co/airy/kafka/streams/KafkaStreamsWrapper.java b/lib/java/kafka/streams/src/main/java/co/airy/kafka/streams/KafkaStreamsWrapper.java index f44f5075aa..18ca499cab 100644 --- a/lib/java/kafka/streams/src/main/java/co/airy/kafka/streams/KafkaStreamsWrapper.java +++ b/lib/java/kafka/streams/src/main/java/co/airy/kafka/streams/KafkaStreamsWrapper.java @@ -30,6 +30,10 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.common.config.SslConfigs; +import java.util.HashMap; + public class KafkaStreamsWrapper { private static final Logger log = AiryLoggerFactory.getLogger(KafkaStreamsWrapper.class); @@ -37,6 +41,7 @@ public class KafkaStreamsWrapper { private final String brokers; private final String schemaRegistryUrl; private String jaasConfig; + private String kafkaKeyTrustSecret; private long commitIntervalInMs; private long suppressIntervalInMs; private int threadCount; @@ -70,8 +75,9 @@ public KafkaStreamsWrapper(final String brokers, final String schemaRegistryUrl) healthCheckRunnerThread = new HealthCheckRunner(testMode); } - public KafkaStreamsWrapper withJaasConfig(String jaasConfig) { + public KafkaStreamsWrapper withJaasConfig(String jaasConfig, String kafkaKeyTrustSecret) { this.jaasConfig = jaasConfig; + this.kafkaKeyTrustSecret = kafkaKeyTrustSecret; return this; } @@ -227,6 +233,17 @@ public synchronized void start(final Topology topology, final String appId) thro props.put("sasl.jaas.config", jaasConfig); } + if (this.kafkaKeyTrustSecret != null) { + props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL"); + props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, "/opt/kafka/certs/client.truststore.jks"); + props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, kafkaKeyTrustSecret); + props.put(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, "PKCS12"); + props.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, "/opt/kafka/certs/client.keystore.p12"); + props.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, kafkaKeyTrustSecret); + props.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, kafkaKeyTrustSecret); + } + + props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, this.maxRequestSize); props.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, this.fetchMaxBytes); diff --git a/lib/java/spring/kafka/core/src/main/java/co/airy/spring/kafka/core/KafkaCoreConfig.java b/lib/java/spring/kafka/core/src/main/java/co/airy/spring/kafka/core/KafkaCoreConfig.java index 0cd6c28122..f8488e2f6f 100644 --- a/lib/java/spring/kafka/core/src/main/java/co/airy/spring/kafka/core/KafkaCoreConfig.java +++ b/lib/java/spring/kafka/core/src/main/java/co/airy/spring/kafka/core/KafkaCoreConfig.java @@ -13,6 +13,9 @@ import org.springframework.context.annotation.Scope; import java.util.Properties; +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.common.config.SslConfigs; +import java.util.HashMap; @Configuration @PropertySource("classpath:kafka-core.properties") @@ -21,7 +24,8 @@ public class KafkaCoreConfig { @Lazy @Scope("prototype") public KafkaProducer kafkaProducer(@Value("${kafka.brokers}") final String brokers, @Value("${kafka.schema-registry-url}") final String schemaRegistryUrl, - @Value("${AUTH_JAAS:#{null}}") final String jaasConfig) { + @Value("${AUTH_JAAS:#{null}}") final String jaasConfig, + @Value("${KAFKA_KEY_TRUST_SECRET:#{null}}") final String kafkaKeyTrustSecret) { final Properties props = new Properties(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers); @@ -37,6 +41,16 @@ public KafkaProducer kafkaProducer(@Value("${kafka.brokers}") final props.put("sasl.jaas.config", jaasConfig); } + if (kafkaKeyTrustSecret != null) { + props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL"); + props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, "/opt/kafka/certs/client.truststore.jks"); + props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, kafkaKeyTrustSecret); + props.put(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, "PKCS12"); + props.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, "/opt/kafka/certs/client.keystore.p12"); + props.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, kafkaKeyTrustSecret); + props.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, kafkaKeyTrustSecret); + } + return new KafkaProducer<>(props); } @@ -44,8 +58,9 @@ public KafkaProducer kafkaProducer(@Value("${kafka.brokers}") final @Lazy @Scope("prototype") public KafkaConsumerWrapper kafkaConsumer(@Value("${kafka.brokers}") final String brokers, @Value("${kafka.schema-registry-url}") final String schemaRegistryUrl, - @Value("${kafka.sasl.jaas.config:#{null}}") final String jaasConfig) { + @Value("${kafka.sasl.jaas.config:#{null}}") final String jaasConfig, + @Value("${KAFKA_KEY_TRUST_SECRET:#{null}}") final String kafkaKeyTrustSecret) { return new KafkaConsumerWrapper(brokers, schemaRegistryUrl) - .withAuthJaas(jaasConfig); + .withAuthJaas(jaasConfig, kafkaKeyTrustSecret); } } diff --git a/lib/java/spring/kafka/streams/src/main/java/co/airy/spring/kafka/streams/KafkaStreamsConfig.java b/lib/java/spring/kafka/streams/src/main/java/co/airy/spring/kafka/streams/KafkaStreamsConfig.java index f9995abfbd..217f2f9a6a 100644 --- a/lib/java/spring/kafka/streams/src/main/java/co/airy/spring/kafka/streams/KafkaStreamsConfig.java +++ b/lib/java/spring/kafka/streams/src/main/java/co/airy/spring/kafka/streams/KafkaStreamsConfig.java @@ -30,6 +30,9 @@ public class KafkaStreamsConfig { @Value("${AUTH_JAAS:#{null}}") private String jaasConfig; + @Value("${KAFKA_KEY_TRUST_SECRET:#{null}}") + private String kafkaKeyTrustSecret; + @Value("${kafka.rpc-port:0}") private int rpcPort; @@ -68,7 +71,7 @@ public class KafkaStreamsConfig { public KafkaStreamsWrapper airyKafkaStreams(@Value("${kafka.brokers}") final String brokers, @Value("${kafka.schema-registry-url}") final String schemaRegistryUrl) { return new KafkaStreamsWrapper(brokers, schemaRegistryUrl) .withCommitIntervalInMs(commitIntervalMs) - .withJaasConfig(jaasConfig) + .withJaasConfig(jaasConfig, kafkaKeyTrustSecret) .withSuppressIntervalInMs(suppressIntervalMs) .withThreadCount(streamsThreadCount) .withAppServerHost(rpcHost)