静かなる名辞

pythonとプログラミングのこと


numbaとnumpy.emptyでbool配列が作れないとき

 タイトル通りのことをやろうとして、なんかエラーになったんですよね。

import numpy as np
from numba import jit

@jit("b1[:]()", nopython=True)
def f():
    a = np.empty(100, np.bool)
    return a

f()

 動きそうに見えますが、

Traceback (most recent call last):
  File "filename.py", line 4, in <module>
    @jit("b1[:]()", nopython=True)
  File "/*/site-packages/numba/decorators.py", line 200, in wrapper
    disp.compile(sig)
  File "/*/site-packages/numba/compiler_lock.py", line 32, in _acquire_compile_lock
    return func(*args, **kwargs)
  File "/*/site-packages/numba/dispatcher.py", line 768, in compile
    cres = self._compiler.compile(args, return_type)
  File "/*/site-packages/numba/dispatcher.py", line 81, in compile
    raise retval
  File "/*/site-packages/numba/dispatcher.py", line 91, in _compile_cached
    retval = self._compile_core(args, return_type)
  File "/*/site-packages/numba/dispatcher.py", line 109, in _compile_core
    pipeline_class=self.pipeline_class)
  File "/*/site-packages/numba/compiler.py", line 551, in compile_extra
    return pipeline.compile_extra(func)
  File "/*/site-packages/numba/compiler.py", line 331, in compile_extra
    return self._compile_bytecode()
  File "/*/site-packages/numba/compiler.py", line 393, in _compile_bytecode
    return self._compile_core()
  File "/*/site-packages/numba/compiler.py", line 373, in _compile_core
    raise e
  File "/*/site-packages/numba/compiler.py", line 364, in _compile_core
    pm.run(self.state)
  File "/*/site-packages/numba/compiler_machinery.py", line 347, in run
    raise patched_exception
  File "/*/site-packages/numba/compiler_machinery.py", line 338, in run
    self._runPass(idx, pass_inst, state)
  File "/*/site-packages/numba/compiler_lock.py", line 32, in _acquire_compile_lock
    return func(*args, **kwargs)
  File "/*/site-packages/numba/compiler_machinery.py", line 302, in _runPass
    mutated |= check(pss.run_pass, internal_state)
  File "/*/site-packages/numba/compiler_machinery.py", line 275, in check
    mangled = func(compiler_state)
  File "/*/site-packages/numba/typed_passes.py", line 95, in run_pass
    raise_errors=self._raise_errors)
  File "/*/site-packages/numba/typed_passes.py", line 67, in type_inference_stage
    infer.propagate(raise_errors=raise_errors)
  File "/*/site-packages/numba/typeinfer.py", line 985, in propagate
    raise errors[0]
numba.errors.TypingError: Failed in nopython mode pipeline (step: nopython frontend)
Invalid use of Function(<built-in function empty>) with argument(s) of type(s): (Literal[int](100), Function(<class 'bool'>))
 * parameterized
In definition 0:
    All templates rejected with literals.
In definition 1:
    All templates rejected without literals.
This error is usually caused by passing an argument of a type that is unsupported by the named function.
[1] During: resolving callee type: Function(<built-in function empty>)
[2] During: typing of call at filename.py (6)


File "filename.py", line 6:
def f():
    a = np.empty(100, np.bool)
    ^

 ふざけたエラーが出ました。

 ググって解決しました。

Related to #1311. np.bool_ works as expected.
inference failure with np.empty on bool type · Issue #2529 · numba/numba · GitHub

 代わりにnp.bool_を使えばいいらしいです。馬鹿馬鹿しいですね。numbaの開発的には直すつもりはなさそうです。

@jit("b1[:]()", nopython=True)
def f():
    a = np.empty(100, np.bool_)
    return a

 こうするとちゃんと動きました。

 bool型のdtypeを指定したいときとかは気をつけましょう。

concurrent.futuresはなかなか便利かもしれない

概要

 「いまさら?」と思われるかもしれませんが、concurrent.futuresを使う機会があり、けっこう幸せでした。

 本当に「いまさら?」なのですが、どういうとき便利でどういう風に使えるのか書いておきます。

 リファレンス
concurrent.futures -- 並列タスク実行 — Python 3.8.3rc1 ドキュメント

並列化の処理を向こうでやってくれる

 concurrent.futuresにはThreadPoolExecutorとProcessPoolExecutorの2つのクラスがあります。名前に入っているPoolという文字列から想像できる通り、これらは

  • スレッド/プロセスの束を最初に生成する
  • 作った束に処理を投げつけると適当に割り振ってくれる

 という機能を持ちます。

 これの良いところは、スレッド/プロセスの数をmax_workersで指定すれば、あとはそんなに考えることがないことです。処理待ちのタスクは内部的にキューで管理され、前のタスクが終わって空いたスレッド/プロセスに順次放り込まれます。自分でforkさせて……とかやっていると、たとえばタスクで処理対象になるデータ量に応じてスレッド/プロセスが増えていくようなあほくさい実装になってしまうときがありますが、その辺をよしなに制御してくれるのが強みです。

 まあ、これに関してはもっと昔からあったmultiprocessing.Poolでもできるのですが、次のFutureは更に魅力的です。

Futureはすごい!

 concurrent.futuresにはThreadPoolExecutorとProcessPoolExecutorの2つのクラスがあり、このインスタンスにsubmitメソッドを呼ぶと(呼び方はsubmit(fn, *args, **kwargs)です)Futureオブジェクトが返ってきます。

 Futureオブジェクトの中では勝手に処理を進めてくれていて、処理が終わればfnとして渡した関数の返り値をresultメソッドで取得することができます。Futures.resultメソッド自体は処理が終わっていなければ呼び出し元をブロックしますが、重要なのはresultを呼ぼうが呼ぶまいが、子スレッド/子プロセスはノンブロッキングで勝手に走って勝手に終わるということです。呼び出し元がブロックされようがされまいが、子スレッド/子プロセスは走り出してしまえばあとはフルに動き続けるのが良いところです(処理するタスクがmax_workers以上の数残っている限りは)。

 たとえば、以下のプログラムの処理結果はどのようになるでしょうか? ちょっと先に考えてみてください。printが出てくる順番とタイミング(UNIX時間の下5桁を出しています)に注目です。

import time
import threading
from concurrent.futures import ThreadPoolExecutor

def f(t):
    print(f"{t:2} start {threading.get_ident()} {time.time() % 10 ** 5}")
    time.sleep(t)
    print(f"{t:2} end   {threading.get_ident()} {time.time() % 10 ** 5}")
    return t

with ThreadPoolExecutor(max_workers=2) as executor:
    futures = [executor.submit(f, t) for t in [20, 10, 1, 2, 3, 4]]
    time.sleep(25)
    result = [f.result() for f in futures]
    print(result, time.time() % 10 ** 5)

 正解はこうです。

20 start 140118759839488 40802.507059812546
10 start 140118677518080 40802.507822752
10 end   140118677518080 40812.518659353256
 1 start 140118677518080 40812.52083444595
 1 end   140118677518080 40813.52251338959
 2 start 140118677518080 40813.52270030975
 2 end   140118677518080 40815.52499437332
 3 start 140118677518080 40815.525171756744
 3 end   140118677518080 40818.52837443352
 4 start 140118677518080 40818.528562784195
20 end   140118759839488 40822.52741456032
 4 end   140118677518080 40822.533390283585
[20, 10, 1, 2, 3, 4] 40827.53366971016

 スレッドプールがうまく効いている様子がわかります。また、Futuresを捕まえておけば結果の取得も問題なくできます。

 遅延評価……とはちょっと違いますね。バックグラウンド評価とでも言うべきでしょうか。

 また、submitとは別にmapもあります。こちらは組み込みのmapと同様にジェネレータを返してくれるのが強みです。

import time
import threading
from concurrent.futures import ThreadPoolExecutor

def f(t):
    print(f"{t:2} start {threading.get_ident()} {time.time() % 10 ** 5}")
    time.sleep(t)
    print(f"{t:2} end   {threading.get_ident()} {time.time() % 10 ** 5}")
    return t


with ThreadPoolExecutor(max_workers=2) as executor:
    mapgen = executor.map(f, [20, 10, 1, 2, 3, 4])
    time.sleep(25)
    print(mapgen)
    result = list(mapgen)
    print(result, time.time() % 10 ** 5)


""" =>
20 start 139664387626752 41522.07571768761
10 start 139664378906368 41522.0759935379
10 end   139664378906368 41532.08786392212
 1 start 139664378906368 41532.08807730675
 1 end   139664378906368 41533.08958363533
 2 start 139664378906368 41533.08977293968
 2 end   139664378906368 41535.09212422371
 3 start 139664378906368 41535.092309713364
 3 end   139664378906368 41538.09562587738
 4 start 139664378906368 41538.09581398964
20 end   139664387626752 41542.09483766556
 4 end   139664378906368 41542.10075592995
<generator object Executor.map.<locals>.result_iterator at 0x7f062a5ea258>
[20, 10, 1, 2, 3, 4] 41547.102033138275
"""

 中身はFutureオブジェクトのlistで、イテレートするとresultメソッドを呼びながら結果を返してくれると考えると理解しやすい気がします。

パフォーマンスがよくなる

 ThreadPoolExecutorは並列処理といってもPythonのGILに縛られていますから、Pythonプログラムをマルチコアで並列化して高速化することはできません。ただ、だからThreadPoolExecutorが駄目なのかというとそうではなくて、

  • I/Oやネットワークの待ちがある
  • 子プロセスの待ちがある

 など、Pythonの処理の外側で時間がかかっているときはちゃんと威力を発揮してくれます。子プロセスの場合は結果的にマルチコア並列化になります(かなり特殊な状況ですが、私はこれをやりました)。

 インターフェースは互換なのでProcessPoolExecutorでも同じことができる訳ですが、スレッド並列のいいところはメモリ空間を共有することです。プロセス立ち上げ・プロセス間通信のオーバーヘッドとは無縁です(Pythonのmultiprocessingはpickle化してデータを送る実装で、これがまたかなり遅い)。しかもスレッド間で共有すべきリソースがあれば、グローバル変数に置いておけばそのまま使えます(ただし同じデータに複数スレッドからアクセスする場合、状況によっては同期処理を考慮する必要はある。threadingを使って書く)。また、スレッドごとに継続して情報やリソースを持たせておくこともできます。

 参考:
www.haya-programming.com


 そしてProcessPoolExecutorですが、こちらはなんといってもプロセス並列ができ、処理の高速化に寄与します。ただ、multiprocessingの仕組みを理解して慣れておいた方が良さげではあります。オーバーヘッドも大きいし、微妙に癖があったりするので、活用するには慣れが必要です。それでも素のPythonではどうあがいても実現できないマルチコア並列化をPythonの枠組みの中でやれるということは素晴らしく、安直な高速化が可能です。もちろん何をどう並列化するかはよく考えておかないといけないのですが。

