RxJava is a Java VM implementation of Reactive Extensions: a library for composing asynchronous and event-based programs by using observable sequences. RxJava 是 Reactive Extensions 的 Java VM 实现:一个使用可观察序列组合异步和基于事件的程序的库。
RxJava 通过观察者订阅被观察者进行事件的分发,并提供了很多被观察者(即事件发送者)的创建方法和事件转换的操作符函数(map、flatmap、filter 等等)
下面以最简单的一个 SingleJust 的发射和订阅为例,来看一下 RxJava 中到底是如何进行订阅的
1 2 3 4 5 6 7 8 9 10 11 12 13 SingleJust.just(0 ) .map(new Function<Integer, String>() { @Override public String apply (@NonNull Integer integer) throws Exception { return integer.toString(); } }) .subscribe(new Consumer<String>() { @Override public void accept (String s) throws Exception { } });
首先看一下 SingleJust.just(1)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 @CheckReturnValue @SchedulerSupport(SchedulerSupport.NONE) @NonNull public static <T> Single<T> just (final T item) { ObjectHelper.requireNonNull(item, "item is null" ); return RxJavaPlugins.onAssembly(new SingleJust<T>(item)); } public static <T> Single<T> onAssembly (@NonNull Single<T> source) { Function<? super Single, ? extends Single> f = onSingleAssembly; if (f != null ) { return apply(f, source); } return source; }
重点看一下 SingleJust.java
类
很简单的一个类,继承 Single 类,构造方法中传入一个 value
,并实现 subscribeActual()
方法 在 subscribeActual()
方法中调用传入的 observer 的 onSubscribe()
方法和 onSuccess()
方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 public final class SingleJust <T > extends Single <T > { final T value; public SingleJust (T value) { this .value = value; } @Override protected void subscribeActual (SingleObserver<? super T> observer) { observer.onSubscribe(Disposables.disposed()); observer.onSuccess(value); } }
而 subscribeActual()
方法中调用了 observer
(观察者)的 onSubscribe()
和 onSuccess()
方法 至于 subscribeActual()
什么时候被调用的,我们一会再看
再接着看 Single.map()
方法
1 2 3 4 5 6 7 8 9 @CheckReturnValue @NonNull @SchedulerSupport(SchedulerSupport.NONE) public final <R> Single<R> map (Function<? super T, ? extends R> mapper) { ObjectHelper.requireNonNull(mapper, "mapper is null" ); return RxJavaPlugins.onAssembly(new SingleMap<T, R>(this , mapper)); }
具体看下 SingleMap
的实现
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 public final class SingleMap <T , R > extends Single <R > { final SingleSource<? extends T> source; final Function<? super T, ? extends R> mapper; public SingleMap (SingleSource<? extends T> source, Function<? super T, ? extends R> mapper) { this .source = source; this .mapper = mapper; } @Override protected void subscribeActual (final SingleObserver<? super R> t) { source.subscribe(new MapSingleObserver<T, R>(t, mapper)); } static final class MapSingleObserver <T , R > implements SingleObserver <T > { final SingleObserver<? super R> t; final Function<? super T, ? extends R> mapper; MapSingleObserver(SingleObserver<? super R> t, Function<? super T, ? extends R> mapper) { this .t = t; this .mapper = mapper; } @Override public void onSubscribe (Disposable d) { t.onSubscribe(d); } @Override public void onSuccess (T value) { R v; try { v = ObjectHelper.requireNonNull(mapper.apply(value), "The mapper function returned a null value." ); } catch (Throwable e) { Exceptions.throwIfFatal(e); onError(e); return ; } t.onSuccess(v); } @Override public void onError (Throwable e) { t.onError(e); } } }
可见 SingleMap 的构造方法中 传入的第一个参数 SingleSource source
,即调用该方法的 SingleSouce 对象 第二个参数是个 Function mapper
,即转换方法
同样的,subscribeActual(final SingleObserver<? super R> t)
方法里调用了 source:SingleSource
的 subscribe 方法,传入了一个 MapSingleObserver
对象
MapSingleObserver 相当于一个代理类,对 subscribeActual()
方法中的 t 进行代理,当 mapSingleObserver 的方法被调用时,就会调用其构造方法中传入的 t 进行调用,在 onSuccess()
中先使用 mapper 对数据进行处理后,得到 result,再调用 t.onSuccess(result)
进行回调
再回过头来看 subscribe()
方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 public final Disposable subscribe (final Consumer<? super T> onSuccess, final Consumer<? super Throwable> onError) { ObjectHelper.requireNonNull(onSuccess, "onSuccess is null" ); ObjectHelper.requireNonNull(onError, "onError is null" ); ConsumerSingleObserver<T> observer = new ConsumerSingleObserver<T>(onSuccess, onError); subscribe(observer); return observer; } public final void subscribe (SingleObserver<? super T> observer) { ObjectHelper.requireNonNull(observer, "observer is null" ); observer = RxJavaPlugins.onSubscribe(this , observer); ObjectHelper.requireNonNull(observer, "The RxJavaPlugins.onSubscribe hook returned a null SingleObserver. Please check the handler provided to RxJavaPlugins.setOnSingleSubscribe for invalid null returns. Further reading: https://github.com/ReactiveX/RxJava/wiki/Plugins" ); try { subscribeActual(observer); } catch (NullPointerException ex) { throw ex; } catch (Throwable ex) { Exceptions.throwIfFatal(ex); NullPointerException npe = new NullPointerException("subscribeActual failed" ); npe.initCause(ex); throw npe; } }
由上述我们可以知道,在 RxJava 中,就是由最下游的 subscribe() 方法中的参数 observer ,最终调用事件发出者的 subscribeActual() 方法
上述的代码的流程我们可以用下面的伪代码来进行理解
1 2 3 4 5 6 7 8 9 10 11 12 SingleObserver.subscribeActual(){ SingleMap.subscribeActual(){ SingleJust.subscribeActual(){ onSubscribe() onSuccess() } } }
整理一下,RxJava 的事件流起始在于调用了 subscribe()
方法,然后一层一层地调用上游的 subscribeActual(SingleObserver<? super T> observer)
方法,同时也把把自己传给上游
对于操作符来说,其 subscribeActual()
方法会调用其上游的 subscribe() -> subscribeActual()
方法,直到最上游的生产者
对于生产者来说,其 subscribeActual()
方法则是在生产数据,调用下游观察者(即从 subscribeActual() 方法中传入的参数) 的回调方法
所以实际上就是后面的 observer 在倒序地调用上游的 subscribeActual() 方法,然后不断调用下游的 onSuccess/onSubscribe/onError 等等方法
操作符持有调用它的上游的引用,并在被订阅的时候(subscribeActual()
) 将下游进行代理后,去订阅其上游
同理,Observable/Flowable 等 RxJava 的事件流也是同样的流程
画个流程图
RxJava 的线程切换
subscribeOn()
指定事件产生的线程
observeOn()
指定事件消费的线程
RxJava 中的线程调度是通过 subscribeOn(Scheduler scheduler)
和 observeOn(Scheduler scheduler)
两个方法完成的subscribeOn()
方法调度的是上游到第一个 observeOn()
之间的的线程observeOn()
调度的是该 observeOn()
到下一个 observeOn()
之间的的线程
所以在 RxJava 中可以进行任意的线程调度,但是,subscribeOn()
方法则有所区别,如果有多个 subscribeOn()
方法,只有最靠近上游的 subscribeOn()
起作用,而其他的 subscribeOn()
方法不起作用,但是这其中也有一个例外,当使用 doOnSubscribe()
方法时,这个方法发生在 subscribe()
调用后而且在事件发送前,这个方法是可以指定线程的,默认情况下, doOnSubscribe()
执行在 subscribe()
发生的线程;而如果在 doOnSubscribe()
之后有 subscribeOn()
的话,它将执行在离它最近的 subscribeOn()
所指定的线程。
subscribeOn()
和 observeOn()
方法需要传入一个 Schedule
对象
Schedule Schedule
是一个抽象类,其子类有我们经常使用的 Schedulers.io()
、Schedulers.newThread()
以及 AndroidSchedulers.mainThread()
等等
举个例子
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 Observable.just(1 ) .flatMap(new Function<Integer, ObservableSource<Integer>>() { @Override public ObservableSource<Integer> apply (Integer integer) throws Exception { Log.d("MainActivity" ,"1" ); Log.d("MainActivity" ,"flatMap" ); Log.d("MainActivity" ,Thread.currentThread().toString()); return Observable.just(integer); } }) .subscribeOn(Schedulers.newThread()) .map(new Function<Integer, String>() { @Override public String apply (Integer integer) throws Exception { Log.d("MainActivity" ,"2" ); Log.d("MainActivity" ,"map" ); Log.d("MainActivity" ,Thread.currentThread().toString()); return String.valueOf(integer); } }) .doOnSubscribe(new Consumer<Disposable>() { @Override public void accept (Disposable disposable) throws Exception { Log.d("MainActivity" ,"3" ); Log.d("MainActivity" ,"doOnSubscribe" ); Log.d("MainActivity" ,Thread.currentThread().toString()); } }) .observeOn(Schedulers.computation()) .map(new Function<String, String>() { @Override public String apply (String s) throws Exception { Log.d("MainActivity" ,"4" ); Log.d("MainActivity" ,"map" ); Log.d("MainActivity" ,Thread.currentThread().toString()); return s + " map " ; } }) .subscribe(new Consumer<String>() { @Override public void accept (String s) throws Exception { Log.d("MainActivity" ,"5" ); Log.d("MainActivity" ,"subscribe" ); Log.d("MainActivity" ,Thread.currentThread().toString()); Log.d("MainActivity" ,"result is " +s); } });
打印出来的日志为
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 D/MainActivity: 3 D/MainActivity: doOnSubscribe D/MainActivity: Thread[main,5,main] D/MainActivity: 1 D/MainActivity: flatMap D/MainActivity: Thread[RxNewThreadScheduler-1,5,main] D/MainActivity: 2 D/MainActivity: map D/MainActivity: Thread[RxNewThreadScheduler-1,5,main] D/MainActivity: 4 D/MainActivity: map D/MainActivity: Thread[RxComputationThreadPool-1,5,main] D/MainActivity: 5 D/MainActivity: subscribe D/MainActivity: Thread[RxComputationThreadPool-1,5,main] D/MainActivity: result is 1 map
可以看出 doOnSubscribe 是最先被执行的
第一个 subscribeOn(Schedulers.io())
调度的是最上游的 1 和 2 中的线程 第二个 subscribeOn(Schedulers.newThread())
不起作用,只能调度 3 中的线程 第三个 observeOn(Schedulers.computation())
调度的是其后面的代码的线程,即 4 和 5
subscribeOn() 我们通过源码一步一步看,subscribeOn 到底是如何调度线程的
同上,也是先校验 scheduler 不为空 然后将 scheduler 包装为一个 ObservableSubscribeOn 并返回
1 2 3 4 5 6 7 io.reactivex.Observable#subscribeOn @CheckReturnValue @SchedulerSupport(SchedulerSupport.CUSTOM) public final Observable<T> subscribeOn (Scheduler scheduler) { ObjectHelper.requireNonNull(scheduler, "scheduler is null" ); return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this , scheduler)); }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 io.reactivex.internal.operators.observable.ObservableSubscribeOn#ObservableSubscribeOn public ObservableSubscribeOn (ObservableSource<T> source, Scheduler scheduler) { super (source); this .scheduler = scheduler; } @Override public void subscribeActual (final Observer<? super T> observer) { final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(observer); observer.onSubscribe(parent); parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent))); }
1 2 3 4 5 6 7 8 9 10 11 12 13 final class SubscribeTask implements Runnable { private final SubscribeOnObserver<T> parent; SubscribeTask(SubscribeOnObserver<T> parent) { this .parent = parent; } @Override public void run () { source.subscribe(parent); } }
ObservableSubscribeOn 类的构造方法 第一个参数是 ObservableSource source
即上面传入的 this
(就是调用 subscribeOn 的对象) 第二个参数是 Scheduler scheduler
,即要调度到的线程
再看 subscribeActual(final Observer<? super T> observer)
方法 从上文我们可以知道,当下游的观察者订阅时,会调用上游的 subscribeActual()
方法,在我们这里,ObservableSubscribeOn#subscribeActual()
的职责就是切换到指定的线程,并在指定的线程中调用上游的 subscribe()
方法,达到切换上游线程的目的
在这个方法中,会将下游的 observer 进行包装为 SubscribeOnOnserver
对象 parent
,这个对象中持有着下游 observer
重点看一下 scheduler.scheduleDirect(new SubsribeTask(parent))
SubscribeTask
是一个实现了 Runnable
接口的类,那肯定会在某个时机调用其 run()
方法,可以看到在 run()
方法中调用了 source.subscribe(parent)
,由于 SubscribeTask
是 ObservableSubscribeOn
的内部类,所以这里的 source
就是 ObservableSubscribeOn
构造方法中传入的第一个参数,即调用subscribeOn()
的上游对象,这里的 parent
则是上面的 SubscribeOnOnserver
包装对象
那么重点又转移到了 SubscribeTask
中的 run()
方法是如何被调用的了
我们继续看 io.reactivex.Scheduler#scheduleDirect(java.lang.Runnable)
方法
1 2 3 4 5 6 7 8 9 10 11 12 13 public Disposable scheduleDirect (@NonNull Runnable run, long delay, @NonNull TimeUnit unit) { final Worker w = createWorker(); final Runnable decoratedRun = RxJavaPlugins.onSchedule(run); DisposeTask task = new DisposeTask(decoratedRun, w); w.schedule(task, delay, unit); return task; }
可以看到这个 createWorker()
方法,是 Scheduler
类的抽象方法,我们以 Schedulers.newThread() 的具体实例 NewThreadScheduler
为例,来看一下这段代码到底是如何切换线程的
1 2 3 4 5 6 7 8 9 10 public final class NewThreadScheduler extends Scheduler { ...忽略一些代码... @NonNull @Override public Worker createWorker () { return new NewThreadWorker(threadFactory); } }
继续看 NewThreadWorker
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 public class NewThreadWorker extends Scheduler .Worker implements Disposable { private final ScheduledExecutorService executor; ...... public NewThreadWorker (ThreadFactory threadFactory) { executor = SchedulerPoolFactory.create(threadFactory); } @NonNull @Override public Disposable schedule (@NonNull final Runnable action, long delayTime, @NonNull TimeUnit unit) { if (disposed) { return EmptyDisposable.INSTANCE; } return scheduleActual(action, delayTime, unit, null ); } @NonNull public ScheduledRunnable scheduleActual (final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) { Runnable decoratedRun = RxJavaPlugins.onSchedule(run); ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent); if (parent != null ) { if (!parent.add(sr)) { return sr; } } Future<?> f; try { if (delayTime <= 0 ) { f = executor.submit((Callable<Object>)sr); } else { f = executor.schedule((Callable<Object>)sr, delayTime, unit); } sr.setFuture(f); } catch (RejectedExecutionException ex) { if (parent != null ) { parent.remove(sr); } RxJavaPlugins.onError(ex); } return sr; }
② 创建一个只有一个核心线程的线程池并返回
1 2 3 4 5 public static ScheduledExecutorService create (ThreadFactory factory) { final ScheduledExecutorService exec = Executors.newScheduledThreadPool(1 , factory); tryPutIntoPool(PURGE_ENABLED, exec); return exec; }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 static final class DisposeTask implements Disposable , Runnable , SchedulerRunnableIntrospection { @NonNull final Runnable decoratedRun; @NonNull final Worker w; @Nullable Thread runner; DisposeTask(@NonNull Runnable decoratedRun, @NonNull Worker w) { this .decoratedRun = decoratedRun; this .w = w; } @Override public void run () { runner = Thread.currentThread(); try { decoratedRun.run(); } finally { dispose(); runner = null ; } } @Override public void dispose () { if (runner == Thread.currentThread() && w instanceof NewThreadWorker) { ((NewThreadWorker)w).shutdown(); } else { w.dispose(); } } @Override public boolean isDisposed () { return w.isDisposed(); } @Override public Runnable getWrappedRunnable () { return this .decoratedRun; } }
由上述的代码我们可以看到 ① 处的 worker 调用 schedule() 方法,传入包装类 DisposeTask 的对象(持有 worker 和 subscribeTask),一步步调用到 ③ 处,③这里就是将传入的 run
对象(即 DisposeTask ),根据延迟时间,交给 ② 处的线程池去执行,最终执行到 ④ 处 DisposeTask 的 run()
方法,run()
方法中会的调用 ⑤处 的 decoratedRun.run()
,而这个 decoratedRun
对象就是我们在 ⑥处 传入的 SubscribeTask
对象,至此就调用到了它的 run 方法,也就是 ⑦处 的代码,完成了在 Schedules.newThread()
所指定的线程中调用 source.subscribe(parent)
的方法,使得上游的 subscribe() -> 即 subscribeActual()
的代码都运行在了 Schedules.newThread()
所指定的线程中
为何 subscribeOn() 方法只有第一个才生效
为了避免歧义,这里先不讨论 doOnSubscribe() 的情况
由上文我们可以知道,subscribeOn 的核心内容就是将 source.subscribe(parent)
方法放到指定的线程中去执行,其中 source
为上游,parent
为下游的监听者,所以上游的 subscribe()
方法,即 subscribeActual()
都会被执行在指定的线程中。
而当最上游的事件产生者接收到订阅后,就会开始发射事件,即调用 onNext、onError、onComplete 等方法,这些方法如果没有经过 observeOn 指定线程,则依然执行在subscribeOn 的线程中,否则执行在 observeOn 指定的线程中。
所以「subscribeOn() 方法只有第一个才生效」这种说法是有点令人误解的,每一个 subscribeOn 方法都会使得其上游的 subscribe() -> subscribeActual()
方法执行在指定的线程中
observeOn() 同上理,我们从源码开始着手
1 2 3 4 5 6 7 @CheckReturnValue @SchedulerSupport(SchedulerSupport.CUSTOM) public final Observable<T> observeOn (Scheduler scheduler, boolean delayError, int bufferSize) { ObjectHelper.requireNonNull(scheduler, "scheduler is null" ); ObjectHelper.verifyPositive(bufferSize, "bufferSize" ); return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this , scheduler, delayError, bufferSize)); }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 public final class ObservableObserveOn <T > extends AbstractObservableWithUpstream <T , T > { final Scheduler scheduler; final boolean delayError; final int bufferSize; public ObservableObserveOn (ObservableSource<T> source, Scheduler scheduler, boolean delayError, int bufferSize) { super (source); this .scheduler = scheduler; this .delayError = delayError; this .bufferSize = bufferSize; } @Override protected void subscribeActual (Observer<? super T> observer) { if (scheduler instanceof TrampolineScheduler) { source.subscribe(observer); } else { Scheduler.Worker w = scheduler.createWorker(); source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize)); } } ...... }
可以看到,当上游调用 observeOn()
,包装成一个 ObservableObserveOn 对象,并传入 source(即上游)
和所需要调度线程的 scheduler
由上文我们可以知道,当下游调用 subscribe()
方法后,会调用到 ①处的 subscribeActual()
方法, 在这里创建一个 worker 对象,从上文我们知道这个 worker 是一个用来进行切换线程的抽象类 将 worker 和 下游的 observer 进行代理为一个 ObserveOnObserver
对象,并让 source(即上游)
订阅这个被代理了的「下游 observer」
当上游调用下游的 onNext()
、onError()
、onComplete()
等方法时候,会调用到 ObserveOnObserver
这个代理对象中的方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 static final class ObserveOnObserver <T > extends BasicIntQueueDisposable <T > implements Observer <T >, Runnable { ...... final Observer<? super T> downstream; final Scheduler.Worker worker; Disposable upstream; ...... ObserveOnObserver(Observer<? super T> actual, Scheduler.Worker worker, boolean delayError, int bufferSize) { this .downstream = actual; this .worker = worker; this .delayError = delayError; this .bufferSize = bufferSize; } @Override public void onSubscribe (Disposable d) { ...... } @Override public void onNext (T t) { ...... schedule(); } @Override public void onError (Throwable t) { ...... schedule(); } @Override public void onComplete () { ...... schedule(); } ...... void schedule () { if (getAndIncrement() == 0 ) { worker.schedule(this ); } } ...... @Override public void run () { if (outputFused) { drainFused(); } else { drainNormal(); } } ...... }
从代码中我们看到,在 ObserveOnObserver
的 onNext()
、onError()
、onComplete()
方法中调用了 schedule()
方法,而 schedule()
方法中调用了 worker.schedule(this)
同理我们知道会在该 worker 的线程中执行 ②处的 run 方法,在 run()
方法中的 drainNormal()
和 drainFused()
会调用传入的「下游 observer」相对应的 onNext()
、onError()
、onComplete()
等方法,也就达到了切换下游 observer 的代码的执行线程,从这里就完成了 observeOn() 的线程切换
总结 从上述的 subscribeOn() 和 observeOn() 的流程的分析,我们可以得出以下的结论
subscribeOn() 切换的是 source.subscribe()
这行代码的执行线程,所以他只能影响到的是从其开始,自下而上的 subscribe()
方法执行的线程,但从表象上看如果有多个的话,生效的只有最上游的 subscribeOn()
,且作用域是从最上游(事件生产者)到第一个 subscribeOn()
之间,订阅(subscribe)是自下而上的,但事件流真正的传递(onNext、onError、onComplete)是自上而下的
observeOn() 切换的是 onNext、onError、onComplete
等方法的执行线程,这些方法中再调用下游的代码进行自上而下的事件传递,所以多个 observeOn() 的话,作用域的就是其下游到下一个 obserbeOn() 之间的代码
由于开发者写的代码都是在 onNext、onError、onComplete
中,所以从表象上看,我们所写的各个操作符中的代码都是执行在 observeOn
指定的线程中
doOn 操作符 doOnNext、doOnError、doOnComplete 以上三个 doOn 操作符所在的线程由 subscribeOn 和 observeOn 决定
看下 doOnNext() 的源码
1 2 3 4 5 6 7 8 9 10 public final Observable<T> doOnNext (Consumer<? super T> onNext) { return doOnEach(onNext, Functions.emptyConsumer(), Functions.EMPTY_ACTION, Functions.EMPTY_ACTION); } private Observable<T> doOnEach (Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete, Action onAfterTerminate) { ObjectHelper.requireNonNull(onNext, "onNext is null" ); ObjectHelper.requireNonNull(onError, "onError is null" ); ObjectHelper.requireNonNull(onComplete, "onComplete is null" ); ObjectHelper.requireNonNull(onAfterTerminate, "onAfterTerminate is null" ); return RxJavaPlugins.onAssembly(new ObservableDoOnEach<T>(this , onNext, onError, onComplete, onAfterTerminate)); }
可以知道,doOnError()、doOnComplete() 方法同样是会调用 doOnEach() 方法中,通过 ObservableDoOnEach 类进行代理
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 public final class ObservableDoOnEach <T > extends AbstractObservableWithUpstream <T , T > { final Consumer<? super T> onNext; final Consumer<? super Throwable> onError; final Action onComplete; final Action onAfterTerminate; public ObservableDoOnEach (ObservableSource<T> source, Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete, Action onAfterTerminate) { super (source); this .onNext = onNext; this .onError = onError; this .onComplete = onComplete; this .onAfterTerminate = onAfterTerminate; } @Override public void subscribeActual (Observer<? super T> t) { source.subscribe(new DoOnEachObserver<T>(t, onNext, onError, onComplete, onAfterTerminate)); } static final class DoOnEachObserver <T > implements Observer <T >, Disposable { final Observer<? super T> downstream; final Consumer<? super T> onNext; final Consumer<? super Throwable> onError; final Action onComplete; final Action onAfterTerminate; Disposable upstream; boolean done; DoOnEachObserver( Observer<? super T> actual, Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete, Action onAfterTerminate) { this .downstream = actual; this .onNext = onNext; this .onError = onError; this .onComplete = onComplete; this .onAfterTerminate = onAfterTerminate; } @Override public void onNext (T t) { if (done) { return ; } try { onNext.accept(t); } catch (Throwable e) { Exceptions.throwIfFatal(e); upstream.dispose(); onError(e); return ; } downstream.onNext(t); } } }
可见,doOnXxxx() 方法会在 onXxxx() 调用之前调用,且其执行的线程和其他操作符一样由 subscribeOn()
和 observeOn()
决定
doOnSubscribe() 在一个 RxJava 的调用链中,doOnSubscribe() 方法表示在被 subscribe 时执行
默认情况下,doOnSubscribe()
方法执行在 subscribe()
方法调用的线程中,但如果 doOnSubscribe()
方法后有subscribeOn()
指定了线程,则执行在指定的线程中
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe (@NonNull ObservableEmitter<Integer> emitter) throws Exception { Log.d(TAG, "create thread is " + Thread.currentThread()); emitter.onNext(1 ); emitter.onComplete(); } }) .subscribeOn(Schedulers.newThread()) .doOnSubscribe(new Consumer<Disposable>() { @Override public void accept (Disposable disposable) throws Exception { Log.d(TAG, "thread is " + Thread.currentThread()); } }) .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe();
打印的日志如下 thread is Thread[RxCachedThreadScheduler-1,5,main] create thread is Thread[RxNewThreadScheduler-2,5,main]
说明 create 的代码执行在 Schedulers.newThread()
线程中,而 doOnSubscribe
的代码执行在 Schedulers.io()
线程中,接着看源码来分析
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 public final class ObservableCreate <T > extends Observable <T > { final ObservableOnSubscribe<T> source; public ObservableCreate (ObservableOnSubscribe<T> source) { this .source = source; } @Override protected void subscribeActual (Observer<? super T> observer) { CreateEmitter<T> parent = new CreateEmitter<T>(observer); observer.onSubscribe(parent); try { source.subscribe(parent); } catch (Throwable ex) { Exceptions.throwIfFatal(ex); parent.onError(ex); } } } public final Observable<T> doOnSubscribe (Consumer<? super Disposable> onSubscribe) { return doOnLifecycle(onSubscribe, Functions.EMPTY_ACTION); } public final Observable<T> doOnLifecycle (final Consumer<? super Disposable> onSubscribe, final Action onDispose) { ObjectHelper.requireNonNull(onSubscribe, "onSubscribe is null" ); ObjectHelper.requireNonNull(onDispose, "onDispose is null" ); return RxJavaPlugins.onAssembly(new ObservableDoOnLifecycle<T>(this , onSubscribe, onDispose)); }
同上理,不废话,直接看 ObservableDoOnLifecycle
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 public final class ObservableDoOnLifecycle <T > extends AbstractObservableWithUpstream <T , T > { private final Consumer<? super Disposable> onSubscribe; private final Action onDispose; public ObservableDoOnLifecycle (Observable<T> upstream, Consumer<? super Disposable> onSubscribe, Action onDispose) { super (upstream); this .onSubscribe = onSubscribe; this .onDispose = onDispose; } @Override protected void subscribeActual (Observer<? super T> observer) { source.subscribe(new DisposableLambdaObserver<T>(observer, onSubscribe, onDispose)); } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 public final class DisposableLambdaObserver <T > implements Observer <T >, Disposable { final Observer<? super T> downstream; final Consumer<? super Disposable> onSubscribe; public DisposableLambdaObserver (Observer<? super T> actual, Consumer<? super Disposable> onSubscribe, Action onDispose) { this .downstream = actual; this .onSubscribe = onSubscribe; this .onDispose = onDispose; } @Override public void onSubscribe (Disposable d) { onSubscribe.accept(d); } }
可以看到,在 onSubscribe()
方法中会调用 onSubscribe.accept()
方法,即 doOnSubscribe() 中的接口方法。
而从 Create 方法中可以看到 .onSubscribe()
方法的调用是在 subscribeActual()
中调用的,而从上文我们知道 subscribeActual()
所执行的线程是由 subscribeOn()
指定的,所以 .onSubscribe()
方法的线程也就执行在了 subscribeOn 指定的线程中了,所以 doOnSubscribe() 方法的线程会受到 subscribeOn()
方法影响