Python 3.7 - concurrent.futures

समवर्ती.फुट्स - समानांतर कार्यों का शुभारंभ




python

समवर्ती.फुट्स - समानांतर कार्यों का शुभारंभ

संस्करण 3.2 में नया।

स्रोत कोड: Lib/concurrent/futures/thread.py और Lib/concurrent/futures/process.py

concurrent.futures मॉड्यूल अतुल्यकालिक रूप से निष्पादन योग्य कॉल के लिए एक उच्च-स्तरीय इंटरफ़ेस प्रदान करता है।

एसिंक्रोनस निष्पादन थ्रेड्स के साथ किया जा सकता है, ThreadPoolExecutor एक्ज़ीक्यूटर, या अलग-अलग प्रक्रियाओं का उपयोग करके, ProcessPoolExecutor एक्ज़ीक्यूटर का उपयोग करके किया जा सकता है। दोनों एक ही इंटरफ़ेस को लागू करते हैं, जो कि एब्सट्रैक्ट Executor क्लास द्वारा परिभाषित किया गया है।

निष्पादक वस्तुएं

class concurrent.futures.Executor

एक अमूर्त वर्ग जो एसिंक्रोनस रूप से कॉल निष्पादित करने के लिए तरीके प्रदान करता है। इसका उपयोग सीधे नहीं किया जाना चाहिए, लेकिन इसके ठोस उपवर्गों के माध्यम से।

submit(fn, *args, **kwargs)

कॉल करने योग्य, fn , को fn(*args **kwargs) रूप में शेड्यूल किया जाता है और कॉल करने योग्य के निष्पादन का प्रतिनिधित्व करने वाली Future Object लौटाता है।

with ThreadPoolExecutor(max_workers=1) as executor:
    future = executor.submit(pow, 323, 1235)
    print(future.result())
map(func, *iterables, timeout=None, chunksize=1)

सिवाय map(func, *iterables) :

  • iterables lazily के बजाय तुरंत एकत्र किए जाते हैं;
  • func को एसिंक्रोनस रूप से निष्पादित किया जाता है और func को कई कॉल समवर्ती रूप से किए जा सकते हैं।

लौटाए गए __next__() एक concurrent.futures.TimeoutError __next__() उठाता है यदि __next__() कहा जाता है और परिणाम मूल कॉल से __next__() लिए टाइमआउट सेकंड के बाद उपलब्ध नहीं है। टाइमआउट एक इंट या फ्लोट हो सकता है। यदि टाइमआउट निर्दिष्ट नहीं है या None , तो प्रतीक्षा समय की कोई सीमा नहीं है।

यदि कोई फ़ेक कॉल अपवाद को उठाता है, तो उस अपवाद को तब उठाया जाएगा जब उसका मान पुनरावृत्तिकर्ता से लिया गया हो।

ProcessPoolExecutor का उपयोग करते समय, यह विधि पुनरावृत्तियों को कई संख्या में काटता है जो इसे अलग-अलग कार्यों के लिए पूल में जमा करता है। इन चंक्सों के आकार (लगभग) को एक धनात्मक पूर्णांक के लिए विखंडू निर्धारित करके निर्दिष्ट किया जा सकता है। बहुत लंबे पुनरावृत्तियों के लिए, चंक्साइज़ के लिए एक बड़े मूल्य का उपयोग करने से डिफ़ॉल्ट आकार की तुलना में प्रदर्शन में उल्लेखनीय रूप से सुधार हो सकता है। थ्रेडपूल एक्ज़ीक्यूटर के साथ, चंक्साइज़ का कोई प्रभाव नहीं है।

संस्करण 3.5 में परिवर्तित: चंक्साइज़ तर्क जोड़ा गया।

shutdown(wait=True)

निष्पादक को संकेत दें कि यह किसी भी संसाधन को मुक्त कर दे जो इसका उपयोग तब कर रहा है जब वर्तमान में लंबित वायदा निष्पादित हो रहा है। बंद करने के बाद Executor.submit() और Executor.map() लिए कॉल Executor.submit() को बढ़ाएगा।

यदि प्रतीक्षा True तो यह विधि तब तक वापस नहीं आएगी जब तक कि सभी लंबित वायदा को पूरा नहीं किया जाता है और निष्पादक से जुड़े संसाधनों को मुक्त कर दिया गया है। यदि प्रतीक्षा False तो यह विधि तुरंत वापस आ जाएगी और निष्पादक से जुड़े संसाधनों को मुक्त कर दिया जाएगा जब सभी लंबित वायदा को पूरा किया जाएगा। प्रतीक्षा के मूल्य के बावजूद, पूरे पायथन कार्यक्रम से बाहर नहीं निकलेंगे जब तक कि सभी लंबित वायदा को पूरा नहीं किया जाता है।

