摘要:傳的最后一次參數是一個回調函數,當命令成功或失敗之后會立即被調用。回調函數中,我們明確地處理連接錯誤的情況,設置狀態為,并再次調用重連。如果沒有發生錯誤,調用回調函數結束當前工作項目。嘗試連接的時候,使用增加每次重連的時間間隔。
Node.js 中的隊列
本文轉載自:眾成翻譯
譯者:文藺
鏈接:http://www.zcfy.cc/article/662
原文:http://blog.yld.io/2016/05/10/introducing-queues/
這是深入探索 Node.js 中使用工作隊列(work queues)管理異步工作流的系列文章的第一篇,來自the Node Patterns series。
開始享受吧!
很常見的是,在應用程序流中,應用有著可以異步處理的工作負載。一個常見的例子是發送郵件。比方說,新用戶注冊時,可能需要給 Ta 發送一封確認郵件來確認用戶剛剛輸入的 email 地址是 Ta 自己的。這包括從模板中生成消息,向電子郵件服務提供商發送請求,解析結果,處理任何可能發送的最終錯誤,重試,等等…… 這個流程可能比較復雜,容易出錯,或者在 HTTP 服務器的周期中花費太長時間。不過也有另外一個選擇,可以向持久化存儲中插入一個文檔,該文檔描述我們有一條待發送給這個用戶的消息。另一個進程可能拿到這個文檔,做一些比較重的工作:從模板生成消息,向服務器發送請求,解析錯誤,并在必要的情況下重排這個工作。
此外,系統需要和其他系統整合的情況也很常見。在我曾做過的一些項目中,需要不同的系統之間的進行用戶配置文件的雙向同步:當用戶在一個系統中更新了個人資料,這些變化需要傳遞給其他系統,反之亦然。如果兩個系統之間不需要很強的一致性,資料同步之間有一個小的延遲也許是可接受的,那這項工作就可以使用另一個進程異步處理。
更一般地說,在整個系統中有一個工作隊列將工作生產者和消費者分開,這是一種常見的模式。生產者往工作隊列中插入工作,消費者從隊列中拿到工作并執行需要的任務。
使用這樣的拓撲結構有許多原因和優點,如:
解耦工作生產者和消費者
使重試邏輯更易于實現
跨時間分配工作負載
跨空間(nodes 節點)分配工作負載
異步工作
使外部系統更容易整合(最終的一致性)
讓我們來分析一下其中的一些問題吧。
獨立 (Isolate)發送郵件是許多應用需要做的工作。一個例子是,用戶修改了密碼,一些應用很友好地發送郵件通知用戶有人(最好不是其他人)修改了密碼。現在發送郵件,通常是通過調用第三方郵件提供商提供的 HTTP API來完成的。如果服務緩慢或無法訪問時候會怎樣?你可不想就因為一封郵件發布出去就把密碼給回滾了。當然,你也不想就因為在處理請求失敗時碰到了工作中的一個非重要的部分,使得密碼更改請求就這樣崩掉了。密碼修改后希望可以很快就發送出這封郵件,但不能有如此的代價。
重試 (Retry)還有,修改密碼意味著,你要為這個用戶在兩個系統中都做更改:一個中央用戶數據庫和一個遺留系統(legacy system)。(我知道這很惡心啊,不過我可不止見過一次 —— 現實就這么骨感。)假如第一個成功了、第二個失敗了,咋辦?
在這些情形下,你可以想一直重試直至成功:在遺留系統中更改密碼是一個可以多次重復的結果相同的操作,而郵件也可以重復發送多次。
分布及規模 (Distribute and scale)舉例子,假如遺留系統修改密碼了但未能成功返回通知,如果操作是冪等的,你可以稍后重試。
甚至,非冪等操作也可以從工作隊列處理中嘗到甜頭。比如,你可以將一次貨幣交易插入到工作隊列中 :給每次交易一個通用唯一標識符(UUID, universal unique identifier),稍后接收交易請求的系統可以保證不會發生重復交易。
在這個例子中,你基本只需要擔心工作隊列提供的必要的持久性保證:如果系統故障,你希望將交易丟失的風險降到最低。
另一個將工作生產者和消費者解耦的原因是,你可能想將工作集群規模化:如果任務消耗大量資源,如果任務是重 CPU 型的或者需要大量內存或操作系統資源,你可以將其與應用其他部分分離出來,放到工作隊列中。
在任何應用中,一些操作比其他的要重。這可能會在整個節點引入有差異的工作負載:一個不幸的節點可能因處理太多的高并發業務而負擔過重,而其它節點卻被閑置。使用工作隊列,將具體工作平均分配,可以將影響最小化。
工作隊列的另一個效果是吸收工作峰(absorb work peaks):你可以為工作集群計劃給定的最大容量,并確保容量永遠不會超過。如果工作數量在短時間內急劇上升,工作隊列完全可以解決,遠離工作峰的壓力。
防止崩潰 (Survive crashes)系統監控在這里起到重要作用:你應當持續監控工作隊列的長度,工作時間(完成一項任務的時間),工作占用,以及容量,以確定在高峰時間保證令人滿意的操作時間需要的最佳、最小的資源。
如果你不需要以上任何一點東西,使用持久化工作隊列的一個理由是防止崩潰。即使是同一個進程中的內存隊列也能滿足你的應用需求,持續的隊列使你的應用在進程重啟的時候更具彈性。
好了,理論講得差不多了 —— 我們來看具體實現。
最簡單的案例:內存工作隊列(In-Memory Work Queue)可以設計出的最簡單的工作隊列是一個內存隊列。實現內存隊列可能是個學校的練習(留給讀者)。這里我們使用 Async 的 queue。
假設你在做的這個演示應用和一個控制你的房子的硬件單元相連接。你的 Node.js 應用和該單元通過一個串行端口對話,且有線協議只能同時接受一個掛起的命令。
這個協議被包裝在我們的 domotic.js 模塊中,模塊暴露三個函數:
.connect() - 連接 domotic 模塊
.command() - 發送命令,等待響應
.disconnect() - 切斷與模塊的連接
下面的代碼模擬了這樣一個模塊:
domotic.js:
exports.connect = connect; exports.command = command; exports.disconnect = disconnect; function connect(cb) { setTimeout(cb, 100); // simulate connection } function command(cmd, options, cb) { if (succeeds()) { setTimeout(cb, 100); // simulate command } else { setTimeout(function() { var err = Error("error connecting"); err.code = "ECONN"; cb(err); }, 100); } } function disconnect(cb) { if (cb) setTimeout(cb, 100); // simulate disconnection } function succeeds() { return Math.random() > 0.5; }
注意我們并沒有和任何 domotic 模塊交互;我們只是假裝,100 毫秒后成功調用回調函數。
同樣, .command 函數模擬了連接錯誤: 如果 succeeds() 返回 false,連接失敗,命令失敗,這有 50% 的可能性(我們的 domotic 串行連接很容易出錯)。這使我們能夠測試在發生失敗之后,我們的應用是否會成功重連并重試命令。
然后我們新建另一個模塊,可以在隊列后面發出命令。
domotic_queue.js:
var async = require("async"); var Backoff = require("backoff"); var domotic = require("./domotic"); var connected = false; var queue = async.queue(work, 1); function work(item, cb) { ensureConnected(function() { domotic.command(item.command, item.options, callback); }); function callback(err) { if (err && err.code == "ECONN") { connected = false; work(item); } else cb(err); } } /// command exports.command = pushCommand; function pushCommand(command, options, cb) { var work = { command: command, options: options }; console.log("pushing command", work); queue.push(work, cb); } function ensureConnected(cb) { if (connected) { return cb(); } else { var backoff = Backoff.fibonacci(); backoff.on("backoff", connect); backoff.backoff(); } function connect() { domotic.connect(connected); } function connected(err) { if (err) { backoff.backoff(); } else { connected = true; cb(); } } } /// disconnect exports.disconnect = disconnect; function disconnect() { if (! queue.length()) { domotic.disconnect(); } else { console.log("waiting for the queue to drain before disonnecting"); queue.drain = function() { console.log("disconnecting"); domotic.disconnect(); }; } }
做了不少工作 —— 我們來一段段地分析。
var async = require("async"); var Backoff = require("backoff"); var domotic = require("./domotic");
這里我們引入了一些包:
async - 提供內存隊列的實現
backoff - 讓我們增加每一次失敗后嘗試重新連接的時間間隔
./domotic - 模擬 domotic 的模塊
我們的模塊從連接斷開狀態開始啟動:
`var connected = false;`
建立我們的 async 隊列:
`var queue = async.queue(work, 1);`
這里提供一個叫做 worker 的工作函數(在代碼中進一步定義的)和一個最大并發量 1。我們在這里強制設置,是因為我們定義了 domotic 模塊協議一次只允許一個命令。
然后定義 worker 函數,它每次處理一個隊列元素:
function work(item, cb) { ensureConnected(function() { domotic.command(item.command, item.options, callback); }); function callback(err) { if (err && err.code == "ECONN") { connected = false; work(item); } else cb(err); } }
當我們的 async 隊列加入另一個工作項目,會調用 work 函數,傳遞該工作項目和一個當工作完成時候為我們所調用的回調函數。
對每個工作項目來說,我們要確認已經連接了。一旦連接上,使用工作項目中會有的 command 和 options 屬性,來用 domotic 模塊來執行命令。傳的最后一次參數是一個回調函數,當命令成功或失敗之后會立即被調用。
回調函數中,我們明確地處理連接錯誤的情況,設置 connected 狀態為 false,并再次調用 work重連。
如果沒有發生錯誤,調用回調函數 cb 結束當前工作項目。
function ensureConnected(cb) { if (connected) { return cb(); } else { var backoff = Backoff.fibonacci(); backoff.on("backoff", connect); backoff.backoff(); } function connect() { domotic.connect(connected); } function connected(err) { if (err) { backoff.backoff(); } else { connected = true; cb(); } } }
ensureConnected 函數現在負責處于連接狀態時調用回調或相反情況下嘗試連接。嘗試連接的時候,使用 backoff 增加每次重連的時間間隔。 每次 domotic.connect 函數帶著錯誤被調用,在 backoff 事件觸發之前增加間隔時間。觸發 backoff 時,嘗試連接。一旦連接成功,調用 cb 回調;否則保持重試。
這個模塊暴露一個 .command 函數:
/// command exports.command = pushCommand; function pushCommand(command, options, cb) { var work = { command: command, options: options }; console.log("pushing command", work); queue.push(work, cb); }
這個命令簡單的解析一個工作項目并將其推入隊列。
最后,這個模塊同樣暴露出 .disconnect 函數。
/// disconnect exports.disconnect = disconnect; function disconnect() { if (! queue.length()) { domotic.disconnect(); } else { console.log("waiting for the queue to drain before disonnecting"); queue.drain = function() { console.log("disconnecting"); domotic.disconnect(); }; } }
這里我們只是確保在調用 domotic 模塊的 disconnected 方法之前隊列是空的。如果隊列非空,在真正斷開連接之前會等待其耗盡(drain)。
可選:在隊列未被耗盡的情況下,您可以設置一個超時時間,然后強制斷開連接。
然后我們來新建一個 domotic 客戶端:
client.js:
var domotic = require("./domotic_queue"); for(var i = 0 ; i < 20; i ++) { domotic.command("toggle light", i, function(err) { if (err) throw err; console.log("command finished"); }); } domotic.disconnect();
這里我們并行得向 domotic 模塊添加了 20 個 settime 命令,同時傳遞了回調函數,當命令完成時就會被調用。如果有命令出錯,簡單地拋出錯誤并中斷執行。
添加所有命令之后我們馬上斷開連接,不過模塊會等待所有命令被執行之后才會真正將其斷開。
讓我們在命令行中試一下:
$ node client.js pushing command { command: "toggle light", options: 0 } pushing command { command: "toggle light", options: 1 } pushing command { command: "toggle light", options: 2 } pushing command { command: "toggle light", options: 3 } pushing command { command: "toggle light", options: 4 } pushing command { command: "toggle light", options: 5 } pushing command { command: "toggle light", options: 6 } pushing command { command: "toggle light", options: 7 } pushing command { command: "toggle light", options: 8 } pushing command { command: "toggle light", options: 9 } pushing command { command: "toggle light", options: 10 } pushing command { command: "toggle light", options: 11 } pushing command { command: "toggle light", options: 12 } pushing command { command: "toggle light", options: 13 } pushing command { command: "toggle light", options: 14 } pushing command { command: "toggle light", options: 15 } pushing command { command: "toggle light", options: 16 } pushing command { command: "toggle light", options: 17 } pushing command { command: "toggle light", options: 18 } pushing command { command: "toggle light", options: 19 } waiting for the queue to drain before disonnecting command finished command finished command finished command finished command finished command finished command finished command finished command finished command finished command finished command finished command finished command finished command finished command finished command finished command finished command finished command finished disconnecting
這里我們可以看到,所有命令被立即放到隊列中,并且命令是被一些隨機時間間隔著有序完成的。最后,所有命令完成之后連接切斷。
下一篇文章本系列的下一篇文章,我們將探索如何避免崩潰以及通過持久化工作項目來限制內存影響。
文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。
轉載請注明本文地址:http://specialneedsforspecialkids.com/yun/79779.html
摘要:文件系統請求和相關請求都會放進這個線程池處理其他的請求,如網絡平臺特性相關的請求會分發給相應的系統處理單元參見設計概覽。 譯者按:在 Medium 上看到這篇文章,行文脈絡清晰,闡述簡明利落,果斷點下翻譯按鈕。第一小節背景鋪陳略啰嗦,可以略過。剛開始我給這部分留了個 blah blah blah 直接翻后面的,翻完之后回頭看,考慮完整性才把第一節給補上。接下來的內容干貨滿滿,相信對 N...
摘要:正在失業中的課多周刊第期我們的微信公眾號,更多精彩內容皆在微信公眾號,歡迎關注。若有幫助,請把課多周刊推薦給你的朋友,你的支持是我們最大的動力。是一種禍害譯本文淺談了在中關于的不好之處。淺談超時一運維的排查方式。 正在失業中的《課多周刊》(第3期) 我們的微信公眾號:fed-talk,更多精彩內容皆在微信公眾號,歡迎關注。 若有幫助,請把 課多周刊 推薦給你的朋友,你的支持是我們最大的...
摘要:正在失業中的課多周刊第期我們的微信公眾號,更多精彩內容皆在微信公眾號,歡迎關注。若有幫助,請把課多周刊推薦給你的朋友,你的支持是我們最大的動力。是一種禍害譯本文淺談了在中關于的不好之處。淺談超時一運維的排查方式。 正在失業中的《課多周刊》(第3期) 我們的微信公眾號:fed-talk,更多精彩內容皆在微信公眾號,歡迎關注。 若有幫助,請把 課多周刊 推薦給你的朋友,你的支持是我們最大的...
原文 先說1.1總攬: Reactor模式 Reactor模式中的協調機制Event Loop Reactor模式中的事件分離器Event Demultiplexer 一些Event Demultiplexer處理不了的復雜I/O接口比如File I/O、DNS等 復雜I/O的解決方案 未完待續 前言 nodejs和其他編程平臺的區別在于如何去處理I/O接口,我們聽一個人介紹nodejs,總是...
閱讀 2078·2021-10-08 10:21
閱讀 2471·2021-09-29 09:34
閱讀 3494·2021-09-22 15:51
閱讀 4926·2021-09-22 15:46
閱讀 2314·2021-08-09 13:42
閱讀 3434·2019-08-30 15:52
閱讀 2723·2019-08-29 17:13
閱讀 1555·2019-08-29 11:30