RabbitMQ 学习初入门(8)

生产者运行结果:

message : message_A ! 发送成功 message : message_B ! 发送成功 message : message_C ! 发送成功 message : message_D ! 发送成功 message : message_E ! 发送成功 message : message_F ! 发送成功

消费者运行结果:

等待消息中。。。。 message : message_A message : message_B message : message_C message : message_D message : message_E message : message_F

项目我是用maven创建的贴一下maven 的pom.xml文件

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 "> <modelVersion>4.0.0</modelVersion> <groupId>com.maxfunner</groupId> <artifactId>rabbitlearning</artifactId> <version>1.0-SNAPSHOT</version> <packaging>jar</packaging> <name>rabbitlearning</name> <url></url> <properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> </properties> <dependencies> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>3.8.1</version> <scope>test</scope> </dependency> <dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>3.0.4</version> </dependency> <dependency> <groupId>commons-lang</groupId> <artifactId>commons-lang</artifactId> <version>2.6</version> </dependency> <dependency> <groupId>org.apache.commons</groupId> <artifactId>commons-lang3</artifactId> <version>3.1</version> </dependency> </dependencies> </project>

最后一个重点问题

消息“黑洞”,如果在没有绑定队列和交换器之前,所有发出的消息都无法匹配到相应的队列当中,那些消息将永远不会被消费。而且confirm的callback也是会返回成功的即使消息进入了消息“黑洞”。所以在发送消息之前 必须确定队列已经绑定,确保消息能分配到相应的队列当中。 测试很简单,上面的DEMO 先运行 Producer 显示所有消息发送成功,然后再运行 Consumer 发现没有消息可以接收。 再尝试先运行Consumer 再运行 Producer 就发现一切都正常了,这也是为什么我把autoDelete设置为true的原因,有了autoDelete当队列没有人用的时候就会自动删除。所以每次运行都可以测试出问题。要保证消息能够到达指定的队列最好也在Producer中建立队列 而且进行相关的绑定 然后再发送消息 修改一下 Producer 的其中一方法即可。 不写了 累了~

public void initChannelAndCreateExchange() throws IOException { this.channel.confirmSelect(); //启用消息确认已经投递成功的回调 /** * 创建了一个交换器,类型为 direct 非持久化 自动删除 没有额外参数 */ this.channel.exchangeDeclare(EXCHANGE_NAME, "direct", false, true, null); this.channel.addConfirmListener(new ConfirmListener() { /** * 成功的时候回调【这个是当消息到达交换器的时候回调】 * @param deliveryTag 每一条消息都有一个唯一ID【只是同一个channel唯一】,每次发出消息递增1 因为同一个channel所有也保证了消息的流水性。 * @param multiple * @throws IOException */ public void handleAck(long deliveryTag, boolean multiple) throws IOException { String message = messageMap.get(deliveryTag); System.out.println("message : " + message + " ! 发送成功"); messageMap.remove(message); //最后一个消息都搞掂之后 关闭所有东西 if (deliveryTag >= maxID) { closeAnything(); } } /** * 失败的时候回调 * @param deliveryTag 每一条消息都有一个唯一ID【只是同一个channel唯一】,每次发出消息递增1 因为同一个channel所有也保证了消息的流水性。 * @param multiple * @throws IOException */ public void handleNack(long deliveryTag, boolean multiple) throws IOException { String message = messageMap.get(deliveryTag); System.out.println("message : " + message + " ! 发送失败"); messageMap.remove(message); //发送失败就不重发了,发脾气 //最后一个消息都搞掂之后 关闭所有东西 if (deliveryTag >= maxID) { closeAnything(); } } }); /** * 创建了一个队列, 名称为 QUEUE_A 非持久化 非独有的 自动删除的 没有额外删除的 */ this.channel.queueDeclare("QUEUE_A",false,false,true,null); this.channel.queueBind("QUEUE_A",EXCHANGE_NAME,"KEY_A");

CentOS7下安装RabbitMQ 

CentOS 7.2 下 RabbitMQ 集群搭建 

CentOS7环境安装使用专业的消息队列产品RabbitMQ

配置与管理RabbitMQ 

RabbitMQ概念及环境搭建 

RabbitMQ入门教程 

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/14812.html