プロセス並列ではmp_contextも指定できる

 プロセス並列化を行う際には、mp_contextというオプションを渡すことができます。これはmultiprocessing.get_contextで取得可能です。

 実用的な使い方は、プロセス生成に使う方法をforkやspawnなどの間で切り替えることでしょう。経験的には、メモリ消費が大きいけど速いfork、省メモリだけど遅いspawnといった関係があり、状況に応じて簡単に切り替えられるのは便利です。

地味に嬉しいshutdown・with文との組み合わせ

 自分でthreadingやmultiprocessingを書くと、使い終わったスレッド/プロセスの終了がけっこう手間で、しかも失敗しがちです。特にプロセスが生き残ると、メモリを無駄に食ったりしていいことがありません。

 その辺を深く考えなくてもよしなにやってくれるのがshutdownメソッドで、単純に結果を取ってから呼べばいいのです。

 もっと嬉しいことに、Executorはwith文と組み合わせて使うことができ、そうするとwith文を抜けるとき勝手にshutdownしてくれます。open関数のようなものです。便利です。

まとめ

 非同期処理をしたいけどasyncioはとっつきづらいしとか、マルチコアを活かした並列化を書きたいけどmultiprocessingを直接叩くのはしんどいしとか、そういう状況ではだいたいこれを使っておけばいいだろうという性能になっています。しかもスレッド/プロセスプールは無駄なく実行されるので、自分で書いて最適化するより効率が良かったりします。並列化したいときに最初に検討するのはこれ、くらいの扱いでいいですね。

ThreadPoolExecutorのinitializerについて調べたのでメモ

概要

 ThreadPoolExecutorにはinitializerという便利そうなオプションがあります。でもリファレンスの説明があっさりしていて、挙動がよくわからなかったので調べました。

 先に断っておくと、このオプションはPython3.7で追加されたもので、それ以前のバージョンでは存在しません。その場合の代替案も書いておくので参考にしてください。

はじめに

 とりあえず先に書いておくと、concurrent.futures.ThreadPoolExecutorは以下のように使えるものです。

import time
import threading
from concurrent.futures import ThreadPoolExecutor

def job(t):
    print("in job:", t, int(time.time()) % 1000, threading.get_ident())
    time.sleep(t)
    return t

print(threading.get_ident())
with ThreadPoolExecutor(max_workers=2) as executor:
    futures = [executor.submit(job, i) for i in range(6)]
    result = [f.result() for f in futures]
    
    print("finished:", int(time.time()) % 1000)
    print(result)

""" =>
140248999868224
in job: 0 233 140248964409088
in job: 1 233 140248964409088
in job: 2 233 140248955684608
in job: 3 234 140248964409088
in job: 4 235 140248955684608
in job: 5 237 140248964409088
finished: 242
[0, 1, 2, 3, 4, 5]
"""

 他の使い方もありますが、今回はこれしか検討しません。順番通りに結果が得られた方が都合が良いことの方が多いでしょう。

 リファレンス
concurrent.futures -- 並列タスク実行 — Python 3.8.2 ドキュメント

 表示されているUNIX時間の下3桁と、threading.get_identで取得されているスレッドのIDに注目しておいてください。親スレッドと、他に子スレッド2つが存在していることがわかります。ThreadPoolExecutorなので子スレッドは2つっきりで、同じスレッドが最初から最後まで使いまわされます。submitされたものはキューに突っ込まれて、空いているスレッドに放り込まれて終了までそのスレッドで実行されると考えるとわかりやすいでしょう。

 なお、後述するinitializerはget_identでIDを取らないと基本的に役に立たないと思います。

initializerを使う

 initializerはスレッド開始時の処理を書いておくと良いようです。

 ちょっと改造したプログラムです。

import time
import threading
from concurrent.futures import ThreadPoolExecutor

def initializer():
    print("in init", threading.get_ident())

def job(t):
    print("in job:", t, int(time.time()) % 1000, threading.get_ident())
    time.sleep(t)
    return t

print(threading.get_ident())
with ThreadPoolExecutor(max_workers=2, initializer=initializer) as executor:
    futures = [executor.submit(job, i) for i in range(6)]
    result = [f.result() for f in futures]
    
    print("finished:", int(time.time()) % 1000)
    print(result)

""" =>
140134438733632
in init 140134403274496
in job: 0 369 140134403274496
in init 140134394550016
in job: 1 369 140134403274496
in job: 2 369 140134394550016
in job: 3 370 140134403274496
in job: 4 371 140134394550016
in job: 5 373 140134403274496
finished: 378
[0, 1, 2, 3, 4, 5]
"""

 スレッド開始時に一回だけ呼ばれている様子がここからわかります。

 そうは言ってもどう使うの? と思う人もいるかもしれませんが、意外と使いではあります。「スレッドごとに一回だけ最初にやれば後は使いまわせるが、最初の1回にはそこそこ時間がかかる」ような処理を考えてみます。たとえばI/OとかDBのコネクションを作るのに時間がかかる、みたいなシチュエーションでしょうか。

 問題は、initializerはtargetとスコープを共有してくれたりはしない(そんな特殊なことはできるはずもない)ので、いまいち使いづらいことです。けっきょく自分でリソース管理を書かないといけません。といっても、GILがあるので、スレッドごとにリソースを確保して互いに触らない、という条件なら同期を考える必要まではありません。スレッドごとに独立にリソースアクセスできるようにグローバル変数でdictを置いておけば十分でしょう。

import time
import random
import threading
from concurrent.futures import ThreadPoolExecutor

resources = dict()
def initializer():
    id_ = threading.get_ident()
    resources[id_] = random.random()
    print("in", id_, ", its resource is ", resources[id_])
    
def job(t):
    print(threading.get_ident(), resources[threading.get_ident()])
    time.sleep(2)
    return t

print(threading.get_ident())
with ThreadPoolExecutor(max_workers=2, initializer=initializer) as executor:
    futures = [executor.submit(job, i) for i in range(6)]
    result = [f.result() for f in futures]
    
    print("finished:", int(time.time()) % 1000)
    print(result)


""" =>
140374921525056
in 140374878914304 , its resource is  0.06718002352497798
140374878914304 0.06718002352497798
in 140374870189824 , its resource is  0.4636817738099904
140374870189824 0.4636817738099904
140374878914304 0.06718002352497798
140374870189824 0.4636817738099904
140374878914304 0.06718002352497798
140374870189824 0.4636817738099904
finished: 781
[0, 1, 2, 3, 4, 5]
"""

代替案

 普通にtarget先頭でif使えばいいのでは・・・

import time
import random
import threading
from concurrent.futures import ThreadPoolExecutor

resources = dict()    
def job(t):
    id_ = threading.get_ident()
    if id_ not in resources:
        resources[id_] = random.random()

    print(id_, int(time.time()) % 1000, resources[id_])
    time.sleep(2)
    return t

print(threading.get_ident())
with ThreadPoolExecutor(max_workers=2) as executor:
    futures = [executor.submit(job, i) for i in range(6)]
    result = [f.result() for f in futures]
    
    print("finished:", int(time.time()) % 1000)
    print(result)


""" =>
140465058035520
140465015424768 933 0.8531529694670728
140465006700288 933 0.279548446733512
140465015424768 935 0.8531529694670728
140465006700288 935 0.279548446733512
140465015424768 937 0.8531529694670728
140465006700288 937 0.279548446733512
finished: 939
[0, 1, 2, 3, 4, 5]
"""

 最初の一回だけ実行されればいいので、なければないでなんとかなります。initializer使うとちょっと見通しが良いかな?

まとめ

 なんかあってもなくても良いような気がしてきましたが、とにかくこんな感じで使えはします。

 どちらかというと同じインターフェースのProcessPoolExecutorの方が使いでがあるかもしれません。プロセス内のグローバル変数として確保すれば素直に使えるはずだからです。インターフェースを揃えないといけないのでThreadPoolExecutorにもつけたとか・・・

 もっと他に有益な使い方がある、ということを知っている人は教えてください。

Pythonで遅いサブプロセスをスレッド並列でたくさん叩く

概要

 いつ使うんだと言われてしまうPythonのスレッドですが、Pythonの外で遅い原因があるときは高速化に威力を発揮します。

 たとえばこんな感じです。言語はbashです。

#!/bin/bash

sleep 3
echo "hoge"

 特にひねりはありません。slow_command.shとでもして保存しておきます。

 Pythonから呼ぶと、

import subprocess

for _ in range(5):
    result = subprocess.run(["./slow_command.sh"], stdout=subprocess.PIPE)
    print(result.stdout.decode())

 いい加減ですがこんな感じでしょうか。当たり前のように15秒かけて実行してくれます。

ThreadPoolExecutorを使う

  • CPUバウンドではない
  • 遅いのはPythonではない

 といった条件が揃っているので、Pythonのスレッドを使う良い機会です。

 threading直叩きはこの場合つらいものがあるので、concurrent.futures.ThreadPoolExecutorを使います。

concurrent.futures -- 並列タスク実行 — Python 3.8.2 ドキュメント

import subprocess
from concurrent.futures import ThreadPoolExecutor

def f():
    result = subprocess.run(["./slow_command.sh"], stdout=subprocess.PIPE)
    return result.stdout.decode()

with ThreadPoolExecutor(max_workers=5) as pool:
    futures = [pool.submit(f) for _ in range(5)]
    for f in futures:
        print(f.result())

 今度はちゃんと5秒で終わります。

asyncio

 asyncioはこれまでとっつきづらい気がしてほとんど触ってこなかったのですが、やればできるんじゃないでしょうか。

python - How to use asyncio with existing blocking library? - Stack Overflow

 まだ試していないので、そのうち試してみて記事をアップデートします(か、別記事になるかもしれない)。

使いみち

 遅いサブプロセスの結果を大量に受けるときはありです。

 スレッド並列といってもけっきょく子プロセスが生えてしまうので、ワーカーの数には気をつけた方が良いでしょう(同時に子プロセス生やしすぎてメモリエラー、とか)。

Pythonの文字列は同じ長さでもメモリ消費量が違うときがある

概要

 Pythonの文字列は、内容によって一文字の幅が違います。

 なお、Python3のstrを前提にさせてください。

実験

 sys.getsizeofで測ってみます。これを使うのはちょっと議論の余地がありますが、

object のサイズをバイト数で返します。object は任意の型のオブジェクトです。すべての組み込みオブジェクトは正しい値を返します。サードパーティー製の型については実装依存になります。
オブジェクトに直接起因するメモリ消費のみを表し、参照するオブジェクトは含みません。
sys --- システムパラメータと関数 — Python 3.8.2 ドキュメント

 とりあえず信じていいと信じます。

