Skip to content

Commit

Permalink
Adds priority jobs (#585)
Browse files Browse the repository at this point in the history
  • Loading branch information
pushchris authored Dec 20, 2024
1 parent d248c74 commit a6be349
Show file tree
Hide file tree
Showing 4 changed files with 45 additions and 9 deletions.
7 changes: 7 additions & 0 deletions apps/platform/src/lists/ListEvaluateUserJob.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import App from '../app'
import { cacheIncr } from '../config/redis'
import { Job } from '../queue'
import { JobPriority } from '../queue/Job'
import { getUser } from '../users/UserRepository'
import { DynamicList } from './List'
import { CacheKeys, cleanupList, evaluateUserList, getList } from './ListService'
Expand All @@ -16,6 +17,12 @@ interface ListEvaluateUserParams {
export default class ListEvaluateUserJob extends Job {
static $name = 'list_evaluate_user_job'

options = {
delay: 0,
attempts: 3,
priority: JobPriority.low,
}

static from(params: ListEvaluateUserParams): ListEvaluateUserJob {
return new this(params)
}
Expand Down
6 changes: 6 additions & 0 deletions apps/platform/src/queue/Job.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,12 @@ export interface EncodedJob {
export class JobError extends Error {}
export class RetryError extends JobError {}

export const JobPriority = {
none: 0,
high: 1,
low: 2,
}

export default class Job implements EncodedJob {
data: any
options: JobOptions = {
Expand Down
14 changes: 9 additions & 5 deletions apps/platform/src/queue/RedisQueueProvider.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { MetricsTime, Queue as BullQueue, Worker, JobsOptions, DelayedError } fr
import { subMinutes } from 'date-fns'
import { logger } from '../config/logger'
import { batch } from '../utilities'
import { EncodedJob } from './Job'
import { EncodedJob, JobPriority } from './Job'
import Queue, { QueueTypeConfig } from './Queue'
import QueueProvider, { MetricPeriod, QueueMetric } from './QueueProvider'
import { DefaultRedis, Redis, RedisConfig } from '../config/redis'
Expand Down Expand Up @@ -82,9 +82,7 @@ export default class RedisQueueProvider implements QueueProvider {
count: 50,
age: 24 * 3600, // keep up to 24 hours
},
delay: job.options.delay,
attempts: job.options.attempts,
jobId: job.options.jobId,
...job.options,
},
}
}
Expand Down Expand Up @@ -123,6 +121,10 @@ export default class RedisQueueProvider implements QueueProvider {

async metrics(period: MetricPeriod): Promise<QueueMetric> {
const waiting = await this.bull.getWaitingCount()
const priorities = await this.bull.getCountsPerPriority([
JobPriority.high,
JobPriority.low,
])
const completed = await this.bull.getMetrics('completed')
const data = completed.data
.slice(0, period)
Expand All @@ -133,7 +135,9 @@ export default class RedisQueueProvider implements QueueProvider {
data.reverse()
return {
data,
waiting,
waiting: waiting
+ priorities[JobPriority.high]
+ priorities[JobPriority.low],
}
}

Expand Down
27 changes: 23 additions & 4 deletions apps/ui/src/views/organization/Performance.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import Tile, { TileGrid } from '../../ui/Tile'
import PageContent from '../../ui/PageContent'
import { SingleSelect } from '../../ui/form/SingleSelect'
import { DataTable, JsonPreview, Modal } from '../../ui'
import { useSearchParams } from 'react-router-dom'

interface Series {
label: string
Expand All @@ -21,8 +22,11 @@ export default function Performance() {

const [metrics, setMetrics] = useState<Series[] | undefined>()

const [searchParams, setSearchParams] = useSearchParams()
const job = searchParams.get('job') ?? undefined

const [jobs, setJobs] = useState<string[]>([])
const [currentJob, setCurrentJob] = useState<string | undefined>()
const [currentJob, setCurrentJob] = useState<string | undefined>(job)
const [jobMetrics, setJobMetrics] = useState<Series[] | undefined>()

const [failed, setFailed] = useState<Array<Record<string, any>>>([])
Expand All @@ -47,7 +51,7 @@ export default function Performance() {
api.organizations.jobs()
.then((jobs) => {
setJobs(jobs)
setCurrentJob(jobs[0])
if (!currentJob) setCurrentJob(jobs[0])
})
.catch(() => {})

Expand All @@ -74,6 +78,14 @@ export default function Performance() {
.catch(() => {})
}, [currentJob])

const handleChangeJob = (job: string | undefined) => {
setCurrentJob(job)
if (job) {
searchParams.set('job', job)
setSearchParams(searchParams)
}
}

const primaryAxis = useMemo(
(): AxisOptions<Metric> => ({
getValue: datum => datum.date,
Expand Down Expand Up @@ -118,7 +130,7 @@ export default function Performance() {
size="small"
options={jobs}
value={currentJob}
onChange={setCurrentJob}
onChange={handleChangeJob}
/>
} />
{jobMetrics && <div style={{ position: 'relative', minHeight: '200px' }}>
Expand All @@ -143,7 +155,14 @@ export default function Performance() {
{ key: 'name', title: 'Name' },
{ key: 'attemptsMade', title: 'Attempts Made' },
{ key: 'failedReason', title: 'Reason' },
{ key: 'timestamp', title: 'Timestamp' },
{
key: 'timestamp',
title: 'Timestamp',
cell: ({ item: { timestamp } }) => {
const date = new Date(timestamp)
return date.toLocaleString()
},
},
]} onSelectRow={row => setSelectedFailed(row) }/>
</div>
</>}
Expand Down

0 comments on commit a6be349

Please sign in to comment.