From ac8fe7ab858466c864e5b533222d38beb9e1a999 Mon Sep 17 00:00:00 2001 From: Marvin Beckers Date: Fri, 14 Jul 2023 10:26:35 +0200 Subject: [PATCH] implement leader election in system:admin workspace Signed-off-by: Marvin Beckers --- pkg/server/options/controllers.go | 13 +++++ pkg/server/server.go | 89 ++++++++++++++++++++++++++++++- 2 files changed, 100 insertions(+), 2 deletions(-) diff --git a/pkg/server/options/controllers.go b/pkg/server/options/controllers.go index f03f01a8f800..eb511e3b188e 100644 --- a/pkg/server/options/controllers.go +++ b/pkg/server/options/controllers.go @@ -25,6 +25,7 @@ import ( "github.com/spf13/pflag" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/util/keyutil" "k8s.io/klog/v2" kcmoptions "k8s.io/kubernetes/cmd/kube-controller-manager/app/options" @@ -34,6 +35,10 @@ type Controllers struct { EnableAll bool IndividuallyEnabled []string + EnableLeaderElection bool + LeaderElectionNamespace string + LeaderElectionName string + SAController kcmoptions.SAControllerOptions } @@ -52,6 +57,10 @@ func NewControllers() *Controllers { return &Controllers{ EnableAll: true, + EnableLeaderElection: false, + LeaderElectionNamespace: metav1.NamespaceSystem, + LeaderElectionName: "kcp-controllers", + SAController: *kcmDefaults.SAController, } } @@ -62,6 +71,10 @@ func (c *Controllers) AddFlags(fs *pflag.FlagSet) { fs.StringSliceVar(&c.IndividuallyEnabled, "unsupported-run-individual-controllers", c.IndividuallyEnabled, "Run individual controllers in-process. 
The controller names can change at any time.") fs.MarkHidden("unsupported-run-individual-controllers") //nolint:errcheck + fs.BoolVar(&c.EnableLeaderElection, "enable-leader-election", c.EnableLeaderElection, "Enable a leader election for kcp controllers running in the system:admin workspace") + fs.StringVar(&c.LeaderElectionNamespace, "leader-election-namespace", c.LeaderElectionNamespace, "Namespace in system:admin workspace to use for leader election") + fs.StringVar(&c.LeaderElectionName, "leader-election-name", c.LeaderElectionName, "Name of the lease to use for leader election") + c.SAController.AddFlags(fs) } diff --git a/pkg/server/server.go b/pkg/server/server.go index a44921afdf98..d225624c8daa 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -18,8 +18,10 @@ package server import ( "context" + "fmt" "net/http" _ "net/http/pprof" + "os" "time" "github.com/kcp-dev/logicalcluster/v3" @@ -27,10 +29,13 @@ import ( "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/apimachinery/pkg/util/uuid" "k8s.io/apimachinery/pkg/util/wait" genericapiserver "k8s.io/apiserver/pkg/server" "k8s.io/client-go/rest" "k8s.io/client-go/tools/cache" + "k8s.io/client-go/tools/leaderelection" + "k8s.io/client-go/tools/leaderelection/resourcelock" "k8s.io/klog/v2" "k8s.io/kubernetes/pkg/genericcontrolplane" @@ -138,7 +143,7 @@ func (s *Server) Run(ctx context.Context) error { hookName := "kcp-start-informers" if err := s.AddPostStartHook(hookName, func(hookContext genericapiserver.PostStartHookContext) error { - logger := logger.WithValues("postStartHook", hookName) + logger = logger.WithValues("postStartHook", hookName) ctx = klog.NewContext(ctx, logger) logger.Info("starting kube informers") @@ -499,7 +504,11 @@ func (s *Server) Run(ctx context.Context) error { logger := klog.FromContext(ctx).WithValues("postStartHook", "kcp-start-controllers") s.WaitForPhase1Finished() - 
s.startControllers(goContext(hookContext), logger) + if s.Options.Controllers.EnableLeaderElection { + go s.leaderElectAndStartControllers(goContext(hookContext)) + } else { + s.startControllers(goContext(hookContext), logger) + } return nil }); err != nil { @@ -532,3 +541,79 @@ func goContext(parent genericapiserver.PostStartHookContext) context.Context { func (s *Server) WaitForPhase1Finished() { <-s.rootPhase1FinishedCh } + +func (s *Server) leaderElectAndStartControllers(ctx context.Context) { + logger := klog.FromContext(ctx) + + leaderElectionConfig := rest.CopyConfig(s.GenericConfig.LoopbackClientConfig) + leaderElectionConfig = rest.AddUserAgent(leaderElectionConfig, "kcp-leader-election") + // TODO(embik): is there a better way to access / generate the system:admin workspace URL? + leaderElectionConfig.Host = fmt.Sprintf("%s/clusters/system:admin", leaderElectionConfig.Host) + + hostname, err := os.Hostname() + if err != nil { + logger.Error(err, "failed to determine hostname") + return + } + + id := fmt.Sprintf("%s_%s", hostname, string(uuid.NewUUID())) + + rl, err := resourcelock.NewFromKubeconfig("leases", + s.Options.Controllers.LeaderElectionNamespace, + s.Options.Controllers.LeaderElectionName, + resourcelock.ResourceLockConfig{ + Identity: id, + }, + leaderElectionConfig, + time.Second*30, + ) + + if err != nil { + logger.Error(err, "failed to set up resource lock") + return + } + + logger = logger.WithValues("identity", id) + electionLogger := logger.WithValues("namespace", s.Options.Controllers.LeaderElectionNamespace, "name", s.Options.Controllers.LeaderElectionName) + + // for the whole runtime of the kcp process, we want to try becoming the leader to run + // the kcp controllers. Even if we once were leader and lost that lock, we want to + // restart the leader election (thus the loop) until the complete process terminates. 
+loop: + for { + select { + case <-ctx.Done(): + electionLogger.Info("context canceled, will not retry leader election") + break loop + default: + electionLogger.Info("(re-)starting leader election") + leaderelection.RunOrDie(ctx, leaderelection.LeaderElectionConfig{ + Lock: rl, + LeaseDuration: time.Second * 60, + RenewDeadline: time.Second * 5, + RetryPeriod: time.Second * 2, + Callbacks: leaderelection.LeaderCallbacks{ + OnStartedLeading: func(leaderElectionCtx context.Context) { + electionLogger.Info("started leading") + s.startControllers(leaderElectionCtx, logger) + }, + OnStoppedLeading: func() { + electionLogger.Info("stopped leading") + }, + OnNewLeader: func(currentID string) { + if currentID == id { + // we already log when we start leading, no reason to also log something here. + return + } + + electionLogger.WithValues("leader", currentID).Info("leader is someone else") + }, + }, + WatchDog: leaderelection.NewLeaderHealthzAdaptor(time.Second * 5), + Name: s.Options.Controllers.LeaderElectionName, + }) + } + } + + electionLogger.Info("leader election loop has been terminated") +}