>>> import sys
>>> sys.getsizeof("hogefuga")
57
>>> sys.getsizeof("ほげふが")
82

 なんかすでにヘンなことになっています。

>>> len("hogefuga")
8
>>> len("ほげふが")
4

 あ、lenが違いました。

>>> sys.getsizeof("ほげふが"*2)
90

 ひらがなで4文字増えて、さっきの82バイトから8バイト増えたので、1文字2バイトで持っている訳です。

 ASCIIのhogefugaでも同じ要領で1文字のメモリ消費量を測ってみましょう。

>>> sys.getsizeof("hogefuga"*2)
65

 (65-57)/8で、1文字1バイトです。

理由

 まあ、こういう仕様が合理的というのは誰でも理解はできるのではないかと思います。単バイト文字なのかマルチバイト文字なのかによって扱いが変わる訳です。

 リファレンスにも記述があります。「Python/C API リファレンスマニュアル」という、普段馴染みのない方に書いてありますが。

Python3.3 の PEP 393 実装から、メモリ効率を維持しながらUnicode文字の完全な範囲を扱えるように、Unicodeオブジェクトは内部的に多様な表現形式を用いています。

Unicode オブジェクトと codec — Python 3.8.2 ドキュメント

 内部表現は1バイトか2バイトか4バイトのどれかです! ASCIIなら1バイト、日本語はだいたい2バイトでしょうね。他の言語とか絵文字記号の類はきっと4バイトもあり得るのでしょう。

 内部でうまく処理するには1文字の長さが固定長でないとめちゃくちゃ都合が悪く、かといって32bitに揃えるのもメモリ効率が悪いので(というかASCII主義のアルファベット話者から見たら無駄なので)、こういう策を取っているのだと思います。詳しくは調査していませんが。

 Pythonの文字列はimmutableなので、これで困らない訳です。操作するときに適切に扱う処理を、組み込みで実装してくれているから。

こうなっているから起こること

 表現できる文字種に応じて内部表現が決まるので、ASCIIの長い文にマルチバイト文字1つ付け足すと内部表現の大きさがほぼ二倍になったりします。

>>> sys.getsizeof("hogefuga"*1000)
8049
>>> sys.getsizeof("hogefuga"*1000 + "あ")
16076

 異なる内部表現の文字列同士を結合すると、当然大きい方に揃えられます。処理速度に差が出るかもしれません。

>>> import timeit
>>> a = "hoge" * 1000
>>> b = "fuga" * 1000
>>> timeit.timeit(lambda : a + b)
0.4121553519998997
>>> c = "ぴよ" * 1000
>>> timeit.timeit(lambda : a + c)
1.6905145230000471

 出ました。

結論

 普通にPythonを使う分には気にする必要はまったくない事柄なのですが、知っておくと文字列がちょっと思っていたのと違うものに見えてくるときがあるかもしれないと思いました。

Pythonの文字列メソッドとバイト列の微妙な関係

 Python3になってからは普段あまり気にしなくても良いようになりましたが、Pythonの文字列っぽい型にはstrとbytesがあります*1。そして、strもbytesも同じようなメソッドを実装してくれています。

組み込み型 — Python 3.8.2 ドキュメント

組み込み型 — Python 3.8.2 ドキュメント

>>> [x for x in dir(str) if "__" not in x] 
['capitalize', 'casefold', 'center', 'count', 'encode', 'endswith', 'expandtabs', 'find', 'format', 'format_map', 'index', 'isalnum', 'isalpha', 'isdecimal', 'isdigit', 'isidentifier', 'islower', 'isnumeric', 'isprintable', 'isspace', 'istitle', 'isupper', 'join', 'ljust', 'lower', 'lstrip', 'maketrans', 'partition', 'replace', 'rfind', 'rindex', 'rjust', 'rpartition', 'rsplit', 'rstrip', 'split', 'splitlines', 'startswith', 'strip', 'swapcase', 'title', 'translate', 'upper', 'zfill']
>>> [x for x in dir(bytes) if "__" not in x] 
['capitalize', 'center', 'count', 'decode', 'endswith', 'expandtabs', 'find', 'fromhex', 'hex', 'index', 'isalnum', 'isalpha', 'isdigit', 'islower', 'isspace', 'istitle', 'isupper', 'join', 'ljust', 'lower', 'lstrip', 'maketrans', 'partition', 'replace', 'rfind', 'rindex', 'rjust', 'rpartition', 'rsplit', 'rstrip', 'split', 'splitlines', 'startswith', 'strip', 'swapcase', 'title', 'translate', 'upper', 'zfill']

 完全に互換という訳でもないようですが、それでもだいたい同じ操作がサポートされています。

 問題は、strとbytesを混在させて使えないことです。

 splitで見てみます。

>>> "a,b,c".split(b",")
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
TypeError: Can't convert 'bytes' object to str implicitly
>>> b"a,b,c".split(",")
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
TypeError: a bytes-like object is required, not 'str'

 エラーを回避するためには、型を合わせる必要があります。strのメソッドならstrを、bytesのメソッドならbytesを渡してあげてください。

>>> "a,b,c".split(",")
['a', 'b', 'c']
>>> b"a,b,c".split(b",")
[b'a', b'b', b'c']

 駄目押しで、joinでも見てみます。

>>> ",".join([b'a', b'b', b'c'])
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
TypeError: sequence item 0: expected str instance, bytes found
>>> b",".join(['a', 'b', 'c'])
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
TypeError: sequence item 0: expected a bytes-like object, str found

 やっぱりできません。

 逆にこの点に注意しておけば、bytesのまま扱うことも可能ですが、実際問題としてはなかなか厄介です。素直に入力された時点でstrにデコードしてしまうことをおすすめします。str→bytes変換はencode, 逆はdecodeメソッドです。

*1:すべてPython3準拠で説明します。さすがにPython2はもういいでしょ

DataFrameをprintしたときヘッダの日本語の列名がずれないようにする

 日本語の列名のDataFrameを扱うことは、日本人のpandasユーザにとってはありがちな展開だと思うのですが、問題はprintするとヘッダがずれてしまうことです。

>>> import pandas as pd
>>> pd.DataFrame({"あああ":[1, 2], "いいい":[3, 4], "ううう":[5, 6], "えええ":[7, 8]})
   あああ  いいい  ううう  えええ
0    1    3    5    7
1    2    4    6    8

 ASCIIの等幅文字を前提にしているからこんなものなのだろうと諦めていましたが、実はよしなに表示するオプションがありました。teratailでこの件の質問を見かけて、ふと久々にリファレンスを見たら見つけました。こういうことがあるからQAサイトは楽しいのです。

Some East Asian countries use Unicode characters whose width corresponds to two Latin characters. If a DataFrame or Series contains these characters, the default output mode may not align them properly.
(中略)
Enabling display.unicode.east_asian_width allows pandas to check each character’s “East Asian Width” property. These characters can be aligned properly by setting this option to True. However, this will result in longer render times than the standard len function.
Options and settings — pandas 1.0.3 documentation

 うまく表示してもらうためには、

pd.set_option('display.unicode.east_asian_width', True)

 を打ちます。東アジアで使われている文字幅で表示してあげますよというオプション名。要するに我々のためにあるようなものなので、ありがたく使わせてもらいましょう。

 このようにうまくいきます。

>>> pd.set_option('display.unicode.east_asian_width', True)
>>> pd.DataFrame({"あああ":[1, 2], "いいい":[3, 4], "ううう":[5, 6], "えええ":[7, 8]})
   あああ  いいい  ううう  えええ
0       1       3       5       7
1       2       4       6       8

(といいつつ、ブラウザ環境次第ですがこのブログ記事上ではうまく見えていない可能性が高いです。全角文字をきっちり半角文字二文字分の幅で表示してくれる環境で確認してください。)

 なお、これはPythonを起動する度に毎回設定する必要があります。永続化させるために、Pythonのスタートアップ時にpandasをimportして走らせる方法が公式では推奨されています。

Options and settings — pandas 1.0.3 documentation

 素のCPythonならPYTHONSTARTUP、IPythonなら$IPYTHONDIR/profile_default/startupなどに仕掛けておけとのことですが、そのためだけにpandasをimportさせるのも微妙な感じですね。そこだけは残念なところです。

1. コマンドラインと環境 — Python 3.8.2 ドキュメント
PYTHONSTARTUP で Python のインタラクティブシェルを便利にする | まくまくPythonノート
IPython起動時にいつも使うモジュールをimportする設定 - Qiita

numbaとnumpyで速いループ処理を書くためのガイド(スレッド並列化のおまけつき)(実はポエム)

はじめに

 この記事は「Pythonおっせーよ」と思っている、そこのあなたのためのものです。

 PythonはLLなので遅いです。その分、楽に書けるし、動的型付けでダイナミックなことができて「楽しぃぃいい」のですが、それでも遅くて困るときがあります。特に数値計算的なことをやらせようとすると*1

 え、numpy使えばいける? numpyは、numpyに組み込まれている関数では処理できなくて、ベクトル演算も効かないようなタイプの処理には基本的に無力です。いけません。

 どうせC/C++で書かれてコンパイルされたバイナリのwrapperと割り切って使うから良い? ライブラリが見つかればそれで良いでしょう。問題はライブラリに頼るまでもない、ちょっとした処理の実装です。コンパイル言語で書いたりすると(Cythonも含みます)、面倒くさくて割りに合わない気がします。なんでこんなことやってるんだという気持ちになれます。

 numbaだと、並のコンパイル言語よりは気楽に書けて、単純な数値計算とかなら並のコンパイル言語くらいの速度が実際に出ます。素晴らしいですね。

 ということで、numpy+numbaで書くときに気をつけることを色々と書いておこうと思います。

 はい。気をつけることが色々あるんです。気をつければ速いんですけどねー。けっこう面倒くさいんですよねー。それでもCythonより楽というのがnumbaの位置づけなので、知っておくとたまに得するかもしれません。どうしても高速に走らないといけないループだけnumbaで書き下すといったプログラミングが可能になります。それくらいのものです。

 目次

