Dreaming in Greater Boston

Effective Python一人輪読会(Item 52 to 74)

Table of Contents

1 Chapter 7: コンカレンシーと並列実行

1.1 Item 52: 子プロセスを管理するために subprosess を使え

Pythonから子プロセスを呼び出すシンプルな方法。

import subprocess

result = subprocess.run(
    ['echo', 'Hello from the child!'],
    capture_output=True,  # stdout/stderrをキャプチャーする
    encoding='utf-8')

result.check_returncode()
print(result.stdout)
>>>
Hello from the child!

Python 3.5で導入された subprocess.run は子プロセスの実行完了を待ちます。公式サイトによると、基本的にはこれを使うのが推奨だそうです。

subprocess.run(args, *, stdin=None, input=None, stdout=None, stderr=None, capture_output=False, shell=False, cwd=None, timeout=None, check=False, encoding=None, errors=None, text=None, env=None, universal_newlines=None, **other_popen_kwargs)

タイムアウトも指定できそうです。

次は、 subprocess.Popen を使ってブロックされずに子プロセスを10個起動したあと、 <process>.communicate で実行完了した各子プロセスを終了(terminate)させる処理です。

import subprocess
import time

start = time.time()
sleep_procs = []
for _ in range(10):
    proc = subprocess.Popen(['sleep', '1'])
    sleep_procs.append(proc)

time.sleep(0.3)

for proc in sleep_procs:
    proc.communicate()

end = time.time()
delta = end - start
print(f'Finished in {delta:.3} seconds')
>>>
Finished in 1.02 seconds

並列に実行するため10秒かからず、1秒強で終わっています。

次の例は、外部コマンドの openssl にランダムな10バイトのバイト列を暗号化させる処理です。

import subprocess
import os
def run_encrypt(data):
    env = os.environ.copy()
    env['password'] = 'start123'
    proc = subprocess.Popen(
	['openssl', 'enc', '-des3', '-pass', 'env:password'],
	env=env,
	stdin=subprocess.PIPE,
	stdout=subprocess.PIPE)
    proc.stdin.write(data)
    proc.stdin.flush()
    return proc

procs = []
for _ in range(3):
    data = os.urandom(10)  # ランダムな10バイトを生成
    proc = run_encrypt(data)
    procs.append(proc)

for proc in procs:
    out, _ = proc.communicate()
    print(out[-10:])  # 後ろから10バイトをスライス
>>>
b'\x0f\xbc4\x94O\x93\xa5G\xbe\xe3'
b'm\xb3\x89\r\xc9pP7\xdc\xeb'
b"\xda\x16z N=\x850v'"

結果は、ランダムなバイト列を暗号化したバイト列なので、意味は特にありません。

複数の外部コマンドを呼び出し、それらをパイプでつなぐこともできます。次の例で、 run_hashopenssl を使って入力バイト列のハッシュを求める関数です。 for 文以下では、100バイトのランダムな文字列を生成し、それからハッシュを求めることを3つのサブプロセスで並列実行します。 run_hash 関数呼び出しの引数に encrypt_proc.stdout を指定することで、これらの処理をパイプでつなげています。

def run_hash(input_stdin):
    return subprocess.Popen(
	['openssl', 'dgst', '-whirlpool', '-binary'],
	stdin=input_stdin,  # stdinを指定
	stdout=subprocess.PIPE)

encrypt_procs = []
hash_procs = []
for _ in range(3):
    data = os.urandom(100)  # ランダムな100バイトを生成

    encrypt_proc = run_encrypt(data)
    encrypt_procs.append(encrypt_proc)
    hash_proc = run_hash(encrypt_proc.stdout)  # stdoutを指定
    hash_procs.append(hash_proc)

    encrypt_proc.stdout.close()  # 閉じてしまってよい???
    encrypt_proc.stdout = None

for proc in encrypt_procs:
    proc.communicate()
    assert proc.returncode == 0

for proc in hash_procs:
    out, _ = proc.communicate()
    print(out[-10:])
    assert proc.returncode == 0
>>>
'\x99\xd8*\x15~\x88\xd4\x89\x1c3'
b'\x00\x87\xd3\x93Ti\x12v\x01\xaa'
b'\x1b\x85\xdf\x94z\x96\xd3\xb0\x91\x9a'

結果の文字列に特に意味はありません。

子プロセスが終わらない場合が気になるなら、タイムアウト値を指定することも出来ます。

import subprocess
proc = subprocess.Popen(['sleep', '10'])
try:
    proc.communicate(timeout=0.1)
except subprocess.TimeoutExpired:
    proc.terminate()
    proc.wait()

print('Exist status', proc.poll())
>>>
Exist status -15

タイムアウト例外が発生したら子プロセスを終わらせます。 proc.poll() でexit codeが得られるようです。

1.2 Item 53: ブロックするI/Oにはスレッドを使い、parallelismを避けよ

普通使うPythonはCPythonですが、CPythonはglobal interpreter lock (GIL)のために複数コアでの並列実行ができません。知りませんでした。衝撃的な事実。。。I/Oによる待ちが無ければ複数スレッド使っても実行時間は短縮されません。

