国产xxxx99真实实拍_久久不雅视频_高清韩国a级特黄毛片_嗯老师别我我受不了了小说

資訊專欄INFORMATION COLUMN

《Node.js設(shè)計模式》使用流進行編碼

xinhaip / 3058人閱讀

摘要:如何創(chuàng)建并使用。正如我們所預料到的那樣,使用來進行大文件的讀取顯然是錯誤的。使用進行壓縮文件我們必須修復我們的應(yīng)用程序,并使其處理大文件的最簡單方法是使用的。確切地說,由返回的流。

本系列文章為《Node.js Design Patterns Second Edition》的原文翻譯和讀書筆記,在GitHub連載更新,同步翻譯版鏈接。

歡迎關(guān)注我的專欄,之后的博文將在專欄同步:

Encounter的掘金專欄

知乎專欄 Encounter的編程思考

segmentfault專欄 前端小站

Coding with Streams

StreamsNode.js最重要的組件和模式之一。 社區(qū)中有一句格言“Stream all the things(Steam就是所有的)”,僅此一點就足以描述流在Node.js中的地位。 Dominic Tarr作為Node.js社區(qū)的最大貢獻者,它將流定義為Node.js最好,也是最難以理解的概念。

使Node.jsStreams如此吸引人還有其它原因; 此外,Streams不僅與性能或效率等技術(shù)特性有關(guān),更重要的是它們的優(yōu)雅性以及它們與Node.js的設(shè)計理念完美契合的方式。

在本章中,將會學到以下內(nèi)容:

Streams對于Node.js的重要性。

如何創(chuàng)建并使用Streams

Streams作為編程范式,不只是對于I/O而言,在多種應(yīng)用場景下它的應(yīng)用和強大的功能。

管道模式和在不同的配置中連接Streams

發(fā)現(xiàn)Streams的重要性

在基于事件的平臺(如Node.js)中,處理I / O的最有效的方法是實時處理,一旦有輸入的信息,立馬進行處理,一旦有需要輸出的結(jié)果,也立馬輸出反饋。

在本節(jié)中,我們將首先介紹Node.jsStreams和它的優(yōu)點。 請記住,這只是一個概述,因為本章后面將會詳細介紹如何使用和組合Streams

Streams和Buffer的比較

我們在本書中幾乎所有看到過的異步API都是使用的Buffer模式。 對于輸入操作,Buffer模式會將來自資源的所有數(shù)據(jù)收集到Buffer區(qū)中; 一旦讀取完整個資源,就會把結(jié)果傳遞給回調(diào)函數(shù)。 下圖顯示了這個范例的一個真實的例子:

從上圖我們可以看到,在t1時刻,一些數(shù)據(jù)從資源接收并保存到緩沖區(qū)。 在t2時刻,最后一段數(shù)據(jù)被接收到另一個數(shù)據(jù)塊,完成讀取操作,這時,把整個緩沖區(qū)的內(nèi)容發(fā)送給消費者。

另一方面,Streams允許你在數(shù)據(jù)到達時立即處理數(shù)據(jù)。 如下圖所示:

這一張圖顯示了Streams如何從資源接收每個新的數(shù)據(jù)塊,并立即提供給消費者,消費者現(xiàn)在不必等待緩沖區(qū)中收集所有數(shù)據(jù)再處理每個數(shù)據(jù)塊。

但是這兩種方法有什么區(qū)別呢? 我們可以將它們概括為兩點:

空間效率

時間效率

此外,Node.jsStreams具有另一個重要的優(yōu)點:可組合性(composability)。 現(xiàn)在讓我們看看這些屬性對我們設(shè)計和編寫應(yīng)用程序的方式會產(chǎn)生什么影響。

空間效率

首先,Streams允許我們做一些看起來不可能的事情,通過緩沖數(shù)據(jù)并一次性處理。 例如,考慮一下我們必須讀取一個非常大的文件,比如說數(shù)百MB甚至千MB。 顯然,等待完全讀取文件時返回大BufferAPI不是一個好主意。 想象一下,如果并發(fā)讀取一些大文件, 我們的應(yīng)用程序很容易耗盡內(nèi)存。 除此之外,V8中的Buffer不能大于0x3FFFFFFF字節(jié)(小于1GB)。 所以,在耗盡物理內(nèi)存之前,我們可能會碰壁。

使用Buffered的API進行壓縮文件

舉一個具體的例子,讓我們考慮一個簡單的命令行接口(CLI)的應(yīng)用程序,它使用Gzip格式壓縮文件。 使用BufferedAPI,這樣的應(yīng)用程序在Node.js中大概這么編寫(為簡潔起見,省略了異常處理):

const fs = require("fs");
const zlib = require("zlib");
const file = process.argv[2];
fs.readFile(file, (err, buffer) => {
  zlib.gzip(buffer, (err, buffer) => {
    fs.writeFile(file + ".gz", buffer, err => {
      console.log("File successfully compressed");
    });
  });
});

現(xiàn)在,我們可以嘗試將前面的代碼放在一個叫做gzip.js的文件中,然后執(zhí)行下面的命令:

node gzip 

如果我們選擇一個足夠大的文件,比如說大于1GB的文件,我們會收到一個錯誤信息,說明我們要讀取的文件大于最大允許的緩沖區(qū)大小,如下所示:

RangeError: File size is greater than possible Buffer:0x3FFFFFFF

上面的例子中,沒找到一個大文件,但確實對于大文件的讀取速率慢了許多。

正如我們所預料到的那樣,使用Buffer來進行大文件的讀取顯然是錯誤的。

使用Streams進行壓縮文件

我們必須修復我們的Gzip應(yīng)用程序,并使其處理大文件的最簡單方法是使用StreamsAPI。 讓我們看看如何實現(xiàn)這一點。 讓我們用下面的代碼替換剛創(chuàng)建的模塊的內(nèi)容:

const fs = require("fs");
const zlib = require("zlib");
const file = process.argv[2];
fs.createReadStream(file)
  .pipe(zlib.createGzip())
  .pipe(fs.createWriteStream(file + ".gz"))
  .on("finish", () => console.log("File successfully compressed"));

“是嗎?”你可能會問。是的;正如我們所說的,由于Streams的接口和可組合性,因此我們還能寫出這樣的更加簡潔,優(yōu)雅和精煉的代碼。 我們稍后會詳細地看到這一點,但是現(xiàn)在需要認識到的重要一點是,程序可以順暢地運行在任何大小的文件上,理想情況是內(nèi)存利用率不變。 嘗試一下(但考慮壓縮一個大文件可能需要一段時間)。

時間效率

現(xiàn)在讓我們考慮一個壓縮文件并將其上傳到遠程HTTP服務(wù)器的應(yīng)用程序的例子,該遠程HTTP服務(wù)器進而將其解壓縮并保存到文件系統(tǒng)中。如果我們的客戶端是使用BufferedAPI實現(xiàn)的,那么只有當整個文件被讀取和壓縮時,上傳才會開始。 另一方面,只有在接收到所有數(shù)據(jù)的情況下,解壓縮才會在服務(wù)器上啟動。 實現(xiàn)相同結(jié)果的更好的解決方案涉及使用Streams。 在客戶端機器上,Streams只要從文件系統(tǒng)中讀取就可以壓縮和發(fā)送數(shù)據(jù)塊,而在服務(wù)器上,只要從遠程對端接收到數(shù)據(jù)塊,就可以解壓每個數(shù)據(jù)塊。 我們通過構(gòu)建前面提到的應(yīng)用程序來展示這一點,從服務(wù)器端開始。

我們創(chuàng)建一個叫做gzipReceive.js的模塊,代碼如下:

const http = require("http");
const fs = require("fs");
const zlib = require("zlib");