コンセプト

 前提として、numpyがどんな仕組みで高速な処理を達成しているのかを知らない人は、この記事を読んでも活用できないと思います。一応、この章で最低限のことは説明しますが、これを読んでわかる人には要らないような説明です。わからない人はCの勉強から。

 先にこの章の結論を書いてしまいます。numpyはぶっちゃけていえば、C配列のwrapperみたいなものです*2

 A = np.zeros(shape=(3, 4), dtype=np.float64)とかすると内部でmalloc(sizeof(double) * 3 * 4)されてメモリ領域が確保され、そしてこれへアクセスするためのPython Objectも作られます。プロキシ・オブジェクトです。PythonでA[i, j] = 1.0すると、内部で確保した配列に対して[i][j] = 1.0が走るわけです。

 ということで、numpy配列は原理的には、その気になればCの配列っぽく扱えます。scikit-learnなどの多くの機械学習ライブラリはnumpy配列で表現されたデータを受け取るように実装されていますが、「すべてがオブジェクト」的な世界観とは無縁のバイナリの世界で処理をするからこそ高速に処理を達成できる訳です。

 逆に、Python側からnumpy配列の中にある数値を取り出したり、numpy配列に数値を書き込んだりする処理は、びっくりするほど遅くなります。それをやる度に、いちいち対応するPython Objectを生成せざるを得ないのです。なにしろPythonの世界は「すべてがオブジェクト」だから、そうならざるを得ない。

 そういう事情から、Pythonでnumpyにバリバリ添字アクセスする処理を書いても、だいたい悲惨な結果になります。

 それでもnumpyは、Pythonで高速な数値計算を実現する方法を、不完全ではありますが提供してくれています。それがベクトル演算で、複数要素にまとめて同じような処理をするのであればPythonの世界とのやり取りはそんなにしなくて済むので、速いという発想です。

 多くの場合ベクトル演算は有用です。が、本質的には予め用意されている処理しか書けないし、どんな処理でもベクトル演算で書ける訳ではないので、限界があります。そういう限界に突き当たった人は添字アクセスの処理を書いて、あまりの遅さに絶望し、なんとかならないかと思ってnumbaにたどり着く訳です*3

 さらに言うと、numpyのベクトル演算にはもう一つ問題っぽいものがあります。素直に書くとベクトル演算した結果が新たな配列として返ることが多いので、けっこう豪快にメモリを使います。ある程度は累積代入文などでカバーできますが、駄目なときもあります。これもなんとかしたい点です。メモリを余計に食うから……ではなくて(GBとかの死ぬほどでかい配列でない限り、メモリ消費自体は普通は問題にはなりません)、メモリを新規に確保するのはそもそも遅い処理だし、キャッシュ効率も悪くなるので、できればインプレースで済むことはインプレースで済ませたいからです。

 ということで、numpy配列の中にあるC配列を直接添字アクセスしてループできると嬉しい、という需要がある訳です。というかそのためのnumpy配列です。もし本格的にやるならC/C++かCythonの出番ですが、もう少し気楽にやりたいという需要もある訳です。それがnumbaの存在する理由です。

 そしてこの記事は、実際にnumbaを使う考え方というか実践的なコツみたいなものを、思いつくままに書いていくというものです。

jitを効かせる

 numbaはJust-in-time compilationという技術でコードの高速化を行います。背後ではLLVMが動きます。私にとってはほとんど謎の技術ですが、ここではnumbaのバックエンドと割り切って気にしないことにします。コンパイラの賢さを信じる。

 重要なのはjitです。というかjitデコレータです。numbaの中核です。極論すればfrom numba import jitだけでnumbaは使えます。

 ただし、日本語Web圏でjitデコレータについて言われていることには注意が必要です。

 検索すると普通に出てくるnumbaの解説記事では「関数に@jitつけるだけでjitコンパイルされて速くなる!」とか書いてあることがあります。それしか内容がなかったら、ブラウザバックしましょう。それかタブを閉じましょう。役に立たない記事です。

 numbaは「オレの書いたイケイケなPythonコードをチョースゴイ技術でJITコンパイルしてくれる」技術ではありません。「色々気を配ってCっぽいPythonを書けばCっぽい速さで動いてくれる」というものです。そこを勘違いした人たちはみんなインターネットの夜空に輝く素晴らしい星々になりました。
(この流れでついでに書いておくと、Cythonは「PythonっぽいC」「Cではないにしても少なくともPythonではないなにか」です。numbaの場合はギリギリ構文まではPythonしています。Pythonパーサで構文解析されてPythonの抽象構文木をJITコンパイルするのです。あれ、それはそれでPythonではないっ?)

 なんで「@jitだけで」が駄目なのかについてもう少し書いておきます。単純な話、そもそもjitコンパイルできていないnumbaの記事が死ぬほどたくさんあります。numba非対応の機能を使うとそうなるのです。その場合は「Python互換モード」みたいなこれまた正体のよくわからないもので動かしてくれますが、こちらは劇的に遅いのが普通です。

 ということで、最低限以下の使い方をしましょう。

@jit(nopython=True)
def f(...):
    ...

 nopython=Trueというオプションが追加されました。こうすると謎の「Python互換モード」とはおさらばでき、コンパイルに失敗したらエラーが出ます。うまく動かないときは動かさない、これが第一歩です。


numpy配列で入出力のやり取りをする

 JITが速いのは実は静的型付けでコンパイルして動かすからです。

 私個人の心情としては静的型付けには窮屈とか面倒くさいといった印象を抱いてしまいがちですが*4、パフォーマンス上の理由なので好き嫌いは置いておいて静的型付けを使いましょう。

 numbaの良いところは型推論してくれることです。a = 42と書いてあったら、aはたぶんint64です、とか。なので型の指定は書かなくて済みます。cdefまみれになるCythonとの違いです。

 ただし、引数と返り値の型は指定した方が何かと高速化の恩恵を受けられます。というか、やらないとたぶんコンパイルエラーでまともに走らないでしょう。

2.2. Just-in-Time compilation — Numba 0.48.0-py3.6-macosx-10.7-x86_64.egg documentation

 指定の仕方も実は3通りもあったりします。1つ目はtupleで引数の型のみ指定する方法、2つ目はnumbaの型オブジェクトを作って渡す方法、3つ目は文字列で書く方法です。文字列で書きたい人が多いと思うので、それを紹介します。

 シグネチャはデコレータの第一引数であり、「使え」という強い意志を感じます。

@jit("i8(f8, f8)", nopython=True)
def f(a, b):
    ...

 "i8(f8, f8)"というのが型の指定になります。カッコの前が関数の返り値、後ろのカッコの中身が引数の型であり、i8は8バイト整数(つまりint64)、f8は8バイト浮動小数点数(float64)です。浮動小数点数を二つ受け取って整数を返す関数です。

 なお、Pythonオブジェクトをこういうnumbaの関数に渡した場合はそれなりに相互変換はしてくれますが、普通に「numpy配列の要素を取り出すとき」の問題が生じるので損な選択になります。

 ということで、numpy配列でやり取りしましょう。配列はスライス表記で表します。

@jit("i8[:](f8[:, :], f8[:, :, :])", nopython=True)
def f(a, b):
    ...

 返り値は整数の一次元配列、第一引数は浮動小数点数の二次元配列、第二引数は浮動小数点数の三次元配列みたいな関数です。

 これが、最初に書いた「numpy配列の中にあるC配列を直接いじりたい」を実現するための魔法です。こうしておけばnumbaが適切にやり取りしてやってくれます。なかなか夢が広がりますね。

 使える型とかの詳細はリファレンスを見てください。

2.1. Types and signatures — Numba 0.48.0-py3.6-macosx-10.7-x86_64.egg documentation

 また、numba.typeofを使うことで任意のオブジェクトに適合するシグネチャを調べることもできます。簡略表記で返してくれないというか、numbaの型オブジェクトを返してくれる(なので型オブジェクトを作って渡す方法向きではあります)のが微妙ですが。このオブジェクトどう書くんだ? というとき使いましょう。

>>> import numba
>>> numba.typeof(1)
int64
>>> numba.typeof(1.0)
float64
>>> @numba.jit("float64(int64)", nopython=True)
... def f(a):
...     return a + 1.5
... 
>>> f(10)
11.5

2.1. Types and signatures — Numba 0.48.0-py3.6-macosx-10.7-x86_64.egg documentation


 なお、当たり前ですが、合致しない型とか形状の配列を渡すと、実行時にエラーになります。JITコンパイルする本丸の関数とは別に、pythonレイヤで形状チェックとかastypeとかしてから本丸を呼び出すようなwrapper関数を書いておくと実用的です。どうせ自分しか使わないしコードのこの箇所では入ってくる型は確定しているしということで割り切るなら、それはそれで実用的です。

 あと、numpy配列を渡せば普通に引数に副作用を及ぼせるので、返り値は必須ではありません。インプレース処理にしてもいいし、返り値用の配列を引数として渡してもいいのです。返り値が要らないときは返り値の型のところにはvoidと書いてください。

jitコンパイルできるコードを書く

 
 ここと、

2.6. Supported Python features — Numba 0.48.0-py3.6-macosx-10.7-x86_64.egg documentation

 ここを、

2.7. Supported NumPy features — Numba 0.48.0-py3.6-macosx-10.7-x86_64.egg documentation


 見て、書いてある機能だけで書いてください。終了。

 では不親切なので、もう少し書きます。

 たとえばnumbaではループを書くのにPythonのrangeが普通に使えます。いえ、使えるようにみえます。

# @jit(nopython=True)しても動く
def f():
    a = 0
    for i in range(10):
        a += i
    print(i)

 Pythonだとrangeオブジェクトを生成して、そこから生成したイテレータから値を一つずつ取り出して……という感じでこのコードは動きます。

>>> range(10)
range(0, 10)

 numbaはまったく違う仕組みで実行します。最終的には機械語のループに変換されます。カウンタ足してってジャンプ命令とかする奴です。

 なので当然ながら制約があります。rangeオブジェクトを引数として受け取ったりはできません。

# @jit(nopython=True)できない
def f(r):  # f(range(10))とか呼び出したいができない
    a = 0
    for i in r:
        a += i
    print(a)

 といった面倒くさい決まりごとが色々あるのがnumbaです。細かく把握して慣れ親しんでいけば、そのうち「こんなのnumbaで書けるんだ!」という驚きが発生するかもしれませんが、普通はだるいのでコンサバティブなコードを書くことになります。

 それでも一昔前のnumbaに比べれば最近のnumbaはかなり立派になっていて、たとえばリスト内包表記なんか昔はできなかったのにいつの間にかできるようになっていて私なんかはかなりびびったのですが、でもPythonオブジェクトのリストを作るんだから遅えんだろうな……という諦観も生じてきて複雑な気持ちになったりして、まあ要するに「Python的な」コード、向いてませんよということです。どうせ相手にするのはnumpy配列、CかFORTRANを書くつもりで書くくらいの気持ちで扱いましょう。そうすると高級なコンパイル言語……? という気がしてきて幸せになれます。

 ということで、手っ取り早いのはrangeループによるインデックスアクセスとnumpyの機能とを組み合わせて使うことなので、numpyへのサポートを見てみます。

 結論を先に言ってしまうと、我々が想像することはだいたいできます。基本的なindexingやslicingはできますし、代表的な関数もだいたい使えます。

 numpyのリッチな機能が使えても、それはつまりnumpyとして走るだけなので、numba的に速くはないのが微妙なのですが。
