摘要:子進程使用反序列化消息字符串為消息對象。在調用這類方法時,遍歷列表中的實例發(fā)送內部消息,子進程列表中的對應項收到內部消息并處理返回,父進程中再結合返回結果和對應著這個類實例維護的信息,保證功能的正確性。
在 Node.js 中,當我們使用 child_process 模塊創(chuàng)建子進程后,會返回一個 ChildProcess 類的實例,通過調用 ChildProcess#send(message[, sendHandle[, options]][, callback]) 方法,我們可以實現(xiàn)與子進程的通信,其中的 sendHandle 參數(shù)支持傳遞 net.Server ,net.Socket 等多種句柄,使用它,我們可以很輕松的實現(xiàn)在進程間轉發(fā) TCP socket:
// parent.js "use stirct" const { createServer } = require("net") const { fork } = require("child_process") const server = createServer() const child = fork("./child.js") server.on("connection", function (socket) { child.send("socket", socket) }) .listen(1337)
// child.js "use strict" process.on("message", function (message, socket) { if (message === "socket") socket.end("Child handled it.") })
$ curl 127.0.0.1:1337 Child handled it.
這時你可能就會疑問,此時 socket 已經(jīng)處在了另一個進程中,那么像 net.Server#getConnections,net.Server#close 等等這些方法,該怎么實現(xiàn)其功能呢?傳遞的句柄都是 JavaScript 對象,它們在傳遞時,序列化和反序列化的機制,又是怎么樣的呢?
讓我們跟著 Node.js 項目中的 lib/child_process.js,lib/internal/child_process.js,lib/internal/process.js 等文件中的代碼,來一探究竟。
序列化與反序列化當使用 child_process 模塊中的 fork 函數(shù)創(chuàng)建 ChildProcess 類的實例時,會在建立 IPC channel 時,初始化 ChildProcess#send 方法:
// lib/internal/child_process.js // ... function setupChannel(target, channel) { // 此處的 target,即為正在創(chuàng)建的 ChildProcess 類實例 target._channel = channel; target._handleQueue = null; // ... target.send = function(message, handle, options, callback) { // ... if (this.connected) { return this._send(message, handle, options, callback); } // ... }; target._send = function(message, handle, options, callback) { assert(this.connected || this._channel); // ... if (handle) { message = { cmd: "NODE_HANDLE", type: null, msg: message }; if (handle instanceof net.Socket) { message.type = "net.Socket"; } else if (handle instanceof net.Server) { message.type = "net.Server"; } else if (handle instanceof TCP || handle instanceof Pipe) { message.type = "net.Native"; } else if (handle instanceof dgram.Socket) { message.type = "dgram.Socket"; } else if (handle instanceof UDP) { message.type = "dgram.Native"; } else { throw new TypeError("This handle type can"t be sent"); } var obj = handleConversion[message.type]; handle = handleConversion[message.type].send.call(target, message, handle, options); // ... var req = new WriteWrap(); req.async = false; var string = JSON.stringify(message) + " "; var err = channel.writeUtf8String(req, string, handle); // ... }; }
從代碼我們可以看到,當我們帶著句柄調用 ChildProcess#send 方法發(fā)送消息時,Node.js 會替我們先將該消息封裝成它的內部消息(將消息包在對象中,且對象擁有一個 cmd 屬性)。句柄的序列化,使用到的是 handleConversion[message.type].send 方法,在傳遞的是 socket 時,即為 handleConversion["net.Socket"].send。
所以關鍵一定就是在 handleConversion 這個對象上了,我們先不著急看它的如山真面如。讓我們先來看看子進程反序列化時的關鍵步驟代碼。
在子進程啟動時,若發(fā)現(xiàn)自己是通過 child_process 模塊創(chuàng)建的進程(環(huán)境變量中帶有 NODE_CHANNEL_FD),則最后也會執(zhí)行上述的 lib/internal/child_process.js 文件中的 setupChannel 初始化函數(shù):
// lib/internal/process.js // ... function setupChannel() { if (process.env.NODE_CHANNEL_FD) { var fd = parseInt(process.env.NODE_CHANNEL_FD, 10); delete process.env.NODE_CHANNEL_FD; var cp = require("child_process"); // ... cp._forkChild(fd); assert(process.send); } } // lib/child_process.js // ... const child_process = require("internal/child_process"); const setupChannel = child_process.setupChannel; exports._forkChild = function(fd) { // ... const control = setupChannel(process, p); };
以下函數(shù)與上上個例子的中函數(shù)為同一個,只不過于子進程中執(zhí)行:
// lib/internal/child_process.js // ... function setupChannel(target, channel) { target._channel = channel; target._handleQueue = null; // ... target.on("internalMessage", function(message, handle) { // ... if (message.cmd !== "NODE_HANDLE") return; var obj = handleConversion[message.type]; obj.got.call(this, message, handle, function(handle) { handleMessage(target, message.msg, handle); }); }); } function handleMessage(target, message, handle) { if (!target._channel) return; var eventName = "message"; if (message !== null && typeof message === "object" && typeof message.cmd === "string" && message.cmd.length > INTERNAL_PREFIX.length && message.cmd.slice(0, INTERNAL_PREFIX.length) === INTERNAL_PREFIX) { eventName = "internalMessage"; } target.emit(eventName, message, handle); }
顯而易見,使用了 handleConversion[message.type].got 來進行句柄的反序列化,使之構建成 JavaScript 對象。所以我們不難想到,句柄序列化 & 反序列化運用的就是,各個 handleConversion[message.type] 對象中提供的同一方法 send 和 got 。打個比方就像 Java 中的這些 class 都實現(xiàn)了同一個 interface:
// lib/internal/child_process.js // ... const handleConversion = { // ... "net.Server": { // ... send: function(message, server, options) { return server._handle; }, got: function(message, handle, emit) { var server = new net.Server(); server.listen(handle, function() { emit(server); }); } }, "net.Socket": { send: function(message, socket, options) { // ... }, got: function(message, handle, emit) { // ... } }, "dgram.Socket": { send: function(message, socket, options) { // ... }, got: function(message, handle, emit) { // ... } } // ... };
所以傳遞的過程:
主進程:
傳遞消息和句柄。
將消息包裝成內部消息,使用 JSON.stringify 序列化為字符串。
通過對應的 handleConversion[message.type].send 方法序列化句柄。
將序列化后的字符串和句柄發(fā)入 IPC channel 。
子進程
使用 JSON.parse 反序列化消息字符串為消息對象。
觸發(fā)內部消息事件(internalMessage)監(jiān)聽器。
將傳遞來的句柄使用 handleConversion[message.type].got 方法反序列化為 JavaScript 對象。
帶著消息對象中的具體消息內容和反序列化后的句柄對象,觸發(fā)用戶級別事件。
net.Server#getConnections 等方法的功能實現(xiàn)由于將 socket 傳遞給了子進程之后,net.Server#getConnections,net.Server#close 等等方法,原來的實現(xiàn)已經(jīng)無效了,為了保證功能,Node.js 又是怎么辦的呢?答案可以大致概括為,父子進程之間,在同一地址下的 socket 傳遞時,各自都額外維護一個關聯(lián)列表存儲這些 socket 信息和 ChildProcess 實例,并且父進程中的 net#Server 類實例自己保存下所有父進程關聯(lián)列表。在調用 net.Server#getConnections 這類方法時,遍歷列表中的 ChildPorcess 實例發(fā)送內部消息,子進程列表中的對應項收到內部消息并處理返回,父進程中再結合返回結果和對應著這個 ChildProcess 類實例維護的 socket 信息,保證功能的正確性。
lib/internal/socket_list.js 這個文件中,分別定義了這兩個列表類,分別名為 SocketListSend 和 SocketListReceive:
// lib/internal/socket_list.js // ... function SocketListSend(slave, key) { EventEmitter.call(this); this.key = key; this.slave = slave; } util.inherits(SocketListSend, EventEmitter); // ... function SocketListReceive(slave, key) { EventEmitter.call(this); this.connections = 0; this.key = key; this.slave = slave; // ... } util.inherits(SocketListReceive, EventEmitter);
然后在 net.Socket 句柄的序列化和反序列化過程中,將句柄和進程推入列表:
// lib/internal/child_process.js // ... const handleConversion = { // ... send: function(message, socket, options) { // ... if (socket.server) { // ... var firstTime = !this._channel.sockets.send[message.key]; var socketList = getSocketList("send", this, message.key); if (firstTime) socket.server._setupSlave(socketList); } // ... return handle; }, got: function(message, handle, emit) { var socket = new net.Socket({handle: handle}); socket.readable = socket.writable = true; if (message.key) { var socketList = getSocketList("got", this, message.key); socketList.add({ socket: socket }); } emit(socket); } } function getSocketList(type, slave, key) { // slave 對象即為當前正在創(chuàng)建的 ChildProcess 類實例 var sockets = slave._channel.sockets[type]; var socketList = sockets[key]; if (!socketList) { var Construct = type === "send" ? SocketListSend : SocketListReceive; socketList = sockets[key] = new Construct(slave, key); } return socketList; } // lib/net.js // ... Server.prototype._setupSlave = function(socketList) { this._usingSlaves = true; this._slaves.push(socketList); };
然后在調用具體方法時,遍歷列表,結合通信來的結果,再返回:
// lib/net.js // ... Server.prototype.getConnections = function(cb) { // ... if (!this._usingSlaves) { return end(null, this._connections); } var left = this._slaves.length; var total = this._connections; function oncount(err, count) { if (err) { left = -1; return end(err); } total += count; if (--left === 0) return end(null, total); } this._slaves.forEach(function(slave) { slave.getConnections(oncount); }); }
即遍歷了 _salves
當我們解析好了 net.Server#getConnections 方法后,其他類似需求方法的解決方案,其實也大同小異,思路是一致的。涉及的東西有點多,上一個簡單的圖示(順序為黑,藍,紅):
最后參考:
https://github.com/nodejs/node/blob/master/lib/child_process.js
https://github.com/nodejs/node/blob/master/lib/net.js
https://github.com/nodejs/node/blob/master/lib/internal/process.js
https://github.com/nodejs/node/blob/master/lib/internal/child_process.js
https://github.com/nodejs/node/blob/master/lib/internal/socket_list.js
文章版權歸作者所有,未經(jīng)允許請勿轉載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉載請注明本文地址:http://specialneedsforspecialkids.com/yun/79380.html
摘要:通常的解決方案,便是使用中自帶的模塊,以模式啟動多個應用實例。最后中的模塊除了上述提到的功能外,其實還提供了非常豐富的供和進程之前通信,對于不同的操作系統(tǒng)平臺,也提供了不同的默認行為。如果大家有閑,非常推薦完整領略一下模塊的代碼實現(xiàn)。 眾所周知,Node.js中的JavaScript代碼執(zhí)行在單線程中,非常脆弱,一旦出現(xiàn)了未捕獲的異常,那么整個應用就會崩潰。這在許多場景下,尤其是web...
摘要:進程間通信的目的是為了讓不同的進程能夠互相訪問資源,并進程協(xié)調工作。這個過程的示意圖如下端口共同監(jiān)聽集群穩(wěn)定之路進程事件自動重啟負載均衡狀態(tài)共享模塊工作原理事件二測試單元測試性能測試三產品化項目工程化部署流程性能日志監(jiān)控報警穩(wěn)定性異構共存 內容 9.玩轉進程10.測試11.產品化 一、玩轉進程 node的單線程只不過是js層面的單線程,是基于V8引擎的單線程,因為,V8的緣故,前后...
摘要:第二種是主進程創(chuàng)建監(jiān)聽后發(fā)送給感興趣的工作進程,由工作進程負責直接接收連接。繼續(xù)看,可以看到它捕獲了事件,并在回調函數(shù)里面關閉連接,關閉本身進程,斷開與的通道。參考與引用多進程模型和進程間通訊源碼解析之 前言 最近用Egg作為底層框架開發(fā)項目,好奇其多進程模型的管理實現(xiàn),于是學習了解了一些東西,順便記錄下來。文章如有錯誤, 請輕噴 為什么需要多進程 伴隨科技的發(fā)展, 現(xiàn)在的服務器基本上...
摘要:而在進程執(zhí)行把進程添加到調度器中時添加了一個回調函數(shù),回調函數(shù)了一個帶的消息,并且為,就是這個消息觸發(fā)了發(fā)送的函數(shù)的執(zhí)行。 最近做了點nodejs項目,對nodejs的cluster怎么利用多進程處理請求產生了疑問,于是著手進行了研究,之后發(fā)現(xiàn)這其中竟大有文章!一切還是先從遙遠的TCP說起吧。。。 TCP與Socket 說到TCP,相信很多人都相當了解了,大學已經(jīng)教過,但是又相信有很多...
閱讀 1220·2021-09-26 09:55
閱讀 3177·2019-08-30 15:55
閱讀 958·2019-08-30 15:53
閱讀 2290·2019-08-30 13:59
閱讀 2375·2019-08-29 13:08
閱讀 1102·2019-08-29 12:19
閱讀 3296·2019-08-26 13:41
閱讀 414·2019-08-26 13:24