返り値 - python 排他制御




どのようにPythonのスレッドから戻り値を取得するには? (14)

1つの通常の解決策は、関数fooを次のようなデコレータでラップすることです

result = queue.Queue()

def task_wrapper(*args):
    result.put(target(*args))

その後、コード全体がそのように見えるかもしれません

result = queue.Queue()

def task_wrapper(*args):
    result.put(target(*args))

threads = [threading.Thread(target=task_wrapper, args=args) for args in args_list]

for t in threads:
    t.start()
    while(True):
        if(len(threading.enumerate()) < max_num):
            break
for t in threads:
    t.join()
return result

注意

1つの重要な問題は、戻り値が非順序である可能性があることです 。 (実際には、 return valueは必ずしもqueueに保存されるわけではありません。任意のスレッドセーフなデータ構造を選択できるからです)

どのようにスレッドから返される値'foo'を取得するには?

from threading import Thread

def foo(bar):
    print 'hello {}'.format(bar)
    return 'foo'

thread = Thread(target=foo, args=('world!',))
thread.start()
ret = thread.join()
print ret

上に示した1つの明白な方法は、 None返します。


FWIWでは、 multiprocessingモジュールにはPoolクラスを使用するための素晴らしいインターフェイスがあります。 また、プロセスではなくスレッドを使用する場合は、 multiprocessing.pool.ThreadPoolクラスをドロップイン置換として使用できます。

def foo(bar, baz):
  print 'hello {0}'.format(bar)
  return 'foo' + baz

from multiprocessing.pool import ThreadPool
pool = ThreadPool(processes=1)

async_result = pool.apply_async(foo, ('world', 'foo')) # tuple of args for foo

# do some other stuff in the main process

return_val = async_result.get()  # get the return value from your function.

Jakeの答えは良いですが、スレッドプールを使用したくない場合(必要なスレッドの数はわかりませんが、必要に応じて作成する必要がある場合)、スレッド間で情報を送信するための良い方法は組み込みですQueue.Queueクラス。スレッドの安全性を提供します。

スレッドプールと同様の方法で動作させるために、次のデコレータを作成しました。

def threaded(f, daemon=False):
    import Queue

    def wrapped_f(q, *args, **kwargs):
        '''this function calls the decorated function and puts the 
        result in a queue'''
        ret = f(*args, **kwargs)
        q.put(ret)

    def wrap(*args, **kwargs):
        '''this is the function returned from the decorator. It fires off
        wrapped_f in a new thread and returns the thread object with
        the result queue attached'''

        q = Queue.Queue()

        t = threading.Thread(target=wrapped_f, args=(q,)+args, kwargs=kwargs)
        t.daemon = daemon
        t.start()
        t.result_queue = q        
        return t

    return wrap

次に、あなたはそれを次のように使います:

@threaded
def long_task(x):
    import time
    x = x + 5
    time.sleep(5)
    return x

# does not block, returns Thread object
y = long_task(10)
print y

# this blocks, waiting for the result
result = y.result_queue.get()
print result

装飾された関数は、呼び出されるたびに新しいスレッドを作成し、結果を受け取るキューを含むThreadオブジェクトを返します。

更新

私はこの答えを投稿して以来、ずっとずっとずっと続いていますが、まだビューを取得していますので、新しいバージョンのPythonでこれを行う方法を反映するように更新すると思っていました。

並列タスク用の高水準インターフェースを提供するconcurrent.futuresモジュールにPython 3.2が追加されました。 ThreadPoolExecutorProcessPoolExecutor提供するので、同じapiを持つスレッドまたはプロセスプールを使用できます。

このAPIの利点の1つは、 ExecutorタスクをサブミットするとFutureオブジェクトが返されることです。このオブジェクトは、呼び出した呼び出し可能オブジェクトの戻り値で完了します。

これにより、 queueオブジェクトが不要になり、デコレータがかなり単純化されます。

_DEFAULT_POOL = ThreadPoolExecutor()

def threadpool(f, executor=None):
    @wraps(f)
    def wrap(*args, **kwargs):
        return (executor or _DEFAULT_POOL).submit(f, *args, **kwargs)

    return wrap

これは、渡されていない場合、デフォルトのモジュールスレッドプールエグゼキュータを使用します。

使い方は前と非常に似ています:

@threadpool
def long_task(x):
    import time
    x = x + 5
    time.sleep(5)
    return x

# does not block, returns Future object
y = long_task(10)
print y

# this blocks, waiting for the result
result = y.result()
print result

Python 3.4以降を使用している場合、このメソッド(およびFutureオブジェクト一般)を使用すると本当に便利な機能の1つは、返された未来をラップして、 asyncio.Futureasyncio.wrap_futureです。 これはコルーチンで簡単に動作します:

