Table of Contents
- 1. Chapter 7: コンカレンシーと並列実行
- 1.1. Item 52: 子プロセスを管理するために subprosess を使え
- 1.2. Item 53: ブロックするI/Oにはスレッドを使い、parallelismを避けよ
- 1.3. Item 54: スレッド間のデータレースを避けるために Lock を使え
- 1.4. Item 55: スレッド間のワークアサインの調整には Queue を使え
- 1.5. Item 56: いつコンカレンシーが必要になるかをどう理解するか知れ
- 1.6. Item 57: オンデマンドのfan-outで新たなスレッドインスタンスを作るのは避けろ
- 1.7. Item 58: コンカレンシーのためにQueueを使うにはリファクタリングが必要なことを理解せよ
- 1.8. Item 59: コンカレンシーでスレッドが必要なら ThreadPoolExecutorを検討せよ
- 1.9. Item 60: コルーチン(Coroutines)を使って高コンカレントI/Oを実現せよ
- 1.10. Item 61: スレッド化されたI/Oをasyncioにポートする方法を知れ
- 1.11. Item 62: asyncioへの移行を楽にするため、スレッドとコルーチンを混在させよ
- 1.12. Item 63: レスポンスを最大化するためにはasyncioのイベントループをブロックするな
- 1.13. Item 64: 真の並行動作にはconcurrent.futuresを検討せよ
- 2. Chapter 8: 堅牢性(robustness)と性能
- 2.1. Item 65: try/except/else/finallyで各ブロックを有効に使え
- 2.2. Item 66: try/finally挙動を再利用するためにcontextlibとwithステートメントを考えよ
- 2.3. Item 67: ローカル時間にはtimeの代わりにdatetimeを使え
- 2.4. Item 68: copyregでpickleをreliableにせよ
- 2.5. Item 69: 精度が重要なら decimal を使え
- 2.6. Item 70: 最適化の前にプロファイルせよ
- 2.7. Item 71: 生産者-消費者キューにはdequeを使え
- 2.8. Item 72: ソートされたシーケンス内をサーチするにはbisectを使え
- 2.9. Item 73: 優先度キューのために heapq をどう使うかを知れ
- 2.10. Item 74: bytesとゼロコピーでinteractするにはmemoryviewとbytearrayを使え
1 Chapter 7: コンカレンシーと並列実行
1.1 Item 52: 子プロセスを管理するために subprosess を使え
Pythonから子プロセスを呼び出すシンプルな方法。
import subprocess result = subprocess.run( ['echo', 'Hello from the child!'], capture_output=True, # stdout/stderrをキャプチャーする encoding='utf-8') result.check_returncode() print(result.stdout) >>> Hello from the child!
Python 3.5で導入された subprocess.run
は子プロセスの実行完了を待ちます。公式サイトによると、基本的にはこれを使うのが推奨だそうです。
subprocess.run(args, *, stdin=None, input=None, stdout=None, stderr=None, capture_output=False, shell=False, cwd=None, timeout=None, check=False, encoding=None, errors=None, text=None, env=None, universal_newlines=None, **other_popen_kwargs)
タイムアウトも指定できそうです。
次は、 subprocess.Popen
を使ってブロックされずに子プロセスを10個起動したあと、 <process>.communicate
で実行完了した各子プロセスを終了(terminate)させる処理です。
import subprocess import time start = time.time() sleep_procs = [] for _ in range(10): proc = subprocess.Popen(['sleep', '1']) sleep_procs.append(proc) time.sleep(0.3) for proc in sleep_procs: proc.communicate() end = time.time() delta = end - start print(f'Finished in {delta:.3} seconds') >>> Finished in 1.02 seconds
並列に実行するため10秒かからず、1秒強で終わっています。
次の例は、外部コマンドの openssl にランダムな10バイトのバイト列を暗号化させる処理です。
import subprocess import os def run_encrypt(data): env = os.environ.copy() env['password'] = 'start123' proc = subprocess.Popen( ['openssl', 'enc', '-des3', '-pass', 'env:password'], env=env, stdin=subprocess.PIPE, stdout=subprocess.PIPE) proc.stdin.write(data) proc.stdin.flush() return proc procs = [] for _ in range(3): data = os.urandom(10) # ランダムな10バイトを生成 proc = run_encrypt(data) procs.append(proc) for proc in procs: out, _ = proc.communicate() print(out[-10:]) # 後ろから10バイトをスライス >>> b'\x0f\xbc4\x94O\x93\xa5G\xbe\xe3' b'm\xb3\x89\r\xc9pP7\xdc\xeb' b"\xda\x16z N=\x850v'"
結果は、ランダムなバイト列を暗号化したバイト列なので、意味は特にありません。
複数の外部コマンドを呼び出し、それらをパイプでつなぐこともできます。次の例で、 run_hash
は openssl
を使って入力バイト列のハッシュを求める関数です。 for
文以下では、100バイトのランダムな文字列を生成し、それからハッシュを求めることを3つのサブプロセスで並列実行します。 run_hash
関数呼び出しの引数に encrypt_proc.stdout
を指定することで、これらの処理をパイプでつなげています。
def run_hash(input_stdin): return subprocess.Popen( ['openssl', 'dgst', '-whirlpool', '-binary'], stdin=input_stdin, # stdinを指定 stdout=subprocess.PIPE) encrypt_procs = [] hash_procs = [] for _ in range(3): data = os.urandom(100) # ランダムな100バイトを生成 encrypt_proc = run_encrypt(data) encrypt_procs.append(encrypt_proc) hash_proc = run_hash(encrypt_proc.stdout) # stdoutを指定 hash_procs.append(hash_proc) encrypt_proc.stdout.close() # 閉じてしまってよい??? encrypt_proc.stdout = None for proc in encrypt_procs: proc.communicate() assert proc.returncode == 0 for proc in hash_procs: out, _ = proc.communicate() print(out[-10:]) assert proc.returncode == 0 >>> '\x99\xd8*\x15~\x88\xd4\x89\x1c3' b'\x00\x87\xd3\x93Ti\x12v\x01\xaa' b'\x1b\x85\xdf\x94z\x96\xd3\xb0\x91\x9a'
結果の文字列に特に意味はありません。
子プロセスが終わらない場合が気になるなら、タイムアウト値を指定することも出来ます。
import subprocess proc = subprocess.Popen(['sleep', '10']) try: proc.communicate(timeout=0.1) except subprocess.TimeoutExpired: proc.terminate() proc.wait() print('Exist status', proc.poll()) >>> Exist status -15
タイムアウト例外が発生したら子プロセスを終わらせます。 proc.poll()
でexit codeが得られるようです。
1.2 Item 53: ブロックするI/Oにはスレッドを使い、parallelismを避けよ
普通使うPythonはCPythonですが、CPythonはglobal interpreter lock (GIL)のために複数コアでの並列実行ができません。知りませんでした。衝撃的な事実。。。I/Oによる待ちが無ければ複数スレッド使っても実行時間は短縮されません。
更に、ネットワーク等の非同期I/Oでは、より効率の良いasyncio(後で出てきます)を使うことになるため、Pythonでのスレッドの出番はブロックする(ie, 非同期システムコールが無い)ディスクI/Oくらいしか無さそうです。。。というのは言い過ぎでした。キュー等でも使えますね。
Pythonでのスレッドの使い方例:
import select import socket import time from threading import Thread def slow_systemcall(): select.select([socket.socket()],[],[],0.1) start = time.time() threads = [] for _ in range(5): thread = Thread(target=slow_systemcall) thread.start() threads.append(thread) for thread in threads: thread.join() end = time.time() delta = end - start print(f'Took {delta:.3f} seconds') >>> Took 0.103 seconds
1.3 Item 54: スレッド間のデータレースを避けるために Lock を使え
単一コアで動くマルチスレッドにもロックは必要という話。 Pythonでmutexを用意する例:
from threading import Lock class LockingCounter: def __init__(self): self.lock = Lock() self.count = 0 # ロック対象 def increment(self, offset): with self.lock: self.count += offset
Lock
クラスを使うと with
でクリティカルリージョン(ie, ロック範囲)の指定ができるのが便利ですね。
おさらいです。mutexとbinary semaphoreは一見とても似ていますが、用途が違います。
- mutexは資源の排他(ロック)のため
- binary semaphoreはイベントが起きたことを通知(シグナル)するため
スピンロックと違って、両方とも待ちスレッドはスリープします。
1.4 Item 55: スレッド間のワークアサインの調整には Queue を使え
Queue
クラスはパイプラインを実装するのに便利です。 Queue
の getメソッドは新データが来るまでブロックするため、自前でbusyウエイトを実装する必要がありません。
from threading import Thread from queue import Queue my_queue = Queue() # キューのクラスが用意されています def consumer(): print('Consumer waiting') my_queue.get() # ブロックされます print('Consumer done') thread = Thread(target=consumer) thread.start() print('Producer putting') my_queue.put(object()) print('Producer done') thread.join() >>> Consumer waiting # アイテム(object)が入ってくるまで待ちます Producer putting Producer done Consumer done
最初に consumer がキューに来た後、producerが put
するまで consumer は動き出さないことがわかります。。
キューを作るときにキューバッファのサイズを指定することもできます。いくつのconsumerがキューに入れるかを示し、それ以上のconsumerが来ても put
でブロックします。
from threading import Thread from queue import Queue import time my_queue = Queue(1) # バッファサイズが1 def consumer(): time.sleep(0.1) # まず0.1秒スリープする my_queue.get() print('Consumer got 1') my_queue.get() print('Consumer got 2') print('Consumer done') thread = Thread(target=consumer) thread.start() my_queue.put(object()) # producerは立て続けに二つputしようとする print('Producer put 1') my_queue.put(object()) # ここでブロックする print('Producer put 2') print('Producer done') thread.join() >>> Producer put 1 # 最初にproducerがputするのは前回と同じ Consumer got 1 # 0.1秒待ってからgetする Producer put 2 # consumerがgetして、やっとputから戻る Producer done Consumer got 2 Consumer done
この例でのポイントは put 2のメッセージが got 1の後に来ているところです。consumerスレッドはスタートしてからまず0.1秒スリープしますが、その間にメインスレッドのproducerはputできずにブロックされていることがわかります。
次に、 Queue.task_done()
はそのキューに対してそのタスクが完了したことを宣言します。全てのタスクの完了を待つにはそのキューに対して Queue.join()
を呼べばよく、それまでブロックされます。これはスレッドのjoinとは別なことに注意です。
キューのタスクが完了するというのは、そのキューからgetしてきた仕事(アイテム)を最後の1個まで、全て処理し終わったという意味です。
from threading import Thread from queue import Queue import time in_queue = Queue() def consumer(): print('Consumer waiting') work = in_queue.get() print('Consumer working') time.sleep(1) # この例でのタスクはスリープすること print('Consumer done') in_queue.task_done() # タスク完了を宣言する thread = Thread(target=consumer) thread.start() print('Producer putting') in_queue.put(object()) print('Producer waiting') in_queue.join() # in_queueの完了(=task_doneが呼ばれる)までブロックされる print('Producer done') thread.join() >>> Consumer waiting Producer putting Producer waiting Consumer working # ここで1秒スリープする Consumer done Producer done
この例でのポイントはもちろん、consumer doneまでproducer doneが出ないところです。
さて、これらの知識を使ってパイプラインを実装します。パイプラインはdownload, resize, uploadの3ステージからなるとします。写真をカメラからダウンロードして、サイズを変えてまたアップロードする場合を想定しています。
from threading import Thread from queue import Queue import time def download(item): pass def resize(item): pass def upload(item): pass
Queue
を継承した ClosableQueue
を定義します。これはメソッド close
を持ち、キューにこれ以降の入力が無いことを示す sentinel を入れます。sentinel は歩哨・見張りの意味で、終わりの印です。Item 10で出てきましたね。
__iter__
を準備したことで、このキューを iterate することができます。 get()
でキューから写真を取り出し、sentinel以外ならyieldして写真を返します。キューに何も入っていなかったら get()
がブロックします。キューから取ってきたアイテムが写真でなくsentinelだったら、終わりの印なのでリターンしています。
class ClosableQueue(Queue): SENTINEL = object() def close(self): self.put(self.SENTINEL) def __iter__(self): while(True): item = self.get() # 写真を一つ取り出す。無かったらブロックする try: if item is self.SENTINEL: return yield item # まだfinallyは実行しない finally: self.task_done() # この写真の処理が完了
ここのtry - finallyの使い方に注目します。exceptで例外処理を行わないtry - finallyは、tryブロックの中で何が起こったとしてもfinallyブロックで取得しているロックを解放する(後始末する)ようなユースケースで使うようです。
この例ではロックは使っていませんが、Queue.task_doneをロック解放、Queue.joinをロック解放待ちのアナロジーとして考えると、似たユースケースと言えそうです。
StoppableWorker
は ClosableQueue
に対応した新ワーカースレッドです。スレッドは写真ではなくステージ(で作業する人)に対応します。 in_queue
から写真(item
)を取り出し、処理をして、処理後の写真(result
)を out_queue
に入れます。
class StoppableWorker(Thread): def __init__(self, func, in_queue, out_queue): super().__init__() self.func = func # やる作業 self.in_queue = in_queue self.out_queue = out_queue def run(self): for item in self.in_queue: # queueをiterateする result = self.func(item) self.out_queue.put(result)
キューとスレッドを用意します。キューとキューの間にワーカー(スレッド)がいるイメージですね。
download_queue = ClosableQueue() resize_queue = ClosableQueue() upload_queue = ClosableQueue() done_queue = ClosableQueue() threads = [ StoppableWorker(download, download_queue, resize_queue), StoppableWorker(resize, resize_queue, upload_queue), StoppableWorker(upload, upload_queue, done_queue), ]
最後にこれらをまとめます。SENTINELを投入する Queue.close()
はここで呼んでいるのですね。
for thread in threads: thread.start() for _ in range(1000): download_queue.put(object()) # object()=写真を入れる download_queue.close() # SENTINEL投入 download_queue.join() # task_done()が呼ばれるまでここでブロックされる resize_queue.close() # SENTINEL投入 resize_queue.join() # task_done()が呼ばれるまでここでブロックされる upload_queue.close() # SENTINEL投入 upload_queue.join() # task_done()が呼ばれるまでここでブロックされる print(done_queue.qsize(), 'items finished') for thread in threads: thread.join() >>> 1000 items finished
あれ、まだ終わりじゃありませんでした。。。次は、ステージ毎に複数のワーカースレッドを用意してI/Oの並列度を上げることを考えます。
まず、複数スレッドをスタート、ストップさせるヘルパー関数を用意します。 start_threads
関数では引数 count
の数だけ StoppableWorker
スレッドを作ってスタートし、そのリストを返します。 stop_threads
関数ではキューの close
を呼んでsentinelを投入し、キューの join
でタスクの完了を待ってからスレッドを完了させます。
def start_threads(count, *args): threads = [StoppableWorker(*args) for _ in range(count)] for thread in threads: thread.start() # スレッドをスタートさせる return threads def stop_threads(closable_queue, threads): for _ in threads: closable_queue.close() # SENTINEL投入 closable_queue.join() # 全てのtask_done()を待ち、キューをクローズする for thread in threads: thread.join() # 全てのスレッドの完了を待つ
最後にこれらをまとめます。ダウンロードスレッドは3多重、リサイズは4多重、アップロードは5多重を指定してスレッドを作成しています。後は1000個の写真を投入し、スレッドを1種類ずつ止めていきます。ポイントは、 stop_threads
はsentinelを投入し、それが出てくるまでブロックするところでしょうか。このお陰で、後片付けが中途半端な状態で次の stop_threads
に行くことはありません。
download_queue = ClosableQueue() resize_queue = ClosableQueue() upload_queue = ClosableQueue() done_queue = ClosableQueue() download_threads = start_threads( 3, download, download_queue, resize_queue) # ダウンロードは3多重 resize_threads = start_threads( 4, resize, resize_queue, upload_queue) # リサイズは4多重 upload_threads = start_threads( 5, upload, upload_queue, done_queue) # アップロードは5多重 for _ in range(1000): # 1000個(枚)の写真を投入 download_queue.put(object()) stop_threads(download_queue, download_threads) # 完了待ちする stop_threads(resize_queue, resize_threads) stop_threads(upload_queue, upload_threads) print(done_queue.qsize(), 'items finished') >>> 1000 items finished
1.5 Item 56: いつコンカレンシーが必要になるかをどう理解するか知れ
あるワークを、コンカレントに実行できるものにばらまくことを fan-out、ばらまいたものを回収することを fan-inと言うそうです。Pythonにはこれらを実現するツールがたくさんあって、それぞれトレードオフがあります。次の節以降で説明していきます。
1.6 Item 57: オンデマンドのfan-outで新たなスレッドインスタンスを作るのは避けろ
ダイナミックにfan-out/fan-inを繰り返すような用途や、非常に多くにfan-outするケースにはスレッドは合いません。
- 1スレッドあたり8MBのメモリを消費する
- スレッドの作成、開始、ロックなどでオーバーヘッドが大きい
- 複雑になりデバッグが大変
1.7 Item 58: コンカレンシーのためにQueueを使うにはリファクタリングが必要なことを理解せよ
Queueを使うとスレッド数はワーカーの数に限定されるので、上限を定めないスレッドよりはマシですが、仕組みが複雑なことと、仕様変更によっては大きなリファクタリングが必要になるため、よい方法とは言えません。
1.8 Item 59: コンカレンシーでスレッドが必要なら ThreadPoolExecutorを検討せよ
スレッドプールはなかなか良さそうです。例外を呼び元に伝搬する仕組みもあります。ただ、 max_workers
をあらかじめ決めておく必要があることがネックです
公式サイトから実装例です。
import concurrent.futures import urllib.request URLS = ['http://www.foxnews.com/', 'http://www.cnn.com/', 'http://europe.wsj.com/', 'http://www.bbc.co.uk/', 'http://some-made-up-domain.com/'] # Retrieve a single page and report the URL and contents def load_url(url, timeout): with urllib.request.urlopen(url, timeout=timeout) as conn: return conn.read() # We can use a with statement to ensure threads are cleaned up promptly with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor: # {future: url, ...}の辞書がfuture_to_urlに入ります。 # futureはそのcallableの実行を表すオブジェクトです。 # ...この場合はワーカースレッドですね。 future_to_url = {executor.submit(load_url, url, 60): url for url in URLS} # as_completedはfuture_to_urlのfuturesの完了(またはキャンセル)した # インスタンスのiteratorを返します。それをiterateしてfuture # = スレッドを得ます for future in concurrent.futures.as_completed(future_to_url): url = future_to_url[future] try: data = future.result() except Exception as exc: print('%r generated an exception: %s' % (url, exc)) else: print('%r page is %d bytes' % (url, len(data))) >>> 'http://www.foxnews.com/' page is 323006 bytes 'http://www.cnn.com/' page is 1131345 bytes 'http://europe.wsj.com/' generated an exception: HTTP Error 404: Not Found 'http://some-made-up-domain.com/' page is 64668 bytes 'http://www.bbc.co.uk/' page is 300118 bytes
上の例で、 ThreadPoolExecutor
によるスレッドプールを executor
としています。次の行で URLS
リストに入っているURLに対して、複数のスレッドで関数 load_url
を実行するようにfan-outしています。そして次の行の for
文で完了したスレッドを刈り取っています(fan-in)。スレッド内で発生した例外は、呼び元で future.result()
を呼んで結果を刈り取る時に伝わるようです。意外と簡単に使えるのですね。
実際の実行結果で例外が発生したのはsome-made-up-domainではなくeurope.wsj.comの方だったのが笑えます。ブラウザーで見ると、前者はドメインが売りに出されており、後者はwsj.comにフォワードされました。
1.9 Item 60: コルーチン(Coroutines)を使って高コンカレントI/Oを実現せよ
Asyncronus I/Oです。これは1スレッド内で、スレッドとは異なる仕組みを使ってコンテキストスイッチを行います。スレッドはOSカーネルの仕組みを使って、プリエンプティブにコンテキストスイッチしますが、Async I/Oでは長い待ちが発生する時(eg, ネットワーク待ち)に自発的に処理の実行を明け渡します。Async I/OはCPUネックの処理では効果がありません。Async I/Oの仕組みはスレッドよりもずっと軽く、数千のコンテキストをコンカレントに処理することができます。
PythonのAsynchronous I/Oについては、Real Pythonのこの記事が最新(Python 3.7)の情報を元に、わかりやすく詳細に解説しています。PythonのAsynchronous I/Oの仕組みはまだ整備されている途中であり、ネット上には古い情報が多く混乱しがちですが、この記事は情報を整理する意味でもお勧めです。
Item 33にてジェネレーターとコルーチンについて書きました。コルーチンはジェネレーターの yield
等の仕組みを使って、ルーチンの途中で他のコルーチンにコンテキストスイッチを行い、後で再び中断した行から処理を再開することができます。
最新のPythonではジェネレーターは表に出ず、新たに導入した async/await
のシンタックスを使ってコルーチンを使います。Pythonでは、 async def
で定義された関数がコルーチンです。コルーチン内の await
文でコンテキストスイッチを行います。ジェネレーターのyield文がそうであったように、コルーチンではawait文のところから、以前のコンテキストのまま処理が再開されます。(実は、 await
は yield from
と等価だそうです)
import asyncio async def some_coroutine(): ... await slow_io_disk_read() ... await slow_io_network_transfer() ...
上記のコルーチンの例では、 await slow_io_disk_read(), await slow_io_network_transfer()
で待ちが発生し、別のコルーチンにコンテキストスイッチします。
以下にコルーチンの基本的な実装パターンを示します。(Real Pythonの記事より)
async def count(): print("One") await asyncio.sleep(1) print("Two") async def main(): # コルーチンcount()を3つ実行する await asyncio.gather(count(), count(), count()) if __name__ == "__main__": import time s = time.perf_counter() # コルーチンメイン関数実行(完了を待つ) asyncio.run(main()) elapsed = time.perf_counter() - s print(f"{__file__} executed in {elapsed:0.2f} seconds.")
asyncio.run
でコルーチンの main
関数を起動し、 main
の await asyncio.gather
からコルーチン count
を3つ起動〜回収します。 asyncio.run
はこれらが全て実行完了するまでブロックされて待ちます。
コルーチンの main
を用意し、そこから個別のコルーチンを複数起動するやり方です。
asyncio.run
関数はPython 3.7で導入され、コルーチンを起動する標準の方法になりました。 run
1行でイベントループを生成、タスクを起動〜完了〜回収、イベントループのクローズまで行います。以下の古い書式と同じ事をします。
loop = asyncio.get_event_loop() try: loop.run_until_complete(main()) finally: loop.close()
run
のお陰でイベントループを意識する必要が無く、使いやすくなりました。(main
関数も隠蔽してくれるともっとすっきりする気がしますが)
1.10 Item 61: スレッド化されたI/Oをasyncioにポートする方法を知れ
主な作業
- I/O待ちの発生する箇所に
await
を付ける。 - 待ちの発生する箇所を含む関数やfor, with等のブロックに
async
を付ける - 関数名、クラス名を修正する
- asyncioのビルトインモジュールを使う
- スレッドの仕組みは全て置き換える
というところでしょうか。まだasyncioに未対応のモジュールもあることに注意。
1.11 Item 62: asyncioへの移行を楽にするため、スレッドとコルーチンを混在させよ
asyncioへの移行はblocking I/Oには効果がありません。例えばディスクからリードするシステムコール read
は完了までスリープせずにブロックされるため、別タスクにコンテキストスイッチをする機会がありません。blocking I/Oにはスレッドが有効です。
用途によってasyncioとスレッドを使い分ける(混在させる)ことが必要です。
1.12 Item 63: レスポンスを最大化するためにはasyncioのイベントループをブロックするな
以下の例のように、コルーチン用のイベントループ内でblocking I/Oをするとよくありません。
async def run_tasks(handles, interval, output_path): with open(output_path, 'wb') as output: async def write_async(data): output.write(data) # ブロックされるI/O tasks = [] for handle in handles: coro = tail_async(handle, interval, write_async) task = asyncio.create_task(coro) tasks.append(task) await asyncio.gather(*tasks)
解決策として、ファイル操作を別スレッドとして独立させます。
async def run_fully_async(handles, interval, output_path): async with WriteThread(output_path) as output: tasks = [] for handle in handles: coro = tail_async(handle, interval, output.write) task = asyncio.create_task(coro) tasks.append(task) await asyncio.gather(*tasks)
そしてそのために、スレッドのクラスを async with
文で扱えるように aenter, aexit
を用意します(PEP 492)。このスレッドの使い方は便利そうです。
ところで、ここではファイル操作系をスレッドとして独立させていますが、aiofilesを使えば、ファイル操作をasync化できそうです。
async with aiofiles.open('filename', mode='r') as f: contents = await f.read() print(contents) 'My file contents'
aiofilesのドキュメントを見ると、ファイル操作を別のスレッドプールにdelegateするとあります。
aiofiles helps with this by introducing asynchronous versions of files that support delegating operations to a separate thread pool.
こういうライブラリを使うのと、自分でスレッドを作るのと、どちらがいいのでしょうね。。
1.13 Item 64: 真の並行動作にはconcurrent.futuresを検討せよ
Pythonのglobal interpreter lock (GIL)のせいで、マルチコアを使った真の並行動作は簡単には実現できません。Cエクステンションは高速化には適していますが、大きなコストがかかります。通常、遅くなる原因は多くの場所にあり、一部だけエクステンションとして抜き出して高速化する訳にはいかないようです。
concurrent.futures
ビルトインモジュール経由でアクセスできる multiprocessing
ビルトインモジュールが使えるかもしれません。利用する側は ThreadPoolExecutor
の代わりに ProcessPoolExecutor
で置き換えるだけでよいです。
ただしこれは、自プロセスと子プロセスの間のデータのやりとりでpickleを使ったバイナリエンコード・デコードが必要で、オーバーヘッドが馬鹿になりません。よって ProcessPoolExecutor
で効果があるのは、プロセス間のデータ転送量及び頻度が少ない場合に限られます。
multiprocessing
は共有メモリやプロセス間のロック、キュー、プロキシーといったより高度な手段を提供してはいますがが、これらは非常に複雑だそうです。
こんなところでPythonの限界が見えてきてしまいました。。。(インタプリター言語に何を求めているのか、という話もありますが)
2 Chapter 8: 堅牢性(robustness)と性能
2.1 Item 65: try/except/else/finallyで各ブロックを有効に使え
try/except/else/finally
ブロックを整理します。
def some_func(): # 例えばファイルをオープンする処理 # ここでの例外はすぐに呼び元に上がる try: # 例外が上がる可能性のあるオペレーション except ZeroDivisionError as e: # 想定した例外が上がった場合 else: # tryで例外が上がらなかった場合 # ここでの例外は呼び元に伝搬する finally: # (tryに来ていたら)関数がリターンする前に必ず実行される # 例えばファイルのクローズ処理
2.2 Item 66: try/finally挙動を再利用するためにcontextlibとwithステートメントを考えよ
@contextmanager
デコレーターで修飾した関数はコンテキストマネージャーとなり、 with
ステートメントで使えるようになります。正式に __enter__
, __exit__
を準備するよりも簡単です。
from contextlib import contextmanager import logging @contextmanager def debug_logging(level): logger = logging.getLogger() old_level = logger.getEffectiveLevel() logger.setLevel(level) # 一時的に指定ログレベルを設定 try: yield finally: logger.setLevel(old_level) # ログレベルを戻す
上記関数では(一時的に) level
にデバッグレベルを変更します。
def my_function(): logging.debug('Some debug data') # DEBUG logging.error('Error log here') # ERROR logging.debug('More debug data') # DEBUG with debug_logging(logging.DEBUG): # DEBUGレベルのブロック print('* Inside:') my_function() print('* After:') my_function() >>> * Inside: DEBUG:root:Some debug data # DEBUGレベルが表示されている ERROR:root:Error log here DEBUG:root:More debug data # DEBUGレベルが表示されている * After: ERROR:root:Error log here
上記 with
ブロックはデバッグレベルをDEBUGにします。出力結果から、実際にwithブロックでのみDEBUGレベルのメッセージ出力されていることがわかります。
下の例で示すように、 with
ステートメントに渡されるコンテキストマネージャーは yield
でオブジェクトを返すことができ、 as
でローカル変数に入ります。これによって、 with
ブロック内のコードがそのコンテキストと直接interactできます。
from contextlib import contextmanager import logging @contextmanager def log_level(level, name): logger = logging.getLogger(name) old_level = logger.getEffectiveLevel() logger.setLevel(level) try: yield logger # コンテキストloggerを返す finally: logger.setLevel(old_level) logging.basicConfig() with log_level(logging.DEBUG, 'my-log') as logger: logger.debug(f'This is a message for {logger.name}!') logging.debug('This will not print') >>> DEBUG:my-log:This is a message for my-log!
with
ブロック内で logger.debug
のメッセージは表示されましたが、 logging.debug
は表示されていません。なお、本には載っていませんが、 logging.basicConfig()
を呼ばないと logger.debug
の方も表示されませんでした。。。
2.3 Item 67: ローカル時間にはtimeの代わりにdatetimeを使え
timeはUTCとローカルしか扱えないため、datatimeを使うべきです。pytzというコミュニティーが作っているライブラリを使うと世界中の時間が使えます。datetimeでは一度UTCに変換してから時間操作を行います。
from datetime import datetime, timezone import pytz time_format = '%Y-%m-%d %H:%M:%S' arrival_bos = '2020-08-29 10:01:00' bos_dt_native = datetime.strptime(arrival_bos, time_format) edt = pytz.timezone('US/Eastern') bos_dt = edt.localize(bos_dt_native) # datetime形式のボストン時間 utc_dt = pytz.utc.normalize(bos_dt.astimezone(pytz.utc)) # UTCに変換 print(utc_dt) >>> 2020-08-29 14:01:00+00:00
UTCを日本時間に変換します。
jst = pytz.timezone('Asia/Tokyo') tokyo_dt = jst.normalize(utc_dt.astimezone(jst)) # JSTに変換 print(tokyo_dt) >>> 2020-08-29 23:01:00+09:00
このインタフェースだと、いったんUTCに変換しなくてもいいような??? 単なるタイムゾーン間のコンバートならば、それでもいいかもしれません。
2.4 Item 68: copyregでpickleをreliableにせよ
Pythonでデータをシリアライズする場合、
- Python以外とデータ共有する場合はjson, xmlを使う
- Pythonとデータ共有する場合はpickleを使う
ことになると思います。
pickleにcopyregを組み合わせると、以下のような場合に対応できるようになります。
- pickleしたクラスのメンバーアトリビュートが追加された
- pickleしたクラスのメンバーアトリビュートが削除された
- pickleしたクラス名が変更になった
copyregは、pickle及びunpickleする時に呼ばれる関数を指定することで、そこでクラスのアトリビュート追加・削除の面倒を見ます。また、copyregを使うとクラス名がシリアライズされたデータに含まれないようになるため、クラス名の変更に対応できます。
以下の例ではGameStateクラスをpickleすることを考えます。copyregを使うためのヘルパー関数を、pickle用とunpickle用の二つ用意します。pickle用の pickle_game_state
はunpickle用の unpickle_game_state
を引数の kwargs
とセットで返します。このためcopyregにはpickle用の関数(とpickleするクラス名)だけを登録すればよいです。 copyreg.pickle
で pickle 用の関数を登録しています。
import pickle import copyreg class GameState: def __init__(self, level=0, lives=4, points=0): self.level = level self.lives = lives self.points = points def pickle_game_state(game_state): # pickle.dumpsすると呼ばれる print("pickling") kwargs = game_state.__dict__ return unpickle_game_state, (kwargs,) # unpickle用関数を返す def unpickle_game_state(kwargs): # pickle.loadsすると呼ばれる print("unpickling") return GameState(**kwargs) copyreg.pickle(GameState, pickle_game_state) # pickle用関数を登録する
pickleしてみましょう。
state = GameState() state.points += 1000 print("call pickling") serialized = pickle.dumps(state) print("call unpickling") state_after = pickle.loads(serialized) print(state_after.__dict__) >>> call pickling pickling # pickle_game_stateで表示している b'\x80\x04\x95L\x00\x00\x00\x00\x00\x00\x00\x8c\x08__main__\x94\x8c\x13unpickle_game_state\x94\x93\x94}\x94(\x8c\x05level\x94K\x00\x8c\x05lives\x94K\x04\x8c\x06points\x94M\xe8\x03u\x85\x94R\x94.' call unpickling unpickling # unpickle_game_stateで表示している {'level': 0, 'lives': 4, 'points': 1000}
実際に、pickleする際に pickle_game_state
が、unpickleする際に unpickle_game_state
が呼ばれていることがわかります。
クラスへのアトリビュートの追加に対応できるのは、 unpickle_game_state
で GameState のインスタンスを作る際に、(追加後のクラスの)コンストラクターを呼ぶためです。この中で追加したアトリビュートのデフォルト値が設定されます。
pickleするクラスのアトリビュートを削除するとbackward compatibilityが保てなくなります。この場合はバージョンを指定し、古いバージョンなら明示的に不要となったアトリビュートを削除します。
def pickle_game_state(game_state): kwargs = game_state.__dict__ kwargs['version'] = 2 return unpickle_game_state, (kwargs,) def unpickle_game_state(kwargs): version = kwargs.pop('version', 1) if version == 1: # 読んできたバージョンが1なら古いアトリビュートがある del kwargs['deleted_attribute'] return GameState(**kwargs)
2.5 Item 69: 精度が重要なら decimal を使え
Pythonのfloatの扱いで、1.44999999…のようになる場合があります。 これをきちっと1.45と見せたい場合、decimalを使うとよい、という話です。
まずは、Decimalに小数点の付いた値を渡す際には文字列を使うと正確です。
>>> from decimal import Decimal >>> print(Decimal('1.45')) # 文字列渡し 1.45 >>> print(Decimal(1.45)) # float渡し 1.4499999999999999555910790149937383830547332763671875
お金の計算などで四捨五入してゼロにされると困るような場合にも対応可能です。
from decimal import Decimal, ROUND_UP rate = Decimal('0.05') seconds = Decimal('5') small_cost = rate * seconds / Decimal(60) print("実の値 - ", small_cost) print("四捨五入 - ", round(small_cost, 2)) rounded = small_cost.quantize(Decimal('0.01'), rounding=ROUND_UP) print("切り上げ - ", rounded) >>> 実の値 - 0.004166666666666666666666666667 四捨五入 - 0.00 切り上げ - 0.01
2.6 Item 70: 最適化の前にプロファイルせよ
プロファイルにはCで書かれたcProfileを使うとよい、とのことです。
test
関数をプロファイルして、統計を出す例:
from cProfile import Profile profiler = Profile() profiler.runcall(test) from pstats import Stats stats = Stats(profiler) stats.strip_dirs() stats.sort_stats('cumulative') stats.print_stats() >>> 30003 function calls in 0.026 seconds Ordered by: cumulative time ncalls tottime percall cumtime percall filename:lineno(function) 1 0.000 0.000 0.026 0.026 t.py:17(<lambda>) 1 0.002 0.002 0.026 0.026 t.py:1(insertion_sort) 10000 0.003 0.000 0.024 0.000 t.py:9(insert_value) 10000 0.016 0.000 0.016 0.000 {method 'insert' of 'list' objects} 10000 0.005 0.000 0.005 0.000 {built-in method _bisect.bisect_left} 1 0.000 0.000 0.000 0.000 {method 'disable' of '_lsprof.Profiler' objects}
2.7 Item 71: 生産者-消費者キューにはdequeを使え
普通のリスト []
はキューに使えますが、キューが長くなると特に pop(0)
が2乗のオーダーで遅くなるそうです。collectionsのdequeはリニアに遅くなるだけなので、キュー操作が性能ネックとなっているようなら、こちらを使った方が良いです。
マイクロベンチマーク timeit
の使い方。
import collections import timeit def print_result(count, tests): avg_iteration = sum(tests) / len(tests) print(f'Count {count:>5,} takes {avg_iteration:.4f}s') def deque_append_benchmark(count) def prepare(): return collections.deque() def run(queue): for i in range(count): queue.append(i) tests = timeit.repeat( setup='queue = prepare()', stmt='run(queue)', globals=locals(), repeat=1000 number=1) return print_result(count, tests)
注意点として、timeitはループするのでキャッシュされて速くなってしまうようなことには使えません。
2.8 Item 72: ソートされたシーケンス内をサーチするにはbisectを使え
ソートされているリストなどの中で、指定の値がどこに来るかを調べるのに、 bisect_left
は速いのでいいですよ、ということでした。
2.9 Item 73: 優先度キューのために heapq をどう使うかを知れ
FIFOでない、何かのアトリビュートの順番で処理する必要のあるキューを優先度キュー、プライオリティーキューと言います。優先度キューの実装に heapq が使えます。
headqのアイテムは比較可能でnatural sort orderを持たなくてはいけません。これにはfunctoolsビルトインモジュールの total_ordering
クラスデコレーターを使い、 __lt__
(less than)スペシャルメソッドを実装する必要があります。
以降は図書館の貸し出し本を管理する例です。キュー内の操作はコストがかかるため、なるべくキューには手を付けないようにします。本の返却は通常、キューから抜く操作が必要になりますが、ここではキューの中の返却された本には返却マークを付けるのみで、キューそのものは変更しないようにしています。
import functools @functools.total_ordering class Book: def __init__(self, title, due_date): self.title = title self.due_date = due_date self.returnd = False # 返却フラグ def __lt__(self, other): return self.due_date < other.due_date # 返却日で比較する
due_dateによるキューのソートは以下のようにできます。
from heapq import heapify queue = [ Book('Pride and Prejudice', '2019-06-10'), Book('The Time Machine', '2019-05-30'), ... ] queue.sort() または heapify(queue)
次は、期限切れの本を表示します。返却済みの本が出てきたら、ひっそりとキューから抜きます。
from heapq import heappop class NoOverdueBooks(Exception): pass def next_overdue_book(queue, now): while queue: book = queue[0] # Most overdue first if book.returned: # 返却済みなら、、 heappop(queue) continue if book.due_date < now: # 期限切れなら heappop(queue) return book break raise NoOverdueBooks
本の返却処理です。
def return_book(queue, book): book.returned = True
このやり方の欠点は、返却された本をキューから抜かないため、キューが大きくなりうることです。これはメモリを圧迫します。ワーストケースを想定して必要メモリ量等のシステム設計をする必要があります。
2.10 Item 74: bytesとゼロコピーでinteractするにはmemoryviewとbytearrayを使え
出ました。ゼロコピーです。Pythonのバッファープロトコルとゼロコピーについては、ここにわかりやすい説明がありました。
bytesのデータを直接スライスするとメモリコピーが発生します。ビルトインの memoryview
タイプを使うとゼロコピーで行けます。
data = b'shave and a haircut, two bits' view = memoryview(data) chunk = view[12:19] print(type(chunk)) print('Size: ', chunk.nbytes) print('Data in view', chunk.tobytes()) print('Underlying data:', chunk.obj) >>> <class 'memoryview'> Size: 7 Data in view b'haircut' Underlying data: b'shave and a haircut, two bits'
memoryview をスライスした chunk
のタイプはmemoryviewであることがわかります。
bytesはリードオンリーのため、スライスした部分を上書きしたいなら bytearray
を使います。
my_array = bytearray(b'row, row, row your boat') my_view = memoryview(my_array) write_view = my_view[3:13] write_view[:] = b'1234567890' print(my_array) >>> bytearray(b'row1234567890 your boat')
socket.recv_intoはゼロコピーに対応します。ビデオストリーミングデータを受け取る場合の例。
socket = ... # クライアントへのソケットコネクション video_cache = ... # 入ってくるビデオストリーム用のキャッシュ byte_offset = ... # バッファ上にデータが入ってくる位置 size = 1024 * 1024 # 入ってくるデータのチャンクサイズ video_array = bytearray(video_cache) write_view = memoryview(video_array) chunk = write_view[byte_offset:byte_offset + size] socket.recv_into(chunk)