静かなる名辞

pythonとプログラミングのこと


【python】Pool.mapをProcessPoolExecutor.mapに置き換えてみる

はじめに

 concurrent.futures.ProcessPoolExecutorは便利そうなので、Poolの代わりに使ってみようと思います。

17.2. multiprocessing — プロセスベースの並列処理 — Python 3.6.5 ドキュメント
17.4. concurrent.futures – 並列タスク実行 — Python 3.6.5 ドキュメント

相違点

 非同期で使えることを除くと、以下のような違いがあります。

  • ProcessPoolExecutor.mapは複数の引数をサポートする。つまり普通のmapみたいに使える
  • ただし返り値がitertools.chainになる。リストでほしければ自分で変換する

 利便性の観点からすると、まあまあよさげです。

実験1:使えるかどうか確認する

 とりあえず同等に使えることを確認するため、次のようなコードを書いてみました。

import time
import numpy as np
from multiprocessing import Pool
from concurrent.futures import ProcessPoolExecutor

def f1(x):
    a, b = x
    return a * b

def f2(a, b):
    return a * b

def main():
    p1 = Pool(4)
    p2 = ProcessPoolExecutor(4)
    a, b = np.random.rand(2, 10**2, 10**2)
    alst = [a+x for x in range(1000)]
    blst = [b+x for x in range(1000)]

    t1 = time.time()
    result1 = p1.map(f1, zip(alst, blst))
    t2 = time.time()
    print("Pool:{:.6f}".format(t2 - t1))

    t1 = time.time()
    result2 = list(p2.map(f2, alst, blst))
    t2 = time.time()
    print("ProcessPoolExecutor:{:.6f}".format(t2 - t1))
    print(np.array_equal(result1, result2))

if __name__ == "__main__":
    main()

# 結果
""" =>
Pool:0.586397
ProcessPoolExecutor:1.137316
True
"""

 動いてるけど、おっそ。ドキュメントには「長いデータ渡すときはchunksizeを指定すると速くなるよ」と書いてあったので、そうしてみます。

# 中略

    result2 = list(p2.map(f2, alst, blst, chunksize=128))

# 中略
""" =>
Pool:0.637173
ProcessPoolExecutor:0.655285
True
"""

 何回か回しているとなんとなく微妙に遅い気もするのですが(たぶん5%~10%くらい?)、とりあえず一応ほぼ互角。

実験2:メソッドを試す

 インスタンスメソッドでも行けるかどうか見ます。

import time
import numpy as np
from multiprocessing import Pool
from concurrent.futures import ProcessPoolExecutor

class F:
    def f1(self, x):
        a, b = x
        return a * b

    def f2(self, a, b):
        return a * b

def main():
    p1 = Pool(4)
    p2 = ProcessPoolExecutor(4)
    a, b = np.random.rand(2, 10**2, 10**2)
    alst = [a+x for x in range(1000)]
    blst = [b+x for x in range(1000)]
    f = F()

    t1 = time.time()
    result1 = p1.map(f.f1, zip(alst, blst))
    t2 = time.time()
    print("Pool:{:.6f}".format(t2 - t1))

    t1 = time.time()
    result2 = list(p2.map(f.f2, alst, blst, chunksize=128))
    t2 = time.time()
    print("ProcessPoolExecutor:{:.6f}".format(t2 - t1))
    print(np.array_equal(result1, result2))

if __name__ == "__main__":
    main()

 普通に走ったので問題なさそう。古めのPythonだとメソッドは渡せなかったはずなのですが、最近は渡せるみたいですね。

実験3:if __name__ == "__main__":を外す

 multiprocessingを使っているとたまにこれがないと落ちることがあるみたいなので、外してみます。

import time
import numpy as np
from multiprocessing import Pool
from concurrent.futures import ProcessPoolExecutor

def f1(x):
    a, b = x
    return a * b

def f2(a, b):
    return a * b

p1 = Pool(4)
p2 = ProcessPoolExecutor(4)
a, b = np.random.rand(2, 10**2, 10**2)
alst = [a+x for x in range(1000)]
blst = [b+x for x in range(1000)]

t1 = time.time()
result1 = p1.map(f1, zip(alst, blst))
t2 = time.time()
print("Pool:{:.6f}".format(t2 - t1))

t1 = time.time()
result2 = list(p2.map(f2, alst, blst, chunksize=128))
t2 = time.time()
print("ProcessPoolExecutor:{:.6f}".format(t2 - t1))
print(np.array_equal(result1, result2))

 これも問題なかったです。

結論

 概ね同等に使えます。なにしろ引数を複数渡せるのが便利なところで、wrapper関数をわざわざ書かなくて済みます。あとはlistに変換するまでは非同期で動いている訳で、それを期待して使うのも状況次第ではありでしょう。

 ただしchunksizeは明示的に大きめに指定することです。