国产xxxx99真实实拍_久久不雅视频_高清韩国a级特黄毛片_嗯老师别我我受不了了小说

資訊專欄INFORMATION COLUMN

《Node.js設(shè)計(jì)模式》高級(jí)異步準(zhǔn)則

wfc_666 / 2292人閱讀

摘要:這使我們的知道什么時(shí)候原始模塊被初始化,在初始化后執(zhí)行預(yù)初始化隊(duì)列的操作,之后清空預(yù)初始化隊(duì)列,再調(diào)用作為參數(shù)的回調(diào)函數(shù),以下為具體步驟把賦值給,表示預(yù)初始化已經(jīng)完成了。

本系列文章為《Node.js Design Patterns Second Edition》的原文翻譯和讀書筆記,在GitHub連載更新,同步翻譯版鏈接。

歡迎關(guān)注我的專欄,之后的博文將在專欄同步:

Encounter的掘金專欄

知乎專欄 Encounter的編程思考

segmentfault專欄 前端小站

Advanced Asynchronous Recipes

幾乎所有我們迄今為止看到的設(shè)計(jì)模式都可以被認(rèn)為是通用的,并且適用于應(yīng)用程序的許多不同的領(lǐng)域。但是,有一套更具體的模式,專注于解決明確的問題。我們可以調(diào)用這些模式。就像現(xiàn)實(shí)生活中的烹飪一樣,我們有一套明確的步驟來實(shí)現(xiàn)預(yù)期的結(jié)果。當(dāng)然,這并不意味著我們不能用一些創(chuàng)意來定制設(shè)計(jì)模式,以配合我們的客人的口味,對(duì)于書寫Node.js程序來說是必要的。在本章中,我們將提供一些常見的解決方案來解決我們?cè)谌粘?b>Node.js開發(fā)中遇到的一些具體問題。這些模式包括以下內(nèi)容:

異步引入模塊并初始化

在高并發(fā)的應(yīng)用程序中使用批處理和緩存異步操作的性能優(yōu)化

運(yùn)行與Node.js處理并發(fā)請(qǐng)求的能力相悖的阻塞事件循環(huán)的同步CPU綁定操作

異步引入模塊并初始化

Chapter2-Node.js Essential Patterns中,當(dāng)我們討論Node.js模塊系統(tǒng)的基本屬性時(shí),我們提到了require()是同步的,并且module.exports也不能異步設(shè)置。

這是在核心模塊和許多npm包中存在同步API的主要原因之一,是否同步加載會(huì)被作為一個(gè)option參數(shù)被提供,主要用于初始化任務(wù),而不是替代異步API

不幸的是,這并不總是可能的。同步API可能并不總是可用的,特別是對(duì)于在初始化階段使用網(wǎng)絡(luò)的組件,例如執(zhí)行三次握手協(xié)議或在網(wǎng)絡(luò)中檢索配置參數(shù)。 許多數(shù)據(jù)庫驅(qū)動(dòng)程序和消息隊(duì)列等中間件系統(tǒng)的客戶端都是如此。

廣泛適用的解決方案

我們舉一個(gè)例子:一個(gè)名為db的模塊,它將會(huì)連接到遠(yuǎn)程數(shù)據(jù)庫。 只有在連接和與服務(wù)器的握手完成之后,db模塊才能夠接受請(qǐng)求。在這種情況下,我們通常有兩種選擇:

在開始使用之前確保模塊已經(jīng)初始化,否則則等待其初始化。每當(dāng)我們想要在異步模塊上調(diào)用一個(gè)操作時(shí),都必須完成這個(gè)過程:

const db = require("aDb"); //The async module
module.exports = function findAll(type, callback) {
  if (db.connected) { //is it initialized?
    runFind();
  } else {
    db.once("connected", runFind);
  }

  function runFind() {
    db.findAll(type, callback);
  };
};

使用依賴注入(Dependency Injection)而不是直接引入異步模塊。通過這樣做,我們可以延遲一些模塊的初始化,直到它們的異步依賴被完全初始化。 這種技術(shù)將管理模塊初始化的復(fù)雜性轉(zhuǎn)移到另一個(gè)組件,通常是它的父模塊。 在下面的例子中,這個(gè)組件是app.js

// 模塊app.js
const db = require("aDb"); // aDb是一個(gè)異步模塊
const findAllFactory = require("./findAll");
db.on("connected", function() {
  const findAll = findAllFactory(db);
  // 之后再執(zhí)行異步操作
});


// 模塊findAll.js
module.exports = db => {
  //db 在這里被初始化
  return function findAll(type, callback) {
    db.findAll(type, callback);
  }
}

我們可以看出,如果所涉及的異步依賴的數(shù)量過多,第一種方案便不太適用了。

另外,使用DI有時(shí)也是不理想的,正如我們?cè)?b>Chapter7-Wiring Modules中看到的那樣。在大型項(xiàng)目中,它可能很快變得過于復(fù)雜,尤其對(duì)于手動(dòng)完成并使用異步初始化模塊的情況下。如果我們使用一個(gè)設(shè)計(jì)用于支持異步初始化模塊的DI容器,這些問題將會(huì)得到緩解。

但是,我們將會(huì)看到,還有第三種方案可以讓我們輕松地將模塊從其依賴關(guān)系的初始化狀態(tài)中分離出來。

預(yù)初始化隊(duì)列

將模塊與依賴項(xiàng)的初始化狀態(tài)分離的簡(jiǎn)單模式涉及到使用隊(duì)列和命令模式。這個(gè)想法是保存一個(gè)模塊在尚未初始化的時(shí)候接收到的所有操作,然后在所有初始化步驟完成后立即執(zhí)行這些操作。

實(shí)現(xiàn)一個(gè)異步初始化的模塊

為了演示這個(gè)簡(jiǎn)單而有效的技術(shù),我們來構(gòu)建一個(gè)應(yīng)用程序。首先創(chuàng)建一個(gè)名為asyncModule.js的異步初始化模塊:

const asyncModule = module.exports;

asyncModule.initialized = false;
asyncModule.initialize = callback => {
  setTimeout(() => {
    asyncModule.initialized = true;
    callback();
  }, 10000);
};

asyncModule.tellMeSomething = callback => {
  process.nextTick(() => {
    if(!asyncModule.initialized) {
      return callback(
        new Error("I don"t have anything to say right now")
      );
    }
    callback(null, "Current time is: " + new Date());
  });
};

在上面的代碼中,asyncModule展現(xiàn)了一個(gè)異步初始化模塊的設(shè)計(jì)模式。 它有一個(gè)initialize()方法,在10秒的延遲后,將初始化的flag變量設(shè)置為true,并通知它的回調(diào)調(diào)用(10秒對(duì)于真實(shí)應(yīng)用程序來說是很長(zhǎng)的一段時(shí)間了,但是對(duì)于具有互斥條件的應(yīng)用來說可能會(huì)顯得力不從心)。

另一個(gè)方法tellMeSomething()返回當(dāng)前的時(shí)間,但是如果模塊還沒有初始化,它拋出產(chǎn)生一個(gè)異常。
下一步是根據(jù)我們剛剛創(chuàng)建的服務(wù)創(chuàng)建另一個(gè)模塊。 我們?cè)O(shè)計(jì)一個(gè)簡(jiǎn)單的HTTP請(qǐng)求處理程序,在一個(gè)名為routes.js的文件中實(shí)現(xiàn):

const asyncModule = require("./asyncModule");

module.exports.say = (req, res) => {
  asyncModule.tellMeSomething((err, something) => {
    if(err) {
      res.writeHead(500);
      return res.end("Error:" + err.message);
    }
    res.writeHead(200);
    res.end("I say: " + something);
  });
};

handler中調(diào)用asyncModuletellMeSomething()方法,然后將其結(jié)果寫入HTTP響應(yīng)中。 正如我們所看到的那樣,我們沒有對(duì)asyncModule的初始化狀態(tài)進(jìn)行任何檢查,這可能會(huì)導(dǎo)致問題。

現(xiàn)在,創(chuàng)建app.js模塊,使用核心http模塊創(chuàng)建一個(gè)非常基本的HTTP服務(wù)器:

const http = require("http");
const routes = require("./routes");
const asyncModule = require("./asyncModule");

asyncModule.initialize(() => {
  console.log("Async module initialized");
});

http.createServer((req, res) => {
  if (req.method === "GET" && req.url === "/say") {
    return routes.say(req, res);
  }
  res.writeHead(404);
  res.end("Not found");
}).listen(8000, () => console.log("Started"));

上述模塊是我們應(yīng)用程序的入口點(diǎn),它所做的只是觸發(fā)asyncModule的初始化并創(chuàng)建一個(gè)HTTP服務(wù)器,它使用我們以前創(chuàng)建的handlerroutes.say())來對(duì)網(wǎng)絡(luò)請(qǐng)求作出相應(yīng)。

我們現(xiàn)在可以像往常一樣通過執(zhí)行app.js模塊來嘗試啟動(dòng)我們的服務(wù)器。

在服務(wù)器啟動(dòng)后,我們可以嘗試使用瀏覽器訪問URLhttp://localhost:8000/并查看從asyncModule返回的內(nèi)容。
和預(yù)期的一樣,如果我們?cè)诜?wù)器啟動(dòng)后立即發(fā)送請(qǐng)求,結(jié)果將是一個(gè)錯(cuò)誤,如下所示:

