Skip to content

Commit

Permalink
implement leader election in system:admin workspace
Browse files Browse the repository at this point in the history
Signed-off-by: Marvin Beckers <marvin@kubermatic.com>
  • Loading branch information
embik committed Sep 22, 2023
1 parent 65a0a28 commit ac8fe7a
Show file tree
Hide file tree
Showing 2 changed files with 100 additions and 2 deletions.
13 changes: 13 additions & 0 deletions pkg/server/options/controllers.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (

"github.com/spf13/pflag"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/util/keyutil"
"k8s.io/klog/v2"
kcmoptions "k8s.io/kubernetes/cmd/kube-controller-manager/app/options"
Expand All @@ -34,6 +35,10 @@ type Controllers struct {
EnableAll bool
IndividuallyEnabled []string

EnableLeaderElection bool
LeaderElectionNamespace string
LeaderElectionName string

SAController kcmoptions.SAControllerOptions
}

Expand All @@ -52,6 +57,10 @@ func NewControllers() *Controllers {
return &Controllers{
EnableAll: true,

EnableLeaderElection: false,
LeaderElectionNamespace: metav1.NamespaceSystem,
LeaderElectionName: "kcp-controllers",

SAController: *kcmDefaults.SAController,
}
}
Expand All @@ -62,6 +71,10 @@ func (c *Controllers) AddFlags(fs *pflag.FlagSet) {
fs.StringSliceVar(&c.IndividuallyEnabled, "unsupported-run-individual-controllers", c.IndividuallyEnabled, "Run individual controllers in-process. The controller names can change at any time.")
fs.MarkHidden("unsupported-run-individual-controllers") //nolint:errcheck

fs.BoolVar(&c.EnableLeaderElection, "enable-leader-election", c.EnableLeaderElection, "Enable a leader election for kcp controllers running in the system:admin workspace")
fs.StringVar(&c.LeaderElectionNamespace, "leader-election-namespace", c.LeaderElectionNamespace, "Namespace in system:admin workspace to use for leader election")
fs.StringVar(&c.LeaderElectionName, "leader-election-name", c.LeaderElectionName, "Name of the lease to use for leader election")

c.SAController.AddFlags(fs)
}

Expand Down
89 changes: 87 additions & 2 deletions pkg/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,19 +18,24 @@ package server

import (
"context"
"fmt"
"net/http"
_ "net/http/pprof"
"os"
"time"

"github.com/kcp-dev/logicalcluster/v3"

"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/uuid"
"k8s.io/apimachinery/pkg/util/wait"
genericapiserver "k8s.io/apiserver/pkg/server"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/leaderelection"
"k8s.io/client-go/tools/leaderelection/resourcelock"
"k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/genericcontrolplane"

Expand Down Expand Up @@ -138,7 +143,7 @@ func (s *Server) Run(ctx context.Context) error {

hookName := "kcp-start-informers"
if err := s.AddPostStartHook(hookName, func(hookContext genericapiserver.PostStartHookContext) error {
logger := logger.WithValues("postStartHook", hookName)
logger = logger.WithValues("postStartHook", hookName)
ctx = klog.NewContext(ctx, logger)

logger.Info("starting kube informers")
Expand Down Expand Up @@ -499,7 +504,11 @@ func (s *Server) Run(ctx context.Context) error {
logger := klog.FromContext(ctx).WithValues("postStartHook", "kcp-start-controllers")

s.WaitForPhase1Finished()
s.startControllers(goContext(hookContext), logger)
if s.Options.Controllers.EnableLeaderElection {
go s.leaderElectAndStartControllers(goContext(hookContext))
} else {
s.startControllers(goContext(hookContext), logger)
}

return nil
}); err != nil {
Expand Down Expand Up @@ -532,3 +541,79 @@ func goContext(parent genericapiserver.PostStartHookContext) context.Context {
// WaitForPhase1Finished blocks the calling goroutine until a receive on
// s.rootPhase1FinishedCh succeeds, i.e. until a value is sent on the channel
// or the channel is closed. Whatever is received is discarded.
// NOTE(review): presumably the channel is closed once the root "phase 1"
// bootstrap is done — confirm against the code that owns the channel.
func (s *Server) WaitForPhase1Finished() {
<-s.rootPhase1FinishedCh
}

// leaderElectAndStartControllers campaigns for a "leases" resource lock in the
// system:admin workspace and calls s.startControllers each time this process
// becomes the leader. The election is restarted in a loop for as long as ctx
// is not canceled, so the function is meant to be run in its own goroutine.
//
// Setup failures (hostname lookup, resource lock creation) are logged and make
// the function return early without ever starting the controllers.
func (s *Server) leaderElectAndStartControllers(ctx context.Context) {
	logger := klog.FromContext(ctx)

	lockNamespace := s.Options.Controllers.LeaderElectionNamespace
	lockName := s.Options.Controllers.LeaderElectionName

	// Work on a copy of the loopback client config, tagged with a dedicated
	// user agent and pointed at the system:admin workspace.
	clientConfig := rest.AddUserAgent(rest.CopyConfig(s.GenericConfig.LoopbackClientConfig), "kcp-leader-election")
	// TODO(embik): is there a better way to access / generate the system:admin workspace url?
	clientConfig.Host = fmt.Sprintf("%s/clusters/system:admin", clientConfig.Host)

	hostname, err := os.Hostname()
	if err != nil {
		logger.Error(err, "failed to determine hostname")
		return
	}

	// The identity combines the hostname with a fresh UUID, so it differs
	// between processes even when they share a hostname.
	id := hostname + "_" + string(uuid.NewUUID())

	// NOTE(review): the 30s passed here does not match the 5s RenewDeadline
	// configured for the election below — confirm the difference is intended.
	lock, err := resourcelock.NewFromKubeconfig(
		"leases",
		lockNamespace,
		lockName,
		resourcelock.ResourceLockConfig{Identity: id},
		clientConfig,
		30*time.Second,
	)
	if err != nil {
		logger.Error(err, "failed to set up resource lock")
		return
	}

	// controllerLogger is handed to the controllers; electionLogger additionally
	// carries the coordinates of the lease and is used for election messages.
	controllerLogger := logger.WithValues("identity", id)
	electionLogger := controllerLogger.WithValues("namespace", lockNamespace, "name", lockName)

	callbacks := leaderelection.LeaderCallbacks{
		OnStartedLeading: func(leadingCtx context.Context) {
			electionLogger.Info("started leading")
			s.startControllers(leadingCtx, controllerLogger)
		},
		OnStoppedLeading: func() {
			electionLogger.Info("stopped leading")
		},
		OnNewLeader: func(currentID string) {
			// Becoming leader ourselves is already logged in OnStartedLeading,
			// so only report leaders other than this process.
			if currentID != id {
				electionLogger.WithValues("leader", currentID).Info("leader is someone else")
			}
		},
	}

	// Keep campaigning for the whole lifetime of the kcp process: once an
	// election round ends (for example because leadership was lost), start the
	// next one, until ctx is canceled.
	for ctx.Err() == nil {
		electionLogger.Info("(re-)starting leader election")
		leaderelection.RunOrDie(ctx, leaderelection.LeaderElectionConfig{
			Lock:          lock,
			LeaseDuration: 60 * time.Second,
			RenewDeadline: 5 * time.Second,
			RetryPeriod:   2 * time.Second,
			Callbacks:     callbacks,
			// NOTE(review): nothing in this function registers the adaptor with
			// a health endpoint — confirm whether that is expected.
			WatchDog: leaderelection.NewLeaderHealthzAdaptor(5 * time.Second),
			Name:     lockName,
		})
	}

	electionLogger.Info("context canceled, will not retry leader election")
	electionLogger.Info("leader election loop has been terminated")
}

0 comments on commit ac8fe7a

Please sign in to comment.