摘要:舉例來說,每年都有生日是一道數據流,但是一個人的年齡卻是一個流。運行結果顯示,第二個在訂閱之后,獲得了數據流中最后毫秒事件內產生的和三個值。
原文:http://reactivex.io/rxjs/manu...
Subject是什么? RxJS的Subject是Observable的一個特殊類型,他可以將流中的值廣播給眾多觀察者(Observer)。
一般的Observalbe流是單一廣播制(每一個訂閱流的Observer擁有一個獨立的執行過程)。
一個Subject類似一道Observable數據流,但是可以對多個Observer進行多點廣播。這就像事件觸發器(EventEmitter):維護了一個偵聽器的列表。
每一個Subject就是一個Observable流。 對于給定的Subject,你可以訂閱它(subscribe),提供一個Observer,之后將會正常的接收傳遞來的數據。從Observer的角度來說,它是無法分辨一個流中的值是來源于單一廣播機制的Observable流還是一個Subject流。
在Subject內部,訂閱(subscribe)不會引起一個新的接收數據的過程。類似于其他庫或語言中的注冊事件偵聽器(addListener),它會直接把給定的Observer放入到一個注冊列表中。
每一個Subject也是一個觀察者(Observer)。 擁有next(v)、error(e)和complete()方法。往Subject中填充數據,只需要調用next(theValue)即可,它將會把數據廣播給所有已注冊的Observer。
以下的例子中,我們設定了2個訂閱Subject流的Observer,然后我們填充一些數據到Subject:
var subject = new Rx.Subject(); subject.subscribe({ next: (v) => console.log("observerA: " + v) }); subject.subscribe({ next: (v) => console.log("observerB: " + v) }); subject.next(1); subject.next(2);
得到了如下輸出:
observerA: 1 observerB: 1 observerA: 2 observerB: 2
因為Subject是一個Observer,因此你也可以將它作為任何Observable的subscribe()的參數,訂閱這個Observable流,就像下面這樣:
var subject = new Rx.Subject(); subject.subscribe({ next: (v) => console.log("observerA: " + v) }); subject.subscribe({ next: (v) => console.log("observerB: " + v) }); var observable = Rx.Observable.from([1, 2, 3]); observable.subscribe(subject); // You can subscribe providing a Subject
運行的結果:
observerA: 1 observerB: 1 observerA: 2 observerB: 2 observerA: 3 observerB: 3
在上面的方法中,我們使用Subject將一個單點廣播的Observable流轉換為多點廣播。這也佐證了,Subject是可以將任何Observable流共享給多個Observer的唯一途徑。
除了Subject,還有一些衍生出的專門的Subject:BehaviorSubject,ReplaySubject和AsyncSubject。
多路傳播的Observable流 Multicasted Observables相比于只能推送消息給單個的Observer的“單路Observable流”,利用具有多個訂閱者的Subject,“多路傳播的Observable流”可以有多個通知通道。
多路傳播的Observable在后臺通過使用Subject讓多個Observers能夠從同一個Observable流中獲取數據。
在后臺,multicast操作符是這樣工作的:Obersver訂閱潛在的Subject,而Subject又訂閱了源Observable流。下面的例子和之前使用observable.subscribe(subject)的情況類似:
var source = Rx.Observable.from([1, 2, 3]); var subject = new Rx.Subject(); var multicasted = source.multicast(subject); // These are, under the hood, `subject.subscribe({...})`: multicasted.subscribe({ next: (v) => console.log("observerA: " + v) }); multicasted.subscribe({ next: (v) => console.log("observerB: " + v) }); // This is, under the hood, `source.subscribe(subject)`: multicasted.connect();
multicast流返回了一個看似普通的Observable流,但是當訂閱的時候他表現的與Subject類似。這個流被稱作ConnectableObservable流,本質是一個Observable流,但擁有connect()方法。
connect()在內部執行了source.subscribe(subject),并且返回了一個你可以取消Observable流執行的Subscription。因此,當可被共享的Observable流開始時,connect()方法對于精確的判定執行過程很重要。
引用計數 Reference counting手動的調用connect()和執行Subscription往往是很累人的。我們當然希望可以在第一個Observer訂閱的時候就自動的執行connect(),并且最好在最后一個Observer取消訂閱(unsubscribe)的時候能自動取消流的執行。
考慮一下,處于下列操作順序時的表現情況:
第一個Observer訂閱了多路傳播的Observable流
多路傳播的Observable流呈被連接狀態
調用next()傳0給第一個Observer
第二個Observer訂閱多路傳播Observable流
調用next()傳1給第一個Observer
調用next()傳1給第二個Observer
第一個Observer取消訂閱
調用next()傳2給第二個Observer
第二個Observer取消訂閱
多路傳播Observable流的連接情況是未被訂閱狀態
為了顯式的調用connect()實現這個過程,我們編寫如下代碼:
var source = Rx.Observable.interval(500); var subject = new Rx.Subject(); var multicasted = source.multicast(subject); var subscription1, subscription2, subscriptionConnect; subscription1 = multicasted.subscribe({ next: (v) => console.log("observerA: " + v) }); // We should call `connect()` here, because the first // subscriber to `multicasted` is interested in consuming values subscriptionConnect = multicasted.connect(); setTimeout(() => { subscription2 = multicasted.subscribe({ next: (v) => console.log("observerB: " + v) }); }, 600); setTimeout(() => { subscription1.unsubscribe(); }, 1200); // We should unsubscribe the shared Observable execution here, // because `multicasted` would have no more subscribers after this setTimeout(() => { subscription2.unsubscribe(); subscriptionConnect.unsubscribe(); // for the shared Observable execution }, 2000);
如果我們想避免顯式的調用connect(),我們可以使用ConnectableObservable的refCount()方法(引用計數),他返回了一個存有眾多訂閱者的Observable流。當訂閱者的數量從0增加到1時,將會自動調用connect(),開始共享流。
當訂閱者的數量從1變為0,即將處于未訂閱狀態時,將會自動停止下一步的執行。
refCount使多路傳播Observable流在第一個訂閱者出現時自動啟動,在最后一個訂閱者離開時自動停止。
請看下面的例子:
var source = Rx.Observable.interval(500); var subject = new Rx.Subject(); var refCounted = source.multicast(subject).refCount(); var subscription1, subscription2, subscriptionConnect; // This calls `connect()`, because // it is the first subscriber to `refCounted` console.log("observerA subscribed"); subscription1 = refCounted.subscribe({ next: (v) => console.log("observerA: " + v) }); setTimeout(() => { console.log("observerB subscribed"); subscription2 = refCounted.subscribe({ next: (v) => console.log("observerB: " + v) }); }, 600); setTimeout(() => { console.log("observerA unsubscribed"); subscription1.unsubscribe(); }, 1200); // This is when the shared Observable execution will stop, because // `refCounted` would have no more subscribers after this setTimeout(() => { console.log("observerB unsubscribed"); subscription2.unsubscribe(); }, 2000);
執行過后的輸出是:
observerA subscribed observerA: 0 observerB subscribed observerA: 1 observerB: 1 observerA unsubscribed observerB: 2 observerB unsubscribed
refCount()方法只存在于ConnectableObservable中,他返回一個Observable流,而不是另一個ConnectableObservable流。
BehaviorSubjectBehaviorSubject是一類特異的Subject。具有返回“當前值”的特性。它存儲了流中最新的值并把它推送給自己的用戶,不論它的新舊與否,都能夠立即收到推送的這個“當前值”。
BehaviorSubject 非常有利于表示“變化中的值”。舉例來說,每年都有生日是一道Subject數據流,但是一個人的年齡卻是一個BehaviorSubject流。
來看下面的例子,BehaviorSubject以0為值進行初始化,第一個訂閱的Observer將會直接收到這個值。當2被填充入流之后,第二個Observer訂閱流時,盡管時間較晚,也會收到最新值2。
var subject = new Rx.BehaviorSubject(0); // 0 is the initial value subject.subscribe({ next: (v) => console.log("observerA: " + v) }); subject.next(1); subject.next(2); subject.subscribe({ next: (v) => console.log("observerB: " + v) }); subject.next(3);
輸出如下:
observerA: 0 observerA: 1 observerA: 2 observerB: 2 observerA: 3 observerB: 3ReplaySubject
ReplaySubject 很像BehaviorSubject,他會把時間線中較老的值推送給新的訂閱者們,而且他還可以記錄Observable流中一段時間的值。
ReplaySubject能夠記錄Observable流中的多個值,并將它們推送給新的訂閱者。
創建ReplaySubject時,你可以指定需要回放多少個值,像這樣:
var subject = new Rx.ReplaySubject(3); // buffer 3 values for new subscribers subject.subscribe({ next: (v) => console.log("observerA: " + v) }); subject.next(1); subject.next(2); subject.next(3); subject.next(4); subject.subscribe({ next: (v) => console.log("observerB: " + v) }); subject.next(5);
輸出如下:
observerA: 1 observerA: 2 observerA: 3 observerA: 4 observerB: 2 observerB: 3 observerB: 4 observerA: 5 observerB: 5
在設定數據量大小之外,你還可以指定一個以毫秒為單位的窗口時間,用來確定記錄的數據所在的時間區間(數據有多老)。
在下面的例子中,我們使用了一個較大的數據量設定,同時還設定了500毫秒的窗口時間。
var subject = new Rx.ReplaySubject(100, 500 /* windowTime */); subject.subscribe({ next: (v) => console.log("observerA: " + v) }); var i = 1; setInterval(() => subject.next(i++), 200); setTimeout(() => { subject.subscribe({ next: (v) => console.log("observerB: " + v) }); }, 1000);
運行結果顯示,第二個Observer在訂閱之后,獲得了數據流中最后500毫秒事件內產生的3,4和5三個值。
observerA: 1 observerA: 2 observerA: 3 observerA: 4 observerA: 5 /************/ observerB: 3 observerB: 4 observerB: 5 /************/ observerA: 6 observerB: 6 ...AsyncSubject
AsyncSubject是Subject的另一個變化,他會在流發出complete通知時,將數據流中的最后一個值推送給所有訂閱流的Observer。
var subject = new Rx.AsyncSubject(); subject.subscribe({ next: (v) => console.log("observerA: " + v) }); subject.next(1); subject.next(2); subject.next(3); subject.next(4); subject.subscribe({ next: (v) => console.log("observerB: " + v) }); subject.next(5); subject.complete();
輸出為:
With output:
observerA: 5 observerB: 5
AsyncSubject非常類似last()操作符,它會等待complete通知,并在那時推送流中的數據值。
文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。
轉載請注明本文地址:http://specialneedsforspecialkids.com/yun/83625.html
摘要:原文是一個使用可觀察量隊列解決異步編程和基于事件編程的庫。提供了幾個管理異步事件的核心概念可觀察量,代表了一個由未來獲取到的值或事件組成的集合。相當于事件觸發器,是向多個廣播事件或推送值的唯一方法。 原文:http://reactivex.io/rxjs/manu... RxJS 是一個使用可觀察量(observable)隊列解決異步編程和基于事件編程的js庫。他提供了一個核心的類型O...
摘要:原文可觀察量是一種能惰性推送的集合,他可以包含多個值。是一種惰性計算方式,會在迭代中同步的返回到無限個可能的話返回值。使用一種處理方法,最終可能會或可能不會返回一個值。無論是同步方式還是異步方式,都可以擇其一來傳遞返回值。 原文:http://reactivex.io/rxjs/manu... Observable 可觀察量是一種能惰性推送的集合,他可以包含多個值。下面的表格對比了推送...
摘要:實例化一個對象向接受者發送一個消息流接受者訂閱消息,獲取消息流中的數據接受者訂閱消息,獲取消息流中的數據這樣兩路接受者都能拿到發送的數據流是的一個衍生類,它將數據流中的最新值推送給接受者。 Rxjs_Subject 及其衍生類 在 RxJS 中,Observable 有一些特殊的類,在消息通信中使用比較頻繁,下面主要介紹較常用的幾個類: 1/ Subject Subject 可以實現...
摘要:技術積累經過社區的努力學習資料還是很多的,官方中文文檔就已經很不錯,不過我們先從天精通初步感受一下然后配合一些中文文檔來補充知識點,最后再根據官方文檔來校驗整個知識體系。資料學習操作符的時候可以對照彈珠圖的交互彈珠圖的中文版中文文檔 前言 最近準備畢設,技術選型的時候因為功能的一些需求準備將RxJs融入到項目中,考慮RxJs的時候因為之前的技術棧還猶豫了一下,查了一些資料以及粗略瀏覽了...
閱讀 2020·2021-10-09 09:41
閱讀 1596·2021-09-28 09:36
閱讀 1100·2021-09-26 09:55
閱讀 1285·2021-09-10 11:17
閱讀 1141·2021-09-02 09:56
閱讀 2755·2019-08-30 12:58
閱讀 2927·2019-08-29 13:03
閱讀 1847·2019-08-26 13:40