国产xxxx99真实实拍_久久不雅视频_高清韩国a级特黄毛片_嗯老师别我我受不了了小说

資訊專欄INFORMATION COLUMN

RxJava2.x源碼解析(一):訂閱流程

harryhappy / 671人閱讀

摘要:現(xiàn)在網(wǎng)上已經(jīng)有大量的源碼分析文章,各種技術的都有。你完全可以寫成下面的鏈式風格方法會最先被執(zhí)行同樣,為了便于理解,我會借用流里面經(jīng)常用到的水流進行類比。該子類的命名是有規(guī)律可言的。

現(xiàn)在網(wǎng)上已經(jīng)有大量的源碼分析文章,各種技術的都有。但我覺得很多文章對初學者并不友好,讓人讀起來云里霧里的,比源碼還源碼。究其原因,是根本沒有從學習者的角度去分析。在自己完成了源碼閱讀之后,卻忘記了自己是如何一步步提出問題,進而走到這里的。

所以,我想在本篇及以后的文章中,花更多的精力去進行源碼的分析,爭取用淺顯易懂的語言,用適合的邏輯去組織內容。這樣不至于陷入源碼里,導致文章難懂。盡量讓更多的人愿意去讀源碼。

閱讀本文,你需要對 RxJava2 的一些基本使用有所了解,不過不用太深。這里推薦下Season_zlc的給初學者的RxJava2.0教程(一) ,比較淺顯易懂。

提到 RxJava,你第一個想到的詞是什么?

“異步”。

RxJava 在 GitHub 上的官網(wǎng)主頁也說了,“RxJava is a Java VM implementation of Reactive Extensions: a library for composing asynchronous and event-based programs by using observable sequences.”(RxJava是一個使用可觀測序列來組建異步、基于事件的程序的庫,它是 Reactive Extensions 在Java虛擬機上的一個實現(xiàn))。它的優(yōu)點嘛,用扔物線凱哥的話講,就是“簡潔”,并且“隨著程序邏輯變得越來越復雜,它依然能夠保持簡潔”。

這里要注意一點,雖然對大多數(shù)人來講,更多的是使用 RxJava 來配合 Retrofit、OkHttp 進行網(wǎng)絡請求框架的封裝及數(shù)據(jù)的異步處理,但是,RxJava和網(wǎng)絡請求本質上沒有半毛錢的關系。它的本質,官網(wǎng)已經(jīng)說的很明白了,就是“異步”。

RxJava 基于觀察者模式實現(xiàn),基于事件流進行鏈式調用。

首先,我們需要添加必要的依賴,這里以最新的2.2.8版本為例:

????implementation?"io.reactivex.rxjava2:rxjava:2.2.8"

當然,對于 Android 項目來講,我們一般還需要添加一個補充庫:

????implementation?"io.reactivex.rxjava2:rxandroid:2.1.0"

這個庫其實就是提供了 Android 相關的主線程的支持。

然后寫個簡單的代碼,就可以開始我們的源碼分析啦。

????????//?上游?observable
????????Observable?observable?=?Observable.create(new?ObservableOnSubscribe()?{
????????????@Override
????????????public?void?subscribe(ObservableEmitter?emitter)?throws?Exception?{
????????????????Log.d(TAG,?"subscribe:?");
????????????????emitter.onNext(1);
????????????????emitter.onNext(2);
????????????????emitter.onComplete();
????????????}
????????});

????????//?下游?observer
????????Observer?observer?=?new?Observer()?{
????????????@Override
????????????public?void?onSubscribe(Disposable?d)?{
????????????????//?onSubscribe?方法會最先被執(zhí)行
????????????????Log.d(TAG,?"onSubscribe:?");
????????????}

????????????@Override
????????????public?void?onNext(Integer?integer)?{
????????????????Log.d(TAG,?"onNext:?");
????????????}

????????????@Override
????????????public?void?onError(Throwable?e)?{
????????????????Log.d(TAG,?"onError:?");
????????????}

????????????@Override
????????????public?void?onComplete()?{
????????????????Log.d(TAG,?"onComplete:?");
????????????}
????????};