Error:I don"t have anything to say right now

顯然,在異步模塊加載好了之后:

這意味著asyncModule尚未初始化,但我們?nèi)試L試使用它,則會(huì)拋出一個(gè)錯(cuò)誤。

根據(jù)異步初始化模塊的實(shí)現(xiàn)細(xì)節(jié),幸運(yùn)的情況是我們可能會(huì)收到一個(gè)錯(cuò)誤,乃至丟失重要的信息,崩潰整個(gè)應(yīng)用程序。 總的來說,我們剛剛描述的情況總是必須要避免的。

大多數(shù)時(shí)候,可能并不會(huì)出現(xiàn)上述問題,畢竟初始化一般來說很快,以至于在實(shí)踐中,它永遠(yuǎn)不會(huì)發(fā)生。 然而,對(duì)于設(shè)計(jì)用于自動(dòng)調(diào)節(jié)的高負(fù)載應(yīng)用和云服務(wù)器,情況就完全不同了。

用預(yù)初始化隊(duì)列包裝模塊

為了維護(hù)服務(wù)器的健壯性,我們現(xiàn)在要通過使用我們?cè)诒竟?jié)開頭描述的模式來進(jìn)行異步模塊加載。我們將在asyncModule尚未初始化的這段時(shí)間內(nèi)對(duì)所有調(diào)用的操作推入一個(gè)預(yù)初始化隊(duì)列,然后在異步模塊加載好后處理它們時(shí)立即刷新隊(duì)列。這就是狀態(tài)模式的一個(gè)很好的應(yīng)用!我們將需要兩個(gè)狀態(tài),一個(gè)在模塊尚未初始化的時(shí)候?qū)⑺胁僮髋抨?duì),另一個(gè)在初始化完成時(shí)將每個(gè)方法簡(jiǎn)單地委托給原始的asyncModule模塊。

通常,我們沒有機(jī)會(huì)修改異步模塊的代碼;所以,為了添加我們的排隊(duì)層,我們需要圍繞原始的asyncModule模塊創(chuàng)建一個(gè)代理。

接下來創(chuàng)建一個(gè)名為asyncModuleWrapper.js的新文件,讓我們依照每個(gè)步驟逐個(gè)構(gòu)建它。我們需要做的第一件事是創(chuàng)建一個(gè)代理,并將原始異步模塊的操作委托給這個(gè)代理:

const asyncModule = require("./asyncModule");
const asyncModuleWrapper = module.exports;
asyncModuleWrapper.initialized = false;
asyncModuleWrapper.initialize = () => {
  activeState.initialize.apply(activeState, arguments);
};
asyncModuleWrapper.tellMeSomething = () => {
  activeState.tellMeSomething.apply(activeState, arguments);
};

在前面的代碼中,asyncModuleWrapper將其每個(gè)方法簡(jiǎn)單地委托給activeState。 讓我們來看看這兩個(gè)狀態(tài)是什么樣子

notInitializedState開始,notInitializedState是指還沒初始化的狀態(tài):

// 當(dāng)模塊沒有被初始化時(shí)的狀態(tài)
let pending = [];
let notInitializedState = {

  initialize: function(callback) {
    asyncModule.initialize(function() {
      asyncModuleWrapper.initalized = true;
      activeState = initializedState;
      
      pending.forEach(function(req) {
        asyncModule[req.method].apply(null, req.args);
      });
      pending = [];
      
      callback();
    });
  },
  
  tellMeSomething: function(callback) {
    return pending.push({
      method: "tellMeSomething",
      args: arguments
    });
  }
  
};

當(dāng)initialize()方法被調(diào)用時(shí),我們觸發(fā)初始化asyncModule模塊,提供一個(gè)回調(diào)函數(shù)作為參數(shù)。 這使我們的asyncModuleWrapper知道什么時(shí)候原始模塊被初始化,在初始化后執(zhí)行預(yù)初始化隊(duì)列的操作,之后清空預(yù)初始化隊(duì)列,再調(diào)用作為參數(shù)的回調(diào)函數(shù),以下為具體步驟:

initializedState賦值給activeState,表示預(yù)初始化已經(jīng)完成了。

執(zhí)行先前存儲(chǔ)在待處理隊(duì)列中的所有命令。

調(diào)用原始回調(diào)。

由于此時(shí)的模塊尚未初始化,此狀態(tài)的tellMeSomething()方法僅創(chuàng)建一個(gè)新的Command對(duì)象,并將其添加到預(yù)初始化隊(duì)列中。

此時(shí),當(dāng)原始的asyncModule模塊尚未初始化時(shí),代理應(yīng)該已經(jīng)清楚,我們的代理將簡(jiǎn)單地把所有接收到的請(qǐng)求防到預(yù)初始化隊(duì)列中。 然后,當(dāng)我們被通知初始化完成時(shí),我們執(zhí)行所有預(yù)初始化隊(duì)列的操作,然后將內(nèi)部狀態(tài)切換到initializedState。來看這個(gè)代理模塊最后的定義:

let initializedState = asyncModule;

不出意外,initializedState對(duì)象只是對(duì)原始的asyncModule的引用!事實(shí)上,初始化完成后,我們可以安全地將任何請(qǐng)求直接發(fā)送到原始模塊。

最后,設(shè)定異步模塊還沒加載好的的狀態(tài),即notInitializedState

let activeState = notInitializedState;

我們現(xiàn)在可以嘗試再次啟動(dòng)我們的測(cè)試服務(wù)器,但首先,我們不要忘記用我們新的asyncModuleWrapper對(duì)象替換原始的asyncModule模塊的引用; 這必須在app.jsroutes.js模塊中完成。

這樣做之后,如果我們?cè)噲D再次向服務(wù)器發(fā)送一個(gè)請(qǐng)求,我們會(huì)看到在asyncModule模塊尚未初始化的時(shí)候,請(qǐng)求不會(huì)失敗; 相反,他們會(huì)掛起,直到初始化完成,然后才會(huì)被實(shí)際執(zhí)行。我們當(dāng)然可以肯定,比起之前,容錯(cuò)率變得更高了。

可以看到,在剛剛初始化異步模塊的時(shí)候,服務(wù)器會(huì)等待請(qǐng)求的響應(yīng):

在異步模塊加載完成后,服務(wù)器才會(huì)返回響應(yīng)的信息:

模式:如果模塊是需要異步初始化的,則對(duì)每個(gè)操作進(jìn)行排隊(duì),直到模塊完全初始化釋放隊(duì)列。

現(xiàn)在,我們的服務(wù)器可以在啟動(dòng)后立即開始接受請(qǐng)求,并保證這些請(qǐng)求都不會(huì)由于其模塊的初始化狀態(tài)而失敗。我們能夠在不使用DI的情況下獲得這個(gè)結(jié)果,也不需要冗長(zhǎng)且容易出錯(cuò)的檢查來驗(yàn)證異步模塊的狀態(tài)。

其它場(chǎng)景的應(yīng)用

我們剛剛介紹的模式被許多數(shù)據(jù)庫驅(qū)動(dòng)程序和ORM庫所使用。 最值得注意的是Mongoose,它是MongoDBORM。使用Mongoose,不必等待數(shù)據(jù)庫連接打開,以便能夠發(fā)送查詢,因?yàn)槊總€(gè)操作都排隊(duì),稍后與數(shù)據(jù)庫的連接完全建立時(shí)執(zhí)行。 這顯然提高了其API的可用性。

看一下Mongoose的源碼,它的每個(gè)方法是如何通過代理添加預(yù)初始化隊(duì)列。 可以看看實(shí)現(xiàn)這中模式的代碼片段:https://github.com/Automattic...
for (var i in Collection.prototype) {
  (function(i){
    NativeCollection.prototype[i] = function () {
      if (this.buffer) {
        // mongoose中,在緩沖區(qū)不為空時(shí),只是簡(jiǎn)單地把這個(gè)操作加入緩沖區(qū)內(nèi)
        this.addQueue(i, arguments);
        return;
      }

      var collection = this.collection
        , args = arguments
        , self = this
        , debug = self.conn.base.options.debug;

      if (debug) {
        if ("function" === typeof debug) {
          debug.apply(debug
            , [self.name, i].concat(utils.args(args, 0, args.length-1)));
        } else {
          console.error("x1B[0;36mMongoose:x1B[0m %s.%s(%s) %s %s %s"
            , self.name
            , i
            , print(args[0])
            , print(args[1])
            , print(args[2])
            , print(args[3]))
        }
      }

      return collection[i].apply(collection, args);
    };
  })(i);
}
異步批處理和緩存

在高負(fù)載的應(yīng)用程序中,緩存起著至關(guān)重要的作用,幾乎在網(wǎng)絡(luò)中的任何地方,從網(wǎng)頁,圖像和樣式表等靜態(tài)資源到純數(shù)據(jù)(如數(shù)據(jù)庫查詢的結(jié)果)都會(huì)使用緩存。 在本節(jié)中,我們將學(xué)習(xí)如何將緩存應(yīng)用于異步操作,以及如何充分利用緩存解決高請(qǐng)求吞吐量的問題。

實(shí)現(xiàn)沒有緩存或批處理的服務(wù)器

在這之前,我們來實(shí)現(xiàn)一個(gè)小型的服務(wù)器,以便用它來衡量緩存和批處理等技術(shù)在解決高負(fù)載應(yīng)用程序的優(yōu)勢(shì)。