const server = http.createServer((req, res) => {
  const filename = req.headers.filename;
  console.log("File request received: " + filename);
  req
    .pipe(zlib.createGunzip())
    .pipe(fs.createWriteStream(filename))
    .on("finish", () => {
      res.writeHead(201, {
        "Content-Type": "text/plain"
      });
      res.end("That"s it
");
      console.log(`File saved: ${filename}`);
    });
});

server.listen(3000, () => console.log("Listening"));

服務(wù)器從網(wǎng)絡(luò)接收數(shù)據(jù)塊,將其解壓縮,并在接收到數(shù)據(jù)塊后立即保存,這要歸功于Node.jsStreams

我們的應(yīng)用程序的客戶端將進入一個名為gzipSend.js的模塊,如下所示:

在前面的代碼中,我們再次使用Streams從文件中讀取數(shù)據(jù),然后在從文件系統(tǒng)中讀取的同時壓縮并發(fā)送每個數(shù)據(jù)塊。

現(xiàn)在,運行這個應(yīng)用程序,我們首先使用以下命令啟動服務(wù)器:

node gzipReceive

然后,我們可以通過指定要發(fā)送的文件和服務(wù)器的地址(例如localhost)來啟動客戶端:

node gzipSend  localhost

如果我們選擇一個足夠大的文件,我們將更容易地看到數(shù)據(jù)如何從客戶端流向服務(wù)器,但為什么這種模式下,我們使用Streams,比使用BufferedAPI更有效率? 下圖應(yīng)該給我們一個提示:

一個文件被處理的過程,它經(jīng)過以下階段:

客戶端從文件系統(tǒng)中讀取

客戶端壓縮數(shù)據(jù)

客戶端將數(shù)據(jù)發(fā)送到服務(wù)器

服務(wù)端接收數(shù)據(jù)

服務(wù)端解壓數(shù)據(jù)

服務(wù)端將數(shù)據(jù)寫入磁盤

為了完成處理,我們必須按照流水線順序那樣經(jīng)過每個階段,直到最后。在上圖中,我們可以看到,使用BufferedAPI,這個過程完全是順序的。為了壓縮數(shù)據(jù),我們首先必須等待整個文件被讀取完畢,然后,發(fā)送數(shù)據(jù),我們必須等待整個文件被讀取和壓縮,依此類推。當我們使用Streams時,只要我們收到第一個數(shù)據(jù)塊,流水線就會被啟動,而不需要等待整個文件的讀取。但更令人驚訝的是,當下一塊數(shù)據(jù)可用時,不需要等待上一組任務(wù)完成;相反,另一條裝配線是并行啟動的。因為我們執(zhí)行的每個任務(wù)都是異步的,這樣顯得很完美,所以可以通過Node.js來并行執(zhí)行Streams的相關(guān)操作;唯一的限制就是每個階段都必須保證數(shù)據(jù)塊的到達順序。

從前面的圖可以看出,使用Streams的結(jié)果是整個過程花費的時間更少,因為我們不用等待所有數(shù)據(jù)被全部讀取完畢和處理。

組合性

到目前為止,我們已經(jīng)看到的代碼已經(jīng)告訴我們?nèi)绾问褂?b>pipe()方法來組裝Streams的數(shù)據(jù)塊,Streams允許我們連接不同的處理單元,每個處理單元負責單一的職責(這是符合Node.js風格的)。這是可能的,因為Streams具有統(tǒng)一的接口,并且就API而言,不同Streams也可以很好的進行交互。唯一的先決條件是管道的下一個Streams必須支持上一個Streams生成的數(shù)據(jù)類型,可以是二進制,文本甚至是對象,我們將在后面的章節(jié)中看到。

為了證明Streams組合性的優(yōu)勢,我們可以嘗試在我們先前構(gòu)建的gzipReceive / gzipSend應(yīng)用程序中添加加密功能。
為此,我們只需要通過向流水線添加另一個Streams來更新客戶端。 確切地說,由crypto.createChipher()返回的流。 由此產(chǎn)生的代碼應(yīng)如下所示:

const fs = require("fs");
const zlib = require("zlib");
const crypto = require("crypto");
const http = require("http");
const path = require("path");

const file = process.argv[2];
const server = process.argv[3];

const options = {
  hostname: server,
  port: 3000,
  path: "/",
  method: "PUT",
  headers: {
    filename: path.basename(file),
    "Content-Type": "application/octet-stream",
    "Content-Encoding": "gzip"
  }
};

const req = http.request(options, res => {
  console.log("Server response: " + res.statusCode);
});

fs.createReadStream(file)
  .pipe(zlib.createGzip())
  .pipe(crypto.createCipher("aes192", "a_shared_secret"))
  .pipe(req)
  .on("finish", () => {
    console.log("File successfully sent");
  });

使用相同的方式,我們更新服務(wù)端的代碼,使得它可以在數(shù)據(jù)塊進行解壓之前先解密:

const http = require("http");
const fs = require("fs");
const zlib = require("zlib");
const crypto = require("crypto");

const server = http.createServer((req, res) => {
  const filename = req.headers.filename;
  console.log("File request received: " + filename);
  req
    .pipe(crypto.createDecipher("aes192", "a_shared_secret"))
    .pipe(zlib.createGunzip())
    .pipe(fs.createWriteStream(filename))
    .on("finish", () => {
      res.writeHead(201, {
        "Content-Type": "text/plain"
      });
      res.end("That"s it
");
      console.log(`File saved: ${filename}`);
    });
});

server.listen(3000, () => console.log("Listening"));
crypto是Node.js的核心模塊之一,提供了一系列加密算法。

只需幾行代碼,我們就在應(yīng)用程序中添加了一個加密層。 我們只需要簡單地通過把已經(jīng)存在的Streams模塊和加密層組合到一起,就可以。類似的,我們可以添加和合并其他Streams,如同在玩樂高積木一樣。

顯然,這種方法的主要優(yōu)點是可重用性,但正如我們從目前為止所介紹的代碼中可以看到的那樣,Streams也可以實現(xiàn)更清晰,更模塊化,更加簡潔的代碼。 出于這些原因,流通常不僅僅用于處理純粹的I / O,而且它還是簡化和模塊化代碼的手段。

開始使用Streams

在前面的章節(jié)中,我們了解了為什么Streams如此強大,而且它在Node.js中無處不在,甚至在Node.js的核心模塊中也有其身影。 例如,我們已經(jīng)看到,fs模塊具有用于從文件讀取的createReadStream()和用于寫入文件的createWriteStream()HTTP請求和響應(yīng)對象本質(zhì)上是Streams,并且zlib模塊允許我們使用StreamsAPI壓縮和解壓縮數(shù)據(jù)塊。

現(xiàn)在我們知道為什么Streams是如此重要,讓我們退后一步,開始更詳細地探索它。

Streams的結(jié)構(gòu)

Node.js中的每個Streams都是Streams核心模塊中可用的四個基本抽象類之一的實現(xiàn):

stream.Readable

stream.Writable

stream.Duplex

stream.Transform

每個stream類也是EventEmitter的一個實例。實際上,Streams可以產(chǎn)生幾種類型的事件,比如end事件會在一個可讀的Streams完成讀取,或者錯誤讀取,或其過程中產(chǎn)生異常時觸發(fā)。

請注意,為簡潔起見,在本章介紹的例子中,我們經(jīng)常會忽略適當?shù)腻e誤處理。但是,在生產(chǎn)環(huán)境下中,總是建議為所有Stream注冊錯誤事件偵聽器。

Streams之所以如此靈活的原因之一是它不僅能夠處理二進制數(shù)據(jù),而且?guī)缀蹩梢蕴幚砣魏?b>JavaScript值。實際上,Streams可以支持兩種操作模式:

二進制模式:以數(shù)據(jù)塊形式(例如buffersstrings)流式傳輸數(shù)據(jù)

對象模式:將流數(shù)據(jù)視為一系列離散對象(這使得我們幾乎可以使用任何JavaScript值)

這兩種操作模式使我們不僅可以使用I / O流,而且還可以作為一種工具,以函數(shù)式的風格優(yōu)雅地組合處理單元,我們將在本章后面看到。

在本章中,我們將主要使用在Node.js 0.11中引入的Node.js流接口,也稱為版本3。 有關(guān)與舊接口差異的更多詳細信息,請參閱StrongLoop在https://strongloop.com/strong...。
可讀的Streams

一個可讀的Streams表示一個數(shù)據(jù)源,在Node.js中,它使用stream模塊中的Readableabstract類實現(xiàn)。

從Streams中讀取信息

從可讀Streams接收數(shù)據(jù)有兩種方式:non-flowing模式和flowing模式。 我們來更詳細地分析這些模式。

non-flowing模式(不流動模式)

從可讀的Streams中讀取數(shù)據(jù)的默認模式是為其附加一個可讀事件偵聽器,用于指示要讀取的新數(shù)據(jù)的可用性。然后,在一個循環(huán)中,我們讀取所有的數(shù)據(jù),直到內(nèi)部buffer被清空。這可以使用read()方法完成,該方法同步從內(nèi)部緩沖區(qū)中讀取數(shù)據(jù),并返回表示數(shù)據(jù)塊的BufferString對象。read()方法以如下使用模式:

readable.read([size]);

使用這種方法,數(shù)據(jù)隨時可以直接從Streams中按需提取。

