RabbitMQ入门(二)工作队列

  在文章RabbitMQ入门(一)之Hello World,我们编写程序通过指定的队列来发送和接受消息。在本文中,我们将会创建工作队列(Work Queue),通过多个workers来分配耗时任务。
  工作队列(Work Queue,也被成为Task Queue,任务队列)的中心思想是,避免立即执行一个资源消耗巨大且必须等待其完成的任务。相反地,我们调度好队列可以安排该任务稍后执行。我们将一个任务(task)封装成一个消息,将它发送至队列。一个在后台运行的work进程将会抛出该任务,并最终执行该任务。当你运行多个workers的时候,任务将会在它们之中共享。
  这个概念在web开发中很有用,因为通过一个短的HTTP请求不可能处理复杂的任务。
  在之前的文章中,我们发送了一个包含“Hello World!”的消息。现在我们将会发送代表复杂任务的字串符。我们并没有实际上的任务,比如重新调整图片的尺寸或者渲染PDF,我们假装有这样的复杂任务,通过使用time.sleep()函数。我们将会用字符串中的点(.)来代表复杂度;每一个点代表一秒中的任务。举例来说,字符串Hello...需要花费三秒。
  我们需要稍微修改下sent.py中的代码,允许在命令中输入任意字符串。该程序会调度任务至工作队列,因此命名为new_task.py:

import sys message = ' '.join(sys.argv[1:]) or "Hello World!" channel.basic_publish(exchange='', routing_key='hello', body=message) print(" [x] Sent %r" % message)

  我们原先的receive.py也需要改动:它需要在消息体中将字符串的每一个点代表1秒钟的任务。它会从队列中抛出消息并执行该消息,因此命名为task.py:

import time def callback(ch, method, properties, body): print(" [x] Received %r" % body) time.sleep(body.count(b'.')) print(" [x] Done") Round-Robin分发(轮询分发)

  使用工作队列的一个好处就是它能够轻松实现并行工作。如果我们创建了一项积压的工作,那么我们可以增加更多的worker来使它的扩展性更好。
  首先,我们同时运行两个worker.py脚本。他们都能够从队列中获取消息,但是具体是怎么实现的呢?让我们接着阅读。
  你需要打开三个终端查看。两个终端用于运行worker.py脚本。这两个终端将会成为两个消费者——C1和C2。

# shell 1 python worker.py # => [*] Waiting for messages. To exit press CTRL+C # shell 2 python worker.py # => [*] Waiting for messages. To exit press CTRL+C

在第三个终端中,我们将会产生新的任务。一旦你启动了这些消费者,你就可以发送一些消息了:

# shell 3 python new_task.py First message. python new_task.py Second message.. python new_task.py Third message... python new_task.py Fourth message.... python new_task.py Fifth message.....

让我们看看这两个workers传递了什么:

# shell 1 python worker.py # => [*] Waiting for messages. To exit press CTRL+C # => [x] Received 'First message.' # => [x] Received 'Third message...' # => [x] Received 'Fifth message.....' # shell 2 python worker.py # => [*] Waiting for messages. To exit press CTRL+C # => [x] Received 'Second message..' # => [x] Received 'Fourth message....'

RabbitMQ默认会将每个消息依次地发送给下一个消费者。因此总的来说,每个消费者将会同样数量的消息。这种消息分配的方法叫Round-Robin。你可以尝试三个或者更多的worker。

消息确认(Message Acknowledgement)

  执行一项任务需要花费几秒钟。你也许会好奇,如果其中一个消费者执行一项耗时很长的任务,并且在执行了一部分的时候挂掉了,将会发生什么?根据我们现在的代码,一旦RabbitMQ将消息传送至消费者,那么RabbitMQ就会标志它为删除状态。在这种情况下,如果我们杀死某个worker,我们将会失去他正在处理的消息。我们也会失去所有分配至这个worker的消息,当然,这些消息还未被处理。
  但是,我们不希望失去任何一项任务。如果有一个worker挂掉了,我们希望这些任务能够被传送至另一个worker。
  为了确保消息不丢失,RabbitMQ支持消息确认。一个ack(nowledgement)是由消费者发送回来的,用于告诉RabbitMQ,这个特定的消息已经被接受,被处理,可以被删除了。
  如果一个消费者挂了(它的channel关闭了,连接关闭了,或者TCP连接丢失)但是没有发送一个ack,RabbitMQ就会知道这个消息并未被完全处理,会将它重新塞进队列。如果同时还存在着其他在线消费者,RabbbitMQ将会将这个消息重新传送给另一个消费者。用这种方式可以确保没有消息丢失,即使workers偶尔会刮掉。
  并不存在消息超时;如果消费者挂了,RabbitMQ将会重新传送消息。这样即使处理一个消息需要消耗很长很长的时间,也是可以的。
  默认的消息确认方式为人工消息确认。在我们之前的例子中,我们清晰地将它关闭了,使用了auto_ack=True这个命令。当我们完成一项任务的时候,根据需要,移除这个标志,从worker中发送一个合适的确认。

def callback(ch, method, properties, body): print(" [x] Received %r" % body) time.sleep( body.count('.') ) print(" [x] Done") ch.basic_ack(delivery_tag = method.delivery_tag) channel.basic_consume(queue='hello', on_message_callback=callback)

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/zwgpjd.html