摘要:在中,又由于單線程的原因,異步編程又是非常重要的。方法有很多,,,觀察者,,,這些中處理異步編程的,都可以做到這種串行的需求。
引入
隊列對于任何語言來說都是重要的,io 的串行,請求的并行等等。在 JavaScript 中,又由于單線程的原因,異步編程又是非常重要的。昨天由一道面試題的啟發,我去實現 JS 中的異步隊列的時候,借鑒了 express 中間件思想,并發散到 co 實現 與 generator,以及 asyncToGenerator。
本次用例代碼都在此,可以 clone 下來試一下
異步隊列很多面試的時候會問一個問題,就是怎么讓異步函數可以順序執行。方法有很多,callback,promise,觀察者,generator,async/await,這些 JS 中處理異步編程的,都可以做到這種串行的需求。但是很麻煩的是,處理起來是挺麻煩的,你要不停的手動在上一個任務調用下一個任務。比如 promise,像這樣:
a.then(() => b.then(() => c.then(...)))
代碼嵌套的問題,有點嚴重。所以要是有一個隊列就好了,往隊列里添加異步任務,執行的時候讓隊列開始 run 就好了。先制定一下 API,我們有一個 queue,隊列都在內部維護,通過 queue.add 添加異步任務,queue.run 執行隊列,可以先想想。
參照之前 express 中間件的實現,給異步任務 async-fun 傳入一個 next 方法,只有調用 next,隊列才會繼續往下走。那這個 next 就至關重要了,它會控制隊列往后移一位,執行下一個 async-fun。我們需要一個隊列,來保存 async-fun,也需要一個游標,來控制順序。
以下是我的簡單實現:
const queue = () => { const list = []; // 隊列 let index = 0; // 游標 // next 方法 const next = () => { if (index >= list.length - 1) return; // 游標 + 1 const cur = list[++index]; cur(next); } // 添加任務 const add = (...fn) => { list.push(...fn); } // 執行 const run = (...args) => { const cur = list[index]; typeof cur === "function" && cur(next); } // 返回一個對象 return { add, run, } } // 生成異步任務 const async = (x) => { return (next) => {// 傳入 next 函數 setTimeout(() => { console.log(x); next(); // 異步任務完成調用 }, 1000); } } const q = queue(); const funs = "123456".split("").map(x => async(x)); q.add(...funs); q.run();// 1, 2, 3, 4, 5, 6 隔一秒一個。
我這里沒去構造一個 class,而是通過閉包的特性去處理的。queue 方法返回一個包含 add,run 的對象,add 即為像隊列中添加異步方法,run 就是開始執行。在 queue 內部,我們定義了幾個變量,list 用來保存隊列,index 就是游標,表示隊列現在走到哪個函數了,另外,最重要的是 next 方法,它是控制游標向后移動的。
run 函數一旦執行,隊列即開始 run。一開始執行隊列里的第一個 async 函數,我們把 next 函數傳給了它,然后由 async 函數決定什么時候執行 next,即開始執行下一個任務。我們沒有并不知道異步任務什么時候才算完成,只能通過打成某種共識,來告知 queue 某個任務完成。就是傳給任務的 next 函數。其實 async 返回的這個函數,有一個名字,叫 Thunk,后面我們會簡單介紹。
Thunkthunk 其實是為了解決 “傳名調用” 的。就是我傳給函數 A 一個表達式作參數 x + 1,但是我不確定這個 x + 1 什么時候會用到,以及會不會用到,如果在傳入就執行,這個求值是沒有必要的。所以就出現了一個臨時函數 Thunk,來保存這個表達式,傳入函數 A 中,待需要時再調用。
const thunk = () => { return x + 1; }; const A = thunk => { return thunk() * 2; }
嗯... 其實就是一個回調函數...
暫停其實只要某個任務,不繼續調用 next,隊列就已經不會繼續往下走了。比如我們 async 任務里加一個判斷(通常是異步 io,請求的容錯處理):
// queue 函數不變, // async 加限制條件 const async = (x) => { return (next) => { setTimeout(() => { if(x > 3) { console.log(x); q.run(); //重試 return; } console.log(x); next(); }, 1000); } } const q = queue(); const funs = "123456".split("").map(x => async(x)); q.add(...funs); q.run(); //打印結果: 1, 2, 3, 4, 4,4, 4,4 一直是 4
當執行到第四個任務的時候,x 是 4 的時候,不再繼續,就可以直接 return,不再調用 next。也有可能是出現錯誤,我們需要再重試,那就再調用 q.run 就可以了,因為游標保存的就是當前的 async 任務的索引。
另外,還有一種方式,就是添加 stop 方法。雖然感覺上面的方法就 OK 了,但是 stop 的好處在于,你可以主動的停止隊列,而不是在 async 任務里加限制條件。當然,有暫停就有繼續了,兩種方式,一個是 retry,就是重新執行上一次暫停的那個;另一個就是 goOn,不管上次最后一個如何,繼續下一個。上代碼:
const queue = () => { const list = []; let index = 0; let isStop = false; const next = () => { // 加限制 if (index >= list.length - 1 || isStop) return; const cur = list[++index]; cur(next); } const add = (...fn) => { list.push(...fn); } const run = (...args) => { const cur = list[index]; typeof cur === "function" && cur(next); } const stop = () => { isStop = true; } const retry = () => { isStop = false; run(); } const jump = () => { isStop = false; next(); } return { add, run, stop, retry, goOn, } } const async = (x) => { return (next) => { setTimeout(() => { console.log(x); next(); }, 1000); } } const q = queue(); const funs = "123456".split("").map(x => async(x)); q.add(...funs); q.run(); setTimeout(() => { q.stop(); }, 3000) setTimeout(() => { q.goOn(); }, 5000)
其實還是加攔截... 只不過從 async 函數中,換到了 next 函數里面,利用 isStop 這個變量切換 true/false,開關暫停。我加了兩個定時器,一個是 3 秒后暫停,一個是 5 秒后繼續,(請忽略定時器的誤差),按道理應該是隊列到三秒的時候,也就是第三個任務執行完暫停,然后再隔 2 秒,繼續。結果打印到 3 的時候,停住,兩秒之后繼續 4,5,6.
兩種思路,請結合場景思考問題。
并發上面的都是在做串行,假如 run 的時候我要并行呢... 也很簡單,把隊列一次性跑完就可以了。
// 為了代碼短一些,把 retry,goOn 先去掉了。 const queue = () => { const list = []; let index = 0; let isStop = false; let isParallel = false; const next = () => { if (index >= list.length - 1 || isStop || isParallel) return; const cur = list[++index]; cur(next); } const add = (...fn) => { list.push(...fn); } const run = (...args) => { const cur = list[index]; typeof cur === "function" && cur(next); } const parallelRun = () => { isParallel = true; for(const fn of list) { fn(next); } } const stop = () => { isStop = true; } return { add, run, stop, parallelRun, } } const async = (x) => { return (next) => { setTimeout(() => { console.log(x); next(); }, 1000); } } const q = queue(); const funs = "123456".split("").map(x => async(x)); q.add(...funs); q.parallelRun(); // 一秒后全部輸出 1, 2, 3, 4, 5, 6
我添加了一個 parallelRun 方法,用于并行,我覺得還是不要放到 run 函數里面了,抽象單元盡量細化還是。然后還加了一個 isParallel 的變量,默認是 false,考慮到 next 函數有可能會被調用,所以需要加一個攔截,保證不會處亂。
以上就是利用僅用 thunk 函數,結合 next 實現的異步隊列控制器,queue,跟你可以把 es6 代碼都改成 es5,保證兼容,當然是足夠簡單的,不適用于負責的場景 ?,僅提供思路。
generator 與 co為什么要介紹 generator,首先它也是用來解決異步回調的,另外它的使用方式也是調用 next 函數,generator 才會往下執行,默認是暫停狀態。yield 就相當于上面的 q.add,往隊列中添加任務。所以我也打算一起介紹,來更好的拓寬思路。發散思維,相似的知識點做好歸納,然后某一天你就會突然有一種:原來是這么回事,原來 xxx 是借鑒子 yyy,然后你又去研究 yyy - -。
簡介 generator簡單介紹回顧一下,因為有同學不經常用,肯定會有遺忘。
// 一個簡單的栗子,介紹它的用法 function* gen(x) { const y = yield x + 1; console.log(y, "here"); // 12 return y; } const g = gen(1); const value = g.next().value; // {value: 2, done: false} console.log(value); // 2 console.log(g.next(value + 10)); // {value: 12, done: true}
首先生成器其實就是一個通過函數體內部定義迭代算法,然后返回一個 iterator 對象。關于iterator,可以看我另一篇文章。
gen 執行返回一個對象 g,而不是返回結果。g 跟其他 iterator 一樣,通過調用 next 方法,保證游標 + 1,并且返回一個對象,包含了 value(yield 語句的結果),和 done(迭代器是否完成)。另外,yield 語句的值,比如上面代碼中的 y,是下一次調用 next 傳入的參數,也就是 value + 10,所以是 12.這樣設計是有好處的,因為這樣你就可以在 generator 內部,定義迭代算法的時候,拿到上次的結果(或者是處理后的結果)了。
但是 generator 有一個弊端就是不會自動執行,TJ 大神寫了一個 co,來自動執行 generator,也就是自動調用 next。它要求 yield 后面的函數/語句,必須是 thunk 函數或者是 promise 對象,因為只有這樣才會串聯執行完,這跟我們最開始實現 queue 的思路是一樣的。co 的實現有兩種思想,一個是 thunk,一個是 promise,我們都來試一下。
Thunk 實現還記得最開始的 queue 怎么實現的嗎,內部定義 next 函數,來保證游標的前進,async 函數會接收 next,去執行 next。到這里是一樣的,我們只要在 co 函數內部定義一個同樣的 next 函數,來保證繼續執行,那么 generator 是沒有提供索引的,不過它提供了 g.next 函數啊,所以我們只需要給 async 函數傳 g.next 不就好了,async 就是 yield 后面的語句啊,也就是 g.value。但是并不能直接傳 g.next,為什么?因為下一次的 thunk 函數,要通過 g.next 的返回值 value 取到啊,木有 value,下一個 thunk 函數不就沒了... 所以我們還是需要定義一個 next 函數去包裝一下的。
上代碼:
const coThunk = function(gen, ...params) { const g = gen(...params); const next = (...args) => { // args 用于接收參數 const ret = g.next(...args); // args 傳給 g.next,即賦值給上一個 yield 的值。 if(!ret.done) { // 去判斷是否完成 ret.value(next); // ret.value 就是下一個 thunk 函數 } } next(); // 先調用一波 } // 返回 thunk 函數的 asyncFn const asyncFn = (x) => { return (next) => { // 接收 next const data = x + 1; setTimeout(() => { next && next(data); }, 1000) } } const gen = function* (x) { const a = yield asyncFn(x); console.log(a); const b = yield asyncFn(a); console.log(b); const c = yield asyncFn(b); console.log(c); const d = yield asyncFn(c); console.log(d); console.log("done"); } coThunk(gen, 1); // 2, 3, 4, 5, done
這里定義的 gen,功能很簡單,就是傳入參數 1,然后每個 asyncFn 異步累加,即多個異步操作串行,并且下一個依賴上一個的返回值。
promise 實現其實思路都是一樣的,只不過調用 next,換到了 co 內部。因為 yield 后面的語句是 promise 對象的話,我們可以在 co 內部拿到了,然后在 g.next().value 的 then 語句執行 next 就好了。
// 定義 co const coPromise = function(gen) { // 為了執行后的結果可以繼續 then return new Promise((resolve, reject) => { const g = gen(); const next = (data) => { // 用于傳遞,只是換個名字 const ret = g.next(data); if(ret.done) { // done 后去執行 resolve,即co().then(resolve) resolve(data); // 最好把最后一次的結果給它 return; } ret.value.then((data) => { // then 中的第一個參數就是 promise 對象中的 resolve,data 用于接受并傳遞。 next(data); //調用下一次 next }) } next(); }) } const asyncPromise = (x) => { return new Promise((resolve) => { setTimeout(() => { resolve(x + 1); }, 1000) }) } const genP = function* () { const data1 = yield asyncPromise(1); console.log(data1); const data2 = yield asyncPromise(data1); console.log(data2); const data3 = yield asyncPromise(data2); console.log(data3); } coPromise(genP).then((data) => { setTimeout(() => { console.log(data + 1); // 5 }, 1000) }); // 一樣的 2, 3, 4, 5
其實 co 的源碼就是通過這兩種思路實現的,只不過它做了更多的 catch 錯誤的處理,而且支持你 yield 一個數組,對象,通過 promise.all 去實現。另外 yield thunk 函數的時候,它統一轉成 promise 去處理了。感興趣的可以去看一下 co,相信現在一定很明朗了。
async/await現在 JS 中用的最常用的異步解決方案了,不過 async 也是基于 generator 的實現,只不過是做了封裝。如果把 async/await 轉化成 generate/yield,只需要把 await 語法換成 yield,再扔到一個 generate 函數中,async 的執行換成 coPromise(gennerate) 就好了。
const asyncPromise = (x) => { return new Promise((resolve) => { setTimeout(() => { resolve(x + 1); }, 1000) }) } async function fn () { const data = await asyncPromise(1); console.log(data); } fn(); // 那轉化成 generator 可能就是這樣了。 coPromise 就是上面的實現 function* gen() { const data = yield asyncPromise(1); console.log(data); } coPromise(gen);
asyncToGenerator 就是這樣的原理,事實上 babel 也是這樣轉化的。
最后我首先是通過 express 的中間件思想,實現了一個 JS 中需求常見的 queue (異步隊列解決方案),然后再接著去實現一個簡單的 coThunk,最后把 thunk 換成 promise。因為異步解決方案在 JS 中是很重要的,去使用現成的解決方案的時候,如果能去深入思考一下實現的原理,我相信是有助于我們學習進步的。
歡迎 star 個人 blog:https://github.com/sunyongjia... ?
文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。
轉載請注明本文地址:http://specialneedsforspecialkids.com/yun/88768.html
摘要:的異步完成整個異步環節的有事件循環觀察者請求對象以及線程池。執行回調組裝好請求對象送入線程池等待執行,實際上是完成了異步的第一部分,回調通知是第二部分。異步編程是首個將異步大規模帶到應用層面的平臺。 showImg(https://segmentfault.com/img/remote/1460000011303472); 本文首發在個人博客:http://muyunyun.cn/po...
摘要:而事件循環是主線程中執行棧里的代碼執行完畢之后,才開始執行的。由此產生的異步事件執行會作為任務隊列掛在當前循環的末尾執行。在下,觀察者基于監聽事件的完成情況在下基于多線程創建。 主要問題: 1、JS引擎是單線程,如何完成事件循環的? 2、定時器函數為什么計時不準確? 3、回調與異步,有什么聯系和不同? 4、ES6的事件循環有什么變化?Node中呢? 5、異步控制有什么難點?有什么解決方...
摘要:執行棧清空后,檢查微任務隊列,將可執行的微任務全部執行。對象的錯誤具有冒泡性質,會一直向后傳遞,直到被捕獲為止。返回的遍歷器對象,可以依次遍歷函數內部的每一個狀態。表示函數里有異步操作,表示緊跟在后面的表達式需要等待結果。 javascript 是單線程執行的,由js文件自上而下依次執行。即為同步執行,若是有網絡請求或者定時器等業務時,不能讓瀏覽器傻傻等待到結束后再繼續執行后面的js吧...
摘要:以下展示它是如何工作的函數使用構造函數創建一個新的對象,并立即將其返回給調用者。在傳遞給構造函數的函數中,我們確保傳遞給,這是一個特殊的回調函數。 本系列文章為《Node.js Design Patterns Second Edition》的原文翻譯和讀書筆記,在GitHub連載更新,同步翻譯版鏈接。 歡迎關注我的專欄,之后的博文將在專欄同步: Encounter的掘金專欄 知乎專欄...
摘要:調用棧被清空,消息隊列中并無任務,線程停止,事件循環結束。不確定的時間點請求返回,將設定好的回調函數放入消息隊列。調用棧執行完畢執行消息隊列任務。請求并發回調函數執行順序無法確定。 異步編程 JavaScript中異步編程問題可以說是基礎中的重點,也是比較難理解的地方。首先要弄懂的是什么叫異步? 我們的代碼在執行的時候是從上到下按順序執行,一段代碼執行了之后才會執行下一段代碼,這種方式...
閱讀 3693·2021-11-22 15:24
閱讀 1599·2021-09-26 09:46
閱讀 1912·2021-09-14 18:01
閱讀 2610·2019-08-30 15:45
閱讀 3528·2019-08-30 14:23
閱讀 1873·2019-08-30 12:43
閱讀 2917·2019-08-30 10:56
閱讀 803·2019-08-29 12:20