終了 - python プロセス
Pythonマルチプロセッシング酸洗エラー (5)
このソリューションでは、dillのみをインストールし、他のライブラリはpathosとしてインストールする必要はありません
def apply_packed_function_for_map((dumped_function, item, args, kwargs),):
"""
Unpack dumped function as target function and call it with arguments.
:param (dumped_function, item, args, kwargs):
a tuple of dumped function and its arguments
:return:
result of target function
"""
target_function = dill.loads(dumped_function)
res = target_function(item, *args, **kwargs)
return res
def pack_function_for_map(target_function, items, *args, **kwargs):
"""
Pack function and arguments to object that can be sent from one
multiprocessing.Process to another. The main problem is:
«multiprocessing.Pool.map*» or «apply*»
cannot use class methods or closures.
It solves this problem with «dill».
It works with target function as argument, dumps it («with dill»)
and returns dumped function with arguments of target function.
For more performance we dump only target function itself
and don't dump its arguments.
How to use (pseudo-code):
~>>> import multiprocessing
~>>> images = [...]
~>>> pool = multiprocessing.Pool(100500)
~>>> features = pool.map(
~... *pack_function_for_map(
~... super(Extractor, self).extract_features,
~... images,
~... type='png'
~... **options,
~... )
~... )
~>>>
:param target_function:
function, that you want to execute like target_function(item, *args, **kwargs).
:param items:
list of items for map
:param args:
positional arguments for target_function(item, *args, **kwargs)
:param kwargs:
named arguments for target_function(item, *args, **kwargs)
:return: tuple(function_wrapper, dumped_items)
It returs a tuple with
* function wrapper, that unpack and call target function;
* list of packed target function and its' arguments.
"""
dumped_function = dill.dumps(target_function)
dumped_items = [(dumped_function, item, args, kwargs) for item in items]
return apply_packed_function_for_map, dumped_items
また、配列の数が少ない場合にも機能します。
私は単純な例でエラーを再現できないことを申し訳ありません。私のコードは投稿するには複雑すぎます。 通常のPythonの代わりにIPythonシェルでプログラムを実行すると、うまくいきます。
私はこの問題に関する以前のメモを調べました。 それらはすべて、クラス関数内で定義されたプールツーコール関数を使用することによって発生しました。 しかし、これは私には当てはまりません。
Exception in thread Thread-3:
Traceback (most recent call last):
File "/usr/lib64/python2.7/threading.py", line 552, in __bootstrap_inner
self.run()
File "/usr/lib64/python2.7/threading.py", line 505, in run
self.__target(*self.__args, **self.__kwargs)
File "/usr/lib64/python2.7/multiprocessing/pool.py", line 313, in _handle_tasks
put(task)
PicklingError: Can't pickle <type 'function'>: attribute lookup __builtin__.function failed
助けていただければ幸いです。
更新:機能I pickleは、モジュールの最上位レベルで定義されています。 ただし、ネストされた関数を含む関数を呼び出します。 すなわちf()はg()を呼び出し、h()は入れ子関数i()を持ち、pool.apply_async(f)を呼び出しています。 f()、g()、h()はすべてトップレベルで定義されています。 私はこのパターンでより単純な例を試みましたが、それは動作します。
Can't pickle <type 'function'>: attribute lookup __builtin__.function failed
このエラーは、非同期ジョブに渡されたモデルオブジェクト内にinbuilt関数がある場合にも発生します。
したがって、渡されたモデルオブジェクトに inbuilt関数がないことを確認してください。 (私たちのケースでは、特定のフィールドを追跡するためにモデル内でdjango-model-utils FieldTracker()
関数を使用していFieldTracker()
)。 ここに関連するGitHubの問題へのlinkがあります。
ここでは、 ピックすることができるもののリストです 。 特に、関数はモジュールのトップレベルで定義されている場合にのみpicklableです。
このコードは次のとおりです。
import multiprocessing as mp
class Foo():
@staticmethod
def work(self):
pass
pool = mp.Pool()
foo = Foo()
pool.apply_async(foo.work)
pool.close()
pool.join()
投稿したものとほぼ同じエラーが表示されます。
Exception in thread Thread-2:
Traceback (most recent call last):
File "/usr/lib/python2.7/threading.py", line 552, in __bootstrap_inner
self.run()
File "/usr/lib/python2.7/threading.py", line 505, in run
self.__target(*self.__args, **self.__kwargs)
File "/usr/lib/python2.7/multiprocessing/pool.py", line 315, in _handle_tasks
put(task)
PicklingError: Can't pickle <type 'function'>: attribute lookup __builtin__.function failed
問題は、 pool
メソッドがすべてqueue.Queue
を使用して、タスクをワーカープロセスに渡すことです。 queue.Queue
を通過するすべてのものqueue.Queue
は選択可能でなければならず、 foo.work
はモジュールの最上位レベルで定義されていないため、 foo.work
はありません。
これは、 foo.work()
を呼び出すトップレベルの関数を定義することで修正できます:
def work(foo):
foo.work()
pool.apply_async(work,args=(foo,))
Foo
はトップレベルで定義され、 foo.__dict__
はpicklableであるため、 foo
は選択可能であることに注意してください。
プロファイラを使って完全に動作するコードにエラー出力を生成することもできます。
これはWindows上で行われたことに注意してください(ここでは、フォークは少しエレガントです)。
私は走っていた:
python -m profile -o output.pstats <script>
そして、プロファイリングを削除するとエラーが取り除かれ、プロファイリングがそれを復元することがわかりました。 私が働いていたコードを知っていたので、私も頭がおかしくなりました。 私は何かがpool.pyを更新したかどうかをチェックしていましたが、沈没感があり、プロファイリングがなくなりました。
誰か他の人がそれに遭遇した場合のために、アーカイブのためにここに投稿してください。
私は同じ問題を抱えていました。 私が学んだものを小さなオープンソースのPythonスクリプトに入れて、子供のためにマルチプロセッシングと呼ぶことにしました。 私はそれが本当に簡単にマルチプロセッシングを使用すると思う。 あなたはGitHubで見つけることができます:
https://github.com/predictedblog/multiprocessing_for_kids 。
私はまた、それを使用する方法に関する例を持つ2つのブログ投稿を書いた:
https://predicted.blog/multiprocessing-for-kids/
https://predicted.blog/multiprocessing-for-kids-shared-variables/
複数のプロセスでyourFunctionを実行するには、doMultiprocessingLoop(yourFunction、Iterator)という関数を使用します。
私は、同じような問題に遭遇した人々が、私が行ったように何度も多重処理を使用するのを手助けしたいと思っています。 これは、プロセス間の変数の共有やそれらからの戻り値のような、多くの単純なユースケースで機能します。 結果を返すことによってすべてのプロセスを終了することも可能です。 詳細については、上記のブログ記事をお読みください。 それらを読むことで、少なくともマルチプロセッシングがどのように機能し、どこに限界があるかをよりよく理解することができます。
機能を追加したい場合は、スクリプトの編集をためらってください。 プルのリクエストも歓迎します。 あなたがこの中から何かを大きくしたいのであれば、私に連絡すること自由に感じてください。私はあなたに管理者権限を与えます。