静かなる名辞

pythonとプログラミングのこと


concurrent.futuresはなかなか便利かもしれない

概要

 「いまさら?」と思われるかもしれませんが、concurrent.futuresを使う機会があり、けっこう幸せでした。

 本当に「いまさら?」なのですが、どういうとき便利でどういう風に使えるのか書いておきます。

 リファレンス
concurrent.futures -- 並列タスク実行 — Python 3.8.3rc1 ドキュメント

並列化の処理を向こうでやってくれる

 concurrent.futuresにはThreadPoolExecutorとProcessPoolExecutorの2つのクラスがあります。名前に入っているPoolという文字列から想像できる通り、これらは

  • スレッド/プロセスの束を最初に生成する
  • 作った束に処理を投げつけると適当に割り振ってくれる

 という機能を持ちます。

 これの良いところは、スレッド/プロセスの数をmax_workersで指定すれば、あとはそんなに考えることがないことです。処理待ちのタスクは内部的にキューで管理され、前のタスクが終わって空いたスレッド/プロセスに順次放り込まれます。自分でforkさせて……とかやっていると、たとえばタスクで処理対象になるデータ量に応じてスレッド/プロセスが増えていくようなあほくさい実装になってしまうときがありますが、その辺をよしなに制御してくれるのが強みです。

 まあ、これに関してはもっと昔からあったmultiprocessing.Poolでもできるのですが、次のFutureは更に魅力的です。

Futureはすごい!

 concurrent.futuresにはThreadPoolExecutorとProcessPoolExecutorの2つのクラスがあり、このインスタンスにsubmitメソッドを呼ぶと(呼び方はsubmit(fn, *args, **kwargs)です)Futureオブジェクトが返ってきます。

 Futureオブジェクトの中では勝手に処理を進めてくれていて、処理が終わればfnとして渡した関数の返り値をresultメソッドで取得することができます。Futures.resultメソッド自体は処理が終わっていなければ呼び出し元をブロックしますが、重要なのはresultを呼ぼうが呼ぶまいが、子スレッド/子プロセスはノンブロッキングで勝手に走って勝手に終わるということです。呼び出し元がブロックされようがされまいが、子スレッド/子プロセスは走り出してしまえばあとはフルに動き続けるのが良いところです(処理するタスクがmax_workers以上の数残っている限りは)。

 たとえば、以下のプログラムの処理結果はどのようになるでしょうか? ちょっと先に考えてみてください。printが出てくる順番とタイミング(UNIX時間の下5桁を出しています)に注目です。

import time
import threading
from concurrent.futures import ThreadPoolExecutor

def f(t):
    print(f"{t:2} start {threading.get_ident()} {time.time() % 10 ** 5}")
    time.sleep(t)
    print(f"{t:2} end   {threading.get_ident()} {time.time() % 10 ** 5}")
    return t

with ThreadPoolExecutor(max_workers=2) as executor:
    futures = [executor.submit(f, t) for t in [20, 10, 1, 2, 3, 4]]
    time.sleep(25)
    result = [f.result() for f in futures]
    print(result, time.time() % 10 ** 5)

 正解はこうです。

20 start 140118759839488 40802.507059812546
10 start 140118677518080 40802.507822752
10 end   140118677518080 40812.518659353256
 1 start 140118677518080 40812.52083444595
 1 end   140118677518080 40813.52251338959
 2 start 140118677518080 40813.52270030975
 2 end   140118677518080 40815.52499437332
 3 start 140118677518080 40815.525171756744
 3 end   140118677518080 40818.52837443352
 4 start 140118677518080 40818.528562784195
20 end   140118759839488 40822.52741456032
 4 end   140118677518080 40822.533390283585
[20, 10, 1, 2, 3, 4] 40827.53366971016

 スレッドプールがうまく効いている様子がわかります。また、Futuresを捕まえておけば結果の取得も問題なくできます。

 遅延評価……とはちょっと違いますね。バックグラウンド評価とでも言うべきでしょうか。

 また、submitとは別にmapもあります。こちらは組み込みのmapと同様にジェネレータを返してくれるのが強みです。

import time
import threading
from concurrent.futures import ThreadPoolExecutor

def f(t):
    print(f"{t:2} start {threading.get_ident()} {time.time() % 10 ** 5}")
    time.sleep(t)
    print(f"{t:2} end   {threading.get_ident()} {time.time() % 10 ** 5}")
    return t


