Python 3.7 - multiprocessing

मल्टीप्रोसेसिंग - प्रक्रिया-आधारित समानता




python

मल्टीप्रोसेसिंग - प्रक्रिया-आधारित समानता

स्रोत कोड: Lib/multiprocessing/

परिचय

multiprocessing एक पैकेज है जो threading मॉड्यूल के समान एपीआई का उपयोग करके स्पॉनिंग प्रक्रियाओं का समर्थन करता है। multiprocessing पैकेज थ्रेड के बजाय उपप्रोसेस का उपयोग करके ग्लोबल इंटरप्रेटर लॉक को प्रभावी ढंग से साइड-स्टेपिंग करने के लिए स्थानीय और दूरस्थ दोनों संगामिति प्रदान करता है। इसके कारण, multiprocessing मॉड्यूल प्रोग्रामर को दिए गए मशीन पर कई प्रोसेसर का पूरी तरह से लाभ उठाने की अनुमति देता है। यह यूनिक्स और विंडोज दोनों पर चलता है।

multiprocessing मॉड्यूल एपीआई का भी परिचय देता है जिसमें threading मॉड्यूल में एनालॉग नहीं होते हैं। इसका एक प्रमुख उदाहरण Pool ऑब्जेक्ट है जो कई इनपुट मूल्यों में फ़ंक्शन के निष्पादन को समानांतर करने का एक सुविधाजनक साधन प्रदान करता है, इनपुट डेटा को प्रक्रियाओं (डेटा समानता) में वितरित करता है। निम्नलिखित उदाहरण एक मॉड्यूल में ऐसे कार्यों को परिभाषित करने की सामान्य प्रथा को प्रदर्शित करता है ताकि बच्चे की प्रक्रियाएं उस मॉड्यूल को सफलतापूर्वक आयात कर सकें। Pool का उपयोग कर डेटा समानता का यह मूल उदाहरण है,

from multiprocessing import Pool

def f(x):
    return x*x

if __name__ == '__main__':
    with Pool(5) as p:
        print(p.map(f, [1, 2, 3]))

मानक आउटपुट पर प्रिंट होगा

[1, 4, 9]

Process वर्ग

multiprocessing , Process ऑब्जेक्ट बनाकर और फिर उसके start() मेथड को कॉल करके Process जाता है। Process threading.Thread के एपीआई का अनुसरण करती है। मल्टीप्रोसेस प्रोग्राम का एक तुच्छ उदाहरण है

from multiprocessing import Process

def f(name):
    print('hello', name)

if __name__ == '__main__':
    p = Process(target=f, args=('bob',))
    p.start()
    p.join()

इसमें शामिल व्यक्तिगत प्रक्रिया आईडी दिखाने के लिए, यहां एक विस्तारित उदाहरण दिया गया है:

from multiprocessing import Process
import os

def info(title):
    print(title)
    print('module name:', __name__)
    print('parent process:', os.getppid())
    print('process id:', os.getpid())

def f(name):
    info('function f')
    print('hello', name)

if __name__ == '__main__':
    info('main line')
    p = Process(target=f, args=('bob',))
    p.start()
    p.join()

क्यों if __name__ == '__main__' भाग आवश्यक है, इसकी व्याख्या के लिए, प्रोग्रामिंग दिशानिर्देश देखें

प्रसंग और शुरू करने के तरीके

प्लेटफ़ॉर्म के आधार पर, multiprocessing प्रक्रिया शुरू करने के लिए तीन तरीकों का समर्थन करता है। ये प्रारंभ विधियाँ हैं

अंडे

मूल प्रक्रिया एक ताजा अजगर दुभाषिया प्रक्रिया शुरू करती है। चाइल्ड प्रोसेस केवल उन संसाधनों को इनहेरिट करेगा, जो प्रोसेस ऑब्जेक्ट run() विधि को चलाने के लिए आवश्यक हैं। विशेष रूप से, मूल प्रक्रिया से अनावश्यक फ़ाइल डिस्क्रिप्टर और हैंडल विरासत में नहीं मिलेंगे। इस विधि का उपयोग करके प्रक्रिया शुरू करना कांटा या कांटा लगाने वाले की तुलना में धीमा है।

यूनिक्स और विंडोज पर उपलब्ध है। विंडोज पर डिफ़ॉल्ट।

कांटा

पैरेंट os.fork() को कांटा करने के लिए पैरेंट प्रोसेस os.fork() का उपयोग करता है। बच्चे की प्रक्रिया, जब यह शुरू होती है, तो प्रभावी रूप से मूल प्रक्रिया के समान होती है। माता-पिता के सभी संसाधन बाल प्रक्रिया द्वारा विरासत में मिले हैं। ध्यान दें कि एक मल्टीथ्रेडेड प्रक्रिया को सुरक्षित रूप से फोर्क करना समस्याग्रस्त है।

यूनिक्स पर ही उपलब्ध है। यूनिक्स पर डिफ़ॉल्ट।

forkserver

जब प्रोग्राम शुरू होता है और forkserver start पद्धति का चयन करता है, तो एक सर्वर प्रक्रिया शुरू की जाती है। तब से, जब भी किसी नई प्रक्रिया की आवश्यकता होती है, तो मूल प्रक्रिया सर्वर से जुड़ जाती है और अनुरोध करती है कि यह एक नई प्रक्रिया का दावा करती है। कांटा सर्वर प्रक्रिया सिंगल थ्रेडेड है इसलिए यह os.fork() का उपयोग करने के लिए सुरक्षित है। कोई अनावश्यक संसाधन विरासत में नहीं मिले हैं।

यूनिक्स प्लेटफार्मों पर उपलब्ध है जो यूनिक्स पाइपों पर फाइल डिस्क्रिप्टर पास करने का समर्थन करता है।

संस्करण 3.4 में बदला गया: स्पॉन को सभी यूनिक्स प्लेटफार्मों पर जोड़ा गया, और कुछ यूनिक्स प्लेटफार्मों के लिए फोर्सेवर जोड़ा गया। बाल प्रक्रियाएं अब सभी माता-पिता को विंडोज पर विरासत में प्राप्त होने वाले संभालती हैं।

यूनिक्स पर स्पॉन या फोर्स्कवर स्टार्ट विधियों का उपयोग करते हुए, एक सेमाफ़ोर ट्रैकर प्रक्रिया भी शुरू करेगा जो प्रोग्राम की प्रक्रियाओं द्वारा बनाए गए अनलिंकड नाम के सेमाफोर को ट्रैक करता है। जब सभी प्रक्रियाओं ने सेमाफ़ोर ट्रैकर को बाहर निकाल दिया है तो किसी भी शेष सेमाफ़ोर्स को अनलिंक कर दिया है। आमतौर पर कोई भी नहीं होना चाहिए, लेकिन अगर एक प्रक्रिया को एक सिग्नल द्वारा मार दिया गया था तो कुछ "लीक" सेमाफोर हो सकते हैं। (नामित सेमाफोर्स को अनलिंक करना एक गंभीर मामला है क्योंकि सिस्टम केवल एक सीमित संख्या की अनुमति देता है, और वे अगले रिबूट तक स्वचालित रूप से अनलिंक नहीं होंगे।)

प्रारंभ विधि का चयन करने के लिए आप मुख्य मॉड्यूल के set_start_method() उपयोग करें if __name__ == '__main__' खंड में। उदाहरण के लिए:

import multiprocessing as mp

def foo(q):
    q.put('hello')

if __name__ == '__main__':
    mp.set_start_method('spawn')
    q = mp.Queue()
    p = mp.Process(target=foo, args=(q,))
    p.start()
    print(q.get())
    p.join()

set_start_method() प्रोग्राम में एक से अधिक बार उपयोग नहीं किया जाना चाहिए।

वैकल्पिक रूप से, आप एक संदर्भ ऑब्जेक्ट प्राप्त करने के लिए get_context() का उपयोग कर सकते हैं। प्रसंग ऑब्जेक्ट्स में बहुप्रोसेसर मॉड्यूल के समान API है, और एक ही प्रोग्राम में एक से अधिक प्रारंभ विधियों का उपयोग करने की अनुमति देता है।

import multiprocessing as mp

def foo(q):
    q.put('hello')

if __name__ == '__main__':
    ctx = mp.get_context('spawn')
    q = ctx.Queue()
    p = ctx.Process(target=foo, args=(q,))
    p.start()
    print(q.get())
    p.join()

ध्यान दें कि एक संदर्भ से संबंधित वस्तुएँ भिन्न संदर्भ के लिए प्रक्रियाओं के अनुकूल नहीं हो सकती हैं। विशेष रूप से, कांटे के संदर्भ का उपयोग करके बनाए गए ताले को स्पॉन या फ़ॉर्स्कवर स्टार्ट विधियों का उपयोग करके शुरू की गई प्रक्रियाओं को पारित नहीं किया जा सकता है।

एक पुस्तकालय जो एक विशेष प्रारंभ विधि का उपयोग करना चाहता है, उसे संभवतः पुस्तकालय उपयोगकर्ता की पसंद में हस्तक्षेप से बचने के लिए get_context() का उपयोग करना चाहिए।

प्रक्रियाओं के बीच वस्तुओं का आदान-प्रदान

multiprocessing प्रक्रियाओं के बीच दो प्रकार के संचार चैनल का समर्थन करता है:

कतार

Queue वर्ग Queue निकट क्लोन है। queue.Queue । उदाहरण के लिए:

from multiprocessing import Process, Queue

def f(q):
    q.put([42, None, 'hello'])

if __name__ == '__main__':
    q = Queue()
    p = Process(target=f, args=(q,))
    p.start()
    print(q.get())    # prints "[42, None, 'hello']"
    p.join()

कतारें धागा और प्रक्रिया सुरक्षित हैं।

पाइप्स

Pipe() फ़ंक्शन Pipe() जुड़े कनेक्शन ऑब्जेक्ट की एक जोड़ी देता है जो डिफ़ॉल्ट रूप से द्वैध (दो-तरफा) है। उदाहरण के लिए:

from multiprocessing import Process, Pipe

def f(conn):
    conn.send([42, None, 'hello'])
    conn.close()

if __name__ == '__main__':
    parent_conn, child_conn = Pipe()
    p = Process(target=f, args=(child_conn,))
    p.start()
    print(parent_conn.recv())   # prints "[42, None, 'hello']"
    p.join()

Pipe() द्वारा लौटाए गए दो कनेक्शन ऑब्जेक्ट Pipe() के दो सिरों का प्रतिनिधित्व करते हैं। प्रत्येक कनेक्शन ऑब्जेक्ट में send() और recv() विधियों (दूसरों के बीच) हैं। ध्यान दें कि एक पाइप में डेटा दूषित हो सकता है यदि दो प्रक्रियाएं (या धागे) एक ही समय में पाइप के एक ही छोर से पढ़ने या लिखने की कोशिश करती हैं। बेशक एक ही समय में पाइप के विभिन्न सिरों का उपयोग करने वाली प्रक्रियाओं से भ्रष्टाचार का कोई खतरा नहीं है।

प्रक्रियाओं के बीच सिंक्रनाइज़ेशन

multiprocessing में threading से सभी सिंक्रोनाइज़ेशन प्राइमेट के समतुल्य होते हैं। उदाहरण के लिए, कोई यह सुनिश्चित करने के लिए एक लॉक का उपयोग कर सकता है कि एक समय में केवल एक प्रक्रिया मानक आउटपुट पर प्रिंट करती है:

from multiprocessing import Process, Lock

def f(l, i):
    l.acquire()
    try:
        print('hello world', i)
    finally:
        l.release()

if __name__ == '__main__':
    lock = Lock()

    for num in range(10):
        Process(target=f, args=(lock, num)).start()

विभिन्न प्रक्रियाओं से लॉक आउटपुट का उपयोग किए बिना सभी मिश्रित होने के लिए उत्तरदायी है।

प्रक्रियाओं के बीच स्थिति साझा करना

जैसा कि ऊपर उल्लेख किया गया है, जब समवर्ती प्रोग्रामिंग करते हैं तो आमतौर पर जहां तक ​​संभव हो साझा राज्य का उपयोग करने से बचने के लिए सबसे अच्छा है। कई प्रक्रियाओं का उपयोग करते समय यह विशेष रूप से सच है।

हालाँकि, यदि आपको वास्तव में कुछ साझा डेटा का उपयोग करने की आवश्यकता है, तो multiprocessing ऐसा करने के कुछ तरीके प्रदान करता है।

शेयर्ड मेमोरी

डेटा को Value या Array का उपयोग करके एक साझा मेमोरी मैप में संग्रहीत किया जा सकता है। उदाहरण के लिए, निम्न कोड

from multiprocessing import Process, Value, Array

def f(n, a):
    n.value = 3.1415927
    for i in range(len(a)):
        a[i] = -a[i]

if __name__ == '__main__':
    num = Value('d', 0.0)
    arr = Array('i', range(10))

    p = Process(target=f, args=(num, arr))
    p.start()
    p.join()

    print(num.value)
    print(arr[:])

छप जाएगा

3.1415927
[0, -1, -2, -3, -4, -5, -6, -7, -8, -9]

num और गिरफ्तारी बनाते समय उपयोग किए जाने वाले 'd' और 'i' तर्क array मॉड्यूल द्वारा उपयोग किए गए प्रकार के टाइपकोड हैं: 'd' एक दोहरे परिशुद्धता फ्लोट को इंगित करता है और 'i' एक हस्ताक्षरित पूर्णांक इंगित करता है। ये साझा ऑब्जेक्ट प्रक्रिया और थ्रेड-सुरक्षित होंगे।

साझा मेमोरी का उपयोग करने में अधिक लचीलेपन के लिए, multiprocessing.sharedctypes मॉड्यूल का उपयोग कर सकते हैं जो साझा मेमोरी से आवंटित मनमाने ढंग से ctypes ऑब्जेक्ट्स के निर्माण का समर्थन करता है।

सर्वर प्रक्रिया

प्रबंधक द्वारा लौटाई गई प्रबंधक ऑब्जेक्ट Manager() एक सर्वर प्रक्रिया को नियंत्रित करती है जो पायथन ऑब्जेक्ट्स को रखती है और अन्य प्रक्रियाओं को प्रॉक्सी का उपयोग करके हेरफेर करने की अनुमति देती है।

प्रबंधक द्वारा लौटाया गया Manager() प्रकार की list , RLock , Namespace , Lock , RLock , Semaphore , BoundedSemaphore , Condition , Event , Barrier , Queue , Value और Array । उदाहरण के लिए,

from multiprocessing import Process, Manager

def f(d, l):
    d[1] = '1'
    d['2'] = 2
    d[0.25] = None
    l.reverse()

if __name__ == '__main__':
    with Manager() as manager:
        d = manager.dict()
        l = manager.list(range(10))

        p = Process(target=f, args=(d, l))
        p.start()
        p.join()

        print(d)
        print(l)

छप जाएगा

{0.25: None, 1: '1', '2': 2}
[9, 8, 7, 6, 5, 4, 3, 2, 1, 0]

साझा स्मृति ऑब्जेक्ट्स का उपयोग करने की तुलना में सर्वर प्रक्रिया प्रबंधक अधिक लचीले होते हैं क्योंकि उन्हें मनमाना ऑब्जेक्ट प्रकारों का समर्थन करने के लिए बनाया जा सकता है। इसके अलावा, एक प्रबंधक को एक नेटवर्क पर विभिन्न कंप्यूटरों पर प्रक्रियाओं द्वारा साझा किया जा सकता है। हालांकि, वे साझा मेमोरी का उपयोग करने की तुलना में धीमी हैं।

श्रमिकों का एक पूल का उपयोग करना

Pool वर्ग कार्यकर्ता प्रक्रियाओं के एक पूल का प्रतिनिधित्व करता है। इसमें ऐसी विधियाँ हैं जो कुछ अलग-अलग तरीकों से श्रमिकों की प्रक्रियाओं के लिए कार्यों को बंद करने की अनुमति देती हैं।

उदाहरण के लिए:

from multiprocessing import Pool, TimeoutError
import time
import os

def f(x):
    return x*x

if __name__ == '__main__':
    # start 4 worker processes
    with Pool(processes=4) as pool:

        # print "[0, 1, 4,..., 81]"
        print(pool.map(f, range(10)))

        # print same numbers in arbitrary order
        for i in pool.imap_unordered(f, range(10)):
            print(i)

        # evaluate "f(20)" asynchronously
        res = pool.apply_async(f, (20,))      # runs in *only* one process
        print(res.get(timeout=1))             # prints "400"

        # evaluate "os.getpid()" asynchronously
        res = pool.apply_async(os.getpid, ()) # runs in *only* one process
        print(res.get(timeout=1))             # prints the PID of that process

        # launching multiple evaluations asynchronously *may* use more processes
        multiple_results = [pool.apply_async(os.getpid, ()) for i in range(4)]
        print([res.get(timeout=1) for res in multiple_results])

        # make a single worker sleep for 10 secs
        res = pool.apply_async(time.sleep, (10,))
        try:
            print(res.get(timeout=1))
        except TimeoutError:
            print("We lacked patience and got a multiprocessing.TimeoutError")

        print("For the moment, the pool remains available for more work")

    # exiting the 'with'-block has stopped the pool
    print("Now the pool is closed and no longer available")

ध्यान दें कि पूल के तरीकों को केवल उस प्रक्रिया द्वारा उपयोग किया जाना चाहिए जिसने इसे बनाया था।

ध्यान दें

इस पैकेज के भीतर कार्यक्षमता के लिए आवश्यक है कि __main__ मॉड्यूल बच्चों द्वारा आयात किया जाए। यह प्रोग्रामिंग दिशानिर्देशों में शामिल है, हालांकि यह यहां इंगित करने के लायक है। इसका मतलब यह है कि कुछ उदाहरण, जैसे कि PoolPool उदाहरण इंटरएक्टिव इंटरप्रेटर में काम नहीं करेंगे। उदाहरण के लिए:

>>> from multiprocessing import Pool
>>> p = Pool(5)
>>> def f(x):
...     return x*x
...
>>> p.map(f, [1,2,3])
Process PoolWorker-1:
Process PoolWorker-2:
Process PoolWorker-3:
Traceback (most recent call last):
AttributeError: 'module' object has no attribute 'f'
AttributeError: 'module' object has no attribute 'f'
AttributeError: 'module' object has no attribute 'f'

