摘要:響應式編程第一章響應式響應式編程第二章序列的深入研究響應式編程第三章構建并發程序響應式編程第四章構建完整的應用程序響應式編程第五章使用管理時間響應式編程第六章使用的響應式應用程序使用管理時間自從接觸,就開始在我的項目中使用它。
Rxjs 響應式編程-第一章:響應式
Rxjs 響應式編程-第二章:序列的深入研究
Rxjs 響應式編程-第三章: 構建并發程序
Rxjs 響應式編程-第四章 構建完整的Web應用程序
Rxjs 響應式編程-第五章 使用Schedulers管理時間
Rxjs 響應式編程-第六章 使用Cycle.js的響應式Web應用程序
自從接觸RxJS,就開始在我的項目中使用它。有一段時間我以為我知道如何有效地使用它,但有一個令人煩惱的問題:我怎么知道我使用的運算符是同步還是異步?換句話說,Operators到底什么時候發出通知?這似乎是正確使用RxJS的關鍵部分,但對我來說感覺有點模糊。
我認為,間隔運算符顯然是異步的,所以它在內部使用類似setTimeout的東西來發出項目。但是,如果我使用范圍怎么辦?它也是異步發射的嗎?它會阻止事件循環嗎?來自哪里?我到處都在使用這些運算符,但我對它們的內部并發模型知之甚少。
然后我了解了Schedulers。
Schedulers是一種強大的機制,可以精確管理應用程序中的并發性。它們允許您隨時更改其并發模型,從而對Observable如何發出通知進行細粒度控制。在本章中,您將學習如何使用調度程序并在常見場景中應用它們。我們將專注于測試,調度程序特別有用,您將學習如何制作自己的Schedulers。
使用SchedulersSchedulers是一種“安排”將來發生的操作的機制。 RxJS中的每個運算符在內部使用一個Schedulers,選擇該Schedulers以在最可能的情況下提供最佳性能。
讓我們看看我們如何改變運算符中的Schedulers以及這樣做的后果。 首先讓我們創建一個包含1,000個整數的數組:
var arr = []; for (var i=0; i<1000; i++) { arr.push(i); }
然后,我們從arr創建一個Observable并強制它通過訂閱它來發出所有通知。 在代碼中,我們還保存了發出所有通知所需的時間:
var timeStart = Date.now(); Rx.Observable.from(arr).subscribe( function onNext() {}, function onError() {}, function onCompleted() { console.log("Total time: " + (Date.now() - timeStart) + "ms"); } );
"Total time: 6ms”
六毫秒 - 不壞! from在內部使用Rx.Scheduler.currentThread,它計劃在任何當前工作完成后運行。 一旦啟動,它將同步處理所有通知。
在讓我們將Scheduler更改為Rx.Scheduler.default
var timeStart = Date.now(); Rx.Observable.from(arr, null, null, Rx.Scheduler.default).subscribe( function onNext() {}, function onError() {}, function onCompleted() { console.log("Total time: " + (Date.now() - timeStart) + "ms"); } );
"Total time: 5337ms”
哇,我們的代碼運行速度比使用currentThread Scheduler慢幾千倍。 那是因為默認的Scheduler異步運行每個通知。 我們可以通過在訂閱后添加一個簡單的日志語句來驗證這一點。
使用currentThread Scheduler:
Rx.Observable.from(arr).subscribe( ... ); console.log("Hi there!’);
"Total time: 8ms" "Hi there!"
使用默認Scheduler:
Rx.Observable.from(arr, null, null, Rx.Scheduler.timeout).subscribe( ... ); console.log("Hi there!’);
"Hi there!" "Total time: 5423ms"
因為使用默認Schedule的Observer以異步方式發出其項目,所以我們的console.log語句(它是同步的)在Observable甚至開始發出任何通知之前執行。 使用currentThread Scheduler,所有通知都會同步發生,因此只有在Observable發出所有通知時才會執行console.log語句。
因此,Scheduler確實可以改變我們的Observable的工作方式。 在我們的例子中,性能確實受到異步處理一個已經可用的大型陣列的影響。 但我們實際上可以使用Scheduler來提高性能。 例如,我們可以在對Observable執行昂貴的操作之前動態切換Scheduler:
arr.groupBy(function(value) { return value % 2 === 0; }) .map(function(value) { return value.observeOn(Rx.Scheduler.default); }) .map(function(groupedObservable) { return expensiveOperation(groupedObservable); });
在前面的代碼中,我們將數組中的所有值分為兩組:偶數和非偶數。 groupBy返回一個Observable,它為每個創建的組發出一個Observable。 這里是很酷的部分:在運行之前對每個分組的Observable中的項目進行昂貴的操作,我們使用observeOn將Scheduler切換到默認值,這樣昂貴的操作將異步執行,而不是阻塞事件循環
observeOn和subscribeOn在上一節中,我們使用observeOn運算符來更改某些Observable中的Scheduler。 observeOn和subscribeOn是返回Observable實例副本的運算符,但它使用的Scheduler我們作為參數傳遞的。
observeOn接受一個Scheduler并返回一個使用該Scheduler的新Observable。 它將使每個onNext調用在新的Scheduler中運行。
subscribeOn強制Observable的訂閱和取消訂閱工作(而不是通知)在特定的Scheduler上運行。 與observeOn一樣,它接受Scheduler作為參數。 例如,當我們在瀏覽器中運行并在訂閱調用中執行重要工作時,卻不希望用它來阻止UI線程,subscribeOn非常有用。
基本的Rx Scheduler讓我們在我們剛剛使用的Scheduler中深入了解一下。 RxJS的運算符最常用的是immediate,default和currentThread。
Immediate SchedulerImmediate Scheduler同步發出來自Observable的通知,因此無論何時在Immediate Scheduler上調度操作,它都將立即執行,從而阻塞該線程。 Rx.Observable.range是內部使用Immediate Scheduler序的運算符之一:
console.log("Before subscription"); Rx.Observable.range(1, 5) .do(function(a) { console.log("Processing value", a); }) .map(function(value) { return value * value; }) .subscribe(function(value) { console.log("Emitted", value); }); console.log("After subscription");
Before subscription Processing value 1 Emitted 1 Processing value 2 Emitted 4 Processing value 3 Emitted 9 Processing value 4 Emitted 16 Processing value 5 Emitted 25 After subscription
程序輸出按我們期望的順序發生。 每個console.log語句在當前項的通知之前運行。
何時使用它Immediate Scheduler非常適合于在每個通知中執行可預測且非常昂貴的操作的Observable。 此外,Observable最終必須調用onCompleted。
Default SchedulerDefault Scheduler以異步方式運行操作。 您可以將其視為setTimeout的等價物,其延遲為零毫秒,從而保持序列中的順序。 它使用其運行的平臺上可用的最有效的異步實現(例如,Node.js中的process.nextTick或瀏覽器中的setTimeout)。
讓我們使用前一個使用了range示例,并使其在默認的Scheduler上運行。 為此,我們將使用observeOn運算符:
console.log("Before subscription"); Rx.Observable.range(1, 5) .do(function(value) { console.log("Processing value", value); }) .observeOn(Rx.Scheduler.default) .map(function(value) { return value * value; }) .subscribe(function(value) { console.log("Emitted", value); }); console.log("After subscription");
Before subscription Processing value 1 Processing value 2 Processing value 3 Processing value 4 Processing value 5 After subscription Emitted 1 Emitted 4 Emitted 9 Emitted 16 Emitted 25
這個輸出有很大的不同。 我們的同步console.log語句輸出每個值,但我們使Observable在默認的Scheduler上運行,它會異步生成每個值。 這意味著我們在do運算符中的日志語句在平方值之前處理。
何時使用它Default Scheduler永遠不會阻塞事件循環,因此它非常適合涉及時間的操作,如異步請求。 它也可以在從未完成的Observable中使用,因為它不會在等待新通知時阻塞程序(這可能永遠不會發生)。
Current Thread SchedulercurrentThread Scheduler與Immediate Scheduler一樣是同步的,但是如果我們使用遞歸運算符,它會將要執行的操作排入隊列,而不是立即執行它們。 遞歸運算符是一個自己調度另一個運算符的運算符。 一個很好的例子就是repeat。 repeat運算符 - 如果沒有給出參數 - 將無限期地重復鏈中的先前Observable序列。
如果對使用Immediate Scheduler的運算符(例如return)調用repeat,則會遇到麻煩。 讓我們通過重復值10來嘗試這個,然后使用take只取重復的第一個值。 理想情況下,代碼將打印10次然后退出:
// Be careful: the code below will freeze your environment! Rx.Observable.return(10).repeat().take(1) .subscribe(function(value) { console.log(value); });
Error: Too much recursion
此代碼導致無限循環。 在訂閱時,如return調用onNext(10)然后onCompleted,這使得repeat再次訂閱return。 由于返回正在Immediate Scheduler上運行,因此該過程會重復,導致無限循環并且永遠不會結束。
但是如果相反我們通過將它作為第二個參數傳遞給currentThread Scheduler給return,我們得到:
var scheduler = Rx.Scheduler.currentThread; Rx.Observable.return(10, scheduler).repeat().take(1) .subscribe(function(value) { console.log(value); });
10
現在,當repeat重新訂閱返回時,新的onNext調用將排隊,因為之前的onCompleted仍在發生。 repeat然后返回一個可以使用的一次性對象,它調用onCompleted并通過重復處理取消repeat,最終從subscribe返回調用。
何時使用它currentThread Scheduler對于涉及遞歸運算符(如repeat)的操作非常有用,并且通常用于包含嵌套運算符的迭代。
動畫調度對于諸如canvas或DOM動畫之類的快速視覺更新,我們可以使用具有非常小時間間隔的interval運算符,或者我們可以在內部使用類似setTimeout的函數來調度通知。
但這兩種方法都不理想。 在他們兩個中我們都在瀏覽器上拋出所有這些更新,這可能無法足夠快地處理它們。之所以會發生這種情況,是因為瀏覽器正在嘗試渲染一個幀,然后它會收到渲染下一幀的指令,因此它會丟棄當前幀以保持速度。 結果是導致動畫的不流暢,卡頓。
瀏覽器具有處理動畫的原生方式,并且它們提供了一個使用它的API,稱為requestAnimationFrame。 requestAnimationFrame允許瀏覽器通過在最合適的時間排列動畫來優化性能,并幫助我們實現更流暢的動畫。
有專門的Scheduler處理requestAnimationFrameRxDOM庫附帶了一些額外的調度程序,其中一個是requestAnimationFrame Scheduler。
是的,你猜對了。 我們可以使用此Scheduler來改進我們的太空飛船視頻游戲。 在其中,我們建立了40ms的刷新速度 - 大約每秒25幀 - 通過在該速度下創建一個interval Observable,然后使用combineLatest以間隔設置的速度更新整個游戲場景(因為它是最快速更新的Observable) )...但誰知道瀏覽器使用這種技術丟幀了多少幀! 使用requestAnimationFrame可以獲得更好的性能。
讓我們創建一個使用Rx.Scheduler.requestAnimationFrame作為其調度程序的Observable。 請注意,它與interval運算符的工作方式類似:
ch_schedulers/starfield_raf.js
function animationLoop(scheduler) { return Rx.Observable.generate( 0, function() { return true; }, // Keep generating forever function(x) { return x + 1; }, // Increment internal value function(x) { return x; }, // Value to return on each notification Rx.Scheduler.requestAnimationFrame ); // Schedule to requestAnimationFrame }
現在,無論何時我們使用了25 FPS動畫,我們都可以使用animationLoop函數。 所以我們的Observable繪制了星星,之前看起來像這樣:
spaceship_reactive/spaceship.js
var StarStream = Rx.Observable.range(1, 250) .map(function() { return { x: parseInt(Math.random() * canvas.width), y: parseInt(Math.random() * canvas.height), size: Math.random() * 3 + 1 }; }) .toArray() .flatMap(function(arr) { return Rx.Observable.interval(SPEED).map(function() { return arr.map(function(star) { if (star.y >= canvas.height) { star.y = 0; } star.y += 3; return star; }); }); });
變成這樣:
ch_schedulers/starfield_raf.js
var StarStream = Rx.Observable.range(1, 250) .map(function() { return { x: parseInt(Math.random() * canvas.width), y: parseInt(Math.random() * canvas.height), size: Math.random() * 3 + 1 }; }) .toArray() .flatMap(function(arr) { return animationLoop().map(function() { return arr.map(function(star) { if (star.y >= canvas.height) { star.y = 0; } star.y += 3; return star; }); }); });
這給了我們一個更流暢的動畫。 代碼也更簡潔!
使用Scheduler進行測試測試可能是我們可以使用Scheduler的最引人注目的場景之一。 到目前為止,在本書中,我們一直在編寫我們的核心代碼而不考慮后果。 但是在現實世界的軟件項目中,我們將編寫測試以確保我們的代碼按照我們的意圖運行。
測試異步代碼很難。 我們經常遇到以下問題之一:
模擬異步事件很復雜且容易出錯。 測試的重點是避免bug和錯誤,但如果你的測試本身有錯誤,那這顯然是有問題的。
如果我們想要準確測試基于時間的功能,自動化測試變得非常緩慢。 例如,如果我們需要準確測試在嘗試檢索遠程文件四秒后調用錯誤,則每個測試至少需要花費很長時間才能運行結束。 如果我們不斷運行我們的測試套件,那將影響我們的開發時間。
TestSchedulerRxJS為我們提供了TestScheduler,一個旨在幫助測試的Scheduler。 TestScheduler允許我們在方便時模擬時間并創建確定性測試,確保它們100%可重復。 除此之外,它允許我們執行需要花費大量時間并將其壓縮到瞬間的操作,同時保持測試的準確性。
TestScheduler是VirtualTimeScheduler的專業化。 VirtualTimeSchedulers在“虛擬”時間而不是實時執行操作。 計劃的操作進入隊列并在虛擬時間內分配一個時刻。 然后,Scheduler在其時鐘前進時按順序運行操作。 因為它是虛擬時間,所以一切都立即運行,而不必等待指定的時間。 我們來看一個例子:
var onNext = Rx.ReactiveTest.onNext; QUnit.test("Test value order", function(assert) { var scheduler = new Rx.TestScheduler(); var subject = scheduler.createColdObservable( onNext(100, "first"), onNext(200, "second"), onNext(300, "third") ); var result = ""; subject.subscribe(function(value) { result = value }); scheduler.advanceBy(100); assert.equal(result, "first"); scheduler.advanceBy(100); assert.equal(result, "second"); scheduler.advanceBy(100); assert.equal(result, "third"); });
在前面的代碼中,我們測試了來自冷Observable的一些值以正確的順序到達。 為此,我們在TestScheduler中使用helper方法createColdObservable來創建一個Observable,它回放我們作為參數傳遞的onNext通知。 在每個通知中,我們指定應該發出通知值的時間。 在此之后,我們訂閱此Observable,手動提前調度程序中的虛擬時間,并檢查它是否確實發出了預期值。 如果示例在正常時間運行,則需要300毫秒,但由于我們使用TestScheduler來運行Observable,它將立即運行,但完全按照我們的順序。
寫一個真實的測試案例沒有比在現實世界中為時間敏感的任務編寫測試更好的方法來理解如何使用虛擬時間來縮短時間。 讓我們從我們在緩沖值中制作的地震查看器中恢復一個Observable:
quakes .pluck("properties") .map(makeRow) .bufferWithTime(500) .filter(function(rows) { return rows.length > 0; }) .map(function(rows) { var fragment = document.createDocumentFragment(); rows.forEach(function(row) { fragment.appendChild(row); }); return fragment; }) .subscribe(function(fragment) { table.appendChild(fragment); });
為了使代碼更易于測試,讓我們將Observable封裝在一個函數中,該函數接受我們在bufferWithTime運算符中使用的Scheduler。在Obpectables中參數化將要測試的Scheduler總是一個好主意。
ch_schedulers/testscheduler.js
function quakeBatches(scheduler) { return quakes.pluck("properties") .bufferWithTime(500, null, scheduler || null) .filter(function(rows) { return rows.length > 0; }); }
讓我們通過采取一些步驟來簡化代碼,但保持本質。 此代碼采用包含屬性屬性的Observable JSON對象,將它們緩沖到每500毫秒釋放的批次中,并過濾掉空的批次。
我們想要驗證此代碼是否有效,但我們絕對不希望每次運行測試時都等待幾秒鐘,以確保我們的緩沖按預期工作。 這是虛擬時間和TestScheduler將幫助我們的地方:
ch_schedulers/testscheduler.js
? var onNext = Rx.ReactiveTest.onNext; var onCompleted = Rx.ReactiveTest.onCompleted; var subscribe = Rx.ReactiveTest.subscribe; ? var scheduler = new Rx.TestScheduler(); ? var quakes = scheduler.createHotObservable( onNext(100, { properties: 1 }), onNext(300, { properties: 2 }), onNext(550, { properties: 3 }), onNext(750, { properties: 4 }), onNext(1000, { properties: 5 }), onCompleted(1100) ); ? QUnit.test("Test quake buffering", function(assert) { ? var results = scheduler.startScheduler(function() { return quakeBatches(scheduler) }, { created: 0, subscribed: 0, disposed: 1200 }); ? var messages = results.messages; console.log(results.scheduler === scheduler); ? assert.equal( messages[0].toString(), onNext(501, [1, 2]).toString() ); assert.equal( messages[1].toString(), onNext(1001, [3, 4, 5]).toString() ); assert.equal( messages[2].toString(), onCompleted(1100).toString() ); });
讓我們一步一步地剖析代碼:
我們首先從ReactiveTest加載一些輔助函數。 這些在虛擬時間內注冊onNext,onCompleted和訂閱事件。
我們創建了一個新的TestScheduler,它將推動整個測試。
我們使用TestScheduler中的方法createHotObservable創建一個假的熱Observable,它將在虛擬時間內模擬特定點的通知。 特別是,它在第一秒發出五個通知,并在1100毫秒完成。 每次它發出一個具有特定屬性的對象。
我們可以使用任何測試框架來運行測試。 對于我們的例子,我選擇了QUnit。
我們使用startScheduler方法創建一個使用測試調度程序的Observable。 第一個參數是一個函數,它創建Observable以使用我們的Scheduler運行。 在我們的例子中,我們只返回我們傳遞TestScheduler的quakeBatches函數。 第二個參數是一個對象,它包含我們想要創建Observable的不同虛擬時間,訂閱它并處理它。 對于我們的示例,我們在虛擬時間0開始和訂閱,并且我們在1200(虛擬)毫秒處理Observable。
startScheduler方法返回一個帶有scheduler和messages屬性的對象。 在消息中,我們可以在虛擬時間內找到Observable發出的所有通知。
我們的第一個斷言測試在501毫秒之后(在第一個緩沖時間限制之后),我們的Observable產生值1和2。
我們的第二個斷言測試在1001毫秒后,我們的Observable產生剩余的值3,4和5.最后,我們的第三個斷言檢查序列是否完全在1100毫秒完成,正如我們在熱的Observable地震中所指出的那樣。
該代碼以非常可靠的方式有效地測試我們的高度異步的Observable,并且無需跳過箍來模擬異步條件。我們只是指定我們希望代碼在虛擬時間內作出反應的時間,我們使用測試調度程序來運行整個操作。
總結Scheduler是RxJS的重要組成部分。 即使您可以在沒有明確使用它們的情況下走很長的路,它們也是一種先進的概念,它可以讓您在程序中微調并發性。虛擬時間的概念是RxJS獨有的,對于測試異步代碼等任務非常有用。
在下一章中,我們將使用Cycle.js,這是一種基于稱為單向數據流的概念來創建令人驚嘆的Web應用程序的反應方式。有了它,我們將使用現代技術創建一個快速的Web應用程序,從而顯著改進傳統的Web應用程序制作方式。
關注我的微信公眾號,更多優質文章定時推送
文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。
轉載請注明本文地址:http://specialneedsforspecialkids.com/yun/108136.html
摘要:由于技術棧的學習,筆者需要在原來函數式編程知識的基礎上,學習的使用。筆者在社區發現了一個非常高質量的響應式編程系列教程共篇,從基礎概念到實際應用講解的非常詳細,有大量直觀的大理石圖來輔助理解流的處理,對培養響應式編程的思維方式有很大幫助。 showImg(https://segmentfault.com/img/bVus8n); [TOC] 一. 響應式編程 響應式編程,也稱為流式編程...
摘要:響應式編程具有很強的表現力,舉個例子來說,限制鼠標重復點擊的例子。在響應式編程中,我把鼠標點擊事件作為一個我們可以查詢和操作的持續的流事件。這在響應式編程中尤其重要,因為我們隨著時間變換會產生很多狀態片段。迭代器模式的另一主要部分來自模式。 Rxjs 響應式編程-第一章:響應式Rxjs 響應式編程-第二章:序列的深入研究Rxjs 響應式編程-第三章: 構建并發程序Rxjs 響應式編程-...
摘要:原文是一個使用可觀察量隊列解決異步編程和基于事件編程的庫。提供了幾個管理異步事件的核心概念可觀察量,代表了一個由未來獲取到的值或事件組成的集合。相當于事件觸發器,是向多個廣播事件或推送值的唯一方法。 原文:http://reactivex.io/rxjs/manu... RxJS 是一個使用可觀察量(observable)隊列解決異步編程和基于事件編程的js庫。他提供了一個核心的類型O...
摘要:補充說明響應式編程采用了訂閱觀察者設計模式,使訂閱者可以將通知主動發送給各訂閱者。一個響應式編程的實現庫是一個庫,它通過使用序列來編寫異步和基于事件的程序。 或許響應式布局這個名單大家都聽過或者都自己實現過,那么響應式編程是什么呢?下面我們來具體聊一聊。 我的理解 從字面意思上我們可以大致理解為:所有的事件存在于一條事件總線上,所有的事件都可以看作未來某個時間將要發生的事件流(stre...
摘要:我們將使用,這是一個現代,簡單,漂亮的框架,在內部使用并將響應式編程概念應用于前端編程。驅動程序采用從我們的應用程序發出數據的,它們返回另一個導致副作用的。我們將使用來呈現我們的應用程序。僅采用長度超過兩個字符的文本。 Rxjs 響應式編程-第一章:響應式Rxjs 響應式編程-第二章:序列的深入研究Rxjs 響應式編程-第三章: 構建并發程序Rxjs 響應式編程-第四章 構建完整的We...
閱讀 3759·2021-11-25 09:43
閱讀 2191·2021-11-23 10:13
閱讀 822·2021-11-16 11:44
閱讀 2369·2019-08-29 17:24
閱讀 1384·2019-08-29 17:17
閱讀 3480·2019-08-29 11:30
閱讀 2584·2019-08-26 13:23
閱讀 2345·2019-08-26 12:10