Commit bb6b0cb

Fall back to polling the supervisor for apiserver addresses when the watch fails

Signed-off-by: Brad Davidson <brad.davidson@rancher.com>
brandond committed Nov 22, 2024
1 parent 9c585eb commit bb6b0cb
Showing 2 changed files with 48 additions and 18 deletions.
25 changes: 14 additions & 11 deletions pkg/agent/config/config.go
@@ -91,15 +91,24 @@ func KubeProxyDisabled(ctx context.Context, node *config.Node, proxy proxy.Proxy
 	return disabled
 }
 
-// APIServers returns a list of apiserver endpoints, suitable for seeding client loadbalancer configurations.
+// WaitForAPIServers returns a list of apiserver endpoints, suitable for seeding client loadbalancer configurations.
 // This function will block until it can return a populated list of apiservers, or if the remote server returns
 // an error (indicating that it does not support this functionality).
-func APIServers(ctx context.Context, node *config.Node, proxy proxy.Proxy) []string {
+func WaitForAPIServers(ctx context.Context, node *config.Node, proxy proxy.Proxy) []string {
 	var addresses []string
+	var info *clientaccess.Info
 	var err error
 
 	_ = wait.PollUntilContextCancel(ctx, 5*time.Second, true, func(ctx context.Context) (bool, error) {
-		addresses, err = getAPIServers(ctx, node, proxy)
+		if info == nil {
+			withCert := clientaccess.WithClientCertificate(node.AgentConfig.ClientKubeletCert, node.AgentConfig.ClientKubeletKey)
+			info, err = clientaccess.ParseAndValidateToken(proxy.SupervisorURL(), node.Token, withCert)
+			if err != nil {
+				logrus.Warnf("Failed to validate server token: %v", err)
+				return false, nil
+			}
+		}
+		addresses, err = GetAPIServers(ctx, info)
 		if err != nil {
 			logrus.Infof("Failed to retrieve list of apiservers from server: %v", err)
 			return false, err
@@ -772,14 +781,8 @@ func get(ctx context.Context, envInfo *cmds.Agent, proxy proxy.Proxy) (*config.N
 	return nodeConfig, nil
 }
 
-// getAPIServers attempts to return a list of apiservers from the server.
-func getAPIServers(ctx context.Context, node *config.Node, proxy proxy.Proxy) ([]string, error) {
-	withCert := clientaccess.WithClientCertificate(node.AgentConfig.ClientKubeletCert, node.AgentConfig.ClientKubeletKey)
-	info, err := clientaccess.ParseAndValidateToken(proxy.SupervisorURL(), node.Token, withCert)
-	if err != nil {
-		return nil, err
-	}
-
+// GetAPIServers attempts to return a list of apiservers from the server.
+func GetAPIServers(ctx context.Context, info *clientaccess.Info) ([]string, error) {
	data, err := info.Get("/v1-" + version.Program + "/apiservers")
 	if err != nil {
 		return nil, err
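
The reworked poll loop above validates the join token once, caches the resulting clientaccess.Info, and then retries only the address fetch every five seconds. A minimal sketch of the same retry pattern, assuming only k8s.io/apimachinery's wait helpers; fetchAddresses is a hypothetical stand-in for the token-validation plus GetAPIServers round trip:

package main

import (
	"context"
	"fmt"
	"time"

	"k8s.io/apimachinery/pkg/util/wait"
)

// fetchAddresses is a hypothetical placeholder for the supervisor call.
func fetchAddresses(ctx context.Context) ([]string, error) {
	return []string{"10.0.0.1:6443"}, nil
}

// waitForAddresses blocks until fetchAddresses returns a populated list,
// retrying every 5 seconds, or until ctx is cancelled.
func waitForAddresses(ctx context.Context) []string {
	var addresses []string
	// The true flag runs the first attempt immediately rather than after
	// one full interval.
	_ = wait.PollUntilContextCancel(ctx, 5*time.Second, true, func(ctx context.Context) (bool, error) {
		addrs, err := fetchAddresses(ctx)
		if err != nil {
			// Returning a nil error retries; returning err aborts the poll.
			return false, nil
		}
		addresses = addrs
		// Keep polling until the list is actually populated.
		return len(addresses) > 0, nil
	})
	return addresses
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
	defer cancel()
	fmt.Println(waitForAddresses(ctx))
}

Note the asymmetry in the real code: a token-validation failure returns (false, nil) and keeps polling, while a GetAPIServers error returns (false, err) and aborts, since per the doc comment a server error indicates the remote end does not support this endpoint at all.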
41 changes: 34 additions & 7 deletions pkg/agent/tunnel/tunnel.go
@@ -16,6 +16,7 @@ import (
 	agentconfig "github.com/k3s-io/k3s/pkg/agent/config"
 	"github.com/k3s-io/k3s/pkg/agent/loadbalancer"
 	"github.com/k3s-io/k3s/pkg/agent/proxy"
+	"github.com/k3s-io/k3s/pkg/clientaccess"
 	daemonconfig "github.com/k3s-io/k3s/pkg/daemons/config"
 	"github.com/k3s-io/k3s/pkg/util"
 	"github.com/k3s-io/k3s/pkg/version"
@@ -138,17 +139,18 @@ func Setup(ctx context.Context, config *daemonconfig.Node, proxy proxy.Proxy) er
 	// connecting to. If that fails, fall back to querying the endpoints list from Kubernetes. This
 	// fallback requires that the server we're joining be running an apiserver, but is the only safe
 	// thing to do if its supervisor is down-level and can't provide us with an endpoint list.
-	addresses := agentconfig.APIServers(ctx, config, proxy)
-	logrus.Infof("Got apiserver addresses from supervisor: %v", addresses)
-
+	addresses := agentconfig.WaitForAPIServers(ctx, config, proxy)
 	if len(addresses) > 0 {
+		logrus.Infof("Got apiserver addresses from supervisor: %v", addresses)
 		if localSupervisorDefault {
 			proxy.SetSupervisorDefault(addresses[0])
 		}
 		proxy.Update(addresses)
 	} else {
-		if endpoint, _ := client.CoreV1().Endpoints(metav1.NamespaceDefault).Get(ctx, "kubernetes", metav1.GetOptions{}); endpoint != nil {
-			addresses = util.GetAddresses(endpoint)
+		if endpoint, err := client.CoreV1().Endpoints(metav1.NamespaceDefault).Get(ctx, "kubernetes", metav1.GetOptions{}); err != nil {
+			logrus.Errorf("Failed to get apiserver addresses from kubernetes endpoints: %v", err)
+		} else {
+			addresses := util.GetAddresses(endpoint)
 			logrus.Infof("Got apiserver addresses from kubernetes endpoints: %v", addresses)
 			if len(addresses) > 0 {
 				proxy.Update(addresses)
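
The fallback path also gets tighter error handling here: a failed Endpoints lookup is now logged instead of silently discarded. A minimal sketch of that fallback, assuming client-go; getAddresses is a hypothetical stand-in for k3s's util.GetAddresses helper:

package sketch

import (
	"context"
	"fmt"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
)

// getAddresses flattens the subsets of an Endpoints object into host:port
// strings (hypothetical stand-in for util.GetAddresses).
func getAddresses(endpoint *v1.Endpoints) []string {
	var addresses []string
	for _, subset := range endpoint.Subsets {
		var port int32
		for _, p := range subset.Ports {
			if p.Name == "https" {
				port = p.Port
			}
		}
		for _, addr := range subset.Addresses {
			addresses = append(addresses, fmt.Sprintf("%s:%d", addr.IP, port))
		}
	}
	return addresses
}

// apiserverFallback reads the "kubernetes" Endpoints object in the default
// namespace, where the apiserver publishes its own reachable addresses.
func apiserverFallback(ctx context.Context, client kubernetes.Interface) ([]string, error) {
	endpoint, err := client.CoreV1().Endpoints(metav1.NamespaceDefault).Get(ctx, "kubernetes", metav1.GetOptions{})
	if err != nil {
		return nil, err
	}
	return getAddresses(endpoint), nil
}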
@@ -159,7 +161,7 @@ func Setup(ctx context.Context, config *daemonconfig.Node, proxy proxy.Proxy) er
 
 	wg := &sync.WaitGroup{}
 
-	go tunnel.watchEndpoints(ctx, apiServerReady, wg, tlsConfig, proxy)
+	go tunnel.watchEndpoints(ctx, apiServerReady, wg, tlsConfig, config, proxy)
 
 	wait := make(chan int, 1)
 	go func() {
@@ -302,9 +304,10 @@ func (a *agentTunnel) watchPods(ctx context.Context, apiServerReady <-chan struc
 // WatchEndpoints attempts to create tunnels to all supervisor addresses. Once the
 // apiserver is up, go into a watch loop, adding and removing tunnels as endpoints come
 // and go from the cluster.
-func (a *agentTunnel) watchEndpoints(ctx context.Context, apiServerReady <-chan struct{}, wg *sync.WaitGroup, tlsConfig *tls.Config, proxy proxy.Proxy) {
+func (a *agentTunnel) watchEndpoints(ctx context.Context, apiServerReady <-chan struct{}, wg *sync.WaitGroup, tlsConfig *tls.Config, node *daemonconfig.Node, proxy proxy.Proxy) {
 	// Attempt to connect to supervisors, storing their cancellation function for later when we
 	// need to disconnect.
+	var info *clientaccess.Info
 	disconnect := map[string]context.CancelFunc{}
 	for _, address := range proxy.SupervisorAddresses() {
 		if _, ok := disconnect[address]; !ok {
@@ -315,10 +318,34 @@ func (a *agentTunnel) watchEndpoints(ctx context.Context, apiServerReady <-chan
 	}
 
 	<-apiServerReady
+
+	refreshFromSupervisor := func() {
+		if info == nil {
+			var err error
+			withCert := clientaccess.WithClientCertificate(node.AgentConfig.ClientKubeletCert, node.AgentConfig.ClientKubeletKey)
+			info, err = clientaccess.ParseAndValidateToken(proxy.SupervisorURL(), node.Token, withCert)
+			if err != nil {
+				logrus.Warnf("Failed to validate server token: %v", err)
+				return
+			}
+		}
+
+		if addresses, err := agentconfig.GetAPIServers(ctx, info); err != nil {
+			logrus.Warnf("Failed to get apiserver addresses from supervisor: %v", err)
+		} else {
+			proxy.Update(addresses)
+		}
+	}
+
 	endpoints := a.client.CoreV1().Endpoints(metav1.NamespaceDefault)
 	fieldSelector := fields.Set{metav1.ObjectNameField: "kubernetes"}.String()
 	lw := &cache.ListWatch{
 		ListFunc: func(options metav1.ListOptions) (object runtime.Object, e error) {
+			// if we're being called to re-list, then likely there was an
+			// interruption to the apiserver connection and the listwatch is retrying
+			// its connection. This is a good suggestion that it might be necessary
+			// to refresh the apiserver address from the supervisor.
+			go refreshFromSupervisor()
 			options.FieldSelector = fieldSelector
 			return endpoints.List(ctx, options)
 		},
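
The new ListFunc hook works because a client-go reflector calls List once at startup and again every time its watch connection breaks, so a re-list is a reasonable signal that the apiserver addresses may have changed. A minimal sketch of that wiring, assuming client-go; onRelist is an illustrative stand-in for refreshFromSupervisor:

package tunnelsketch

import (
	"context"
	"log"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/fields"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/apimachinery/pkg/watch"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/cache"
)

func watchKubernetesEndpoints(ctx context.Context, client kubernetes.Interface, onRelist func()) {
	endpoints := client.CoreV1().Endpoints(metav1.NamespaceDefault)
	fieldSelector := fields.Set{metav1.ObjectNameField: "kubernetes"}.String()
	lw := &cache.ListWatch{
		// The reflector invokes ListFunc at startup and after every watch
		// failure, which is what makes it usable as a "connection was
		// interrupted" signal.
		ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
			go onRelist()
			options.FieldSelector = fieldSelector
			return endpoints.List(ctx, options)
		},
		WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
			options.FieldSelector = fieldSelector
			return endpoints.Watch(ctx, options)
		},
	}
	store := cache.NewStore(cache.MetaNamespaceKeyFunc)
	reflector := cache.NewReflector(lw, &v1.Endpoints{}, store, 0)
	// Run blocks until ctx is cancelled, restarting the list+watch on errors.
	reflector.Run(ctx.Done())
	log.Println("endpoint watch stopped")
}

Kicking off the refresh in a goroutine keeps the re-list itself fast; the proxy is updated out of band once the supervisor responds.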