(यदि आप इसे आजमाते हैं, तो यह वास्तव में तीन पूर्ण ट्रेसबैक का उत्पादन करेगा, जो अर्ध-यादृच्छिक रूप से इंटरलेक्टिव होगा, और फिर आपको किसी तरह मास्टर प्रक्रिया को रोकना पड़ सकता है।)

संदर्भ

multiprocessing पैकेज ज्यादातर threading मॉड्यूल के एपीआई की नकल करता है।

Process और अपवाद

class multiprocessing.Process(group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None)

प्रक्रिया ऑब्जेक्ट एक अलग प्रक्रिया में चलने वाली गतिविधि का प्रतिनिधित्व करते हैं। Process वर्ग में threading.Thread के सभी तरीकों के समकक्ष हैं।

कंस्ट्रक्टर को हमेशा कीवर्ड तर्कों के साथ बुलाया जाना चाहिए। समूह हमेशा None होना चाहिए; यह केवल threading.Thread के साथ संगतता के लिए मौजूद है। लक्ष्य run() विधि द्वारा आह्वान की जाने योग्य कॉल करने योग्य वस्तु है। यह None को None चूकता है, जिसका अर्थ कुछ भी नहीं है। नाम प्रक्रिया का नाम है (अधिक विवरण के लिए name देखें)। आर्ग्स लक्ष्य आह्वान के लिए तर्क टपल है। kwargs लक्ष्य आह्वान के लिए खोजशब्द तर्क का एक शब्दकोश है। यदि प्रदान किया जाता है, तो कीवर्ड-केवल डेमन तर्क True या False लिए प्रक्रिया daemon ध्वज सेट करता है। यदि None (डिफ़ॉल्ट), यह ध्वज निर्माण प्रक्रिया से विरासत में मिला होगा।

डिफ़ॉल्ट रूप से, कोई भी तर्क लक्षित करने के लिए पारित नहीं किया जाता है।

यदि कोई उपवर्ग निर्माणकर्ता को ओवरराइड करता है, तो उसे यह सुनिश्चित करना चाहिए कि वह Process.__init__() लिए कुछ और करने से पहले बेस क्लास कंस्ट्रक्टर ( Process.__init__() ) को आमंत्रित करता है।

संस्करण 3.3 में परिवर्तित: डेमन तर्क जोड़ा गया।

run()

प्रक्रिया की गतिविधि का प्रतिनिधित्व करने वाला तरीका।

आप एक उपवर्ग में इस विधि को ओवरराइड कर सकते हैं। मानक run() विधि ऑब्जेक्ट के कंस्ट्रक्टर को दिए गए कॉल करने योग्य ऑब्जेक्ट को लक्ष्य तर्क के रूप में आमंत्रित करती है, यदि कोई हो, तो क्रमिक और कीवर्ड तर्क से क्रमशः आर्ग्स और कवर्स तर्क से लिया जाता है।

start()

प्रक्रिया की गतिविधि शुरू करें।

इसे प्रति प्रक्रिया ऑब्जेक्ट पर अधिकतम एक बार कॉल किया जाना चाहिए। यह ऑब्जेक्ट के run() विधि को एक अलग प्रक्रिया में लागू करने की व्यवस्था करता है।

join([timeout])

यदि वैकल्पिक तर्क समयावधि None (डिफ़ॉल्ट) नहीं है, तो विधि तब तक ब्लॉक होती है जब तक कि प्रक्रिया में join() विधि को टर्मिनेट कहा जाता है। यदि टाइमआउट एक सकारात्मक संख्या है, तो यह अधिकांश टाइमआउट सेकंड में ब्लॉक हो जाता है। ध्यान दें कि विधि None लौटाता है यदि इसकी प्रक्रिया समाप्त हो जाती है या यदि विधि समय समाप्त हो जाती है। यह निर्धारित करने के लिए कि क्या यह समाप्त हो गया है, प्रक्रिया के exitcode की जाँच करें।

एक प्रक्रिया में कई बार शामिल हो सकते हैं।

एक प्रक्रिया स्वयं शामिल नहीं हो सकती क्योंकि इससे गतिरोध पैदा होगा। यह शुरू होने से पहले एक प्रक्रिया में शामिल होने का प्रयास करने के लिए एक त्रुटि है।

name

प्रक्रिया का नाम। नाम एक स्ट्रिंग है जिसका उपयोग केवल पहचान के उद्देश्य के लिए किया जाता है। इसका कोई शब्दार्थ नहीं है। एकाधिक प्रक्रियाओं को एक ही नाम दिया जा सकता है।

प्रारंभिक नाम निर्माणकर्ता द्वारा निर्धारित किया गया है। यदि कोई स्पष्ट नाम कंस्ट्रक्टर को प्रदान नहीं किया जाता है, तो फॉर्म का एक नाम 'प्रोसेस-एन 1 : एन 2 : ...: एन के ' का निर्माण किया जाता है, जहां प्रत्येक एन के अपने माता-पिता का एन-वें बच्चा है।

is_alive()

वापस लौटें कि क्या प्रक्रिया जीवित है।

मोटे तौर पर, एक प्रक्रिया वस्तु उस क्षण से जीवित है जब start() विधि वापस आती है जब तक कि बच्चा प्रक्रिया समाप्त नहीं हो जाती।

daemon

प्रक्रिया का डेमन ध्वज, एक बूलियन मूल्य। इसे start() करने से पहले सेट किया जाना चाहिए start() कहा जाता है।

प्रारंभिक मूल्य बनाने की प्रक्रिया से विरासत में मिला है।

जब कोई प्रक्रिया बाहर निकलती है, तो वह अपने सभी शैतानी बच्चे की प्रक्रियाओं को समाप्त करने का प्रयास करती है।

ध्यान दें कि एक शैतानी प्रक्रिया को बाल प्रक्रिया बनाने की अनुमति नहीं है। अन्यथा एक शैतानी प्रक्रिया अपने बच्चों को छोड़ देती है यदि इसकी मूल प्रक्रिया से बाहर निकलने पर यह समाप्त हो जाता है। इसके अतिरिक्त, ये यूनिक्स डेमॉन या सेवाएं नहीं हैं, वे सामान्य प्रक्रियाएं हैं जिन्हें समाप्त किया जाएगा (और शामिल नहीं) यदि गैर-डायमोनिक प्रक्रियाएं बाहर निकल गई हैं।

threading.Thread अलावा। एपीआई, Process ऑब्जेक्ट भी निम्नलिखित विशेषताओं और विधियों का समर्थन करते हैं:

pid

प्रक्रिया आईडी वापस करें। इससे पहले कि यह प्रक्रिया शुरू की जाए, यह None होगा।

exitcode

बच्चे का निकास कोड। यह None होगा यदि प्रक्रिया अभी तक समाप्त नहीं हुई है। एक नकारात्मक मान -N इंगित करता है कि बच्चे को संकेत एन द्वारा समाप्त किया गया था।

authkey

प्रक्रिया की प्रमाणीकरण कुंजी (एक बाइट स्ट्रिंग)।

जब multiprocessing को इनिशियलाइज़ किया जाता है तो मुख्य प्रक्रिया को os.urandom() का उपयोग करके एक यादृच्छिक स्ट्रिंग दी जाती है।

जब एक Process ऑब्जेक्ट बनाया जाता है, तो यह अपनी मूल प्रक्रिया की प्रमाणीकरण कुंजी को इनहेरिट कर देगा, हालांकि यह authkey को किसी अन्य स्ट्रिंग स्ट्रिंग में सेट करके परिवर्तित किया जा सकता है।

प्रमाणीकरण कुंजी देखें।

sentinel

एक सिस्टम ऑब्जेक्ट का एक संख्यात्मक हैंडल जो प्रक्रिया समाप्त होने पर "तैयार" हो जाएगा।

यदि आप एक बार multiprocessing.connection.wait() का उपयोग करके कई घटनाओं पर प्रतीक्षा करना चाहते हैं तो आप इस मान का उपयोग कर सकते हैं। अन्यथा कॉलिंग join() सरल है।

विंडोज पर, यह एक OS हैंडल है WaitForMultipleObjects WaitForSingleObject और WaitForMultipleObjects के साथ API कॉल के परिवार के लिए उपयोगी है। यूनिक्स पर, यह एक फाइल डिस्क्रिप्टर है जो select मॉड्यूल से प्राइमिटिव के साथ प्रयोग करने योग्य है।

संस्करण 3.3 में नया।

terminate()

प्रक्रिया को समाप्त करें। यूनिक्स पर यह SIGTERM सिग्नल का उपयोग करके किया जाता है; विंडोज TerminateProcess() पर प्रयोग किया जाता है। ध्यान दें कि बाहर निकलने वाले हैंडलर और अंत में क्लॉज़ इत्यादि को निष्पादित नहीं किया जाएगा।

ध्यान दें कि प्रक्रिया के वंशज प्रक्रियाओं को समाप्त नहीं किया जाएगा - वे बस अनाथ हो जाएंगे।

चेतावनी

यदि इस पद्धति का उपयोग तब किया जाता है जब संबद्ध प्रक्रिया पाइप या कतार का उपयोग कर रही है तो पाइप या कतार भ्रष्ट होने के लिए उत्तरदायी है और अन्य प्रक्रिया से अनुपयोगी हो सकती है। इसी तरह, यदि प्रक्रिया ने एक लॉक या सेमाफोर आदि का अधिग्रहण किया है, तो इसे समाप्त करना अन्य प्रक्रियाओं को गतिरोध पैदा करने के लिए उत्तरदायी है।

kill()

terminate() रूप में भी terminate() लेकिन यूनिक्स पर SIGKILL सिग्नल का उपयोग कर रहा है।

संस्करण 3.7 में नया।

close()

Process ऑब्जेक्ट को बंद करें, इससे जुड़े सभी संसाधन जारी करें। यदि अंतर्निहित प्रक्रिया अभी भी चल ValueError है, तो ValueError को उठाया जाता है। एक बार close() सफलतापूर्वक वापस आने पर, Process ऑब्जेक्ट के अधिकांश अन्य तरीके और विशेषताएं ValueError को बढ़ाएंगे।

संस्करण 3.7 में नया।

ध्यान दें कि start() , join() , is_alive() , terminate() और exitcode मेथड्स को केवल उस प्रोसेस से ही कॉल किया जाना चाहिए जिसने प्रोसेस ऑब्जेक्ट बनाया है।

Process के कुछ तरीकों का उदाहरण उपयोग:

>>> import multiprocessing, time, signal
>>> p = multiprocessing.Process(target=time.sleep, args=(1000,))
>>> print(p, p.is_alive())
<Process(Process-1, initial)> False
>>> p.start()
>>> print(p, p.is_alive())
<Process(Process-1, started)> True
>>> p.terminate()
>>> time.sleep(0.1)
>>> print(p, p.is_alive())
<Process(Process-1, stopped[SIGTERM])> False
>>> p.exitcode == -signal.SIGTERM
True
exception multiprocessing.ProcessError

सभी multiprocessing अपवादों का आधार वर्ग।

exception multiprocessing.BufferTooShort

संदेश द्वारा पढ़े गए संदेश के लिए प्रदत्त बफर ऑब्जेक्ट बहुत कम होने पर Connection.recv_bytes_into() द्वारा उठाया गया अपवाद।

यदि e BufferTooShort उदाहरण है तो e.args[0] संदेश को बाइट स्ट्रिंग के रूप में देगा।

exception multiprocessing.AuthenticationError

प्रमाणीकरण त्रुटि होने पर उठाया गया।

exception multiprocessing.TimeoutError

एक टाइमआउट के साथ तरीकों द्वारा उठाया गया जब टाइमआउट समाप्त हो जाता है।

पाइप और कतार

कई प्रक्रियाओं का उपयोग करते समय, आम तौर पर प्रक्रियाओं के बीच संचार के लिए संदेश पासिंग का उपयोग किया जाता है और ताले जैसे किसी भी सिंक्रनाइज़ेशन प्राइमेटिव का उपयोग करने से बचा जाता है।

संदेशों को पास करने के लिए एक Pipe() (दो प्रक्रियाओं के बीच एक कनेक्शन के लिए) या एक कतार (जो कई उत्पादकों और उपभोक्ताओं को अनुमति देता है) का उपयोग कर सकता है।

Queue , SimpleQueue और JoinableQueue प्रकार बहु-निर्माता, बहु-उपभोक्ता FIFO कतार हैं, जो मानक पुस्तकालय में queue.Queue वर्ग पर आधारित हैं। वे इस बात से भिन्न हैं कि Queue में task_done() का अभाव है और पायथन 2.5 की queue.Queue में join() तरीकों join() तरीकों को join() queue.Queue वर्ग।

यदि आप JoinableQueue उपयोग JoinableQueue तो आपको JoinableQueue.task_done() को कतार से हटाए गए प्रत्येक कार्य के लिए कॉल करना होगा वरना अधूरे कार्यों की संख्या की गणना करने के लिए उपयोग किया जाने वाला JoinableQueue.task_done() अंततः एक अपवाद को बढ़ा सकता है।

ध्यान दें कि प्रबंधक ऑब्जेक्ट का उपयोग करके एक साझा कतार भी बना सकता है - Managers देखें।

ध्यान दें

multiprocessing सामान्य queue.Empty का उपयोग करता है। समय-समय पर संकेत देने के लिए queue.Empty और queue.Full अपवाद। वे multiprocessing नेमस्पेस में उपलब्ध नहीं हैं इसलिए आपको उन्हें queue से आयात करने की आवश्यकता है।

ध्यान दें

जब किसी ऑब्जेक्ट को एक कतार में रखा जाता है, तो ऑब्जेक्ट को पिक किया जाता है और एक बैकग्राउंड थ्रेड बाद में एक अंतर्निहित पाइप पर पिकेड डेटा को फ्लश करता है। इसके कुछ परिणाम हैं जो थोड़े आश्चर्यजनक हैं, लेकिन किसी भी व्यावहारिक कठिनाइयों का कारण नहीं होना चाहिए - यदि वे वास्तव में आपको परेशान करते हैं तो आप इसके बजाय एक Managers के साथ बनाई गई कतार का उपयोग कर सकते हैं।

  1. एक खाली कतार पर एक वस्तु लगाने के बाद कतार के empty() होने से पहले एक infinitesimal देरी हो सकती है empty() विधि रिटर्न False और get_nowait() कतार get_nowait() बिना वापस आ queue.Empty queue.Empty
  2. यदि कई प्रक्रियाएं ऑब्जेक्ट्स को जोड़ रही हैं, तो ऑब्जेक्ट्स को दूसरे छोर के आउट-ऑफ-ऑर्डर पर प्राप्त करना संभव है। हालांकि, एक ही प्रक्रिया द्वारा गणना की गई वस्तुएं हमेशा एक दूसरे के संबंध में अपेक्षित क्रम में रहेंगी।

चेतावनी

यदि एक प्रक्रिया का उपयोग करके प्रक्रिया को मार दिया जाता है। terminate() os.kill() terminate() या os.kill() जब यह एक Queue का उपयोग करने की कोशिश कर रहा है, तो Queue में डेटा दूषित होने की संभावना है। यह किसी अन्य प्रक्रिया के अपवाद का कारण हो सकता है जब यह बाद में कतार का उपयोग करने की कोशिश करता है।

चेतावनी

जैसा कि ऊपर उल्लेख किया गया है, अगर एक बच्चे की प्रक्रिया ने एक कतार में आइटम डाल दिया है (और इसमें JoinableQueue.cancel_join_thread ) का उपयोग नहीं किया है, तो उस प्रक्रिया को तब तक समाप्त नहीं किया जाएगा जब तक कि सभी बफ़र किए गए आइटम पाइप पर नहीं JoinableQueue.cancel_join_thread

इसका मतलब यह है कि यदि आप उस प्रक्रिया में शामिल होने का प्रयास करते हैं तो आपको गतिरोध मिल सकता है जब तक कि आप सुनिश्चित नहीं होते हैं कि कतार में लगाए गए सभी वस्तुओं का उपभोग किया गया है। इसी तरह, यदि बच्चे की प्रक्रिया गैर-डायमोनिक है तो माता-पिता की प्रक्रिया बाहर निकलने पर लटक सकती है जब वह अपने सभी गैर-डायनेमिक बच्चों में शामिल होने की कोशिश करता है।

ध्यान दें कि प्रबंधक का उपयोग करके बनाई गई एक कतार में यह समस्या नहीं है। प्रोग्रामिंग दिशानिर्देश देखें।

इंटरप्रोसेस संचार के लिए कतारों के उपयोग के Examples लिए Examples देखें।

multiprocessing.Pipe([duplex])

एक पाइप के सिरों का प्रतिनिधित्व करने वाली Connection वस्तुओं की एक जोड़ी (conn1, conn2) है।

यदि डुप्लेक्स True (डिफ़ॉल्ट) है तो पाइप द्विदिश है। यदि डुप्लेक्स False तो पाइप यूनिडायरेक्शनल है: conn1 उपयोग केवल संदेश प्राप्त करने के लिए किया जा सकता है और conn2 उपयोग केवल संदेश भेजने के लिए किया जा सकता है।

class multiprocessing.Queue([maxsize])

पाइप और कुछ तालों / सेमाफोर का उपयोग करके कार्यान्वित एक साझा साझा कतार लौटाता है। जब एक प्रक्रिया पहले कतार पर एक आइटम डालती है तो एक फीडर धागा शुरू होता है जो एक बफर से वस्तुओं को पाइप में स्थानांतरित करता है।

सामान्य queue । मानक पुस्तकालय के queue मॉड्यूल से queue.Empty और queue.Full अपवाद सिग्नल टाइमआउट के लिए उठाए जाते हैं।

Queue task_done() और join() को छोड़कर Queue सभी तरीकों को लागू करती है।

qsize()

कतार के अनुमानित आकार को वापस करें। मल्टीथ्रेडिंग / मल्टीप्रोसेसिंग शब्दार्थ के कारण, यह संख्या विश्वसनीय नहीं है।

ध्यान दें कि यह मैक ओएस एक्स की तरह यूनिक्स प्लेटफार्मों पर NotImplementedError बढ़ा सकता है जहां sem_getvalue() लागू नहीं है।

empty()

यदि कतार खाली है, तो True लौटें अन्यथा False । मल्टीथ्रेडिंग / मल्टीप्रोसेसिंग शब्दार्थ के कारण, यह विश्वसनीय नहीं है।

full()