更に、ネットワーク等の非同期I/Oでは、より効率の良いasyncio(後で出てきます)を使うことになるため、Pythonでのスレッドの出番はブロックする(ie, 非同期システムコールが無い)ディスクI/Oくらいしか無さそうです。。。というのは言い過ぎでした。キュー等でも使えますね。

Pythonでのスレッドの使い方例:

import select
import socket
import time
from threading import Thread
def slow_systemcall():
    select.select([socket.socket()],[],[],0.1)

start = time.time()
threads = []
for _ in range(5):
    thread = Thread(target=slow_systemcall)
    thread.start()
    threads.append(thread)

for thread in threads:
    thread.join()

end = time.time()
delta = end - start
print(f'Took {delta:.3f} seconds')
>>>
Took 0.103 seconds

1.3 Item 54: スレッド間のデータレースを避けるために Lock を使え

単一コアで動くマルチスレッドにもロックは必要という話。 Pythonでmutexを用意する例:

from threading import Lock
class LockingCounter:
    def __init__(self):
	self.lock = Lock()
	self.count = 0  # ロック対象
    def increment(self, offset):
	with self.lock:
	    self.count += offset

Lock クラスを使うと with でクリティカルリージョン(ie, ロック範囲)の指定ができるのが便利ですね。

おさらいです。mutexとbinary semaphoreは一見とても似ていますが、用途が違います。

  • mutexは資源の排他(ロック)のため
  • binary semaphoreはイベントが起きたことを通知(シグナル)するため

スピンロックと違って、両方とも待ちスレッドはスリープします。

1.4 Item 55: スレッド間のワークアサインの調整には Queue を使え

Queue クラスはパイプラインを実装するのに便利です。 Queue の getメソッドは新データが来るまでブロックするため、自前でbusyウエイトを実装する必要がありません。

from threading import Thread
from queue import Queue

my_queue = Queue()  # キューのクラスが用意されています

def consumer():
    print('Consumer waiting')
    my_queue.get()  # ブロックされます
    print('Consumer done')

thread = Thread(target=consumer)
thread.start()

print('Producer putting')
my_queue.put(object())
print('Producer done')
thread.join()
>>>
Consumer waiting  # アイテム(object)が入ってくるまで待ちます
Producer putting
Producer done
Consumer done

最初に consumer がキューに来た後、producerが put するまで consumer は動き出さないことがわかります。。

キューを作るときにキューバッファのサイズを指定することもできます。いくつのconsumerがキューに入れるかを示し、それ以上のconsumerが来ても put でブロックします。

from threading import Thread
from queue import Queue
import time

my_queue = Queue(1)  # バッファサイズが1

def consumer():
    time.sleep(0.1)  # まず0.1秒スリープする
    my_queue.get()
    print('Consumer got 1')
    my_queue.get()
    print('Consumer got 2')
    print('Consumer done')

thread = Thread(target=consumer)
thread.start()

my_queue.put(object())  # producerは立て続けに二つputしようとする
print('Producer put 1')
my_queue.put(object())  # ここでブロックする
print('Producer put 2')
print('Producer done')
thread.join()
>>>
Producer put 1  # 最初にproducerがputするのは前回と同じ
Consumer got 1  # 0.1秒待ってからgetする
Producer put 2  # consumerがgetして、やっとputから戻る
Producer done
Consumer got 2
Consumer done

この例でのポイントは put 2のメッセージが got 1の後に来ているところです。consumerスレッドはスタートしてからまず0.1秒スリープしますが、その間にメインスレッドのproducerはputできずにブロックされていることがわかります。

次に、 Queue.task_done() はそのキューに対してそのタスクが完了したことを宣言します。全てのタスクの完了を待つにはそのキューに対して Queue.join() を呼べばよく、それまでブロックされます。これはスレッドのjoinとは別なことに注意です。

キューのタスクが完了するというのは、そのキューからgetしてきた仕事(アイテム)を最後の1個まで、全て処理し終わったという意味です。

from threading import Thread
from queue import Queue
import time

in_queue = Queue()
def consumer():
    print('Consumer waiting')
    work = in_queue.get()
    print('Consumer working')
    time.sleep(1)  # この例でのタスクはスリープすること
    print('Consumer done')
    in_queue.task_done()  # タスク完了を宣言する

thread = Thread(target=consumer)
thread.start()

print('Producer putting')
in_queue.put(object())
print('Producer waiting')
in_queue.join()  # in_queueの完了(=task_doneが呼ばれる)までブロックされる
print('Producer done')
thread.join()
>>>
Consumer waiting
Producer putting
Producer waiting
Consumer working
# ここで1秒スリープする
Consumer done
Producer done

この例でのポイントはもちろん、consumer doneまでproducer doneが出ないところです。

さて、これらの知識を使ってパイプラインを実装します。パイプラインはdownload, resize, uploadの3ステージからなるとします。写真をカメラからダウンロードして、サイズを変えてまたアップロードする場合を想定しています。

from threading import Thread
from queue import Queue
import time

def download(item):
    pass
def resize(item):
    pass
def upload(item):
    pass

