From e411f759606f5e0bf286583ddbbb5cb2c26d9eb4 Mon Sep 17 00:00:00 2001 From: ljupcovangelski Date: Wed, 7 Feb 2024 08:55:54 +0100 Subject: [PATCH] Update env variables --- .../helm/templates/result-sender/deployment.yaml | 2 ++ .../templates/statements-executor/deployment.yaml | 2 ++ backend/components/flink/helm/values.yaml | 2 ++ backend/components/flink/result-sender/src/main.go | 12 ++---------- backend/components/flink/result-sender/src/tools.go | 4 +++- .../flink/statements-executor/src/main.go | 13 +------------ .../flink/statements-executor/src/tools.go | 8 -------- 7 files changed, 12 insertions(+), 31 deletions(-) diff --git a/backend/components/flink/helm/templates/result-sender/deployment.yaml b/backend/components/flink/helm/templates/result-sender/deployment.yaml index 193cd11a5..b35f3bafd 100644 --- a/backend/components/flink/helm/templates/result-sender/deployment.yaml +++ b/backend/components/flink/helm/templates/result-sender/deployment.yaml @@ -36,6 +36,8 @@ spec: env: - name: KAFKA_TOPIC_NAME value: {{ .Values.resultSender.topic }} + - name: API_COMMUNICATION_URL + value: {{ .Values.apiCommunicationUrl }} livenessProbe: httpGet: path: /actuator/health diff --git a/backend/components/flink/helm/templates/statements-executor/deployment.yaml b/backend/components/flink/helm/templates/statements-executor/deployment.yaml index fdf52ddb4..5d6d4c1dc 100644 --- a/backend/components/flink/helm/templates/statements-executor/deployment.yaml +++ b/backend/components/flink/helm/templates/statements-executor/deployment.yaml @@ -36,6 +36,8 @@ spec: env: - name: KAFKA_TOPIC_NAME value: {{ .Values.executor.topic }} + - name: FLINK_GATEWAY_URL + value: {{ .Values.gatewayUrl }} livenessProbe: httpGet: path: /actuator/health diff --git a/backend/components/flink/helm/values.yaml b/backend/components/flink/helm/values.yaml index db62a9851..61480685f 100644 --- a/backend/components/flink/helm/values.yaml +++ b/backend/components/flink/helm/values.yaml @@ -3,6 +3,8 @@ mandatory: false enabled: false port: 8080 resources: +gatewayUrl: "http://flink-jobmanager:8083" +apiCommunicationUrl: "http://api-communication/messages.send" executor: name: statements-executor image: connectors/flink/statements-executor diff --git a/backend/components/flink/result-sender/src/main.go b/backend/components/flink/result-sender/src/main.go index 9c358bf15..6f5fc32a6 100644 --- a/backend/components/flink/result-sender/src/main.go +++ b/backend/components/flink/result-sender/src/main.go @@ -13,23 +13,20 @@ import ( func main() { - // Create Kafka consumer to read the statements kafkaURL := os.Getenv("KAFKA_BROKERS") schemaRegistryURL := os.Getenv("KAFKA_SCHEMA_REGISTRY_URL") topicName := os.Getenv("KAFKA_TOPIC_NAME") systemToken := os.Getenv("systemToken") authUsername := os.Getenv("AUTH_JAAS_USERNAME") authPassword := os.Getenv("AUTH_JAAS_PASSWORD") - //timestamp := time.Now().Unix() - //strTimestamp := fmt.Sprintf("%d", timestamp) - groupID := "result-sender" //+ "-" + strTimestamp + groupID := "result-sender" + flinkGatewayURL := os.Getenv("FLINK_GATEWAY_URL") if kafkaURL == "" || schemaRegistryURL == "" || topicName == "" { fmt.Println("KAFKA_BROKERS, KAFKA_SCHEMA_REGISTRY_URL, and KAFKA_TOPIC_NAME environment variables must be set") return } - // Healthcheck http.HandleFunc("/actuator/health", func(w http.ResponseWriter, r *http.Request) { response := map[string]string{"status": "UP"} jsonResponse, err := json.Marshal(response) @@ -50,7 +47,6 @@ func main() { fmt.Println("Health-check started") - // Create Kafka consumer configuration fmt.Println("Creating Kafka consumer for topic: ", topicName) c, err := kafka.NewConsumer(&kafka.ConfigMap{ @@ -67,7 +63,6 @@ func main() { return } c.SubscribeTopics([]string{topicName}, nil) - // Channel for signals signals := make(chan os.Signal, 1) done := make(chan bool, 1) @@ -77,7 +72,6 @@ func main() { for { select { case sig := <-signals: - // If an interrupt signal is received, break the loop fmt.Printf("Caught signal %v: terminating\n", sig) done <- true return @@ -91,7 +85,6 @@ func main() { } else { fmt.Printf("Received message: %+v\n", flinkOutput) - flinkGatewayURL := "http://flink-jobmanager:8083" //"http://flink.us-east-2.aws.confluent.cloud/v1beta1/sql", Replace with your Flink Gateway URL https://flink.region.provider.confluent.cloud fmt.Println("Flink gateway: ", flinkGatewayURL) result, err := getFlinkResult(flinkGatewayURL, flinkOutput.SessionID) if err != nil { @@ -107,7 +100,6 @@ func main() { sendMessage(response, flinkOutput.ConversationID, systemToken) } } else { - // Log the error and continue fmt.Printf("Consumer error: %v\n", err) } } diff --git a/backend/components/flink/result-sender/src/tools.go b/backend/components/flink/result-sender/src/tools.go index 0ba7f1cb6..6be5dee2f 100644 --- a/backend/components/flink/result-sender/src/tools.go +++ b/backend/components/flink/result-sender/src/tools.go @@ -8,6 +8,7 @@ import ( "io" "io/ioutil" "net/http" + "os" "strings" "time" ) @@ -77,6 +78,7 @@ func getFlinkResult(url, sessionID string) (FlinkResult, error) { } func sendMessage(message string, conversationId string, systemToken string) (int, string, error) { + apiCommunicationUrl := os.Getenv("API_COMMUNICATION_URL") messageContent := messageContent{ Text: message, } @@ -90,7 +92,7 @@ func sendMessage(message string, conversationId string, systemToken string) (int return 0, "", errors.New("The message could not be encoded to JSON for sending.") } - req, err := http.NewRequest("POST", "http://api-communication/messages.send", bytes.NewReader(messageJSON)) + req, err := http.NewRequest("POST", apiCommunicationUrl, bytes.NewReader(messageJSON)) if err != nil { fmt.Printf("Error creating request: %v\n", err) return 0, "", errors.New("The message could not be sent.") diff --git a/backend/components/flink/statements-executor/src/main.go b/backend/components/flink/statements-executor/src/main.go index a1b60e883..89aea27e3 100644 --- a/backend/components/flink/statements-executor/src/main.go +++ b/backend/components/flink/statements-executor/src/main.go @@ -14,24 +14,21 @@ import ( func main() { - // Create Kafka consumer to read the statements kafkaURL := os.Getenv("KAFKA_BROKERS") schemaRegistryURL := os.Getenv("KAFKA_SCHEMA_REGISTRY_URL") topicName := os.Getenv("KAFKA_TOPIC_NAME") - //systemToken := os.Getenv("systemToken") authUsername := os.Getenv("AUTH_JAAS_USERNAME") authPassword := os.Getenv("AUTH_JAAS_PASSWORD") timestamp := time.Now().Unix() strTimestamp := fmt.Sprintf("%d", timestamp) groupID := "statement-executor-" + strTimestamp - //llmEndpoint := "http://llm-controller:5000/llm.proxy" + flinkGatewayURL := os.Getenv("FLINK_GATEWAY_URL") if kafkaURL == "" || schemaRegistryURL == "" || topicName == "" { fmt.Println("KAFKA_BROKERS, KAFKA_SCHEMA_REGISTRY_URL, and KAFKA_TOPIC_NAME environment variables must be set") return } - // Healthcheck http.HandleFunc("/actuator/health", func(w http.ResponseWriter, r *http.Request) { response := map[string]string{"status": "UP"} jsonResponse, err := json.Marshal(response) @@ -52,7 +49,6 @@ func main() { fmt.Println("Health-check started") - // Create Kafka consumer configuration fmt.Println("Creating Kafka consumer for topic: ", topicName) c, err := kafka.NewConsumer(&kafka.ConfigMap{ @@ -69,7 +65,6 @@ func main() { return } c.SubscribeTopics([]string{topicName}, nil) - // Channel for signals signals := make(chan os.Signal, 1) done := make(chan bool, 1) @@ -79,7 +74,6 @@ func main() { for { select { case sig := <-signals: - // If an interrupt signal is received, break the loop fmt.Printf("Caught signal %v: terminating\n", sig) done <- true return @@ -92,10 +86,7 @@ func main() { continue } else { fmt.Printf("Received message: %+v\n", statementSet) - - flinkGatewayURL := "http://flink-jobmanager:8083" //"http://flink.us-east-2.aws.confluent.cloud/v1beta1/sql", Replace with your Flink Gateway URL https://flink.region.provider.confluent.cloud fmt.Println("Flink gateway: ", flinkGatewayURL) - sessionID, err := sendFlinkSQL(flinkGatewayURL, statementSet) if err != nil { fmt.Println("Error running Flink statement:", err) @@ -110,11 +101,9 @@ func main() { err = produceFlinkOutput(flinkOutput, kafkaURL, "flink-producer-"+groupID, authUsername, authPassword) if err != nil { fmt.Printf("error producing message to Kafka: %v\n", err) - //sendMessage("I am sorry, I am unable to answer that question.", message.ConversationID, systemToken) } } } else { - // Log the error and continue fmt.Printf("Consumer error: %v\n", err) } } diff --git a/backend/components/flink/statements-executor/src/tools.go b/backend/components/flink/statements-executor/src/tools.go index 5021370e7..57ec01fda 100644 --- a/backend/components/flink/statements-executor/src/tools.go +++ b/backend/components/flink/statements-executor/src/tools.go @@ -13,9 +13,7 @@ import ( "github.com/confluentinc/confluent-kafka-go/v2/kafka" ) -// sendFlinkSQL sends an SQL statement to the Flink Gateway func sendFlinkSQL(url string, statementSet FlinkStatementSet) (string, error) { - // Substitute placeholers with variables timestamp := time.Now().Unix() strTimestamp := fmt.Sprintf("%d", timestamp) replacements := map[string]string{ @@ -91,7 +89,6 @@ func sendFlinkSQL(url string, statementSet FlinkStatementSet) (string, error) { defer resp.Body.Close() } - // Handle the response (check if the request was successful) return sessionResponse.SessionHandle, nil } @@ -99,17 +96,14 @@ func produceFlinkOutput(flinkOutput FlinkOutput, kafkaURL, groupID, authUsername kafkaTopic := "flink.outputs" - // Marshal the query to JSON flinkOutputJSON, err := json.Marshal(flinkOutput) if err != nil { return fmt.Errorf("error marshaling query to JSON: %w", err) } - // Basic Kafka producer configuration configMap := kafka.ConfigMap{ "bootstrap.servers": kafkaURL, } - // Conditionally add SASL/SSL configurations if username and password are provided if authUsername != "" && authPassword != "" { configMap.SetKey("security.protocol", "SASL_SSL") configMap.SetKey("sasl.mechanisms", "PLAIN") @@ -117,14 +111,12 @@ func produceFlinkOutput(flinkOutput FlinkOutput, kafkaURL, groupID, authUsername configMap.SetKey("sasl.password", authPassword) } - // Create a new Kafka producer producer, err := kafka.NewProducer(&configMap) if err != nil { return fmt.Errorf("failed to create producer: %w", err) } defer producer.Close() - // Produce the message message := kafka.Message{ TopicPartition: kafka.TopicPartition{Topic: &kafkaTopic, Partition: kafka.PartitionAny}, Key: []byte(flinkOutput.SessionID),