node.js नोडजे में लिखने योग्य एक से पाइप पठनीय स्ट्रीम को रोकने के लिए एक सही तरीका क्या है?



stream backpressure (1)

मैं एक मॉड्यूल लिख रहा हूं, जो एक लेखन योग्य धारा है। मैं अपने उपयोगकर्ताओं के लिए पाइप इंटरफ़ेस को कार्यान्वित करना चाहता हूं।

अगर कुछ त्रुटि होती है, तो मुझे पठनीय धारा को रोकने और त्रुटि घटना का उत्सर्जन करने की आवश्यकता है। फिर, उपयोगकर्ता तय करेगा - अगर वह त्रुटि के साथ ठीक है, तो वह डाटा प्रोसेसिंग को फिर से शुरू करने में सक्षम होना चाहिए।

var writeable = new BackPressureStream();
writeable.on('error', function(error){
    console.log(error);
    writeable.resume();
});

var readable = require('fs').createReadStream('somefile.txt');
readable.pipe.(writeable);

मुझे लगता है कि नोड हमें readable.pause() विधि प्रदान करता है, जिसका इस्तेमाल पठनीय धारा को रोकने के लिए किया जा सकता है। लेकिन मुझे ये नहीं मिल सकता कि मैं इसे अपने लेखन योग्य धारा मॉड्यूल से कैसे कह सकता हूं:

var Writable = require('stream').Writable;

function BackPressureStream(options) {
    Writable.call(this, options);
}
require('util').inherits(BackPressureStream, Writable);

BackPressureStream.prototype._write = function(chunk, encoding, done) {
    done();
};

BackPressureStream.prototype.resume = function() {
    this.emit('drain');
}

लिखने योग्य स्ट्रीम में कैसे दबाव लागू किया जा सकता है?

पी.एस. pipe/unpipe इवेंट्स का उपयोग करना संभव है, जो एक पैरामीटर के रूप में पठनीय स्ट्रीम प्रदान करते हैं। लेकिन यह भी कहा गया है कि पाइप किए गए धाराओं के लिए, विराम के लिए एकमात्र मौका लेखन योग्य पठनीय धारा से है।

क्या मुझे यह सही मिला? जब तक उपयोगकर्ता कॉल फिर से शुरू नहीं हो जाए तो मुझे अपने लिखने योग्य स्ट्रीम को अनप्इप करना होगा? और उपयोगकर्ता कॉल फिर से शुरू होने के बाद, मुझे पठ पठनीय धारा वापस करना चाहिए?


असल में, जैसा कि मैं समझता हूं, आप किसी त्रुटि घटना के मामले में स्ट्रीम पर बैकप्रेस दबाव डाल रहे हैं। आपके पास विकल्पों की एक जोड़ी है।

सबसे पहले, जैसा कि आपने पहले से ही पहचान लिया है, पठन स्ट्रीम के एक उदाहरण को पकड़ने के लिए pipe का उपयोग करें और कुछ फैंसी फुटवर्क करें।

एक और विकल्प एक रैपिंग लेखन योग्य स्ट्रीम बनाने के लिए है जो इस कार्यक्षमता प्रदान करता है (यानी यह एक इनपुट के रूप में एक WritableStream लेता है, और स्ट्रीम फ़ंक्शंस कार्यान्वित करते समय, डेटा को आपूर्ति की गई धारा के साथ पास करता है

मूल रूप से आप की तरह कुछ के साथ अंत

source stream -> wrapping writable -> writable

https://nodejs.org/api/stream.html#stream_implementing_a_writable_stream एक लेखन योग्य स्ट्रीम को लागू करने के साथ सौदा करता है

आपके लिए कुंजी यह है कि यदि एक अंतर्निहित लिखने योग्य में कोई त्रुटि आती है, तो आप स्ट्रीम पर एक ध्वज सेट करेंगे, और आने वाली write लिए अगली कॉल, आप हिस्सा बफर करेंगे, कॉलबैक को स्टोर करेंगे और केवल कॉल करें कुछ इस तरह

// ...
constructor(wrappedWritableStream) {
    wrappedWritableStream.on('error', this.errorHandler);
    this.wrappedWritableStream = wrappedWritableStream;
}
// ...
write(chunk, encoding, callback) {
    if (this.hadError) {
        // Note: until callback is called, this function won't be called again, so we will have maximum one stored
        //  chunk.
        this.bufferedChunk = [chunk, encoding, callback];
    } else {
        wrappedWritableStream.write(chunk, encoding, callback);
    }
}
// ...
errorHandler(err) {
    console.error(err);
    this.hadError = err;
    this.emit(err);
}
// ...
recoverFromError() {
    if (this.bufferedChunk) {
        wrappedWritableStream.write(...this.bufferedChunk);
        this.bufferedChunk = undefined;
    }
    this.hadError = false;
}

नोट: आपको write कार्य को लागू करने की आवश्यकता है, लेकिन मैं आपको चारों ओर खिसकने और अन्य कार्यान्वयन कार्यों के साथ खेलने के लिए प्रोत्साहित करता हूं।

यह भी ध्यान देने योग्य है कि आपके पास कुछ समस्याएं हैं जो एक ऐसी घटनाओं को लिखती हैं जो एक त्रुटि घटना को उत्सर्जित कर चुके हैं, लेकिन मैं इसे हल करने के लिए एक अलग समस्या के रूप में छोड़ दूँगा।

Https://www.nodejs.org/en/docs/guides/backpressuring-in-streams/ बैकअप के लिए यहां एक और अच्छा संसाधन है