RxJava2 原理解析


RxJava is a Java VM implementation of Reactive Extensions: a library for composing asynchronous and event-based programs by using observable sequences.
RxJava 是 Reactive Extensions 的 Java VM 实现:一个使用可观察序列组合异步和基于事件的程序的库。

RxJava 通过观察者订阅被观察者进行事件的分发,并提供了很多被观察者(即事件发送者)的创建方法和事件转换的操作符函数(map、flatmap、filter 等等)

下面以最简单的一个 SingleJust 的发射和订阅为例,来看一下 RxJava 中到底是如何进行订阅的

1
2
3
4
5
6
7
8
9
10
11
12
13
SingleJust.just(0)
.map(new Function<Integer, String>() {
@Override
public String apply(@NonNull Integer integer) throws Exception {
return integer.toString();
}
})
.subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {

}
});

首先看一下 SingleJust.just(1)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
@NonNull
public static <T> Single<T> just(final T item) {
//检查校验参数不能为 null
ObjectHelper.requireNonNull(item, "item is null");
//这是个 hook 方法,一般默认返回传入的对象本身
//如果设置了 hook 对象(Function),则会对其进行一些操作后再返回
return RxJavaPlugins.onAssembly(new SingleJust<T>(item));
}

//RxJavaPlugins.onAssembly
public static <T> Single<T> onAssembly(@NonNull Single<T> source) {
Function<? super Single, ? extends Single> f = onSingleAssembly;
if (f != null) {
return apply(f, source);
}
return source;
}

重点看一下 SingleJust.java

很简单的一个类,继承 Single 类,构造方法中传入一个 value,并实现 subscribeActual() 方法
subscribeActual() 方法中调用传入的 observer 的 onSubscribe() 方法和 onSuccess() 方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public final class SingleJust<T> extends Single<T> {

final T value;

public SingleJust(T value) {
this.value = value;
}

@Override
protected void subscribeActual(SingleObserver<? super T> observer) {
observer.onSubscribe(Disposables.disposed());
observer.onSuccess(value);
}

}

subscribeActual() 方法中调用了 observer(观察者)的 onSubscribe()onSuccess() 方法
至于 subscribeActual() 什么时候被调用的,我们一会再看

再接着看 Single.map() 方法

1
2
3
4
5
6
7
8
9
@CheckReturnValue
@NonNull
@SchedulerSupport(SchedulerSupport.NONE)
public final <R> Single<R> map(Function<? super T, ? extends R> mapper) {
//检查入参
ObjectHelper.requireNonNull(mapper, "mapper is null");
//返回一个 SingleMap 对象
return RxJavaPlugins.onAssembly(new SingleMap<T, R>(this, mapper));
}

具体看下 SingleMap 的实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
public final class SingleMap<T, R> extends Single<R> {
final SingleSource<? extends T> source;

final Function<? super T, ? extends R> mapper;

public SingleMap(SingleSource<? extends T> source, Function<? super T, ? extends R> mapper) {
this.source = source;
this.mapper = mapper;
}

@Override
protected void subscribeActual(final SingleObserver<? super R> t) {
source.subscribe(new MapSingleObserver<T, R>(t, mapper));
}

static final class MapSingleObserver<T, R> implements SingleObserver<T> {

final SingleObserver<? super R> t;

final Function<? super T, ? extends R> mapper;

MapSingleObserver(SingleObserver<? super R> t, Function<? super T, ? extends R> mapper) {
this.t = t;
this.mapper = mapper;
}

@Override
public void onSubscribe(Disposable d) {
t.onSubscribe(d);
}

@Override
public void onSuccess(T value) {
R v;
try {
v = ObjectHelper.requireNonNull(mapper.apply(value), "The mapper function returned a null value.");
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
onError(e);
return;
}

t.onSuccess(v);
}

@Override
public void onError(Throwable e) {
t.onError(e);
}
}
}