result = await asyncio.wrap_future(long_task(10))

基になるconcurrent.Futureオブジェクトにアクセスする必要がない場合は、デコレータにラップを含めることができます。

_DEFAULT_POOL = ThreadPoolExecutor()

def threadpool(f, executor=None):
    @wraps(f)
    def wrap(*args, **kwargs):
        return asyncio.wrap_future((executor or _DEFAULT_POOL).submit(f, *args, **kwargs))

    return wrap

次に、イベントループスレッドからCPU集中型またはブロッキングコードをプッシュする必要があるときはいつでも、それを装飾された関数に置くことができます:

@threadpool
def some_long_calculation():
    ...

# this will suspend while the function is executed on a threadpool
result = await some_long_calculation()

Python 3に移植されたParris / kindallのanswer join / return回答:

from threading import Thread

def foo(bar):
    print('hello {0}'.format(bar))
    return "foo"

class ThreadWithReturnValue(Thread):
    def __init__(self, group=None, target=None, name=None, args=(), kwargs=None, *, daemon=None):
        Thread.__init__(self, group, target, name, args, kwargs, daemon=daemon)

        self._return = None

    def run(self):
        if self._target is not None:
            self._return = self._target(*self._args, **self._kwargs)

    def join(self):
        Thread.join(self)
        return self._return


twrv = ThreadWithReturnValue(target=foo, args=('world!',))

twrv.start()
print(twrv.join())   # prints foo

Threadクラスは、Python 3では異なって実装されていることに注意してください。


@ JakeBiesingerの答えに対する@imanのコメントを考慮して、私はそれをさまざまな数のスレッドを持つように再構成しました:

from multiprocessing.pool import ThreadPool

def foo(bar, baz):
    print 'hello {0}'.format(bar)
    return 'foo' + baz

numOfThreads = 3 
results = []

pool = ThreadPool(numOfThreads)

for i in range(0, numOfThreads):
    results.append(pool.apply_async(foo, ('world', 'foo'))) # tuple of args for foo)

# do some other stuff in the main process
# ...
# ...

results = [r.get() for r in results]
print results

pool.close()
pool.join()

乾杯、

男。


Python3でのKindallの答え

class ThreadWithReturnValue(Thread):
    def __init__(self, group=None, target=None, name=None,
                 args=(), kwargs={}, *, daemon=None):
        Thread.__init__(self, group, target, name, args, kwargs, daemon)
        self._return = None 

    def run(self):
        try:
            if self._target:
                self._return = self._target(*self._args, **self._kwargs)
        finally:
            del self._target, self._args, self._kwargs 

    def join(self,timeout=None):
        Thread.join(self,timeout)
        return self._return

キューを使用する:

import threading, queue

def calc_square(num, out_queue1):
  l = []
  for x in num:
    l.append(x*x)
  out_queue1.put(l)


arr = [1,2,3,4,5,6,7,8,9,10]
out_queue1=queue.Queue()
t1=threading.Thread(target=calc_square, args=(arr,out_queue1))
t1.start()
t1.join()
print (out_queue1.get())

スレッド関数のスコープの上に変更可能なものを定義し、それに結果を追加することができます。 (私はpython3と互換性があるようにコードを修正しました)

returns = {}
def foo(bar):
    print('hello {0}'.format(bar))
    returns[bar] = 'foo'

from threading import Thread
t = Thread(target=foo, args=('world!',))
t.start()
t.join()
print(returns)

これは{'world!': 'foo'}返します{'world!': 'foo'}

関数inputを結果dictのキーとして使用すると、すべての一意の入力が結果にエントリを与えることが保証されます


以下のように、プールをワーカープロセスのプールとして使用できます。

from multiprocessing import Pool


def f1(x, y):
    return x*y


if __name__ == '__main__':
    with Pool(processes=10) as pool:
        result = pool.apply(f1, (2, 3))
        print(result)

前述したように、マルチプロセッシングプールは基本スレッドよりもはるかに低速です。 いくつかの回答で提案されているようにキューを使用することは、非常に効果的な方法です。 私は小さなスレッドをたくさん実行し、辞書とそれらを組み合わせて複数の答えを回復するために辞書でそれを使用している:

#!/usr/bin/env python3

import threading
# use Queue for python2
import queue
import random

LETTERS = 'abcdefghijklmnopqrstuvwxyz'
LETTERS = [ x for x in LETTERS ]

NUMBERS = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

def randoms(k, q):
    result = dict()
    result['letter'] = random.choice(LETTERS)
    result['number'] = random.choice(NUMBERS)
    q.put({k: result})

threads = list()
q = queue.Queue()
results = dict()

