RabbitMQ封装实战

先说下背景:上周开始给项目添加曾经没有过的消息中间件。虽然说,一路到头非常容易,直接google,万事不愁~可是生活远不仅是眼前的“苟且”。首先是想使用其他项目使用过的一套对mq封装的框架,融合进来。虽然折腾了上周六周日两天,总算吧老框架融进项目中了,可是周一来公司和大数据哥们儿一联调发现,收不到数据!所以没办法,当场使用原生那一套撸了个版本出来~可是,可是,可是,俗话说得好:生命在于折腾!在上周末融合老框架的时候,我把源码读了遍,发现了很多很好的封装思想,Ok,这周末总算闲了下来,我就运用这个思想,封装一个轻量级的呗,说干就干!

主要思想

说到封装,我想,应该主要是要尽可能减小用户使用的复杂度,尽量少的进行配置,书写,甚至能尽量少的引入第三发或是原生类库。所以在这种想法之下,这套框架的精髓主要在以下几点:

使用注解,减少用户配置

将不同的生产者消费者的初始化方式统一

初次注册生产者或者消费者的时候,进行队列的自动注册

再统一的初始化方式中,使用动态代理的方式,代理到具体的生产者或是消费者的发送接收方法

在这种模式下,我们不用过多的配置,直接建立一个接口,接口上面使用注解声明队列的名称,然后使用同一的Bean进行初始化,就齐活了!

统一初始化Bean的实现

不说啥,直接上代码:

public class RabbitMQProducerFactoryBean<T> extends RabbitMQProducerInterceptor implements FactoryBean<T> { private Logger logger = LoggerFactory.getLogger(getClass()); private Class<?> serviceInterface; @Autowired private ConnectionFactory rabbitConnectionFactory; @Value("${mq.queue.durable}") private String durable; @Value("${mq.queue.exclusive}") private String exclusive; @Value("${mq.queue.autoDelete}") private String autoDelete; @SuppressWarnings("unchecked") /** 这个方法很特殊,继承自FactoryBean,就是说管理权归属IoC容器。每次注册一个队列的时候,并且注入到具体的service中使用的时候,就会调用这个getObject方法。所以,对于使用本类初始化的bean,其类型并非本类,而是本类的属性serviceInterface类型,因为最终getObject的结果是返回了一个动态代理,代理到了serviceInterface。 **/ @Override public T getObject() throws Exception { //初始化 if (getQueueName() != null) { logger.info("指定的目标列队名[{}],覆盖接口定义。", getQueueName()); } else { RPCQueueName name = serviceInterface.getAnnotation(RPCQueueName.class); if (name == null) throw new IllegalArgumentException("接口" + serviceInterface.getCanonicalName() + "没有指定@RPCQueueName"); setQueueName(name.value()); } //创建队列 declareQueue(); logger.info("建立MQ客户端代理接口[{}],目标队列[{}]。", serviceInterface.getCanonicalName(), getQueueName()); return (T) Proxy.newProxyInstance(getClass().getClassLoader(), new Class<?>[]{serviceInterface}, this);//动态代理到目标接口 } private void declareQueue() { Connection connection = rabbitConnectionFactory.createConnection(); Channel channel = connection.createChannel(true); try { channel.queueDeclare(getQueueName(), Boolean.valueOf(durable), Boolean.valueOf(exclusive) , Boolean.valueOf(autoDelete), null); logger.info("注册队列成功!"); } catch (IOException e) { logger.warn("队列注册失败", e); } } ...... } public class RabbitMQProducerInterceptor implements InvocationHandler { private Logger logger = LoggerFactory.getLogger(getClass()); private String queueName; @Autowired private AmqpTemplate amqpTemplate; @Override public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { Object sendObj; Class<?>[] parameterTypes = method.getParameterTypes(); String methodName = method.getName(); boolean isSendOneJson = Objects.nonNull(args) && args.length == 1 && (args[0] instanceof String); if (isSendOneJson) { sendObj = args[0]; logger.info("发送单一json字符串消息:{}", (String) sendObj); } else { sendObj = new RemoteInvocation(methodName, parameterTypes, args); logger.info("发送封装消息体:{}", JSONSerializeUtil.jsonSerializerNoType(sendObj)); } logger.info("发送异步消息到[{}],方法名为[{}]", queueName, method.getName()); //异步方式使用,同时要告知服务端不要发送响应 amqpTemplate.convertAndSend(queueName, sendObj); return null; } ...... }

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/wspfxw.html