可见 SingleMap 的构造方法中
传入的第一个参数 SingleSource source,即调用该方法的 SingleSouce 对象
第二个参数是个 Function mapper,即转换方法

同样的,subscribeActual(final SingleObserver<? super R> t) 方法里调用了 source:SingleSource 的 subscribe 方法,传入了一个 MapSingleObserver 对象

MapSingleObserver 相当于一个代理类,对 subscribeActual() 方法中的 t 进行代理,当 mapSingleObserver 的方法被调用时,就会调用其构造方法中传入的 t 进行调用,在 onSuccess() 中先使用 mapper 对数据进行处理后,得到 result,再调用 t.onSuccess(result) 进行回调

再回过头来看 subscribe() 方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
public final Disposable subscribe(final Consumer<? super T> onSuccess, final Consumer<? super Throwable> onError) {
//对参数进行检查
ObjectHelper.requireNonNull(onSuccess, "onSuccess is null");
ObjectHelper.requireNonNull(onError, "onError is null");

//对传入的观察者进行包装为一个 ConsumerSingleObserver
ConsumerSingleObserver<T> observer = new ConsumerSingleObserver<T>(onSuccess, onError);
//再调用 subscribe 方法订阅
subscribe(observer);
return observer;
}

public final void subscribe(SingleObserver<? super T> observer) {
//依旧是进行参数检查
ObjectHelper.requireNonNull(observer, "observer is null");
//同上理,进行 hook 配置
observer = RxJavaPlugins.onSubscribe(this, observer);

ObjectHelper.requireNonNull(observer, "The RxJavaPlugins.onSubscribe hook returned a null SingleObserver. Please check the handler provided to RxJavaPlugins.setOnSingleSubscribe for invalid null returns. Further reading: https://github.com/ReactiveX/RxJava/wiki/Plugins");

try {
//重点在于这里,调用了 subscribeActual 方法
//subscribeActual 是个抽象方法,需要各个子类进行实现
//于是这里就会调用到了我们上面所看到的 SingleJust 类中的 subscribeActual 方法,完成了整个 RxJava 的流程
subscribeActual(observer);
} catch (NullPointerException ex) {
throw ex;
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
NullPointerException npe = new NullPointerException("subscribeActual failed");
npe.initCause(ex);
throw npe;
}
}

由上述我们可以知道,在 RxJava 中,就是由最下游的 subscribe() 方法中的参数 observer ,最终调用事件发出者的 subscribeActual() 方法

上述的代码的流程我们可以用下面的伪代码来进行理解

1
2
3
4
5
6
7
8
9
10
11
12
//最后被执行
SingleObserver.subscribeActual(){
//第二个被执行
SingleMap.subscribeActual(){
//最先执行
SingleJust.subscribeActual(){
//create
onSubscribe()
onSuccess()
}
}
}

整理一下,RxJava 的事件流起始在于调用了 subscribe() 方法,然后一层一层地调用上游的 subscribeActual(SingleObserver<? super T> observer) 方法,同时也把把自己传给上游

对于操作符来说,其 subscribeActual() 方法会调用其上游的 subscribe() -> subscribeActual() 方法,直到最上游的生产者

对于生产者来说,其 subscribeActual() 方法则是在生产数据,调用下游观察者(即从 subscribeActual() 方法中传入的参数) 的回调方法

所以实际上就是后面的 observer 在倒序地调用上游的 subscribeActual() 方法,然后不断调用下游的 onSuccess/onSubscribe/onError 等等方法

操作符持有调用它的上游的引用,并在被订阅的时候(subscribeActual()) 将下游进行代理后,去订阅其上游

同理,Observable/Flowable 等 RxJava 的事件流也是同样的流程

画个流程图
RxJava.drawio _1_.png

RxJava 的线程切换

  • subscribeOn()

    指定事件产生的线程

  • observeOn()

    指定事件消费的线程