यदि कतार पूर्ण है, तो True लौटें अन्यथा False । मल्टीथ्रेडिंग / मल्टीप्रोसेसिंग शब्दार्थ के कारण, यह विश्वसनीय नहीं है।

put(obj[, block[, timeout]])

कतार में लगाओ। यदि वैकल्पिक तर्क खंड True (डिफ़ॉल्ट) और समयबाह्य None (डिफ़ॉल्ट) नहीं है, यदि आवश्यक हो तो नि: शुल्क स्लॉट उपलब्ध होने तक ब्लॉक करें। यदि टाइमआउट एक पॉजिटिव नंबर है, तो यह ज्यादातर टाइमआउट सेकंड में ब्लॉक हो जाता है और queue.Full को queue.Full है। यदि कोई मुफ्त स्लॉट उस समय के भीतर उपलब्ध नहीं था, तो अपवाद को छोड़ दें। अन्यथा ( ब्लॉक False ), एक आइटम को कतार पर रखें यदि एक मुफ्त स्लॉट तुरंत उपलब्ध है, तो कतार को बढ़ाएं। queue.Full अपवाद (उस मामले में समय की अनदेखी की गई है)।

put_nowait(obj)

put(obj, False) लिए बराबर put(obj, False)

get([block[, timeout]])

किसी आइटम को कतार से निकालें और वापस करें। यदि वैकल्पिक आर्ग्स ब्लॉक True (डिफॉल्ट) है और टाइमआउट None (डिफॉल्ट), तब तक ब्लॉक करें जब तक कोई आइटम उपलब्ध न हो। यदि टाइमआउट एक पॉजिटिव नंबर है, तो यह ज्यादातर टाइमआउट सेकंड्स में ब्लॉक हो जाता है और queue.Empty बढ़ा देता है। उस समय के भीतर कोई आइटम उपलब्ध न होने पर queue.Empty अपवाद दें। अन्यथा (ब्लॉक False ), एक आइटम वापस करें यदि कोई तुरंत उपलब्ध है, तो queue.Empty बढ़ाएं। queue.Empty अपवाद (उस मामले में समय की अनदेखी की गई है)।

get_nowait()

get(False) करने के get(False) बराबर get(False)

Queue में queue.Queue में कुछ अतिरिक्त तरीके नहीं पाए queue.Queue queue.Queue । ये विधियां आमतौर पर अधिकांश कोड के लिए अनावश्यक हैं:

close()

संकेत दें कि वर्तमान प्रक्रिया द्वारा इस कतार में कोई और डेटा नहीं डाला जाएगा। एक बार जब यह पाइप के सभी बफ़र किए गए डेटा को फ्लश कर देगा तो बैकग्राउंड थ्रेड छोड़ देगा। यह स्वचालित रूप से कहा जाता है जब कतार कचरा एकत्र किया जाता है।

join_thread()

पृष्ठभूमि धागा में शामिल हों। इसे केवल close() बाद उपयोग किया जा सकता है। यह तब तक ब्लॉक करता है जब तक कि बैकग्राउंड थ्रेड बाहर नहीं निकल जाता, यह सुनिश्चित करता है कि बफर में सभी डेटा को पाइप में फ्लश कर दिया गया है।

डिफ़ॉल्ट रूप से यदि कोई प्रक्रिया कतार का निर्माता नहीं है तो बाहर निकलने पर यह कतार की पृष्ठभूमि के धागे से जुड़ने का प्रयास करेगा। join_thread() कुछ नहीं करने के लिए प्रक्रिया JoinableQueue.cancel_join_thread कर सकती है JoinableQueue.cancel_join_thread join_thread()

cancel_join_thread()

join_thread() को ब्लॉक होने से रोकें। विशेष रूप से, यह पृष्ठभूमि थ्रेड को स्वचालित रूप से जुड़ने से रोकता है जब प्रक्रिया बाहर निकलती है - join_thread()

इस विधि के लिए एक बेहतर नाम allow_exit_without_flush() हो सकता है। इसके कारण डेटा के खो जाने की संभावना है, और आपको निश्चित रूप से इसका उपयोग करने की आवश्यकता नहीं होगी। यह वास्तव में केवल वहाँ है यदि आपको मौजूदा प्रक्रिया को अंतर्निहित पाइप में संलग्न डेटा को फ्लश किए बिना तुरंत बाहर निकलने की आवश्यकता है, और आप खोए हुए डेटा की परवाह नहीं करते हैं।

ध्यान दें

इस वर्ग की कार्यक्षमता को मेजबान ऑपरेटिंग सिस्टम पर एक कामकाजी साझा अर्ध-कार्यान्वयन की आवश्यकता होती है। एक के बिना, इस वर्ग में कार्यक्षमता अक्षम हो जाएगी, और एक Queue को तत्काल करने के प्रयासों के परिणामस्वरूप एक ImportError । अतिरिक्त जानकारी के लिए bpo-3770 देखें। नीचे सूचीबद्ध किसी विशेष कतार प्रकार के लिए भी यही सही है।

class multiprocessing.SimpleQueue

यह एक सरलीकृत Queue प्रकार है, जो एक बंद Pipe() बहुत करीब है।

empty()

यदि कतार खाली है, तो True लौटें अन्यथा False

get()

किसी आइटम को कतार से निकालें और वापस करें।

put(item)

आइटम को कतार में रखें।

class multiprocessing.JoinableQueue([maxsize])

JoinableQueue , एक Queue JoinableQueue , एक कतार है जिसके अतिरिक्त JoinableQueue.task_done() और join() विधियां हैं।

task_done()

इंगित करें कि पूर्व में संलग्न कार्य पूर्ण है। कतार उपभोक्ताओं द्वारा उपयोग किया जाता है। कार्य प्राप्त करने के लिए उपयोग किए जाने वाले प्रत्येक get() लिए, बाद में JoinableQueue.task_done() को कॉल कतार को बताती है कि कार्य पूरा हो गया है।

यदि कोई JoinableQueue.task_done() वर्तमान में अवरुद्ध हो रहा है, तो यह तब शुरू होगा जब सभी आइटम संसाधित हो गए होंगे (इसका अर्थ है कि प्रत्येक आइटम के लिए एक JoinableQueue.task_done() कॉल प्राप्त हुआ था जिसे कतार में put() गया था)।

यदि कतार में रखी गई वस्तुओं से अधिक बार कॉल किया जाता है, तो एक ValueError

join()

तब तक रोकें जब तक कतार में सभी आइटम मिल नहीं गए और संसाधित नहीं हो गए।

जब भी कोई आइटम कतार में जोड़ा जाता है तो अधूरे कार्यों की गिनती बढ़ जाती है। जब भी कोई उपभोक्ता JoinableQueue.task_done() को यह बताता है कि आइटम को पुनर्प्राप्त किया गया था और उस पर सभी काम पूरा हो गया है, तो गिनती नीचे जाती है। जब अधूरे कामों की गिनती शून्य हो जाती है, तो join() अनब्लॉक करें।

कई तरह का

multiprocessing.active_children()

वर्तमान प्रक्रिया के सभी जीवित बच्चों की वापसी सूची।

इसे कॉल करने से किसी भी प्रक्रिया के "जुड़ने" का साइड इफेक्ट होता है, जो पहले ही समाप्त हो चुका है।

multiprocessing.cpu_count()

सिस्टम में सीपीयू की संख्या लौटाएं।

यह संख्या सीपीयू की संख्या के बराबर नहीं है जिसे वर्तमान प्रक्रिया उपयोग कर सकती है। प्रयोग करने योग्य सीपीयू की संख्या len(os.sched_getaffinity(0)) साथ प्राप्त की जा सकती है len(os.sched_getaffinity(0))

NotImplementedError बढ़ा NotImplementedError

यह भी देखें

os.cpu_count()

multiprocessing.current_process()

वर्तमान प्रक्रिया के अनुरूप Process ऑब्जेक्ट लौटाएं।

threading.current_thread() का एक एनालॉग। threading.current_thread()

multiprocessing.freeze_support()

जब एक प्रोग्राम जो multiprocessing का उपयोग करता है, उसके लिए समर्थन जोड़ें, तो विंडोज निष्पादन योग्य बनाने के लिए जमे हुए हैं। ( P22exe , PyInstaller और cx_Freeze के साथ परीक्षण किया गया है।)

मुख्य मॉड्यूल के if __name__ == '__main__' लाइन के बाद इस फ़ंक्शन को सीधे कॉल करने की आवश्यकता है। उदाहरण के लिए:

from multiprocessing import Process, freeze_support

def f():
    print('hello world!')

if __name__ == '__main__':
    freeze_support()
    Process(target=f).start()

यदि freeze_support() लाइन को छोड़ दिया जाता है तो जमे हुए निष्पादन योग्य को चलाने की कोशिश freeze_support() को बढ़ाएगी।

विंडोज के अलावा किसी भी ऑपरेटिंग सिस्टम पर freeze_support() कॉलिंग का कोई असर नहीं होता है। इसके अलावा, यदि विंडोज पर मॉड्यूल को पायथन इंटरप्रेटर द्वारा सामान्य रूप से चलाया जा रहा है (प्रोग्राम को फ्रीज नहीं किया गया है), तो freeze_support() का कोई प्रभाव नहीं है।

multiprocessing.get_all_start_methods()

समर्थित प्रारंभ विधियों की सूची लौटाता है, जिनमें से पहला डिफ़ॉल्ट है। संभावित आरंभ विधियाँ 'forkserver' 'fork' , and 'spawn' और 'forkserver' । विंडोज पर केवल 'spawn' उपलब्ध है। यूनिक्स पर 'fork' और 'spawn' को हमेशा समर्थन दिया जाता है, जिसमें 'fork' डिफ़ॉल्ट होता है।

संस्करण 3.4 में नया।

multiprocessing.get_context(method=None)

एक संदर्भ वस्तु लौटाएं जिसमें multiprocessing मॉड्यूल के समान गुण हैं।

यदि विधि None तो डिफ़ॉल्ट संदर्भ वापस कर दिया जाता है। अन्यथा विधि , 'fork' , 'forkserver' 'spawn' , 'forkserver' होनी चाहिए। यदि निर्दिष्ट प्रारंभ विधि उपलब्ध नहीं है, तो ValueError को उठाया जाता है।

संस्करण 3.4 में नया।

multiprocessing.get_start_method(allow_none=False)

प्रक्रिया शुरू करने के लिए उपयोग की जाने वाली प्रारंभ विधि का नाम लौटाएँ।

यदि प्रारंभ विधि तय नहीं की गई है और allow_none गलत है, तो प्रारंभ विधि डिफ़ॉल्ट पर तय हो गई है और नाम वापस कर दिया गया है। यदि प्रारंभ विधि तय नहीं की गई है और allow_none सत्य है तो None भी वापस None जाता है।

वापसी का मूल्य 'forkserver' 'fork' ,, 'spawn' , 'forkserver' या None'fork' यूनिक्स पर डिफ़ॉल्ट है, जबकि विंडोज पर 'spawn' डिफ़ॉल्ट है।

संस्करण 3.4 में नया।

multiprocessing.set_executable()

एक बच्चे की प्रक्रिया शुरू करते समय उपयोग करने के लिए पायथन दुभाषिया का मार्ग निर्धारित करता है। (डिफ़ॉल्ट रूप से sys.executable का उपयोग किया जाता है)। एंबेडर्स को शायद कुछ काम करने की ज़रूरत होगी

set_executable(os.path.join(sys.exec_prefix, 'pythonw.exe'))

इससे पहले कि वे बच्चे की प्रक्रिया बना सकें।

संस्करण 3.4 में बदला: अब 'spawn' स्टार्ट विधि का उपयोग किए जाने पर यूनिक्स पर समर्थित है।

multiprocessing.set_start_method(method)

वह विधि सेट करें जिसका उपयोग बाल प्रक्रियाओं को शुरू करने के लिए किया जाना चाहिए। विधि 'fork' , 'spawn' या 'forkserver'

ध्यान दें कि इसे सबसे अधिक बार बुलाया जाना चाहिए, और इसे मुख्य मॉड्यूल के if __name__ == '__main__' खंड के अंदर संरक्षित किया जाना चाहिए।

संस्करण 3.4 में नया।

कनेक्शन ऑब्जेक्ट

कनेक्शन ऑब्जेक्ट्स को लेने योग्य वस्तुओं या तारों को भेजने और प्राप्त करने की अनुमति देता है। उन्हें संदेश उन्मुख कनेक्टेड सॉकेट के रूप में सोचा जा सकता है।

कनेक्शन ऑब्जेक्ट्स आमतौर पर Pipe() का उपयोग करके बनाए जाते हैं - श्रोताओं और ग्राहकों को भी देखें।

class multiprocessing.connection.Connection
send(obj)

एक ऑब्जेक्ट को कनेक्शन के दूसरे छोर पर भेजें, जिसे recv() का उपयोग करके पढ़ा जाना चाहिए।

वस्तु को अचूक होना चाहिए। बहुत बड़े अचार (लगभग 32 MiB +, हालांकि यह OS पर निर्भर करता है) एक ValueError अपवाद उठा सकता है।

recv()

send() का उपयोग करके कनेक्शन के दूसरे छोर से भेजी गई वस्तु लौटाएं। प्राप्त करने के लिए कुछ होने तक ब्लॉक करता है। EOFError उठाता है अगर कुछ भी प्राप्त करने के लिए नहीं बचा है और दूसरा छोर बंद था।

fileno()

कनेक्शन द्वारा उपयोग की गई फ़ाइल डिस्क्रिप्टर या हैंडल लौटाएं।

close()

कनेक्शन बंद करें।

इसे स्वचालित रूप से कहा जाता है जब कनेक्शन कचरा एकत्र किया जाता है।

poll([timeout])

वापसी करें कि क्या कोई डेटा पढ़ने के लिए उपलब्ध है।

यदि टाइमआउट निर्दिष्ट नहीं है, तो यह तुरंत वापस आ जाएगा। यदि टाइमआउट एक संख्या है तो यह ब्लॉक करने के लिए सेकंड में अधिकतम समय निर्दिष्ट करता है। यदि समयबाह्य है None तो एक अनंत समय का उपयोग किया जाता है।

ध्यान दें कि एक ही बार में कई कनेक्शन ऑब्जेक्ट्स का उपयोग करके मतदान किया जा सकता है multiprocessing.connection.wait()

send_bytes(buffer[, offset[, size]])

पूर्ण संदेश के रूप में बाइट्स जैसी ऑब्जेक्ट से बाइट डेटा भेजें ।

यदि ऑफसेट दिया जाता है, तो बफर में उस स्थिति से डेटा पढ़ा जाता है । यदि आकार दिया जाता है, तो बफर से कई बाइट्स पढ़े जाएंगे। बहुत बड़े बफ़र (लगभग 32 MiB +, हालाँकि यह OS पर निर्भर करता है) एक ValueError अपवाद को बढ़ा सकता है

recv_bytes([maxlength])

एक स्ट्रिंग के रूप में कनेक्शन के दूसरे छोर से भेजे गए बाइट डेटा का पूरा संदेश लौटाएं। प्राप्त करने के लिए कुछ होने तक ब्लॉक करता है। उठाता है EOFError यदि प्राप्त करने के लिए कुछ भी नहीं बचा है और दूसरा छोर बंद हो गया है।

यदि अधिकतम गति निर्दिष्ट है और संदेश अधिकतम गति से अधिक है तो OSError उठाया जाता है और कनेक्शन अब पठनीय नहीं होगा।

संस्करण 3.3 में परिवर्तित: यह फ़ंक्शन बढ़ाता था IOError , जो अब एक उपनाम है OSError

recv_bytes_into(buffer[, offset])

बफर में कनेक्शन के दूसरे छोर से भेजे गए बाइट डेटा का पूरा संदेश पढ़ें और संदेश में बाइट्स की संख्या वापस करें। प्राप्त करने के लिए कुछ होने तक ब्लॉक करता है। उठाता है EOFError अगर वहाँ कुछ भी नहीं बचा है और दूसरे छोर को बंद कर दिया गया था।

बफर एक लिखने योग्य बाइट्स जैसी वस्तु होना चाहिए । यदि ऑफसेट दिया जाता है तो संदेश उस स्थिति से बफर में लिखा जाएगा। ऑफसेट बफर (बाइट्स) की लंबाई से कम एक गैर-नकारात्मक पूर्णांक होना चाहिए ।

यदि बफर बहुत छोटा है, तो एक BufferTooShort अपवाद उठाया जाता है और पूरा संदेश उपलब्ध होता है, e.args[0] जहां e अपवाद उदाहरण है।

संस्करण 3.3 में बदला गया: कनेक्शन ऑब्जेक्ट्स का उपयोग करके प्रक्रियाओं के बीच खुद को स्थानांतरित किया जा सकता है send() और recv()

संस्करण 3.3 में नया: कनेक्शन ऑब्जेक्ट अब संदर्भ प्रबंधन प्रोटोकॉल का समर्थन करते हैं - संदर्भ प्रबंधक प्रकार देखें __enter__() कनेक्शन ऑब्जेक्ट, और __exit__() कॉल लौटाता है close()

उदाहरण के लिए:

>>> from multiprocessing import Pipe
>>> a, b = Pipe()
>>> a.send([1, 'hello', None])
>>> b.recv()
[1, 'hello', None]
>>> b.send_bytes(b'thank you')
>>> a.recv_bytes()
b'thank you'
>>> import array
>>> arr1 = array.array('i', range(5))
>>> arr2 = array.array('i', [0] * 10)
>>> a.send_bytes(arr1)
>>> count = b.recv_bytes_into(arr2)
>>> assert count == len(arr1) * arr1.itemsize
>>> arr2
array('i', [0, 1, 2, 3, 4, 0, 0, 0, 0, 0])

चेतावनी

recv() विधि स्वतः डेटा यह प्राप्त करता है, जो एक सुरक्षा जोखिम हो सकते जब तक कि आप प्रक्रिया है जो संदेश भेजा भरोसा कर सकते हैं unpickles।

इसलिए, जब तक कि कनेक्शन ऑब्जेक्ट का उपयोग करके उत्पादन नहीं किया गया था, तब तक Pipe() आपको किसी प्रकार के प्रमाणीकरण के बाद केवल recv() और send() विधियों का उपयोग करना चाहिए । प्रमाणीकरण कुंजी देखें ।

चेतावनी

