process - मैं रस्ट में अवरुद्ध किए बिना एक बच्चे की प्रक्रिया का आउटपुट कैसे पढ़ सकता हूं?




io rust (2)

मैं रस्ट में एक छोटी सी ncurses एप्लिकेशन बना रहा हूं जिसे एक बच्चे की प्रक्रिया के साथ संवाद करने की आवश्यकता है। मेरे पास पहले से ही कॉमन लिस्प में लिखा हुआ एक प्रोटोटाइप है। मैं इसे फिर से लिखने की कोशिश कर रहा हूं क्योंकि सीएल इस तरह के एक छोटे उपकरण के लिए स्मृति की एक बड़ी मात्रा का उपयोग करता है।

मुझे कुछ परेशानी हो रही है कि कैसे उप-प्रक्रिया के साथ बातचीत करें।

वर्तमान में मैं जो कर रहा हूं वह लगभग यही है:

  1. प्रक्रिया बनाएँ:

    let mut program = match Command::new(command)
        .args(arguments)
        .stdin(Stdio::piped())
        .stdout(Stdio::piped())
        .stderr(Stdio::piped())
        .spawn()
    {
        Ok(child) => child,
        Err(_) => {
            println!("Cannot run program '{}'.", command);
            return;
        }
    };
  2. इसे अनंत तक पास करें (जब तक कि उपयोगकर्ता बाहर न निकल जाए) लूप, जो इनपुट को पढ़ता है और संभालता है और इस तरह आउटपुट के लिए सुनता है (और इसे स्क्रीन पर लिखता है):

    fn listen_for_output(program: &mut Child, output_viewer: &TextViewer) {
        match program.stdout {
            Some(ref mut out) => {
                let mut buf_string = String::new();
                match out.read_to_string(&mut buf_string) {
                    Ok(_) => output_viewer.append_string(buf_string),
                    Err(_) => return,
                };
            }
            None => return,
        };
    }

जब तक प्रक्रिया बाहर नहीं निकल जाती, तब तक read_to_string का कॉल प्रोग्राम को ब्लॉक कर देता है। मैं जो कुछ भी read सकता हूं, उसे read_to_end और read भी ब्लॉक होने लगता है। अगर मैं ls जैसी कोई चीज़ चलाने की कोशिश करता हूं, जो तुरंत बाहर निकल जाता है, तो यह काम करता है, लेकिन कुछ चीज़ों के साथ जो python या sbcl तरह बाहर नहीं निकलता है, यह केवल तब भी जारी रहता है जब मैं मैन्युअल रूप से sbcl को मार देता हूं।

इस उत्तर के आधार पर, मैंने BufReader का उपयोग करने के लिए कोड बदल दिया:

    fn listen_for_output(program: &mut Child, output_viewer: &TextViewer) {
        match program.stdout.as_mut() {
            Some(out) => {
                let buf_reader = BufReader::new(out);
                for line in buf_reader.lines() {
                    match line {
                        Ok(l) => {
                            output_viewer.append_string(l);
                        }
                        Err(_) => return,
                    };
                }
            }
            None => return,
        }
    }

हालाँकि, समस्या अभी भी वही है। यह उपलब्ध सभी पंक्तियों को पढ़ेगा, और फिर ब्लॉक करेगा। चूंकि टूल किसी भी प्रोग्राम के साथ काम करने वाला है, इसलिए यह अनुमान लगाने का कोई तरीका नहीं है कि आउटपुट कब समाप्त होगा, पढ़ने की कोशिश करने से पहले। BufReader लिए कोई टाइमआउट सेट करने का कोई तरीका प्रतीत नहीं होता है।


टोकियो-प्रक्रिया

यहाँ tokio और tokio-process का उपयोग करने का एक उदाहरण tokio-process

use std::{
    io::BufReader,
    process::{Command, Stdio},
};
use tokio::{io, prelude::*, runtime::Runtime}; // 0.1.18
use tokio_process::CommandExt; // 0.2.3

fn main() {
    let mut cmd = Command::new("/tmp/slow.bash")
        .stdout(Stdio::piped())
        .spawn_async()
        .expect("cannot spawn");
    let stdout = cmd.stdout().take().expect("no stdout");

    let mut runtime = Runtime::new().expect("Unable to start the runtime");

    let result = runtime.block_on({
        io::lines(BufReader::new(stdout))
            .inspect(|s| println!("> {}", s))
            .collect()
    });

    println!("All the lines: {:?}", result);
}

टोकियो-ThreadPool

