RocketMQ系列(五)广播与延迟消息 (2)

setDelayTimeLevel是什么意思,设置的是2,难道是2s后消费吗?怎么参数也没有时间单位呢?如果我要自定义延迟时间怎么办?我相信很多小伙伴都有这样的疑问,我也是带着这样的疑问查了很多资料,最后在RocketMQ的Github官网上看到了说明,

在RocketMQ的源码中,有一个MessageStoreConfig类,这个类中定义了延迟的时间,我们看一下,

// org/apache/rocketmq/store/config/MessageStoreConfig.java private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";

我们在程序中设置的是2,那么这个消息将在5s以后被消费。

目前RocketMQ还不支持自定义延迟时间,延迟时间只能从上面的时间中选。如果你非要定义一个时间怎么办呢?RocketMQ是开源的,下载代码,把上面的时间改一下,再打包部署,就OK了。

再看看消费端的代码,

@Bean(name = "broadcast", initMethod = "start",destroyMethod = "shutdown") public DefaultMQPushConsumer broadcast() throws MQClientException { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("broadcast"); consumer.setNamesrvAddr("192.168.73.130:9876;192.168.73.131:9876;192.168.73.132:9876;"); consumer.subscribe("cluster-topic","*"); consumer.setMessageModel(MessageModel.BROADCASTING); consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> { for (MessageExt msg : msgs) { Date now = new Date(); System.out.println("消费时间:"+now); Date msgTime = new Date(); msgTime.setTime(msg.getBornTimestamp()); System.out.println("消息生成时间:"+msgTime); System.out.println(new String(msg.getBody())); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; }); return consumer; }

我们还是使用广播的模式,没有变。

打印出了当前的时间,这个时间就是消费的时间。

通过msg.getBornTimestamp()方法,获得了消息的生成时间,也打印出来,看看是不是延迟5s。

启动两个消费者8080和8081,发送消息,再看看消费者的后台日志,

消费时间:Thu Jun 11 14:45:53 CST 2020 消息生成时间:Thu Jun 11 14:45:48 CST 2020 this is simpleMQ,my NO is 0---Thu Jun 11 14:45:47 CST 2020

我们看到消费时间比生成时间晚5s,符合我们的预期。这个功能还是比较实用的,如果能够自定义延迟时间就更好了。

总结

RocketMQ的这两个知识点还是比较简单的,大家要分清楚什么是消费者组,什么是消费者,什么是消费者线程。另外就是延迟消息是不支持自定义的,大家可以在Github上看一下源码。好了~今天就到这里了。

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/wpsgdd.html