如何在项目中引入MetaQ消息收发机制

当需要异步发送和接收大量消息时,需要在Crystal项目中引入MetaQ消息收发机制。

关于MetaQ使用的官方例子可参考:https://github.com/killme2008/Metamorphosis/wiki/%E7%AE%80%E5%8D%95%E4%BE%8B%E5%AD%90

Crystal框架将MetaQ进行封装,简化MetaQ的使用,具体如下:

消息生产端

引入crystal-metaq-producer项目最为依赖:

<dependency> <groupId>com.gsoft.crystal</groupId> <artifactId>crystal-metaq-producer</artifactId> </dependency>

调用消息发送对象,发送指定消息:

@Resource private MessageProducer mp; @Resource private MessageConsumer mc; @Test public void testProducer() { String topic = "test"; final String msg = "test message !"; mp.publish(topic); Message message = new Message(topic, msg.getBytes()); try { mp.sendMessage(message); } catch (MetaClientException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); }

其中,topic必须是MetaQ服务器中定义的主题之一。

在发送消息前,必须先mp.publish(topic),与指定主题关联。

消息消费端

引入crystal-metaq-consumer项目最为依赖:

<dependency> <groupId>com.gsoft.crystal</groupId> <artifactId>crystal-metaq-consumer</artifactId> </dependency>

调用消息消费对象,注册监听器:

@Resource private MessageConsumer mc; @Test public void testProducer() { String topic = "test"; try { mc.subscribe(topic, 1024*1024, new MessageListener() { @Override public void recieveMessages(Message message) throws InterruptedException { String str = new String(message.getData()); System.out.println("Recived Message: " + str); Assert.assertEquals(msg, str); } @Override public Executor getExecutor() { return null; } }); mc.completeSubscribe(); } catch (MetaClientException e1) { e1.printStackTrace(); }

其中,mc.subscribe()方法可执行多次,最后需执行mc.completeSubscribe()方法。

另,上述方法中的1024*1024参数为接收的消息内容最大字节数,可自行调整以优化性能(不了解具体如何优化情况下,建议不要调整)。

监听器中的recieveMessages方法即为消息消费方法,getExecutor方法返回线程池的执行器,如返回null,则不采用线程池。

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/0938b4d66aca6ebc80c353f1d7bc6d33.html