High level library which abstracts low level Message Brokers clients and enforces the Cloudevents standard for the event payload.
Library still in Beta version and in development
- Supported Clients
- Requirements
- Installation
- Configuration
- Event payload format
- Usage
- Tests
- Contribution Guidelines
- License
Node MessageBrokers provides adapters for Apache Kafka and AWS SNS.
Current supported clients are KafkaJs (default) for Apache Kafka and the official AWS Node Client for AWS SNS.
- Node.js v14+
To install Node MessageBrokers, run npm install --save @micheleangioni/node-messagebrokers
.
Node MessageBrokers can be configured by making use of the following environment variables:
-
UNDERLYING_CLIENT
: low level client to use, supported valued:kafkajs
(default): modern Apache Kafka clientawssns
: default AWS SNS client
-
KAFKA_URI
: comma-separated list of Kafka brokers, defaultlocalhost:9092
-
KAFKA_CLIENT_ID
: client id for the Kafka connection. If not set, a random generated will be used -
KAFKA_LOG_LEVEL
: set the logging level of the KafkaJs client:- 0 = nothing
- 1 = error
- 2 = warning
- 4 = info (default)
- 5 = debug
-
SNS_ENDPOINT
: optional AWS SNS endpoint, defaultundefined
-
AWS_REGION
: AWS region, defaulteu-west-1
-
SSL_CERT
: SSL certificate -
SSL_KEY
: SSL key -
SSL_CA
: SSL certificate authority -
REVERSE_DNS
: Reverse DNS to customise thetype
field of the event payload
In order to send an event, the payload must follow the cloudevents format.
Node MessageBrokers exposes an event factory for v1.0 of the standard. In order to use it, import the factory
import { CloudEventFactory } from '@micheleangioni/node-messagebrokers';
and use the createV1
factory method
CloudEventFactory.createV1(
aggregate: string,
eventType: string,
source: string,
data: any,
options: CreateEventV1Options = {},
)
where CreateEventV1Options
is as follows
export type CreateEventV1Options = {
datacontenttype?: string, // default 'application/json'
dataschema?: string,
subject?: string,
};
Instantiating the Client
Instantiation is similar for all clients. In order to create an instance, it's enough to provide the topic list when using the client factory:
import brokerFactory from '@micheleangioni/node-messagebrokers';
const topics = {
user: {
topic: 'myCompany.events.identity.user',
(numPartitions: 2),
(replicationFactor: 1),
}
};
const broker = brokerFactory(topics);
The topics
parameter must follow the following type definition:
type KafkaTopic = {
topic: string
numPartitions?: number // Only for Apache Kafka clients
replicationFactor?: number // Only for Apache Kafka clients
replicaAssignment?: object[] // Only for KafkaJs client
configEntries?: object[], // Only for KafkaJs client
};
type KafkaTopics = {
[aggregate: string]: KafkaTopic,
};
This structure enforces to provide a different topic per Aggregate.
Every client has its own instantiation options as well.
Furthermore, before being used, most clients need to be initialized through the async init()
method.
Due to some particularity of each client, the brokers' init()
methods have different option parameters.
Instantiation
import brokerFactory from '@micheleangioni/node-messagebrokers';
const topics = {
user: {
topic: 'myCompany.events.identity.user',
numPartitions: 16,
replicationFactor: 3,
}
};
const broker = brokerFactory(topics);
If the topics don't exist yet, provide also the createTopics: true
during the initialization.
Initialization
await broker.init(kafkaJsClientConfiguration);
where consumerConfig
has the following signature:
export interface RetryOptions {
maxRetryTime?: number
initialRetryTime?: number
factor?: number
multiplier?: number
retries?: number
}
interface ConsumerConfig {
groupId: string
metadataMaxAge?: number
sessionTimeout?: number
rebalanceTimeout?: number
heartbeatInterval?: number
maxBytesPerPartition?: number
minBytes?: number
maxBytes?: number
maxWaitTimeInMs?: number
retry?: RetryOptions
allowAutoTopicCreation?: boolean
maxInFlightRequests?: number
readUncommitted?: boolean
}
type KafkaJsClientConfiguration = ConsumerConfig & {
createTopics?: boolean;
};
Simple example
await broker.init({ groupId: 'my-consumer-group-id' });
Creating also the topics during the initialization
await broker.init({ createTopics: true, groupId: 'my-consumer-group-id' });
Creating a Consumer
const consumer = await broker.addConsumer([], consumerConfig); // the first argument is ignored
where consumerConfig
has the following type signature:
export interface IHeaders {
[key: string]: Buffer | string
}
export type KafkaMessage = {
key: Buffer
value: Buffer // Message payload
timestamp: string
size: number
attributes: number
offset: string
headers?: IHeaders
}
export type AggregateConsumerConf = {
handler: (message: KafkaMessage) => Promise<void>;
fromBeginning?: boolean; // Fetch messages from the beginning of the topic, default false
topic: string;
};
type KafkaJsConsumerConfig = {
aggregates: {
[aggregate: string]: AggregateConsumerConf;
};
consumerRunConfig?: {
autoCommit?: boolean;
autoCommitInterval?: number | null;
autoCommitThreshold?: number | null;
eachBatchAutoResolve?: boolean;
partitionsConsumedConcurrently?: number; // Number of running concurrent partition handlers, default 1
};
useBatches?: boolean; // true: use batches, false (default): use single messages
};
Simple example:
const consumer = await broker.addConsumer([], {
aggregates: {
user: {
// eslint-disable-next-line @typescript-eslint/require-await
handler: async (message: KafkaMessage) => {
const eventPayload = JSON.parse(message.value.toString());
expect(eventPayload.data).toEqual(data);
done();
},
topic: 'myCompany.events.identity.user',
},
},
consumerRunConfig: {
partitionsConsumedConcurrently: 3,
},
useBatches: false,
});
Sending messages
const cloudEvent = CloudEventFactory.createV1(
aggregate,
eventType,
source,
data,
);
await broker.sendMessage(
aggregate,
[cloudEvent],
{ partitionKey },
)
Simple example:
const aggregate = 'user';
const cloudEvent = CloudEventFactory.createV1(
aggregate,
'UserCreated',
'/users',
{
email: 'voodoo@gmail.com',
username: 'Voodoo',
},
);
await broker.sendMessage(
aggregate,
[cloudEvent],
)
Instantiation
import brokerFactory from '@micheleangioni/node-messagebrokers';
const broker = brokerFactory(topics);
If the topics don't exist yet, provide also the createTopics: true
key during the initialization (next paragraph).
This requires of course the sns:CreateTopic
permission.
When connecting to AWS the client needs to fetch the ARNs list of input topics.
By default, it performs a listTopics
call to SNS and therefore the sns:ListTopic
permission is needed.
There are 2 possibilities to improve the performances and avoid a lookup over all existing topics:
-
The account has the
sns:CreateTopic
permission. In this case, provide thecreateTopics: true
key during initialization even if the topics have already been created -
Pass the AWS Account Id in order to re-build the topics ARNs without having to query AWS
const broker = brokerFactory(topics, { awsAccountId: '1234567890' });
Initialization
await broker.init(initConfigurations);
where initConfigurations
is optional and has the same structure of the
options constructor parameter of the official SDK.
plus the optional createTopics: boolean
key.
Simple example:
await broker.init();
Creating also the topics during the initialization
await broker.init({ createTopics: true });
Adding a Consumer
const subscriptionResponse = await addConsumer(aggregate, consumerConfig);
where aggregate
is a string and consumerConfig
has the following signature
type SnsConsumerOptions = {
attributes?: SubscriptionAttributesMap,
endpoint: string,
protocol: SnsProtocol,
};
where endpoint
is the endpoint to which the consumer can be reached,
SnsProtocol
is one of the supporter SNS protocols
enum SnsProtocol {
EMAIL = 'email',
EMAIL_JSON = 'email-json',
HTTP = 'http',
HTTPS = 'https',
SMS = 'sms',
SQS = 'sqs',
APPLICATION = 'application',
LAMBDA = 'lambda',
}
and SubscriptionAttributesMap
is one of the
attributes of the official SDK.
Simple example for an HTTP consumer:
await broker.addConsumer('user', { endpoint: 'https://myconsumerhost.com/api/example', protocol: SnsProtocol.HTTP });
Sending messages
const cloudEvent = CloudEventFactory.createV1(
aggregate,
eventType,
source,
data,
);
await broker.sendMessage(
aggregate,
[cloudEvent],
)
Simple example:
const aggregate = 'user';
const cloudEvent = CloudEventFactory.createV1(
aggregate,
'UserCreated',
'/users',
{
email: 'voodoo@gmail.com',
username: 'Voodoo',
},
);
await broker.sendMessage(
aggregate,
[cloudEvent],
)
In order to run the tests, follow the following steps:
- Run docker compose via
docker-compose up -d
(TMPDIR=/private$TMPDIR docker-compose up
in MacOS) - Run the tests via
npm test
Pull requests are welcome. Help is needed to add other clients.
Node MessageBrokers is free software distributed under the terms of the MIT license.