यदि एक प्रक्रिया को मार दिया जाता है, जबकि यह एक पाइप को पढ़ने या लिखने की कोशिश कर रहा है, तो पाइप में डेटा दूषित होने की संभावना है, क्योंकि यह सुनिश्चित करना असंभव हो सकता है कि संदेश सीमाएं कहाँ झूठ हैं।

सिंक्रोनाइज़ेशन प्राइमेटीज़

आमतौर पर सिंक्रोनाइज़ेशन प्राइमेटिव्स मल्टीप्रोसेस प्रोग्राम में उतने आवश्यक नहीं होते हैं जितने कि वे एक मल्टीथ्रेडेड प्रोग्राम में होते हैं। threading मॉड्यूल के लिए प्रलेखन देखें ।

ध्यान दें कि एक प्रबंधक ऑब्जेक्ट का उपयोग करके भी सिंक्रनाइज़ेशन प्राइमेटिव्स बना सकता है - Managers देखें ।

class multiprocessing.Barrier(parties[, action[, timeout]])

एक बाधा वस्तु: का एक क्लोन threading.Barrier

संस्करण 3.3 में नया।

class multiprocessing.BoundedSemaphore([value])

एक बंधी हुई सीमफोर ऑब्जेक्ट: का एक करीबी एनालॉग threading.BoundedSemaphore

इसके निकटवर्ती एनालॉग से एकान्त अंतर मौजूद है: इसकी acquire विधि का पहला तर्क ब्लॉक का नाम है , जैसा कि इसके अनुरूप है Lock.acquire()

ध्यान दें

मैक ओएस एक्स पर, यह उस प्लेटफॉर्म पर लागू नहीं होने के Semaphore कारण से अप्रभेद्य sem_getvalue() है।

class multiprocessing.Condition([lock])

एक शर्त चर: के लिए एक उपनाम threading.Condition

यदि लॉक निर्दिष्ट है, तो यह एक Lock या RLock वस्तु से होना चाहिए multiprocessing

संस्करण 3.3 में परिवर्तित: wait_for() विधि जोड़ दिया गया।

class multiprocessing.Event

का एक क्लोन threading.Event

class multiprocessing.Lock

एक गैर-पुनरावर्ती लॉक ऑब्जेक्ट: का एक करीबी एनालॉग threading.Lock । एक बार जब एक प्रक्रिया या धागे ने एक ताला प्राप्त कर लिया है, तो बाद में किसी भी प्रक्रिया या धागे से इसे प्राप्त करने का प्रयास जारी होने तक अवरुद्ध हो जाएगा; कोई भी प्रक्रिया या धागा इसे जारी कर सकता है। threading.Lock यह थ्रेड पर लागू होने वाली अवधारणाओं और व्यवहारों को यहां पर दोहराया जाता है Lock क्योंकि यह या तो प्रक्रियाओं या थ्रेड्स पर लागू होता है, सिवाय नोट किए हुए।

ध्यान दें कि Lock वास्तव में एक फैक्ट्री फ़ंक्शन है जो multiprocessing.synchronize.Lock एक डिफ़ॉल्ट संदर्भ के साथ आरंभीकृत का एक उदाहरण देता है।

Lock संदर्भ प्रबंधक प्रोटोकॉल का समर्थन करता है और इस प्रकार with बयानों में इस्तेमाल किया जा सकता है ।

acquire(block=True, timeout=None)

एक ताला, अवरुद्ध या गैर-अवरुद्ध प्राप्त करें।

ब्लॉक तर्क True (डिफ़ॉल्ट) पर सेट होने के साथ , विधि कॉल तब तक ब्लॉक रहेगी जब तक ताला एक अनलॉक स्थिति में नहीं होता है, फिर इसे लॉक और वापसी पर सेट करें True । ध्यान दें कि इस पहले तर्क का नाम उसमें से अलग है threading.Lock.acquire()

सेट किए गए ब्लॉक तर्क के साथ False , विधि कॉल ब्लॉक नहीं होती है। यदि वर्तमान में ताला बंद अवस्था में है, तो वापस लौटें False ; अन्यथा लॉक वाली स्थिति पर लॉक सेट करें और वापस लौटें True

जब टाइमआउट के लिए एक सकारात्मक, फ्लोटिंग-पॉइंट वैल्यू के साथ आह्वान किया जाता है, तो टाइमआउट द्वारा निर्दिष्ट अधिकांश सेकंड के लिए ब्लॉक करें जब तक कि लॉक हासिल नहीं किया जा सकता। के लिए नकारात्मक मान के साथ आमंत्रण टाइमआउट एक के बराबर हैं टाइमआउट शून्य की। समयबाह्य मान None (डिफ़ॉल्ट) के साथ इनवॉइस टाइमआउट अवधि को अनंत पर सेट करता है। ध्यान दें कि टाइमआउट के None लिए नकारात्मक या मूल्यों का उपचार कार्यान्वित व्यवहार से भिन्न होता है । समय समाप्ति तर्क कोई व्यावहारिक निहितार्थ अगर है ब्लॉक तर्क पर सेट किया जाता है और इस तरह नजरअंदाज कर दिया है। रिटर्न threading.Lock.acquire() False True यदि लॉक अधिग्रहित किया गया है या False यदि टाइमआउट अवधि समाप्त हो गई है।

release()

एक ताला जारी करो। इसे किसी भी प्रक्रिया या धागे से बुलाया जा सकता है, न केवल उस प्रक्रिया या धागे से जो मूल रूप से लॉक का अधिग्रहण करता है।

व्यवहार वही है, threading.Lock.release() सिवाय इसके कि जब एक ताला ताला पर लगाया जाता है, तो एक ValueError उठाया जाता है।

class multiprocessing.RLock

एक पुनरावर्ती लॉक ऑब्जेक्ट: का एक करीबी एनालॉग threading.RLock । एक पुनरावर्ती लॉक को उस प्रक्रिया या थ्रेड द्वारा जारी किया जाना चाहिए जिसने इसे अधिग्रहित किया था। एक बार एक प्रक्रिया या धागे ने एक पुनरावर्ती ताला प्राप्त कर लिया है, वही प्रक्रिया या धागा इसे फिर से अवरुद्ध किए बिना प्राप्त कर सकता है; उस प्रक्रिया या धागे को हर बार प्राप्त होने के बाद इसे एक बार जारी करना होगा।

ध्यान दें कि RLock वास्तव में एक फैक्ट्री फ़ंक्शन है जो multiprocessing.synchronize.RLock एक डिफ़ॉल्ट संदर्भ के साथ आरंभीकृत का एक उदाहरण देता है।

RLock संदर्भ प्रबंधक प्रोटोकॉल का समर्थन करता है और इस प्रकार with बयानों में इस्तेमाल किया जा सकता है ।

acquire(block=True, timeout=None)

एक ताला, अवरुद्ध या गैर-अवरुद्ध प्राप्त करें।

जब तक ब्लॉक तर्क के साथ लागू किया जाता है True , तब तक ब्लॉक करें जब तक कि लॉक एक अनलॉक स्थिति में न हो (किसी प्रक्रिया या थ्रेड के स्वामित्व में नहीं) जब तक कि लॉक पहले से ही वर्तमान प्रक्रिया या थ्रेड के स्वामित्व में न हो। वर्तमान प्रक्रिया या थ्रेड फिर लॉक का स्वामित्व लेता है (यदि इसके पास पहले से स्वामित्व नहीं है) और लॉक इनक्रीमेंट के अंदर रिकर्सन लेवल एक के बाद एक हो जाता है, जिसके परिणामस्वरूप रिटर्न वैल्यू होती है True । ध्यान दें कि इस तर्क के कार्यान्वयन की तुलना में इस पहले तर्क के व्यवहार में कई अंतर हैं threading.RLock.acquire() , जो तर्क के नाम से शुरू होता है।

जब ब्लॉक तर्क के साथ लागू किया जाता है False , तो ब्लॉक न करें। यदि किसी अन्य प्रक्रिया या थ्रेड द्वारा लॉक को पहले ही अधिग्रहित कर लिया गया है (और इस प्रकार उसका स्वामित्व है), तो वर्तमान प्रक्रिया या थ्रेड स्वामित्व नहीं लेता है और लॉक के भीतर पुनरावृत्ति स्तर नहीं बदला गया है, जिसके परिणामस्वरूप रिटर्न वैल्यू है False । यदि ताला एक अनलॉक स्थिति में है, तो वर्तमान प्रक्रिया या धागा स्वामित्व लेता है और पुनरावृत्ति स्तर बढ़ जाता है, जिसके परिणामस्वरूप वापसी मूल्य होता है True

टाइमआउट तर्क का उपयोग और व्यवहार उसी रूप में हैं Lock.acquire() । ध्यान दें कि टाइमआउट के इन व्यवहारों में से कुछ कार्यान्वित व्यवहारों से भिन्न हैं threading.RLock.acquire()

release()

एक ताला जारी करें, पुनरावृत्ति स्तर को घटाएं। यदि वेतन वृद्धि के बाद पुनरावृत्ति स्तर शून्य है, तो लॉक को अनलॉक किए गए (किसी भी प्रक्रिया या थ्रेड के स्वामित्व में नहीं) पर रीसेट करें और यदि किसी अन्य प्रक्रिया या थ्रेड को लॉक के अनलॉक होने के इंतजार में अवरुद्ध किया जाता है, तो उनमें से किसी एक को आगे बढ़ने दें। यदि वेतनवृद्धि के बाद पुनरावृत्ति स्तर अभी भी गैर-शून्य है, तो कॉलिंग प्रक्रिया या थ्रेड द्वारा लॉक लॉक और स्वामित्व में रहता है।

कॉलिंग प्रक्रिया या थ्रेड का मालिक होने पर केवल इस विधि को कॉल करें। एक AssertionError उठाया है इस विधि एक प्रक्रिया के द्वारा कहा जाता है या मालिक के अलावा अन्य सूत्र है, तो या ताला एक खुला (बिना स्वामित्व वाले) राज्य में है। ध्यान दें कि इस स्थिति में उठाए गए अपवाद का प्रकार कार्यान्वित व्यवहार से अलग है threading.RLock.release()

class multiprocessing.Semaphore([value])

एक सेमाफोर ऑब्जेक्ट: का एक करीबी एनालॉग threading.Semaphore

इसके निकटवर्ती एनालॉग से एकान्त अंतर मौजूद है: इसकी acquire विधि का पहला तर्क ब्लॉक का नाम है , जैसा कि इसके अनुरूप है Lock.acquire()

ध्यान दें

मैक ओएस एक्स पर, sem_timedwait असमर्थित है, इसलिए acquire() टाइमआउट के साथ कॉल करने से नींद लूप का उपयोग करके उस फ़ंक्शन के व्यवहार का अनुकरण होगा।

ध्यान दें

SIGINT द्वारा उत्पन्न संकेत तो Ctrl-C आता है, जबकि मुख्य थ्रेड कॉल द्वारा अवरुद्ध है करने के लिए BoundedSemaphore.acquire() , Lock.acquire() , RLock.acquire() , Semaphore.acquire() , Condition.acquire() या Condition.wait() तो कॉल तुरंत बाधित किया जाएगा और KeyboardInterrupt बढ़ा दी जाएगी।

यह उस व्यवहार से अलग है threading जहां SIGINT को नजरअंदाज किया जाएगा जबकि समतुल्य अवरोधक कॉल प्रगति पर हैं।

ध्यान दें

इस पैकेज की कार्यक्षमता में से कुछ के लिए मेजबान ऑपरेटिंग सिस्टम पर एक कामकाजी साझा अर्ध-कार्यान्वयन की आवश्यकता होती है। एक के बिना, multiprocessing.synchronize मॉड्यूल को अक्षम कर दिया जाएगा, और इसे आयात करने के प्रयासों का परिणाम होगा ImportError । देखें bpo-3770 अतिरिक्त जानकारी के लिए।

साझा की गई ctypes वस्तुएं

साझा मेमोरी का उपयोग करके साझा किए गए ऑब्जेक्ट बनाना संभव है जो कि बाल प्रक्रियाओं द्वारा विरासत में मिला हो सकता है।

multiprocessing.Value(typecode_or_type, *args, lock=True)

ctypes साझा की गई मेमोरी से आवंटित ऑब्जेक्ट लौटाएं । डिफ़ॉल्ट रूप से रिटर्न वैल्यू वास्तव में ऑब्जेक्ट के लिए एक सिंक्रनाइज़ रैपर है। ऑब्जेक्ट को स्वयं के मान विशेषता के माध्यम से एक्सेस किया जा सकता है Value

टाइपबॉस्‍टर_or_type लौटे हुए ऑब्जेक्ट के प्रकार को निर्धारित करता है: यह या तो ctypes प्रकार है या array मॉड्यूल द्वारा उपयोग किए जाने वाले प्रकार का एक वर्ण टंकण है । * प्रकार के लिए आर्गर्स को कंस्ट्रक्टर पर पास किया जाता है।

तो ताला है True (डिफ़ॉल्ट) फिर एक नया पुनरावर्ती ताला वस्तु मूल्य के लिए उपयोग सिंक्रनाइज़ करने के लिए बनाया जाता है। यदि ताला एक वस्तु Lock या RLock वस्तु है, तो इसका उपयोग मूल्य तक पहुंच को सिंक्रनाइज़ करने के लिए किया जाएगा। यदि लॉक है False तो लौटी हुई वस्तु तक पहुंच स्वचालित रूप से लॉक द्वारा संरक्षित नहीं होगी, इसलिए यह जरूरी नहीं कि "प्रक्रिया-सुरक्षित" हो।

संचालन, += जिसमें एक पढ़ना और लिखना शामिल है, परमाणु नहीं हैं। इसलिए, उदाहरण के लिए, यदि आप साझा मूल्य को परमाणु रूप से बढ़ाना चाहते हैं तो यह सिर्फ करना अपर्याप्त है

counter.value += 1

मान लें कि संबंधित लॉक पुनरावर्ती है (जो कि डिफ़ॉल्ट रूप से है) आप इसके बजाय कर सकते हैं

with counter.get_lock():
    counter.value += 1

ध्यान दें कि ताला एक खोजशब्द-मात्र तर्क है।

multiprocessing.Array(typecode_or_type, size_or_initializer, *, lock=True)

साझा मेमोरी से आवंटित एक ctypes सरणी लौटें। डिफ़ॉल्ट रूप से रिटर्न मान वास्तव में सरणी के लिए एक सिंक्रनाइज़ रैपर है।

टाइपबॉस्टर_ओर_टाइप लौटे सरणी के तत्वों के प्रकार को निर्धारित करता है: यह या तो एक ctypes प्रकार है या array मॉड्यूल द्वारा उपयोग किए जाने वाले प्रकार का एक वर्ण टाइपबेल्ट है । यदि size_or_initializer एक पूर्णांक है, तो यह सरणी की लंबाई निर्धारित करता है, और सरणी शुरू में शून्य हो जाएगी। अन्यथा, size_or_initializer एक अनुक्रम है जिसका उपयोग सरणी को आरंभ करने के लिए किया जाता है और जिसकी लंबाई सरणी की लंबाई निर्धारित करती है।

तो ताला है True (डिफ़ॉल्ट) फिर एक नया ताला वस्तु मूल्य के लिए उपयोग सिंक्रनाइज़ करने के लिए बनाया जाता है। यदि ताला एक वस्तु Lock या RLock वस्तु है, तो इसका उपयोग मूल्य तक पहुंच को सिंक्रनाइज़ करने के लिए किया जाएगा। यदि लॉक है False तो लौटी हुई वस्तु तक पहुंच स्वचालित रूप से लॉक द्वारा संरक्षित नहीं होगी, इसलिए यह जरूरी नहीं कि "प्रक्रिया-सुरक्षित" हो।

ध्यान दें कि ताला केवल एक तर्क है।

ध्यान दें कि एक सरणी ctypes.c_char में मूल्य और कच्ची विशेषताएँ होती हैं जो किसी को स्ट्रिंग्स को स्टोर और पुनः प्राप्त करने के लिए उपयोग करने की अनुमति देती हैं।

multiprocessing.sharedctypes मॉड्यूल

multiprocessing.sharedctypes मॉड्यूल आवंटित करने के लिए कार्य प्रदान करता ctypes है जो बच्चे प्रक्रियाओं के द्वारा प्राप्त की जा सकती साझा स्मृति से वस्तुओं।

ध्यान दें

यद्यपि यह साझा मेमोरी में एक पॉइंटर को स्टोर करना संभव है, यह याद रखें कि यह एक विशिष्ट प्रक्रिया के पता स्थान में एक स्थान को संदर्भित करेगा। हालाँकि, दूसरी प्रक्रिया के संदर्भ में पॉइंटर अवैध होने की संभावना है और दूसरी प्रक्रिया से पॉइंटर को डीरेल करने की कोशिश करने से दुर्घटना हो सकती है।

multiprocessing.sharedctypes.RawArray(typecode_or_type, size_or_initializer)

साझा मेमोरी से आवंटित एक ctypes सरणी लौटें।

टाइपबॉस्टर_ओर_टाइप लौटे सरणी के तत्वों के प्रकार को निर्धारित करता है: यह या तो एक ctypes प्रकार है या array मॉड्यूल द्वारा उपयोग किए जाने वाले प्रकार का एक वर्ण टाइपबेल्ट है । यदि size_or_initializer एक पूर्णांक है, तो यह सरणी की लंबाई निर्धारित करता है, और सरणी शुरू में शून्य हो जाएगी। अन्यथा size_or_initializer एक अनुक्रम है जिसका उपयोग सरणी को आरंभ करने के लिए किया जाता है और जिसकी लंबाई सरणी की लंबाई निर्धारित करती है।

ध्यान दें कि एक तत्व की स्थापना और प्राप्त करना संभवतः गैर-परमाणु है - Array() यह सुनिश्चित करने के लिए उपयोग करें कि लॉक का उपयोग करके पहुंच स्वचालित रूप से सिंक्रनाइज़ है।

multiprocessing.sharedctypes.RawValue(typecode_or_type, *args)

साझा मेमोरी से आवंटित ctypes ऑब्जेक्ट लौटाएं।

टाइपबॉस्‍टर_or_type लौटे हुए ऑब्जेक्ट के प्रकार को निर्धारित करता है: यह या तो ctypes प्रकार है या array मॉड्यूल द्वारा उपयोग किए जाने वाले प्रकार का एक वर्ण टंकण है । * प्रकार के लिए आर्गर्स को कंस्ट्रक्टर पर पास किया जाता है।

