Streamz / Dask:收集不等待缓冲区的所有结果 - python

进口:

from dask.distributed import Client
import streamz
import time

模拟的工作量:

def increment(x):
    time.sleep(0.5)
    return x + 1

假设我想在本地Dask客户端上处理一些工作负载:

if __name__ == "__main__":
    with Client() as dask_client:
        ps = streamz.Stream()
        ps.scatter().map(increment).gather().sink(print)

        for i in range(10):
            ps.emit(i)

这可以按预期工作,但是sink(print)当然会强制等待每个结果,因此流将不会并行执行。

但是,如果我使用buffer()允许将结果缓存,则gather()似乎不再正确收集所有结果,并且解释器在获取结果之前退出。这种方法:

if __name__ == "__main__":
    with Client() as dask_client:
        ps = streamz.Stream()
        ps.scatter().map(increment).buffer(10).gather().sink(print)
                                     # ^
        for i in range(10):          # - allow parallel execution 
            ps.emit(i)               # - before gather()

...不会为我打印任何结果。 Python解释器仅在启动脚本之后且buffer()发出结果之前不久退出,因此不会打印任何内容。

但是,如果主进程被迫等待一段时间,结果将以并行方式打印(因此它们不会彼此等待,而是几乎同时打印):

if __name__ == "__main__":
    with Client() as dask_client:
        ps = streamz.Stream()
        ps.scatter().map(increment).buffer(10).gather().sink(print)

        for i in range(10):
            ps.emit(i)

        time.sleep(10)  # <- force main process to wait while ps is working

这是为什么?我认为gather()应该等待一批10个结果,因为buffer()在将它们刷新到gather()之前应该并行缓存正好10个结果。为什么在这种情况下gather()不阻止?

是否有一种很好的方法来检查Stream是否仍然包含正在处理的元素,以防止主进程过早退出?

参考方案

“为什么?”:因为Dask分布式调度程序(执行流映射器和接收器功能)和python脚本在不同的进程中运行。当“ with”块上下文结束时,在发送到流的项目能够到达接收器功能之前,Dask Client将关闭并且执行将关闭。
“是否有一种很好的方法来检查Stream是否仍然包含正在处理的元素”:我不知道。但是:如果您想要的行为是(我只是在这里猜测)一堆项目的并行处理,那么Streamz不是您应该使用的对象,香草Dask就足够了。

使用Dask读取多个文件 - python

我正在尝试简单地并行读取24个科学数据文件,每个文件约250MB,因此总计约6GB。数据为2D数组格式。它存储在并行文件系统中,并从群集中读取,尽管我现在仅从单个节点读取。数据采用类似于HDF5(称为Adios)的格式,并且类似于h5py包进行读取。每个文件大约需要4秒钟才能读取。我正在阅读此处(http://docs.dask.org/en/latest/…

Python GPU资源利用 - python

我有一个Python脚本在某些深度学习模型上运行推理。有什么办法可以找出GPU资源的利用率水平?例如,使用着色器,float16乘法器等。我似乎在网上找不到太多有关这些GPU资源的文档。谢谢! 参考方案 您可以尝试在像Renderdoc这样的GPU分析器中运行pyxthon应用程序。它将分析您的跑步情况。您将能够获得有关已使用资源,已用缓冲区,不同渲染状态上…

在快速本地集群上管理工作人员内存 - python

我试图用dask加载数据集,但是当需要计算我的数据集时,我总是遇到这样的问题: 警告-工作者超出了95%的内存预算。正在重新启动。我只是在本地计算机上工作,因此启动了dask,如下所示:if __name__ == '__main__': libmarket.config.client = Client() # use dask.dist…

Python:图像处理可产生皱纹纸效果 - python

也许很难描述我的问题。我正在寻找Python中的算法,以在带有某些文本的白色图像上创建皱纹纸效果。我的第一个尝试是在带有文字的图像上添加一些真实的皱纹纸图像(具有透明度)。看起来不错,但副作用是文本没有真正起皱。所以我正在寻找更好的解决方案,有什么想法吗?谢谢 参考方案 除了使用透明性之外,假设您有两张相同尺寸的图像,一张在皱纹纸上明亮,一张在白色背景上有深…

Python uuid4,如何限制唯一字符的长度 - python

在Python中,我正在使用uuid4()方法创建唯一的字符集。但是我找不到将其限制为10或8个字符的方法。有什么办法吗?uuid4()ffc69c1b-9d87-4c19-8dac-c09ca857e3fc谢谢。 参考方案 尝试:x = uuid4() str(x)[:8] 输出:"ffc69c1b" Is there a way to…