Skip to content

Commit

Permalink
Merge pull request #15 from fizzbuds/local-event-bus
Browse files Browse the repository at this point in the history
Local event bus (mediator pattern)
  • Loading branch information
lucagiove authored Mar 16, 2024
2 parents 6cc0b26 + 7554505 commit aa61a43
Show file tree
Hide file tree
Showing 10 changed files with 390 additions and 3 deletions.
18 changes: 18 additions & 0 deletions src/event-bus/event-bus.interface.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
export interface IEvent<T> {
name: string;
payload: T;
}

export interface IEventClass<E extends IEvent<unknown>> {
new (payload: unknown): E;
}

export interface IEventHandler<E extends IEvent<unknown>> {
handle: (event: E) => Promise<void>;
}

export interface IEventBus {
subscribe<E extends IEvent<unknown>>(event: IEventClass<E>, handler: IEventHandler<E>): void;

publish<E extends IEvent<unknown>>(event: E): Promise<void>;
}
9 changes: 9 additions & 0 deletions src/event-bus/event.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
import { IEvent } from './event-bus.interface';

export abstract class Event<TPayload> implements IEvent<TPayload> {
readonly name: string;

protected constructor(public readonly payload: TPayload) {
this.name = this.constructor.name;
}
}
24 changes: 24 additions & 0 deletions src/event-bus/exponential-backoff.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
import { ExponentialBackoff } from './exponential-backoff';

describe('ExponentialBackoff', () => {
let exponentialBackoff: ExponentialBackoff;
const initialDelayMs = 1000;

beforeEach(() => {
exponentialBackoff = new ExponentialBackoff(initialDelayMs);
});

describe('When get delay', () => {
it('Should return initial delay when retryCount is 1', () => {
expect(exponentialBackoff.getDelay(1)).toBe(initialDelayMs);
});

it('Should return initial delay * 2 when retryCount is 2', () => {
expect(exponentialBackoff.getDelay(2)).toBe(initialDelayMs * 2);
});

it('Should return initial delay * 4 when retryCount is 3', () => {
expect(exponentialBackoff.getDelay(3)).toBe(initialDelayMs * 4);
});
});
});
11 changes: 11 additions & 0 deletions src/event-bus/exponential-backoff.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
export interface IRetryMechanism {
getDelay(retryCount: number): number;
}

export class ExponentialBackoff implements IRetryMechanism {
constructor(private readonly initialDelayMs: number) {}

public getDelay(retryCount: number): number {
return Math.floor(this.initialDelayMs * Math.pow(2, retryCount - 1));
}
}
260 changes: 260 additions & 0 deletions src/event-bus/local-event-bus.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,260 @@
import { LocalEventBus } from './local-event-bus';
import { Event } from './event';
import { ILogger } from '../logger';

class FooEvent extends Event<{ foo: string }> {
constructor(public readonly payload: { foo: string }) {
super(payload);
}
}

class BarEvent extends Event<{ foo: string }> {
constructor(public readonly payload: { foo: string }) {
super(payload);
}
}

const loggerMock: ILogger = {
log: jest.fn(),
debug: jest.fn(),
warn: jest.fn(),
error: jest.fn(),
};