ध्यान दें कि मूल्य निर्धारित करना और प्राप्त करना संभवतः गैर-परमाणु है - Value() इसके बजाय यह सुनिश्चित करने के लिए उपयोग करें कि लॉक का उपयोग करके एक्सेस स्वचालित रूप से सिंक्रनाइज़ है।

ध्यान दें कि एक सरणी के ctypes.c_char पास value और raw विशेषताएँ हैं जो किसी को स्ट्रिंग्स को संग्रहीत करने और पुनर्प्राप्त करने के लिए इसका उपयोग करने की अनुमति देती हैं - के लिए प्रलेखन देखें ctypes

multiprocessing.sharedctypes.Array(typecode_or_type, size_or_initializer, *, lock=True)

के रूप में ही RawArray() है कि सिवाय के मूल्य के आधार ताला एक प्रक्रिया सुरक्षित तुल्यकालन आवरण एक कच्चे ctypes सरणी के बजाय लौटाया जा सकता है।

तो ताला है True (डिफ़ॉल्ट) फिर एक नया ताला वस्तु मूल्य के लिए उपयोग सिंक्रनाइज़ करने के लिए बनाया जाता है। यदि ताला एक वस्तु Lock या RLock वस्तु है, तो इसका उपयोग मूल्य तक पहुंच को सिंक्रनाइज़ करने के लिए किया जाएगा। यदि लॉक है False तो लौटी हुई वस्तु तक पहुंच स्वचालित रूप से लॉक द्वारा संरक्षित नहीं होगी, इसलिए यह जरूरी नहीं कि "प्रक्रिया-सुरक्षित" हो।

ध्यान दें कि ताला एक खोजशब्द-मात्र तर्क है।

multiprocessing.sharedctypes.Value(typecode_or_type, *args, lock=True)

के रूप में ही RawValue() है कि सिवाय के मूल्य के आधार ताला एक प्रक्रिया सुरक्षित तुल्यकालन आवरण एक कच्चे ctypes वस्तु के स्थान पर लौटाया जा सकता है।

तो ताला है True (डिफ़ॉल्ट) फिर एक नया ताला वस्तु मूल्य के लिए उपयोग सिंक्रनाइज़ करने के लिए बनाया जाता है। यदि ताला एक वस्तु Lock या RLock वस्तु है, तो इसका उपयोग मूल्य तक पहुंच को सिंक्रनाइज़ करने के लिए किया जाएगा। यदि लॉक है False तो लौटी हुई वस्तु तक पहुंच स्वचालित रूप से लॉक द्वारा संरक्षित नहीं होगी, इसलिए यह जरूरी नहीं कि "प्रक्रिया-सुरक्षित" हो।

ध्यान दें कि ताला एक खोजशब्द-मात्र तर्क है।

multiprocessing.sharedctypes.copy(obj)

साझा मेमोरी से आवंटित एक ctypes ऑब्जेक्ट लौटाएं जो ctypes ऑब्जेक्ट obj की एक प्रति है ।

multiprocessing.sharedctypes.synchronized(obj[, lock])

Ctypes ऑब्जेक्ट के लिए एक प्रक्रिया-सुरक्षित आवरण ऑब्जेक्ट लौटाएं जो एक्सेस को सिंक्रनाइज़ करने के लिए लॉक का उपयोग करता है । तो ताला है None (डिफ़ॉल्ट) फिर एक RLock वस्तु अपने आप बन जाता।

एक सिंक्रनाइज़ रैपर में ऑब्जेक्ट के अलावा दो तरीके होंगे जो इसे लपेटता है: get_obj() लिपटे ऑब्जेक्ट को get_lock() लौटाता है और सिंक्रनाइज़ेशन के लिए उपयोग की गई लॉक ऑब्जेक्ट को वापस करता है।

ध्यान दें कि आवरण के माध्यम से ctypes ऑब्जेक्ट को एक्सेस करना कच्चे ctypes ऑब्जेक्ट को एक्सेस करने की तुलना में बहुत धीमा हो सकता है।

संस्करण 3.5 में परिवर्तित: सिंक्रनाइज़ किए गए ऑब्जेक्ट संदर्भ प्रबंधक प्रोटोकॉल का समर्थन करते हैं ।

नीचे दी गई तालिका सामान्य ctypes सिंटैक्स के साथ साझा मेमोरी से साझा ctypes ऑब्जेक्ट बनाने के लिए सिंटैक्स की तुलना करती है। (तालिका MyStruct में कुछ उपवर्ग है ctypes.Structure ।)

ctypes प्रकार का उपयोग कर साझा किए गए टंकण का उपयोग करते हुए साझाकरण
c_double (2.4) RawValue (c_double, 2.4) रॉवेल्यू ('डी', 2.4)
MyStruct (4, 6) RawValue (MyStruct, 4, 6)
(c_short * 7) () रॉअरे (c_short, 7) रावरायरे ('एच', 7)
(c_int * 3) (9, 2, 8) रावेर्रे (c_int, (9, 2, 8)) रॉयर्रे ('i', (9, 2, 8))

नीचे एक उदाहरण है जहां कई ctypes ऑब्जेक्ट्स को एक चाइल्ड प्रोसेस द्वारा संशोधित किया जाता है:

from multiprocessing import Process, Lock
from multiprocessing.sharedctypes import Value, Array
from ctypes import Structure, c_double

class Point(Structure):
    _fields_ = [('x', c_double), ('y', c_double)]

def modify(n, x, s, A):
    n.value **= 2
    x.value **= 2
    s.value = s.value.upper()
    for a in A:
        a.x **= 2
        a.y **= 2

if __name__ == '__main__':
    lock = Lock()

    n = Value('i', 7)
    x = Value(c_double, 1.0/3.0, lock=False)
    s = Array('c', b'hello world', lock=lock)
    A = Array(Point, [(1.875,-6.25), (-5.75,2.0), (2.375,9.5)], lock=lock)

    p = Process(target=modify, args=(n, x, s, A))
    p.start()
    p.join()

    print(n.value)
    print(x.value)
    print(s.value)
    print([(a.x, a.y) for a in A])

मुद्रित परिणाम हैं

49
0.1111111111111111
HELLO WORLD
[(3.515625, 39.0625), (33.0625, 4.0), (5.640625, 90.25)]

प्रबंधक

प्रबंधक डेटा बनाने का एक तरीका प्रदान करते हैं जिसे विभिन्न प्रक्रियाओं के बीच साझा किया जा सकता है, जिसमें विभिन्न मशीनों पर चलने वाली प्रक्रियाओं के बीच एक नेटवर्क पर साझा करना शामिल है। एक प्रबंधक ऑब्जेक्ट एक सर्वर प्रक्रिया को नियंत्रित करता है जो साझा वस्तुओं का प्रबंधन करता है । अन्य प्रक्रियाएँ सांकेतिक वस्तुओं का उपयोग करके साझा वस्तुओं तक पहुँच सकती हैं।

multiprocessing.Manager()

एक आरंभ की गई SyncManager वस्तु लौटाती है जिसका उपयोग प्रक्रियाओं के बीच वस्तुओं को साझा करने के लिए किया जा सकता है। लौटाया गया प्रबंधक ऑब्जेक्ट एक स्पॉन्डेड चाइल्ड प्रक्रिया से मेल खाता है और इसमें ऐसी विधियाँ हैं जो साझा ऑब्जेक्ट्स बनाएंगे और संबंधित प्रॉक्सिज़ को वापस करेंगे।

जैसे ही वे कचरा एकत्र करते हैं या उनकी मूल प्रक्रिया से बाहर निकल जाते हैं, प्रबंधक प्रक्रियाएं बंद हो जाएंगी। प्रबंधक कक्षाएं multiprocessing.managers मॉड्यूल में परिभाषित की जाती हैं :

class multiprocessing.managers.BaseManager([address[, authkey]])

एक BaseManager ऑब्जेक्ट बनाएँ।

एक बार बनाए जाने के बाद, कॉल करने start() या get_server().serve_forever() यह सुनिश्चित करने के लिए कि प्रबंधक ऑब्जेक्ट एक प्रारंभ प्रबंधक प्रक्रिया को संदर्भित करता है।

पता वह पता है जिस पर प्रबंधक प्रक्रिया नए कनेक्शन के लिए सुनती है। यदि पता है None तो एक मनमाना चुना जाता है।

ऑरिजिट प्रमाणीकरण कुंजी है जिसका उपयोग सर्वर प्रक्रिया में आने वाले कनेक्शन की वैधता की जांच करने के लिए किया जाएगा। यदि authkey है None तो current_process().authkey प्रयोग किया जाता है। अन्यथा ऑर्कुट का उपयोग किया जाता है और यह एक बाइट स्ट्रिंग होना चाहिए।

start([initializer[, initargs]])

प्रबंधक शुरू करने के लिए एक उपप्रकार शुरू करें। यदि प्रारंभकर्ता नहीं है None तो उपप्रक्रिया कॉल करेंगे initializer(*initargs) जब यह शुरू होता है।

get_server()

एक Server वस्तु लौटाता है जो प्रबंधक के नियंत्रण में वास्तविक सर्वर का प्रतिनिधित्व करता है। Server वस्तु का समर्थन करता है serve_forever() विधि:

>>> from multiprocessing.managers import BaseManager
>>> manager = BaseManager(address=('', 50000), authkey=b'abc')
>>> server = manager.get_server()
>>> server.serve_forever()

Server इसके अतिरिक्त एक address विशेषता है।

connect()

दूरस्थ प्रबंधक प्रक्रिया में स्थानीय प्रबंधक ऑब्जेक्ट कनेक्ट करें:

>>> from multiprocessing.managers import BaseManager
>>> m = BaseManager(address=('127.0.0.1', 5000), authkey=b'abc')
>>> m.connect()
shutdown()

प्रबंधक द्वारा उपयोग की जाने वाली प्रक्रिया को रोकें। यह केवल तभी उपलब्ध है जब start() सर्वर प्रक्रिया को शुरू करने के लिए उपयोग किया गया है।

इसे कई बार कहा जा सकता है।

register(typeid[, callable[, proxytype[, exposed[, method_to_typeid[, create_method]]]]])

एक क्लासमेथोड जिसका उपयोग प्रबंधक वर्ग के साथ एक प्रकार या कॉल करने योग्य रजिस्टर करने के लिए किया जा सकता है।

टाइपिड एक "प्रकार पहचानकर्ता" है, जिसका उपयोग किसी विशेष प्रकार की साझा वस्तु की पहचान करने के लिए किया जाता है। यह एक तार होना चाहिए।

कॉल करने योग्य एक कॉल करने योग्य है जो इस प्रकार के पहचानकर्ता के लिए ऑब्जेक्ट बनाने के लिए उपयोग किया जाता है। यदि प्रबंधक का प्रबंधक connect() विधि का उपयोग कर सर्वर से जुड़ा होगा , या अगर create_method तर्क है False तो इसे इस रूप में छोड़ा जा सकता है None

proxytype एक उपवर्ग है BaseProxy जिसका उपयोग इस प्रकार के साथ साझा किए गए ऑब्जेक्ट के लिए परदे के पीछे बनाने के लिए किया जाता है । यदि None तब एक प्रॉक्सी क्लास अपने आप बन जाती है।

उजागर का उपयोग विधि नामों के अनुक्रम को निर्दिष्ट करने के लिए किया जाता है जो इस टाइपिड के लिए प्रॉक्सी का उपयोग करने की अनुमति दी जानी चाहिए BaseProxy._callmethod() । (यदि उजागर किया गया है None तो proxytype._exposed_ इसका उपयोग इसके बजाय यदि मौजूद है तो किया जाता है।) उस मामले में जहां कोई उजागर सूची निर्दिष्ट नहीं है, साझा वस्तु के सभी "सार्वजनिक तरीके" सुलभ होंगे। (यहाँ एक "सार्वजनिक विधि" का अर्थ है किसी भी विशेषता जिसमें एक __call__() विधि है और जिसका नाम शुरू नहीं होता है '_' ।)

Method_to_typeid एक मानचित्रण है जो उन उजागर तरीकों की वापसी प्रकार को निर्दिष्ट करने के लिए उपयोग किया जाता है जो एक प्रॉक्सी को वापस करना चाहिए। यह स्ट्रिंग्स टाइप करने के लिए मेथड मैप्स को मैप करता है। (यदि method_to_typeid है None तो proxytype._method_to_typeid_ इसका उपयोग इसके बजाय यदि मौजूद है, तो किया जाता है।) यदि किसी विधि का नाम इस मैपिंग की कुंजी नहीं है या यदि मैपिंग है, None तो विधि द्वारा दी गई ऑब्जेक्ट को मान द्वारा कॉपी किया जाएगा।

create_method यह निर्धारित करता है कि क्या नाम टाइपिड के साथ एक विधि बनाई जानी चाहिए जिसका उपयोग सर्वर प्रक्रिया को एक नई साझा ऑब्जेक्ट बनाने और इसके लिए एक प्रॉक्सी वापस करने के लिए किया जा सकता है। डिफ़ॉल्ट रूप से यह है True

BaseManager उदाहरणों में भी केवल एक ही संपत्ति है:

address

प्रबंधक द्वारा उपयोग किया गया पता।

संस्करण 3.3 में परिवर्तित: प्रबंधक ऑब्जेक्ट संदर्भ प्रबंधन प्रोटोकॉल का समर्थन करते हैं - संदर्भ प्रबंधक प्रकार देखें __enter__() सर्वर प्रक्रिया शुरू करता है (यदि यह पहले से शुरू नहीं हुआ है) और फिर प्रबंधक ऑब्जेक्ट लौटाता है। __exit__() कॉल करता है shutdown()

पिछले संस्करणों __enter__() में प्रबंधक की सर्वर प्रक्रिया शुरू नहीं की थी अगर यह पहले से ही शुरू नहीं हुई थी।

class multiprocessing.managers.SyncManager

एक उपवर्ग का BaseManager उपयोग प्रक्रियाओं के सिंक्रनाइज़ेशन के लिए किया जा सकता है। इस प्रकार की वस्तुओं को वापस कर दिया जाता है multiprocessing.Manager()

इसकी विधियाँ कई प्रकार के उपयोग किए जाने वाले डेटा प्रकारों के लिए प्रॉक्सी ऑब्जेक्ट बनाती हैं और वापस लौटाती हैं जिन्हें पूरी प्रक्रियाओं में सिंक्रनाइज़ किया जाता है। इसमें विशेष रूप से साझा सूचियाँ और शब्दकोश शामिल हैं।

Barrier(parties[, action[, timeout]])

एक साझा threading.Barrier ऑब्जेक्ट बनाएं और इसके लिए एक प्रॉक्सी लौटाएं।

संस्करण 3.3 में नया।

BoundedSemaphore([value])

एक साझा threading.BoundedSemaphore ऑब्जेक्ट बनाएं और इसके लिए एक प्रॉक्सी लौटाएं।

Condition([lock])

एक साझा threading.Condition ऑब्जेक्ट बनाएं और इसके लिए एक प्रॉक्सी लौटाएं।

यदि लॉक की आपूर्ति की जाती है, तो यह एक वस्तु threading.Lock या threading.RLock वस्तु के लिए एक प्रॉक्सी होना चाहिए ।

संस्करण 3.3 में परिवर्तित: wait_for() विधि जोड़ दिया गया।

Event()

एक साझा threading.Event ऑब्जेक्ट बनाएं और इसके लिए एक प्रॉक्सी लौटाएं।

Lock()

एक साझा threading.Lock ऑब्जेक्ट बनाएं और इसके लिए एक प्रॉक्सी लौटाएं।

Namespace()

एक साझा Namespace ऑब्जेक्ट बनाएं और इसके लिए एक प्रॉक्सी लौटाएं।

Queue([maxsize])

एक साझा queue.Queue ऑब्जेक्ट बनाएं और इसके लिए एक प्रॉक्सी लौटाएं।

RLock()

एक साझा threading.RLock ऑब्जेक्ट बनाएं और इसके लिए एक प्रॉक्सी लौटाएं।

Semaphore([value])

एक साझा threading.Semaphore ऑब्जेक्ट बनाएं और इसके लिए एक प्रॉक्सी लौटाएं।

Array(typecode, sequence)

एक सरणी बनाएं और इसके लिए एक प्रॉक्सी लौटाएं।

Value(typecode, value)

एक लिखने योग्य value विशेषता के साथ एक ऑब्जेक्ट बनाएं और इसके लिए एक प्रॉक्सी लौटाएं।

dict()
dict(mapping)
dict(sequence)

एक साझा dict ऑब्जेक्ट बनाएं और इसके लिए एक प्रॉक्सी लौटाएं।

list()
list(sequence)

एक साझा list ऑब्जेक्ट बनाएं और इसके लिए एक प्रॉक्सी लौटाएं।

संस्करण 3.6 में परिवर्तित: साझा की गई वस्तुएँ नेस्टेड होने में सक्षम हैं। उदाहरण के लिए, एक साझा कंटेनर ऑब्जेक्ट जैसे कि एक साझा सूची में अन्य साझा किए गए ऑब्जेक्ट शामिल हो सकते हैं जो सभी द्वारा प्रबंधित और सिंक्रनाइज़ किए जाएंगे SyncManager

class multiprocessing.managers.Namespace

एक प्रकार जो साथ रजिस्टर कर सकता है SyncManager

एक नामस्थान ऑब्जेक्ट में कोई सार्वजनिक विधियाँ नहीं होती हैं, लेकिन उनमें गुणात्मक गुण होते हैं। इसका प्रतिनिधित्व इसकी विशेषताओं के मूल्यों को दर्शाता है।

हालाँकि, किसी नेमस्पेस ऑब्जेक्ट के लिए प्रॉक्सी का उपयोग करते समय, एक विशेषता जिसके साथ शुरुआत होती है '_' , वह प्रॉक्सी की विशेषता होगी न कि संदर्भ की एक विशेषता:

>>> manager = multiprocessing.Manager()
>>> Global = manager.Namespace()
>>> Global.x = 10
>>> Global.y = 'hello'
>>> Global._z = 12.3    # this is an attribute of the proxy
>>> print(Global)
Namespace(x=10, y='hello')

अनुकूलित प्रबंधक

अपने स्वयं का प्रबंधक बनाने के लिए, कोई व्यक्ति उप-वर्ग बनाता है BaseManager और register() प्रबंधक वर्ग के साथ नए प्रकार या कॉलबेल्स को पंजीकृत करने के लिए क्लासमेथोड का उपयोग करता है । उदाहरण के लिए:

