摘要:方法也可以接收一個參數表示數據請求著請求的數據大小,但是可讀流可以根據需要忽略這個參數。讀取數據大部分情況下我們只要簡單的使用方法將可讀流的數據重定向到另外形式的流,但是在某些情況下也許直接從可讀流中讀取數據更有用。
介紹
本文介紹了使用 node.js streams 開發程序的基本方法。
"We should have some ways of connecting programs like garden hose--screw in another segment when it becomes necessary to massage data in another way. This is the way of IO also."
Doug McIlroy. October 11, 1964
最早接觸Stream是從早期的unix開始的
數十年的實踐證明Stream 思想可以很簡單的開發出一些龐大的系統。在unix里,Stream是通過
|實現的;在node中,作為內置的stream模塊,很多核心模塊和三方模塊都使用到。和unix一樣,
node Stream主要的操作也是.pipe(),使用者可以使用反壓力機制來控制讀和寫的平衡。
Stream 可以為開發者提供可以重復使用統一的接口,通過抽象的Stream接口來控制Stream之間的讀寫平衡。
為什么使用Stream
node中的I/O是異步的,因此對磁盤和網絡的讀寫需要通過回調函數來讀取數據,下面是一個文件下載服務器
的簡單代碼:
var http = require("http"); var fs = require("fs"); var server = http.createServer(function (req, res) { fs.readFile(__dirname + "/data.txt", function (err, data) { res.end(data); }); }); server.listen(8000);
這些代碼可以實現需要的功能,但是服務在發送文件數據之前需要緩存整個文件數據到內存,如果"data.txt"文件很
大并且并發量很大的話,會浪費很多內存。因為用戶需要等到整個文件緩存到內存才能接受的文件數據,這樣導致
用戶體驗相當不好。不過還好(req, res)兩個參數都是Stream,這樣我們可以用fs.createReadStream()代替fs.readFile():
var http = require("http"); var fs = require("fs"); var server = http.createServer(function (req, res) { var stream = fs.createReadStream(__dirname + "/data.txt"); stream.pipe(res); }); server.listen(8000);
.pipe()方法監聽fs.createReadStream()的"data" 和"end"事件,這樣"data.txt"文件就不需要緩存整
個文件,當客戶端連接完成之后馬上可以發送一個數據塊到客戶端。使用.pipe()另一個好處是可以解決當客戶
端延遲非常大時導致的讀寫不平衡問題。如果想壓縮文件再發送,可以使用三方模塊實現:
var http = require("http"); var fs = require("fs"); var oppressor = require("oppressor"); var server = http.createServer(function (req, res) { var stream = fs.createReadStream(__dirname + "/data.txt"); stream.pipe(oppressor(req)).pipe(res); }); server.listen(8000);
這樣文件就會對支持gzip和deflate的瀏覽器進行壓縮。oppressor 模塊會處理所有的content-encoding。
Stream使開發程序變得簡單。
基礎概念
有五種基本的Stream: readable, writable, transform, duplex, and"classic”.
所有類型的Stream收是使用 .pipe() 來創建一個輸入輸出對,接收一個可讀流src并將其數據輸出到可寫流dst,如下:
src.pipe(dst)
.pipe(dst)方法為返回dst流,這樣就可以接連使用多個.pipe(),如下:
a.pipe(b).pipe(c).pipe(d)
功能與下面的代碼相同:
a.pipe(b); b.pipe(c); c.pipe(d);
這樣的用法十分類似于unix命令下面用法:
a | b | c | dreadable streams
通過調用Readable streams的 .pipe()方法可以把Readable streams的數據寫入一個
Writable , Transform, 或者Duplex stream。
readableStream.pipe(dst)創建 readable stream
這里我們創建一個readable stream!
var Readable = require("stream").Readable; var rs = new Readable; rs.push("beep "); rs.push("boop "); rs.push(null); rs.pipe(process.stdout); $ node read0.js beep boop
rs.push(null) 通知數據接收者數據已經發送完畢.
注意到我們在將所有數據內容壓入可讀流之前并沒有調用rs.pipe(process.stdout);,但是我們壓入的所有數據
內容還是完全的輸出了,這是因為可讀流在接收者沒有讀取數據之前,會緩存所有壓入的數據。但是在很多情況下,更好的方法是只有數據接收著請求數據的時候,才壓入數據到可讀流而不是緩存整個數據。下面我們重寫 一下
._read()函數:
var Readable = require("stream").Readable; var rs = Readable(); var c = 97; rs._read = function () { rs.push(String.fromCharCode(c++)); if (c > "z".charCodeAt(0)) rs.push(null); }; rs.pipe(process.stdout); $ node read1.js abcdefghijklmnopqrstuvwxyz
上面的代碼通過重寫_read()方法實現了只有在數據接受者請求數據才向可讀流中壓入數據。_read()方法也可以接收一個size參數表示數據請求著請求的數據大小,但是可讀流可以根據需要忽略這個參數。注意我們也可以用util.inherits()繼承可讀流。為了說明只有在數據接受者請求數據時_read()方法才被調用,我們在向可讀流壓入數據時做一個延時,如下:
var Readable = require("stream").Readable; var rs = Readable(); var c = 97 - 1; rs._read = function () { if (c >= "z".charCodeAt(0)) return rs.push(null); setTimeout(function () { rs.push(String.fromCharCode(++c)); }, 100); }; rs.pipe(process.stdout); process.on("exit", function () { console.error(" _read() called " + (c - 97) + " times"); }); process.stdout.on("error", process.exit);
用下面的命令運行程序我們發現_read()方法只調用了5次:
$ node read2.js | head -c5 abcde _read() called 5 times
使用計時器的原因是系統需要時間來發送信號來通知程序關閉管道。使用process.stdout.on("error", fn) 是為了處理系統因為header命令關閉管道而發送SIGPIPE信號,因為這樣會導致process.stdout觸發EPIPE事件。如果想創建一個的可以壓入任意形式數據的可讀流,只要在創建流的時候設置參數objectMode為true即可,例如:Readable({ objectMode: true })。
讀取readable stream數據大部分情況下我們只要簡單的使用pipe方法將可讀流的數據重定向到另外形式的流,但是在某些情況下也許直接從可讀流中讀取數據更有用。如下所示:
process.stdin.on("readable", function () { var buf = process.stdin.read(); console.dir(buf); }); $ (echo abc; sleep 1; echo def; sleep 1; echo ghi) | node consume0.jsnull
當可讀流中有數據可讀取時,流會觸發"readable" 事件,這樣就可以調用.read()方法來讀取相關數據,當可讀流中沒有數據可讀取時,.read() 會返回null,這樣就可以結束.read() 的調用, 等待下一次"readable" 事件的觸發。下面是一個使用.read(n)從標準輸入每次讀取3個字節的例子:
process.stdin.on("readable", function () { var buf = process.stdin.read(3); console.dir(buf); });
如下運行程序發現,輸出結果并不完全!
$ (echo abc; sleep 1; echo def; sleep 1; echo ghi) | node consume1.js
這是應為額外的數據數據留在流的內部緩沖區里了,而我們需要通知流我們要讀取更多的數據.read(0) 可以達到這個目的。
process.stdin.on("readable", function () { var buf = process.stdin.read(3); console.dir(buf); process.stdin.read(0); });
這次運行結果如下:
$ (echo abc; sleep 1; echo def; sleep 1; echo ghi) | node consume2.js
我們可以使用 .unshift() 將數據重新押回流數據隊列的頭部,這樣可以接續讀取押回的數據。如下面的代碼,會按行輸出標準輸入的內容:
var offset = 0; process.stdin.on("readable", function () { var buf = process.stdin.read(); if (!buf) return; for (; offset < buf.length; offset++) { if (buf[offset] === 0x0a) { console.dir(buf.slice(0, offset).toString()); buf = buf.slice(offset + 1); offset = 0; process.stdin.unshift(buf); return; } } process.stdin.unshift(buf); }); $ tail -n +50000 /usr/share/dict/american-english | head -n10 | node lines.js "hearties" "heartiest" "heartily" "heartiness" "heartiness"s" "heartland" "heartland"s" "heartlands" "heartless" "heartlessly"
當然,有很多模塊可以實現這個功能,如:split 。
writable streams重寫 ._write(chunk, enc, next) 方法就可以接受一個readable stream的數據。
var Writable = require("stream").Writable; var ws = Writable(); ws._write = function (chunk, enc, next) { console.dir(chunk); next(); }; process.stdin.pipe(ws); $ (echo beep; sleep 1; echo boop) | node write0.js
第一個參數chunk是數據輸入者寫入的數據。第二個參數end是數據的編碼格式。第三個參數next(err)通過回調函數通知數據寫入者可以寫入更多的時間。如果readable stream寫入的是字符串,那么字符串會默認轉換為Buffer,如果在創建流的時候設置Writable({ decodeStrings: false })參數,那么不會做轉換。如果readable stream寫入的數據時對象,那么需要這樣創建writable stream
Writable({ objectMode: true })
寫數據到 writable stream調用writable stream的.write(data)方法即可完成數據寫入。
process.stdout.write("beep boop ");
調用.end()方法通知writable stream 數據已經寫入完成。
var fs = require("fs"); var ws = fs.createWriteStream("message.txt"); ws.write("beep "); setTimeout(function () { ws.end("boop "); }, 1000); $ node writing1.js $ cat message.txt beep boop
如果需要設置writable stream的緩沖區的大小,那么在創建流的時候,需要設置opts.highWaterMark,這樣如果緩沖區里的數據超過opts.highWaterMark,.write(data)方法會返回false。當緩沖區可寫的時候,writable stream會觸發"drain" 事件。
classic streamsClassic streams比較老的接口了,最早出現在node 0.4版本中,但是了解一下其運行原理還是十分有好處的。當一個流被注冊了"data"事件的回到函數,那么流就會工作在老版本模式下,即會使用老的API。
classic readable streams
Classic readable streams事件就是一個事件觸發器,如果Classic readable streams有數據可讀取,那么其觸發 "data" 事件,等到數據讀取完畢時,會觸發"end" 事件。.pipe() 方法通過檢查stream.readable的值確定流是否有數據可讀。下面是一個使用Classic readable streams打印A-J字母的例子:
var Stream = require("stream"); var stream = new Stream; stream.readable = true; var c = 64; var iv = setInterval(function () { if (++c >= 75) { clearInterval(iv); stream.emit("end"); } else stream.emit("data", String.fromCharCode(c)); }, 100); stream.pipe(process.stdout); $ node classic0.js ABCDEFGHIJ
如果要從classic readable stream中讀取數據,注冊"data" 和"end"兩個事件的回調函數即可,代碼如下:
process.stdin.on("data", function (buf) { console.log(buf); }); process.stdin.on("end", function () { console.log("__END__"); }); $ (echo beep; sleep 1; echo boop) | node classic1.js__END__
需要注意的是如果你使用這種方式讀取數據,那么會失去使用新接口帶來的好處。比如你在往一個
延遲非常大的流寫數據時,需要注意讀取數據和寫數據的平衡問題,否則會導致大量數據緩存在內
存中,導致浪費大量內存。一般這時候強烈建議使用流的.pipe()方法,這樣就不用自己監聽"data"
和"end"事件了,也不用擔心讀寫不平衡的問題了。當然你也可以用 through代替自己監聽"data" 和"end" 事件,如下面的代碼:
var through = require("through"); process.stdin.pipe(through(write, end)); function write (buf) { console.log(buf); } function end () { console.log("__END__"); } $ (echo beep; sleep 1; echo boop) | node through.js__END__
或者也可以使用concat-stream來緩存整個流的內容:
var concat = require("concat-stream"); process.stdin.pipe(concat(function (body) { console.log(JSON.parse(body)); })); $ echo "{"beep":"boop"}" | node concat.js { beep: "boop" }
當然如果你非要自己監聽"data" 和"end"事件,那么你可以在寫數據的流寫的 時候使用.pause()方法暫停Classic readable streams繼續觸發"data" 事件。等到寫數據的流可寫的時候再使用.resume() 方法通知流繼續觸發"data" 事件繼續讀取數據。
classic writable streams
Classic writable streams 非常簡單。只有 .write(buf), .end(buf)和.destroy()三個方法。.end(buf) 方法的buf參數是可選的,如果選擇該參數,相當于stream.write(buf); stream.end() 這樣的操作,需要注意的是當流的緩沖區寫滿即流不可寫時.write(buf)方法會返回false,如果流再次可寫時,流會觸發drain事件。
transformtransform是一個對讀入數據過濾然輸出的流。
duplexduplex stream是一個可讀也可寫的雙向流,如下面的a就是一個duplex stream:
a.pipe(b).pipe(a)read more
core stream documentation
You can use the readable-stream
本文翻譯來自 https://github.com/substack/stream-handb...
本文轉載來自 http://www.open-open.com/lib/view/open13...
文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。
轉載請注明本文地址:http://specialneedsforspecialkids.com/yun/79078.html
摘要:在創建時大小已經被確定且是無法調整的,在內存分配這塊是由層面提供而不是具體后面會講解。在這里不知道你是否認為這是很簡單的但是上面提到的一些關鍵詞二進制流緩沖區,這些又都是什么呢下面嘗試做一些簡單的介紹。 showImg(https://segmentfault.com/img/remote/1460000019894717?w=1280&h=850); 多數人都擁有自己不了解的能力和機...
摘要:創建簡單應用使用指令來載入模塊創建服務器使用方法創建服務器,并使用方法綁定端口。全局安裝將安裝包放在下。的核心就是事件觸發與事件監聽器功能的封裝。通常我們用于從一個流中獲取數據并將數據傳遞到另外一個流中。壓縮文件為文件壓縮完成。 創建簡單應用 使用 require 指令來載入 http 模塊 var http = require(http); 創建服務器 使用 http.create...
摘要:前言一個集微信公眾號商城小程序商城商城后臺的一個開源項目,后臺是基于開發的,是一個簡潔而強大的開源微信公眾平臺開發框架,微信功能插件化開發多公眾號管理配置簡單。微信小程序項目下載整個包之后,進行根目錄文件夾。 前言 一個集微信公眾號商城/小程序商城/商城后臺的一個開源項目,后臺是基于WeiPHP5.0開發的,WeiPHP是一個簡潔而強大的開源微信公眾平臺開發框架,微信功能插件化開發,多...
摘要:前言一個集微信公眾號商城小程序商城商城后臺的一個開源項目,后臺是基于開發的,是一個簡潔而強大的開源微信公眾平臺開發框架,微信功能插件化開發多公眾號管理配置簡單。微信小程序項目下載整個包之后,進行根目錄文件夾。 前言 一個集微信公眾號商城/小程序商城/商城后臺的一個開源項目,后臺是基于WeiPHP5.0開發的,WeiPHP是一個簡潔而強大的開源微信公眾平臺開發框架,微信功能插件化開發,多...
摘要:進程間通信的目的是為了讓不同的進程能夠互相訪問資源,并進程協調工作。這個過程的示意圖如下端口共同監聽集群穩定之路進程事件自動重啟負載均衡狀態共享模塊工作原理事件二測試單元測試性能測試三產品化項目工程化部署流程性能日志監控報警穩定性異構共存 內容 9.玩轉進程10.測試11.產品化 一、玩轉進程 node的單線程只不過是js層面的單線程,是基于V8引擎的單線程,因為,V8的緣故,前后...
閱讀 3213·2021-11-24 09:39
閱讀 2942·2021-11-23 09:51
閱讀 899·2021-11-18 10:07
閱讀 3549·2021-10-11 10:57
閱讀 2750·2021-10-08 10:04
閱讀 3008·2021-09-26 10:11
閱讀 1053·2021-09-23 11:21
閱讀 2795·2019-08-29 17:28