describe('LocalEventBus', () => {
describe('Given an event bus', () => {
let eventBus: LocalEventBus;

beforeEach(() => {
eventBus = new LocalEventBus(loggerMock, 3, 100);
});

afterEach(() => {
jest.resetAllMocks();
});

describe('Given no subscribed handler to foo event', () => {
describe('When publish a foo event', () => {
it('Should log warning message', async () => {
const event = new FooEvent({ foo: 'bar' });
await eventBus.publish(event);

expect(loggerMock.warn).toBeCalledWith(`No handler found for ${FooEvent.name}`);
});
});

describe('When publishAndWaitForHandlers a foo event', () => {
it('Should log warning message', async () => {
const event = new FooEvent({ foo: 'bar' });
await eventBus.publishAndWaitForHandlers(event);

expect(loggerMock.warn).toBeCalledWith(`No handler found for ${FooEvent.name}`);
});
});
});

describe('Given one subscribed handler to foo event', () => {
const handler1Mock = jest.fn();

class FooEventHandler {
async handle(event: FooEvent) {
await sleep(10);
await handler1Mock(event);
}
}

beforeEach(() => {
eventBus.subscribe(FooEvent, new FooEventHandler());
});

describe('When publish a foo event', () => {
it('Should call handler with eventName and payload', async () => {
const event = new FooEvent({ foo: 'bar' });
await eventBus.publish(event);

await waitFor(() => expect(handler1Mock).toBeCalledWith(event));
});
});

describe('When publishAndWaitForHandlers a foo event', () => {
it('Should call handler with eventName and payload', async () => {
const event = new FooEvent({ foo: 'bar' });
await eventBus.publishAndWaitForHandlers(event);

expect(handler1Mock).toBeCalledWith(event);
});
});

describe('Given another subscribed handler to foo event', () => {
const handler2Mock = jest.fn();

class FooEventHandler2 {
async handle(event: FooEvent) {
await handler2Mock(event);
}
}

beforeEach(() => {
eventBus.subscribe(FooEvent, new FooEventHandler2());
});

describe('When publish event', () => {
it('Should call two handlers with eventName and payload', async () => {
const event = new FooEvent({ foo: 'bar' });
await eventBus.publish(event);

await waitFor(() => expect(handler1Mock).toBeCalledWith(event));
await waitFor(() => expect(handler2Mock).toBeCalledWith(event));
});
});
});

describe('Given a handler subscribed for bar event', () => {
const handler3Mock = jest.fn();

class BarEventHandler {
async handle(event: BarEvent) {
await handler3Mock(event);
}
}

beforeEach(() => {
eventBus.subscribe(BarEvent, new BarEventHandler());
});

describe('When publish FooEvent', () => {
it('Should call only FooEvent handler', async () => {
const event = new FooEvent({ foo: 'bar' });
await eventBus.publish(event);

await waitFor(() => expect(handler1Mock).toBeCalledWith(event));
expect(handler3Mock).not.toBeCalled();
});
});

describe('When publish BarEvent', () => {
it('Should call only BarEvent handler', async () => {
const event = new BarEvent({ foo: 'bar' });
await eventBus.publish(event);

expect(handler1Mock).not.toBeCalled();
expect(handler3Mock).toBeCalledWith(event);
});
});
});
});

describe('Given two subscribed handlers (with one that fail) for foo event', () => {
const handlerOkMock = jest.fn();
const handlerKoMock = jest.fn();

class FooEventHandlerOk {
async handle(event: FooEvent) {
await handlerOkMock(event);
}
}

class FooEventHandlerKo {
async handle(event: FooEvent) {
await handlerKoMock(event);
throw new Error('ko');
}
}

beforeEach(() => {
handlerOkMock.mockResolvedValue('ok');
handlerKoMock.mockRejectedValue(new Error('ko'));
eventBus.subscribe(FooEvent, new FooEventHandlerOk());
eventBus.subscribe(FooEvent, new FooEventHandlerKo());
});

describe('When publish event', () => {
const event = new FooEvent({ foo: 'bar' });

it('publish should not throw any exception', async () => {
await eventBus.publish(event);
});

it('both handler should be called', async () => {
await eventBus.publish(event);
expect(handlerOkMock).toBeCalledWith(event);
expect(handlerKoMock).toBeCalledWith(event);
});

it('should log error for failing handler', async () => {
await eventBus.publish(event);
await waitFor(() =>
expect(loggerMock.error).toBeCalledWith(
expect.stringContaining('HandlerKo failed to handle FooEvent event'),
),
);
});
});
});

describe('Given one subscribed handler which fails the first execution but not the second', () => {
const handlerMock = jest.fn();

class FooEventHandlerOk {
async handle(event: FooEvent) {
await handlerMock(event);
}
}

beforeEach(() => {
handlerMock.mockRejectedValueOnce(new Error('ko')).mockResolvedValueOnce('ok');
eventBus.subscribe(FooEvent, new FooEventHandlerOk());
});

describe('When publish event', () => {
const event = new FooEvent({ foo: 'bar' });

beforeEach(async () => await eventBus.publish(event));

it('handler should be called two times', async () => {
await waitFor(() => {
expect(handlerMock).toBeCalledTimes(2);
});
});

it('should not log error for failing handler', async () => {
await waitFor(() => {
expect(handlerMock).toBeCalledTimes(2);
expect(loggerMock.error).not.toBeCalled();
});
});

it('should log one retry for failing handler', async () => {
await waitFor(() => {
expect(loggerMock.warn).toBeCalledTimes(1);
expect(loggerMock.warn).toBeCalledWith(
expect.stringContaining('FooEventHandlerOk failed to handle FooEvent event. Attempt 1/3'),
);
});
});
});
});
});
});