(たとえば単一のnumpy関数を呼び出して時間がかかっている処理をそのまま関数に入れて@jitしても、高速化されません。つーかそれで速くなったら大慌てだよね。たぶんみんな使うし、それができるならnumpy開発者が最初から組み込むよね)

 コツとしては「Pythonコードで書かれた複雑な処理」をそのまま移すんじゃなくて、ボトルネックになっている書かざるを得ないループをnumbaに移すようにしましょう。短い方がコンパイルを通すのが楽です。あとは走らせるとコンパイルエラーがたくさん出ると思うので、だいたい読んでググればなんとかなります。それと、最初から書いたものを分析やシミュレーションなどの処理の中に組み込で使おうとすると、書き換える度に五分前処理してエラーになるとか辛い事態が発生するので、ダミーデータで走らせるテストも書いておくと良いでしょう(これはnumbaに限らず大切なことなのですが)。

ちょっとだけ実践編

 お待たせしました、実践編です。実際にnumba+numpyでどんなことができるのか、簡単に見ていきます。

 この記事ではfloat64のcumsumを実装します。二次元配列を受け取り、行ごとにcumsumするものを目指します*5

 え、cumsumはnumpyにありますよね。numba要らないって? ブラウザバックしないでください

 私が定義するcumsum(mycumsumと名付けます)は以下のような計算です。

 任意の要素xに対して

  • 0.0 <= x

 普通にcumsm

  • -1.0 <= x < 0

 リセットしてxを0にする

  • -2.0 <= x < -1.0

 前の値を0.5倍したものをxにする(最初なら0)

  • x < -2.0

 前の値に42.0を足してxにする(最初なら42.0)

 はい、cumsumじゃないですよね。名前が思いつきませんでしたが、cumsumに余計な処理が増えたなにかです。

 とりあえずpythonで書いてみましょう。

import random

def mycumsum(lst):
    t = 0.0
    result = []
    for x in lst:
        if 0.0 <= x:
            t += x
        elif -1.0 <= x < 0.0:
            t = 0.0
        elif -2.0 <= x < 1.0:
            t *= 0.5
        else:
            t += 42.0
        result.append(t)
    return result


random.seed(42)
data = [random.uniform(-10.0, 10.0)  # 見づらいので[-10, 10]にする
        for _ in range(10)]

print(data)
print(mycumsum(data))

""" =>
[2.7885359691576745, -9.49978489554666, -4.499413632617615, -5.5357852370235445, 4.729424283280249, 3.533989748458225, 7.843591354096908, -8.261223347411677, -1.561563606294591, -9.404055611238594]
[2.7885359691576745, 44.78853596915768, 86.78853596915768, 128.7885359691577, 133.51796025243794, 137.05195000089617, 144.89554135499307, 186.89554135499307, 93.44777067749654, 135.44777067749652]
"""


 書けたけど……と思いますよね。これは遅いPythonの処理です。上述の理由によりnumpy配列の要素にPythonレイヤからアクセスすると遅いので、numpy配列を渡してもそんなに変わりません(かえって遅い可能性もあります)。そしてこの関数をnumpyのベクトル演算で効率的に実装することはできません(たぶん)。

 まあでも、一応速度を測ってみるか。意外と速いかもしれない。

import time
import random
import numpy as np


def mycumsum_row(lst):
    t = 0.0
    result = []
    for x in lst:
        if 0.0 <= x:
            t += x
        elif -1.0 <= x < 0.0:
            t = 0.0
        elif -2.0 <= x < 1.0:
            t *= 0.5
        else:
            t += 42.0
        result.append(t)
    return result

def mycumsum(A):
    return [mycumsum_row(row) for row in A]

random.seed(42)
# (1000, 1000)配列とします
data = [[random.uniform(-10.0, 10.0) for _ in range(1000)]
        for _ in range(1000)]

# 実行時間計測
times = []
for i in range(10):
    t1 = time.time()
    res = mycumsum(data)
    t2 = time.time()
    times.append(t2 - t1)

print(np.mean(times)) # => 0.11440501213073731

# numpyのcumsumもやります
data = np.array(data)  # numpy配列にしないと遅いんで・・・

# 実行時間計測
times = []
for i in range(10):
    t1 = time.time()
    res = np.cumsum(data, axis=1)
    t2 = time.time()
    times.append(t2 - t1)

print(np.mean(times)) # => 0.003912162780761719

# 全体の結果を改めて
""" =>
0.11293444633483887
0.003912162780761719
"""

 知 っ て た 。

 まあ、処理の複雑性を考えると意外と健闘している方かもしれません。約29倍で済んでいます。普通にpythonでループさせるとこれくらいになるという例です。

 じゃあ、次はnumbaで書きましょう。実装の仕方はこんな感じです。

@jit("f8[:, :](f8[:, :])", nopython=True)
def mycumsum_numba(A):
    result = np.empty(A.shape, dtype=np.float64)
    for i in range(A.shape[0]):  # 行のループ
        t = 0.0
        for j in range(A.shape[1]):  # 列のループ
            x = A[i, j]
            if 0.0 <= x:
                t += x
            elif -1.0 <= x < 0.0:
                t = 0.0
            elif -2.0 <= x < 1.0:
                t *= 0.5
            else:
                t += 42.0
            result[i, j] = t
    return result

 特に解説は要らないでしょう。最適化とかはまったく考えていない愚直な実装です。

 また測る。

import time
import random
import numpy as np
from numba import jit

def mycumsum_row(lst):
    t = 0.0
    result = []
    for x in lst:
        if 0.0 <= x:
            t += x
        elif -1.0 <= x < 0.0:
            t = 0.0
        elif -2.0 <= x < 1.0:
            t *= 0.5
        else:
            t += 42.0
        result.append(t)
    return result

def mycumsum(A):
    return [mycumsum_row(row) for row in A]

@jit("f8[:, :](f8[:, :])", nopython=True)
def mycumsum_numba(A):
    result = np.empty(A.shape, dtype=np.float64)
    for i in range(A.shape[0]):  # 行のループ
        t = 0.0
        for j in range(A.shape[1]):  # 列のループ
            x = A[i, j]
            if 0.0 <= x:
                t += x
            elif -1.0 <= x < 0.0:
                t = 0.0
            elif -2.0 <= x < 1.0:
                t *= 0.5
            else:
                t += 42.0
            result[i, j] = t
    return result
            

random.seed(42)
# (1000, 1000)配列とします
data = [[random.uniform(-10.0, 10.0) for _ in range(1000)]
        for _ in range(1000)]

# 実行時間計測
times = []
for i in range(10):
    t1 = time.time()
    res_mycumsum = mycumsum(data)
    t2 = time.time()
    times.append(t2 - t1)

print(np.mean(times)) # => 0.11440501213073731

# numpyのcumsum
data = np.array(data)

# 実行時間計測
times = []
for i in range(10):
    t1 = time.time()
    res = np.cumsum(data, axis=1)
    t2 = time.time()
    times.append(t2 - t1)

print(np.mean(times)) # => 0.003912162780761719

# 自作numba版
times = []
for i in range(10):
    t1 = time.time()
    res_mycumsum_numba = mycumsum_numba(data)
    t2 = time.time()
    times.append(t2 - t1)

assert (res_mycumsum_numba == np.array(res_mycumsum)).all()  # 同じ結果か確認

print(np.mean(times)) # => 0.005931735038757324


# 全体の結果を改めて
""" =>
0.11293444633483887
0.003912162780761719
0.005931735038757324
"""

 numpyのcumsumの1.5倍で実行できます。処理の複雑性を考えると大健闘と言えます。

 という感じで使えるのがnumbaで、これはなかなかすごいことだと思います。

GILを切ってマルチスレッドする(おまけ)

 もっと速くしたいときは、numbaではGILを切れます。

 これを使ってマルチスレッド並列化による高速化ができます。Python的には未知の世界です。しかもpython標準のthreadingから叩けば勝手にGILなしで動くので、使いやすいものです。

 ただし先に言っておきますが、スレッド間の複雑な同期処理は難しそうなので、独立に動かせるスレッドがあることを前提にしてください……

 公式の例はこんなのですが、

1.19. Examples — Numba 0.48.0-py3.6-macosx-10.7-x86_64.egg documentation

 私もやってみます。



 まず、並列化する前のコードです。速すぎるので調子に乗って配列を大きくしました。

import time
import numpy as np
from numba import jit


@jit("f8[:, :](f8[:, :])", nopython=True)
def mycumsum_numba(A):
    result = np.empty(A.shape, dtype=np.float64)
    for i in range(A.shape[0]):  # 行のループ
        t = 0.0
        for j in range(A.shape[1]):  # 列のループ
            x = A[i, j]
            if 0.0 <= x:
                t += x
            elif -1.0 <= x < 0.0:
                t = 0.0
            elif -2.0 <= x < 1.0:
                t *= 0.5
            else:
                t += 42.0
            result[i, j] = t
    return result

np.random.seed(42)
data = np.random.uniform(-10.0, 10.0, size=(5000, 50000))

# 自作numba版
times = []
for i in range(10):
    t1 = time.time()
    res_mycumsum_numba = mycumsum_numba(data)
    t2 = time.time()
    times.append(t2 - t1)

print(np.mean(times)) # => 1.8200997829437255

 そろそろ速度的にきつくなってきました。スレッド並列化でこれを数倍速くします。

 GILを切るにはnogil=Trueを指定します。が、GILを切ることで速くなるわけではありません。切った上でマルチスレッド処理を書くことで速くなるのです。

 実装としてはこんな感じです。

# 並列版の中身
@jit("void(f8[:, :], f8[:, :], i8, i8)", nopython=True, nogil=True)
def mycumsum_numba_pi(A, result, start, stop):
    for i in range(start, stop):  # 行のループ
        t = 0.0
        for j in range(A.shape[1]):  # 列のループ
            x = A[i, j]
            if 0.0 <= x:
                t += x
            elif -1.0 <= x < 0.0:
                t = 0.0
            elif -2.0 <= x < 1.0:
                t *= 0.5
            else:
                t += 42.0
            result[i, j] = t

# 並列版の外側(マルチスレッドで呼び出すためのもの)
def mycumsum_numba_p(A, n_threads=8):  # n_threadsはマシンに合わせて調整してください。当方8C/16Tです。16も試しましたがかえって遅かったので8にしています
    n = A.shape[0]
    result = np.empty(A.shape, dtype=np.float64)
    cl = n // n_threads
    l = [cl * x for x in range(n_threads)] + [n]
    args = [(A, result, start, stop) for start, stop in zip(l, l[1:])]
    threads = [threading.Thread(target=mycumsum_numba_pi, args=a)
               for a in args]
    for th in threads:
        th.start()
    for th in threads:
        th.join()
    return result

 中身にはnogil=Trueしないと何も意味のないことになるので付けましょう。外側はnumbaの関数にしなくてオッケーです。

 結果を受け取るために、返り値を使うのをやめて引数への副作用で結果を取り出すテクを使います。公式の例を見る限り、うまくviewを活かせば、もしかしたらここまで愚直なインデックスアクセスに頼らなくてもできそうな気もしますが、よくわからないのでstartとstopで実装です。

 これによりnumbaで走るスレッドの外側で生成されたresultに対してスレッド並列に書き込みが行われるはずです。

 計測に使ったコードです。

