摘要:這使我們的知道什么時(shí)候原始模塊被初始化,在初始化后執(zhí)行預(yù)初始化隊(duì)列的操作,之后清空預(yù)初始化隊(duì)列,再調(diào)用作為參數(shù)的回調(diào)函數(shù),以下為具體步驟把賦值給,表示預(yù)初始化已經(jīng)完成了。
本系列文章為《Node.js Design Patterns Second Edition》的原文翻譯和讀書筆記,在GitHub連載更新,同步翻譯版鏈接。
歡迎關(guān)注我的專欄,之后的博文將在專欄同步:
Encounter的掘金專欄
知乎專欄 Encounter的編程思考
segmentfault專欄 前端小站
Advanced Asynchronous Recipes幾乎所有我們迄今為止看到的設(shè)計(jì)模式都可以被認(rèn)為是通用的,并且適用于應(yīng)用程序的許多不同的領(lǐng)域。但是,有一套更具體的模式,專注于解決明確的問題。我們可以調(diào)用這些模式。就像現(xiàn)實(shí)生活中的烹飪一樣,我們有一套明確的步驟來實(shí)現(xiàn)預(yù)期的結(jié)果。當(dāng)然,這并不意味著我們不能用一些創(chuàng)意來定制設(shè)計(jì)模式,以配合我們的客人的口味,對(duì)于書寫Node.js程序來說是必要的。在本章中,我們將提供一些常見的解決方案來解決我們?cè)谌粘?b>Node.js開發(fā)中遇到的一些具體問題。這些模式包括以下內(nèi)容:
異步引入模塊并初始化
在高并發(fā)的應(yīng)用程序中使用批處理和緩存異步操作的性能優(yōu)化
運(yùn)行與Node.js處理并發(fā)請(qǐng)求的能力相悖的阻塞事件循環(huán)的同步CPU綁定操作
異步引入模塊并初始化在Chapter2-Node.js Essential Patterns中,當(dāng)我們討論Node.js模塊系統(tǒng)的基本屬性時(shí),我們提到了require()是同步的,并且module.exports也不能異步設(shè)置。
這是在核心模塊和許多npm包中存在同步API的主要原因之一,是否同步加載會(huì)被作為一個(gè)option參數(shù)被提供,主要用于初始化任務(wù),而不是替代異步API。
不幸的是,這并不總是可能的。同步API可能并不總是可用的,特別是對(duì)于在初始化階段使用網(wǎng)絡(luò)的組件,例如執(zhí)行三次握手協(xié)議或在網(wǎng)絡(luò)中檢索配置參數(shù)。 許多數(shù)據(jù)庫驅(qū)動(dòng)程序和消息隊(duì)列等中間件系統(tǒng)的客戶端都是如此。
廣泛適用的解決方案我們舉一個(gè)例子:一個(gè)名為db的模塊,它將會(huì)連接到遠(yuǎn)程數(shù)據(jù)庫。 只有在連接和與服務(wù)器的握手完成之后,db模塊才能夠接受請(qǐng)求。在這種情況下,我們通常有兩種選擇:
在開始使用之前確保模塊已經(jīng)初始化,否則則等待其初始化。每當(dāng)我們想要在異步模塊上調(diào)用一個(gè)操作時(shí),都必須完成這個(gè)過程:
const db = require("aDb"); //The async module module.exports = function findAll(type, callback) { if (db.connected) { //is it initialized? runFind(); } else { db.once("connected", runFind); } function runFind() { db.findAll(type, callback); }; };
使用依賴注入(Dependency Injection)而不是直接引入異步模塊。通過這樣做,我們可以延遲一些模塊的初始化,直到它們的異步依賴被完全初始化。 這種技術(shù)將管理模塊初始化的復(fù)雜性轉(zhuǎn)移到另一個(gè)組件,通常是它的父模塊。 在下面的例子中,這個(gè)組件是app.js:
// 模塊app.js const db = require("aDb"); // aDb是一個(gè)異步模塊 const findAllFactory = require("./findAll"); db.on("connected", function() { const findAll = findAllFactory(db); // 之后再執(zhí)行異步操作 }); // 模塊findAll.js module.exports = db => { //db 在這里被初始化 return function findAll(type, callback) { db.findAll(type, callback); } }
我們可以看出,如果所涉及的異步依賴的數(shù)量過多,第一種方案便不太適用了。
另外,使用DI有時(shí)也是不理想的,正如我們?cè)?b>Chapter7-Wiring Modules中看到的那樣。在大型項(xiàng)目中,它可能很快變得過于復(fù)雜,尤其對(duì)于手動(dòng)完成并使用異步初始化模塊的情況下。如果我們使用一個(gè)設(shè)計(jì)用于支持異步初始化模塊的DI容器,這些問題將會(huì)得到緩解。
但是,我們將會(huì)看到,還有第三種方案可以讓我們輕松地將模塊從其依賴關(guān)系的初始化狀態(tài)中分離出來。
預(yù)初始化隊(duì)列將模塊與依賴項(xiàng)的初始化狀態(tài)分離的簡(jiǎn)單模式涉及到使用隊(duì)列和命令模式。這個(gè)想法是保存一個(gè)模塊在尚未初始化的時(shí)候接收到的所有操作,然后在所有初始化步驟完成后立即執(zhí)行這些操作。
實(shí)現(xiàn)一個(gè)異步初始化的模塊為了演示這個(gè)簡(jiǎn)單而有效的技術(shù),我們來構(gòu)建一個(gè)應(yīng)用程序。首先創(chuàng)建一個(gè)名為asyncModule.js的異步初始化模塊:
const asyncModule = module.exports; asyncModule.initialized = false; asyncModule.initialize = callback => { setTimeout(() => { asyncModule.initialized = true; callback(); }, 10000); }; asyncModule.tellMeSomething = callback => { process.nextTick(() => { if(!asyncModule.initialized) { return callback( new Error("I don"t have anything to say right now") ); } callback(null, "Current time is: " + new Date()); }); };
在上面的代碼中,asyncModule展現(xiàn)了一個(gè)異步初始化模塊的設(shè)計(jì)模式。 它有一個(gè)initialize()方法,在10秒的延遲后,將初始化的flag變量設(shè)置為true,并通知它的回調(diào)調(diào)用(10秒對(duì)于真實(shí)應(yīng)用程序來說是很長(zhǎng)的一段時(shí)間了,但是對(duì)于具有互斥條件的應(yīng)用來說可能會(huì)顯得力不從心)。
另一個(gè)方法tellMeSomething()返回當(dāng)前的時(shí)間,但是如果模塊還沒有初始化,它拋出產(chǎn)生一個(gè)異常。
下一步是根據(jù)我們剛剛創(chuàng)建的服務(wù)創(chuàng)建另一個(gè)模塊。 我們?cè)O(shè)計(jì)一個(gè)簡(jiǎn)單的HTTP請(qǐng)求處理程序,在一個(gè)名為routes.js的文件中實(shí)現(xiàn):
const asyncModule = require("./asyncModule"); module.exports.say = (req, res) => { asyncModule.tellMeSomething((err, something) => { if(err) { res.writeHead(500); return res.end("Error:" + err.message); } res.writeHead(200); res.end("I say: " + something); }); };
在handler中調(diào)用asyncModule的tellMeSomething()方法,然后將其結(jié)果寫入HTTP響應(yīng)中。 正如我們所看到的那樣,我們沒有對(duì)asyncModule的初始化狀態(tài)進(jìn)行任何檢查,這可能會(huì)導(dǎo)致問題。
現(xiàn)在,創(chuàng)建app.js模塊,使用核心http模塊創(chuàng)建一個(gè)非常基本的HTTP服務(wù)器:
const http = require("http"); const routes = require("./routes"); const asyncModule = require("./asyncModule"); asyncModule.initialize(() => { console.log("Async module initialized"); }); http.createServer((req, res) => { if (req.method === "GET" && req.url === "/say") { return routes.say(req, res); } res.writeHead(404); res.end("Not found"); }).listen(8000, () => console.log("Started"));
上述模塊是我們應(yīng)用程序的入口點(diǎn),它所做的只是觸發(fā)asyncModule的初始化并創(chuàng)建一個(gè)HTTP服務(wù)器,它使用我們以前創(chuàng)建的handler(routes.say())來對(duì)網(wǎng)絡(luò)請(qǐng)求作出相應(yīng)。
我們現(xiàn)在可以像往常一樣通過執(zhí)行app.js模塊來嘗試啟動(dòng)我們的服務(wù)器。
在服務(wù)器啟動(dòng)后,我們可以嘗試使用瀏覽器訪問URL:http://localhost:8000/并查看從asyncModule返回的內(nèi)容。
和預(yù)期的一樣,如果我們?cè)诜?wù)器啟動(dòng)后立即發(fā)送請(qǐng)求,結(jié)果將是一個(gè)錯(cuò)誤,如下所示:
Error:I don"t have anything to say right now
顯然,在異步模塊加載好了之后:
這意味著asyncModule尚未初始化,但我們?nèi)試L試使用它,則會(huì)拋出一個(gè)錯(cuò)誤。
根據(jù)異步初始化模塊的實(shí)現(xiàn)細(xì)節(jié),幸運(yùn)的情況是我們可能會(huì)收到一個(gè)錯(cuò)誤,乃至丟失重要的信息,崩潰整個(gè)應(yīng)用程序。 總的來說,我們剛剛描述的情況總是必須要避免的。
大多數(shù)時(shí)候,可能并不會(huì)出現(xiàn)上述問題,畢竟初始化一般來說很快,以至于在實(shí)踐中,它永遠(yuǎn)不會(huì)發(fā)生。 然而,對(duì)于設(shè)計(jì)用于自動(dòng)調(diào)節(jié)的高負(fù)載應(yīng)用和云服務(wù)器,情況就完全不同了。
用預(yù)初始化隊(duì)列包裝模塊為了維護(hù)服務(wù)器的健壯性,我們現(xiàn)在要通過使用我們?cè)诒竟?jié)開頭描述的模式來進(jìn)行異步模塊加載。我們將在asyncModule尚未初始化的這段時(shí)間內(nèi)對(duì)所有調(diào)用的操作推入一個(gè)預(yù)初始化隊(duì)列,然后在異步模塊加載好后處理它們時(shí)立即刷新隊(duì)列。這就是狀態(tài)模式的一個(gè)很好的應(yīng)用!我們將需要兩個(gè)狀態(tài),一個(gè)在模塊尚未初始化的時(shí)候?qū)⑺胁僮髋抨?duì),另一個(gè)在初始化完成時(shí)將每個(gè)方法簡(jiǎn)單地委托給原始的asyncModule模塊。
通常,我們沒有機(jī)會(huì)修改異步模塊的代碼;所以,為了添加我們的排隊(duì)層,我們需要圍繞原始的asyncModule模塊創(chuàng)建一個(gè)代理。
接下來創(chuàng)建一個(gè)名為asyncModuleWrapper.js的新文件,讓我們依照每個(gè)步驟逐個(gè)構(gòu)建它。我們需要做的第一件事是創(chuàng)建一個(gè)代理,并將原始異步模塊的操作委托給這個(gè)代理:
const asyncModule = require("./asyncModule"); const asyncModuleWrapper = module.exports; asyncModuleWrapper.initialized = false; asyncModuleWrapper.initialize = () => { activeState.initialize.apply(activeState, arguments); }; asyncModuleWrapper.tellMeSomething = () => { activeState.tellMeSomething.apply(activeState, arguments); };
在前面的代碼中,asyncModuleWrapper將其每個(gè)方法簡(jiǎn)單地委托給activeState。 讓我們來看看這兩個(gè)狀態(tài)是什么樣子
從notInitializedState開始,notInitializedState是指還沒初始化的狀態(tài):
// 當(dāng)模塊沒有被初始化時(shí)的狀態(tài) let pending = []; let notInitializedState = { initialize: function(callback) { asyncModule.initialize(function() { asyncModuleWrapper.initalized = true; activeState = initializedState; pending.forEach(function(req) { asyncModule[req.method].apply(null, req.args); }); pending = []; callback(); }); }, tellMeSomething: function(callback) { return pending.push({ method: "tellMeSomething", args: arguments }); } };
當(dāng)initialize()方法被調(diào)用時(shí),我們觸發(fā)初始化asyncModule模塊,提供一個(gè)回調(diào)函數(shù)作為參數(shù)。 這使我們的asyncModuleWrapper知道什么時(shí)候原始模塊被初始化,在初始化后執(zhí)行預(yù)初始化隊(duì)列的操作,之后清空預(yù)初始化隊(duì)列,再調(diào)用作為參數(shù)的回調(diào)函數(shù),以下為具體步驟:
把initializedState賦值給activeState,表示預(yù)初始化已經(jīng)完成了。
執(zhí)行先前存儲(chǔ)在待處理隊(duì)列中的所有命令。
調(diào)用原始回調(diào)。
由于此時(shí)的模塊尚未初始化,此狀態(tài)的tellMeSomething()方法僅創(chuàng)建一個(gè)新的Command對(duì)象,并將其添加到預(yù)初始化隊(duì)列中。
此時(shí),當(dāng)原始的asyncModule模塊尚未初始化時(shí),代理應(yīng)該已經(jīng)清楚,我們的代理將簡(jiǎn)單地把所有接收到的請(qǐng)求防到預(yù)初始化隊(duì)列中。 然后,當(dāng)我們被通知初始化完成時(shí),我們執(zhí)行所有預(yù)初始化隊(duì)列的操作,然后將內(nèi)部狀態(tài)切換到initializedState。來看這個(gè)代理模塊最后的定義:
let initializedState = asyncModule;
不出意外,initializedState對(duì)象只是對(duì)原始的asyncModule的引用!事實(shí)上,初始化完成后,我們可以安全地將任何請(qǐng)求直接發(fā)送到原始模塊。
最后,設(shè)定異步模塊還沒加載好的的狀態(tài),即notInitializedState
let activeState = notInitializedState;
我們現(xiàn)在可以嘗試再次啟動(dòng)我們的測(cè)試服務(wù)器,但首先,我們不要忘記用我們新的asyncModuleWrapper對(duì)象替換原始的asyncModule模塊的引用; 這必須在app.js和routes.js模塊中完成。
這樣做之后,如果我們?cè)噲D再次向服務(wù)器發(fā)送一個(gè)請(qǐng)求,我們會(huì)看到在asyncModule模塊尚未初始化的時(shí)候,請(qǐng)求不會(huì)失敗; 相反,他們會(huì)掛起,直到初始化完成,然后才會(huì)被實(shí)際執(zhí)行。我們當(dāng)然可以肯定,比起之前,容錯(cuò)率變得更高了。
可以看到,在剛剛初始化異步模塊的時(shí)候,服務(wù)器會(huì)等待請(qǐng)求的響應(yīng):
在異步模塊加載完成后,服務(wù)器才會(huì)返回響應(yīng)的信息:
模式:如果模塊是需要異步初始化的,則對(duì)每個(gè)操作進(jìn)行排隊(duì),直到模塊完全初始化釋放隊(duì)列。
現(xiàn)在,我們的服務(wù)器可以在啟動(dòng)后立即開始接受請(qǐng)求,并保證這些請(qǐng)求都不會(huì)由于其模塊的初始化狀態(tài)而失敗。我們能夠在不使用DI的情況下獲得這個(gè)結(jié)果,也不需要冗長(zhǎng)且容易出錯(cuò)的檢查來驗(yàn)證異步模塊的狀態(tài)。
其它場(chǎng)景的應(yīng)用我們剛剛介紹的模式被許多數(shù)據(jù)庫驅(qū)動(dòng)程序和ORM庫所使用。 最值得注意的是Mongoose,它是MongoDB的ORM。使用Mongoose,不必等待數(shù)據(jù)庫連接打開,以便能夠發(fā)送查詢,因?yàn)槊總€(gè)操作都排隊(duì),稍后與數(shù)據(jù)庫的連接完全建立時(shí)執(zhí)行。 這顯然提高了其API的可用性。
看一下Mongoose的源碼,它的每個(gè)方法是如何通過代理添加預(yù)初始化隊(duì)列。 可以看看實(shí)現(xiàn)這中模式的代碼片段:https://github.com/Automattic...
for (var i in Collection.prototype) { (function(i){ NativeCollection.prototype[i] = function () { if (this.buffer) { // mongoose中,在緩沖區(qū)不為空時(shí),只是簡(jiǎn)單地把這個(gè)操作加入緩沖區(qū)內(nèi) this.addQueue(i, arguments); return; } var collection = this.collection , args = arguments , self = this , debug = self.conn.base.options.debug; if (debug) { if ("function" === typeof debug) { debug.apply(debug , [self.name, i].concat(utils.args(args, 0, args.length-1))); } else { console.error("x1B[0;36mMongoose:x1B[0m %s.%s(%s) %s %s %s" , self.name , i , print(args[0]) , print(args[1]) , print(args[2]) , print(args[3])) } } return collection[i].apply(collection, args); }; })(i); }異步批處理和緩存
在高負(fù)載的應(yīng)用程序中,緩存起著至關(guān)重要的作用,幾乎在網(wǎng)絡(luò)中的任何地方,從網(wǎng)頁,圖像和樣式表等靜態(tài)資源到純數(shù)據(jù)(如數(shù)據(jù)庫查詢的結(jié)果)都會(huì)使用緩存。 在本節(jié)中,我們將學(xué)習(xí)如何將緩存應(yīng)用于異步操作,以及如何充分利用緩存解決高請(qǐng)求吞吐量的問題。
實(shí)現(xiàn)沒有緩存或批處理的服務(wù)器在這之前,我們來實(shí)現(xiàn)一個(gè)小型的服務(wù)器,以便用它來衡量緩存和批處理等技術(shù)在解決高負(fù)載應(yīng)用程序的優(yōu)勢(shì)。
讓我們考慮一個(gè)管理電子商務(wù)公司銷售的web服務(wù)器,特別是對(duì)于查詢我們的服務(wù)器所有特定類型的商品交易的總和的情況。 為此,考慮到LevelUP的簡(jiǎn)單性和靈活性,我們將再次使用LevelUP。我們要使用的數(shù)據(jù)模型是存儲(chǔ)在sales這一個(gè)sublevel中的簡(jiǎn)單事務(wù)列表,它是以下的形式:
transactionId {amount, item}
key由transactionId表示,value則是一個(gè)JSON對(duì)象,它包含amount,表示銷售金額和item,表示項(xiàng)目類型。
要處理的數(shù)據(jù)是非常基本的,所以讓我們立即在名為的totalSales.js文件中實(shí)現(xiàn)API,將如下所示:
const level = require("level"); const sublevel = require("level-sublevel"); const db = sublevel(level("example-db", {valueEncoding: "json"})); const salesDb = db.sublevel("sales"); module.exports = function totalSales(item, callback) { console.log("totalSales() invoked"); let sum = 0; salesDb.createValueStream() // [1] .on("data", data => { if(!item || data.item === item) { // [2] sum += data.amount; } }) .on("end", () => { callback(null, sum); // [3] }); };
該模塊的核心是totalSales函數(shù),它也是唯一exports的API;它進(jìn)行如下工作:
我們從包含交易信息的salesDb的sublevel創(chuàng)建一個(gè)Stream。Stream將從數(shù)據(jù)庫中提取所有條目。
監(jiān)聽data事件,這個(gè)事件觸發(fā)時(shí),將從數(shù)據(jù)庫Stream中提取出每一項(xiàng),如果這一項(xiàng)的item參數(shù)正是我們需要的item,就去累加它的amount到總的sum里面。
最后,end事件觸發(fā)時(shí),我們最終調(diào)用callback()方法。
上述查詢方式可能在性能方面并不好。理想情況下,在實(shí)際的應(yīng)用程序中,我們可以使用索引,甚至使用增量映射來縮短實(shí)時(shí)計(jì)算的時(shí)間;但是,由于我們需要體現(xiàn)緩存的優(yōu)勢(shì),對(duì)于上述例子來說,慢速的查詢實(shí)際上更好,因?yàn)樗鼤?huì)突出顯示我們要分析的模式的優(yōu)點(diǎn)。
為了完成總銷售應(yīng)用程序,我們只需要從HTTP服務(wù)器公開totalSales的API;所以,下一步是構(gòu)建一個(gè)(app.js文件):
const http = require("http"); const url = require("url"); const totalSales = require("./totalSales"); http.createServer((req, res) => { const query = url.parse(req.url, true).query; totalSales(query.item, (err, sum) => { res.writeHead(200); res.end(`Total sales for item ${query.item} is ${sum}`); }); }).listen(8000, () => console.log("Started"));
我們創(chuàng)建的服務(wù)器是非常簡(jiǎn)單的;我們只需要它暴露totalSales API。
在我們第一次啟動(dòng)服務(wù)器之前,我們需要用一些示例數(shù)據(jù)填充數(shù)據(jù)庫;我們可以使用專用于本節(jié)的代碼示例中的populate_db.js腳本來執(zhí)行此操作。該腳本將在數(shù)據(jù)庫中創(chuàng)建100K個(gè)隨機(jī)銷售交易。
好的! 現(xiàn)在,一切都準(zhǔn)備好了。 像往常一樣,啟動(dòng)服務(wù)器,我們執(zhí)行以下命令:
node app
請(qǐng)求這個(gè)HTTP接口,訪問至以下URL:
http://localhost:8000/?item=book
但是,為了更好地了解服務(wù)器的性能,我們需要連續(xù)發(fā)送多個(gè)請(qǐng)求;所以,我們創(chuàng)建一個(gè)名為loadTest.js的腳本,它以200 ms的間隔發(fā)送請(qǐng)求。它已經(jīng)被配置為連接到服務(wù)器的URL,因此,要運(yùn)行它,執(zhí)行以下命令:
node loadTest
我們會(huì)看到這20個(gè)請(qǐng)求需要一段時(shí)間才能完成。注意測(cè)試的總執(zhí)行時(shí)間,因?yàn)槲覀儸F(xiàn)在開始我們的服務(wù),并測(cè)量我們可以節(jié)省多少時(shí)間。
批量異步請(qǐng)求在處理異步操作時(shí),最基本的緩存級(jí)別可以通過將一組調(diào)用集中到同一個(gè)API來實(shí)現(xiàn)。這非常簡(jiǎn)單:如果我們?cè)谡{(diào)用異步函數(shù)的同時(shí)在隊(duì)列中還有另一個(gè)尚未處理的回調(diào),我們可以將回調(diào)附加到已經(jīng)運(yùn)行的操作上,而不是創(chuàng)建一個(gè)全新的請(qǐng)求。看下圖的情況:
前面的圖像顯示了兩個(gè)客戶端(它們可以是兩臺(tái)不同的機(jī)器,或兩個(gè)不同的Web請(qǐng)求),使用完全相同的輸入調(diào)用相同的異步操作。 當(dāng)然,描述這種情況的自然方式是由兩個(gè)客戶開始兩個(gè)多帶帶的操作,這兩個(gè)操作將在兩個(gè)不同的時(shí)刻完成,如前圖所示。現(xiàn)在考慮下一個(gè)場(chǎng)景,如下圖所示:
上圖向我們展示了如何對(duì)API的兩個(gè)請(qǐng)求進(jìn)行批處理,或者換句話說,對(duì)兩個(gè)請(qǐng)求執(zhí)行到相同的操作。通過這樣做,當(dāng)操作完成時(shí),兩個(gè)客戶端將同時(shí)被通知。這代表了一種簡(jiǎn)單而又非常強(qiáng)大的方式來降低應(yīng)用程序的負(fù)載,而不必處理更復(fù)雜的緩存機(jī)制,這通常需要適當(dāng)?shù)膬?nèi)存管理和緩存失效策略。
在電子商務(wù)銷售的Web服務(wù)器中使用批處理現(xiàn)在讓我們?cè)?b>totalSales API上添加一個(gè)批處理層。我們要使用的模式非常簡(jiǎn)單:如果在API被調(diào)用時(shí)已經(jīng)有另一個(gè)相同的請(qǐng)求掛起,我們將把這個(gè)回調(diào)添加到一個(gè)隊(duì)列中。當(dāng)異步操作完成時(shí),其隊(duì)列中的所有回調(diào)立即被調(diào)用。
現(xiàn)在,讓我們來改變之前的代碼:創(chuàng)建一個(gè)名為totalSalesBatch.js的新模塊。在這里,我們將在原始的totalSales API之上實(shí)現(xiàn)一個(gè)批處理層:
const totalSales = require("./totalSales"); const queues = {}; module.exports = function totalSalesBatch(item, callback) { if(queues[item]) { // [1] console.log("Batching operation"); return queues[item].push(callback); } queues[item] = [callback]; // [2] totalSales(item, (err, res) => { const queue = queues[item]; // [3] queues[item] = null; queue.forEach(cb => cb(err, res)); }); };
totalSalesBatch()函數(shù)是原始的totalSales() API的代理,它的工作原理如下:
如果請(qǐng)求的item已經(jīng)存在隊(duì)列中,則意味著該特定item的請(qǐng)求已經(jīng)在服務(wù)器任務(wù)隊(duì)列中。在這種情況下,我們所要做的只是將回調(diào)push到現(xiàn)有隊(duì)列,并立即從調(diào)用中返回。不進(jìn)行后續(xù)操作。
如果請(qǐng)求的item沒有在隊(duì)列中,這意味著我們必須創(chuàng)建一個(gè)新的請(qǐng)求。為此,我們?yōu)樵撎囟?b>item的請(qǐng)求創(chuàng)建一個(gè)新隊(duì)列,并使用當(dāng)前回調(diào)函數(shù)對(duì)其進(jìn)行初始化。 接下來,我們調(diào)用原始的totalSales() API。
當(dāng)原始的totalSales()請(qǐng)求完成時(shí),則執(zhí)行我們的回調(diào)函數(shù),我們遍歷隊(duì)列中為該特定請(qǐng)求的item添加的所有回調(diào),并分別調(diào)用這些回調(diào)函數(shù)。
totalSalesBatch()函數(shù)的行為與原始的totalSales() API的行為相同,不同之處在于,現(xiàn)在對(duì)于相同內(nèi)容的請(qǐng)求API進(jìn)行批處理,從而節(jié)省時(shí)間和資源。
想知道相比于totalSales() API原始的非批處理版本,在性能方面的優(yōu)勢(shì)是什么?然后,讓我們將HTTP服務(wù)器使用的totalSales模塊替換為我們剛剛創(chuàng)建的模塊,修改app.js文件如下:
//const totalSales = require("./totalSales"); const totalSales = require("./totalSalesBatch"); http.createServer(function(req, res) { // ... });
如果我們現(xiàn)在嘗試再次啟動(dòng)服務(wù)器并進(jìn)行負(fù)載測(cè)試,我們首先看到的是請(qǐng)求被批量返回。
除此之外,我們觀察到請(qǐng)求的總時(shí)間大大減少;它應(yīng)該至少比對(duì)原始totalSales() API執(zhí)行的原始測(cè)試快四倍!
這是一個(gè)驚人的結(jié)果,證明了只需應(yīng)用一個(gè)簡(jiǎn)單的批處理層即可獲得巨大的性能提升,比起緩存機(jī)制,也沒有顯得太復(fù)雜,因?yàn)椋瑹o需考慮緩存淘汰策略。
批處理模式在高負(fù)載應(yīng)用程序和執(zhí)行較為緩慢的API中發(fā)揮巨大作用,正是由于這種模式的運(yùn)用,可以批量處理大量的請(qǐng)求。異步請(qǐng)求緩存策略
異步批處理模式的問題之一是對(duì)于API的答復(fù)越快,我們對(duì)于批處理來說,其意義就越小。有人可能會(huì)爭(zhēng)辯說,如果一個(gè)API已經(jīng)很快了,那么試圖優(yōu)化它就沒有意義了。然而,它仍然是一個(gè)占用應(yīng)用程序的資源負(fù)載的因素,總結(jié)起來,仍然可以有解決方案。另外,如果API調(diào)用的結(jié)果不會(huì)經(jīng)常改變;因此,這時(shí)候批處理將并不會(huì)有較好的性能提升。在這種情況下,減少應(yīng)用程序負(fù)載并提高響應(yīng)速度的最佳方案肯定是更好的緩存模式。
緩存模式很簡(jiǎn)單:一旦請(qǐng)求完成,我們將其結(jié)果存儲(chǔ)在緩存中,該緩存可以是變量,數(shù)據(jù)庫中的條目,也可以是專門的緩存服務(wù)器。因此,下一次調(diào)用API時(shí),可以立即從緩存中檢索結(jié)果,而不是產(chǎn)生另一個(gè)請(qǐng)求。
對(duì)于一個(gè)有經(jīng)驗(yàn)的開發(fā)人員來說,緩存不應(yīng)該是多么新的技術(shù),但是異步編程中這種模式的不同之處在于它應(yīng)該與批處理結(jié)合在一起,以達(dá)到最佳效果。原因是因?yàn)槎鄠€(gè)請(qǐng)求可能并發(fā)運(yùn)行,而沒有設(shè)置緩存,并且當(dāng)這些請(qǐng)求完成時(shí),緩存將會(huì)被設(shè)置多次,這樣做則會(huì)造成緩存資源的浪費(fèi)。
基于這些假設(shè),異步請(qǐng)求緩存模式的最終結(jié)構(gòu)如下圖所示:
上圖給出了異步緩存算法的兩個(gè)步驟:
與批處理模式完全相同,與在未設(shè)置高速緩存時(shí)接收到的任何請(qǐng)求將一起批處理。這些請(qǐng)求完成時(shí),緩存將會(huì)被設(shè)置一次。
當(dāng)緩存最終被設(shè)置時(shí),任何后續(xù)的請(qǐng)求都將直接從緩存中提供。
另外我們需要考慮Zalgo的反作用(我們已經(jīng)在Chapter 2-Node.js Essential Patterns中看到了它的實(shí)際應(yīng)用)。在處理異步API時(shí),我們必須確保始終以異步方式返回緩存的值,即使訪問緩存只涉及同步操作。
在電子商務(wù)銷售的Web服務(wù)器中使用異步緩存請(qǐng)求實(shí)踐異步緩存模式的優(yōu)點(diǎn),現(xiàn)在讓我們將我們學(xué)到的東西應(yīng)用到totalSales() API。
與異步批處理示例程序一樣,我們創(chuàng)建一個(gè)代理,其作用是添加緩存層。
然后創(chuàng)建一個(gè)名為totalSalesCache.js的新模塊,代碼如下:
const totalSales = require("./totalSales"); const queues = {}; const cache = {}; module.exports = function totalSalesBatch(item, callback) { const cached = cache[item]; if (cached) { console.log("Cache hit"); return process.nextTick(callback.bind(null, null, cached)); } if (queues[item]) { console.log("Batching operation"); return queues[item].push(callback); } queues[item] = [callback]; totalSales(item, (err, res) => { if (!err) { cache[item] = res; setTimeout(() => { delete cache[item]; }, 30 * 1000); //30 seconds expiry } const queue = queues[item]; queues[item] = null; queue.forEach(cb => cb(err, res)); }); };
我們可以看到前面的代碼與我們異步批處理的很多地方基本相同。 其實(shí)唯一的區(qū)別是以下幾點(diǎn):
我們需要做的第一件事就是檢查緩存是否被設(shè)置,如果是這種情況,我們將立即使用callback()返回緩存的值,這里必須要使用process.nextTick(),因?yàn)榫彺婵赡苁钱惒皆O(shè)定的,需要等到下一次事件輪詢時(shí)才能夠保證緩存已經(jīng)被設(shè)定。
繼續(xù)異步批處理模式,但是這次,當(dāng)原始API成功完成時(shí),我們將結(jié)果保存到緩存中。此外,我們還設(shè)置了一個(gè)緩存淘汰機(jī)制,在30秒后使緩存失效。 一個(gè)簡(jiǎn)單而有效的技術(shù)!
現(xiàn)在,我們準(zhǔn)備嘗試我們剛創(chuàng)建的totalSales模塊。 先更改app.js模塊,如下所示:
// const totalSales = require("./totalSales"); // const totalSales = require("./totalSalesBatch"); const totalSales = require("./totalSalesCache"); http.createServer(function(req, res) { // ... });
現(xiàn)在,重新啟動(dòng)服務(wù)器,并使用loadTest.js腳本進(jìn)行配置,就像我們?cè)谇懊娴睦又兴龅哪菢印J褂媚J(rèn)的測(cè)試參數(shù),與簡(jiǎn)單的異步批處理模式相比,很明顯地有了更好的性能提升。 當(dāng)然,這很大程度上取決于很多因素;例如收到的請(qǐng)求數(shù)量,以及一個(gè)請(qǐng)求和另一個(gè)請(qǐng)求之間的延遲等。當(dāng)請(qǐng)求數(shù)量較高且跨越較長(zhǎng)時(shí)間時(shí),使用高速緩存批處理的優(yōu)勢(shì)將更為顯著。
Memoization被稱做緩存函數(shù)調(diào)用的結(jié)果的算法。 在npm中,你可以找到許多包來實(shí)現(xiàn)異步的memoization,其中最著名的之一之一是memoizee。有關(guān)實(shí)現(xiàn)緩存機(jī)制的說明
我們必須記住,在實(shí)際應(yīng)用中,我們可能想要使用更先進(jìn)的失效技術(shù)和存儲(chǔ)機(jī)制。 這可能是必要的,原因如下:
大量的緩存值可能會(huì)消耗大量?jī)?nèi)存。 在這種情況下,可以應(yīng)用最近最少使用(LRU)算法來保持恒定的存儲(chǔ)器利用率。
當(dāng)應(yīng)用程序分布在多個(gè)進(jìn)程中時(shí),對(duì)緩存使用簡(jiǎn)單變量可能會(huì)導(dǎo)致每個(gè)服務(wù)器實(shí)例返回不同的結(jié)果。如果這對(duì)于我們正在實(shí)現(xiàn)的特定應(yīng)用程序來說是不希望的,那么解決方案就是使用共享存儲(chǔ)來存儲(chǔ)緩存。 常用的解決方案是Redis和Memcached。
與定時(shí)淘汰緩存相比,手動(dòng)淘汰高速緩存可使得高速緩存使用壽命更長(zhǎng),同時(shí)提供更新的數(shù)據(jù),但當(dāng)然,管理起緩存來要復(fù)雜得多。
使用Promise進(jìn)行批處理和緩存在Chapter4-Asynchronous Control Flow Patterns with ES2015 and Beyond中,我們看到了Promise如何極大地簡(jiǎn)化我們的異步代碼,但是在處理批處理和緩存時(shí),它則可以提供更大的幫助。
利用Promise進(jìn)行異步批處理和緩存策略,有如下兩個(gè)優(yōu)點(diǎn):
多個(gè)then()監(jiān)聽器可以附加到相同的Promise實(shí)例。
then()監(jiān)聽器最多保證被調(diào)用一次,即使在Promise已經(jīng)被resolve了之后,then()也能正常工作。 此外,then()總是會(huì)被保證其是異步調(diào)用的。
簡(jiǎn)而言之,第一個(gè)優(yōu)點(diǎn)正是批處理請(qǐng)求所需要的,而第二個(gè)優(yōu)點(diǎn)則在Promise已經(jīng)是解析值的緩存時(shí),也會(huì)提供同樣的的異步返回緩存值的機(jī)制。
下面開始看代碼,我們可以嘗試使用Promises為totalSales()創(chuàng)建一個(gè)模塊,在其中添加批處理和緩存功能。創(chuàng)建一個(gè)名為totalSalesPromises.js的新模塊:
const pify = require("pify"); // [1] const totalSales = pify(require("./totalSales")); const cache = {}; module.exports = function totalSalesPromises(item) { if (cache[item]) { // [2] return cache[item]; } cache[item] = totalSales(item) // [3] .then(res => { // [4] setTimeout(() => {delete cache[item]}, 30 * 1000); //30 seconds expiry return res; }) .catch(err => { // [5] delete cache[item]; throw err; }); return cache[item]; // [6] };
Promise確實(shí)很好,下面是上述函數(shù)的功能描述:
首先,我們需要一個(gè)名為pify的模塊,它允許我們對(duì)totalSales()模塊進(jìn)行promisification。這樣做之后,totalSales()將返回一個(gè)符合ES2015標(biāo)準(zhǔn)的Promise實(shí)例,而不是接受一個(gè)回調(diào)函數(shù)作為參數(shù)。
當(dāng)調(diào)用totalSalesPromises()時(shí),我們檢查給定的項(xiàng)目類型是否已經(jīng)在緩存中有相應(yīng)的Promise。如果我們已經(jīng)有了這樣的Promise,我們直接返回這個(gè)Promise實(shí)例。
如果我們?cè)诰彺嬷袥]有針對(duì)給定項(xiàng)目類型的Promise,我們繼續(xù)通過調(diào)用原始(promisified)的totalSales()來創(chuàng)建一個(gè)Promise實(shí)例。
當(dāng)Promise正常resolve了,我們?cè)O(shè)置了一個(gè)清除緩存的時(shí)間(假設(shè)為30秒),我們返回res將操作的結(jié)果返回給應(yīng)用程序。
如果Promise被異常reject了,我們立即重置緩存,并再次拋出錯(cuò)誤,將其傳播到Promise chain中,所以任何附加到相同Promise的其他應(yīng)用程序也將收到這一異常。
最后,我們返回我們剛才創(chuàng)建或者緩存的Promise實(shí)例。
非常簡(jiǎn)單直觀,更重要的是,我們使用Promise也能夠?qū)崿F(xiàn)批處理和緩存。
如果我們現(xiàn)在要嘗試使用totalSalesPromise()函數(shù),稍微調(diào)整app.js模塊,因?yàn)楝F(xiàn)在使用Promise而不是回調(diào)函數(shù)。 讓我們通過創(chuàng)建一個(gè)名為appPromises.js的app模塊來實(shí)現(xiàn):
const http = require("http"); const url = require("url"); const totalSales = require("./totalSalesPromises"); http.createServer(function(req, res) { const query = url.parse(req.url, true).query; totalSales(query.item).then(function(sum) { res.writeHead(200); res.end(`Total sales for item ${query.item} is ${sum}`); }); }).listen(8000, function() {console.log("Started")});
它的實(shí)現(xiàn)與原始應(yīng)用程序模塊幾乎完全相同,不同的是現(xiàn)在我們使用的是基于Promise的批處理/緩存封裝版本; 因此,我們調(diào)用它的方式也略有不同。
運(yùn)行以下命令開啟這個(gè)新版本的服務(wù)器:
node appPromises運(yùn)行與CPU-bound的任務(wù)
雖然上面的totalSales()在系統(tǒng)資源上面消耗較大,但是其也不會(huì)影響服務(wù)器處理并發(fā)的能力。 我們?cè)?b>Chapter1-Welcome to the Node.js Platform中了解到有關(guān)事件循環(huán)的內(nèi)容,應(yīng)該為此行為提供解釋:調(diào)用異步操作會(huì)導(dǎo)致堆棧退回到事件循環(huán),從而使其免于處理其他請(qǐng)求。
但是,當(dāng)我們運(yùn)行一個(gè)長(zhǎng)時(shí)間的同步任務(wù)時(shí),會(huì)發(fā)生什么情況,從不會(huì)將控制權(quán)交還給事件循環(huán)?
這種任務(wù)也被稱為CPU-bound,因?yàn)樗闹饕攸c(diǎn)是CPU利用率較高,而不是I/O操作繁重。
讓我們立即舉一個(gè)例子上看看這些類型的任務(wù)在Node.js中的具體行為。
現(xiàn)在讓我們做一個(gè)CPU占用比較高的高計(jì)算量的實(shí)驗(yàn)。下面來看的是子集總和問題,我們計(jì)算一個(gè)數(shù)組中是否具有一個(gè)子數(shù)組,其總和為0。例如,如果我們有數(shù)組[1, 2, -4, 5, -3]作為輸入,則滿足問題的子數(shù)組是[1, 2, -3]和[2, -4, 5, -3]。
最簡(jiǎn)單的算法是把每一個(gè)數(shù)組元素做遍歷然后依次計(jì)算,時(shí)間復(fù)雜度為O(2^n),或者換句話說,它隨著輸入的數(shù)組長(zhǎng)度成指數(shù)增長(zhǎng)。這意味著一組20個(gè)整數(shù)則會(huì)有多達(dá)1, 048, 576中情況,顯然不能夠通過窮舉來做到。當(dāng)然,這個(gè)問題的解決方案可能并不算復(fù)雜。為了使事情變得更加困難,我們將考慮數(shù)組和問題的以下變化:給定一組整數(shù),我們要計(jì)算所有可能的組合,其總和等于給定的任意整數(shù)。
const EventEmitter = require("events").EventEmitter; class SubsetSum extends EventEmitter { constructor(sum, set) { super(); this.sum = sum; this.set = set; this.totalSubsets = 0; } //... }
SubsetSum類是EventEmitter類的子類;這使得我們每次找到一個(gè)匹配收到的總和作為輸入的新子集時(shí)都會(huì)發(fā)出一個(gè)事件。 我們將會(huì)看到,這會(huì)給我們很大的靈活性。
接下來,讓我們看看我們?nèi)绾文軌蛏伤锌赡艿淖蛹M合:
開始構(gòu)建一個(gè)這樣的算法。創(chuàng)建一個(gè)名為subsetSum.js的新模塊。在其中聲明一個(gè)SubsetSum類:
_combine(set, subset) { for(let i = 0; i < set.length; i++) { let newSubset = subset.concat(set[i]); this._combine(set.slice(i + 1), newSubset); this._processSubset(newSubset); } }
不管算法其中到底是什么內(nèi)容,但有兩點(diǎn)要注意:
_combine()方法是完全同步的;它遞歸地生成每一個(gè)可能的子集,而不把CPU控制權(quán)交還給事件循環(huán)。如果我們考慮一下,這對(duì)于不需要任何I/O的算法來說是非常正常的。
每當(dāng)生成一個(gè)新的組合時(shí),我們都會(huì)將這個(gè)組合提供給_processSubset()方法以供進(jìn)一步處理。
_processSubset()方法負(fù)責(zé)驗(yàn)證給定子集的元素總和是否等于我們要查找的數(shù)字:
_processSubset(subset) { console.log("Subset", ++this.totalSubsets, subset); const res = subset.reduce((prev, item) => (prev + item), 0); if (res == this.sum) { this.emit("match", subset); } }
簡(jiǎn)單地說,_processSubset()方法將reduce操作應(yīng)用于子集,以便計(jì)算其元素的總和。然后,當(dāng)結(jié)果總和等于給定的sum參數(shù)時(shí),會(huì)發(fā)出一個(gè)match事件。
最后,調(diào)用start()方法開始執(zhí)行算法:
start() { this._combine(this.set, []); this.emit("end"); }
通過調(diào)用_combine()觸發(fā)算法,最后觸發(fā)一個(gè)end事件,表明所有的組合都被檢查過,并且任何可能的匹配都已經(jīng)被計(jì)算出來。 這是可能的,因?yàn)?b>_combine()是同步的; 因此,只要前面的函數(shù)返回,end事件就會(huì)觸發(fā),這意味著所有的組合都被計(jì)算出來了。
接下來,我們?cè)诰W(wǎng)絡(luò)上公開剛剛創(chuàng)建的算法。可以使用一個(gè)簡(jiǎn)單的HTTP服務(wù)器對(duì)響應(yīng)的任務(wù)作出響應(yīng)。 特別是,我們希望以/subsetSum?data=
在一個(gè)名為app.js的模塊中實(shí)現(xiàn)這個(gè)簡(jiǎn)單的服務(wù)器:
const http = require("http"); const SubsetSum = require("./subsetSum"); http.createServer((req, res) => { const url = require("url").parse(req.url, true); if(url.pathname === "/subsetSum") { const data = JSON.parse(url.query.data); res.writeHead(200); const subsetSum = new SubsetSum(url.query.sum, data); subsetSum.on("match", match => { res.write("Match: " + JSON.stringify(match) + " "); }); subsetSum.on("end", () => res.end()); subsetSum.start(); } else { res.writeHead(200); res.end("Im alive! "); } }).listen(8000, () => console.log("Started"));
由于SubsetSum實(shí)例使用事件返回結(jié)果,所以我們可以在算法生成后立即對(duì)匹配的結(jié)果使用Stream進(jìn)行處理。另一個(gè)需要注意的細(xì)節(jié)是,每次我們的服務(wù)器都會(huì)返回I"m alive!,這樣我們每次發(fā)送一個(gè)不同于/subsetSum的請(qǐng)求的時(shí)候。可以用來檢查我們服務(wù)器是否掛掉了,這在稍后將會(huì)看到。
開始運(yùn)行:
node app
一旦服務(wù)器啟動(dòng),我們準(zhǔn)備發(fā)送我們的第一個(gè)請(qǐng)求;讓我們嘗試發(fā)送一組17個(gè)隨機(jī)數(shù),這將導(dǎo)致產(chǎn)生131,071個(gè)組合,那么服務(wù)器將會(huì)處理一段時(shí)間:
curl -G http://localhost:8000/subsetSum --data-urlencode "data=[116,119,101,101,-116,109,101,-105,-102,117,-115,-97,119,-116,-104,-105,115]"--data-urlencode "sum=0"
這是如果我們?cè)诘谝粋€(gè)請(qǐng)求仍在運(yùn)行的時(shí)候在另一個(gè)終端中嘗試輸入以下命令,我們將發(fā)現(xiàn)一個(gè)巨大的問題:
curl -G http://localhost:8000
我們會(huì)看到直到第一個(gè)請(qǐng)求結(jié)束之前,最后一個(gè)請(qǐng)求一直處于掛起的狀態(tài)。服務(wù)器沒有返回響應(yīng)!這正如我們所想的那樣。Node.js事件循環(huán)運(yùn)行在一個(gè)多帶帶的線程中,如果這個(gè)線程被一個(gè)長(zhǎng)的同步計(jì)算阻塞,它將不能再執(zhí)行一個(gè)循環(huán)來響應(yīng)I"m alive!,
我們必須知道,這種代碼顯然不能夠用于同時(shí)接收到多個(gè)請(qǐng)求的應(yīng)用程序。
但是不要對(duì)Node.js中絕望,我們可以通過幾種方式來解決這種情況。我們來分析一下最常見的兩種方案:
使用setImmediate通常,CPU-bound算法是建立在一定規(guī)則之上的。它可以是一組遞歸調(diào)用,一個(gè)循環(huán),或者基于這些的任何變化/組合。 所以,對(duì)于我們的問題,一個(gè)簡(jiǎn)單的解決方案就是在這些步驟完成后(或者在一定數(shù)量的步驟之后),將控制權(quán)交還給事件循環(huán)。這樣,任何待處理的I / O仍然可以在事件循環(huán)在長(zhǎng)時(shí)間運(yùn)行的算法產(chǎn)生CPU的時(shí)間間隔中處理。對(duì)于這個(gè)問題而言,解決這一問題的方式是把算法的下一步在任何可能導(dǎo)致掛起的I/O請(qǐng)求之后運(yùn)行。這聽起來像是setImmediate()方法的完美用例(我們已經(jīng)在Chapter2-Node.js Essential Patterns中介紹過這一API)。
模式:使用setImmediate()交錯(cuò)執(zhí)行長(zhǎng)時(shí)間運(yùn)行的同步任務(wù)。使用setImmediate進(jìn)行子集求和算法的步驟
現(xiàn)在我們來看看這個(gè)模式如何應(yīng)用于子集求和算法。 我們所要做的只是稍微修改一下subsetSum.js模塊。 為方便起見,我們將創(chuàng)建一個(gè)名為subsetSumDefer.js的新模塊,將原始的subsetSum類的代碼作為起點(diǎn)。
我們要做的第一個(gè)改變是添加一個(gè)名為_combineInterleaved()的新方法,它是我們正在實(shí)現(xiàn)的模式的核心:
_combineInterleaved(set, subset) { this.runningCombine++; setImmediate(() => { this._combine(set, subset); if(--this.runningCombine === 0) { this.emit("end"); } }); }
正如我們所看到的,我們所要做的只是使用setImmediate()調(diào)用原始的同步的_combine()方法。然而,現(xiàn)在的問題是因?yàn)樵撍惴ú辉偈峭降模覀兏y以知道何時(shí)已經(jīng)完成了所有的組合的計(jì)算。
為了解決這個(gè)問題,我們必須使用非常類似于我們?cè)?b>Chapter3-Asynchronous Control Flow Patterns with Callbacks看到的異步并行執(zhí)行的模式來追溯_combine()方法的所有正在運(yùn)行的實(shí)例。 當(dāng)_combine()方法的所有實(shí)例都已經(jīng)完成運(yùn)行時(shí),觸發(fā)end事件,通知任何監(jiān)聽器,進(jìn)程需要做的所有動(dòng)作都已經(jīng)完成。
對(duì)于最終子集求和算法的重構(gòu)版本。首先,我們需要將_combine()方法中的遞歸步驟替換為異步:
_combine(set, subset) { for(let i = 0; i < set.length; i++) { let newSubset = subset.concat(set[i]); this._combineInterleaved(set.slice(i + 1), newSubset); this._processSubset(newSubset); } }
通過上面的更改,我們確保算法的每個(gè)步驟都將使用setImmediate()在事件循環(huán)中排隊(duì),在事件循環(huán)隊(duì)列中I / O請(qǐng)求之后執(zhí)行,而不是同步運(yùn)行造成阻塞。
另一個(gè)小調(diào)整是對(duì)于start()方法:
start() { this.runningCombine = 0; this._combineInterleaved(this.set, []); }
在前面的代碼中,我們將_combine()方法的運(yùn)行實(shí)例的數(shù)量初始化為0.我們還通過調(diào)用_combineInterleaved()來將調(diào)用替換為_combine(),并移除了end的觸發(fā),因?yàn)楝F(xiàn)在_combineInterleaved()是異步處理的。
通過這個(gè)最后的改變,我們的子集求和算法現(xiàn)在應(yīng)該能夠通過事件循環(huán)可以運(yùn)行的時(shí)間間隔交替地運(yùn)行其可能大量占用CPU的代碼,并且不會(huì)再造成阻塞。
最后更新app.js模塊,以便它可以使用新版本的SubsetSum:
const http = require("http"); // const SubsetSum = require("./subsetSum"); const SubsetSum = require("./subsetSumDefer"); http.createServer(function(req, res) { // ... })
和之前一樣的方式開始運(yùn)行,結(jié)果如下:
此時(shí),使用異步的方式運(yùn)行,不再會(huì)阻塞CPU了。
interleaving模式正如我們所看到的,在保持應(yīng)用程序的響應(yīng)性的同時(shí)運(yùn)行一個(gè)CPU-bound的任務(wù)并不復(fù)雜,只需要使用setImmediate()把同步執(zhí)行的代碼變?yōu)楫惒綀?zhí)行即可。但是,這不是效率最好的模式;實(shí)際上,延遲執(zhí)行一個(gè)任務(wù)會(huì)額外帶來一個(gè)小的開銷,在這樣的算法中,積少成多,則會(huì)產(chǎn)生重大的影響。這通常是我們?cè)谶\(yùn)行CPU限制任務(wù)時(shí)所需要的最后一件事情,特別是如果我們必須將結(jié)果直接返回給用戶,這應(yīng)該在合理的時(shí)間內(nèi)進(jìn)行響應(yīng)。 緩解這個(gè)問題的一個(gè)可能的解決方案是只有在一定數(shù)量的步驟之后使用setImmediate(),而不是在每一步中使用它。但是這仍然不能解決問題的根源。
記住,這并不是說一旦我們想要通過異步的模式來執(zhí)行CPU-bound的任務(wù),我們就應(yīng)該不惜一切代價(jià)來避免這樣的額外開銷,事實(shí)上,從更廣闊的角度來看,同步任務(wù)并不一定非常漫長(zhǎng)和復(fù)雜,以至于造成麻煩。在繁忙的服務(wù)器中,即使是阻塞事件循環(huán)200毫秒的任務(wù)也會(huì)產(chǎn)生不希望的延遲。 在那些并發(fā)量并不高的服務(wù)器來說,即使產(chǎn)生一定短時(shí)的阻塞,也不會(huì)影響性能,使用交錯(cuò)執(zhí)行setImmediate()可能是避免阻塞事件循環(huán)的最簡(jiǎn)單也是最有效的方法。
process.nextTick()不能用于交錯(cuò)長(zhǎng)時(shí)間運(yùn)行的任務(wù)。正如我們?cè)?b>Chapter1-Welcome to the Node.js Platform中看到的,nextTick()會(huì)在任何未返回的I / O之前調(diào)度,并且在重復(fù)調(diào)用process.nextTick()最終會(huì)導(dǎo)致I / O饑餓。 你可以通過在前面的例子中用process.nextTick()替換setImmediate()來驗(yàn)證。使用多個(gè)進(jìn)程
使用interleaving模式并不是我們用來運(yùn)行CPU-bound任務(wù)的唯一方法;防止事件循環(huán)阻塞的另一種模式是使用子進(jìn)程。我們已經(jīng)知道Node.js在運(yùn)行I / O密集型應(yīng)用程序(如Web服務(wù)器)的時(shí)候是最好的,因?yàn)?b>Node.js可以使得我們可以通過異步來優(yōu)化資源利用率。
所以,我們必須保持應(yīng)用程序響應(yīng)的最好方法是不要在主應(yīng)用程序的上下文中運(yùn)行昂貴的CPU-bound任務(wù),而是使用多帶帶的進(jìn)程。這有三個(gè)主要的優(yōu)點(diǎn):
同步任務(wù)可以全速運(yùn)行,而不需要交錯(cuò)執(zhí)行的步驟
在Node.js中處理進(jìn)程很簡(jiǎn)單,可能比修改一個(gè)使用setImmediate()的算法更容易,并且多進(jìn)程允許我們輕松使用多個(gè)處理器,而無需擴(kuò)展主應(yīng)用程序本身。
如果我們真的需要超高的性能,可以使用低級(jí)語言,如性能良好的C。
Node.js有一個(gè)充足的API庫帶來與外部進(jìn)程交互。 我們可以在child_process模塊中找到我們需要的所有東西。 而且,當(dāng)外部進(jìn)程只是另一個(gè)Node.js程序時(shí),將它連接到主應(yīng)用程序是非常容易的,我們甚至不覺得我們?cè)诒镜貞?yīng)用程序外部運(yùn)行任何東西。這得益于child_process.fork()函數(shù),該函數(shù)創(chuàng)建一個(gè)新的子Node.js進(jìn)程,并自動(dòng)創(chuàng)建一個(gè)通信管道,使我們能夠使用與EventEmitter非常相似的接口交換信息。來看如何用這個(gè)特性來重構(gòu)我們的子集求和算法。
將子集求和任務(wù)委托給其他進(jìn)程重構(gòu)SubsetSum任務(wù)的目標(biāo)是創(chuàng)建一個(gè)多帶帶的子進(jìn)程,負(fù)責(zé)處理CPU-bound的任務(wù),使服務(wù)器的事件循環(huán)專注于處理來自網(wǎng)絡(luò)的請(qǐng)求:
我們將創(chuàng)建一個(gè)名為processPool.js的新模塊,它將允許我們創(chuàng)建一個(gè)正在運(yùn)行的進(jìn)程池。創(chuàng)建一個(gè)新的進(jìn)程代價(jià)昂貴,需要時(shí)間,因此我們需要保持它們不斷運(yùn)行,盡量不要產(chǎn)生中斷,時(shí)刻準(zhǔn)備好處理請(qǐng)求,使我們可以節(jié)省時(shí)間和CPU。此外,進(jìn)程池需要幫助我們限制同時(shí)運(yùn)行的進(jìn)程數(shù)量,以避免將使我們的應(yīng)用程序受到拒絕服務(wù)(DoS)攻擊。
接下來,我們將創(chuàng)建一個(gè)名為subsetSumFork.js的模塊,負(fù)責(zé)抽象子進(jìn)程中運(yùn)行的SubsetSum任務(wù)。 它的角色將與子進(jìn)程進(jìn)行通信,并將任務(wù)的結(jié)果展示為來自當(dāng)前應(yīng)用程序。
最后,我們需要一個(gè)worker(我們的子進(jìn)程),一個(gè)新的Node.js程序,運(yùn)行子集求和算法并將其結(jié)果轉(zhuǎn)發(fā)給父進(jìn)程。
DoS攻擊是企圖使其計(jì)劃用戶無法使用機(jī)器或網(wǎng)絡(luò)資源,例如臨時(shí)或無限中斷或暫停連接到Internet的主機(jī)的服務(wù)。
先從構(gòu)建processPool.js模塊開始:
const fork = require("child_process").fork; class ProcessPool { constructor(file, poolMax) { this.file = file; this.poolMax = poolMax; this.pool = []; this.active = []; this.waiting = []; } //... }
在模塊的第一部分,引入我們將用來創(chuàng)建新進(jìn)程的child_process.fork()函數(shù)。 然后,我們定義ProcessPool的構(gòu)造函數(shù),該構(gòu)造函數(shù)接受表示要運(yùn)行的Node.js程序的文件參數(shù)以及池中運(yùn)行的最大實(shí)例數(shù)poolMax作為參數(shù)。然后我們定義三個(gè)實(shí)例變量:
pool表示的是準(zhǔn)備運(yùn)行的進(jìn)程
active表示的是當(dāng)前正在運(yùn)行的進(jìn)程列表
waiting包含所有這些請(qǐng)求的任務(wù)隊(duì)列,保存由于缺少可用的資源而無法立即實(shí)現(xiàn)的任務(wù)
看ProcessPool類的acquire()方法,它負(fù)責(zé)取出一個(gè)準(zhǔn)備好被使用的進(jìn)程:
acquire(callback) { let worker; if(this.pool.length > 0) { // [1] worker = this.pool.pop(); this.active.push(worker); return process.nextTick(callback.bind(null, null, worker)); } if(this.active.length >= this.poolMax) { // [2] return this.waiting.push(callback); } worker = fork(this.file); // [3] this.active.push(worker); process.nextTick(callback.bind(null, null, worker)); }
函數(shù)邏輯如下:
如果在進(jìn)程池中有一個(gè)準(zhǔn)備好被使用的進(jìn)程,我們只需將其移動(dòng)到active數(shù)組中,然后通過異步的方式調(diào)用其回調(diào)函數(shù)。
如果池中沒有可用的進(jìn)程,或者已經(jīng)達(dá)到運(yùn)行進(jìn)程的最大數(shù)量,必須等待。通過把當(dāng)前回調(diào)放入waiting數(shù)組。
如果我們還沒有達(dá)到運(yùn)行進(jìn)程的最大數(shù)量,我們將使用child_process.fork()創(chuàng)建一個(gè)新的進(jìn)程,將其添加到active列表中,然后調(diào)用其回調(diào)。
ProcessPool類的最后一個(gè)方法是release(),其目的是將一個(gè)進(jìn)程放回進(jìn)程池中:
release(worker) { if(this.waiting.length > 0) { // [1] const waitingCallback = this.waiting.shift(); waitingCallback(null, worker); } this.active = this.active.filter(w => worker !== w); // [2] this.pool.push(worker); }
前面的代碼也很簡(jiǎn)單,其解釋如下:
如果在waiting任務(wù)隊(duì)列里面有任務(wù)需要被執(zhí)行,我們只需為這個(gè)任務(wù)分配一個(gè)進(jìn)程worker執(zhí)行。
否則,如果在waiting任務(wù)隊(duì)列中都沒有需要被執(zhí)行的任務(wù),我們則把active的進(jìn)程列表中的進(jìn)程放回進(jìn)程池中。
正如我們所看到的,進(jìn)程從來沒有中斷,只在為其不斷地重新分配任務(wù),使我們可以通過在每個(gè)請(qǐng)求不重新啟動(dòng)一個(gè)進(jìn)程達(dá)到節(jié)省時(shí)間和空間的目的。然而,重要的是要注意,這可能并不總是最好的選擇,這很大程度上取決于我們的應(yīng)用程序的要求。為減少進(jìn)程池長(zhǎng)期占用內(nèi)存,可能的調(diào)整如下:
在一個(gè)進(jìn)程空閑一段時(shí)間后,終止進(jìn)程,釋放內(nèi)存空間。
添加一個(gè)機(jī)制來終止或重啟沒有響應(yīng)的或者崩潰了的進(jìn)程。
現(xiàn)在我們的ProcessPool類已經(jīng)準(zhǔn)備就緒,我們可以使用它來實(shí)現(xiàn)SubsetSumFork模塊,SubsetSumFork的作用是與子進(jìn)程進(jìn)行通信得到子集求和的結(jié)果。前面曾說到,用child_process.fork()啟動(dòng)一個(gè)進(jìn)程也給了我們創(chuàng)建了一個(gè)簡(jiǎn)單的基于消息的管道,通過實(shí)現(xiàn)subsetSumFork.js模塊來看看它是如何工作的:
const EventEmitter = require("events").EventEmitter; const ProcessPool = require("./processPool"); const workers = new ProcessPool(__dirname + "/subsetSumWorker.js", 2); class SubsetSumFork extends EventEmitter { constructor(sum, set) { super(); this.sum = sum; this.set = set; } start() { workers.acquire((err, worker) => { // [1] worker.send({sum: this.sum, set: this.set}); const onMessage = msg => { if (msg.event === "end") { // [3] worker.removeListener("message", onMessage); workers.release(worker); } this.emit(msg.event, msg.data); // [4] }; worker.on("message", onMessage); // [2] }); } } module.exports = SubsetSumFork;
首先注意,我們?cè)?b>subsetSumWorker.js調(diào)用ProcessPool的構(gòu)造函數(shù)創(chuàng)建ProcessPool實(shí)例。 我們還將進(jìn)程池的最大容量設(shè)置為2。
另外,我們?cè)噲D維持原來的SubsetSum類相同的公共API。實(shí)際上,SubsetSumFork是EventEmitter的子類,它的構(gòu)造函數(shù)接受sum和set,而start()方法則觸發(fā)算法的執(zhí)行,而這個(gè)SubsetSumFork實(shí)例運(yùn)行在一個(gè)多帶帶的進(jìn)程上。調(diào)用start()方法時(shí)會(huì)發(fā)生的情況:
我們?cè)噲D從進(jìn)程池中獲得一個(gè)新的子進(jìn)程。在創(chuàng)建進(jìn)程成功之后,我們嘗試向子進(jìn)程發(fā)送一條消息,包含sum和set。 send()方法是Node.js自動(dòng)提供給child_process.fork()創(chuàng)建的所有進(jìn)程,這實(shí)際上與父子進(jìn)程之間的通信管道有關(guān)。
然后我們開始監(jiān)聽子進(jìn)程返回的任何消息,我們使用on()方法附加一個(gè)新的事件監(jiān)聽器(這也是所有以child_process.fork()創(chuàng)建的進(jìn)程提供的通信通道的一部分)。
在事件監(jiān)聽器中,我們首先檢查是否收到一個(gè)end事件,這意味著SubsetSum所有任務(wù)已經(jīng)完成,在這種情況下,我們刪除onMessage監(jiān)聽器并釋放worker,并將其放回進(jìn)程池中,不再讓其占用內(nèi)存資源和CPU資源。
worker以{event,data}格式生成消息,使得任何時(shí)候一旦子進(jìn)程處理完畢任務(wù),我們?cè)谕獠慷寄芙邮盏竭@一消息。
這就是SubsetSumFork模塊現(xiàn)在我們來實(shí)現(xiàn)這個(gè)worker應(yīng)用程序。
現(xiàn)在我們來創(chuàng)建subsetSumWorker.js模塊,我們的應(yīng)用程序,這個(gè)模塊的全部?jī)?nèi)容將在一個(gè)多帶帶的進(jìn)程中運(yùn)行:
const SubsetSum = require("./subsetSum"); process.on("message", msg => { // [1] const subsetSum = new SubsetSum(msg.sum, msg.set); subsetSum.on("match", data => { // [2] process.send({event: "match", data: data}); }); subsetSum.on("end", data => { process.send({event: "end", data: data}); }); subsetSum.start(); });
由于我們的handler處于一個(gè)多帶帶的進(jìn)程中,我們不必?fù)?dān)心這類CPU-bound任務(wù)阻塞事件循環(huán),所有的HTTP請(qǐng)求將繼續(xù)由主應(yīng)用程序的事件循環(huán)處理,而不會(huì)中斷。
當(dāng)子進(jìn)程開始啟動(dòng)時(shí),父進(jìn)程:
子進(jìn)程立即開始監(jiān)聽來自父進(jìn)程的消息。這可以通過process.on()函數(shù)輕松實(shí)現(xiàn)。我們期望從父進(jìn)程中唯一的消息是為新的SubsetSum任務(wù)提供輸入的消息。只要收到這樣的消息,我們創(chuàng)建一個(gè)SubsetSum類的新實(shí)例,并注冊(cè)match和end事件監(jiān)聽器。最后,我們用subsetSum.start()開始計(jì)算。
每次子集求和算法收到事件時(shí),把結(jié)果它封裝在格式為{event,data}的對(duì)象中,并將其發(fā)送給父進(jìn)程。這些消息然后在subsetSumFork.js模塊中處理,就像我們?cè)谇懊娴恼鹿?jié)中看到的那樣。
注意:當(dāng)子進(jìn)程不是Node.js進(jìn)程時(shí),則上述的通信管道就不可用了。在這種情況下,我們?nèi)匀豢梢酝ㄟ^在暴露于父進(jìn)程的標(biāo)準(zhǔn)輸入流和標(biāo)準(zhǔn)輸出流之上實(shí)現(xiàn)我們自己的協(xié)議來建立父子進(jìn)程通信的接口。多進(jìn)程模式
嘗試新版本的子集求和算法,我們只需要替換HTTP服務(wù)器使用的模塊(文件app.js):
運(yùn)行結(jié)果如下:
更有趣的是,我們也可以嘗試同時(shí)啟動(dòng)兩個(gè)subsetSum任務(wù),我們可以充分看到多核CPU的作用。 相反,如果我們嘗試同時(shí)運(yùn)行三個(gè)subsetSum任務(wù),結(jié)果應(yīng)該是最后一個(gè)啟動(dòng)將掛起。這不是因?yàn)橹鬟M(jìn)程的事件循環(huán)被阻塞,而是因?yàn)槲覀優(yōu)?b>subsetSum任務(wù)設(shè)置了兩個(gè)進(jìn)程的并發(fā)限制。
正如我們所看到的,多進(jìn)程模式比interleaving模式更加強(qiáng)大和靈活;然而,由于單個(gè)機(jī)器提供的CPU和內(nèi)存資源量仍然是一個(gè)硬性限制,所以它仍然不可擴(kuò)展。在這種情況下,將負(fù)載分配到多臺(tái)機(jī)器上,則是更優(yōu)秀的解決辦法。
值得一提的是,在運(yùn)行CPU-bound任務(wù)時(shí),多線程可以成為多進(jìn)程的替代方案。目前,有幾個(gè)npm包公開了一個(gè)用于處理用戶級(jí)模塊的線程的API;其中最流行的是webworker-threads。但是,即使線程更輕量級(jí),完整的進(jìn)程也可以提供更大的靈活性,并具備更高更可靠的容錯(cuò)處理。總結(jié)
本章講述以下三點(diǎn):
異步初始化模塊
批處理和緩存在Node.js異步中的運(yùn)用
使用異步或者多進(jìn)程來處理CPU-bound的任務(wù)
文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請(qǐng)注明本文地址:http://specialneedsforspecialkids.com/yun/90718.html
摘要:原文鏈接在中貫徹單元測(cè)試在團(tuán)隊(duì)合作中,你寫好了一個(gè)函數(shù),供隊(duì)友使用,跑去跟你的隊(duì)友說,你傳個(gè)值進(jìn)去,他就會(huì)返回結(jié)果了。如果你也為社區(qū)貢獻(xiàn)過,想更多人使用的話,加上單元測(cè)試吧,讓你的值得別人信賴。 原文鏈接:BlueSun | 在Nodejs中貫徹單元測(cè)試 在團(tuán)隊(duì)合作中,你寫好了一個(gè)函數(shù),供隊(duì)友使用,跑去跟你的隊(duì)友說,你傳個(gè)A值進(jìn)去,他就會(huì)返回B結(jié)果了。過了一會(huì),你隊(duì)友跑過來說,我傳個(gè)A...
摘要:編寫異步代碼可能是一種不同的體驗(yàn),尤其是對(duì)異步控制流而言。回調(diào)函數(shù)的準(zhǔn)則在編寫異步代碼時(shí),要記住的第一個(gè)規(guī)則是在定義回調(diào)時(shí)不要濫用閉包。為回調(diào)創(chuàng)建命名函數(shù),避免使用閉包,并將中間結(jié)果作為參數(shù)傳遞。 本系列文章為《Node.js Design Patterns Second Edition》的原文翻譯和讀書筆記,在GitHub連載更新,同步翻譯版鏈接。 歡迎關(guān)注我的專欄,之后的博文將在專...
摘要:回調(diào)函數(shù)是在異步操作完成后傳播其操作結(jié)果的函數(shù),總是用來替代同步操作的返回指令。下面的圖片顯示了中事件循環(huán)過程當(dāng)異步操作完成時(shí),執(zhí)行權(quán)就會(huì)交給這個(gè)異步操作開始的地方,即回調(diào)函數(shù)。 本系列文章為《Node.js Design Patterns Second Edition》的原文翻譯和讀書筆記,在GitHub連載更新,同步翻譯版鏈接。 歡迎關(guān)注我的專欄,之后的博文將在專欄同步: Enc...
摘要:如果不能快速返回,就應(yīng)當(dāng)將其遷移到另一個(gè)進(jìn)程中模塊讓開發(fā)人員可以為事件設(shè)置偵聽器和處理器。我們需要給每個(gè)想要響應(yīng)的事件創(chuàng)建偵聽器 Node.js的http服務(wù)器 通過使用HTTP模塊的低級(jí)API,Node.js允許我們創(chuàng)建服務(wù)器和客戶端。剛開始學(xué)node的時(shí)候,我們都會(huì)遇到如下代碼: var http = require(http); http.createServer(funct...
摘要:回調(diào)函數(shù)提供兩個(gè)參數(shù)和,表示有沒有錯(cuò)誤發(fā)生,是文件內(nèi)容。文件關(guān)閉第一個(gè)參數(shù)文件時(shí)傳遞的文件描述符第二個(gè)參數(shù)回調(diào)函數(shù)回調(diào)函數(shù)有一個(gè)參數(shù)錯(cuò)誤,關(guān)閉文件后執(zhí)行。 showImg(//img.mukewang.com/5d3f890d0001836113660768.jpg); 人所缺乏的不是才干而是志向,不是成功的能力而是勤勞的意志。 —— 部爾衛(wèi) 文章同步到github博客:https:/...
閱讀 3677·2021-09-22 15:34
閱讀 1186·2019-08-29 17:25
閱讀 3399·2019-08-29 11:18
閱讀 1371·2019-08-26 17:15
閱讀 1740·2019-08-23 17:19
閱讀 1228·2019-08-23 16:15
閱讀 718·2019-08-23 16:02
閱讀 1335·2019-08-23 15:19