From dfd57d8ae9c4831de3b9b735c7a4bb12c1c125bc Mon Sep 17 00:00:00 2001 From: ljupcovangelski Date: Wed, 12 Jul 2023 19:42:19 +0200 Subject: [PATCH] Add Kafka proxy to the airy-controller --- infrastructure/controller/pkg/endpoints/BUILD | 1 + .../controller/pkg/endpoints/proxy.go | 58 +++++++++++++++++++ .../controller/pkg/endpoints/server.go | 6 ++ .../templates/components/ingress.yaml | 2 +- 4 files changed, 66 insertions(+), 1 deletion(-) create mode 100644 infrastructure/controller/pkg/endpoints/proxy.go diff --git a/infrastructure/controller/pkg/endpoints/BUILD b/infrastructure/controller/pkg/endpoints/BUILD index fdcbf72037..dff54a6802 100644 --- a/infrastructure/controller/pkg/endpoints/BUILD +++ b/infrastructure/controller/pkg/endpoints/BUILD @@ -14,6 +14,7 @@ go_library( "cors.go", "server.go", "services.go", + "proxy.go", ], importpath = "github.com/airyhq/airy/infrastructure/controller/pkg/endpoints", visibility = ["//visibility:public"], diff --git a/infrastructure/controller/pkg/endpoints/proxy.go b/infrastructure/controller/pkg/endpoints/proxy.go new file mode 100644 index 0000000000..8dc0f90afc --- /dev/null +++ b/infrastructure/controller/pkg/endpoints/proxy.go @@ -0,0 +1,58 @@ +package endpoints + +import ( + "log" + "net/http" + "net/http/httputil" + "net/url" + "strings" + + "k8s.io/client-go/kubernetes" + "k8s.io/helm/cmd/helm/search" +) + +type proxyTarget struct { + name string + url string + stripUri string +} + +type KafkaSubjects struct { + ClientSet *kubernetes.Clientset + Namespace string + Index *search.Index +} + +type KafkaTopics struct { + ClientSet *kubernetes.Clientset + Namespace string + Index *search.Index +} + +func (s *KafkaSubjects) ServeHTTP(w http.ResponseWriter, r *http.Request) { + var proxyUpstream = proxyTarget{"subjects", "http://schema-registry:8081/subjects", "/kafka/subjects"} + proxyRequest(w, r, proxyUpstream) +} + +func (s *KafkaTopics) ServeHTTP(w http.ResponseWriter, r *http.Request) { + var proxyUpstream = proxyTarget{"subjects", "http://schema-registry:8082/topics", "/kafka/topics"} + proxyRequest(w, r, proxyUpstream) +} + +func proxyRequest(w http.ResponseWriter, r *http.Request, proxyUpstream proxyTarget) { + target, err := url.Parse(proxyUpstream.url) + if err != nil { + log.Printf("Error parsing target URL: %v\n", err) + http.Error(w, "Internal Server Error", http.StatusInternalServerError) + return + } + + proxy := httputil.NewSingleHostReverseProxy(target) + r.URL.Host = target.Host + r.URL.Scheme = target.Scheme + r.Header.Set("X-Forwarded-Host", r.Header.Get("Host")) + r.Host = target.Host + r.URL.Path = strings.TrimPrefix(r.URL.Path, proxyUpstream.stripUri) + + proxy.ServeHTTP(w, r) +} diff --git a/infrastructure/controller/pkg/endpoints/server.go b/infrastructure/controller/pkg/endpoints/server.go index 27e24c426c..b3c68443fe 100644 --- a/infrastructure/controller/pkg/endpoints/server.go +++ b/infrastructure/controller/pkg/endpoints/server.go @@ -70,6 +70,12 @@ func Serve(clientSet *kubernetes.Clientset, namespace string, kubeConfig *rest.C componentsList := ComponentsList{ClientSet: clientSet, Namespace: namespace, Index: helmIndex} r.Handle("/components.list", &componentsList) + kafkaSubjects := KafkaSubjects{ClientSet: clientSet, Namespace: namespace, Index: helmIndex} + r.Handle("/kafka/subjects", &kafkaSubjects) + + kafkaTopics := KafkaTopics{ClientSet: clientSet, Namespace: namespace, Index: helmIndex} + r.Handle("/kafka/topics", &kafkaTopics) + log.Fatal(http.ListenAndServe(":8080", r)) } diff --git a/infrastructure/helm-chart/templates/components/ingress.yaml b/infrastructure/helm-chart/templates/components/ingress.yaml index b4ddcdb3db..5a65593cd7 100644 --- a/infrastructure/helm-chart/templates/components/ingress.yaml +++ b/infrastructure/helm-chart/templates/components/ingress.yaml @@ -82,7 +82,7 @@ spec: pathType: Prefix backend: service: - name: api-admin + name: airy-controller port: number: 80 - path: /kafka