Skip to content

Commit

Permalink
AP-5015 update bullmq metrics plugin to use redis configs instead of …
Browse files Browse the repository at this point in the history
…redis instances (#189)

* AP-5015 update bullmq metrics plugin to use redis configs instead of redis instances

* AP-5015 README update

* added postversion script
  • Loading branch information
kjamrog authored Sep 2, 2024
1 parent f7e1fdf commit 02039ec
Show file tree
Hide file tree
Showing 8 changed files with 62 additions and 62 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ This plugin depends on the following peer-installed packages:

Add the plugin to your Fastify instance by registering it with the following possible options:

- `redisClients`, a Redis client instances which are used by the BullMQ: plugin uses it to discover the queues.
- `redisConfigs`, Redis configurations used for BullMQ. Plugin uses them to discover the queues.
- `bullMqPrefix` (optional, default: `bull`). The prefix used by BullMQ to store the queues in Redis;
- `metricsPrefix` (optional, default: `bullmq`). The prefix for the metrics in Prometheus;
- `queueDiscoverer` (optional, default: `BackgroundJobsBasedQueueDiscoverer`). The queue discoverer to use. The default one relies on the logic implemented by `@lokalise/background-jobs-common` where queue names are registered by the background job processors; If you are not using `@lokalise/background-jobs-common`, you can use your own queue discoverer by instantiating a `RedisBasedQueueDiscoverer` or implementing a `QueueDiscoverer` interface;
Expand Down
2 changes: 1 addition & 1 deletion lib/plugins/bull-mq-metrics/MetricsCollector.ts
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ export class MetricsCollector {
.filter((queue) => !this.options.excludedQueues.includes(queue.queueName))
.map(
(queue) =>
new ObservableQueue(queue.queueName, queue.redisInstance, this.metrics, this.logger),
new ObservableQueue(queue.queueName, queue.redisConfig, this.metrics, this.logger),
)
}

Expand Down
8 changes: 4 additions & 4 deletions lib/plugins/bull-mq-metrics/ObservableQueue.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
import { Queue, QueueEvents } from 'bullmq'
import type { FinishedStatus } from 'bullmq'
import type { FastifyBaseLogger } from 'fastify'
import type { Redis } from 'ioredis'

import type { RedisConfig } from '@lokalise/node-core'
import type { Metrics } from './MetricsCollector'

export class ObservableQueue {
Expand Down Expand Up @@ -33,12 +33,12 @@ export class ObservableQueue {

constructor(
readonly name: string,
private readonly redis: Redis,
readonly redisConfig: RedisConfig,
private readonly metrics: Metrics,
private readonly logger: FastifyBaseLogger,
) {
this.queue = new Queue(name, { connection: redis })
this.events = new QueueEvents(name, { connection: redis, autorun: true })
this.queue = new Queue(name, { connection: redisConfig })
this.events = new QueueEvents(name, { connection: redisConfig, autorun: true })

this.events.on('failed', async ({ jobId }) => {
await this.collectDurationMetric(jobId, 'failed')
Expand Down
32 changes: 18 additions & 14 deletions lib/plugins/bull-mq-metrics/queueDiscoverers.ts
Original file line number Diff line number Diff line change
@@ -1,24 +1,27 @@
import { backgroundJobProcessorGetActiveQueueIds } from '@lokalise/background-jobs-common'
import {
backgroundJobProcessorGetActiveQueueIds,
createSanitizedRedisClient,
} from '@lokalise/background-jobs-common'
import type { RedisConfig } from '@lokalise/node-core'
import { PromisePool } from '@supercharge/promise-pool'
import type { Redis } from 'ioredis'

export type QueueDiscoverer = {
discoverQueues: () => Promise<RedisQueue[]>
}

type RedisQueue = {
redisInstance: Redis
redisConfig: RedisConfig
queueName: string
}

const QUEUE_DISCOVERY_CONCURRENCY = 3

export abstract class AbstractRedisBasedQueueDiscoverer implements QueueDiscoverer {
constructor(protected readonly redisInstances: Redis[]) {}
constructor(protected readonly redisConfigs: RedisConfig[]) {}

async discoverQueues(): Promise<RedisQueue[]> {
const { results, errors } = await PromisePool.withConcurrency(QUEUE_DISCOVERY_CONCURRENCY)
.for(this.redisInstances)
.for(this.redisConfigs)
.process((redisInstance) => this.discoverQueuesForInstance(redisInstance))

if (errors.length > 0) {
Expand All @@ -29,19 +32,20 @@ export abstract class AbstractRedisBasedQueueDiscoverer implements QueueDiscover
return results.flat()
}

protected abstract discoverQueuesForInstance(redisInstance: Redis): Promise<RedisQueue[]>
protected abstract discoverQueuesForInstance(redisConfig: RedisConfig): Promise<RedisQueue[]>
}

export class RedisBasedQueueDiscoverer extends AbstractRedisBasedQueueDiscoverer {
constructor(
redisInstances: Redis[],
redisConfigs: RedisConfig[],
private readonly queuesPrefix: string,
) {
super(redisInstances)
super(redisConfigs)
}

protected async discoverQueuesForInstance(redisInstance: Redis): Promise<RedisQueue[]> {
const scanStream = redisInstance.scanStream({
protected async discoverQueuesForInstance(redisConfig: RedisConfig): Promise<RedisQueue[]> {
const redis = createSanitizedRedisClient(redisConfig)
const scanStream = redis.scanStream({
match: `${this.queuesPrefix}:*:meta`,
})

Expand All @@ -57,17 +61,17 @@ export class RedisBasedQueueDiscoverer extends AbstractRedisBasedQueueDiscoverer
return Array.from(queues)
.sort()
.map((queueName) => ({
redisInstance: redisInstance,
redisConfig,
queueName,
}))
}
}

export class BackgroundJobsBasedQueueDiscoverer extends AbstractRedisBasedQueueDiscoverer {
protected async discoverQueuesForInstance(redisInstance: Redis): Promise<RedisQueue[]> {
return backgroundJobProcessorGetActiveQueueIds(redisInstance).then((queueNames) =>
protected async discoverQueuesForInstance(redisConfig: RedisConfig): Promise<RedisQueue[]> {
return backgroundJobProcessorGetActiveQueueIds(redisConfig).then((queueNames) =>
queueNames.map((queueName) => ({
redisInstance,
redisConfig,
queueName,
})),
)
Expand Down
25 changes: 15 additions & 10 deletions lib/plugins/bullMqMetricsPlugin.spec.ts
Original file line number Diff line number Diff line change
@@ -1,17 +1,19 @@
import { setTimeout } from 'node:timers/promises'

import { buildClient, sendGet } from '@lokalise/backend-http-client'
import type {
AbstractBackgroundJobProcessor,
BaseJobPayload,
import {
type AbstractBackgroundJobProcessor,
type BaseJobPayload,
createSanitizedRedisClient,
} from '@lokalise/background-jobs-common'
import type { FastifyInstance } from 'fastify'
import fastify from 'fastify'
import type { Redis } from 'ioredis'
import type Redis from 'ioredis'

import { TestBackgroundJobProcessor } from '../../test/mocks/TestBackgroundJobProcessor'
import { TestDepedendencies } from '../../test/mocks/TestDepedendencies'

import type { RedisConfig } from '@lokalise/node-core'
import { afterEach, beforeEach, describe, expect, it } from 'vitest'
import { z } from 'zod'
import { RedisBasedQueueDiscoverer } from './bull-mq-metrics/queueDiscoverers'
Expand Down Expand Up @@ -42,7 +44,7 @@ async function initAppWithBullMqMetrics(
}

await app.register(bullMqMetricsPlugin, {
queueDiscoverer: new RedisBasedQueueDiscoverer(pluginOptions.redisClients, 'bull'),
queueDiscoverer: new RedisBasedQueueDiscoverer(pluginOptions.redisConfigs, 'bull'),
collectionOptions: {
type: 'interval',
intervalInMs: 50,
Expand All @@ -69,11 +71,14 @@ describe('bullMqMetricsPlugin', () => {
let app: FastifyInstance
let dependencies: TestDepedendencies
let processor: AbstractBackgroundJobProcessor<BaseJobPayload, JobReturn>
let redisConfig: RedisConfig
let redis: Redis

beforeEach(async () => {
dependencies = new TestDepedendencies()
redis = dependencies.startRedis()
redisConfig = dependencies.getRedisConfig()

redis = createSanitizedRedisClient(redisConfig)
await redis.flushall()

processor = new TestBackgroundJobProcessor<BaseJobPayload, JobReturn>(
Expand All @@ -88,15 +93,15 @@ describe('bullMqMetricsPlugin', () => {
if (app) {
await app.close()
}
await dependencies.dispose()
await redis.quit()
await processor.dispose()
})

it('throws if fastify-metrics was not initialized', async () => {
await expect(() => {
return initAppWithBullMqMetrics(
{
redisClients: [redis],
redisConfigs: [redisConfig],
},
{
enableMetricsPlugin: false,
Expand All @@ -109,7 +114,7 @@ describe('bullMqMetricsPlugin', () => {

it('exposes metrics collect() function', async () => {
app = await initAppWithBullMqMetrics({
redisClients: [redis],
redisConfigs: [redisConfig],
collectionOptions: {
type: 'manual',
},
Expand Down Expand Up @@ -141,7 +146,7 @@ describe('bullMqMetricsPlugin', () => {

it('works with multiple redis clients', async () => {
app = await initAppWithBullMqMetrics({
redisClients: [redis, redis],
redisConfigs: [redisConfig, redisConfig],
collectionOptions: {
type: 'manual',
},
Expand Down
6 changes: 3 additions & 3 deletions lib/plugins/bullMqMetricsPlugin.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
import type { FastifyInstance } from 'fastify'
import 'fastify-metrics'
import fp from 'fastify-plugin'
import type { Redis } from 'ioredis'

import type { RedisConfig } from '@lokalise/node-core'
import type { CollectionScheduler } from './bull-mq-metrics/CollectionScheduler'
import { PromiseBasedCollectionScheduler } from './bull-mq-metrics/CollectionScheduler'
import type { MetricCollectorOptions } from './bull-mq-metrics/MetricsCollector'
Expand All @@ -19,7 +19,7 @@ declare module 'fastify' {
}

export type BullMqMetricsPluginOptions = {
redisClients: Redis[]
redisConfigs: RedisConfig[]
collectionOptions?:
| {
type: 'interval'
Expand All @@ -46,7 +46,7 @@ function plugin(
const options = {
bullMqPrefix: 'bull',
metricsPrefix: 'bullmq',
queueDiscoverer: new BackgroundJobsBasedQueueDiscoverer(pluginOptions.redisClients),
queueDiscoverer: new BackgroundJobsBasedQueueDiscoverer(pluginOptions.redisConfigs),
excludedQueues: [],
histogramBuckets: [20, 50, 150, 400, 1000, 3000, 8000, 22000, 60000, 150000],
collectionOptions: {
Expand Down
20 changes: 5 additions & 15 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -11,20 +11,9 @@
"type": "git",
"url": "git://github.com/lokalise/fastify-extras.git"
},
"keywords": [
"fastify",
"newrelic",
"bugsnag",
"request-context",
"request-id",
"split-io"
],
"keywords": ["fastify", "newrelic", "bugsnag", "request-context", "request-id", "split-io"],
"homepage": "https://github.com/lokalise/fastify-extras",
"files": [
"dist/**",
"LICENSE",
"README.md"
],
"files": ["dist/**", "LICENSE", "README.md"],
"main": "dist/index.js",
"types": "dist/index.d.ts",
"type": "commonjs",
Expand All @@ -40,13 +29,14 @@
"lint:fix": "biome check --write",
"docker:start": "docker compose -f docker-compose.yml up --build -d redis && docker compose -f docker-compose.yml up --build -d wait_for_redis",
"docker:stop": "docker compose -f docker-compose.yml down",
"version": "auto-changelog -p && git add CHANGELOG.md"
"version": "auto-changelog -p && git add CHANGELOG.md",
"postversion": "biome check --write package.json"
},
"dependencies": {
"@bugsnag/js": "^7.25.0",
"@supercharge/promise-pool": "^3.2.0",
"@lokalise/error-utils": "^2.0.0",
"@lokalise/background-jobs-common": "^7.1.0",
"@lokalise/background-jobs-common": "^7.6.0",
"@splitsoftware/splitio": "^10.27.0",
"@amplitude/analytics-node": "^1.3.6",
"fastify-metrics": "^11.0.0",
Expand Down
29 changes: 15 additions & 14 deletions test/mocks/TestDepedendencies.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import type { BackgroundJobProcessorDependencies } from '@lokalise/background-jobs-common'
import { CommonBullmqFactory } from '@lokalise/background-jobs-common'
import { globalLogger } from '@lokalise/node-core'
import { Redis } from 'ioredis'
import { type RedisConfig, globalLogger } from '@lokalise/node-core'
import type { MockInstance } from 'vitest'
import { vi, vitest } from 'vitest'

Expand All @@ -10,8 +9,6 @@ export let lastInfoSpy: MockInstance
export let lastErrorSpy: MockInstance

export class TestDepedendencies {
private client?: Redis

// eslint-disable-next-line @typescript-eslint/no-explicit-any
createMocksForBackgroundJobProcessor(): BackgroundJobProcessorDependencies<any, any> {
const originalChildFn = testLogger.child.bind(testLogger)
Expand All @@ -37,11 +34,7 @@ export class TestDepedendencies {
}
}

async dispose(): Promise<void> {
await this.client?.quit()
}

startRedis(): Redis {
getRedisConfig(): RedisConfig {
const db = process.env.REDIS_DB ? Number.parseInt(process.env.REDIS_DB) : undefined
const host = process.env.REDIS_HOST
const port = process.env.REDIS_PORT ? Number(process.env.REDIS_PORT) : undefined
Expand All @@ -53,18 +46,26 @@ export class TestDepedendencies {
const commandTimeout = process.env.REDIS_COMMAND_TIMEOUT
? Number.parseInt(process.env.REDIS_COMMAND_TIMEOUT, 10)
: undefined
this.client = new Redis({

if (!host) {
throw new Error('Missing REDIS_HOST env')
}

if (!port) {
throw new Error('Missing REDIS_PORT env')
}

return {
host,
db,
port,
username,
password,
connectTimeout,
commandTimeout,
maxRetriesPerRequest: null,
enableReadyCheck: false,
})

return this.client
maxRetriesPerRequest: null,
useTls: false,
}
}
}

0 comments on commit 02039ec

Please sign in to comment.