python – 当prefetch_count == 1时拒绝并重新排队RabbitMQ任务
内容导读
互联网集市收集整理的这篇技术教程文章主要介绍了python – 当prefetch_count == 1时拒绝并重新排队RabbitMQ任务,小编现在分享给大家,供广大互联网技能从业者学习和参考。文章包含2448字,纯文字阅读大概需要4分钟。
内容图文
![python – 当prefetch_count == 1时拒绝并重新排队RabbitMQ任务](/upload/InfoBanner/zyjiaocheng/725/09ed3456742b4c7ba610bc629e96a148.jpg)
假设我有一个包含五个项目的队列:
(tail) E, D, C, B, A (head)
我使用来自此队列头部的消息,但确定消息A不适合当前处理.我以requeue = True拒绝该项,队列变为:
(tail) A, E, D, C, B (head)
然后我消耗B,C,D和E,对每一个进行处理.现在队列只保留A,我不断消耗并在一个永无止境的循环中反复拒绝.如果有新的非A消息进入,它几乎立即消耗,然后该进程恢复其尝试消耗A的循环.
我通过Pika docs对Twisted Consumer Example的略微修改来做到这一点:
import pika
from pika import exceptions
from pika.adapters import twisted_connection
from twisted.internet import defer, reactor, protocol,task
@defer.inlineCallbacks
def run(connection):
channel = yield connection.channel()
exchange = yield channel.exchange_declare(exchange='topic_link',type='topic')
queue = yield channel.queue_declare(queue='hello', auto_delete=False, exclusive=False)
yield channel.queue_bind(exchange='topic_link',queue='hello',routing_key='hello.world')
#yield channel.basic_qos(prefetch_count=1)
queue_object, consumer_tag = yield channel.basic_consume(queue='hello',no_ack=False)
l = task.LoopingCall(read, queue_object)
l.start(0.01)
@defer.inlineCallbacks
def read(queue_object):
ch,method,properties,body = yield queue_object.get()
print body
if body == 'A':
yield ch.basic_reject(delivery_tag=method.delivery_tag, requeue=True)
else:
yield ch.basic_ack(delivery_tag=method.delivery_tag)
parameters = pika.ConnectionParameters()
cc = protocol.ClientCreator(reactor, twisted_connection.TwistedProtocolConnection, parameters)
d = cc.connectTCP('hostname', 5672)
d.addCallback(lambda protocol: protocol.ready)
d.addCallback(run)
reactor.run()
问题:请注意以下注释掉的行:
#yield channel.basic_qos(prefetch_count=1)
当我取消注释,并且消费者到达消息A时,它将在拒绝它之后立即再次拾起它,忽略可能在其后面的队列中等待的任何其他项目.它不是将被拒绝的项目放在队列的尾部,而是一遍又一遍地重复尝试,完全阻止队列中的其他所有内容.
随着该行注释,它正常工作(尽管有点慢).如果该行存在且prefetch_count> 1,它也有效.将其设置为1的某些操作会触发此行为.
在拒绝消息A时是否缺少一个步骤?或者Pika的预取系统是否与这种边缘情况根本不相容?
解决方法:
如果你只有一个消费者,那么RabbitMQ除了向同一个消费者发送消息之外别无他法(无论如何:使用basic.reject或basic.nack).
设置prefetch_count>时1然后你的消费者将在循环旁边的头部加上你的循环消息加上新的消息(字面意思是,你的循环消息将留在头部).
如果您意外地获得N * M循环消息withprefetch_count< = N且消费者编号<= M,您将使所有消息循环(这会导致CPU烧毁等等),因此检查被拒绝可能是一个好方法.如果消息已经重新传递,则消息标志并具有一些高级逻辑.
内容总结
以上是互联网集市为您收集整理的python – 当prefetch_count == 1时拒绝并重新排队RabbitMQ任务全部内容,希望文章能够帮你解决python – 当prefetch_count == 1时拒绝并重新排队RabbitMQ任务所遇到的程序开发问题。 如果觉得互联网集市技术教程内容还不错,欢迎将互联网集市网站推荐给程序员好友。
内容备注
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 gblab@vip.qq.com 举报,一经查实,本站将立刻删除。
内容手机端
扫描二维码推送至手机访问。