讓我們考慮一個(gè)管理電子商務(wù)公司銷售的web服務(wù)器,特別是對(duì)于查詢我們的服務(wù)器所有特定類型的商品交易的總和的情況。 為此,考慮到LevelUP的簡(jiǎn)單性和靈活性,我們將再次使用LevelUP。我們要使用的數(shù)據(jù)模型是存儲(chǔ)在sales這一個(gè)sublevel中的簡(jiǎn)單事務(wù)列表,它是以下的形式:

transactionId {amount, item}

keytransactionId表示,value則是一個(gè)JSON對(duì)象,它包含amount,表示銷售金額和item,表示項(xiàng)目類型。
要處理的數(shù)據(jù)是非常基本的,所以讓我們立即在名為的totalSales.js文件中實(shí)現(xiàn)API,將如下所示:

const level = require("level");
const sublevel = require("level-sublevel");

const db = sublevel(level("example-db", {valueEncoding: "json"}));
const salesDb = db.sublevel("sales");

module.exports = function totalSales(item, callback) {
  console.log("totalSales() invoked");
  let sum = 0;
  salesDb.createValueStream()  // [1]
    .on("data", data => {
      if(!item || data.item === item) {  // [2]
        sum += data.amount;
      }
    })
    .on("end", () => {
      callback(null, sum);  // [3]
    });
};

該模塊的核心是totalSales函數(shù),它也是唯一exportsAPI;它進(jìn)行如下工作:

我們從包含交易信息的salesDbsublevel創(chuàng)建一個(gè)StreamStream將從數(shù)據(jù)庫中提取所有條目。

監(jiān)聽data事件,這個(gè)事件觸發(fā)時(shí),將從數(shù)據(jù)庫Stream中提取出每一項(xiàng),如果這一項(xiàng)的item參數(shù)正是我們需要的item,就去累加它的amount到總的sum里面。

最后,end事件觸發(fā)時(shí),我們最終調(diào)用callback()方法。

上述查詢方式可能在性能方面并不好。理想情況下,在實(shí)際的應(yīng)用程序中,我們可以使用索引,甚至使用增量映射來縮短實(shí)時(shí)計(jì)算的時(shí)間;但是,由于我們需要體現(xiàn)緩存的優(yōu)勢(shì),對(duì)于上述例子來說,慢速的查詢實(shí)際上更好,因?yàn)樗鼤?huì)突出顯示我們要分析的模式的優(yōu)點(diǎn)。

為了完成總銷售應(yīng)用程序,我們只需要從HTTP服務(wù)器公開totalSalesAPI;所以,下一步是構(gòu)建一個(gè)(app.js文件):

const http = require("http");
const url = require("url");
const totalSales = require("./totalSales");

http.createServer((req, res) => {
  const query = url.parse(req.url, true).query;
  totalSales(query.item, (err, sum) => {
    res.writeHead(200);
    res.end(`Total sales for item ${query.item} is ${sum}`);
  });
}).listen(8000, () => console.log("Started"));

我們創(chuàng)建的服務(wù)器是非常簡(jiǎn)單的;我們只需要它暴露totalSales API
在我們第一次啟動(dòng)服務(wù)器之前,我們需要用一些示例數(shù)據(jù)填充數(shù)據(jù)庫;我們可以使用專用于本節(jié)的代碼示例中的populate_db.js腳本來執(zhí)行此操作。該腳本將在數(shù)據(jù)庫中創(chuàng)建100K個(gè)隨機(jī)銷售交易。
好的! 現(xiàn)在,一切都準(zhǔn)備好了。 像往常一樣,啟動(dòng)服務(wù)器,我們執(zhí)行以下命令:

node app

請(qǐng)求這個(gè)HTTP接口,訪問至以下URL

http://localhost:8000/?item=book

但是,為了更好地了解服務(wù)器的性能,我們需要連續(xù)發(fā)送多個(gè)請(qǐng)求;所以,我們創(chuàng)建一個(gè)名為loadTest.js的腳本,它以200 ms的間隔發(fā)送請(qǐng)求。它已經(jīng)被配置為連接到服務(wù)器的URL,因此,要運(yùn)行它,執(zhí)行以下命令:

node loadTest

我們會(huì)看到這20個(gè)請(qǐng)求需要一段時(shí)間才能完成。注意測(cè)試的總執(zhí)行時(shí)間,因?yàn)槲覀儸F(xiàn)在開始我們的服務(wù),并測(cè)量我們可以節(jié)省多少時(shí)間。

批量異步請(qǐng)求

在處理異步操作時(shí),最基本的緩存級(jí)別可以通過將一組調(diào)用集中到同一個(gè)API來實(shí)現(xiàn)。這非常簡(jiǎn)單:如果我們?cè)谡{(diào)用異步函數(shù)的同時(shí)在隊(duì)列中還有另一個(gè)尚未處理的回調(diào),我們可以將回調(diào)附加到已經(jīng)運(yùn)行的操作上,而不是創(chuàng)建一個(gè)全新的請(qǐng)求。看下圖的情況:

前面的圖像顯示了兩個(gè)客戶端(它們可以是兩臺(tái)不同的機(jī)器,或兩個(gè)不同的Web請(qǐng)求),使用完全相同的輸入調(diào)用相同的異步操作。 當(dāng)然,描述這種情況的自然方式是由兩個(gè)客戶開始兩個(gè)多帶帶的操作,這兩個(gè)操作將在兩個(gè)不同的時(shí)刻完成,如前圖所示。現(xiàn)在考慮下一個(gè)場(chǎng)景,如下圖所示:

上圖向我們展示了如何對(duì)API的兩個(gè)請(qǐng)求進(jìn)行批處理,或者換句話說,對(duì)兩個(gè)請(qǐng)求執(zhí)行到相同的操作。通過這樣做,當(dāng)操作完成時(shí),兩個(gè)客戶端將同時(shí)被通知。這代表了一種簡(jiǎn)單而又非常強(qiáng)大的方式來降低應(yīng)用程序的負(fù)載,而不必處理更復(fù)雜的緩存機(jī)制,這通常需要適當(dāng)?shù)膬?nèi)存管理和緩存失效策略。

在電子商務(wù)銷售的Web服務(wù)器中使用批處理

現(xiàn)在讓我們?cè)?b>totalSales API上添加一個(gè)批處理層。我們要使用的模式非常簡(jiǎn)單:如果在API被調(diào)用時(shí)已經(jīng)有另一個(gè)相同的請(qǐng)求掛起,我們將把這個(gè)回調(diào)添加到一個(gè)隊(duì)列中。當(dāng)異步操作完成時(shí),其隊(duì)列中的所有回調(diào)立即被調(diào)用。

現(xiàn)在,讓我們來改變之前的代碼:創(chuàng)建一個(gè)名為totalSalesBatch.js的新模塊。在這里,我們將在原始的totalSales API之上實(shí)現(xiàn)一個(gè)批處理層:

const totalSales = require("./totalSales");

const queues = {};
module.exports = function totalSalesBatch(item, callback) {
  if(queues[item]) {  // [1]
    console.log("Batching operation");
    return queues[item].push(callback);
  }
  
  queues[item] = [callback];  // [2]
  totalSales(item, (err, res) => {
    const queue = queues[item];  // [3]
    queues[item] = null;
    queue.forEach(cb => cb(err, res));
  });
};

totalSalesBatch()函數(shù)是原始的totalSales() API的代理,它的工作原理如下:

如果請(qǐng)求的item已經(jīng)存在隊(duì)列中,則意味著該特定item的請(qǐng)求已經(jīng)在服務(wù)器任務(wù)隊(duì)列中。在這種情況下,我們所要做的只是將回調(diào)push到現(xiàn)有隊(duì)列,并立即從調(diào)用中返回。不進(jìn)行后續(xù)操作。

如果請(qǐng)求的item沒有在隊(duì)列中,這意味著我們必須創(chuàng)建一個(gè)新的請(qǐng)求。為此,我們?yōu)樵撎囟?b>item的請(qǐng)求創(chuàng)建一個(gè)新隊(duì)列,并使用當(dāng)前回調(diào)函數(shù)對(duì)其進(jìn)行初始化。 接下來,我們調(diào)用原始的totalSales() API

當(dāng)原始的totalSales()請(qǐng)求完成時(shí),則執(zhí)行我們的回調(diào)函數(shù),我們遍歷隊(duì)列中為該特定請(qǐng)求的item添加的所有回調(diào),并分別調(diào)用這些回調(diào)函數(shù)。

totalSalesBatch()函數(shù)的行為與原始的totalSales() API的行為相同,不同之處在于,現(xiàn)在對(duì)于相同內(nèi)容的請(qǐng)求API進(jìn)行批處理,從而節(jié)省時(shí)間和資源。

想知道相比于totalSales() API原始的非批處理版本,在性能方面的優(yōu)勢(shì)是什么?然后,讓我們將HTTP服務(wù)器使用的totalSales模塊替換為我們剛剛創(chuàng)建的模塊,修改app.js文件如下:

//const totalSales = require("./totalSales");
const totalSales = require("./totalSalesBatch");
http.createServer(function(req, res) {
// ...
});

如果我們現(xiàn)在嘗試再次啟動(dòng)服務(wù)器并進(jìn)行負(fù)載測(cè)試,我們首先看到的是請(qǐng)求被批量返回。