為了說明這是如何工作的,我們創(chuàng)建一個名為readStdin.js的新模塊,它實現(xiàn)了一個簡單的程序,它從標準輸入(一個可讀流)中讀取數(shù)據(jù),并將所有數(shù)據(jù)回送到標準輸出:

process.stdin
  .on("readable", () => {
    let chunk;
    console.log("New data available");
    while ((chunk = process.stdin.read()) !== null) {
      console.log(
        `Chunk read: (${chunk.length}) "${chunk.toString()}"`
      );
    }
  })
  .on("end", () => process.stdout.write("End of stream"));

read()方法是一個同步操作,它從可讀Streams的內(nèi)部Buffers區(qū)中提取數(shù)據(jù)塊。如果Streams在二進制模式下工作,返回的數(shù)據(jù)塊默認為一個Buffer對象。

在以二進制模式工作的可讀的Stream中,我們可以通過在Stream上調(diào)用setEncoding(encoding)來讀取字符串而不是Buffer對象,并提供有效的編碼格式(例如utf8)。

數(shù)據(jù)是從可讀的偵聽器中讀取的,只要有新的數(shù)據(jù),就會調(diào)用這個偵聽器。當內(nèi)部緩沖區(qū)中沒有更多數(shù)據(jù)可用時,read()方法返回null;在這種情況下,我們不得不等待另一個可讀的事件被觸發(fā),告訴我們可以再次讀取或者等待表示Streams讀取過程結(jié)束的end事件觸發(fā)。當一個流以二進制模式工作時,我們也可以通過向read()方法傳遞一個size參數(shù)來指定我們想要讀取的數(shù)據(jù)大小。這在實現(xiàn)網(wǎng)絡(luò)協(xié)議或解析特定數(shù)據(jù)格式時特別有用。

現(xiàn)在,我們準備運行readStdin模塊并進行實驗。讓我們在控制臺中鍵入一些字符,然后按Enter鍵查看回顯到標準輸出中的數(shù)據(jù)。要終止流并因此生成一個正常的結(jié)束事件,我們需要插入一個EOF(文件結(jié)束)字符(在Windows上使用Ctrl + Z或在Linux上使用Ctrl + D)。

我們也可以嘗試將我們的程序與其他程序連接起來;這可以使用管道運算符(|),它將程序的標準輸出重定向到另一個程序的標準輸入。例如,我們可以運行如下命令:

cat  | node readStdin

這是流式范例是一個通用接口的一個很好的例子,它使得我們的程序能夠進行通信,而不管它們是用什么語言寫的。

flowing模式(流動模式)

Streams中讀取的另一種方法是將偵聽器附加到data事件;這會將Streams切換為flowing模式,其中數(shù)據(jù)不是使用read()函數(shù)來提取的,而是一旦有數(shù)據(jù)到達data監(jiān)聽器就被推送到監(jiān)聽器內(nèi)。例如,我們之前創(chuàng)建的readStdin應(yīng)用程序?qū)⑹褂昧鲃幽J剑?/p>

process.stdin
  .on("data", chunk => {
    console.log("New data available");
    console.log(
      `Chunk read: (${chunk.length}) "${chunk.toString()}"`
    );
  })
  .on("end", () => process.stdout.write("End of stream"));

flowing模式是舊版Streams接口(也稱為Streams1)的繼承,其靈活性較低,API較少。隨著Streams2接口的引入,flowing模式不是默認的工作模式,要啟用它,需要將偵聽器附加到data事件或顯式調(diào)用resume()方法。 要暫時中斷Streams觸發(fā)data事件,我們可以調(diào)用pause()方法,導致任何傳入數(shù)據(jù)緩存在內(nèi)部buffer中。

調(diào)用pause()不會導致Streams切換回non-flowing模式。
實現(xiàn)可讀的Streams

現(xiàn)在我們知道如何從Streams中讀取數(shù)據(jù),下一步是學習如何實現(xiàn)一個新的Readable數(shù)據(jù)流。為此,有必要通過繼承stream.Readable的原型來創(chuàng)建一個新的類。 具體流必須提供_read()方法的實現(xiàn):

readable._read(size)

Readable類的內(nèi)部將調(diào)用_read()方法,而該方法又將啟動
使用push()填充內(nèi)部緩沖區(qū):

請注意,read()是Stream消費者調(diào)用的方法,而_read()是一個由Stream子類實現(xiàn)的方法,不能直接調(diào)用。下劃線通常表示該方法為私有方法,不應(yīng)該直接調(diào)用。

為了演示如何實現(xiàn)新的可讀Streams,我們可以嘗試實現(xiàn)一個生成隨機字符串的Streams。 我們來創(chuàng)建一個名為randomStream.js的新模塊,它將包含我們的字符串的generator的代碼:

const stream = require("stream");
const Chance = require("chance");

const chance = new Chance();

class RandomStream extends stream.Readable {
  constructor(options) {
    super(options);
  }

  _read(size) {
    const chunk = chance.string(); //[1]
    console.log(`Pushing chunk of size: ${chunk.length}`);
    this.push(chunk, "utf8"); //[2]
    if (chance.bool({
        likelihood: 5
      })) { //[3]
      this.push(null);
    }
  }
}

module.exports = RandomStream;

在文件頂部,我們將加載我們的依賴關(guān)系。除了我們正在加載一個chance的npm模塊之外,沒有什么特別之處,它是一個用于生成各種隨機值的庫,從數(shù)字到字符串到整個句子都能生成隨機值。

下一步是創(chuàng)建一個名為RandomStream的新類,并指定stream.Readable作為其父類。 在前面的代碼中,我們調(diào)用父類的構(gòu)造函數(shù)來初始化其內(nèi)部狀態(tài),并將收到的options參數(shù)作為輸入。通過options對象傳遞的可能參數(shù)包括以下內(nèi)容:

