Skip to content

Commit

Permalink
Add Kafka proxy to the airy-controller
Browse files Browse the repository at this point in the history
  • Loading branch information
ljupcovangelski committed Jul 12, 2023
1 parent a3445e7 commit dfd57d8
Show file tree
Hide file tree
Showing 4 changed files with 66 additions and 1 deletion.
1 change: 1 addition & 0 deletions infrastructure/controller/pkg/endpoints/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ go_library(
"cors.go",
"server.go",
"services.go",
"proxy.go",
],
importpath = "github.com/airyhq/airy/infrastructure/controller/pkg/endpoints",
visibility = ["//visibility:public"],
Expand Down
58 changes: 58 additions & 0 deletions infrastructure/controller/pkg/endpoints/proxy.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package endpoints

import (
"log"
"net/http"
"net/http/httputil"
"net/url"
"strings"

"k8s.io/client-go/kubernetes"
"k8s.io/helm/cmd/helm/search"
)

type proxyTarget struct {
name string
url string
stripUri string
}

type KafkaSubjects struct {
ClientSet *kubernetes.Clientset
Namespace string
Index *search.Index
}

type KafkaTopics struct {
ClientSet *kubernetes.Clientset
Namespace string
Index *search.Index
}

func (s *KafkaSubjects) ServeHTTP(w http.ResponseWriter, r *http.Request) {
var proxyUpstream = proxyTarget{"subjects", "http://schema-registry:8081/subjects", "/kafka/subjects"}
proxyRequest(w, r, proxyUpstream)
}

func (s *KafkaTopics) ServeHTTP(w http.ResponseWriter, r *http.Request) {
var proxyUpstream = proxyTarget{"subjects", "http://schema-registry:8082/topics", "/kafka/topics"}
proxyRequest(w, r, proxyUpstream)
}

func proxyRequest(w http.ResponseWriter, r *http.Request, proxyUpstream proxyTarget) {
target, err := url.Parse(proxyUpstream.url)
if err != nil {
log.Printf("Error parsing target URL: %v\n", err)
http.Error(w, "Internal Server Error", http.StatusInternalServerError)
return
}

proxy := httputil.NewSingleHostReverseProxy(target)
r.URL.Host = target.Host
r.URL.Scheme = target.Scheme
r.Header.Set("X-Forwarded-Host", r.Header.Get("Host"))
r.Host = target.Host
r.URL.Path = strings.TrimPrefix(r.URL.Path, proxyUpstream.stripUri)

proxy.ServeHTTP(w, r)
}
6 changes: 6 additions & 0 deletions infrastructure/controller/pkg/endpoints/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,12 @@ func Serve(clientSet *kubernetes.Clientset, namespace string, kubeConfig *rest.C
componentsList := ComponentsList{ClientSet: clientSet, Namespace: namespace, Index: helmIndex}
r.Handle("/components.list", &componentsList)

kafkaSubjects := KafkaSubjects{ClientSet: clientSet, Namespace: namespace, Index: helmIndex}
r.Handle("/kafka/subjects", &kafkaSubjects)

kafkaTopics := KafkaTopics{ClientSet: clientSet, Namespace: namespace, Index: helmIndex}
r.Handle("/kafka/topics", &kafkaTopics)

log.Fatal(http.ListenAndServe(":8080", r))
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ spec:
pathType: Prefix
backend:
service:
name: api-admin
name: airy-controller
port:
number: 80
- path: /kafka
Expand Down

0 comments on commit dfd57d8

Please sign in to comment.