Skip to content

Commit

Permalink
address un-deleted legacy jobs causing flapping
Browse files Browse the repository at this point in the history
The job-handling logic has been rewritten to be easier to follow. It
should also now be more correct: jobs from previous versions of the
controller will no longer be applied, so they no longer cause flapping
for plans with cordon/drain configured.
  • Loading branch information
dweomer committed May 6, 2020
1 parent e3d69e5 commit 5671782
Showing 1 changed file with 74 additions and 37 deletions.
111 changes: 74 additions & 37 deletions pkg/upgrade/handle_batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
upgradejob "github.com/rancher/system-upgrade-controller/pkg/upgrade/job"
batchctlv1 "github.com/rancher/wrangler-api/pkg/generated/controllers/batch/v1"
batchv1 "k8s.io/api/batch/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
)
Expand All @@ -28,39 +29,70 @@ func (ctl *Controller) handleJobs(ctx context.Context) error {
jobSelector := labels.SelectorFromSet(labels.Set{
upgradeapi.LabelController: ctl.Name,
})
// avoid commandeering jobs that do not contain the label for upgrade controller
if obj.Labels != nil && jobSelector.Matches(labels.Set(obj.Labels)) {
if planName, ok := obj.Labels[upgradeapi.LabelPlan]; ok {
defer plans.Enqueue(obj.Namespace, planName)
if upgradejob.ConditionComplete.IsTrue(obj) {
planLabel := upgradeapi.LabelPlanName(planName)
if planHash, ok := obj.Labels[planLabel]; ok {
if nodeName, ok := obj.Labels[upgradeapi.LabelNode]; ok {
node, err := nodes.Cache().Get(nodeName)
if err != nil {
return obj, err
}
plan, err := plans.Cache().Get(obj.Namespace, planName)
if err != nil {
return obj, err
}
node.Labels[planLabel] = planHash
if node.Spec.Unschedulable && (plan.Spec.Cordon || plan.Spec.Drain != nil) {
node.Spec.Unschedulable = false
}
if node, err = nodes.Update(node); err != nil {
return obj, err
}
}
}
}
if upgradejob.ConditionComplete.IsTrue(obj) {
return obj, enqueueOrDelete(jobs, obj, upgradejob.ConditionComplete)
// avoid commandeering jobs from other controllers
if obj.Labels == nil || !jobSelector.Matches(labels.Set(obj.Labels)) {
return obj, nil
}
// identify the plan that this job is applying
planName, ok := obj.Labels[upgradeapi.LabelPlan]
if !ok {
// malformed, just delete it and move on
return obj, deleteJob(jobs, obj, metav1.DeletePropagationBackground)
}
// what version of the plan is this job applying?
planVersion, ok := obj.Labels[upgradeapi.LabelVersion]
if !ok {
// malformed, just delete it and move on
return obj, deleteJob(jobs, obj, metav1.DeletePropagationBackground)
}
// get the plan being applied
plan, err := plans.Cache().Get(obj.Namespace, planName)
switch {
case errors.IsNotFound(err):
// plan is gone, delete
return obj, deleteJob(jobs, obj, metav1.DeletePropagationBackground)
case err != nil:
return obj, err
}
// if this job was applying a different version then just delete it
// this has the side-effect of only ever retaining one job per node during the TTL window
if planVersion != plan.Status.LatestVersion {
return obj, deleteJob(jobs, obj, metav1.DeletePropagationBackground)
}
// trigger the plan when we're done, might free up a concurrency slot
defer plans.Enqueue(obj.Namespace, planName)
// identify the node that this job is targeting
nodeName, ok := obj.Labels[upgradeapi.LabelNode]
if !ok {
// malformed, just delete it and move on
return obj, deleteJob(jobs, obj, metav1.DeletePropagationBackground)
}
// get the node that the plan is being applied to
node, err := nodes.Cache().Get(nodeName)
switch {
case errors.IsNotFound(err):
// node is gone, delete
return obj, deleteJob(jobs, obj, metav1.DeletePropagationBackground)
case err != nil:
return obj, err
}
// if the job has failed enqueue-or-delete it depending on the TTL window
if upgradejob.ConditionFailed.IsTrue(obj) {
return obj, enqueueOrDelete(jobs, obj, upgradejob.ConditionFailed)
}
// if the job has completed tag the node then enqueue-or-delete depending on the TTL window
if upgradejob.ConditionComplete.IsTrue(obj) {
planLabel := upgradeapi.LabelPlanName(planName)
if planHash, ok := obj.Labels[planLabel]; ok {
node.Labels[planLabel] = planHash
if node.Spec.Unschedulable && (plan.Spec.Cordon || plan.Spec.Drain != nil) {
node.Spec.Unschedulable = false
}
if upgradejob.ConditionFailed.IsTrue(obj) {
return obj, enqueueOrDelete(jobs, obj, upgradejob.ConditionFailed)
if node, err = nodes.Update(node); err != nil {
return obj, err
}
}
return obj, enqueueOrDelete(jobs, obj, upgradejob.ConditionComplete)
}
return obj, nil
})
Expand All @@ -77,19 +109,24 @@ func enqueueOrDelete(jobController batchctlv1.JobController, job *batchv1.Job, d
var ttlSecondsAfterFinished time.Duration

if job.Spec.TTLSecondsAfterFinished == nil {
fallbackTTLSecondsAfterFinished, err := strconv.ParseInt(job.Annotations[upgradeapi.AnnotationTTLSecondsAfterFinished], 10, 32)
if err != nil {
return err
if annotation, ok := job.Annotations[upgradeapi.AnnotationTTLSecondsAfterFinished]; ok {
fallbackTTLSecondsAfterFinished, err := strconv.ParseInt(annotation, 10, 32)
if err != nil {
// malformed, delete
return deleteJob(jobController, job, metav1.DeletePropagationBackground)
}
ttlSecondsAfterFinished = time.Second * time.Duration(fallbackTTLSecondsAfterFinished)
}
ttlSecondsAfterFinished = time.Second * time.Duration(fallbackTTLSecondsAfterFinished)
} else {
ttlSecondsAfterFinished = time.Second * time.Duration(*job.Spec.TTLSecondsAfterFinished)
}
if interval := time.Now().Sub(lastTransitionTime); interval < ttlSecondsAfterFinished {
jobController.EnqueueAfter(job.Namespace, job.Name, ttlSecondsAfterFinished-interval)
return nil
}
deletePropagationBackground := metav1.DeletePropagationForeground
deleteOptions := metav1.DeleteOptions{PropagationPolicy: &deletePropagationBackground}
return jobController.Delete(job.Namespace, job.Name, &deleteOptions)
return deleteJob(jobController, job, metav1.DeletePropagationBackground)
}

// deleteJob asks the job controller to delete the given job, identified by its
// namespace and name, using the supplied propagation policy in the delete
// options. It returns whatever error the controller's Delete call reports.
func deleteJob(jobController batchctlv1.JobController, job *batchv1.Job, deletionPropagation metav1.DeletionPropagation) error {
	opts := metav1.DeleteOptions{
		PropagationPolicy: &deletionPropagation,
	}
	return jobController.Delete(job.Namespace, job.Name, &opts)
}

0 comments on commit 5671782

Please sign in to comment.