この記事は アカンクリスマスアドベントカレンダー2023 21日目 の記事です.
- はじめに
- 1.shutdownしないExecutorは走り続けてしまう
- 2.decimalの有効桁数がthread内で初期化される
- 3.privateメンバ変数にアクセスするデコレータを作る
- 4.staticmethodを適当にデコレートするとdocstringが消える
- 5.async defしたAPIが自分にリクエストを送ると上手くいかないことがある
- おわりに
はじめに
本ブログは80%近くがアドカレ(アドベントカレンダー)用の記事で構成されており,毎年アドカレの時期になるとどんな記事を書こうかと毎日ワクワクしています.こんな生活が今年で6年目になるのですが,だんだん楽しくなってアドカレ時期でなくとも何かしらの発見があるとアドカレ用のネタとしてストックして書く記事を想像するようになりました.その多くは大したことがないため没になって消えていき,今年も多くの没ネタがあります.
不本意ながら今年は1年を通してPythonを書き続けており,Pythonに関する没ネタが溜まっています.本来ならば書くほどのことではないのですが,塵も積もれば山になるということで,全部まとめて1記事分として本記事で放流してみようと思います.なお断りがない限りPythonコードはPython 3.9で実行しているものとします.
1.shutdownしないExecutorは走り続けてしまう
PythonではThreadPoolExecutorやProcessPoolExecutorを用いることでThreadやProcessを起動しての非同期処理が実現できます.これらは,インスタンスに対してsubmitして実行する処理を登録し,resultによって待機,値の取り出しを行います.以下に使用例を示します.本例では1,2,3秒待機する3つの関数を非同期で処理しており,一秒ごとにprintされる様子が見れます.
import time from concurrent.futures import ThreadPoolExecutor def func(k): time.sleep(k) print(k) return 1 if __name__ == "__main__": executor = ThreadPoolExecutor() futures = [ executor.submit(func, 1), executor.submit(func, 2), executor.submit(func, 3), ] [f.result() for f in futures] print("finish")
1 2 3 finish
ここで,非同期処理する関数内で例外が発生することを考えます.ここでは簡単のため,以下のように単にRuntimeErrorをraiseするだけの関数で非同期処理を実行してみます.
import time from concurrent.futures import ThreadPoolExecutor def func(k): time.sleep(k) print(k) raise RuntimeError("ERROR") if __name__ == "__main__": executor = ThreadPoolExecutor() futures = [ executor.submit(func, 1), executor.submit(func, 2), executor.submit(func, 3), ] try: [f.result() for f in futures] except RuntimeError as e: print(e) print("finish")
1 ERROR finish 2 3
実行の様子や出力結果を見ると少しおかしなことになっています.まず,例外発生により処理が終了して最後のfinishがprintされているにも関わらず,非同期実行していた処理が走り続けています.さらに,走り続けている処理内でもRuntimeErrorがraiseされてるはずですが,catchされることも例外が吐き出されるわけでもなく何事もなく終了しています.
一見不思議に見える挙動ですが,これは実は仕様通りの正しい挙動で,executorをshutdownし忘れたことで起きうるバグです.試しにshutdownを付け足してみます.
if __name__ == "__main__": executor = ThreadPoolExecutor() futures = [ executor.submit(func, 1), executor.submit(func, 2), executor.submit(func, 3), ] try: [f.result() for f in futures] except RuntimeError as e: print(e) executor.shutdown() # 追加 print("finish")
1 ERROR 2 3 finish
例外が発生した時点で例外処理,その後全ての処理の後にfinishのprintが行われるようになりました.これはshutdownが処理の待機を担っているためです.resultも待機はされるのですが,本実装ではresultのうち1つでも例外が発生するとcatchされるようになっており,以降のresultは全て無視されたまま先に進んでしまいます.ここで,最後にshutdownを挟むことで,全ての処理が揃うまで待機させて以降の処理の足並みを合わせることができます.
なお,shutdownを書かなきゃいけないとは言いつつ,人間なので書き忘れてしまうことはあると思います.そこで,with文を用いることをおすすめしようと思います.Executorクラスはwithのスコープを抜ける時に勝手にshutdownが呼ばれるようになっています.そのため,withを使うようにさえしておけば,shutdownを書くことを忘れることもなくなり非常に安全です.
if __name__ == "__main__": with ThreadPoolExecutor() as executor: futures = [ executor.submit(func, 1), executor.submit(func, 2), executor.submit(func, 3), ] try: [f.result() for f in futures] except RuntimeError as e: print(e) print("finish")
1 ERROR 2 3 finish
2.decimalの有効桁数がthread内で初期化される
Pythonは標準で提供されているdecimalを使用することで十進数の固定小数や浮動小数を容易に扱うことができます.decimalの有効桁数はデフォルトでは28桁とされていますが,decimal.getcontext().prec
というグローバル値をいじることで好きな桁数に調整することができます.
import decimal def f(): print(decimal.getcontext().prec) # 28 if __name__ == "__main__": f()
import decimal decimal.getcontext().prec = 50 def f(): print(decimal.getcontext().prec) # 50 if __name__ == "__main__": f()
ここで,decimalをthread内で扱いたいケースを考えます.あらかじめ有効桁数を増やした上でthread内でdecimalを使ってみます.
import decimal from concurrent.futures import ThreadPoolExecutor decimal.getcontext().prec = 50 def f(): print(decimal.getcontext().prec) # 28 if __name__ == '__main__': with ThreadPoolExecutor() as executor: executor.submit(f)
なんと値が28に初期化されてしまいました.これはぁ...なんでなんですかねぇ...???よく分からないですが,documentに次のような記載がありました.
多重スレッドで処理を行う場合には各スレッドごとに現在のコンテキストがあり、 getcontext() や setcontext() といった関数でアクセスしたり設定変更できます
https://docs.python.org/ja/3/library/decimal.html
どうやらスレッドごとに独立したコンテキスト(有効桁数)を持つようです.そのため,スレッド毎にコンテキストを設定しなければなりません.
適当にやるとスレッド毎に設定することで冗長になったり管理が大変になったりしますが,contextを使用してinjectionしたり,そもそもThreadではなくProcessを用いるなどすれば非常に楽になります.
import decimal from concurrent.futures import ThreadPoolExecutor def f(ctx): decimal.setcontext(ctx) print(decimal.getcontext().prec) if __name__ == "__main__": decimal.getcontext().prec = 50 with ThreadPoolExecutor() as executor: executor.submit(f, decimal.getcontext())
import decimal from concurrent.futures import ProcessPoolExecutor decimal.getcontext().prec = 50 def f(): print(decimal.getcontext().prec) # 50 if __name__ == '__main__': with ProcessPoolExecutor() as executor: executor.submit(f)
3.privateメンバ変数にアクセスするデコレータを作る
ある複数の処理に対して共通の前処理や後処理を挟みたい場合は,デコレータが非常に強力なツールとして役立ちます.以下にデコレータを用いる例を示します.非常に簡単に前処理や後処理が実行できることが分かります.
import functools def __decorator(f): @functools.wraps(f) def wrapper(*args, **kwargs): print("前処理") ret = f(*args, **kwargs) print("後処理") return ret return wrapper @__decorator def f(): print("処理") if __name__ == "__main__": f()
前処理 処理 後処理
ここで,クラス内のmethodにデコレートするとして,privateなメンバ変数にアクセスするデコレータを作成することを考えます.例えば以下のようなデコレータを作ってみたとします.
import dataclasses import functools @dataclasses.dataclass(frozen=True) class C: __x: int def __decorator(f): @functools.wraps(f) def wrapper(self, *args, **kwargs): print(self.__x) return f(*args, **kwargs) return wrapper @__decorator def f(self): ... if __name__ == "__main__": C(5).f()
これはエラーにより正常に実行ができません.pythonのmethodは特殊な仕様になっており,実態として他の言語のようなmethodは存在しません.例えば上記コードの場合は,C(5).f()
という呼び出しは内部ではC.f(C(5))
と解釈されており,クラス変数は引数selfとしてまとめて渡されます.これらのことから,クラスのスコープ内にあるものは簡単に呼び出しができるように見えますが,実際は関数が独立に存在しているだけなので,selfを介す以外でメンバ変数にアクセスする手段はないことが分かります.
では,以下のようにクラス外でデコレータを定義してselfを参照できるようにしてみるとどうでしょうか?
import dataclasses import functools def _decorator(f): @functools.wraps(f) def wrapper(self, *args, **kwargs): print(self.__x) return f(self, *args, **kwargs) return wrapper @dataclasses.dataclass(frozen=True) class C: __x: int @_decorator def f(self): ... if __name__ == "__main__": C(5).f()
残念ながら本コードもエラーにより正常に動作しません.当たり前の話ではありますが,メンバ変数はprivateなので(普通の方法では)外の関数からはアクセスできません.
しかし,外の関数からアクセスできないだけなので,メンバ変数にアクセスする部分だけを抽出してmethodを定義することで解決することができます.
import dataclasses import functools def _decorator(f): @functools.wraps(f) def wrapper(self, *args, **kwargs): self._decorate_func() return f(self, *args, **kwargs) return wrapper @dataclasses.dataclass(frozen=True) class C: __x: int def _decorate_func(self): print(self.__x) @_decorator def f(self): ... if __name__ == "__main__": C(5).f()
無事に動きました.結局遠回りする感じにはなりますが,privateメンバ変数にアクセスデコレータを作成するには,デコレータをクラス外で定義した上でメンバ変数にアクセスする部分だけを抽出するという2手順が必要です.
4.staticmethodを適当にデコレートするとdocstringが消える
デコレータは非常に便利な機能ではありますが,関数を上書きすることになるためdocstringといった関数に紐づく情報が失われてしまうことがあります.以下に設定したdocstringが消失する例を示します.
def decorator(f): def wrapper(*args, **kwargs): return f(*args, **kwargs) return wrapper @decorator def f(): """ fのdocument """ print(2) if __name__ == '__main__': print(f.__doc__)
None
本来ならば関数fのdocstringである"fのdocument"が出力されて欲しいのですが,実際の出力は"None",つまり設定されてないことになっています.これはデコレートした時に関数fの実態がデコレータ内の関数wrapperに置換されているためです.関数fのdocを読んだように見えて実際はwrapperのdocを呼び出しており,wrapperにはdocstringが記載されていないためNoneになっていたのです.
Pythonは便利なので,このような問題に対処するための方法が標準で提供されています.それがfunctools.wraps
やfunctools.update_wrapper
です.使用例を以下に示します.
import functools def decorator(f): @functools.wraps(f) def wrapper(*args, **kwargs): return f(*args, **kwargs) return wrapper @decorator def f(): """ fのdocument """ print(2) if __name__ == '__main__': print(f.__doc__)
fのdocument
正しくdocstringが出力されました.デコレータについて調べているとおまじないのように書かれていることが多いですが,実はこのような効果がありました.
なお,自作のデコレータは上記のようにwraps
などをつけることでdocstringが継承されますが,staticmethod
など標準で使えるデコレータもちゃんとdocstringが継承されるようになっています.
class C: @staticmethod def f(): """ fのdocument """ print(2) if __name__ == '__main__': print(C.f.__doc__)
fのdocument
さらに,適切に設計されたデコレータは連続してデコレートしても問題なく情報が継承されます.
import functools def decorator(f): @functools.wraps(f) def wrapper(*args, **kwargs): return f(*args, **kwargs) return wrapper class C: @decorator @staticmethod def f(): """ fのdocument """ print(2) if __name__ == '__main__': print(C.f.__doc__)
staticmethod(function) -> method Convert a function to be a static method. A static method does not receive an implicit first argument. To declare a static method, use this idiom: class C: @staticmethod def f(arg1, arg2, ...): ... It can be called either on the class (e.g. C.f()) or on an instance (e.g. C().f()). Both the class and the instance are ignored, and neither is passed implicitly as the first argument to the method. Static methods in Python are similar to those found in Java or C++. For a more advanced concept, see the classmethod builtin.
このように適切に...ん???なぜかstaticmethod
のdocstringが出力されました.これは一体なぜでしょうか....本当にいったいなんでなんでしょう???試しにデコレートする順番を変えてみます.
import functools def decorator(f): @functools.wraps(f) def wrapper(*args, **kwargs): return f(*args, **kwargs) return wrapper class C: @staticmethod @decorator def f(): """ fのdocument """ print(2) if __name__ == '__main__': print(C.f.__doc__)
fのdocument
?????期待通りの挙動になりました.全く理解できなかったので色々調べたところ,documentに次の記載がありました.
バージョン 3.10 で変更: 静的メソッドはメソッド属性 (__module__, __name__, __qualname__, __doc__ そして __annotations__) を継承するようになり、また新たに __wrapped__ 属性を持つようになりました。
https://docs.python.org/ja/3.12/library/functions.html#staticmethod
バグかよ!
Python 3.10で動かしてみたところ全て正常に動作しました.まさか言語そのものがおかしいとは夢にも思いませんでした.本経験を通して言語バージョンを上げることの大切さを痛感しました.便利な機能が追加されるだけでなくちゃんとバグも解消されるので,できるだけ言語バージョンは更新していきたいですね.
5.async defしたAPIが自分にリクエストを送ると上手くいかないことがある
PythonでAPI serverを立てる際はfastAPIが便利です.fastAPIは必要最小限で簡単にAPIを記述することができ,それでいて高いパフォーマンスを発揮できます.以下に簡単なAPI serverの例を示します.動作させる場合は事前にfastAPIをinstallしてください.
""" server.py """ import requests import uvicorn from fastapi import FastAPI app = FastAPI() @app.get("/hello") def hello(): return {"text": "Hello World!"} if __name__ == "__main__": uvicorn.run("server:app", host="0.0.0.0", port=8080, log_level="info")
$ curl http://localhost:8080/hello {"text":"Hello World!"}
簡単ですね.
さらに,上記の例のコードではAPI内で同期的に処理が行われますが,fastAPIではコルーチンによる非同期処理も導入できます.これは関数定義にasync
も付けるだけで非常に簡単です.以下に例を示します.
""" server.py """ import requests import uvicorn from fastapi import FastAPI app = FastAPI() @app.get("/hello") async def hello(): return {"text": "Hello World!"} if __name__ == "__main__": uvicorn.run("server:app", host="0.0.0.0", port=8080, log_level="info")
$ curl http://localhost:8080/hello {"text":"Hello World!"}
本例ではすぐに値を返しているので恩恵を受けれてはないですが,この変更により重い処理などをawaitを使って呼び出すことができるようになっています.
では本題に入ります.あるAPIがリクエストを受け取った際に,処理の中で自分自身に対してリクエストを送りたい場合を考えます.これを通常のdef
で定義したAPIとasync def
で定義したAPIとの2つで試してみます.
@app.get("/hello") def hello(): return {"text": "Hello World!"} @app.get("/call_self_api") def call_self_api(): return requests.get("http://localhost:8080/hello").json()
$ curl http://localhost:8080/call_self_api {"text":"Hello World!"}
@app.get("/hello") async def hello(): return {"text": "Hello World!"} @app.get("/call_self_api") async def call_self_api(): return requests.get("http://localhost:8080/hello").json()
$ curl http://localhost:8080/call_self_api # 永遠に終わらない
def
の場合は正常に動作しましたが,async def
の場合は永遠にレスポンスが返ってこず止まってしまいました.実は,後者の実装ではfastAPIの非同期処理仕様によって内部でブロッキングが発生しています.
fastAPIでは,def
で定義したAPIはmain threadとは別のthreadで処理されます.対して,async def
で定義したAPIは全てmain threadで処理され,awaitしない限りあるリクエストを処理している間は別のリクエストの実行ができません.よって,async def
を使用する場合は重い処置は並行処理,コルーチンを使用するようして,別のリクエストをブロックしないようにする必要があります.
先ほどのコードをコルーチンを使用して書き直してみます.
@app.get("/hello") async def hello(): return {"text": "Hello World!"} @app.get("/call_self_api") async def call_self_api(): loop = asyncio.get_event_loop() res = await loop.run_in_executor(None, requests.get, "http://localhost:8080/hello") return res.json()
$ curl http://localhost:8080/call_self_api {"text":"Hello World!"}
正常に動作しました.本コードでは全てmain threadで動作はしていますが,await
した時点で他の処理を受け付ける状態となり別のリクエストを捌けます.
なお,このコードでは自分でコルーチンを記述していますが,aiohttpというライブラリを使用すれば自分で記述する必要もなく簡単に非同期リクエストを実現できます.特に非同期処理周りは自分で書くとバグらせやすいため,ライブラリは積極的に使用していきたいです.以下にaiohttpで書き換えたコードを示します.
import aiohttp @app.get("/hello") async def hello(): return {"text": "Hello World!"} @app.get("/call_self_api") async def call_self_api(): async with aiohttp.ClientSession() as session: async with session.get("http://localhost:8080/hello") as response: return await response.json()
$ curl http://localhost:8080/call_self_api {"text":"Hello World!"}
ここまで見ると,無理してasnyc def
を使わずにdef
で十分じゃないかと思うかもしれません.これはおおよそその通りで,fastapiのdocumentにも基本的にはdef
を使う旨の記載がされています.ただし,詳細は省きますがPythonではGIL(Global Interpreter Lock)が用いられているため,def
でthreadが生えたからパフォーマンスが高いということはほとんどないです.結局どちらを使うべきかは場合によるので,非同期処理についてしっかり調べて最適な方法を選べると良いですね.そうしない場合は何も考えずdef
を使っておきましょう.