diff --git a/src/event-bus/event-bus.interface.ts b/src/event-bus/event-bus.interface.ts new file mode 100644 index 0000000..ae7814b --- /dev/null +++ b/src/event-bus/event-bus.interface.ts @@ -0,0 +1,18 @@ +export interface IEvent { + name: string; + payload: T; +} + +export interface IEventClass> { + new (payload: unknown): E; +} + +export interface IEventHandler> { + handle: (event: E) => Promise; +} + +export interface IEventBus { + subscribe>(event: IEventClass, handler: IEventHandler): void; + + publish>(event: E): Promise; +} diff --git a/src/event-bus/event.ts b/src/event-bus/event.ts new file mode 100644 index 0000000..07f2aca --- /dev/null +++ b/src/event-bus/event.ts @@ -0,0 +1,9 @@ +import { IEvent } from './event-bus.interface'; + +export abstract class Event implements IEvent { + readonly name: string; + + protected constructor(public readonly payload: TPayload) { + this.name = this.constructor.name; + } +} diff --git a/src/event-bus/exponential-backoff.spec.ts b/src/event-bus/exponential-backoff.spec.ts new file mode 100644 index 0000000..ce6e4a7 --- /dev/null +++ b/src/event-bus/exponential-backoff.spec.ts @@ -0,0 +1,24 @@ +import { ExponentialBackoff } from './exponential-backoff'; + +describe('ExponentialBackoff', () => { + let exponentialBackoff: ExponentialBackoff; + const initialDelayMs = 1000; + + beforeEach(() => { + exponentialBackoff = new ExponentialBackoff(initialDelayMs); + }); + + describe('When get delay', () => { + it('Should return initial delay when retryCount is 1', () => { + expect(exponentialBackoff.getDelay(1)).toBe(initialDelayMs); + }); + + it('Should return initial delay * 2 when retryCount is 2', () => { + expect(exponentialBackoff.getDelay(2)).toBe(initialDelayMs * 2); + }); + + it('Should return initial delay * 4 when retryCount is 3', () => { + expect(exponentialBackoff.getDelay(3)).toBe(initialDelayMs * 4); + }); + }); +}); diff --git a/src/event-bus/exponential-backoff.ts b/src/event-bus/exponential-backoff.ts new file mode 100644 index 0000000..d4388fb --- /dev/null +++ b/src/event-bus/exponential-backoff.ts @@ -0,0 +1,11 @@ +export interface IRetryMechanism { + getDelay(retryCount: number): number; +} + +export class ExponentialBackoff implements IRetryMechanism { + constructor(private readonly initialDelayMs: number) {} + + public getDelay(retryCount: number): number { + return Math.floor(this.initialDelayMs * Math.pow(2, retryCount - 1)); + } +} diff --git a/src/event-bus/local-event-bus.spec.ts b/src/event-bus/local-event-bus.spec.ts new file mode 100644 index 0000000..716f5b5 --- /dev/null +++ b/src/event-bus/local-event-bus.spec.ts @@ -0,0 +1,260 @@ +import { LocalEventBus } from './local-event-bus'; +import { Event } from './event'; +import { ILogger } from '../logger'; + +class FooEvent extends Event<{ foo: string }> { + constructor(public readonly payload: { foo: string }) { + super(payload); + } +} + +class BarEvent extends Event<{ foo: string }> { + constructor(public readonly payload: { foo: string }) { + super(payload); + } +} + +const loggerMock: ILogger = { + log: jest.fn(), + debug: jest.fn(), + warn: jest.fn(), + error: jest.fn(), +}; + +describe('LocalEventBus', () => { + describe('Given an event bus', () => { + let eventBus: LocalEventBus; + + beforeEach(() => { + eventBus = new LocalEventBus(loggerMock, 3, 100); + }); + + afterEach(() => { + jest.resetAllMocks(); + }); + + describe('Given no subscribed handler to foo event', () => { + describe('When publish a foo event', () => { + it('Should log warning message', async () => { + const event = new FooEvent({ foo: 'bar' }); + await eventBus.publish(event); + + expect(loggerMock.warn).toBeCalledWith(`No handler found for ${FooEvent.name}`); + }); + }); + + describe('When publishAndWaitForHandlers a foo event', () => { + it('Should log warning message', async () => { + const event = new FooEvent({ foo: 'bar' }); + await eventBus.publishAndWaitForHandlers(event); + + expect(loggerMock.warn).toBeCalledWith(`No handler found for ${FooEvent.name}`); + }); + }); + }); + + describe('Given one subscribed handler to foo event', () => { + const handler1Mock = jest.fn(); + + class FooEventHandler { + async handle(event: FooEvent) { + await sleep(10); + await handler1Mock(event); + } + } + + beforeEach(() => { + eventBus.subscribe(FooEvent, new FooEventHandler()); + }); + + describe('When publish a foo event', () => { + it('Should call handler with eventName and payload', async () => { + const event = new FooEvent({ foo: 'bar' }); + await eventBus.publish(event); + + await waitFor(() => expect(handler1Mock).toBeCalledWith(event)); + }); + }); + + describe('When publishAndWaitForHandlers a foo event', () => { + it('Should call handler with eventName and payload', async () => { + const event = new FooEvent({ foo: 'bar' }); + await eventBus.publishAndWaitForHandlers(event); + + expect(handler1Mock).toBeCalledWith(event); + }); + }); + + describe('Given another subscribed handler to foo event', () => { + const handler2Mock = jest.fn(); + + class FooEventHandler2 { + async handle(event: FooEvent) { + await handler2Mock(event); + } + } + + beforeEach(() => { + eventBus.subscribe(FooEvent, new FooEventHandler2()); + }); + + describe('When publish event', () => { + it('Should call two handlers with eventName and payload', async () => { + const event = new FooEvent({ foo: 'bar' }); + await eventBus.publish(event); + + await waitFor(() => expect(handler1Mock).toBeCalledWith(event)); + await waitFor(() => expect(handler2Mock).toBeCalledWith(event)); + }); + }); + }); + + describe('Given a handler subscribed for bar event', () => { + const handler3Mock = jest.fn(); + + class BarEventHandler { + async handle(event: BarEvent) { + await handler3Mock(event); + } + } + + beforeEach(() => { + eventBus.subscribe(BarEvent, new BarEventHandler()); + }); + + describe('When publish FooEvent', () => { + it('Should call only FooEvent handler', async () => { + const event = new FooEvent({ foo: 'bar' }); + await eventBus.publish(event); + + await waitFor(() => expect(handler1Mock).toBeCalledWith(event)); + expect(handler3Mock).not.toBeCalled(); + }); + }); + + describe('When publish BarEvent', () => { + it('Should call only BarEvent handler', async () => { + const event = new BarEvent({ foo: 'bar' }); + await eventBus.publish(event); + + expect(handler1Mock).not.toBeCalled(); + expect(handler3Mock).toBeCalledWith(event); + }); + }); + }); + }); + + describe('Given two subscribed handlers (with one that fail) for foo event', () => { + const handlerOkMock = jest.fn(); + const handlerKoMock = jest.fn(); + + class FooEventHandlerOk { + async handle(event: FooEvent) { + await handlerOkMock(event); + } + } + + class FooEventHandlerKo { + async handle(event: FooEvent) { + await handlerKoMock(event); + throw new Error('ko'); + } + } + + beforeEach(() => { + handlerOkMock.mockResolvedValue('ok'); + handlerKoMock.mockRejectedValue(new Error('ko')); + eventBus.subscribe(FooEvent, new FooEventHandlerOk()); + eventBus.subscribe(FooEvent, new FooEventHandlerKo()); + }); + + describe('When publish event', () => { + const event = new FooEvent({ foo: 'bar' }); + + it('publish should not throw any exception', async () => { + await eventBus.publish(event); + }); + + it('both handler should be called', async () => { + await eventBus.publish(event); + expect(handlerOkMock).toBeCalledWith(event); + expect(handlerKoMock).toBeCalledWith(event); + }); + + it('should log error for failing handler', async () => { + await eventBus.publish(event); + await waitFor(() => + expect(loggerMock.error).toBeCalledWith( + expect.stringContaining('HandlerKo failed to handle FooEvent event'), + ), + ); + }); + }); + }); + + describe('Given one subscribed handler which fails the first execution but not the second', () => { + const handlerMock = jest.fn(); + + class FooEventHandlerOk { + async handle(event: FooEvent) { + await handlerMock(event); + } + } + + beforeEach(() => { + handlerMock.mockRejectedValueOnce(new Error('ko')).mockResolvedValueOnce('ok'); + eventBus.subscribe(FooEvent, new FooEventHandlerOk()); + }); + + describe('When publish event', () => { + const event = new FooEvent({ foo: 'bar' }); + + beforeEach(async () => await eventBus.publish(event)); + + it('handler should be called two times', async () => { + await waitFor(() => { + expect(handlerMock).toBeCalledTimes(2); + }); + }); + + it('should not log error for failing handler', async () => { + await waitFor(() => { + expect(handlerMock).toBeCalledTimes(2); + expect(loggerMock.error).not.toBeCalled(); + }); + }); + + it('should log one retry for failing handler', async () => { + await waitFor(() => { + expect(loggerMock.warn).toBeCalledTimes(1); + expect(loggerMock.warn).toBeCalledWith( + expect.stringContaining('FooEventHandlerOk failed to handle FooEvent event. Attempt 1/3'), + ); + }); + }); + }); + }); + }); +}); + +async function waitFor(statement: () => void, timeout = 1000): Promise { + const startTime = Date.now(); + + let latestStatementError; + while (true) { + try { + statement(); + return; + } catch (e) { + latestStatementError = e; + } + + if (Date.now() - startTime > timeout) throw latestStatementError; + + await new Promise((resolve) => setTimeout(resolve, 100)); + } +} + +async function sleep(ms: number): Promise { + return new Promise((resolve) => setTimeout(resolve, ms)); +} diff --git a/src/event-bus/local-event-bus.ts b/src/event-bus/local-event-bus.ts new file mode 100644 index 0000000..1f36ad8 --- /dev/null +++ b/src/event-bus/local-event-bus.ts @@ -0,0 +1,64 @@ +import { inspect } from 'util'; +import { ILogger } from '../logger'; +import { IEvent, IEventBus, IEventClass, IEventHandler } from './event-bus.interface'; +import { ExponentialBackoff, IRetryMechanism } from './exponential-backoff'; + +export class LocalEventBus implements IEventBus { + private readonly retryMechanism: IRetryMechanism; + + private handlers: { [key: string]: IEventHandler>[] } = {}; + + constructor( + private logger: ILogger, + private readonly retryMaxAttempts = 5, + retryInitialDelay = 100, + ) { + this.retryMechanism = new ExponentialBackoff(retryInitialDelay); + } + + public subscribe>(event: IEventClass, handler: IEventHandler): void { + if (!this.handlers[event.name]) this.handlers[event.name] = []; + this.handlers[event.name].push(handler); + } + + public async publish>(event: T): Promise { + const handlers = this.handlers[event.name] as IEventHandler[]; + if (!handlers || !handlers.length) { + this.logger.warn(`No handler found for ${event.name}`); + return; + } + + void this.handleEvent(event, handlers); + } + + public async publishAndWaitForHandlers>(event: T): Promise { + const handlers = this.handlers[event.name] as IEventHandler[]; + if (!handlers || !handlers.length) { + this.logger.warn(`No handler found for ${event.name}`); + return; + } + + await this.handleEvent(event, handlers); + } + + private async handleEvent>(event: T, handlers: IEventHandler[], attempt = 0) { + const results = await Promise.allSettled(handlers.map((handler) => handler.handle(event))); + results.forEach((result, index) => { + if (result.status === 'fulfilled') return; + + const handler = handlers[index]; + const handlerName = handler.constructor.name; + + if (attempt < this.retryMaxAttempts) { + const nextAttempt = attempt + 1; + const delay = this.retryMechanism.getDelay(nextAttempt); + this.logger.warn( + `${handlerName} failed to handle ${event.name} event. Attempt ${nextAttempt}/${this.retryMaxAttempts}. Delaying for ${delay}ms.`, + ); + setTimeout(() => this.handleEvent(event, [handler], nextAttempt), delay); + return; + } + this.logger.error(`${handlerName} failed to handle ${event.name} event due to ${inspect(result.reason)}`); + }); + } +} diff --git a/src/repo/logger.ts b/src/logger.ts similarity index 77% rename from src/repo/logger.ts rename to src/logger.ts index bd98847..0a5289d 100644 --- a/src/repo/logger.ts +++ b/src/logger.ts @@ -2,4 +2,5 @@ export interface ILogger { log: (message: string, ...optionalParams: any[]) => void; debug: (message: string, ...optionalParams: any[]) => void; warn: (message: string, ...optionalParams: any[]) => void; + error: (message: string, ...optionalParams: any[]) => void; } diff --git a/src/repo/logged-mongo-collection.ts b/src/repo/logged-mongo-collection.ts index 5ee5e01..d7c09fb 100644 --- a/src/repo/logged-mongo-collection.ts +++ b/src/repo/logged-mongo-collection.ts @@ -12,7 +12,7 @@ import { UpdateFilter, UpdateOptions, } from 'mongodb'; -import { ILogger } from './logger'; +import { ILogger } from '../logger'; export class LoggedMongoCollection { public readonly rawCollection: Collection; diff --git a/src/repo/mongo-aggregate-repo.ts b/src/repo/mongo-aggregate-repo.ts index 0ea2248..1f0561e 100644 --- a/src/repo/mongo-aggregate-repo.ts +++ b/src/repo/mongo-aggregate-repo.ts @@ -3,7 +3,7 @@ import { Collection, Document, MongoClient } from 'mongodb'; import { ISerializer } from './serializer.interface'; import { merge } from 'lodash'; import { DuplicatedIdError, OptimisticLockError, RepoHookError } from '../errors'; -import { ILogger } from './logger'; +import { ILogger } from '../logger'; import { IInit } from '../init.interface'; export interface IAggregateRepo { diff --git a/src/repo/mongo-query-repo.ts b/src/repo/mongo-query-repo.ts index 56a6700..3bfb87e 100644 --- a/src/repo/mongo-query-repo.ts +++ b/src/repo/mongo-query-repo.ts @@ -1,6 +1,6 @@ import { Collection, CreateIndexesOptions, Document, IndexSpecification, MongoClient } from 'mongodb'; import { isEmpty } from 'lodash'; -import { ILogger } from './logger'; +import { ILogger } from '../logger'; import { IInit } from '../init.interface'; export abstract class MongoQueryRepo implements IInit {