From a6be349036bb8e9fc80e09bba21f4361052b22aa Mon Sep 17 00:00:00 2001 From: Chris Anderson Date: Thu, 19 Dec 2024 20:18:52 -0600 Subject: [PATCH] Adds priority jobs (#585) --- .../platform/src/lists/ListEvaluateUserJob.ts | 7 +++++ apps/platform/src/queue/Job.ts | 6 +++++ apps/platform/src/queue/RedisQueueProvider.ts | 14 ++++++---- .../ui/src/views/organization/Performance.tsx | 27 ++++++++++++++++--- 4 files changed, 45 insertions(+), 9 deletions(-) diff --git a/apps/platform/src/lists/ListEvaluateUserJob.ts b/apps/platform/src/lists/ListEvaluateUserJob.ts index d1071164..d95e527c 100644 --- a/apps/platform/src/lists/ListEvaluateUserJob.ts +++ b/apps/platform/src/lists/ListEvaluateUserJob.ts @@ -1,6 +1,7 @@ import App from '../app' import { cacheIncr } from '../config/redis' import { Job } from '../queue' +import { JobPriority } from '../queue/Job' import { getUser } from '../users/UserRepository' import { DynamicList } from './List' import { CacheKeys, cleanupList, evaluateUserList, getList } from './ListService' @@ -16,6 +17,12 @@ interface ListEvaluateUserParams { export default class ListEvaluateUserJob extends Job { static $name = 'list_evaluate_user_job' + options = { + delay: 0, + attempts: 3, + priority: JobPriority.low, + } + static from(params: ListEvaluateUserParams): ListEvaluateUserJob { return new this(params) } diff --git a/apps/platform/src/queue/Job.ts b/apps/platform/src/queue/Job.ts index e41e5f59..60483331 100644 --- a/apps/platform/src/queue/Job.ts +++ b/apps/platform/src/queue/Job.ts @@ -16,6 +16,12 @@ export interface EncodedJob { export class JobError extends Error {} export class RetryError extends JobError {} +export const JobPriority = { + none: 0, + high: 1, + low: 2, +} + export default class Job implements EncodedJob { data: any options: JobOptions = { diff --git a/apps/platform/src/queue/RedisQueueProvider.ts b/apps/platform/src/queue/RedisQueueProvider.ts index 7bb6d4e5..1219a437 100644 --- a/apps/platform/src/queue/RedisQueueProvider.ts +++ b/apps/platform/src/queue/RedisQueueProvider.ts @@ -2,7 +2,7 @@ import { MetricsTime, Queue as BullQueue, Worker, JobsOptions, DelayedError } fr import { subMinutes } from 'date-fns' import { logger } from '../config/logger' import { batch } from '../utilities' -import { EncodedJob } from './Job' +import { EncodedJob, JobPriority } from './Job' import Queue, { QueueTypeConfig } from './Queue' import QueueProvider, { MetricPeriod, QueueMetric } from './QueueProvider' import { DefaultRedis, Redis, RedisConfig } from '../config/redis' @@ -82,9 +82,7 @@ export default class RedisQueueProvider implements QueueProvider { count: 50, age: 24 * 3600, // keep up to 24 hours }, - delay: job.options.delay, - attempts: job.options.attempts, - jobId: job.options.jobId, + ...job.options, }, } } @@ -123,6 +121,10 @@ export default class RedisQueueProvider implements QueueProvider { async metrics(period: MetricPeriod): Promise { const waiting = await this.bull.getWaitingCount() + const priorities = await this.bull.getCountsPerPriority([ + JobPriority.high, + JobPriority.low, + ]) const completed = await this.bull.getMetrics('completed') const data = completed.data .slice(0, period) @@ -133,7 +135,9 @@ export default class RedisQueueProvider implements QueueProvider { data.reverse() return { data, - waiting, + waiting: waiting + + priorities[JobPriority.high] + + priorities[JobPriority.low], } } diff --git a/apps/ui/src/views/organization/Performance.tsx b/apps/ui/src/views/organization/Performance.tsx index 45456a56..e348acdd 100644 --- a/apps/ui/src/views/organization/Performance.tsx +++ b/apps/ui/src/views/organization/Performance.tsx @@ -8,6 +8,7 @@ import Tile, { TileGrid } from '../../ui/Tile' import PageContent from '../../ui/PageContent' import { SingleSelect } from '../../ui/form/SingleSelect' import { DataTable, JsonPreview, Modal } from '../../ui' +import { useSearchParams } from 'react-router-dom' interface Series { label: string @@ -21,8 +22,11 @@ export default function Performance() { const [metrics, setMetrics] = useState() + const [searchParams, setSearchParams] = useSearchParams() + const job = searchParams.get('job') ?? undefined + const [jobs, setJobs] = useState([]) - const [currentJob, setCurrentJob] = useState() + const [currentJob, setCurrentJob] = useState(job) const [jobMetrics, setJobMetrics] = useState() const [failed, setFailed] = useState>>([]) @@ -47,7 +51,7 @@ export default function Performance() { api.organizations.jobs() .then((jobs) => { setJobs(jobs) - setCurrentJob(jobs[0]) + if (!currentJob) setCurrentJob(jobs[0]) }) .catch(() => {}) @@ -74,6 +78,14 @@ export default function Performance() { .catch(() => {}) }, [currentJob]) + const handleChangeJob = (job: string | undefined) => { + setCurrentJob(job) + if (job) { + searchParams.set('job', job) + setSearchParams(searchParams) + } + } + const primaryAxis = useMemo( (): AxisOptions => ({ getValue: datum => datum.date, @@ -118,7 +130,7 @@ export default function Performance() { size="small" options={jobs} value={currentJob} - onChange={setCurrentJob} + onChange={handleChangeJob} /> } /> {jobMetrics &&
@@ -143,7 +155,14 @@ export default function Performance() { { key: 'name', title: 'Name' }, { key: 'attemptsMade', title: 'Attempts Made' }, { key: 'failedReason', title: 'Reason' }, - { key: 'timestamp', title: 'Timestamp' }, + { + key: 'timestamp', + title: 'Timestamp', + cell: ({ item: { timestamp } }) => { + const date = new Date(timestamp) + return date.toLocaleString() + }, + }, ]} onSelectRow={row => setSelectedFailed(row) }/>
}