Skip to content

Commit

Permalink
Merge pull request kubernetes-sigs#1209 from goto-opensource/feature/retry-single-changes-on-batch-failure
Browse files Browse the repository at this point in the history

Route53: retry single changes in a batch if the batch fails
  • Loading branch information
k8s-ci-robot authored Jan 17, 2023
2 parents 1a556ce + 7dd84a5 commit a18bf2b
Show file tree
Hide file tree
Showing 6 changed files with 405 additions and 182 deletions.
2 changes: 2 additions & 0 deletions endpoint/labels.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ const (
OwnerLabelKey = "owner"
// ResourceLabelKey is the name of the label that identifies k8s resource which wants to acquire the DNS name
ResourceLabelKey = "resource"
// OwnedRecordLabelKey is the name of the label that identifies the record that is owned by the labeled TXT registry record
OwnedRecordLabelKey = "ownedRecord"

// AWSSDDescriptionLabel label responsible for storing raw owner/resource combination information in the Labels
// supposed to be inserted by AWS SD Provider, and parsed into OwnerLabelKey and ResourceLabelKey key by AWS SD Registry
Expand Down
1 change: 1 addition & 0 deletions internal/testutils/endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ func SameEndpoint(a, b *endpoint.Endpoint) bool {
return a.DNSName == b.DNSName && a.Targets.Same(b.Targets) && a.RecordType == b.RecordType && a.SetIdentifier == b.SetIdentifier &&
a.Labels[endpoint.OwnerLabelKey] == b.Labels[endpoint.OwnerLabelKey] && a.RecordTTL == b.RecordTTL &&
a.Labels[endpoint.ResourceLabelKey] == b.Labels[endpoint.ResourceLabelKey] &&
a.Labels[endpoint.OwnedRecordLabelKey] == b.Labels[endpoint.OwnedRecordLabelKey] &&
SameProviderSpecific(a.ProviderSpecific, b.ProviderSpecific)
}

Expand Down
197 changes: 145 additions & 52 deletions provider/aws/aws.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,22 @@ type Route53API interface {
ListTagsForResourceWithContext(ctx context.Context, input *route53.ListTagsForResourceInput, opts ...request.Option) (*route53.ListTagsForResourceOutput, error)
}

// Route53Change wraps route53.Change with the ownership relation used
// throughout the provider implementation: OwnedRecord names the record that
// this change's record owns (empty when there is no ownership relation).
type Route53Change struct {
	route53.Change
	// OwnedRecord is the DNS name of the owned record; "" if none.
	OwnedRecord string
}

// Route53Changes is a collection of Route53Change wrappers.
type Route53Changes []*Route53Change

// Route53Changes unwraps the collection into the raw []*route53.Change slice
// expected by the AWS SDK, dropping the ownership metadata.
func (cs Route53Changes) Route53Changes() []*route53.Change {
	// pre-size to the known final length to avoid repeated slice growth
	ret := make([]*route53.Change, 0, len(cs))
	for _, c := range cs {
		ret = append(ret, &c.Change)
	}
	return ret
}

type zonesListCache struct {
age time.Time
duration time.Duration
Expand All @@ -160,6 +176,8 @@ type AWSProvider struct {
zoneTagFilter provider.ZoneTagFilter
preferCNAME bool
zonesCache *zonesListCache
// queue for collecting changes to submit them in the next iteration, but after all other changes
failedChangesQueue map[string]Route53Changes
}

// AWSConfig contains configuration to create a new AWS provider.
Expand Down Expand Up @@ -224,6 +242,7 @@ func NewAWSProvider(awsConfig AWSConfig) (*AWSProvider, error) {
preferCNAME: awsConfig.PreferCNAME,
dryRun: awsConfig.DryRun,
zonesCache: &zonesListCache{duration: awsConfig.ZoneCacheDuration},
failedChangesQueue: make(map[string]Route53Changes),
}

return provider, nil
Expand Down Expand Up @@ -467,7 +486,7 @@ func (p *AWSProvider) requiresDeleteCreate(old *endpoint.Endpoint, new *endpoint
return false
}

func (p *AWSProvider) createUpdateChanges(newEndpoints, oldEndpoints []*endpoint.Endpoint) []*route53.Change {
func (p *AWSProvider) createUpdateChanges(newEndpoints, oldEndpoints []*endpoint.Endpoint) Route53Changes {
var deletes []*endpoint.Endpoint
var creates []*endpoint.Endpoint
var updates []*endpoint.Endpoint
Expand All @@ -483,7 +502,7 @@ func (p *AWSProvider) createUpdateChanges(newEndpoints, oldEndpoints []*endpoint
}
}

combined := make([]*route53.Change, 0, len(deletes)+len(creates)+len(updates))
combined := make(Route53Changes, 0, len(deletes)+len(creates)+len(updates))
combined = append(combined, p.newChanges(route53.ChangeActionCreate, creates)...)
combined = append(combined, p.newChanges(route53.ChangeActionUpsert, updates)...)
combined = append(combined, p.newChanges(route53.ChangeActionDelete, deletes)...)
Expand Down Expand Up @@ -514,7 +533,7 @@ func (p *AWSProvider) ApplyChanges(ctx context.Context, changes *plan.Changes) e

updateChanges := p.createUpdateChanges(changes.UpdateNew, changes.UpdateOld)

combinedChanges := make([]*route53.Change, 0, len(changes.Delete)+len(changes.Create)+len(updateChanges))
combinedChanges := make(Route53Changes, 0, len(changes.Delete)+len(changes.Create)+len(updateChanges))
combinedChanges = append(combinedChanges, p.newChanges(route53.ChangeActionCreate, changes.Create)...)
combinedChanges = append(combinedChanges, p.newChanges(route53.ChangeActionDelete, changes.Delete)...)
combinedChanges = append(combinedChanges, updateChanges...)
Expand All @@ -523,7 +542,7 @@ func (p *AWSProvider) ApplyChanges(ctx context.Context, changes *plan.Changes) e
}

// submitChanges takes a zone and a collection of Changes and sends them as a single transaction.
func (p *AWSProvider) submitChanges(ctx context.Context, changes []*route53.Change, zones map[string]*route53.HostedZone) error {
func (p *AWSProvider) submitChanges(ctx context.Context, changes Route53Changes, zones map[string]*route53.HostedZone) error {
// return early if there is nothing to change
if len(changes) == 0 {
log.Info("All records are already up to date")
Expand All @@ -540,9 +559,16 @@ func (p *AWSProvider) submitChanges(ctx context.Context, changes []*route53.Chan
for z, cs := range changesByZone {
var failedUpdate bool

batchCs := batchChangeSet(cs, p.batchChangeSize)
// group changes into new changes and into changes that failed in a previous iteration and are retried
retriedChanges, newChanges := findChangesInQueue(cs, p.failedChangesQueue[z])
p.failedChangesQueue[z] = nil

batchCs := append(batchChangeSet(newChanges, p.batchChangeSize), batchChangeSet(retriedChanges, p.batchChangeSize)...)
for i, b := range batchCs {
if len(b) == 0 {
continue
}

for _, c := range b {
log.Infof("Desired change: %s %s %s [Id: %s]", *c.Action, *c.ResourceRecordSet.Name, *c.ResourceRecordSet.Type, z)
}
Expand All @@ -551,17 +577,45 @@ func (p *AWSProvider) submitChanges(ctx context.Context, changes []*route53.Chan
params := &route53.ChangeResourceRecordSetsInput{
HostedZoneId: aws.String(z),
ChangeBatch: &route53.ChangeBatch{
Changes: b,
Changes: b.Route53Changes(),
},
}

successfulChanges := 0

if _, err := p.client.ChangeResourceRecordSetsWithContext(ctx, params); err != nil {
log.Errorf("Failure in zone %s [Id: %s]", aws.StringValue(zones[z].Name), z)
log.Error(err) // TODO(ideahitme): consider changing the interface in cases when this error might be a concern for other components
failedUpdate = true
log.Errorf("Failure in zone %s [Id: %s] when submitting change batch: %v", aws.StringValue(zones[z].Name), z, err)

changesByOwnership := groupChangesByNameAndOwnershipRelation(b)

if len(changesByOwnership) > 1 {
log.Debug("Trying to submit change sets one-by-one instead")

for _, changes := range changesByOwnership {
for _, c := range changes {
log.Debugf("Desired change: %s %s %s [Id: %s]", *c.Action, *c.ResourceRecordSet.Name, *c.ResourceRecordSet.Type, z)
}
params.ChangeBatch = &route53.ChangeBatch{
Changes: changes.Route53Changes(),
}
if _, err := p.client.ChangeResourceRecordSetsWithContext(ctx, params); err != nil {
failedUpdate = true
log.Errorf("Failed submitting change (error: %v), it will be retried in a separate change batch in the next iteration", err)
p.failedChangesQueue[z] = append(p.failedChangesQueue[z], changes...)
} else {
successfulChanges = successfulChanges + len(changes)
}
}
} else {
failedUpdate = true
}
} else {
successfulChanges = len(b)
}

if successfulChanges > 0 {
// z is the R53 Hosted Zone ID already as aws.StringValue
log.Infof("%d record(s) in zone %s [Id: %s] were successfully updated", len(b), aws.StringValue(zones[z].Name), z)
log.Infof("%d record(s) in zone %s [Id: %s] were successfully updated", successfulChanges, aws.StringValue(zones[z].Name), z)
}

if i != len(batchCs)-1 {
Expand All @@ -583,16 +637,16 @@ func (p *AWSProvider) submitChanges(ctx context.Context, changes []*route53.Chan
}

// newChanges returns a collection of Changes based on the given records and action.
func (p *AWSProvider) newChanges(action string, endpoints []*endpoint.Endpoint) []*route53.Change {
changes := make([]*route53.Change, 0, len(endpoints))
func (p *AWSProvider) newChanges(action string, endpoints []*endpoint.Endpoint) Route53Changes {
changes := make(Route53Changes, 0, len(endpoints))

for _, endpoint := range endpoints {
change, dualstack := p.newChange(action, endpoint)
changes = append(changes, change)
if dualstack {
// make a copy of change, modify RRS type to AAAA, then add new change
rrs := *change.ResourceRecordSet
change2 := &route53.Change{Action: change.Action, ResourceRecordSet: &rrs}
change2 := &Route53Change{Change: route53.Change{Action: change.Action, ResourceRecordSet: &rrs}}
change2.ResourceRecordSet.Type = aws.String(route53.RRTypeAaaa)
changes = append(changes, change2)
}
Expand Down Expand Up @@ -635,11 +689,13 @@ func (p *AWSProvider) AdjustEndpoints(endpoints []*endpoint.Endpoint) []*endpoin
// returned Change is based on the given record by the given action, e.g.
// action=ChangeActionCreate returns a change for creation of the record and
// action=ChangeActionDelete returns a change for deletion of the record.
func (p *AWSProvider) newChange(action string, ep *endpoint.Endpoint) (*route53.Change, bool) {
change := &route53.Change{
Action: aws.String(action),
ResourceRecordSet: &route53.ResourceRecordSet{
Name: aws.String(ep.DNSName),
func (p *AWSProvider) newChange(action string, ep *endpoint.Endpoint) (*Route53Change, bool) {
change := &Route53Change{
Change: route53.Change{
Action: aws.String(action),
ResourceRecordSet: &route53.ResourceRecordSet{
Name: aws.String(ep.DNSName),
},
},
}
dualstack := false
Expand Down Expand Up @@ -718,9 +774,51 @@ func (p *AWSProvider) newChange(action string, ep *endpoint.Endpoint) (*route53.
change.ResourceRecordSet.HealthCheckId = aws.String(prop.Value)
}

if ownedRecord, ok := ep.Labels[endpoint.OwnedRecordLabelKey]; ok {
change.OwnedRecord = ownedRecord
}

return change, dualstack
}

// findChangesInQueue splits `changes` into those contained in `queue`
// (`foundChanges`) and those that are not (`notFoundChanges`). Containment is
// decided by pointer identity, matching the original pairwise comparison of
// the *Route53Change elements.
func findChangesInQueue(changes Route53Changes, queue Route53Changes) (foundChanges, notFoundChanges Route53Changes) {
	if queue == nil {
		return Route53Changes{}, changes
	}

	// Index the queue once so the split runs in O(len(changes)+len(queue))
	// instead of the quadratic nested scan.
	queued := make(map[*Route53Change]struct{}, len(queue))
	for _, qc := range queue {
		queued[qc] = struct{}{}
	}

	for _, c := range changes {
		if _, ok := queued[c]; ok {
			foundChanges = append(foundChanges, c)
		} else {
			notFoundChanges = append(notFoundChanges, c)
		}
	}

	return foundChanges, notFoundChanges
}

// groupChangesByNameAndOwnershipRelation groups changes so that related ones
// are always submitted to Route53 in the same transaction: changes sharing a
// DNS name (but e.g. different set identifiers) stay together, and a record's
// changes stay together with those of records that own it (such as the
// corresponding TXT registry record).
func groupChangesByNameAndOwnershipRelation(cs Route53Changes) map[string]Route53Changes {
	grouped := map[string]Route53Changes{}
	for _, change := range cs {
		// Prefer the ownership relation as the grouping key; fall back to
		// the record's own name when there is no owned record.
		groupKey := change.OwnedRecord
		if groupKey == "" {
			groupKey = aws.StringValue(change.ResourceRecordSet.Name)
		}
		grouped[groupKey] = append(grouped[groupKey], change)
	}
	return grouped
}

func (p *AWSProvider) tagsForZone(ctx context.Context, zoneID string) (map[string]string, error) {
response, err := p.client.ListTagsForResourceWithContext(ctx, &route53.ListTagsForResourceInput{
ResourceType: aws.String("hostedzone"),
Expand All @@ -736,55 +834,48 @@ func (p *AWSProvider) tagsForZone(ctx context.Context, zoneID string) (map[strin
return tagMap, nil
}

func batchChangeSet(cs []*route53.Change, batchSize int) [][]*route53.Change {
func batchChangeSet(cs Route53Changes, batchSize int) []Route53Changes {
if len(cs) <= batchSize {
res := sortChangesByActionNameType(cs)
return [][]*route53.Change{res}
return []Route53Changes{res}
}

batchChanges := make([][]*route53.Change, 0)
batchChanges := make([]Route53Changes, 0)

changesByName := make(map[string][]*route53.Change)
for _, v := range cs {
changesByName[*v.ResourceRecordSet.Name] = append(changesByName[*v.ResourceRecordSet.Name], v)
}
changesByOwnership := groupChangesByNameAndOwnershipRelation(cs)

names := make([]string, 0)
for v := range changesByName {
for v := range changesByOwnership {
names = append(names, v)
}
sort.Strings(names)

for _, name := range names {
totalChangesByName := len(changesByName[name])

if totalChangesByName > batchSize {
log.Warnf("Total changes for %s exceeds max batch size of %d, total changes: %d", name,
batchSize, totalChangesByName)
currentBatch := Route53Changes{}
for k, name := range names {
v := changesByOwnership[name]
if len(v) > batchSize {
log.Warnf("Total changes for %v exceeds max batch size of %d, total changes: %d; changes will not be performed", k, batchSize, len(v))
continue
}

var existingBatch bool
for i, b := range batchChanges {
if len(b)+totalChangesByName <= batchSize {
batchChanges[i] = append(batchChanges[i], changesByName[name]...)
existingBatch = true
break
}
}
if !existingBatch {
batchChanges = append(batchChanges, changesByName[name])
if len(currentBatch)+len(v) > batchSize {
// currentBatch would be too large if we add this changeset;
// add currentBatch to batchChanges and start a new currentBatch
batchChanges = append(batchChanges, sortChangesByActionNameType(currentBatch))
currentBatch = append(Route53Changes{}, v...)
} else {
currentBatch = append(currentBatch, v...)
}
}

for i, batch := range batchChanges {
batchChanges[i] = sortChangesByActionNameType(batch)
if len(currentBatch) > 0 {
// add final currentBatch
batchChanges = append(batchChanges, sortChangesByActionNameType(currentBatch))
}

return batchChanges
}

func sortChangesByActionNameType(cs []*route53.Change) []*route53.Change {
func sortChangesByActionNameType(cs Route53Changes) Route53Changes {
sort.SliceStable(cs, func(i, j int) bool {
if *cs[i].Action > *cs[j].Action {
return true
Expand All @@ -805,11 +896,11 @@ func sortChangesByActionNameType(cs []*route53.Change) []*route53.Change {
}

// changesByZone separates a multi-zone change into a single change per zone.
func changesByZone(zones map[string]*route53.HostedZone, changeSet []*route53.Change) map[string][]*route53.Change {
changes := make(map[string][]*route53.Change)
func changesByZone(zones map[string]*route53.HostedZone, changeSet Route53Changes) map[string]Route53Changes {
changes := make(map[string]Route53Changes)

for _, z := range zones {
changes[aws.StringValue(z.Id)] = []*route53.Change{}
changes[aws.StringValue(z.Id)] = Route53Changes{}
}

for _, c := range changeSet {
Expand All @@ -828,9 +919,11 @@ func changesByZone(zones map[string]*route53.HostedZone, changeSet []*route53.Ch
aliasTarget := *rrset.AliasTarget
aliasTarget.HostedZoneId = aws.String(cleanZoneID(aws.StringValue(z.Id)))
rrset.AliasTarget = &aliasTarget
c = &route53.Change{
Action: c.Action,
ResourceRecordSet: &rrset,
c = &Route53Change{
Change: route53.Change{
Action: c.Action,
ResourceRecordSet: &rrset,
},
}
}
changes[aws.StringValue(z.Id)] = append(changes[aws.StringValue(z.Id)], c)
Expand Down
Loading

0 comments on commit a18bf2b

Please sign in to comment.