????????//?將上游和下游進行關聯(lián)
????????observable.subscribe(observer);

為便于理解,我故意將可以鏈式調用的代碼,拆成了三部分。你完全可以寫成下面的鏈式風格:

?Observable.create(new?ObservableOnSubscribe()?{
????????????@Override
????????????public?void?subscribe(ObservableEmitter?emitter)?throws?Exception?{
????????????????Log.d(TAG,?"subscribe:?");
????????????????emitter.onNext(1);
????????????????emitter.onNext(2);
????????????????emitter.onComplete();
????????????}
????????}).subscribe(new?Observer()?{
????????????@Override
????????????public?void?onSubscribe(Disposable?d)?{
????????????????//?onSubscribe?方法會最先被執(zhí)行
????????????????Log.d(TAG,?"onSubscribe:?");
????????????}

????????????@Override
????????????public?void?onNext(Integer?integer)?{
????????????????Log.d(TAG,?"onNext:?");
????????????}

????????????@Override
????????????public?void?onError(Throwable?e)?{
????????????????Log.d(TAG,?"onError:?");
????????????}

????????????@Override
????????????public?void?onComplete()?{
????????????????Log.d(TAG,?"onComplete:?");
????????????}
????????});

同樣,為了便于理解,我會借用i/o流里面經(jīng)常用到的水流進行類比。將被觀察者 observable 稱為上游(upstream),將觀察者 observer 稱為下游(downstream)。讀源碼其實也能看出,作者本身也正是這么類比的。

通過將整個過程拆分成三個步驟,能更清晰的理清邏輯。我們需要做的,本質上就是創(chuàng)建一個上游和一個下游,最終通過上游對象的subscribe方法將二者關聯(lián)起來:

  1. 創(chuàng)建一個 Observable 的實現(xiàn)類

  2. 創(chuàng)建一個 Observer 的實現(xiàn)類

  3. 將二者通過 Observable 的 subscribe(…) 方法將二者進行關聯(lián)

明白了這三點,以后我們就不會被各種實現(xiàn)類搞的眼花繚亂。

這三個步驟,里面的核心是第三部,也就是訂閱過程,畢竟,這屬于一個動作,而我們進行源碼分析的時候,往往就是從動作開始的。這時候,我們Ctrl/Command + 鼠標左鍵,進入該方法看看,里面做了下什么。

????public?final?void?subscribe(Observer<");super?T>?observer)?{
????????ObjectHelper.requireNonNull(observer,?"observer?is?null");
????????try?{
????????????//?RxJavaPlugins是個鉤子函數(shù),用來在代碼的執(zhí)行前后插入進行一些操作
????????????observer?=?RxJavaPlugins.onSubscribe(this,?observer);

????????????ObjectHelper.requireNonNull(observer,?"The?RxJavaPlugins.onSubscribe?hook?returned?a?null?Observer.?Please?change?the?handler?provided?to?RxJavaPlugins.setOnObservableSubscribe?for?invalid?null?returns.?Further?reading:?https://github.com/ReactiveX/RxJava/wiki/Plugins");
????????????//?關鍵點是這行代碼
????????????subscribeActual(observer);
????????}?catch?(NullPointerException?e)?{?//?NOPMD
????????????throw?e;
????????}?catch?(Throwable?e)?{
????????????Exceptions.throwIfFatal(e);
????????????//?can"t?call?onError?because?no?way?to?know?if?a?Disposable?has?been?set?or?not
????????????//?can"t?call?onSubscribe?because?the?call?might?have?set?a?Subscription?already
????????????RxJavaPlugins.onError(e);

????????????NullPointerException?npe?=?new?NullPointerException("Actually?not,?but?can"t?throw?other?exceptions?due?to?RS");
????????????npe.initCause(e);
????????????throw?npe;
????????}
????}

這里將this(上游Observable類型)的和下游observer作為參數(shù)傳給了 RxJavaPlugins 的 onSubscribe(…)方法,并返回一個Observer,同時,將原來的observer指向這個返回值,那么我們看看這個函數(shù)中到底進行了什么操作:

????//??RxJavaPlugins.java
????public?static??Observer<");super?T>?onSubscribe(@NonNull?Observable?source,?@NonNull?Observer<");super?T>?observer)?{
????????BiFunction<");super?Observable,?");super?Observer,?");????????if?(f?!=?null)?{
????????????return?apply(f,?source,?observer);
????????}
????????return?observer;
????}