用于將Buffers轉(zhuǎn)換為Stringsencoding參數(shù)(默認值為null

是否啟用對象模式(objectMode默認為false

存儲在內(nèi)部buffer區(qū)中的數(shù)據(jù)的上限,一旦超過這個上限,則暫停從data source讀取(highWaterMark默認為16KB

好的,現(xiàn)在讓我們來解釋一下我們重寫的stream.Readable類的_read()方法:

該方法使用chance生成隨機字符串。

它將字符串push內(nèi)部buffer。 請注意,由于我們push的是String,此外我們還指定了編碼為utf8(如果數(shù)據(jù)塊只是一個二進制Buffer,則不需要)。

5%的概率隨機中斷stream的隨機字符串產(chǎn)生,通過push null到內(nèi)部Buffer來表示EOF,即stream的結(jié)束。

我們還可以看到在_read()函數(shù)的輸入中給出的size參數(shù)被忽略了,因為它是一個建議的參數(shù)。 我們可以簡單地把所有可用的數(shù)據(jù)都push到內(nèi)部的buffer中,但是如果在同一個調(diào)用中有多個推送,那么我們應(yīng)該檢查push()是否返回false,因為這意味著內(nèi)部buffer已經(jīng)達到了highWaterMark限制,我們應(yīng)該停止添加更多的數(shù)據(jù)。

以上就是RandomStream模塊,我們現(xiàn)在準備好使用它。我們來創(chuàng)建一個名為generateRandom.js的新模塊,在這個模塊中我們實例化一個新的RandomStream對象并從中提取一些數(shù)據(jù):

const RandomStream = require("./randomStream");
const randomStream = new RandomStream();

randomStream.on("readable", () => {
  let chunk;
  while ((chunk = randomStream.read()) !== null) {
    console.log(`Chunk received: ${chunk.toString()}`);
  }
});

現(xiàn)在,一切都準備好了,我們嘗試新的自定義的stream。 像往常一樣簡單地執(zhí)行generateRandom模塊,觀察隨機的字符串在屏幕上流動。

可寫的Streams

一個可寫的stream表示一個數(shù)據(jù)終點,在Node.js中,它使用stream模塊中的Writable抽象類來實現(xiàn)。

寫入一個stream

把一些數(shù)據(jù)放在可寫入的stream中是一件簡單的事情, 我們所要做的就是使用write()方法,它具有以下格式:

writable.write(chunk, [encoding], [callback])

encoding參數(shù)是可選的,其在chunkString類型時指定(默認為utf8,如果chunkBuffer,則忽略);當數(shù)據(jù)塊被刷新到底層資源中時,callback就會被調(diào)用,callback參數(shù)也是可選的。

為了表示沒有更多的數(shù)據(jù)將被寫入stream中,我們必須使用end()方法:

writable.end([chunk], [encoding], [callback])

我們可以通過end()方法提供最后一塊數(shù)據(jù)。在這種情況下,callbak函數(shù)相當于為finish事件注冊一個監(jiān)聽器,當數(shù)據(jù)塊全部被寫入stream中時,會觸發(fā)該事件。

現(xiàn)在,讓我們通過創(chuàng)建一個輸出隨機字符串序列的小型HTTP服務(wù)器來演示這是如何工作的:

const Chance = require("chance");
const chance = new Chance();

require("http").createServer((req, res) => {
  res.writeHead(200, {
    "Content-Type": "text/plain"
  }); //[1]
  while (chance.bool({
      likelihood: 95
    })) { //[2]
    res.write(chance.string() + "
"); //[3]
  }
  res.end("
The end...
"); //[4]
  res.on("finish", () => console.log("All data was sent")); //[5]
}).listen(8080, () => console.log("Listening on http://localhost:8080"));

我們創(chuàng)建了一個HTTP服務(wù)器,并把數(shù)據(jù)寫入res對象,res對象是http.ServerResponse的一個實例,也是一個可寫入的stream。下面來解釋上述代碼發(fā)生了什么:

我們首先寫HTTP response的頭部。請注意,writeHead()不是Writable接口的一部分,實際上,這個方法是http.ServerResponse類公開的輔助方法。

我們開始一個5%的概率終止的循環(huán)(進入循環(huán)體的概率為chance.bool()產(chǎn)生,其為95%)。

在循環(huán)內(nèi)部,我們寫入一個隨機字符串到stream

一旦我們不在循環(huán)中,我們調(diào)用streamend(),表示沒有更多

數(shù)據(jù)塊將被寫入。另外,我們在結(jié)束之前提供一個最終的字符串寫入流中。

最后,我們注冊一個finish事件的監(jiān)聽器,當所有的數(shù)據(jù)塊都被刷新到底層socket中時,這個事件將被觸發(fā)。

我們可以調(diào)用這個小模塊稱為entropyServer.js,然后執(zhí)行它。要測試這個服務(wù)器,我們可以在地址http:// localhost:8080打開一個瀏覽器,或者從終端使用curl命令,如下所示:

curl localhost:8080

此時,服務(wù)器應(yīng)該開始向您選擇的HTTP客戶端發(fā)送隨機字符串(請注意,某些瀏覽器可能會緩沖數(shù)據(jù),并且流式傳輸行為可能不明顯)。

Back-pressure(反壓)

類似于在真實管道系統(tǒng)中流動的液體,Node.jsstream也可能遭受瓶頸,數(shù)據(jù)寫入速度可能快于stream的消耗。 解決這個問題的機制包括緩沖輸入數(shù)據(jù);然而,如果數(shù)據(jù)stream沒有給生產(chǎn)者任何反饋,我們可能會產(chǎn)生越來越多的數(shù)據(jù)被累積到內(nèi)部緩沖區(qū)的情況,導致內(nèi)存泄露的發(fā)生。

為了防止這種情況的發(fā)生,當內(nèi)部buffer超過highWaterMark限制時,writable.write()將返回false。 可寫入的stream具有highWaterMark屬性,這是write()方法開始返回false的內(nèi)部Buffer區(qū)大小的限制,一旦Buffer區(qū)的大小超過這個限制,表示應(yīng)用程序應(yīng)該停止寫入。 當緩沖器被清空時,會觸發(fā)一個叫做drain的事件,通知再次開始寫入是安全的。 這種機制被稱為back-pressure

本節(jié)介紹的機制同樣適用于可讀的stream。事實上,在可讀stream中也存在back-pressure,并且在_read()內(nèi)調(diào)用的push()方法返回false時觸發(fā)。 但是,這對于stream實現(xiàn)者來說是一個特定的問題,所以我們將不經(jīng)常處理它。

我們可以通過修改之前創(chuàng)建的entropyServer模塊來演示可寫入的streamback-pressure

const Chance = require("chance");
const chance = new Chance();

require("http").createServer((req, res) => {
  res.writeHead(200, {
    "Content-Type": "text/plain"
  });

  function generateMore() { //[1]
    while (chance.bool({
        likelihood: 95
      })) {
      const shouldContinue = res.write(
        chance.string({
          length: (16 * 1024) - 1
        }) //[2]
      );
      if (!shouldContinue) { //[3]
        console.log("Backpressure");
        return res.once("drain", generateMore);
      }
    }
    res.end("
The end...
", () => console.log("All data was sent"));
  }
  generateMore();
}).listen(8080, () => console.log("Listening on http://localhost:8080"));

前面代碼中最重要的步驟可以概括如下:

我們將主邏輯封裝在一個名為generateMore()的函數(shù)中。

為了增加獲得一些back-pressure的機會,我們將數(shù)據(jù)塊的大小增加到16KB-1Byte,這非常接近默認的highWaterMark限制。

在寫入一大塊數(shù)據(jù)之后,我們檢查res.write()的返回值。 如果它返回false,這意味著內(nèi)部buffer已滿,我們應(yīng)該停止發(fā)送更多的數(shù)據(jù)。在這種情況下,我們從函數(shù)中退出,然后新注冊一個寫入事件的發(fā)布者,當drain事件觸發(fā)時調(diào)用generateMore

如果我們現(xiàn)在嘗試再次運行服務(wù)器,然后使用curl生成客戶端請求,則很可能會有一些back-pressure,因為服務(wù)器以非常高的速度生成數(shù)據(jù),速度甚至會比底層socket更快。

實現(xiàn)可寫入的Streams

我們可以通過繼承stream.Writable類來實現(xiàn)一個新的可寫入的流,并為_write()方法提供一個實現(xiàn)。實現(xiàn)一個我們自定義的可寫入的Streams類。

讓我們構(gòu)建一個可寫入的stream,它接收對象的格式如下:

{
  path: 
  content: 
}

這個類的作用是這樣的:對于每一個對象,我們的stream必須將content部分保存到在給定路徑中創(chuàng)建的文件中。 我們可以立即看到,我們stream的輸入是對象,而不是StringsBuffers,這意味著我們的stream必須以對象模式工作。

調(diào)用模塊toFileStream.js

const stream = require("stream");
const fs = require("fs");
const path = require("path");
const mkdirp = require("mkdirp");

class ToFileStream extends stream.Writable {
  constructor() {
    super({
      objectMode: true
    });
  }

  _write(chunk, encoding, callback) {
    mkdirp(path.dirname(chunk.path), err => {
      if (err) {
        return callback(err);
      }
      fs.writeFile(chunk.path, chunk.content, callback);
    });
  }
}
module.exports = ToFileStream;

作為第一步,我們加載所有我們所需要的依賴包。注意,我們需要模塊mkdirp,正如你應(yīng)該從前幾章中所知道的,它應(yīng)該使用npm安裝。

我們創(chuàng)建了一個新類,它從stream.Writable擴展而來。

我們不得不調(diào)用父構(gòu)造函數(shù)來初始化其內(nèi)部狀態(tài);我們還提供了一個option對象作為參數(shù),用于指定流在對象模式下工作(objectMode:true)。stream.Writable接受的其他選項如下:

highWaterMark(默認值是16KB):控制back-pressure的上限。

decodeStrings(默認為true):在字符串傳遞給_write()方法之前,將字符串自動解碼為二進制buffer區(qū)。 在對象模式下這個參數(shù)被忽略。

最后,我們?yōu)?b>_write()方法提供了一個實現(xiàn)。正如你所看到的,這個方法接受一個數(shù)據(jù)塊,一個編碼方式(只有在二進制模式下,stream選項decodeStrings設(shè)置為false時才有意義)。

另外,該方法接受一個回調(diào)函數(shù),該函數(shù)在操作完成時需要調(diào)用;而不必要傳遞操作的結(jié)果,但是如果需要的話,我們?nèi)匀豢梢詡鬟f一個error對象,這將導致stream觸發(fā)error事件。

現(xiàn)在,為了嘗試我們剛剛構(gòu)建的stream,我們可以創(chuàng)建一個名為writeToFile.js的新模塊,并對該流執(zhí)行一些寫操作:

const ToFileStream = require("./toFileStream.js");
const tfs = new ToFileStream();

tfs.write({path: "file1.txt", content: "Hello"});
tfs.write({path: "file2.txt", content: "Node.js"});
tfs.write({path: "file3.txt", content: "Streams"});
tfs.end(() => console.log("All files created"));

有了這個,我們創(chuàng)建并使用了我們的第一個自定義的可寫入流。 像往常一樣運行新模塊來檢查其輸出;你會看到執(zhí)行后會創(chuàng)建三個新文件。

雙重的Streams

雙重的stream既是可讀的,也可寫的。 當我們想描述一個既是數(shù)據(jù)源又是數(shù)據(jù)終點的實體時(例如socket),這就顯得十分有用了。 雙工流繼承stream.Readablestream.Writable的方法,所以它對我們來說并不新鮮。這意味著我們可以read()write()數(shù)據(jù),或者可以監(jiān)聽readabledrain事件。

要創(chuàng)建一個自定義的雙重stream,我們必須為_read()_write()提供一個實現(xiàn)。傳遞給Duplex()構(gòu)造函數(shù)的options對象在內(nèi)部被轉(zhuǎn)發(fā)給ReadableWritable的構(gòu)造函數(shù)。options參數(shù)的內(nèi)容與前面討論的相同,options增加了一個名為allowHalfOpen值(默認為true),如果設(shè)置為false,則會導致只要stream的一方(ReadableWritable)結(jié)束,stream就結(jié)束了。

為了使雙重的stream在一方以對象模式工作,而在另一方以二進制模式工作,我們需要在流構(gòu)造器中手動設(shè)置以下屬性:
this._writableState.objectMode
this._readableState.objectMode
轉(zhuǎn)換的Streams

轉(zhuǎn)換的Streams是專門設(shè)計用于處理數(shù)據(jù)轉(zhuǎn)換的一種特殊類型的雙重Streams

在一個簡單的雙重Streams中,從stream中讀取的數(shù)據(jù)和寫入到其中的數(shù)據(jù)之間沒有直接的關(guān)系(至少stream是不可知的)。 想想一個TCP socket,它只是向遠程節(jié)點發(fā)送數(shù)據(jù)和從遠程節(jié)點接收數(shù)據(jù)。TCP socket自身沒有意識到輸入和輸出之間有任何關(guān)系。

下圖說明了雙重Streams中的數(shù)據(jù)流:

另一方面,轉(zhuǎn)換的Streams對從可寫入端接收到的每個數(shù)據(jù)塊應(yīng)用某種轉(zhuǎn)換,然后在其可讀端使轉(zhuǎn)換的數(shù)據(jù)可用。

下圖顯示了數(shù)據(jù)如何在轉(zhuǎn)換的Streams中流動:

從外面看,轉(zhuǎn)換的Streams的接口與雙重Streams的接口完全相同。但是,當我們想要構(gòu)建一個新的雙重Streams時,我們必須提供_read()_write()方法,而為了實現(xiàn)一個新的變換流,我們必須填寫另一對方法:_transform()_flush())。

我們來演示如何用一個例子來創(chuàng)建一個新的轉(zhuǎn)換的Streams

實現(xiàn)轉(zhuǎn)換的Streams

我們來實現(xiàn)一個轉(zhuǎn)換的Streams,它將替換給定所有出現(xiàn)的字符串。 要做到這一點,我們必須創(chuàng)建一個名為replaceStream.js的新模塊。 讓我們直接看怎么實現(xiàn)它:

const stream = require("stream");
const util = require("util");

class ReplaceStream extends stream.Transform {
  constructor(searchString, replaceString) {
    super();
    this.searchString = searchString;
    this.replaceString = replaceString;
    this.tailPiece = "";
  }

  _transform(chunk, encoding, callback) {
    const pieces = (this.tailPiece + chunk)         //[1]
      .split(this.searchString);
    const lastPiece = pieces[pieces.length - 1];
    const tailPieceLen = this.searchString.length - 1;

    this.tailPiece = lastPiece.slice(-tailPieceLen);     //[2]
    pieces[pieces.length - 1] = lastPiece.slice(0,-tailPieceLen);

    this.push(pieces.join(this.replaceString));       //[3]
    callback();
  }

  _flush(callback) {
    this.push(this.tailPiece);
    callback();
  }
}

module.exports = ReplaceStream;

與往常一樣,我們將從其依賴項開始構(gòu)建模塊。這次我們沒有使用第三方模塊。

然后我們創(chuàng)建了一個從stream.Transform基類繼承的新類。該類的構(gòu)造函數(shù)接受兩個參數(shù):searchStringreplaceString。 正如你所想象的那樣,它們允許我們定義要匹配的文本以及用作替換的字符串。 我們還初始化一個將由_transform()方法使用的tailPiece內(nèi)部變量。

現(xiàn)在,我們來分析一下_transform()方法,它是我們新類的核心。_transform()方法與可寫入的stream_write()方法具有幾乎相同的格式,但不是將數(shù)據(jù)寫入底層資源,而是使用this.push()將其推入內(nèi)部buffer,這與我們會在可讀流的_read()方法中執(zhí)行。這顯示了轉(zhuǎn)換的Streams的雙方如何實際連接。

ReplaceStream_transform()方法實現(xiàn)了我們這個新類的核心。正常情況下,搜索和替換buffer區(qū)中的字符串是一件容易的事情;但是,當數(shù)據(jù)流式傳輸時,情況則完全不同,可能的匹配可能分布在多個數(shù)據(jù)塊中。代碼后面的程序可以解釋如下:

我們的算法使用searchString函數(shù)作為分隔符來分割塊。

然后,它取出分隔后生成的數(shù)組的最后一項lastPiece,并提取其最后一個字符searchString.length - 1。結(jié)果被保存到tailPiece變量中,它將會被作為下一個數(shù)據(jù)塊的前綴。

最后,所有從split()得到的片段用replaceString作為分隔符連接在一起,并推入內(nèi)部buffer區(qū)。

stream結(jié)束時,我們可能仍然有最后一個tailPiece變量沒有被壓入內(nèi)部緩沖區(qū)。這正是_flush()方法的用途;它在stream結(jié)束之前被調(diào)用,并且這是我們最終有機會完成流或者在完全結(jié)束流之前推送任何剩余數(shù)據(jù)的地方。

_flush()方法只需要一個回調(diào)函數(shù)作為參數(shù),當所有的操作完成后,我們必須確保調(diào)用這個回調(diào)函數(shù)。完成了這個,我們已經(jīng)完成了我們的ReplaceStream類。

現(xiàn)在,是時候嘗試新的stream。我們可以創(chuàng)建另一個名為replaceStreamTest.js的模塊來寫入一些數(shù)據(jù),然后讀取轉(zhuǎn)換的結(jié)果:

const ReplaceStream = require("./replaceStream");

const rs = new ReplaceStream("World", "Node.js");
rs.on("data", chunk => console.log(chunk.toString()));

rs.write("Hello W");
rs.write("orld!");
rs.end();

為了使得這個例子更復雜一些,我們把搜索詞分布在兩個不同的數(shù)據(jù)塊上;然后,使用flowing模式,我們從同一個stream中讀取數(shù)據(jù),記錄每個已轉(zhuǎn)換的塊。運行前面的程序應(yīng)該產(chǎn)生以下輸出:

Hel
lo Node.js
!
有一個值得提及是,第五種類型的stream:stream.PassThrough。 與我們介紹的其他流類不同,PassThrough不是抽象的,可以直接實例化,而不需要實現(xiàn)任何方法。實際上,這是一個可轉(zhuǎn)換的stream,它可以輸出每個數(shù)據(jù)塊,而不需要進行任何轉(zhuǎn)換。
使用管道連接Streams

Unix管道的概念是由Douglas Mcllroy發(fā)明的;這使程序的輸出能夠連接到下一個的輸入。看看下面的命令:

echo Hello World! | sed s/World/Node.js/g

在前面的命令中,echo會將Hello World!寫入標準輸出,然后被重定向到sed命令的標準輸入(因為有管道操作符 |)。 然后sedNode.js替換任何World,并將結(jié)果打印到它的標準輸出(這次是控制臺)。

以類似的方式,可以使用可讀的Streamspipe()方法將Node.jsStreams連接在一起,它具有以下接口:

readable.pipe(writable, [options])

非常直觀地,pipe()方法將從可讀的Streams中發(fā)出的數(shù)據(jù)抽取到所提供的可寫入的Streams中。 另外,當可讀的Streams發(fā)出end事件(除非我們指定{end:false}作為options)時,可寫入的Streams將自動結(jié)束。 pipe()方法返回作為參數(shù)傳遞的可寫入的Streams,如果這樣的stream也是可讀的(例如雙重或可轉(zhuǎn)換的Streams),則允許我們創(chuàng)建鏈式調(diào)用。

將兩個Streams連接到一起時,則允許數(shù)據(jù)自動流向可寫入的Streams,所以不需要調(diào)用read()write()方法;但最重要的是不需要控制back-pressure,因為它會自動處理。

舉個簡單的例子(將會有大量的例子),我們可以創(chuàng)建一個名為replace.js的新模塊,它接受來自標準輸入的文本流,應(yīng)用替換轉(zhuǎn)換,然后將數(shù)據(jù)返回到標準輸出:

const ReplaceStream = require("./replaceStream");
process.stdin
  .pipe(new ReplaceStream(process.argv[2], process.argv[3]))
  .pipe(process.stdout);

上述程序?qū)碜詷藴瘦斎氲臄?shù)據(jù)傳送到ReplaceStream,然后返回到標準輸出。 現(xiàn)在,為了實踐這個小應(yīng)用程序,我們可以利用Unix管道將一些數(shù)據(jù)重定向到它的標準輸入,如下所示:

echo Hello World! | node replace World Node.js

運行上述程序,會輸出如下結(jié)果:

Hello Node.js

這個簡單的例子演示了Streams(特別是文本Streams)是一個通用接口,管道幾乎是構(gòu)成和連接所有這些接口的通用方式。

error事件不會通過管道自動傳播。舉個例子,看如下代碼片段:
stream1
  .pipe(stream2)
  .on("error", function() {});
在前面的鏈式調(diào)用中,我們將只捕獲來自stream2的錯誤,這是由于我們給其添加了erorr事件偵聽器。這意味著,如果我們想捕獲從stream1生成的任何錯誤,我們必須直接附加另一個錯誤偵聽器。 稍后我們將看到一種可以實現(xiàn)共同錯誤捕獲的另一種模式(合并Streams)。 此外,我們應(yīng)該注意到,如果目標Streams(讀取的Streams)發(fā)出錯誤,它將會對源Streams通知一個error,之后導致管道的中斷。
Streams如何通過管道

到目前為止,我們創(chuàng)建自定義Streams的方式并不完全遵循Node定義的模式;實際上,從stream基類繼承是違反small surface area的,并需要一些示例代碼。 這并不意味著Streams設(shè)計得不好,實際上,我們不應(yīng)該忘記,因為StreamsNode.js核心的一部分,所以它們必須盡可能地靈活,廣泛拓展Streams以致于用戶級模塊能夠?qū)⑺鼈兂浞诌\用。

