Julia 1.0 - Distributed Computing

वितरित अभिकलन




julia

वितरित अभिकलन

Distributed.addprocs

addprocs(manager::ClusterManager; kwargs...) -> List of process identifiers

निर्दिष्ट क्लस्टर प्रबंधक के माध्यम से कार्यकर्ता प्रक्रियाओं को लॉन्च करता है।

उदाहरण के लिए, बियोवुल्फ़ क्लस्टर्स पैकेज ClusterManagers.jl में लागू कस्टम क्लस्टर प्रबंधक के माध्यम से समर्थित हैं।

एक नए लॉन्च किए गए वर्कर की संख्या मास्टर से कनेक्शन स्थापना की प्रतीक्षा JULIA_WORKER_TIMEOUT है, वर्कर प्रक्रिया के वातावरण में परिवर्तनशील JULIA_WORKER_TIMEOUT माध्यम से निर्दिष्ट किया जा सकता है। परिवहन के रूप में केवल टीसीपी / आईपी का उपयोग करते समय प्रासंगिक।

source
addprocs(machines; tunnel=false, sshflags=``, max_parallel=10, kwargs...) -> List of process identifiers

SSH के माध्यम से दूरस्थ मशीनों पर प्रक्रियाएँ जोड़ें। julia को प्रत्येक नोड पर एक ही स्थान पर स्थापित करने की आवश्यकता है, या एक साझा फ़ाइल सिस्टम के माध्यम से उपलब्ध होने की आवश्यकता है।

machines मशीन की विशिष्टताओं का एक वेक्टर है। प्रत्येक विनिर्देश के लिए श्रमिकों को शुरू किया जाता है।

एक मशीन विनिर्देश या तो एक स्ट्रिंग machine_spec या टपल है - (machine_spec, count)

machine_spec एक प्रकार की स्ट्रिंग है [[email protected]]host[:port] [bind_addr[:port]]user वर्तमान उपयोगकर्ता के लिए डिफॉल्ट करता है, मानक ssh पोर्ट पर पोर्ट करता है। यदि [bind_addr[:port]] निर्दिष्ट किया गया है, तो अन्य कर्मचारी निर्दिष्ट bind_addr और port पर इस कार्यकर्ता से bind_addr

count निर्दिष्ट होस्ट पर लॉन्च किए जाने वाले श्रमिकों की संख्या है। यदि इसे निर्दिष्ट किया जाता है :auto इसे कई श्रमिकों के रूप में लॉन्च करेगा जो विशिष्ट होस्ट पर सीपीयू थ्रेड की संख्या के रूप में हैं।