for name in ('alpha', 'oscar', 'yankee',):
    threads.append( threading.Thread(target=randoms, args=(name, q)) )
    threads[-1].start()
_ = [ t.join() for t in threads ]
while not q.empty():
    results.update(q.get())

print(results)

私が見た1つの方法は、リストや辞書などの可変オブジェクトを、スレッドのコンストラクタに、ある種のインデックスやその他の識別子とともに渡すことです。 スレッドは、その結果をそのオブジェクトの専用スロットに格納することができます。 例えば:

def foo(bar, result, index):
    print 'hello {0}'.format(bar)
    result[index] = "foo"

from threading import Thread

threads = [None] * 10
results = [None] * 10

for i in range(len(threads)):
    threads[i] = Thread(target=foo, args=('world!', results, i))
    threads[i].start()

# do some other stuff

for i in range(len(threads)):
    threads[i].join()

print " ".join(results)  # what sound does a metasyntactic locomotive make?

join()が呼び出された関数の戻り値を返すようにしたい場合は、次のようなThreadサブクラスでこれを行うことができます:

from threading import Thread

def foo(bar):
    print 'hello {0}'.format(bar)
    return "foo"

class ThreadWithReturnValue(Thread):
    def __init__(self, group=None, target=None, name=None,
                 args=(), kwargs={}, Verbose=None):
        Thread.__init__(self, group, target, name, args, kwargs, Verbose)
        self._return = None
    def run(self):
        if self._Thread__target is not None:
            self._return = self._Thread__target(*self._Thread__args,
                                                **self._Thread__kwargs)
    def join(self):
        Thread.join(self)
        return self._return

twrv = ThreadWithReturnValue(target=foo, args=('world!',))

twrv.start()
print twrv.join()   # prints foo

いくつかの名前のマングリングのために少し毛深くなり、 Thread実装に固有の「私的な」データ構造にアクセスしますが、それは機能します。

python3の場合

class ThreadWithReturnValue(Thread):
    def __init__(self, group=None, target=None, name=None,
                 args=(), kwargs={}, Verbose=None):
        Thread.__init__(self, group, target, name, args, kwargs)
        self._return = None
    def run(self):
        print(type(self._target))
        if self._target is not None:
            self._return = self._target(*self._args,
                                                **self._kwargs)
    def join(self, *args):
        Thread.join(self, *args)
        return self._return

私のような人形のためにこれを行うための非常に簡単な方法:

import queue
import threading

# creating queue instance
q = queue.Queue()

# creating threading class
class AnyThread():
    def __init__ (self):
        threading.Thread.__init__(self)

    def run(self):
        # in this class and function we will put our test target function
        test()

t = AnyThread()

# having our test target function
def test():
    # do something in this function:
    result = 3 + 2
    # and put result to a queue instance
    q.put(result)

for i in range(3): #calling our threading fucntion 3 times (just for example)
    t.run()
    output = q.get() # here we get output from queue instance
    print(output)

>>> 5
>>> 5
>>> 5

ここの主なものはqueueモジュールです。 queue.Queue()インスタンスを作成し、それを関数にqueue.Queue()ます。 私たちは私たちの結果をそれに与え、後で私たちはスレッドを超えます。

私たちのテスト関数に引き渡された引数を使ってもう一つの例を見てください:

import queue
import threading

# creating queue instance
q = queue.Queue()

# creating threading class
class AnyThread():
    def __init__ (self):
        threading.Thread.__init__(self)

    def run(self, a, b):
        # in this class and function we will put our execution test function
        test(a, b)

t = AnyThread()

# having our test target function
def test(a, b):
    # do something in this function:
    result = a + b
    # and put result to a queue instance
    q.put(result)

for i in range(3): #calling our threading fucntion 3 times (just for example)
    t.run(3+i, 2+i)
    output = q.get() # here we get output from queue instance
    print(output)

>>> 5
>>> 7
>>> 9

私はこのラッパーを使用しています。このラッパーは、 Threadで実行するための関数を快適に変換します。戻り値または例外を処理します。 Queueオーバーヘッドは追加されません。

def threading_func(f):
    """Decorator for running a function in a thread and handling its return
    value or exception"""
    def start(*args, **kw):
        def run():
            try:
                th.ret = f(*args, **kw)
            except:
                th.exc = sys.exc_info()
        def get(timeout=None):
            th.join(timeout)
            if th.exc:
                raise th.exc[0], th.exc[1], th.exc[2] # py2
                ##raise th.exc[1] #py3                
            return th.ret
        th = threading.Thread(None, run)
        th.exc = None
        th.get = get
        th.start()
        return th
    return start

使用例

def f(x):
    return 2.5 * x
th = threading_func(f)(4)
print("still running?:", th.is_alive())
print("result:", th.get(timeout=1.0))