這里判斷onObservableSubscribe是否為 null,不為 null 則調用其 apply(…) 方法。若為 null ,則直接返回原來的observer。而該變量需要通過RxJavaPlugin的setOnSingleSubscribe(...)方法來指定的,顯然,我們并沒有指定,所以忽略不管(后面遇到類似問題,基本也都可以忽略)。

回到之前的訂閱流程,就可以簡化為下面這樣:

????public?final?void?subscribe(Observer<");super?T>?observer)?{
????????ObjectHelper.requireNonNull(observer,?"observer?is?null");
????????try?{
????????????...
????????????//?調用到具體實現(xiàn)子類的?subscribeActual(observer)?方法
????????????subscribeActual(observer);
????????}?catch?(
????????????...
????????}
????}

從上面代碼可以看出,訂閱過程,即調用Observable的subscribe(...)的過程,其實就是直接調用了其實現(xiàn)類的subscribeActual(observer)方法(該方法在 Observable 中是個抽象方法)。以后我們遇到這個方法,就直接去 Observable 的實現(xiàn)類中找即可,就不會亂了。

一些熟悉RxJava的朋友可能會說,有時候我們通過subscribe(...)訂閱的并不是Observer對象,而是consumer對象,有各種重載。如下:

當你傳入的是Consumer的時候,不管你傳遞了幾個參數(shù),最終都會代用到以下方法,那些你沒傳遞的 onError或者 onComplete 回調等等,會自動使用默認創(chuàng)建的值。

????public?final?Disposable?subscribe(Consumer<");super?T>?onNext,?Consumer<");super?Throwable>?onError,
????????????Action?onComplete,?Consumer<");super?Disposable>?onSubscribe)?{
????????ObjectHelper.requireNonNull(onNext,?"onNext?is?null");
????????ObjectHelper.requireNonNull(onError,?"onError?is?null");
????????ObjectHelper.requireNonNull(onComplete,?"onComplete?is?null");
????????ObjectHelper.requireNonNull(onSubscribe,?"onSubscribe?is?null");

????????//?最終都會封裝成一個?LambdaObserver,并作為參數(shù)傳入subscribe(...)方法中
????????LambdaObserver?ls?=?new?LambdaObserver(onNext,?onError,?onComplete,?onSubscribe);

????????subscribe(ls);

????????return?ls;
????}

可以看出,這里最終還是將這些 Consumer 對象包裝在了一個 LambdaObserver 類型的變量中,然后又調用了subscribe(...)方法,將其作為變量傳入,之后的分析,就跟上面是一樣的了。

訂閱方法講完了,我們也知道最終調用到了 Observable 的實現(xiàn)類的subscribeActual(...)方法。那接下來肯定就是要弄懂在這個方中做了什么事。我們例子中是使用Observable.create(...)方法創(chuàng)建的 observable:

????????//?上游?observable
????????Observable?observable?=?Observable.create(new?ObservableOnSubscribe()?{
????????????@Override
????????????public?void?subscribe(ObservableEmitter?emitter)?throws?Exception?{
????????????????Log.d(TAG,?"subscribe:?");
????????????????emitter.onNext(1);
????????????????emitter.onNext(2);
????????????????emitter.onComplete();
????????????}
????????});

