所以现在3.8已经发布了,所以我要在asyncio
模块上另辟径。但是,当尝试正常关闭事件循环时,我得到了意外的结果。具体来说,我正在监听SIGINT
,取消正在运行的Task
,收集这些Task
,然后使用.stop()
进行事件循环。我知道Task
在取消时会引发CancelledError
,它将传播并结束对asyncio.gather
的调用,除非根据documentation,我将return_exceptions=True
传递给asyncio.gather
,这会使gather
等待所有Task
取消并返回CancelledError
的数组。但是,如果我尝试return_exceptions=True
取消gather
,gather
仍然会立即导致我的Task
调用中断。
这是重现效果的代码。我正在运行python 3.8.0:
# demo.py
import asyncio
import random
import signal
async def worker():
sleep_time = random.random() * 3
await asyncio.sleep(sleep_time)
print(f"Slept for {sleep_time} seconds")
async def dispatcher(queue):
while True:
await queue.get()
asyncio.create_task(worker())
tasks = asyncio.all_tasks()
print(f"Running Tasks: {len(tasks)}")
async def shutdown(loop):
tasks = [t for t in asyncio.all_tasks() if t is not asyncio.current_task()]
for task in tasks:
task.cancel()
print(f"Cancelling {len(tasks)} outstanding tasks")
results = await asyncio.gather(*tasks, return_exceptions=True)
print(f"results: {results}")
loop.stop()
async def main():
loop = asyncio.get_event_loop()
loop.add_signal_handler(signal.SIGINT, lambda: asyncio.create_task(shutdown(loop)))
queue = asyncio.Queue()
asyncio.create_task(dispatcher(queue))
while True:
await queue.put('tick')
await asyncio.sleep(1)
asyncio.run(main())
输出:
>> python demo.py
Running Tasks: 3
Slept for 0.3071352174511871 seconds
Running Tasks: 3
Running Tasks: 4
Slept for 0.4152310498820644 seconds
Running Tasks: 4
^CCancelling 4 outstanding tasks
Traceback (most recent call last):
File "demo.py", line 38, in <module>
asyncio.run(main())
File "/Users/max.taggart/.pyenv/versions/3.8.0/lib/python3.8/asyncio/runners.py", line 43, in run
return loop.run_until_complete(main)
File "/Users/max.taggart/.pyenv/versions/3.8.0/lib/python3.8/asyncio/base_events.py", line 608, in run_until_complete
return future.result()
asyncio.exceptions.CancelledError
我猜想事件循环还有一些我不了解的地方,但是我希望所有CancelledError
都以存储在results
中的对象数组的形式返回,然后能够继续运行而不是看到一个立即出错。
参考方案
是什么导致错误?
使用asyncio.all_tasks()
的问题是,它会返回所有任务,即使您不是直接创建的。按照以下方式更改代码,以查看要取消的操作:
for task in tasks:
print(task)
task.cancel()
您不仅会看到与worker
相关的任务,还会看到:
<Task pending coro=<main() running at ...>
取消main
会导致asyncio.run(main())
内部混乱,并且会出错。让我们进行快速/脏修改以将此任务排除在取消之外:
tasks = [
t
for t
in asyncio.all_tasks()
if (
t is not asyncio.current_task()
and t._coro.__name__ != 'main'
)
]
for task in tasks:
print(task)
task.cancel()
现在,您将看到results
。
loop.stop()导致错误
在达到results
的同时,还会收到另一个错误Event loop stopped before Future completed
。这是因为asyncio.run(main())
想要一直运行到main()
完成。
您必须重组代码以允许完成传递给asyncio.run
的协程,而不是停止事件循环,或者例如,使用loop.run_forever()代替asyncio.run
。
这是我的意思的快速/肮脏的演示:
async def shutdown(loop):
# ...
global _stopping
_stopping = True
# loop.stop()
_stopping = False
async def main():
# ...
while not _stopping:
await queue.put('tick')
await asyncio.sleep(1)
现在,您的代码可以正常运行了。请勿在实践中使用以上代码,这只是示例。如上所述,尝试重组您的代码。
如何正确处理任务
不要使用asyncio.all_tasks()
。
如果创建将来要取消的某些任务,请存储它并仅取消存储的任务。伪代码:
i_created = []
# ...
task = asyncio.create_task(worker())
i_created.append(task)
# ...
for task in i_created:
task.cancel()
似乎并不方便,但这是一种确保您不取消不想取消的东西的方法。
还有一件事
还要注意,asyncio.run()
不仅可以启动事件循环,还可以执行much more。特别地,it cancels在完成之前所有挂起的任务。在某些情况下,它可能很有用,尽管我建议您手动处理所有取消操作。
我应该在代码中替换while True(不使用asyncio)还是应该使用asyncio事件循环来实现相同的结果。目前,我正在处理某种与“ zeromq”连接的“工作者”,接收一些数据,然后对外部工具(服务器)执行一些请求(http)。一切都以普通的阻塞IO编写。使用asyncio事件循环摆脱while True: ...是否有意义?将来可能会用asynci…
Python-尝试使用意外的mimetype解码JSON: - python我最近从请求切换到了aiohttp,因为我无法在asyncio循环中使用它。交换进行得很顺利,除一件事外,其他一切都进行得很好。我的控制台充满了Attempt to decode JSON with unexpected mimetype: 和Attempt to decode JSON with unexpected mimetype: txt/html;…
通过参数传递异步循环或使用默认异步循环 - python我在我的应用程序中使用asyncio,我有点困惑将事件循环作为参数传递。使用事件循环编写函数/方法时,有三种可能性:将异步事件循环作为参数传递不要在事件循环中使用参数,而应使用asyncio.get_event_loop() 使其可选,以将事件循环作为参数传递。如果未通过,请使用asyncio.get_event_loop() 似乎大多数情况下都使用最后一种…
如何将Celery与asyncio结合? - python如何创建使芹菜任务看起来像asyncio.Task的包装器?还是有更好的方法将Celery与asyncio集成?@ asksol,Celery的创建者said this:: 使用Celery作为异步I / O框架之上的分布式层是很常见的(提示:将CPU绑定的任务路由到prefork worker意味着它们不会阻塞事件循环)。但是我找不到任何专门针对async…
我什么时候应该在常规线程上使用asyncio,为什么?它可以提高性能吗? - python我对Python中的多线程有基本的了解,甚至对asyncio也有较基本的了解。我目前正在编写一个基于Curses的小型程序(最终将使用完整的GUI,但这是另一个故事),该程序处理主线程中的UI和用户IO,然后有两个其他守护程序线程(每个线程都有自己的守护程序)排队/工人从队列中得到东西的方法):一个watcher线程,监视是否发生基于时间的和有条件的(例如,…