我想在异步上下文中使用PyAudio
库,但是该库的主要入口只有一个基于回调的API:
import pyaudio
def callback(in_data, frame_count, time_info, status):
# Do something with data
pa = pyaudio.PyAudio()
self.stream = self.pa.open(
stream_callback=callback
)
我希望如何使用它是这样的:
pa = SOME_ASYNC_COROUTINE()
async def listen():
async for block in pa:
# Do something with block
问题是,我不确定如何将该回调语法转换为将来在回调触发时完成的功能。在JavaScript中,我会使用promise.promisify()
,但是Python似乎没有这样的东西。
参考方案
promisify
的等效项不适用于此用例,原因有两个:
PyAudio的异步API不使用asyncio事件循环-文档指定了从后台线程调用回调。这需要采取预防措施以正确地与asyncio通信。
回调不能由单个Future建模,因为它被多次调用,而Future只有一个结果。相反,必须将其转换为异步迭代器,如示例代码所示。
这是一种可能的实现:
def make_iter():
loop = asyncio.get_event_loop()
queue = asyncio.Queue()
def put(*args):
loop.call_soon_threadsafe(queue.put_nowait, args)
async def get():
while True:
yield await queue.get()
return get(), put
make_iter
返回一对。返回的对象具有调用回调导致迭代器产生其下一个值(传递给回调的参数)的属性。可以从任意线程调用该回调,从而可以安全地传递给pyaudio.open
,而异步迭代器应在异步协程中提供给async for
,该协程将在等待下一个值时挂起:
async def main():
stream_get, stream_put = make_iter()
stream = pa.open(stream_callback=stream_put)
stream.start_stream()
async for in_data, frame_count, time_info, status in stream_get:
# ...
asyncio.get_event_loop().run_until_complete(main())
请注意,根据documentation,回调还必须返回有意义的值,帧的元组和布尔值标志。可以通过更改fill
函数将其合并到设计中,以也从异步端接收数据。不包括该实现,因为如果不了解该域,它可能就没有多大意义。
我有一个Python脚本在某些深度学习模型上运行推理。有什么办法可以找出GPU资源的利用率水平?例如,使用着色器,float16乘法器等。我似乎在网上找不到太多有关这些GPU资源的文档。谢谢! 参考方案 您可以尝试在像Renderdoc这样的GPU分析器中运行pyxthon应用程序。它将分析您的跑步情况。您将能够获得有关已使用资源,已用缓冲区,不同渲染状态上…
Python:图像处理可产生皱纹纸效果 - python也许很难描述我的问题。我正在寻找Python中的算法,以在带有某些文本的白色图像上创建皱纹纸效果。我的第一个尝试是在带有文字的图像上添加一些真实的皱纹纸图像(具有透明度)。看起来不错,但副作用是文本没有真正起皱。所以我正在寻找更好的解决方案,有什么想法吗?谢谢 参考方案 除了使用透明性之外,假设您有两张相同尺寸的图像,一张在皱纹纸上明亮,一张在白色背景上有深…
Python uuid4,如何限制唯一字符的长度 - python在Python中,我正在使用uuid4()方法创建唯一的字符集。但是我找不到将其限制为10或8个字符的方法。有什么办法吗?uuid4()ffc69c1b-9d87-4c19-8dac-c09ca857e3fc谢谢。 参考方案 尝试:x = uuid4() str(x)[:8] 输出:"ffc69c1b" Is there a way to…
Python sqlite3数据库已锁定 - python我在Windows上使用Python 3和sqlite3。我正在开发一个使用数据库存储联系人的小型应用程序。我注意到,如果应用程序被强制关闭(通过错误或通过任务管理器结束),则会收到sqlite3错误(sqlite3.OperationalError:数据库已锁定)。我想这是因为在应用程序关闭之前,我没有正确关闭数据库连接。我已经试过了: connectio…
python:ConfigParser对象,然后再阅读一次 - python场景:我有一个配置文件,其中包含要执行的自动化测试的列表。这些测试是长期循环执行的。 配置文件的设计方式使ConfigParser可以读取它。由于有两个三个参数,因此我需要通过每个测试。现在,此配置文件由script(s1)调用,并且按照配置文件中的列表执行测试。Script(s1)第一次读取配置,并且在每次测试完成后都会执行。阅读两次的要求:由于可能会…