如何正确处理Python的`asyncio.gather`中已取消的任务 - python

所以现在3.8已经发布了,所以我要在asyncio模块上另辟径。但是,当尝试正常关闭事件循环时,我得到了意外的结果。具体来说,我正在监听SIGINT,取消正在运行的Task,收集这些Task,然后使用.stop()进行事件循环。我知道Task在取消时会引发CancelledError,它将传播并结束对asyncio.gather的调用,除非根据documentation,我将return_exceptions=True传递给asyncio.gather,这会使gather等待所有Task取消并返回CancelledError的数组。但是,如果我尝试return_exceptions=True取消gathergather仍然会立即导致我的Task调用中断。

这是重现效果的代码。我正在运行python 3.8.0:

# demo.py

import asyncio
import random
import signal


async def worker():
    sleep_time = random.random() * 3
    await asyncio.sleep(sleep_time)
    print(f"Slept for {sleep_time} seconds")

async def dispatcher(queue):
    while True:
        await queue.get()
        asyncio.create_task(worker())
        tasks = asyncio.all_tasks()
        print(f"Running Tasks: {len(tasks)}")

async def shutdown(loop):
    tasks = [t for t in asyncio.all_tasks() if t is not asyncio.current_task()]
    for task in tasks:
        task.cancel()
    print(f"Cancelling {len(tasks)} outstanding tasks")
    results = await asyncio.gather(*tasks, return_exceptions=True)
    print(f"results: {results}")
    loop.stop()

async def main():
    loop = asyncio.get_event_loop()
    loop.add_signal_handler(signal.SIGINT, lambda: asyncio.create_task(shutdown(loop)))
    queue = asyncio.Queue()
    asyncio.create_task(dispatcher(queue))

    while True:
        await queue.put('tick')
        await asyncio.sleep(1)


asyncio.run(main())

输出:

>> python demo.py 
Running Tasks: 3
Slept for 0.3071352174511871 seconds
Running Tasks: 3
Running Tasks: 4
Slept for 0.4152310498820644 seconds
Running Tasks: 4
^CCancelling 4 outstanding tasks
Traceback (most recent call last):
  File "demo.py", line 38, in <module>
    asyncio.run(main())
  File "/Users/max.taggart/.pyenv/versions/3.8.0/lib/python3.8/asyncio/runners.py", line 43, in run
    return loop.run_until_complete(main)
  File "/Users/max.taggart/.pyenv/versions/3.8.0/lib/python3.8/asyncio/base_events.py", line 608, in run_until_complete
    return future.result()
asyncio.exceptions.CancelledError

我猜想事件循环还有一些我不了解的地方,但是我希望所有CancelledError都以存储在results中的对象数组的形式返回,然后能够继续运行而不是看到一个立即出错。

参考方案

是什么导致错误?

使用asyncio.all_tasks()的问题是,它会返回所有任务,即使您不是直接创建的。按照以下方式更改代码,以查看要取消的操作:

for task in tasks:
    print(task)
    task.cancel()

您不仅会看到与worker相关的任务,还会看到:

<Task pending coro=<main() running at ...>

取消main会导致asyncio.run(main())内部混乱,并且会出错。让我们进行快速/脏修改以将此任务排除在取消之外:

tasks = [
    t 
    for t 
    in asyncio.all_tasks() 
    if (
        t is not asyncio.current_task()
        and t._coro.__name__ != 'main'
    )
]

for task in tasks:
    print(task)
    task.cancel()

现在,您将看到results

loop.stop()导致错误

在达到results的同时,还会收到另一个错误Event loop stopped before Future completed。这是因为asyncio.run(main())想要一直运行到main()完成。

您必须重组代码以允许完成传递给asyncio.run的协程,而不是停止事件循环,或者例如,使用loop.run_forever()代替asyncio.run

这是我的意思的快速/肮脏的演示:

async def shutdown(loop):
    # ...

    global _stopping
    _stopping = True
    # loop.stop()

_stopping = False

async def main():
    # ...

    while not _stopping:
        await queue.put('tick')
        await asyncio.sleep(1)

现在,您的代码可以正常运行了。请勿在实践中使用以上代码,这只是示例。如上所述,尝试重组您的代码。

如何正确处理任务

不要使用asyncio.all_tasks()

如果创建将来要取消的某些任务,请存储它并仅取消存储的任务。伪代码:

i_created = []

# ...

task = asyncio.create_task(worker())
i_created.append(task)

# ...

for task in i_created:
    task.cancel()

似乎并不方便,但这是一种确保您不取消不想取消的东西的方法。

还有一件事

还要注意,asyncio.run()不仅可以启动事件循环,还可以执行much more。特别地,it cancels在完成之前所有挂起的任务。在某些情况下,它可能很有用,尽管我建议您手动处理所有取消操作。

python asyncio run_forever或True - python

我应该在代码中替换while True(不使用asyncio)还是应该使用asyncio事件循环来实现相同的结果。目前,我正在处理某种与“ zeromq”连接的“工作者”,接收一些数据,然后对外部工具(服务器)执行一些请求(http)。一切都以普通的阻塞IO编写。使用asyncio事件循环摆脱while True: ...是否有意义?将来可能会用asynci…

Python-尝试使用意外的mimetype解码JSON: - python

我最近从请求切换到了aiohttp,因为我无法在asyncio循环中使用它。交换进行得很顺利,除一件事外,其他一切都进行得很好。我的控制台充满了Attempt to decode JSON with unexpected mimetype: 和Attempt to decode JSON with unexpected mimetype: txt/html;…

通过参数传递异步循环或使用默认异步循环 - python

我在我的应用程序中使用asyncio,我有点困惑将事件循环作为参数传递。使用事件循环编写函数/方法时,有三种可能性:将异步事件循环作为参数传递不要在事件循环中使用参数,而应使用asyncio.get_event_loop() 使其可选,以将事件循环作为参数传递。如果未通过,请使用asyncio.get_event_loop() 似乎大多数情况下都使用最后一种…

如何将Celery与asyncio结合? - python

如何创建使芹菜任务看起来像asyncio.Task的包装器?还是有更好的方法将Celery与asyncio集成?@ asksol,Celery的创建者said this:: 使用Celery作为异步I / O框架之上的分布式层是很常见的(提示:将CPU绑定的任务路由到prefork worker意味着它们不会阻塞事件循环)。但是我找不到任何专门针对async…

我什么时候应该在常规线程上使用asyncio,为什么?它可以提高性能吗? - python

我对Python中的多线程有基本的了解,甚至对asyncio也有较基本的了解。我目前正在编写一个基于Curses的小型程序(最终将使用完整的GUI,但这是另一个故事),该程序处理主线程中的UI和用户IO,然后有两个其他守护程序线程(每个线程都有自己的守护程序)排队/工人从队列中得到东西的方法):一个watcher线程,监视是否发生基于时间的和有条件的(例如,…