यदि आप कथन के with उपयोग करते हैं, तो आप इस पद्धति को स्पष्ट रूप से कॉल करने से बच सकते हैं, जो Executor को बंद कर देगा (प्रतीक्षा के रूप में यदि Executor.shutdown() शटडाउन Executor.shutdown() को Executor.shutdown() सेट टू True कहा जाता है):

import shutil
with ThreadPoolExecutor(max_workers=4) as e:
    e.submit(shutil.copy, 'src1.txt', 'dest1.txt')
    e.submit(shutil.copy, 'src2.txt', 'dest2.txt')
    e.submit(shutil.copy, 'src3.txt', 'dest3.txt')
    e.submit(shutil.copy, 'src4.txt', 'dest4.txt')

ThreadPoolExecutor

ThreadPoolExecutor एक Executor subclass है जो एसिंक्रोनस रूप से कॉल निष्पादित करने के लिए थ्रेड्स के पूल का उपयोग करता है।

डेडलॉक तब हो सकता है जब एक Future साथ जुड़े कॉलेबल दूसरे Future के रिजल्ट का इंतजार करता है। उदाहरण के लिए:

import time
def wait_on_b():
    time.sleep(5)
    print(b.result())  # b will never complete because it is waiting on a.
    return 5

def wait_on_a():
    time.sleep(5)
    print(a.result())  # a will never complete because it is waiting on b.
    return 6


executor = ThreadPoolExecutor(max_workers=2)
a = executor.submit(wait_on_b)
b = executor.submit(wait_on_a)

तथा:

def wait_on_future():
    f = executor.submit(pow, 5, 2)
    # This will never complete because there is only one worker thread and
    # it is executing this function.
    print(f.result())

executor = ThreadPoolExecutor(max_workers=1)
executor.submit(wait_on_future)
class concurrent.futures.ThreadPoolExecutor(max_workers=None, thread_name_prefix='', initializer=None, initargs=())

Executor उपवर्ग जो अतुल्यकालिक रूप से कॉल निष्पादित करने के लिए अधिकांश max_workers थ्रेड्स के एक पूल का उपयोग करता है।

initializer एक वैकल्पिक कॉल करने योग्य है जिसे प्रत्येक श्रमिक थ्रेड की शुरुआत में कहा जाता है; initargs आरंभिकता के लिए पारित तर्कों का एक समूह है। इनिशियलाइज़र को एक अपवाद उठाना चाहिए, वर्तमान में लंबित सभी नौकरियां BrokenThreadPool , साथ ही पूल में अधिक नौकरियों को प्रस्तुत करने का कोई भी प्रयास।

संस्करण 3.5 में परिवर्तित: यदि max_workers None या नहीं दिया गया है, तो यह मशीन पर प्रोसेसर की संख्या को 5 गुणा करेगा, यह मानते हुए कि ThreadPoolExecutor का उपयोग अक्सर सीपीयू काम के बजाय I / O को ओवरलैप करने के लिए किया जाता है और श्रमिकों की संख्या होनी चाहिए ProcessPoolExecutor लिए श्रमिकों की संख्या से अधिक हो।

संस्करण 3.6 में नया: थ्रेड_name_prefix तर्क को उपयोगकर्ताओं को थ्रेडिंग को नियंत्रित करने की अनुमति देने के लिए जोड़ा गया था। आसान डिबगिंग के लिए पूल द्वारा बनाए गए वर्कर थ्रेड्स के लिए यह नाम।

संस्करण 3.7 में परिवर्तित: आरंभिक और initargs तर्क जोड़े गए।

ThreadPoolExecutor उदाहरण

import concurrent.futures
import urllib.request

URLS = ['http://www.foxnews.com/',
        'http://www.cnn.com/',
        'http://europe.wsj.com/',
        'http://www.bbc.co.uk/',
        'http://some-made-up-domain.com/']

# Retrieve a single page and report the URL and contents
def load_url(url, timeout):
    with urllib.request.urlopen(url, timeout=timeout) as conn:
        return conn.read()

# We can use a with statement to ensure threads are cleaned up promptly
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    # Start the load operations and mark each future with its URL
    future_to_url = {executor.submit(load_url, url, 60): url for url in URLS}
    for future in concurrent.futures.as_completed(future_to_url):
        url = future_to_url[future]
        try:
            data = future.result()
        except Exception as exc:
            print('%r generated an exception: %s' % (url, exc))
        else:
            print('%r page is %d bytes' % (url, len(data)))

