存储结果ThreadPoolExecutor - python

我对使用“ concurrent.futures”进行并行处理还很陌生,并且正在测试一些简单的实验。我编写的代码似乎有效,但是我不确定如何存储结果。我试图创建一个列表(“ futures”)并将结果附加到该列表中,但这会大大减慢该过程。我想知道是否有更好的方法可以做到这一点。谢谢。

import concurrent.futures
import time

couple_ods= []
futures=[]

dtab={}
for i in range(100):
    for j in range(100):
       dtab[i,j]=i+j/2
       couple_ods.append((i,j))

avg_speed=100
def task(i):
    origin=i[0]
    destination=i[1]
    time.sleep(0.01)
    distance=dtab[origin,destination]/avg_speed
    return distance
start1=time.time()
def main():
    with concurrent.futures.ThreadPoolExecutor() as executor:
       for number in couple_ods:
          future=executor.submit(task,number)
          futures.append(future.result())

if __name__ == '__main__':
    main()
end1=time.time()

参考方案

当您调用future.result()时,它将阻塞直到值准备就绪。因此,在这里并行性并没有带来任何好处-您启动一个任务,等待它完成,开始另一个任务,等待它完成,依此类推。

当然,您的示例首先不会从线程中受益。您的任务除了CPU受限的Python计算外什么都不做,这意味着(至少在CPython,MicroPython和PyPy中,它们是concurrent.futures随附的唯一完整实现),GIL(全局解释器锁定)将阻止更多操作。而不是您的一个线程一次进展。

希望您的真实程序有所不同。如果它正在做I / O绑定的工作(发出网络请求,读取文件等),或使用扩展库(如NumPy)释放CPU繁重的GIL,那么它将正常工作。但是否则,您将要在此处使用ProcessPoolExecutor

无论如何,您想要做的是将future本身附加到列表中,因此在等待任何期货之前,您会获得所有期货的列表:

for number in couple_ods:
    future=executor.submit(task,number)
    futures.append(future)

然后,在您开始所有作业之后,就可以开始等待它们了。有三种简单的选择,而一种复杂的选择则在您需要更多控制时。

(1)您可以直接将它们循环浏览,以按照提交的顺序等待它们:

for future in futures:
    result = future.result()
    dostuff(result)

(2)如果需要在完成任何工作之前等待它们完成,则可以调用wait

futures, _ = concurrent.futures.wait(futures)
for future in futures:
    result = future.result()
    dostuff(result)

(3)如果您希望每一个准备好后立即处理,即使它们出现故障,请使用as_completed

for result in concurrent.futures.as_completed(futures):
    dostuff(result)

请注意,在文档中使用此功能的示例提供了一些方法来标识完成的任务。如果需要,可以简单地向每个索引传递一个索引,然后传递return index, real_result,然后可以传递for index, result in …进行循环。

(4)如果您需要更多控制权,则可以循环进行wait到目前为止所做的一切:

while futures:
    done, futures = concurrent.futures.wait(concurrent.futures.FIRST_COMPLETED)
    for future in done:
        result = future.result()
        dostuff(result)

该示例执行与as_completed相同的操作,但是您可以在其上编写较小的变体以执行不同的操作,例如等待所有操作完成,但是如果有任何异常会提前取消。

对于许多简单情况,您可以仅使用执行程序的map方法来简化第一个选项。就像内置的map函数一样,它对参数中的每个值调用一次函数,然后为您提供一些内容,您可以对其进行循环操作以按相同的顺序获取结果,但它是并行进行的。所以:

for result in executor.map(task, couple_ods):
    dostuff(result)

什么时候不使用asyncio有意义? - python

在什么情况下会在asyncio上使用一个线程或执行程序(使用线程)?随着我使用Python(CPython)的经验的进步,它集中于优化工作脚本以批量执行某种形式的Web服务调用并处理响应。但是,经过几代脚本构建之后,我发现自己想知道为什么我不使用最新的脚本?请允许我在下面提供一些背景信息...问题:从服务器A向客户端B请求N个文件,进行处理并将其保存到磁盘。…

Python sqlite3数据库已锁定 - python

我在Windows上使用Python 3和sqlite3。我正在开发一个使用数据库存储联系人的小型应用程序。我注意到,如果应用程序被强制关闭(通过错误或通过任务管理器结束),则会收到sqlite3错误(sqlite3.OperationalError:数据库已锁定)。我想这是因为在应用程序关闭之前,我没有正确关闭数据库连接。我已经试过了: connectio…

python-docx应该在空单元格已满时返回空单元格 - python

我试图遍历文档中的所有表并从中提取文本。作为中间步骤,我只是尝试将文本打印到控制台。我在类似的帖子中已经看过scanny提供的其他代码,但是由于某种原因,它并没有提供我正在解析的文档的预期输出可以在https://www.ontario.ca/laws/regulation/140300中找到该文档from docx import Document from…

Python:集群作业管理 - python

我在具有两个阶段的计算群集(Slurm)上运行python脚本,它们是顺序的。我编写了两个python脚本,一个用于阶段1,另一个用于阶段2。每天早上,我检查所有第1阶段的工作是否都以视觉方式完成。只有这样,我才开始第二阶段。通过在单个python脚本中组合所有阶段和作业管理,是否有一种更优雅/自动化的方法?我如何知道工作是否完成?工作流程类似于以下内容:w…

Python-Excel导出 - python

我有以下代码:import pandas as pd import requests from bs4 import BeautifulSoup res = requests.get("https://www.bankier.pl/gielda/notowania/akcje") soup = BeautifulSoup(res.cont…