摘要:傳統(tǒng)觀察者模式觀察者模式面向的需求是對(duì)象觀察者對(duì)對(duì)象被觀察者的某種變化高度敏感,需要在變化的一瞬間做出反應(yīng)。如下圖而作為一個(gè)工具庫(kù),使用的就是通用形式的觀察者模式。這是為了方便以上就是最基本的一個(gè)通過觀察者模式,來響應(yīng)事件的原理。
了解 RxJava 的應(yīng)該都知道是一個(gè)基于事務(wù)驅(qū)動(dòng)的庫(kù),響應(yīng)式編程的典范。提到事務(wù)驅(qū)動(dòng)和響應(yīng)就不得不說說,設(shè)計(jì)模式中觀察者模式,已經(jīng)了解的朋友,可以直接跳過觀察者模式的介紹,直接到 RxJava 源碼中對(duì)于觀察者的應(yīng)用。
觀察者模式該部分結(jié)合自扔物線的 《給 Android 開發(fā)者的 RxJava 詳解》, 強(qiáng)烈推薦剛接觸 RxJava 的朋友閱讀。
傳統(tǒng)觀察者模式觀察者模式面向的需求是:A 對(duì)象(觀察者)對(duì) B 對(duì)象(被觀察者)的某種變化高度敏感,需要在 B 變化的一瞬間做出反應(yīng)。舉個(gè)例子,新聞里喜聞樂見的警察抓小偷,警察需要在小偷伸手作案的時(shí)候?qū)嵤┳ゲ丁T谶@個(gè)例子里,警察是觀察者,小偷是被觀察者,警察需要時(shí)刻盯著小偷的一舉一動(dòng),才能保證不會(huì)漏過任何瞬間。程序的觀察者模式和這種真正的『觀察』略有不同,觀察者不需要時(shí)刻盯著被觀察者(例如 A 不需要每過 2ms 就檢查一次 B 的狀態(tài)),而是采用注冊(cè)( Register )或者稱為訂閱( Subscribe )的方式,告訴被觀察者:我需要你的某某狀態(tài),你要在它變化的時(shí)候通知我。 Android 開發(fā)中一個(gè)比較典型的例子是點(diǎn)擊監(jiān)聽器 OnClickListener 。對(duì)設(shè)置 OnClickListener 來說, View 是被觀察者, OnClickListener 是觀察者,二者通過 setOnClickListener() 方法達(dá)成訂閱關(guān)系。訂閱之后用戶點(diǎn)擊按鈕的瞬間,Android Framework 就會(huì)將點(diǎn)擊事件發(fā)送給已經(jīng)注冊(cè)的 OnClickListener 。采取這樣被動(dòng)的觀察方式,既省去了反復(fù)檢索狀態(tài)的資源消耗,也能夠得到最高的反饋速度。當(dāng)然,這也得益于我們可以隨意定制自己程序中的觀察者和被觀察者,而警察叔叔明顯無法要求小偷『你在作案的時(shí)候務(wù)必通知我』。
OnClickListener 的模式大致如下圖:
如圖所示,通過 setOnClickListener() 方法,Button 持有 OnClickListener 的引用(這一過程沒有在圖上畫出);當(dāng)用戶點(diǎn)擊時(shí),Button 自動(dòng)調(diào)用 OnClickListener 的 onClick() 方法。另外,如果把這張圖中的概念抽象出來(Button -> 被觀察者、OnClickListener -> 觀察者、setOnClickListener() -> 訂閱,onClick() -> 事件),就由專用的觀察者模式(例如只用于監(jiān)聽控件點(diǎn)擊)轉(zhuǎn)變成了通用的觀察者模式。如下圖:
而 RxJava 作為一個(gè)工具庫(kù),使用的就是通用形式的觀察者模式。
RxJava 中觀察者模式RxJava 有四個(gè)基本概念:Observable (可觀察者,即被觀察者)、 Observer (觀察者)、 subscribe (訂閱)、事件。Observable 和 Observer 通過 subscribe() 方法實(shí)現(xiàn)訂閱關(guān)系,從而 Observable 可以在需要的時(shí)候發(fā)出事件來通知 Observer。
與傳統(tǒng)觀察者模式不同, RxJava 的事件回調(diào)方法除了普通事件 onNext() (相當(dāng)于 onClick() / onEvent())之外,還定義了兩個(gè)特殊的事件:onCompleted() 和 onError()。
onCompleted(): 事件隊(duì)列完結(jié)。RxJava 不僅把每個(gè)事件多帶帶處理,還會(huì)把它們看做一個(gè)隊(duì)列。RxJava 規(guī)定,當(dāng)不會(huì)再有新的 onNext() 發(fā)出時(shí),需要觸發(fā) onCompleted() 方法作為標(biāo)志。
onError(): 事件隊(duì)列異常。在事件處理過程中出異常時(shí),onError() 會(huì)被觸發(fā),同時(shí)隊(duì)列自動(dòng)終止,不允許再有事件發(fā)出。
在一個(gè)正確運(yùn)行的事件序列中, onCompleted() 和 onError() 有且只有一個(gè),并且是事件序列中的最后一個(gè)。需要注意的是,onCompleted() 和 onError() 二者也是互斥的,即在隊(duì)列中調(diào)用了其中一個(gè),就不應(yīng)該再調(diào)用另一個(gè)。并且只要onCompleted() 和 onError() 中有一個(gè)調(diào)用了,都會(huì)中止 onNext() 的調(diào)用。
RxJava 的觀察者模式大致如下圖:
基于以上觀點(diǎn), RxJava 的基本實(shí)現(xiàn)主要有三點(diǎn):
創(chuàng)建 ObserverObserver 即觀察者,它決定事件觸發(fā)的時(shí)候?qū)⒂性鯓拥男袨椤?RxJava 中的 Observer 接口的實(shí)現(xiàn)方式:
Observerobserver = new Observer () { @Override public void onNext(String s) { Log.d(tag, "Item: " + s); } @Override public void onCompleted() { Log.d(tag, "Completed!"); } @Override public void onError(Throwable e) { Log.d(tag, "Error!"); } };
除了 Observer 接口之外,RxJava 還內(nèi)置了一個(gè)實(shí)現(xiàn)了 Observer 的抽象類:Subscriber。 Subscriber 對(duì) Observer 接口進(jìn)行了一些擴(kuò)展,但他們的基本使用方式是完全一樣的:
Subscribersubscriber = new Subscriber () { @Override public void onNext(String s) { Log.d(tag, "Item: " + s); } @Override public void onCompleted() { Log.d(tag, "Completed!"); } @Override public void onError(Throwable e) { Log.d(tag, "Error!"); } };
不僅基本使用方式一樣,實(shí)質(zhì)上,在 RxJava 的 subscribe 過程中,Observer 也總是會(huì)先被轉(zhuǎn)換成一個(gè) Subscriber 再使用。
// Observable.java 源碼 public final Subscription subscribe(final Observer super T> observer) { if (observer instanceof Subscriber) { // 如果是 Subscriber 的子類,直接轉(zhuǎn)化為 Subscriber return subscribe((Subscriber super T>)observer); } if (observer == null) { throw new NullPointerException("observer is null"); } return subscribe(new ObserverSubscriber(observer)); }
// ObserverSubscriber.java public final class ObserverSubscriberextends Subscriber { ... }
通過源碼可以看到,傳入的 Observer 最終還是會(huì)轉(zhuǎn)化為 Subscriber 來使用。
所以如果你只想使用基本功能,選擇 Observer 和 Subscriber 是完全一樣的。它們的區(qū)別對(duì)于使用者來說主要有兩點(diǎn):
onStart(): 這是 Subscriber 增加的方法。它會(huì)在 subscribe 剛開始,而事件還未發(fā)送之前被調(diào)用,可以用于做一些準(zhǔn)備工作,例如數(shù)據(jù)的清零或重置。這是一個(gè)可選方法,默認(rèn)情況下它的實(shí)現(xiàn)為空。需要注意的是,如果對(duì)準(zhǔn)備工作的線程有要求(例如彈出一個(gè)顯示進(jìn)度的對(duì)話框,這必須在主線程執(zhí)行), onStart() 就不適用了,因?yàn)樗偸窃?subscribe 所發(fā)生的線程被調(diào)用,而不能指定線程。要在指定的線程來做準(zhǔn)備工作,可以使用 doOnSubscribe() 方法。
// Subscriber.java public void onStart() { // do nothing by default }
unsubscribe(): 這是 Subscriber 所實(shí)現(xiàn)的另一個(gè)接口 Subscription 的方法,用于取消訂閱。在這個(gè)方法被調(diào)用后,Subscriber 將不再接收事件。一般在這個(gè)方法調(diào)用前,可以使用 isUnsubscribed() 先判斷一下狀態(tài)。 unsubscribe() 這個(gè)方法很重要,因?yàn)樵?subscribe() 之后, Observable 會(huì)持有 Subscriber 的引用,這個(gè)引用如果不能及時(shí)被釋放,將有內(nèi)存泄露的風(fēng)險(xiǎn)。所以最好保持一個(gè)原則:要在不再使用的時(shí)候盡快在合適的地方(例如 onPause() onStop() 等方法中)調(diào)用 unsubscribe() 來解除引用關(guān)系,以避免內(nèi)存泄露的發(fā)生。
// Subscriber.java @Override public final void unsubscribe() { subscriptions.unsubscribe(); } @Override public final boolean isUnsubscribed() { return subscriptions.isUnsubscribed(); }創(chuàng)建 Observable
Observable 即被觀察者,它決定什么時(shí)候觸發(fā)事件以及觸發(fā)怎樣的事件。例如 create() 方法
Observable observable = Observable.create(new Observable.OnSubscribe() { @Override public void call(Subscriber super String> subscriber) { subscriber.onNext("Hello"); subscriber.onNext("Hi"); subscriber.onNext("Aloha"); subscriber.onCompleted(); } });
可以看到,這里傳入了一個(gè) OnSubscribe 對(duì)象作為參數(shù)。OnSubscribe 會(huì)被存儲(chǔ)在返回的 Observable 對(duì)象中,它的作用相當(dāng)于一個(gè)計(jì)劃表,當(dāng) Observable 被訂閱的時(shí)候,OnSubscribe 的 call() 方法會(huì)自動(dòng)被調(diào)用,事件序列就會(huì)依照設(shè)定依次觸發(fā)(對(duì)于上面的代碼,就是觀察者Subscriber 將會(huì)被調(diào)用三次 onNext() 和一次 onCompleted()。這樣,由被觀察者調(diào)用了觀察者的回調(diào)方法,就實(shí)現(xiàn)了由被觀察者向觀察者的事件傳遞,即觀察者模式。
create() 方法是 RxJava 最基本的創(chuàng)造事件序列的方法。基于這個(gè)方法, RxJava 還提供了一些方法用來快捷創(chuàng)建事件隊(duì)列,例如 just(), from()
訂閱 Subscribe創(chuàng)建了 Observable 和 Observer 之后,再用 subscribe() 方法將它們聯(lián)結(jié)起來,整條鏈子就可以工作了。代碼形式很簡(jiǎn)單:
observable.subscribe(observer); // 或者: observable.subscribe(subscriber);
有人可能會(huì)注意到, subscribe() 這個(gè)方法有點(diǎn)怪:它看起來是『observalbe 訂閱了 observer / subscriber』而不是『observer / subscriber 訂閱了 observalbe』,這看起來就像『雜志訂閱了讀者』一樣顛倒了對(duì)象關(guān)系。這讓人讀起來有點(diǎn)別扭,不過如果把 API 設(shè)計(jì)成 observer.subscribe(observable) / subscriber.subscribe(observable) ,雖然更加符合思維邏輯,但對(duì)流式 API 的設(shè)計(jì)就造成影響了,比較起來明顯是得不償失的。
整個(gè)過程中對(duì)象間的關(guān)系如下圖:
// 例子 Observable.create(new Observable.OnSubscribe() { @Override public void call(Subscriber super String> subscriber) { subscriber.onNext("Hello"); subscriber.onNext("Hi"); subscriber.onNext("Aloha"); subscriber.onCompleted(); } }).subscribe(new Subscriber () { @Override public void onCompleted() { System.out.println("onCompleted"); } @Override public void onError(Throwable e) { } @Override public void onNext(String s) { System.out.println("value: " + s); } });
log 信息
value: Hello value: Hi value: Aloha onCompleted
看到上面代碼,可能會(huì)有人跟我一樣不明白, create() 中的 OnSubscribe 中 call() 的 Subscriber 是怎么樣最終就變成了 subscribe() 中的 Subscriber。
下面來一下 Observable.subscribe(Subscriber) 的內(nèi)部實(shí)現(xiàn)是這樣的(僅核心代碼):
// 注意:這不是 subscribe() 的源碼,而是將源碼中與性能、兼容性、擴(kuò)展性有關(guān)的代碼剔除后的核心代碼。 staticSubscription subscribe(Subscriber super T> subscriber, Observable observable) { ... // 可以用于做一些準(zhǔn)備工作,例如數(shù)據(jù)的清零或重置, 默認(rèn)情況下它的實(shí)現(xiàn)為空 subscriber.onStart(); if (!(subscriber instanceof SafeSubscriber)) { // 強(qiáng)制轉(zhuǎn)化為 SafeSubscriber 是為了保證 onCompleted 或 onError 調(diào)用的時(shí)候會(huì)中止 onNext 的調(diào)用 subscriber = new SafeSubscriber (subscriber); } ... // // onObservableStart() 默認(rèn)返回的就是 observable.onSubscribe RxJavaHooks.onObservableStart(observable, observable.onSubscribe).call(subscriber); // onObservableReturn() 默認(rèn)也是返回 subscriber return RxJavaHooks.onObservableReturn(subscriber); ... }
通過源碼可以看到,subscriber() 實(shí)際就做了 4 件事情
調(diào)用 Subscriber.onStart() 。這個(gè)方法在前面已經(jīng)介紹過,是一個(gè)可選的準(zhǔn)備方法。
將傳入的 Subscriber 轉(zhuǎn)化為 SafeSubscriber, 為了保證 onCompleted 或 onError 調(diào)用的時(shí)候會(huì)中止 onNext 的調(diào)用。
// 注意:這不是 SafeSubscriber 的源碼,而是將源碼中與性能、兼容性、擴(kuò)展性有關(guān)的代碼剔除后的核心代碼。 public class SafeSubscriberextends Subscriber { private final Subscriber super T> actual; boolean done; // 通過改標(biāo)志來保證 onCompleted 或 onError 調(diào)用的時(shí)候會(huì)中止 onNext 的調(diào)用 public SafeSubscriber(Subscriber super T> actual) { super(actual); this.actual = actual; } @Override public void onCompleted() { if (!done) { done = true; ... actual.onCompleted(); ... unsubscribe(); // 取消訂閱,結(jié)束事務(wù) } } @Override public void onError(Throwable e) { ... if (!done) { done = true; _onError(e); } } @Override public void onNext(T t) { if (!done) { // done 為 true 時(shí),中止傳遞 actual.onNext(t); } } @SuppressWarnings("deprecation") protected void _onError(Throwable e) { ... actual.onError(e); ... unsubscribe(); ... } }
通過代碼可以看出來,通過 SafeSubscriber 中的布爾變量 done 來做標(biāo)記保證上文提到的 onCompleted() 和 onError() 二者的互斥性,即在隊(duì)列中調(diào)用了其中一個(gè),就不應(yīng)該再調(diào)用另一個(gè)。并且只要 onCompleted() 和 onError() 中有一個(gè)調(diào)用了,都會(huì)中止 onNext() 的調(diào)用。
調(diào)用 Observable 中的 OnSubscribe.call(Subscriber) 。在這里,事件發(fā)送的邏輯開始運(yùn)行。從這也可以看出,在 RxJava 中, Observable 并不是在創(chuàng)建的時(shí)候就立即開始發(fā)送事件,而是在它被訂閱的時(shí)候,即當(dāng) subscribe() 方法執(zhí)行的時(shí)候。
將傳入的 Subscriber 作為 Subscription 返回。這是為了方便 unsubscribe().
以上就是 RxJava 最基本的一個(gè)通過觀察者模式,來響應(yīng)事件的原理。下面來看看 RxJava 中一些基本操作符的實(shí)現(xiàn)原理又是怎樣的。
進(jìn)階為了能更好的理解源碼,需要對(duì) RxJava 有基本的使用基礎(chǔ),對(duì) RxJava 不太熟悉的朋友請(qǐng)先一步到《給 Android 開發(fā)者的 RxJava 詳解》
Observable.interval(1, TimeUnit.SECONDS) .map(new Func1() { @Override public Long call(Long aLong) { return aLong * 5; } }) .subscribe(new Action1 () { @Override public void call(Long aLong) { System.out.println("value: " + aLong); } });
log 信息
value: 0 value: 5 value: 10 ...
上面的列子會(huì)每秒生成一個(gè)從 0 依次遞增的整數(shù),然后通過 map() 變換操作符后,變成了 5 的倍數(shù)的一個(gè)整數(shù)列。
第一次看到該例子時(shí),就喜歡上了 RxJava,這種鏈?zhǔn)胶瘮?shù)的交互模式真的很簡(jiǎn)潔,終于可以從回調(diào)地獄里逃出來了。喜歡的同時(shí)不免也會(huì)想 RxJava 是如何實(shí)現(xiàn)的。這種鏈?zhǔn)降暮瘮?shù)流可以算是建造者模式的一種變形,只不過省去了中間 Builder 而直接返回當(dāng)前對(duì)象來實(shí)現(xiàn)。 更讓我興奮的是內(nèi)部這些操作符的實(shí)現(xiàn)原理。
上文也已經(jīng)說過了在 RxJava 中, Observable 并不是在創(chuàng)建的時(shí)候就立即開始發(fā)送事件,而是在它被訂閱的時(shí)候,即當(dāng) subscribe() 方法執(zhí)行的時(shí)候。 所以對(duì)于上面一段的代碼我們要從 subscribe() 往前屢,首先看一下 map() 這個(gè)函數(shù)的內(nèi)部實(shí)現(xiàn)。
public finalObservable map(Func1 super T, ? extends R> func) { // 新建了一個(gè) Observable 并使用新的 OnSubscribeMap 來封裝傳入的數(shù)據(jù) return unsafeCreate(new OnSubscribeMap (this, func)); }
不用說大家也猜到了 OnSubscribeMap 是 OnSubscribe 的子類
// 注意:這不是 OnSubscribeMap 的源碼,而是將源碼中與性能、兼容性、擴(kuò)展性有關(guān)的代碼剔除后的核心代碼。 public final class OnSubscribeMapimplements OnSubscribe { final Observable source; final Func1 super T, ? extends R> transformer; public OnSubscribeMap(Observable source, Func1 super T, ? extends R> transformer) { this.source = source; // 列子中經(jīng)過 Observable.interval() 函數(shù)生成的 Observable this.transformer = transformer; } // 傳入的 o 就是例子中 `subscribe()` 出入的 Subscribe // 具體結(jié)合 Observable.subscribe() 源碼來理解 @Override public void call(final Subscriber super R> o) { // 對(duì)傳入的 Subscriber 進(jìn)行再次封裝成 MapSubscriber // 具體 Observable.map() 的邏輯是在 MapSubscriber 中 MapSubscriber parent = new MapSubscriber (o, transformer); o.add(parent); // 加入到 SubscriptionList 中,為之后取消訂閱 // Observable.interval() 返回的 Observable 進(jìn)行訂閱(關(guān)鍵點(diǎn)) source.unsafeSubscribe(parent); } ... }
可以看到 call() 方法的邏輯很簡(jiǎn)單,只是將例子中 Observable.subscribe() 傳入的 Subscriber 進(jìn)行封裝后,再將上流傳入的 Observable 進(jìn)行訂閱
// 注意:這不是 MapSubscriber 的源碼 // 而是將源碼中與性能、兼容性、擴(kuò)展性有關(guān)的代碼剔除后的核心代碼。 static final class MapSubscriberextends Subscriber { final Subscriber super R> actual; final Func1 super T, ? extends R> mapper; public MapSubscriber(Subscriber super R> actual, Func1 super T, ? extends R> mapper) { this.actual = actual; // Observable.subscribe() 傳入的 Subscriber this.mapper = mapper; } @Override public void onNext(T t) { R result; ... result = mapper.call(t); // 數(shù)據(jù)進(jìn)行了變換 ... actual.onNext(result); // 往下流傳 } ... }
通過以上就完成了 map() 對(duì)數(shù)據(jù)的變換,這里最終的就是理解 OnSubscribeMap 的 call() 中 source.unsafeSubscribe(parent); source 指的是例子中 Observable.interval() 生成的對(duì)象。
再來看一下 RxJava 中對(duì) Observable.interval() 的實(shí)現(xiàn)
public static Observableinterval(long initialDelay, long period, TimeUnit unit, Scheduler scheduler) { return unsafeCreate(new OnSubscribeTimerPeriodically(initialDelay, period, unit, scheduler)); }
可以看出 interval() 和 map() 一樣都是通過生成新的 Observable 并向 Observable 中傳入與之對(duì)應(yīng)的 OnSubscribe 的子類來完成具體操作。
// 注意:這不是 OnSubscribeTimerPeriodically 的源碼 // 而是將源碼中與性能、兼容性、擴(kuò)展性有關(guān)的代碼剔除后的核心代碼。 public final class OnSubscribeTimerPeriodically implements OnSubscribe{ final long initialDelay; final long period; final TimeUnit unit; final Scheduler scheduler; public OnSubscribeTimerPeriodically(long initialDelay, long period, TimeUnit unit, Scheduler scheduler) { this.initialDelay = initialDelay; this.period = period; this.unit = unit; this.scheduler = scheduler; } // 傳入的 Subscriber 為上文提到的 OnSubscribeMap.call() 方法中 source.unsafeSubscribe(parent); @Override public void call(final Subscriber super Long> child) { final Worker worker = scheduler.createWorker(); child.add(worker); worker.schedulePeriodically(new Action0() { long counter; @Override public void call() { ... child.onNext(counter++); ... } }, initialDelay, period, unit); } }
以上就是 RxJava 整體的邏輯結(jié)構(gòu),可以看到 RxJava 將觀察者模式發(fā)揮的淋漓盡致。整體邏輯的處理有點(diǎn)像遞歸函數(shù)的原理。而 map() 則像一種代理機(jī)制,通過事件攔截和處理實(shí)現(xiàn)事件序列的變換。
參考總結(jié): 精簡(jiǎn)掉細(xì)節(jié)的話,也可以這么說:在 Observable 執(zhí)行了各種操作符( map, interval 等)之后 方法之后,會(huì)返回一個(gè)新的 Observable,這個(gè)新的 Observable 會(huì)像一個(gè)代理一樣,負(fù)責(zé)接收原始的 Observable 發(fā)出的事件,并在處理后發(fā)送給 Subscriber。
給 Android 開發(fā)者的 RxJava 詳解
文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請(qǐng)注明本文地址:http://specialneedsforspecialkids.com/yun/66911.html
摘要:讓你收獲滿滿碼個(gè)蛋從年月日推送第篇文章一年過去了已累積推文近篇文章,本文為年度精選,共計(jì)篇,按照類別整理便于讀者主題閱讀。本篇文章是今年的最后一篇技術(shù)文章,為了讓大家在家也能好好學(xué)習(xí),特此花了幾個(gè)小時(shí)整理了這些文章。 showImg(https://segmentfault.com/img/remote/1460000013241596); 讓你收獲滿滿! 碼個(gè)蛋從2017年02月20...
摘要:現(xiàn)在網(wǎng)上已經(jīng)有大量的源碼分析文章,各種技術(shù)的都有。你完全可以寫成下面的鏈?zhǔn)斤L(fēng)格方法會(huì)最先被執(zhí)行同樣,為了便于理解,我會(huì)借用流里面經(jīng)常用到的水流進(jìn)行類比。該子類的命名是有規(guī)律可言的。現(xiàn)在網(wǎng)上已經(jīng)有大量的源碼分析文章,各種技術(shù)的都有。但我覺得很多文章對(duì)初學(xué)者并不友好,讓人讀起來云里霧里的,比源碼還源碼。究其原因,是根本沒有從學(xué)習(xí)者的角度去分析。在自己完成了源碼閱讀之后,卻忘記了自己是如何一步步提...
閱讀 1058·2021-11-12 10:34
閱讀 985·2021-09-30 09:56
閱讀 668·2019-08-30 15:54
閱讀 2602·2019-08-30 11:14
閱讀 1465·2019-08-29 16:44
閱讀 3203·2019-08-29 16:35
閱讀 2489·2019-08-29 16:22
閱讀 2441·2019-08-29 15:39