ProcessPoolExecutor

ProcessPoolExecutor class एक Executor subclass है जो कॉल को निष्पादित करने के लिए प्रक्रियाओं के एक पूल का उपयोग करता है। ProcessPoolExecutor multiprocessing मॉड्यूल का उपयोग करता है, जो इसे ग्लोबल इंटरप्रेटर लॉक को साइड-स्टेप करने की अनुमति देता है, लेकिन इसका अर्थ यह भी है कि केवल पिकेबल ऑब्जेक्ट्स को निष्पादित और वापस किया जा सकता है।

__main__ मॉड्यूल वर्कर __main__ द्वारा आयात योग्य होना चाहिए। इसका मतलब है कि ProcessPoolExecutor इंटरैक्टिव दुभाषिया में काम नहीं करेगा।

एक ProcessPoolExecutor को सौंपे गए कॉल करने वाले से Executor या Future मेथड को कॉल करने से गतिरोध उत्पन्न होगा।

class concurrent.futures.ProcessPoolExecutor(max_workers=None, mp_context=None, initializer=None, initargs=())

एक Executor सबक्लास जो अधिकतम max_workers प्रक्रियाओं के एक पूल का उपयोग करके एसिंक्रोनस रूप से कॉल निष्पादित करता है। यदि max_workers None या नहीं दिया गया है, तो यह मशीन पर प्रोसेसर की संख्या के लिए डिफ़ॉल्ट होगा। यदि max_workers 0 बराबर या उससे कम है, तो एक ValueError जाएगी। mp_context एक बहु संदर्भ या कोई नहीं हो सकता है। इसका उपयोग श्रमिकों को लॉन्च करने के लिए किया जाएगा। यदि mp_context None या नहीं दिया गया है, तो डिफ़ॉल्ट मल्टीप्रोसेसिंग संदर्भ का उपयोग किया जाता है।

initializer एक वैकल्पिक कॉल करने योग्य है जिसे प्रत्येक कार्यकर्ता प्रक्रिया की शुरुआत में कहा जाता है; initargs आरंभिकता के लिए पारित तर्कों का एक समूह है। इनिशियलाइज़र को एक अपवाद उठाना चाहिए, वर्तमान में लंबित सभी नौकरियां BrokenProcessPool , साथ ही पूल में और अधिक नौकरियों को प्रस्तुत करने का कोई भी प्रयास।

संस्करण 3.3 में बदला: जब कार्यकर्ता प्रक्रियाओं में से एक अचानक समाप्त हो जाता है, तो अब BrokenProcessPool त्रुटि उठाई जाती है। पहले, व्यवहार अपरिभाषित था लेकिन निष्पादक या उसके वायदा पर परिचालन अक्सर फ्रीज या गतिरोध करता था।

संस्करण 3.7 में परिवर्तित: पूल द्वारा बनाई गई कार्यकर्ता प्रक्रियाओं के लिए उपयोगकर्ताओं को start_method को नियंत्रित करने की अनुमति देने के लिए mp_context तर्क जोड़ा गया था।

आरंभिक और initargs तर्क जोड़े गए।

ProcessPoolExecutor उदाहरण

import concurrent.futures
import math

PRIMES = [
    112272535095293,
    112582705942171,
    112272535095293,
    115280095190773,
    115797848077099,
    1099726899285419]

def is_prime(n):
    if n % 2 == 0:
        return False

    sqrt_n = int(math.floor(math.sqrt(n)))
    for i in range(3, sqrt_n + 1, 2):
        if n % i == 0:
            return False
    return True

def main():
    with concurrent.futures.ProcessPoolExecutor() as executor:
        for number, prime in zip(PRIMES, executor.map(is_prime, PRIMES)):
            print('%d is prime: %s' % (number, prime))

if __name__ == '__main__':
    main()

भविष्य की वस्तुएं

Future क्लास एक कॉल करने योग्य के अतुल्यकालिक निष्पादन को एन्क्रिप्ट करता है। Future उदाहरण Executor.submit() द्वारा बनाए गए हैं।

class concurrent.futures.Future

कॉल करने योग्य के अतुल्यकालिक निष्पादन को एन्क्रिप्ट करता है। Future उदाहरण Executor.submit() द्वारा बनाए गए हैं और परीक्षण को छोड़कर सीधे नहीं बनाए जाने चाहिए।

cancel()