from multiprocessing.managers import BaseManager

class MathsClass:
    def add(self, x, y):
        return x + y
    def mul(self, x, y):
        return x * y

class MyManager(BaseManager):
    pass

MyManager.register('Maths', MathsClass)

if __name__ == '__main__':
    with MyManager() as manager:
        maths = manager.Maths()
        print(maths.add(4, 3))         # prints 7
        print(maths.mul(7, 8))         # prints 56

रिमोट मैनेजर का उपयोग करना

एक मशीन पर एक प्रबंधक सर्वर को चलाना संभव है और ग्राहक इसे अन्य मशीनों से उपयोग करते हैं (यह मानते हुए कि इसमें शामिल फायरवॉल अनुमति देते हैं)।

निम्न आदेशों को चलाने से एक एकल साझा कतार के लिए एक सर्वर बनता है जिसे दूरस्थ ग्राहक एक्सेस कर सकते हैं:

>>> from multiprocessing.managers import BaseManager
>>> from queue import Queue
>>> queue = Queue()
>>> class QueueManager(BaseManager): pass
>>> QueueManager.register('get_queue', callable=lambda:queue)
>>> m = QueueManager(address=('', 50000), authkey=b'abracadabra')
>>> s = m.get_server()
>>> s.serve_forever()

एक ग्राहक निम्नानुसार सर्वर तक पहुंच सकता है:

>>> from multiprocessing.managers import BaseManager
>>> class QueueManager(BaseManager): pass
>>> QueueManager.register('get_queue')
>>> m = QueueManager(address=('foo.bar.org', 50000), authkey=b'abracadabra')
>>> m.connect()
>>> queue = m.get_queue()
>>> queue.put('hello')

एक अन्य ग्राहक भी इसका उपयोग कर सकते हैं:

>>> from multiprocessing.managers import BaseManager
>>> class QueueManager(BaseManager): pass
>>> QueueManager.register('get_queue')
>>> m = QueueManager(address=('foo.bar.org', 50000), authkey=b'abracadabra')
>>> m.connect()
>>> queue = m.get_queue()
>>> queue.get()
'hello'

स्थानीय प्रक्रियाएँ उस कतार तक पहुँच प्राप्त कर सकती हैं, जो ऊपर से कोड का उपयोग करके इसे दूर से उपयोग कर सकती है:

>>> from multiprocessing import Process, Queue
>>> from multiprocessing.managers import BaseManager
>>> class Worker(Process):
...     def __init__(self, q):
...         self.q = q
...         super(Worker, self).__init__()
...     def run(self):
...         self.q.put('local hello')
...
>>> queue = Queue()
>>> w = Worker(queue)
>>> w.start()
>>> class QueueManager(BaseManager): pass
...
>>> QueueManager.register('get_queue', callable=lambda: queue)
>>> m = QueueManager(address=('', 50000), authkey=b'abracadabra')
>>> s = m.get_server()
>>> s.serve_forever()

प्रॉक्सी ऑब्जेक्ट्स

एक छद्म एक वस्तु है जो एक साझा वस्तु को संदर्भित करती है जो (संभवतः) एक अलग प्रक्रिया में रहती है। साझा वस्तु को प्रॉक्सी का संदर्भ कहा जाता है । एकाधिक प्रॉक्सी ऑब्जेक्ट में एक ही रेफ़रेंट हो सकता है।

एक प्रॉक्सी ऑब्जेक्ट में ऐसे तरीके होते हैं जो अपने संदर्भ के संबंधित तरीकों को लागू करते हैं (हालांकि संदर्भ का हर तरीका प्रॉक्सी के माध्यम से आवश्यक रूप से उपलब्ध नहीं होगा) इस तरह, एक प्रॉक्सी का उपयोग उसके रेफरेंट कैन की तरह ही किया जा सकता है:

>>> from multiprocessing import Manager
>>> manager = Manager()
>>> l = manager.list([i*i for i in range(10)])
>>> print(l)
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
>>> print(repr(l))
<ListProxy object, typeid 'list' at 0x...>
>>> l[4]
16
>>> l[2:5]
[4, 9, 16]

ध्यान दें कि str() प्रॉक्सी पर आवेदन करने से संदर्भ repr() का प्रतिनिधित्व वापस आ जाएगा , जबकि आवेदन करने पर प्रॉक्सी का प्रतिनिधित्व वापस आ जाएगा।

प्रॉक्सी ऑब्जेक्ट्स की एक महत्वपूर्ण विशेषता यह है कि वे पिकलेबल हैं ताकि उन्हें प्रक्रियाओं के बीच पारित किया जा सके। जैसे, एक संदर्भ में प्रॉक्सी ऑब्जेक्ट हो सकते हैं । यह इन प्रबंधित सूचियों, dicts, और अन्य प्रॉक्सी वस्तुओं के घोंसले के शिकार की अनुमति देता है :

>>> a = manager.list()
>>> b = manager.list()
>>> a.append(b)         # referent of a now contains referent of b
>>> print(a, b)
[<ListProxy object, typeid 'list' at ...>] []
>>> b.append('hello')
>>> print(a[0], b)
['hello'] ['hello']

इसी तरह, तानाशाही और सूची की समीपता एक दूसरे के अंदर निहित हो सकती है:

>>> l_outer = manager.list([ manager.dict() for i in range(2) ])
>>> d_first_inner = l_outer[0]
>>> d_first_inner['a'] = 1
>>> d_first_inner['b'] = 2
>>> l_outer[1]['c'] = 3
>>> l_outer[1]['z'] = 26
>>> print(l_outer[0])
{'a': 1, 'b': 2}
>>> print(l_outer[1])
{'c': 3, 'z': 26}

यदि मानक (गैर-प्रॉक्सी) list या dict ऑब्जेक्ट्स को एक संदर्भ में समाहित किया गया है, तो उन उत्परिवर्तनीय मूल्यों में संशोधन प्रबंधक के माध्यम से प्रचारित नहीं किए जाएंगे क्योंकि प्रॉक्सी में यह जानने का कोई तरीका नहीं है कि भीतर मौजूद मानों को संशोधित किया गया है या नहीं। हालांकि, एक कंटेनर प्रॉक्सी में एक मान को संग्रहीत करना (जो __setitem__ प्रॉक्सी ऑब्जेक्ट पर ट्रिगर होता है) प्रबंधक के माध्यम से प्रचार करता है और इसलिए इस तरह के आइटम को प्रभावी ढंग से संशोधित करने के लिए, एक कंटेनर प्रॉक्सी को संशोधित मूल्य को फिर से असाइन कर सकता है:

# create a list proxy and append a mutable object (a dictionary)
lproxy = manager.list()
lproxy.append({})
# now mutate the dictionary
d = lproxy[0]
d['a'] = 1
d['b'] = 2
# at this point, the changes to d are not yet synced, but by
# updating the dictionary, the proxy is notified of the change
lproxy[0] = d

यह दृष्टिकोण संभवतः सबसे अधिक उपयोग के मामलों के लिए नेस्टेड प्रॉक्सी ऑब्जेक्ट्स को नियोजित करने से कम सुविधाजनक है, लेकिन यह सिंक्रनाइज़ेशन पर नियंत्रण के स्तर को भी दर्शाता है।

ध्यान दें

multiprocessing मूल्य से तुलना का समर्थन करने के लिए प्रॉक्सी प्रकार कुछ भी नहीं करते हैं। उदाहरण के लिए, हमारे पास:

>>> manager.list([1,2,3]) == [1,2,3]
False

तुलना करते समय व्यक्ति को केवल संदर्भ की एक प्रति का उपयोग करना चाहिए।

class multiprocessing.managers.BaseProxy

प्रॉक्सी वस्तुएं उप-वर्ग के उदाहरण हैं BaseProxy

_callmethod(methodname[, args[, kwds]])

कॉल और प्रॉक्सी के संदर्भ की एक विधि का परिणाम लौटाएं।

यदि proxy एक प्रॉक्सी है जिसका संदर्भ obj तब अभिव्यक्ति है

proxy._callmethod(methodname, args, kwds)

अभिव्यक्ति का मूल्यांकन करेंगे

getattr(obj, methodname)(*args, **kwds)

प्रबंधक की प्रक्रिया में।

दिए गए मान कॉल या एक नया साझा वस्तु के लिए एक प्रॉक्सी के परिणाम की एक प्रति हो जाएगा - के लिए दस्तावेज़ देखें method_to_typeid का तर्क register()

यदि कॉल द्वारा एक अपवाद उठाया जाता है, तो फिर से उठाया जाता है BaseProxy._callmethod() । यदि प्रबंधक की प्रक्रिया में कुछ अन्य अपवाद उठाए जाते हैं तो इसे RemoteError अपवाद में बदल दिया जाता है और इसके द्वारा उठाया जाता है BaseProxy._callmethod()

विशेष रूप से ध्यान दें कि एक अपवाद उठाया जाएगा यदि मेथडैम उजागर नहीं किया गया है

के उपयोग का एक उदाहरण BaseProxy._callmethod() :

>>> l = manager.list(range(10))
>>> l._callmethod('__len__')
10
>>> l._callmethod('__getitem__', (slice(2, 7),)) # equivalent to l[2:7]
[2, 3, 4, 5, 6]
>>> l._callmethod('__getitem__', (20,))          # equivalent to l[20]
Traceback (most recent call last):
...
IndexError: list index out of range
_getvalue()

संदर्भ की एक प्रति लौटाएं।

यदि संदर्भ असंगत है, तो यह एक अपवाद को बढ़ाएगा।

__repr__()

प्रॉक्सी ऑब्जेक्ट का प्रतिनिधित्व लौटें।

__str__()

रेफ़रेंट का प्रतिनिधित्व लौटाएँ।

साफ - सफाई

एक प्रॉक्सी ऑब्जेक्ट एक कमजोर कॉलबैक का उपयोग करता है ताकि जब यह कचरा इकट्ठा हो जाए तो इसे प्रबंधक से स्वयं डेरेगिस्ट ले लें जो इसके संदर्भ का मालिक है।

एक साझा ऑब्जेक्ट प्रबंधक प्रक्रिया से हटा दिया जाता है जब कोई संदर्भ नहीं होते हैं जो इसका संदर्भ देते हैं।

प्रक्रिया ताल

एक प्रक्रियाओं का एक पूल बना सकता है जो Pool कक्षा के साथ इसे प्रस्तुत किए गए कार्यों को पूरा करेगा ।

class multiprocessing.pool.Pool([processes[, initializer[, initargs[, maxtasksperchild[, context]]]]])

एक प्रक्रिया पूल ऑब्जेक्ट जो कार्यकर्ता प्रक्रियाओं के एक पूल को नियंत्रित करता है जिससे नौकरी प्रस्तुत की जा सकती है। यह टाइमआउट और कॉलबैक के साथ अतुल्यकालिक परिणामों का समर्थन करता है और इसका समानांतर नक्शा कार्यान्वयन है।

प्रक्रियाओं का उपयोग करने के लिए कार्यकर्ता प्रक्रियाओं की संख्या है। यदि प्रक्रियाओं है None तो संख्या से लौटे os.cpu_count() प्रयोग किया जाता है।

यदि इनिशलाइज़र नहीं है, None तो प्रत्येक श्रमिक प्रक्रिया initializer(*initargs) शुरू होने पर कॉल करेगी ।

मैक्सटेसपेरचाइल्ड उन कार्यों की संख्या है जो एक श्रमिक प्रक्रिया पूरी होने से पहले पूरी हो जाएगी और एक नए कार्यकर्ता प्रक्रिया से प्रतिस्थापित किया जाएगा, ताकि अप्रयुक्त संसाधनों को मुक्त किया जा सके। डिफ़ॉल्ट मैक्सटचस्पचाइल्ड है None , जिसका अर्थ है कि श्रमिक प्रक्रियाएं पूल के रूप में लंबे समय तक रहेंगी।

संदर्भ का उपयोग कार्यकर्ता प्रक्रियाओं को शुरू करने के लिए उपयोग किए जाने वाले संदर्भ को निर्दिष्ट करने के लिए किया जा सकता है। आमतौर पर एक पूल फ़ंक्शन multiprocessing.Pool() या Pool किसी संदर्भ ऑब्जेक्ट की विधि का उपयोग करके बनाया जाता है । दोनों मामलों में संदर्भ उचित रूप से निर्धारित किया गया है।

ध्यान दें कि पूल ऑब्जेक्ट के तरीकों को केवल उस प्रक्रिया से बुलाया जाना चाहिए जिसने पूल बनाया था।

संस्करण 3.2 में नया: मैक्सटेसपेरचाइल्ड

संस्करण 3.4 में नया: संदर्भ

ध्यान दें

कार्यकर्ता Pool आमतौर पर पूल की कार्य कतार की पूरी अवधि के लिए रहते हैं। अन्य प्रणालियों (जैसे अपाचे, mod_wsgi, आदि) में श्रमिकों द्वारा रखे गए संसाधनों को मुक्त करने के लिए एक लगातार पैटर्न पाया जाता है कि एक श्रमिक को बाहर निकलने से पहले केवल एक सेट राशि को पूरा करने की अनुमति दी जाती है, सफाई की जा रही है और एक नई प्रक्रिया शुरू की गई पुराने को बदलने के लिए। Maxtasksperchild को तर्क Pool उजागर अंत उपयोगकर्ता को यह क्षमता।

apply(func[, args[, kwds]])

तर्क args और कीवर्ड तर्क kwds के साथ कॉल func । यह परिणाम तैयार होने तक अवरुद्ध रहता है। इस ब्लॉक को देखते हुए, समानांतर में काम करने के लिए बेहतर अनुकूल है। इसके अतिरिक्त, फंक केवल पूल के श्रमिकों में से एक में निष्पादित होता है। apply_async()

apply_async(func[, args[, kwds[, callback[, error_callback]]]])

apply() विधि का एक प्रकार जो परिणाम वस्तु देता है।

यदि कॉलबैक निर्दिष्ट किया गया है, तो यह एक कॉल करने योग्य होना चाहिए जो एकल तर्क को स्वीकार करता है। जब परिणाम तैयार हो जाता है तो कॉलबैक उस पर लागू कर दिया जाता है, जब तक कि कॉल विफल नहीं होती है, उस स्थिति में इसके बजाय error_callback लागू होता है।

यदि error_callback निर्दिष्ट किया गया है, तो यह एक कॉल करने योग्य होना चाहिए जो एकल तर्क को स्वीकार करता है। यदि लक्ष्य फ़ंक्शन विफल रहता है, तो error_callback अपवाद उदाहरण के साथ कहा जाता है।

कॉलबैक तुरंत पूरा होना चाहिए अन्यथा जो धागा परिणामों को संभालता है वह अवरुद्ध हो जाएगा।

map(func, iterable[, chunksize])

map() अंतर्निहित फ़ंक्शन के समानांतर समानांतर (यह हालांकि केवल एक पुनरावृत्त तर्क का समर्थन करता है )। यह परिणाम तैयार होने तक अवरुद्ध रहता है।

यह विधि कई क्रमों में चलने योग्य है जो इसे अलग-अलग कार्यों के रूप में प्रक्रिया पूल में जमा करती है। इन चंक्सों के आकार (लगभग) को एक धनात्मक पूर्णांक के लिए विखंडू निर्धारित करके निर्दिष्ट किया जा सकता है।

map_async(func, iterable[, chunksize[, callback[, error_callback]]])

map() विधि का एक प्रकार जो परिणाम वस्तु देता है।

यदि कॉलबैक निर्दिष्ट किया गया है, तो यह एक कॉल करने योग्य होना चाहिए जो एकल तर्क को स्वीकार करता है। जब परिणाम तैयार हो जाता है तो कॉलबैक उस पर लागू कर दिया जाता है, जब तक कि कॉल विफल नहीं होती है, उस स्थिति में इसके बजाय error_callback लागू होता है।

यदि error_callback निर्दिष्ट किया गया है, तो यह एक कॉल करने योग्य होना चाहिए जो एकल तर्क को स्वीकार करता है। यदि लक्ष्य फ़ंक्शन विफल रहता है, तो error_callback अपवाद उदाहरण के साथ कहा जाता है।

कॉलबैक तुरंत पूरा होना चाहिए अन्यथा जो धागा परिणामों को संभालता है वह अवरुद्ध हो जाएगा।

imap(func, iterable[, chunksize])

का एक लज़ीज़ संस्करण map()

Chunksize तर्क द्वारा इस्तेमाल किया वैसा ही है जैसा map() विधि। लंबे समय तक चलने के लिए एक बड़े मूल्य का उपयोग करने के लिए iterables डिफ़ॉल्ट मान का उपयोग करने की तुलना में काम को बहुत तेजी से पूरा कर सकता है 1

इसके अलावा अगर चंक्साइज़ है, 1 तो next() विधि द्वारा लौटाए गए पुनरावृत्ति की imap() विधि में एक वैकल्पिक टाइमआउट पैरामीटर है: यदि परिणाम टाइमआउट सेकंड के भीतर वापस नहीं किया जा सकता है, तो next(timeout) इसे बढ़ाएगा । multiprocessing.TimeoutError

imap_unordered(func, iterable[, chunksize])

के रूप में ही imap() , सिवाय इसके कि लौटे पुनरावर्तक से परिणामों का क्रम मनमाने ढंग से विचार किया जाना चाहिए। (केवल तब जब केवल एक कार्यकर्ता प्रक्रिया "सही" होने की गारंटी देने वाला आदेश हो।)

starmap(func, iterable[, chunksize])

जैसा map() सिवाय इसके कि के तत्वों iterable iterables कि तर्कों के रूप अनपैक किया जाता है हो सकता है की संभावना है।

इसलिए एक iterable की [(1,2), (3, 4)] में परिणाम [func(1,2), func(3,4)]

संस्करण 3.3 में नया।

starmap_async(func, iterable[, chunksize[, callback[, error_callback]]])

का एक संयोजन starmap() और map_async() उस पर दोहराता iterable iterables की और कॉल समारोह iterables अनपैक किया है। परिणाम वस्तु देता है।

संस्करण 3.3 में नया।

close()

पूल में जमा होने से किसी भी अधिक कार्य को रोकता है। एक बार सभी कार्य पूरे हो जाने के बाद कार्यकर्ता प्रक्रिया से बाहर निकल जाएंगे।

terminate()

