摘要:調(diào)度器有了迭代器,還需要一個調(diào)度器才能按照預期的流程串行執(zhí)行需要的函數(shù),同時處理參數(shù)傳遞的過程我自己寫的代碼,調(diào)度的工作是由一起做的。
本文摘自我的博客,歡迎大家去逛逛。
又是兩周沒寫博客了,圣誕夜來水一發(fā)~
今天稍微看了下async的源碼,感覺很簡短精煉,總共也才1000多行代碼,好多值得學習的地方。主要看的是waterfall模塊,由于源碼中有好多不同接口公用的部分,因此看完waterfall這個接口的整個流程,差不多就cover了一半的async源碼了。
在沒有太多使用經(jīng)驗的情況下,直接看源碼,可能會遇到一些不明所以的細節(jié),看了可能也只能吸收很少的一部分。最好的方式我覺得莫過于自己先造一遍輪子,再看源碼了。
接口需求waterfall這個接口的命名還是很形象的
我要定義一個waterfall函數(shù),滿足以下需求:
可以按照Array給定的順序逐個執(zhí)行
所有函數(shù)執(zhí)行完畢后,調(diào)用指定的回調(diào)函數(shù)
前一個函數(shù)的輸出作為后一個函數(shù)的輸入
中途某一個函數(shù)執(zhí)行失敗,直接調(diào)用回調(diào)函數(shù)結(jié)束
需求的代碼描述如下:
async.waterfall([ function(callback) { callback(null, "one", "two"); }, function(arg1, arg2, callback) { console.log(arg1); console.log(arg2); // arg1 now equals "one" and arg2 now equals "two" callback(null, "three"); }, function(arg1, callback) { console.log(arg1); // arg1 now equals "three" callback(null, "done"); } ], function (err, result) { // result now equals "done" console.log(result); }); // 期望輸出: // one // two // three // done編碼
代碼組織了好一會兒,又調(diào)試了好一會后(中間遇到了一個關于arguments的坑,后面會講),終于成型了。輸出是按照預期的,和async源碼運行的結(jié)果相同,分析也都寫在注釋中:
var async = {}; async.waterfall = function (tasks, cb){ // 指向下一個將要執(zhí)行的函數(shù) var _index = 0; /** * 調(diào)用用戶指定的函數(shù) */ function _run(index, args, cb){ var task = tasks[index]; args.push(cb); task.apply(null, args); }; /** * 因為涉及到控制流的轉(zhuǎn)移,從框架轉(zhuǎn)移到用戶,再從用戶轉(zhuǎn)移到框架。 * 需要定義一個傳遞控制流的使者,就是這個_cb函數(shù) * 1.框架轉(zhuǎn)移到用戶:調(diào)用用戶函數(shù)的同時,把_cb作為參數(shù) * 2.用戶轉(zhuǎn)移到框架:用戶調(diào)用這個_cb,表明已執(zhí)行完該函數(shù),把控制交給框架。抑或結(jié)束,抑或執(zhí)行下一個函數(shù) */ function _cb(){ // 如果錯誤了,直接回調(diào)最外層的cb // 如果是最后一個,也直接調(diào)用最外層的cb if (arguments[0] || _index === tasks.length) { return cb && cb.apply(null, arguments); } /** * 取出回調(diào)參數(shù),作為下一個函數(shù)的輸入 * 因為回調(diào)的第一個參數(shù)是錯誤碼,所以要去掉第一個 */ // var rest = arguments.slice(1); //arguments并沒有slice方法,因此這樣會報錯 var rest = [].slice.call(arguments, 1); _run(_index++, rest, _cb); }; // 如果用戶沒有指定要串行執(zhí)行的函數(shù),則直接調(diào)用回調(diào) if (tasks.length === 0) return cb && cb(); _run(_index++, [], _cb); };坑
踩的這個坑是關于arguments的(在ES6語法中其實不推薦使用arguments的方式,因為語法已經(jīng)支持了rest param)。我一直以為一個函數(shù)的arguments屬性是一個Array,因為經(jīng)常可以看到通過arguments[0]的方式去獲取參數(shù),也從來沒有質(zhì)疑過。先來看看下面這一個例子:
function a (){ console.log(typeof arguments); console.log(arguments); console.log(arguments[0]); console.log(arguments["0"]); console.log(arguments.length); console.log([].slice.call(arguments, 1)); }; a("one", "two", "three"); /** * 輸出(chrome): * object * ["one", "two", "three"] * one * one * 3 * ["two", "three"] * * 輸出(node.js) * object * { "0": "one", "1": "two", "2": "three" } * one * one * 3 * [ "two", "three" ] */
可以看出,arguments對象并不是一個array對象。在chrome中雖然看上去打印出來的是Array,但它是可以展開的,里面還有好多參數(shù)。而且下標取值的時候不光可以用數(shù)字,也可以用字符串來取值。這也是為什么我寫的代碼注釋中arguments.slice(1);的方式會執(zhí)行錯誤(slice是Array才有的方法)。但是[].slice.call(arguments, 1);卻能執(zhí)行,說明arguments還是有一點slice的特性的,有點不太懂。感覺它同時繼承了dict和array兩種對象的部分特性。
原來的輪子貼上原來的代碼實現(xiàn):
async.waterfall = function (tasks, callback) { // 這種方式也是很聰明的一種方式,可以代替 callback && callback()的方式 // noop 是一個空函數(shù),什么也不執(zhí)行 callback = _once(callback || noop); if (!_isArray(tasks)) { var err = new Error("First argument to waterfall must be an array of functions"); return callback(err); } if (!tasks.length) { return callback(); } function wrapIterator(iterator) { return _restParam(function (err, args) { if (err) { callback.apply(null, [err].concat(args)); } else { var next = iterator.next(); if (next) { args.push(wrapIterator(next)); } else { args.push(callback); } ensureAsync(iterator).apply(null, args); } }); } wrapIterator(async.iterator(tasks))(); };
拋開一些異常處理的情況,就總體邏輯流程上還是有些區(qū)別的,下面就逐個來分析一下。
迭代器我是自己通過_index的局部變量來記錄當前執(zhí)行的函數(shù)的(得益于閉包的特性,這個局部變量可以一直保留著)。源碼實現(xiàn)了一種迭代器的方式去管理傳入的函數(shù)數(shù)組,非常優(yōu)雅,支持next特性,觀摩一下:
async.iterator = function (tasks) { function makeCallback(index) { function fn() { if (tasks.length) { tasks[index].apply(null, arguments); } return fn.next(); } fn.next = function () { return (index < tasks.length - 1) ? makeCallback(index + 1): null; }; return fn; } return makeCallback(0); };
通過async.iterator包裝以后返回的是一個迭代器對象,他同時又是一個函數(shù)可以直接執(zhí)行,包裝了用戶傳入的tasks中的第一個函數(shù)。
調(diào)度器有了迭代器,還需要一個調(diào)度器才能按照預期的流程串行執(zhí)行需要的函數(shù),同時處理參數(shù)傳遞的過程(我自己寫的代碼,調(diào)度的工作是由_cb一起做的)。
這個調(diào)度器實現(xiàn)的非常棒,由于它返回的也是一個函數(shù),因此和迭代器是屬于同一個維度的(如果是調(diào)用者和被調(diào)用者的關系則不屬于同一維度,他們的調(diào)用層次關系是同一層的)。_restParam函數(shù)可以暫時不用管它,因為從它的實現(xiàn)中可以看到,它本身和它參數(shù)中的函數(shù)是同一個維度的,它只是負責轉(zhuǎn)換了一下參數(shù)的結(jié)構(gòu)。完全可以理解為wrapIterator返回的就是被_restParam包著的那個函數(shù),_restParam只是一個參數(shù)結(jié)構(gòu)的轉(zhuǎn)換器,處理了參數(shù)結(jié)構(gòu)不一致的問題。
function _restParam(func, startIndex) { startIndex = startIndex == null ? func.length - 1 : +startIndex; return function() { var length = Math.max(arguments.length - startIndex, 0); var rest = Array(length); for (var index = 0; index < length; index++) { rest[index] = arguments[index + startIndex]; } switch (startIndex) { case 0: return func.call(this, rest); case 1: return func.call(this, arguments[0], rest); } // Currently unused but handle cases outside of the switch statement: // var args = Array(startIndex + 1); // for (index = 0; index < startIndex; index++) { // args[index] = arguments[index]; // } // args[startIndex] = rest; // return func.apply(this, args); }; }
回到調(diào)度器的上下文,在參數(shù)傳遞的過程中,args是上一個函數(shù)的返回結(jié)果組成的數(shù)組,再把下一個迭代器包裝一下作為該數(shù)組的最后一個元素。這樣在調(diào)用當前迭代器對應的函數(shù)的時候,用戶態(tài)上下文中的callback就是下一個用戶態(tài)函數(shù)對應的迭代器了。整個控制流程完全處在用戶層,框架層所做的事僅僅是參數(shù)結(jié)構(gòu)的轉(zhuǎn)換(畢竟apply函數(shù)需要的參數(shù)結(jié)構(gòu)是數(shù)組,而函數(shù)調(diào)用的時候則是展開的形式)。
奇淫技巧在閱讀代碼的過程中看到了不少巧妙的用法
在async源碼最后有這樣一段代碼:
// Node.js if (typeof module === "object" && module.exports) { module.exports = async; } // AMD / RequireJS else if (typeof define === "function" && define.amd) { define([], function () { return async; }); } // included directly via