Queue を継承した ClosableQueue を定義します。これはメソッド close を持ち、キューにこれ以降の入力が無いことを示す sentinel を入れます。sentinel は歩哨・見張りの意味で、終わりの印です。Item 10で出てきましたね。

__iter__ を準備したことで、このキューを iterate することができます。 get() でキューから写真を取り出し、sentinel以外ならyieldして写真を返します。キューに何も入っていなかったら get() がブロックします。キューから取ってきたアイテムが写真でなくsentinelだったら、終わりの印なのでリターンしています。

class ClosableQueue(Queue):
    SENTINEL = object()
    def close(self):
	self.put(self.SENTINEL)
    def __iter__(self):
	while(True):
	    item = self.get()  # 写真を一つ取り出す。無かったらブロックする
	    try:
		if item is self.SENTINEL:
		    return
		yield item  # まだfinallyは実行しない
	    finally:
		self.task_done()  # この写真の処理が完了

ここのtry - finallyの使い方に注目します。exceptで例外処理を行わないtry - finallyは、tryブロックの中で何が起こったとしてもfinallyブロックで取得しているロックを解放する(後始末する)ようなユースケースで使うようです。

この例ではロックは使っていませんが、Queue.task_doneをロック解放、Queue.joinをロック解放待ちのアナロジーとして考えると、似たユースケースと言えそうです。

StoppableWorkerClosableQueue に対応した新ワーカースレッドです。スレッドは写真ではなくステージ(で作業する人)に対応します。 in_queue から写真(item)を取り出し、処理をして、処理後の写真(result)を out_queue に入れます。

class StoppableWorker(Thread):
    def __init__(self, func, in_queue, out_queue):
	super().__init__()
	self.func = func  # やる作業
	self.in_queue = in_queue
	self.out_queue = out_queue

    def run(self):
	for item in self.in_queue:  # queueをiterateする
	    result = self.func(item)
	    self.out_queue.put(result)

キューとスレッドを用意します。キューとキューの間にワーカー(スレッド)がいるイメージですね。

download_queue = ClosableQueue()
resize_queue = ClosableQueue()
upload_queue = ClosableQueue()
done_queue = ClosableQueue()
threads = [
    StoppableWorker(download, download_queue, resize_queue),
    StoppableWorker(resize, resize_queue, upload_queue),
    StoppableWorker(upload, upload_queue, done_queue),
    ]

最後にこれらをまとめます。SENTINELを投入する Queue.close() はここで呼んでいるのですね。

for thread in threads:
    thread.start()

for _ in range(1000):
    download_queue.put(object())  # object()=写真を入れる

download_queue.close()  # SENTINEL投入
download_queue.join()  # task_done()が呼ばれるまでここでブロックされる
resize_queue.close()  # SENTINEL投入
resize_queue.join()  # task_done()が呼ばれるまでここでブロックされる
upload_queue.close()  # SENTINEL投入
upload_queue.join()  # task_done()が呼ばれるまでここでブロックされる
print(done_queue.qsize(), 'items finished')
for thread in threads:
    thread.join()
>>>
1000 items finished

あれ、まだ終わりじゃありませんでした。。。次は、ステージ毎に複数のワーカースレッドを用意してI/Oの並列度を上げることを考えます。

まず、複数スレッドをスタート、ストップさせるヘルパー関数を用意します。 start_threads 関数では引数 count の数だけ StoppableWorker スレッドを作ってスタートし、そのリストを返します。 stop_threads 関数ではキューの close を呼んでsentinelを投入し、キューの join でタスクの完了を待ってからスレッドを完了させます。

def start_threads(count, *args):
    threads = [StoppableWorker(*args) for _ in range(count)]
    for thread in threads:
	thread.start()  # スレッドをスタートさせる
    return threads

def stop_threads(closable_queue, threads):
    for _ in threads:
	closable_queue.close()  # SENTINEL投入
    closable_queue.join()  # 全てのtask_done()を待ち、キューをクローズする
    for thread in threads:
	thread.join()  # 全てのスレッドの完了を待つ

最後にこれらをまとめます。ダウンロードスレッドは3多重、リサイズは4多重、アップロードは5多重を指定してスレッドを作成しています。後は1000個の写真を投入し、スレッドを1種類ずつ止めていきます。ポイントは、 stop_threads はsentinelを投入し、それが出てくるまでブロックするところでしょうか。このお陰で、後片付けが中途半端な状態で次の stop_threads に行くことはありません。

download_queue = ClosableQueue()
resize_queue = ClosableQueue()
upload_queue = ClosableQueue()
done_queue = ClosableQueue()

download_threads = start_threads(
    3, download, download_queue, resize_queue)  # ダウンロードは3多重
resize_threads = start_threads(
    4, resize, resize_queue, upload_queue)  # リサイズは4多重
upload_threads = start_threads(
    5, upload, upload_queue, done_queue)  # アップロードは5多重

for _ in range(1000):  # 1000個(枚)の写真を投入
    download_queue.put(object())

stop_threads(download_queue, download_threads)  # 完了待ちする
stop_threads(resize_queue, resize_threads)
stop_threads(upload_queue, upload_threads)