除此之外,我們觀察到請(qǐng)求的總時(shí)間大大減少;它應(yīng)該至少比對(duì)原始totalSales() API執(zhí)行的原始測(cè)試快四倍!

這是一個(gè)驚人的結(jié)果,證明了只需應(yīng)用一個(gè)簡(jiǎn)單的批處理層即可獲得巨大的性能提升,比起緩存機(jī)制,也沒有顯得太復(fù)雜,因?yàn)椋瑹o需考慮緩存淘汰策略。

批處理模式在高負(fù)載應(yīng)用程序和執(zhí)行較為緩慢的API中發(fā)揮巨大作用,正是由于這種模式的運(yùn)用,可以批量處理大量的請(qǐng)求。
異步請(qǐng)求緩存策略

異步批處理模式的問題之一是對(duì)于API的答復(fù)越快,我們對(duì)于批處理來說,其意義就越小。有人可能會(huì)爭(zhēng)辯說,如果一個(gè)API已經(jīng)很快了,那么試圖優(yōu)化它就沒有意義了。然而,它仍然是一個(gè)占用應(yīng)用程序的資源負(fù)載的因素,總結(jié)起來,仍然可以有解決方案。另外,如果API調(diào)用的結(jié)果不會(huì)經(jīng)常改變;因此,這時(shí)候批處理將并不會(huì)有較好的性能提升。在這種情況下,減少應(yīng)用程序負(fù)載并提高響應(yīng)速度的最佳方案肯定是更好的緩存模式。

緩存模式很簡(jiǎn)單:一旦請(qǐng)求完成,我們將其結(jié)果存儲(chǔ)在緩存中,該緩存可以是變量,數(shù)據(jù)庫中的條目,也可以是專門的緩存服務(wù)器。因此,下一次調(diào)用API時(shí),可以立即從緩存中檢索結(jié)果,而不是產(chǎn)生另一個(gè)請(qǐng)求。

對(duì)于一個(gè)有經(jīng)驗(yàn)的開發(fā)人員來說,緩存不應(yīng)該是多么新的技術(shù),但是異步編程中這種模式的不同之處在于它應(yīng)該與批處理結(jié)合在一起,以達(dá)到最佳效果。原因是因?yàn)槎鄠€(gè)請(qǐng)求可能并發(fā)運(yùn)行,而沒有設(shè)置緩存,并且當(dāng)這些請(qǐng)求完成時(shí),緩存將會(huì)被設(shè)置多次,這樣做則會(huì)造成緩存資源的浪費(fèi)。

基于這些假設(shè),異步請(qǐng)求緩存模式的最終結(jié)構(gòu)如下圖所示:

上圖給出了異步緩存算法的兩個(gè)步驟:

與批處理模式完全相同,與在未設(shè)置高速緩存時(shí)接收到的任何請(qǐng)求將一起批處理。這些請(qǐng)求完成時(shí),緩存將會(huì)被設(shè)置一次。

當(dāng)緩存最終被設(shè)置時(shí),任何后續(xù)的請(qǐng)求都將直接從緩存中提供。

另外我們需要考慮Zalgo的反作用(我們已經(jīng)在Chapter 2-Node.js Essential Patterns中看到了它的實(shí)際應(yīng)用)。在處理異步API時(shí),我們必須確保始終以異步方式返回緩存的值,即使訪問緩存只涉及同步操作。

在電子商務(wù)銷售的Web服務(wù)器中使用異步緩存請(qǐng)求

實(shí)踐異步緩存模式的優(yōu)點(diǎn),現(xiàn)在讓我們將我們學(xué)到的東西應(yīng)用到totalSales() API

與異步批處理示例程序一樣,我們創(chuàng)建一個(gè)代理,其作用是添加緩存層。

然后創(chuàng)建一個(gè)名為totalSalesCache.js的新模塊,代碼如下:

const totalSales = require("./totalSales");

const queues = {};
const cache = {};

module.exports = function totalSalesBatch(item, callback) {
  const cached = cache[item];
  if (cached) {
    console.log("Cache hit");
    return process.nextTick(callback.bind(null, null, cached));
  }
  
  if (queues[item]) {
    console.log("Batching operation");
    return queues[item].push(callback);
  }
  
  queues[item] = [callback];
  totalSales(item, (err, res) => {
    if (!err) {
      cache[item] = res;
      setTimeout(() => {
        delete cache[item];
      }, 30 * 1000); //30 seconds expiry
    }
    
    const queue = queues[item];
    queues[item] = null;
    queue.forEach(cb => cb(err, res));
  });
};

我們可以看到前面的代碼與我們異步批處理的很多地方基本相同。 其實(shí)唯一的區(qū)別是以下幾點(diǎn):

我們需要做的第一件事就是檢查緩存是否被設(shè)置,如果是這種情況,我們將立即使用callback()返回緩存的值,這里必須要使用process.nextTick(),因?yàn)榫彺婵赡苁钱惒皆O(shè)定的,需要等到下一次事件輪詢時(shí)才能夠保證緩存已經(jīng)被設(shè)定。

繼續(xù)異步批處理模式,但是這次,當(dāng)原始API成功完成時(shí),我們將結(jié)果保存到緩存中。此外,我們還設(shè)置了一個(gè)緩存淘汰機(jī)制,在30秒后使緩存失效。 一個(gè)簡(jiǎn)單而有效的技術(shù)!

現(xiàn)在,我們準(zhǔn)備嘗試我們剛創(chuàng)建的totalSales模塊。 先更改app.js模塊,如下所示:

// const totalSales = require("./totalSales");
// const totalSales = require("./totalSalesBatch");
const totalSales = require("./totalSalesCache");
   http.createServer(function(req, res) {
     // ...
});

現(xiàn)在,重新啟動(dòng)服務(wù)器,并使用loadTest.js腳本進(jìn)行配置,就像我們?cè)谇懊娴睦又兴龅哪菢印J褂媚J(rèn)的測(cè)試參數(shù),與簡(jiǎn)單的異步批處理模式相比,很明顯地有了更好的性能提升。 當(dāng)然,這很大程度上取決于很多因素;例如收到的請(qǐng)求數(shù)量,以及一個(gè)請(qǐng)求和另一個(gè)請(qǐng)求之間的延遲等。當(dāng)請(qǐng)求數(shù)量較高且跨越較長(zhǎng)時(shí)間時(shí),使用高速緩存批處理的優(yōu)勢(shì)將更為顯著。

Memoization被稱做緩存函數(shù)調(diào)用的結(jié)果的算法。 在npm中,你可以找到許多包來實(shí)現(xiàn)異步的memoization,其中最著名的之一之一是memoizee。
有關(guān)實(shí)現(xiàn)緩存機(jī)制的說明

我們必須記住,在實(shí)際應(yīng)用中,我們可能想要使用更先進(jìn)的失效技術(shù)和存儲(chǔ)機(jī)制。 這可能是必要的,原因如下:

大量的緩存值可能會(huì)消耗大量?jī)?nèi)存。 在這種情況下,可以應(yīng)用最近最少使用(LRU)算法來保持恒定的存儲(chǔ)器利用率。

當(dāng)應(yīng)用程序分布在多個(gè)進(jìn)程中時(shí),對(duì)緩存使用簡(jiǎn)單變量可能會(huì)導(dǎo)致每個(gè)服務(wù)器實(shí)例返回不同的結(jié)果。如果這對(duì)于我們正在實(shí)現(xiàn)的特定應(yīng)用程序來說是不希望的,那么解決方案就是使用共享存儲(chǔ)來存儲(chǔ)緩存。 常用的解決方案是Redis和Memcached。

與定時(shí)淘汰緩存相比,手動(dòng)淘汰高速緩存可使得高速緩存使用壽命更長(zhǎng),同時(shí)提供更新的數(shù)據(jù),但當(dāng)然,管理起緩存來要復(fù)雜得多。

使用Promise進(jìn)行批處理和緩存

Chapter4-Asynchronous Control Flow Patterns with ES2015 and Beyond中,我們看到了Promise如何極大地簡(jiǎn)化我們的異步代碼,但是在處理批處理和緩存時(shí),它則可以提供更大的幫助。

利用Promise進(jìn)行異步批處理和緩存策略,有如下兩個(gè)優(yōu)點(diǎn):

多個(gè)then()監(jiān)聽器可以附加到相同的Promise實(shí)例。

then()監(jiān)聽器最多保證被調(diào)用一次,即使在Promise已經(jīng)被resolve了之后,then()也能正常工作。 此外,then()總是會(huì)被保證其是異步調(diào)用的。

簡(jiǎn)而言之,第一個(gè)優(yōu)點(diǎn)正是批處理請(qǐng)求所需要的,而第二個(gè)優(yōu)點(diǎn)則在Promise已經(jīng)是解析值的緩存時(shí),也會(huì)提供同樣的的異步返回緩存值的機(jī)制。

下面開始看代碼,我們可以嘗試使用PromisestotalSales()創(chuàng)建一個(gè)模塊,在其中添加批處理和緩存功能。創(chuàng)建一個(gè)名為totalSalesPromises.js的新模塊:

const pify = require("pify");  // [1]
const totalSales = pify(require("./totalSales"));

const cache = {};
module.exports = function totalSalesPromises(item) {
  if (cache[item]) {  // [2]
    return cache[item];
  }

  cache[item] = totalSales(item)  // [3]
    .then(res => {  // [4]
      setTimeout(() => {delete cache[item]}, 30 * 1000); //30 seconds expiry
      return res;
    })
    .catch(err => {  // [5]
      delete cache[item];
      throw err;
    });
  return cache[item];  // [6]
};

