等待条件变量超时:未及时重新获得锁 - python

我有一个名为asyncio.Conditioncond。我希望等待,但只能等待很久,然后放弃。由于asyncio.Condition.wait不会超时,因此无法直接完成。 The docs指出应使用asyncio.wait_for来包装并提供超时:

超时后,可以使用asyncio.wait_for()函数取消任务。

因此,我们得出以下解决方案:

async def coro():
    print("Taking lock...")
    async with cond:
        print("Lock acquired.")
        print("Waiting!")
        await asyncio.wait_for(cond.wait(), timeout=999)
        print("Was notified!")
    print("Lock released.")

现在假设coro本身在运行五秒钟后被取消。这会在CancelledError中引发wait_for,从而在重新引发错误之前取消cond.wait。错误然后传播到coro,由于async with块,该错误隐式尝试释放cond中的锁定。但是,当前未持有该锁。 cond.wait已被取消,但没有机会处理该取消并重新获得该锁。因此,我们得到了一个丑陋的异常,如下所示:

Taking lock...
Lock acquired.
Waiting!
ERROR:asyncio:Task exception was never retrieved
future: <Task finished coro=<coro() done, defined at [REDACTED]> exception=RuntimeError('Lock is not acquired.',)>
Traceback (most recent call last):
  [REDACTED], in coro
    await asyncio.wait_for(cond.wait(), timeout=999)
  [REDACTED], in wait_for
    yield from waiter
concurrent.futures._base.CancelledError

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  [REDACTED], in coro
    print("Was notified!")
  [REDACTED], in coro
    res = func(*args, **kw)
  [REDACTED], in __aexit__
    self.release()
  [REDACTED], in release
    raise RuntimeError('Lock is not acquired.')
RuntimeError: Lock is not acquired.

换句话说,在处理CancelledError时,coro从试图释放未持有的锁中引发了RuntimeError。 stacktrace显示print("Was notified!")行的原因是,这是有问题的async with块的最后一行。

这感觉我无法解决。我开始怀疑这是库本身的错误。但是,我想不出任何办法来避免该问题或创建解决方法,因此任何想法都将不胜感激。

在编写此问题并进行进一步调查的同时,我在Python Bug Tracker上遇到了类似的问题,最终检查了asyncio源代码,并确定这实际上是asyncio本身的错误。

对于具有相同问题的人员,我已将其提交给问题跟踪器here,并使用我创建的解决方法回答了我自己的问题。

编辑:按照ParkerD的要求,这是产生上述问题的完整可运行示例:

编辑2:更新示例以使用Python 3.7+中的新asyncio.runasyncio.create_task功能

import asyncio

async def coro():
    cond = asyncio.Condition()
    print("Taking lock...")
    async with cond:
        print("Lock acquired.")
        print("Waiting!")
        await asyncio.wait_for(cond.wait(), timeout=999)
        print("Was notified!")
    print("Lock released.")

async def cancel_after_5(c):
    task = asyncio.create_task(c)
    await asyncio.sleep(5)
    task.cancel()
    await asyncio.wait([task])

asyncio.run(cancel_after_5(coro()))

参考方案

如问题末尾所述,我已确定问题实际上是库中的错误。我将重申该错误的问题跟踪器为here,并介绍我的解决方法。

以下函数基于wait_for本身(源here),并且是该函数的一个版本,专门用于等待条件,并额外保证取消它是安全的。

调用wait_on_condition_with_timeout(cond, timeout)大致等同于asyncio.wait_for(cond.wait(), timeout)

async def wait_on_condition_with_timeout(condition: asyncio.Condition, timeout: float) -> bool:
    loop = asyncio.get_event_loop()

    # Create a future that will be triggered by either completion or timeout.
    waiter = loop.create_future()

    # Callback to trigger the future. The varargs are there to consume and void any arguments passed.
    # This allows the same callback to be used in loop.call_later and wait_task.add_done_callback,
    # which automatically passes the finished future in.
    def release_waiter(*_):
        if not waiter.done():
            waiter.set_result(None)

    # Set up the timeout
    timeout_handle = loop.call_later(timeout, release_waiter)

    # Launch the wait task
    wait_task = loop.create_task(condition.wait())
    wait_task.add_done_callback(release_waiter)

    try:
        await waiter  # Returns on wait complete or timeout
        if wait_task.done():
            return True
        else:
            raise asyncio.TimeoutError()

    except (asyncio.TimeoutError, asyncio.CancelledError):
        # If timeout or cancellation occur, clean up, cancel the wait, let it handle the cancellation,
        # then re-raise.
        wait_task.remove_done_callback(release_waiter)
        wait_task.cancel()
        await asyncio.wait([wait_task])
        raise

    finally:
        timeout_handle.cancel()

关键部分是,如果发生超时或取消,该方法将在重新引发异常之前等待条件重新获取锁:

except (asyncio.TimeoutError, asyncio.CancelledError):
        # If timeout or cancellation occur, clean up, cancel the wait, let it handle the cancellation,
        # then re-raise.
        wait_task.remove_done_callback(release_waiter)
        wait_task.cancel()
        await asyncio.wait([wait_task])  # This line is missing from the real wait_for
        raise

我已经在Python 3.6.9上进行了测试,它可以完美运行。 3.7和3.8中也存在相同的错误,因此我认为它对于那些版本也很有用。如果您想知道何时修复错误,请检查上面的问题跟踪器。如果要使用Condition以外的版本,则更改参数和create_task行应该很简单。

python asyncio run_forever或True - python

我应该在代码中替换while True(不使用asyncio)还是应该使用asyncio事件循环来实现相同的结果。目前,我正在处理某种与“ zeromq”连接的“工作者”,接收一些数据,然后对外部工具(服务器)执行一些请求(http)。一切都以普通的阻塞IO编写。使用asyncio事件循环摆脱while True: ...是否有意义?将来可能会用asynci…

Python uuid4,如何限制唯一字符的长度 - python

在Python中,我正在使用uuid4()方法创建唯一的字符集。但是我找不到将其限制为10或8个字符的方法。有什么办法吗?uuid4()ffc69c1b-9d87-4c19-8dac-c09ca857e3fc谢谢。 参考方案 尝试:x = uuid4() str(x)[:8] 输出:"ffc69c1b" Is there a way to…

Python:无法识别Pip命令 - python

这是我拍摄的屏幕截图。当我尝试在命令提示符下使用pip时,出现以下错误消息:pip无法识别为内部或外部命令,可操作程序或批处理文件。我已经检查了这个线程:How do I install pip on Windows?我所能找到的就是我必须将"C:\PythonX\Scripts"添加到我的类路径中,其中X代表python版本。如您在我的…

Python:如何将有效的uuid从String转换为UUID? - python

我收到的数据是 { "name": "Unknown", "parent": "Uncategorized", "uuid": "06335e84-2872-4914-8c5d-3ed07d2a2f16" }, 我需要将uuid从Strin…

Python 3会流行吗? - python

我已经学习了一些Python 2和Python 3,似乎Python 2总体上比Python 3更好。这就是我的问题所在。是否有充分的理由真正切换到python 3? 参考方案 总体上,甚至在大多数细节上,Python3都比Python2更好。关于第三方库, Python 3落后于的唯一区域是。使Python变得如此出色的原因不仅在于它作为一种语言的内在特性…