RxJava2 原理解析


RxJava is a Java VM implementation of Reactive Extensions: a library for composing asynchronous and event-based programs by using observable sequences.
RxJava 是 Reactive Extensions 的 Java VM 实现:一个使用可观察序列组合异步和基于事件的程序的库。

RxJava 通过观察者订阅被观察者进行事件的分发,并提供了很多被观察者(即事件发送者)的创建方法和事件转换的操作符函数(map、flatmap、filter 等等)

下面以最简单的一个 SingleJust 的发射和订阅为例,来看一下 RxJava 中到底是如何进行订阅的

1
2
3
4
5
6
7
8
9
10
11
12
13
SingleJust.just(0)
.map(new Function<Integer, String>() {
@Override
public String apply(@NonNull Integer integer) throws Exception {
return integer.toString();
}
})
.subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {

}
});

首先看一下 SingleJust.just(1)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
@NonNull
public static <T> Single<T> just(final T item) {
//检查校验参数不能为 null
ObjectHelper.requireNonNull(item, "item is null");
//这是个 hook 方法,一般默认返回传入的对象本身
//如果设置了 hook 对象(Function),则会对其进行一些操作后再返回
return RxJavaPlugins.onAssembly(new SingleJust<T>(item));
}

//RxJavaPlugins.onAssembly
public static <T> Single<T> onAssembly(@NonNull Single<T> source) {
Function<? super Single, ? extends Single> f = onSingleAssembly;
if (f != null) {
return apply(f, source);
}
return source;
}

重点看一下 SingleJust.java

很简单的一个类,继承 Single 类,构造方法中传入一个 value,并实现 subscribeActual() 方法
subscribeActual() 方法中调用传入的 observer 的 onSubscribe() 方法和 onSuccess() 方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public final class SingleJust<T> extends Single<T> {

final T value;

public SingleJust(T value) {
this.value = value;
}

@Override
protected void subscribeActual(SingleObserver<? super T> observer) {
observer.onSubscribe(Disposables.disposed());
observer.onSuccess(value);
}

}

subscribeActual() 方法中调用了 observer(观察者)的 onSubscribe()onSuccess() 方法
至于 subscribeActual() 什么时候被调用的,我们一会再看

再接着看 Single.map() 方法

1
2
3
4
5
6
7
8
9
@CheckReturnValue
@NonNull
@SchedulerSupport(SchedulerSupport.NONE)
public final <R> Single<R> map(Function<? super T, ? extends R> mapper) {
//检查入参
ObjectHelper.requireNonNull(mapper, "mapper is null");
//返回一个 SingleMap 对象
return RxJavaPlugins.onAssembly(new SingleMap<T, R>(this, mapper));
}

具体看下 SingleMap 的实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
public final class SingleMap<T, R> extends Single<R> {
final SingleSource<? extends T> source;

final Function<? super T, ? extends R> mapper;

public SingleMap(SingleSource<? extends T> source, Function<? super T, ? extends R> mapper) {
this.source = source;
this.mapper = mapper;
}

@Override
protected void subscribeActual(final SingleObserver<? super R> t) {
source.subscribe(new MapSingleObserver<T, R>(t, mapper));
}

static final class MapSingleObserver<T, R> implements SingleObserver<T> {

final SingleObserver<? super R> t;

final Function<? super T, ? extends R> mapper;

MapSingleObserver(SingleObserver<? super R> t, Function<? super T, ? extends R> mapper) {
this.t = t;
this.mapper = mapper;
}

@Override
public void onSubscribe(Disposable d) {
t.onSubscribe(d);
}

@Override
public void onSuccess(T value) {
R v;
try {
v = ObjectHelper.requireNonNull(mapper.apply(value), "The mapper function returned a null value.");
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
onError(e);
return;
}

t.onSuccess(v);
}

@Override
public void onError(Throwable e) {
t.onError(e);
}
}
}

可见 SingleMap 的构造方法中
传入的第一个参数 SingleSource source,即调用该方法的 SingleSouce 对象
第二个参数是个 Function mapper,即转换方法

同样的,subscribeActual(final SingleObserver<? super R> t) 方法里调用了 source:SingleSource 的 subscribe 方法,传入了一个 MapSingleObserver 对象

MapSingleObserver 相当于一个代理类,对 subscribeActual() 方法中的 t 进行代理,当 mapSingleObserver 的方法被调用时,就会调用其构造方法中传入的 t 进行调用,在 onSuccess() 中先使用 mapper 对数据进行处理后,得到 result,再调用 t.onSuccess(result) 进行回调