import time
import threading
import numpy as np
from numba import jit

# 愚直な実装
@jit("f8[:, :](f8[:, :])", nopython=True)
def mycumsum_numba(A):
    result = np.empty(A.shape, dtype=np.float64)
    for i in range(A.shape[0]):  # 行のループ
        t = 0.0
        for j in range(A.shape[1]):  # 列のループ
            x = A[i, j]
            if 0.0 <= x:
                t += x
            elif -1.0 <= x < 0.0:
                t = 0.0
            elif -2.0 <= x < 1.0:
                t *= 0.5
            else:
                t += 42.0
            result[i, j] = t
    return result

# 並列版の中身
@jit("void(f8[:, :], f8[:, :], i8, i8)", nopython=True, nogil=True)
def mycumsum_numba_pi(A, result, start, stop):
    for i in range(start, stop):  # 行のループ
        t = 0.0
        for j in range(A.shape[1]):  # 列のループ
            x = A[i, j]
            if 0.0 <= x:
                t += x
            elif -1.0 <= x < 0.0:
                t = 0.0
            elif -2.0 <= x < 1.0:
                t *= 0.5
            else:
                t += 42.0
            result[i, j] = t

# 並列版の外側(マルチスレッドで呼び出すためのもの)
def mycumsum_numba_p(A, n_threads=8):
    n = A.shape[0]
    result = np.empty(A.shape, dtype=np.float64)
    cl = n // n_threads
    l = [cl * x for x in range(n_threads)] + [n]
    args = [(A, result, start, stop) for start, stop in zip(l, l[1:])]
    threads = [threading.Thread(target=mycumsum_numba_pi, args=a)
               for a in args]
    for th in threads:
        th.start()
    for th in threads:
        th.join()
    return result
    
np.random.seed(42)
data = np.random.uniform(-10.0, 10.0, size=(5000, 50000))

# 検証
assert (mycumsum_numba(data) == mycumsum_numba_p(data)).all()

# 自作numba版
times = []
for i in range(10):
    t1 = time.time()
    res = mycumsum_numba(data)
    t2 = time.time()
    times.append(t2 - t1)

print(np.mean(times)) # => 1.819163990020752

# 自作numba並列版
times = []
for i in range(10):
    t1 = time.time()
    res = mycumsum_numba_p(data)
    t2 = time.time()
    times.append(t2 - t1)

print(np.mean(times)) # => 0.2936941385269165


""" =>
1.8091564416885375
0.2936941385269165
"""

 ということで8スレッドで並列に動かして6倍くらい速くなりました。オーバーヘッドもあれば記憶領域の効率(キャッシュとメモリ帯域)的にも悪いのでこんなものです。それでもmultiprocessingでプロセス並列でやるより、原理的にははるかに効率的な方法です。

まとめ

 こんな感じで使えて速くて便利なnumbaを使わない手はないっ……!

 という訳でもないんですよね。必要になるシチュエーションは限られているので、数値計算寄りのことをしている一般的Pythonユーザでも一年に一回あるかどうかでしょう。私は4年間くらい実用的に使わずに過ごしてきましたが*6、そんなに困りませんでした。

 本格的にアルゴリズムの実装などに使うには何かと機能的に物足りないのは事実で、C/C++やCythonの方がたぶん向いています。また、日常的にインデックスループを書いて走らせる必要がある人はJuliaの方がいいかもしれません。

 じゃあなんで? という話になるのですが、こちらの記事の冒頭にある表を見てください。

qiita.com

 速度的には、CやJuliaと互角に見えます(CとJuliaが互角なのがそもそもびっくりですが、numbaはそれと互角、という意味)。つまりCの速度で動くギリギリPythonです。唯一無二の存在と言っていいでしょう。

 これは強い……

 どんな感じでやるのか知っておいて、頭の引き出しに入れておけば、必要になったときに取り出せます。雑に使うだけなら学習コスト低いのが良いところです*7。それで一年に一回の行き詰まりのときに、諦めるしかないか、突破できるかが決まると思います。

*1:ちなみにこの記事では実践的な数値計算はしません。それでも多少はなにかの手助けになるでしょう。

*2:動的メモリ確保してるから配列じゃないとか面倒くさいこと言わないでください、配列です。

*3:あるいはCythonかもしれないし、Pythonを見限ってJuliaに行ってしまうかもしれませんが

*4:あ、でもCとか古めの静的型付けは好きです。苦手な気がするのはオブジェクト指向以降のクラスと絡みついたモダンな静的型付けです。Javaのインターフェースとか必要なのはわかるけど好きになれる気がしません。新しめのpandasのリファレンスは丁寧にシグネチャにPythonの型ヒントを書いてくれていますが、あれとか正気の沙汰とは思えません。

*5:numpyのaxis=1相当

*6:存在は知ってるけどどう使うのかよくわからない、@jitしても速くならないし……とかそんなレベルでした。

*7:この記事読めばできるでしょという期待を込めて書いています。

scikit-learnのStandardScalerで疎行列型のまま標準化する

ことのあらまし

 データの標準化は機械学習の前処理としてとても重要です。そして疎行列型データ構造は、スパースなデータを表現するためにはとても適しています。

 残念ながら、普通に疎行列型を標準化しようとすると、疎行列性が失せます。考えてみればわかるのですが、普通の標準化では平均0にしてしまいます。たとえば非負の疎行列だとすると、大半を占める0のデータはたぶん負値になることでしょう。そしてスパース性は維持できません。

scikit-learnの素敵な対策

 ま、この辺は当然考えられていて、なんと超親切なメッセージが出ます。

from scipy.sparse import csr_matrix
from sklearn.preprocessing import StandardScaler

a = csr_matrix([[0, 1, 0, 3, 0],
                [1, 0, 0, 1, 2],
                [0, 0, 4, 0, 0]])
print(repr(a))


ss = StandardScaler()
result = ss.fit_transform(a)  # ここでエラー発生
print(repr(result))
ValueError: Cannot center sparse matrices: pass `with_mean=False` instead. See docstring for motivation and alternatives.

 少し前のscikit-learnだと本当にこの挙動だったかどうかは少し記憶が怪しいんですが、今0.22で試したらこうなっていました。

with_mean=Falseとは?

 標準化の式というと、普通は

\displaystyle z_i = \frac{x_i - \bar{x}}{\sigma _x}

 これで考えると思います。しかし今回は中心化を行わずに

\displaystyle z_i = \frac{x_i}{\sigma _x}

 で処理します。

 こうすると0は0のままで、かつ分散は1に調整されます。

 この場合は疎行列として表現できてよさげですね。scikit-learnはこの場合、疎行列型を維持したまま計算してくれます。とても助かります。

 値としてはこんな結果になります。

from scipy.sparse import csr_matrix
from sklearn.preprocessing import StandardScaler

a = csr_matrix([[0, 1, 0, 3, 0],
                [1, 0, 0, 1, 2],
                [0, 0, 4, 0, 0]])
print(repr(a.A))
""" =>
array([[0, 1, 0, 3, 0],
       [1, 0, 0, 1, 2],
       [0, 0, 4, 0, 0]], dtype=int64)
"""

ss = StandardScaler(with_mean=False)
result = ss.fit_transform(a)
print(repr(result.A))

""" =>
array([[0.        , 2.12132034, 0.        , 2.40535118, 0.        ],
       [2.12132034, 0.        , 0.        , 0.80178373, 2.12132034],
       [0.        , 0.        , 2.12132034, 0.        , 0.        ]])
"""

 なお、ただのnumpy配列としてスパースなデータを入力した場合は特にメリットはありません。

まとめ

 疎行列型のまま扱うとパフォーマンス上のメリットが得られることが多いので、積極的に試してみるべきだと思います。

【python】zipを使ってn-gram列を生成する

はじめに

 n-gramは自然言語処理でよく使われる方法です。

n-gram - Wikipedia

 さて、以下のような関数を作りたいとします。

n_gram("abcde", n=2, sep="-")  # ["a-b", "b-c", "c-d", "d-e"]

 n=2ならbigram, n=3ならtrigramという言い方があります。さて、たとえばbigramなら以下のように書けます。

>>> def bigram(seq, sep="-"):
...     return [sep.join(x) for x in zip(seq, seq[1:])]
... 
>>> bigram("abcde")
['a-b', 'b-c', 'c-d', 'd-e']

 これは私の発明ではありませんが、pythonに慣れている人なら誰でも思いつく可能性はあると思います。

 trigramはこう。

>>> def trigram(seq, sep="-"):
...     return [sep.join(x) for x in zip(seq, seq[1:], seq[2:])]
... 
>>> trigram("abcde")
['a-b-c', 'b-c-d', 'c-d-e']

一般化

 zipの引数部分をnの数だけ繰り返せば、一般化することができます。ここはジェネレータ式の出番です。

>>> def n_gram(seq, n=2, sep="-"):
...     return [sep.join(x) for x in zip(*(seq[i:] for i in range(n)))]
... 
>>> n_gram("abcde", n=1)
['a', 'b', 'c', 'd', 'e']
>>> n_gram("abcde", n=2)
['a-b', 'b-c', 'c-d', 'd-e']
>>> n_gram("abcde", n=3)
['a-b-c', 'b-c-d', 'c-d-e']
>>> n_gram("abcde", n=4)
['a-b-c-d', 'b-c-d-e']
>>> n_gram("abcde", n=5)
['a-b-c-d-e']
>>> n_gram("abcde", n=6)
[]

効率化

 上の実装はラフに見てseqのn倍のメモリ領域を食ってしまいます。

 そんなに深刻な問題でもありませんが、とはいえこのままではなんとなくエレガントではないので、isliceを使いたいと思います。

itertools --- 効率的なループ実行のためのイテレータ生成関数 — Python 3.8.1 ドキュメント

 ドキュメントには明記されていなかったと思いますが、seq[i:]とするにはislice(seq, i, None)でいいのだと思います。

