- Section 14: NATS Streaming Server - An Event Bus Implementation
- Table of Contents
- What Now?
- Three Important Items
- Creating a NATS Streaming Deployment
- Big Notes on NATS Streaming
- Building a NATS Test Project
- Port-Forwarding with Kubectl
- Publishing Events
- Listening For Data
- Accessing Event Data
- Client ID Generation
- Queue Groups
- Manual Ack Mode
- Client Health Checks
- Graceful Client Shutdown
- Core Concurrency Issues
- Common Questions
- [Optional] More Possible Concurrency Solutions
- Solving Concurrency Issues
- Concurrency Control with the Tickets App
- Event Redelivery
- Durable Subscriptions
NATS Streaming Server
- Docs at: docs.nats.io
- NATS and NATS Streaming Server are two different things
- NATS Streaming implements some extraordinarily important design decisions that will affect our app
- We are going to run the official 'nats-streaming' docker image in kubernetes.
- Need to read the image's docs: Commandline Options
- Event-Driven Microservices With NATS Streaming
apiVersion: apps/v1
kind: Deployment
metadata:
name: nats-depl
spec:
replicas: 1
selector:
matchLabels:
app: nats
template:
metadata:
labels:
app: nats
spec:
containers:
- name: nats
image: nats-streaming:0.17.0
args:
[
'-p',
'4222',
'-m',
'8222',
'-hbi',
'5s',
'-hbt',
'5s',
'-hbf',
'2',
'-SD',
'-cid',
'ticketing',
]
---
apiVersion: v1
kind: Service
metadata:
name: nats-srv
spec:
selector:
app: nats
ports:
- name: client
protocol: TCP
port: 4222
targetPort: 4222
- name: monitoring
protocol: TCP
port: 8222
targetPort: 8222
cd section-14/ticketing
skaffold dev
kubectl get pods
Stan.js - Node.js client for NATS Streaming
Short Term Goal
- Create a new sub-project with typescript support
- Install node-nats-streaming library and connect to nats streaming server
- We should have two npm scripts, one to run code to emit events, and one to run code to listen for events
- This program will be ran outside of kubernetes!
// publisher.ts
import nats from 'node-nats-streaming';
const stan = nats.connect('ticketing', 'abc', {
url: 'http://localhost:4222',
});
stan.on('connect', () => {
console.log('Publisher connected to NATS');
});
- Option #3 is selected for small test program
kubectl get pods
kubectl port-forward nats-depl-7cf98f65b8-p8nk6 4222:4222
cd section-14/ticketing/nats-test
npm run publish
// publisher.ts
import nats from 'node-nats-streaming';
const stan = nats.connect('ticketing', 'abc', {
url: 'http://localhost:4222',
});
stan.on('connect', () => {
console.log('Publisher connected to NATS');
const data = JSON.stringify({
id: '123',
title: 'concert',
price: 20
});
stan.publish('ticket:created', data, () => {
console.log('Event published');
})
});
// listener.ts
import nats from 'node-nats-streaming';
console.clear();
const stan = nats.connect('ticketing', '123', {
url: 'http://localhost:4222',
});
stan.on('connect', () => {
console.log('Listener connected to NATS');
const subscription = stan.subscribe('ticket:created');
subscription.on('message', (msg) => {
console.log('Message recieved');
});
});
import nats, { Message } from 'node-nats-streaming';
console.clear();
const stan = nats.connect('ticketing', '123', {
url: 'http://localhost:4222',
});
stan.on('connect', () => {
console.log('Listener connected to NATS');
const subscription = stan.subscribe('ticket:created');
subscription.on('message', (msg: Message) => {
const data = msg.getData();
if (typeof data === 'string') {
console.log(`Received event #${msg.getSequence()}, with data: ${data}`);
}
});
});
import { randomBytes } from 'crypto';
const stan = nats.connect('ticketing', randomBytes(4).toString('hex'), {
url: 'http://localhost:4222',
});
const subscription = stan.subscribe(
'ticket:created',
'orders-service-queue-group'
);
- listener join 'orders-service-queue-group'
- publisher send a event
- only one listener in 'orders-service-queue-group' receive the event at a time
- event can be lost for auto acknowledgement when error occurs
- listener manually acknowledge once it process the message successfully
stan.on('connect', () => {
console.log('Listener connected to NATS');
const options = stan
.subscriptionOptions()
.setManualAckMode(true);
const subscription = stan.subscribe(
'ticket:created',
'orders-service-queue-group',
options
);
subscription.on('message', (msg: Message) => {
const data = msg.getData();
if (typeof data === 'string') {
console.log(`Received event #${msg.getSequence()}, with data: ${data}`);
}
msg.ack();
});
});
- monitoring port 8222 for debugging
kubectl get pods
kubectl port-forward nats-depl-7cf98f65b8-p8nk6 8222:8222
- open chrome
- goto localhost:8222/streaming
goto http://localhost:8222/streaming/channelsz?subs=1
- 2 listeners are available
- if re-start one listener, within 30s there are 3 listeners
- after 30s, drops back to 2 listeners
const stan = nats.connect('ticketing', randomBytes(4).toString('hex'), {
url: 'http://localhost:4222',
});
stan.on('connect', () => {
console.log('Listener connected to NATS');
stan.on('close', () => {
console.log('NATS connection closed!');
process.exit();
});
const options = stan
.subscriptionOptions()
.setManualAckMode(true);
const subscription = stan.subscribe(
'ticket:created',
'orders-service-queue-group',
options
);
subscription.on('message', (msg: Message) => {
const data = msg.getData();
if (typeof data === 'string') {
console.log(`Received event #${msg.getSequence()}, with data: ${data}`);
}
msg.ack();
});
});
process.on('SIGINT', () => stan.close());
process.on('SIGTERM', () => stan.close());
- Success
- Fail to update +$70 at file storage
- One listener might run more quicker than another
- -$100 is done faster than +$70 and +$40
- NATS might think a client is still alive when it is dead
- We might receive the same event twice
- Async (event-based) communication sounds terrible, right?!?!
- Oh, turns out this happens with sync communications
- Oh, and it happens with classic monolith style apps too.
- Instance A and B are busy
- Instance C do -$100 before +$70 and +$40 complete
- receive +$70, +$40 and -$100 events, any event can fail too
- bottleneck for listener
- hard to scale
- vertically: increase specification per service
- horizontally: add more instance of the service
Solution that won't work #2 - Figure out every possible error case and write code to handle it
- An infinite number of things can fail
- Engineering time = $$$$$
- Does it matter if two tweets are out of order?
- Share state between services of last event processed
- Event #1 fail. Cannot +$70 to User A account
- Event #2: +$40 to User B account will be delay
- Last event processed tracked by resource ID
- Last Sequence ID
- We are working with a poorly designed system and relying on NATS to somehow save us
- We should revisit the service design.
- If we redesign the system, a better solution to this concurrency stuff will present itself
const options = stan
.subscriptionOptions()
.setManualAckMode(true)
.setDeliverAllAvailable();
const options = stan
.subscriptionOptions()
.setManualAckMode(true)
.setDeliverAllAvailable()
.setDurableName('accounting-service');
const subscription = stan.subscribe(
'ticket:created',
'queue-group-name',
options
);