Releases: oskardudycz/EventSourcing.NodeJS
Introduction to Event Sourcing Workshop
Event Sourcing is perceived as a complex pattern. Some believe that it's like Nessie, everyone's heard about it, but rarely seen it. In fact, Event Sourcing is a pretty practical and straightforward concept. It helps build predictable applications closer to business. Nowadays, storage is cheap, and information is priceless. In Event Sourcing, no data is lost.
The workshop aims to build the knowledge of the general concept and its related patterns for the participants. The acquired knowledge will allow for the conscious design of architectural solutions and the analysis of associated risks.
The emphasis will be on a pragmatic understanding of architectures and applying it in practice using Marten and EventStoreDB.
- Introduction to Event-Driven Architectures. Differences from the classical approach, foundations and terminology (event, event streams, command, query).
- What is Event Sourcing, and how is it different from Event Streaming. Advantages and disadvantages.
- Write model, data consistency guarantees on examples from Marten and EventStoreDB.
- Various ways of handling business logic: Aggregates, Command Handlers, functional approach.
- Projections, best practices and concerns for building read models from events on the examples from Marten and EventStoreDB.
- Challenges in Event Sourcing and EDA: deliverability guarantees, sequence of event handling, idempotency, etc.
- Saga, Choreography, Process Manager, distributed processes in practice.
- Event Sourcing in the context of application architecture, integration with other approaches (CQRS, microservices, messaging, etc.).
- Good and bad practices in event modelling.
- Event Sourcing on production, evolution, events' schema versioning, etc.
You can do the workshop as a self-paced kit. That should give you a good foundation for starting your journey with Event Sourcing and learning tools like Marten and EventStoreDB. If you'd like to get full coverage with all nuances of the private workshop, feel free to contact me via email.
Read more in my article Introduction to Event Sourcing - Self Paced Kit.
Exercises
Follow the instructions in the exercise folders.
- Events definition.
- Getting State from events.
- Appending Events:
- Getting State from events
- Business logic:
- Optimistic Concurrency:
- Projections:
Prerequisites
- Install git - https://git-scm.com/downloads.
- Clone this repository.
- Install Node.js 18 - https://nodejs.org/en/download/ (or, better, use NVM).
- Install VSCode, WebStorm or another preferred IDE.
- Install docker - https://docs.docker.com/engine/install/.
- Open the current folder in IDE.
Setup
- Install NPM packages by running: npm install
- Build the source code by running: npm run build
- If you're using VSCode, you may consider importing the profile from ./.vscode/Node.js.code-profile to get all recommended plugins.
Ensuring that everything is set up correctly
- Start the EventStoreDB Docker image by running: docker-compose up
- Run the solved tests with: npm run test:solved
  If all is fine, then all tests should be green.
Running
- Start the EventStoreDB Docker image by running: docker-compose up
  The EventStoreDB UI should then be available at http://localhost:2113/
- To build in watch mode, run: npm run build:ts:watch
- To run the tests for the exercises, run: npm run test:exercise
  For the solutions, run: npm run test:solved
  For all tests, run: npm run test
- When you're working on an exercise and want the tests to re-run on file change, run: npm run test:exercise:watch
Added code for full sample with optimistic concurrency and eventual consistency
Added code for the entire sample with optimistic concurrency and eventual consistency described in articles:
Added example of migrating from CRUD to Event Sourcing
From CRUD to Event-Sourced application
Overview
Nowadays, storage is cheap, but the information is priceless. Event sourcing is a valuable pattern that gives you more options around running, tracking and understanding the business workflow. This sample shows how you could benefit and migrate from the traditional approach.
Business scenario
Imagine that the company you work for has an efficiently developed ECommerce platform. The business situation looks so stable that the business is looking for new opportunities. Your current system is now made classically: monolith in Cloud, normalized relational base, ORM, etc.
The business concluded that the information about the final state of the shopping cart is not sufficient. Having the entire workflow history, it would be possible to analyze better the operations performed by the user (e.g. analysis of related products, products taken out of the basket, abandoned baskets, etc.). The business also wants to handle diagnostics and support better to solve reported problems.
The sample focuses on the specific shopping cart workflow, assuming you can reuse the strategies for other functionalities accordingly.
Having the following shopping cart process:
- The customer may add a product to the shopping cart only after opening it.
- When selecting and adding a product to the basket customer needs to provide the quantity chosen. The system calculates the product price based on the current price list.
- The customer may remove a product from the cart.
- The customer can confirm the shopping cart and start the order fulfilment process.
- The customer may also cancel the shopping cart and reject all selected products.
- After shopping cart confirmation or cancellation, the product can no longer be added or removed from the cart.
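The process above can be sketched as a set of events plus a rule derived from them. This is only an illustration: the event names, their fields and the helper functions are assumptions made for this example, not the model used in the sample.

```typescript
// Hypothetical event shapes for the shopping cart process (names are illustrative).
type ShoppingCartEvent =
  | { type: 'shopping-cart-opened'; data: { cartId: string; clientId: string } }
  | {
      type: 'product-item-added';
      data: { cartId: string; productId: string; quantity: number; unitPrice: number };
    }
  | {
      type: 'product-item-removed';
      data: { cartId: string; productId: string; quantity: number };
    }
  | { type: 'shopping-cart-confirmed'; data: { cartId: string } }
  | { type: 'shopping-cart-cancelled'; data: { cartId: string } };

type CartStatus = 'not-opened' | 'opened' | 'confirmed' | 'cancelled';

// Derive the cart status from its history of events.
function getStatus(events: ShoppingCartEvent[]): CartStatus {
  return events.reduce<CartStatus>((status, event) => {
    switch (event.type) {
      case 'shopping-cart-opened':
        return 'opened';
      case 'shopping-cart-confirmed':
        return 'confirmed';
      case 'shopping-cart-cancelled':
        return 'cancelled';
      default:
        // adding or removing products does not change the status
        return status;
    }
  }, 'not-opened');
}

// Products can be added or removed only while the cart is open.
function canChangeProducts(events: ShoppingCartEvent[]): boolean {
  return getStatus(events) === 'opened';
}
```

Because the rule is computed from the recorded events, the same history can later answer other questions (e.g. which products were removed before confirmation) without changing what is stored.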
CRUD application
The current application is a NodeJS monolith application. It has two endpoints:
- insert/update - used for all the business operations, updates database state with the new values,
- get - returns the current shopping cart state.
The assumption to have such a generic approach was that we're using a rich front-end Single Page Application that drives the business logic. For such a case, it may be reasonable just to validate data from the client and put them into the database. It also assumes that the database will enforce invariants/constraints. It may be valid in the first phase, where workflow is simple, and we do not require more sophisticated business logic and are fine with losing business context on each update.
Added code samples to "Introduction to Event Sourcing in TypeScript and NodeJS with EventStoreDB" webinar
Added code samples for "Introduction to Event Sourcing in TypeScript and NodeJS with EventStoreDB"
Code: https://github.com/oskardudycz/EventSourcing.NodeJS/tree/main/samples/webinar
Added sample showing how to retrieve the current state from events
Retrieving the current state from events
In Event Sourcing, the state is stored in events. Events are logically grouped into streams. Streams can be thought of as the entities' representation. Traditionally (e.g. in relational or document approach), each entity is stored as a separate record.
Id | IssuerName | IssuerAddress | Amount | Number | IssuedAt |
---|---|---|---|---|---|
e44f813c | Oscar the Grouch | 123 Sesame Street | 34.12 | INV/2021/11/01 | 2021-11-01 |
In Event Sourcing, the entity is stored as the series of events that happened for this specific object, e.g. InvoiceInitiated, InvoiceIssued, InvoiceSent.
[
  {
    "id": "e44f813c-1a2f-4747-aed5-086805c6450e",
    "type": "invoice-initiated",
    "streamId": "INV/2021/11/01",
    "streamPosition": 1,
    "timestamp": "2021-11-01T00:05:32.000Z",
    "data": {
      "issuedTo": {
        "name": "Oscar the Grouch",
        "address": "123 Sesame Street"
      },
      "amount": 34.12,
      "number": "INV/2021/11/01",
      "initiatedAt": "2021-11-01T00:05:32.000Z"
    }
  },
  {
    "id": "5421d67d-d0fe-4c4c-b232-ff284810fb59",
    "type": "invoice-issued",
    "streamId": "INV/2021/11/01",
    "streamPosition": 2,
    "timestamp": "2021-11-01T00:11:32.000Z",
    "data": {
      "issuedBy": "Cookie Monster",
      "issuedAt": "2021-11-01T00:11:32.000Z"
    }
  },
  {
    "id": "637cfe0f-ed38-4595-8b17-2534cc706abf",
    "type": "invoice-sent",
    "streamId": "INV/2021/11/01",
    "streamPosition": 3,
    "timestamp": "2021-11-01T00:12:01.000Z",
    "data": {
      "sentVia": "email",
      "sentAt": "2021-11-01T00:12:01.000Z"
    }
  }
]
All of those events share the stream id ("streamId": "INV/2021/11/01") and have an incrementing stream position.
We can conclude that in Event Sourcing an entity is represented by a stream: a sequence of events correlated by the stream id and ordered by stream position.
To get the current state of an entity, we need to perform the stream aggregation process: translating the set of events into a single entity. This can be done with the following steps:
- Read all events for the specific stream.
- Order them ascending in the order of appearance (by the event's stream position).
- Apply each event on the entity.
This process is called stream aggregation or state rehydration.
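The three steps above can be sketched in a few lines. This is a simplified illustration: the StoredEvent shape, the Status type and the currentStatus function are assumptions made for this example, and the events are kept in a plain array instead of a real event store.

```typescript
// Hypothetical stored event: only the fields needed to read and order a stream.
type StoredEvent = {
  streamId: string;
  streamPosition: number;
  type: 'invoice-initiated' | 'invoice-issued' | 'invoice-sent';
};

type Status = 'initiated' | 'issued' | 'sent';

function currentStatus(allEvents: StoredEvent[], streamId: string): Status | undefined {
  return (
    allEvents
      // 1. read all events for the specific stream (filter returns a new array)
      .filter((event) => event.streamId === streamId)
      // 2. order them ascending by stream position
      .sort((a, b) => a.streamPosition - b.streamPosition)
      // 3. apply each event on the state
      .reduce<Status | undefined>((_state, event) => {
        switch (event.type) {
          case 'invoice-initiated':
            return 'initiated';
          case 'invoice-issued':
            return 'issued';
          case 'invoice-sent':
            return 'sent';
        }
      }, undefined)
  );
}
```

Here the state is just the status, so every event overwrites it; a richer state would merge the event data into the previous state instead.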
For this process we'll use the reduce function. It executes a reducer function (that you provide) on each array element, resulting in a single output value. TypeScript extends it with type guarantees:
- reduce in TypeScript is a generic method. It allows you to provide the result type as a type parameter. It doesn't have to be the same as the type of the array elements.
- You can also use the optional second parameter to provide the initial value for the accumulation.
- Use Partial<Type> as the generic reduce parameter. It constructs a type with all properties of Type set to optional, so it can represent any subset of the given type. This is extremely important, as TypeScript otherwise forces you to define all required properties. We'll be merging different states of the aggregate into the final one. Only the first event (InvoiceInitiated) will provide all required fields. The other events will just do a partial update (InvoiceSent only changes the status and sets the sending method and date).
Having event types defined as:
type InvoiceInitiated = Event<
  'invoice-initiated',
  {
    number: string;
    amount: number;
    issuedTo: Person;
    initiatedAt: Date;
  }
>;

type InvoiceIssued = Event<
  'invoice-issued',
  {
    number: string;
    issuedBy: string;
    issuedAt: Date;
  }
>;

type InvoiceSent = Event<
  'invoice-sent',
  {
    number: string;
    sentVia: InvoiceSendMethod;
    sentAt: Date;
  }
>;
Entity as:
type Invoice = Readonly<{
  number: string;
  amount: number;
  status: InvoiceStatus;

  issuedTo: Person;
  initiatedAt: Date;

  issued?: Readonly<{
    by?: string;
    at?: Date;
  }>;

  sent?: Readonly<{
    via?: InvoiceSendMethod;
    at?: Date;
  }>;
}>;
We can rebuild the state with events using the reduce function:
const result = events.reduce<Partial<Invoice>>((currentState, event) => {
  switch (event.type) {
    // the first event provides all the required fields
    case 'invoice-initiated':
      return {
        number: event.data.number,
        amount: event.data.amount,
        status: InvoiceStatus.INITIATED,
        issuedTo: event.data.issuedTo,
        initiatedAt: event.data.initiatedAt,
      };
    // the following events only update part of the state
    case 'invoice-issued': {
      return {
        ...currentState,
        status: InvoiceStatus.ISSUED,
        issued: {
          by: event.data.issuedBy,
          at: event.data.issuedAt,
        },
      };
    }
    case 'invoice-sent': {
      return {
        ...currentState,
        status: InvoiceStatus.SENT,
        sent: {
          via: event.data.sentVia,
          at: event.data.sentAt,
        },
      };
    }
    default:
      // throw an Error (not a string) to keep the stack trace
      throw new Error('Unexpected event type');
  }
}, {});
The only thing left is to translate Partial<Invoice> into a properly typed Invoice. We'll use a type guard for that:
function isInvoice(invoice: Partial<Invoice>): invoice is Invoice {
  return (
    !!invoice.number &&
    // compare with undefined, so a zero amount or a zero-valued enum member is still accepted
    invoice.amount !== undefined &&
    invoice.status !== undefined &&
    !!invoice.issuedTo &&
    !!invoice.initiatedAt &&
    (!invoice.issued || (!!invoice.issued.at && !!invoice.issued.by)) &&
    (!invoice.sent || (!!invoice.sent.via && !!invoice.sent.at))
  );
}

if (!isInvoice(result)) throw new Error('Invoice state is not valid!');

const invoice: Invoice = result;
Thanks to that, we have a proper type definition. We can make the stream aggregation more generic and reusable:
export function aggregateStream<Aggregate, StreamEvents extends Event>(
  events: StreamEvents[],
  when: (
    currentState: Partial<Aggregate>,
    event: StreamEvents,
    currentIndex: number,
    allEvents: StreamEvents[]
  ) => Partial<Aggregate>,
  check?: (state: Partial<Aggregate>) => state is Aggregate
): Aggregate {
  // fold all events into the state, starting from an empty object
  const state = events.reduce<Partial<Aggregate>>(when, {});

  if (!check) {
    console.warn('No type check method was provided in the aggregate method');
    // without a type guard we can only assert the type
    return state as Aggregate;
  }

  if (!check(state)) throw new Error('Aggregate state is not valid');

  return state;
}
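A possible usage of such a function is sketched below. To keep the snippet self-contained, it repeats a trimmed-down Event type and aggregateStream (with a mandatory check); the simplified Invoice, the events and the when and isInvoice helpers are assumptions made for this example and differ from the full sample.

```typescript
type Event<
  EventType extends string = string,
  EventData extends Record<string, unknown> = Record<string, unknown>
> = { readonly type: EventType; readonly data: EventData };

// Trimmed-down variant of the function above, with a mandatory type guard.
function aggregateStream<Aggregate, StreamEvents extends Event>(
  events: StreamEvents[],
  when: (state: Partial<Aggregate>, event: StreamEvents) => Partial<Aggregate>,
  check: (state: Partial<Aggregate>) => state is Aggregate
): Aggregate {
  const state = events.reduce<Partial<Aggregate>>(when, {});
  if (!check(state)) throw new Error('Aggregate state is not valid');
  return state;
}

// Simplified, hypothetical invoice model.
type InvoiceInitiated = Event<'invoice-initiated', { number: string; amount: number }>;
type InvoiceSent = Event<'invoice-sent', { sentVia: string }>;
type InvoiceEvent = InvoiceInitiated | InvoiceSent;

type Invoice = {
  number: string;
  amount: number;
  status: 'initiated' | 'sent';
  sentVia?: string;
};

// Applies a single event to the current (possibly incomplete) state.
const when = (state: Partial<Invoice>, event: InvoiceEvent): Partial<Invoice> => {
  switch (event.type) {
    case 'invoice-initiated':
      return { number: event.data.number, amount: event.data.amount, status: 'initiated' };
    case 'invoice-sent':
      return { ...state, status: 'sent', sentVia: event.data.sentVia };
  }
};

// Type guard confirming that all required fields were set.
const isInvoice = (state: Partial<Invoice>): state is Invoice =>
  state.number !== undefined && state.amount !== undefined && state.status !== undefined;

const invoice = aggregateStream<Invoice, InvoiceEvent>(
  [
    { type: 'invoice-initiated', data: { number: 'INV/2021/11/01', amount: 34.12 } },
    { type: 'invoice-sent', data: { sentVia: 'email' } },
  ],
  when,
  isInvoice
);
```

A stream that starts with invoice-sent would not pass isInvoice, so aggregateStream would throw instead of returning an incomplete invoice.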
See full sample: link.
Read more in my article:
Initial description for Event Sourcing foundations
Event Sourcing
What is Event Sourcing?
Event Sourcing is a design pattern in which results of business operations are stored as a series of events.
It is an alternative way to persist data. In contrast with state-oriented persistence that only keeps the latest version of the entity state, Event Sourcing stores each state change as a separate event.
Thanks to that, no business data is lost. Each operation results in an event stored in the database. That enables extended auditing and diagnostics capabilities (both technically and business-wise). What's more, as events contain the business context, they allow wide business analysis and reporting.
In this repository, I'm showing different aspects and patterns around Event Sourcing, from the basic to advanced practices.
What is Event?
Events represent facts in the past. They carry information about something accomplished. They should be named in the past tense, e.g. "user added", "order status changed to confirmed". Events are not directed to a specific recipient - they're broadcast information. It's like telling a story at a party. We hope that someone listens to us, but we may quickly realise that no one is paying attention.
Events:
- are immutable: "What has been seen, cannot be unseen".
- can be ignored but cannot be retracted (as you cannot change the past).
- can be interpreted differently. The basketball match result is a fact. Winning team fans will interpret it positively. Losing team fans - not so much.
Read more in my blog posts:
- 📝 What's the difference between a command and an event?
- 📝 Events should be as small as possible, right?
What is Stream?
Events are logically grouped into streams. In Event Sourcing, streams are the representation of the entities. All the entity state mutations end up as persisted events. Entity state is retrieved by reading all the stream events and applying them one by one in the order of appearance.
A stream should have a unique identifier representing the specific object. Each event has its own unique position within a stream. This position is usually represented by a numeric, incremental value. This number can be used to define the order of the events while retrieving the state. It can be also used to detect concurrency issues.
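As a rough sketch of how the stream position can be used to detect concurrency issues, consider the in-memory store below. The InMemoryStreams class and its method names are made up for this illustration and are not the EventStoreDB or Marten API; the idea is only that an append states the position it expects the stream to be at.

```typescript
// Hypothetical stored event with its position within the stream.
type StoredEvent = { streamId: string; streamPosition: number; type: string };

class InMemoryStreams {
  private events: StoredEvent[] = [];

  // Position of the last event in the stream, or 0 when the stream has no events.
  currentPosition(streamId: string): number {
    return this.events.filter((event) => event.streamId === streamId).length;
  }

  // The append succeeds only if nobody else appended since the caller read the stream.
  append(streamId: string, type: string, expectedPosition: number): number {
    const actualPosition = this.currentPosition(streamId);

    if (actualPosition !== expectedPosition) {
      throw new Error(
        `Expected stream position ${expectedPosition}, but the stream is at ${actualPosition}`
      );
    }

    const streamPosition = actualPosition + 1;
    this.events.push({ streamId, streamPosition, type });
    return streamPosition;
  }
}
```

Two writers that both read the stream at position 1 and then try to append will not silently overwrite each other: the first append moves the stream to position 2, so the second one fails and has to re-read the stream and retry.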
Event representation
Technically events are messages.
They may be represented, e.g. in JSON, Binary, XML format. Besides the data, they usually contain:
- id: unique event identifier.
- type: name of the event, e.g. "invoice issued".
- stream id: object id for which event was registered (e.g. invoice id).
- stream position (also named version, order of occurrence, etc.): the number used to decide the order of the event's occurrence for the specific object (stream).
- timestamp: representing a time at which the event happened.
- other metadata like correlation id, causation id, etc.
Sample event JSON can look like:
{
  "id": "e44f813c-1a2f-4747-aed5-086805c6450e",
  "type": "invoice-issued",
  "streamId": "INV/2021/11/01",
  "streamPosition": 1,
  "timestamp": "2021-11-01T00:05:32.000Z",
  "data": {
    "issuer": {
      "name": "Oscar the Grouch",
      "address": "123 Sesame Street"
    },
    "amount": 34.12,
    "number": "INV/2021/11/01",
    "issuedAt": "2021-11-01T00:05:32.000Z"
  },
  "metadata": {
    "correlationId": "1fecc92e-3197-4191-b929-bd306e1110a4",
    "causationId": "c3cf07e8-9f2f-4c2d-a8e9-f8a612b4a7f1"
  }
}
This structure could be translated directly into the TypeScript class. However, to make the code less redundant and ensure that all events follow the same convention, it's worth adding the base type. It could look as follows:
type Event<
  EventType extends string = string,
  EventData extends Record<string, unknown> = Record<string, unknown>
> = {
  readonly type: EventType;
  readonly data: EventData;
};
Several things are going on there:
- the event type definition is not directly a string, but it might be defined differently (EventType extends string = string). It's added to be able to define an alias for the event type. Thanks to that, we're getting compiler checks and IntelliSense support,
- event data is defined as Record (EventData extends Record<string, unknown> = Record<string, unknown>). It is the way of telling the TypeScript compiler that it may expect any object type but allows you to specify your own and get a proper type check,
- both type and data are marked as readonly. With that, the compiler won't allow us to change the value after the initial object assignment. Thanks to that, we're getting immutability.
Having that, we can define the event as, e.g.:
// alias for event type
type INVOICE_ISSUED = 'invoice-issued';

// issuer DTO used in event data
type Issuer = {
  readonly name: string;
  readonly address: string;
};

// event type definition
type InvoiceIssued = Event<
  INVOICE_ISSUED,
  {
    readonly issuer: Issuer;
    readonly amount: number;
    readonly number: string;
    readonly issuedAt: Date;
  }
>;
then create it as:
const invoiceIssued: InvoiceIssued = {
  type: 'invoice-issued',
  data: {
    issuer: {
      name: 'Oscar the Grouch',
      address: '123 Sesame Street',
    },
    amount: 34.12,
    number: 'INV/2021/11/01',
    issuedAt: new Date(),
  },
};
Event Store
Event Sourcing is not related to any type of storage implementation. As long as it fulfils the assumptions, it can be implemented with any backing database (relational, document, etc.). The state has to be represented by the append-only log of events. The events are stored in chronological order, and new events are appended after the previous ones. Event stores are a category of databases explicitly designed for this purpose.
The simplest (dummy and in-memory) Event Store can be defined in TypeScript as:
class EventStore {
  private events: { readonly streamId: string; readonly data: string }[] = [];

  appendToStream(streamId: string, ...events: any[]): void {
    const serialisedEvents = events.map((event) => {
      return { streamId: streamId, data: JSON.stringify(event) };
    });

    this.events.push(...serialisedEvents);
  }

  readFromStream<T = any>(streamId: string): T[] {
    return this.events
      .filter((event) => event.streamId === streamId)
      .map<T>((event) => JSON.parse(event.data));
  }
}
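A short usage sketch may help to see how such a store behaves. To keep it self-contained, the snippet repeats a condensed copy of the class (using unknown instead of any); the InvoiceEvent type and the sample events are assumptions made for this example.

```typescript
// Condensed copy of the dummy in-memory store from above.
class EventStore {
  private events: { readonly streamId: string; readonly data: string }[] = [];

  appendToStream(streamId: string, ...events: unknown[]): void {
    this.events.push(
      ...events.map((event) => ({ streamId, data: JSON.stringify(event) }))
    );
  }

  readFromStream<T = unknown>(streamId: string): T[] {
    return this.events
      .filter((event) => event.streamId === streamId)
      .map((event) => JSON.parse(event.data) as T);
  }
}

// Hypothetical shape of the events as they come back from the store.
// Dates are typed as string, because that is what JSON.parse returns for them.
type InvoiceEvent = {
  type: string;
  data: { number: string; issuedAt?: string };
};

const eventStore = new EventStore();

eventStore.appendToStream(
  'INV/2021/11/01',
  { type: 'invoice-initiated', data: { number: 'INV/2021/11/01' } },
  {
    type: 'invoice-issued',
    data: { number: 'INV/2021/11/01', issuedAt: new Date('2021-11-01T00:11:32.000Z') },
  }
);
eventStore.appendToStream('INV/2021/11/02', {
  type: 'invoice-initiated',
  data: { number: 'INV/2021/11/02' },
});

// Only the events of the requested stream are returned, in the order they were appended.
const invoiceEvents = eventStore.readFromStream<InvoiceEvent>('INV/2021/11/01');
```

Note that the events go through JSON serialisation, so a Date appended to the stream is read back as an ISO string. Code that rebuilds the state has to map such fields back to the expected types.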
In the further samples, I'll use EventStoreDB. It's the battle-tested OSS database created and maintained by the Event Sourcing authorities (e.g. Greg Young). It supports many dev environments via gRPC clients, including NodeJS.
API tests with SuperTest
SuperTest is a useful library that allows testing Express HTTP applications.
To install it run:
npm i -D supertest @types/supertest
SuperTest takes an Express application as input. We have to structure our code to return it, e.g.
import express, { Application, Request, Response } from 'express';
import { getGreeting } from './greetings/getGreeting';

const app: Application = express();

app.get('/', (_req: Request, res: Response) => {
  res.json(getGreeting());
});

export default app;
Our updated index will look like:
import app from './app';
import http from 'http';

const server = http.createServer(app);

const PORT = 5000;

server.listen(PORT);

server.on('listening', () => {
  console.info('server up listening');
});
Let's create the test for the default route. For that, create a file, e.g. getGreetings.api.test.ts. We'll be using a different suffix, api.test.ts, as those tests are not unit but integration/acceptance tests. They will be running the Express server. Having the Express app extracted, we can use the SuperTest library as:
import request from 'supertest';
import app from '../app';

describe('GET /', () => {
  it('should return greeting "Hello World!"', () => {
    return request(app)
      .get('/')
      .expect('Content-Type', /json/)
      .expect(200, { greeting: 'Hello World!' });
  });
});
SuperTest wraps the Express app and makes the API calls easier. It also provides a set of useful methods to check the response params.
As the final step we'll add a separate NPM script to package.json for running API tests and also script to run all of them.
{
  "scripts": {
    "test": "npm run test:unit && npm run test:api", // <-- added
    "test:unit": "jest unit",
    "test:api": "jest api" // <-- added
  }
}
CI setup - run tests in GitHub Actions
It's important to have your changes verified during the pull request process. We'll use GitHub Actions as an example of how to do that. You need to create the .github/workflows folder and put a new file there (e.g. samples_simple.yml). This file will contain the YAML configuration for your action.
The simplest setup will look like this:
name: Node.js CI

on:
  # run it on push to the default repository branch
  # ($default-branch is only substituted in workflow templates,
  # so the branch is named explicitly; it is assumed here to be main)
  push:
    branches: [main]
  # run it during pull request
  pull_request:

defaults:
  run:
    # relative path to the place where source code (with package.json) is located
    working-directory: samples/simple

jobs:
  build:
    # operating system to run the job on
    runs-on: windows-latest

    steps:
      - uses: actions/checkout@v2
      - name: Use Node.js 14.x
        uses: actions/setup-node@v1
        with:
          # Node.js version to install
          node-version: 14.x
      # install dependencies based on package-lock.json
      - run: npm ci
      # run linting (ESlint and Prettier)
      - run: npm run lint
      # run build
      - run: npm run build:ts
      # run tests
      - run: npm run test:unit
If you want to make sure that your code will be running properly for a few Node.js versions and different operating systems (e.g. because developers may have different environment configuration) then you can use matrix tests:
name: Node.js CI

on:
  # run it on push to the default repository branch
  # ($default-branch is only substituted in workflow templates,
  # so the branch is named explicitly; it is assumed here to be main)
  push:
    branches: [main]
  # run it during pull request
  pull_request:

defaults:
  run:
    # relative path to the place where source code (with package.json) is located
    working-directory: samples/simple

jobs:
  build:
    # use system defined below in the tests matrix
    runs-on: ${{ matrix.os }}

    strategy:
      # define the test matrix
      matrix:
        # selected operating systems to run CI
        os: [windows-latest, ubuntu-latest, macos-latest]
        # selected node versions to run CI
        node-version: [14.x, 15.x]

    steps:
      - uses: actions/checkout@v2
      - name: Use Node.js ${{ matrix.node-version }}
        uses: actions/setup-node@v1
        with:
          # use the node version defined in matrix above
          node-version: ${{ matrix.node-version }}
      # install dependencies based on package-lock.json
      - run: npm ci
      # run linting (ESlint and Prettier)
      - run: npm run lint
      # run build
      - run: npm run build:ts
      # run tests
      - run: npm run test:unit
Jest unit test configuration with VSCode debug settings
- Install Jest together with ts-jest and the typings needed to make it work with TypeScript:
npm i -D jest @types/jest ts-jest
- Configure Jest using the ts-jest initialiser run via npx:
npx ts-jest config:init
- This will create jest.config.js with the needed Jest configuration:
module.exports = {
  preset: 'ts-jest',
  testEnvironment: 'node',
};
- Let's add some dummy code to make sure that our tests are working. This can be e.g. src/greetings/getGreeting.ts
export function getGreeting() {
  return {
    greeting: 'Hello World!',
  };
}
- Let's also add a dummy unit test running this code. I'll put it in the same directory, as in my opinion it makes development easier and helps to focus on the specific test instead of jumping from one place to another. In this case it will be src/greetings/getGreetings.unit.test.ts
import { getGreeting } from './getGreeting';

describe('getGreeting', () => {
  it('should return greeting "Hello World!"', () => {
    const result = getGreeting();

    expect(result).toBeDefined();
    expect(result.greeting).toBe('Hello World!');
  });
});
- To run Jest, we need to add a new NPM script to package.json:
{
  "scripts": {
    "test:unit": "jest unit"
  }
}
Now you can run them with:
npm run test:unit
Jest will be smart enough to find by convention all files with the .unit.test.ts suffix.
- To be able to debug our tests, we have to add new debug configurations to launch.json. We'll be using watch settings, so we don't have to re-run tests after we update the logic or test code.
{
  "version": "0.2.0",
  "configurations": [
    {
      "name": "Jest all tests",
      "type": "node",
      "request": "launch",
      "program": "${workspaceFolder}/node_modules/jest/bin/jest.js",
      "args": ["--verbose", "-i", "--no-cache", "--watchAll"],
      "console": "integratedTerminal",
      "internalConsoleOptions": "neverOpen"
    },
    {
      "name": "Jest current test",
      "type": "node",
      "request": "launch",
      "program": "${workspaceFolder}/node_modules/jest/bin/jest.js",
      "args": [
        "${fileBasename}",
        "--verbose",
        "-i",
        "--no-cache",
        "--watchAll"
      ],
      "console": "integratedTerminal",
      "internalConsoleOptions": "neverOpen"
    }
  ]
}
See more in #3.
VSCode debug configuration
To configure VSCode debugging, you need to add a launch.json file in the .vscode folder.
To avoid synchronising two separate configurations, we'll reuse the existing NPM script dev:start that starts the application.
{
  "version": "0.2.0",
  "configurations": [
    {
      "name": "Debug",
      "type": "node",
      "request": "launch",
      "runtimeExecutable": "npm",
      "runtimeArgs": ["run-script", "dev:start", "--", "--inspect-brk=9229"],
      "port": 9229
    }
  ]
}
As we have TypeScript configured, we don't need any additional setup. We're reusing the native Node.js debugging capabilities by using the --inspect-brk=9229 parameter. Read more in the Node.js documentation.
See more in #2.