其中,Observable.create(...)方法的實現(xiàn)是這樣的:

????public?static?<T>?Observable<T>?create(ObservableOnSubscribe<T>?source)?{
????????ObjectHelper.requireNonNull(source,?"source?is?null");
????????return?RxJavaPlugins.onAssembly(new?ObservableCreate<T>(source));
????}

我們傳進去了一個實現(xiàn)了ObservableOnSubscribe接口的匿名內部類,該接口類也很簡單,就定義了一個void subscribe(@NonNull ObservableEmitter emitter) throws Exception抽象方法。

然后我們將傳進來的source(剛剛提到的匿名內部類ObservableOnSubscribe)封裝進一個ObservableCreate對象中,又傳進了RxJavaPlugins.onAssembly(...)中,這個RxJavaPlugins類剛才我們說過,其實就是一個hook類,暫時直接忽略,一般就是直接把傳進來的參數(shù)返回了(不放心的話可以自己點進去,以后遇到該方法不再贅述)。

也就是說Observable.create(...)方法最終創(chuàng)建了一個ObservableCreate對象。注意,該對象是Observable抽象類的具體實現(xiàn)類。

特別注意!
特別注意!
特別注意!

重要事情說三遍。我們這里通過create(...)方法創(chuàng)建的Observable的具體實現(xiàn)子類是ObservableCreate。該子類的命名是有規(guī)律可言的。我在分析源碼的時候有時候就想,這么多看起來名字都一樣的類,RxJava的開發(fā)者本人不會懵逼嗎?作為一個用戶量這么大的庫,肯定各種都有講究,肯定有貴了。嗯。規(guī)律就是生成的子類的命名方法為“Observable+創(chuàng)建該類的方法名”,即:在創(chuàng)建該類的方法名稱前面加上個Observable,以此來作為新的類

文章版權歸作者所有,未經(jīng)允許請勿轉載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。

轉載請注明本文地址:http://specialneedsforspecialkids.com/yun/7367.html

相關文章

  • 「碼個蛋」2017年200篇精選干貨集合

    摘要:讓你收獲滿滿碼個蛋從年月日推送第篇文章一年過去了已累積推文近篇文章,本文為年度精選,共計篇,按照類別整理便于讀者主題閱讀。本篇文章是今年的最后一篇技術文章,為了讓大家在家也能好好學習,特此花了幾個小時整理了這些文章。 showImg(https://segmentfault.com/img/remote/1460000013241596); 讓你收獲滿滿! 碼個蛋從2017年02月20...

    wangtdgoodluck 評論0 收藏0
  • Rxjava2.x 源碼解析(二): 線程切換

    摘要:這個上游是個相對概念,上游之上,還有上游,所以就不斷回溯,最終調用到最開始指定的那個線程。雖然表面上看,確實是第一個指定的有效,但是千萬別被欺騙了。文章較長,可以耐心點,反復看看。 這個上游是個相對概念,上游之上,還有上游,所以就不斷回溯,最終調用到最開始指定的那個線程。 雖然表面上看,確實是第一個指定的有效,但是千萬別被欺騙了。 好...

    lowett 評論0 收藏0
  • Rxjava2.x 源碼解析(二): 線程切換

    摘要:這個上游是個相對概念,上游之上,還有上游,所以就不斷回溯,最終調用到最開始指定的那個線程。雖然表面上看,確實是第一個指定的有效,但是千萬別被欺騙了。文章較長,可以耐心點,反復看看。 這個上游是個相對概念,上游之上,還有上游,所以就不斷回溯,最終調用到最開始指定的那個線程。 雖然表面上看,確實是第一個指定的有效,但是千萬別被欺騙了。 好...

    LinkedME2016 評論0 收藏0

發(fā)表評論

0條評論

harryhappy

|高級講師

TA的文章

閱讀更多
最新活動
閱讀需要支付1元查看
<