>>> from itertools import islice
>>> def n_gram(seq, n=2, sep="-"):
...     return [sep.join(x) for x in zip(*(islice(seq, i, None) for i in range(n)))]
... 
>>> n_gram("abcde", n=1)
['a', 'b', 'c', 'd', 'e']
>>> n_gram("abcde", n=2)
['a-b', 'b-c', 'c-d', 'd-e']
>>> n_gram("abcde", n=3)
['a-b-c', 'b-c-d', 'c-d-e']
>>> n_gram("abcde", n=4)
['a-b-c-d', 'b-c-d-e']
>>> n_gram("abcde", n=5)
['a-b-c-d-e']
>>> n_gram("abcde", n=6)
[]

シーケンスを第一引数に取る意味

 任意のイテラブルを取れるようにしておくと、

>>> n_gram(["I",  "Am", "a", "Cat"])
['I-Am', 'Am-a', 'a-Cat']

 のように応用が効いたりして便利です。

sklearnで混同行列をヒートマップにして描画するplot_confusion_matrix

はじめに

 scikit-learnのv0.22で、混同行列をプロットするための便利関数であるsklearn.metrics.plot_confusion_matrixが追加されました。

 使いやすそうなので試してみます。

使い方

 リファレンスはこちらです。

sklearn.metrics.plot_confusion_matrix — scikit-learn 0.22 documentation

 引数のフォーマットを見ると、

sklearn.metrics.plot_confusion_matrix(estimator, X, y_true, labels=None, sample_weight=None, normalize=None, display_labels=None, include_values=True, xticks_rotation='horizontal', values_format=None, cmap='viridis', ax=None)

 あ、予測器とXとyを入れるタイプの関数だ。なんか微妙に使いづらいですね。この時点でなんか困惑気味ですが、やってみます。

import matplotlib.pyplot as plt
from sklearn.datasets import load_wine
from sklearn.model_selection import train_test_split
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import plot_confusion_matrix


wine = load_wine()
X_train, X_test, y_train, y_test = train_test_split(
    wine.data, wine.target, stratify=wine.target)

clf = LogisticRegression()
clf.fit(X_train, y_train)
plot_confusion_matrix(clf, X_test, y_test,
                      display_labels=wine.target_names, 
                      cmap=plt.cm.Blues,)
plt.savefig("result.png")

result.png
result.png

 ソースを見る限り、内部で交差検証などしてくれる訳ではないようなので、学習済みモデルとテストデータを渡してプロットさせます。また、labelsという引数がありますが微妙に罠っぽくて、表示に使われるラベルはdisplay_labelsの方です。

 一応各引数の説明など。

  • estimator, X, y_true

 は、説明要らないよね。上で示したのと同じ使い方をします。

  • labels

 ラベルの順序を並び替えたり、一部のラベルのみ取り出してプロットしたいとき使うそうです。y_trueの中身が[0,0,0,1,1,1]なら[0,1]や[1,0]などが指定できます。別に要らないでしょう。

  • sample_weightarray-like of shape (n_samples,), default=None

 サンプルの重み。

  • normalize{"true", "pred", "all"}, default=None

 全体を正規化するかどうか。するならその方法を文字列で指定します。

  • display_labels

 表示されるラベルの名前はこちらで指定します。使用頻度は高いはずです。

  • include_valuesbool, default=True

 Falseに設定すると数字が出てこなくなります。普通は数字があったほうが好ましいでしょう。

  • xticks_rotation{"vertical", "horizontal"}

 x軸のラベルが回転するかどうか。デフォルトでは回転しません。

  • values_format

 "d"や".2f"などが指定できる。表示の書式で、format関数などに準じると思われる。

  • cmap, ax

 matplotlib関連です。デフォルトのcmapの"viridis"がexampleで「ダサいよねこれ」とBluesにされているあたり泣けます。 

使いづらいのでConfusionMatrixDisplayを使うことにする

 なんで混同行列を描くためだけにpredictメソッド走らせなきゃいかんのだと思ったので、仕様を確認します。すると、ConfusionMatrixDisplayなるクラスがあることがわかります。

It is recommend to use plot_confusion_matrix to create a ConfusionMatrixDisplay.
sklearn.metrics.ConfusionMatrixDisplay — scikit-learn 0.22 documentation

 うるせえ、あるもんは使うんじゃ。

 インスタンスを作ってplotメソッドを呼ぶと動きます。引数はだいたい上と共通ですが、インスタンスを作るときに

class sklearn.metrics.ConfusionMatrixDisplay(confusion_matrix, display_labels)

 なので柔軟性が多少上がります。

import matplotlib.pyplot as plt
from sklearn.datasets import load_wine
from sklearn.model_selection import train_test_split
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay


wine = load_wine()
X_train, X_test, y_train, y_test = train_test_split(
    wine.data, wine.target, stratify=wine.target)

clf = LogisticRegression()
clf.fit(X_train, y_train)
y_pred = clf.predict(X_test)
cmx = confusion_matrix(y_test, y_pred)
cmd = ConfusionMatrixDisplay(cmx, wine.target_names)
cmd.plot()
plt.savefig("result.png")

 結果の図は同じなので省略。任意の予測ラベルで描画しようと思えばできます。ちょっと微妙な感じもしますが、許容範囲でしょう。

まとめ

 このようなものができましたので、今後は混同行列のプロットではそんなに困らないと思います。

 あとどうでもいい話、そもそもこのブログは混同行列の描き方がわからなくて調べてまとめたのが始まりですが、

www.haya-programming.com

 なんとなく原点回帰した感があって感動しています。

pandasで年月日時刻の列を結合して一列にする(datetime64で)

概要

 ローデータ(生データ)を取り込むと、年月日が独立して入っている感じの嫌なデータになっていることがあります。

年,月,日
1996,8,1
1998,12,2
2012,05,3

 こういうのは嬉しくないので、できるだけ単一のdatetime風の型に変換しておきたいのですが、意外と難しかったりします。

文字列操作として考える

 以下のように読み込みます(io.StringIOを使っていますが実際はCSVファイルだと思ってください)。

import io
import pandas as pd

data = """
年,月,日
1996,8,1
1998,12,2
2012,05,3
"""

df = pd.read_csv(io.StringIO(data), dtype={k:object for k in "年月日"})

 型をobject型にしておくのがミソで、整数型にされると文字列操作で変換できません。読み込んでからastypeで変換してもいいですが、二度手間ですね。

df["DateTime"] = pd.to_datetime(df["年"].str.cat([df["月"], df["日"]], sep=" "))
print(df)
""" =>
      年   月  日   DateTime
0  1996   8  1 1996-08-01
1  1998  12  2 1998-12-02
2  2012  05  3 2012-05-03
"""

 これはこれでできるのですが、文字列を介すると二度手間感が否めません。

時刻もある場合

 とにかくそれっぽいフォーマットに無理矢理仕立てれば、この方法でできます(というかできるはずです)。

import io
import pandas as pd

data = """
年,月,日,時,分
1996,8,1,12,5
1998,12,2,4,12
2012,05,3,23,56
"""

df = pd.read_csv(io.StringIO(data), dtype={k:object for k in "年月日時分"})
df["DateTime"] = pd.to_datetime(df["年"].str.cat([df["月"], df["日"], df["時"]], sep=" ").str.cat(df["分"], sep=":"))
print(df)
""" =>
      年   月  日   時   分            DateTime
0  1996   8  1  12   5 1996-08-01 12:05:00
1  1998  12  2   4  12 1998-12-02 04:12:00
2  2012  05  3  23  56 2012-05-03 23:56:00
"""

 やはりスマートではない。

内包表記でdatetimeっぽい型のリストにすればいいんだよ

 そう、普通はそうしたいところ。

 型で迷うと思いますが、たぶんTimestampでいいと思います。

pandas.Timestamp — pandas 0.25.3 documentation

import io
import pandas as pd

data = """
年,月,日,時,分
1996,8,1,12,5
1998,12,2,4,12
2012,05,3,23,56
"""

df = pd.read_csv(io.StringIO(data))
df["DateTime"] = [
    pd.Timestamp(
        year=row["年"], month=row["月"], day=row["日"],
        hour=row["時"], minute=row["分"])
    for i, row in df.iterrows()]

print(df)
""" =>
      年   月  日   時   分            DateTime
0  1996   8  1  12   5 1996-08-01 12:05:00
1  1998  12  2   4  12 1998-12-02 04:12:00
2  2012   5  3  23  56 2012-05-03 23:56:00
"""

 読み込みで文字列にしないといけない、二度手間、といった微妙さがなくなりました。わーい。

 これはこれで上手くいきます。が、スマートなはずなのにスマートに見えない。キーワード引数の指定が汚すぎるからですね。

 ダブルアスタリスクのアンパックを使えば……とか一瞬は思いましたが、そのためには列名を変えたdfをコピーして作らないといけません。

import io
import pandas as pd

data = """
年,月,日,時,分
1996,8,1,12,5
1998,12,2,4,12
2012,05,3,23,56
"""

df = pd.read_csv(io.StringIO(data))
df_d = df[["年", "月", "日", "時", "分"]].copy()
df_d.columns = ["year", "month", "day", "hour", "minute"]
df["DateTime"] = [pd.Timestamp(**row)
                  for i, row in df_d.iterrows()]

print(df)
""" =>
      年   月  日   時   分            DateTime
0  1996   8  1  12   5 1996-08-01 12:05:00
1  1998  12  2   4  12 1998-12-02 04:12:00
2  2012   5  3  23  56 2012-05-03 23:56:00
"""

 こっちの方が多少スマートかな。上の書き方でも別に困ることはないです。

まとめ

 普通にTimestampのiterableを突っ込めばいいだけだけど、このやり方が調べても出てこなくて、できないのかなとか思って焦りつつやってみたらできたので記事にしました。

 日付時刻の扱いは割と面倒ですが、けっきょくのところ素直に組んでいけば良いはず。

参考

Pandasでの日付・時間周りのちょっとしたチートシート - Qiita
 これと同じようなことをやっています。

【python】UnboundLocalErrorの原因と対処法

はじめに

 関数の中で関数の外の変数を操作するようなコードを書いていると、たまに下記のようなエラーが出ます。

UnboundLocalError: local variable '***' referenced before assignment

 初歩的ですが、意外とまとまった良い解説がないので、記事にしておきます。

原因

 まず、実際にこのエラーが出るコードを示します。

def f():
    print(a)
    a = 10

f()

 実際に出るエラーメッセージは以下のようなものです。

Traceback (most recent call last):
  File "***.py", line 5, in <module>
    f()
  File "***.py", line 2, in f
    print(a)
UnboundLocalError: local variable 'a' referenced before assignment

 シンプルな例ですが、ローカル変数aが代入の前にprintで参照されているのがおわかりいただけるかと思います。

 さて、実は以下のようにしても同じエラーが出ます。

a = 20
def f():
    print(a)
    a = 10

f()

 今度はaが定義されているのになぜ? と思うかもしれませんが、変数のスコープについて考察すると理解できます。

