
Service LoadBalancer kube-controller #9221

Open · wants to merge 14 commits into base: master

Conversation

MichalFupso (Contributor)

Description

Related issues/PRs

Todos

  • Tests
  • Documentation
  • Release note

Release Note

TBD

Reminder for the reviewer

Make sure that this PR has the correct labels and milestone set.

Every PR needs one docs-* label.

  • docs-pr-required: This change requires a change to the documentation that has not been completed yet.
  • docs-completed: This change has all necessary documentation completed.
  • docs-not-required: This change has no user-facing impact and requires no docs.

Every PR needs one release-note-* label.

  • release-note-required: This PR has user-facing changes. Most PRs should have this label.
  • release-note-not-required: This PR has no user-facing changes.

Other optional labels:

  • cherry-pick-candidate: This PR should be cherry-picked to an earlier release. For bug fixes only.
  • needs-operator-pr: This PR is related to install and requires a corresponding change to the operator.

@MichalFupso MichalFupso requested a review from a team as a code owner September 12, 2024 01:58
@marvin-tigera marvin-tigera added this to the Calico v3.29.0 milestone Sep 12, 2024
@marvin-tigera marvin-tigera added the release-note-required (Change has user-facing impact, no matter how small) and docs-pr-required (Change is not yet documented) labels Sep 12, 2024
@MichalFupso MichalFupso added the needs-operator-pr (PRs that require follow-on operator work) label Sep 12, 2024
caseydavenport (Member) left a comment

@MichalFupso first pass done - there are some structural changes we need to make to the LB controller in kube-controllers but I don't think they are going to be difficult.

Mostly a matter of aligning patterns to get rid of the ResourceCache and instead consistently use SharedIndexInformer for Services and a common DataFeed for Calico resources.

I'm going to keep reviewing in more detail, but wanted to get the larger changes back to you ASAP.

Resolved review threads:

  • api/pkg/apis/projectcalico/v3/ippool.go
  • api/pkg/apis/projectcalico/v3/kubecontrollersconfig.go (outdated)
  • calicoctl/calicoctl/commands/ipam/check.go (outdated)
  • kube-controllers/pkg/config/config.go (outdated)
@@ -283,7 +283,8 @@ func convertIpPoolFromStorage(pool *apiv3.IPPool) error {
 		pool.Spec.VXLANMode = apiv3.VXLANModeNever
 	}
 
-	if pool.Spec.AssignmentMode == "" {
+	assignmentMode := pool.Spec.AssignmentMode
+	if &assignmentMode == nil {
Member

I think something is off here - it doesn't make sense to check if the address of a variable is nil.
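For context, a minimal sketch of the problem: in Go, the address of a local variable is never nil, so this branch can never execute. The zero-value check from the original code is presumably what was intended (the default shown is hypothetical):

assignmentMode := pool.Spec.AssignmentMode
if &assignmentMode == nil {
	// Never reached: &assignmentMode is always a valid, non-nil pointer,
	// and go vet flags this comparison as always false.
}

// Presumably the original zero-value check is what was intended:
if pool.Spec.AssignmentMode == "" {
	pool.Spec.AssignmentMode = defaultAssignmentMode // hypothetical default
}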

@@ -320,6 +307,9 @@ func (c ipamClient) determinePools(ctx context.Context, requestedPoolNets []net.
// We only want to use IP pools which actually match this node, so do a filter based on
// selector.
Member

Comment needs updating here as well

@@ -1693,6 +1683,17 @@ func (c ipamClient) releaseByHandle(ctx context.Context, blockCIDR net.IPNet, op
if err = c.ensureConsistentAffinity(ctx, block.AllocationBlock); err != nil {
logCtx.WithError(err).Warn("Error ensuring consistent affinity but IP already released. Returning no error.")
}

// If this is loadBalancer we delete the block without waiting
if *block.Affinity == "virtual:virtual-load-balancer" {
Member

I think we should define this as a constant.
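For instance, a sketch of what that might look like - the constant name and location are assumptions, not part of this PR:

// virtualLoadBalancerAffinity is the block affinity used for IPs allocated
// by the LoadBalancer kube-controller (hypothetical name; value taken from
// the string literal above).
const virtualLoadBalancerAffinity = "virtual:virtual-load-balancer"

// If this is a load balancer block, delete it without waiting.
if *block.Affinity == virtualLoadBalancerAffinity {
	// ...
}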

@@ -211,7 +211,11 @@ func (rw blockReaderWriter) claimAffineBlock(ctx context.Context, aff *model.KVP
 	logCtx := log.WithFields(log.Fields{"host": host, "subnet": subnet})
 
 	// Create the new block.
-	affinityKeyStr := "host:" + host
+	prefix := "host:"
+	if host == v3.VirtualLoadBalancer {
Member

I don't think this quite solves the problem - we're still effectively keying off the hostname in order to determine this.

So, someone can still make a Kubernetes node in their cluster that triggers this code.

I think we need to plumb this knowledge down through the IPAM code from the caller somehow, so that we can make sure that only the LB controller is ever generating affinities in this way.

Member

For example, we could:

  • use the IntendedUse field of the AutoAssignArgs (and add it to AssignIPArgs)
  • infer this from the provided Attrs map on AutoAssignArgs / AssignIPArgs
  • add a new AffinityType: Host | Virtual option to those XArgs structs.

I think my preference is the first of those three (at least for now, while we only have a single scenario where it's possible to have a non-host affinity).

If we ever end up making this more flexible, then option #3 might be better - but until then best not to enable combinations that we don't want.
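As a rough sketch of the first option, assuming IntendedUse is also added to AssignIPArgs and that the IPAM client derives the affinity prefix from it (the plumbing shown here is illustrative):

// Caller side: the LB controller states its intent explicitly.
args := ipam.AutoAssignArgs{
	Num4:        1,
	HandleID:    &handle,
	Hostname:    hostname,
	IntendedUse: v3.IPPoolAllowedUseLoadBalancer,
}

// IPAM side: derive the affinity prefix from the intended use rather than
// from the hostname, so only LoadBalancer assignments can ever produce a
// "virtual:" affinity.
prefix := "host:"
if args.IntendedUse == v3.IPPoolAllowedUseLoadBalancer {
	prefix = "virtual:"
}
affinityKeyStr := prefix + hostname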

Member

One thing to consider might be to replace all of the host string arguments passed around to something more generic - e.g.,

type affinityConfig struct {
	affinityType affinityType

	// The affinity value - a hostname for "host" type affinities.
	value string
}
func (c ipamClient) ensureBlock(ctx context.Context, rsvdAttr *HostReservedAttr, requestedPools []net.IPNet, version int, host string) (*net.IPNet, error) {

Would become

func (c ipamClient) ensureBlock(ctx context.Context, rsvdAttr *HostReservedAttr, requestedPools []net.IPNet, version int, affinityCfg affinityConfig) (*net.IPNet, error) {

This would at least help ensure we're passing the necessary context wherever it might be needed.

if svcNew.Spec.Type != v1.ServiceTypeLoadBalancer &&
svcOld.Spec.Type == v1.ServiceTypeLoadBalancer {
ccache.Delete(createHandle(svcOld))
lbc.syncerUpdates <- serviceObject{
Member

syncerUpdates implies that the updates are coming from the syncer, but these are actually coming from the informer

ccache.Delete(createHandle(svcOld))
lbc.syncerUpdates <- serviceObject{
updateType: serviceUpdateTypeDELETE,
}
} else if svcOld.Annotations[annotationIpv4Pools] != svcNew.Annotations[annotationIpv4Pools] ||
Member

I think it's going to be a lot cleaner to keep all of the logic about whether or not we should act on updates within the reconcile loop - when we get an update, let's just send it through as an update. The other side of the channel should be doing all of the processing.

So basically, this branch should look something like this:

UpdateFunc: func(oldObj, newObj interface{}) {
	// Tell the main loop that this service has changed.
	svc := newObj.(*v1.Service)
	lbc.serviceUpdates <- NamespacedName{Name: svc.Name, Namespace: svc.Namespace}
},

The main loop can decide if it cares about this update, it can decide to batch actions, etc., etc.

It doesn't even need to cache the object - the informer already has a backing cache that is accessible and updated at this point, so we just need to tell the main loop "something changed, here's the context"
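For example, a sketch of the main-loop side, assuming the controller keeps a v1lister.ServiceLister built from the informer (consistent with the imports elsewhere in this PR):

// Resolve the ID against the informer's backing cache on demand.
svc, err := c.serviceLister.Services(id.Namespace).Get(id.Name)
if errors.IsNotFound(err) {
	// The Service was deleted after the update was queued - treat as a delete.
}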

MichalFupso (Contributor, Author)

@caseydavenport I think that we should do a check here to see if we care about the update. It's mostly because we want to update any IPs if the annotations have changed. If we just trigger an update here without checking if the annotations are different it will be harder for us to know if we should re-assign the IP or the change was not relevant to us. I agree that we do not need to pass the service, but rather just use the informer

Member

It might work - I need to think about this one a little bit more. My fear is that comparing the annotations between the "new" and the "old" version of the Service isn't strictly speaking the thing this controller cares about.

The controller wants to compare the annotations, the LoadBalancer.Ingress values, and the actual IPAM allocation database to determine whether we need to allocate or release IP addresses.

Comparing the annotations is a proxy for that, and probably catches most cases but isn't guaranteed to catch all of the edge cases we care about.

For example, what happens if another controller modifies the Service and overwrites the status.LoadBalancer values that were previously written by this controller? We could try to write special logic to catch that here, but we may not be able to foresee every scenario that requires an update on the channel.

}

lbc.RegisterWith(lbc.dataFeed)
lbc.dataFeed.Start()
Member

Multiple controllers are going to call Start() - we either need to make sure that the dataFeed.Start() function is idempotent and can be called multiple times, or we need to call dataFeed.Start() only once - one level up where the controllers are initialized.
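A minimal sketch of the idempotent option, assuming the DataFeed wraps a single syncer (field names are illustrative):

type DataFeed struct {
	startOnce sync.Once
	syncer    bapi.Syncer
}

// Start begins the underlying syncer exactly once; calls from additional
// controllers become no-ops.
func (d *DataFeed) Start() {
	d.startOnce.Do(func() {
		d.syncer.Start()
	})
}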

}
},
})
if err != nil {
log.WithError(err).Fatal("Failed to add event handle for Service LoadBalancer")
Member

Suggested change:
- log.WithError(err).Fatal("Failed to add event handle for Service LoadBalancer")
+ log.WithError(err).Fatal("Failed to add event handler for Service LoadBalancer")

}

}

func (c *loadBalancerController) handleBlockUpdate(kvp model.KVPair) {
if kvp.Value != nil {
host := kvp.Value.(*model.AllocationBlock).Affinity
Member

Suggested change:
- host := kvp.Value.(*model.AllocationBlock).Affinity
+ affinity := kvp.Value.(*model.AllocationBlock).Affinity

}

func (c *loadBalancerController) handleBlockUpdate(kvp model.KVPair) {
	if kvp.Value != nil {
		host := kvp.Value.(*model.AllocationBlock).Affinity
-		if host != nil && *host == fmt.Sprintf("host:%s", api.VirtualLoadBalancer) {
+		if host != nil && *host == fmt.Sprintf("virtual:%s", api.VirtualLoadBalancer) {
Member

Suggested change:
- if host != nil && *host == fmt.Sprintf("virtual:%s", api.VirtualLoadBalancer) {
+ if affinity != nil && *affinity == fmt.Sprintf("virtual:%s", api.VirtualLoadBalancer) {

return c.checkStatus(svc)
if len(c.servicesToUpdate) != 0 {
for key, svcObj := range c.servicesToUpdate {
switch svcObj.updateType {
Member

The goal is to make the reconcile loop completely unaware of HOW it got to the state it is in. The type of update shouldn't matter at this point - all that the loop should know is what the current state of the cluster is, and what the desired state is.

This makes us much less prone to bugs where we encounter an unknown or unexpected ordering of events, which tends to happen in complex systems like Kubernetes.

Member

This is somewhat hastily written out, so may not be bulletproof, but just as something to run past you to see what you think - the following pseudocode is probably where I would start from if trying to figure out this logic myself.


The controller would have some internal state that it uses to efficiently track IP allocation state. It would probably need to initialize these maps on startup with IPAM state, and then make incremental changes to them as it allocates and releases IP addresses.

Could use a helper structure to encapsulate that logic, which might make it a bit tidier:

type allocationTracker struct {
  // Keep track of IP allocations by Service, as well as a 
  // reverse lookup of service by IP. These maps are populated based on the IPAM DB state 
  // and NOT by the Service objects.
  servicesByIP map[string]string
  ipsByService map[string]map[string]bool
}

func (t *allocationTracker) AssignAddress(svc *v1.Service, ip string)
func (t *allocationTracker) ReleaseAddress(svc *v1.Service, ip string)
func (t *allocationTracker) DeleteService(svc *v1.Service)

The syncIPAM function would have something like this:

// Go through each Service that we know about and make sure it is properly in sync.
for _, svc := range c.informer.List() {
  if err := c.syncService(svc); err != nil {
    logrus.WithError(err).Warn("warning")
  }
}
// syncService does the following:
// - Releases any IP addresses in the IPAM DB associated with the Service that are not in the Service status.
// - Allocates any addresses necessary to satisfy the Service LB request
// - Updates the controllers internal state tracking of which IP addresses are allocated.
// - Updates the IP addresses in the Service Status to match the IPAM DB.
func syncService(svc *v1.Service) error {
  // TODO
  return nil
}

you could even have "event driven" logic accept incoming service updates funneled into the same function, simply calling syncService when we get service updates:

select {
case <-kickChan:
	syncIPAM()
case upd := <-syncerUpdates:
	handleUpdate(upd)
case svcID := <-serviceUpdates:
	svc := c.informer.Get(svcID)
	syncService(svc)
}

}

hash := base64.RawURLEncoding.EncodeToString(hasher.Sum(nil))
regex := regexp.MustCompile("([-_.])")
hash = regex.ReplaceAllString(hash, "")
handle = prefix + hash
Member

This is fine, and no need to change I don't think - but what I was suggesting was that we only add the hash if the handle exceeds the name limit.

e.g.,

handle := strings.ToLower(fmt.Sprintf("%s-%s-%s", svc.Name, svc.Namespace, svc.UID))

if len(handle) > k8svalidation.DNS1123LabelMaxLength {
  // Handle is too long - trim off the end, leaving enough room for 8 bytes of hash.
  hash := generateHash(handle)
  handle = handle[:k8svalidation.DNS1123LabelMaxLength-8]
  handle = fmt.Sprintf("%s%s", handle, hash[:8])
}

This just makes the actual handle IDs human readable in most cases.


"k8s.io/client-go/kubernetes"
v1lister "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/tools/cache"
)

const (
annotationIpv4Pools = "projectcalico.org/ipv4pools"
Member

Suggested change:
- annotationIpv4Pools = "projectcalico.org/ipv4pools"
+ annotationIPv4Pools = "projectcalico.org/ipv4pools"

"k8s.io/client-go/kubernetes"
v1lister "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/tools/cache"
)

const (
annotationIpv4Pools = "projectcalico.org/ipv4pools"
annotationIpv6Pools = "projectcalico.org/ipv6pools"
Member

Suggested change:
- annotationIpv6Pools = "projectcalico.org/ipv6pools"
+ annotationIPv6Pools = "projectcalico.org/ipv6pools"

"k8s.io/client-go/kubernetes"
v1lister "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/tools/cache"
)

const (
annotationIpv4Pools = "projectcalico.org/ipv4pools"
annotationIpv6Pools = "projectcalico.org/ipv6pools"
annotationLoadBalancerIp = "projectcalico.org/loadBalancerIPs"
Member

Suggested change:
- annotationLoadBalancerIp = "projectcalico.org/loadBalancerIPs"
+ annotationLoadBalancerIP = "projectcalico.org/loadBalancerIPs"

caseydavenport (Member) left a comment

@MichalFupso About to go into meetings, but wanted to get another round in - only reviewed the libcalico-go changes in this one and didn't get to the controller yet.

@@ -486,7 +487,7 @@ type blockAssignState struct {
 // and assign affinity.
 // It returns a block, a boolean if block is newly claimed and any error encountered.
 func (s *blockAssignState) findOrClaimBlock(ctx context.Context, minFreeIps int) (*model.KVPair, bool, error) {
-	logCtx := log.WithFields(log.Fields{"host": s.host})
+	logCtx := log.WithFields(log.Fields{"Host": s.affinityCfg.Host})
Member

We probably want to augment these logs to include the affinityCfg type as well.
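e.g., something like this (assuming AffinityConfig carries an AffinityType field as discussed below):

logCtx := log.WithFields(log.Fields{
	"host":         s.affinityCfg.Host,
	"affinityType": s.affinityCfg.AffinityType,
})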

Host: hostname,
}

//if args.IntendedUse == v3.IPPoolAllowedUseLoadBalancer {
Member

Need to uncomment this bit?

@@ -34,6 +34,18 @@ import (
cnet "github.com/projectcalico/calico/libcalico-go/lib/net"
)

type AffinityConfig struct {
Member

I'd probably suggest putting this in the interfaces file since it's used on the "user facing" interfaces.

-	return *block.Affinity == "host:"+host
+	if strings.HasPrefix(*block.Affinity, "host:") {
+		return *block.Affinity == "host:"+host
+	} else if strings.HasPrefix(*block.Affinity, "virtual:") {
Member

Given the number of places we're using it, might be worth using the constants for these to avoid potential for typos
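e.g., a sketch with hypothetical constant names:

const (
	affinityPrefixHost    = "host:"
	affinityPrefixVirtual = "virtual:"
)

if strings.HasPrefix(*block.Affinity, affinityPrefixHost) {
	return *block.Affinity == affinityPrefixHost+host
} else if strings.HasPrefix(*block.Affinity, affinityPrefixVirtual) {
	return *block.Affinity == affinityPrefixVirtual+host
}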

@@ -204,18 +216,13 @@ func (rw blockReaderWriter) getPendingAffinity(ctx context.Context, host string,

// claimAffineBlock claims the provided block using the given pending affinity. If successful, it will confirm the affinity. If another host
// steals the block, claimAffineBlock will attempt to delete the provided pending affinity.
-func (rw blockReaderWriter) claimAffineBlock(ctx context.Context, aff *model.KVPair, config IPAMConfig, rsvdAttr *HostReservedAttr) (*model.KVPair, error) {
+func (rw blockReaderWriter) claimAffineBlock(ctx context.Context, aff *model.KVPair, config IPAMConfig, rsvdAttr *HostReservedAttr, affinityCfg AffinityConfig) (*model.KVPair, error) {
// Pull out relevant fields.
subnet := aff.Key.(model.BlockAffinityKey).CIDR
host := aff.Key.(model.BlockAffinityKey).Host
logCtx := log.WithFields(log.Fields{"host": host, "subnet": subnet})
Member

Should include the type in this context (and any other contexts where we're using an affinityConfig)

@@ -190,13 +190,21 @@ func (b *allocationBlock) assign(affinityCheck bool, address cnet.IP, handleID *

// hostAffinityMatches checks if the provided host matches the provided affinity.
func hostAffinityMatches(host string, block *model.AllocationBlock) bool {
Member

I think we want to actually pass down the AffinityConfig to this function rather than base the check off of block's affinity.

Say for example a block is affine to host:load-balancer and we attempt to assign from that block for virtual:load-balancer - this function will return that the affinity matches even though it doesn't. It's a niche case, but not impossible I don't think.

I'd expect that in most cases we want to replace the host string argument with an AffinityConfig instead.
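Roughly along these lines - a sketch that assumes the AffinityType values stringify to the "host"/"virtual" prefixes:

// hostAffinityMatches checks that both the type and the value of the block's
// affinity match the requested affinity, not just the value.
func hostAffinityMatches(affinityCfg AffinityConfig, block *model.AllocationBlock) bool {
	if block.Affinity == nil {
		return false
	}
	return *block.Affinity == fmt.Sprintf("%s:%s", affinityCfg.AffinityType, affinityCfg.Host)
}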

} else if strings.HasPrefix(*block.Affinity, "virtual:") {
return *block.Affinity == "virtual:"+host
}
return false
}

func getHostAffinity(block *model.AllocationBlock) string {
Member

I think this function needs to be adjusted to return an AffinityConfig object instead, so that the callers can properly distinguish the type of affinity.
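For example, a hedged sketch (names are illustrative, not the final API):

// getBlockAffinity parses the block's affinity string into an AffinityConfig
// so callers can distinguish host affinities from virtual ones.
func getBlockAffinity(block *model.AllocationBlock) (AffinityConfig, bool) {
	if block.Affinity == nil {
		return AffinityConfig{}, false
	}
	parts := strings.SplitN(*block.Affinity, ":", 2)
	if len(parts) != 2 {
		return AffinityConfig{}, false
	}
	return AffinityConfig{AffinityType: AffinityType(parts[0]), Host: parts[1]}, true
}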

@@ -212,7 +212,7 @@ var _ = testutils.E2eDatastoreDescribe("IPAM affine block allocation tests", tes
}
})

By("assigning from host twice the number of available blocks all at once", func() {
By("assigning from Host twice the number of available blocks all at once", func() {
Member

We should definitely add some tests that verify the correct behavior when using two affinities with the same hostname but different affinity types (see the sketch after this list).

e.g.,

  • Assigning IPs, Releasing IPs (both from the same block and different blocks)
  • Releasing affinities in the various ways
  • Claiming affinities in the various ways
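As a starting point for the first bullet, a hedged Ginkgo-style sketch - the AffinityConfig fields and the updated ClaimAffinity signature are assumptions based on this PR's discussion, not the final API:

It("keeps host and virtual affinities with the same name independent", func() {
	hostCfg := ipam.AffinityConfig{AffinityType: ipam.AffinityTypeHost, Host: "load-balancer"}
	virtCfg := ipam.AffinityConfig{AffinityType: ipam.AffinityTypeVirtual, Host: "load-balancer"}

	// Claim a block for each affinity type from the same pool and verify
	// that the two claims resolve to different blocks.
	hostClaimed, _, err := ic.ClaimAffinity(ctx, *pool, hostCfg)
	Expect(err).NotTo(HaveOccurred())
	virtClaimed, _, err := ic.ClaimAffinity(ctx, *pool, virtCfg)
	Expect(err).NotTo(HaveOccurred())
	Expect(hostClaimed).NotTo(Equal(virtClaimed))
})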

Labels
  • docs-pr-required: Change is not yet documented
  • needs-operator-pr: PRs that require follow-on operator work
  • release-note-required: Change has user-facing impact (no matter how small)