静かなる名辞

pythonとプログラミングのこと


ThreadPoolExecutorのinitializerについて調べたのでメモ

概要

 ThreadPoolExecutorにはinitializerという便利そうなオプションがあります。でもリファレンスの説明があっさりしていて、挙動がよくわからなかったので調べました。

 先に断っておくと、このオプションはPython3.7で追加されたもので、それ以前のバージョンでは存在しません。その場合の代替案も書いておくので参考にしてください。

はじめに

 とりあえず先に書いておくと、concurrent.futures.ThreadPoolExecutorは以下のように使えるものです。

import time
import threading
from concurrent.futures import ThreadPoolExecutor

def job(t):
    print("in job:", t, int(time.time()) % 1000, threading.get_ident())
    time.sleep(t)
    return t

print(threading.get_ident())
with ThreadPoolExecutor(max_workers=2) as executor:
    futures = [executor.submit(job, i) for i in range(6)]
    result = [f.result() for f in futures]
    
    print("finished:", int(time.time()) % 1000)
    print(result)

""" =>
140248999868224
in job: 0 233 140248964409088
in job: 1 233 140248964409088
in job: 2 233 140248955684608
in job: 3 234 140248964409088
in job: 4 235 140248955684608
in job: 5 237 140248964409088
finished: 242
[0, 1, 2, 3, 4, 5]
"""

 他の使い方もありますが、今回はこれしか検討しません。順番通りに結果が得られた方が都合が良いことの方が多いでしょう。

 リファレンス
concurrent.futures -- 並列タスク実行 — Python 3.8.2 ドキュメント

 表示されているUNIX時間の下3桁と、threading.get_identで取得されているスレッドのIDに注目しておいてください。親スレッドと、他に子スレッド2つが存在していることがわかります。ThreadPoolExecutorなので子スレッドは2つっきりで、同じスレッドが最初から最後まで使いまわされます。submitされたものはキューに突っ込まれて、空いているスレッドに放り込まれて終了までそのスレッドで実行されると考えるとわかりやすいでしょう。

 なお、後述するinitializerはget_identでIDを取らないと基本的に役に立たないと思います。

initializerを使う

 initializerはスレッド開始時の処理を書いておくと良いようです。

 ちょっと改造したプログラムです。

import time
import threading
from concurrent.futures import ThreadPoolExecutor

def initializer():
    print("in init", threading.get_ident())

def job(t):
    print("in job:", t, int(time.time()) % 1000, threading.get_ident())
    time.sleep(t)
    return t

print(threading.get_ident())
with ThreadPoolExecutor(max_workers=2, initializer=initializer) as executor:
    futures = [executor.submit(job, i) for i in range(6)]
    result = [f.result() for f in futures]
    
    print("finished:", int(time.time()) % 1000)
    print(result)

""" =>
140134438733632
in init 140134403274496
in job: 0 369 140134403274496
in init 140134394550016
in job: 1 369 140134403274496
in job: 2 369 140134394550016
in job: 3 370 140134403274496
in job: 4 371 140134394550016
in job: 5 373 140134403274496
finished: 378
[0, 1, 2, 3, 4, 5]
"""

 スレッド開始時に一回だけ呼ばれている様子がここからわかります。

 そうは言ってもどう使うの? と思う人もいるかもしれませんが、意外と使いではあります。「スレッドごとに一回だけ最初にやれば後は使いまわせるが、最初の1回にはそこそこ時間がかかる」ような処理を考えてみます。たとえばI/OとかDBのコネクションを作るのに時間がかかる、みたいなシチュエーションでしょうか。

 問題は、initializerはtargetとスコープを共有してくれたりはしない(そんな特殊なことはできるはずもない)ので、いまいち使いづらいことです。けっきょく自分でリソース管理を書かないといけません。といっても、GILがあるので、スレッドごとにリソースを確保して互いに触らない、という条件なら同期を考える必要まではありません。スレッドごとに独立にリソースアクセスできるようにグローバル変数でdictを置いておけば十分でしょう。

import time
import random
import threading
from concurrent.futures import ThreadPoolExecutor

resources = dict()
def initializer():
    id_ = threading.get_ident()
    resources[id_] = random.random()
    print("in", id_, ", its resource is ", resources[id_])
    
def job(t):
    print(threading.get_ident(), resources[threading.get_ident()])
    time.sleep(2)
    return t

print(threading.get_ident())
with ThreadPoolExecutor(max_workers=2, initializer=initializer) as executor:
    futures = [executor.submit(job, i) for i in range(6)]
    result = [f.result() for f in futures]
    
    print("finished:", int(time.time()) % 1000)
    print(result)


""" =>
140374921525056
in 140374878914304 , its resource is  0.06718002352497798
140374878914304 0.06718002352497798
in 140374870189824 , its resource is  0.4636817738099904
140374870189824 0.4636817738099904
140374878914304 0.06718002352497798
140374870189824 0.4636817738099904
140374878914304 0.06718002352497798
140374870189824 0.4636817738099904
finished: 781
[0, 1, 2, 3, 4, 5]
"""

代替案

 普通にtarget先頭でif使えばいいのでは・・・

import time
import random
import threading
from concurrent.futures import ThreadPoolExecutor

resources = dict()    
def job(t):
    id_ = threading.get_ident()
    if id_ not in resources:
        resources[id_] = random.random()

    print(id_, int(time.time()) % 1000, resources[id_])
    time.sleep(2)
    return t

print(threading.get_ident())
with ThreadPoolExecutor(max_workers=2) as executor:
    futures = [executor.submit(job, i) for i in range(6)]
    result = [f.result() for f in futures]
    
    print("finished:", int(time.time()) % 1000)
    print(result)


""" =>
140465058035520
140465015424768 933 0.8531529694670728
140465006700288 933 0.279548446733512
140465015424768 935 0.8531529694670728
140465006700288 935 0.279548446733512
140465015424768 937 0.8531529694670728
140465006700288 937 0.279548446733512
finished: 939
[0, 1, 2, 3, 4, 5]
"""

 最初の一回だけ実行されればいいので、なければないでなんとかなります。initializer使うとちょっと見通しが良いかな?

まとめ

 なんかあってもなくても良いような気がしてきましたが、とにかくこんな感じで使えはします。

 どちらかというと同じインターフェースのProcessPoolExecutorの方が使いでがあるかもしれません。プロセス内のグローバル変数として確保すれば素直に使えるはずだからです。インターフェースを揃えないといけないのでThreadPoolExecutorにもつけたとか・・・

 もっと他に有益な使い方がある、ということを知っている人は教えてください。