再回过头来看 subscribe() 方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
public final Disposable subscribe(final Consumer<? super T> onSuccess, final Consumer<? super Throwable> onError) {
//对参数进行检查
ObjectHelper.requireNonNull(onSuccess, "onSuccess is null");
ObjectHelper.requireNonNull(onError, "onError is null");

//对传入的观察者进行包装为一个 ConsumerSingleObserver
ConsumerSingleObserver<T> observer = new ConsumerSingleObserver<T>(onSuccess, onError);
//再调用 subscribe 方法订阅
subscribe(observer);
return observer;
}

public final void subscribe(SingleObserver<? super T> observer) {
//依旧是进行参数检查
ObjectHelper.requireNonNull(observer, "observer is null");
//同上理,进行 hook 配置
observer = RxJavaPlugins.onSubscribe(this, observer);

ObjectHelper.requireNonNull(observer, "The RxJavaPlugins.onSubscribe hook returned a null SingleObserver. Please check the handler provided to RxJavaPlugins.setOnSingleSubscribe for invalid null returns. Further reading: https://github.com/ReactiveX/RxJava/wiki/Plugins");

try {
//重点在于这里,调用了 subscribeActual 方法
//subscribeActual 是个抽象方法,需要各个子类进行实现
//于是这里就会调用到了我们上面所看到的 SingleJust 类中的 subscribeActual 方法,完成了整个 RxJava 的流程
subscribeActual(observer);
} catch (NullPointerException ex) {
throw ex;
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
NullPointerException npe = new NullPointerException("subscribeActual failed");
npe.initCause(ex);
throw npe;
}
}

由上述我们可以知道,在 RxJava 中,就是由最下游的 subscribe() 方法中的参数 observer ,最终调用事件发出者的 subscribeActual() 方法

上述的代码的流程我们可以用下面的伪代码来进行理解

1
2
3
4
5
6
7
8
9
10
11
12
//最后被执行
SingleObserver.subscribeActual(){
//第二个被执行
SingleMap.subscribeActual(){
//最先执行
SingleJust.subscribeActual(){
//create
onSubscribe()
onSuccess()
}
}
}

整理一下,RxJava 的事件流起始在于调用了 subscribe() 方法,然后一层一层地调用上游的 subscribeActual(SingleObserver<? super T> observer) 方法,同时也把把自己传给上游

对于操作符来说,其 subscribeActual() 方法会调用其上游的 subscribe() -> subscribeActual() 方法,直到最上游的生产者

对于生产者来说,其 subscribeActual() 方法则是在生产数据,调用下游观察者(即从 subscribeActual() 方法中传入的参数) 的回调方法

所以实际上就是后面的 observer 在倒序地调用上游的 subscribeActual() 方法,然后不断调用下游的 onSuccess/onSubscribe/onError 等等方法

操作符持有调用它的上游的引用,并在被订阅的时候(subscribeActual()) 将下游进行代理后,去订阅其上游

同理,Observable/Flowable 等 RxJava 的事件流也是同样的流程

画个流程图
RxJava.drawio _1_.png

RxJava 的线程切换

  • subscribeOn()

    指定事件产生的线程

  • observeOn()

    指定事件消费的线程

RxJava 中的线程调度是通过 subscribeOn(Scheduler scheduler)observeOn(Scheduler scheduler) 两个方法完成的
subscribeOn() 方法调度的是上游到第一个 observeOn() 之间的的线程
observeOn() 调度的是该 observeOn() 到下一个 observeOn() 之间的的线程

所以在 RxJava 中可以进行任意的线程调度,但是,subscribeOn() 方法则有所区别,如果有多个 subscribeOn() 方法,只有最靠近上游的 subscribeOn() 起作用,而其他的 subscribeOn() 方法不起作用,但是这其中也有一个例外,当使用 doOnSubscribe() 方法时,这个方法发生在 subscribe() 调用后而且在事件发送前,这个方法是可以指定线程的,默认情况下, doOnSubscribe() 执行在 subscribe() 发生的线程;而如果在 doOnSubscribe() 之后有 subscribeOn() 的话,它将执行在离它最近的 subscribeOn() 所指定的线程。

subscribeOn()observeOn() 方法需要传入一个 Schedule 对象

Schedule

Schedule 是一个抽象类,其子类有我们经常使用的 Schedulers.io()Schedulers.newThread() 以及 AndroidSchedulers.mainThread() 等等

