Skip to content

Commit

Permalink
Update env variables
Browse files Browse the repository at this point in the history
  • Loading branch information
ljupcovangelski committed Feb 7, 2024
1 parent 4749dfb commit e411f75
Show file tree
Hide file tree
Showing 7 changed files with 12 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ spec:
env:
- name: KAFKA_TOPIC_NAME
value: {{ .Values.resultSender.topic }}
- name: API_COMMUNICATION_URL
value: {{ .Values.apiCommunicationUrl }}
livenessProbe:
httpGet:
path: /actuator/health
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ spec:
env:
- name: KAFKA_TOPIC_NAME
value: {{ .Values.executor.topic }}
- name: FLINK_GATEWAY_URL
value: {{ .Values.gatewayUrl }}
livenessProbe:
httpGet:
path: /actuator/health
Expand Down
2 changes: 2 additions & 0 deletions backend/components/flink/helm/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ mandatory: false
enabled: false
port: 8080
resources:
gatewayUrl: "http://flink-jobmanager:8083"
apiCommunicationUrl: "http://api-communication/messages.send"
executor:
name: statements-executor
image: connectors/flink/statements-executor
Expand Down
12 changes: 2 additions & 10 deletions backend/components/flink/result-sender/src/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,23 +13,20 @@ import (

func main() {

// Create Kafka consumer to read the statements
kafkaURL := os.Getenv("KAFKA_BROKERS")
schemaRegistryURL := os.Getenv("KAFKA_SCHEMA_REGISTRY_URL")
topicName := os.Getenv("KAFKA_TOPIC_NAME")
systemToken := os.Getenv("systemToken")
authUsername := os.Getenv("AUTH_JAAS_USERNAME")
authPassword := os.Getenv("AUTH_JAAS_PASSWORD")
//timestamp := time.Now().Unix()
//strTimestamp := fmt.Sprintf("%d", timestamp)
groupID := "result-sender" //+ "-" + strTimestamp
groupID := "result-sender"
flinkGatewayURL := os.Getenv("FLINK_GATEWAY_URL")

if kafkaURL == "" || schemaRegistryURL == "" || topicName == "" {
fmt.Println("KAFKA_BROKERS, KAFKA_SCHEMA_REGISTRY_URL, and KAFKA_TOPIC_NAME environment variables must be set")
return
}

// Healthcheck
http.HandleFunc("/actuator/health", func(w http.ResponseWriter, r *http.Request) {
response := map[string]string{"status": "UP"}
jsonResponse, err := json.Marshal(response)
Expand All @@ -50,7 +47,6 @@ func main() {

fmt.Println("Health-check started")

// Create Kafka consumer configuration
fmt.Println("Creating Kafka consumer for topic: ", topicName)

c, err := kafka.NewConsumer(&kafka.ConfigMap{
Expand All @@ -67,7 +63,6 @@ func main() {
return
}
c.SubscribeTopics([]string{topicName}, nil)
// Channel for signals
signals := make(chan os.Signal, 1)
done := make(chan bool, 1)

Expand All @@ -77,7 +72,6 @@ func main() {
for {
select {
case sig := <-signals:
// If an interrupt signal is received, break the loop
fmt.Printf("Caught signal %v: terminating\n", sig)
done <- true
return
Expand All @@ -91,7 +85,6 @@ func main() {
} else {
fmt.Printf("Received message: %+v\n", flinkOutput)

flinkGatewayURL := "http://flink-jobmanager:8083" //"http://flink.us-east-2.aws.confluent.cloud/v1beta1/sql", Replace with your Flink Gateway URL https://flink.region.provider.confluent.cloud
fmt.Println("Flink gateway: ", flinkGatewayURL)
result, err := getFlinkResult(flinkGatewayURL, flinkOutput.SessionID)
if err != nil {
Expand All @@ -107,7 +100,6 @@ func main() {
sendMessage(response, flinkOutput.ConversationID, systemToken)
}
} else {
// Log the error and continue
fmt.Printf("Consumer error: %v\n", err)
}
}
Expand Down
4 changes: 3 additions & 1 deletion backend/components/flink/result-sender/src/tools.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"io"
"io/ioutil"
"net/http"
"os"
"strings"
"time"
)
Expand Down Expand Up @@ -77,6 +78,7 @@ func getFlinkResult(url, sessionID string) (FlinkResult, error) {
}

func sendMessage(message string, conversationId string, systemToken string) (int, string, error) {
apiCommunicationUrl := os.Getenv("API_COMMUNICATION_URL")
messageContent := messageContent{
Text: message,
}
Expand All @@ -90,7 +92,7 @@ func sendMessage(message string, conversationId string, systemToken string) (int
return 0, "", errors.New("The message could not be encoded to JSON for sending.")
}

req, err := http.NewRequest("POST", "http://api-communication/messages.send", bytes.NewReader(messageJSON))
req, err := http.NewRequest("POST", apiCommunicationUrl, bytes.NewReader(messageJSON))
if err != nil {
fmt.Printf("Error creating request: %v\n", err)
return 0, "", errors.New("The message could not be sent.")
Expand Down
13 changes: 1 addition & 12 deletions backend/components/flink/statements-executor/src/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,24 +14,21 @@ import (

func main() {

// Create Kafka consumer to read the statements
kafkaURL := os.Getenv("KAFKA_BROKERS")
schemaRegistryURL := os.Getenv("KAFKA_SCHEMA_REGISTRY_URL")
topicName := os.Getenv("KAFKA_TOPIC_NAME")
//systemToken := os.Getenv("systemToken")
authUsername := os.Getenv("AUTH_JAAS_USERNAME")
authPassword := os.Getenv("AUTH_JAAS_PASSWORD")
timestamp := time.Now().Unix()
strTimestamp := fmt.Sprintf("%d", timestamp)
groupID := "statement-executor-" + strTimestamp
//llmEndpoint := "http://llm-controller:5000/llm.proxy"
flinkGatewayURL := os.Getenv("FLINK_GATEWAY_URL")

if kafkaURL == "" || schemaRegistryURL == "" || topicName == "" {
fmt.Println("KAFKA_BROKERS, KAFKA_SCHEMA_REGISTRY_URL, and KAFKA_TOPIC_NAME environment variables must be set")
return
}

// Healthcheck
http.HandleFunc("/actuator/health", func(w http.ResponseWriter, r *http.Request) {
response := map[string]string{"status": "UP"}
jsonResponse, err := json.Marshal(response)
Expand All @@ -52,7 +49,6 @@ func main() {

fmt.Println("Health-check started")

// Create Kafka consumer configuration
fmt.Println("Creating Kafka consumer for topic: ", topicName)

c, err := kafka.NewConsumer(&kafka.ConfigMap{
Expand All @@ -69,7 +65,6 @@ func main() {
return
}
c.SubscribeTopics([]string{topicName}, nil)
// Channel for signals
signals := make(chan os.Signal, 1)
done := make(chan bool, 1)

Expand All @@ -79,7 +74,6 @@ func main() {
for {
select {
case sig := <-signals:
// If an interrupt signal is received, break the loop
fmt.Printf("Caught signal %v: terminating\n", sig)
done <- true
return
Expand All @@ -92,10 +86,7 @@ func main() {
continue
} else {
fmt.Printf("Received message: %+v\n", statementSet)

flinkGatewayURL := "http://flink-jobmanager:8083" //"http://flink.us-east-2.aws.confluent.cloud/v1beta1/sql", Replace with your Flink Gateway URL https://flink.region.provider.confluent.cloud
fmt.Println("Flink gateway: ", flinkGatewayURL)

sessionID, err := sendFlinkSQL(flinkGatewayURL, statementSet)
if err != nil {
fmt.Println("Error running Flink statement:", err)
Expand All @@ -110,11 +101,9 @@ func main() {
err = produceFlinkOutput(flinkOutput, kafkaURL, "flink-producer-"+groupID, authUsername, authPassword)
if err != nil {
fmt.Printf("error producing message to Kafka: %v\n", err)
//sendMessage("I am sorry, I am unable to answer that question.", message.ConversationID, systemToken)
}
}
} else {
// Log the error and continue
fmt.Printf("Consumer error: %v\n", err)
}
}
Expand Down
8 changes: 0 additions & 8 deletions backend/components/flink/statements-executor/src/tools.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,7 @@ import (
"github.com/confluentinc/confluent-kafka-go/v2/kafka"
)

// sendFlinkSQL sends an SQL statement to the Flink Gateway
func sendFlinkSQL(url string, statementSet FlinkStatementSet) (string, error) {
// Substitute placeholers with variables
timestamp := time.Now().Unix()
strTimestamp := fmt.Sprintf("%d", timestamp)
replacements := map[string]string{
Expand Down Expand Up @@ -91,40 +89,34 @@ func sendFlinkSQL(url string, statementSet FlinkStatementSet) (string, error) {
defer resp.Body.Close()
}

// Handle the response (check if the request was successful)
return sessionResponse.SessionHandle, nil
}

func produceFlinkOutput(flinkOutput FlinkOutput, kafkaURL, groupID, authUsername, authPassword string) error {

kafkaTopic := "flink.outputs"

// Marshal the query to JSON
flinkOutputJSON, err := json.Marshal(flinkOutput)
if err != nil {
return fmt.Errorf("error marshaling query to JSON: %w", err)
}

// Basic Kafka producer configuration
configMap := kafka.ConfigMap{
"bootstrap.servers": kafkaURL,
}
// Conditionally add SASL/SSL configurations if username and password are provided
if authUsername != "" && authPassword != "" {
configMap.SetKey("security.protocol", "SASL_SSL")
configMap.SetKey("sasl.mechanisms", "PLAIN")
configMap.SetKey("sasl.username", authUsername)
configMap.SetKey("sasl.password", authPassword)
}

// Create a new Kafka producer
producer, err := kafka.NewProducer(&configMap)
if err != nil {
return fmt.Errorf("failed to create producer: %w", err)
}
defer producer.Close()

// Produce the message
message := kafka.Message{
TopicPartition: kafka.TopicPartition{Topic: &kafkaTopic, Partition: kafka.PartitionAny},
Key: []byte(flinkOutput.SessionID),
Expand Down

0 comments on commit e411f75

Please sign in to comment.