Skip to content


Repository files navigation


An interface that can implement publishing and consuming messages from various message queue platforms.

Executing the tests

The tests are integration tests and depend on zookeeper and kafka running.

run zookeeper and kafka

$ make run

stop zookeeper and kafka

$ make stop

execute the tests

./gradlew clean test


Environment Vars

export MESSAGE_QUEUE_HOST='localhost:9092'


PelicanAppConfig appConfig = new PelicanAppConfig();

Change publish and subscribe settings

In order to change the subscribe and publish settings implement a local PelicanAppConfig and override the methods, propertiesForSubscribe, propertiesForPublish. Then use the local implementation to construct publishers and subscribers.

public class PubSubConfig extends PelicanAppConfig {

    public Properties propertiesForSubscribe(String clientId, String consumerGroup) {
        Properties properties = super.propertiesForSubscribe(clientId, consumerGroup);
        properties.put("", "");
        return properties;

    public Properties propertiesForPublish(String clientId) {
        Properties properties = super.propertiesForPublish(clientId);
        properties.put("", "");
        return properties;

How to Publish

PelicanAppConfig appConfig = new PelicanAppConfig();

Map<String, String> message = new HashMap<>();
message.put("test_key", "test_value");

Publish publish = appConfig.publish();

String topic = "test";
publish.send(topic, message);

How to Subscribe

AppConfig appConfig = new AppConfig();

List<String> topics = Arrays.asList("test");
String consumerGroup = "test";

Subscribe subject = appConfig.subscribe(topics, consumerGroup);

Duration timeout = Duration.ofSeconds(100);
List<Map<String, String>> messages = subject.poll(timeout);

// tell the queue the record has been processed.


This repo also has the definitions for:

They contain image tags that mirror this project's versions.

The images are to be used for local development only and not production ready.