举个例子

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
Observable.just(1)
//---------1 start ------------//
.flatMap(new Function<Integer, ObservableSource<Integer>>() {
@Override
public ObservableSource<Integer> apply(Integer integer) throws Exception {
Log.d("MainActivity","1");
Log.d("MainActivity","flatMap");
Log.d("MainActivity",Thread.currentThread().toString());
return Observable.just(integer);
}
})
//---------1 end ------------//
.subscribeOn(Schedulers.newThread())//调度从上游到第一个 observerOn() 方法之间的线程,即 1 和 2
//---------2 start ------------//
.map(new Function<Integer, String>() {
@Override
public String apply(Integer integer) throws Exception {
Log.d("MainActivity","2");
Log.d("MainActivity","map");
Log.d("MainActivity",Thread.currentThread().toString());
return String.valueOf(integer);
}
})
//---------2 end ------------//
//---------3 start ------------//
.doOnSubscribe(new Consumer<Disposable>() {
@Override
public void accept(Disposable disposable) throws Exception {
Log.d("MainActivity","3");
Log.d("MainActivity","doOnSubscribe");
Log.d("MainActivity",Thread.currentThread().toString());
}
})
//---------3 end ------------//
.observeOn(Schedulers.computation())//调度下面的线程,即4 和5
//---------4 start ------------//
.map(new Function<String, String>() {
@Override
public String apply(String s) throws Exception {
Log.d("MainActivity","4");
Log.d("MainActivity","map");
Log.d("MainActivity",Thread.currentThread().toString());
return s + " map ";
}
})
//---------4 end ------------//
//只能决定 doOnSubscribe 中的线程,即3
//.subscribeOn(Schedulers.newThread())
//---------5 start ------------//
.subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.d("MainActivity","5");
Log.d("MainActivity","subscribe");
Log.d("MainActivity",Thread.currentThread().toString());
Log.d("MainActivity","result is "+s);

}
});
//---------5 end ------------//

打印出来的日志为

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
D/MainActivity: 3
D/MainActivity: doOnSubscribe
D/MainActivity: Thread[main,5,main]
D/MainActivity: 1
D/MainActivity: flatMap
D/MainActivity: Thread[RxNewThreadScheduler-1,5,main]
D/MainActivity: 2
D/MainActivity: map
D/MainActivity: Thread[RxNewThreadScheduler-1,5,main]
D/MainActivity: 4
D/MainActivity: map
D/MainActivity: Thread[RxComputationThreadPool-1,5,main]
D/MainActivity: 5
D/MainActivity: subscribe
D/MainActivity: Thread[RxComputationThreadPool-1,5,main]
D/MainActivity: result is 1 map

可以看出 doOnSubscribe 是最先被执行的

第一个 subscribeOn(Schedulers.io()) 调度的是最上游的 1 和 2 中的线程
第二个 subscribeOn(Schedulers.newThread()) 不起作用,只能调度 3 中的线程
第三个 observeOn(Schedulers.computation()) 调度的是其后面的代码的线程,即 4 和 5

subscribeOn()

我们通过源码一步一步看,subscribeOn 到底是如何调度线程的

同上,也是先校验 scheduler 不为空
然后将 scheduler 包装为一个 ObservableSubscribeOn 并返回

1
2
3
4
5
6
7
io.reactivex.Observable#subscribeOn
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.CUSTOM)
public final Observable<T> subscribeOn(Scheduler scheduler) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
io.reactivex.internal.operators.observable.ObservableSubscribeOn#ObservableSubscribeOn
public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
super(source);
this.scheduler = scheduler;
}

@Override
public void subscribeActual(final Observer<? super T> observer) {
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(observer);

observer.onSubscribe(parent);

//⑥
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}
1
2
3
4
5
6
7
8
9
10
11
12
13
final class SubscribeTask implements Runnable {
private final SubscribeOnObserver<T> parent;

SubscribeTask(SubscribeOnObserver<T> parent) {
this.parent = parent;
}

//⑦
@Override
public void run() {
source.subscribe(parent);
}
}

ObservableSubscribeOn 类的构造方法
第一个参数是 ObservableSource source 即上面传入的 this(就是调用 subscribeOn 的对象)
第二个参数是 Scheduler scheduler,即要调度到的线程

再看 subscribeActual(final Observer<? super T> observer) 方法
从上文我们可以知道,当下游的观察者订阅时,会调用上游的 subscribeActual() 方法,在我们这里,ObservableSubscribeOn#subscribeActual() 的职责就是切换到指定的线程,并在指定的线程中调用上游的 subscribe() 方法,达到切换上游线程的目的