Promise確實(shí)很好,下面是上述函數(shù)的功能描述:

首先,我們需要一個(gè)名為pify的模塊,它允許我們對(duì)totalSales()模塊進(jìn)行promisification。這樣做之后,totalSales()將返回一個(gè)符合ES2015標(biāo)準(zhǔn)的Promise實(shí)例,而不是接受一個(gè)回調(diào)函數(shù)作為參數(shù)。

當(dāng)調(diào)用totalSalesPromises()時(shí),我們檢查給定的項(xiàng)目類型是否已經(jīng)在緩存中有相應(yīng)的Promise。如果我們已經(jīng)有了這樣的Promise,我們直接返回這個(gè)Promise實(shí)例。

如果我們?cè)诰彺嬷袥]有針對(duì)給定項(xiàng)目類型的Promise,我們繼續(xù)通過調(diào)用原始(promisified)的totalSales()來創(chuàng)建一個(gè)Promise實(shí)例。

當(dāng)Promise正常resolve了,我們?cè)O(shè)置了一個(gè)清除緩存的時(shí)間(假設(shè)為30秒),我們返回res將操作的結(jié)果返回給應(yīng)用程序。

如果Promise被異常reject了,我們立即重置緩存,并再次拋出錯(cuò)誤,將其傳播到Promise chain中,所以任何附加到相同Promise的其他應(yīng)用程序也將收到這一異常。

最后,我們返回我們剛才創(chuàng)建或者緩存的Promise實(shí)例。

非常簡(jiǎn)單直觀,更重要的是,我們使用Promise也能夠?qū)崿F(xiàn)批處理和緩存。
如果我們現(xiàn)在要嘗試使用totalSalesPromise()函數(shù),稍微調(diào)整app.js模塊,因?yàn)楝F(xiàn)在使用Promise而不是回調(diào)函數(shù)。 讓我們通過創(chuàng)建一個(gè)名為appPromises.js的app模塊來實(shí)現(xiàn):

const http = require("http");
const url = require("url");
const totalSales = require("./totalSalesPromises");

http.createServer(function(req, res) {
  const query = url.parse(req.url, true).query;
  totalSales(query.item).then(function(sum) {
    res.writeHead(200);
    res.end(`Total sales for item ${query.item} is ${sum}`);
  });
}).listen(8000, function() {console.log("Started")});

它的實(shí)現(xiàn)與原始應(yīng)用程序模塊幾乎完全相同,不同的是現(xiàn)在我們使用的是基于Promise的批處理/緩存封裝版本; 因此,我們調(diào)用它的方式也略有不同。

運(yùn)行以下命令開啟這個(gè)新版本的服務(wù)器:

node appPromises
運(yùn)行與CPU-bound的任務(wù)

雖然上面的totalSales()在系統(tǒng)資源上面消耗較大,但是其也不會(huì)影響服務(wù)器處理并發(fā)的能力。 我們?cè)?b>Chapter1-Welcome to the Node.js Platform中了解到有關(guān)事件循環(huán)的內(nèi)容,應(yīng)該為此行為提供解釋:調(diào)用異步操作會(huì)導(dǎo)致堆棧退回到事件循環(huán),從而使其免于處理其他請(qǐng)求。

但是,當(dāng)我們運(yùn)行一個(gè)長(zhǎng)時(shí)間的同步任務(wù)時(shí),會(huì)發(fā)生什么情況,從不會(huì)將控制權(quán)交還給事件循環(huán)?

這種任務(wù)也被稱為CPU-bound,因?yàn)樗闹饕攸c(diǎn)是CPU利用率較高,而不是I/O操作繁重。
讓我們立即舉一個(gè)例子上看看這些類型的任務(wù)在Node.js中的具體行為。

解決子集總和問題

現(xiàn)在讓我們做一個(gè)CPU占用比較高的高計(jì)算量的實(shí)驗(yàn)。下面來看的是子集總和問題,我們計(jì)算一個(gè)數(shù)組中是否具有一個(gè)子數(shù)組,其總和為0。例如,如果我們有數(shù)組[1, 2, -4, 5, -3]作為輸入,則滿足問題的子數(shù)組是[1, 2, -3][2, -4, 5, -3]

最簡(jiǎn)單的算法是把每一個(gè)數(shù)組元素做遍歷然后依次計(jì)算,時(shí)間復(fù)雜度為O(2^n),或者換句話說,它隨著輸入的數(shù)組長(zhǎng)度成指數(shù)增長(zhǎng)。這意味著一組20個(gè)整數(shù)則會(huì)有多達(dá)1, 048, 576中情況,顯然不能夠通過窮舉來做到。當(dāng)然,這個(gè)問題的解決方案可能并不算復(fù)雜。為了使事情變得更加困難,我們將考慮數(shù)組和問題的以下變化:給定一組整數(shù),我們要計(jì)算所有可能的組合,其總和等于給定的任意整數(shù)。

const EventEmitter = require("events").EventEmitter;
class SubsetSum extends EventEmitter {
  constructor(sum, set) {
      super();
      this.sum = sum;
      this.set = set;
      this.totalSubsets = 0;
    } //...
}

SubsetSum類是EventEmitter類的子類;這使得我們每次找到一個(gè)匹配收到的總和作為輸入的新子集時(shí)都會(huì)發(fā)出一個(gè)事件。 我們將會(huì)看到,這會(huì)給我們很大的靈活性。

接下來,讓我們看看我們?nèi)绾文軌蛏伤锌赡艿淖蛹M合:

開始構(gòu)建一個(gè)這樣的算法。創(chuàng)建一個(gè)名為subsetSum.js的新模塊。在其中聲明一個(gè)SubsetSum類:

_combine(set, subset) {
  for(let i = 0; i < set.length; i++) {
    let newSubset = subset.concat(set[i]);
    this._combine(set.slice(i + 1), newSubset);
    this._processSubset(newSubset);
  }
}

不管算法其中到底是什么內(nèi)容,但有兩點(diǎn)要注意:

_combine()方法是完全同步的;它遞歸地生成每一個(gè)可能的子集,而不把CPU控制權(quán)交還給事件循環(huán)。如果我們考慮一下,這對(duì)于不需要任何I/O的算法來說是非常正常的。

每當(dāng)生成一個(gè)新的組合時(shí),我們都會(huì)將這個(gè)組合提供給_processSubset()方法以供進(jìn)一步處理。

_processSubset()方法負(fù)責(zé)驗(yàn)證給定子集的元素總和是否等于我們要查找的數(shù)字:

_processSubset(subset) {
  console.log("Subset", ++this.totalSubsets, subset);
  const res = subset.reduce((prev, item) => (prev + item), 0);
  if (res == this.sum) {
    this.emit("match", subset);
  }
}

簡(jiǎn)單地說,_processSubset()方法將reduce操作應(yīng)用于子集,以便計(jì)算其元素的總和。然后,當(dāng)結(jié)果總和等于給定的sum參數(shù)時(shí),會(huì)發(fā)出一個(gè)match事件。

最后,調(diào)用start()方法開始執(zhí)行算法:

start() {
  this._combine(this.set, []);
  this.emit("end");
}

通過調(diào)用_combine()觸發(fā)算法,最后觸發(fā)一個(gè)end事件,表明所有的組合都被檢查過,并且任何可能的匹配都已經(jīng)被計(jì)算出來。 這是可能的,因?yàn)?b>_combine()是同步的; 因此,只要前面的函數(shù)返回,end事件就會(huì)觸發(fā),這意味著所有的組合都被計(jì)算出來了。

接下來,我們?cè)诰W(wǎng)絡(luò)上公開剛剛創(chuàng)建的算法。可以使用一個(gè)簡(jiǎn)單的HTTP服務(wù)器對(duì)響應(yīng)的任務(wù)作出響應(yīng)。 特別是,我們希望以/subsetSum?data=&sum=這樣的請(qǐng)求格式進(jìn)行響應(yīng),傳入給定的數(shù)組和sum,使用SubsetSum算法進(jìn)行匹配。

在一個(gè)名為app.js的模塊中實(shí)現(xiàn)這個(gè)簡(jiǎn)單的服務(wù)器:

const http = require("http");
const SubsetSum = require("./subsetSum");