然而,大多數(shù)情況下,我們并不需要原型繼承可以給予的所有權(quán)力和可擴展性,但通常我們想要的僅僅是定義新Streams的一種快速開發(fā)的模式。Node.js社區(qū)當然也為此創(chuàng)建了一個解決方案。 一個完美的例子是through2,一個使得我們可以簡單地創(chuàng)建轉(zhuǎn)換的Streams的小型庫。 通過through2,我們可以通過調(diào)用一個簡單的函數(shù)來創(chuàng)建一個新的可轉(zhuǎn)換的Streams

const transform = through2([options], [_transform], [_flush]);

類似的,from2也允許我們像下面這樣創(chuàng)建一個可讀的Streams

const readable = from2([options], _read);

接下來,我們將在本章其余部分展示它們的用法,那時,我們會清楚使用這些小型庫的好處。

through和from是基于Stream1規(guī)范的頂層庫。
基于Streams的異步控制流

通過我們已經(jīng)介紹的例子,應(yīng)該清楚的是,Streams不僅可以用來處理I / O,而且可以用作處理任何類型數(shù)據(jù)的優(yōu)雅編程模式。 但優(yōu)點并不止這些;還可以利用Streams來實現(xiàn)異步控制流,在本節(jié)將會看到。

順序執(zhí)行

