摘要:而在進程執行把進程添加到調度器中時添加了一個回調函數,回調函數了一個帶的消息,并且為,就是這個消息觸發了發送的函數的執行。
最近做了點nodejs項目,對nodejs的cluster怎么利用多進程處理請求產生了疑問,于是著手進行了研究,之后發現這其中竟大有文章!一切還是先從遙遠的TCP說起吧。。。
TCP與Socket說到TCP,相信很多人都相當了解了,大學已經教過,但是又相信有很多人也不是很了解,要不是當時沒聽,要不也可能是自身的編程能力不足以去實踐相關內容,寫到這我還特意去翻了一下大學的計算機網絡教材,內容是很豐富的,但教人實踐的內容還是太少了,里面的內容都把學生當成了有相當的Linux編程能力的人了,所以結果就是大部分只上了一年編程課剛學會幾個Hello world程序的大二學生,聽了這門課后一臉懵逼,即使記住了也因為沒什么實踐很快忘了,當年我就是這么懵逼過來的。
所以,扯了這些,結果是什么呢,結果就是我們要多動手!而要動手建立一條TCP連接可以用socket來實現,不過這里不是要說socket用法,只是來簡單聊一聊他們之間的一點小聯系,以便于理解后面的內容。
應用層通過傳輸層進行TCP通信時,有時TCP需要為多個應用程序進程提供并發服務。多個TCP連接或多個應用程序進程可能需要通過同一個TCP協議端口傳輸數據。為了區別不同的應用程序進程和連接,許多計算機操作系統為應用程序與TCP協議交互提供了稱為套接字 (Socket)的接口,區分不同應用程序進程間的網絡通信和連接。
我們可以用一個四元組來確定一條TCP連接(源ip,源端口,目標ip,目標端口),而連接是通過socket來建立的(服務端進行bind和listen->客戶端發起connect->服務端accept),計算機系統就是通過socket來區分不同的TCP連接的。所以我們可以看出來,只要目標ip/端口不同,服務端可以用同一個端口生成多個socket,建立多條連接。
但是,一個進程只能監聽一個端口,一個端口怎么生成多個socket呢?其實服務器端程序一般會把socket和服務器某個端口(ip+端口)bind起來, 這樣構成了一個特殊的socket, 這個socket沒有目標ip和端口。socket進行listen之后當有新的連接進來時, 系統將請求存進隊列(此時TCP握三次手完成), 后續可以再調用accept拿到隊列的請求,返回一個新的socket, 這個socket是由四元組建立的, 也就對應了一個唯一的連接。
說完這些,可以來聊一聊nodejs是怎樣建立一個TCP服務的了。
nodejs createServer啟動TCP服務小解析一般我們用nodejs啟動一個TCP服務可能是這樣的:
require("net").createServer(function(sock) { sock.on("data", function(data) { sock.write("Hello world"); }); }).listen(8080, "127.0.0.1");
進到createServer一看(代碼都在net模塊中),里面return了一個Server對象,Server繼承EventEmitter,將createServer的參數做為connection事件的回調函數,這塊比較簡單就不貼代碼了。我們需要關注的是Server的listen方法,其不同的參數最終都會調用到listenInCluster方法。cluster!是的這和cluster有關,但先不管它,我們先管在主進程中它的執行:
function listenInCluster(server, address, port, addressType, backlog, fd, exclusive) { // ... if (cluster.isMaster || exclusive) { // ... server._listen2(address, port, addressType, backlog, fd); return; } // ... }
從代碼我們可以看到listenInCluster最終是調用了_listen2方法,它就是服務啟動的關鍵,其定義如下:
function setupListenHandle(address, port, addressType, backlog, fd) { // ... var rval = null; // ... if (rval === null) rval = createServerHandle(address, port, addressType, fd); // ... this._handle = rval; // ... this._handle.onconnection = onconnection; this._handle.owner = this; var err = this._handle.listen(backlog || 511); // ... }
其中createServerHandle方法就不展開了,它就如之前所說的:把socket和服務器某個端口(ip+端口)bind起來, 這樣構成了一個特殊的socket, 這個socket沒有目標ip和端口。它綁定了address+port并返回了一個特殊socket(句柄)rval,可以看到最后它調用了listen對端口進行監聽,并且指定了一個回調函數onconnection,函數會在C++層當accept請求時觸發,其回調參數之一就是前面提到的accept后與客戶端連接的新socket句柄。到這里再看一下onconnection的代碼:
function onconnection(err, clientHandle) { // ... var self = handle.owner; var socket = new Socket({ handle: clientHandle, allowHalfOpen: self.allowHalfOpen, pauseOnCreate: self.pauseOnConnect }); socket.readable = socket.writable = true; // ... self.emit("connection", socket); }
可以看到nodejs在對socket句柄進一步封裝后(封裝成nodejs的Socket對象),再觸發server(由createServer創建)的connection事件。這時我們再回到前面createServer的介紹,其監聽了connection事件,所以最終流程走下來createServer的的方法參數將被觸發,并且可以拿到一個nodejs的Socket對象進行write與read操作,與客戶端進行通信。
至此我們已經對nodejs啟動一個TCP服務的流程有了了解,接下來就到主題cluster了。
cluster為我們做了什么開始說代碼之前,先來聊一聊喂鴿子吧。假設你坐在布拉格廣場前靜靜地坐著,然后往前面撒了一把狗糧,喔不對是鴿糧,然后周圍的一群鴿子都震驚了并往你這邊飛搶東西吃。這個現象可以用一個詞來形容就是“驚群“。然而這只是我的瞎掰,我們程序員理解的驚群應該是:多個進程/線程同時阻塞等待某個事件,當事件發生時喚醒了所有等待的進程/線程,但最終只有一個能對事件進行處理。很明顯這對cpu造成了浪費,而cluster的多進程模型對此做了處理:只用一個master進程等待請求,然后有請求到來時使用round-robin輪詢分配請求給各個子進程進行處理,這塊后面提到的源碼會涉及到,這里就不深入了。除了round-robin,還有其他的一些cluster為我們做的,就用代碼來talk吧:
const cluster = require("cluster"); const http = require("http"); if (cluster.isMaster) { const numCPUs = require("os").cpus().length; for (let i = 0; i < numCPUs; i++) { cluster.fork(); } } else { // Worker processes have a http server. http.Server((req, res) => { res.writeHead(200); res.end("hello world "); }).listen(8000); }
以上代碼就是cluster的典型用法,在nodejs啟動文件判斷當前進程,如果當前進程是master進程,那么就根據cpu的核數fork出相同數量的進程,否則(worker進程)就啟動一個http服務,所以一般這樣會給一個核心分配一個worker進程來啟動一個服務,搭起一個小服務集群。但是問題來了,為什么這里可以有多個進程同時監聽一個端口呢,是因為listen做的一些文章,下面再一步步深入解析。由于http.Server其實是繼承了net.Server,所以跟前面創建TCP服務一樣,listen最終也是調用到listenInCluster,我們從這里重新開始。
function listenInCluster(server, address, port, addressType, backlog, fd, exclusive) { // ... const serverQuery = { address: address, port: port, addressType: addressType, fd: fd, flags: 0 }; // Get the master"s server handle, and listen on it cluster._getServer(server, serverQuery, listenOnMasterHandle); // ... }
listenInCluster在worker進程中調用cluster._getServer,并且傳入了一個函數listenOnMasterHandle。這里還不知道它做了什么,所以再進入cluster._getServer看看(由于當前是在worker進程,cluster模塊文件是lib/internal/cluster/child.js):
cluster._getServer = function(obj, options, cb) { // ... const message = util._extend({ act: "queryServer", index: indexes[indexesKey], data: null }, options); send(message, (reply, handle) => { if (typeof obj._setServerData === "function") obj._setServerData(reply.data); if (handle) shared(reply, handle, indexesKey, cb); // Shared listen socket. else rr(reply, indexesKey, cb); // Round-robin. }); // ... };
關注send方法,它調用了sendHelper方法,該方法是在internal/cluster/utils定義的,相當一個消息轉發器處理進程間通信,它發送一個“進程內部消息“(internalMessage),而worker進程在master進程被fork出來的時候監聽了internalMessage:
// lib/internal/cluster/master.js worker.process.on("internalMessage", internal(worker, onmessage));
所以最終在worker進程發送的消息,觸發了master進程執行了onmessage方法,onmessage判斷message.act === "queryServer"執行queryServer,而就是在這個方法中,新建了一個RoundRobinHandle調度器,就是這個東西分配請求做了負載均衡。這里用地址和端口號作為key將調度器存儲起來,調度器不會被worker創建兩次,最后將worker進程add到隊列。相關代碼如下:
// lib/internal/cluster/master.js function queryServer(worker, message) { // ... var handle = handles[key]; if (handle === undefined) { var constructor = RoundRobinHandle; // ... handles[key] = handle = new constructor(key, message.address, message.port, message.addressType, message.fd, message.flags); } // ... // Set custom server data handle.add(worker, (errno, reply, handle) => { // ... }); }
然后我們再來看看RoundRobinHandle,它里面調用net.createServer方法新建了一個server,并且開始監聽,這塊可以看前面內容。不過與前面不同的是,server在listening事件完成時拿到監聽端口的那個特殊socket句柄,重置了onconnection方法,當新的連接建立時方法被調用,將accept連接的socket句柄分發到隊列里的worker進行處理(distribute)。對于listening事件,它在Server.listen執行后就會觸發,代碼就在setupListenHandle方法里面。RoundRobinHandle代碼如下:
// lib/internal/cluster/round_robin_handle.js function RoundRobinHandle(key, address, port, addressType, fd) { // ... this.server = net.createServer(assert.fail); if (fd >= 0) this.server.listen({ fd }); else if (port >= 0) this.server.listen(port, address); else this.server.listen(address); // UNIX socket path. this.server.once("listening", () => { this.handle = this.server._handle; this.handle.onconnection = (err, handle) => this.distribute(err, handle); // ... }); } RoundRobinHandle.prototype.distribute = function(err, handle) { this.handles.push(handle); const worker = this.free.shift(); if (worker) this.handoff(worker); }; RoundRobinHandle.prototype.handoff = function(worker) { // ... const message = { act: "newconn", key: this.key }; sendHelper(worker.process, message, handle, (reply) => { // ... }); };
從代碼上看到最終調度器調用handoff方法,通過sendHelper向worker進程發送一個新連接到達的消息newconn,執行worker進程的server的onconnection方法,worker進程相關代碼如下:
// lib/internal/cluster/child.js cluster._setupWorker = function() { // ... process.on("internalMessage", internal(worker, onmessage)); send({ act: "online" }); function onmessage(message, handle) { if (message.act === "newconn") onconnection(message, handle); else if (message.act === "disconnect") _disconnect.call(worker, true); } }; // Round-robin connection. function onconnection(message, handle) { const key = message.key; const server = handles[key]; const accepted = server !== undefined; send({ ack: message.seq, accepted }); if (accepted) server.onconnection(0, handle); }
走到這里worker進程的server就拿到了連接的socket句柄可以進行處理,但是好像有點問題,worker進程的server好像還沒起起來啊,前面講的只是在master進程的調度器啟動了一個server,worker進程并沒有server。我們又得翻回前面的內容看一看了,看看之前提到的workder進程的cluster._getServer,里面send方法發送了一個函數,函數里面的rr(reply, indexesKey, cb);就是創建了workder進程server的代碼。
先來看看cluster._getServer中發送的函數怎么被調用的。這里需要來了解一下之前出現了幾次的sendHelper,它是cluster模塊用來做進程間通信的,另外還有一個internal方法用來處理通信的回調。cluster._getServer的send會調用sendHelper,它會用message.seq當key把send的函數存儲起來。然后在internal方法處理通信的回調時判斷message是否有這個key,是否能找到這個函數,可以的話就執行。而在master進程執行queryServer把worker進程添加到調度器中時添加了一個回調函數,回調函數send了一個帶seq的消息,并且handle為null,就是這個消息觸發了cluster._getServer發送的函數的執行。相關代碼如下:
// `internal/cluster/utils.js` const callbacks = {}; var seq = 0; function sendHelper(proc, message, handle, cb) { // ... if (typeof cb === "function") callbacks[seq] = cb; message.seq = seq; // ... } function internal(worker, cb) { return function onInternalMessage(message, handle) { // ... var fn = cb; if (message.ack !== undefined && callbacks[message.ack] !== undefined) { fn = callbacks[message.ack]; delete callbacks[message.ack]; } // ... }; } // lib/internal/cluster/master.js function queryServer(worker, message) { // ... // Set custom server data handle.add(worker, (errno, reply, handle) => { reply = util._extend({ // ... ack: message.seq, // ... }, reply); // ... send(worker, reply, handle); });
最終,rr(reply, indexesKey, cb);執行,它構造了一個假的socket句柄,句柄設置了一個不做操作的listen方法。然后執行cb,這個cb也就是前面提到過的listenOnMasterHandle,它會把假socket句柄賦值給worker進程的server._handle,隨后由于server._handle的存在,server._listen2(address, port, addressType, backlog, fd);也不會做任何操作,也就是說worker進程創建的server是不會對端口進行監聽的。相關代碼如下:
// lib/internal/cluster/child.js function rr(message, indexesKey, cb) { function listen(backlog) { // ... return 0; } // ... cb(0, handle); } // lib/net.js function listenOnMasterHandle(err, handle) { // ... server._handle = handle; server._listen2(address, port, addressType, backlog, fd); } // setupListenHandle就是_listen2 function setupListenHandle(address, port, addressType, backlog, fd) { // ... if (this._handle) { debug("setupListenHandle: have a handle already"); } // ...
至此,cluster模塊如何建立多進程服務的就算講完了。畫個草圖總結下吧:
文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。
轉載請注明本文地址:http://specialneedsforspecialkids.com/yun/89327.html
摘要:嚴格來說,并不是單線程的。其他異步和事件驅動相關的線程通過來實現內部的線程池和線程調度。線程是最小的進程,因此也是單進程的。子進程中執行的是非程序,提供一組參數后,執行的結果以回調的形式返回。在子進程中通過和的機制來接收和發送消息。 ??node遵循的是單線程單進程的模式,node的單線程是指js的引擎只有一個實例,且在nodejs的主線程中執行,同時node以事件驅動的方式處理IO...
摘要:一般由客戶端發送,用來表示報文段中第一個數據字節在數據流中的序號,主要用來解決網絡包亂序的問題。為有效,為無效表示,當數據包得到后,立馬給應用程序使用到最頂端用來確保連接的安全。親,那進程和線程區別是什么嘞這算是計算機的基本知識吧。 在正文之前,我想問大家一個問題:問:親,你有基礎嗎?答: 有啊,你說前端嗎? 不就是HTML,JS,CSS 嗎? so easy~問: oh-my-zsh...
摘要:通常的解決方案,便是使用中自帶的模塊,以模式啟動多個應用實例。最后中的模塊除了上述提到的功能外,其實還提供了非常豐富的供和進程之前通信,對于不同的操作系統平臺,也提供了不同的默認行為。如果大家有閑,非常推薦完整領略一下模塊的代碼實現。 眾所周知,Node.js中的JavaScript代碼執行在單線程中,非常脆弱,一旦出現了未捕獲的異常,那么整個應用就會崩潰。這在許多場景下,尤其是web...
閱讀 2236·2021-11-24 11:15
閱讀 3080·2021-11-24 10:46
閱讀 1378·2021-11-24 09:39
閱讀 3924·2021-08-18 10:21
閱讀 1478·2019-08-30 15:53
閱讀 1395·2019-08-30 11:19
閱讀 3320·2019-08-29 18:42
閱讀 2321·2019-08-29 16:58