Supported Versions:
This project contains all needed classes to bind JavaEE enterprise events to a RabbitMQ exchange for outgoing events. Inbound events can also be bound to the respective queues and will be handed over to all JavaEE event observers.
The RabbitMQ message content is done via JSON serialization of a Java Bean compatible PoJo object and vice versa.
First you need to define event objects using standard Java Bean syntax:
public class EventOne {
private boolean enabled;
public boolean isEnabled() {
return enabled;
}
public void setEnabled(boolean enabled) {
this.enabled = enabled;
}
}
public class EventTwo {
private String value;
public String getValue() {
return value;
}
public void setValue(String value) {
this.value = value;
}
}
As second step you have to define the bindings:
@Dependent
public class RabbitBinder extends EventBinder {
@Override
protected void bindEvents() {
ExchangeDeclaration testExchangeOne = declarerFactory()
.createExchangeDeclaration("test.exchange.one")
.withExchangeType(BuiltinExchangeType.DIRECT)
.withAutoDelete(false)
.withDurable(false); // (1)
QueueDeclaration testQueue = declarerFactory()
.createQueueDeclaration("test.queue")
.withAutoDelete(false)
.withDurable(false)
.withExclusiveAccess(false); // (2)
bind(EventOne.class)
.toExchange("test.exchange.one");
.withDeclaration(testExchangeOne) // (3)
bind(EventOne.class)
.toExchange("test.exchange.two")
.withRoutingKey("test.key")
.withEncoder(new MyCustomEncoder()); // (4)
bind(EventTwo.class)
.toQueue("test.queue");
bind(EventTwo.class)
.toQueue("test.queue")
.withDecoder(new MyCustomDecoder())
.withDeclaration(testQueue)
.autoAck(); // (5)
}
}
-
Create exchange declaration for exchange with name 'test.exchange.one' and type 'direct'
-
Creates queue declaration for queue with name 'test.queue'
-
Uses an empty routing key for exchange 'test.exchange.one' and adds exchange declaration for this publisher
-
Specifies a custom event encoder
-
Specifies a custom event decoder, enables auto acknowledge and adds queue declaration for this consumer
Note to declarations:
Created declarations are not automatically declared on broker side. For each consumer/producer own channels exist because of that reason it is necessary to add the declaration needed to a binding via .withDeclaration(..) Only these declarations are applied for this consumer/producer
As last step you need to initialize the binder either in a singleton startup bean or servlet after having also configured the connection settings:
@Singleton
@Startup
public class BindingInitializer {
@Inject
private RabbitBinder binder;
@PostConstruct
public void initialize() {
try {
binder.configuration()
.addHost("somehost.somedomain") // (1)
.setUsername("myuser") // (2)
.setPassword("mypassword"); // (3)
.setSecure(true) // (4)
.setConnectTimeout(10000) // (5)
.setConnectRetryWaitTime(10000) // (6)
.setRequestedConnectionHeartbeatTimeout(3) // (7)
.withPrefetchCount(5); //8
binder.initialize();
} catch (IOException e) {
LoggerFactory.getLogger(getClass()).error("Unable to initialize", e);
}
}
}
-
Specifies a AMQP host name (there can be added more than one here)
-
Specifies a connection user name
-
Specifies a connection password
-
Enables the transport layer security (TLS)
-
Sets the connect timeout to 10 sec
-
Set the time to wait between connection retries to 10 sec
-
Set the heartbeat timeout to 3 sec (To detect connection problems)
-
Set the prefetch count which configures how many messages are downloaded at once from broker
Alternatively the connection can also be configured using a respective URI string:
@Singleton
@Startup
public class BindingInitializer {
@Inject
private RabbitBinder binder;
@PostConstruct
public void initialize() {
try {
binder.configuration()
.setUri("amqp://user:mysecret@somehost.somedomain/virtualhost"); // (1)
binder.initialize();
} catch (IOException e) {
LoggerFactory.getLogger(getClass()).error("Unable to initialize", e);
}
}
}
-
Specifies a AMQP connection URI
More information about the detailed URI can be found in the RabbitMQ URI specification.
In case you have to support two different servers, create a binder implementation for each host and initialize them in one single binding initializer:
@Singleton
@Startup
public class BindingInitializer {
@Inject
private RabbitBinder binderOne;
@Inject
private RabbitBinder binderTwo;
@PostConstruct
public void initialize() {
try {
binderOne.configuration()
.addHost("hostOne.somedomain")
.setUsername("myuser")
.setPassword("mypassword");
binderTwo.configuration()
.addHost("hostTwo.somedomain")
.setUsername("myuser")
.setPassword("mypassword");
binderOne.initialize();
binderTwo.initialize();
} catch (IOException e) {
LoggerFactory.getLogger(getClass()).error("Unable to initialize", e);
}
}
}
Now the events can be used within your JavaEE container:
public class EventDemoBean {
@Inject
private Event<EventOne> eventOnes;
public void submitEvent(boolean enabled) {
EventOne eventOne = new EventOne();
eventOne.setEnabled(enabled);
eventOnes.fire(eventOne);
}
public void receiveEvent(@Observes EventTwo eventTwo) {
String data = eventTwo.getData();
// do some work
}
}
Contributions are always welcome. Use Google code style format for your changes.
This project is licensed under the MIT license