print(done_queue.qsize(), 'items finished')
>>>
1000 items finished

1.5 Item 56: いつコンカレンシーが必要になるかをどう理解するか知れ

あるワークを、コンカレントに実行できるものにばらまくことを fan-out、ばらまいたものを回収することを fan-inと言うそうです。Pythonにはこれらを実現するツールがたくさんあって、それぞれトレードオフがあります。次の節以降で説明していきます。

1.6 Item 57: オンデマンドのfan-outで新たなスレッドインスタンスを作るのは避けろ

ダイナミックにfan-out/fan-inを繰り返すような用途や、非常に多くにfan-outするケースにはスレッドは合いません。

  • 1スレッドあたり8MBのメモリを消費する
  • スレッドの作成、開始、ロックなどでオーバーヘッドが大きい
  • 複雑になりデバッグが大変

1.7 Item 58: コンカレンシーのためにQueueを使うにはリファクタリングが必要なことを理解せよ

Queueを使うとスレッド数はワーカーの数に限定されるので、上限を定めないスレッドよりはマシですが、仕組みが複雑なことと、仕様変更によっては大きなリファクタリングが必要になるため、よい方法とは言えません。

1.8 Item 59: コンカレンシーでスレッドが必要なら ThreadPoolExecutorを検討せよ

スレッドプールはなかなか良さそうです。例外を呼び元に伝搬する仕組みもあります。ただ、 max_workers をあらかじめ決めておく必要があることがネックです

公式サイトから実装例です。

import concurrent.futures
import urllib.request

URLS = ['http://www.foxnews.com/',
	'http://www.cnn.com/',
	'http://europe.wsj.com/',
	'http://www.bbc.co.uk/',
	'http://some-made-up-domain.com/']

# Retrieve a single page and report the URL and contents
def load_url(url, timeout):
    with urllib.request.urlopen(url, timeout=timeout) as conn:
	return conn.read()

# We can use a with statement to ensure threads are cleaned up promptly
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    # {future: url, ...}の辞書がfuture_to_urlに入ります。
    # futureはそのcallableの実行を表すオブジェクトです。
    # ...この場合はワーカースレッドですね。
    future_to_url = {executor.submit(load_url, url, 60): url for url in URLS}
    # as_completedはfuture_to_urlのfuturesの完了(またはキャンセル)した
    # インスタンスのiteratorを返します。それをiterateしてfuture
    # = スレッドを得ます
    for future in concurrent.futures.as_completed(future_to_url):
	url = future_to_url[future]
	try:
	    data = future.result()
	except Exception as exc:
	    print('%r generated an exception: %s' % (url, exc))
	else:
	    print('%r page is %d bytes' % (url, len(data)))
>>>
'http://www.foxnews.com/' page is 323006 bytes
'http://www.cnn.com/' page is 1131345 bytes
'http://europe.wsj.com/' generated an exception: HTTP Error 404: Not Found
'http://some-made-up-domain.com/' page is 64668 bytes
'http://www.bbc.co.uk/' page is 300118 bytes

上の例で、 ThreadPoolExecutor によるスレッドプールを executor としています。次の行で URLS リストに入っているURLに対して、複数のスレッドで関数 load_url を実行するようにfan-outしています。そして次の行の for 文で完了したスレッドを刈り取っています(fan-in)。スレッド内で発生した例外は、呼び元で future.result() を呼んで結果を刈り取る時に伝わるようです。意外と簡単に使えるのですね。

実際の実行結果で例外が発生したのはsome-made-up-domainではなくeurope.wsj.comの方だったのが笑えます。ブラウザーで見ると、前者はドメインが売りに出されており、後者はwsj.comにフォワードされました。

1.9 Item 60: コルーチン(Coroutines)を使って高コンカレントI/Oを実現せよ

Asyncronus I/Oです。これは1スレッド内で、スレッドとは異なる仕組みを使ってコンテキストスイッチを行います。スレッドはOSカーネルの仕組みを使って、プリエンプティブにコンテキストスイッチしますが、Async I/Oでは長い待ちが発生する時(eg, ネットワーク待ち)に自発的に処理の実行を明け渡します。Async I/OはCPUネックの処理では効果がありません。Async I/Oの仕組みはスレッドよりもずっと軽く、数千のコンテキストをコンカレントに処理することができます。

PythonのAsynchronous I/Oについては、Real Pythonのこの記事が最新(Python 3.7)の情報を元に、わかりやすく詳細に解説しています。PythonのAsynchronous I/Oの仕組みはまだ整備されている途中であり、ネット上には古い情報が多く混乱しがちですが、この記事は情報を整理する意味でもお勧めです。

Item 33にてジェネレーターとコルーチンについて書きました。コルーチンはジェネレーターの yield 等の仕組みを使って、ルーチンの途中で他のコルーチンにコンテキストスイッチを行い、後で再び中断した行から処理を再開することができます。

最新のPythonではジェネレーターは表に出ず、新たに導入した async/await のシンタックスを使ってコルーチンを使います。Pythonでは、 async def で定義された関数がコルーチンです。コルーチン内の await 文でコンテキストスイッチを行います。ジェネレーターのyield文がそうであったように、コルーチンではawait文のところから、以前のコンテキストのまま処理が再開されます。(実は、 awaityield from と等価だそうです)