默認情況下,Streams將按順序處理數(shù)據(jù);例如,轉(zhuǎn)換的Streams_transform()函數(shù)在前一個數(shù)據(jù)塊執(zhí)行callback()之后才會進行下一塊數(shù)據(jù)塊的調(diào)用。這是Streams的一個重要屬性,按正確順序處理每個數(shù)據(jù)塊至關(guān)重要,但是也可以利用這一屬性將Streams實現(xiàn)優(yōu)雅的傳統(tǒng)控制流模式。

代碼總是比太多的解釋要好得多,所以讓我們來演示一下如何使用流來按順序執(zhí)行異步任務(wù)的例子。讓我們創(chuàng)建一個函數(shù)來連接一組接收到的文件作為輸入,確保遵守提供的順序。我們創(chuàng)建一個名為concatFiles.js的新模塊,并從其依賴開始:

const fromArray = require("from2-array");
const through = require("through2");
const fs = require("fs");

我們將使用through2來簡化轉(zhuǎn)換的Streams的創(chuàng)建,并使用from2-array從一個對象數(shù)組中創(chuàng)建可讀的Streams
接下來,我們可以定義concatFiles()函數(shù):

function concatFiles(destination, files, callback) {
  const destStream = fs.createWriteStream(destination);
  fromArray.obj(files)             //[1]
    .pipe(through.obj((file, enc, done) => {   //[2]
      const src = fs.createReadStream(file);
      src.pipe(destStream, {end: false});
      src.on("end", done); //[3]
    }))
    .on("finish", () => {         //[4]
      destStream.end();
      callback();
    });
}

module.exports = concatFiles;

前面的函數(shù)通過將files數(shù)組轉(zhuǎn)換為Streams來實現(xiàn)對files數(shù)組的順序迭代。 該函數(shù)所遵循的程序解釋如下:

首先,我們使用from2-arrayfiles數(shù)組創(chuàng)建一個可讀的Streams

接下來,我們使用through來創(chuàng)建一個轉(zhuǎn)換的Streams來處理序列中的每個文件。對于每個文件,我們創(chuàng)建一個可讀的Streams,并通過管道將其輸入到表示輸出文件的destStream中。 在源文件完成讀取后,通過在pipe()方法的第二個參數(shù)中指定{end:false},我們確保不關(guān)閉destStream

當源文件的所有內(nèi)容都被傳送到destStream時,我們調(diào)用through.obj公開的done函數(shù)來傳遞當前處理已經(jīng)完成,在我們的情況下這是需要觸發(fā)處理下一個文件。

所有文件處理完后,finish事件被觸發(fā)。我們最后可以結(jié)束destStream并調(diào)用concatFiles()callback()函數(shù),這個函數(shù)表示整個操作的完成。

我們現(xiàn)在可以嘗試使用我們剛剛創(chuàng)建的小模塊。讓我們創(chuàng)建一個名為concat.js的新文件來完成一個示例:

const concatFiles = require("./concatFiles");

concatFiles(process.argv[2], process.argv.slice(3), () => {
  console.log("Files concatenated successfully");
});

我們現(xiàn)在可以運行上述程序,將目標文件作為第一個命令行參數(shù),接著是要連接的文件列表,例如:

node concat allTogether.txt file1.txt file2.txt

執(zhí)行這一條命令,會創(chuàng)建一個名為allTogether.txt的新文件,其中按順序保存file1.txtfile2.txt的內(nèi)容。

使用concatFiles()函數(shù),我們能夠僅使用Streams實現(xiàn)異步操作的順序執(zhí)行。正如我們在Chapter3 Asynchronous Control Flow Patters with Callbacks中看到的那樣,如果使用純JavaScript實現(xiàn),或者使用async等外部庫,則需要使用或?qū)崿F(xiàn)迭代器。我們現(xiàn)在提供了另外一個可以達到同樣效果的方法,正如我們所看到的,它的實現(xiàn)方式非常優(yōu)雅且可讀性高。

模式:使用Streams或Streams的組合,可以輕松地按順序遍歷一組異步任務(wù)。
無序并行執(zhí)行

我們剛剛看到Streams按順序處理每個數(shù)據(jù)塊,但有時這可能并不能這么做,因為這樣并沒有充分利用Node.js的并發(fā)性。如果我們必須對每個數(shù)據(jù)塊執(zhí)行一個緩慢的異步操作,那么并行化執(zhí)行這一組異步任務(wù)完全是有必要的。當然,只有在每個數(shù)據(jù)塊之間沒有關(guān)系的情況下才能應(yīng)用這種模式,這些數(shù)據(jù)塊可能經(jīng)常發(fā)生在對象模式的Streams中,但是對于二進制模式的Streams很少使用無序的并行執(zhí)行。

注意:當處理數(shù)據(jù)的順序很重要時,不能使用無序并行執(zhí)行的Streams。