@threading_func
def th_mul(a, b):
    return a * b
th = th_mul("text", 2.5)

try:
    print(th.get())
except TypeError:
    print("exception thrown ok.")

threadingモジュールに関する注意

スレッド関数の快適な戻り値と例外処理は頻繁に "Pythonの"必要性があり、実際にはthreadingモジュールによって提供されているはずです。 ThreadPoolは簡単なタスクのためにあまりにも多くのオーバーヘッドを持っています - 3スレッドを管理し、多くの官僚主義。 残念なことに、 Threadのレイアウトは元々はJavaからコピーされました。例えば、まだ役に立たない1st(!)コンストラクタのパラメータgroupから見ればわかります。


私は親切な答えを盗んでちょっとだけきれいにしました。

重要な部分は、* argsと** kwargsをjoin()に追加してタイムアウトを処理することです

class threadWithReturn(Thread):
    def __init__(self, *args, **kwargs):
        super(threadWithReturn, self).__init__(*args, **kwargs)

        self._return = None

    def run(self):
        if self._Thread__target is not None:
            self._return = self._Thread__target(*self._Thread__args, **self._Thread__kwargs)

    def join(self, *args, **kwargs):
        super(threadWithReturn, self).join(*args, **kwargs)

        return self._return

下の更新された回答

これは私の最も普及したupvoted答えですので、py2とpy3の両方で実行されるコードで更新することにしました。

さらに、私はThread.join()に関する理解の欠如を示すこの質問に対する多くの答えを見る。 いくつかは完全にtimeout argを処理できません。 しかし、(1) Noneを返すターゲット関数があり、(2) timeout argをjoin()に渡したときのインスタンスについても知っておくべきコーナーケースがあります。 このコーナーケースを理解するには、「テスト4」を参照してください。

py2とpy3で動作するThreadWithReturnクラス:

import sys
from threading import Thread
from builtins import super    # https://.com/a/30159479

if sys.version_info >= (3, 0):
    _thread_target_key = '_target'
    _thread_args_key = '_args'
    _thread_kwargs_key = '_kwargs'
else:
    _thread_target_key = '_Thread__target'
    _thread_args_key = '_Thread__args'
    _thread_kwargs_key = '_Thread__kwargs'

class ThreadWithReturn(Thread):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self._return = None

    def run(self):
        target = getattr(self, _thread_target_key)
        if not target is None:
            self._return = target(*getattr(self, _thread_args_key), **getattr(self, _thread_kwargs_key))

    def join(self, *args, **kwargs):
        super().join(*args, **kwargs)
        return self._return

いくつかのサンプルテストを以下に示します。

import time, random

# TEST TARGET FUNCTION
def giveMe(arg, seconds=None):
    if not seconds is None:
        time.sleep(seconds)
    return arg

# TEST 1
my_thread = ThreadWithReturn(target=giveMe, args=('stringy',))
my_thread.start()
returned = my_thread.join()
# (returned == 'stringy')

# TEST 2
my_thread = ThreadWithReturn(target=giveMe, args=(None,))
my_thread.start()
returned = my_thread.join()
# (returned is None)

# TEST 3
my_thread = ThreadWithReturn(target=giveMe, args=('stringy',), kwargs={'seconds': 5})
my_thread.start()
returned = my_thread.join(timeout=2)
# (returned is None) # because join() timed out before giveMe() finished

# TEST 4
my_thread = ThreadWithReturn(target=giveMe, args=(None,), kwargs={'seconds': 5})
my_thread.start()
returned = my_thread.join(timeout=random.randint(1, 10))

TEST 4で遭遇するかもしれないコーナーケースを特定できますか?

問題は、giveMe()がNone(TEST 2を参照)を返すと期待していますが、タイムアウトするとjoin()もNoneを返すことが予想されます。

returned is Noneです。

(1)giveMe()が返したもの、または

(2)join()タイムアウト

この例は、giveMe()が常にNoneを返すことがわかって以来、些細なことです。 しかし、実際には(ターゲットがNoneか他の何かを正当に返すかもしれない)場合、何が起こったかを明示的にチェックしたいと思うでしょう。

このコーナーケースに対処する方法は次のとおりです。

# TEST 4
my_thread = ThreadWithReturn(target=giveMe, args=(None,), kwargs={'seconds': 5})
my_thread.start()
returned = my_thread.join(timeout=random.randint(1, 10))

if my_thread.isAlive():
    # returned is None because join() timed out
    # this also means that giveMe() is still running in the background
    pass
    # handle this based on your app's logic
else:
    # join() is finished, and so is giveMe()
    # BUT we could also be in a race condition, so we need to update returned, just in case
    returned = my_thread.join()




multithreading