import asyncio
async def some_coroutine():
    ...
    await slow_io_disk_read()
    ...
    await slow_io_network_transfer()
    ...

上記のコルーチンの例では、 await slow_io_disk_read(), await slow_io_network_transfer() で待ちが発生し、別のコルーチンにコンテキストスイッチします。

以下にコルーチンの基本的な実装パターンを示します。(Real Pythonの記事より)

async def count():
    print("One")
    await asyncio.sleep(1)
    print("Two")

async def main():
    # コルーチンcount()を3つ実行する
    await asyncio.gather(count(), count(), count())

if __name__ == "__main__":
    import time
    s = time.perf_counter()
    # コルーチンメイン関数実行(完了を待つ)
    asyncio.run(main())
    elapsed = time.perf_counter() - s
    print(f"{__file__} executed in {elapsed:0.2f} seconds.")

asyncio.run でコルーチンの main 関数を起動し、 mainawait asyncio.gather からコルーチン count を3つ起動〜回収します。 asyncio.run はこれらが全て実行完了するまでブロックされて待ちます。

コルーチンの main を用意し、そこから個別のコルーチンを複数起動するやり方です。

asyncio.run 関数はPython 3.7で導入され、コルーチンを起動する標準の方法になりました。 run 1行でイベントループを生成、タスクを起動〜完了〜回収、イベントループのクローズまで行います。以下の古い書式と同じ事をします。

loop = asyncio.get_event_loop()
try:
    loop.run_until_complete(main())
finally:
    loop.close()

run のお陰でイベントループを意識する必要が無く、使いやすくなりました。(main 関数も隠蔽してくれるともっとすっきりする気がしますが)

1.10 Item 61: スレッド化されたI/Oをasyncioにポートする方法を知れ

主な作業

  • I/O待ちの発生する箇所に await を付ける。
  • 待ちの発生する箇所を含む関数やfor, with等のブロックに async を付ける
  • 関数名、クラス名を修正する
  • asyncioのビルトインモジュールを使う
  • スレッドの仕組みは全て置き換える

というところでしょうか。まだasyncioに未対応のモジュールもあることに注意。

1.11 Item 62: asyncioへの移行を楽にするため、スレッドとコルーチンを混在させよ

asyncioへの移行はblocking I/Oには効果がありません。例えばディスクからリードするシステムコール read は完了までスリープせずにブロックされるため、別タスクにコンテキストスイッチをする機会がありません。blocking I/Oにはスレッドが有効です。

用途によってasyncioとスレッドを使い分ける(混在させる)ことが必要です。

1.12 Item 63: レスポンスを最大化するためにはasyncioのイベントループをブロックするな

以下の例のように、コルーチン用のイベントループ内でblocking I/Oをするとよくありません。

async def run_tasks(handles, interval, output_path):
    with open(output_path, 'wb') as output:
	async def write_async(data):
	    output.write(data)  # ブロックされるI/O
	tasks = []
	for handle in handles:
	    coro = tail_async(handle, interval, write_async)
	    task = asyncio.create_task(coro)
	    tasks.append(task)
	await asyncio.gather(*tasks)

解決策として、ファイル操作を別スレッドとして独立させます。

async def run_fully_async(handles, interval, output_path):
    async with WriteThread(output_path) as output:
	tasks = []
	for handle in handles:
	    coro = tail_async(handle, interval, output.write)
	    task = asyncio.create_task(coro)
	    tasks.append(task)
	await asyncio.gather(*tasks)

そしてそのために、スレッドのクラスを async with 文で扱えるように aenter, aexit を用意します(PEP 492)。このスレッドの使い方は便利そうです。

ところで、ここではファイル操作系をスレッドとして独立させていますが、aiofilesを使えば、ファイル操作をasync化できそうです。

async with aiofiles.open('filename', mode='r') as f:
    contents = await f.read()
print(contents)
'My file contents'

aiofilesのドキュメントを見ると、ファイル操作を別のスレッドプールにdelegateするとあります。

aiofiles helps with this by introducing asynchronous versions of files that support delegating operations to a separate thread pool.

こういうライブラリを使うのと、自分でスレッドを作るのと、どちらがいいのでしょうね。。

1.13 Item 64: 真の並行動作にはconcurrent.futuresを検討せよ

Pythonのglobal interpreter lock (GIL)のせいで、マルチコアを使った真の並行動作は簡単には実現できません。Cエクステンションは高速化には適していますが、大きなコストがかかります。通常、遅くなる原因は多くの場所にあり、一部だけエクステンションとして抜き出して高速化する訳にはいかないようです。

concurrent.futures ビルトインモジュール経由でアクセスできる multiprocessing ビルトインモジュールが使えるかもしれません。利用する側は ThreadPoolExecutor の代わりに ProcessPoolExecutor で置き換えるだけでよいです。

ただしこれは、自プロセスと子プロセスの間のデータのやりとりでpickleを使ったバイナリエンコード・デコードが必要で、オーバーヘッドが馬鹿になりません。よって ProcessPoolExecutor で効果があるのは、プロセス間のデータ転送量及び頻度が少ない場合に限られます。

