redis-dqueue is a delayed queue library based on Redis and Java 8.
- Push messages with a delay
- Consumers can retry messages
- Messages are separated by topic
- Spring Boot integration
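To make the delayed-push idea concrete, here is a minimal in-memory sketch of a delay queue. It is not how redis-dqueue is implemented: the class `DelayedQueueSketch` and its methods are hypothetical, and a `TreeMap` keyed by due time merely stands in for the kind of time-ordered structure a Redis-backed queue could keep (for example, a sorted set scored by due time).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical illustration only; not part of redis-dqueue.
public class DelayedQueueSketch {

    // Due time in milliseconds -> payloads due at that instant.
    private final TreeMap<Long, List<String>> byDueTime = new TreeMap<>();

    // Schedule a payload to become visible delaySeconds after nowMillis.
    public void push(String payload, long delaySeconds, long nowMillis) {
        long dueAt = nowMillis + delaySeconds * 1000L;
        byDueTime.computeIfAbsent(dueAt, k -> new ArrayList<>()).add(payload);
    }

    // Remove and return every payload whose due time is <= nowMillis.
    public List<String> pollDue(long nowMillis) {
        Map<Long, List<String>> ready = byDueTime.headMap(nowMillis, true);
        List<String> due = new ArrayList<>();
        for (List<String> payloads : ready.values()) {
            due.addAll(payloads);
        }
        ready.clear(); // the view is backed by the map, so this removes the entries
        return due;
    }

    public static void main(String[] args) {
        DelayedQueueSketch queue = new DelayedQueueSketch();
        queue.push("hello world", 10, 0L);
        System.out.println(queue.pollDue(9_999L));  // prints []
        System.out.println(queue.pollDue(10_000L)); // prints [hello world]
    }
}
```

The clock is passed in as a parameter so the behaviour is easy to check without sleeping; a real consumer would poll with the current time.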
With Maven
<dependency>
    <groupId>io.github.biezhi</groupId>
    <artifactId>redis-dqueue-core</artifactId>
    <version>0.0.3.ALPHA</version>
</dependency>
RDQueue rdQueue = new RDQueue(new Config());
// Deliver the "hello world" message after a 10-second delay
Message<String> message = new Message<>("TEST_TOPIC", "hello world", 10);
// Push the delayed message asynchronously
rdQueue.asyncPush(message, (key, throwable) -> log.info("key send ok:" + key));
// Subscribe to the topic
rdQueue.subscribe("TEST_TOPIC", callback());
Callback
private static Callback<String> callback() {
    return new Callback<String>() {
        @Override
        public ConsumeStatus execute(String data) {
            log.info("Consumed data: {}", data);
            return ConsumeStatus.CONSUMED;
        }
    };
}
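The callback's return value tells the queue whether the message was handled. As a rough sketch of what "try again" can mean for a consumer, the example below re-invokes a handler until it reports success or an attempt limit is reached. Everything in it is an assumption for illustration: the `RetrySketch` class, its local `Status` enum (including the `RETRY` value), and the `maxAttempts` limit are hypothetical and are not taken from redis-dqueue, whose usage above only shows `ConsumeStatus.CONSUMED`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

// Hypothetical illustration only; not part of redis-dqueue.
public class RetrySketch {

    // Local stand-in for a consume result; RETRY is an assumed value.
    public enum Status { CONSUMED, RETRY }

    // Offer each message to the consumer up to maxAttempts times.
    // Returns the messages that were never reported as CONSUMED.
    public static List<String> consumeAll(List<String> messages,
                                          Function<String, Status> consumer,
                                          int maxAttempts) {
        List<String> failed = new ArrayList<>();
        for (String message : messages) {
            boolean done = false;
            for (int attempt = 1; attempt <= maxAttempts && !done; attempt++) {
                done = consumer.apply(message) == Status.CONSUMED;
            }
            if (!done) {
                failed.add(message);
            }
        }
        return failed;
    }

    public static void main(String[] args) {
        int[] calls = {0};
        // This consumer asks for a retry on its first call and succeeds on the second.
        List<String> failed = consumeAll(
                Arrays.asList("order-1"),
                data -> ++calls[0] < 2 ? Status.RETRY : Status.CONSUMED,
                3);
        System.out.println(failed);   // prints []
        System.out.println(calls[0]); // prints 2
    }
}
```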
With Maven
<dependency>
    <groupId>io.github.biezhi</groupId>
    <artifactId>redis-dqueue-spring-boot-starter</artifactId>
    <version>0.0.3.ALPHA</version>
</dependency>
@Autowired
private RDQueueTemplate rdQueueTemplate;

@GetMapping("/push")
public String push(String id) throws RDQException {
    // Deliver the id to the "order-cancel" topic after a 10-second delay
    Message<String> message = new Message<>();
    message.setTopic("order-cancel");
    message.setPayload(id);
    message.setDelayTime(10);
    rdQueueTemplate.asyncPush(message, (s, throwable) -> {
        // TODO async push result
    });
    return "Push succeeded";
}
Implement MessageListener, return the topic to subscribe to from topic(), and process delayed messages in the execute method.
Make sure the class is managed by Spring.
@Component
public class OrderCancelListener implements MessageListener<String> {

    @Override
    public String topic() {
        return "order-cancel";
    }

    @Override
    public ConsumeStatus execute(String data) {
        log.info("Cancel order: {}", data);
        return ConsumeStatus.CONSUMED;
    }
}