摘要:是消費(fèi)數(shù)據(jù)的,從中獲取數(shù)據(jù),然后對得到的塊數(shù)據(jù)進(jìn)行處理,至于如何處理,就依賴于具體實(shí)現(xiàn)也就是的實(shí)現(xiàn)。也可以說是建立在的基礎(chǔ)上。
1. 認(rèn)識Stream
Stream的概念最早來源于Unix系統(tǒng),其可以將一個(gè)大型系統(tǒng)拆分成一些小的組件,然后將這些小的組件可以很好地運(yùn)行
TCP/IP協(xié)議中的TCP協(xié)議也用到了Stream的思想,進(jìn)而可以進(jìn)行流量控制、差錯控制
在unix中通過 |來表示流;node中通過pipe方法
Stream可以認(rèn)為數(shù)據(jù)就像管道一樣,多次不斷地被傳遞下去,而不是一次性全部傳遞給下游
2. node中的stream在node stream中可以看到第一段的描述:
A stream is an abstract interface implemented by various objects in Node. For example a request to an HTTP server is a stream, as is stdout. Streams are readable, writable, or both. All streams are instances of EventEmitter
對上面一段話進(jìn)行解析,可以得到如下幾點(diǎn):
Stream是Node中一個(gè)非常重要的概念,被大量對象實(shí)現(xiàn),尤其是Node中的I/O操作
Stream是一個(gè)抽像的接口,一般不會直接使用,需要實(shí)現(xiàn)內(nèi)部的某些抽象方法(例如_read、_write、_transform)
Stream是EventEmitter的子類,實(shí)際上Stream的數(shù)據(jù)傳遞內(nèi)部依然是通過事件(data)來實(shí)現(xiàn)的
Stream分為四種:readable、writeable、Duplex、transform
3.Readable Stream 與 Writeable Stream 3.1 二者的關(guān)系Readable Stream是提供數(shù)據(jù)的Stream,外部來源的數(shù)據(jù)均會存儲到內(nèi)部的buffer數(shù)組內(nèi)緩存起來。
writeable Stream是消費(fèi)數(shù)據(jù)的Stream,從readable stream中獲取數(shù)據(jù),然后對得到的chunk塊數(shù)據(jù)進(jìn)行處理,至于如何處理,就依賴于具體實(shí)現(xiàn)(也就是_write的實(shí)現(xiàn))。
首先看看Readdable Stream與writeable stream二者之間的流動關(guān)系:
3.2 pipe的流程解析stream內(nèi)部是如何從readable stream流到writeable stream里面呢?有兩種方法:
a) pipe 連接兩個(gè)stream
先看一個(gè)簡單地demo
var Read = require("stream").Readable; var Write = require("stream").Writable; var r = new Read(); var w = new Write(); r.push("hello "); r.push("world!"); r.push(null) w._write = function (chunk, ev, cb) { console.log(chunk.toString()); cb(); } r.pipe(w);
pipe是一種最簡單直接的方法連接兩個(gè)stream,內(nèi)部實(shí)現(xiàn)了數(shù)據(jù)傳遞的整個(gè)過程,在開發(fā)的時(shí)候不需要關(guān)注內(nèi)部數(shù)據(jù)的流動:
Readable.prototype.pipe = function (dest, pipeOpts) { var src = this; ... src.on("data", ondata); function ondata(chunk) { var ret = dest.write(chunk); if (false === ret) { debug("false write response, pause", src._readableState.awaitDrain); src._readableState.awaitDrain++; src.pause(); } } ... }
b) 事件data + 事件drain聯(lián)合實(shí)現(xiàn)
var Read = require("stream").Readable; var Write = require("stream").Writable; var r = new Read(); var w = new Write(); r.push("hello "); r.push("world!"); r.push(null) w._write = function (chunk, ev, cb) { console.log(chunk.toString()); cb(); } r.on("data", function (chunk) { if (!w.write(chunk)) { r.pause(); } }) w.on("drain", function () { r.resume(); }) // hello // world!4 Readable Stream的模式 4.1 內(nèi)部模式的實(shí)現(xiàn)
Readable Stream 存在兩種模式(flowing mode 與 paused mode),這兩種模式?jīng)Q定了chunk數(shù)據(jù)流動的方式---自動流動還是手工流動。那如何觸發(fā)這兩種模式呢:
flowing mode: 注冊事件data、調(diào)用resume方法、調(diào)用pipe方法
paused mode: 調(diào)用pause方法(沒有pipe方法)、移除data事件 && unpipe所有pipe
讓我們再深入一些,看看里面具體是如何實(shí)現(xiàn)的:
// data事件觸發(fā)flowing mode Readable.prototype.on = function(ev, fn) { ... if (ev === "data" && false !== this._readableState.flowing) { this.resume(); } ... } // resume觸發(fā)flowing mode Readable.prototype.resume = function() { var state = this._readableState; if (!state.flowing) { debug("resume"); state.flowing = true; resume(this, state); } return this; } // pipe方法觸發(fā)flowing模式 Readable.prototype.resume = function() { if (!state.flowing) { this.resume() } }
結(jié)論
兩種方式取決于一個(gè)flowing字段:true --> flowing mode;false --> paused mode
三種方式最后均是通過resume方法,將state.flowing = true
4.2 兩種模式的操作a) paused mode
在paused mode下,需要手動地讀取數(shù)據(jù),并且可以直接指定讀取數(shù)據(jù)的長度:
var Read = require("stream").Readable; var r = new Read(); r.push("hello"); r.push("world"); r.push(null); console.log("輸出結(jié)果為: ", r.read(1).toString()) // 輸出結(jié)果為: "h"
還可以通過監(jiān)聽事件readable,觸發(fā)時(shí)手工讀取chunk數(shù)據(jù):
var Read = require("stream").Readable; var r = new Read(); r.push("hello"); r.push("world"); r.push(null); r.on("readable", function () { var chunk = r.read(); console.log("get data by readable event: ", chunk.toString()) }); // get data by readable event: hello world!
需要注意的是,一旦注冊了readable事件,必須手工讀取read數(shù)據(jù),否則數(shù)據(jù)就會流失,看看內(nèi)部實(shí)現(xiàn):
function emitReadable_(stream) { debug("emit readable"); stream.emit("readable"); flow(stream); } function flow(stream) { var state = stream._readableState; debug("flow", state.flowing); if (state.flowing) { do { var chunk = stream.read(); } while (null !== chunk && state.flowing); } } Readable.prototype.read = function (n) { ... var res = fromList(n, state); if (!util.isNull(ret)) { this.emit("data", ret); } ... }
flow方法直接read數(shù)據(jù),將得到的數(shù)據(jù)通過事件data交付出去,然而此處沒有注冊data事件監(jiān)控,因此,得到的chunk數(shù)據(jù)并沒有交付給任何對象,這樣數(shù)據(jù)就白白流失了,所以在觸發(fā)emit("readable")時(shí),需要提前read數(shù)據(jù)。
b) flowing mode
通過注冊data、pipe、resume可以自動獲取所需要的數(shù)據(jù),看看內(nèi)部實(shí)現(xiàn):
// 事件data方式 var Read = require("stream").Readable; var r = new Read(); r.push("hello "); r.push("world!"); r.push(null) r.on("data", function (chunk) { console.log("chunk :", chunk.toString()) }) // chunk : hello // chunk : world!
// 通過pipe方式 var r = new Read(); r.push("hello "); r.push("world!"); r.push(null) r.pipe(process.stdout) // hello world!
c) 兩種mode的總結(jié)
5. transform stream的實(shí)現(xiàn)用過browserify的人都知道,browserify是一種基于stream的模塊打包工具,里面存在browserify.prototype.transform(tr)方法,其中的tr就要求是transform stream,且browserify內(nèi)部通過through2構(gòu)建了很多tranform stream。也可以說browserify是建立在transform stream的基礎(chǔ)上。那么具備readable、writeablestream的transform stream內(nèi)部是如何工作的呢?
6. 自定義stream自定義stream很簡單,只要實(shí)現(xiàn)相應(yīng)的內(nèi)部待實(shí)現(xiàn)方法就可以了,具體來說:
readable stream: 實(shí)現(xiàn)_read方法來解決數(shù)據(jù)的獲取問題
writeable stream: 實(shí)現(xiàn)_write方法來解決數(shù)據(jù)的去向問題
tranform stream: 實(shí)現(xiàn)_tranform方法來解決數(shù)據(jù)存放在buffer前的轉(zhuǎn)換工作
// 自定義readable stream的實(shí)現(xiàn) var Stream = require("stream"); var Read = Stream.Readable; var util = require("util"); util.inherits(MyReadStream, Read); function MyReadStream(data, opt) { Read.call(this, opt); this.data = data || []; } MyReadStream.prototype._read = function () { var _this = this; this.data.forEach(function (d) { _this.push(d); }) this.push(null); } var data = ["aa", "bb", "cc"]; var r = new MyReadStream(data); r.on("data", function (chunk) { console.log(chunk.toString()); })7. 參考資料
stream-handbook
node-stream
iojs源碼
文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請注明本文地址:http://specialneedsforspecialkids.com/yun/85905.html
摘要:源碼簡介源碼核心部分寥寥行。同時(shí)本身是直接繼承于模塊。寫在末尾閱讀代碼的這一次,是我第一次閱讀這種開源的模塊化項(xiàng)目。深深的被震撼到了,認(rèn)識到了模塊化的巨大力量。就能完成非常復(fù)雜的事情,而不需要凡是親力親為,一行行代碼,一個(gè)個(gè)小問題依次解決。 gulp源碼簡介 gulp源碼核心部分寥寥60+行。但是通過這60+行代碼,gulp給我們帶來的確是前端自動化構(gòu)建的便利。以往以為其源碼肯定蠻復(fù)雜...
摘要:源碼版本構(gòu)造器實(shí)例選項(xiàng)讓我們用一段展示一下這三個(gè)概念其中的構(gòu)造器實(shí)例實(shí)例名可以任意取,這里我們便于理解保持和文檔一致選項(xiàng)即為傳入構(gòu)造器里的配置選項(xiàng)。其實(shí)構(gòu)造器上也綁了不少好用的方法。 源碼版本:2.0.5 構(gòu)造器、實(shí)例、選項(xiàng) 讓我們用一段demo展示一下這三個(gè)概念: //HTML {{ message }} //JS var vm = new Vue({ el: #app,...
摘要:回調(diào)函數(shù)中檢測該次寫入是否被緩沖,若是,觸發(fā)事件。若目標(biāo)可寫流表示該寫入操作需要進(jìn)行緩沖,則立刻將源可讀流切換至?xí)和DJ健1O(jiān)聽源可讀流的事件,相應(yīng)地結(jié)束目標(biāo)可寫流。 在Node.js中,流(Stream)是其眾多原生對象的基類,它對處理潛在的大文件提供了支持,也抽象了一些場景下的數(shù)據(jù)處理和傳遞。在它對外暴露的接口中,最為神奇的,莫過于導(dǎo)流(pipe)方法了。鑒于近期自己正在閱讀Node...
摘要:的源碼僅僅就多行,本質(zhì)上就是對于原生的流進(jìn)行的封裝,先來看下。是一個(gè)雙工流,既可讀,也可寫,但是與還是有著一些區(qū)別,的寫和讀可以說是沒有任何的關(guān)聯(lián),是兩個(gè)緩沖區(qū)和管道互補(bǔ)干擾,而將其輸入和輸出是存在相互關(guān)聯(lián)的,中間做了處理。 寫在前面 through2經(jīng)常被用于處理node的stream,假如使用過gulp的話,對于這個(gè)包一定不會陌生,如: gulp.task(rewrite, () ...
摘要:下面跟蹤代碼到這個(gè)實(shí)現(xiàn)中看看是怎么做的在實(shí)例化的過程中,在構(gòu)造函數(shù)中調(diào)用了其超類的構(gòu)造函數(shù),而在超類中對其所處換環(huán)境進(jìn)行的判斷,所謂的環(huán)境呢,事實(shí)上指得就是是通過,還是通過加載的上下文,這也就意味著不同方式加載可能存在某些不同。 前言 本文基于《Spring源碼深度解析》學(xué)習(xí), 《Spring源碼深度解析》講解的Spring版本低于Spring3.1,當(dāng)前閱讀的版本為Spring5.x...
閱讀 3794·2023-04-25 16:32
閱讀 2194·2021-09-28 09:36
閱讀 2035·2021-09-06 15:02
閱讀 673·2021-09-02 15:21
閱讀 918·2019-08-30 15:56
閱讀 3513·2019-08-30 15:45
閱讀 1708·2019-08-30 13:09
閱讀 379·2019-08-29 16:05