Skip to content

Commit

Permalink
fix: improve discovery implement. (#176)
Browse files Browse the repository at this point in the history
  • Loading branch information
mo3et authored Dec 12, 2024
1 parent 1b4cebe commit fe2a5ec
Showing 1 changed file with 70 additions and 17 deletions.
87 changes: 70 additions & 17 deletions discovery/kubernetes/kubernetes.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package kubernetes
import (
"context"
"fmt"
"log"
"os"
"sync"
"time"

Expand All @@ -21,6 +23,7 @@ type KubernetesConnManager struct {
namespace string
dialOptions []grpc.DialOption

rpcTargets map[string]string
selfTarget string

mu sync.RWMutex
Expand Down Expand Up @@ -62,11 +65,14 @@ func (k *KubernetesConnManager) initializeConns(serviceName string) error {
return fmt.Errorf("failed to get endpoints for service %s: %v", serviceName, err)
}

// fmt.Println("Endpoints:", endpoints, "endpoints.Subsets:", endpoints.Subsets)

var conns []*grpc.ClientConn
for _, subset := range endpoints.Subsets {
for _, address := range subset.Addresses {
target := fmt.Sprintf("%s:%d", address.IP, port)
conn, err := grpc.Dial(target, grpc.WithTransportCredentials(insecure.NewCredentials()))
// fmt.Println("IP target:", target)
conn, err := grpc.Dial(target, append(k.dialOptions, grpc.WithTransportCredentials(insecure.NewCredentials()))...)
if err != nil {
return fmt.Errorf("failed to dial endpoint %s: %v", target, err)
}
Expand All @@ -75,34 +81,32 @@ func (k *KubernetesConnManager) initializeConns(serviceName string) error {
}

k.mu.Lock()
defer k.mu.Unlock()
k.connMap[serviceName] = conns

// go k.watchEndpoints(serviceName)
k.mu.Unlock()

return nil
}

// GetConns returns gRPC client connections for a given Kubernetes service name.
func (k *KubernetesConnManager) GetConns(ctx context.Context, serviceName string, opts ...grpc.DialOption) ([]*grpc.ClientConn, error) {
k.mu.RLock()
conns, exists := k.connMap[serviceName]
defer k.mu.RUnlock()

conns, exists := k.connMap[serviceName]
k.mu.RUnlock()
if exists {
return conns, nil
}

k.mu.Lock()
defer k.mu.Unlock()

// Check if another goroutine has already initialized the connections when we released the read lock
conns, exists = k.connMap[serviceName]
if exists {
return conns, nil
}
k.mu.Unlock()

if err := k.initializeConns(serviceName); err != nil {
fmt.Println("Failed to initialize connections:", err)
return nil, fmt.Errorf("failed to initialize connections for service %s: %v", serviceName, err)
}

Expand All @@ -111,26 +115,64 @@ func (k *KubernetesConnManager) GetConns(ctx context.Context, serviceName string

// GetConn returns a single gRPC client connection for a given Kubernetes service name.
func (k *KubernetesConnManager) GetConn(ctx context.Context, serviceName string, opts ...grpc.DialOption) (*grpc.ClientConn, error) {
port, err := k.getServicePort(serviceName)
if err != nil {
return nil, err
}
var target string

fmt.Println("SVC port:", port)
if k.rpcTargets[serviceName] == "" {
var err error

svcPort, err := k.getServicePort(serviceName)
if err != nil {
return nil, err
}

target := fmt.Sprintf("%s.%s.svc.cluster.local:%d", serviceName, k.namespace, port)
target = fmt.Sprintf("%s.%s.svc.cluster.local:%d", serviceName, k.namespace, svcPort)

fmt.Println("SVC target:", target)
// fmt.Println("SVC target:", target)
} else {
target = k.rpcTargets[serviceName]
}

return grpc.DialContext(
ctx,
target,
append([]grpc.DialOption{grpc.WithTransportCredentials(insecure.NewCredentials())}, k.dialOptions...)...,
append([]grpc.DialOption{
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(1024*1024*10), grpc.MaxCallSendMsgSize(1024*1024*20)),
}, k.dialOptions...)...,
)
}

// GetSelfConnTarget returns the connection target for the current service.
func (k *KubernetesConnManager) GetSelfConnTarget() string {
if k.selfTarget == "" {
hostName := os.Getenv("HOSTNAME")

pod, err := k.clientset.CoreV1().Pods(k.namespace).Get(context.Background(), hostName, metav1.GetOptions{})
if err != nil {
log.Printf("failed to get pod %s: %v \n", hostName, err)
}

for pod.Status.PodIP == "" {
pod, err = k.clientset.CoreV1().Pods(k.namespace).Get(context.TODO(), hostName, metav1.GetOptions{})
if err != nil {
log.Printf("Error getting pod: %v \n", err)
}

time.Sleep(3 * time.Second)
}

var selfPort int32

for _, port := range pod.Spec.Containers[0].Ports {
if port.ContainerPort != 10001 {
selfPort = port.ContainerPort
break
}
}

k.selfTarget = fmt.Sprintf("%s:%d", pod.Status.PodIP, selfPort)
}

return k.selfTarget
}

Expand Down Expand Up @@ -161,6 +203,7 @@ func (k *KubernetesConnManager) Close() {
func (k *KubernetesConnManager) Register(serviceName, host string, port int, opts ...grpc.DialOption) error {
return nil
}

func (k *KubernetesConnManager) UnRegister() error {
return nil
}
Expand All @@ -170,6 +213,8 @@ func (k *KubernetesConnManager) GetUserIdHashGatewayHost(ctx context.Context, us
}

func (k *KubernetesConnManager) getServicePort(serviceName string) (int32, error) {
var svcPort int32

svc, err := k.clientset.CoreV1().Services(k.namespace).Get(context.Background(), serviceName, metav1.GetOptions{})
if err != nil {
fmt.Print("namespace:", k.namespace)
Expand All @@ -180,7 +225,15 @@ func (k *KubernetesConnManager) getServicePort(serviceName string) (int32, error
return 0, fmt.Errorf("service %s has no ports defined", serviceName)
}

return svc.Spec.Ports[0].Port, nil
for _, port := range svc.Spec.Ports {
// fmt.Println(serviceName, " Now Get Port:", port.Port)
if port.Port != 10001 {
svcPort = port.Port
break
}
}

return svcPort, nil
}

// watchEndpoints listens for changes in Pod resources.
Expand Down

0 comments on commit fe2a5ec

Please sign in to comment.