with ThreadPoolExecutor(max_workers=2) as executor:
    mapgen = executor.map(f, [20, 10, 1, 2, 3, 4])
    time.sleep(25)
    print(mapgen)
    result = list(mapgen)
    print(result, time.time() % 10 ** 5)


""" =>
20 start 139664387626752 41522.07571768761
10 start 139664378906368 41522.0759935379
10 end   139664378906368 41532.08786392212
 1 start 139664378906368 41532.08807730675
 1 end   139664378906368 41533.08958363533
 2 start 139664378906368 41533.08977293968
 2 end   139664378906368 41535.09212422371
 3 start 139664378906368 41535.092309713364
 3 end   139664378906368 41538.09562587738
 4 start 139664378906368 41538.09581398964
20 end   139664387626752 41542.09483766556
 4 end   139664378906368 41542.10075592995
<generator object Executor.map.<locals>.result_iterator at 0x7f062a5ea258>
[20, 10, 1, 2, 3, 4] 41547.102033138275
"""

 中身はFutureオブジェクトのlistで、イテレートするとresultメソッドを呼びながら結果を返してくれると考えると理解しやすい気がします。

パフォーマンスがよくなる

 ThreadPoolExecutorは並列処理といってもPythonのGILに縛られていますから、Pythonプログラムをマルチコアで並列化して高速化することはできません。ただ、だからThreadPoolExecutorが駄目なのかというとそうではなくて、

  • I/Oやネットワークの待ちがある
  • 子プロセスの待ちがある

 など、Pythonの処理の外側で時間がかかっているときはちゃんと威力を発揮してくれます。子プロセスの場合は結果的にマルチコア並列化になります(かなり特殊な状況ですが、私はこれをやりました)。

 インターフェースは互換なのでProcessPoolExecutorでも同じことができる訳ですが、スレッド並列のいいところはメモリ空間を共有することです。プロセス立ち上げ・プロセス間通信のオーバーヘッドとは無縁です(Pythonのmultiprocessingはpickle化してデータを送る実装で、これがまたかなり遅い)。しかもスレッド間で共有すべきリソースがあれば、グローバル変数に置いておけばそのまま使えます(ただし同じデータに複数スレッドからアクセスする場合、状況によっては同期処理を考慮する必要はある。threadingを使って書く)。また、スレッドごとに継続して情報やリソースを持たせておくこともできます。

 参考:
www.haya-programming.com


 そしてProcessPoolExecutorですが、こちらはなんといってもプロセス並列ができ、処理の高速化に寄与します。ただ、multiprocessingの仕組みを理解して慣れておいた方が良さげではあります。オーバーヘッドも大きいし、微妙に癖があったりするので、活用するには慣れが必要です。それでも素のPythonではどうあがいても実現できないマルチコア並列化をPythonの枠組みの中でやれるということは素晴らしく、安直な高速化が可能です。もちろん何をどう並列化するかはよく考えておかないといけないのですが。

プロセス並列ではmp_contextも指定できる

 プロセス並列化を行う際には、mp_contextというオプションを渡すことができます。これはmultiprocessing.get_contextで取得可能です。

 実用的な使い方は、プロセス生成に使う方法をforkやspawnなどの間で切り替えることでしょう。経験的には、メモリ消費が大きいけど速いfork、省メモリだけど遅いspawnといった関係があり、状況に応じて簡単に切り替えられるのは便利です。

地味に嬉しいshutdown・with文との組み合わせ

 自分でthreadingやmultiprocessingを書くと、使い終わったスレッド/プロセスの終了がけっこう手間で、しかも失敗しがちです。特にプロセスが生き残ると、メモリを無駄に食ったりしていいことがありません。

 その辺を深く考えなくてもよしなにやってくれるのがshutdownメソッドで、単純に結果を取ってから呼べばいいのです。

 もっと嬉しいことに、Executorはwith文と組み合わせて使うことができ、そうするとwith文を抜けるとき勝手にshutdownしてくれます。open関数のようなものです。便利です。

まとめ

 非同期処理をしたいけどasyncioはとっつきづらいしとか、マルチコアを活かした並列化を書きたいけどmultiprocessingを直接叩くのはしんどいしとか、そういう状況ではだいたいこれを使っておけばいいだろうという性能になっています。しかもスレッド/プロセスプールは無駄なく実行されるので、自分で書いて最適化するより効率が良かったりします。並列化したいときに最初に検討するのはこれ、くらいの扱いでいいですね。