摘要:第二種是主進(jìn)程創(chuàng)建監(jiān)聽后發(fā)送給感興趣的工作進(jìn)程,由工作進(jìn)程負(fù)責(zé)直接接收連接。繼續(xù)看,可以看到它捕獲了事件,并在回調(diào)函數(shù)里面關(guān)閉連接,關(guān)閉本身進(jìn)程,斷開與的通道。參考與引用多進(jìn)程模型和進(jìn)程間通訊源碼解析之
前言
最近用Egg作為底層框架開發(fā)項(xiàng)目,好奇其多進(jìn)程模型的管理實(shí)現(xiàn),于是學(xué)習(xí)了解了一些東西,順便記錄下來。文章如有錯(cuò)誤, 請(qǐng)輕噴
為什么需要多進(jìn)程伴隨科技的發(fā)展, 現(xiàn)在的服務(wù)器基本上都是多核cpu的了。然而,Node是一個(gè)單進(jìn)程單線程語(yǔ)言(對(duì)于開發(fā)者來說是單線程,實(shí)際上不是)。我們都知道,cpu的調(diào)度單位是線程,而基于Node的特性,那么我們每次只能利用一個(gè)cpu。這樣不僅僅利用率極低,而且容錯(cuò)更是不能接受(出錯(cuò)時(shí)會(huì)崩潰整個(gè)程序)。所以,Node有了cluster來協(xié)助我們充分利用服務(wù)器的資源。
cluster工作原理
關(guān)于cluster的工作原理推薦大家看這篇文章,這里簡(jiǎn)單總結(jié)一下:
子進(jìn)程的端口監(jiān)聽會(huì)被hack掉,而是統(tǒng)一由master的內(nèi)部TCP監(jiān)聽,所以不會(huì)出現(xiàn)多個(gè)子進(jìn)程監(jiān)聽同一端口而報(bào)錯(cuò)的現(xiàn)象。
請(qǐng)求統(tǒng)一經(jīng)過master的內(nèi)部TCP,TCP的請(qǐng)求處理邏輯中,會(huì)挑選一個(gè)worker進(jìn)程向其發(fā)送一個(gè)newconn內(nèi)部消息,隨消息發(fā)送客戶端句柄。(這里的挑選有兩種方式,第一種是除Windows外所有平臺(tái)的默認(rèn)方法循環(huán)法,即由主進(jìn)程負(fù)責(zé)監(jiān)聽端口,接收新連接后再將連接循環(huán)分發(fā)給工作進(jìn)程。在分發(fā)中使用了一些內(nèi)置技巧防止工作進(jìn)程任務(wù)過載。第二種是主進(jìn)程創(chuàng)建監(jiān)聽socket后發(fā)送給感興趣的工作進(jìn)程,由工作進(jìn)程負(fù)責(zé)直接接收連接。)
worker進(jìn)程收到句柄后,創(chuàng)建客戶端實(shí)例(net.socket)執(zhí)行具體的業(yè)務(wù)邏輯,然后返回。
如圖:
圖引用出處
先看一下Egg官方文檔的進(jìn)程模型
+--------+ +-------+ | Master |<-------->| Agent | +--------+ +-------+ ^ ^ ^ / | / | / | v v v +----------+ +----------+ +----------+ | Worker 1 | | Worker 2 | | Worker 3 | +----------+ +----------+ +----------+
類型 | 進(jìn)程數(shù)量 | 作用 | 穩(wěn)定性 | 是否運(yùn)行業(yè)務(wù)代碼 |
---|---|---|---|---|
Master | 1 | 進(jìn)程管理,進(jìn)程間消息轉(zhuǎn)發(fā) | 非常高 | 否 |
Agent | 1 | 后臺(tái)運(yùn)行工作(長(zhǎng)連接客戶端) | 高 | 少量 |
Worker | 一般為cpu核數(shù) | 執(zhí)行業(yè)務(wù)代碼 | 一般 | 是 |
大致上就是利用Master作為主線程,啟動(dòng)Agent作為秘書進(jìn)程協(xié)助Worker處理一些公共事務(wù)(日志之類),啟動(dòng)Worker進(jìn)程執(zhí)行真正的業(yè)務(wù)代碼。
多進(jìn)程的實(shí)現(xiàn) 流程相關(guān)代碼首先從Master入手,這里暫時(shí)認(rèn)為Master是最頂級(jí)的進(jìn)程(事實(shí)上還有一個(gè)parent進(jìn)程,待會(huì)再說)。
/** * start egg app * @method Egg#startCluster * @param {Object} options {@link Master} * @param {Function} callback start success callback */ exports.startCluster = function(options, callback) { new Master(options).ready(callback); };
先從Master的構(gòu)造函數(shù)看起
constructor(options) { super(); // 初始化參數(shù) this.options = parseOptions(options); // worker進(jìn)程的管理類 詳情見 Manager及Messenger篇 this.workerManager = new Manager(); // messenger類, 詳情見 Manager及Messenger篇 this.messenger = new Messenger(this); // 設(shè)置一個(gè)ready事件 詳情見get-ready npm包 ready.mixin(this); // 是否為生產(chǎn)環(huán)境 this.isProduction = isProduction(); this.agentWorkerIndex = 0; // 是否關(guān)閉 this.closed = false; ... 接下來看的是ready的回調(diào)函數(shù)及注冊(cè)的各類事件: this.ready(() => { // 將開始狀態(tài)設(shè)置為true this.isStarted = true; const stickyMsg = this.options.sticky ? " with STICKY MODE!" : ""; this.logger.info("[master] %s started on %s (%sms)%s", frameworkPkg.name, this[APP_ADDRESS], Date.now() - startTime, stickyMsg); // 發(fā)送egg-ready至各個(gè)進(jìn)程并觸發(fā)相關(guān)事件 const action = "egg-ready"; this.messenger.send({ action, to: "parent", data: { port: this[REALPORT], address: this[APP_ADDRESS] } }); this.messenger.send({ action, to: "app", data: this.options }); this.messenger.send({ action, to: "agent", data: this.options }); // start check agent and worker status this.workerManager.startCheck(); }); // 注冊(cè)各類事件 this.on("agent-exit", this.onAgentExit.bind(this)); this.on("agent-start", this.onAgentStart.bind(this)); ... // 檢查端口并 Fork一個(gè)Agent detectPort((err, port) => { ... this.forkAgentWorker(); } }); }
綜上, 可以看到Master的構(gòu)造函數(shù)主要是初始化和注冊(cè)各類相應(yīng)的事件, 最后運(yùn)行的是forkAgentWorker函數(shù), 該函數(shù)的關(guān)鍵代碼可以看到:
const agentWorkerFile = path.join(__dirname, "agent_worker.js"); // 通過child_process執(zhí)行一個(gè)Agent const agentWorker = childprocess.fork(agentWorkerFile, args, opt);
繼續(xù)到agent_worker.js上面看,agent_worker實(shí)例化一個(gè)agent對(duì)象,agent_worker.js有一句關(guān)鍵代碼:
agent.ready(() => { agent.removeListener("error", startErrorHandler); // 清除錯(cuò)誤監(jiān)聽的事件 process.send({ action: "agent-start", to: "master" }); // 向master發(fā)送一個(gè)agent-start的動(dòng)作 });
可以看到, agent_worker.js中的代碼向master發(fā)出了一個(gè)信息, 動(dòng)作為agent-start, 再回到Master中, 可以看到其注冊(cè)了兩個(gè)事件, 分別為once的forkAppWorkers和 on的onAgentStart
this.on("agent-start", this.onAgentStart.bind(this)); this.once("agent-start", this.forkAppWorkers.bind(this));
先看onAgentStart函數(shù), 這個(gè)函數(shù)相對(duì)簡(jiǎn)單, 就是一些信息的傳遞:
onAgentStart() { this.agentWorker.status = "started"; // Send egg-ready when agent is started after launched if (this.isAllAppWorkerStarted) { this.messenger.send({ action: "egg-ready", to: "agent", data: this.options }); } this.messenger.send({ action: "egg-pids", to: "app", data: [ this.agentWorker.pid ] }); // should send current worker pids when agent restart if (this.isStarted) { this.messenger.send({ action: "egg-pids", to: "agent", data: this.workerManager.getListeningWorkerIds() }); } this.messenger.send({ action: "agent-start", to: "app" }); this.logger.info("[master] agent_worker#%s:%s started (%sms)", this.agentWorker.id, this.agentWorker.pid, Date.now() - this.agentStartTime); }
然后會(huì)執(zhí)行forkAppWorkers函數(shù),該函數(shù)主要是借助cfork包fork對(duì)應(yīng)的工作進(jìn)程, 并注冊(cè)一系列相關(guān)的監(jiān)聽事件,
... cfork({ exec: this.getAppWorkerFile(), args, silent: false, count: this.options.workers, // don"t refork in local env refork: this.isProduction, }); ... // 觸發(fā)app-start事件 cluster.on("listening", (worker, address) => { this.messenger.send({ action: "app-start", data: { workerPid: worker.process.pid, address }, to: "master", from: "app", }); });
可以看到forkAppWorkers函數(shù)在監(jiān)聽Listening事件時(shí),會(huì)觸發(fā)master上的app-start事件。
this.on("app-start", this.onAppStart.bind(this)); ... // master ready回調(diào)觸發(fā) if (this.options.sticky) { this.startMasterSocketServer(err => { if (err) return this.ready(err); this.ready(true); }); } else { this.ready(true); } // ready回調(diào) 發(fā)送egg-ready狀態(tài)到各個(gè)進(jìn)程 const action = "egg-ready"; this.messenger.send({ action, to: "parent", data: { port: this[REALPORT], address: this[APP_ADDRESS] } }); this.messenger.send({ action, to: "app", data: this.options }); this.messenger.send({ action, to: "agent", data: this.options }); // start check agent and worker status if (this.isProduction) { this.workerManager.startCheck(); }
總結(jié)下:
Master.constructor: 先執(zhí)行Master的構(gòu)造函數(shù), 里面有個(gè)detect函數(shù)被執(zhí)行
Detect: Detect => forkAgentWorker()
forkAgentWorker: 獲取Agent進(jìn)程, 向master觸發(fā)agent-start事件
執(zhí)行onAgentStart函數(shù), 執(zhí)行forkAppWorker函數(shù)(once)
onAgentStart => 發(fā)送各類信息, forkAppWorker => 向master觸發(fā) app-start事件
App-start事件 觸發(fā) onAppStart()方法
onAppStart => 設(shè)置ready(true) => 執(zhí)行ready的回調(diào)函數(shù)
Ready() = > 發(fā)送egg-ready到各個(gè)進(jìn)程并觸發(fā)相關(guān)事件, 執(zhí)行startCheck()函數(shù)
+---------+ +---------+ +---------+ | Master | | Agent | | Worker | +---------+ +----+----+ +----+----+ | fork agent | | +-------------------->| | | agent ready | | |<--------------------+ | | | fork worker | +----------------------------------------->| | worker ready | | |<-----------------------------------------+ | Egg ready | | +-------------------->| | | Egg ready | | +----------------------------------------->|進(jìn)程守護(hù)
根據(jù)官方文檔,進(jìn)程守護(hù)主要是依賴于graceful和egg-cluster這兩個(gè)庫(kù)。
未捕獲異常
關(guān)閉異常 Worker 進(jìn)程所有的 TCP Server(將已有的連接快速斷開,且不再接收新的連接),斷開和 Master 的 IPC 通道,不再接受新的用戶請(qǐng)求。
Master 立刻 fork 一個(gè)新的 Worker 進(jìn)程,保證在線的『工人』總數(shù)不變。
異常 Worker 等待一段時(shí)間,處理完已經(jīng)接受的請(qǐng)求后退出。
+---------+ +---------+ | Worker | | Master | +---------+ +----+----+ | uncaughtException | +------------+ | | | | +---------+ | <----------+ | | Worker | | | +----+----+ | disconnect | fork a new worker | +-------------------------> + ---------------------> | | wait... | | | exit | | +-------------------------> | | | | | die | | | | | |
由執(zhí)行的app文件可知, app實(shí)際上是繼承于Application類, 該類下面調(diào)用了graceful()。
onServer(server) { ...... graceful({ server: [ server ], error: (err, throwErrorCount) => { ...... }, }); ...... }
繼續(xù)看graceful, 可以看到它捕獲了process.on("uncaughtException")事件, 并在回調(diào)函數(shù)里面關(guān)閉TCP連接, 關(guān)閉本身進(jìn)程, 斷開與master的IPC通道。
process.on("uncaughtException", function (err) { ...... // 對(duì)http連接設(shè)置 Connection: close響應(yīng)頭 servers.forEach(function (server) { if (server instanceof http.Server) { server.on("request", function (req, res) { // Let http server set `Connection: close` header, and close the current request socket. req.shouldKeepAlive = false; res.shouldKeepAlive = false; if (!res._header) { res.setHeader("Connection", "close"); } }); } }); // 設(shè)置一個(gè)定時(shí)函數(shù)關(guān)閉子進(jìn)程, 并退出本身進(jìn)程 // make sure we close down within `killTimeout` seconds var killtimer = setTimeout(function () { console.error("[%s] [graceful:worker:%s] kill timeout, exit now.", Date(), process.pid); if (process.env.NODE_ENV !== "test") { // kill children by SIGKILL before exit killChildren(function() { // 退出本身進(jìn)程 process.exit(1); }); } }, killTimeout); // But don"t keep the process open just for that! // If there is no more io waitting, just let process exit normally. if (typeof killtimer.unref === "function") { // only worked on node 0.10+ killtimer.unref(); } var worker = options.worker || cluster.worker; // cluster mode if (worker) { try { // 關(guān)閉TCP連接 for (var i = 0; i < servers.length; i++) { var server = servers[i]; server.close(); } } catch (er1) { ...... } try { // 關(guān)閉ICP通道 worker.disconnect(); } catch (er2) { ...... } } });
ok, 關(guān)閉了IPC通道后, 我們繼續(xù)看cfork文件, 即上面提到的fork worker的包, 里面監(jiān)聽了子進(jìn)程的disconnect事件, 他會(huì)根據(jù)條件判斷是否重新fork一個(gè)新的子進(jìn)程
cluster.on("disconnect", function (worker) { ...... // 存起該pid disconnects[worker.process.pid] = utility.logDate(); if (allow()) { // fork一個(gè)新的子進(jìn)程 newWorker = forkWorker(worker._clusterSettings); newWorker._clusterSettings = worker._clusterSettings; } else { ...... } });
一般來說, 這個(gè)時(shí)候會(huì)繼續(xù)等待一會(huì)然后就執(zhí)行了上面說到的定時(shí)函數(shù)了, 即退出進(jìn)程。
OOM、系統(tǒng)異常
關(guān)于這種系統(tǒng)異常, 有時(shí)候在子進(jìn)程中是不能捕獲到的, 我們只能在master中進(jìn)行處理, 也就是cfork包。
cluster.on("exit", function (worker, code, signal) { // 是程序異常的話, 會(huì)通過上面提到的uncatughException重新fork一個(gè)子進(jìn)程, 所以這里就不需要了 var isExpected = !!disconnects[worker.process.pid]; if (isExpected) { delete disconnects[worker.process.pid]; // worker disconnect first, exit expected return; } // 是master殺死的子進(jìn)程, 無需fork if (worker.disableRefork) { // worker is killed by master return; } if (allow()) { newWorker = forkWorker(worker._clusterSettings); newWorker._clusterSettings = worker._clusterSettings; } else { ...... } cluster.emit("unexpectedExit", worker, code, signal); });進(jìn)程間通信(IPC)
上面一直提到各種進(jìn)程間通信,細(xì)心的你可能已經(jīng)發(fā)現(xiàn) cluster 的 IPC 通道只存在于 Master 和 Worker/Agent 之間,Worker 與 Agent 進(jìn)程互相間是沒有的。那么 Worker 之間想通訊該怎么辦呢?是的,通過 Master 來轉(zhuǎn)發(fā)。
廣播消息: agent => all workers +--------+ +-------+ | Master |<---------| Agent | +--------+ +-------+ / | / | / | / | v v v +----------+ +----------+ +----------+ | Worker 1 | | Worker 2 | | Worker 3 | +----------+ +----------+ +----------+ 指定接收方: one worker => another worker +--------+ +-------+ | Master |----------| Agent | +--------+ +-------+ ^ | send to / | worker 2 / | / | / v +----------+ +----------+ +----------+ | Worker 1 | | Worker 2 | | Worker 3 | +----------+ +----------+ +----------+
在master中, 可以看到當(dāng)agent和app被fork時(shí), 會(huì)監(jiān)聽他們的信息, 同時(shí)將信息轉(zhuǎn)化成一個(gè)對(duì)象:
agentWorker.on("message", msg => { if (typeof msg === "string") msg = { action: msg, data: msg }; msg.from = "agent"; this.messenger.send(msg); }); worker.on("message", msg => { if (typeof msg === "string") msg = { action: msg, data: msg }; msg.from = "app"; this.messenger.send(msg); });
可以看到最后調(diào)用的是messenger.send, 而messengeer.send就是根據(jù)from和to來決定將信息發(fā)送到哪里
send(data) { if (!data.from) { data.from = "master"; } ...... // app -> master // agent -> master if (data.to === "master") { debug("%s -> master, data: %j", data.from, data); // app/agent to master this.sendToMaster(data); return; } // master -> parent // app -> parent // agent -> parent if (data.to === "parent") { debug("%s -> parent, data: %j", data.from, data); this.sendToParent(data); return; } // parent -> master -> app // agent -> master -> app if (data.to === "app") { debug("%s -> %s, data: %j", data.from, data.to, data); this.sendToAppWorker(data); return; } // parent -> master -> agent // app -> master -> agent,可能不指定 to if (data.to === "agent") { debug("%s -> %s, data: %j", data.from, data.to, data); this.sendToAgentWorker(data); return; } }
master則是直接根據(jù)action信息emit對(duì)應(yīng)的注冊(cè)事件
sendToMaster(data) { this.master.emit(data.action, data.data); }
而agent和worker則是通過一個(gè)sendmessage包, 實(shí)際上就是調(diào)用下面類似的方法
// 將信息傳給子進(jìn)程 agent.send(data) worker.send(data)
最后, 在agent和app都繼承的基礎(chǔ)類EggApplication上, 調(diào)用了Messenger類, 該類內(nèi)部的構(gòu)造函數(shù)如下:
constructor() { super(); ...... this._onMessage = this._onMessage.bind(this); process.on("message", this._onMessage); } _onMessage(message) { if (message && is.string(message.action)) { // 和master一樣根據(jù)action信息emit對(duì)應(yīng)的注冊(cè)事件 this.emit(message.action, message.data); } }
總結(jié)一下:
思路就是利用事件機(jī)制和IPC通道來達(dá)到各個(gè)進(jìn)程之間的通信。
學(xué)習(xí)過程中有遇到一個(gè)timeout.unref()的函數(shù), 關(guān)于該函數(shù)推薦大家參考這個(gè)問題的6樓回答
總結(jié)從前端思維轉(zhuǎn)到后端思維其實(shí)還是很吃力的,加上Egg的進(jìn)程管理實(shí)現(xiàn)確實(shí)非常厲害, 所以花了很多時(shí)間在各種api和思路思考上。
參考與引用多進(jìn)程模型和進(jìn)程間通訊
Egg 源碼解析之 egg-cluster
文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請(qǐng)注明本文地址:http://specialneedsforspecialkids.com/yun/103585.html
摘要:前言一直混跡社區(qū)突然發(fā)現(xiàn)自己收藏了不少好文但是管理起來有點(diǎn)混亂所以將前端主流技術(shù)做了一個(gè)書簽整理不求最多最全但求最實(shí)用。 前言 一直混跡社區(qū),突然發(fā)現(xiàn)自己收藏了不少好文但是管理起來有點(diǎn)混亂; 所以將前端主流技術(shù)做了一個(gè)書簽整理,不求最多最全,但求最實(shí)用。 書簽源碼 書簽導(dǎo)入瀏覽器效果截圖showImg(https://segmentfault.com/img/bVbg41b?w=107...
摘要:概述本系列文章將從開發(fā)者角度梳理開發(fā)實(shí)時(shí)聯(lián)網(wǎng)游戲后臺(tái)服務(wù)過程中可能面臨的挑戰(zhàn),并針對(duì)性地提供相應(yīng)解決思路,期望幫助開發(fā)者依據(jù)自身游戲特點(diǎn)做出合理的技術(shù)選型。多路復(fù)用避免了讀寫阻塞,減少了上下文切換,提升了利用率和系統(tǒng)吞吐率。 概述:本系列文章將從開發(fā)者角度梳理開發(fā)實(shí)時(shí)聯(lián)網(wǎng)游戲后臺(tái)服務(wù)過程中可能面臨的挑戰(zhàn),并針對(duì)性地提供相應(yīng)解決思路,期望幫助開發(fā)者依據(jù)自身游戲特點(diǎn)做出合理的技術(shù)選型。 關(guān)...
摘要:一進(jìn)程間通信理解間進(jìn)程通信機(jī)制,先了解下進(jìn)程間有哪些通訊機(jī)制歷史發(fā)展按照歷史來源主要有兩大塊的管道,,信號(hào)的消息隊(duì)列,共享內(nèi)存,信號(hào)燈。信號(hào)量主要作為進(jìn)程間,以及進(jìn)程內(nèi)部線程之間的通訊手段。主要依賴,兼容擴(kuò)展實(shí)現(xiàn)方式的進(jìn)程間通信之消息隊(duì)列。 PHP間進(jìn)程如何通信,PHP相關(guān)的服務(wù)的IPC是實(shí)現(xiàn)方式,IPC的思想如何用到項(xiàng)目中。 一、linux進(jìn)程間通信 理解php間進(jìn)程通信機(jī)制,先了解...
摘要:例如,在方法中,如果需要主從進(jìn)程之間建立管道,則通過環(huán)境變量來告知從進(jìn)程應(yīng)該綁定的相關(guān)的文件描述符,這個(gè)特殊的環(huán)境變量后面會(huì)被再次涉及到。 文:正龍(滬江網(wǎng)校Web前端工程師)本文原創(chuàng),轉(zhuǎn)載請(qǐng)注明作者及出處 之前的文章走進(jìn)Node.js之HTTP實(shí)現(xiàn)分析中,大家已經(jīng)了解 Node.js 是如何處理 HTTP 請(qǐng)求的,在整個(gè)處理過程,它僅僅用到單進(jìn)程模型。那么如何讓 Web 應(yīng)用擴(kuò)展到...
摘要:摘要在年云棲大會(huì)北京峰會(huì)的大數(shù)據(jù)專場(chǎng)中,來自阿里云的高級(jí)技術(shù)專家李雪峰帶來了主題為金融級(jí)別大數(shù)據(jù)平臺(tái)的多租戶隔離實(shí)踐的演講。三是運(yùn)行隔離機(jī)制。針對(duì)這一問題,提供了多層隔離嵌套方案以便規(guī)避這種潛在的安全風(fēng)險(xiǎn)。 摘要:在2017年云棲大會(huì)?北京峰會(huì)的大數(shù)據(jù)專場(chǎng)中,來自阿里云的高級(jí)技術(shù)專家李雪峰帶來了主題為《金融級(jí)別大數(shù)據(jù)平臺(tái)的多租戶隔離實(shí)踐》的演講。在分享中,李雪峰首先介紹了基于傳統(tǒng)Iaa...
閱讀 930·2021-11-22 12:09
閱讀 3704·2021-09-27 13:36
閱讀 1390·2021-08-20 09:37
閱讀 4008·2019-12-27 12:22
閱讀 2353·2019-08-30 15:55
閱讀 2359·2019-08-30 13:16
閱讀 2818·2019-08-26 17:06
閱讀 3434·2019-08-23 18:32