multiprocessing は共有メモリやプロセス間のロック、キュー、プロキシーといったより高度な手段を提供してはいますがが、これらは非常に複雑だそうです。

こんなところでPythonの限界が見えてきてしまいました。。。(インタプリター言語に何を求めているのか、という話もありますが)

2 Chapter 8: 堅牢性(robustness)と性能

2.1 Item 65: try/except/else/finallyで各ブロックを有効に使え

try/except/else/finally ブロックを整理します。

def some_func():
    # 例えばファイルをオープンする処理
    # ここでの例外はすぐに呼び元に上がる
    try:
	# 例外が上がる可能性のあるオペレーション
    except ZeroDivisionError as e:
	# 想定した例外が上がった場合
    else:
	# tryで例外が上がらなかった場合
	# ここでの例外は呼び元に伝搬する
    finally:
	# (tryに来ていたら)関数がリターンする前に必ず実行される
	# 例えばファイルのクローズ処理

2.2 Item 66: try/finally挙動を再利用するためにcontextlibとwithステートメントを考えよ

@contextmanager デコレーターで修飾した関数はコンテキストマネージャーとなり、 with ステートメントで使えるようになります。正式に __enter__, __exit__ を準備するよりも簡単です。

from contextlib import contextmanager
import logging

@contextmanager
def debug_logging(level):
    logger = logging.getLogger()
    old_level = logger.getEffectiveLevel()
    logger.setLevel(level)  # 一時的に指定ログレベルを設定
    try:
	yield
    finally:
	logger.setLevel(old_level)  # ログレベルを戻す

上記関数では(一時的に) level にデバッグレベルを変更します。

def my_function():
    logging.debug('Some debug data')  # DEBUG
    logging.error('Error log here')  # ERROR 
    logging.debug('More debug data')  # DEBUG

with debug_logging(logging.DEBUG):  # DEBUGレベルのブロック
    print('* Inside:')
    my_function()

print('* After:')
my_function()
>>>
​* Inside:
DEBUG:root:Some debug data  # DEBUGレベルが表示されている
ERROR:root:Error log here
DEBUG:root:More debug data  # DEBUGレベルが表示されている
​* After:
ERROR:root:Error log here

上記 with ブロックはデバッグレベルをDEBUGにします。出力結果から、実際にwithブロックでのみDEBUGレベルのメッセージ出力されていることがわかります。

下の例で示すように、 with ステートメントに渡されるコンテキストマネージャーは yield でオブジェクトを返すことができ、 as でローカル変数に入ります。これによって、 with ブロック内のコードがそのコンテキストと直接interactできます。

from contextlib import contextmanager
import logging

@contextmanager
def log_level(level, name):
    logger = logging.getLogger(name)
    old_level = logger.getEffectiveLevel()
    logger.setLevel(level)
    try:
	yield logger  # コンテキストloggerを返す
    finally:
	logger.setLevel(old_level)

logging.basicConfig()
with log_level(logging.DEBUG, 'my-log') as logger:
    logger.debug(f'This is a message for {logger.name}!')
    logging.debug('This will not print')
>>>
DEBUG:my-log:This is a message for my-log!

with ブロック内で logger.debug のメッセージは表示されましたが、 logging.debug は表示されていません。なお、本には載っていませんが、 logging.basicConfig() を呼ばないと logger.debug の方も表示されませんでした。。。

2.3 Item 67: ローカル時間にはtimeの代わりにdatetimeを使え

timeはUTCとローカルしか扱えないため、datatimeを使うべきです。pytzというコミュニティーが作っているライブラリを使うと世界中の時間が使えます。datetimeでは一度UTCに変換してから時間操作を行います。

from datetime import datetime, timezone
import pytz
time_format = '%Y-%m-%d %H:%M:%S'
arrival_bos = '2020-08-29 10:01:00'
bos_dt_native = datetime.strptime(arrival_bos, time_format)
edt = pytz.timezone('US/Eastern')
bos_dt = edt.localize(bos_dt_native)  # datetime形式のボストン時間
utc_dt = pytz.utc.normalize(bos_dt.astimezone(pytz.utc))  # UTCに変換
print(utc_dt)
>>>
2020-08-29 14:01:00+00:00

UTCを日本時間に変換します。

jst = pytz.timezone('Asia/Tokyo')
tokyo_dt = jst.normalize(utc_dt.astimezone(jst))  # JSTに変換
print(tokyo_dt)
>>>
2020-08-29 23:01:00+09:00

このインタフェースだと、いったんUTCに変換しなくてもいいような??? 単なるタイムゾーン間のコンバートならば、それでもいいかもしれません。

2.4 Item 68: copyregでpickleをreliableにせよ

Pythonでデータをシリアライズする場合、

  • Python以外とデータ共有する場合はjson, xmlを使う
  • Pythonとデータ共有する場合はpickleを使う

ことになると思います。

