python – 调用celery任务挂起延迟和apply_async
内容导读
互联网集市收集整理的这篇技术教程文章主要介绍了python – 调用celery任务挂起延迟和apply_async,小编现在分享给大家,供广大互联网技能从业者学习和参考。文章包含8966字,纯文字阅读大概需要13分钟。
内容图文
![python – 调用celery任务挂起延迟和apply_async](/upload/InfoBanner/zyjiaocheng/818/bfd3051642304bbcb2bea8cce04e1c9c.jpg)
我创建了一个具有以下目录结构的芹菜应用程序(如芹菜网站中所示):
proj
|-- celery.py
|-- celery.pyc
|-- __init__.py
|-- __init__.pyc
|-- tasks.py
`-- tasks.pyc
以下是celery.py的内容
from __future__ import absolute_import
from celery import Celery
app = Celery('proj',
broker='amqp://rabbitmquser:<my_passowrd>@localhost:5672/localvhost',
#backend='amqp://',
include=['proj.tasks'])
# Optional configuration, see the application user guide.
app.conf.update(
CELERY_TASK_RESULT_EXPIRES=3600,
)
if __name__ == '__main__':
app.start()
以下是tasks.py的内容
from __future__ import absolute_import
from proj.celery import app
@app.task
def add(x, y):
return x + y
@app.task
def mul(x, y):
return x * y
@app.task
def xsum(numbers):
return sum(numbers)
现在我用以下命令启动芹菜工作者:
celery -A proj worker -l debug
我认为工作人员运行良好,因为它输出如下:
[2014-06-12 21:25:02,326: DEBUG/MainProcess] | Worker: Preparing bootsteps.
[2014-06-12 21:25:02,328: DEBUG/MainProcess] | Worker: Building graph...
[2014-06-12 21:25:02,328: DEBUG/MainProcess] | Worker: New boot order: {Timer, Hub, Queues (intra), Pool, Autoscaler, Beat, Autoreloader, StateDB, Consumer}
[2014-06-12 21:25:02,331: DEBUG/MainProcess] | Consumer: Preparing bootsteps.
[2014-06-12 21:25:02,331: DEBUG/MainProcess] | Consumer: Building graph...
[2014-06-12 21:25:02,334: DEBUG/MainProcess] | Consumer: New boot order: {Connection, Events, Mingle, Tasks, Control, Agent, Heart, Gossip, event loop}
[2014-06-12 21:25:02,335: WARNING/MainProcess] /home/ansumanb/.virtualenvs/celery_venv/local/lib/python2.7/site-packages/celery/apps/worker.py:161: CDeprecationWarning:
Starting from version 3.2 Celery will refuse to accept pickle by default.
The pickle serializer is a security concern as it may give attackers
the ability to execute any command. It's important to secure
your broker from unauthorized access when using pickle, so we think
that enabling pickle should require a deliberate action and not be
the default choice.
If you depend on pickle then you should set a setting to disable this
warning and to be sure that everything will continue working
when you upgrade to Celery 3.2::
CELERY_ACCEPT_CONTENT = ['pickle', 'json', 'msgpack', 'yaml']
You must only enable the serializers that you will actually use.
warnings.warn(CDeprecationWarning(W_PICKLE_DEPRECATED))
-------------- celery@ansumanb-u12 v3.1.12 (Cipater)
---- **** -----
--- * *** * -- Linux-3.5.0-25-generic-x86_64-with-Ubuntu-12.04-precise
-- * - **** ---
- ** ---------- [config]
- ** ---------- .> app: proj:0x1f46690
- ** ---------- .> transport: amqp://rabbitmquser:**@localhost:5672/localvhost
- ** ---------- .> results: disabled
- *** --- * --- .> concurrency: 4 (prefork)
-- ******* ----
--- ***** ----- [queues]
-------------- .> celery exchange=celery(direct) key=celery
[tasks]
. celery.backend_cleanup
. celery.chain
. celery.chord
. celery.chord_unlock
. celery.chunks
. celery.group
. celery.map
. celery.starmap
. proj.tasks.add
. proj.tasks.mul
. proj.tasks.xsum
[2014-06-12 21:25:02,336: DEBUG/MainProcess] | Worker: Starting Hub
[2014-06-12 21:25:02,336: DEBUG/MainProcess] ^-- substep ok
[2014-06-12 21:25:02,336: DEBUG/MainProcess] | Worker: Starting Pool
[2014-06-12 21:25:02,344: DEBUG/MainProcess] ^-- substep ok
[2014-06-12 21:25:02,345: DEBUG/MainProcess] | Worker: Starting Consumer
[2014-06-12 21:25:02,345: DEBUG/MainProcess] | Consumer: Starting Connection
在运行worker之后,我打开终端并从python解释器执行以下操作:
>>> from proj.tasks import add
>>> add(2,2)
4
>>> add.delay(2,3)
这里延迟挂起(apply_async的故事相同).当我通过Ctrl C停止它时,我得到以下信息:
^CTraceback (most recent call last):
File "<stdin>", line 1, in <module>
File "/home/ansumanb/.virtualenvs/celery_venv/local/lib/python2.7/site-packages/celery/app/task.py", line 453, in delay
return self.apply_async(args, kwargs)
File "/home/ansumanb/.virtualenvs/celery_venv/local/lib/python2.7/site-packages/celery/app/task.py", line 555, in apply_async
**dict(self._get_exec_options(), **options)
File "/home/ansumanb/.virtualenvs/celery_venv/local/lib/python2.7/site-packages/celery/app/base.py", line 352, in send_task
reply_to=reply_to or self.oid, **options
File "/home/ansumanb/.virtualenvs/celery_venv/local/lib/python2.7/site-packages/celery/app/amqp.py", line 305, in publish_task
**kwargs
File "/home/ansumanb/.virtualenvs/celery_venv/local/lib/python2.7/site-packages/kombu/messaging.py", line 168, in publish
routing_key, mandatory, immediate, exchange, declare)
File "/home/ansumanb/.virtualenvs/celery_venv/local/lib/python2.7/site-packages/kombu/connection.py", line 436, in _ensured
return fun(*args, **kwargs)
File "/home/ansumanb/.virtualenvs/celery_venv/local/lib/python2.7/site-packages/kombu/messaging.py", line 173, in _publish
channel = self.channel
File "/home/ansumanb/.virtualenvs/celery_venv/local/lib/python2.7/site-packages/kombu/messaging.py", line 190, in _get_channel
channel = self._channel = channel()
File "/home/ansumanb/.virtualenvs/celery_venv/local/lib/python2.7/site-packages/kombu/utils/__init__.py", line 422, in __call__
value = self.__value__ = self.__contract__()
File "/home/ansumanb/.virtualenvs/celery_venv/local/lib/python2.7/site-packages/kombu/messaging.py", line 205, in <lambda>
channel = ChannelPromise(lambda: connection.default_channel)
File "/home/ansumanb/.virtualenvs/celery_venv/local/lib/python2.7/site-packages/kombu/connection.py", line 756, in default_channel
self.connection
File "/home/ansumanb/.virtualenvs/celery_venv/local/lib/python2.7/site-packages/kombu/connection.py", line 741, in connection
self._connection = self._establish_connection()
File "/home/ansumanb/.virtualenvs/celery_venv/local/lib/python2.7/site-packages/kombu/connection.py", line 696, in _establish_connection
conn = self.transport.establish_connection()
File "/home/ansumanb/.virtualenvs/celery_venv/local/lib/python2.7/site-packages/kombu/transport/pyamqp.py", line 112, in establish_connection
conn = self.Connection(**opts)
File "/home/ansumanb/.virtualenvs/celery_venv/local/lib/python2.7/site-packages/amqp/connection.py", line 171, in __init__
(10, 10), # start
File "/home/ansumanb/.virtualenvs/celery_venv/local/lib/python2.7/site-packages/amqp/abstract_channel.py", line 67, in wait
self.channel_id, allowed_methods)
File "/home/ansumanb/.virtualenvs/celery_venv/local/lib/python2.7/site-packages/amqp/connection.py", line 237, in _wait_method
self.method_reader.read_method()
File "/home/ansumanb/.virtualenvs/celery_venv/local/lib/python2.7/site-packages/amqp/method_framing.py", line 186, in read_method
self._next_method()
File "/home/ansumanb/.virtualenvs/celery_venv/local/lib/python2.7/site-packages/amqp/method_framing.py", line 107, in _next_method
frame_type, channel, payload = read_frame()
File "/home/ansumanb/.virtualenvs/celery_venv/local/lib/python2.7/site-packages/amqp/transport.py", line 153, in read_frame
frame_type, channel, size = unpack('>BHI', read(7, True))
File "/home/ansumanb/.virtualenvs/celery_venv/local/lib/python2.7/site-packages/amqp/transport.py", line 272, in _read
s = recv(n - len(rbuf))
KeyboardInterrupt
任何建议或评论将不胜感激.
我已经通过其他链接谈论/ var目录大小,但我认为我有足够的空间.
df -h的结果
Filesystem Size Used Avail Use% Mounted on
/dev/sda3 283G 99G 170G 37% /
udev 1.9G 4.0K 1.9G 1% /dev
tmpfs 388M 1.1M 387M 1% /run
none 5.0M 0 5.0M 0% /run/lock
none 1.9G 28M 1.9G 2% /run/shm
以下是rabbitmqctl状态的结果
Status of node 'rabbit@ansumanb-u12' ...
[{pid,12014},
{running_applications,[{rabbit,"RabbitMQ","3.3.2"},
{os_mon,"CPO CXC 138 46","2.2.7"},
{xmerl,"XML parser","1.2.10"},
{mnesia,"MNESIA CXC 138 12","4.5"},
{sasl,"SASL CXC 138 11","2.1.10"},
{stdlib,"ERTS CXC 138 10","1.17.5"},
{kernel,"ERTS CXC 138 10","2.14.5"}]},
{os,{unix,linux}},
{erlang_version,"Erlang R14B04 (erts-5.8.5) [source] [64-bit] [smp:4:4] [rq:4] [async-threads:30] [kernel-poll:true]\n"},
{memory,[{total,27919080},
{connection_procs,2704},
{queue_procs,5408},
{plugins,0},
{other_proc,9099992},
{mnesia,63776},
{mgmt_db,0},
{msg_index,34080},
{other_ets,784160},
{binary,12144},
{code,14685283},
{atom,1367393},
{other_system,1864140}]},
{alarms,[]},
{listeners,[{clustering,25672,"::"},{amqp,5672,"::"}]},
{vm_memory_high_watermark,0.4},
{vm_memory_limit,1625165004},
{disk_free_limit,50000000},
{disk_free,181684699136},
{file_descriptors,[{total_limit,2},
{total_used,0},
{sockets_limit,0},
{sockets_used,0}]},
{processes,[{limit,1048576},{used,127}]},
{run_queue,0},
{uptime,20072}]
...done.
我检查了rabbitmq日志,但没有得到任何东西.芹菜版是3.1.12.
我用以下命令创建了rabbitmq虚拟主机和用户
$sudo rabbitmqctl add_user rabbitmquser <mypassword>
$sudo rabbitmqctl add_vhost localvhost
$sudo rabbitmqctl set_permissions -p localvhost rabbitmquser ".*" ".*" ".*"
谢谢
解决方法:
我犯了一个很大的错误.我的问题是我试图改变一些我不知道的事情.我正在添加它供其他人参考.
在安装rabbitmq时,我在/ etc / default / rabbitmq-server中将ulimit的默认值从1024更改为100.
我将值更改回1024,现在问题已修复.
内容总结
以上是互联网集市为您收集整理的python – 调用celery任务挂起延迟和apply_async全部内容,希望文章能够帮你解决python – 调用celery任务挂起延迟和apply_async所遇到的程序开发问题。 如果觉得互联网集市技术教程内容还不错,欢迎将互联网集市网站推荐给程序员好友。
内容备注
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 gblab@vip.qq.com 举报,一经查实,本站将立刻删除。
内容手机端
扫描二维码推送至手机访问。