摘要:而這個就是線程調度的關鍵前面的例子中我們通過指定了發射處理事件以及通知觀察者的一系列操作的執行線程,正是通過這個創建了我們前面提到的。總結這一章以執行流程操作符實現以及線程調度三個方面為切入點剖析了源碼。
轉載請注明出處:https://zhuanlan.zhihu.com/p/22338235
RxJava系列1(簡介)
RxJava系列2(基本概念及使用介紹)
RxJava系列3(轉換操作符)
RxJava系列4(過濾操作符)
RxJava系列5(組合操作符)
RxJava系列6(從微觀角度解讀RxJava源碼)
RxJava系列7(最佳實踐)
前言通過前面五個篇幅的介紹,相信大家對RxJava的基本使用以及操作符應該有了一定的認識。但是知其然還要知其所以然;所以從這一章開始我們聊聊源碼,分析RxJava的實現原理。本文我們主要從三個方面來分析RxJava的實現:
RxJava基本流程分析
操作符原理分析
線程調度原理分析
一、RxJava執行流程分析本章節基于RxJava1.1.9版本的源碼
在RxJava系列2(基本概念及使用介紹)中我們介紹過,一個最基本的RxJava調用是這樣的:
示例A
Observable.create(new Observable.OnSubscribe() { @Override public void call(Subscriber super String> subscriber) { subscriber.onNext("Hello RxJava!"); subscriber.onCompleted(); } }).subscribe(new Subscriber () { @Override public void onCompleted() { System.out.println("completed!"); } @Override public void onError(Throwable e) { } @Override public void onNext(String s) { System.out.println(s); } });
首先調用Observable.create()創建一個被觀察者Observable,同時創建一個OnSubscribe作為create()方法的入參;接著創建一個觀察者Subscriber,然后通過subseribe()實現二者的訂閱關系。這里涉及到三個關鍵對象和一個核心的方法:
Observable(被觀察者)
OnSubscribe (從純設計模式的角度來理解,OnSubscribe.call()可以看做是觀察者模式中被觀察者用來通知觀察者的notifyObservers()方法)
Subscriber (觀察者)
subscribe() (實現觀察者與被觀察者訂閱關系的方法)
1、Observable.create()源碼分析首先我們來看看Observable.create()的實現:
public staticObservable create(OnSubscribe f) { return new Observable (RxJavaHooks.onCreate(f)); }
這里創建了一個被觀察者Observable,同時將RxJavaHooks.onCreate(f)作為構造函數的參數,源碼如下:
protected Observable(OnSubscribef) { this.onSubscribe = f; }
我們看到源碼中直接將參數RxJavaHooks.onCreate(f)賦值給了當前我們構造的被觀察者Observable的成員變量onSubscribe。那么RxJavaHooks.onCreate(f)返回的又是什么呢?我們接著往下看:
public staticObservable.OnSubscribe onCreate(Observable.OnSubscribe onSubscribe) { Func1 f = onObservableCreate; if (f != null) { return f.call(onSubscribe); } return onSubscribe; }
由于我們并沒調用RxJavaHooks.initCreate(),所以上面代碼中的onObservableCreate為null;因此RxJavaHooks.onCreate(f)最終返回的就是f,也就是我們在Observable.create()的時候new出來的OnSubscribe。(由于對RxJavaHooks的理解并不影響我們對RxJava執行流程的分析,因此在這里我們不做進一步的探討。為了方便理解我們只需要知道RxJavaHooks一系列方法的返回值就是入參本身就OK了,例如這里的RxJavaHooks.onCreate(f)返回的就是f)。
至此我們做下邏輯梳理:Observable.create()方法構造了一個被觀察者Observable對象,同時將new出來的OnSubscribe賦值給了該Observable的成員變量onSubscribe。
2、Subscriber源碼分析接著我們看下觀察者Subscriber的源碼,為了增加可讀性,我去掉了源碼中的注釋和部分代碼。
public abstract class Subscriberimplements Observer , Subscription { private final SubscriptionList subscriptions;//訂閱事件集,所有發送給當前Subscriber的事件都會保存在這里 ... protected Subscriber(Subscriber> subscriber, boolean shareSubscriptions) { this.subscriber = subscriber; this.subscriptions = shareSubscriptions && subscriber != null ? subscriber.subscriptions : new SubscriptionList(); } ... @Override public final void unsubscribe() { subscriptions.unsubscribe(); } @Override public final boolean isUnsubscribed() { return subscriptions.isUnsubscribed(); } public void onStart() { } ... }
public interface Subscription { void unsubscribe(); boolean isUnsubscribed(); }
Subscriber實現了Subscription接口,從而對外提供isUnsubscribed()和unsubscribe()方法。前者用于判斷是否已經取消訂閱;后者用于將訂閱事件列表(也就是當前觀察者的成員變量subscriptions)中的所有Subscription取消訂閱,并且不再接受觀察者Observable發送的后續事件。
3、subscribe()源碼分析前面我們分析了觀察者和被觀察者相關的源碼,那么接下來便是整個訂閱流程中最最關鍵的環節了。
public final Subscription subscribe(Subscriber super T> subscriber) { return Observable.subscribe(subscriber, this); }
staticSubscription subscribe(Subscriber super T> subscriber, Observable observable) { ... subscriber.onStart(); if (!(subscriber instanceof SafeSubscriber)) { subscriber = new SafeSubscriber (subscriber); } try { RxJavaHooks.onObservableStart(observable, observable.onSubscribe).call(subscriber); return RxJavaHooks.onObservableReturn(subscriber); } catch (Throwable e) { ... return Subscriptions.unsubscribed(); } }
subscribe()方法中將傳進來的subscriber包裝成了SafeSubscriber,SafeSubscriber其實是subscriber的一個代理,對subscriber的一系列方法做了更加嚴格的安全校驗。保證了onCompleted()和onError()只會有一個被執行且只執行一次,一旦它們其中方法被執行過后onNext()就不在執行了。
上述代碼中最關鍵的就是RxJavaHooks.onObservableStart(observable, observable.onSubscribe).call(subscriber)。這里的RxJavaHooks和之前提到的一樣,RxJavaHooks.onObservableStart(observable, observable.onSubscribe)返回的正是他的第二個入參observable.onSubscribe,也就是當前observable的成員變量onSubscribe。而這個成員變量我們前面提到過,它是我們在Observable.create()的時候new出來的。所以這段代碼可以簡化為onSubscribe.call(subscriber)。這也印證了我在RxJava系列2(基本概念及使用介紹)中說的,onSubscribe.call(subscriber)中的subscriber正是我們在subscribe()方法中new出來的觀察者。
到這里,我們對RxJava的執行流程做個總結:首先我們調用crate()創建一個觀察者,同時創建一個OnSubscribe作為該方法的入參;接著調用subscribe()來訂閱我們自己創建的觀察者Subscriber。
一旦調用subscribe()方法后就會觸發執行OnSubscribe.call()。然后我們就可以在call方法調用觀察者subscriber的onNext(),onCompleted(),onError()。
最后我用張圖來總結下之前的分析結果:
二、操作符原理分析之前我們介紹過幾十個操作符,要一一分析它們的源碼顯然不太現實。在這里我拋磚引玉,選取一個相對簡單且常用的map操作符來分析。
我們先來看一個map操作符的簡單應用:
示例B
Observable.create(new Observable.OnSubscribe() { @Override public void call(Subscriber super Integer> subscriber) { subscriber.onNext(1); subscriber.onCompleted(); } }).map(new Func1 () { @Override public String call(Integer integer) { return "This is " + integer; } }).subscribe(new Subscriber () { @Override public void onCompleted() { System.out.println("onCompleted!"); } @Override public void onError(Throwable e) { System.out.println(e.getMessage()); } @Override public void onNext(String s) { System.out.println(s); } });
為了便于表述,我將上面的代碼做了如下拆解:
ObservableobservableA = Observable.create(new Observable.OnSubscribe () { @Override public void call(Subscriber super Integer> subscriber) { subscriber.onNext(1); subscriber.onCompleted(); } }); Subscriber subscriberOne = new Subscriber () { @Override public void onCompleted() { System.out.println("onCompleted!"); } @Override public void onError(Throwable e) { System.out.println(e.getMessage()); } @Override public void onNext(String s) { System.out.println(s); } }; Observable observableB = observableA.map(new Func1 () { @Override public String call(Integer integer) { return "This is " + integer;; } }); observableB.subscribe(subscriberOne);
map()的源碼和上一小節介紹的create()一樣位于Observable這個類中。
public finalObservable map(Func1 super T, ? extends R> func) { return create(new OnSubscribeMap (this, func)); }
通過查看源碼我們發現調用map()的時候實際上是創建了一個新的被觀察者Observable,我們姑且稱它為ObservableB;一開始通過Observable.create()創建的Observable我們稱之為ObservableA。在創建ObservableB的時候同時創建了一個OnSubscribeMap,而ObservableA和變換函數Func1則作為構造OnSubscribeMap的參數。
public final class OnSubscribeMapimplements OnSubscribe { final Observable source;//ObservableA final Func1 super T, ? extends R> transformer;//map操作符中的轉換函數Func1。T為轉換前的數據類型,在上面的例子中為Integer;R為轉換后的數據類型,在該例中為String。 public OnSubscribeMap(Observable source, Func1 super T, ? extends R> transformer) { this.source = source; this.transformer = transformer; } @Override public void call(final Subscriber super R> o) {//結合第一小節的分析結果,我們知道這里的入參o其實就是我們自己new的觀察者subscriberOne。 MapSubscriber parent = new MapSubscriber (o, transformer); o.add(parent); source.unsafeSubscribe(parent); } static final class MapSubscriber extends Subscriber { final Subscriber super R> actual;//這里的actual就是我們在調用subscribe()時創建的觀察者mSubscriber final Func1 super T, ? extends R> mapper;//變換函數 boolean done; public MapSubscriber(Subscriber super R> actual, Func1 super T, ? extends R> mapper) { this.actual = actual; this.mapper = mapper; } @Override public void onNext(T t) { R result; try { result = mapper.call(t); } catch (Throwable ex) { Exceptions.throwIfFatal(ex); unsubscribe(); onError(OnErrorThrowable.addValueAsLastCause(ex, t)); return; } actual.onNext(result); } @Override public void onError(Throwable e) { ... actual.onError(e); } @Override public void onCompleted() { ... actual.onCompleted(); } @Override public void setProducer(Producer p) { actual.setProducer(p); } } }
OnSubscribeMap實現了OnSubscribe接口,因此OnSubscribeMap就是一個OnSubscribe。在調用map()的時候創建了一個新的被觀察者ObservableB,然后我們用ObservableB.subscribe(subscriberOne)訂閱了觀察者subscriberOne。結合我們在第一小節的分析結果,所以OnSubscribeMap.call(o)中的o就是subscribe(subscriberOne)中的subscriberOne;一旦調用了ObservableB.subscribe(subscriberOne)就會執行OnSubscribeMap.call()。
在call()方法中,首先通過我們的觀察者o和轉換函數transformer構造了一個MapSubscriber,最后調用了source也就是observableA的unsafeSubscribe()方法。即observableA訂閱了一個觀察者MapSubscriber。
public final Subscription unsafeSubscribe(Subscriber super T> subscriber) { try { ... RxJavaHooks.onObservableStart(this, onSubscribe).call(subscriber); return RxJavaHooks.onObservableReturn(subscriber); } catch (Throwable e) { ... return Subscriptions.unsubscribed(); } }
上面這段代碼最終執行了onSubscribe也就是OnSubscribeMap的call()方法,call()方法中的參數就是之前在OnSubscribeMap.call()中new出來的MapSubscriber。最后在call()方法中執行了我們自己的業務代碼:
subscriber.onNext(1); subscriber.onCompleted();
其實也就是執行了MapSubscriber的onNext()和onCompleted()。
@Override public void onNext(T t) { R result; try { result = mapper.call(t); } catch (Throwable ex) { ... return; } actual.onNext(result); }
onNext(T t)方法中的的mapper就是變換函數,actual就是我們在調用subscribe()時創建的觀察者subscriberOne。這個T就是我們例子中的Integer,R就是String。在onNext()中首先調用變換函數mapper.call()將T轉換成R(在我們的例子中就是將Integer類型的1轉換成了String類型的“This is 1”);接著調用subscriberOne.onNext(String result)。同樣在調用MapSubscriber.onCompleted()時會執行subscriberOne.onCompleted()。這樣就完成了一直完成的調用流程。
我承認太啰嗦了,花費了這么大的篇幅才將map()的轉換原理解釋清楚。我也是希望盡量的將每個細節都呈現出來方便大家理解,如果看我啰嗦了這么久還是沒能理解,請看下面我畫的這張執行流程圖。
三、線程調度原理分析在前面的文章中我介紹過RxJava可以很方便的通過subscribeOn()和observeOn()來指定數據流的每一部分運行在哪個線程。其中subscribeOn()指定了處理Observable的全部的過程(包括發射數據和通知)的線程;observeOn()指定了觀察者的onNext(), onError()和onCompleted()執行的線程。接下來我們就分析分析源碼,看看線程調度是如何實現的。
在分析源碼前我們先看看一段常見的通過RxJava實現的線程調度代碼:
示例C
Observable.create(new Observable.OnSubscribe1、subscribeOn()源碼分析() { @Override public void call(Subscriber super String> subscriber) { subscriber.onNext("Hello RxJava!"); subscriber.onCompleted(); } }).subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Subscriber () { @Override public void onCompleted() { System.out.println("completed!"); } @Override public void onError(Throwable e) { } @Override public void onNext(String s) { System.out.println(s); } });
public final ObservablesubscribeOn(Scheduler scheduler) { ... return create(new OperatorSubscribeOn (this, scheduler)); }
通過上面的代碼我們可以看到,subscribeOn()和map()一樣是創建了一個新的被觀察者Observable。因此我大致就能猜到subscribeOn()的執行流程應該和map()差不多,OperatorSubscribeOn肯定也是一個OnSubscribe。那我們接下來就看看OperatorSubscribeOn的源碼:
public final class OperatorSubscribeOnimplements OnSubscribe { final Scheduler scheduler;//線程調度器,用來指定訂閱事件發送、處理等所在的線程 final Observable source; public OperatorSubscribeOn(Observable source, Scheduler scheduler) { this.scheduler = scheduler; this.source = source; } @Override public void call(final Subscriber super T> subscriber) { final Worker inner = scheduler.createWorker(); subscriber.add(inner); inner.schedule(new Action0() { @Override public void call() { final Thread t = Thread.currentThread(); Subscriber s = new Subscriber (subscriber) { @Override public void onNext(T t) { subscriber.onNext(t); } @Override public void onError(Throwable e) { try { subscriber.onError(e); } finally { inner.unsubscribe(); } } @Override public void onCompleted() { try { subscriber.onCompleted(); } finally { inner.unsubscribe(); } } @Override public void setProducer(final Producer p) { subscriber.setProducer(new Producer() { @Override public void request(final long n) { if (t == Thread.currentThread()) { p.request(n); } else { inner.schedule(new Action0() { @Override public void call() { p.request(n); } }); } } }); } }; source.unsafeSubscribe(s); } }); } }
OperatorSubscribeOn實現了OnSubscribe接口,call()中對Subscriber的處理也和OperatorMap對Subscriber的處理類似。首先通過scheduler構建了一個Worker;然后用傳進來的subscriber構造了一個新的Subscriber s,并將s丟到Worker.schedule()中來處理;最后用原Observable去訂閱觀察者s。而這個Worker就是線程調度的關鍵!前面的例子中我們通過subscribeOn(Schedulers.io())指定了Observable發射處理事件以及通知觀察者的一系列操作的執行線程,正是通過這個Schedulers.io()創建了我們前面提到的Worker。所以我們來看看Schedulers.io()的實現。
首先通過Schedulers.io()獲得了ioScheduler并返回,上面的OperatorSubscribeOn通過這個的Scheduler的createWorker()方法創建了我們前面提到的Worker。
public static Scheduler io() { return RxJavaHooks.onIOScheduler(getInstance().ioScheduler); }
接著我們看看這個ioScheduler是怎么來的,下面的代碼向我們展現了是如何在Schedulers的構造函數中通過RxJavaSchedulersHook.createIoScheduler()來初始化ioScheduler的。
private Schedulers() { ... Scheduler io = hook.getIOScheduler(); if (io != null) { ioScheduler = io; } else { ioScheduler = RxJavaSchedulersHook.createIoScheduler(); } ... }
最終RxJavaSchedulersHook.createIoScheduler()返回了一個CachedThreadScheduler,并賦值給了ioScheduler。
public static Scheduler createIoScheduler() { return createIoScheduler(new RxThreadFactory("RxIoScheduler-")); }
public static Scheduler createIoScheduler(ThreadFactory threadFactory) { ... return new CachedThreadScheduler(threadFactory); }
到這一步既然我們知道了ioScheduler就是一個CachedThreadScheduler,那我們就來看看它的createWorker()的實現。
public Worker createWorker() { return new EventLoopWorker(pool.get()); }
上面的代碼向我們赤裸裸的呈現了前面OperatorSubscribeOn中的Worker其實就是EventLoopWorker。我們重點要關注的是他的scheduleActual()。
static final class EventLoopWorker extends Scheduler.Worker implements Action0 { private final CompositeSubscription innerSubscription = new CompositeSubscription(); private final CachedWorkerPool pool; private final ThreadWorker threadWorker; final AtomicBoolean once; EventLoopWorker(CachedWorkerPool pool) { this.pool = pool; this.once = new AtomicBoolean(); this.threadWorker = pool.get(); } ... @Override public Subscription schedule(final Action0 action, long delayTime, TimeUnit unit) { ... ScheduledAction s = threadWorker.scheduleActual(new Action0() { @Override public void call() { if (isUnsubscribed()) { return; } action.call(); } }, delayTime, unit); innerSubscription.add(s); s.addParent(innerSubscription); return s; } }
通過對源碼的一步步追蹤,我們知道了前面OperatorSubscribeOn.call()中的inner.schedule()最終會執行到ThreadWorker的scheduleActual()方法。
public ScheduledAction scheduleActual(final Action0 action, long delayTime, TimeUnit unit) { Action0 decoratedAction = RxJavaHooks.onScheduledAction(action); ScheduledAction run = new ScheduledAction(decoratedAction); Future> f; if (delayTime <= 0) { f = executor.submit(run); } else { f = executor.schedule(run, delayTime, unit); } run.add(f); return run; }
scheduleActual()中的ScheduledAction實現了Runnable接口,通過線程池executor最終實現了線程切換。上面便是subscribeOn(Schedulers.io())實現線程切換的全部過程。
2、observeOn()源碼分析observeOn()切換線程是通過lift來實現的,相比subscribeOn()在實現原理上相對復雜些。不過本質上最終還是創建了一個新的Observable。
public final ObservableobserveOn(Scheduler scheduler, boolean delayError, int bufferSize) { ... return lift(new OperatorObserveOn (scheduler, delayError, bufferSize)); } public final Observable lift(final Operator extends R, ? super T> operator) { return create(new OnSubscribeLift (onSubscribe, operator)); }
OperatorObserveOn作為OnSubscribeLift構造函數的參數用來創建了一個新的OnSubscribeLift對象,接下來我們看看OnSubscribeLift的實現:
public final class OnSubscribeLiftimplements OnSubscribe { final OnSubscribe parent; final Operator extends R, ? super T> operator; public OnSubscribeLift(OnSubscribe parent, Operator extends R, ? super T> operator) { this.parent = parent; this.operator = operator; } @Override public void call(Subscriber super R> o) { try { Subscriber super T> st = RxJavaHooks.onObservableLift(operator).call(o); try { st.onStart(); parent.call(st); } catch (Throwable e) { Exceptions.throwIfFatal(e); st.onError(e); } } catch (Throwable e) { Exceptions.throwIfFatal(e); o.onError(e); } } }
OnSubscribeLift繼承自OnSubscribe,通過前面的分析我們知道一旦調用了subscribe()將觀察者與被觀察綁定后就會觸發被觀察者所對應的OnSubscribe的call()方法,所以這里會觸發OnSubscribeLift.call()。在call()中調用了OperatorObserveOn.call()并返回了一個新的觀察者Subscriber st,接著調用了前一級Observable對應OnSubscriber.call(st)。
我們再看看OperatorObserveOn.call()的實現:
public Subscriber super T> call(Subscriber super T> child) { ... ObserveOnSubscriberparent = new ObserveOnSubscriber (scheduler, child, delayError, bufferSize); parent.init(); return parent; }
OperatorObserveOn.call()中創建了一個ObserveOnSubscriber并調用init()進行了初始化。
static final class ObserveOnSubscriberextends Subscriber implements Action0 { ... @Override public void onNext(final T t) { ... schedule(); } @Override public void onCompleted() { ... schedule(); } @Override public void onError(final Throwable e) { ... schedule(); } protected void schedule() { if (counter.getAndIncrement() == 0) { recursiveScheduler.schedule(this); } } @Override public void call() { long missed = 1L; long currentEmission = emitted; final Queue
ObserveOnSubscriber繼承自Subscriber,并實現了Action0接口。我們看到ObserveOnSubscriber的onNext()、onCompleted()、onError()都有個schedule(),這個方法就是我們線程調度的關鍵;通過schedule()將新觀察者ObserveOnSubscriber發送給subscriberOne的所有事件都切換到了recursiveScheduler所對應的線程,簡單的說就是把subscriberOne的onNext()、onCompleted()、onError()方法丟到了recursiveScheduler對應的線程中來執行。
那么schedule()又是如何做到這一點的呢?他內部調用了recursiveScheduler.schedule(this),recursiveScheduler其實就是一個Worker,和我們在介紹subscribeOn()時提到的worker一樣,執行schedule()實際上最終是創建了一個runable,然后把這個runnable丟到了特定的線程池中去執行。在runnable的run()方法中調用了ObserveOnSubscriber.call(),看上面的代碼大家就會發現在call()方法中最終調用了subscriberOne的onNext()、onCompleted()、onError()方法。這便是它實現線程切換的原理。
好了,我們最后再看看示例C對應的執行流程圖,幫助大家加深理解。
總結這一章以執行流程、操作符實現以及線程調度三個方面為切入點剖析了RxJava源碼。下一章將站在更宏觀的角度來分析整個RxJava的框架結構、設計思想等等。敬請期待~~ :)
如果大家喜歡這一系列的文章,歡迎關注我的知乎專欄和GitHub。
知乎專欄:https://zhuanlan.zhihu.com/baron
GitHub:https://github.com/BaronZ88
文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。
轉載請注明本文地址:http://specialneedsforspecialkids.com/yun/66533.html
摘要:之前寫過一系列的文章,也承諾過會盡快有的介紹。所以這次還是給大家分享一個使用解決問題的案例,希望對大家在使用的時候有一點點啟發。上述這一套復雜的業務邏輯如果使用傳統編碼方式將是極其復雜的。 之前寫過一系列RxJava1的文章,也承諾過會盡快有RxJava2的介紹。無奈實際項目中還未真正的使用RxJava2,不敢妄動筆墨。所以這次還是給大家分享一個使用RxJava1解決問題的案例,希望對...
摘要:按照計劃這一期是要介紹框架結構和設計思想的,但是考慮到將在十月底發布正式版因此決定將框架結構和設計思想分析放到正式版發布后再做。后續我也會有一系列的文章來介紹和的區別。首選我們需要調用系統來獲取所有已安裝的,所以在的方法中調用。 轉載請注明出處:[https://zhuanlan.zhihu.com/p/... RxJava系列1(簡介) RxJava系列2(基本概念及使用介紹) R...
閱讀 2184·2020-06-12 14:26
閱讀 2487·2019-08-29 16:41
閱讀 1889·2019-08-29 15:28
閱讀 2455·2019-08-26 13:43
閱讀 757·2019-08-26 13:37
閱讀 2777·2019-08-23 18:13
閱讀 2799·2019-08-23 15:31
閱讀 1018·2019-08-23 14:10