我有一个名为asyncio.Condition
的cond
。我希望等待,但只能等待很久,然后放弃。由于asyncio.Condition.wait
不会超时,因此无法直接完成。 The docs指出应使用asyncio.wait_for
来包装并提供超时:
超时后,可以使用asyncio.wait_for()函数取消任务。
因此,我们得出以下解决方案:
async def coro():
print("Taking lock...")
async with cond:
print("Lock acquired.")
print("Waiting!")
await asyncio.wait_for(cond.wait(), timeout=999)
print("Was notified!")
print("Lock released.")
现在假设coro
本身在运行五秒钟后被取消。这会在CancelledError
中引发wait_for
,从而在重新引发错误之前取消cond.wait
。错误然后传播到coro
,由于async with
块,该错误隐式尝试释放cond
中的锁定。但是,当前未持有该锁。 cond.wait
已被取消,但没有机会处理该取消并重新获得该锁。因此,我们得到了一个丑陋的异常,如下所示:
Taking lock...
Lock acquired.
Waiting!
ERROR:asyncio:Task exception was never retrieved
future: <Task finished coro=<coro() done, defined at [REDACTED]> exception=RuntimeError('Lock is not acquired.',)>
Traceback (most recent call last):
[REDACTED], in coro
await asyncio.wait_for(cond.wait(), timeout=999)
[REDACTED], in wait_for
yield from waiter
concurrent.futures._base.CancelledError
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
[REDACTED], in coro
print("Was notified!")
[REDACTED], in coro
res = func(*args, **kw)
[REDACTED], in __aexit__
self.release()
[REDACTED], in release
raise RuntimeError('Lock is not acquired.')
RuntimeError: Lock is not acquired.
换句话说,在处理CancelledError
时,coro
从试图释放未持有的锁中引发了RuntimeError
。 stacktrace显示print("Was notified!")
行的原因是,这是有问题的async with
块的最后一行。
这感觉我无法解决。我开始怀疑这是库本身的错误。但是,我想不出任何办法来避免该问题或创建解决方法,因此任何想法都将不胜感激。
在编写此问题并进行进一步调查的同时,我在Python Bug Tracker上遇到了类似的问题,最终检查了asyncio
源代码,并确定这实际上是asyncio
本身的错误。
对于具有相同问题的人员,我已将其提交给问题跟踪器here,并使用我创建的解决方法回答了我自己的问题。
编辑:按照ParkerD的要求,这是产生上述问题的完整可运行示例:
编辑2:更新示例以使用Python 3.7+中的新asyncio.run
和asyncio.create_task
功能
import asyncio
async def coro():
cond = asyncio.Condition()
print("Taking lock...")
async with cond:
print("Lock acquired.")
print("Waiting!")
await asyncio.wait_for(cond.wait(), timeout=999)
print("Was notified!")
print("Lock released.")
async def cancel_after_5(c):
task = asyncio.create_task(c)
await asyncio.sleep(5)
task.cancel()
await asyncio.wait([task])
asyncio.run(cancel_after_5(coro()))
参考方案
如问题末尾所述,我已确定问题实际上是库中的错误。我将重申该错误的问题跟踪器为here,并介绍我的解决方法。
以下函数基于wait_for
本身(源here),并且是该函数的一个版本,专门用于等待条件,并额外保证取消它是安全的。
调用wait_on_condition_with_timeout(cond, timeout)
大致等同于asyncio.wait_for(cond.wait(), timeout)
。
async def wait_on_condition_with_timeout(condition: asyncio.Condition, timeout: float) -> bool:
loop = asyncio.get_event_loop()
# Create a future that will be triggered by either completion or timeout.
waiter = loop.create_future()
# Callback to trigger the future. The varargs are there to consume and void any arguments passed.
# This allows the same callback to be used in loop.call_later and wait_task.add_done_callback,
# which automatically passes the finished future in.
def release_waiter(*_):
if not waiter.done():
waiter.set_result(None)
# Set up the timeout
timeout_handle = loop.call_later(timeout, release_waiter)
# Launch the wait task
wait_task = loop.create_task(condition.wait())
wait_task.add_done_callback(release_waiter)
try:
await waiter # Returns on wait complete or timeout
if wait_task.done():
return True
else:
raise asyncio.TimeoutError()
except (asyncio.TimeoutError, asyncio.CancelledError):
# If timeout or cancellation occur, clean up, cancel the wait, let it handle the cancellation,
# then re-raise.
wait_task.remove_done_callback(release_waiter)
wait_task.cancel()
await asyncio.wait([wait_task])
raise
finally:
timeout_handle.cancel()
关键部分是,如果发生超时或取消,该方法将在重新引发异常之前等待条件重新获取锁:
except (asyncio.TimeoutError, asyncio.CancelledError):
# If timeout or cancellation occur, clean up, cancel the wait, let it handle the cancellation,
# then re-raise.
wait_task.remove_done_callback(release_waiter)
wait_task.cancel()
await asyncio.wait([wait_task]) # This line is missing from the real wait_for
raise
我已经在Python 3.6.9上进行了测试,它可以完美运行。 3.7和3.8中也存在相同的错误,因此我认为它对于那些版本也很有用。如果您想知道何时修复错误,请检查上面的问题跟踪器。如果要使用Condition
以外的版本,则更改参数和create_task
行应该很简单。
我应该在代码中替换while True(不使用asyncio)还是应该使用asyncio事件循环来实现相同的结果。目前,我正在处理某种与“ zeromq”连接的“工作者”,接收一些数据,然后对外部工具(服务器)执行一些请求(http)。一切都以普通的阻塞IO编写。使用asyncio事件循环摆脱while True: ...是否有意义?将来可能会用asynci…
Python uuid4,如何限制唯一字符的长度 - python在Python中,我正在使用uuid4()方法创建唯一的字符集。但是我找不到将其限制为10或8个字符的方法。有什么办法吗?uuid4()ffc69c1b-9d87-4c19-8dac-c09ca857e3fc谢谢。 参考方案 尝试:x = uuid4() str(x)[:8] 输出:"ffc69c1b" Is there a way to…
Python:无法识别Pip命令 - python这是我拍摄的屏幕截图。当我尝试在命令提示符下使用pip时,出现以下错误消息:pip无法识别为内部或外部命令,可操作程序或批处理文件。我已经检查了这个线程:How do I install pip on Windows?我所能找到的就是我必须将"C:\PythonX\Scripts"添加到我的类路径中,其中X代表python版本。如您在我的…
Python:如何将有效的uuid从String转换为UUID? - python我收到的数据是 { "name": "Unknown", "parent": "Uncategorized", "uuid": "06335e84-2872-4914-8c5d-3ed07d2a2f16" }, 我需要将uuid从Strin…
Python 3会流行吗? - python我已经学习了一些Python 2和Python 3,似乎Python 2总体上比Python 3更好。这就是我的问题所在。是否有充分的理由真正切换到python 3? 参考方案 总体上,甚至在大多数细节上,Python3都比Python2更好。关于第三方库, Python 3落后于的唯一区域是。使Python变得如此出色的原因不仅在于它作为一种语言的内在特性…