RxJava 中的线程调度是通过 subscribeOn(Scheduler scheduler)observeOn(Scheduler scheduler) 两个方法完成的
subscribeOn() 方法调度的是上游到第一个 observeOn() 之间的的线程
observeOn() 调度的是该 observeOn() 到下一个 observeOn() 之间的的线程

所以在 RxJava 中可以进行任意的线程调度,但是,subscribeOn() 方法则有所区别,如果有多个 subscribeOn() 方法,只有最靠近上游的 subscribeOn() 起作用,而其他的 subscribeOn() 方法不起作用,但是这其中也有一个例外,当使用 doOnSubscribe() 方法时,这个方法发生在 subscribe() 调用后而且在事件发送前,这个方法是可以指定线程的,默认情况下, doOnSubscribe() 执行在 subscribe() 发生的线程;而如果在 doOnSubscribe() 之后有 subscribeOn() 的话,它将执行在离它最近的 subscribeOn() 所指定的线程。

subscribeOn()observeOn() 方法需要传入一个 Schedule 对象

Schedule

Schedule 是一个抽象类,其子类有我们经常使用的 Schedulers.io()Schedulers.newThread() 以及 AndroidSchedulers.mainThread() 等等

举个例子

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
Observable.just(1)
//---------1 start ------------//
.flatMap(new Function<Integer, ObservableSource<Integer>>() {
@Override
public ObservableSource<Integer> apply(Integer integer) throws Exception {
Log.d("MainActivity","1");
Log.d("MainActivity","flatMap");
Log.d("MainActivity",Thread.currentThread().toString());
return Observable.just(integer);
}
})
//---------1 end ------------//
.subscribeOn(Schedulers.newThread())//调度从上游到第一个 observerOn() 方法之间的线程,即 1 和 2
//---------2 start ------------//
.map(new Function<Integer, String>() {
@Override
public String apply(Integer integer) throws Exception {
Log.d("MainActivity","2");
Log.d("MainActivity","map");
Log.d("MainActivity",Thread.currentThread().toString());
return String.valueOf(integer);
}
})
//---------2 end ------------//
//---------3 start ------------//
.doOnSubscribe(new Consumer<Disposable>() {
@Override
public void accept(Disposable disposable) throws Exception {
Log.d("MainActivity","3");
Log.d("MainActivity","doOnSubscribe");
Log.d("MainActivity",Thread.currentThread().toString());
}
})
//---------3 end ------------//
.observeOn(Schedulers.computation())//调度下面的线程,即4 和5
//---------4 start ------------//
.map(new Function<String, String>() {
@Override
public String apply(String s) throws Exception {
Log.d("MainActivity","4");
Log.d("MainActivity","map");
Log.d("MainActivity",Thread.currentThread().toString());
return s + " map ";
}
})
//---------4 end ------------//
//只能决定 doOnSubscribe 中的线程,即3
//.subscribeOn(Schedulers.newThread())
//---------5 start ------------//
.subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.d("MainActivity","5");
Log.d("MainActivity","subscribe");
Log.d("MainActivity",Thread.currentThread().toString());
Log.d("MainActivity","result is "+s);

}
});
//---------5 end ------------//

打印出来的日志为

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
D/MainActivity: 3
D/MainActivity: doOnSubscribe
D/MainActivity: Thread[main,5,main]
D/MainActivity: 1
D/MainActivity: flatMap
D/MainActivity: Thread[RxNewThreadScheduler-1,5,main]
D/MainActivity: 2
D/MainActivity: map
D/MainActivity: Thread[RxNewThreadScheduler-1,5,main]
D/MainActivity: 4
D/MainActivity: map
D/MainActivity: Thread[RxComputationThreadPool-1,5,main]
D/MainActivity: 5
D/MainActivity: subscribe
D/MainActivity: Thread[RxComputationThreadPool-1,5,main]
D/MainActivity: result is 1 map

