- Section 19: Listening for Events and Handling Concurrency Issues
- Table of Contents
- Time for Listeners!
- Reminder on Listeners
- Blueprint for Listeners
- A Few More Reminders
- Simple onMessage Implementation
- ID Adjustment
- Ticket Updated Listener Implementation
- Initializing the Listeners
- A Quick Manual Test
- Clear Concurrency Issues
- Reminder on Versioning Records
- Optimistic Concurrency Control
- Mongoose Update-If-Current
- Implementing OCC with Mongoose
- Testing OCC
- One More Test
- Who Updates Versions?
- Including Versions in Events
- Updating Tickets Event Definitions
- Applying a Version Query
- Did it Work?
- Abstracted Query Method
- [Optional] Versioning Without Update-If-Current
- Testing Listeners
- A Complete Listener Test
- Testing the Ack Call
- Testing the Ticket Updated Listener
- Success Case Testing
- Out-Of-Order Events
- The Next Few Video
- Fixing a Few Tests
- Listeners in the Tickets Service
- Building the Listener
- Strategies for Locking a Ticket
- Reserving a Ticket
- Setup for Testing Reservation
- Test Implementation
- Missing Update Event
- Private vs Protected Properties
- Publishing While Listening
- Mock Function Arguments
- Order Cancelled Listener
- A Lightning-Quick Test
- Don't Forget to Listen!
- Rejecting Edits of Reserved Tickets
- Extends from Listener class
- Define subject and queueGroupName variables
- Define onMessage function
- Implement Event interface
import { Message } from 'node-nats-streaming';
import { Subjects, Listener, TicketCreatedEvent } from '@chticketing/common';
import { Ticket } from '../../models/ticket';
export class TicketCreatedListener extends Listener<TicketCreatedEvent> {
subject: Subjects.TicketCreated = Subjects.TicketCreated;
queueGroupName = 'orders-service';
onMessage(data: TicketCreatedEvent['data'], msg: Message) {}
}
- Publisher send a ticket:created Event is send to one of any members in orders-service Queue Group
export const queueGroupName = 'orders-service';
async onMessage(data: TicketCreatedEvent['data'], msg: Message) {
const { title, price } = data;
const ticket = Ticket.build({
title, price
});
await ticket.save();
msg.ack;
}
- Need to adjust Id so both tickets data in Tickets and Orders Service have same Id
ticketSchema.statics.build = (attrs: TicketAttrs) => {
return new Ticket({
_id: attrs.id,
title: attrs.title,
price: attrs.price
});
};
async onMessage(data: TicketCreatedEvent['data'], msg: Message) {
const { id, title, price } = data;
const ticket = Ticket.build({
id,
title,
price,
});
await ticket.save();
msg.ack();
}
import { Message } from 'node-nats-streaming';
import { Subjects, Listener, TicketUpdatedEvent } from '@chticketing/common';
import { Ticket } from '../../models/ticket';
import { queueGroupName } from './queue-group-name';
export class TicketUpdatedListener extends Listener<TicketUpdatedEvent> {
subject: Subjects.TicketUpdated = Subjects.TicketUpdated;
queueGroupName = queueGroupName;
async onMessage(data: TicketUpdatedEvent['data'], msg: Message) {
const ticket = await Ticket.findById(data.id);
if (!ticket) {
throw new Error('Ticket not found');
}
const { title, price } = data;
ticket.set({ title, price });
await ticket.save();
msg.ack();
}
}
new TicketCreatedListener(natsWrapper.client).listen();
new TicketUpdatedListener(natsWrapper.client).listen();
- Sign up
- Create Ticket
- Update Ticker
- Process ticket:updated price: 15 first
- Then, process ticket:updated price: 10
- Create 4 orders service instances
replicas: 4
<!-- #1 method -->
kubectl get pods
kubectl port-forward tickets-mongo-depl-685f7f898-tp27w 27017:27017
mongo mongodb://localhost:27017/tickets
> db.tickets.find({})
<!-- or -->
kubectl port-forward orders-mongo-depl-5b54d94b4d-vwnqw 27017:27017
mongo mongodb://localhost:27017/orders
> db.tickets.find({})
<!-- #2 method -->
kubectl get pods
kubectl exec -it tickets-mongo-depl-664cc88d8f-ss9mv mongo mongodb://localhost:27017/tickets
> db.tickets.find({})
> db.tickets.find({ price: 10 }).length()
> db.tickets.remove({})
kubectl exec -it orders-mongo-depl-59db4f4877-4k9bq mongo mongodb://localhost:27017/orders
> db.tickets.find({})
> db.tickets.find({ price: 10 }).length()
> db.tickets.remove({})
Optimistic Concurrency Control
ticketSchema.set('versionKey', 'version');
ticketSchema.plugin(updateIfCurrentPlugin);
import { Ticket } from '../ticket';
it('implements optimistic concurrency control', async (done) => {
// Create an instance of a ticket
const ticket = Ticket.build({
title: 'concert',
price: 5,
userId: '123',
});
// Save the ticket to the database
await ticket.save();
// fetch the ticket twice
const firstInstance = await Ticket.findById(ticket.id);
const secondInstance = await Ticket.findById(ticket.id);
// make two separate changes to the tickets we fetched
firstInstance!.set({ price: 10 });
secondInstance!.set({ price: 15 });
// save the first fetched ticket
await firstInstance!.save();
// save the second fetched ticket and expect an error
try {
await secondInstance!.save();
} catch (err) {
return done();
}
throw new Error('Should not reach this point');
});
it('increments the version number on multiple saves', async () => {
const ticket = Ticket.build({
title: 'concert',
price: 20,
userId: '123',
});
await ticket.save();
expect(ticket.version).toEqual(0);
await ticket.save();
expect(ticket.version).toEqual(1);
await ticket.save();
expect(ticket.version).toEqual(2);
});
- When should we increment or include the 'version' number of a record with an event?
Increment the 'version' number whenever the primary service responsible for a record emits an event to describe a create/update/destroy to a record
version: number;
await new TicketCreatedPublisher(natsWrapper.client).publish({
id: ticket.id,
title: ticket.title,
price: ticket.price,
userId: ticket.userId,
version: ticket.version
});
await new TicketUpdatedPublisher(natsWrapper.client).publish({
id: ticket.id,
title: ticket.title,
price: ticket.price,
userId: ticket.userId,
version: ticket.version
});
// tickets.ts
ticketSchema.set('versionKey', 'version');
ticketSchema.plugin(updateIfCurrentPlugin);
// ticket-uodated-listener.ts
const ticket = await Ticket.findOne({
_id: data.id,
version: data.version - 1
});
cd section-19/ticketing
skaffold dev
cd ../t
node index.js
kubectl get pods
kubectl exec -it tickets-mongo-depl-664cc88d8f-ss9mv mongo mongodb://localhost:27017/tickets
> db
> db.tickets.find({ price: 15 }).length()
kubectl exec -it orders-mongo-depl-59db4f4877-4k9bq mongo mongodb://localhost:27017/orders
> db
> db.tickets.find({ price: 15 }).length()
ticketSchema.statics.findByEvent = (event: { id: string; version: number }) => {
return Ticket.findOne({
_id: event.id,
version: event.version - 1,
});
};
mongoose-update-if-current
- Updates the version number on records before they are saved
// ticket-updated-listener.ts
const { title, price, version } = data;
ticket.set({ title, price, version });
await ticket.save();
- Customizes the find-and-update operation (save) to look for the correct version
// ticket.ts
ticketSchema.pre('save', function(done) {
// @ts-ignore
this.$where = {
version: this.get('version') - 1
};
done();
})
kubectl get pods
kubectl exec -it orders-mongo-depl-857959646-s576x mongo
> show dbs
> use orders
> db.tickets
> db.tickets.find({ price: 200.34 })
const setup = async () => {
// create an instance of the listener
// create a fake data event
// create a fake message object
};
it('creates and saves a ticket', async () => {
// call the onMessage function with the data object + message object
// write assertions to make sure a ticket was created!
});
it('acks the message', async () => {
// call the onMessage function with the data object + message object
// write assertions to make sure ack function is called
});
const setup = async () => {
// create an instance of the listener
const listener = new TicketCreatedListener(natsWrapper.client);
// create a fake data event
const data: TicketCreatedEvent['data'] = {
version: 0,
id: new mongoose.Types.ObjectId().toHexString(),
title: 'concert',
price: 10,
userId: new mongoose.Types.ObjectId().toHexString(),
};
// create a fake message object
// @ts-ignore
const msg: Message = {
ack: jest.fn(),
};
return { listener, data, msg };
};
it('creates and saves a ticket', async () => {
const { listener, data, msg } = await setup();
// call the onMessage function with the data object + message object
await listener.onMessage(data, msg);
// write assertions to make sure a ticket was created!
const ticket = await Ticket.findById(data.id);
expect(ticket).toBeDefined();
expect(ticket!.title).toEqual(data.title);
expect(ticket!.price).toEqual(data.price);
});
it('acks the message', async () => {
const { data, listener, msg } = await setup();
// call the onMessage function with the data object + message object
await listener.onMessage(data, msg);
// write assertions to make sure ack function is called
expect(msg.ack).toHaveBeenCalled();
});
const setup = async () => {
// Create a listener
const listener = new TicketUpdatedListener(natsWrapper.client);
// Create and save a ticket
const ticket = Ticket.build({
id: mongoose.Types.ObjectId().toHexString(),
title: 'concert',
price: 20,
});
await ticket.save();
// Create a fake data object
const data: TicketUpdatedEvent['data'] = {
id: ticket.id,
version: ticket.version + 1,
title: 'new concert',
price: 999,
userId: 'ablskdjf',
};
// Create a fake msg object
// @ts-ignore
const msg: Message = {
ack: jest.fn(),
};
// return all of this stuff
return { msg, data, ticket, listener };
};
it('finds, updates, and saves a ticket', async () => {
const { msg, data, ticket, listener } = await setup();
await listener.onMessage(data, msg);
const updatedTicket = await Ticket.findById(ticket.id);
expect(updatedTicket!.title).toEqual(data.title);
expect(updatedTicket!.price).toEqual(data.price);
expect(updatedTicket!.version).toEqual(data.version);
});
it('acks the message', async () => {
const { msg, data, listener } = await setup();
await listener.onMessage(data, msg);
expect(msg.ack).toHaveBeenCalled();
});
it('does not call ack if the event has a skipped version number', async () => {
const { msg, data, listener, ticket } = await setup();
data.version = 10;
try {
await listener.onMessage(data, msg);
} catch (err) {}
expect(msg.ack).not.toHaveBeenCalled();
});
- Add the 'mongoose-update-if-current' module into the Orders mo
interface OrderDoc extends mongoose.Document {
userId: string;
status: OrderStatus;
expiresAt: Date;
ticket: TicketDoc;
version: number;
}
orderSchema.set('versionKey', 'version');
orderSchema.plugin(updateIfCurrentPlugin);
- Fix up some tests - we are creating some Tickets in the Orders service without providing them an ID
- Fix up some route handlers - we are publishing events around orders but not providing the version of the order
- Fix up some tests - we are creating some Tickets in the Orders service without providing them an ID
const ticket = Ticket.build({
id: mongoose.Types.ObjectId().toHexString(),
title: 'concert',
price: 20,
});
- Fix up some route handlers - we are publishing events around orders but not providing the version of the order
new OrderCancelledPublisher(natsWrapper.client).publish({
id: order.id,
version: order.version,
ticket: {
id: order.ticket.id,
},
});
import { Message } from 'node-nats-streaming';
import { Listener, OrderCreatedEvent, Subjects } from '@chticketing/common';
import { queueGroupName } from './queue-group-name';
export class OrderCreatedListener extends Listener<OrderCreatedEvent> {
subject: Subjects.OrderCreated = Subjects.OrderCreated;
queueGroupName = queueGroupName;
async onMessage(data: OrderCreatedEvent['data'], msg: Message) {}
}
interface TicketDoc extends mongoose.Document {
title: string;
price: number;
userId: string;
version: number;
orderId?: string;
}
async onMessage(data: OrderCreatedEvent['data'], msg: Message) {
// Find the ticket that the order is reserving
const ticket = await Ticket.findById(data.ticket.id);
// If no ticket, throw error
if (!ticket) {
throw new Error('Ticket not found');
}
// Mark the ticket as being reserved by setting its orderId property
ticket.set({ orderId: data.id });
// Save the ticket
await ticket.save();
// ack the message
msg.ack();
}
const setup = async () => {
// Create an instance of the listener
const listener = new OrderCreatedListener(natsWrapper.client);
// Create and save a ticket
const ticket = Ticket.build({
title: 'concert',
price: 99,
userId: 'asdf',
});
await ticket.save();
// Create the fake data event
const data: OrderCreatedEvent['data'] = {
id: mongoose.Types.ObjectId().toHexString(),
version: 0,
status: OrderStatus.Created,
userId: 'alskdfj',
expiresAt: 'alskdjf',
ticket: {
id: ticket.id,
price: ticket.price,
},
};
// @ts-ignore
const msg: Message = {
ack: jest.fn(),
};
return { listener, ticket, data, msg };
};
it('sets the userId of the ticket', async () => {
const { listener, ticket, data, msg } = await setup();
await listener.onMessage(data, msg);
const updatedTicket = await Ticket.findById(ticket.id);
expect(updatedTicket!.orderId).toEqual(data.id);
});
it('acks the message', async () => {
const { listener, ticket, data, msg } = await setup();
await listener.onMessage(data, msg);
expect(msg.ack).toHaveBeenCalled();
});
// base-listener.ts
export abstract class Listener<T extends Event> {
abstract subject: T['subject'];
abstract queueGroupName: string;
abstract onMessage(data: T['data'], msg: Message): void;
protected client: Stan;
protected ackWait = 5 * 1000;
}
// ticket-updated-event.ts
export interface TicketUpdatedEvent {
subject: Subjects.TicketUpdated;
data: {
id: string;
version: number;
title: string;
price: number;
userId: string;
orderId?: string;
};
}
// order-created-listener.ts
await new TicketUpdatedPublisher(this.client).publish({
id: ticket.id,
price: ticket.price,
title: ticket.title,
userId: ticket.userId,
orderId: ticket.orderId,
version: ticket.version,
});
it('publishes a ticket updated event', async () => {
const { listener, ticket, data, msg } = await setup();
await listener.onMessage(data, msg);
expect(natsWrapper.client.publish).toHaveBeenCalled();
const ticketUpdatedData = JSON.parse(
(natsWrapper.client.publish as jest.Mock).mock.calls[0][1]
);
expect(data.id).toEqual(ticketUpdatedData.orderId);
});
import { Listener, OrderCancelledEvent, Subjects } from '@chticketing/common';
import { Message } from 'node-nats-streaming';
import { queueGroupName } from './queue-group-name';
import { Ticket } from '../../models/ticket';
import { TicketUpdatedPublisher } from '../publishers/ticket-updated-publisher';
export class OrderCancelledListener extends Listener<OrderCancelledEvent> {
subject: Subjects.OrderCancelled = Subjects.OrderCancelled;
queueGroupName = queueGroupName;
async onMessage(data: OrderCancelledEvent['data'], msg: Message) {
const ticket = await Ticket.findById(data.ticket.id);
if (!ticket) {
throw new Error('Ticket not found');
}
ticket.set({ orderId: undefined });
await ticket.save();
await new TicketUpdatedPublisher(this.client).publish({
id: ticket.id,
orderId: ticket.orderId,
userId: ticket.userId,
price: ticket.price,
title: ticket.title,
version: ticket.version,
});
msg.ack();
}
}
it('updates the ticket, publishes an event, and acks the message', async () => {
const { msg, data, ticket, orderId, listener } = await setup();
await listener.onMessage(data, msg);
const updatedTicket = await Ticket.findById(ticket.id);
expect(updatedTicket!.orderId).not.toBeDefined();
expect(msg.ack).toHaveBeenCalled();
expect(natsWrapper.client.publish).toHaveBeenCalled();
});
new OrderCreatedListener(natsWrapper.client).listen();
new OrderCancelledListener(natsWrapper.client).listen();
// update.ts
if (ticket.orderId) {
throw new BadRequestError('Cannot edit a reserved ticket');
}
// update.test.ts
it('rejects updates if the ticket is reserved', async () => {
const cookie = global.signin();
const response = await request(app)
.post('/api/tickets')
.set('Cookie', cookie)
.send({
title: 'asldkfj',
price: 20,
});
const ticket = await Ticket.findById(response.body.id);
ticket!.set({ orderId: mongoose.Types.ObjectId().toHexString() });
await ticket!.save();
await request(app)
.put(`/api/tickets/${response.body.id}`)
.set('Cookie', cookie)
.send({
title: 'new title',
price: 100,
})
.expect(400);
});