async function waitFor(statement: () => void, timeout = 1000): Promise<void> {
const startTime = Date.now();

let latestStatementError;
while (true) {
try {
statement();
return;
} catch (e) {
latestStatementError = e;
}

if (Date.now() - startTime > timeout) throw latestStatementError;

await new Promise((resolve) => setTimeout(resolve, 100));
}
}

async function sleep(ms: number): Promise<void> {
return new Promise((resolve) => setTimeout(resolve, ms));
}
64 changes: 64 additions & 0 deletions src/event-bus/local-event-bus.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
import { inspect } from 'util';
import { ILogger } from '../logger';
import { IEvent, IEventBus, IEventClass, IEventHandler } from './event-bus.interface';
import { ExponentialBackoff, IRetryMechanism } from './exponential-backoff';

export class LocalEventBus implements IEventBus {
private readonly retryMechanism: IRetryMechanism;

private handlers: { [key: string]: IEventHandler<IEvent<unknown>>[] } = {};

constructor(
private logger: ILogger,
private readonly retryMaxAttempts = 5,
retryInitialDelay = 100,
) {
this.retryMechanism = new ExponentialBackoff(retryInitialDelay);
}

public subscribe<T extends IEvent<unknown>>(event: IEventClass<T>, handler: IEventHandler<T>): void {
if (!this.handlers[event.name]) this.handlers[event.name] = [];
this.handlers[event.name].push(handler);
}

public async publish<T extends IEvent<unknown>>(event: T): Promise<void> {
const handlers = this.handlers[event.name] as IEventHandler<T>[];
if (!handlers || !handlers.length) {
this.logger.warn(`No handler found for ${event.name}`);
return;
}

void this.handleEvent(event, handlers);
}

public async publishAndWaitForHandlers<T extends IEvent<unknown>>(event: T): Promise<void> {
const handlers = this.handlers[event.name] as IEventHandler<T>[];
if (!handlers || !handlers.length) {
this.logger.warn(`No handler found for ${event.name}`);
return;
}

await this.handleEvent(event, handlers);
}

private async handleEvent<T extends IEvent<unknown>>(event: T, handlers: IEventHandler<T>[], attempt = 0) {
const results = await Promise.allSettled(handlers.map((handler) => handler.handle(event)));
results.forEach((result, index) => {
if (result.status === 'fulfilled') return;

const handler = handlers[index];
const handlerName = handler.constructor.name;

if (attempt < this.retryMaxAttempts) {
const nextAttempt = attempt + 1;
const delay = this.retryMechanism.getDelay(nextAttempt);
this.logger.warn(
`${handlerName} failed to handle ${event.name} event. Attempt ${nextAttempt}/${this.retryMaxAttempts}. Delaying for ${delay}ms.`,
);
setTimeout(() => this.handleEvent(event, [handler], nextAttempt), delay);
return;
}
this.logger.error(`${handlerName} failed to handle ${event.name} event due to ${inspect(result.reason)}`);
});
}
}
1 change: 1 addition & 0 deletions src/repo/logger.ts → src/logger.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,5 @@ export interface ILogger {
log: (message: string, ...optionalParams: any[]) => void;
debug: (message: string, ...optionalParams: any[]) => void;
warn: (message: string, ...optionalParams: any[]) => void;
error: (message: string, ...optionalParams: any[]) => void;
}
Loading

0 comments on commit aa61a43

Please sign in to comment.