可以看出 doOnSubscribe 是最先被执行的

第一个 subscribeOn(Schedulers.io()) 调度的是最上游的 1 和 2 中的线程
第二个 subscribeOn(Schedulers.newThread()) 不起作用,只能调度 3 中的线程
第三个 observeOn(Schedulers.computation()) 调度的是其后面的代码的线程,即 4 和 5

subscribeOn()

我们通过源码一步一步看,subscribeOn 到底是如何调度线程的

同上,也是先校验 scheduler 不为空
然后将 scheduler 包装为一个 ObservableSubscribeOn 并返回

1
2
3
4
5
6
7
io.reactivex.Observable#subscribeOn
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.CUSTOM)
public final Observable<T> subscribeOn(Scheduler scheduler) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
io.reactivex.internal.operators.observable.ObservableSubscribeOn#ObservableSubscribeOn
public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
super(source);
this.scheduler = scheduler;
}

@Override
public void subscribeActual(final Observer<? super T> observer) {
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(observer);

//调用下游的 onSubscribe() 方法,
observer.onSubscribe(parent);

//⑥
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}
1
2
3
4
5
6
7
8
9
10
11
12
13
final class SubscribeTask implements Runnable {
private final SubscribeOnObserver<T> parent;

SubscribeTask(SubscribeOnObserver<T> parent) {
this.parent = parent;
}

//⑦
@Override
public void run() {
source.subscribe(parent);
}
}

ObservableSubscribeOn 类的构造方法
第一个参数是 ObservableSource source 即上面传入的 this(就是调用 subscribeOn 的对象)
第二个参数是 Scheduler scheduler,即要调度到的线程

再看 subscribeActual(final Observer<? super T> observer) 方法
从上文我们可以知道,当下游的观察者订阅时,会调用上游的 subscribeActual() 方法,在我们这里,ObservableSubscribeOn#subscribeActual() 的职责就是切换到指定的线程,并在指定的线程中调用上游的 subscribe() 方法,达到切换上游线程的目的

在这个方法中,会将下游的 observer 进行包装为 SubscribeOnOnserver 对象 parent,这个对象中持有着下游 observer

重点看一下 scheduler.scheduleDirect(new SubsribeTask(parent))

SubscribeTask 是一个实现了 Runnable 接口的类,那肯定会在某个时机调用其 run() 方法,可以看到在 run() 方法中调用了 source.subscribe(parent),由于 SubscribeTaskObservableSubscribeOn 的内部类,所以这里的 source 就是 ObservableSubscribeOn 构造方法中传入的第一个参数,即调用subscribeOn() 的上游对象,这里的 parent 则是上面的 SubscribeOnOnserver 包装对象

那么重点又转移到了 SubscribeTask 中的 run() 方法是如何被调用的了

我们继续看 io.reactivex.Scheduler#scheduleDirect(java.lang.Runnable) 方法

1
2
3
4
5
6
7
8
9
10
11
12
13
public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
//创建一个 Worker
final Worker w = createWorker();
//对传入的 run 对象(即前面的 SubscribeTask 对象)做钩子处理
final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
//将 decoratedRun 和 w 包装成一个 DisposeTask (即一个可以被取消的 Runnable)
DisposeTask task = new DisposeTask(decoratedRun, w);
//worker 调用 schedule 方法进行操作
//① 具体的实现见下方
w.schedule(task, delay, unit);

return task;
}

可以看到这个 createWorker() 方法,是 Scheduler 类的抽象方法,我们以 Schedulers.newThread() 的具体实例 NewThreadScheduler 为例,来看一下这段代码到底是如何切换线程的

1
2
3
4
5
6
7
8
9
10
public final class NewThreadScheduler extends Scheduler {

...忽略一些代码...

@NonNull
@Override
public Worker createWorker() {
return new NewThreadWorker(threadFactory);
}
}