為了并行化一個可轉(zhuǎn)換的Streams的執(zhí)行,我們可以運用Chapter3 Asynchronous Control Flow Patters with Callbacks所講到的無序并行執(zhí)行的相同模式,然后做出一些改變使它們適用于Streams。讓我們看看這是如何更改的。

實現(xiàn)一個無序并行的Streams

讓我們用一個例子直接說明:我們創(chuàng)建一個叫做parallelStream.js的模塊,然后自定義一個普通的可轉(zhuǎn)換的Streams,然后給出一系列可轉(zhuǎn)換流的方法:

const stream = require("stream");

class ParallelStream extends stream.Transform {
  constructor(userTransform) {
    super({objectMode: true});
    this.userTransform = userTransform;
    this.running = 0;
    this.terminateCallback = null;
  }

  _transform(chunk, enc, done) {
    this.running++;
    this.userTransform(chunk, enc, this._onComplete.bind(this), this.push.bind(this));
    done();
  }

  _flush(done) {
    if(this.running > 0) {
      this.terminateCallback = done;
    } else {
      done();
    }
  }

  _onComplete(err) {
    this.running--;
    if(err) {
      return this.emit("error", err);
    }
    if(this.running === 0) {
      this.terminateCallback && this.terminateCallback();
    }
  }
}

module.exports = ParallelStream;

我們來分析一下這個新的自定義的類。正如你所看到的一樣,構(gòu)造函數(shù)接受一個userTransform()函數(shù)作為參數(shù),然后將其另存為一個實例變量;我們也調(diào)用父構(gòu)造函數(shù),并且我們默認啟用對象模式。

接下來,來看_transform()方法,在這個方法中,我們執(zhí)行userTransform()函數(shù),然后增加當前正在運行的任務(wù)個數(shù); 最后,我們通過調(diào)用done()來通知當前轉(zhuǎn)換步驟已經(jīng)完成。_transform()方法展示了如何并行處理另一項任務(wù)。我們不用等待userTransform()方法執(zhí)行完畢再調(diào)用done()。 相反,我們立即執(zhí)行done()方法。另一方面,我們提供了一個特殊的回調(diào)函數(shù)給userTransform()方法,這就是this._onComplete()方法;以便我們在userTransform()完成的時候收到通知。

Streams終止之前,會調(diào)用_flush()方法,所以如果仍有任務(wù)正在運行,我們可以通過不立即調(diào)用done()回調(diào)函數(shù)來延遲finish事件的觸發(fā)。相反,我們將其分配給this.terminateCallback變量。為了理解Streams如何正確終止,來看_onComplete()方法。

在每組異步任務(wù)最終完成時,_onComplete()方法會被調(diào)用。首先,它會檢查是否有任務(wù)正在運行,如果沒有,則調(diào)用this.terminateCallback()函數(shù),這將導致Streams結(jié)束,觸發(fā)_flush()方法的finish事件。

利用剛剛構(gòu)建的ParallelStream類可以輕松地創(chuàng)建一個無序并行執(zhí)行的可轉(zhuǎn)換的Streams實例,但是有個注意:它不會保留項目接收的順序。實際上,異步操作可以在任何時候都有可能完成并推送數(shù)據(jù),而跟它們開始的時刻并沒有必然的聯(lián)系。因此我們知道,對于二進制模式的Streams并不適用,因為二進制的Streams對順序要求較高。

實現(xiàn)一個URL監(jiān)控應(yīng)用程序

現(xiàn)在,讓我們使用ParallelStream模塊實現(xiàn)一個具體的例子。讓我們想象以下我們想要構(gòu)建一個簡單的服務(wù)來監(jiān)控一個大URL列表的狀態(tài),讓我們想象以下,所有的這些URL包含在一個多帶帶的文件中,并且每一個URL占據(jù)一個空行。

Streams能夠為這個場景提供一個高效且優(yōu)雅的解決方案。特別是當我們使用我們剛剛寫的ParallelStream類來無序地審核這些URL

接下來,讓我們創(chuàng)建一個簡單的放在checkUrls.js模塊的應(yīng)用程序。

const fs = require("fs");
const split = require("split");
const request = require("request");
const ParallelStream = require("./parallelStream");

fs.createReadStream(process.argv[2])         //[1]
  .pipe(split())                             //[2]
  .pipe(new ParallelStream((url, enc, done, push) => {     //[3]
    if(!url) return done();
    request.head(url, (err, response) => {
      push(url + " is " + (err ? "down" : "up") + "
");
      done();
    });
  }))
  .pipe(fs.createWriteStream("results.txt"))   //[4]
  .on("finish", () => console.log("All urls were checked"))
;

正如我們所看到的,通過流,我們的代碼看起來非常優(yōu)雅,直觀。 讓我們看看它是如何工作的:

首先,我們通過給定的文件參數(shù)創(chuàng)建一個可讀的Streams,便于接下來讀取文件。

我們通過split將輸入的文件的Streams的內(nèi)容輸出一個可轉(zhuǎn)換的Streams到管道中,并且分隔了數(shù)據(jù)塊的每一行。

然后,是時候使用我們的ParallelStream來檢查URL了,我們發(fā)送一個HEAD請求然后等待請求的response。當請求返回時,我們把請求的結(jié)果pushstream中。

最后,通過管道把結(jié)果保存到results.txt文件中。

node checkUrls urlList.txt

這里的文件urlList.txt包含一組URL,例如:

http://www.mariocasciaro.me/

http://loige.co/

http://thiswillbedownforsure.com/

當應(yīng)用執(zhí)行完成后,我們可以看到一個文件results.txt被創(chuàng)建,里面包含有操作的結(jié)果,例如:

http://thiswillbedownforsure.com is down

http://loige.co is up

http://www.mariocasciaro.me is up

輸出的結(jié)果的順序很有可能與輸入文件中指定URL的順序不同。這是Streams無序并行執(zhí)行任務(wù)的明顯特征。

出于好奇,我們可能想嘗試用一個正常的through2流替換ParallelStream,并比較兩者的行為和性能(你可能想這樣做的一個練習)。我們將會看到,使用through2的方式會比較慢,因為每個URL都將按順序進行檢查,而且文件results.txt中結(jié)果的順序也會被保留。
無序限制并行執(zhí)行

如果運行包含數(shù)千或數(shù)百萬個URL的文件的checkUrls應(yīng)用程序,我們肯定會遇到麻煩。我們的應(yīng)用程序?qū)⑼瑫r創(chuàng)建不受控制的連接數(shù)量,并行發(fā)送大量數(shù)據(jù),并可能破壞應(yīng)用程序的穩(wěn)定性和整個系統(tǒng)的可用性。我們已經(jīng)知道,控制負載的無序限制并行執(zhí)行是一個極好的解決方案。

讓我們通過創(chuàng)建一個limitedParallelStream.js模塊來看看它是如何工作的,這個模塊是改編自上一節(jié)中創(chuàng)建的parallelStream.js模塊。

讓我們看看它的構(gòu)造函數(shù):

class LimitedParallelStream extends stream.Transform {
  constructor(concurrency, userTransform) {
    super({objectMode: true});
    this.concurrency = concurrency;
    this.userTransform = userTransform;
    this.running = 0;
    this.terminateCallback = null;
    this.continueCallback = null;
  }
// ...
}

我們需要一個concurrency變量作為輸入來限制并發(fā)量,這次我們要保存兩個回調(diào)函數(shù),continueCallback用于任何掛起的_transform方法,terminateCallback用于_flush方法的回調(diào)。
接下來看_transform()方法:

_transform(chunk, enc, done) {
  this.running++;
  this.userTransform(chunk, enc,  this.push.bind(this), this._onComplete.bind(this));
  if(this.running < this.concurrency) {
    done();
  } else {
    this.continueCallback = done;
  }
}

這次在_transform()方法中,我們必須在調(diào)用done()之前檢查是否達到了最大并行數(shù)量的限制,如果沒有達到了限制,才能觸發(fā)下一個項目的處理。如果我們已經(jīng)達到最大并行數(shù)量的限制,我們可以簡單地將done()回調(diào)保存到continueCallback變量中,以便在任務(wù)完成后立即調(diào)用它。

_flush()方法與ParallelStream類保持完全一樣,所以我們直接轉(zhuǎn)到實現(xiàn)_onComplete()方法:

_onComplete(err) {
  this.running--;
  if(err) {
    return this.emit("error", err);
  }
  const tmpCallback = this.continueCallback;
  this.continueCallback = null;
  tmpCallback && tmpCallback();
  if(this.running === 0) {
    this.terminateCallback && this.terminateCallback();
  }
}