यहाँ tokio और tokio-threadpool का उपयोग करने का एक उदाहरण है। हम blocking फ़ंक्शन का उपयोग करके थ्रेड में प्रक्रिया शुरू करते हैं। हम उस धारा को stream::poll_fn बदल देते हैं stream::poll_fn

use std::process::{Command, Stdio};
use tokio::{prelude::*, runtime::Runtime}; // 0.1.18
use tokio_threadpool; // 0.1.13

fn stream_command_output(
    mut command: Command,
) -> impl Stream<Item = Vec<u8>, Error = tokio_threadpool::BlockingError> {
    // Ensure that the output is available to read from and start the process
    let mut child = command
        .stdout(Stdio::piped())
        .spawn()
        .expect("cannot spawn");
    let mut stdout = child.stdout.take().expect("no stdout");

    // Create a stream of data
    stream::poll_fn(move || {
        // Perform blocking IO
        tokio_threadpool::blocking(|| {
            // Allocate some space to store anything read
            let mut data = vec![0; 128];
            // Read 1-128 bytes of data
            let n_bytes_read = stdout.read(&mut data).expect("cannot read");

            if n_bytes_read == 0 {
                // Stdout is done
                None
            } else {
                // Only return as many bytes as we read
                data.truncate(n_bytes_read);
                Some(data)
            }
        })
    })
}

fn main() {
    let output_stream = stream_command_output(Command::new("/tmp/slow.bash"));

    let mut runtime = Runtime::new().expect("Unable to start the runtime");

    let result = runtime.block_on({
        output_stream
            .map(|d| String::from_utf8(d).expect("Not UTF-8"))
            .fold(Vec::new(), |mut v, s| {
                print!("> {}", s);
                v.push(s);
                Ok(v)
            })
    });

    println!("All the lines: {:?}", result);
}

यहां कई संभावित ट्रेडऑफ हैं जो यहां बनाए जा सकते हैं। उदाहरण के लिए, हमेशा 128 बाइट्स आवंटित करना आदर्श नहीं है, लेकिन इसे लागू करना सरल है।

समर्थन

संदर्भ के लिए, यहाँ धीमा है

#!/usr/bin/env bash

set -eu

val=0

while [[ $val -lt 10 ]]; do
    echo $val
    val=$(($val + 1))
    sleep 1
done

यह भी देखें:

  • मैं स्थिर रूप से स्थिर जंग में एक अतुल्यकालिक भविष्य में गणना किए गए मूल्य को कैसे वापस करूं?

डिफ़ॉल्ट रूप से धाराएँ अवरुद्ध हो रही हैं। टीसीपी / आईपी स्ट्रीम, फाइल सिस्टम स्ट्रीम, पाइप स्ट्रीम, वे सभी अवरुद्ध हैं। जब आप एक बाइट आपको बाइट्स का एक हिस्सा देने के लिए कहते हैं तो यह रुक जाता है और तब तक इंतजार करता है जब तक कि बाइट्स के दिए हुए एमाउट या कुछ और होने तक (एक interrupt , स्ट्रीम का अंत, एक त्रुटि)।

ऑपरेटिंग सिस्टम डेटा को पढ़ने की प्रक्रिया में वापस करने के लिए उत्सुक हैं, इसलिए यदि आप चाहते हैं कि अगली लाइन का इंतजार करें और जैसे ही यह आता है तो इसे संभाल लें, यह विधि Shepmaster द्वारा सुझाए गए तरीके से या बच्चे के जन्म से पाइप करने में असमर्थ है एक से अधिक बार काम करने की प्रक्रिया । (सिद्धांत में यह नहीं है, क्योंकि एक ऑपरेटिंग सिस्टम को BufReader को read में अधिक डेटा की प्रतीक्षा करने की अनुमति है, लेकिन व्यवहार में ऑपरेटिंग सिस्टम प्रारंभिक "शॉर्ट रीड" को प्रतीक्षा करना पसंद करते हैं)।

यह सरल BufReader आधारित दृष्टिकोण तब काम करना बंद कर देता है जब आपको कई धाराओं (जैसे कि एक बच्चे की प्रक्रिया के stdout और stderr ) या कई प्रक्रियाओं को संभालने की आवश्यकता होती है। उदाहरण के लिए, BufReader दृष्टिकोण गतिरोध हो सकता है जब एक बच्चे की प्रक्रिया आपके BufReader को खत्म करने के लिए इंतजार करती है, जबकि आपकी प्रक्रिया को अवरुद्ध होने पर इंतजार करना बंद हो जाता है।