继续看 NewThreadWorker

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
public class NewThreadWorker extends Scheduler.Worker implements Disposable {
private final ScheduledExecutorService executor;
......
public NewThreadWorker(ThreadFactory threadFactory) {
//具体代码见 ②
executor = SchedulerPoolFactory.create(threadFactory);
}

//从上文可以知道,①处的代码会调用到这里
@NonNull
@Override
public Disposable schedule(@NonNull final Runnable action, long delayTime, @NonNull TimeUnit unit) {
if (disposed) {
return EmptyDisposable.INSTANCE;
}
return scheduleActual(action, delayTime, unit, null);
}

//③
@NonNull
public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {
Runnable decoratedRun = RxJavaPlugins.onSchedule(run);

ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);

if (parent != null) {
if (!parent.add(sr)) {
return sr;
}
}

Future<?> f;
try {
if (delayTime <= 0) {
f = executor.submit((Callable<Object>)sr);
} else {
f = executor.schedule((Callable<Object>)sr, delayTime, unit);
}
sr.setFuture(f);
} catch (RejectedExecutionException ex) {
if (parent != null) {
parent.remove(sr);
}
RxJavaPlugins.onError(ex);
}

return sr;
}

② 创建一个只有一个核心线程的线程池并返回

1
2
3
4
5
public static ScheduledExecutorService create(ThreadFactory factory) {
final ScheduledExecutorService exec = Executors.newScheduledThreadPool(1, factory);
tryPutIntoPool(PURGE_ENABLED, exec);
return exec;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
static final class DisposeTask implements Disposable, Runnable, SchedulerRunnableIntrospection {

@NonNull
final Runnable decoratedRun;

@NonNull
final Worker w;

@Nullable
Thread runner;

DisposeTask(@NonNull Runnable decoratedRun, @NonNull Worker w) {
this.decoratedRun = decoratedRun;
this.w = w;
}

//④
@Override
public void run() {
runner = Thread.currentThread();
try {
//⑤
decoratedRun.run();
} finally {
dispose();
runner = null;
}
}

@Override
public void dispose() {
if (runner == Thread.currentThread() && w instanceof NewThreadWorker) {
((NewThreadWorker)w).shutdown();
} else {
w.dispose();
}
}

@Override
public boolean isDisposed() {
return w.isDisposed();
}

@Override
public Runnable getWrappedRunnable() {
return this.decoratedRun;
}
}

由上述的代码我们可以看到
① 处的 worker 调用 schedule() 方法,传入包装类 DisposeTask 的对象(持有 worker 和 subscribeTask),一步步调用到 ③ 处,③这里就是将传入的 run 对象(即 DisposeTask ),根据延迟时间,交给 ② 处的线程池去执行,最终执行到 ④ 处 DisposeTask 的 run() 方法,run() 方法中会的调用 ⑤处 的 decoratedRun.run(),而这个 decoratedRun 对象就是我们在 ⑥处 传入的 SubscribeTask 对象,至此就调用到了它的 run 方法,也就是 ⑦处 的代码,完成了在 Schedules.newThread() 所指定的线程中调用 source.subscribe(parent) 的方法,使得上游的 subscribe() -> 即 subscribeActual() 的代码都运行在了 Schedules.newThread() 所指定的线程中

为何 subscribeOn() 方法只有第一个才生效

为了避免歧义,这里先不讨论 doOnSubscribe() 的情况

由上文我们可以知道,subscribeOn 的核心内容就是将 source.subscribe(parent) 方法放到指定的线程中去执行,其中 source 为上游,parent 为下游的监听者,所以上游的 subscribe() 方法,即 subscribeActual() 都会被执行在指定的线程中。

而当最上游的事件产生者接收到订阅后,就会开始发射事件,即调用 onNext、onError、onComplete 等方法,这些方法如果没有经过 observeOn 指定线程,则依然执行在subscribeOn 的线程中,否则执行在 observeOn 指定的线程中。

所以「subscribeOn() 方法只有第一个才生效」这种说法是有点令人误解的,每一个 subscribeOn 方法都会使得其上游的 subscribe() -> subscribeActual() 方法执行在指定的线程中

observeOn()

同上理,我们从源码开始着手

1
2
3
4
5
6
7
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.CUSTOM)
public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
ObjectHelper.verifyPositive(bufferSize, "bufferSize");
return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public final class ObservableObserveOn<T> extends AbstractObservableWithUpstream<T, T> {
final Scheduler scheduler;
final boolean delayError;
final int bufferSize;
public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler, boolean delayError, int bufferSize) {
super(source);
this.scheduler = scheduler;
this.delayError = delayError;
this.bufferSize = bufferSize;
}

