Skip to content

Commit

Permalink
feat(rabbit-bus): add more logging
Browse files Browse the repository at this point in the history
  • Loading branch information
gtoselli committed Apr 23, 2024
1 parent 6c179c0 commit f5c753b
Show file tree
Hide file tree
Showing 3 changed files with 14 additions and 1 deletion.
5 changes: 5 additions & 0 deletions .changeset/good-penguins-sit.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@fizzbuds/ddd-toolkit-rabbit-bus": patch
---

better logging
5 changes: 4 additions & 1 deletion packages/ddd-toolkit-rabbit-bus/src/rabbit-connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ export class RabbitConnection {
await this.setupChannel();
await this.setupExchanges();
await this.setupDqlQueue();
this.logger.debug('Rabbit connection established');
this.logger.debug(`Rabbit connection established`);
} catch (error) {
this.logger.error(`Error connection ${inspect(error)}`);
throw error;
Expand Down Expand Up @@ -80,6 +80,7 @@ export class RabbitConnection {
private async setupChannel() {
this.channel = await this.connection.createConfirmChannel();
await this.channel.prefetch(this.prefetch);
this.logger.debug(`Channel created with prefetch ${this.prefetch}`);

this.channel.on('error', async (err) => {
if (!this.stopping) return;
Expand All @@ -95,6 +96,7 @@ export class RabbitConnection {
durable: true,
});
await this.channel.assertExchange(this.deadLetterExchangeName, 'topic');
this.logger.debug(`Exchange ${this.exchangeName} asserted`);
}

private async setupDqlQueue() {
Expand All @@ -106,5 +108,6 @@ export class RabbitConnection {
},
});
await this.channel.bindQueue(this.deadLetterQueueName, this.deadLetterExchangeName, '#');
this.logger.debug(`Dlq ${this.deadLetterQueueName} asserted`);
}
}
5 changes: 5 additions & 0 deletions packages/ddd-toolkit-rabbit-bus/src/rabbit-event-bus.ts
Original file line number Diff line number Diff line change
Expand Up @@ -59,13 +59,17 @@ export class RabbitEventBus implements IEventBus {

await this.connection.getChannel().consume(queueName, (msg) => this.onMessage(msg, queueName));
await this.connection.getChannel().bindQueue(queueName, this.exchangeName, event.name);
this.logger.debug(
`Handler ${handler.constructor.name} subscribed to event ${event.name} on queue ${queueName}`,
);
}

public async publish<T extends IEvent<unknown>>(event: T): Promise<void> {
const serializedEvent = JSON.stringify(event);
const message = Buffer.from(serializedEvent);
this.connection.getChannel().publish(this.exchangeName, event.name, message);
await this.connection.getChannel().waitForConfirms();
this.logger.debug(`Event ${event.name} published. ${serializedEvent}`);
}

public async terminate(): Promise<void> {
Expand Down Expand Up @@ -103,6 +107,7 @@ export class RabbitEventBus implements IEventBus {
try {
await handler.handle(event);
this.connection.getChannel().ack(rawMessage);
this.logger.debug(`Event ${event.name} handled by ${handler.constructor.name} successfully`);
} catch (e) {
this.logger.warn(`Error handling message due ${inspect(e)}`);
const deliveryCount = rawMessage.properties.headers?.['x-delivery-count'] || 0;
Expand Down

0 comments on commit f5c753b

Please sign in to comment.