कॉल रद्द करने का प्रयास करें। यदि कॉल वर्तमान में निष्पादित की जा रही है और रद्द नहीं की जा सकती है, तो विधि False वापस आ जाएगी, अन्यथा कॉल रद्द हो जाएगी और विधि True वापस आ जाएगी।

cancelled()

यदि कॉल सफलतापूर्वक रद्द कर दी गई थी, तो True लौटें

running()

यदि कॉल वर्तमान में निष्पादित की जा रही है और रद्द नहीं किया जा सकता तो True लौटें।

done()

कॉल को सफलतापूर्वक रद्द या समाप्त होने पर वापस लौटें।

result(timeout=None)

कॉल द्वारा वापस किया गया मान लौटाएं। यदि कॉल अभी तक पूरी नहीं हुई है, तो यह विधि समय-समय पर सेकंड तक प्रतीक्षा करेगी। यदि कॉल टाइमआउट सेकंड में पूरा नहीं हुआ है, तो एक concurrent.futures.TimeoutError को उठाया जाएगा। टाइमआउट एक इंट या फ्लोट हो सकता है। यदि टाइमआउट निर्दिष्ट नहीं है या None , तो प्रतीक्षा समय की कोई सीमा नहीं है।

यदि भविष्य को पूरा करने से पहले रद्द कर दिया जाता है, तो CancelledError को उठाया जाएगा।

यदि कॉल उठाया जाता है, तो यह विधि समान अपवाद उठाएगी।

exception(timeout=None)

कॉल द्वारा उठाए गए अपवाद को वापस करें। यदि कॉल अभी तक पूरी नहीं हुई है, तो यह विधि समय-समय पर सेकंड तक प्रतीक्षा करेगी। यदि कॉल टाइमआउट सेकंड में पूरा नहीं हुआ है, तो एक concurrent.futures.TimeoutError को उठाया जाएगा। टाइमआउट एक इंट या फ्लोट हो सकता है। यदि टाइमआउट निर्दिष्ट नहीं है या None , तो प्रतीक्षा समय की कोई सीमा नहीं है।

यदि भविष्य को पूरा करने से पहले रद्द कर दिया जाता है, तो CancelledError को उठाया जाएगा।

यदि कॉल बिना उठाए पूरा हो जाता है, तो None भी वापस None जाता है।

add_done_callback(fn)

भविष्य के लिए कॉल करने योग्य fn देता है। fn कहा जाएगा, भविष्य के साथ इसका एकमात्र तर्क, जब भविष्य को रद्द कर दिया जाता है या चल रहा है।

जोड़े गए कॉलबल्स को उस क्रम में बुलाया जाता है जिसे वे जोड़ा गया था और हमेशा उन्हें जोड़ने वाली प्रक्रिया से संबंधित थ्रेड में कहा जाता है। यदि कॉल करने योग्य Exception उपवर्ग उठाता है, तो उसे लॉग और अनदेखा किया जाएगा। यदि BaseException उपवर्ग को उठाता है, तो व्यवहार अपरिभाषित है।

यदि भविष्य पहले ही पूरा हो चुका है या रद्द हो गया है, तो fn को तुरंत बुलाया जाएगा।

निम्नलिखित Future तरीकों का उपयोग इकाई परीक्षणों और Executor कार्यान्वयन में उपयोग के लिए है।

set_running_or_notify_cancel()

इस पद्धति को केवल Future साथ काम करने और इकाई परीक्षणों द्वारा निष्पादित करने से पहले Executor कार्यान्वयन द्वारा बुलाया जाना चाहिए।

