摘要:在數(shù)據(jù)緩沖區(qū)已超過或?qū)懭腙犃挟斍罢Φ娜魏吻闆r下,將返回。當返回值時,背壓系統(tǒng)啟動,它會暫停傳入的流發(fā)送任何數(shù)據(jù),并等待消費者再次準備就緒,清空數(shù)據(jù)緩沖區(qū)后,將發(fā)出事件并恢復(fù)傳入的數(shù)據(jù)流。
流中的背壓
在數(shù)據(jù)處理過程中會出現(xiàn)一個叫做背壓的常見問題,它描述了數(shù)據(jù)傳輸過程中緩沖區(qū)后面數(shù)據(jù)的累積,當傳輸?shù)慕邮斩司哂袕?fù)雜的操作時,或者由于某種原因速度較慢時,來自傳入源的數(shù)據(jù)就有累積的趨勢,就像阻塞一樣。
要解決這個問題,必須有一個委托系統(tǒng)來確保數(shù)據(jù)從一個源到另一個源的平滑流動,不同的社區(qū)已經(jīng)針對他們的程序獨特地解決了這個問題,Unix管道和TCP套接字就是很好的例子,并且通常被稱為流量控制,在Node.js中,流是已采用的解決方案。
本指南的目的是進一步詳細說明背壓是什么,以及精確流如何在Node.js的源代碼中解決這個問題,本指南的第二部分將介紹建議的最佳實踐,以確保在實現(xiàn)流時應(yīng)用程序的代碼是安全的和優(yōu)化的。
我們假設(shè)你對Node.js中背壓、Buffer和EventEmitter的一般定義以及Stream的一些經(jīng)驗有所了解。如果你還沒有閱讀這些文檔,那么首先查看API文檔并不是一個壞主意,因為它有助于在閱讀本指南時擴展你的理解。
數(shù)據(jù)處理的問題在計算機系統(tǒng)中,數(shù)據(jù)通過管道、sockets和信號從一個進程傳輸?shù)搅硪粋€進程,在Node.js中,我們找到了一種名為Stream的類似機制。流很好!他們?yōu)镹ode.js做了很多事情,幾乎內(nèi)部代碼庫的每個部分都使用該模塊,作為開發(fā)人員,我們鼓勵你使用它們!
const readline = require("readline"); // process.stdin and process.stdout are both instances of Streams const rl = readline.createInterface({ input: process.stdin, output: process.stdout }); rl.question("Why should you use streams? ", (answer) => { console.log(`Maybe it"s ${answer}, maybe it"s because they are awesome! :)`); rl.close(); });
通過比較Node.js的Stream實現(xiàn)的內(nèi)部系統(tǒng)工具,可以證明為什么通過流實現(xiàn)背壓機制是一個很好的優(yōu)化的一個很好的例子。
在一種情況下,我們將使用一個大文件(約?9gb)并使用熟悉的zip(1)工具對其進行壓縮。
$ zip The.Matrix.1080p.mkv
雖然這需要幾分鐘才能完成,但在另一個shell中我們可以運行一個腳本,該腳本采用Node.js的模塊zlib,它包含另一個壓縮工具gzip(1)。
const gzip = require("zlib").createGzip(); const fs = require("fs"); const inp = fs.createReadStream("The.Matrix.1080p.mkv"); const out = fs.createWriteStream("The.Matrix.1080p.mkv.gz"); inp.pipe(gzip).pipe(out);
要測試結(jié)果,請嘗試打開每個壓縮文件,zip(1)工具壓縮的文件將通知你文件已損壞,而Stream完成的壓縮將無錯誤地解壓縮。
注意:在此示例中,我們使用.pipe()將數(shù)據(jù)源從一端獲取到另一端,但是,請注意沒有附加正確的錯誤處理程序。如果無法正確接收數(shù)據(jù)塊,Readable源或gzip流將不會被銷毀,pump是一個實用工具,如果其中一個流失敗或關(guān)閉,它將正確地銷毀管道中的所有流,并且在這種情況下是必須的!
只有Nodejs 8.x或更早版本才需要pump,對于Node 10.x或更高版本,引入pipeline來替換pump。這是一個模塊方法,用于在流傳輸之間轉(zhuǎn)發(fā)錯誤和正確清理,并在管道完成時提供回調(diào)。
以下是使用管道的示例:
const { pipeline } = require("stream"); const fs = require("fs"); const zlib = require("zlib"); // Use the pipeline API to easily pipe a series of streams // together and get notified when the pipeline is fully done. // A pipeline to gzip a potentially huge video file efficiently: pipeline( fs.createReadStream("The.Matrix.1080p.mkv"), zlib.createGzip(), fs.createWriteStream("The.Matrix.1080p.mkv.gz"), (err) => { if (err) { console.error("Pipeline failed", err); } else { console.log("Pipeline succeeded"); } } );
你還可以在管道上調(diào)用promisify以將其與async/await一起使用:
const stream = require("stream"); const fs = require("fs"); const zlib = require("zlib"); const pipeline = util.promisify(stream.pipeline); async function run() { try { await pipeline( fs.createReadStream("The.Matrix.1080p.mkv"), zlib.createGzip(), fs.createWriteStream("The.Matrix.1080p.mkv.gz"), ); console.log("Pipeline succeeded"); } catch (err) { console.error("Pipeline failed", err); } }太多的數(shù)據(jù),太快
有些情況下,Readable流可能會過快地為Writable提供數(shù)據(jù) — 遠遠超過消費者可以處理的數(shù)據(jù)!
當發(fā)生這種情況時,消費者將開始排隊所有數(shù)據(jù)塊以供以后消費,寫入隊列將變得越來越長,因此在整個過程完成之前,必須將更多數(shù)據(jù)保存在內(nèi)存中。
寫入磁盤比從磁盤讀取要慢很多,因此,當我們嘗試壓縮文件并將其寫入我們的硬盤時,將發(fā)生背壓,因為寫入磁盤將無法跟上讀取的速度。
// Secretly the stream is saying: "whoa, whoa! hang on, this is way too much!" // Data will begin to build up on the read-side of the data buffer as // `write` tries to keep up with the incoming data flow. inp.pipe(gzip).pipe(outputFile);
這就是背壓機制很重要的原因,如果沒有背壓系統(tǒng),該進程會耗盡系統(tǒng)的內(nèi)存,有效地減緩了其他進程,并獨占你系統(tǒng)的大部分直到完成。
這導(dǎo)致了一些事情:
減緩所有其他當前進程。
一個非常超負荷的垃圾收集器。
內(nèi)存耗盡。
在下面的示例中,我們將取出.write()函數(shù)的返回值并將其更改為true,這有效地禁用了Node.js核心中的背壓支持,在任何對"modified"二進制文件的引用中,我們正在談?wù)撛跊]有return ret;行的情況下運行node二進制,而改為return true;。
垃圾收集器上的過度負荷我們來看看快速基準測試,使用上面的相同示例,我們進行幾次試驗,以獲得兩個二進制的中位時間。
trial (#) | `node` binary (ms) | modified `node` binary (ms) ================================================================= 1 | 56924 | 55011 2 | 52686 | 55869 3 | 59479 | 54043 4 | 54473 | 55229 5 | 52933 | 59723 ================================================================= average time: | 55299 | 55975
兩者都需要大約一分鐘來運行,因此根本沒有太大差別,但讓我們仔細看看以確認我們的懷疑是否正確,我們使用Linux工具dtrace來評估V8垃圾收集器發(fā)生了什么。
GC(垃圾收集器)測量時間表示垃圾收集器完成單次掃描的完整周期的間隔:
approx. time (ms) | GC (ms) | modified GC (ms) ================================================= 0 | 0 | 0 1 | 0 | 0 40 | 0 | 2 170 | 3 | 1 300 | 3 | 1 * * * * * * * * * 39000 | 6 | 26 42000 | 6 | 21 47000 | 5 | 32 50000 | 8 | 28 54000 | 6 | 35
雖然這兩個過程開始時相同,但似乎以相同的速率運行GC,很明顯,在適當工作的背壓系統(tǒng)幾秒鐘后,它將GC負載分布在4-8毫秒的一致間隔內(nèi),直到數(shù)據(jù)傳輸結(jié)束。
但是,當背壓系統(tǒng)不到位時,V8垃圾收集開始拖延,正常二進制文件在一分鐘內(nèi)調(diào)用GC約75次,然而,修改后的二進制文件僅觸發(fā)36次。
這是由于內(nèi)存使用量增加而累積的緩慢而漸進的債務(wù),隨著數(shù)據(jù)傳輸,在沒有背壓系統(tǒng)的情況下,每個塊傳輸使用更多內(nèi)存。
分配的內(nèi)存越多,GC在一次掃描中需要處理的內(nèi)存就越多,掃描越大,GC就越需要決定可以釋放什么,并且在更大的內(nèi)存空間中掃描分離的指針將消耗更多的計算能力。
內(nèi)存耗盡為確定每個二進制的內(nèi)存消耗,我們使用/usr/bin/time -lp sudo ./node ./backpressure-example/zlib.js多帶帶為每個進程計時。
這是正常二進制的輸出:
Respecting the return value of .write() ============================================= real 58.88 user 56.79 sys 8.79 87810048 maximum resident set size 0 average shared memory size 0 average unshared data size 0 average unshared stack size 19427 page reclaims 3134 page faults 0 swaps 5 block input operations 194 block output operations 0 messages sent 0 messages received 1 signals received 12 voluntary context switches 666037 involuntary context switches
虛擬內(nèi)存占用的最大字節(jié)大小約為87.81mb。
現(xiàn)在更改.write()函數(shù)的返回值,我們得到:
Without respecting the return value of .write(): ================================================== real 54.48 user 53.15 sys 7.43 1524965376 maximum resident set size 0 average shared memory size 0 average unshared data size 0 average unshared stack size 373617 page reclaims 3139 page faults 0 swaps 18 block input operations 199 block output operations 0 messages sent 0 messages received 1 signals received 25 voluntary context switches 629566 involuntary context switches
虛擬內(nèi)存占用的最大字節(jié)大小約為1.52gb。
如果沒有流來委托背壓,則分配的內(nèi)存空間要大一個數(shù)量級 — 同一進程之間的巨大差異!
這個實驗展示了Node.js的反壓機制是如何優(yōu)化和節(jié)省成本的,現(xiàn)在,讓我們分析一下它是如何工作的!
背壓如何解決這些問題?將數(shù)據(jù)從一個進程傳輸?shù)搅硪粋€進程有不同的函數(shù),在Node.js中,有一個名為.pipe()的內(nèi)部內(nèi)置函數(shù),還有其他包也可以使用!但最終,在這個過程的基本層面,我們有兩個獨立的組件:數(shù)據(jù)來源和消費者。
當從源調(diào)用.pipe()時,它向消費者發(fā)出信號,告知有數(shù)據(jù)要傳輸,管道函數(shù)有助于為事件觸發(fā)器設(shè)置適當?shù)谋硥洪]合。
在Node.js中,源是Readable流,而消費者是Writable流(這些都可以與Duplex或Transform流互換,但這超出了本指南的范圍)。
觸發(fā)背壓的時刻可以精確地縮小到Writable的.write()函數(shù)的返回值,當然,該返回值由幾個條件決定。
在數(shù)據(jù)緩沖區(qū)已超過highWaterMark或?qū)懭腙犃挟斍罢Φ娜魏吻闆r下,.write()將返回false。
當返回false值時,背壓系統(tǒng)啟動,它會暫停傳入的Readable流發(fā)送任何數(shù)據(jù),并等待消費者再次準備就緒,清空數(shù)據(jù)緩沖區(qū)后,將發(fā)出.drain()事件并恢復(fù)傳入的數(shù)據(jù)流。
隊列完成后,背壓將允許再次發(fā)送數(shù)據(jù),正在使用的內(nèi)存空間將自行釋放并為下一批數(shù)據(jù)做好準備。
這有效地允許在任何給定時間為.pipe()函數(shù)使用固定數(shù)量的內(nèi)存,沒有內(nèi)存泄漏,沒有無限緩沖,垃圾收集器只需要處理內(nèi)存中的一個區(qū)域!
那么,如果背壓如此重要,為什么你(可能)沒有聽說過它?答案很簡單:Node.js會自動為你完成所有這些工作。
那太好了!但是當我們試圖了解如何實現(xiàn)我們自己的自定義流時,也不是那么好。
注意:在大多數(shù)機器中,有一個字節(jié)大小可以確定緩沖區(qū)何時已滿(在不同的機器上會有所不同),Node.js允許你設(shè)置自己的自定義highWaterMark,但通常,默認設(shè)置為16kb(16384,或objectMode流為16),在你可能希望提高該值的情況下,可以嘗試,但是要小心!
.pipe()的生命周期為了更好地理解背壓,下面是一個關(guān)于Readable流的生命周期的流程圖,該流被管道傳輸?shù)?b>Writable流中:
+===================+ x--> Piping functions +--> src.pipe(dest) | x are set up during |===================| x the .pipe method. | Event callbacks | +===============+ x |-------------------| | Your Data | x They exist outside | .on("close", cb) | +=======+=======+ x the data flow, but | .on("data", cb) | | x importantly attach | .on("drain", cb) | | x events, and their | .on("unpipe", cb) | +---------v---------+ x respective callbacks. | .on("error", cb) | | Readable Stream +----+ | .on("finish", cb) | +-^-------^-------^-+ | | .on("end", cb) | ^ | ^ | +-------------------+ | | | | | ^ | | ^ ^ ^ | +-------------------+ +=================+ ^ | ^ +----> Writable Stream +---------> .write(chunk) | | | | +-------------------+ +=======+=========+ | | | | | ^ | +------------------v---------+ ^ | +-> if (!chunk) | Is this chunk too big? | ^ | | emit .end(); | Is the queue busy? | | | +-> else +-------+----------------+---+ | ^ | emit .write(); | | | ^ ^ +--v---+ +---v---+ | | ^-----------------------------------< No | | Yes | ^ | +------+ +---v---+ ^ | | | ^ emit .pause(); +=================+ | | ^---------------^-----------------------+ return false; <-----+---+ | +=================+ | | | ^ when queue is empty +============+ | ^------------^-----------------------< Buffering | | | |============| | +> emit .drain(); | ^Buffer^ | | +> emit .resume(); +------------+ | | ^Buffer^ | | +------------+ add chunk to queue | | <---^---------------------< +============+
注意:如果要設(shè)置管道以將一些流鏈接在一起來操作數(shù)據(jù),則很可能會實現(xiàn)Transform流。
在這種情況下,你的Readable流的輸出將輸入到Transform中,并將管道到Writable中。
Readable.pipe(Transformable).pipe(Writable);
背壓將自動應(yīng)用,但請注意,Transform流的輸入和輸出highWaterMark都可能被操縱并將影響背壓系統(tǒng)。
背壓指南從Node.js v0.10開始,Stream類提供了通過使用這些相應(yīng)函數(shù)的下劃線版本來修改.read()或.write()的行為的功能(._read()和._write())。
對于實現(xiàn)Readable流和Writable流,有文檔化的指南,我們假設(shè)你已閱讀過這些內(nèi)容,下一節(jié)將更深入一些。
實現(xiàn)自定義流時要遵守的規(guī)則流的黃金法則始終是尊重背壓,最佳實踐的構(gòu)成是非矛盾的實踐,只要你小心避免與內(nèi)部背壓支持相沖突的行為,你就可以確定你遵循良好做法。
一般來說:
如果你沒有被要求,永遠不要.push()。
永遠不要在返回false后調(diào)用.write(),而是等待"drain"。
流在不同的Node.js版本和你使用的庫之間有變化,小心并測試一下。
注意:關(guān)于第3點,構(gòu)建瀏覽器流的非常有用的包是readable-stream,Rodd Vagg撰寫了一篇很棒的博客文章,描述了這個庫的實用性,簡而言之,它為Readable流提供了一種自動優(yōu)雅降級,并支持舊版本的瀏覽器和Node.js。
Readable流的特定規(guī)則到目前為止,我們已經(jīng)了解了.write()如何影響背壓,并將重點放在Writable流上,由于Node.js的功能,數(shù)據(jù)在技術(shù)上從Readable流向下游Writable。但是,正如我們可以在數(shù)據(jù)、物質(zhì)或能量的任何傳輸中觀察到的那樣,源與目標一樣重要,Readable流對于如何處理背壓至關(guān)重要。
這兩個過程都相互依賴,有效地進行通信,如果Readable忽略Writable流要求它停止發(fā)送數(shù)據(jù)的時候,那么.write()的返回值不正確就會有問題。
因此,關(guān)于.write()返回,我們還必須尊重._read()方法中使用的.push()的返回值,如果.push()返回false值,則流將停止從源讀取,否則,它將繼續(xù)而不會停頓。
以下是使用.push()的不好做法示例:
// This is problematic as it completely ignores return value from push // which may be a signal for backpressure from the destination stream! class MyReadable extends Readable { _read(size) { let chunk; while (null !== (chunk = getNextChunk())) { this.push(chunk); } } }
此外,在自定義流之外,存在忽略背壓的陷阱,在這個良好的實踐的反例中,應(yīng)用程序的代碼會在數(shù)據(jù)可用時強制通過(由.data事件發(fā)出信號):
// This ignores the backpressure mechanisms Node.js has set in place, // and unconditionally pushes through data, regardless if the // destination stream is ready for it or not. readable.on("data", (data) => writable.write(data) );Writable流的特定規(guī)則
回想一下.write()可能會根據(jù)某些條件返回true或false,幸運的是,在構(gòu)建我們自己的Writable流時,流狀態(tài)機將處理我們的回調(diào)并確定何時處理背壓并為我們優(yōu)化數(shù)據(jù)流。
但是,當我們想直接使用Writable時,我們必須尊重.write()返回值并密切注意這些條件:
如果寫隊列忙,.write()將返回false。
如果數(shù)據(jù)塊太大,.write()將返回false(該值由變量highWaterMark指示)。
// This writable is invalid because of the async nature of JavaScript callbacks. // Without a return statement for each callback prior to the last, // there is a great chance multiple callbacks will be called. class MyWritable extends Writable { _write(chunk, encoding, callback) { if (chunk.toString().indexOf("a") >= 0) callback(); else if (chunk.toString().indexOf("b") >= 0) callback(); callback(); } } // The proper way to write this would be: if (chunk.contains("a")) return callback(); else if (chunk.contains("b")) return callback(); callback();
在實現(xiàn)._writev()時還需要注意一些事項,該函數(shù)與.cork()結(jié)合使用,但寫入時有一個常見錯誤:
// Using .uncork() twice here makes two calls on the C++ layer, rendering the // cork/uncork technique useless. ws.cork(); ws.write("hello "); ws.write("world "); ws.uncork(); ws.cork(); ws.write("from "); ws.write("Matteo"); ws.uncork(); // The correct way to write this is to utilize process.nextTick(), which fires // on the next event loop. ws.cork(); ws.write("hello "); ws.write("world "); process.nextTick(doUncork, ws); ws.cork(); ws.write("from "); ws.write("Matteo"); process.nextTick(doUncork, ws); // as a global function function doUncork(stream) { stream.uncork(); }
.cork()可以被調(diào)用多次,我們只需要小心調(diào)用.uncork()相同的次數(shù),使其再次流動。
結(jié)論Streams是Node.js中經(jīng)常使用的模塊,它們對于內(nèi)部結(jié)構(gòu)非常重要,對于開發(fā)人員來說,它們可以跨Node.js模塊生態(tài)系統(tǒng)進行擴展和連接。
希望你現(xiàn)在能夠進行故障排除,安全地編寫你自己的Writable和Readable流,并考慮背壓,并與同事和朋友分享你的知識。
在使用Node.js構(gòu)建應(yīng)用程序時,請務(wù)必閱讀有關(guān)其他API函數(shù)的Stream的更多信息,以幫助改進和釋放你的流功能。
上一篇:使用不同的文件系統(tǒng) 下一篇:域模塊剖析文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請注明本文地址:http://specialneedsforspecialkids.com/yun/100377.html
Node.js 指南 Node.js?是基于Chrome的V8 JavaScript引擎構(gòu)建的JavaScript運行時。 常規(guī) 關(guān)于Node.js 入門指南 輕松分析Node.js應(yīng)用程序 Docker化Node.js Web應(yīng)用程序 遷移到安全的Buffer構(gòu)造函數(shù) Node.js核心概念 阻塞與非阻塞概述 Node.js事件循環(huán)、定時器和process.nextTick() 不要阻塞事...
摘要:避免使用最低公分母方法你可能想讓你的程序像最低公分母文件系統(tǒng)一樣,通過將所有文件名規(guī)范化為大寫,將所有文件名規(guī)范化為格式,并將所有文件時間戳標準化為秒分辨率,這是最小公分母的方法。 使用不同的文件系統(tǒng) Node公開了文件系統(tǒng)的許多功能,但并非所有文件系統(tǒng)都相似,以下是建議的最佳實踐,以便在使用不同的文件系統(tǒng)時保持代碼簡單和安全。 文件系統(tǒng)行為 在使用文件系統(tǒng)之前,你需要知道它的行為方式...
摘要:快速檢查可能告訴我們,簡單地從的域處理程序拋出將允許然后捕獲異常并執(zhí)行其自己的錯誤處理程序,雖然情況并非如此,檢查后,你會看到堆棧只包含。 域模塊剖析 可用性問題 隱式行為 開發(fā)人員可以創(chuàng)建新域,然后只需運行domain.enter(),然后,它充當將來拋出者無法觀察到的任何異常的萬能捕捉器,允許模塊作者攔截不同模塊中不相關(guān)代碼的異常,防止代碼的發(fā)起者知道自己的異常。 以下是一個間接鏈...
摘要:相對于最大的更新就是把對背壓問題的處理邏輯從中抽取出來產(chǎn)生了新的可觀察對象。由于基于發(fā)射的數(shù)據(jù)流,以及對數(shù)據(jù)加工處理的各操作符都添加了背壓支持,附加了額外的邏輯,其運行效率要比慢得多。 背壓(backpressure)當上下游在不同的線程中,通過Observable發(fā)射,處理,響應(yīng)數(shù)據(jù)流時,如果上游發(fā)射數(shù)據(jù)的速度快于下游接收處理數(shù)據(jù)的速度,這樣對于那些沒來得及處理的數(shù)據(jù)就會造成積壓,這...
閱讀 3056·2021-11-18 10:02
閱讀 3324·2021-11-02 14:48
閱讀 3387·2019-08-30 13:52
閱讀 547·2019-08-29 17:10
閱讀 2079·2019-08-29 12:53
閱讀 1400·2019-08-29 12:53
閱讀 1024·2019-08-29 12:25
閱讀 2162·2019-08-29 12:17