One best practice for decoupling components is to use a stream and build an event-based architecture (service choreography). In such a service choreography the only communication path is the connection of each service to the stream. Each service publishes events to the stream and does not care what happens with these records. Other services subscribe to the stream and decide on their own how to react to a certain record.
Producer and Consumer as decoupled services: The Event Consumer subscribes to a specific topic and is notified of new record(s) via the stream-listener-plugin. Once record(s) are received, it reacts to each record and marks it as success or error, according to the exception handling of the event consumer.
- Event subscribers should handle large numbers of events by pulling them themselves in batch mode.
- All logic and configuration should be provided via the event-stream plugin.
- Should handle a record consumer.
- Should handle a batch consumer (both consumer styles are sketched after this list).
- Should filter records before consuming.
- Should handle exceptions during consumer execution.
- Pull and commit should be part of the plugin.
- Usage of the plugin should closely match the Kafka Listener logic, so that a migration to Kafka can be seamless.
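A rough sketch of the two consumer styles, using the @StreamListener annotation documented later in this section (OrderEvent and the exact attribute syntax are illustrative assumptions, not part of the library's published API):

```java
import java.util.List;

public class OrderListener {

    // Record consumer: invoked once per received record.
    @StreamListener("dwh_order_created")
    public void onRecord(OrderEvent event) { // OrderEvent is a hypothetical payload type
        // react to a single record
    }

    // Batch consumer: invoked with the whole pulled batch.
    @StreamListener(topics = "dwh_order_updated", batch = true)
    public void onBatch(List<OrderEvent> events) {
        // react to a batch of records
    }
}
```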
- Git Flow
- Maven
- 85 %
- Java 17
- Spring Boot 3.0
- Maven 3
- Package by Feature
- No TODOs or FIXMEs are present in the code
- No dead, commented-out, or unused code is present
- No warnings are present
- Use the standard code formatter from IDEA
- Follow the Java Style Guide
- Use English in code (identifiers, comments)
- Use English for documentation
https://google.github.io/styleguide/javaguide.html
- Follow the SOLID principles
- S - Single-responsibility Principle
- O - Open-closed Principle
- L - Liskov Substitution Principle
- I - Interface Segregation Principle
- D - Dependency Inversion Principle
- Follow the KISS principle
- Follow the DRY principle
provides a scalable and secure platform that allows enterprises to: ingest any data from any system at any speed—even if the data comes from on-premises, cloud, or edge-computing systems; store any type or volume of data in full fidelity; process data in real time or batch mode; and analyze data using SQL, Python, R, or any other language, third-party data, or analytics application.
Any Spring Boot application that includes the stream-listener-starter as a Spring Boot starter library
is any stereotyped Spring Boot component that annotates a method with the @StreamListener annotation
is a third-party Spring Boot starter library; it receives records from the data provider via a given endpoint
is an interface that must be implemented by the application, using a Batch REST API or different data lake endpoints.
same as the source; data lakes may contain more or fewer platforms/systems
This library may be used in other implementations and speed up development.
Consumers pull events in batch mode via the plugin from different endpoints.
An endpoint may be a REST service, a database repository, a filesystem, or a message broker subscription.
Consumers need to configure the topics, group, id, etc.
We'll define a BeanPostProcessor which checks each bean for the @StreamListener annotation. This class is also a DestructionAwareBeanPostProcessor, a Spring interface that adds a before-destruction callback to BeanPostProcessor. If the annotation is present, we'll register the listener with the endpoint identified by the annotation's SpEL expression on bean initialization and unregister it on bean destruction.
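A sketch of such a post-processor; the StreamListenerEndpointRegistry and its register/unregister methods are assumed abstractions of the plugin, not a published API:

```java
import java.lang.reflect.Method;

import org.springframework.beans.BeansException;
import org.springframework.beans.factory.config.DestructionAwareBeanPostProcessor;
import org.springframework.core.MethodIntrospector;
import org.springframework.core.annotation.AnnotatedElementUtils;

// Hypothetical post-processor: registers @StreamListener methods as listener endpoints
// on bean initialization and unregisters them on bean destruction.
public class StreamListenerAnnotationBeanPostProcessor implements DestructionAwareBeanPostProcessor {

    private final StreamListenerEndpointRegistry registry; // assumed registry abstraction

    public StreamListenerAnnotationBeanPostProcessor(StreamListenerEndpointRegistry registry) {
        this.registry = registry;
    }

    @Override
    public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
        // Find all methods annotated with @StreamListener on this bean and register them.
        MethodIntrospector.selectMethods(bean.getClass(),
                (Method method) -> AnnotatedElementUtils.findMergedAnnotation(method, StreamListener.class))
            .forEach((method, annotation) -> registry.register(bean, method, annotation));
        return bean;
    }

    @Override
    public void postProcessBeforeDestruction(Object bean, String beanName) throws BeansException {
        // Remove all listener endpoints that were registered for this bean.
        registry.unregister(bean);
    }
}
```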
- Annotation that marks a method to be registered as a listener. The topics attribute must be specified.
- The annotated method must accept either a single record or a collection of records as its argument.
- It will typically have a void return type; if not, the returned value will be ignored when called through the factory.
public class InvoiceListener {

    @StreamListener("dwh_invoice_created")
    public void on(InvoiceEvent event) {
        // handle the invoice event
    }
}
application.properties
stream-listener.name=dwh
stream-listener.id=invoice-consumer_1
stream-listener.batch=true
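These keys could be bound with a Spring Boot configuration properties class along the following lines (the class itself is a sketch; only the property keys are taken from the example above):

```java
import org.springframework.boot.context.properties.ConfigurationProperties;

// Hypothetical binding for the stream-listener.* keys shown above.
@ConfigurationProperties(prefix = "stream-listener")
public class StreamListenerProperties {

    /** Stream (data lake) name, e.g. "dwh". */
    private String name;

    /** Unique consumer/endpoint id, e.g. "invoice-consumer_1". */
    private String id;

    /** Whether records are pulled and delivered in batch mode. */
    private boolean batch;

    public String getName() { return name; }
    public void setName(String name) { this.name = name; }

    public String getId() { return id; }
    public void setId(String id) { this.id = id; }

    public boolean isBatch() { return batch; }
    public void setBatch(boolean batch) { this.batch = batch; }
}
```

Such a class would still need to be registered by the starter, e.g. via @EnableConfigurationProperties in its auto-configuration.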
Name | Type | Required | Default Value | Description |
---|---|---|---|---|
topics | array | true | empty | The topics for this listener. The entries can be 'topic name', 'property-placeholder keys' or 'expressions'. An expression must resolve to the topic name. |
id | string | false | empty | The unique identifier of the container for this listener. If none is specified, an auto-generated id is used. SpEL {@code #{...}} and property placeholders {@code ${...}} are supported. |
name | string | false | empty | Spring application name |
errorHandler | object | false | null | Set a {@link StreamEventListenerErrorHandler} bean name to invoke if the listener method throws an exception. If a SpEL expression is provided ({@code #{...}}), the expression can either evaluate to a {@link StreamEventListenerErrorHandler} instance or a bean name. |
groupId | string | false | empty | Override the {@code group.id} property for the consumer factory with this value, for this listener only. SpEL {@code #{...}} and property placeholders {@code ${...}} are supported. |
concurrency | string | false | empty | Override the container factory's {@code concurrency} setting for this listener. May be a property placeholder or SpEL expression that evaluates to a {@link Number}, in which case {@link Number#intValue()} is used to obtain the value. SpEL {@code #{...}} and property placeholders {@code ${...}} are supported. |
autoStartup | boolean | false | false | Set to true or false to override the default setting in the container factory. May be a property placeholder or SpEL expression that evaluates to a {@link Boolean} or a {@link String}, in which case {@link Boolean#parseBoolean(String)} is used to obtain the value. SpEL {@code #{...}} and property placeholders {@code ${...}} are supported. |
properties | array | false | empty | Stream consumer properties; they will supersede any properties with the same name defined in the consumer factory (if the consumer factory supports property overrides). The supported syntax for key-value pairs is the same as the syntax defined for entries in a Java {@linkplain java.util.Properties#load(java.io.Reader) properties file}: {@code key=value}, {@code key:value} or {@code key value}. {@code group.id} and {@code client.id} are ignored. SpEL {@code #{...}} and property placeholders {@code ${...}} are supported. SpEL expressions must resolve to a {@link String}, a {@link String[]} or a {@code Collection} where each member of the array or collection is a property name + value in one of the above formats. |
batch | boolean | false | auto | The listener method signature should receive a {@code List<?>}; refer to the reference documentation. This allows a single container factory to be used for both record and batch listeners; previously, separate container factories were required. Set to true for the annotated method to be a batch listener or false for a record listener. If not set, the container factory setting is used. SpEL and property placeholders are not supported because the listener type cannot be variable. |
filter | object | false | null | Set a {@link RecordFilterStrategy} bean name to override the strategy configured on the container factory. If a SpEL expression is provided ({@code #{...}}), the expression can either evaluate to a {@link RecordFilterStrategy} instance or a bean name. |
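Combining several of these attributes, a listener declaration might look roughly as follows (topic, group, property and bean names are purely illustrative):

```java
public class PaymentListener {

    // "paymentFilter" and "paymentErrorHandler" are assumed bean names of a
    // RecordFilterStrategy and a StreamEventListenerErrorHandler, respectively.
    @StreamListener(
            topics = "${stream-listener.topics.payment:dwh_payment_created}",
            id = "payment-consumer_1",
            groupId = "payment-consumers",
            concurrency = "2",
            errorHandler = "paymentErrorHandler",
            filter = "paymentFilter")
    public void on(PaymentEvent event) { // PaymentEvent is a hypothetical payload type
        // react to a single payment record
    }
}
```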
Enable the stream-listener-starter in the Spring Boot context
@EnableStreamListener
@SpringBootApplication
public class Application {

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }
}
The BeanPostProcessor checks each bean for the @StreamListener annotation. This class is also a DestructionAwareBeanPostProcessor, a Spring interface that adds a before-destruction callback to BeanPostProcessor. If the annotation is present, the listener is registered under the endpoint identified by the annotation's SpEL expression on bean initialization and unregistered on bean destruction (see the BeanPostProcessor sketch above).
Filter an entire batch of records; to filter out all records, return an empty list, never null.
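A filter implementation could look like the following sketch; the exact RecordFilterStrategy contract is not spelled out in this document, so the single filterBatch method below (whole batch in, records to keep out) and the payload type are assumptions:

```java
import java.util.List;

public class PositiveAmountFilter implements RecordFilterStrategy<InvoiceEvent> {

    @Override
    public List<InvoiceEvent> filterBatch(List<InvoiceEvent> records) {
        // Keep only the records that should be consumed.
        // To drop the whole batch, return an empty list - never null.
        return records.stream()
                .filter(record -> record.getAmount() != null) // getAmount() is hypothetical
                .toList();
    }
}
```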
An error handler which is called when a {@code @StreamListener} method throws an exception. This is invoked higher up the stack than the listener container's error handler. The error handler can return a result.
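A sketch of such an error handler; the StreamEventListenerErrorHandler interface is only referenced in this document, so the handleError signature below is an assumption for illustration:

```java
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class LoggingStreamEventListenerErrorHandler implements StreamEventListenerErrorHandler {

    private static final Logger log = LoggerFactory.getLogger(LoggingStreamEventListenerErrorHandler.class);

    @Override
    public Object handleError(Object record, Exception exception) { // assumed signature
        log.error("Stream listener failed for record {}", record, exception);
        // The error handler can return a result; null means no replacement result.
        return null;
    }
}
```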
The Stream Listener Data Provider provides the receive and commit implementation and should be implemented by the subscriber.
/**
 * Stream Listener Data Provider provides the receive and commit implementation and should be implemented by the subscriber.
 *
 * @param <T> the record type
 */
public interface StreamListenerDataProvider<T extends EventIdAware> {

    /**
     * Receives records from a specific endpoint.
     *
     * @param endpoint may not be null
     * @return the list of received records
     */
    List<T> receive(StreamListenerEndpoint<T> endpoint);

    /**
     * Commits received records within a specific endpoint.
     *
     * @param endpoint  may not be null
     * @param responses may not be null or empty
     */
    void commit(StreamListenerEndpoint<T> endpoint, List<ResponseStatus> responses);
}
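For local development or tests, a minimal in-memory implementation of this contract could look like the sketch below; a production provider would instead call the Batch REST API or another data lake endpoint. Everything except the interface itself is illustrative:

```java
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;

public class InMemoryStreamListenerDataProvider<T extends EventIdAware>
        implements StreamListenerDataProvider<T> {

    // Pending records per endpoint, filled by a test or a local publisher.
    private final Map<StreamListenerEndpoint<T>, Queue<T>> pending = new ConcurrentHashMap<>();

    public void publish(StreamListenerEndpoint<T> endpoint, T record) {
        pending.computeIfAbsent(endpoint, key -> new ConcurrentLinkedQueue<>()).add(record);
    }

    @Override
    public List<T> receive(StreamListenerEndpoint<T> endpoint) {
        // Hand out a snapshot of the currently pending records for this endpoint.
        Queue<T> queue = pending.get(endpoint);
        return queue == null ? List.of() : List.copyOf(queue);
    }

    @Override
    public void commit(StreamListenerEndpoint<T> endpoint, List<ResponseStatus> responses) {
        // A real provider would acknowledge each record according to its ResponseStatus;
        // this sketch simply clears what has been handed out.
        Queue<T> queue = pending.get(endpoint);
        if (queue != null) {
            queue.clear();
        }
    }
}
```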
Since this is a Java library, it will be deployed to the central Maven repository via CI/CD pipelines.
- Externalize all properties via Spring Boot configuration properties
- Closely follow the Kafka Listener concept
- Few dependencies, so the binary stays as small as possible
- Low code complexity, so that the code is easy to grasp
- High test coverage, so that issues are as rare as possible
This section contains all quality requirements as a quality tree with scenarios. The most important ones have already been described in section 1.2 (Quality Goals). Quality requirements with lower priority, which do not create high risks if they are not fully achieved, are also captured here.
Test Scenario | Description | Status | Link |
---|---|---|---|
Pull events for a given topic | Calls the REST API endpoint for the given topic and gets only the events specified for the given endpoint-id | Done | Link to Document |
Commit results for a given topic and endpoint-id | Calls the REST API endpoint for the given endpoint-id and releases the (locked) events specified for the given endpoint-id | Done | Link to Document |
- Due to the small number of code maintainers, the code needs to be tested under high load and over long durations in multiple use cases
- Graceful shutdown needs to be observed for erroneous behavior
Term | Description |
---|---|
Event-Streaming | Streaming data is processed in real-time as it's delivered to a system, although the type of data or the nature of events typically will affect any resulting action |
Feedback and pull-requests welcome! microtema [at] web [.] de
- Java 17
- Streams
- Lambdas
- Third Party Libraries
- Commons-Lang3 (Apache License)
- Jackson XML Databind (Apache License)
- Junit (EPL 1.0 License)
- Code Analysis
- Sonar
- Jacoco
MIT (unless noted otherwise)
95 %