http.createServer((req, res) => {
  const url = require("url").parse(req.url, true);
  if(url.pathname === "/subsetSum") {
    const data = JSON.parse(url.query.data);
    res.writeHead(200);
    const subsetSum = new SubsetSum(url.query.sum, data);
    subsetSum.on("match", match => {
      res.write("Match: " + JSON.stringify(match) + "
");
    });
    subsetSum.on("end", () => res.end());
    subsetSum.start();
  } else {
    res.writeHead(200);
    res.end("Im alive!
");
  }
}).listen(8000, () => console.log("Started"));

由于SubsetSum實(shí)例使用事件返回結(jié)果,所以我們可以在算法生成后立即對(duì)匹配的結(jié)果使用Stream進(jìn)行處理。另一個(gè)需要注意的細(xì)節(jié)是,每次我們的服務(wù)器都會(huì)返回I"m alive!,這樣我們每次發(fā)送一個(gè)不同于/subsetSum的請(qǐng)求的時(shí)候。可以用來檢查我們服務(wù)器是否掛掉了,這在稍后將會(huì)看到。

開始運(yùn)行:

node app

一旦服務(wù)器啟動(dòng),我們準(zhǔn)備發(fā)送我們的第一個(gè)請(qǐng)求;讓我們嘗試發(fā)送一組17個(gè)隨機(jī)數(shù),這將導(dǎo)致產(chǎn)生131,071個(gè)組合,那么服務(wù)器將會(huì)處理一段時(shí)間:

curl -G http://localhost:8000/subsetSum --data-urlencode "data=[116,119,101,101,-116,109,101,-105,-102,117,-115,-97,119,-116,-104,-105,115]"--data-urlencode "sum=0"

這是如果我們?cè)诘谝粋€(gè)請(qǐng)求仍在運(yùn)行的時(shí)候在另一個(gè)終端中嘗試輸入以下命令,我們將發(fā)現(xiàn)一個(gè)巨大的問題:

curl -G http://localhost:8000

我們會(huì)看到直到第一個(gè)請(qǐng)求結(jié)束之前,最后一個(gè)請(qǐng)求一直處于掛起的狀態(tài)。服務(wù)器沒有返回響應(yīng)!這正如我們所想的那樣。Node.js事件循環(huán)運(yùn)行在一個(gè)多帶帶的線程中,如果這個(gè)線程被一個(gè)長(zhǎng)的同步計(jì)算阻塞,它將不能再執(zhí)行一個(gè)循環(huán)來響應(yīng)I"m alive!
我們必須知道,這種代碼顯然不能夠用于同時(shí)接收到多個(gè)請(qǐng)求的應(yīng)用程序。

但是不要對(duì)Node.js中絕望,我們可以通過幾種方式來解決這種情況。我們來分析一下最常見的兩種方案:

使用setImmediate

通常,CPU-bound算法是建立在一定規(guī)則之上的。它可以是一組遞歸調(diào)用,一個(gè)循環(huán),或者基于這些的任何變化/組合。 所以,對(duì)于我們的問題,一個(gè)簡(jiǎn)單的解決方案就是在這些步驟完成后(或者在一定數(shù)量的步驟之后),將控制權(quán)交還給事件循環(huán)。這樣,任何待處理的I / O仍然可以在事件循環(huán)在長(zhǎng)時(shí)間運(yùn)行的算法產(chǎn)生CPU的時(shí)間間隔中處理。對(duì)于這個(gè)問題而言,解決這一問題的方式是把算法的下一步在任何可能導(dǎo)致掛起的I/O請(qǐng)求之后運(yùn)行。這聽起來像是setImmediate()方法的完美用例(我們已經(jīng)在Chapter2-Node.js Essential Patterns中介紹過這一API)。

模式:使用setImmediate()交錯(cuò)執(zhí)行長(zhǎng)時(shí)間運(yùn)行的同步任務(wù)。
使用setImmediate進(jìn)行子集求和算法的步驟

現(xiàn)在我們來看看這個(gè)模式如何應(yīng)用于子集求和算法。 我們所要做的只是稍微修改一下subsetSum.js模塊。 為方便起見,我們將創(chuàng)建一個(gè)名為subsetSumDefer.js的新模塊,將原始的subsetSum類的代碼作為起點(diǎn)。
我們要做的第一個(gè)改變是添加一個(gè)名為_combineInterleaved()的新方法,它是我們正在實(shí)現(xiàn)的模式的核心:

_combineInterleaved(set, subset) {
  this.runningCombine++;
  setImmediate(() => {
    this._combine(set, subset);
    if(--this.runningCombine === 0) {
      this.emit("end");
    }
  });
}

正如我們所看到的,我們所要做的只是使用setImmediate()調(diào)用原始的同步的_combine()方法。然而,現(xiàn)在的問題是因?yàn)樵撍惴ú辉偈峭降模覀兏y以知道何時(shí)已經(jīng)完成了所有的組合的計(jì)算。

為了解決這個(gè)問題,我們必須使用非常類似于我們?cè)?b>Chapter3-Asynchronous Control Flow Patterns with Callbacks看到的異步并行執(zhí)行的模式來追溯_combine()方法的所有正在運(yùn)行的實(shí)例。 當(dāng)_combine()方法的所有實(shí)例都已經(jīng)完成運(yùn)行時(shí),觸發(fā)end事件,通知任何監(jiān)聽器,進(jìn)程需要做的所有動(dòng)作都已經(jīng)完成。

對(duì)于最終子集求和算法的重構(gòu)版本。首先,我們需要將_combine()方法中的遞歸步驟替換為異步:

_combine(set, subset) {
  for(let i = 0; i < set.length; i++) {
    let newSubset = subset.concat(set[i]);
    this._combineInterleaved(set.slice(i + 1), newSubset);
    this._processSubset(newSubset);
  }
}

通過上面的更改,我們確保算法的每個(gè)步驟都將使用setImmediate()在事件循環(huán)中排隊(duì),在事件循環(huán)隊(duì)列中I / O請(qǐng)求之后執(zhí)行,而不是同步運(yùn)行造成阻塞。

另一個(gè)小調(diào)整是對(duì)于start()方法:

start() {
  this.runningCombine = 0;
  this._combineInterleaved(this.set, []);
}

在前面的代碼中,我們將_combine()方法的運(yùn)行實(shí)例的數(shù)量初始化為0.我們還通過調(diào)用_combineInterleaved()來將調(diào)用替換為_combine(),并移除了end的觸發(fā),因?yàn)楝F(xiàn)在_combineInterleaved()是異步處理的。
通過這個(gè)最后的改變,我們的子集求和算法現(xiàn)在應(yīng)該能夠通過事件循環(huán)可以運(yùn)行的時(shí)間間隔交替地運(yùn)行其可能大量占用CPU的代碼,并且不會(huì)再造成阻塞。

最后更新app.js模塊,以便它可以使用新版本的SubsetSum

const http = require("http");
// const SubsetSum = require("./subsetSum");
const SubsetSum = require("./subsetSumDefer");
http.createServer(function(req, res) {
  // ...
})

和之前一樣的方式開始運(yùn)行,結(jié)果如下:

此時(shí),使用異步的方式運(yùn)行,不再會(huì)阻塞CPU了。

interleaving模式

正如我們所看到的,在保持應(yīng)用程序的響應(yīng)性的同時(shí)運(yùn)行一個(gè)CPU-bound的任務(wù)并不復(fù)雜,只需要使用setImmediate()把同步執(zhí)行的代碼變?yōu)楫惒綀?zhí)行即可。但是,這不是效率最好的模式;實(shí)際上,延遲執(zhí)行一個(gè)任務(wù)會(huì)額外帶來一個(gè)小的開銷,在這樣的算法中,積少成多,則會(huì)產(chǎn)生重大的影響。這通常是我們?cè)谶\(yùn)行CPU限制任務(wù)時(shí)所需要的最后一件事情,特別是如果我們必須將結(jié)果直接返回給用戶,這應(yīng)該在合理的時(shí)間內(nèi)進(jìn)行響應(yīng)。 緩解這個(gè)問題的一個(gè)可能的解決方案是只有在一定數(shù)量的步驟之后使用setImmediate(),而不是在每一步中使用它。但是這仍然不能解決問題的根源。

記住,這并不是說一旦我們想要通過異步的模式來執(zhí)行CPU-bound的任務(wù),我們就應(yīng)該不惜一切代價(jià)來避免這樣的額外開銷,事實(shí)上,從更廣闊的角度來看,同步任務(wù)并不一定非常漫長(zhǎng)和復(fù)雜,以至于造成麻煩。在繁忙的服務(wù)器中,即使是阻塞事件循環(huán)200毫秒的任務(wù)也會(huì)產(chǎn)生不希望的延遲。 在那些并發(fā)量并不高的服務(wù)器來說,即使產(chǎn)生一定短時(shí)的阻塞,也不會(huì)影響性能,使用交錯(cuò)執(zhí)行setImmediate()可能是避免阻塞事件循環(huán)的最簡(jiǎn)單也是最有效的方法。

process.nextTick()不能用于交錯(cuò)長(zhǎng)時(shí)間運(yùn)行的任務(wù)。正如我們?cè)?b>Chapter1-Welcome to the Node.js Platform中看到的,nextTick()會(huì)在任何未返回的I / O之前調(diào)度,并且在重復(fù)調(diào)用process.nextTick()最終會(huì)導(dǎo)致I / O饑餓。 你可以通過在前面的例子中用process.nextTick()替換setImmediate()來驗(yàn)證。
使用多個(gè)進(jìn)程

使用interleaving模式并不是我們用來運(yùn)行CPU-bound任務(wù)的唯一方法;防止事件循環(huán)阻塞的另一種模式是使用子進(jìn)程。我們已經(jīng)知道Node.js在運(yùn)行I / O密集型應(yīng)用程序(如Web服務(wù)器)的時(shí)候是最好的,因?yàn)?b>Node.js可以使得我們可以通過異步來優(yōu)化資源利用率。

所以,我們必須保持應(yīng)用程序響應(yīng)的最好方法是不要在主應(yīng)用程序的上下文中運(yùn)行昂貴的CPU-bound任務(wù),而是使用多帶帶的進(jìn)程。這有三個(gè)主要的優(yōu)點(diǎn):

同步任務(wù)可以全速運(yùn)行,而不需要交錯(cuò)執(zhí)行的步驟