//①
@Override
protected void subscribeActual(Observer<? super T> observer) {
if (scheduler instanceof TrampolineScheduler) {
source.subscribe(observer);
} else {
Scheduler.Worker w = scheduler.createWorker();

source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
}
}

......
}

可以看到,当上游调用 observeOn() ,包装成一个 ObservableObserveOn 对象,并传入 source(即上游) 和所需要调度线程的 scheduler

由上文我们可以知道,当下游调用 subscribe() 方法后,会调用到 ①处的 subscribeActual() 方法,
在这里创建一个 worker 对象,从上文我们知道这个 worker 是一个用来进行切换线程的抽象类
将 worker 和 下游的 observer 进行代理为一个 ObserveOnObserver 对象,并让 source(即上游) 订阅这个被代理了的「下游 observer」

当上游调用下游的 onNext()onError()onComplete() 等方法时候,会调用到 ObserveOnObserver 这个代理对象中的方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60

static final class ObserveOnObserver<T> extends BasicIntQueueDisposable<T>
implements Observer<T>, Runnable {
......//省略一些代码
final Observer<? super T> downstream;
final Scheduler.Worker worker;
Disposable upstream;
......//省略一些代码
ObserveOnObserver(Observer<? super T> actual, Scheduler.Worker worker, boolean delayError, int bufferSize) {
this.downstream = actual;
this.worker = worker;
this.delayError = delayError;
this.bufferSize = bufferSize;
}

@Override
public void onSubscribe(Disposable d) {
......//省略一些代码
}

@Override
public void onNext(T t) {
......//省略一些代码
schedule();
}

@Override
public void onError(Throwable t) {
......//省略一些代码
schedule();
}

@Override
public void onComplete() {
......//省略一些代码
schedule();
}

......//省略一些代码

void schedule() {
if (getAndIncrement() == 0) {
worker.schedule(this);
}
}

......//省略一些代码

//②
@Override
public void run() {
if (outputFused) {
drainFused();
} else {
drainNormal();
}
}

......//省略一些代码
}

从代码中我们看到,在 ObserveOnObserveronNext()onError()onComplete() 方法中调用了 schedule() 方法,而 schedule() 方法中调用了 worker.schedule(this)

同理我们知道会在该 worker 的线程中执行 ②处的 run 方法,在 run() 方法中的 drainNormal()drainFused() 会调用传入的「下游 observer」相对应的 onNext()onError()onComplete()等方法,也就达到了切换下游 observer 的代码的执行线程,从这里就完成了 observeOn() 的线程切换

总结

从上述的 subscribeOn() 和 observeOn() 的流程的分析,我们可以得出以下的结论

  1. subscribeOn() 切换的是 source.subscribe() 这行代码的执行线程,所以他只能影响到的是从其开始,自下而上的 subscribe() 方法执行的线程,但从表象上看如果有多个的话,生效的只有最上游的 subscribeOn(),且作用域是从最上游(事件生产者)到第一个 subscribeOn() 之间,订阅(subscribe)是自下而上的,但事件流真正的传递(onNext、onError、onComplete)是自上而下的
  2. observeOn() 切换的是 onNext、onError、onComplete 等方法的执行线程,这些方法中再调用下游的代码进行自上而下的事件传递,所以多个 observeOn() 的话,作用域的就是其下游到下一个 obserbeOn() 之间的代码
  3. 由于开发者写的代码都是在 onNext、onError、onComplete 中,所以从表象上看,我们所写的各个操作符中的代码都是执行在 observeOn 指定的线程中

