python asyncio协程动态添加任务、协程池
内容导读
互联网集市收集整理的这篇技术教程文章主要介绍了python asyncio协程动态添加任务、协程池,小编现在分享给大家,供广大互联网技能从业者学习和参考。文章包含3065字,纯文字阅读大概需要5分钟。
内容图文
![python asyncio协程动态添加任务、协程池](/upload/InfoBanner/zyjiaocheng/619/b70d590f75b346fcbd597b1309c58c3e.jpg)
文章目录
asyncio 协程介绍:
- 动态添加任务:
- 方案是创建一个线程,使事件循环在线程内永久运行
- 设置守护进程,随着主进程一起关闭
- 自动停止任务
- 阻塞任务完成
- 协程池
- 队列自带阻塞机制,当队列满了后会阻塞,因此可以取代 asyncio.Semaphore()
demo
import asyncio
import aiohttp
import time
import nest_asyncio
import queue
from threading import Thread
class AsyncPool(object):
"""
1. 支持动态添加任务
2. 支持自动停止事件循环
3. 支持最大协程数
"""
def __init__(self, loop=None, maxsize=0):
"""
初始化
:param loop:
:param maxsize: 默认0,不限制队列
"""
# 在jupyter需要这个,不然asyncio运行出错
nest_asyncio.apply()
# 获取一个事件循环
if not loop:
self.loop = asyncio.new_event_loop()
# 队列,先进先出,根据队列是否为空判断,退出协程
self.q = queue.Queue(maxsize)
self.loop_thread = None
if self.loop:
self.start_thread_loop()
def add(self, item=1):
"""
添加任务
:param item:
:return:
"""
self.q.put(item)
def done(self, fn):
"""
任务完成
回调函数
:param fn:
:return:
"""
if fn:
pass
self.q.get()
self.q.task_done()
def wait(self):
"""
等待任务执行完毕
:return:
"""
self.q.join()
@staticmethod
def _start_thread_loop(loop):
"""
运行事件循环
:param loop: loop以参数的形式传递进来运行
:return:
"""
# 将当前上下文的事件循环设置为循环。
asyncio.set_event_loop(loop)
# 开始事件循环
loop.run_forever()
def start_thread_loop(self):
"""
运行事件循环
:return:
"""
self.loop_thread = Thread(target=self._start_thread_loop, args=(self.loop,))
# 设置守护进程
self.loop_thread.setDaemon(True)
# 运行线程,同时协程事件循环也会运行
self.loop_thread.start()
def stop_thread_loop(self, loop_time=1):
"""
队列为空,则关闭线程
:param loop_time:
:return:
"""
async def _close_thread_loop():
"""
关闭线程
:return:
"""
while True:
if self.q.empty():
self.loop.stop()
break
await asyncio.sleep(loop_time)
# 等待关闭线程
asyncio.run_coroutine_threadsafe(_close_thread_loop(), self.loop)
def submit(self, func, callback=None):
"""
提交任务到事件循环
:param func: 异步函数对象
:param callback: 回调函数
:return:
"""
# 将协程注册一个到运行在线程中的循环,thread_loop 会获得一个环任务
# 注意:run_coroutine_threadsafe 这个方法只能用在运行在线程中的循环事件使用
future = asyncio.run_coroutine_threadsafe(func, self.loop)
# 回调函数封装
def callback_done(_future):
try:
if callback:
callback(_future)
finally:
self.done(_future)
# 添加回调函数
future.add_done_callback(callback_done)
def release(self, loop_time=1):
"""
释放线程
:param loop_time:
:return:
"""
self.stop_thread_loop(loop_time)
def running(self):
"""
获取当前线程数
:return:
"""
return self.q.qsize()
async def thread_example(i):
url = "http://127.0.0.1:8080/app04/async4?num={}".format(i)
async with aiohttp.ClientSession() as session:
async with session.get(url) as res:
# print(res.status)
# print(res.content)
return await res.text()
def my_callback(future):
result = future.result()
print('返回值: ', result)
def main():
# 任务组, 最大协程数
pool = AsyncPool(maxsize=10000)
# 插入任务任务
for i in range(100000):
pool.add()
pool.submit(thread_example(i), my_callback)
# 停止事件循环
pool.release()
# 等待
pool.wait()
print("等待子线程结束...")
if __name__ == '__main__':
start_time = time.time()
main()
end_time = time.time()
print("run time: ", end_time - start_time)
内容总结
以上是互联网集市为您收集整理的python asyncio协程动态添加任务、协程池全部内容,希望文章能够帮你解决python asyncio协程动态添加任务、协程池所遇到的程序开发问题。 如果觉得互联网集市技术教程内容还不错,欢迎将互联网集市网站推荐给程序员好友。
内容备注
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 gblab@vip.qq.com 举报,一经查实,本站将立刻删除。
内容手机端
扫描二维码推送至手机访问。