每當任務(wù)完成,我們調(diào)用任何已保存的continueCallback()將導致
stream解鎖,觸發(fā)下一個項目的處理。

這就是limitedParallelStream模塊。 我們現(xiàn)在可以在checkUrls模塊中使用它來代替parallelStream,并且將我們的任務(wù)的并發(fā)限制在我們設(shè)置的值上。

順序并行執(zhí)行

我們以前創(chuàng)建的并行Streams可能會使得數(shù)據(jù)的順序混亂,但是在某些情況下這是不可接受的。有時,實際上,有那種需要每個數(shù)據(jù)塊都以接收到的相同順序發(fā)出的業(yè)務(wù)場景。我們?nèi)匀豢梢圆⑿羞\行transform函數(shù)。我們所要做的就是對每個任務(wù)發(fā)出的數(shù)據(jù)進行排序,使其遵循與接收數(shù)據(jù)相同的順序。

這種技術(shù)涉及使用buffer,在每個正在運行的任務(wù)發(fā)出時重新排序塊。為簡潔起見,我們不打算提供這樣一個stream的實現(xiàn),因為這本書的范圍是相當冗長的;我們要做的就是重用為了這個特定目的而構(gòu)建的npm上的一個可用包,例如through2-parallel。

我們可以通過修改現(xiàn)有的checkUrls模塊來快速檢查一個有序的并行執(zhí)行的行為。 假設(shè)我們希望我們的結(jié)果按照與輸入文件中的URL相同的順序編寫。 我們可以使用通過through2-parallel來實現(xiàn):

const fs = require("fs");
const split = require("split");
const request = require("request");
const throughParallel = require("through2-parallel");

fs.createReadStream(process.argv[2])
  .pipe(split())
  .pipe(throughParallel.obj({concurrency: 2}, function (url, enc, done) {
    if(!url) return done();
    request.head(url, (err, response) => {
      this.push(url + " is " + (err ? "down" : "up") + "
");
      done();
    });
  }))
  .pipe(fs.createWriteStream("results.txt"))
  .on("finish", () => console.log("All urls were checked"))
;

正如我們所看到的,through2-parallel的接口與through2的接口非常相似;唯一的不同是在through2-parallel還可以為我們提供的transform函數(shù)指定一個并發(fā)限制。如果我們嘗試運行這個新版本的checkUrls,我們會看到results.txt文件列出結(jié)果的順序與輸入文件中
URLs的出現(xiàn)順序是一樣的。

通過這個,我們總結(jié)了使用Streams實現(xiàn)異步控制流的分析;接下來,我們研究管道模式。

管道模式

就像在現(xiàn)實生活中一樣,Node.jsStreams也可以按照不同的模式進行管道連接。事實上,我們可以將兩個不同的Streams合并成一個Streams,將一個Streams分成兩個或更多的管道,或者根據(jù)條件重定向流。 在本節(jié)中,我們將探討可應(yīng)用于Node.jsStreams最重要的管道技術(shù)。

組合的Streams

在本章中,我們強調(diào)Streams提供了一個簡單的基礎(chǔ)結(jié)構(gòu)來模塊化和重用我們的代碼,但是卻漏掉了一個重要的部分:如果我們想要模塊化和重用整個流水線?如果我們想要合并多個Streams,使它們看起來像外部的Streams,那該怎么辦?下圖顯示了這是什么意思:

從上圖中,我們看到了如何組合幾個流的了:

當我們寫入組合的Streams的時候,實際上我們是寫入組合的Streams的第一個單元,即StreamA

當我們從組合的Streams中讀取信息時,實際上我們從組合的Streams的最后一個單元中讀取。

一個組合的Streams通常是一個多重的Streams,通過連接第一個單元的寫入端和連接最后一個單元的讀取端。

要從兩個不同的Streams(一個可讀的Streams和一個可寫入的Streams)中創(chuàng)建一個多重的Streams,我們可以使用一個npm模塊,例如duplexer2。

但上述這么做并不完整。實際上,組合的Streams還應(yīng)該做到捕獲到管道中任意一段Streams單元產(chǎn)生的錯誤。我們已經(jīng)說過,任何錯誤都不會自動傳播到管道中。 所以,我們必須有適當?shù)腻e誤管理,我們將不得不顯式附加一個錯誤監(jiān)聽器到每個Streams。但是,組合的Streams實際上是一個黑盒,這意味著我們無法訪問管道中間的任何單元,所以對于管道中任意單元的異常捕獲,組合的Streams也充當聚合器的角色。

總而言之,組合的Streams具有兩個主要優(yōu)點:

管道內(nèi)部是一個黑盒,對使用者不可見。

簡化了錯誤管理,因為我們不必為管道中的每個單元附加一個錯誤偵聽器,而只需要給組合的Streams自身附加上就可以了。

組合的Streams是一個非常通用和普遍的做法,所以如果我們沒有任何特殊的需要,我

文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。

轉(zhuǎn)載請注明本文地址:http://specialneedsforspecialkids.com/yun/90594.html

相關(guān)文章

  • Node.js知識點詳解(三)緩沖與模塊

    摘要:緩沖模塊起初就是為瀏覽器而設(shè)計的,所以能很好的處理編碼的字符串,但不能很好的處理二進制數(shù)據(jù)。有如下三個主要的流標準輸入標準輸出標準錯誤可讀流如果說,緩沖區(qū)是處理原始數(shù)據(jù)的方式的話,那么流通常是移動數(shù)據(jù)的方式。該方法讓可讀流繼續(xù)觸發(fā)事件。 緩沖(buffer)模塊 js起初就是為瀏覽器而設(shè)計的,所以能很好的處理unicode編碼的字符串,但不能很好的處理二進制數(shù)據(jù)。這是Node.js的...

    plus2047 評論0 收藏0
  • Node.js學習總結(jié)

    摘要:表示當前正在執(zhí)行的腳本的文件名。默認編碼為模式為,為回調(diào)函數(shù),回調(diào)函數(shù)只包含錯誤信息參數(shù),在寫入失敗時返回。參數(shù)使用說明如下通過方法返回的文件描述符。 Node.js回調(diào) Node.js異步編程的直接體現(xiàn)就是回調(diào)。 阻塞代碼: const fs = require(fs); let data = fs.readFileSync(input.txt); console.log(data...

    kamushin233 評論0 收藏0
  • Node.js學習之路18——壓縮與解壓

    摘要:壓縮與解壓縮處理在中,可以使用模塊進行壓縮及解壓縮處理創(chuàng)建各種用于壓縮及解壓縮的對象方法說明該方法創(chuàng)建并返回一個對象該對象使用算法對數(shù)據(jù)進行壓縮處理該方法創(chuàng)建并返回一個對象該對象使用算法對數(shù)據(jù)進行壓縮處理該方法創(chuàng)建并返回一個對象該對象使用算 壓縮與解壓縮處理 在Node.js中,可以使用zlib模塊進行壓縮及解壓縮處理. 1. 創(chuàng)建各種用于壓縮及解壓縮的對象 方法 說明 zl...

    tigerZH 評論0 收藏0
  • node事件循環(huán) EventEmitter 異步I/O Buffer緩沖區(qū) 模塊

    摘要:一個匿名函數(shù),執(zhí)行,事件全部完成,執(zhí)行最后一句,程序執(zhí)行完畢。這個事件的監(jiān)聽器為一個匿名函數(shù),事件名稱為,當秒以后被觸發(fā)先對象發(fā)送一個事件觸發(fā)了匿名函數(shù)即監(jiān)聽器,監(jiān)聽器被執(zhí)行。 node.js事件循環(huán) node.js單進程,單線程的程序每一個api都支持回調(diào)所有的事件機制都是設(shè)計模式中的 一共是23種設(shè)計模式 http://design-patterns.readth...一個對象發(fā)生...

    SexySix 評論0 收藏0
  • Node.js 中的緩沖區(qū)(Buffer)究竟是什么?

    摘要:在創(chuàng)建時大小已經(jīng)被確定且是無法調(diào)整的,在內(nèi)存分配這塊是由層面提供而不是具體后面會講解。在這里不知道你是否認為這是很簡單的但是上面提到的一些關(guān)鍵詞二進制流緩沖區(qū),這些又都是什么呢下面嘗試做一些簡單的介紹。 showImg(https://segmentfault.com/img/remote/1460000019894717?w=1280&h=850); 多數(shù)人都擁有自己不了解的能力和機...

    scwang90 評論0 收藏0

發(fā)表評論

0條評論

xinhaip

|高級講師

TA的文章

閱讀更多
最新活動
閱讀需要支付1元查看
<