如何在 Python 中的字典输入上使用多处理池?

发布于 2023-02-12 17:36:33

我的目标是在 python 字典上使用 map reduce aka Multiprocessing pool。我希望它将键值对映射到不同的核心,然后将结果聚合为字典。

from multiprocessing.pool import Pool
elems = {i:i for i in range(1_000_000)}

def func(x):
    return (x, elems[x]**2)

with Pool() as pool:
    results = pool.map(func, elems.keys())
    results = {a:b for a,b in results}

这是一个有点棘手的解决方案,但是否有更 Pythonic 的方式来接收字典输入并使用 Python 中的多处理池生成字典输出?

查看更多

1楼回答 2023-02-08

您可以使用 ProcessPoolExecutor 轻松进行 map-reduce:

from concurrent.futures import ProcessPoolExecutor


def process(item):
    return (item[0], item[1] ** 2)


def main():
    elems = {i: i for i in range(1_000_000)}

    output = {}

    with ProcessPoolExecutor() as pool:
        results = pool.map(process, elems.items(), chunksize=1_000)

        for result in results:
            output[result[0]] = result[1]

    print(output)


if __name__ == "__main__":
    main()

在这里,results 是一个(异步)迭代器,每次并行处理的结果可用时它都会产生一个值,您可以对其进行迭代以减少部分。由于数据的进程间通信,多处理的成本可能很高,因此,您应该调整 chunksize 参数以适合您的用例。

多处理的一些建议:

  • 永远不要改变将要并发执行的函数的共享状态,这会导致数据竞争;
  • if __name == __main__ 中保护您的主要功能,否则您会遇到问题;
  • 不要将大数据声明为全局状态(例如你的elems字典),否则它将在每个子进程中被复制,声明一个主函数;
  • 在并发执行的函数中尽可能避免访问共享状态,即使用def func(key, value)而不是elems[x]
请登录后再发布答案,点击登录