RabbitMQ 学习初入门(7)

消费者:

package com.maxfunner; import com.maxfunner.mq.QueueConsumer; import com.rabbitmq.client.*; import org.apache.commons.lang.SerializationUtils; import java.io.IOException; import java.util.HashMap; import java.util.Map; /** * Consumer */ public class Consumer { private Connection connection; private Channel channel; private Map<Integer,String> messageMap = new HashMap<Integer, String>(); private static final String EXCHANGE_NAME = "MY_EXCHANGE"; /** * 对,你猜得一点都没有错,我是复制的 * @throws IOException */ public void createConnectionAndChannel() throws IOException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("127.0.0.1"); //服务器地址 factory.setUsername("guest"); //默认用户名 factory.setPassword("guest"); //默认密码 factory.setPort(5672); //默认端口,对就是这么屌全部默认的 this.connection = factory.newConnection(); //创建链接 this.channel = this.connection.createChannel(); } public void createAndBindQueue() throws IOException { /** * 创建了一个交换器,类型为 direct 非持久化 自动删除 没有额外参数 */ this.channel.exchangeDeclare(EXCHANGE_NAME,"direct",false,true,null); //最好也创建一下交换器,反正已经创建也没有关系 /** * 创建了一个队列, 名称为 QUEUE_A 非持久化 非独有的 自动删除的 没有额外删除的 */ this.channel.queueDeclare("QUEUE_A",false,false,true,null); this.channel.queueBind("QUEUE_A",EXCHANGE_NAME,"KEY_A"); } public static void main(String args[]) throws IOException { final Consumer consumer = new Consumer(); consumer.createConnectionAndChannel(); consumer.createAndBindQueue(); System.out.println("等待消息中。。。。"); new Thread(new Runnable() { public void run() { try { /** * 订阅消息,订阅队列QUEUE_A 获得消息后自动确认 */ consumer.channel.basicConsume("QUEUE_A", true, new com.rabbitmq.client.Consumer() { public void handleConsumeOk(String consumerTag) { } public void handleCancelOk(String consumerTag) { } public void handleCancel(String consumerTag) throws IOException { } public void handleShutdownSignal(String consumerTag, ShutdownSignalException sig) { } public void handleRecoverOk(String consumerTag) { } public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("message : " + SerializationUtils.deserialize(body)); } }); } catch (IOException e) { e.printStackTrace(); } } }).start(); } }

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/14812.html