बकाया काम पूरा किए बिना कार्यकर्ता प्रक्रियाओं को तुरंत रोक देता है। जब पूल ऑब्जेक्ट कचरा एकत्र terminate() किया जाता है तो तुरंत बुलाया जाएगा।

join()

कार्यकर्ता प्रक्रियाओं से बाहर निकलने की प्रतीक्षा करें। उपयोग करने से पहले close() या कॉल करना चाहिए । terminate() join()

संस्करण 3.3 में नया: पूल ऑब्जेक्ट अब संदर्भ प्रबंधन प्रोटोकॉल का समर्थन करते हैं - संदर्भ प्रबंधक प्रकार देखें __enter__() पूल ऑब्जेक्ट, और __exit__() कॉल लौटाता है terminate()

class multiprocessing.pool.AsyncResult

परिणाम का वर्ग इसके द्वारा लौट आया apply_async() और map_async()

get([timeout])

रिजल्ट आने पर उसे वापस कर दें। यदि टाइमआउट नहीं है None और परिणाम टाइमआउट सेकंड के भीतर नहीं आता है तो multiprocessing.TimeoutError उठाया जाता है। यदि दूरस्थ कॉल ने एक अपवाद उठाया, तो उस अपवाद को फिर से जोड़ दिया जाएगा get()

wait([timeout])

परिणाम उपलब्ध होने तक या समय समाप्त होने तक प्रतीक्षा करें ।

ready()

कॉल पूरा हो गया है या नहीं।

successful()

वापसी करें कि क्या अपवाद को उठाए बिना कॉल पूरा हुआ। AssertionError रिजल्ट तैयार नहीं होने पर उठाएंगे ।

निम्न उदाहरण एक पूल के उपयोग को दर्शाता है:

from multiprocessing import Pool
import time

def f(x):
    return x*x

if __name__ == '__main__':
    with Pool(processes=4) as pool:         # start 4 worker processes
        result = pool.apply_async(f, (10,)) # evaluate "f(10)" asynchronously in a single process
        print(result.get(timeout=1))        # prints "100" unless your computer is *very* slow

        print(pool.map(f, range(10)))       # prints "[0, 1, 4,..., 81]"

        it = pool.imap(f, range(10))
        print(next(it))                     # prints "0"
        print(next(it))                     # prints "1"
        print(it.next(timeout=1))           # prints "4" unless your computer is *very* slow

        result = pool.apply_async(time.sleep, (10,))
        print(result.get(timeout=1))        # raises multiprocessing.TimeoutError

श्रोता और ग्राहक

आमतौर पर प्रक्रियाओं के बीच गुजरने वाले संदेश कतारों का उपयोग करके या Connection लौटाए गए ऑब्जेक्ट का उपयोग करके किया जाता है Pipe()

हालांकि, multiprocessing.connection मॉड्यूल कुछ अतिरिक्त लचीलापन देता है। यह मूल रूप से सॉकेट्स या विंडोज नाम के पाइप से निपटने के लिए एक उच्च स्तरीय संदेश उन्मुख एपीआई देता है। इसमें मॉड्यूल का उपयोग करके पाचन प्रमाणीकरण के लिए समर्थन है hmac , और एक ही समय में कई कनेक्शनों को मतदान के लिए।

multiprocessing.connection.deliver_challenge(connection, authkey)

कनेक्शन के दूसरे छोर पर एक यादृच्छिक रूप से उत्पन्न संदेश भेजें और उत्तर की प्रतीक्षा करें।

यदि उत्तर कुंजी के रूप में ऑर्किटेक का उपयोग करके संदेश के पाचन से मेल खाता है तो कनेक्शन के दूसरे छोर पर एक स्वागत संदेश भेजा जाता है। अन्यथा AuthenticationError उठाया जाता है।

multiprocessing.connection.answer_challenge(connection, authkey)

एक संदेश प्राप्त करें, कुंजी के रूप में ऑर्किटेक का उपयोग करके संदेश के पाचन की गणना करें , और फिर पाचन वापस भेजें।

यदि स्वागत योग्य संदेश नहीं मिलता है, तो AuthenticationError उठाया जाता है।

multiprocessing.connection.Client(address[, family[, authkey]])

श्रोता के लिए एक कनेक्शन स्थापित करने का प्रयास जो पते पते का उपयोग कर रहा है, वापस लौट रहा है Connection

कनेक्शन का प्रकार पारिवारिक तर्क द्वारा निर्धारित किया जाता है, लेकिन यह आमतौर पर छोड़ा जा सकता है क्योंकि यह आमतौर पर पते के प्रारूप से अनुमान लगाया जा सकता है । ( पता प्रारूप देखें )

यदि ऑक्टीकी दी गई है और कोई नहीं, यह एक बाइट स्ट्रिंग होना चाहिए और एचएमएसी -आधारित प्रमाणीकरण चुनौती के लिए गुप्त कुंजी के रूप में उपयोग किया जाएगा। अगर कोई प्रमाणीकरण किया जाता है authkey कोई नहीं है। AuthenticationError प्रमाणीकरण विफल होने पर उठाया जाता है। प्रमाणीकरण कुंजी देखें ।

class multiprocessing.connection.Listener([address[, family[, backlog[, authkey]]]])

एक बाध्य सॉकेट या विंडोज नाम के पाइप के लिए एक आवरण जो कनेक्शन के लिए 'सुनना' है।

पता , श्रोता ऑब्जेक्ट के बाध्य सॉकेट या नामित पाइप द्वारा उपयोग किया जाने वाला पता है।

ध्यान दें

यदि '0.0.0.0' के पते का उपयोग किया जाता है, तो पता विंडोज़ पर कनेक्ट करने योग्य अंत बिंदु नहीं होगा। यदि आपको एक कनेक्ट करने योग्य अंत-बिंदु की आवश्यकता है, तो आपको '127.0.0.1' का उपयोग करना चाहिए।

परिवार उपयोग करने के लिए सॉकेट (या नामित पाइप) का प्रकार है। यह स्ट्रिंग्स 'AF_INET' (टीसीपी सॉकेट के लिए), 'AF_UNIX' (एक यूनिक्स डोमेन सॉकेट के लिए) या 'AF_PIPE' (विंडोज पाइप के लिए) में से एक हो सकता है। इनमें से केवल पहले उपलब्ध होने की गारंटी है। अगर परिवार है None तो परिवार पते के प्रारूप से प्रभावित है । यदि पता भी है None तो एक डिफ़ॉल्ट चुना जाता है। यह डिफ़ॉल्ट वह परिवार है जिसे सबसे तेज़ उपलब्ध माना जाता है। पता प्रारूप देखें । ध्यान दें कि यदि परिवार है 'AF_UNIX' और पता है None तो सॉकेट का उपयोग कर बनाया एक निजी अस्थायी निर्देशिका में बनाया जाएगा tempfile.mkstemp()

यदि श्रोता ऑब्जेक्ट एक सॉकेट का उपयोग करता है तो बैकलॉग (1 बाय डिफ़ॉल्ट) listen() सॉकेट की विधि से एक बार इसे बाध्य कर दिया जाता है।

यदि ऑक्टीकी दी गई है और कोई नहीं, यह एक बाइट स्ट्रिंग होना चाहिए और एचएमएसी -आधारित प्रमाणीकरण चुनौती के लिए गुप्त कुंजी के रूप में उपयोग किया जाएगा। अगर कोई प्रमाणीकरण किया जाता है authkey कोई नहीं है। AuthenticationError प्रमाणीकरण विफल होने पर उठाया जाता है। प्रमाणीकरण कुंजी देखें ।

accept()

श्रोता ऑब्जेक्ट के बाध्य सॉकेट या नामित पाइप पर एक कनेक्शन स्वीकार करें और एक Connection ऑब्जेक्ट वापस करें । यदि प्रमाणीकरण का प्रयास किया जाता है और विफल रहता है, तो AuthenticationError उठाया जाता है।

close()

श्रोता ऑब्जेक्ट के बाध्य सॉकेट या नामित पाइप को बंद करें। यह स्वचालित रूप से कहा जाता है जब श्रोता कचरा एकत्र करता है। हालांकि इसे स्पष्ट रूप से कॉल करना उचित है।

श्रोता वस्तुओं में केवल पढ़ने के लिए निम्नलिखित गुण होते हैं:

address

वह पता जो श्रोता ऑब्जेक्ट द्वारा उपयोग किया जा रहा है।

last_accepted

जिस पते से अंतिम स्वीकृत कनेक्शन आया था। यदि यह अनुपलब्ध है तो यह है None

संस्करण 3.3 में नया: श्रोता ऑब्जेक्ट अब संदर्भ प्रबंधन प्रोटोकॉल का समर्थन करते हैं - संदर्भ प्रबंधक प्रकार देखें __enter__() श्रोता वस्तु, और __exit__() कॉल लौटाता है close()

multiprocessing.connection.wait(object_list, timeout=None)

Object_list में किसी वस्तु के तैयार होने तक प्रतीक्षा करें । में उन वस्तुओं की सूची देता है object_list जो तैयार हैं। यदि टाइमआउट एक फ्लोट है तो कॉल अधिकांश सेकंड के लिए ब्लॉक हो जाती है। अगर टाइमआउट होता है None तो यह असीमित अवधि के लिए ब्लॉक हो जाएगा। एक नकारात्मक टाइमआउट एक शून्य टाइमआउट के बराबर है।

दोनों यूनिक्स और विंडोज के लिए, एक वस्तु में प्रदर्शित हो सकता object_list अगर यह होता है

एक कनेक्शन या सॉकेट ऑब्जेक्ट तब तैयार होता है जब उसमें से पढ़ा जाने वाला डेटा उपलब्ध हो, या दूसरा छोर बंद हो गया हो।

यूनिक्स : wait(object_list, timeout) लगभग बराबर select.select(object_list, [], [], timeout) । अंतर यह है कि, यदि select.select() एक संकेत द्वारा बाधित किया जाता है, तो यह OSError त्रुटि संख्या के साथ बढ़ सकता है EINTR , जबकि multiprocessing.connection.wait() नहीं।

विंडोज : object_list में एक आइटम या तो एक पूर्णांक हैंडल होना चाहिए जो कि प्रतीक्षा योग्य है (Win32 फ़ंक्शन के प्रलेखन द्वारा उपयोग की गई परिभाषा के अनुसार WaitForMultipleObjects() ) या यह एक ऐसी fileno() विधि के साथ एक वस्तु हो सकती है जो सॉकेट हैंडल या पाइप हैंडल लौटाती है। (ध्यान दें कि पाइप के हैंडल और सॉकेट के हैंडल वाफ़ेबल हैंडल नहीं हैं ।)

संस्करण 3.3 में नया।

उदाहरण

निम्न सर्वर कोड एक श्रोता बनाता है जो 'secret password' प्रमाणीकरण कुंजी के रूप में उपयोग करता है । यह तब एक कनेक्शन की प्रतीक्षा करता है और क्लाइंट को कुछ डेटा भेजता है:

from multiprocessing.connection import Listener
from array import array

address = ('localhost', 6000)     # family is deduced to be 'AF_INET'

with Listener(address, authkey=b'secret password') as listener:
    with listener.accept() as conn:
        print('connection accepted from', listener.last_accepted)

        conn.send([2.25, None, 'junk', float])

        conn.send_bytes(b'hello')

        conn.send_bytes(array('i', [42, 1729]))

निम्न कोड सर्वर से जुड़ता है और सर्वर से कुछ डेटा प्राप्त करता है:

from multiprocessing.connection import Client
from array import array

address = ('localhost', 6000)

with Client(address, authkey=b'secret password') as conn:
    print(conn.recv())                  # => [2.25, None, 'junk', float]

    print(conn.recv_bytes())            # => 'hello'

    arr = array('i', [0, 0, 0, 0, 0])
    print(conn.recv_bytes_into(arr))    # => 8
    print(arr)                          # => array('i', [42, 1729, 0, 0, 0])

निम्न कोड multiprocessing.connection.wait() एक साथ कई प्रक्रियाओं के संदेशों की प्रतीक्षा करने के लिए उपयोग करता है:

import time, random
from multiprocessing import Process, Pipe, current_process
from multiprocessing.connection import wait

def foo(w):
    for i in range(10):
        w.send((i, current_process().name))
    w.close()

if __name__ == '__main__':
    readers = []

    for i in range(4):
        r, w = Pipe(duplex=False)
        readers.append(r)
        p = Process(target=foo, args=(w,))
        p.start()
        # We close the writable end of the pipe now to be sure that
        # p is the only process which owns a handle for it.  This
        # ensures that when p closes its handle for the writable end,
        # wait() will promptly report the readable end as being ready.
        w.close()

    while readers:
        for r in wait(readers):
            try:
                msg = r.recv()
            except EOFError:
                readers.remove(r)
            else:
                print(msg)

पता प्रारूप

  • एक 'AF_INET' पता उस प्रपत्र का एक हिस्सा है (hostname, port) जहां होस्टनाम एक स्ट्रिंग है और पोर्ट एक पूर्णांक है।
  • एक 'AF_UNIX' पता एक स्ट्रिंग है जो फ़ाइल सिस्टम पर फ़ाइल नाम का प्रतिनिधित्व करता है।
  • An 'AF_PIPE' address is a string of the form
    r'\\.\pipe\ PipeName ' । उपयोग करने के लिए Client() कहा जाता है एक दूरस्थ कंप्यूटर पर किसी नामित पाइप से कनेक्ट करने के सर्वर नाम एक रूप के एक पते का उपयोग करना चाहिए बजाय। r'\\ ServerName \pipe\ PipeName '

ध्यान दें कि दो बैकस्लैश के साथ किसी भी स्ट्रिंग की शुरुआत डिफ़ॉल्ट रूप से एक 'AF_PIPE' पते के बजाय एक 'AF_UNIX' पते के रूप में होती है।

प्रमाणीकरण कुंजी

जब कोई उपयोग करता है recv() , तो प्राप्त डेटा स्वचालित रूप से अप्रकाशित होता है। दुर्भाग्य से एक अविश्वसनीय स्रोत से डेटा को अनप्लग करना एक सुरक्षा जोखिम है। इसलिए Listener और डाइजेस्ट प्रमाणीकरण प्रदान करने के लिए मॉड्यूल का Client() उपयोग करें hmac

प्रमाणीकरण कुंजी एक बाइट स्ट्रिंग है जिसे पासवर्ड के रूप में सोचा जा सकता है: एक बार कनेक्शन स्थापित होने के बाद दोनों छोर प्रमाण की मांग करेंगे कि दूसरा प्रमाणीकरण कुंजी जानता है। (यह दर्शाता है कि दोनों सिरे एक ही कुंजी का उपयोग कर रहे हैं , कनेक्शन पर कुंजी भेजना शामिल नहीं है।)

यदि प्रमाणीकरण का अनुरोध किया गया है, लेकिन कोई प्रमाणीकरण कुंजी निर्दिष्ट नहीं है, तो वापसी मूल्य का current_process().authkey उपयोग किया जाता है (देखें Process )। यह मान स्वचालित रूप से किसी भी Process ऑब्जेक्ट द्वारा विरासत में प्राप्त होगा जो वर्तमान प्रक्रिया बनाता है। इसका मतलब है कि (डिफ़ॉल्ट रूप से) एक बहु-प्रक्रिया कार्यक्रम की सभी प्रक्रियाएं एक एकल प्रमाणीकरण कुंजी साझा करेंगी, जिसका उपयोग स्वयं के बीच कनेक्शन स्थापित करते समय किया जा सकता है।

उपयुक्त प्रमाणीकरण कुंजी का उपयोग करके भी उत्पन्न किया जा सकता है os.urandom()

लॉगिंग

लॉगिंग के लिए कुछ सहायता उपलब्ध है। ध्यान दें, हालांकि, logging पैकेज साझा किए गए ताले का उपयोग नहीं करता है , इसलिए यह संभव है (हैंडलर प्रकार के आधार पर) विभिन्न प्रक्रियाओं के संदेशों के लिए मिलाया जाए।

multiprocessing.get_logger()

द्वारा उपयोग किए गए लकड़हारे को लौटाता है multiprocessing । यदि आवश्यक हो, तो एक नया बनाया जाएगा।

जब पहली बार बनाया लकड़हारे का स्तर logging.NOTSET और कोई डिफ़ॉल्ट हैंडलर नहीं है। इस लकड़हारे को भेजे गए संदेश मूल लकड़हारे को डिफ़ॉल्ट रूप से प्रचारित नहीं करेंगे।

ध्यान दें कि विंडोज पर बच्चे की प्रक्रियाएं केवल मूल प्रक्रिया के लकड़हारे के स्तर को विरासत में प्राप्त करेंगी - लकड़हारे के किसी अन्य अनुकूलन को विरासत में नहीं मिलेगा।

multiprocessing.log_to_stderr()

यह फ़ंक्शन कॉल को करता है, get_logger() लेकिन get_logger द्वारा बनाए गए लॉगर को वापस करने के अलावा, यह एक हैंडलर जोड़ता है जो sys.stderr प्रारूप का उपयोग करके आउटपुट भेजता है '[%(levelname)s/%(processName)s] %(message)s'

नीचे लॉग ऑन के साथ एक उदाहरण सत्र दिया गया है:

>>> import multiprocessing, logging
>>> logger = multiprocessing.log_to_stderr()
>>> logger.setLevel(logging.INFO)
>>> logger.warning('doomed')
[WARNING/MainProcess] doomed
>>> m = multiprocessing.Manager()
[INFO/SyncManager-...] child process calling self.run()
[INFO/SyncManager-...] created temp directory /.../pymp-...
[INFO/SyncManager-...] manager serving at '/.../listener-...'
>>> del m
[INFO/MainProcess] sending shutdown message to manager
[INFO/SyncManager-...] manager exiting with exitcode 0

लॉगिंग स्तरों की पूरी तालिका के लिए, logging मॉड्यूल देखें ।

multiprocessing.dummy मॉड्यूल

multiprocessing.dummy एपीआई की नकल करता है, multiprocessing लेकिन threading मॉड्यूल के आसपास एक आवरण से अधिक नहीं है ।

प्रोग्रामिंग दिशानिर्देश

कुछ दिशानिर्देश और मुहावरे हैं जिनका उपयोग करते समय पालन किया जाना चाहिए multiprocessing

सभी शुरू करने के तरीके

निम्नलिखित सभी प्रारंभ विधियों पर लागू होता है।