यदि विधि False लौटाती है, तो Future रद्द कर दिया गया था, अर्थात Future.cancel() को कॉल किया गया और True लौटा दिया गया। Future पूरा होने पर प्रतीक्षा कर रहे किसी भी धागे ( as_completed() या wait() माध्यम से as_completed() जाएगा।

यदि यह विधि True तो Future रद्द नहीं किया गया है और उसे चालू स्थिति में रखा गया है, यानी Future.running() कॉल True वापस आ जाएगी।

इस विधि को केवल एक बार कॉल किया जा सकता है और Future.set_result() या Future.set_exception() बाद Future.set_result() नहीं किया जा सकता है।

set_result(result)

परिणाम के लिए Future साथ संबद्ध कार्य का परिणाम सेट करता है।

इस पद्धति का उपयोग केवल Executor कार्यान्वयन और इकाई परीक्षणों द्वारा किया जाना चाहिए।

set_exception(exception)

Future से Exception अपवाद से जुड़े कार्य का परिणाम Exception है

इस पद्धति का उपयोग केवल Executor कार्यान्वयन और इकाई परीक्षणों द्वारा किया जाना चाहिए।

मॉड्यूल कार्य

concurrent.futures.wait(fs, timeout=None, return_when=ALL_COMPLETED)

Future उदाहरणों के लिए प्रतीक्षा करें (संभवतः अलग-अलग Executor इंस्टेंस द्वारा बनाई गई) एफएस को पूरा करने के लिए। सेट के 2-ट्यूपल नाम देता है। पहले सेट, नामित done , पूर्ण होने से पहले किए गए वायदा (पूर्ण किए गए या रद्द किए गए) में शामिल है। दूसरा सेट, जिसका नाम not_done , जिसमें अपूर्ण वायदा शामिल है।

टाइमआउट का उपयोग लौटने से पहले प्रतीक्षा करने के लिए अधिकतम सेकंड नियंत्रित करने के लिए किया जा सकता है। टाइमआउट एक इंट या फ्लोट हो सकता है। यदि टाइमआउट निर्दिष्ट नहीं है या None , तो प्रतीक्षा समय की कोई सीमा नहीं है।

return_when इंगित करता है कि यह फ़ंक्शन कब लौटना चाहिए। यह निम्नलिखित में से एक होना चाहिए:

स्थिर विवरण
FIRST_COMPLETED जब कोई भविष्य खत्म या रद्द हो जाएगा तो फ़ंक्शन वापस आ जाएगा।
FIRST_EXCEPTION जब कोई अपवाद समाप्त करके भविष्य समाप्त हो जाता है, तो फ़ंक्शन वापस आ जाएगा। यदि कोई भविष्य अपवाद नहीं उठाता है, तो यह ALL_COMPLETED बराबर है।
ALL_COMPLETED फ़ंक्शन तब वापस आएगा जब सभी वायदा समाप्त हो जाएंगे या रद्द कर दिए जाएंगे।
concurrent.futures.as_completed(fs, timeout=None)

Future इंस्टेंसेस (संभवतः अलग-अलग Executor इंस्टेंस द्वारा बनाए गए) पर एक पुनरावर्तक देता है जो एफएस द्वारा दिया जाता है जो वायदा पूरा करता है (समाप्त या रद्द कर दिया गया)। नकली द्वारा दिए गए किसी भी वायदा को एक बार वापस कर दिया जाएगा। as_completed() कहा जाता है से पहले पूरा किया गया कोई भी वायदा पहले as_completed() होगा। लौटाए गए __next__() एक concurrent.futures.TimeoutError __next__() उठाता है यदि __next__() कहा जाता है और परिणाम मूल कॉल से as_completed() तक टाइमआउट सेकंड के बाद उपलब्ध नहीं है। टाइमआउट एक इंट या फ्लोट हो सकता है। यदि टाइमआउट निर्दिष्ट नहीं है या None , तो प्रतीक्षा समय की कोई सीमा नहीं है।

यह भी देखें

पीईपी 3148 - वायदा - अभिकलन रूप से कम्प्यूटेशन निष्पादित करता है
जिस प्रस्ताव में पायथन मानक पुस्तकालय में शामिल करने के लिए इस सुविधा का वर्णन किया गया था।

अपवाद कक्षाएं

exception concurrent.futures.CancelledError

भविष्य रद्द होने पर उठाया गया।

exception concurrent.futures.TimeoutError

जब भविष्य का ऑपरेशन दिए गए समय से अधिक हो तो उठाया।

exception concurrent.futures.BrokenExecutor

RuntimeError से व्युत्पन्न, यह अपवाद वर्ग तब उठाया जाता है जब किसी कारण के लिए एक निष्पादक टूट जाता है, और नए कार्यों को प्रस्तुत करने या निष्पादित करने के लिए उपयोग नहीं किया जा सकता है।

संस्करण 3.7 में नया।

exception concurrent.futures.thread.BrokenThreadPool

BrokenExecutor से व्युत्पन्न, यह अपवाद वर्ग तब उठाया जाता है जब किसी ThreadPoolExecutor के किसी एक कार्यकर्ता को प्रारंभ करने में विफल रहा हो।

संस्करण 3.7 में नया।

exception concurrent.futures.process.BrokenProcessPool

BrokenExecutor (पूर्व में BrokenExecutor ) से व्युत्पन्न, यह अपवाद वर्ग तब उठाया जाता है जब एक BrokenExecutor के श्रमिकों में से एक ProcessPoolExecutor गैर-स्वच्छ फैशन में समाप्त कर दिया है (उदाहरण के लिए, यदि वह बाहर से मारा गया था)।

संस्करण 3.3 में नया।