摘要:在這些動作結束后,所有的隊列變化,就是整個組合任務狀態機的下一個狀態。如果組合狀態機停止了,向其中的任何一個對象執行或者操作都可以讓整個狀態機繼續動起來。
預覽。
先給出一個基礎類代碼。
const EventEmitter = require("events") const debug = require("debug")("transform") class Transform extends EventEmitter { constructor (options) { super() this.concurrency = 1 Object.assign(this, options) this.pending = [] this.working = [] this.finished = [] this.failed = [] this.ins = [] this.outs = [] } push (x) { this.pending.push(x) this.schedule() } pull () { let xs = this.finished this.finished = [] this.schedule() return xs } isBlocked () { return !!this.failed.length || // blocked by failed !!this.finished.length || // blocked by output buffer (lazy) this.outs.some(t => t.isBlocked()) // blocked by outputs transform } isStopped () { return !this.working.length && this.outs.every(t => t.isStopped()) } root () { return this.ins.length === 0 ? this : this.ins[0].root() } pipe (next) { this.outs.push(next) next.ins.push(this) return next } print () { debug(this.name, this.pending.map(x => x.name), this.working.map(x => x.name), this.finished.map(x => x.name), this.failed.map(x => x.name), this.isStopped()) this.outs.forEach(t => t.print()) } schedule () { // stop working if blocked if (this.isBlocked()) return this.pending = this.ins.reduce((acc, t) => [...acc, ...t.pull()], this.pending) while (this.working.length < this.concurrency && this.pending.length) { let x = this.pending.shift() this.working.push(x) this.transform(x, (err, y) => { this.working.splice(this.working.indexOf(x), 1) if (err) { x.error = err this.failed.push(x) } else { if (this.outs.length) { this.outs.forEach(t => t.push(y)) } else { if (this.root().listenerCount("data")) { this.root().emit("data", y) } else { this.finished.push(y) } } } this.schedule() this.root().emit("step", this.name, x.name) }) } } } module.exports = Transform
這段代碼目前還是雛形。
Transform類的設計類似node里的stream.Transform,但是它的設計目的不是buffering或流性能,而是作為并發編程的基礎模塊。
如果你熟悉流式編程,Transform的設計就很容易理解;在內部,Transform維護四個隊列:
pending是input buffer
working是當前正在執行的任務
finished是output buffer,它的目的不是為了buffer輸出,而是在沒有其他輸出辦法的時候作一下buffer。
failed是失敗的任務
Transform可以組合成DAG(Directed Acyclic Graph)使用,ins和outs用來存儲前置和后置Transform的引用,pipe方法負責設置這種雙向鏈接;最常見的情況是雙向鏈表,即ins和outs都只有一個對象。但把他們設計成數組就可以允許fan-in, fan-out的結構。
push和pull是write和read的等價物。
schedule是核心函數,它的任務是填充working隊列。在構造函數的參數里應該提供一個名字為transform的異步函數,schedule使用這個函數運行任務,在運行結束后,根據結果把任務推到failed隊列、推到下一個Transformer、用root節點的emit輸出、或者推到自己的finished隊列里。
Transform設計的核心思想,就是把并發任務的狀態,不使用對象屬性來編碼,只使用隊列位置來編碼;任何一個子任務,在任何時刻,僅存在于一個Transform對象的某個隊列中。換句話說,它等于把并發任務用資源來建模。如果你熟悉restful api對過程或狀態的建模方式就很容易理解這一點。
在Transform中,任何transform異步函數的返回,都是一個step;step是用Transform實現并發組合的最重要概念;
每一次transform函數返回,都會發生改變自己的隊列或向后續的Transform對象push任務的動作,這個push動作會觸發后續Transform的schedule方法;step結束時自己的schedule方法也會被調用,它會重新填充任務。在這些動作結束后,所有Transform的隊列變化,就是整個組合任務狀態機的下一個狀態。
這個狀態是顯式的,可以打印出來看,對debug非常有幫助;雖然異步i/o會讓這種狀態具有不確定性,但至少這里堅持了組合狀態機模型在處理并發問題時的同步原則,每個step結束時整體做一次狀態遷移,這個狀態遷移可以良好定義和觀察,這是Event模型下并發編程和Thread模型的重要區別。后者遇到并發邏輯引起的微妙錯誤時,很難捕捉現場分析,因為每一個Thread是黑盒。
從transform返回開始到emit(step)之間的一連串連鎖動作都是中間過程,最終實現一次完整的狀態遷移,這個過程必須是同步的。不應在這里出現異步、setImmediate或者process.nextTick等調用,這會帶來額外的不確定因素和極難發現和修復的bug。
在前面很長一段時間的并發編程實踐中,我指出過Promise的race/settle和錯誤處理邏輯在一些場景下的困難。Promise的過程邏輯不完備。我也花了很多力氣試圖在Process代數層面上把error, success, finish, race, settle, abort, pause, resume, 和他們的組合邏輯定義出來,但最終發現這很困難,因為實際編程中各種處理情況太多了。
所以在Transform的設計中,這些邏輯全部被拋棄了,因為事實上它們都不是真正的基礎并發邏輯。
Transform試圖實現組合的基礎并發邏輯只有一個:stopped。stopped的定義非常簡單:在一次step結束時,所有的Transform的working隊列為空,就是(整體的)stopped。這里要再次強調前述的step結束時同步方法的必要性,如果你在schedule里使用了異步方法調用,那么這個stopped的判斷就可能是錯的,因為schedule可能會在event loop里放置了一個馬上就會產生新的working任務的動作,而isStopped()的判斷就錯了。
stopped時,整體組合狀態可能是success, error, paused, 等等,都不難判斷,但目前代碼尚未穩定,我不打算加入語法糖。
在blocking i/o和同步的編程模式下,因果鏈和代碼書寫形式是一致的,但是在異步編程下,因果是異步和并發的,你只能去改變因,然后去觀察果,這是很多程序員不適應異步編程的根本原因,因為它要改變思維的習慣。
使用Transform來處理并發編程,仍然是在試圖重建這個因果鏈,即使他們是并發的,但是我們要有一個辦法把他們串起來;
前面說到的isStopped()是觀察到的果,能夠影響它的因,是isBlocked()函數,這個函數在schedule中被調用,如果估值為true,就會阻止schedule繼續向working隊列調度任務。
這里寫的isBlocked()的代碼實現只是一個例子;可以阻止schedule的原因可能有很多,比如出現錯誤,或者輸出buffer滿了,這些可以由實現者自己去定義。他們是policy,isBlocked()本身是mechanism。這個策略的粒度是每個Transform對象都可以有自己的策略。比如一個刪除臨時文件的操作,結果是無關痛癢的,那么它不該因為error就block。
isBlocked()邏輯可以象示例代碼里那樣向下chain起來,即只要有后續任務block了,前置任務就該停下來;這在絕大多數情況下都是合理的邏輯。因為雖然我們寫的是流式處理辦法,但是我們不是在處理octet-stream,追求性能的buffering和flow control都沒什么意義,如果前面任務在copy文件后面的任務要移動到目標文件夾,如果目標文件夾出了問題前面快速移動了大量文件最終也無法成功。
如果組合狀態機停止了,向其中的任何一個Transform對象執行push或者pull操作都可以讓整個狀態機繼續動起來。從root節點push是常見情況,從leaf節點pull也是,向中間節點push也是可能的;
資源建模的一個好處是你可以把狀態呈現給用戶,如果一個復制文件的任務因為文件名沖突而fail,你還可以讓用戶選擇處理策略,例如覆蓋或者重命名,在用戶選擇了操作之后,代碼會從某個Transform對象的failed隊列中取走一個對象,修改策略參數后重新push進去,那么這個狀態機可以繼續執行下去;這種可處理的錯誤不該成為block整個狀態機工作(復制其他文件和文件夾)的原因,除非他們積累到可觀的數量,在Transform模式下這些都非常容易實現,開發者可以很簡單的編寫isBlocked()的策略;
和node的stream一樣,Transform是lazy的,純粹的push machine可能會在中間節點buffer大量的任務,這對把任務作為流處理來說是不合適的;同時,Lazy對于停下來的組合狀態機能繼續run起來很重要,pull方法就是這個設計目的,它的schedule邏輯和push一樣,只是方向相反;如果設置了Leaf節點會因為輸出緩沖而block,它就可以block整個狀態機(或者其中的一部分),這在某些情況下也是有用的功能,如果整個狀態機的輸出因為某種原因暫時無法被立刻消費掉。
abort邏輯沒有在代碼中實現,但它很容易,可以遍歷所有的Transform,如果working隊列中的對象有abort方法,就調用它;這不是個立即的中止,該對象仍然要通過callback返回才能stop。如果要全局的block,可以把所有的Leaf Node都pipe到一個sink節點去,把這個sink節點強制設置成isBlocked,可以block全部。pause和resume也是非常類似的邏輯。
當然你可能會遇到類似finally的邏輯是必須去執行的,即使在發生錯誤的時候,它意味著這個Transform要向前傳遞isBlocked信息,但是它的Schedule方法不必停止工作。它可以一直運行到把所有隊列任務都處理完為止。
重載schedule方法也是可能的;例如你的任務之間有前后依賴的邏輯,你就可以重載schedule方法實現自己的調度方式。另外這里的schedule代碼只基于transform函數,很顯然如果transform本身是一個Transform對象它也應該工作,實現組合過程,包括Sequencer,Parallel等等,這些都是需要實現的。
總而言之,isBlocked和schedule是分開的邏輯,它們有各自不同的設計目的和使命,你可以重載它們獲得自己想要的結果。所以寫在這里的代碼,重要的不是他們的實現,而是其機制設計和界面設計,以及接口承諾;所有邏輯都是足夠原子化的,每個函數只做一件事,isBlocked是因,可以根據需要選擇策略,isStopped是果,通過step觀察和實現后續邏輯。應該避免通過向基類添加新方法來擴展能力,因為Transform使用隊列和任務描述狀態,這個描述是完備的,機制也是完善的。
就像我在另一篇介紹JavaScript語言的文章里寫的一樣,如果針對問題的模型具有完備性,即使抽象,也可以通過組合基本操作和概念獲得更多的特性,而不是在模型上增加概念,除非你認為模型不夠完備。
軟件工程中不是什么地方都要上狀態機(automaton)這么嚴格的模型工具,項目軟件里寫到bug數量足夠低就可以了,但是如果你要寫系統軟件或者對正確性有苛刻要求的東西,如果你沒有用狀態機建模,那么實際上你沒有完備設計。
當然有了完備設計也不意味著軟件沒bug了,但一個好的設計可以讓你對問題的理解、遇到問題時找到原因,有極大的幫助。
在復雜系統中,上述的同步方法狀態機組合,和Hierarchical的狀態機組合,是我們目前已知的兩種具有完備性的模型方法。但是兩者不同。雖然Transform的組合看起來是一個Hierarchy,但是它就像你在紙上畫一棵樹,它仍然是二維的,每個step的整體狀態聯動的遷移只是在populate一次狀態遷移的范圍,并不是幾何級數的增加狀態組合;所以我們仍然可以構筑一個線性的因果鏈,每個step因果因果這樣的繼續下去,和沒有并發的狀態機是一樣。
本質上這是數學歸納法:如果我們能證明如果n正確,那么n+1是正確的,這就可以證明chain下去的狀態組合即使是無窮也是正確的。
第二段代碼是使用的一個示例,這個class沒有必要,是為了保證和老代碼接口兼容,因為有一些項目內其他代碼的依賴性就不解釋了,很容易看明白大概邏輯;列在這里只是展示一下Transform使用時pipe過程的代碼樣子。
const Promise = require("bluebird") const path = require("path") const fs = Promise.promisifyAll(require("fs")) const EventEmitter = require("events") const debug = require("debug")("dircopy") const rimraf = require("rimraf") const Transform = require("../lib/transform") const { forceXstat } = require("../lib/xstat") const fileCopy = require("./filecopy") class DirCopy extends EventEmitter { constructor (src, tmp, files, getDirPath) { super() let dst = getDirPath() let pipe = new Transform({ name: "copy", concurrency: 4, transform: (x, callback) => (x.abort = fileCopy(path.join(src, x.name), path.join(tmp, x.name), (err, fingerprint) => { delete x.abort if (err) { callback(err) } else { callback(null, (x.fingerprint = fingerprint, x)) } })) }).pipe(new Transform({ name: "stamp", transform: (x, callback) => forceXstat(path.join(tmp, x.name), { hash: x.fingerprint }, (err, xstat) => err ? callback(err) : callback(null, (x.uuid = xstat.uuid, x))) })).pipe(new Transform({ name: "move", transform: (x, callback) => fs.link(path.join(tmp, x.name), path.join(dst, x.name), err => err ? callback(err) : callback(null, x)) })).pipe(new Transform({ name: "remove", transform: (x, callback) => rimraf(path.join(tmp, x.name), () => callback(null)) })).root() let count = 0 // drain data pipe.on("data", data => this.emit("data", data)) pipe.on("step", (tname, xname) => { debug("------------------------------------------") debug(`step ${count++}`, tname, xname) pipe.print() if (pipe.isStopped()) this.emit("stopped") }) files.forEach(name => pipe.push({ name })) pipe.print() this.pipe = pipe } } module.exports = DirCopy
文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。
轉載請注明本文地址:http://specialneedsforspecialkids.com/yun/87172.html
摘要:狀態機狀態機是模型層面的概念,與編程語言無關。狀態機具有良好的可實現性和可測試性。在代碼里,這是一個,但是我們在狀態機模型中要把他理解為事件。 這一篇是這個系列的開篇,沒有任何高級內容,就講講狀態機。 狀態機 狀態機是模型層面的概念,與編程語言無關。它的目的是為對象行為建模,屬于設計范疇。它的基礎概念是狀態(state)和事件(event)。 對象的內部結構描述為一組狀態S1, S2,...
摘要:前端日報精選譯用搭建探索生命周期中的匿名遞歸瀏覽器端機器智能框架深入理解筆記和屬性中文上海線下活動前端工程化架構實踐滬江技術沙龍掘金周二放送追加視頻知乎專欄第期聊一聊前端自動化測試上雙關語來自前端的小段子,你看得懂嗎眾成翻 2017-08-10 前端日報 精選 [譯] 用 Node.js 搭建 API Gateway探索 Service Worker 「生命周期」JavaScript ...
摘要:前端日報精選瀏覽器渲染過程與性能優化新版采用新引擎,速度是舊版的倍,名字和也變了中文與的使用個人文章在對比中理解掘金白潔血戰并發編程異步英文 2017-10-05 前端日報 精選 瀏覽器渲染過程與性能優化Firefox 新版采用新引擎,速度是舊版的 2 倍,名字和 Logo 也變了8 Key React Component DecisionsThe Intl.PluralRules A...
摘要:匿名函數是我們喜歡的一個重要原因,也是,它們分別消除了很多代碼細節上需要命名變量名或函數名的需要。這個匿名函數內,有更多的操作,根據的結果針對目錄和文件做了不同處理,而且有遞歸。 能和微博上的 @響馬 (fibjs作者)掰扯這個問題是我的榮幸。 事情緣起于知乎上的一個熱貼,諸神都發表了意見: https://www.zhihu.com/questio... 這一篇不是要說明白什么是as...
摘要:的科學定義是或者,它的標志性原語是。能解決一類對語言的實現來說特別無力的狀態機模型流程即狀態。容易實現是需要和的一個重要原因。 前面寫了一篇,寫的很粗,這篇講講一些細節。實際上Fiber/Coroutine vs Async/Await之爭不是一個簡單的continuation如何實現的問題,而是兩個完全不同的problem和solution domain。 Event Model 我...
閱讀 1802·2021-11-24 09:39
閱讀 2290·2021-09-30 09:47
閱讀 4144·2021-09-22 15:57
閱讀 1873·2019-08-29 18:36
閱讀 3577·2019-08-29 12:21
閱讀 590·2019-08-29 12:17
閱讀 1263·2019-08-29 11:25
閱讀 724·2019-08-28 18:26