摘要:如何創(chuàng)建并使用。正如我們所預料到的那樣,使用來進行大文件的讀取顯然是錯誤的。使用進行壓縮文件我們必須修復我們的應(yīng)用程序,并使其處理大文件的最簡單方法是使用的。確切地說,由返回的流。
本系列文章為《Node.js Design Patterns Second Edition》的原文翻譯和讀書筆記,在GitHub連載更新,同步翻譯版鏈接。
歡迎關(guān)注我的專欄,之后的博文將在專欄同步:
Encounter的掘金專欄
知乎專欄 Encounter的編程思考
segmentfault專欄 前端小站
Coding with StreamsStreams是Node.js最重要的組件和模式之一。 社區(qū)中有一句格言“Stream all the things(Steam就是所有的)”,僅此一點就足以描述流在Node.js中的地位。 Dominic Tarr作為Node.js社區(qū)的最大貢獻者,它將流定義為Node.js最好,也是最難以理解的概念。
使Node.js的Streams如此吸引人還有其它原因; 此外,Streams不僅與性能或效率等技術(shù)特性有關(guān),更重要的是它們的優(yōu)雅性以及它們與Node.js的設(shè)計理念完美契合的方式。
在本章中,將會學到以下內(nèi)容:
Streams對于Node.js的重要性。
如何創(chuàng)建并使用Streams。
Streams作為編程范式,不只是對于I/O而言,在多種應(yīng)用場景下它的應(yīng)用和強大的功能。
管道模式和在不同的配置中連接Streams。
發(fā)現(xiàn)Streams的重要性在基于事件的平臺(如Node.js)中,處理I / O的最有效的方法是實時處理,一旦有輸入的信息,立馬進行處理,一旦有需要輸出的結(jié)果,也立馬輸出反饋。
在本節(jié)中,我們將首先介紹Node.js的Streams和它的優(yōu)點。 請記住,這只是一個概述,因為本章后面將會詳細介紹如何使用和組合Streams。
Streams和Buffer的比較我們在本書中幾乎所有看到過的異步API都是使用的Buffer模式。 對于輸入操作,Buffer模式會將來自資源的所有數(shù)據(jù)收集到Buffer區(qū)中; 一旦讀取完整個資源,就會把結(jié)果傳遞給回調(diào)函數(shù)。 下圖顯示了這個范例的一個真實的例子:
從上圖我們可以看到,在t1時刻,一些數(shù)據(jù)從資源接收并保存到緩沖區(qū)。 在t2時刻,最后一段數(shù)據(jù)被接收到另一個數(shù)據(jù)塊,完成讀取操作,這時,把整個緩沖區(qū)的內(nèi)容發(fā)送給消費者。
另一方面,Streams允許你在數(shù)據(jù)到達時立即處理數(shù)據(jù)。 如下圖所示:
這一張圖顯示了Streams如何從資源接收每個新的數(shù)據(jù)塊,并立即提供給消費者,消費者現(xiàn)在不必等待緩沖區(qū)中收集所有數(shù)據(jù)再處理每個數(shù)據(jù)塊。
但是這兩種方法有什么區(qū)別呢? 我們可以將它們概括為兩點:
空間效率
時間效率
此外,Node.js的Streams具有另一個重要的優(yōu)點:可組合性(composability)。 現(xiàn)在讓我們看看這些屬性對我們設(shè)計和編寫應(yīng)用程序的方式會產(chǎn)生什么影響。
空間效率首先,Streams允許我們做一些看起來不可能的事情,通過緩沖數(shù)據(jù)并一次性處理。 例如,考慮一下我們必須讀取一個非常大的文件,比如說數(shù)百MB甚至千MB。 顯然,等待完全讀取文件時返回大Buffer的API不是一個好主意。 想象一下,如果并發(fā)讀取一些大文件, 我們的應(yīng)用程序很容易耗盡內(nèi)存。 除此之外,V8中的Buffer不能大于0x3FFFFFFF字節(jié)(小于1GB)。 所以,在耗盡物理內(nèi)存之前,我們可能會碰壁。
使用Buffered的API進行壓縮文件舉一個具體的例子,讓我們考慮一個簡單的命令行接口(CLI)的應(yīng)用程序,它使用Gzip格式壓縮文件。 使用Buffered的API,這樣的應(yīng)用程序在Node.js中大概這么編寫(為簡潔起見,省略了異常處理):
const fs = require("fs"); const zlib = require("zlib"); const file = process.argv[2]; fs.readFile(file, (err, buffer) => { zlib.gzip(buffer, (err, buffer) => { fs.writeFile(file + ".gz", buffer, err => { console.log("File successfully compressed"); }); }); });
現(xiàn)在,我們可以嘗試將前面的代碼放在一個叫做gzip.js的文件中,然后執(zhí)行下面的命令:
node gzip
如果我們選擇一個足夠大的文件,比如說大于1GB的文件,我們會收到一個錯誤信息,說明我們要讀取的文件大于最大允許的緩沖區(qū)大小,如下所示:
RangeError: File size is greater than possible Buffer:0x3FFFFFFF
上面的例子中,沒找到一個大文件,但確實對于大文件的讀取速率慢了許多。
正如我們所預料到的那樣,使用Buffer來進行大文件的讀取顯然是錯誤的。
使用Streams進行壓縮文件我們必須修復我們的Gzip應(yīng)用程序,并使其處理大文件的最簡單方法是使用Streams的API。 讓我們看看如何實現(xiàn)這一點。 讓我們用下面的代碼替換剛創(chuàng)建的模塊的內(nèi)容:
const fs = require("fs"); const zlib = require("zlib"); const file = process.argv[2]; fs.createReadStream(file) .pipe(zlib.createGzip()) .pipe(fs.createWriteStream(file + ".gz")) .on("finish", () => console.log("File successfully compressed"));
“是嗎?”你可能會問。是的;正如我們所說的,由于Streams的接口和可組合性,因此我們還能寫出這樣的更加簡潔,優(yōu)雅和精煉的代碼。 我們稍后會詳細地看到這一點,但是現(xiàn)在需要認識到的重要一點是,程序可以順暢地運行在任何大小的文件上,理想情況是內(nèi)存利用率不變。 嘗試一下(但考慮壓縮一個大文件可能需要一段時間)。
時間效率現(xiàn)在讓我們考慮一個壓縮文件并將其上傳到遠程HTTP服務(wù)器的應(yīng)用程序的例子,該遠程HTTP服務(wù)器進而將其解壓縮并保存到文件系統(tǒng)中。如果我們的客戶端是使用Buffered的API實現(xiàn)的,那么只有當整個文件被讀取和壓縮時,上傳才會開始。 另一方面,只有在接收到所有數(shù)據(jù)的情況下,解壓縮才會在服務(wù)器上啟動。 實現(xiàn)相同結(jié)果的更好的解決方案涉及使用Streams。 在客戶端機器上,Streams只要從文件系統(tǒng)中讀取就可以壓縮和發(fā)送數(shù)據(jù)塊,而在服務(wù)器上,只要從遠程對端接收到數(shù)據(jù)塊,就可以解壓每個數(shù)據(jù)塊。 我們通過構(gòu)建前面提到的應(yīng)用程序來展示這一點,從服務(wù)器端開始。
我們創(chuàng)建一個叫做gzipReceive.js的模塊,代碼如下:
const http = require("http"); const fs = require("fs"); const zlib = require("zlib"); const server = http.createServer((req, res) => { const filename = req.headers.filename; console.log("File request received: " + filename); req .pipe(zlib.createGunzip()) .pipe(fs.createWriteStream(filename)) .on("finish", () => { res.writeHead(201, { "Content-Type": "text/plain" }); res.end("That"s it "); console.log(`File saved: ${filename}`); }); }); server.listen(3000, () => console.log("Listening"));
服務(wù)器從網(wǎng)絡(luò)接收數(shù)據(jù)塊,將其解壓縮,并在接收到數(shù)據(jù)塊后立即保存,這要歸功于Node.js的Streams。
我們的應(yīng)用程序的客戶端將進入一個名為gzipSend.js的模塊,如下所示:
在前面的代碼中,我們再次使用Streams從文件中讀取數(shù)據(jù),然后在從文件系統(tǒng)中讀取的同時壓縮并發(fā)送每個數(shù)據(jù)塊。
現(xiàn)在,運行這個應(yīng)用程序,我們首先使用以下命令啟動服務(wù)器:
node gzipReceive
然后,我們可以通過指定要發(fā)送的文件和服務(wù)器的地址(例如localhost)來啟動客戶端:
node gzipSendlocalhost
如果我們選擇一個足夠大的文件,我們將更容易地看到數(shù)據(jù)如何從客戶端流向服務(wù)器,但為什么這種模式下,我們使用Streams,比使用Buffered的API更有效率? 下圖應(yīng)該給我們一個提示:
一個文件被處理的過程,它經(jīng)過以下階段:
客戶端從文件系統(tǒng)中讀取
客戶端壓縮數(shù)據(jù)
客戶端將數(shù)據(jù)發(fā)送到服務(wù)器
服務(wù)端接收數(shù)據(jù)
服務(wù)端解壓數(shù)據(jù)
服務(wù)端將數(shù)據(jù)寫入磁盤
為了完成處理,我們必須按照流水線順序那樣經(jīng)過每個階段,直到最后。在上圖中,我們可以看到,使用Buffered的API,這個過程完全是順序的。為了壓縮數(shù)據(jù),我們首先必須等待整個文件被讀取完畢,然后,發(fā)送數(shù)據(jù),我們必須等待整個文件被讀取和壓縮,依此類推。當我們使用Streams時,只要我們收到第一個數(shù)據(jù)塊,流水線就會被啟動,而不需要等待整個文件的讀取。但更令人驚訝的是,當下一塊數(shù)據(jù)可用時,不需要等待上一組任務(wù)完成;相反,另一條裝配線是并行啟動的。因為我們執(zhí)行的每個任務(wù)都是異步的,這樣顯得很完美,所以可以通過Node.js來并行執(zhí)行Streams的相關(guān)操作;唯一的限制就是每個階段都必須保證數(shù)據(jù)塊的到達順序。
從前面的圖可以看出,使用Streams的結(jié)果是整個過程花費的時間更少,因為我們不用等待所有數(shù)據(jù)被全部讀取完畢和處理。
組合性到目前為止,我們已經(jīng)看到的代碼已經(jīng)告訴我們?nèi)绾问褂?b>pipe()方法來組裝Streams的數(shù)據(jù)塊,Streams允許我們連接不同的處理單元,每個處理單元負責單一的職責(這是符合Node.js風格的)。這是可能的,因為Streams具有統(tǒng)一的接口,并且就API而言,不同Streams也可以很好的進行交互。唯一的先決條件是管道的下一個Streams必須支持上一個Streams生成的數(shù)據(jù)類型,可以是二進制,文本甚至是對象,我們將在后面的章節(jié)中看到。
為了證明Streams組合性的優(yōu)勢,我們可以嘗試在我們先前構(gòu)建的gzipReceive / gzipSend應(yīng)用程序中添加加密功能。
為此,我們只需要通過向流水線添加另一個Streams來更新客戶端。 確切地說,由crypto.createChipher()返回的流。 由此產(chǎn)生的代碼應(yīng)如下所示:
const fs = require("fs"); const zlib = require("zlib"); const crypto = require("crypto"); const http = require("http"); const path = require("path"); const file = process.argv[2]; const server = process.argv[3]; const options = { hostname: server, port: 3000, path: "/", method: "PUT", headers: { filename: path.basename(file), "Content-Type": "application/octet-stream", "Content-Encoding": "gzip" } }; const req = http.request(options, res => { console.log("Server response: " + res.statusCode); }); fs.createReadStream(file) .pipe(zlib.createGzip()) .pipe(crypto.createCipher("aes192", "a_shared_secret")) .pipe(req) .on("finish", () => { console.log("File successfully sent"); });
使用相同的方式,我們更新服務(wù)端的代碼,使得它可以在數(shù)據(jù)塊進行解壓之前先解密:
const http = require("http"); const fs = require("fs"); const zlib = require("zlib"); const crypto = require("crypto"); const server = http.createServer((req, res) => { const filename = req.headers.filename; console.log("File request received: " + filename); req .pipe(crypto.createDecipher("aes192", "a_shared_secret")) .pipe(zlib.createGunzip()) .pipe(fs.createWriteStream(filename)) .on("finish", () => { res.writeHead(201, { "Content-Type": "text/plain" }); res.end("That"s it "); console.log(`File saved: ${filename}`); }); }); server.listen(3000, () => console.log("Listening"));
crypto是Node.js的核心模塊之一,提供了一系列加密算法。
只需幾行代碼,我們就在應(yīng)用程序中添加了一個加密層。 我們只需要簡單地通過把已經(jīng)存在的Streams模塊和加密層組合到一起,就可以。類似的,我們可以添加和合并其他Streams,如同在玩樂高積木一樣。
顯然,這種方法的主要優(yōu)點是可重用性,但正如我們從目前為止所介紹的代碼中可以看到的那樣,Streams也可以實現(xiàn)更清晰,更模塊化,更加簡潔的代碼。 出于這些原因,流通常不僅僅用于處理純粹的I / O,而且它還是簡化和模塊化代碼的手段。
開始使用Streams在前面的章節(jié)中,我們了解了為什么Streams如此強大,而且它在Node.js中無處不在,甚至在Node.js的核心模塊中也有其身影。 例如,我們已經(jīng)看到,fs模塊具有用于從文件讀取的createReadStream()和用于寫入文件的createWriteStream(),HTTP請求和響應(yīng)對象本質(zhì)上是Streams,并且zlib模塊允許我們使用Streams式API壓縮和解壓縮數(shù)據(jù)塊。
現(xiàn)在我們知道為什么Streams是如此重要,讓我們退后一步,開始更詳細地探索它。
Streams的結(jié)構(gòu)Node.js中的每個Streams都是Streams核心模塊中可用的四個基本抽象類之一的實現(xiàn):
stream.Readable
stream.Writable
stream.Duplex
stream.Transform
每個stream類也是EventEmitter的一個實例。實際上,Streams可以產(chǎn)生幾種類型的事件,比如end事件會在一個可讀的Streams完成讀取,或者錯誤讀取,或其過程中產(chǎn)生異常時觸發(fā)。
請注意,為簡潔起見,在本章介紹的例子中,我們經(jīng)常會忽略適當?shù)腻e誤處理。但是,在生產(chǎn)環(huán)境下中,總是建議為所有Stream注冊錯誤事件偵聽器。
Streams之所以如此靈活的原因之一是它不僅能夠處理二進制數(shù)據(jù),而且?guī)缀蹩梢蕴幚砣魏?b>JavaScript值。實際上,Streams可以支持兩種操作模式:
二進制模式:以數(shù)據(jù)塊形式(例如buffers或strings)流式傳輸數(shù)據(jù)
對象模式:將流數(shù)據(jù)視為一系列離散對象(這使得我們幾乎可以使用任何JavaScript值)
這兩種操作模式使我們不僅可以使用I / O流,而且還可以作為一種工具,以函數(shù)式的風格優(yōu)雅地組合處理單元,我們將在本章后面看到。
在本章中,我們將主要使用在Node.js 0.11中引入的Node.js流接口,也稱為版本3。 有關(guān)與舊接口差異的更多詳細信息,請參閱StrongLoop在https://strongloop.com/strong...。可讀的Streams
一個可讀的Streams表示一個數(shù)據(jù)源,在Node.js中,它使用stream模塊中的Readableabstract類實現(xiàn)。
從Streams中讀取信息從可讀Streams接收數(shù)據(jù)有兩種方式:non-flowing模式和flowing模式。 我們來更詳細地分析這些模式。
從可讀的Streams中讀取數(shù)據(jù)的默認模式是為其附加一個可讀事件偵聽器,用于指示要讀取的新數(shù)據(jù)的可用性。然后,在一個循環(huán)中,我們讀取所有的數(shù)據(jù),直到內(nèi)部buffer被清空。這可以使用read()方法完成,該方法同步從內(nèi)部緩沖區(qū)中讀取數(shù)據(jù),并返回表示數(shù)據(jù)塊的Buffer或String對象。read()方法以如下使用模式:
readable.read([size]);
使用這種方法,數(shù)據(jù)隨時可以直接從Streams中按需提取。
為了說明這是如何工作的,我們創(chuàng)建一個名為readStdin.js的新模塊,它實現(xiàn)了一個簡單的程序,它從標準輸入(一個可讀流)中讀取數(shù)據(jù),并將所有數(shù)據(jù)回送到標準輸出:
process.stdin .on("readable", () => { let chunk; console.log("New data available"); while ((chunk = process.stdin.read()) !== null) { console.log( `Chunk read: (${chunk.length}) "${chunk.toString()}"` ); } }) .on("end", () => process.stdout.write("End of stream"));
read()方法是一個同步操作,它從可讀Streams的內(nèi)部Buffers區(qū)中提取數(shù)據(jù)塊。如果Streams在二進制模式下工作,返回的數(shù)據(jù)塊默認為一個Buffer對象。
在以二進制模式工作的可讀的Stream中,我們可以通過在Stream上調(diào)用setEncoding(encoding)來讀取字符串而不是Buffer對象,并提供有效的編碼格式(例如utf8)。
數(shù)據(jù)是從可讀的偵聽器中讀取的,只要有新的數(shù)據(jù),就會調(diào)用這個偵聽器。當內(nèi)部緩沖區(qū)中沒有更多數(shù)據(jù)可用時,read()方法返回null;在這種情況下,我們不得不等待另一個可讀的事件被觸發(fā),告訴我們可以再次讀取或者等待表示Streams讀取過程結(jié)束的end事件觸發(fā)。當一個流以二進制模式工作時,我們也可以通過向read()方法傳遞一個size參數(shù)來指定我們想要讀取的數(shù)據(jù)大小。這在實現(xiàn)網(wǎng)絡(luò)協(xié)議或解析特定數(shù)據(jù)格式時特別有用。
現(xiàn)在,我們準備運行readStdin模塊并進行實驗。讓我們在控制臺中鍵入一些字符,然后按Enter鍵查看回顯到標準輸出中的數(shù)據(jù)。要終止流并因此生成一個正常的結(jié)束事件,我們需要插入一個EOF(文件結(jié)束)字符(在Windows上使用Ctrl + Z或在Linux上使用Ctrl + D)。
我們也可以嘗試將我們的程序與其他程序連接起來;這可以使用管道運算符(|),它將程序的標準輸出重定向到另一個程序的標準輸入。例如,我們可以運行如下命令:
cat| node readStdin
這是流式范例是一個通用接口的一個很好的例子,它使得我們的程序能夠進行通信,而不管它們是用什么語言寫的。
從Streams中讀取的另一種方法是將偵聽器附加到data事件;這會將Streams切換為flowing模式,其中數(shù)據(jù)不是使用read()函數(shù)來提取的,而是一旦有數(shù)據(jù)到達data監(jiān)聽器就被推送到監(jiān)聽器內(nèi)。例如,我們之前創(chuàng)建的readStdin應(yīng)用程序?qū)⑹褂昧鲃幽J剑?/p>
process.stdin .on("data", chunk => { console.log("New data available"); console.log( `Chunk read: (${chunk.length}) "${chunk.toString()}"` ); }) .on("end", () => process.stdout.write("End of stream"));
flowing模式是舊版Streams接口(也稱為Streams1)的繼承,其靈活性較低,API較少。隨著Streams2接口的引入,flowing模式不是默認的工作模式,要啟用它,需要將偵聽器附加到data事件或顯式調(diào)用resume()方法。 要暫時中斷Streams觸發(fā)data事件,我們可以調(diào)用pause()方法,導致任何傳入數(shù)據(jù)緩存在內(nèi)部buffer中。
調(diào)用pause()不會導致Streams切換回non-flowing模式。實現(xiàn)可讀的Streams
現(xiàn)在我們知道如何從Streams中讀取數(shù)據(jù),下一步是學習如何實現(xiàn)一個新的Readable數(shù)據(jù)流。為此,有必要通過繼承stream.Readable的原型來創(chuàng)建一個新的類。 具體流必須提供_read()方法的實現(xiàn):
readable._read(size)
Readable類的內(nèi)部將調(diào)用_read()方法,而該方法又將啟動
使用push()填充內(nèi)部緩沖區(qū):
請注意,read()是Stream消費者調(diào)用的方法,而_read()是一個由Stream子類實現(xiàn)的方法,不能直接調(diào)用。下劃線通常表示該方法為私有方法,不應(yīng)該直接調(diào)用。
為了演示如何實現(xiàn)新的可讀Streams,我們可以嘗試實現(xiàn)一個生成隨機字符串的Streams。 我們來創(chuàng)建一個名為randomStream.js的新模塊,它將包含我們的字符串的generator的代碼:
const stream = require("stream"); const Chance = require("chance"); const chance = new Chance(); class RandomStream extends stream.Readable { constructor(options) { super(options); } _read(size) { const chunk = chance.string(); //[1] console.log(`Pushing chunk of size: ${chunk.length}`); this.push(chunk, "utf8"); //[2] if (chance.bool({ likelihood: 5 })) { //[3] this.push(null); } } } module.exports = RandomStream;
在文件頂部,我們將加載我們的依賴關(guān)系。除了我們正在加載一個chance的npm模塊之外,沒有什么特別之處,它是一個用于生成各種隨機值的庫,從數(shù)字到字符串到整個句子都能生成隨機值。
下一步是創(chuàng)建一個名為RandomStream的新類,并指定stream.Readable作為其父類。 在前面的代碼中,我們調(diào)用父類的構(gòu)造函數(shù)來初始化其內(nèi)部狀態(tài),并將收到的options參數(shù)作為輸入。通過options對象傳遞的可能參數(shù)包括以下內(nèi)容:
用于將Buffers轉(zhuǎn)換為Strings的encoding參數(shù)(默認值為null)
是否啟用對象模式(objectMode默認為false)
存儲在內(nèi)部buffer區(qū)中的數(shù)據(jù)的上限,一旦超過這個上限,則暫停從data source讀取(highWaterMark默認為16KB)
好的,現(xiàn)在讓我們來解釋一下我們重寫的stream.Readable類的_read()方法:
該方法使用chance生成隨機字符串。
它將字符串push內(nèi)部buffer。 請注意,由于我們push的是String,此外我們還指定了編碼為utf8(如果數(shù)據(jù)塊只是一個二進制Buffer,則不需要)。
以5%的概率隨機中斷stream的隨機字符串產(chǎn)生,通過push null到內(nèi)部Buffer來表示EOF,即stream的結(jié)束。
我們還可以看到在_read()函數(shù)的輸入中給出的size參數(shù)被忽略了,因為它是一個建議的參數(shù)。 我們可以簡單地把所有可用的數(shù)據(jù)都push到內(nèi)部的buffer中,但是如果在同一個調(diào)用中有多個推送,那么我們應(yīng)該檢查push()是否返回false,因為這意味著內(nèi)部buffer已經(jīng)達到了highWaterMark限制,我們應(yīng)該停止添加更多的數(shù)據(jù)。
以上就是RandomStream模塊,我們現(xiàn)在準備好使用它。我們來創(chuàng)建一個名為generateRandom.js的新模塊,在這個模塊中我們實例化一個新的RandomStream對象并從中提取一些數(shù)據(jù):
const RandomStream = require("./randomStream"); const randomStream = new RandomStream(); randomStream.on("readable", () => { let chunk; while ((chunk = randomStream.read()) !== null) { console.log(`Chunk received: ${chunk.toString()}`); } });
現(xiàn)在,一切都準備好了,我們嘗試新的自定義的stream。 像往常一樣簡單地執(zhí)行generateRandom模塊,觀察隨機的字符串在屏幕上流動。
可寫的Streams一個可寫的stream表示一個數(shù)據(jù)終點,在Node.js中,它使用stream模塊中的Writable抽象類來實現(xiàn)。
寫入一個stream把一些數(shù)據(jù)放在可寫入的stream中是一件簡單的事情, 我們所要做的就是使用write()方法,它具有以下格式:
writable.write(chunk, [encoding], [callback])
encoding參數(shù)是可選的,其在chunk是String類型時指定(默認為utf8,如果chunk是Buffer,則忽略);當數(shù)據(jù)塊被刷新到底層資源中時,callback就會被調(diào)用,callback參數(shù)也是可選的。
為了表示沒有更多的數(shù)據(jù)將被寫入stream中,我們必須使用end()方法:
writable.end([chunk], [encoding], [callback])
我們可以通過end()方法提供最后一塊數(shù)據(jù)。在這種情況下,callbak函數(shù)相當于為finish事件注冊一個監(jiān)聽器,當數(shù)據(jù)塊全部被寫入stream中時,會觸發(fā)該事件。
現(xiàn)在,讓我們通過創(chuàng)建一個輸出隨機字符串序列的小型HTTP服務(wù)器來演示這是如何工作的:
const Chance = require("chance"); const chance = new Chance(); require("http").createServer((req, res) => { res.writeHead(200, { "Content-Type": "text/plain" }); //[1] while (chance.bool({ likelihood: 95 })) { //[2] res.write(chance.string() + " "); //[3] } res.end(" The end... "); //[4] res.on("finish", () => console.log("All data was sent")); //[5] }).listen(8080, () => console.log("Listening on http://localhost:8080"));
我們創(chuàng)建了一個HTTP服務(wù)器,并把數(shù)據(jù)寫入res對象,res對象是http.ServerResponse的一個實例,也是一個可寫入的stream。下面來解釋上述代碼發(fā)生了什么:
我們首先寫HTTP response的頭部。請注意,writeHead()不是Writable接口的一部分,實際上,這個方法是http.ServerResponse類公開的輔助方法。
我們開始一個5%的概率終止的循環(huán)(進入循環(huán)體的概率為chance.bool()產(chǎn)生,其為95%)。
在循環(huán)內(nèi)部,我們寫入一個隨機字符串到stream。
一旦我們不在循環(huán)中,我們調(diào)用stream的end(),表示沒有更多
數(shù)據(jù)塊將被寫入。另外,我們在結(jié)束之前提供一個最終的字符串寫入流中。
最后,我們注冊一個finish事件的監(jiān)聽器,當所有的數(shù)據(jù)塊都被刷新到底層socket中時,這個事件將被觸發(fā)。
我們可以調(diào)用這個小模塊稱為entropyServer.js,然后執(zhí)行它。要測試這個服務(wù)器,我們可以在地址http:// localhost:8080打開一個瀏覽器,或者從終端使用curl命令,如下所示:
curl localhost:8080
此時,服務(wù)器應(yīng)該開始向您選擇的HTTP客戶端發(fā)送隨機字符串(請注意,某些瀏覽器可能會緩沖數(shù)據(jù),并且流式傳輸行為可能不明顯)。
Back-pressure(反壓)類似于在真實管道系統(tǒng)中流動的液體,Node.js的stream也可能遭受瓶頸,數(shù)據(jù)寫入速度可能快于stream的消耗。 解決這個問題的機制包括緩沖輸入數(shù)據(jù);然而,如果數(shù)據(jù)stream沒有給生產(chǎn)者任何反饋,我們可能會產(chǎn)生越來越多的數(shù)據(jù)被累積到內(nèi)部緩沖區(qū)的情況,導致內(nèi)存泄露的發(fā)生。
為了防止這種情況的發(fā)生,當內(nèi)部buffer超過highWaterMark限制時,writable.write()將返回false。 可寫入的stream具有highWaterMark屬性,這是write()方法開始返回false的內(nèi)部Buffer區(qū)大小的限制,一旦Buffer區(qū)的大小超過這個限制,表示應(yīng)用程序應(yīng)該停止寫入。 當緩沖器被清空時,會觸發(fā)一個叫做drain的事件,通知再次開始寫入是安全的。 這種機制被稱為back-pressure。
本節(jié)介紹的機制同樣適用于可讀的stream。事實上,在可讀stream中也存在back-pressure,并且在_read()內(nèi)調(diào)用的push()方法返回false時觸發(fā)。 但是,這對于stream實現(xiàn)者來說是一個特定的問題,所以我們將不經(jīng)常處理它。
我們可以通過修改之前創(chuàng)建的entropyServer模塊來演示可寫入的stream的back-pressure:
const Chance = require("chance"); const chance = new Chance(); require("http").createServer((req, res) => { res.writeHead(200, { "Content-Type": "text/plain" }); function generateMore() { //[1] while (chance.bool({ likelihood: 95 })) { const shouldContinue = res.write( chance.string({ length: (16 * 1024) - 1 }) //[2] ); if (!shouldContinue) { //[3] console.log("Backpressure"); return res.once("drain", generateMore); } } res.end(" The end... ", () => console.log("All data was sent")); } generateMore(); }).listen(8080, () => console.log("Listening on http://localhost:8080"));
前面代碼中最重要的步驟可以概括如下:
我們將主邏輯封裝在一個名為generateMore()的函數(shù)中。
為了增加獲得一些back-pressure的機會,我們將數(shù)據(jù)塊的大小增加到16KB-1Byte,這非常接近默認的highWaterMark限制。
在寫入一大塊數(shù)據(jù)之后,我們檢查res.write()的返回值。 如果它返回false,這意味著內(nèi)部buffer已滿,我們應(yīng)該停止發(fā)送更多的數(shù)據(jù)。在這種情況下,我們從函數(shù)中退出,然后新注冊一個寫入事件的發(fā)布者,當drain事件觸發(fā)時調(diào)用generateMore。
如果我們現(xiàn)在嘗試再次運行服務(wù)器,然后使用curl生成客戶端請求,則很可能會有一些back-pressure,因為服務(wù)器以非常高的速度生成數(shù)據(jù),速度甚至會比底層socket更快。
實現(xiàn)可寫入的Streams我們可以通過繼承stream.Writable類來實現(xiàn)一個新的可寫入的流,并為_write()方法提供一個實現(xiàn)。實現(xiàn)一個我們自定義的可寫入的Streams類。
讓我們構(gòu)建一個可寫入的stream,它接收對象的格式如下:
{ path:content: }
這個類的作用是這樣的:對于每一個對象,我們的stream必須將content部分保存到在給定路徑中創(chuàng)建的文件中。 我們可以立即看到,我們stream的輸入是對象,而不是Strings或Buffers,這意味著我們的stream必須以對象模式工作。
調(diào)用模塊toFileStream.js:
const stream = require("stream"); const fs = require("fs"); const path = require("path"); const mkdirp = require("mkdirp"); class ToFileStream extends stream.Writable { constructor() { super({ objectMode: true }); } _write(chunk, encoding, callback) { mkdirp(path.dirname(chunk.path), err => { if (err) { return callback(err); } fs.writeFile(chunk.path, chunk.content, callback); }); } } module.exports = ToFileStream;
作為第一步,我們加載所有我們所需要的依賴包。注意,我們需要模塊mkdirp,正如你應(yīng)該從前幾章中所知道的,它應(yīng)該使用npm安裝。
我們創(chuàng)建了一個新類,它從stream.Writable擴展而來。
我們不得不調(diào)用父構(gòu)造函數(shù)來初始化其內(nèi)部狀態(tài);我們還提供了一個option對象作為參數(shù),用于指定流在對象模式下工作(objectMode:true)。stream.Writable接受的其他選項如下:
highWaterMark(默認值是16KB):控制back-pressure的上限。
decodeStrings(默認為true):在字符串傳遞給_write()方法之前,將字符串自動解碼為二進制buffer區(qū)。 在對象模式下這個參數(shù)被忽略。
最后,我們?yōu)?b>_write()方法提供了一個實現(xiàn)。正如你所看到的,這個方法接受一個數(shù)據(jù)塊,一個編碼方式(只有在二進制模式下,stream選項decodeStrings設(shè)置為false時才有意義)。
另外,該方法接受一個回調(diào)函數(shù),該函數(shù)在操作完成時需要調(diào)用;而不必要傳遞操作的結(jié)果,但是如果需要的話,我們?nèi)匀豢梢詡鬟f一個error對象,這將導致stream觸發(fā)error事件。
現(xiàn)在,為了嘗試我們剛剛構(gòu)建的stream,我們可以創(chuàng)建一個名為writeToFile.js的新模塊,并對該流執(zhí)行一些寫操作:
const ToFileStream = require("./toFileStream.js"); const tfs = new ToFileStream(); tfs.write({path: "file1.txt", content: "Hello"}); tfs.write({path: "file2.txt", content: "Node.js"}); tfs.write({path: "file3.txt", content: "Streams"}); tfs.end(() => console.log("All files created"));
有了這個,我們創(chuàng)建并使用了我們的第一個自定義的可寫入流。 像往常一樣運行新模塊來檢查其輸出;你會看到執(zhí)行后會創(chuàng)建三個新文件。
雙重的Streams雙重的stream既是可讀的,也可寫的。 當我們想描述一個既是數(shù)據(jù)源又是數(shù)據(jù)終點的實體時(例如socket),這就顯得十分有用了。 雙工流繼承stream.Readable和stream.Writable的方法,所以它對我們來說并不新鮮。這意味著我們可以read()或write()數(shù)據(jù),或者可以監(jiān)聽readable和drain事件。
要創(chuàng)建一個自定義的雙重stream,我們必須為_read()和_write()提供一個實現(xiàn)。傳遞給Duplex()構(gòu)造函數(shù)的options對象在內(nèi)部被轉(zhuǎn)發(fā)給Readable和Writable的構(gòu)造函數(shù)。options參數(shù)的內(nèi)容與前面討論的相同,options增加了一個名為allowHalfOpen值(默認為true),如果設(shè)置為false,則會導致只要stream的一方(Readable和Writable)結(jié)束,stream就結(jié)束了。
為了使雙重的stream在一方以對象模式工作,而在另一方以二進制模式工作,我們需要在流構(gòu)造器中手動設(shè)置以下屬性:
this._writableState.objectMode this._readableState.objectMode轉(zhuǎn)換的Streams
轉(zhuǎn)換的Streams是專門設(shè)計用于處理數(shù)據(jù)轉(zhuǎn)換的一種特殊類型的雙重Streams。
在一個簡單的雙重Streams中,從stream中讀取的數(shù)據(jù)和寫入到其中的數(shù)據(jù)之間沒有直接的關(guān)系(至少stream是不可知的)。 想想一個TCP socket,它只是向遠程節(jié)點發(fā)送數(shù)據(jù)和從遠程節(jié)點接收數(shù)據(jù)。TCP socket自身沒有意識到輸入和輸出之間有任何關(guān)系。
下圖說明了雙重Streams中的數(shù)據(jù)流:
另一方面,轉(zhuǎn)換的Streams對從可寫入端接收到的每個數(shù)據(jù)塊應(yīng)用某種轉(zhuǎn)換,然后在其可讀端使轉(zhuǎn)換的數(shù)據(jù)可用。
下圖顯示了數(shù)據(jù)如何在轉(zhuǎn)換的Streams中流動:
從外面看,轉(zhuǎn)換的Streams的接口與雙重Streams的接口完全相同。但是,當我們想要構(gòu)建一個新的雙重Streams時,我們必須提供_read()和_write()方法,而為了實現(xiàn)一個新的變換流,我們必須填寫另一對方法:_transform()和_flush())。
我們來演示如何用一個例子來創(chuàng)建一個新的轉(zhuǎn)換的Streams。
實現(xiàn)轉(zhuǎn)換的Streams我們來實現(xiàn)一個轉(zhuǎn)換的Streams,它將替換給定所有出現(xiàn)的字符串。 要做到這一點,我們必須創(chuàng)建一個名為replaceStream.js的新模塊。 讓我們直接看怎么實現(xiàn)它:
const stream = require("stream"); const util = require("util"); class ReplaceStream extends stream.Transform { constructor(searchString, replaceString) { super(); this.searchString = searchString; this.replaceString = replaceString; this.tailPiece = ""; } _transform(chunk, encoding, callback) { const pieces = (this.tailPiece + chunk) //[1] .split(this.searchString); const lastPiece = pieces[pieces.length - 1]; const tailPieceLen = this.searchString.length - 1; this.tailPiece = lastPiece.slice(-tailPieceLen); //[2] pieces[pieces.length - 1] = lastPiece.slice(0,-tailPieceLen); this.push(pieces.join(this.replaceString)); //[3] callback(); } _flush(callback) { this.push(this.tailPiece); callback(); } } module.exports = ReplaceStream;
與往常一樣,我們將從其依賴項開始構(gòu)建模塊。這次我們沒有使用第三方模塊。
然后我們創(chuàng)建了一個從stream.Transform基類繼承的新類。該類的構(gòu)造函數(shù)接受兩個參數(shù):searchString和replaceString。 正如你所想象的那樣,它們允許我們定義要匹配的文本以及用作替換的字符串。 我們還初始化一個將由_transform()方法使用的tailPiece內(nèi)部變量。
現(xiàn)在,我們來分析一下_transform()方法,它是我們新類的核心。_transform()方法與可寫入的stream的_write()方法具有幾乎相同的格式,但不是將數(shù)據(jù)寫入底層資源,而是使用this.push()將其推入內(nèi)部buffer,這與我們會在可讀流的_read()方法中執(zhí)行。這顯示了轉(zhuǎn)換的Streams的雙方如何實際連接。
ReplaceStream的_transform()方法實現(xiàn)了我們這個新類的核心。正常情況下,搜索和替換buffer區(qū)中的字符串是一件容易的事情;但是,當數(shù)據(jù)流式傳輸時,情況則完全不同,可能的匹配可能分布在多個數(shù)據(jù)塊中。代碼后面的程序可以解釋如下:
我們的算法使用searchString函數(shù)作為分隔符來分割塊。
然后,它取出分隔后生成的數(shù)組的最后一項lastPiece,并提取其最后一個字符searchString.length - 1。結(jié)果被保存到tailPiece變量中,它將會被作為下一個數(shù)據(jù)塊的前綴。
最后,所有從split()得到的片段用replaceString作為分隔符連接在一起,并推入內(nèi)部buffer區(qū)。
當stream結(jié)束時,我們可能仍然有最后一個tailPiece變量沒有被壓入內(nèi)部緩沖區(qū)。這正是_flush()方法的用途;它在stream結(jié)束之前被調(diào)用,并且這是我們最終有機會完成流或者在完全結(jié)束流之前推送任何剩余數(shù)據(jù)的地方。
_flush()方法只需要一個回調(diào)函數(shù)作為參數(shù),當所有的操作完成后,我們必須確保調(diào)用這個回調(diào)函數(shù)。完成了這個,我們已經(jīng)完成了我們的ReplaceStream類。
現(xiàn)在,是時候嘗試新的stream。我們可以創(chuàng)建另一個名為replaceStreamTest.js的模塊來寫入一些數(shù)據(jù),然后讀取轉(zhuǎn)換的結(jié)果:
const ReplaceStream = require("./replaceStream"); const rs = new ReplaceStream("World", "Node.js"); rs.on("data", chunk => console.log(chunk.toString())); rs.write("Hello W"); rs.write("orld!"); rs.end();
為了使得這個例子更復雜一些,我們把搜索詞分布在兩個不同的數(shù)據(jù)塊上;然后,使用flowing模式,我們從同一個stream中讀取數(shù)據(jù),記錄每個已轉(zhuǎn)換的塊。運行前面的程序應(yīng)該產(chǎn)生以下輸出:
Hel lo Node.js !
有一個值得提及是,第五種類型的stream:stream.PassThrough。 與我們介紹的其他流類不同,PassThrough不是抽象的,可以直接實例化,而不需要實現(xiàn)任何方法。實際上,這是一個可轉(zhuǎn)換的stream,它可以輸出每個數(shù)據(jù)塊,而不需要進行任何轉(zhuǎn)換。使用管道連接Streams
Unix管道的概念是由Douglas Mcllroy發(fā)明的;這使程序的輸出能夠連接到下一個的輸入。看看下面的命令:
echo Hello World! | sed s/World/Node.js/g
在前面的命令中,echo會將Hello World!寫入標準輸出,然后被重定向到sed命令的標準輸入(因為有管道操作符 |)。 然后sed用Node.js替換任何World,并將結(jié)果打印到它的標準輸出(這次是控制臺)。
以類似的方式,可以使用可讀的Streams的pipe()方法將Node.js的Streams連接在一起,它具有以下接口:
readable.pipe(writable, [options])
非常直觀地,pipe()方法將從可讀的Streams中發(fā)出的數(shù)據(jù)抽取到所提供的可寫入的Streams中。 另外,當可讀的Streams發(fā)出end事件(除非我們指定{end:false}作為options)時,可寫入的Streams將自動結(jié)束。 pipe()方法返回作為參數(shù)傳遞的可寫入的Streams,如果這樣的stream也是可讀的(例如雙重或可轉(zhuǎn)換的Streams),則允許我們創(chuàng)建鏈式調(diào)用。
將兩個Streams連接到一起時,則允許數(shù)據(jù)自動流向可寫入的Streams,所以不需要調(diào)用read()或write()方法;但最重要的是不需要控制back-pressure,因為它會自動處理。
舉個簡單的例子(將會有大量的例子),我們可以創(chuàng)建一個名為replace.js的新模塊,它接受來自標準輸入的文本流,應(yīng)用替換轉(zhuǎn)換,然后將數(shù)據(jù)返回到標準輸出:
const ReplaceStream = require("./replaceStream"); process.stdin .pipe(new ReplaceStream(process.argv[2], process.argv[3])) .pipe(process.stdout);
上述程序?qū)碜詷藴瘦斎氲臄?shù)據(jù)傳送到ReplaceStream,然后返回到標準輸出。 現(xiàn)在,為了實踐這個小應(yīng)用程序,我們可以利用Unix管道將一些數(shù)據(jù)重定向到它的標準輸入,如下所示:
echo Hello World! | node replace World Node.js
運行上述程序,會輸出如下結(jié)果:
Hello Node.js
這個簡單的例子演示了Streams(特別是文本Streams)是一個通用接口,管道幾乎是構(gòu)成和連接所有這些接口的通用方式。
error事件不會通過管道自動傳播。舉個例子,看如下代碼片段:
stream1 .pipe(stream2) .on("error", function() {});
在前面的鏈式調(diào)用中,我們將只捕獲來自stream2的錯誤,這是由于我們給其添加了erorr事件偵聽器。這意味著,如果我們想捕獲從stream1生成的任何錯誤,我們必須直接附加另一個錯誤偵聽器。 稍后我們將看到一種可以實現(xiàn)共同錯誤捕獲的另一種模式(合并Streams)。 此外,我們應(yīng)該注意到,如果目標Streams(讀取的Streams)發(fā)出錯誤,它將會對源Streams通知一個error,之后導致管道的中斷。Streams如何通過管道
到目前為止,我們創(chuàng)建自定義Streams的方式并不完全遵循Node定義的模式;實際上,從stream基類繼承是違反small surface area的,并需要一些示例代碼。 這并不意味著Streams設(shè)計得不好,實際上,我們不應(yīng)該忘記,因為Streams是Node.js核心的一部分,所以它們必須盡可能地靈活,廣泛拓展Streams以致于用戶級模塊能夠?qū)⑺鼈兂浞诌\用。
然而,大多數(shù)情況下,我們并不需要原型繼承可以給予的所有權(quán)力和可擴展性,但通常我們想要的僅僅是定義新Streams的一種快速開發(fā)的模式。Node.js社區(qū)當然也為此創(chuàng)建了一個解決方案。 一個完美的例子是through2,一個使得我們可以簡單地創(chuàng)建轉(zhuǎn)換的Streams的小型庫。 通過through2,我們可以通過調(diào)用一個簡單的函數(shù)來創(chuàng)建一個新的可轉(zhuǎn)換的Streams:
const transform = through2([options], [_transform], [_flush]);
類似的,from2也允許我們像下面這樣創(chuàng)建一個可讀的Streams:
const readable = from2([options], _read);
接下來,我們將在本章其余部分展示它們的用法,那時,我們會清楚使用這些小型庫的好處。
through和from是基于Stream1規(guī)范的頂層庫。基于Streams的異步控制流
通過我們已經(jīng)介紹的例子,應(yīng)該清楚的是,Streams不僅可以用來處理I / O,而且可以用作處理任何類型數(shù)據(jù)的優(yōu)雅編程模式。 但優(yōu)點并不止這些;還可以利用Streams來實現(xiàn)異步控制流,在本節(jié)將會看到。
順序執(zhí)行默認情況下,Streams將按順序處理數(shù)據(jù);例如,轉(zhuǎn)換的Streams的_transform()函數(shù)在前一個數(shù)據(jù)塊執(zhí)行callback()之后才會進行下一塊數(shù)據(jù)塊的調(diào)用。這是Streams的一個重要屬性,按正確順序處理每個數(shù)據(jù)塊至關(guān)重要,但是也可以利用這一屬性將Streams實現(xiàn)優(yōu)雅的傳統(tǒng)控制流模式。
代碼總是比太多的解釋要好得多,所以讓我們來演示一下如何使用流來按順序執(zhí)行異步任務(wù)的例子。讓我們創(chuàng)建一個函數(shù)來連接一組接收到的文件作為輸入,確保遵守提供的順序。我們創(chuàng)建一個名為concatFiles.js的新模塊,并從其依賴開始:
const fromArray = require("from2-array"); const through = require("through2"); const fs = require("fs");
我們將使用through2來簡化轉(zhuǎn)換的Streams的創(chuàng)建,并使用from2-array從一個對象數(shù)組中創(chuàng)建可讀的Streams。
接下來,我們可以定義concatFiles()函數(shù):
function concatFiles(destination, files, callback) { const destStream = fs.createWriteStream(destination); fromArray.obj(files) //[1] .pipe(through.obj((file, enc, done) => { //[2] const src = fs.createReadStream(file); src.pipe(destStream, {end: false}); src.on("end", done); //[3] })) .on("finish", () => { //[4] destStream.end(); callback(); }); } module.exports = concatFiles;
前面的函數(shù)通過將files數(shù)組轉(zhuǎn)換為Streams來實現(xiàn)對files數(shù)組的順序迭代。 該函數(shù)所遵循的程序解釋如下:
首先,我們使用from2-array從files數(shù)組創(chuàng)建一個可讀的Streams。
接下來,我們使用through來創(chuàng)建一個轉(zhuǎn)換的Streams來處理序列中的每個文件。對于每個文件,我們創(chuàng)建一個可讀的Streams,并通過管道將其輸入到表示輸出文件的destStream中。 在源文件完成讀取后,通過在pipe()方法的第二個參數(shù)中指定{end:false},我們確保不關(guān)閉destStream。
當源文件的所有內(nèi)容都被傳送到destStream時,我們調(diào)用through.obj公開的done函數(shù)來傳遞當前處理已經(jīng)完成,在我們的情況下這是需要觸發(fā)處理下一個文件。
所有文件處理完后,finish事件被觸發(fā)。我們最后可以結(jié)束destStream并調(diào)用concatFiles()的callback()函數(shù),這個函數(shù)表示整個操作的完成。
我們現(xiàn)在可以嘗試使用我們剛剛創(chuàng)建的小模塊。讓我們創(chuàng)建一個名為concat.js的新文件來完成一個示例:
const concatFiles = require("./concatFiles"); concatFiles(process.argv[2], process.argv.slice(3), () => { console.log("Files concatenated successfully"); });
我們現(xiàn)在可以運行上述程序,將目標文件作為第一個命令行參數(shù),接著是要連接的文件列表,例如:
node concat allTogether.txt file1.txt file2.txt
執(zhí)行這一條命令,會創(chuàng)建一個名為allTogether.txt的新文件,其中按順序保存file1.txt和file2.txt的內(nèi)容。
使用concatFiles()函數(shù),我們能夠僅使用Streams實現(xiàn)異步操作的順序執(zhí)行。正如我們在Chapter3 Asynchronous Control Flow Patters with Callbacks中看到的那樣,如果使用純JavaScript實現(xiàn),或者使用async等外部庫,則需要使用或?qū)崿F(xiàn)迭代器。我們現(xiàn)在提供了另外一個可以達到同樣效果的方法,正如我們所看到的,它的實現(xiàn)方式非常優(yōu)雅且可讀性高。
模式:使用Streams或Streams的組合,可以輕松地按順序遍歷一組異步任務(wù)。無序并行執(zhí)行
我們剛剛看到Streams按順序處理每個數(shù)據(jù)塊,但有時這可能并不能這么做,因為這樣并沒有充分利用Node.js的并發(fā)性。如果我們必須對每個數(shù)據(jù)塊執(zhí)行一個緩慢的異步操作,那么并行化執(zhí)行這一組異步任務(wù)完全是有必要的。當然,只有在每個數(shù)據(jù)塊之間沒有關(guān)系的情況下才能應(yīng)用這種模式,這些數(shù)據(jù)塊可能經(jīng)常發(fā)生在對象模式的Streams中,但是對于二進制模式的Streams很少使用無序的并行執(zhí)行。
注意:當處理數(shù)據(jù)的順序很重要時,不能使用無序并行執(zhí)行的Streams。
為了并行化一個可轉(zhuǎn)換的Streams的執(zhí)行,我們可以運用Chapter3 Asynchronous Control Flow Patters with Callbacks所講到的無序并行執(zhí)行的相同模式,然后做出一些改變使它們適用于Streams。讓我們看看這是如何更改的。
讓我們用一個例子直接說明:我們創(chuàng)建一個叫做parallelStream.js的模塊,然后自定義一個普通的可轉(zhuǎn)換的Streams,然后給出一系列可轉(zhuǎn)換流的方法:
const stream = require("stream"); class ParallelStream extends stream.Transform { constructor(userTransform) { super({objectMode: true}); this.userTransform = userTransform; this.running = 0; this.terminateCallback = null; } _transform(chunk, enc, done) { this.running++; this.userTransform(chunk, enc, this._onComplete.bind(this), this.push.bind(this)); done(); } _flush(done) { if(this.running > 0) { this.terminateCallback = done; } else { done(); } } _onComplete(err) { this.running--; if(err) { return this.emit("error", err); } if(this.running === 0) { this.terminateCallback && this.terminateCallback(); } } } module.exports = ParallelStream;
我們來分析一下這個新的自定義的類。正如你所看到的一樣,構(gòu)造函數(shù)接受一個userTransform()函數(shù)作為參數(shù),然后將其另存為一個實例變量;我們也調(diào)用父構(gòu)造函數(shù),并且我們默認啟用對象模式。
接下來,來看_transform()方法,在這個方法中,我們執(zhí)行userTransform()函數(shù),然后增加當前正在運行的任務(wù)個數(shù); 最后,我們通過調(diào)用done()來通知當前轉(zhuǎn)換步驟已經(jīng)完成。_transform()方法展示了如何并行處理另一項任務(wù)。我們不用等待userTransform()方法執(zhí)行完畢再調(diào)用done()。 相反,我們立即執(zhí)行done()方法。另一方面,我們提供了一個特殊的回調(diào)函數(shù)給userTransform()方法,這就是this._onComplete()方法;以便我們在userTransform()完成的時候收到通知。
在Streams終止之前,會調(diào)用_flush()方法,所以如果仍有任務(wù)正在運行,我們可以通過不立即調(diào)用done()回調(diào)函數(shù)來延遲finish事件的觸發(fā)。相反,我們將其分配給this.terminateCallback變量。為了理解Streams如何正確終止,來看_onComplete()方法。
在每組異步任務(wù)最終完成時,_onComplete()方法會被調(diào)用。首先,它會檢查是否有任務(wù)正在運行,如果沒有,則調(diào)用this.terminateCallback()函數(shù),這將導致Streams結(jié)束,觸發(fā)_flush()方法的finish事件。
利用剛剛構(gòu)建的ParallelStream類可以輕松地創(chuàng)建一個無序并行執(zhí)行的可轉(zhuǎn)換的Streams實例,但是有個注意:它不會保留項目接收的順序。實際上,異步操作可以在任何時候都有可能完成并推送數(shù)據(jù),而跟它們開始的時刻并沒有必然的聯(lián)系。因此我們知道,對于二進制模式的Streams并不適用,因為二進制的Streams對順序要求較高。
現(xiàn)在,讓我們使用ParallelStream模塊實現(xiàn)一個具體的例子。讓我們想象以下我們想要構(gòu)建一個簡單的服務(wù)來監(jiān)控一個大URL列表的狀態(tài),讓我們想象以下,所有的這些URL包含在一個多帶帶的文件中,并且每一個URL占據(jù)一個空行。
Streams能夠為這個場景提供一個高效且優(yōu)雅的解決方案。特別是當我們使用我們剛剛寫的ParallelStream類來無序地審核這些URL。
接下來,讓我們創(chuàng)建一個簡單的放在checkUrls.js模塊的應(yīng)用程序。
const fs = require("fs"); const split = require("split"); const request = require("request"); const ParallelStream = require("./parallelStream"); fs.createReadStream(process.argv[2]) //[1] .pipe(split()) //[2] .pipe(new ParallelStream((url, enc, done, push) => { //[3] if(!url) return done(); request.head(url, (err, response) => { push(url + " is " + (err ? "down" : "up") + " "); done(); }); })) .pipe(fs.createWriteStream("results.txt")) //[4] .on("finish", () => console.log("All urls were checked")) ;
正如我們所看到的,通過流,我們的代碼看起來非常優(yōu)雅,直觀。 讓我們看看它是如何工作的:
首先,我們通過給定的文件參數(shù)創(chuàng)建一個可讀的Streams,便于接下來讀取文件。
我們通過split將輸入的文件的Streams的內(nèi)容輸出一個可轉(zhuǎn)換的Streams到管道中,并且分隔了數(shù)據(jù)塊的每一行。
然后,是時候使用我們的ParallelStream來檢查URL了,我們發(fā)送一個HEAD請求然后等待請求的response。當請求返回時,我們把請求的結(jié)果push到stream中。
最后,通過管道把結(jié)果保存到results.txt文件中。
node checkUrls urlList.txt
這里的文件urlList.txt包含一組URL,例如:
http://www.mariocasciaro.me/
http://loige.co/
http://thiswillbedownforsure.com/
當應(yīng)用執(zhí)行完成后,我們可以看到一個文件results.txt被創(chuàng)建,里面包含有操作的結(jié)果,例如:
http://thiswillbedownforsure.com is down
http://loige.co is up
http://www.mariocasciaro.me is up
輸出的結(jié)果的順序很有可能與輸入文件中指定URL的順序不同。這是Streams無序并行執(zhí)行任務(wù)的明顯特征。
出于好奇,我們可能想嘗試用一個正常的through2流替換ParallelStream,并比較兩者的行為和性能(你可能想這樣做的一個練習)。我們將會看到,使用through2的方式會比較慢,因為每個URL都將按順序進行檢查,而且文件results.txt中結(jié)果的順序也會被保留。無序限制并行執(zhí)行
如果運行包含數(shù)千或數(shù)百萬個URL的文件的checkUrls應(yīng)用程序,我們肯定會遇到麻煩。我們的應(yīng)用程序?qū)⑼瑫r創(chuàng)建不受控制的連接數(shù)量,并行發(fā)送大量數(shù)據(jù),并可能破壞應(yīng)用程序的穩(wěn)定性和整個系統(tǒng)的可用性。我們已經(jīng)知道,控制負載的無序限制并行執(zhí)行是一個極好的解決方案。
讓我們通過創(chuàng)建一個limitedParallelStream.js模塊來看看它是如何工作的,這個模塊是改編自上一節(jié)中創(chuàng)建的parallelStream.js模塊。
讓我們看看它的構(gòu)造函數(shù):
class LimitedParallelStream extends stream.Transform { constructor(concurrency, userTransform) { super({objectMode: true}); this.concurrency = concurrency; this.userTransform = userTransform; this.running = 0; this.terminateCallback = null; this.continueCallback = null; } // ... }
我們需要一個concurrency變量作為輸入來限制并發(fā)量,這次我們要保存兩個回調(diào)函數(shù),continueCallback用于任何掛起的_transform方法,terminateCallback用于_flush方法的回調(diào)。
接下來看_transform()方法:
_transform(chunk, enc, done) { this.running++; this.userTransform(chunk, enc, this.push.bind(this), this._onComplete.bind(this)); if(this.running < this.concurrency) { done(); } else { this.continueCallback = done; } }
這次在_transform()方法中,我們必須在調(diào)用done()之前檢查是否達到了最大并行數(shù)量的限制,如果沒有達到了限制,才能觸發(fā)下一個項目的處理。如果我們已經(jīng)達到最大并行數(shù)量的限制,我們可以簡單地將done()回調(diào)保存到continueCallback變量中,以便在任務(wù)完成后立即調(diào)用它。
_flush()方法與ParallelStream類保持完全一樣,所以我們直接轉(zhuǎn)到實現(xiàn)_onComplete()方法:
_onComplete(err) { this.running--; if(err) { return this.emit("error", err); } const tmpCallback = this.continueCallback; this.continueCallback = null; tmpCallback && tmpCallback(); if(this.running === 0) { this.terminateCallback && this.terminateCallback(); } }
每當任務(wù)完成,我們調(diào)用任何已保存的continueCallback()將導致
stream解鎖,觸發(fā)下一個項目的處理。
這就是limitedParallelStream模塊。 我們現(xiàn)在可以在checkUrls模塊中使用它來代替parallelStream,并且將我們的任務(wù)的并發(fā)限制在我們設(shè)置的值上。
順序并行執(zhí)行我們以前創(chuàng)建的并行Streams可能會使得數(shù)據(jù)的順序混亂,但是在某些情況下這是不可接受的。有時,實際上,有那種需要每個數(shù)據(jù)塊都以接收到的相同順序發(fā)出的業(yè)務(wù)場景。我們?nèi)匀豢梢圆⑿羞\行transform函數(shù)。我們所要做的就是對每個任務(wù)發(fā)出的數(shù)據(jù)進行排序,使其遵循與接收數(shù)據(jù)相同的順序。
這種技術(shù)涉及使用buffer,在每個正在運行的任務(wù)發(fā)出時重新排序塊。為簡潔起見,我們不打算提供這樣一個stream的實現(xiàn),因為這本書的范圍是相當冗長的;我們要做的就是重用為了這個特定目的而構(gòu)建的npm上的一個可用包,例如through2-parallel。
我們可以通過修改現(xiàn)有的checkUrls模塊來快速檢查一個有序的并行執(zhí)行的行為。 假設(shè)我們希望我們的結(jié)果按照與輸入文件中的URL相同的順序編寫。 我們可以使用通過through2-parallel來實現(xiàn):
const fs = require("fs"); const split = require("split"); const request = require("request"); const throughParallel = require("through2-parallel"); fs.createReadStream(process.argv[2]) .pipe(split()) .pipe(throughParallel.obj({concurrency: 2}, function (url, enc, done) { if(!url) return done(); request.head(url, (err, response) => { this.push(url + " is " + (err ? "down" : "up") + " "); done(); }); })) .pipe(fs.createWriteStream("results.txt")) .on("finish", () => console.log("All urls were checked")) ;
正如我們所看到的,through2-parallel的接口與through2的接口非常相似;唯一的不同是在through2-parallel還可以為我們提供的transform函數(shù)指定一個并發(fā)限制。如果我們嘗試運行這個新版本的checkUrls,我們會看到results.txt文件列出結(jié)果的順序與輸入文件中
URLs的出現(xiàn)順序是一樣的。
通過這個,我們總結(jié)了使用Streams實現(xiàn)異步控制流的分析;接下來,我們研究管道模式。
管道模式就像在現(xiàn)實生活中一樣,Node.js的Streams也可以按照不同的模式進行管道連接。事實上,我們可以將兩個不同的Streams合并成一個Streams,將一個Streams分成兩個或更多的管道,或者根據(jù)條件重定向流。 在本節(jié)中,我們將探討可應(yīng)用于Node.js的Streams最重要的管道技術(shù)。
組合的Streams在本章中,我們強調(diào)Streams提供了一個簡單的基礎(chǔ)結(jié)構(gòu)來模塊化和重用我們的代碼,但是卻漏掉了一個重要的部分:如果我們想要模塊化和重用整個流水線?如果我們想要合并多個Streams,使它們看起來像外部的Streams,那該怎么辦?下圖顯示了這是什么意思:
從上圖中,我們看到了如何組合幾個流的了:
當我們寫入組合的Streams的時候,實際上我們是寫入組合的Streams的第一個單元,即StreamA。
當我們從組合的Streams中讀取信息時,實際上我們從組合的Streams的最后一個單元中讀取。
一個組合的Streams通常是一個多重的Streams,通過連接第一個單元的寫入端和連接最后一個單元的讀取端。
要從兩個不同的Streams(一個可讀的Streams和一個可寫入的Streams)中創(chuàng)建一個多重的Streams,我們可以使用一個npm模塊,例如duplexer2。
但上述這么做并不完整。實際上,組合的Streams還應(yīng)該做到捕獲到管道中任意一段Streams單元產(chǎn)生的錯誤。我們已經(jīng)說過,任何錯誤都不會自動傳播到管道中。 所以,我們必須有適當?shù)腻e誤管理,我們將不得不顯式附加一個錯誤監(jiān)聽器到每個Streams。但是,組合的Streams實際上是一個黑盒,這意味著我們無法訪問管道中間的任何單元,所以對于管道中任意單元的異常捕獲,組合的Streams也充當聚合器的角色。
總而言之,組合的Streams具有兩個主要優(yōu)點:
管道內(nèi)部是一個黑盒,對使用者不可見。
簡化了錯誤管理,因為我們不必為管道中的每個單元附加一個錯誤偵聽器,而只需要給組合的Streams自身附加上就可以了。
組合的Streams是一個非常通用和普遍的做法,所以如果我們沒有任何特殊的需要,我
文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請注明本文地址:http://specialneedsforspecialkids.com/yun/90594.html
摘要:緩沖模塊起初就是為瀏覽器而設(shè)計的,所以能很好的處理編碼的字符串,但不能很好的處理二進制數(shù)據(jù)。有如下三個主要的流標準輸入標準輸出標準錯誤可讀流如果說,緩沖區(qū)是處理原始數(shù)據(jù)的方式的話,那么流通常是移動數(shù)據(jù)的方式。該方法讓可讀流繼續(xù)觸發(fā)事件。 緩沖(buffer)模塊 js起初就是為瀏覽器而設(shè)計的,所以能很好的處理unicode編碼的字符串,但不能很好的處理二進制數(shù)據(jù)。這是Node.js的...
摘要:表示當前正在執(zhí)行的腳本的文件名。默認編碼為模式為,為回調(diào)函數(shù),回調(diào)函數(shù)只包含錯誤信息參數(shù),在寫入失敗時返回。參數(shù)使用說明如下通過方法返回的文件描述符。 Node.js回調(diào) Node.js異步編程的直接體現(xiàn)就是回調(diào)。 阻塞代碼: const fs = require(fs); let data = fs.readFileSync(input.txt); console.log(data...
摘要:壓縮與解壓縮處理在中,可以使用模塊進行壓縮及解壓縮處理創(chuàng)建各種用于壓縮及解壓縮的對象方法說明該方法創(chuàng)建并返回一個對象該對象使用算法對數(shù)據(jù)進行壓縮處理該方法創(chuàng)建并返回一個對象該對象使用算法對數(shù)據(jù)進行壓縮處理該方法創(chuàng)建并返回一個對象該對象使用算 壓縮與解壓縮處理 在Node.js中,可以使用zlib模塊進行壓縮及解壓縮處理. 1. 創(chuàng)建各種用于壓縮及解壓縮的對象 方法 說明 zl...
摘要:一個匿名函數(shù),執(zhí)行,事件全部完成,執(zhí)行最后一句,程序執(zhí)行完畢。這個事件的監(jiān)聽器為一個匿名函數(shù),事件名稱為,當秒以后被觸發(fā)先對象發(fā)送一個事件觸發(fā)了匿名函數(shù)即監(jiān)聽器,監(jiān)聽器被執(zhí)行。 node.js事件循環(huán) node.js單進程,單線程的程序每一個api都支持回調(diào)所有的事件機制都是設(shè)計模式中的 一共是23種設(shè)計模式 http://design-patterns.readth...一個對象發(fā)生...
摘要:在創(chuàng)建時大小已經(jīng)被確定且是無法調(diào)整的,在內(nèi)存分配這塊是由層面提供而不是具體后面會講解。在這里不知道你是否認為這是很簡單的但是上面提到的一些關(guān)鍵詞二進制流緩沖區(qū),這些又都是什么呢下面嘗試做一些簡單的介紹。 showImg(https://segmentfault.com/img/remote/1460000019894717?w=1280&h=850); 多數(shù)人都擁有自己不了解的能力和機...
閱讀 3267·2021-09-23 11:55
閱讀 2600·2021-09-13 10:33
閱讀 1666·2019-08-30 15:54
閱讀 3095·2019-08-30 15:54
閱讀 2363·2019-08-30 10:59
閱讀 2369·2019-08-29 17:08
閱讀 1804·2019-08-29 13:16
閱讀 3589·2019-08-26 12:25