在这个方法中,会将下游的 observer 进行包装为 SubscribeOnOnserver 对象 parent,这个对象中持有着下游 observer

重点看一下 scheduler.scheduleDirect(new SubsribeTask(parent))

SubscribeTask 是一个实现了 Runnable 接口的类,那肯定会在某个时机调用其 run() 方法,可以看到在 run() 方法中调用了 source.subscribe(parent),由于 SubscribeTaskObservableSubscribeOn 的内部类,所以这里的 source 就是 ObservableSubscribeOn 构造方法中传入的第一个参数,即调用subscribeOn() 的上游对象,这里的 parent 则是上面的 SubscribeOnOnserver 包装对象

那么重点又转移到了 SubscribeTask 中的 run() 方法是如何被调用的了

我们继续看 io.reactivex.Scheduler#scheduleDirect(java.lang.Runnable) 方法

1
2
3
4
5
6
7
8
9
10
11
12
13
public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
//创建一个 Worker
final Worker w = createWorker();
//对传入的 run 对象(即前面的 SubscribeTask 对象)做钩子处理
final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
//将 decoratedRun 和 w 包装成一个 DisposeTask (即一个可以被取消的 Runnable)
DisposeTask task = new DisposeTask(decoratedRun, w);
//worker 调用 schedule 方法进行操作
//① 具体的实现见下方
w.schedule(task, delay, unit);

return task;
}

可以看到这个 createWorker() 方法,是 Scheduler 类的抽象方法,我们以 Schedulers.newThread() 的具体实例 NewThreadScheduler 为例,来看一下这段代码到底是如何切换线程的

1
2
3
4
5
6
7
8
9
10
public final class NewThreadScheduler extends Scheduler {

...忽略一些代码...

@NonNull
@Override
public Worker createWorker() {
return new NewThreadWorker(threadFactory);
}
}

继续看 NewThreadWorker

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
public class NewThreadWorker extends Scheduler.Worker implements Disposable {
private final ScheduledExecutorService executor;
......
public NewThreadWorker(ThreadFactory threadFactory) {
//具体代码见 ②
executor = SchedulerPoolFactory.create(threadFactory);
}

//从上文可以知道,①处的代码会调用到这里
@NonNull
@Override
public Disposable schedule(@NonNull final Runnable action, long delayTime, @NonNull TimeUnit unit) {
if (disposed) {
return EmptyDisposable.INSTANCE;
}
return scheduleActual(action, delayTime, unit, null);
}

//③
@NonNull
public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {
Runnable decoratedRun = RxJavaPlugins.onSchedule(run);

ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);

if (parent != null) {
if (!parent.add(sr)) {
return sr;
}
}

Future<?> f;
try {
if (delayTime <= 0) {
f = executor.submit((Callable<Object>)sr);
} else {
f = executor.schedule((Callable<Object>)sr, delayTime, unit);
}
sr.setFuture(f);
} catch (RejectedExecutionException ex) {
if (parent != null) {
parent.remove(sr);
}
RxJavaPlugins.onError(ex);
}

return sr;
}

② 创建一个只有一个核心线程的线程池并返回

1
2
3
4
5
public static ScheduledExecutorService create(ThreadFactory factory) {
final ScheduledExecutorService exec = Executors.newScheduledThreadPool(1, factory);
tryPutIntoPool(PURGE_ENABLED, exec);
return exec;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
static final class DisposeTask implements Disposable, Runnable, SchedulerRunnableIntrospection {

@NonNull
final Runnable decoratedRun;

@NonNull
final Worker w;

@Nullable
Thread runner;

DisposeTask(@NonNull Runnable decoratedRun, @NonNull Worker w) {
this.decoratedRun = decoratedRun;
this.w = w;
}

//④
@Override
public void run() {
runner = Thread.currentThread();
try {
//⑤
decoratedRun.run();
} finally {
dispose();
runner = null;
}
}

@Override
public void dispose() {
if (runner == Thread.currentThread() && w instanceof NewThreadWorker) {
((NewThreadWorker)w).shutdown();
} else {
w.dispose();
}
}

@Override
public boolean isDisposed() {
return w.isDisposed();
}

@Override
public Runnable getWrappedRunnable() {
return this.decoratedRun;
}
}

由上述的代码我们可以看到
① 处的 worker 调用 schedule() 方法,传入包装类 DisposeTask 的对象(持有 worker 和 subscribeTask),一步步调用到 ③ 处,③这里就是将传入的 run 对象(即 DisposeTask ),根据延迟时间,交给 ② 处的线程池去执行,最终执行到 ④ 处 DisposeTask 的 run() 方法,run() 方法中会的调用 ⑤处 的 decoratedRun.run(),而这个 decoratedRun 对象就是我们在 ⑥处 传入的 SubscribeTask 对象,至此就调用到了它的 run 方法,也就是 ⑦处 的代码,完成了在 Schedules.newThread() 所指定的线程中调用 source.subscribe(parent) 的方法,使得上游的 subscribe() -> 即 subscribeActual() 的代码都运行在了 Schedules.newThread() 所指定的线程中

observeOn()

同上理,我们从源码开始着手

1
2
3
4
5
6
7
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.CUSTOM)
public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
ObjectHelper.verifyPositive(bufferSize, "bufferSize");
return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public final class ObservableObserveOn<T> extends AbstractObservableWithUpstream<T, T> {
final Scheduler scheduler;
final boolean delayError;
final int bufferSize;
public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler, boolean delayError, int bufferSize) {
super(source);
this.scheduler = scheduler;
this.delayError = delayError;
this.bufferSize = bufferSize;
}