pickleにcopyregを組み合わせると、以下のような場合に対応できるようになります。

  • pickleしたクラスのメンバーアトリビュートが追加された
  • pickleしたクラスのメンバーアトリビュートが削除された
  • pickleしたクラス名が変更になった

copyregは、pickle及びunpickleする時に呼ばれる関数を指定することで、そこでクラスのアトリビュート追加・削除の面倒を見ます。また、copyregを使うとクラス名がシリアライズされたデータに含まれないようになるため、クラス名の変更に対応できます。

以下の例ではGameStateクラスをpickleすることを考えます。copyregを使うためのヘルパー関数を、pickle用とunpickle用の二つ用意します。pickle用の pickle_game_state はunpickle用の unpickle_game_state を引数の kwargs とセットで返します。このためcopyregにはpickle用の関数(とpickleするクラス名)だけを登録すればよいです。 copyreg.pickle で pickle 用の関数を登録しています。

import pickle
import copyreg
class GameState:
    def __init__(self, level=0, lives=4, points=0):
	self.level = level
	self.lives = lives
	self.points = points

def pickle_game_state(game_state):  # pickle.dumpsすると呼ばれる
    print("pickling")
    kwargs = game_state.__dict__
    return unpickle_game_state, (kwargs,)  # unpickle用関数を返す

def unpickle_game_state(kwargs):  # pickle.loadsすると呼ばれる
    print("unpickling")
    return GameState(**kwargs)

copyreg.pickle(GameState, pickle_game_state)  # pickle用関数を登録する

pickleしてみましょう。

state = GameState()
state.points += 1000
print("call pickling")
serialized = pickle.dumps(state)
print("call unpickling")
state_after = pickle.loads(serialized)
print(state_after.__dict__)
>>>
call pickling
pickling  # pickle_game_stateで表示している
b'\x80\x04\x95L\x00\x00\x00\x00\x00\x00\x00\x8c\x08__main__\x94\x8c\x13unpickle_game_state\x94\x93\x94}\x94(\x8c\x05level\x94K\x00\x8c\x05lives\x94K\x04\x8c\x06points\x94M\xe8\x03u\x85\x94R\x94.'
call unpickling
unpickling  # unpickle_game_stateで表示している
{'level': 0, 'lives': 4, 'points': 1000}

実際に、pickleする際に pickle_game_state が、unpickleする際に unpickle_game_state が呼ばれていることがわかります。

クラスへのアトリビュートの追加に対応できるのは、 unpickle_game_state で GameState のインスタンスを作る際に、(追加後のクラスの)コンストラクターを呼ぶためです。この中で追加したアトリビュートのデフォルト値が設定されます。

pickleするクラスのアトリビュートを削除するとbackward compatibilityが保てなくなります。この場合はバージョンを指定し、古いバージョンなら明示的に不要となったアトリビュートを削除します。

def pickle_game_state(game_state):
    kwargs = game_state.__dict__
    kwargs['version'] = 2
    return unpickle_game_state, (kwargs,)

def unpickle_game_state(kwargs):
    version = kwargs.pop('version', 1)
    if version == 1:  # 読んできたバージョンが1なら古いアトリビュートがある
	del kwargs['deleted_attribute']
    return GameState(**kwargs)

2.5 Item 69: 精度が重要なら decimal を使え

Pythonのfloatの扱いで、1.44999999…のようになる場合があります。 これをきちっと1.45と見せたい場合、decimalを使うとよい、という話です。

まずは、Decimalに小数点の付いた値を渡す際には文字列を使うと正確です。

>>> from decimal import Decimal
>>> print(Decimal('1.45'))  # 文字列渡し
1.45
>>> print(Decimal(1.45))  # float渡し
1.4499999999999999555910790149937383830547332763671875

お金の計算などで四捨五入してゼロにされると困るような場合にも対応可能です。

from decimal import Decimal, ROUND_UP
rate = Decimal('0.05')
seconds = Decimal('5')
small_cost = rate * seconds / Decimal(60)
print("実の値 - ", small_cost)
print("四捨五入 - ", round(small_cost, 2))
rounded = small_cost.quantize(Decimal('0.01'), rounding=ROUND_UP)
print("切り上げ - ", rounded)
>>>
実の値 -  0.004166666666666666666666666667
四捨五入 -  0.00
切り上げ -  0.01

2.6 Item 70: 最適化の前にプロファイルせよ

プロファイルにはCで書かれたcProfileを使うとよい、とのことです。 test 関数をプロファイルして、統計を出す例:

from cProfile import Profile
profiler = Profile()
profiler.runcall(test)

from pstats import Stats
stats = Stats(profiler)
stats.strip_dirs()
stats.sort_stats('cumulative')
stats.print_stats()
>>>
	 30003 function calls in 0.026 seconds

   Ordered by: cumulative time

   ncalls  tottime  percall  cumtime  percall filename:lineno(function)
	1    0.000    0.000    0.026    0.026 t.py:17(<lambda>)
	1    0.002    0.002    0.026    0.026 t.py:1(insertion_sort)
    10000    0.003    0.000    0.024    0.000 t.py:9(insert_value)
    10000    0.016    0.000    0.016    0.000 {method 'insert' of 'list' objects}
    10000    0.005    0.000    0.005    0.000 {built-in method _bisect.bisect_left}
	1    0.000    0.000    0.000    0.000 {method 'disable' of '_lsprof.Profiler' objects}