スコープ (scope)とは|「分かりそう」で「分からない」でも「分かった」気になれるIT用語辞典

 a = 20と書かれている部分のaはグローバル変数、関数内でa = 10と書かれている部分のaは別物です。前者がある位置をグローバルスコープ、後者がある位置を関数のスコープと言ったりします。この違いがあるからこそ、外の名前空間に悪影響を及ぼさないようにしつつ自由な処理を関数で行うことができるわけです。

ローカル変数とグローバル変数の動作の例

a = 20
def printa():
    a = 10
    print(a)

print(a)  # 20
printa()  # 10
print(a)  # 20

 もちろん関数内のスコープから外のスコープの変数が見えないという訳ではなく、通常同名のローカル変数がなければ外側のスコープから変数が探索されます。

a = 20
def printa():
    print(a)

printa()  # 20

 ただし、同名のローカル変数がなければ、というのが曲者で、ローカル変数があるかどうかは「構文から静的に決定される」ことになります。具体的にいうと、関数の中でその変数に対して代入文があれば、それはローカル変数とみなされます。なお、+=などの累積代入文なども代入文とみなされます。

 どのタイミングで代入文が実行されるのかは考慮されません。一度ローカル変数とみなされてしまえば、最初から最後までローカル変数です。

なんとなく動きそうだけど実際は駄目なコード

a = 30  # 関数の中とは無関係
def f():
    print(a)  # ここで30になったりはしない(この時点でエラー)
    a = 40   # これがある限りaはローカル変数
    print(a)  # 40が出たりはしない()
f()

 繰り返しますが、「関数の中でその変数に対して代入したらローカル変数」「ローカル変数に決まったとしても、その変数が参照されたタイミングで変数が実在している保障はない(むしろない可能性だって十分にある)」ということです。気をつけましょう。

対処法

 ローカル変数なんて面倒くさいものは使わなくていい、という場合はglobal宣言を使うことが出来ます。当たり前ですが、こうしてしまうとグローバル変数を直接扱うのと同じことになり、呼び出しで副作用が発生します。

a = 20
def f():
    global a
    print(a)
    a = 10
print(a)  # 20
f()       # 20
print(a)  # 10

 ローカル変数でいいのだが外部の情報を使いたい、という場合は、引数で値を渡す設計にするべきでしょう。

a = 20
def f(a):
    print(a)
    a = 10
    print(a)
print(a)  # 20
f(a)      # 20
          # 10 
print(a)  # 20

 また、この記事でこれまで書いてきた解説とはあまり関係ありませんが、通常のNameErrorと同様のシチュエーションでもUnboundLocalErrorになることがあります。条件分岐が絡んだりすると、起こりがちです。

 たとえばこういうコードは通常NameErrorになります。

if False:  # 実際にはTrueにもFalseにもなり得る条件。エラーになるのはFalseになったとき
    a = 10
print(a)  # NameError: name 'a' is not defined

 関数の中で同様のことをやるとUnboundLocalErrorです。

def f():
    if False:
        a = 10
    print(a)  # UnboundLocalError: local variable 'a' referenced before assignment

f()

 この場合の対処法はNameErrorに準じます。if文の上でaに初期値を代入しておくか、else節を設けてaに適切な値を代入するかで解決します。

scikit-learnで重み付きk近傍法(Weighted kNN)を試してみる

はじめに

 k近傍法には、近傍点の重み付けをどうするかで複数のやり方が考えられます。普通のk近傍点では予測対象の点のkつの近傍点を取ってきて、そのクラスを単純に多数決します。一方で、より近い点にはより大きい重みを持たせるという発想もまた自然です。

 という訳で、scikit-learnのKNeighborsClassifierはweightsオプションを指定することで重み付けを加味したKNNモデルを作成できます。これはとても簡単なので、やってみようと思います。

sklearn.neighbors.KNeighborsClassifier — scikit-learn 0.21.3 documentation

しくみとやること

 weightsオプションには"uniform"と"distance"、または任意のcallableを渡せます。"uniform"は重みなし(普通のkNN、デフォルト)、"distance"は距離の逆数を重みにします。callableを渡す場合、距離行列を引数に取り、重み行列を返すようなcallableでなければなりません。たとえば、lambda x:1/xを指定することは(おそらく)"distance"を指定することと同じ意味を持ちます。

 今回はk=3, 5, 10の条件で、デフォルト(指定なし:"uniform")と"distance"を比べてみます。二次元の2クラスのデータに対して予測させることで、そのまま可視化します。

コード

 以前やった実験のコードを書き換えて使用しました。

君はKNN(k nearest neighbor)の本当のすごさを知らない - 静かなる名辞

 そもそものベースになっているのは、scikit-learnが公式で出している分類器の比較のためのコードです。下記ページで雰囲気を掴んでおくと読みやすいと思います。

Classifier comparison — scikit-learn 0.21.3 documentation

 今回の実験だとサンプル数が少ないほうが傾向がわかりやすいので、サンプル数50でやっています。

import numpy as np
import matplotlib.pyplot as plt
from matplotlib.colors import ListedColormap

from sklearn.datasets import make_moons, make_circles,\
    make_classification
from sklearn.neighbors import KNeighborsClassifier

def main():
    knn3 = KNeighborsClassifier(n_neighbors=3)
    knn3w = KNeighborsClassifier(n_neighbors=3, weights="distance")
    knn5 = KNeighborsClassifier(n_neighbors=5)
    knn5w = KNeighborsClassifier(n_neighbors=5, weights="distance")
    knn10 = KNeighborsClassifier(n_neighbors=10)
    knn10w = KNeighborsClassifier(n_neighbors=10, weights="distance")

    X, y = make_classification(
        n_samples=50, n_features=2, n_redundant=0, n_informative=2,
        random_state=1, n_clusters_per_class=1)
    rng = np.random.RandomState(2)
    X += 2 * rng.uniform(size=X.shape)
    linearly_separable = (X, y)

    datasets = [make_moons(n_samples=50, noise=0.3, random_state=0),
                make_circles(
                    n_samples=50, noise=0.2, factor=0.5, random_state=1),
                linearly_separable]

    fig, axes = plt.subplots(
        nrows=3, ncols=6, figsize=(32, 18))
    plt.subplots_adjust(wspace=0.2, hspace=0.3)

    cm = plt.cm.RdBu
    cm_bright = ListedColormap(['#FF0000', '#0000FF'])
    for i, (X, y) in enumerate(datasets):
        x_min = X[:, 0].min()-0.5
        x_max = X[:, 0].max()+0.5
        y_min = X[:, 1].min()-0.5
        y_max = X[:, 1].max()+0.5

        xx, yy = np.meshgrid(np.arange(x_min, x_max, 0.1),
                             np.arange(y_min, y_max, 0.1))

        for j, (cname, clf) in enumerate(
                [("KNN k=3", knn3), ("KNN k=3 weighted", knn3w),
                 ("KNN k=5", knn5), ("KNN k=5 weighted", knn5w),
                 ("KNN k=10", knn10), ("KNN k=10 weighted", knn10w)]):
            clf.fit(X, y)
            Z = clf.predict_proba(
                np.c_[xx.ravel(), yy.ravel()])[:, 1]
            Z = Z.reshape(xx.shape)
            axes[i,j].contourf(xx, yy, Z, 20, cmap=cm, alpha=.8)
            axes[i,j].scatter(X[:,0], X[:,1], c=y, s=20,
                              cmap=cm_bright, edgecolors="black")
            
            axes[i,j].set_title(cname)
    plt.savefig("result.png", bbox_inches="tight")

if __name__ == "__main__":
    main()

結果

 結果はこんな感じになりました。

result.png
result.png

 全体の傾向はさほど変わりませんが、

  • 重み付きの方が確率の階段がなめらか
  • ノイズっぽいサンプルに引っ張られる(過学習気味になる)。特に周囲に他のサンプルがないと引っ張られやすい

 という傾向が見られました。

考察

 距離の逆数で重みを付けて多数決することで、分離境界が少しなめらかになります。

 また、近くの点をより重視するので、kを小さくしたかのような効果があります。そのままでは過学習っぽくなってしまいますが、kを大きくして使うことで、汎化性能もある程度は確保できます。

 原理的には、より広い領域の上方をまったく捨てるのではなく多少は活用できることになります。そういうポリシーのもとで使うべきでしょう。

 また、"distance"を指定すると少し重み付けが効きすぎるかな? という気がするので、lambda x:x**(-1/2)を試してみるといった努力の方向性もありだと色々試してみて感じました。

 以前やったバギングと比べると計算が軽く、境界の確率がなめらかになるという意味では近い効果があります。

まとめ

 ということで気軽に指定できるので、kNNで精度を求める時(なにそれ)は"distance"も試してみる価値はあります。

【python】キーワード引数と可変長キーワード引数(kwargs)の競合によるエラー

はじめに

 既存の関数のwrapperを作るときなど、可変長キーワード引数を使いたいときがあります。

 これは通常のキーワード引数と併用できますが、稀に問題になることがあります。

関数定義のとき

 定義するときは割と単純で、問題も少ないです。

 以下のような例を考えます。

>>> def f(a, b=0, **kwargs):
...     print(a, b, kwargs)
... 

 a, bをキーワード引数として渡した場合、明示的に定義されている引数が吸い取ります。kwargsには与えられることはありません。

>>> f(a=10, b=20, c=30)
10 20 {'c': 30}

 逆に言うと明示的に定義されていなければキーワード引数として渡された引数はkwargsに入るので、この声質をうまく利用すると関数のwrapperが書きやすくなるときがあります。

アンパックで呼び出すとき

 辞書のアンパック展開によって引数を渡す場合、いささか問題を含んでいます。

 端的に言うと、辞書のキーと他の引数とが名前が被るとエラーになります。

>>> kwargs = {"a":20, "c":100}
>>> f(a=10, **kwargs)  # aの値として優先したいのは10
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
TypeError: f() got multiple values for keyword argument 'a'

 こういう事態を避けるのは一見簡単そうな気もしますが、

  • 任意のkwargs(どこかからやってきた)でとりあえず無事に渡せるようにしておきたい
  • 引数aは(今この場所で)値を指定して使いたい

 というようなときは意外と困難があります。

 こういうときは、kwargsを渡さない訳にはいかないですから、aを指定するのをやめてあげます。代わりにkwargsの中に新しくaを入れます。

>>> kwargs = {"a":20, "c":100}
>>> kwargs_copy = kwargs.copy()  # 変更するのでコピーに対して操作する
>>> kwargs_copy["a"] = 10
>>> f(**kwargs_copy)
10 0 {'c': 100}

 たぶんこれでいいでしょう。

まとめ

 ということで、Pythonの引数周りは意外と面倒くさいんです。柔軟といえば柔軟ですが、破綻すれすれな感じもする。