doOn 操作符

doOnNext、doOnError、doOnComplete

以上三个 doOn 操作符所在的线程由 subscribeOn 和 observeOn 决定

看下 doOnNext() 的源码

1
2
3
4
5
6
7
8
9
10
public final Observable<T> doOnNext(Consumer<? super T> onNext) {
return doOnEach(onNext, Functions.emptyConsumer(), Functions.EMPTY_ACTION, Functions.EMPTY_ACTION);
}
private Observable<T> doOnEach(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete, Action onAfterTerminate) {
ObjectHelper.requireNonNull(onNext, "onNext is null");
ObjectHelper.requireNonNull(onError, "onError is null");
ObjectHelper.requireNonNull(onComplete, "onComplete is null");
ObjectHelper.requireNonNull(onAfterTerminate, "onAfterTerminate is null");
return RxJavaPlugins.onAssembly(new ObservableDoOnEach<T>(this, onNext, onError, onComplete, onAfterTerminate));
}

可以知道,doOnError()、doOnComplete() 方法同样是会调用 doOnEach() 方法中,通过 ObservableDoOnEach 类进行代理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
public final class ObservableDoOnEach<T> extends AbstractObservableWithUpstream<T, T> {
final Consumer<? super T> onNext;
final Consumer<? super Throwable> onError;
final Action onComplete;
final Action onAfterTerminate;

/**
* 构造方法
* 传入对应的 onNext、onError、onComplete、onAfterTerminate 方法
* @param source 上文中传入的 this,即上游的 Observable
*/
public ObservableDoOnEach(ObservableSource<T> source, Consumer<? super T> onNext,
Consumer<? super Throwable> onError,
Action onComplete,
Action onAfterTerminate) {
super(source);
this.onNext = onNext;
this.onError = onError;
this.onComplete = onComplete;
this.onAfterTerminate = onAfterTerminate;
}

/**
* 当调用 subscribe() 方法后会调用这个方法
* @param t 下游或者下游的代理包装类
*/
@Override
public void subscribeActual(Observer<? super T> t) {
//调用上游的 subscribe 方法(-> subscribeActual ) 传递给上游
source.subscribe(new DoOnEachObserver<T>(t, onNext, onError, onComplete, onAfterTerminate));
}

static final class DoOnEachObserver<T> implements Observer<T>, Disposable {
final Observer<? super T> downstream;
final Consumer<? super T> onNext;
final Consumer<? super Throwable> onError;
final Action onComplete;
final Action onAfterTerminate;

Disposable upstream;

boolean done;

/**
* 构造方法
* @param actual 下游 Observer
* @param onNext 传入的 doOnNext 中的对象
*/
DoOnEachObserver(
Observer<? super T> actual,
Consumer<? super T> onNext,
Consumer<? super Throwable> onError,
Action onComplete,
Action onAfterTerminate) {
this.downstream = actual;
this.onNext = onNext;
this.onError = onError;
this.onComplete = onComplete;
this.onAfterTerminate = onAfterTerminate;
}

//...省略部分代码

/**
* 当上游调用下游的 onNext 方法时,会调用到这个方法
*/
@Override
public void onNext(T t) {
if (done) {
return;
}
try {
//调用 onNext 的 accept 方法
onNext.accept(t);
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
upstream.dispose();
onError(e);
return;
}
//再调用下游的 onNext 方法
downstream.onNext(t);
}
//...省略部分代码
}
}

可见,doOnXxxx() 方法会在 onXxxx() 调用之前调用,且其执行的线程和其他操作符一样由 subscribeOn()observeOn() 决定

