An interface for publishing and consuming messages on various message queue platforms.
The tests are integration tests and depend on running ZooKeeper and Kafka instances.
$ make run
$ make stop
./gradlew clean test
export MESSAGE_QUEUE_HOST='localhost:9092'
// Set the message queue host directly on the config object.
String messageQueueHost = "localhost:9092";
PelicanAppConfig appConfig = new PelicanAppConfig();
appConfig.setMessageQueueHost(messageQueueHost);
In order to change the subscribe and publish settings, implement a local subclass of PelicanAppConfig
and override the methods `propertiesForSubscribe` and `propertiesForPublish`.
Then use the local implementation to construct publishers and subscribers.
/**
 * Example configuration that extends {@code PelicanAppConfig} and adds an
 * extra property to both the subscribe and the publish settings.
 */
public class PubSubConfig extends PelicanAppConfig {

    /**
     * Starts from the parent's subscribe properties and adds one custom entry.
     *
     * @param clientId      passed through unchanged to the parent implementation
     * @param consumerGroup passed through unchanged to the parent implementation
     * @return the parent's properties with {@code my.new.prop.key} added
     */
    @Override
    public Properties propertiesForSubscribe(String clientId, String consumerGroup) {
        Properties subscribeProps = super.propertiesForSubscribe(clientId, consumerGroup);
        subscribeProps.put("my.new.prop.key", "my.new.prop.value");
        return subscribeProps;
    }

    /**
     * Starts from the parent's publish properties and adds one custom entry.
     *
     * @param clientId passed through unchanged to the parent implementation
     * @return the parent's properties with {@code my.new.prop.key} added
     */
    @Override
    public Properties propertiesForPublish(String clientId) {
        Properties publishProps = super.propertiesForPublish(clientId);
        publishProps.put("my.new.prop.key", "my.new.prop.value");
        return publishProps;
    }
}
// Obtain a publisher from the application config.
PelicanAppConfig appConfig = new PelicanAppConfig();
Publish publisher = appConfig.publish();

// Build the message as a string-to-string map.
Map<String, String> payload = new HashMap<>();
payload.put("test_key", "test_value");

// Send the message to the "test" topic.
String topic = "test";
publisher.send(topic, payload);
// Create a subscriber for the "test" topic in the "test" consumer group.
PelicanAppConfig appConfig = new PelicanAppConfig();
List<String> topics = Arrays.asList("test");
String consumerGroup = "test";
Subscribe subject = appConfig.subscribe(topics, consumerGroup);

// Poll for messages with a 100-second timeout.
Duration timeout = Duration.ofSeconds(100);
List<Map<String, String>> messages = subject.poll(timeout);

// tell the queue the record has been processed.
subject.processed();
This repo also has the definitions for:
They contain image tags that mirror this project's versions.
The images are intended for local development only and are not production ready.