我的目标是在 python 字典上使用 map reduce aka Multiprocessing pool。我希望它将键值对映射到不同的核心,然后将结果聚合为字典。
from multiprocessing.pool import Pool
elems = {i:i for i in range(1_000_000)}
def func(x):
return (x, elems[x]**2)
with Pool() as pool:
results = pool.map(func, elems.keys())
results = {a:b for a,b in results}
这是一个有点棘手的解决方案,但是否有更 Pythonic 的方式来接收字典输入并使用 Python 中的多处理池生成字典输出?
您可以使用 ProcessPoolExecutor
轻松进行 map-reduce:
from concurrent.futures import ProcessPoolExecutor
def process(item):
return (item[0], item[1] ** 2)
def main():
elems = {i: i for i in range(1_000_000)}
output = {}
with ProcessPoolExecutor() as pool:
results = pool.map(process, elems.items(), chunksize=1_000)
for result in results:
output[result[0]] = result[1]
print(output)
if __name__ == "__main__":
main()
在这里,results
是一个(异步)迭代器,每次并行处理的结果可用时它都会产生一个值,您可以对其进行迭代以减少部分。由于数据的进程间通信,多处理的成本可能很高,因此,您应该调整 chunksize
参数以适合您的用例。
多处理的一些建议:
if __name == __main__
中保护您的主要功能,否则您会遇到问题;elems
字典),否则它将在每个子进程中被复制,声明一个主函数;def func(key, value)
而不是elems[x]
。