साझा स्थिति से बचें

जहाँ तक संभव हो प्रक्रियाओं के बीच बड़ी मात्रा में डेटा को स्थानांतरित करने से बचने का प्रयास करना चाहिए।

निचले स्तर के सिंक्रनाइज़ेशन प्राइमेटिव का उपयोग करने के बजाय प्रक्रियाओं के बीच संचार के लिए कतारों या पाइपों का उपयोग करने के लिए छड़ी करना सबसे अच्छा है।

Picklability

सुनिश्चित करें कि परदे के पीछे के तरीकों के तर्क दयनीय हैं।

परदे के पीछे की सुरक्षा

जब तक आप इसे लॉक से नहीं बचाते हैं तब तक एक से अधिक थ्रेड वाली प्रॉक्सी ऑब्जेक्ट का उपयोग न करें।

(एक ही प्रॉक्सी का उपयोग करके विभिन्न प्रक्रियाओं के साथ कोई समस्या नहीं है ।)

ज़ोंबी प्रक्रियाओं में शामिल होना

यूनिक्स पर जब एक प्रक्रिया खत्म होती है, लेकिन इसमें शामिल नहीं हुआ तो यह एक ज़ोंबी बन जाता है। बहुत अधिक कभी नहीं होना चाहिए क्योंकि हर बार एक नई प्रक्रिया शुरू होती है (या active_children() कहा जाता है) सभी पूर्ण की गई प्रक्रियाएं जो अभी तक शामिल नहीं हुई हैं, वे शामिल हो जाएंगी। इसके अलावा एक समाप्त प्रक्रिया के बुलावा प्रक्रिया में is_alive() शामिल हो जाएगा। यहां तक ​​कि आपके द्वारा शुरू की जाने वाली सभी प्रक्रियाओं में स्पष्ट रूप से शामिल होने के लिए शायद यह अच्छा अभ्यास है।

अचार / अनपिकल की तुलना में इनहेरिट करने के लिए बेहतर है

जब स्पॉन या फोर्स्कवर स्टार्ट का उपयोग करते हैं, तो कई प्रकारों को multiprocessing अचार बनाने की आवश्यकता होती है ताकि बच्चे की प्रक्रियाएं उनका उपयोग कर सकें। हालांकि, एक को आम तौर पर पाइप या कतारों का उपयोग करके साझा वस्तुओं को अन्य प्रक्रियाओं में भेजने से बचना चाहिए। इसके बजाय आपको कार्यक्रम की व्यवस्था करनी चाहिए ताकि एक प्रक्रिया जिसे साझा संसाधनों तक पहुंच की आवश्यकता हो, उसे कहीं और पूर्वजों की प्रक्रिया से विरासत में मिल सके।

प्रक्रियाओं को समाप्त करने से बचें

terminate() किसी प्रक्रिया को रोकने के लिए विधि का उपयोग करना किसी भी साझा संसाधनों (जैसे कि ताले, सेमीफोरर्स, पाइप और कतारों) का कारण बनने के लिए उत्तरदायी है जो वर्तमान में प्रक्रिया द्वारा उपयोग की जा रही है, अन्य प्रक्रियाओं के टूटने या अनुपलब्ध होने के लिए।

इसलिए यह केवल terminate() उन प्रक्रियाओं पर उपयोग करने पर विचार करना सबसे अच्छा है जो कभी भी किसी साझा संसाधनों का उपयोग नहीं करते हैं।

उन प्रक्रियाओं में शामिल होना जो कतारों का उपयोग करते हैं

ध्यान रखें कि एक प्रक्रिया जिसने वस्तुओं को कतार में रखा है, समाप्त होने से पहले प्रतीक्षा करेगी जब तक कि सभी बफर आइटम अंतर्निहित पाइप के लिए "फीडर" थ्रेड द्वारा खिलाए नहीं जाते हैं। (बच्चे की प्रक्रिया JoinableQueue.cancel_join_thread इस व्यवहार से बचने के लिए कतार की विधि कह सकती है।)

इसका मतलब यह है कि जब भी आप एक कतार का उपयोग करते हैं, तो आपको यह सुनिश्चित करने की आवश्यकता होती है कि कतार में लगाए गए सभी आइटम अंततः प्रक्रिया में शामिल होने से पहले हटा दिए जाएंगे। अन्यथा आप यह सुनिश्चित नहीं कर सकते हैं कि जिन प्रक्रियाओं ने कतार में वस्तुओं को रखा है वे समाप्त हो जाएंगे। यह भी याद रखें कि गैर-शैतानी प्रक्रियाएं स्वचालित रूप से शामिल हो जाएंगी।

एक उदाहरण जो गतिरोध है वह निम्नलिखित है:

from multiprocessing import Process, Queue

def f(q):
    q.put('X' * 1000000)

if __name__ == '__main__':
    queue = Queue()
    p = Process(target=f, args=(queue,))
    p.start()
    p.join()                    # this deadlocks
    obj = queue.get()

यहां एक फिक्स अंतिम दो लाइनों को स्वैप करना होगा (या बस p.join() लाइन को हटा दें )।

बाल प्रक्रियाओं के लिए स्पष्ट रूप से संसाधन पास करें

कांटा शुरू करने की विधि का उपयोग करने वाले यूनिक्स पर , एक बच्चे की प्रक्रिया एक वैश्विक संसाधन का उपयोग करके मूल प्रक्रिया में निर्मित साझा संसाधन का उपयोग कर सकती है। हालांकि, बच्चे की प्रक्रिया के लिए निर्माता के तर्क के रूप में ऑब्जेक्ट को पास करना बेहतर है।

कोड (संभावित) को विंडोज के साथ संगत बनाने के अलावा और दूसरी शुरुआत के तरीकों से यह भी सुनिश्चित होता है कि जब तक बच्चे की प्रक्रिया अभी भी जीवित है तब तक वस्तु मूल प्रक्रिया में एकत्रित कचरा नहीं होगी। यह महत्वपूर्ण हो सकता है अगर कुछ संसाधन को मुक्त किया जाता है जब ऑब्जेक्ट को मूल प्रक्रिया में एकत्र किया जाता है।

इसलिए उदाहरण के लिए

from multiprocessing import Process, Lock

def f():
    ... do something using "lock" ...

if __name__ == '__main__':
    lock = Lock()
    for i in range(10):
        Process(target=f).start()

के रूप में फिर से लिखा जाना चाहिए

from multiprocessing import Process, Lock

def f(l):
    ... do something using "l" ...

if __name__ == '__main__':
    lock = Lock()
    for i in range(10):
        Process(target=f, args=(lock,)).start()

sys.stdin "ऑब्जेक्ट जैसी फ़ाइल" के साथ प्रतिस्थापित करने से सावधान रहें

multiprocessing मूल रूप से बिना शर्त बुलाया:

os.close(sys.stdin.fileno())

में multiprocessing.Process._bootstrap() विधि - इस प्रक्रियाओं-प्रक्रियाओं में साथ मुद्दों में हुई। इसे बदल दिया गया है:

sys.stdin.close()
sys.stdin = open(os.open(os.devnull, os.O_RDONLY), closefd=False)

जो एक दूसरे के साथ टकराने वाली प्रक्रियाओं के मूलभूत मुद्दे को हल करता है, जिसके परिणामस्वरूप एक खराब फाइल डिस्क्रिप्टर त्रुटि होती है, लेकिन उन अनुप्रयोगों के लिए एक संभावित खतरे का परिचय देती है जो sys.stdin आउटपुट बफ़रिंग के साथ "फ़ाइल जैसी ऑब्जेक्ट" से बदलते हैं । यह खतरा यह है कि यदि कई प्रक्रियाएं close() इस फाइल की तरह ऑब्जेक्ट पर कॉल करती हैं, तो यह एक ही डेटा को कई बार ऑब्जेक्ट पर फ़्लश किया जा सकता है, जिसके परिणामस्वरूप भ्रष्टाचार होता है।

यदि आप फ़ाइल जैसी वस्तु लिखते हैं और अपनी कैशिंग को कार्यान्वित करते हैं, तो जब भी आप कैश में संलग्न होते हैं, और जब पीआईडी ​​बदल जाता है तो कैश को त्याग कर आप इसे संग्रहीत कर सकते हैं। उदाहरण के लिए:

@property
def cache(self):
    pid = os.getpid()
    if pid != self._pid:
        self._pid = pid
        self._cache = []
    return self._cache

अधिक जानकारी के लिए, bpo-5155 , bpo-5313 और bpo-5331

अंडे और forkserver शुरू तरीकों

कुछ अतिरिक्त प्रतिबंध हैं जो कांटा प्रारंभ विधि पर लागू नहीं होते हैं ।

अधिक पसंद है

सुनिश्चित करें कि सभी तर्क देने योग्य Process.__init__() हैं। इसके अलावा, अगर आप उपवर्ग करते हैं, Process तो सुनिश्चित करें कि उदाहरणों को start() विधि कहा जाता है , जब उठाएगा ।

सार्वत्रिक चर

इस बात को ध्यान में रखें कि यदि चाइल्ड प्रोसेस में कोड रन एक ग्लोबल वैरिएबल को एक्सेस करने की कोशिश करता है, तो जो वैल्यू देखी जाती है (यदि कोई हो) उस समय के पेरेंट प्रोसेस में वैल्यू जैसी नहीं होती है start()

हालाँकि, वैश्विक चर जो कि केवल मॉड्यूल स्तर के स्थिरांक हैं, कोई समस्या नहीं है।

मुख्य मॉड्यूल का सुरक्षित आयात

सुनिश्चित करें कि बिना अनपेक्षित साइड इफेक्ट्स (जैसे कि एक नई प्रक्रिया शुरू करना) के बिना मुख्य मॉड्यूल को नए पायथन दुभाषिया द्वारा सुरक्षित रूप से आयात किया जा सकता है।

उदाहरण के लिए, निम्न मॉड्यूल को चलाने वाले स्पॉन या फोर्स्कवर स्टार्ट विधि का उपयोग करना निम्नलिखित में विफल होगा RuntimeError :

from multiprocessing import Process

def foo():
    print('hello')

p = Process(target=foo)
p.start()

इसके बजाय किसी if __name__ == '__main__': को इस प्रकार से प्रोग्राम के "एंट्री पॉइंट" की सुरक्षा करनी चाहिए :

from multiprocessing import Process, freeze_support, set_start_method

def foo():
    print('hello')

if __name__ == '__main__':
    freeze_support()
    set_start_method('spawn')
    p = Process(target=foo)
    p.start()

( freeze_support() यदि प्रोग्राम को फ्रोजन के बजाय सामान्य रूप से चलाया जाएगा तो लाइन को छोड़ा जा सकता है।)

यह नव स्पंदित पायथन दुभाषिया को मॉड्यूल को सुरक्षित रूप से आयात करने और फिर मॉड्यूल के foo() कार्य को चलाने की अनुमति देता है ।

यदि पूल या प्रबंधक मुख्य मॉड्यूल में बनाया गया है तो इसी तरह के प्रतिबंध लागू होते हैं।

उदाहरण

अनुकूलित प्रबंधक और सम्‍मिलन बनाने और उपयोग करने के तरीके का प्रदर्शन:

from multiprocessing import freeze_support
from multiprocessing.managers import BaseManager, BaseProxy
import operator

##

class Foo:
    def f(self):
        print('you called Foo.f()')
    def g(self):
        print('you called Foo.g()')
    def _h(self):
        print('you called Foo._h()')

# A simple generator function
def baz():
    for i in range(10):
        yield i*i

# Proxy type for generator objects
class GeneratorProxy(BaseProxy):
    _exposed_ = ['__next__']
    def __iter__(self):
        return self
    def __next__(self):
        return self._callmethod('__next__')

# Function to return the operator module
def get_operator_module():
    return operator

##

class MyManager(BaseManager):
    pass

# register the Foo class; make `f()` and `g()` accessible via proxy
MyManager.register('Foo1', Foo)

# register the Foo class; make `g()` and `_h()` accessible via proxy
MyManager.register('Foo2', Foo, exposed=('g', '_h'))

# register the generator function baz; use `GeneratorProxy` to make proxies
MyManager.register('baz', baz, proxytype=GeneratorProxy)

# register get_operator_module(); make public functions accessible via proxy
MyManager.register('operator', get_operator_module)

##

def test():
    manager = MyManager()
    manager.start()

    print('-' * 20)

    f1 = manager.Foo1()
    f1.f()
    f1.g()
    assert not hasattr(f1, '_h')
    assert sorted(f1._exposed_) == sorted(['f', 'g'])

    print('-' * 20)

    f2 = manager.Foo2()
    f2.g()
    f2._h()
    assert not hasattr(f2, 'f')
    assert sorted(f2._exposed_) == sorted(['g', '_h'])

    print('-' * 20)

    it = manager.baz()
    for i in it:
        print('<%d>' % i, end=' ')
    print()

    print('-' * 20)

    op = manager.operator()
    print('op.add(23, 45) =', op.add(23, 45))
    print('op.pow(2, 94) =', op.pow(2, 94))
    print('op._exposed_ =', op._exposed_)

##

if __name__ == '__main__':
    freeze_support()
    test()

का उपयोग कर Pool :

import multiprocessing
import time
import random
import sys

#
# Functions used by test code
#

def calculate(func, args):
    result = func(*args)
    return '%s says that %s%s = %s' % (
        multiprocessing.current_process().name,
        func.__name__, args, result
        )

def calculatestar(args):
    return calculate(*args)

def mul(a, b):
    time.sleep(0.5 * random.random())
    return a * b

def plus(a, b):
    time.sleep(0.5 * random.random())
    return a + b

def f(x):
    return 1.0 / (x - 5.0)

def pow3(x):
    return x ** 3

def noop(x):
    pass

#
# Test code
#

def test():
    PROCESSES = 4
    print('Creating pool with %d processes\n' % PROCESSES)

    with multiprocessing.Pool(PROCESSES) as pool:
        #
        # Tests
        #

        TASKS = [(mul, (i, 7)) for i in range(10)] + \
                [(plus, (i, 8)) for i in range(10)]

        results = [pool.apply_async(calculate, t) for t in TASKS]
        imap_it = pool.imap(calculatestar, TASKS)
        imap_unordered_it = pool.imap_unordered(calculatestar, TASKS)

        print('Ordered results using pool.apply_async():')
        for r in results:
            print('\t', r.get())
        print()

        print('Ordered results using pool.imap():')
        for x in imap_it:
            print('\t', x)
        print()

        print('Unordered results using pool.imap_unordered():')
        for x in imap_unordered_it:
            print('\t', x)
        print()

        print('Ordered results using pool.map() --- will block till complete:')
        for x in pool.map(calculatestar, TASKS):
            print('\t', x)
        print()

        #
        # Test error handling
        #

        print('Testing error handling:')

        try:
            print(pool.apply(f, (5,)))
        except ZeroDivisionError:
            print('\tGot ZeroDivisionError as expected from pool.apply()')
        else:
            raise AssertionError('expected ZeroDivisionError')

        try:
            print(pool.map(f, list(range(10))))
        except ZeroDivisionError:
            print('\tGot ZeroDivisionError as expected from pool.map()')
        else:
            raise AssertionError('expected ZeroDivisionError')

        try:
            print(list(pool.imap(f, list(range(10)))))
        except ZeroDivisionError:
            print('\tGot ZeroDivisionError as expected from list(pool.imap())')
        else:
            raise AssertionError('expected ZeroDivisionError')

        it = pool.imap(f, list(range(10)))
        for i in range(10):
            try:
                x = next(it)
            except ZeroDivisionError:
                if i == 5:
                    pass
            except StopIteration:
                break
            else:
                if i == 5:
                    raise AssertionError('expected ZeroDivisionError')

        assert i == 9
        print('\tGot ZeroDivisionError as expected from IMapIterator.next()')
        print()

        #
        # Testing timeouts
        #

        print('Testing ApplyResult.get() with timeout:', end=' ')
        res = pool.apply_async(calculate, TASKS[0])
        while 1:
            sys.stdout.flush()
            try:
                sys.stdout.write('\n\t%s' % res.get(0.02))
                break
            except multiprocessing.TimeoutError:
                sys.stdout.write('.')
        print()
        print()

        print('Testing IMapIterator.next() with timeout:', end=' ')
        it = pool.imap(calculatestar, TASKS)
        while 1:
            sys.stdout.flush()
            try:
                sys.stdout.write('\n\t%s' % it.next(0.02))
            except StopIteration:
                break
            except multiprocessing.TimeoutError:
                sys.stdout.write('.')
        print()
        print()


if __name__ == '__main__':
    multiprocessing.freeze_support()
    test()

एक उदाहरण जो दिखा रहा है कि श्रमिकों की प्रक्रियाओं के संग्रह में कार्यों को खिलाने के लिए कतारों का उपयोग कैसे करें और परिणाम एकत्र करें:

import time
import random

from multiprocessing import Process, Queue, current_process, freeze_support

#
# Function run by worker processes
#

def worker(input, output):
    for func, args in iter(input.get, 'STOP'):
        result = calculate(func, args)
        output.put(result)

#
# Function used to calculate result
#

def calculate(func, args):
    result = func(*args)
    return '%s says that %s%s = %s' % \
        (current_process().name, func.__name__, args, result)

#
# Functions referenced by tasks
#

def mul(a, b):
    time.sleep(0.5*random.random())
    return a * b

def plus(a, b):
    time.sleep(0.5*random.random())
    return a + b

#
#
#

def test():
    NUMBER_OF_PROCESSES = 4
    TASKS1 = [(mul, (i, 7)) for i in range(20)]
    TASKS2 = [(plus, (i, 8)) for i in range(10)]

    # Create queues
    task_queue = Queue()
    done_queue = Queue()

    # Submit tasks
    for task in TASKS1:
        task_queue.put(task)

    # Start worker processes
    for i in range(NUMBER_OF_PROCESSES):
        Process(target=worker, args=(task_queue, done_queue)).start()

    # Get and print results
    print('Unordered results:')
    for i in range(len(TASKS1)):
        print('\t', done_queue.get())

    # Add more tasks using `put()`
    for task in TASKS2:
        task_queue.put(task)

    # Get and print some more results
    for i in range(len(TASKS2)):
        print('\t', done_queue.get())

    # Tell child processes to stop
    for i in range(NUMBER_OF_PROCESSES):
        task_queue.put('STOP')


if __name__ == '__main__':
    freeze_support()
    test()

Original text