doOnSubscribe()

在一个 RxJava 的调用链中,doOnSubscribe() 方法表示在被 subscribe 时执行

默认情况下,doOnSubscribe() 方法执行在 subscribe() 方法调用的线程中,但如果 doOnSubscribe() 方法后有subscribeOn()指定了线程,则执行在指定的线程中

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(@NonNull ObservableEmitter<Integer> emitter) throws Exception {
Log.d(TAG, "create thread is " + Thread.currentThread());
emitter.onNext(1);
emitter.onComplete();
}
})
.subscribeOn(Schedulers.newThread())
.doOnSubscribe(new Consumer<Disposable>() {
@Override
public void accept(Disposable disposable) throws Exception {
Log.d(TAG, "thread is " + Thread.currentThread());
}
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe();

打印的日志如下
thread is Thread[RxCachedThreadScheduler-1,5,main]
create thread is Thread[RxNewThreadScheduler-2,5,main]

说明 create 的代码执行在 Schedulers.newThread() 线程中,而 doOnSubscribe 的代码执行在 Schedulers.io() 线程中,接着看源码来分析

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public final class ObservableCreate<T> extends Observable<T> {
final ObservableOnSubscribe<T> source;

public ObservableCreate(ObservableOnSubscribe<T> source) {
this.source = source;
}

@Override
protected void subscribeActual(Observer<? super T> observer) {
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
observer.onSubscribe(parent);

try {
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
}

public final Observable<T> doOnSubscribe(Consumer<? super Disposable> onSubscribe) {
return doOnLifecycle(onSubscribe, Functions.EMPTY_ACTION);
}

public final Observable<T> doOnLifecycle(final Consumer<? super Disposable> onSubscribe, final Action onDispose) {
ObjectHelper.requireNonNull(onSubscribe, "onSubscribe is null");
ObjectHelper.requireNonNull(onDispose, "onDispose is null");
return RxJavaPlugins.onAssembly(new ObservableDoOnLifecycle<T>(this, onSubscribe, onDispose));
}

同上理,不废话,直接看 ObservableDoOnLifecycle

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public final class ObservableDoOnLifecycle<T> extends AbstractObservableWithUpstream<T, T> {
private final Consumer<? super Disposable> onSubscribe;
private final Action onDispose;

public ObservableDoOnLifecycle(Observable<T> upstream, Consumer<? super Disposable> onSubscribe,
Action onDispose) {
super(upstream);
this.onSubscribe = onSubscribe;
this.onDispose = onDispose;
}

@Override
protected void subscribeActual(Observer<? super T> observer) {
source.subscribe(new DisposableLambdaObserver<T>(observer, onSubscribe, onDispose));
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public final class DisposableLambdaObserver<T> implements Observer<T>, Disposable {
final Observer<? super T> downstream;
final Consumer<? super Disposable> onSubscribe;
//......

public DisposableLambdaObserver(Observer<? super T> actual,
Consumer<? super Disposable> onSubscribe,
Action onDispose) {
this.downstream = actual;
this.onSubscribe = onSubscribe;
this.onDispose = onDispose;
}

@Override
public void onSubscribe(Disposable d) {
//......
onSubscribe.accept(d);
}
}

可以看到,在 onSubscribe() 方法中会调用 onSubscribe.accept() 方法,即 doOnSubscribe() 中的接口方法。

而从 Create 方法中可以看到 .onSubscribe() 方法的调用是在 subscribeActual() 中调用的,而从上文我们知道 subscribeActual() 所执行的线程是由 subscribeOn() 指定的,所以 .onSubscribe() 方法的线程也就执行在了 subscribeOn 指定的线程中了,所以 doOnSubscribe() 方法的线程会受到 subscribeOn() 方法影响

作者

PPTing

发布于

2021-11-24

更新于

2022-02-12

许可协议

评论