2.7 Item 71: 生産者-消費者キューにはdequeを使え

普通のリスト [] はキューに使えますが、キューが長くなると特に pop(0) が2乗のオーダーで遅くなるそうです。collectionsのdequeはリニアに遅くなるだけなので、キュー操作が性能ネックとなっているようなら、こちらを使った方が良いです。

マイクロベンチマーク timeit の使い方。

import collections
import timeit
def print_result(count, tests):
    avg_iteration = sum(tests) / len(tests)
    print(f'Count {count:>5,} takes {avg_iteration:.4f}s')
def deque_append_benchmark(count)
    def prepare():
	return collections.deque()
    def run(queue):
	for i in range(count):
	    queue.append(i)
    tests = timeit.repeat(
	setup='queue = prepare()',
	stmt='run(queue)',
	globals=locals(),
	repeat=1000
	number=1)
    return print_result(count, tests)

注意点として、timeitはループするのでキャッシュされて速くなってしまうようなことには使えません。

2.8 Item 72: ソートされたシーケンス内をサーチするにはbisectを使え

ソートされているリストなどの中で、指定の値がどこに来るかを調べるのに、 bisect_left は速いのでいいですよ、ということでした。

2.9 Item 73: 優先度キューのために heapq をどう使うかを知れ

FIFOでない、何かのアトリビュートの順番で処理する必要のあるキューを優先度キュー、プライオリティーキューと言います。優先度キューの実装に heapq が使えます。

headqのアイテムは比較可能でnatural sort orderを持たなくてはいけません。これにはfunctoolsビルトインモジュールの total_ordering クラスデコレーターを使い、 __lt__ (less than)スペシャルメソッドを実装する必要があります。

以降は図書館の貸し出し本を管理する例です。キュー内の操作はコストがかかるため、なるべくキューには手を付けないようにします。本の返却は通常、キューから抜く操作が必要になりますが、ここではキューの中の返却された本には返却マークを付けるのみで、キューそのものは変更しないようにしています。

import functools
@functools.total_ordering
class Book:
    def __init__(self, title, due_date):
	self.title = title
	self.due_date = due_date
	self.returnd = False  # 返却フラグ
    def __lt__(self, other):
	return self.due_date < other.due_date  # 返却日で比較する

due_dateによるキューのソートは以下のようにできます。

from heapq import heapify
queue = [
    Book('Pride and Prejudice', '2019-06-10'),
    Book('The Time Machine', '2019-05-30'),
    ...
    ]
queue.sort()
または
heapify(queue)

次は、期限切れの本を表示します。返却済みの本が出てきたら、ひっそりとキューから抜きます。

from heapq import heappop
class NoOverdueBooks(Exception):
    pass
def next_overdue_book(queue, now):
    while queue:
	book = queue[0]  # Most overdue first
	if book.returned:  # 返却済みなら、、
	    heappop(queue)
	    continue
	if book.due_date < now:  # 期限切れなら
	    heappop(queue)
	    return book
	break

    raise NoOverdueBooks

本の返却処理です。

def return_book(queue, book):
    book.returned = True

このやり方の欠点は、返却された本をキューから抜かないため、キューが大きくなりうることです。これはメモリを圧迫します。ワーストケースを想定して必要メモリ量等のシステム設計をする必要があります。

2.10 Item 74: bytesとゼロコピーでinteractするにはmemoryviewとbytearrayを使え

出ました。ゼロコピーです。Pythonのバッファープロトコルとゼロコピーについては、ここにわかりやすい説明がありました。

bytesのデータを直接スライスするとメモリコピーが発生します。ビルトインの memoryview タイプを使うとゼロコピーで行けます。

data = b'shave and a haircut, two bits'
view = memoryview(data)
chunk = view[12:19]
print(type(chunk))
print('Size: ', chunk.nbytes)
print('Data in view', chunk.tobytes())
print('Underlying data:', chunk.obj)
>>>
<class 'memoryview'>
Size:  7
Data in view b'haircut'
Underlying data: b'shave and a haircut, two bits'

memoryview をスライスした chunk のタイプはmemoryviewであることがわかります。

bytesはリードオンリーのため、スライスした部分を上書きしたいなら bytearray を使います。

my_array = bytearray(b'row, row, row your boat')
my_view = memoryview(my_array)
write_view = my_view[3:13]
write_view[:] = b'1234567890'
print(my_array)
>>>
bytearray(b'row1234567890 your boat')

socket.recv_intoはゼロコピーに対応します。ビデオストリーミングデータを受け取る場合の例。

socket = ... # クライアントへのソケットコネクション
video_cache = ... # 入ってくるビデオストリーム用のキャッシュ
byte_offset = ... # バッファ上にデータが入ってくる位置
size = 1024 * 1024 # 入ってくるデータのチャンクサイズ

video_array = bytearray(video_cache)
write_view = memoryview(video_array)
chunk = write_view[byte_offset:byte_offset + size]
socket.recv_into(chunk)