摘要:開始讀取位置結束讀取位置包括結束位置如果為,則文件描述符不會被關閉,即使有錯誤。需要程序負責關閉它,并且確保沒有文件描述符泄漏。
流的定義
流是抽象化的概念,形象生動的描述了數據的流動、變化。
具體來說,在node中流是處理數據的抽象接口,繼承了EventEmitter,通過這個接口我們能夠控制流的開關,流動的方向等等。
比較形象直觀一點類似我們在linux上使用shell,通過管道,鏈接處理各個部分,下面是我寫的一個命令,篩選出version并導出到文件中。
Readable(可讀流)
Writable(可寫流)
Duplex(可讀可寫的流)
Transform(在讀寫過程中可以修改和變化的Duplex流)
流按照功能大致劃分為以上四類,具體應用的話有很多場景,如下圖所示(來源:參考鏈接2)
下面我根據流的分類,列舉一些demo應用實例
可讀流能接受各種數據源,例如控制臺的輸入,文件,字符串等等,就如介紹中所說是抽象接口,可以面向各種形式的輸入,下面舉幾個例子。
文件流require("fs").createReadStream("./1.txt",{ encoding: "utf8" }).on("data",(data) => { console.log(data) }) // 輸出 hello jsdt
說明 為什么要用流來讀取,直接用fs.readFile豈不是更方便嗎,因為readFile是整體操作,會將文件全部讀到內存中在做處理,這樣的話文件如果很大,程序就會很卡,甚至報錯。
標準輸入流process.stdin.setEncoding("utf8"); process.stdin.on("data",(data) => { console.log("輸出: "+ data) }) node 運行code,然后輸入 hello jsdt 輸出: hello jsdt
說明 這個做acm的時候會用到,或者平時自己寫一些交互式應用的時候
普通數據流let {Readable} = require("stream") let util = require("util") class Test extends Readable{ constructor(){ super() this.dataSource = 5 } _read(){ if(this.dataSource-->0){ this.push(this.dataSource+""); }else{ this.push(null); } } } let counter = new Test(); counter.on("data",function(data){ console.log(data.toString()) }); 輸出: 4 3 2 1
說明 重寫_read方法,自定義輸入的邏輯,上面示例中是自己邏輯中產生的一個數據源。
Writable 文件流let dataSource = "hello jsdt",i = 0; (function(){ let ws = require("fs").createWriteStream("./1.txt",{ encoding: "utf8" }) let flag = true; while(flag && i自定義輸出
說明 閉包自執行,通過流將數據寫入到文件中,上面是輸出結果。let {Writable} = require("stream") let arr = [] let ws = Writable({ write(chunk,encoding,cb){ arr.push(chunk) cb() } }) for(let i = 1; i<= 3;i++){ ws.write(""+i,"utf8",()=>{}) } process.nextTick(function () { console.log(arr.toString()) }) // 輸出 1,2,3說明 上面重寫了流的write方法,可以自定義寫邏輯
Duplexrequire("net").createServer(socket => { socket.on("data",data => { console.log("client message " + data); socket.write("server message " + "hello client "); }) }).listen(8080,() =>{})Transform
說明 作為可寫流一面socket可以向客戶端發送信息,做為可讀流一面可以監聽data事件,收到客戶端發送過信息let t = require("stream").Transform({ transform(chunk,encoding,cb){ this.push(chunk.toString().toUpperCase()); cb(); } }); process.stdin.pipe(t).pipe(process.stdout); // 輸入abc // 輸出ABC說明 上面使用轉換流,實現了terminal上小寫輸入,對應大寫輸出的功能
流中數據分類二進制模式
對象模式
在創建流的時候可以指定配置,objectMode默認為false,設為true切換到對象模式。二進制即buffer模式,可讀或可寫流都會將數據會緩存數據在buffer中。
流的剖析通過上面的介紹我們明確了流的定義,并按照功能對流進行了分類,下面我進行下剖析,總的來說流的各種形態間轉化傳輸底層都是二進制,具體到使用形態上有buffer,string等等。
首先詳細說下可讀流,可讀流有兩種模式,默認為paused模式。flowing 按照初始化配置,自動讀取數據,并通過觀察者模式,直接將數據提供給訂閱者
paused 顯式調用流的read方法讀取數據
其中如果我們想切換到流動模式可以通過監聽data事件的方式、或者調用stream.resume()、stream.pipe() 這些方法。
可讀流源碼分析// 可讀流入口,根據配置返回一個可讀流 fs.createReadStream = function(path, options) { return new ReadStream(path, options); }; // 實現原理是ReadStream.prototype.__proto__ = Readable.prototype,可以繼承Readable上的一些方法 util.inherits(ReadStream, Readable); fs.ReadStream = ReadStream; function ReadStream(path, options) { // 非new方式調用,直接返回一個實例 if (!(this instanceof ReadStream)) return new ReadStream(path, options); options = copyObject(getOptions(options, {})); if (options.highWaterMark === undefined) // highWaterMark默認值為64k,設置了flow模式下緩沖區的大小 options.highWaterMark = 64 * 1024; Readable.call(this, options); handleError((this.path = getPathFromURL(path))); // 文件描述符,根據這個句柄找到文件 this.fd = options.fd === undefined ? null : options.fd; // flags打開文件要做的操作,默認為"r" this.flags = options.flags === undefined ? "r" : options.flags; // 用于設置文件模式(權限和粘結位),僅限創建文件時。 this.mode = options.mode === undefined ? 0o666 : options.mode; // 開始讀取位置 this.start = options.start; // 結束讀取位置(?。。“ńY束位置) this.end = options.end; /** * 如果 autoClose 為 false,則文件描述符不會被關閉,即使有錯誤。 * 需要程序負責關閉它,并且確保沒有文件描述符泄漏。 * 如果 autoClose 被設置為 true(默認),則在 error 或 end 時,文件描述符會被自動關閉 */ this.autoClose = options.autoClose === undefined ? true : options.autoClose; this.pos = this.start; } // 適合傳入句柄的情況,例如fd: 0,這樣就不是文件,而是控制臺輸入的數據了 if (typeof this.fd !== "number") this.open(); this.on("end", function() { if (this.autoClose) { this.destroy(); } }); } // 打開文件,并觸發open事件,只有打開了才能讀取,所以在回調中觸發open事件,看下步操作 ReadStream.prototype.open = function() { var self = this; fs.open(this.path, this.flags, this.mode, function(er, fd) { self.fd = fd; self.emit("open", fd); // start the flow of data. self.read(); }); }; Readable.prototype.read = function(n) { // 當read(0)時,如果緩存中已有數據,則觸發readable事件,相當于刷新下緩存。否則觸發end事件 if (n === 0 && state.needReadable && (state.length >= state.highWaterMark || state.ended)) { if (state.length === 0 && state.ended) endReadable(this); else emitReadable(this); return null; } // 若可讀流已經被傳入了終止符(null),且緩沖中沒有遺留數據,則結束這個可讀流 if (n === 0 && state.ended) { if (state.length === 0) endReadable(this); return null; } // 若目前緩沖中的數據大小為空,或未超過設置的警戒線,則進行一次數據讀取。 if (state.length === 0 || state.length - n < state.highWaterMark) { doRead = true; } if (state.ended || state.reading) { doRead = false; } else if (doRead) { state.reading = true; state.sync = true; this._read(state.highWaterMark); } } ReadStream.prototype._read = function(n) { if (typeof this.fd !== "number") { // 防止重復綁定open事件,當文件打開且emit open事件,此時才會進行真正的讀操作 return this.once("open", function() { this._read(n); }); } // 然后讀數據的時候會計算實際讀的數量 function howMuchToRead(n, state) { // 如果讀的數量超過highWaterMark,則重新計算highWaterMark if (n > state.highWaterMark) state.highWaterMark = computeNewHighWaterMark(n); if (n <= state.length) return n; } // 經過上面一系列的準備工作,下面開始真正的讀操作咯 fs.read(this.fd, pool, pool.used, toRead, this.pos, (er, bytesRead) => { if (bytesRead > 0) { this.bytesRead += bytesRead; } this.push(b); }); }; // 上面整個過程是paused的流程,其中flow模式又有所不同,如下所示 // 如果監聽了data事件,則會調用this.resume(),開始流動模式 Readable.prototype.on = function(ev, fn) { const res = Stream.prototype.on.call(this, ev, fn); if (ev === "data") { // Start flowing on next tick if stream isn"t explicitly paused if (this._readableState.flowing !== false) this.resume(); } } // flow模式下 流內部自動觸發data事件,循環讀取數據 function flow(stream) { const state = stream._readableState; debug("flow", state.flowing); while (state.flowing && stream.read() !== null); } // 然后觸發 data事件,循環發射數據 stream.emit("data", chunk);總結 上面是可讀流的源碼分析,摘要了關鍵部分,下面在梳理一下,當通過ReadStream創建一個流的時候,默認會觸發readable事件,進入暫停模式,此時內部維護的有一個緩沖區,在readable事件回調邏輯中進行read操作,首先會通過howMuchToRead方法計算實際讀取的數量,如果現有數據小于highWaterMark,內部會進行this._read(state.highWaterMark)操作,其回調中會進行push操作,push在調用readableAddChunk將數據放到內部維護的緩存中,反之則從fromList中讀取緩存中的數據,然后返回。而如果監聽了data事件,代碼中所示會調用this.resume(),將流狀態設置為flowing模式,然后resume()->resume_()->flow()的調用順序執行flow方法循環讀取數據,觸發data事件,完成數據的自動讀取,然后發射給調用者,會不停的循環整個過程。上面比較值的注意一點的就是flow模式和paused模式區別,如果是flow模式在addChunk的時候,如下所示
function addChunk(stream, state, chunk, addToFront) { if (state.flowing && state.length === 0 && !state.sync) { stream.emit("data", chunk); stream.read(0); } }會自動發射數據,不會走緩存,而paused模式會走一遍內部的緩存機制。
根據上面node源碼的分析過程,下面圖形化描述下整個流程。自己實現的一個可讀流
可寫流源碼分析// 1:首先第一步根據createWriteStream傳入參數進行初始化 // 2:調用寫操作 Writable.prototype.write = function(chunk, encoding, cb) { if (state.ended) //在end繼續寫入會emit一個error事件 writeAfterEnd(this, cb); else if (validChunk(this, state, chunk, cb)) { //在校驗數據chunk合法的情況下才會進行后續的寫邏輯 state.pendingcb++; ret = writeOrBuffer(this, state, chunk, encoding, cb); } return ret; }; function writeOrBuffer(stream, state, chunk, encoding, cb) { chunk = decodeChunk(state, chunk, encoding); if (chunk instanceof Buffer) encoding = "buffer"; var len = state.objectMode ? 1 : chunk.length; state.length += len;//實時更新緩沖區長度 var ret = state.length < state.highWaterMark;//判斷緩存區是否超過水位線(highWaterMark,不傳默認16k,源碼_stream_writeable.js--40行)設置 if (!ret) state.needDrain = true; if (state.writing || state.corked) { //如果此時處于寫狀態,將新添加的數據放到緩沖池鏈表尾部 var last = state.lastBufferedRequest; state.lastBufferedRequest = new WriteReq(chunk, encoding, cb); if (last) { last.next = state.lastBufferedRequest; } else { state.bufferedRequest = state.lastBufferedRequest; } state.bufferedRequestCount += 1; } else { //寫入數據 doWrite(stream, state, false, len, chunk, encoding, cb); } return ret; } function doWrite(stream, state, writev, len, chunk, encoding, cb) { if (writev) //一次寫入多個數據塊 stream._writev(chunk, state.onwrite); else //一次寫入一個數據塊 stream._write(chunk, encoding, state.onwrite); state.sync = false; } function onwrite(stream, er) { if (!finished && !state.corked && !state.bufferProcessing && state.bufferedRequest) { //清空緩沖池 ,不為空,則循環執行 _write() 寫入單個數據塊 clearBuffer(stream, state); } } } function clearBuffer(stream, state) { // 單個數據寫入 while (entry) { var chunk = entry.chunk; var encoding = entry.encoding; var cb = entry.callback; var len = state.objectMode ? 1 : chunk.length; //開啟數據寫操作 doWrite(stream, state, false, len, chunk, encoding, cb); entry = entry.next; } }總結 上面是可寫流源碼分析,摘要了關鍵流程,首先根據傳入參數進行初始化配置,然后用戶調用write方法進行寫入,寫入前會判斷一下是否超過水位線,超過觸發drain事件,返回false,注意一點此時仍可以進行寫入,返回false只是告訴你,已經滿了,后需要不要寫入還是靠用戶根據這個返回值來控制。如果沒超過,在寫之前會先判斷是否處于寫狀態,是的話將數據放到緩存中,反之會進行doWrite <-->clearBuffer這樣的循環操作,一直到數據緩存中數據消耗完為止。清理完了之后,后續調用write的返回值ret為false,從而繼續寫,一直循環前面描述的整個過程,直到數據源寫完為止。總的來說,因為可寫流內部只有一個狀態,復雜度低于可讀流,整個過程還是比較清晰的,不在圖形化流程。
自己實現的一個可寫流
說明
node源碼分析版本基于v8.9.4
參考資料
http://nodejs.cn/api/
https://medium.freecodecamp.o...
文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。
轉載請注明本文地址:http://specialneedsforspecialkids.com/yun/107173.html
摘要:通過執行和可以向訂閱者推送不同的通知。之后,執行過程可能被處理掉。當調用并得到觀察者時,在中傳入的函數將會被執行。每次執行都會觸發一個單獨針對當前的運行邏輯。通知不發出任何值,表示流的結束。 原文:http://reactivex.io/rxjs/manu... Rx.Observalbe.create()或者創建操作符,可以 創建(created) Observable流。Obser...
摘要:一面試題及剖析今日面試題今天壹哥帶各位復習一塊可能會令初學者比較頭疼的內容,起碼當時讓我很有些頭疼的內容,那就是流。在這里壹哥會從兩部分展開介紹流,即與流。除此之外盡量使用字節流。關閉此輸入流并釋放與流相關聯的任何系統資源。 一. 面試題及剖析 1. 今日面試題 今天 壹哥 帶各位復習一塊可...
摘要:內部迭代與使用迭代器顯式迭代的集合不同,流的迭代操作是在背后進行的。流只能遍歷一次請注意,和迭代器類似,流只能遍歷一次。 流(Stream) 流是什么 流是Java API的新成員,它允許你以聲明性方式處理數據集合(通過查詢語句來表達,而不是臨時編寫一個實現)。就現在來說,你可以把它們看成遍歷數據集的高級迭代器。此外,流還可以透明地并行處理,你無需寫任何多線程代碼了!我會在后面的筆記中...
摘要:為了處理請求流上的錯誤,我們將錯誤記錄到并發送狀態碼以指示,但是,在實際應用程序中,我們需要檢查錯誤以確定正確的狀態碼和消息是什么,與通常的錯誤一樣,你應該查閱錯誤文檔。通過對象發送狀態碼和數據。 HTTP事務的剖析 本指南的目的是讓你充分了解Node.js HTTP處理的過程,我們假設你在一般意義上知道HTTP請求的工作方式,無論語言或編程環境如何,我們還假設你對Node.js Ev...
閱讀 4414·2021-11-19 09:59
閱讀 3329·2021-10-12 10:12
閱讀 2641·2021-09-22 15:25
閱讀 3338·2019-08-30 15:55
閱讀 1192·2019-08-29 11:27
閱讀 1472·2019-08-28 18:06
閱讀 2744·2019-08-26 13:41
閱讀 2562·2019-08-26 13:41