終了 - python プロセス




Pythonマルチプロセッシング酸洗エラー (5)

このソリューションでは、dillのみをインストールし、他のライブラリはpathosとしてインストールする必要はありません

def apply_packed_function_for_map((dumped_function, item, args, kwargs),):
    """
    Unpack dumped function as target function and call it with arguments.

    :param (dumped_function, item, args, kwargs):
        a tuple of dumped function and its arguments
    :return:
        result of target function
    """
    target_function = dill.loads(dumped_function)
    res = target_function(item, *args, **kwargs)
    return res


def pack_function_for_map(target_function, items, *args, **kwargs):
    """
    Pack function and arguments to object that can be sent from one
    multiprocessing.Process to another. The main problem is:
        «multiprocessing.Pool.map*» or «apply*»
        cannot use class methods or closures.
    It solves this problem with «dill».
    It works with target function as argument, dumps it («with dill»)
    and returns dumped function with arguments of target function.
    For more performance we dump only target function itself
    and don't dump its arguments.
    How to use (pseudo-code):

        ~>>> import multiprocessing
        ~>>> images = [...]
        ~>>> pool = multiprocessing.Pool(100500)
        ~>>> features = pool.map(
        ~...     *pack_function_for_map(
        ~...         super(Extractor, self).extract_features,
        ~...         images,
        ~...         type='png'
        ~...         **options,
        ~...     )
        ~... )
        ~>>>

    :param target_function:
        function, that you want to execute like  target_function(item, *args, **kwargs).
    :param items:
        list of items for map
    :param args:
        positional arguments for target_function(item, *args, **kwargs)
    :param kwargs:
        named arguments for target_function(item, *args, **kwargs)
    :return: tuple(function_wrapper, dumped_items)
        It returs a tuple with
            * function wrapper, that unpack and call target function;
            * list of packed target function and its' arguments.
    """
    dumped_function = dill.dumps(target_function)
    dumped_items = [(dumped_function, item, args, kwargs) for item in items]
    return apply_packed_function_for_map, dumped_items

また、配列の数が少ない場合にも機能します。

私は単純な例でエラーを再現できないことを申し訳ありません。私のコードは投稿するには複雑すぎます。 通常のPythonの代わりにIPythonシェルでプログラムを実行すると、うまくいきます。

私はこの問題に関する以前のメモを調べました。 それらはすべて、クラス関数内で定義されたプールツーコール関数を使用することによって発生しました。 しかし、これは私には当てはまりません。

Exception in thread Thread-3:
Traceback (most recent call last):
  File "/usr/lib64/python2.7/threading.py", line 552, in __bootstrap_inner
    self.run()
  File "/usr/lib64/python2.7/threading.py", line 505, in run
    self.__target(*self.__args, **self.__kwargs)
  File "/usr/lib64/python2.7/multiprocessing/pool.py", line 313, in _handle_tasks
    put(task)
PicklingError: Can't pickle <type 'function'>: attribute lookup __builtin__.function failed

助けていただければ幸いです。

更新:機能I pickleは、モジュールの最上位レベルで定義されています。 ただし、ネストされた関数を含む関数を呼び出します。 すなわちf()はg()を呼び出し、h()は入れ子関数i()を持ち、pool.apply_async(f)を呼び出しています。 f()、g()、h()はすべてトップレベルで定義されています。 私はこのパターンでより単純な例を試みましたが、それは動作します。


Can't pickle <type 'function'>: attribute lookup __builtin__.function failed

このエラーは、非同期ジョブに渡されたモデルオブジェクト内にinbuilt関数がある場合にも発生します。

したがって、渡されたモデルオブジェクトに inbuilt関数がないことを確認してください。 (私たちのケースでは、特定のフィールドを追跡するためにモデル内でdjango-model-utils FieldTracker()関数を使用していFieldTracker() )。 ここに関連するGitHubの問題へのlinkがあります。


ここでは、 ピックすることができるもののリストです 。 特に、関数はモジュールのトップレベルで定義されている場合にのみpicklableです。

このコードは次のとおりです。

import multiprocessing as mp

class Foo():
    @staticmethod
    def work(self):
        pass

pool = mp.Pool()
foo = Foo()
pool.apply_async(foo.work)
pool.close()
pool.join()

投稿したものとほぼ同じエラーが表示されます。

Exception in thread Thread-2:
Traceback (most recent call last):
  File "/usr/lib/python2.7/threading.py", line 552, in __bootstrap_inner
    self.run()
  File "/usr/lib/python2.7/threading.py", line 505, in run
    self.__target(*self.__args, **self.__kwargs)
  File "/usr/lib/python2.7/multiprocessing/pool.py", line 315, in _handle_tasks
    put(task)
PicklingError: Can't pickle <type 'function'>: attribute lookup __builtin__.function failed

問題は、 poolメソッドがすべてqueue.Queueを使用して、タスクをワーカープロセスに渡すことです。 queue.Queueを通過するすべてのものqueue.Queueは選択可能でなければならず、 foo.workはモジュールの最上位レベルで定義されていないため、 foo.workはありません。

これは、 foo.work()を呼び出すトップレベルの関数を定義することで修正できます:

def work(foo):
    foo.work()

pool.apply_async(work,args=(foo,))

Fooはトップレベルで定義され、 foo.__dict__はpicklableであるため、 fooは選択可能であることに注意してください。


プロファイラを使って完全に動作するコードにエラー出力を生成することもできます。

これはWindows上で行われたことに注意してください(ここでは、フォークは少しエレガントです)。

私は走っていた:

python -m profile -o output.pstats <script> 

そして、プロファイリングを削除するとエラーが取り除かれ、プロファイリングがそれを復元することがわかりました。 私が働いていたコードを知っていたので、私も頭がおかしくなりました。 私は何かがpool.pyを更新したかどうかをチェックしていましたが、沈没感があり、プロファイリングがなくなりました。

誰か他の人がそれに遭遇した場合のために、アーカイブのためにここに投稿してください。


私は同じ問題を抱えていました。 私が学んだものを小さなオープンソースのPythonスクリプトに入れて、子供のためにマルチプロセッシングと呼ぶことにしました。 私はそれが本当に簡単にマルチプロセッシングを使用すると思う。 あなたはGitHubで見つけることができます:

https://github.com/predictedblog/multiprocessing_for_kids

私はまた、それを使用する方法に関する例を持つ2つのブログ投稿を書いた:

https://predicted.blog/multiprocessing-for-kids/

https://predicted.blog/multiprocessing-for-kids-shared-variables/

複数のプロセスでyourFunctionを実行するには、doMultiprocessingLoop(yourFunction、Iterator)という関数を使用します。

私は、同じような問題に遭遇した人々が、私が行ったように何度も多重処理を使用するのを手助けしたいと思っています。 これは、プロセス間の変数の共有やそれらからの戻り値のような、多くの単純なユースケースで機能します。 結果を返すことによってすべてのプロセスを終了することも可能です。 詳細については、上記のブログ記事をお読みください。 それらを読むことで、少なくともマルチプロセッシングがどのように機能し、どこに限界があるかをよりよく理解することができます。

機能を追加したい場合は、スクリプトの編集をためらってください。 プルのリクエストも歓迎します。 あなたがこの中から何かを大きくしたいのであれば、私に連絡すること自由に感じてください。私はあなたに管理者権限を与えます。





pickle