कीवर्ड तर्क:

  • tunnel : यदि true तो मास्टर प्रक्रिया से कार्यकर्ता को जोड़ने के लिए SSH सुरंग का उपयोग किया जाएगा। डिफ़ॉल्ट false

  • sshflags : अतिरिक्त ssh विकल्प निर्दिष्ट करता है, जैसे sshflags=`-i /home/foo/bar.pem

  • max_parallel : एक होस्ट में समानांतर में जुड़े श्रमिकों की अधिकतम संख्या को निर्दिष्ट करता है। 10 से चूक।

  • dir : श्रमिकों पर कार्यशील निर्देशिका निर्दिष्ट करता है। होस्ट की वर्तमान निर्देशिका में pwd() जैसा कि pwd() द्वारा पाया गया है pwd() )

  • enable_threaded_blas : यदि true तो enable_threaded_blas अतिरिक्त प्रक्रियाओं में कई थ्रेड्स पर चलेगा। डिफ़ॉल्ट false

  • नाम: julia निष्पादन योग्य का नाम। जैसा कि मामला हो सकता है "$(Sys.BINDIR)/julia" या "$(Sys.BINDIR)/julia-debug" के लिए "$(Sys.BINDIR)/julia-debug"

  • exeflags : अतिरिक्त झंडे कार्यकर्ता प्रक्रियाओं को पारित कर दिया।

  • topology : निर्दिष्ट करता है कि श्रमिक एक दूसरे से कैसे जुड़ते हैं। असंबद्ध श्रमिकों के बीच एक संदेश भेजने से एक त्रुटि होती है।

    • topology=:all_to_all : सभी प्रक्रियाएं एक-दूसरे से जुड़ी हुई हैं। डिफ़ॉल्ट।

    • topology=:master_worker : केवल चालक प्रक्रिया, अर्थात pid 1 श्रमिकों से जुड़ती है। श्रमिक एक-दूसरे से नहीं जुड़ते हैं।

    • topology=:custom : क्लस्टर मैनेजर की launch विधि connect_idents ident और connect_idents में WorkerConfig जरिए कनेक्शन टोपोलॉजी को निर्दिष्ट WorkerConfig । क्लस्टर प्रबंधक पहचान पहचान वाला एक कार्यकर्ता connect_idents में निर्दिष्ट सभी श्रमिकों से कनेक्ट होगा।

  • lazy : केवल topology=:all_to_all साथ लागू: topology=:all_to_all । यदि true , तो वर्कर-वर्कर कनेक्शन सेटअप लेज़ीली होते हैं, अर्थात वे श्रमिकों के बीच रिमोट कॉल के पहले उदाहरण में सेटअप होते हैं। डिफ़ॉल्ट सत्य है।

पर्यावरण चर :

यदि मास्टर प्रक्रिया 60.0 सेकंड के भीतर एक नए लॉन्च किए गए कार्यकर्ता के साथ संबंध स्थापित करने में विफल रहती है, तो कार्यकर्ता इसे एक घातक स्थिति के रूप में मानता है और समाप्त करता है। इस समयसीमा को पर्यावरण चर JULIA_WORKER_TIMEOUT माध्यम से नियंत्रित किया जा सकता है। मास्टर प्रक्रिया पर JULIA_WORKER_TIMEOUT का मान सेकंड की संख्या निर्दिष्ट करता है जो एक नया लॉन्च किया गया कार्यकर्ता कनेक्शन स्थापना के लिए प्रतीक्षा करता है।

source
addprocs(; kwargs...) -> List of process identifiers

addprocs(Sys.CPU_THREADS; kwargs...) बराबर

ध्यान दें कि श्रमिक अन्य चलने वाली प्रक्रियाओं में से कोई भी .julia/config/startup.jl स्टार्टअप. .julia/config/startup.jl स्टार्टअप स्क्रिप्ट नहीं चलाते हैं, न ही वे अपने वैश्विक राज्य (जैसे वैश्विक चर, नई विधि परिभाषाएं, और लोड किए गए मॉड्यूल) को सिंक्रनाइज़ करते हैं।

source
addprocs(np::Integer; restrict=true, kwargs...) -> List of process identifiers

इन-बिल्ट LocalManager का उपयोग करके श्रमिकों को लॉन्च करता है जो केवल स्थानीय होस्ट पर श्रमिकों को लॉन्च करता है। इसका उपयोग कई कोर का लाभ उठाने के लिए किया जा सकता है। addprocs(4) स्थानीय मशीन पर 4 प्रक्रियाएँ जोड़ेगा। यदि restrict true , तो बंधन 127.0.0.1 तक सीमित है। कीवर्ड में dir , exename , exeflags , topology , lazy और enable_threaded_blas हैं जो addprocs(machines) लिए प्रलेखित के समान प्रभाव addprocs(machines)

source

Distributed.nprocs फंक्शन

nprocs()

उपलब्ध प्रक्रियाओं की संख्या प्राप्त करें।

उदाहरण

julia> nprocs()
3

julia> workers()
5-element Array{Int64,1}:
 2
 3
source

Distributed.nworkers फंक्शन

nworkers()

उपलब्ध कार्यकर्ता प्रक्रियाओं की संख्या प्राप्त करें। यह nprocs() से कम है। nprocs() बराबर अगर nprocs() == 1

उदाहरण

$ julia -p 5

julia> nprocs()
6

julia> nworkers()
5
source

Distributed.procs विधि विधि

procs()

पीआईडी ​​1 (जो workers() द्वारा शामिल नहीं है workers() सहित सभी प्रक्रिया पहचानकर्ताओं की सूची लौटाएं।

उदाहरण

$ julia -p 5

julia> procs()
3-element Array{Int64,1}:
 1
 2
 3
source

Distributed.procs विधि विधि

procs(pid::Integer)

समान भौतिक नोड पर सभी प्रक्रिया पहचानकर्ताओं की सूची लौटाएं। विशेष रूप से pid रूप में एक ही आईपी पते के लिए बाध्य सभी श्रमिकों को वापस कर दिया जाता है।

source

Distributed.workers

workers()

सभी कार्यकर्ता प्रक्रिया पहचानकर्ताओं की सूची लौटाएं।

उदाहरण

$ julia -p 5

julia> workers()
2-element Array{Int64,1}:
 2
 3
source

Distributed.rmprocs

rmprocs(pids...; waitfor=typemax(Int))

निर्दिष्ट कार्यकर्ता निकालें। ध्यान दें कि केवल 1 प्रक्रिया श्रमिकों को जोड़ या हटा सकती है।

तर्क यह बताता है कि श्रमिकों को बंद करने के लिए कितने समय तक इंतजार करना होगा:

  • यदि अनिर्दिष्ट है, तो rmprocs तब तक प्रतीक्षा करेंगे जब तक कि सभी अनुरोधित pids हटा नहीं दिए जाते।
  • यदि सभी श्रमिक अनुरोध किए गए waitfor सेकंड से पहले समाप्त नहीं किए जा सकते, तो एक ErrorException उठाया जाता है।
  • 0 की waitfor साथ, कॉल किसी अलग कार्य में हटाने के लिए निर्धारित किए गए कर्मचारियों के साथ तुरंत लौटता है। अनुसूचित Task ऑब्जेक्ट वापस आ गया है। उपयोगकर्ता को किसी अन्य समानांतर कॉल को आमंत्रित करने से पहले कार्य पर wait को कॉल wait चाहिए।

उदाहरण

$ julia -p 5

julia> t = rmprocs(2, 3, waitfor=0)
Task (runnable) @0x0000000107c718d0

julia> wait(t)

julia> workers()
3-element Array{Int64,1}:
 4
 5
 6
source

Distributed.interrupt

interrupt(pids::Integer...)

निर्दिष्ट श्रमिकों पर वर्तमान निष्पादन कार्य को बाधित करें। यह स्थानीय मशीन पर Ctrl-C दबाने के बराबर है। यदि कोई तर्क नहीं दिया जाता है, तो सभी कार्यकर्ता बाधित होते हैं।

source
interrupt(pids::AbstractVector=workers())

निर्दिष्ट श्रमिकों पर वर्तमान निष्पादन कार्य को बाधित करें। यह स्थानीय मशीन पर Ctrl-C दबाने के बराबर है। यदि कोई तर्क नहीं दिया जाता है, तो सभी कार्यकर्ता बाधित होते हैं।

source

Distributed.myid

myid()

वर्तमान प्रक्रिया की आईडी प्राप्त करें।

उदाहरण

julia> myid()
1

julia> remotecall_fetch(() -> myid(), 4)
4
source

Distributed.pmap

pmap(f, [::AbstractWorkerPool], c...; distributed=true, batch_size=1, on_error=nothing, retry_delays=[], retry_check=nothing) -> collection

उपलब्ध कार्यकर्ताओं और कार्यों का उपयोग करके प्रत्येक तत्व को f लागू करके संग्रह c को ट्रांसफॉर्म करें।

कई संग्रह तर्कों के लिए, f एलिमेंटवाइज लागू करें।

ध्यान दें कि सभी कार्यकर्ता प्रक्रियाओं के लिए f उपलब्ध कराया जाना चाहिए; विवरण के लिए कोड उपलब्धता और लोडिंग पैकेज देखें।

यदि एक कार्यकर्ता पूल निर्दिष्ट नहीं है, तो सभी उपलब्ध श्रमिक, अर्थात, डिफ़ॉल्ट कार्यकर्ता पूल का उपयोग किया जाता है।

डिफ़ॉल्ट रूप से, pmap सभी निर्दिष्ट श्रमिकों पर गणना वितरित करता है। केवल स्थानीय प्रक्रिया का उपयोग करने और कार्यों को वितरित करने के लिए, distributed=false निर्दिष्ट करें। यह asyncmap का उपयोग करने के बराबर है। उदाहरण के लिए, pmap(f, c; distributed=false) asyncmap(f,c; ntasks=()->nworkers()) बराबर है asyncmap(f,c; ntasks=()->nworkers())

pmap भी pmap तर्क के माध्यम से प्रक्रियाओं और कार्यों के मिश्रण का उपयोग कर सकता है। 1 से अधिक बैच के आकार के लिए, संग्रह को कई बैचों में संसाधित किया जाता है, प्रत्येक की लंबाई batch_size या उससे कम होती है। एक बैच को एक नि: शुल्क कार्यकर्ता के लिए एक अनुरोध के रूप में भेजा जाता है, जहां एक स्थानीय asyncmap कई समवर्ती कार्यों का उपयोग करके बैच से तत्वों को संसाधित करता है।

कोई भी त्रुटि pmap को शेष संग्रह को संसाधित करने से pmap है। इस व्यवहार को ओवरराइड करने के लिए आप एक त्रुटि हैंडलिंग फ़ंक्शन को on_error माध्यम से निर्दिष्ट कर सकते हैं जो एकल तर्क, अर्थात अपवाद में लेता है। फ़ंक्शन त्रुटि को पुन: पेश करके प्रसंस्करण को रोक सकता है, या, जारी रखने के लिए, किसी भी मूल्य को वापस कर सकता है जो कॉल करने वाले के परिणाम के साथ इनलाइन वापस कर दिया जाता है।

निम्नलिखित दो उदाहरणों पर विचार करें। पहले वाला अपवाद ऑब्जेक्ट इनलाइन लौटाता है, दूसरा किसी अपवाद के स्थान पर 0:

julia> pmap(x->iseven(x) ? error("foo") : x, 1:4; on_error=identity)
4-element Array{Any,1}:
 1
  ErrorException("foo")
 3
  ErrorException("foo")

julia> pmap(x->iseven(x) ? error("foo") : x, 1:4; on_error=ex->0)
4-element Array{Int64,1}:
 1
 0
 3
 0

असफल संगणना को पुन: प्रयास करके त्रुटियों को भी संभाला जा सकता है। कीवर्ड तर्क retry_delays और retry_check को कीवर्ड तर्क delays रूप में retry delays और क्रमशः retry_check लिए पारित किया जाता है। यदि बैचिंग निर्दिष्ट है, और एक पूरा बैच विफल रहता है, तो बैच में सभी आइटम पुनर्प्राप्त किए जाते हैं।

ध्यान दें कि यदि on_error और retry_delays दोनों निर्दिष्ट हैं, तो on_error हुक को पुन: प्रयास करने से पहले कहा जाता है। यदि on_error एक अपवाद को नहीं फेंकता (या on_error ) करता है, तो तत्व को वापस नहीं लिया जाएगा।

उदाहरण: त्रुटियों पर, रिट के बीच किसी भी देरी के बिना अधिकतम 3 बार एक तत्व पर f को पुनः प्रयास करें।

pmap(f, c; retry_delays = zeros(3))

उदाहरण: केवल तभी पुन: प्रयास करें यदि अपवाद प्रकार के नहीं है InexactError , तेजी से बढ़ रही देरी के साथ 3 गुना तक। सभी InexactError घटनाओं के लिए जगह में एक NaN लौटें।

pmap(f, c; on_error = e->(isa(e, InexactError) ? NaN : rethrow(e)), retry_delays = ExponentialBackOff(n = 3))
source

Distributed.RemoteException प्रकार

RemoteException(captured)

दूरस्थ संगणना पर अपवादों को पकड़ लिया जाता है और स्थानीय स्तर पर पुनर्विचार किया जाता है। एक RemoteException कार्यकर्ता के pid और एक अपवादित अपवाद को लपेटता है। एक CapturedException दूरस्थ अपवाद और कॉल स्टैक के क्रमिक रूप को CapturedException करता है जब अपवाद उठाया गया था।

source

Distributed.Future प्रकार

Future(pid::Integer=myid())

प्रक्रिया pid पर Future बनाएं। डिफ़ॉल्ट pid वर्तमान प्रक्रिया है।

source

Distributed.RemoteChannel प्रकार

RemoteChannel(pid::Integer=myid())

प्रक्रिया pid पर एक Channel{Any}(1) संदर्भ दें। डिफ़ॉल्ट pid वर्तमान प्रक्रिया है।

RemoteChannel(f::Function, pid::Integer=myid())

एक विशिष्ट आकार और प्रकार के दूरस्थ चैनलों के संदर्भ बनाएं। f एक ऐसा कार्य है जिसे जब pid पर निष्पादित किया जाता है तो उसे AbstractChannel कार्यान्वयन को वापस करना चाहिए।

उदाहरण के लिए, RemoteChannel(()->Channel{Int}(10), pid) , प्रकार Int चैनल का संदर्भ लौटाएगा और 10 pid पर आकार देगा।

डिफ़ॉल्ट pid वर्तमान प्रक्रिया है।

source

Base.wait फंक्शन

wait([x])

तर्क के प्रकार के आधार पर कुछ घटना होने तक वर्तमान कार्य को रोकें:

  • Channel : Channel संलग्न किए जाने वाले मूल्य के लिए प्रतीक्षा करें।
  • Condition : किसी शर्त पर notify लिए प्रतीक्षा notify
  • Process : एक प्रक्रिया या प्रक्रिया श्रृंखला से बाहर निकलने के लिए प्रतीक्षा करें। सफलता या विफलता का निर्धारण करने के लिए एक प्रक्रिया के exitcode फ़ील्ड का उपयोग किया जा सकता है।
  • Task : Task समाप्त करने के लिए प्रतीक्षा करें। यदि कार्य अपवाद के साथ विफल हो जाता है, तो अपवाद प्रचारित किया जाता है (उस कार्य में पुन: फेंक दिया जाता है जिसे wait कहा जाता है)।
  • RawFD : फाइल डिस्क्रिप्टर पर बदलाव के लिए प्रतीक्षा करें ( FileWatching पैकेज देखें)।

यदि कोई तर्क पारित नहीं होता है, तो कार्य अपरिभाषित अवधि के लिए ब्लॉक हो जाता है। किसी कार्य को केवल schedule या yieldto लिए एक स्पष्ट कॉल द्वारा पुनः आरंभ किया जा सकता है।

कार्यवाही से पहले प्रतीक्षा-की शर्त पूरी करने के लिए अक्सर wait को while भीतर कहा जाता है।

source
wait(r::Future)

निर्दिष्ट Future लिए एक मूल्य उपलब्ध होने की प्रतीक्षा करें।

source
wait(r::RemoteChannel, args...)

किसी मूल्य के लिए निर्दिष्ट RemoteChannel पर उपलब्ध होने की RemoteChannel

source

Base.fetch विधि

fetch(x)

प्रतीक्षा करता है और x के प्रकार के आधार पर x से मूल्य प्राप्त करता है:

  • Future : प्रतीक्षा करें और Future का मूल्य प्राप्त करें। प्राप्त मूल्य स्थानीय रूप से कैश किया गया है। एक ही संदर्भ पर fetch लिए आगे कॉल कैश्ड मान वापस करते हैं। यदि दूरस्थ मान अपवाद है, तो दूरस्थ अपवाद को फेंकता है जो दूरस्थ अपवाद और बैकट्रेस को कैप्चर करता है।
  • RemoteChannel : प्रतीक्षा करें और दूरस्थ संदर्भ का मान प्राप्त करें। उठाए गए अपवाद Future लिए समान हैं।

निकाले गए आइटम को नहीं निकालता है।

source

Distributed.remotecall .remotecall विधि

remotecall(f, id::Integer, args...; kwargs...) -> Future

किसी फ़ंक्शन को निर्दिष्ट प्रक्रिया पर दिए गए तर्कों पर अतुल्यकालिक रूप से कॉल करें। एक Future लौटें। कीवर्ड तर्क, यदि कोई हो, f माध्यम से पारित किए जाते हैं।

source

Distributed.remotecall_wait .remotecall_wait विधि

remotecall_wait(f, id::Integer, args...; kwargs...)

कार्यकर्ता आईडी id द्वारा निर्दिष्ट Worker पर एक संदेश में एक तेज wait(remotecall(...)) । कीवर्ड तर्क, यदि कोई हो, f माध्यम से पारित किए जाते हैं।

wait और remotecall भी देखें।

source

Distributed.remotecall_fetch .remotecall_fetch विधि

remotecall_fetch(f, id::Integer, args...; kwargs...)

एक संदेश में fetch(remotecall(...)) । कीवर्ड तर्क, यदि कोई हो, f माध्यम से पारित किए जाते हैं। किसी भी दूरस्थ अपवाद को एक RemoteException में कैप्चर किया RemoteException और फेंक दिया जाता है।

यह भी देखें और remotecall

उदाहरण

$ julia -p 2

julia> remotecall_fetch(sqrt, 2, 4)
2.0

julia> remotecall_fetch(sqrt, 2, -4)
ERROR: On worker 2:
DomainError with -4.0:
sqrt will only return a complex result if called with a complex argument. Try sqrt(Complex(x)).
...
source

Distributed.remote_do .remote_do विधि

remote_do(f, id::Integer, args...; kwargs...) -> nothing

एसिंक्रोनस रूप से कार्यकर्ता id पर f remotecall विपरीत, यह गणना के परिणाम को संग्रहीत नहीं करता है, न ही इसके पूरा होने की प्रतीक्षा करने का एक तरीका है।

एक सफल आह्वान इंगित करता है कि दूरस्थ नोड पर निष्पादन के लिए अनुरोध स्वीकार कर लिया गया है।

एक ही कार्यकर्ता को लगातार remotecall करने के क्रम में क्रमबद्ध किया जाता है, जबकि दूरस्थ कार्यकर्ता पर निष्पादन का क्रम अनिर्धारित होता है। उदाहरण के लिए, remote_do(f1, 2); remotecall(f2, 2); remote_do(f3, 2) remote_do(f1, 2); remotecall(f2, 2); remote_do(f3, 2) remote_do(f1, 2); remotecall(f2, 2); remote_do(f3, 2) उस क्रम में f2 और f3 बाद f1 कॉल को क्रमबद्ध करेगा। हालांकि, यह गारंटी नहीं है कि कार्यकर्ता 2 पर f3 से पहले f1 निष्पादित किया जाता है।

दूरदराज के कार्यकर्ता पर stderr करने के लिए f द्वारा फेंक दिए गए कुछ अपवाद मुद्रित होते हैं।

कीवर्ड तर्क, यदि कोई हो, f माध्यम से पारित किए जाते हैं।

source

Base.put! तरीका

put!(rr::RemoteChannel, args...)

RemoteChannel लिए मानों का एक सेट स्टोर करें। यदि चैनल भरा हुआ है, तो अंतरिक्ष उपलब्ध होने तक ब्लॉक होता है। पहला तर्क लौटाओ।

source

Base.put! तरीका

put!(rr::Future, v)

मान को Future rr स्टोर करें। Future लेखन एक बार सुदूर संदर्भ हैं। एक put! पहले से ही सेट पर Future एक Exception फेंकता है। सभी एसिंक्रोनस रिमोट कॉल Future एस वापस करते हैं और पूरा होने पर कॉल के रिटर्न वैल्यू को सेट करते हैं।

source

Base.take! तरीका

take!(rr::RemoteChannel, args...)

RemoteChannel rr से मूल्य प्राप्त करें, इस प्रक्रिया में मूल्य निकालते हैं।

source

Base.isready विधि

isready(rr::RemoteChannel, args...)

यह निर्धारित करें कि क्या RemoteChannel का मान इसमें संग्रहीत है। ध्यान दें कि यह फ़ंक्शन दौड़ की स्थिति पैदा कर सकता है, क्योंकि जब तक आप इसका परिणाम प्राप्त करते हैं तब तक यह सच नहीं हो सकता है। हालाँकि, इसे Future सुरक्षित रूप से उपयोग किया जा सकता है क्योंकि इन्हें केवल एक बार सौंपा जाता है।

source

Base.isready विधि

isready(rr::Future)

निर्धारित करें कि क्या Future इसका मान संग्रहीत है या नहीं।

यदि तर्क Future एक अलग नोड के स्वामित्व में है, तो यह कॉल उत्तर की प्रतीक्षा करने के लिए ब्लॉक हो जाएगा। इसके बजाय एक अलग कार्य में rr लिए प्रतीक्षा करने या प्रॉक्सी के रूप में स्थानीय Channel का उपयोग करने की सिफारिश की जाती है:

c = Channel(1)
@async put!(c, remotecall_fetch(long_computation, p))
isready(c)  # will not block
source

Distributed.WorkerPool प्रकार

WorkerPool(workers::Vector{Int})

वर्कर आईडी के वेक्टर से WorkerPool बनाएं।

उदाहरण

$ julia -p 3

julia> WorkerPool([2, 3])
WorkerPool(Channel{Int64}(sz_max:9223372036854775807,sz_curr:2), Set([2, 3]), RemoteChannel{Channel{Any}}(1, 1, 6))
source

Distributed.CachingPool प्रकार

CachingPool(workers::Vector{Int})

एक AbstractWorkerPool का कार्यान्वयन। remote , remotecall_fetch , pmap (और अन्य दूरस्थ कॉल जो दूरस्थ रूप से कार्यों को निष्पादित करते हैं) कार्यकर्ता नोड्स पर क्रमबद्ध / deserialized कार्यों को बंद करने से लाभान्वित होते हैं, विशेष रूप से क्लोजर (जो बड़ी मात्रा में डेटा पर कब्जा कर सकते हैं)।

दूरस्थ कैश को CachingPool ऑब्जेक्ट के जीवनकाल के लिए बनाए रखा जाता है। कैश को पहले खाली करने के लिए, clear!(pool) उपयोग करें।

वैश्विक चर के लिए, केवल बाइंडिंग एक क्लोजर में कैप्चर की जाती है, डेटा नहीं। वैश्विक डेटा को कैप्चर करने के let ब्लॉक का उपयोग किया जा सकता है।

उदाहरण

const foo = rand(10^8);
wp = CachingPool(workers())
let foo = foo
    pmap(wp, i -> sum(foo) + i, 1:100);
end

उपरोक्त प्रत्येक कार्यकर्ता को केवल एक बार foo ट्रांसफर करेगा।

source

Distributed.default_worker_pool .default_worker_pool फ़ंक्शन

default_worker_pool()

WorkerPool में निष्क्रिय workers() - जिनका उपयोग remote(f) और pmap (डिफ़ॉल्ट रूप से) द्वारा किया जाता है।

उदाहरण

$ julia -p 3

julia> default_worker_pool()
WorkerPool(Channel{Int64}(sz_max:9223372036854775807,sz_curr:3), Set([4, 2, 3]), RemoteChannel{Channel{Any}}(1, 1, 4))
source

Distributed.clear! तरीका

clear!(pool::CachingPool) -> pool

सभी भाग लेने वाले श्रमिकों से सभी कैश्ड कार्यों को हटा देता है।

source

Distributed.remote

remote([p::AbstractWorkerPool], f) -> Function

एक अनाम फ़ंक्शन लौटाएँ जो उपलब्ध कार्यकर्ता पर कार्य f को कार्यान्वित करता है (यदि प्रदान किया गया WorkerPool p से खींचा गया है) remotecall_fetch का उपयोग remotecall_fetch

source

Distributed.remotecall .remotecall विधि

remotecall(f, pool::AbstractWorkerPool, args...; kwargs...) -> Future

WorkerPool संस्करण remotecall(f, pid, ....) । प्रतीक्षा करें और pool से एक मुफ्त कर्मचारी लें और remotecall पर एक remotecall प्रदर्शन करें।

उदाहरण

$ julia -p 3

julia> wp = WorkerPool([2, 3]);

julia> A = rand(3000);

julia> f = remotecall(maximum, wp, A)
Future(2, 1, 6, nothing)

इस उदाहरण में, कार्य pid 2 पर चला, जिसे pid 1 से पुकारा गया।

source

Distributed.remotecall_wait .remotecall_wait विधि

remotecall_wait(f, pool::AbstractWorkerPool, args...; kwargs...) -> Future

WorkerPool remotecall_wait(f, pid, ....) WorkerPool संस्करण। प्रतीक्षा करें और pool से एक स्वतंत्र कार्यकर्ता लें और remotecall_wait पर एक remotecall_wait प्रदर्शन करें।

उदाहरण

$ julia -p 3

julia> wp = WorkerPool([2, 3]);

julia> A = rand(3000);

julia> f = remotecall_wait(maximum, wp, A)
Future(3, 1, 9, nothing)

julia> fetch(f)
0.9995177101692958
source

Distributed.remotecall_fetch .remotecall_fetch विधि

remotecall_fetch(f, pool::AbstractWorkerPool, args...; kwargs...) -> result

WorkerPool संस्करण remotecall_fetch(f, pid, ....) । प्रतीक्षा करता है और pool से एक स्वतंत्र कार्यकर्ता लेता है और remotecall_fetch पर एक remotecall_fetch करता है।

उदाहरण

$ julia -p 3

julia> wp = WorkerPool([2, 3]);

julia> A = rand(3000);

julia> remotecall_fetch(maximum, wp, A)
0.9995177101692958
source

Distributed.remote_do .remote_do विधि

remote_do(f, pool::AbstractWorkerPool, args...; kwargs...) -> nothing

WorkerPool संस्करण remote_do(f, pid, ....) । प्रतीक्षा करें और pool से एक मुफ्त कार्यकर्ता लें और remote_do पर एक remote_do करें।

source

Base.timedwait फंक्शन

timedwait(testcb::Function, secs::Float64; pollint::Float64=0.1)

testcb true होने तक या सेकंड के लिए, जो भी पहले हो, प्रतीक्षा करता है। testcb को हर pollint सेकंड में चुना जाता है।

source

[email protected] मैक्रो

@spawn

एक अभिव्यक्ति के चारों ओर एक घनिष्ठता बनाएं और इसे स्वचालित रूप से चुनी गई प्रक्रिया पर चलाएं, परिणाम पर Future लौटाता है।

उदाहरण

julia> addprocs(3);

julia> f = @spawn myid()
Future(2, 1, 5, nothing)

julia> fetch(f)
2

julia> f = @spawn myid()
Future(3, 1, 7, nothing)

julia> fetch(f)
3
source

[email protected]

@spawnat

एक अभिव्यक्ति के आसपास एक क्लोजर बनाएं और प्रक्रिया p पर एसिंक्रोनस रूप से क्लोजर चलाएं। परिणाम पर एक Future लौटें। दो तर्कों, p और एक अभिव्यक्ति को स्वीकार करता है।

उदाहरण

julia> addprocs(1);

julia> f = @spawnat 2 myid()
Future(2, 1, 3, nothing)

julia> fetch(f)
2
source

[email protected] मैक्रो

@fetch

समान fetch(@spawn expr) । देखें

उदाहरण

julia> addprocs(3);

julia> @fetch myid()
2

julia> @fetch myid()
3

julia> @fetch myid()
4

julia> @fetch myid()
2
source

[email protected] मैक्रो

@fetchfrom

समान fetch(@spawnat p expr) । देखें @spawnat और @spawnat

उदाहरण

julia> addprocs(3);

julia> @fetchfrom 2 myid()
2

julia> @fetchfrom 4 myid()
4
source

[email protected] मैक्रो

@async

एक Task में एक अभिव्यक्ति लपेटें और इसे स्थानीय मशीन के शेड्यूलर कतार में जोड़ें।

source

[email protected] मैक्रो

@sync

प्रतीक्षा करें जब तक @async , @spawn , @spawnat और @distributed सभी lexically- संलग्न उपयोग पूरे नहीं हो जाते। संलग्न async संचालनों द्वारा फेंके गए सभी अपवादों को एकत्रित किया जाता है और एक CompositeException अपवाद के रूप में फेंक दिया जाता है।

source

[email protected] मैक्रो

@distributed

फार्म की लूप के लिए एक वितरित मेमोरी, समानांतर:

@distributed [reducer] for var = range
    body
end

निर्दिष्ट श्रेणी का विभाजन किया गया है और सभी श्रमिकों में स्थानीय रूप से निष्पादित किया गया है। यदि कोई वैकल्पिक reducer फ़ंक्शन निर्दिष्ट किया जाता है, तो @distributed कॉलिंग प्रक्रिया पर अंतिम कमी के साथ प्रत्येक कार्यकर्ता पर स्थानीय कटौती करता है।

ध्यान दें कि @distributed फ़ंक्शन के बिना, @distributed एसिंक्रोनस रूप से निष्पादित होता है, अर्थात यह सभी उपलब्ध श्रमिकों पर स्वतंत्र कार्य करता है और पूरा होने का इंतजार किए बिना तुरंत लौटता है। पूरा होने की प्रतीक्षा करने के लिए, @sync साथ कॉल को उपसर्ग करें, जैसे:

@sync @distributed for var = range
    body
end
source

[email protected] मैक्रो

@everywhere [procs()] expr

सभी procs पर Main अंतर्गत एक अभिव्यक्ति निष्पादित करें। किसी भी प्रक्रिया में त्रुटियां एक समग्रता में एकत्र की जाती हैं और फेंक दी जाती हैं। उदाहरण के लिए:

@everywhere bar = 1

सभी प्रक्रियाओं पर Main.bar को परिभाषित करेगा।

@spawn और @spawnat विपरीत, @everywhere किसी भी स्थानीय चर को कैप्चर नहीं करता है। इसके बजाय, स्थानीय चर को प्रक्षेप के उपयोग से प्रसारित किया जा सकता है:

foo = 1
@everywhere bar = $foo

वैकल्पिक तर्क procs अभिव्यक्ति निष्पादित करने के लिए सभी प्रक्रियाओं के सबसेट को निर्दिष्ट करने की अनुमति देता है।

remotecall_eval(Main, procs, expr) को कॉल करने के लिए बराबर है।

source

Distributed.clear! तरीका

clear!(syms, pids=workers(); mod=Main)

nothing करने के लिए उन्हें शुरू करके मॉड्यूल में वैश्विक बंधनों को साफ करता है। syms Symbol या Symbol संग्रह का होना चाहिए। pids और mod प्रक्रियाओं और मॉड्यूल की पहचान करते हैं जिसमें वैश्विक चर को फिर से संगठित करना होता है। केवल उन्हीं नामों को परिभाषित किया गया है जो mod तहत परिभाषित हैं।

यदि वैश्विक स्थिरांक को साफ़ करने का अनुरोध किया जाता है तो एक अपवाद उठाया जाता है।

source

Distributed.remoteref_id ।remoteref_id फ़ंक्शन

remoteref_id(r::AbstractRemoteRef) -> RRID

Future s और RemoteChannel s को खेतों द्वारा पहचाना जाता है:

  • where - उस नोड को संदर्भित करता है जहाँ संदर्भ द्वारा संदर्भित अंतर्निहित वस्तु / भंडारण वास्तव में मौजूद है।

  • whence - नोड को संदर्भित करता है जिससे दूरस्थ संदर्भ बनाया गया था। ध्यान दें कि यह नोड से अलग है जहां अंतर्निहित ऑब्जेक्ट वास्तव में मौजूद है। उदाहरण के लिए, मास्टर प्रक्रिया से RemoteChannel(2) को कॉल करने पर 2 का मान और 1 का मान प्राप्त होगा।

  • id पूरे कार्यकर्ता के लिए अद्वितीय है जिसे कार्यकर्ता द्वारा निर्दिष्ट किया गया है।

एक साथ ले जाया गया, whence एक साथ सभी श्रमिकों के संदर्भ में विशिष्ट पहचान की पहचान की।

remoteref_id एक निम्न-स्तरीय API है जो एक RRID ऑब्जेक्ट को लौटाता है जो एक दूरस्थ संदर्भ के स्थान और id मान को लपेटता है।

source

Distributed.channel_from_id .hannel_from_id फ़ंक्शन

channel_from_id(id) -> c

एक निम्न-स्तरीय एपीआई जो remoteref_id द्वारा लौटाए गए id लिए बैकिंग remoteref_id लौटाता है। कॉल केवल उस नोड पर मान्य है जहां बैकिंग चैनल मौजूद है।

source

Distributed.worker_id_from_socket .worker_id_from_socket फ़ंक्शन

worker_id_from_socket(s) -> pid

एक निम्न-स्तरीय API, जो IO कनेक्शन या एक Worker दिया जाता है, वह उस Worker के pid को लौटाता है जिससे वह जुड़ा होता है। यह एक प्रकार के लिए कस्टम serialize तरीके लिखते समय उपयोगी है, जो प्राप्त प्रक्रिया आईडी के आधार पर बाहर लिखे गए डेटा को अनुकूलित करता है।

source
cluster_cookie() -> cookie

क्लस्टर कुकी वापस करें।

source
cluster_cookie(cookie) -> cookie

पारित कुकी को क्लस्टर कुकी के रूप में सेट करें, फिर उसे लौटाता है।

source

क्लस्टर प्रबंधक इंटरफ़ेस

यह इंटरफ़ेस विभिन्न क्लस्टर वातावरणों पर जूलिया श्रमिकों को लॉन्च करने और प्रबंधित करने के लिए एक तंत्र प्रदान करता है। बेस में दो प्रकार के प्रबंधक मौजूद हैं: LocalManager , एक ही मेजबान पर अतिरिक्त श्रमिकों को लॉन्च करने के लिए, और SSHManager , दूरस्थ मेजबान पर ssh माध्यम से लॉन्च करने के लिए। टीसीपी / आईपी सॉकेट का उपयोग प्रक्रियाओं के बीच संदेशों को जोड़ने और परिवहन के लिए किया जाता है। क्लस्टर मैनेजर के लिए एक अलग परिवहन प्रदान करना संभव है।

Distributed.launch समारोह

launch(manager::ClusterManager, params::Dict, launched::Array, launch_ntfy::Condition)

क्लस्टर प्रबंधकों द्वारा कार्यान्वित किया गया। इस फ़ंक्शन द्वारा लॉन्च किए गए प्रत्येक जूलिया कार्यकर्ता के लिए, उसे launched लिए एक WorkerConfig प्रविष्टि को launched करना चाहिए और launch_ntfy सूचित करना launch_ntfymanager द्वारा अनुरोध किए जाने के manager फ़ंक्शन सभी श्रमिकों से बाहर होना चाहिए। params सभी खोजशब्द तर्कों का एक शब्दकोष है जिसके साथ addprocs को बुलाया गया था।

source

Distributed.manage समारोह

manage(manager::ClusterManager, id::Integer, config::WorkerConfig. op::Symbol)

क्लस्टर प्रबंधकों द्वारा कार्यान्वित किया गया। यह एक कार्यकर्ता के जीवनकाल के दौरान, उचित op मान के साथ मास्टर प्रक्रिया पर कहा जाता है:

  • with :register / :deregister जब किसी कार्यकर्ता को जूलिया वर्कर पूल से जोड़ा / हटाया जाता है।
  • साथ :interrupt जब interrupt(workers) कहा जाता है। ClusterManager को एक बाधा संकेत के साथ उपयुक्त कार्यकर्ता को संकेत देना चाहिए।
  • साथ :finalize सफाई उद्देश्यों के लिए :finalize
source

Base.kill विधि

kill(manager::ClusterManager, pid::Int, config::WorkerConfig)

क्लस्टर प्रबंधकों द्वारा कार्यान्वित किया गया। इसे मास्टर प्रक्रिया पर, rmprocs द्वारा कहा जाता है। यह pid द्वारा निर्दिष्ट दूरस्थ कार्यकर्ता को बाहर निकलने का कारण pid चाहिए। kill(manager::ClusterManager.....) pid पर एक दूरस्थ exit() निष्पादित करता है।

source

Sockets.connect विधि

connect(manager::ClusterManager, pid::Int, config::WorkerConfig) -> (instrm::IO, outstrm::IO)

कस्टम ट्रांसपोर्ट का उपयोग करके क्लस्टर प्रबंधकों द्वारा कार्यान्वित किया गया। यह आईडी के साथ आईडी के साथ निर्दिष्ट IO ऑब्जेक्ट के साथ कार्यकर्ता के लिए एक तार्किक कनेक्शन स्थापित करना चाहिए और IO ऑब्जेक्ट की एक जोड़ी वापस करना चाहिए। pid से वर्तमान प्रक्रिया तक के संदेश instrm से instrm , जबकि pid को भेजे जाने वाले संदेशों को आगे outstrm लिए लिखा जाएगा। कस्टम परिवहन कार्यान्वयन को यह सुनिश्चित करना होगा कि संदेश वितरित और पूरी तरह से और क्रम में प्राप्त हो। connect(manager::ClusterManager.....) टीसीपी / आईपी सॉकेट कनेक्शन को श्रमिकों के बीच स्थापित करता है।

source

Distributed.init_worker .it_worker फ़ंक्शन

init_worker(cookie::AbstractString, manager::ClusterManager=DefaultClusterManager())

कस्टम परिवहन को लागू करने वाले क्लस्टर प्रबंधकों द्वारा कॉल किया जाता है। यह एक कार्यकर्ता के रूप में एक नई शुरू की गई प्रक्रिया को आरंभ करता है। कमांड लाइन तर्क - काम --worker[=<cookie>] परिवहन के लिए टीसीपी / आईपी सॉकेट्स का उपयोग करते हुए एक कार्यकर्ता के रूप में एक प्रक्रिया को शुरू करने का प्रभाव पड़ता है। cookie एक cluster_cookie

source

Distributed.start_worker .start_worker फ़ंक्शन

start_worker([out::IO=stdout], cookie::AbstractString=readline(stdin))

start_worker एक आंतरिक फ़ंक्शन है जो टीसीपी / आईपी के माध्यम से कनेक्ट होने वाली कार्यकर्ता प्रक्रियाओं के लिए डिफ़ॉल्ट प्रवेश बिंदु है। यह प्रक्रिया जूलिया क्लस्टर कार्यकर्ता के रूप में स्थापित करती है।

होस्ट: पोर्ट जानकारी को स्ट्रीम करने के लिए लिखा गया out (stdout में चूक)।

फ़ंक्शन स्टडिन को बंद कर देता है (यदि आवश्यक हो तो कुकी को पढ़ने के बाद), स्टडआउट के लिए stderr को पुनर्निर्देशित करता है, एक नि: शुल्क पोर्ट पर सुनता है (या यदि निर्दिष्ट किया गया है, तो --bind-to कमांड लाइन विकल्प में पोर्ट) और आने वाले टीसीपी कनेक्शन को संसाधित करने के लिए शेड्यूल कार्य अनुरोध।

यह वापस नहीं आता है।

source

Distributed.process_messages

process_messages(r_stream::IO, w_stream::IO, incoming::Bool=true)

कस्टम ट्रांसपोर्ट का उपयोग करके क्लस्टर प्रबंधकों द्वारा कॉल किया जाता है। यह कहा जाना चाहिए जब कस्टम ट्रांसपोर्ट कार्यान्वयन एक दूरस्थ कार्यकर्ता से पहला संदेश प्राप्त करता है। कस्टम परिवहन को दूरस्थ कार्यकर्ता के लिए एक तार्किक कनेक्शन का प्रबंधन करना चाहिए और दो IO ऑब्जेक्ट प्रदान करना चाहिए, एक आने वाले संदेशों के लिए और दूसरा दूरदराज के कार्यकर्ता को संबोधित संदेशों के लिए। यदि incoming true , तो दूरस्थ सहकर्मी ने कनेक्शन शुरू किया। जो भी जोड़ी कनेक्शन की पहल करती है, वह ऑथेंटिकेशन हैंडशेक करने के लिए क्लस्टर कुकी और उसके जूलिया वर्जन नंबर को भेजती है।

cluster_cookie भी देखें।

source