Node.js中處理進(jìn)程很簡(jiǎn)單,可能比修改一個(gè)使用setImmediate()的算法更容易,并且多進(jìn)程允許我們輕松使用多個(gè)處理器,而無需擴(kuò)展主應(yīng)用程序本身。

如果我們真的需要超高的性能,可以使用低級(jí)語言,如性能良好的C

Node.js有一個(gè)充足的API庫帶來與外部進(jìn)程交互。 我們可以在child_process模塊中找到我們需要的所有東西。 而且,當(dāng)外部進(jìn)程只是另一個(gè)Node.js程序時(shí),將它連接到主應(yīng)用程序是非常容易的,我們甚至不覺得我們?cè)诒镜貞?yīng)用程序外部運(yùn)行任何東西。這得益于child_process.fork()函數(shù),該函數(shù)創(chuàng)建一個(gè)新的子Node.js進(jìn)程,并自動(dòng)創(chuàng)建一個(gè)通信管道,使我們能夠使用與EventEmitter非常相似的接口交換信息。來看如何用這個(gè)特性來重構(gòu)我們的子集求和算法。

將子集求和任務(wù)委托給其他進(jìn)程

重構(gòu)SubsetSum任務(wù)的目標(biāo)是創(chuàng)建一個(gè)多帶帶的子進(jìn)程,負(fù)責(zé)處理CPU-bound的任務(wù),使服務(wù)器的事件循環(huán)專注于處理來自網(wǎng)絡(luò)的請(qǐng)求:

我們將創(chuàng)建一個(gè)名為processPool.js的新模塊,它將允許我們創(chuàng)建一個(gè)正在運(yùn)行的進(jìn)程池。創(chuàng)建一個(gè)新的進(jìn)程代價(jià)昂貴,需要時(shí)間,因此我們需要保持它們不斷運(yùn)行,盡量不要產(chǎn)生中斷,時(shí)刻準(zhǔn)備好處理請(qǐng)求,使我們可以節(jié)省時(shí)間和CPU。此外,進(jìn)程池需要幫助我們限制同時(shí)運(yùn)行的進(jìn)程數(shù)量,以避免將使我們的應(yīng)用程序受到拒絕服務(wù)(DoS)攻擊。

接下來,我們將創(chuàng)建一個(gè)名為subsetSumFork.js的模塊,負(fù)責(zé)抽象子進(jìn)程中運(yùn)行的SubsetSum任務(wù)。 它的角色將與子進(jìn)程進(jìn)行通信,并將任務(wù)的結(jié)果展示為來自當(dāng)前應(yīng)用程序。

最后,我們需要一個(gè)worker(我們的子進(jìn)程),一個(gè)新的Node.js程序,運(yùn)行子集求和算法并將其結(jié)果轉(zhuǎn)發(fā)給父進(jìn)程。

DoS攻擊是企圖使其計(jì)劃用戶無法使用機(jī)器或網(wǎng)絡(luò)資源,例如臨時(shí)或無限中斷或暫停連接到Internet的主機(jī)的服務(wù)。
實(shí)現(xiàn)一個(gè)進(jìn)程池

先從構(gòu)建processPool.js模塊開始:

const fork = require("child_process").fork;
class ProcessPool {
  constructor(file, poolMax) {
      this.file = file;
      this.poolMax = poolMax;
      this.pool = [];
      this.active = [];
      this.waiting = [];
    } //...
}

在模塊的第一部分,引入我們將用來創(chuàng)建新進(jìn)程的child_process.fork()函數(shù)。 然后,我們定義ProcessPool的構(gòu)造函數(shù),該構(gòu)造函數(shù)接受表示要運(yùn)行的Node.js程序的文件參數(shù)以及池中運(yùn)行的最大實(shí)例數(shù)poolMax作為參數(shù)。然后我們定義三個(gè)實(shí)例變量:

pool表示的是準(zhǔn)備運(yùn)行的進(jìn)程

active表示的是當(dāng)前正在運(yùn)行的進(jìn)程列表

waiting包含所有這些請(qǐng)求的任務(wù)隊(duì)列,保存由于缺少可用的資源而無法立即實(shí)現(xiàn)的任務(wù)

ProcessPool類的acquire()方法,它負(fù)責(zé)取出一個(gè)準(zhǔn)備好被使用的進(jìn)程:

acquire(callback) {
  let worker;
  if(this.pool.length > 0) {  // [1]
    worker = this.pool.pop();
    this.active.push(worker);
    return process.nextTick(callback.bind(null, null, worker));
  }

  if(this.active.length >= this.poolMax) {  // [2]
    return this.waiting.push(callback);
  }

  worker = fork(this.file);  // [3]
  this.active.push(worker);
  process.nextTick(callback.bind(null, null, worker));
}

函數(shù)邏輯如下:

如果在進(jìn)程池中有一個(gè)準(zhǔn)備好被使用的進(jìn)程,我們只需將其移動(dòng)到active數(shù)組中,然后通過異步的方式調(diào)用其回調(diào)函數(shù)。

如果池中沒有可用的進(jìn)程,或者已經(jīng)達(dá)到運(yùn)行進(jìn)程的最大數(shù)量,必須等待。通過把當(dāng)前回調(diào)放入waiting數(shù)組。

如果我們還沒有達(dá)到運(yùn)行進(jìn)程的最大數(shù)量,我們將使用child_process.fork()創(chuàng)建一個(gè)新的進(jìn)程,將其添加到active列表中,然后調(diào)用其回調(diào)。

ProcessPool類的最后一個(gè)方法是release(),其目的是將一個(gè)進(jìn)程放回進(jìn)程池中:

release(worker) {
  if(this.waiting.length > 0) {  // [1]
    const waitingCallback = this.waiting.shift();
    waitingCallback(null, worker);
  }
  this.active = this.active.filter(w => worker !==  w);  // [2]
  this.pool.push(worker);
}

前面的代碼也很簡(jiǎn)單,其解釋如下:

如果在waiting任務(wù)隊(duì)列里面有任務(wù)需要被執(zhí)行,我們只需為這個(gè)任務(wù)分配一個(gè)進(jìn)程worker執(zhí)行。

否則,如果在waiting任務(wù)隊(duì)列中都沒有需要被執(zhí)行的任務(wù),我們則把active的進(jìn)程列表中的進(jìn)程放回進(jìn)程池中。

正如我們所看到的,進(jìn)程從來沒有中斷,只在為其不斷地重新分配任務(wù),使我們可以通過在每個(gè)請(qǐng)求不重新啟動(dòng)一個(gè)進(jìn)程達(dá)到節(jié)省時(shí)間和空間的目的。然而,重要的是要注意,這可能并不總是最好的選擇,這很大程度上取決于我們的應(yīng)用程序的要求。為減少進(jìn)程池長(zhǎng)期占用內(nèi)存,可能的調(diào)整如下:

在一個(gè)進(jìn)程空閑一段時(shí)間后,終止進(jìn)程,釋放內(nèi)存空間。

添加一個(gè)機(jī)制來終止或重啟沒有響應(yīng)的或者崩潰了的進(jìn)程。

父子進(jìn)程通信

現(xiàn)在我們的ProcessPool類已經(jīng)準(zhǔn)備就緒,我們可以使用它來實(shí)現(xiàn)SubsetSumFork模塊,SubsetSumFork的作用是與子進(jìn)程進(jìn)行通信得到子集求和的結(jié)果。前面曾說到,用child_process.fork()啟動(dòng)一個(gè)進(jìn)程也給了我們創(chuàng)建了一個(gè)簡(jiǎn)單的基于消息的管道,通過實(shí)現(xiàn)subsetSumFork.js模塊來看看它是如何工作的:

const EventEmitter = require("events").EventEmitter;
const ProcessPool = require("./processPool");
const workers = new ProcessPool(__dirname + "/subsetSumWorker.js", 2);

class SubsetSumFork extends EventEmitter {
  constructor(sum, set) {
    super();
    this.sum = sum;
    this.set = set;
  }

  start() {
    workers.acquire((err, worker) => {  // [1]
      worker.send({sum: this.sum, set: this.set});

      const onMessage = msg => {
        if (msg.event === "end") {  // [3]
          worker.removeListener("message", onMessage);
          workers.release(worker);
        }

        this.emit(msg.event, msg.data);  // [4]
      };

      worker.on("message", onMessage);  // [2]
    });
  }
}

module.exports = SubsetSumFork;

首先注意,我們?cè)?b>subsetSumWorker.js調(diào)用ProcessPool的構(gòu)造函數(shù)創(chuàng)建ProcessPool實(shí)例。 我們還將進(jìn)程池的最大容量設(shè)置為2

另外,我們?cè)噲D維持原來的SubsetSum類相同的公共API。實(shí)際上,SubsetSumForkEventEmitter的子類,它的構(gòu)造函數(shù)接受sumset,而start()方法則觸發(fā)算法的執(zhí)行,而這個(gè)SubsetSumFork實(shí)例運(yùn)行在一個(gè)多帶帶的進(jìn)程上。調(diào)用start()方法時(shí)會(huì)發(fā)生的情況:

我們?cè)噲D從進(jìn)程池中獲得一個(gè)新的子進(jìn)程。在創(chuàng)建進(jìn)程成功之后,我們嘗試向子進(jìn)程發(fā)送一條消息,包含sumsetsend()方法是Node.js自動(dòng)提供給child_process.fork()創(chuàng)建的所有進(jìn)程,這實(shí)際上與父子進(jìn)程之間的通信管道有關(guān)。