इसी तरह, आप BufReader उपयोग नहीं कर सकते हैं जब आप अपने कार्यक्रम को अनिश्चित काल तक बच्चे की प्रक्रिया पर इंतजार नहीं करना चाहते हैं। हो सकता है कि आप एक प्रगति बार या एक टाइमर प्रदर्शित करना चाहते हैं जबकि बच्चा अभी भी काम कर रहा है और आपको कोई आउटपुट नहीं देता है।

यदि आपका ऑपरेटिंग सिस्टम प्रक्रिया को डेटा वापस करने के लिए उत्सुक नहीं होता है तो आप BufReader आधारित दृष्टिकोण का उपयोग नहीं कर सकते हैं (क्योंकि "पूरा पढ़ता है" "शॉर्ट रीड्स" के लिए) क्योंकि उस मामले में कुछ अंतिम लाइनें चाइल्ड प्रोसेस द्वारा प्रिंट की जाती हैं एक ग्रे ज़ोन में समाप्त हो सकता है: ऑपरेटिंग सिस्टम उन्हें मिल गया, लेकिन वे BufReader के बफर को भरने के लिए पर्याप्त बड़े नहीं हैं।

BufReader सीमित है जो Read इंटरफ़ेस इसे स्ट्रीम के साथ करने की अनुमति देता है, यह अंतर्निहित स्ट्रीम की तुलना में कम अवरुद्ध नहीं है। कुशल होने के लिए यह चंक्स में इनपुट को read , ऑपरेटिंग सिस्टम को बताएगा कि उसके बफर में उतना ही भरना है जितना कि उपलब्ध है।

आप सोच रहे होंगे कि BufReader में डेटा पढ़ना इतना महत्वपूर्ण क्यों है, तो BufReader सिर्फ बाइट डेटा बाइट क्यों नहीं पढ़ सकता है। समस्या यह है कि एक स्ट्रीम से डेटा को पढ़ने के लिए हमें ऑपरेटिंग सिस्टम की सहायता की आवश्यकता है। दूसरी ओर, हम ऑपरेटिंग सिस्टम नहीं हैं, हम इससे अलग-थलग काम करते हैं, इसलिए यदि हमारी प्रक्रिया में कुछ गलत हो जाता है, तो इसके साथ गड़बड़ न करें। इसलिए ऑपरेटिंग सिस्टम को कॉल करने के लिए "कर्नेल मोड" में एक संक्रमण होना चाहिए जो "संदर्भ स्विच" भी हो सकता है। यही कारण है कि ऑपरेटिंग सिस्टम को हर एक बाइट को पढ़ने के लिए कॉल करना महंगा है। हम यथासंभव कम ओएस कॉल चाहते हैं और इसलिए हमें बैचों में स्ट्रीम डेटा मिलता है।

ब्लॉक किए बिना स्ट्रीम पर प्रतीक्षा करने के लिए आपको एक गैर-ब्लॉकिंग स्ट्रीम की आवश्यकता होगी। MIO पाइपों के लिए आवश्यक नॉन-ब्लॉकिंग स्ट्रीम सपोर्ट रखने का वादा करता है , जो संभवत: PipeReader साथ है, लेकिन मैंने अब तक इसकी जांच नहीं की है।

एक धारा की गैर-अवरुद्ध प्रकृति को चंक्स में डेटा पढ़ना संभव बनाना चाहिए, भले ही ऑपरेटिंग सिस्टम "शॉर्ट रीड्स" पसंद करे या नहीं। क्योंकि नॉन-ब्लॉकिंग स्ट्रीम कभी ब्लॉक नहीं होती है। यदि स्ट्रीम में कोई डेटा नहीं है तो यह आपको बस इतना बताता है।

गैर-अवरुद्ध स्ट्रीम की अनुपस्थिति में, आपको स्प्रेडिंग थ्रेड्स का सहारा लेना होगा, ताकि ब्लॉकिंग रीड्स को एक अलग थ्रेड में प्रदर्शित किया जा सके और इस तरह से आपका प्राथमिक धागा ब्लॉक नहीं होगा। ऑपरेटिंग सिस्टम "शॉर्ट रीड्स" को पसंद नहीं करने की स्थिति में तुरंत लाइन सेपरेटर पर प्रतिक्रिया करने के लिए आप बाइट द्वारा स्ट्रीम बाइट भी पढ़ना चाह सकते हैं। यहाँ एक कार्यशील उदाहरण दिया गया है: https://gist.github.com/ArtemGr/db40ae04b431a95f2b78