Skip to content

Commit

Permalink
Ensure count items is not already running
Browse files Browse the repository at this point in the history
A race condition where a count items was running when the new
one started at the beggining of the quota tests, and the old
one completing after the new one, would lead to an override of
the infostore collection, in turn leading to missing metrics
for the quota buckets/accounts, making the quota tests fail
till a new count items starts.

Use a file locking mechanism to ensure no concurrent job runs

- Watching the resource or using kube api is prone to race
  condition, as their is a delay between the request to create
  a job, and its run, plus, the logic is not easy to centralize
  if we consider job already completed or not yet started, as
  we delete the old jobs upon completion
- Using alock mechanism is similar to what we already have for
  quotas, plus, it ensures no race condition and tests being
  able to restart as soon as possible.

Issue: ZENKO-4941
  • Loading branch information
williamlardier committed Dec 4, 2024
1 parent d460fcb commit 22d65a2
Showing 1 changed file with 54 additions and 16 deletions.
70 changes: 54 additions & 16 deletions tests/ctst/steps/utils/kubernetes.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
import fs from 'fs';
import * as path from 'path';
import lockFile from 'proper-lockfile';
import { KubernetesHelper, Utils } from 'cli-testing';
import Zenko from 'world/Zenko';
import {
Expand Down Expand Up @@ -71,12 +74,40 @@ export function createKubeCustomObjectClient(world: Zenko): CustomObjectsApi {
return KubernetesHelper.customObject;
}

export async function createJobAndWaitForCompletion(world: Zenko, jobName: string, customMetadata?: string) {
export async function createJobAndWaitForCompletion(
world: Zenko,
jobName: string,
customMetadata?: string
) {
const batchClient = createKubeBatchClient(world);
const watchClient = createKubeWatchClient(world);

const lockFilePath = path.join('/tmp', `${jobName}.lock`);
let releaseLock: (() => Promise<void>) | false = false;

if (!fs.existsSync(lockFilePath)) {
fs.writeFileSync(lockFilePath, '');
}

try {
releaseLock = await lockFile.lock(lockFilePath, {
// Expect the job does not take more than 2 minutes to complete
stale: 2 * 60 * 1000,
// use a non-exponential backoff strategy
// try once per second for 2min 10s
retries: {
retries: 130,
factor: 1,
minTimeout: 1000,
maxTimeout: 1000,
},
});
world.logger.debug(`Acquired lock for job: ${jobName}`);

// Read the cron job and prepare the job spec
const cronJob = await batchClient.readNamespacedCronJob(jobName, 'default');
const cronJobSpec = cronJob.body.spec?.jobTemplate.spec;

const job = new V1Job();
const metadata = new V1ObjectMeta();
job.apiVersion = 'batch/v1';
Expand All @@ -87,50 +118,57 @@ export async function createJobAndWaitForCompletion(world: Zenko, jobName: strin
'cronjob.kubernetes.io/instantiate': 'ctst',
};
if (customMetadata) {
metadata.annotations = {
custom: customMetadata,
};
metadata.annotations.custom = customMetadata;
}
job.metadata = metadata;

// Create the job
const response = await batchClient.createNamespacedJob('default', job);
world.logger.debug('job created', {
job: response.body.metadata,
});
world.logger.debug('Job created', { job: response.body.metadata });

const expectedJobName = response.body.metadata?.name;

// Watch for job completion
await new Promise<void>((resolve, reject) => {
void watchClient.watch(
'/apis/batch/v1/namespaces/default/jobs',
{},
(type: string, apiObj, watchObj) => {
if (job.metadata?.name && expectedJobName &&
(watchObj.object?.metadata?.name as string)?.startsWith?.(expectedJobName)) {
if (
expectedJobName &&
(watchObj.object?.metadata?.name as string)?.startsWith?.(expectedJobName)
) {
if (watchObj.object?.status?.succeeded) {
world.logger.debug('job succeeded', {
job: job.metadata,
});
world.logger.debug('Job succeeded', { job: job.metadata });
resolve();
} else if (watchObj.object?.status?.failed) {
world.logger.debug('job failed', {
world.logger.debug('Job failed', {
job: job.metadata,
object: watchObj.object,
});
reject(new Error('job failed'));
reject(new Error('Job failed'));
}
}
}, reject);
},
reject
);
});
} catch (err: unknown) {
world.logger.error('error creating job', {
world.logger.error('Error creating or waiting for job completion', {
jobName,
err,
});
throw err;
} finally {
// Ensure the lock is released
if (releaseLock) {
await releaseLock();
world.logger.debug(`Released lock for job: ${jobName}`);
}
}
}


export async function waitForZenkoToStabilize(
world: Zenko, needsReconciliation = false, timeout = 15 * 60 * 1000, namespace = 'default') {
// ZKOP pulls the overlay configuration from Pensieve every 5 seconds
Expand Down

0 comments on commit 22d65a2

Please sign in to comment.