然后我們開始監(jiān)聽子進(jìn)程返回的任何消息,我們使用on()方法附加一個(gè)新的事件監(jiān)聽器(這也是所有以child_process.fork()創(chuàng)建的進(jìn)程提供的通信通道的一部分)。

在事件監(jiān)聽器中,我們首先檢查是否收到一個(gè)end事件,這意味著SubsetSum所有任務(wù)已經(jīng)完成,在這種情況下,我們刪除onMessage監(jiān)聽器并釋放worker,并將其放回進(jìn)程池中,不再讓其占用內(nèi)存資源和CPU資源。

worker{event,data}格式生成消息,使得任何時(shí)候一旦子進(jìn)程處理完畢任務(wù),我們?cè)谕獠慷寄芙邮盏竭@一消息。

這就是SubsetSumFork模塊現(xiàn)在我們來實(shí)現(xiàn)這個(gè)worker應(yīng)用程序。

與父進(jìn)程進(jìn)行通信

現(xiàn)在我們來創(chuàng)建subsetSumWorker.js模塊,我們的應(yīng)用程序,這個(gè)模塊的全部?jī)?nèi)容將在一個(gè)多帶帶的進(jìn)程中運(yùn)行:

const SubsetSum = require("./subsetSum");

process.on("message", msg => {  // [1]
  const subsetSum = new SubsetSum(msg.sum, msg.set);
  
  subsetSum.on("match", data => {  // [2]
    process.send({event: "match", data: data});
  });
  
  subsetSum.on("end", data => {
    process.send({event: "end", data: data});
  });
  
  subsetSum.start();
});

由于我們的handler處于一個(gè)多帶帶的進(jìn)程中,我們不必?fù)?dān)心這類CPU-bound任務(wù)阻塞事件循環(huán),所有的HTTP請(qǐng)求將繼續(xù)由主應(yīng)用程序的事件循環(huán)處理,而不會(huì)中斷。

當(dāng)子進(jìn)程開始啟動(dòng)時(shí),父進(jìn)程:

子進(jìn)程立即開始監(jiān)聽來自父進(jìn)程的消息。這可以通過process.on()函數(shù)輕松實(shí)現(xiàn)。我們期望從父進(jìn)程中唯一的消息是為新的SubsetSum任務(wù)提供輸入的消息。只要收到這樣的消息,我們創(chuàng)建一個(gè)SubsetSum類的新實(shí)例,并注冊(cè)matchend事件監(jiān)聽器。最后,我們用subsetSum.start()開始計(jì)算。

每次子集求和算法收到事件時(shí),把結(jié)果它封裝在格式為{event,data}的對(duì)象中,并將其發(fā)送給父進(jìn)程。這些消息然后在subsetSumFork.js模塊中處理,就像我們?cè)谇懊娴恼鹿?jié)中看到的那樣。

注意:當(dāng)子進(jìn)程不是Node.js進(jìn)程時(shí),則上述的通信管道就不可用了。在這種情況下,我們?nèi)匀豢梢酝ㄟ^在暴露于父進(jìn)程的標(biāo)準(zhǔn)輸入流和標(biāo)準(zhǔn)輸出流之上實(shí)現(xiàn)我們自己的協(xié)議來建立父子進(jìn)程通信的接口。
多進(jìn)程模式

嘗試新版本的子集求和算法,我們只需要替換HTTP服務(wù)器使用的模塊(文件app.js):

運(yùn)行結(jié)果如下:

更有趣的是,我們也可以嘗試同時(shí)啟動(dòng)兩個(gè)subsetSum任務(wù),我們可以充分看到多核CPU的作用。 相反,如果我們嘗試同時(shí)運(yùn)行三個(gè)subsetSum任務(wù),結(jié)果應(yīng)該是最后一個(gè)啟動(dòng)將掛起。這不是因?yàn)橹鬟M(jìn)程的事件循環(huán)被阻塞,而是因?yàn)槲覀優(yōu)?b>subsetSum任務(wù)設(shè)置了兩個(gè)進(jìn)程的并發(fā)限制。

正如我們所看到的,多進(jìn)程模式比interleaving模式更加強(qiáng)大和靈活;然而,由于單個(gè)機(jī)器提供的CPU和內(nèi)存資源量仍然是一個(gè)硬性限制,所以它仍然不可擴(kuò)展。在這種情況下,將負(fù)載分配到多臺(tái)機(jī)器上,則是更優(yōu)秀的解決辦法。

值得一提的是,在運(yùn)行CPU-bound任務(wù)時(shí),多線程可以成為多進(jìn)程的替代方案。目前,有幾個(gè)npm包公開了一個(gè)用于處理用戶級(jí)模塊的線程的API;其中最流行的是webworker-threads。但是,即使線程更輕量級(jí),完整的進(jìn)程也可以提供更大的靈活性,并具備更高更可靠的容錯(cuò)處理。
總結(jié)

本章講述以下三點(diǎn):

異步初始化模塊

批處理和緩存在Node.js異步中的運(yùn)用

使用異步或者多進(jìn)程來處理CPU-bound的任務(wù)

文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。

轉(zhuǎn)載請(qǐng)注明本文地址:http://specialneedsforspecialkids.com/yun/90718.html

相關(guān)文章

  • Nodejs中貫徹單元測(cè)試

    摘要:原文鏈接在中貫徹單元測(cè)試在團(tuán)隊(duì)合作中,你寫好了一個(gè)函數(shù),供隊(duì)友使用,跑去跟你的隊(duì)友說,你傳個(gè)值進(jìn)去,他就會(huì)返回結(jié)果了。如果你也為社區(qū)貢獻(xiàn)過,想更多人使用的話,加上單元測(cè)試吧,讓你的值得別人信賴。 原文鏈接:BlueSun | 在Nodejs中貫徹單元測(cè)試 在團(tuán)隊(duì)合作中,你寫好了一個(gè)函數(shù),供隊(duì)友使用,跑去跟你的隊(duì)友說,你傳個(gè)A值進(jìn)去,他就會(huì)返回B結(jié)果了。過了一會(huì),你隊(duì)友跑過來說,我傳個(gè)A...

    enali 評(píng)論0 收藏0
  • Node.js設(shè)計(jì)模式》基于回調(diào)的異步控制流

    摘要:編寫異步代碼可能是一種不同的體驗(yàn),尤其是對(duì)異步控制流而言。回調(diào)函數(shù)的準(zhǔn)則在編寫異步代碼時(shí),要記住的第一個(gè)規(guī)則是在定義回調(diào)時(shí)不要濫用閉包。為回調(diào)創(chuàng)建命名函數(shù),避免使用閉包,并將中間結(jié)果作為參數(shù)傳遞。 本系列文章為《Node.js Design Patterns Second Edition》的原文翻譯和讀書筆記,在GitHub連載更新,同步翻譯版鏈接。 歡迎關(guān)注我的專欄,之后的博文將在專...

    Chiclaim 評(píng)論0 收藏0
  • Node.js設(shè)計(jì)模式Node.js基本模式

    摘要:回調(diào)函數(shù)是在異步操作完成后傳播其操作結(jié)果的函數(shù),總是用來替代同步操作的返回指令。下面的圖片顯示了中事件循環(huán)過程當(dāng)異步操作完成時(shí),執(zhí)行權(quán)就會(huì)交給這個(gè)異步操作開始的地方,即回調(diào)函數(shù)。 本系列文章為《Node.js Design Patterns Second Edition》的原文翻譯和讀書筆記,在GitHub連載更新,同步翻譯版鏈接。 歡迎關(guān)注我的專欄,之后的博文將在專欄同步: Enc...

    Seay 評(píng)論0 收藏0
  • Node.js知識(shí)點(diǎn)詳解(二)HTTP模塊與事件模塊

    摘要:如果不能快速返回,就應(yīng)當(dāng)將其遷移到另一個(gè)進(jìn)程中模塊讓開發(fā)人員可以為事件設(shè)置偵聽器和處理器。我們需要給每個(gè)想要響應(yīng)的事件創(chuàng)建偵聽器 Node.js的http服務(wù)器 通過使用HTTP模塊的低級(jí)API,Node.js允許我們創(chuàng)建服務(wù)器和客戶端。剛開始學(xué)node的時(shí)候,我們都會(huì)遇到如下代碼: var http = require(http); http.createServer(funct...

    Lionad-Morotar 評(píng)論0 收藏0
  • Node.js 高級(jí)進(jìn)階之 fs 文件模塊學(xué)習(xí)

    摘要:回調(diào)函數(shù)提供兩個(gè)參數(shù)和,表示有沒有錯(cuò)誤發(fā)生,是文件內(nèi)容。文件關(guān)閉第一個(gè)參數(shù)文件時(shí)傳遞的文件描述符第二個(gè)參數(shù)回調(diào)函數(shù)回調(diào)函數(shù)有一個(gè)參數(shù)錯(cuò)誤,關(guān)閉文件后執(zhí)行。 showImg(//img.mukewang.com/5d3f890d0001836113660768.jpg); 人所缺乏的不是才干而是志向,不是成功的能力而是勤勞的意志。 —— 部爾衛(wèi) 文章同步到github博客:https:/...

    verano 評(píng)論0 收藏0

發(fā)表評(píng)論

0條評(píng)論

最新活動(dòng)
閱讀需要支付1元查看
<