//①
@Override
protected void subscribeActual(Observer<? super T> observer) {
if (scheduler instanceof TrampolineScheduler) {
source.subscribe(observer);
} else {
Scheduler.Worker w = scheduler.createWorker();

source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
}
}

......
}

可以看到,当上游调用 observeOn() ,包装成一个 ObservableObserveOn 对象,并传入 source(即上游) 和所需要调度线程的 scheduler

由上文我们可以知道,当下游调用 subscribe() 方法后,会调用到 ①处的 subscribeActual() 方法,
在这里创建一个 worker 对象,从上文我们知道这个 worker 是一个用来进行切换线程的抽象类
将 worker 和 下游的 observer 进行代理为一个 ObserveOnObserver 对象,并让 source(即上游) 订阅这个被代理了的「下游 observer」

当上游调用下游的 onNext()onError()onComplete() 等方法时候,会调用到 ObserveOnObserver 这个代理对象中的方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60

static final class ObserveOnObserver<T> extends BasicIntQueueDisposable<T>
implements Observer<T>, Runnable {
......//省略一些代码
final Observer<? super T> downstream;
final Scheduler.Worker worker;
Disposable upstream;
......//省略一些代码
ObserveOnObserver(Observer<? super T> actual, Scheduler.Worker worker, boolean delayError, int bufferSize) {
this.downstream = actual;
this.worker = worker;
this.delayError = delayError;
this.bufferSize = bufferSize;
}

@Override
public void onSubscribe(Disposable d) {
......//省略一些代码
}

@Override
public void onNext(T t) {
......//省略一些代码
schedule();
}

@Override
public void onError(Throwable t) {
......//省略一些代码
schedule();
}

@Override
public void onComplete() {
......//省略一些代码
schedule();
}

......//省略一些代码

void schedule() {
if (getAndIncrement() == 0) {
worker.schedule(this);
}
}

......//省略一些代码

//②
@Override
public void run() {
if (outputFused) {
drainFused();
} else {
drainNormal();
}
}

......//省略一些代码
}

从代码中我们看到,在 ObserveOnObserveronNext()onError()onComplete() 方法中调用了 schedule() 方法,而 schedule() 方法中调用了 worker.schedule(this)

同理我们知道会在该 worker 的线程中执行 ②处的 run 方法,在 run() 方法中的 drainNormal()drainFused() 会调用传入的「下游 observer」相对应的 onNext()onError()onComplete()等方法,也就达到了切换下游 observer 的代码的执行线程,从这里就完成了 observeOn() 的线程切换

总结

从上述的 subscribeOn() 和 observeOn() 的流程的分析,我们可以得出以下的结论

  1. subscribeOn() 切换的是 source.subscribe() 这行代码的执行线程,所以他只能影响到其上游的代码的执行线程,如果有多个的话,生效的只有最上游的 subscribeOn(),并切作用域是从最上游到第一个 subscribeOn() 订阅(subscribe)是自下而上的,但事件流真正的传递(onNext、onError、onComplete)是自上而下的
  2. observeOn() 切换的是 onNext、onError、onComplete 等方法的执行线程,这些方法中再调用下游的代码进行自上而下的事件传递,所以多个 observeOn() 的话,作用域的就是其下游到下一个 obserbeOn() 之间的代码