摘要:作用默認的,直接在當前線程運行總是開啟一個新線程用于密集型任務,如異步阻塞操作,這個調度器的線程池會根據需要增長對于普通的計算任務,請使用默認是一個,很像一個有線程緩存的新線程調度器計算所使用的。這個使用的固定的線程池,大小為核數。
轉載請注明出處:https://zhuanlan.zhihu.com/p/20687307
RxJava系列1(簡介)
RxJava系列2(基本概念及使用介紹)
RxJava系列3(轉換操作符)
RxJava系列4(過濾操作符)
RxJava系列5(組合操作符)
RxJava系列6(從微觀角度解讀RxJava源碼)
RxJava系列7(最佳實踐)
前言上一篇的示例代碼中大家一定發現了Observable這個類。從純Java的觀點看,Observable類源自于經典的觀察者模式。RxJava的異步實現正是基于觀察者模式來實現的,而且是一種擴展的觀察者模式。
觀察者模式觀察者模式基于Subject這個概念,Subject是一種特殊對象,又叫做主題或者被觀察者。當它改變時那些由它保存的一系列對象將會得到通知,而這一系列對象被稱作Observer(觀察者)。它們會對外暴漏了一個通知方法(比方說update之類的),當Subject狀態發生變化時會調用的這個方法。
觀察者模式很適合下面這些場景中的任何一個:
當你的架構有兩個實體類,一個依賴另一個,你想讓它們互不影響或者是獨立復用它們時。
當一個變化的對象通知那些與它自身變化相關聯的未知數量的對象時。
當一個變化的對象通知那些無需推斷具體類型的對象時。
通常一個觀察者模式的類圖是這樣的:
如果你對觀察者模式不是很了解,那么強烈建議你先去學習下。關于觀察者模式的詳細介紹可以參考我之前的文章:設計模式之觀察者模式
擴展的觀察者模式在RxJava中主要有4個角色:
Observable
Subject
Observer
Subscriber
Observable和Subject是兩個“生產”實體,Observer和Subscriber是兩個“消費”實體。說直白點Observable對應于觀察者模式中的被觀察者,而Observer和Subscriber對應于觀察者模式中的觀察者。Subscriber其實是一個實現了Observer的抽象類,后面我們分析源碼的時候也會介紹到。Subject比較復雜,以后再分析。
上一篇文章中我們說到RxJava中有個關鍵概念:事件。觀察者Observer和被觀察者Observable通過subscribe()方法實現訂閱關系。從而Observable 可以在需要的時候發出事件來通知Observer。
RxJava如何使用我自己在學習一種新技術的時候通常喜歡先去了解它是怎么用的,掌握了使用方法后再去深挖其原理。那么我們現在就來說說RxJava到底該怎么用。
第一步:創建觀察者Observer
Observer
這么簡單,一個觀察者Observer創建了!
大兄弟你等等...,你之前那篇觀察者模式中不是說觀察者只提供一個update方法的嗎?這特么怎么有三個?!!
少年勿急,且聽我慢慢道來。在普通的觀察者模式中觀察者一般只會提供一個update()方法用于被觀察者的狀態發生變化時,用于提供給被觀察者調用。而在RxJava中的觀察者Observer提供了:onNext()、 onCompleted()和onError()三個方法。還記得嗎?開篇我們講過RxJava是基于一種擴展的觀察這模式實現,這里多出的onCompleted和onError正是對觀察者模式的擴展。ps:onNext就相當于普通觀察者模式中的update
RxJava中添加了普通觀察者模式缺失的三個功能:
RxJava中規定當不再有新的事件發出時,可以調用onCompleted()方法作為標示;
當事件處理出現異常時框架自動觸發onError()方法;
同時Observables支持鏈式調用,從而避免了回調嵌套的問題。
第二步:創建被觀察者Observable
Observable.create()方法可以創建一個Observable,使用crate()創建Observable需要一個OnSubscribe對象,這個對象繼承Action1。當觀察者訂閱我們的Observable時,它作為一個參數傳入并執行call()函數。
Observableobservable = Observable.create(new Observable.OnSubscribe () { @Override public void call(Subscriber super Object> subscriber) { } });
除了create(),just()和from()同樣可以創建Observable。看看下面兩個例子:
just(T...)將傳入的參數依次發送
Observable observable = Observable.just("One", "Two", "Three"); //上面這行代碼會依次調用 //onNext("One"); //onNext("Two"); //onNext("Three"); //onCompleted();
from(T[])/from(Iterable extends T>)將傳入的數組或者Iterable拆分成Java對象依次發送
String[] parameters = {"One", "Two", "Three"}; Observable observable = Observable.from(parameters); //上面這行代碼會依次調用 //onNext("One"); //onNext("Two"); //onNext("Three"); //onCompleted();
第三步:被觀察者Observable訂閱觀察者Observer(ps:你沒看錯,不同于普通的觀察者模式,這里是被觀察者訂閱觀察者)
有了觀察者和被觀察者,我們就可以通過subscribe()來實現二者的訂閱關系了。
observable.subscribe(observer);
連在一起寫就是這樣:
Observable.create(new Observable.OnSubscribe() { @Override public void call(Subscriber super Integer> subscriber) { for (int i = 0; i < 5; i++) { subscriber.onNext(i); } subscriber.onCompleted(); } }).subscribe(new Observer () { @Override public void onCompleted() { System.out.println("onCompleted"); } @Override public void onError(Throwable e) { System.out.println("onError"); } @Override public void onNext(Integer item) { System.out.println("Item is " + item); } });
至此一個完整的RxJava調用就完成了。
兄臺,你叨逼叨叨逼叨的說了一大堆,可是我沒搞定你特么到底在干啥啊?!!不急,我現在就來告訴你們到底發生了什么。
首先我們使用Observable.create()創建了一個新的Observable
Item is 0 Item is 1 Item is 2 Item is 3 Item is 4 onCompleted
看到這里可能你又要說了,大兄弟你別唬我啊!OnSubscribe的call()方法中的參數Subscriber怎么就變成了subscribe()方法中的觀察者Observer?!!!這倆兒貨明明看起來就是兩個不同的類啊。
我們先看看Subscriber這個類:
public abstract class Subscriberimplements Observer , Subscription { ... }
從源碼中我們可以看到,Subscriber是Observer的一個抽象實現類,所以我首先可以肯定的是Subscriber和Observer類型是一致的。接著往下我們看看subscribe()這個方法:
public final Subscription subscribe(final Observer super T> observer) { //這里的if判斷對于我們要分享的問題沒有關聯,可以先無視 if (observer instanceof Subscriber) { return subscribe((Subscriber super T>)observer); } return subscribe(new Subscriber() { @Override public void onCompleted() { observer.onCompleted(); } @Override public void onError(Throwable e) { observer.onError(e); } @Override public void onNext(T t) { observer.onNext(t); } }); }
我們看到subscribe()方法內部首先將傳進來的Observer做了一層代理,將它轉換成了Subscriber。我們再看看這個方法內部的subscribe()方法:
public final Subscription subscribe(Subscriber super T> subscriber) { return Observable.subscribe(subscriber, this); }
進一步往下追蹤看看return后面這段代碼到底做了什么。精簡掉其他無關代碼后的subscribe(subscriber, this)方法是這樣的:
private staticSubscription subscribe(Subscriber super T> subscriber, Observable observable) { subscriber.onStart(); try { hook.onSubscribeStart(observable, observable.onSubscribe).call(subscriber); return hook.onSubscribeReturn(subscriber); } catch (Throwable e) { return Subscriptions.unsubscribed(); } }
我們重點看看hook.onSubscribeStart(observable, observable.onSubscribe).call(subscriber),前面這個hook.onSubscribeStart(observable, observable.onSubscribe)返回的是它自己括號內的第二個參數observable.onSubscribe,然后調用了它的call方法。而這個observable.onSubscribe正是create()方法中的Subscriber,這樣整個流程就理順了。看到這里是不是對RxJava的執行流程清晰了一點呢?這里也建議大家在學習新技術的時候多去翻一翻源碼,知其然還要能知其所以然不是嗎。
異步subscribe()的參數除了可以是Observer和Subscriber以外還可以是Action1、Action0;這是一種更簡單的回調,只有一個call(T)方法;由于太簡單這里就不做詳細介紹了!
上一篇文章中開篇就講到RxJava就是來處理異步任務的。但是默認情況下我們在哪個線程調用subscribe()就在哪個線程生產事件,在哪個線程生產事件就在哪個線程消費事件。那怎么做到異步呢?RxJava為我們提供Scheduler用來做線程調度,我們來看看RxJava提供了哪些Scheduler。
Schedulers | 作用 |
---|---|
Schedulers.immediate() | 默認的Scheduler,直接在當前線程運行 |
Schedulers.newThread() | 總是開啟一個新線程 |
Schedulers.io() | 用于IO密集型任務,如異步阻塞IO操作,這個調度器的線程池會根據需要增長;對于普通的計算任務,請使用Schedulers.computation();Schedulers.io()默認是一個CachedThreadScheduler,很像一個有線程緩存的新線程調度器 |
Schedulers.computation() | 計算所使用的 Scheduler。這個計算指的是 CPU 密集型計算,即不會被 I/O 等操作限制性能的操作,例如圖形的計算。這個 Scheduler 使用的固定的線程池,大小為 CPU 核數。不要把 I/O 操作放在 computation() 中,否則 I/O 操作的等待時間會浪費 CPU |
Schedulers.from(executor) | 使用指定的Executor作為調度器 |
Schedulers.trampoline() | 當其它排隊的任務完成后,在當前線程排隊開始執行 |
AndroidSchedulers.mainThread() | RxAndroid中新增的Scheduler,表示在Android的main線程中運行 |
同時RxJava還為我們提供了subscribeOn()和observeOn()兩個方法來指定Observable和Observer運行的線程。
Observable.from(getCommunitiesFromServer()) .flatMap(community -> Observable.from(community.houses)) .filter(house -> house.price>=5000000).subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(this::addHouseInformationToScreen);
上面這段代碼大家應該有印象吧,沒錯正是我們上一篇文章中的例子。subscribeOn(Schedulers.io())指定了獲取小區列表、處理房源信息等一系列事件都是在IO線程中運行,observeOn(AndroidSchedulers.mainThread())指定了在屏幕上展示房源的操作在UI線程執行。這就做到了在子線程獲取房源,主線程展示房源。
好了,RxJava系列的入門內容我們就聊到這。下一篇我們再繼續介紹更多的API以及它們內部的原理。
如果大家喜歡這一系列的文章,歡迎關注我的知乎專欄和GitHub。
知乎專欄:https://zhuanlan.zhihu.com/baron
GitHub:https://github.com/BaronZ88
文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。
轉載請注明本文地址:http://specialneedsforspecialkids.com/yun/66507.html
摘要:而這個就是線程調度的關鍵前面的例子中我們通過指定了發射處理事件以及通知觀察者的一系列操作的執行線程,正是通過這個創建了我們前面提到的。總結這一章以執行流程操作符實現以及線程調度三個方面為切入點剖析了源碼。 轉載請注明出處:https://zhuanlan.zhihu.com/p/22338235 RxJava系列1(簡介) RxJava系列2(基本概念及使用介紹) RxJava系列3...
摘要:從這一章開始,我們開始聊聊中的操作符,后面我將用三章的篇幅來分別介紹轉換類操作符過濾類操作符組合類操作符這一章我們主要講講轉換類操作符。函數同樣也是做轉換的,但是作用卻不一樣。 轉載請注明出處:https://zhuanlan.zhihu.com/p/21926591 RxJava系列1(簡介) RxJava系列2(基本概念及使用介紹) RxJava系列3(轉換操作符) RxJava...
閱讀 3109·2021-11-24 09:39
閱讀 967·2021-09-07 10:20
閱讀 2389·2021-08-23 09:45
閱讀 2253·2021-08-05 10:00
閱讀 565·2019-08-29 16:36
閱讀 833·2019-08-29 11:12
閱讀 2812·2019-08-26 11:34
閱讀 1839·2019-08-26 10:56