Skip to content

Commit

Permalink
[#4139] Add Flink connector (#4147)
Browse files Browse the repository at this point in the history
  • Loading branch information
ljupcovangelski committed Apr 2, 2024
1 parent fafddd7 commit 78e5f54
Show file tree
Hide file tree
Showing 18 changed files with 1,154 additions and 0 deletions.
16 changes: 16 additions & 0 deletions backend/components/flink-connector/Dockerfile.result-sender
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# Builds the Flink connector result-sender service image.
# NOTE(review): golang:1.17 is end-of-life — consider a newer base image
# (verify confluent-kafka-go/v2 compatibility first).
FROM golang:1.17

WORKDIR /app

# The binary is built from exactly these three source files.
COPY ./src/types.go ./src/tools.go ./src/result-sender.go ./

# The module is created inside the image and dependencies are fetched unpinned.
# NOTE(review): unpinned `go get` makes builds non-reproducible — consider
# checking in go.mod/go.sum and using `go mod download` instead.
RUN go mod init main && \
go get github.com/confluentinc/confluent-kafka-go/v2/kafka && \
go get github.com/confluentinc/confluent-kafka-go/v2/schemaregistry && \
go get github.com/confluentinc/confluent-kafka-go/v2/schemaregistry/serde && \
go get github.com/confluentinc/confluent-kafka-go/v2/schemaregistry/serde/avro && \
go get golang.org/x/net

# Compile all copied sources into a single binary named "app".
RUN go build -o app

CMD ["./app"]
16 changes: 16 additions & 0 deletions backend/components/flink-connector/Dockerfile.statements-executor
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# Builds the Flink connector statements-executor service image.
# NOTE(review): golang:1.17 is end-of-life — consider a newer base image
# (verify confluent-kafka-go/v2 compatibility first).
FROM golang:1.17

WORKDIR /app

# The binary is built from exactly these three source files.
COPY ./src/types.go ./src/tools.go ./src/statements-executor.go ./

# The module is created inside the image and dependencies are fetched unpinned.
# NOTE(review): unpinned `go get` makes builds non-reproducible — consider
# checking in go.mod/go.sum and using `go mod download` instead.
RUN go mod init main && \
go get github.com/confluentinc/confluent-kafka-go/v2/kafka && \
go get github.com/confluentinc/confluent-kafka-go/v2/schemaregistry && \
go get github.com/confluentinc/confluent-kafka-go/v2/schemaregistry/serde && \
go get github.com/confluentinc/confluent-kafka-go/v2/schemaregistry/serde/avro && \
go get golang.org/x/net

# Compile all copied sources into a single binary named "app".
RUN go build -o app

CMD ["./app"]
13 changes: 13 additions & 0 deletions backend/components/flink-connector/Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
# Build the statements-executor container image from its dedicated Dockerfile.
build-statements-executor:
docker build -t flink-connector/statements-executor -f Dockerfile.statements-executor .

# Tag and push the statements-executor image to GHCR with the "release" tag.
release-statements-executor: build-statements-executor
docker tag flink-connector/statements-executor ghcr.io/airyhq/connectors/flink/statements-executor:release
docker push ghcr.io/airyhq/connectors/flink/statements-executor:release

# Build the result-sender container image from its dedicated Dockerfile.
build-result-sender:
docker build -t flink-connector/result-sender -f Dockerfile.result-sender .

# Tag and push the result-sender image to GHCR with the "release" tag.
release-result-sender: build-result-sender
docker tag flink-connector/result-sender ghcr.io/airyhq/connectors/flink/result-sender:release
docker push ghcr.io/airyhq/connectors/flink/result-sender:release
3 changes: 3 additions & 0 deletions backend/components/flink-connector/helm/BUILD
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
# Bazel wiring: applies the shared Helm ruleset to this chart directory.
load("//tools/build:helm.bzl", "helm_ruleset_core_version")

helm_ruleset_core_version()
6 changes: 6 additions & 0 deletions backend/components/flink-connector/helm/Chart.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@

# Helm chart metadata for the Flink connector component.
apiVersion: v2
appVersion: "1.0"
description: Flink connector
name: flink-connector
version: 1.0
10 changes: 10 additions & 0 deletions backend/components/flink-connector/helm/templates/configmap.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
# ConfigMap for the component, carrying the core.airy.co management labels
# (the same label scheme used by the chart's Deployments).
apiVersion: v1
kind: ConfigMap
metadata:
name: {{ .Values.component }}
labels:
core.airy.co/managed: "true"
core.airy.co/mandatory: "{{ .Values.mandatory }}"
core.airy.co/component: "{{ .Values.component }}"
annotations:
# Mirrors .Values.enabled, which also drives the Deployments' replica count.
core.airy.co/enabled: "{{ .Values.enabled }}"
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
# Deployment for the result-sender: consumes Flink results from Kafka
# (KAFKA_TOPIC_NAME) and forwards them via API_COMMUNICATION_URL.
apiVersion: apps/v1
kind: Deployment
metadata:
name: {{ .Values.component }}-{{ .Values.resultSender.name }}
labels:
app: {{ .Values.component }}
core.airy.co/managed: "true"
core.airy.co/mandatory: "{{ .Values.mandatory }}"
core.airy.co/component: {{ .Values.component }}
spec:
# Scaled to zero unless the component is enabled in values.yaml.
replicas: {{ if .Values.enabled }} 1 {{ else }} 0 {{ end }}
selector:
matchLabels:
app: {{ .Values.component }}-{{ .Values.resultSender.name }}
strategy:
rollingUpdate:
maxSurge: 1
maxUnavailable: 1
type: RollingUpdate
template:
metadata:
labels:
app: {{ .Values.component }}-{{ .Values.resultSender.name }}
spec:
containers:
- name: app
image: "ghcr.io/airyhq/{{ .Values.resultSender.image }}:release"
imagePullPolicy: Always
envFrom:
- configMapRef:
name: security
- configMapRef:
name: kafka-config
- configMapRef:
name: {{ .Values.component }}
env:
- name: KAFKA_TOPIC_NAME
value: {{ .Values.resultSender.topic }}
- name: API_COMMUNICATION_URL
value: {{ .Values.apiCommunicationUrl }}
livenessProbe:
httpGet:
path: /actuator/health
# NOTE(review): .Values.port is 8080, but the result-sender binary serves
# its health endpoint on :80 — confirm these match before relying on the probe.
port: {{ .Values.port }}
httpHeaders:
- name: Health-Check
value: health-check
# NOTE(review): 43200s = 12 hours before the first liveness check —
# this effectively disables liveness probing for half a day; confirm intentional.
initialDelaySeconds: 43200
periodSeconds: 10
failureThreshold: 3
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
# Headless Service (clusterIP: None) fronting the result-sender pods.
apiVersion: v1
kind: Service
metadata:
name: {{ .Values.component }}-{{ .Values.resultSender.name }}
labels:
app: {{ .Values.component }}-{{ .Values.resultSender.name }}
spec:
type: ClusterIP
clusterIP: None
ports:
- name: {{ .Values.component }}-{{ .Values.resultSender.name }}
port: 80
# NOTE(review): targetPort resolves to 8080 while the app listens on :80 — verify.
targetPort: {{ .Values.port }}
selector:
app: {{ .Values.component }}-{{ .Values.resultSender.name }}
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
# Deployment for the statements-executor: consumes statements from Kafka
# (KAFKA_TOPIC_NAME) and submits them to the Flink gateway (FLINK_GATEWAY_URL).
apiVersion: apps/v1
kind: Deployment
metadata:
name: {{ .Values.component }}-{{ .Values.executor.name }}
labels:
app: {{ .Values.component }}
core.airy.co/managed: "true"
core.airy.co/mandatory: "{{ .Values.mandatory }}"
core.airy.co/component: {{ .Values.component }}
spec:
# Scaled to zero unless the component is enabled in values.yaml.
replicas: {{ if .Values.enabled }} 1 {{ else }} 0 {{ end }}
selector:
matchLabels:
app: {{ .Values.component }}-{{ .Values.executor.name }}
strategy:
rollingUpdate:
maxSurge: 1
maxUnavailable: 1
type: RollingUpdate
template:
metadata:
labels:
app: {{ .Values.component }}-{{ .Values.executor.name }}
spec:
containers:
- name: app
image: "ghcr.io/airyhq/{{ .Values.executor.image }}:release"
imagePullPolicy: Always
envFrom:
- configMapRef:
name: security
- configMapRef:
name: kafka-config
- configMapRef:
name: {{ .Values.component }}
env:
- name: KAFKA_TOPIC_NAME
value: {{ .Values.executor.topic }}
- name: FLINK_GATEWAY_URL
value: {{ .Values.gatewayUrl }}
livenessProbe:
httpGet:
path: /actuator/health
port: {{ .Values.port }}
httpHeaders:
- name: Health-Check
value: health-check
# NOTE(review): 43200s = 12 hours before the first liveness check —
# this effectively disables liveness probing for half a day; confirm intentional.
initialDelaySeconds: 43200
periodSeconds: 10
failureThreshold: 3
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@

# Headless Service (clusterIP: None) fronting the statements-executor pods.
apiVersion: v1
kind: Service
metadata:
name: {{ .Values.component }}-{{ .Values.executor.name }}
labels:
app: {{ .Values.component }}-{{ .Values.executor.name }}
spec:
type: ClusterIP
clusterIP: None
ports:
- name: {{ .Values.component }}-{{ .Values.executor.name }}
port: 80
targetPort: {{ .Values.port }}
selector:
app: {{ .Values.component }}-{{ .Values.executor.name }}
16 changes: 16 additions & 0 deletions backend/components/flink-connector/helm/values.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@

# Default values for the flink-connector Helm chart.
component: flink-connector
mandatory: false
# Ships disabled; enabling scales both Deployments from 0 to 1 replica.
enabled: false
port: 8080
# NOTE(review): `resources` is left empty — presumably a placeholder for CPU/memory
# requests/limits; confirm before relying on it.
resources:
gatewayUrl: "http://flink-jobmanager:8083"
apiCommunicationUrl: "http://api-communication/messages.send"
# Settings for the statements-executor Deployment.
executor:
name: statements-executor
image: connectors/flink/statements-executor
topic: flink.statements
# Settings for the result-sender Deployment.
resultSender:
name: result-sender
image: connectors/flink/result-sender
topic: flink.output
158 changes: 158 additions & 0 deletions backend/components/flink-connector/src/result-sender.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
package main

import (
"encoding/json"
"fmt"
"net/http"
"os"
"os/signal"
"syscall"
"time"

"github.com/confluentinc/confluent-kafka-go/v2/kafka"
)

// main runs the result-sender: it consumes Flink job notifications from a
// Kafka topic, fetches the corresponding result from the configured Flink
// (or Confluent) gateway, renders it as Markdown and forwards it to the
// conversation via sendMessage.
//
// Required env vars: KAFKA_BROKERS, KAFKA_SCHEMA_REGISTRY_URL,
// KAFKA_TOPIC_NAME. Also read: systemToken, AUTH_JAAS_USERNAME,
// AUTH_JAAS_PASSWORD, provider, FLINK_GATEWAY_URL, CONFLUENT_GATEWAY_URL and
// the confluent* settings copied into ConfluentConnection.
func main() {

	// Kafka and auth configuration from the environment.
	kafkaURL := os.Getenv("KAFKA_BROKERS")
	schemaRegistryURL := os.Getenv("KAFKA_SCHEMA_REGISTRY_URL")
	topicName := os.Getenv("KAFKA_TOPIC_NAME")
	systemToken := os.Getenv("systemToken")
	authUsername := os.Getenv("AUTH_JAAS_USERNAME")
	authPassword := os.Getenv("AUTH_JAAS_PASSWORD")
	flinkProvider := os.Getenv("provider")
	groupID := "result-sender"
	msgNormal := false // user-facing message
	msgDebug := true   // debug/trace message

	if kafkaURL == "" || schemaRegistryURL == "" || topicName == "" {
		fmt.Println("KAFKA_BROKERS, KAFKA_SCHEMA_REGISTRY_URL, and KAFKA_TOPIC_NAME environment variables must be set")
		return
	}

	// Connection settings used when the provider is Confluent (non-"flink").
	var confluentConnection ConfluentConnection
	confluentConnection.Token = os.Getenv("confluentToken")
	confluentConnection.ComputePoolID = os.Getenv("confluentComputePoolID")
	confluentConnection.Principal = os.Getenv("confluentPrincipal")
	confluentConnection.SQLCurrentCatalog = os.Getenv("confluentSQLCurrentCatalog")
	confluentConnection.SQLCurrentDatabase = os.Getenv("confluentSQLCurrentDatabase")

	// Health-check endpoint for the Kubernetes liveness probe.
	http.HandleFunc("/actuator/health", func(w http.ResponseWriter, r *http.Request) {
		response := map[string]string{"status": "UP"}
		jsonResponse, err := json.Marshal(response)
		if err != nil {
			http.Error(w, err.Error(), http.StatusInternalServerError)
			return
		}
		w.Header().Set("Content-Type", "application/json")
		w.WriteHeader(http.StatusOK)
		w.Write(jsonResponse)
	})

	go func() {
		// NOTE(review): listens on :80 while the Helm chart's port value is
		// 8080 — confirm which one the liveness probe should target.
		if err := http.ListenAndServe(":80", nil); err != nil {
			panic(err)
		}
	}()

	fmt.Println("Health-check started")

	// Create Kafka consumer configuration
	fmt.Println("Creating Kafka consumer for topic: ", topicName)

	c, err := kafka.NewConsumer(&kafka.ConfigMap{
		"bootstrap.servers": kafkaURL,
		"group.id":          groupID,
		"auto.offset.reset": "earliest",
		"security.protocol": "SASL_SSL",
		"sasl.mechanisms":   "PLAIN",
		"sasl.username":     authUsername,
		"sasl.password":     authPassword,
	})
	if err != nil {
		fmt.Printf("Error creating consumer: %v\n", err)
		return
	}
	// Fix: the subscription error was previously discarded.
	if err := c.SubscribeTopics([]string{topicName}, nil); err != nil {
		fmt.Printf("Error subscribing to topic: %v\n", err)
		return
	}

	// Channel for signals
	signals := make(chan os.Signal, 1)
	done := make(chan bool, 1)

	signal.Notify(signals, os.Interrupt, syscall.SIGTERM)

	go func() {
		for {
			select {
			case sig := <-signals:
				// If an interrupt signal is received, break the loop
				fmt.Printf("Caught signal %v: terminating\n", sig)
				done <- true
				return
			default:
				msg, err := c.ReadMessage(-1)
				if err != nil {
					fmt.Printf("Consumer error: %v\n", err)
					continue
				}
				var flinkOutput FlinkOutput
				if err := json.Unmarshal(msg.Value, &flinkOutput); err != nil {
					fmt.Printf("Error unmarshalling message: %v\n", err)
					continue
				}
				fmt.Printf("Received message: %+v\n", flinkOutput)

				flinkGatewayURL := os.Getenv("FLINK_GATEWAY_URL")
				confluentGatewayURL := os.Getenv("CONFLUENT_GATEWAY_URL")

				var result FlinkResult
				var headerConfluent []string
				var resultConfluent string

				if flinkProvider == "flink" {
					fmt.Println("Flink gateway: ", flinkGatewayURL)
					result, err = getFlinkResult(flinkGatewayURL, flinkOutput.SessionID)
					headerConfluent = []string{}
				} else {
					fmt.Println("Flink gateway: ", confluentGatewayURL)
					// Give the Confluent side time before polling for the result.
					fmt.Println("Waiting 20 seconds...")
					time.Sleep(20 * time.Second)
					headerConfluent, resultConfluent, err = getFlinkResultConfluent(confluentGatewayURL, flinkOutput.SessionID, confluentConnection)
				}
				if err != nil {
					fmt.Println("Unable to get Flink result:", err)
					sendMessage("Error: "+err.Error(), flinkOutput.ConversationID, systemToken, msgDebug)
					// Fix: was `return`, which killed this consumer goroutine
					// without signalling `done`, deadlocking main on <-done.
					continue
				}

				if flinkProvider == "flink" {
					sendMessage("Result retrieved from Flink: "+fmt.Sprintf("%#v", result), flinkOutput.ConversationID, systemToken, msgDebug)
					sendMessage("Now converting the result to Markdown", flinkOutput.ConversationID, systemToken, msgDebug)
					response, err := convertResultToMarkdown(result)
					if err != nil {
						fmt.Println("Unable to generate Markdown from result:", err)
						sendMessage("Error: "+err.Error(), flinkOutput.ConversationID, systemToken, msgDebug)
						sendMessage("I'm sorry, I am unable to fetch the results from the Flink table.", flinkOutput.ConversationID, systemToken, msgNormal)
						// Fix: was `return` (same deadlock) — keep consuming instead.
						continue
					}
					sendMessage(response, flinkOutput.ConversationID, systemToken, msgNormal)
				} else {
					sendMessage("Result retrieved from Flink: "+fmt.Sprintf("%#v", resultConfluent), flinkOutput.ConversationID, systemToken, msgDebug)
					sendMessage("Now converting the result to Markdown", flinkOutput.ConversationID, systemToken, msgDebug)
					response, err := convertConfluentResultToMarkdown(headerConfluent, resultConfluent)
					if err != nil {
						fmt.Println("Unable to generate Markdown from result:", err)
						sendMessage("Error: "+err.Error(), flinkOutput.ConversationID, systemToken, msgDebug)
						sendMessage("I'm sorry, I am unable to fetch the results from the Flink table.", flinkOutput.ConversationID, systemToken, msgNormal)
						// Fix: was `return` (same deadlock) — keep consuming instead.
						continue
					}
					sendMessage(response, flinkOutput.ConversationID, systemToken, msgNormal)
				}
			}
		}
	}()

	<-done
	c.Close()
	fmt.Println("Consumer closed")
}
Loading

0 comments on commit 78e5f54

Please sign in to comment.