RxJava 概述
RxJava 是基于 Java 虚拟机上的响应式扩展库,它通过使用可观察的序列将异步和基于事件的程序组合起来。与此同时,它扩展了观察者模式来支持数据/事件序列,并且添加了操作符,这些操作符允许你声明性地组合序列,同时抽象出要关注的问题:比如低级线程、同步、线程安全和并发数据结构等。
从 RxJava 的官方定义来看,要想真正地理解 RxJava,必须对它以下两个部分进行深入的分析:
- 订阅流程
- 线程切换
当然,RxJava 操作符的源码也是很不错的学习资源,特别是 FlatMap、Zip 等操作符的源码,有很多可以借鉴的地方,但是它们内部的实现比较复杂,限于篇幅,本文重点讲解 RxJava 的订阅流程和线程切换原理。
订阅流程分析
首先给出 RxJava 消息订阅的例子:
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
emitter.onNext("1");
emitter.onNext("2");
emitter.onNext("3");
emitter.onComplete();
}
}).subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "onSubscribe");
}
@Override
public void onNext(String s) {
Log.d(TAG, "onNext : " + s);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "onError : " + e.toString());
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete");
}
});
可以看到,这里首先创建了一个被观察者,然后创建一个观察者订阅了这个被观察者。下面分两个部分对 RxJava 的订阅流程进行分析:
1. 创建被观察者过程
上面使用了 Observable 类的 create() 方法创建了一个被观察者,看看里面做了什么。
1.1 Observable#create()
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
ObjectHelper.requireNonNull(source, "source is null");
return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}
在 Observable 的 create() 里面实际上是创建了一个新的 ObservableCreate 对象,同时把我们定义好的 ObservableOnSubscribe 对象传入了 ObservableCreate 对象中,最后调用了 RxJavaPlugins.onAssembly() 方法。
1.2 ObservableCreate
public final class ObservableCreate<T> extends Observable<T> {
final ObservableOnSubscribe<T> source;
public ObservableCreate(ObservableOnSubscribe<T> source) {
this.source = source;
}
}
这里仅仅是把 ObservableOnSubscribe 这个对象保存在 ObservableCreate 中了。然后看看 RxJavaPlugins.onAssembly() 这个方法的处理。
1.3 RxJavaPlugins#onAssembly()
public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
...
return source;
}
最终仅仅是把我们的 ObservableCreate 给返回了。
1.4 小结
从以上分析可知,Observable.create() 方法仅仅是先将我们自定义的 ObservableOnSubscribe 对象重新包装成了一个 ObservableCreate 对象。
2. 订阅过程
接着,看看 Observable.subscribe() 的订阅过程是如何实现的。
2.1 Observable#subscribe()
public final void subscribe(Observer<? super T> observer) {
...
observer = RxJavaPlugins.onSubscribe(this,observer);
...
subscribeActual(observer);
...
}
在注释 1 处,在 Observable 的 subscribe() 方法内部首先调用了 RxJavaPlugins 的 onSubscribe() 方法。
2.2 RxJavaPlugins#onSubscribe()
public static <T> Observer<? super T> onSubscribe(@NonNull Observable<T> source, @NonNull Observer<? super T> observer) {
...
return observer;
}
除去 hook 应用的逻辑,这里仅仅是将 observer 返回了。接着来分析下注释 2 处的 subscribeActual() 方法。
2.3 Observable#subscribeActual()
protected abstract void subscribeActual(Observer<? super T> observer);
这是一个抽象的方法,很明显,它对应的具体实现类就是我们在第一步创建的 ObservableCreate 类,接下来看到 ObservableCreate 的 subscribeActual() 方法。
2.4 ObservableCreate#subscribeActual()
@Override
protected void subscribeActual(Observer<? super T> observer) {
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
observer.onSubscribe(parent);
try {
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
在注释 1 处,首先新创建了一个 CreateEmitter 对象,同时传入了我们自定义的 observer 对象进去。
2.4.1 CreateEmitter
static final class CreateEmitter<T>
extends AtomicReference<Disposable>
implements ObservableEmitter<T>, Disposable {
...
final Observer<? super T> observer;
CreateEmitter(Observer<? super T> observer) {
this.observer = observer;
}
...
}
从上面可以看出,CreateEmitter 通过继承了 Java 并发包中的原子引用类 AtomicReference 保证了事件流切断状态 Dispose 的一致性。并实现了 ObservableEmitter 接口和 Disposable 接口。
接着我们分析下注释 2 处的 observer.onSubscribe(parent),这个 onSubscribe 回调的含义其实就是告诉观察者已经成功订阅了被观察者。再看到注释 3 处的 source.subscribe(parent) 这行代码,这里的 source 其实是 ObservableOnSubscribe 对象。
2.4.2 ObservableOnSubscribe#subscribe()
Observable observable = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
emitter.onNext("1");
emitter.onNext("2");
emitter.onNext("3");
emitter.onComplete();
}
});
这里面使用到了 ObservableEmitter 的 onNext() 方法将事件流发送出去,最后调用了 onComplete() 方法完成了订阅过程。
2.4.3 CreateEmitter#onNext() && CreateEmitter#onComplete()
@Override
public void onNext(T t) {
...
if (!isDisposed()) {
observer.onNext(t);
}
}
@Override
public void onComplete() {
if (!isDisposed()) {
try {
observer.onComplete();
} finally {
dispose();
}
}
}
在 CreateEmitter 的 onNext 和 onComplete 方法中首先都要经过一个 isDisposed 的判断,作用就是看当前的事件流是否被切断(废弃)掉了,默认是不切断的,如果想要切断,可以调用 Disposable 的 dispose() 方法将此状态设置为切断(废弃)状态。
2.4.4 ObservableEmitter#isDisposed()
@Override
public boolean isDisposed() {
return DisposableHelper.isDisposed(get());
}
注意到这里通过 get() 方法首先从 ObservableEmitter 的 AtomicReference 中拿到了保存的 Disposable 状态。然后交给了 DisposableHelper 进行判断处理。
2.4.5 DisposableHelper#isDisposed() && DisposableHelper#set()
public enum DisposableHelper implements Disposable {
DISPOSED;
public static boolean isDisposed(Disposable d) {
return d == DISPOSED;
}
public static boolean set(AtomicReference<Disposable> field, Disposable d) {
for (;;) {
Disposable current = field.get();
if (current == DISPOSED) {
if (d != null) {
d.dispose();
}
return false;
}
if (field.compareAndSet(current, d)) {
if (current != null) {
current.dispose();
}
return true;
}
}
}
...
}
DisposableHelper 是一个枚举类,内部只有一个值即 DISPOSED, 从上面的分析可知它就是用来标记事件流被切断(废弃)状态的。先看到注释 2 处的代码 field.compareAndSet(current, d),这里使用了 原子引用 AtomicReference 内部包装的方法处理了标志 Disposable 的并发读写问题。
2.4.6 错误处理机制
@Override
public void onError(Throwable t) {
if (!tryOnError(t)) {
RxJavaPlugins.onError(t);
}
}
@Override
public boolean tryOnError(Throwable t) {
...
if (!isDisposed()) {
try {
observer.onError(t);
} finally {
dispose();
}
return true;
}
return false;
}
在 onError() 和 onComplete() 内部最终会调用到 dispose() 代码,将事件流进行切断,由此可知,onError() 和 onComplete() 只能调用一个,如果先执行的是 onComplete(),再调用 onError() 的话就会导致异常崩溃。
线程切换机制
首先给出 RxJava 线程切换的例子:
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
emitter.onNext("1");
emitter.onNext("2");
emitter.onNext("3");
emitter.onComplete();
}
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "onSubscribe");
}
@Override
public void onNext(String s) {
Log.d(TAG, "onNext : " + s);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "onError : " + e.toString());
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete");
}
});
可以看到,RxJava 的线程切换主要分为 subscribeOn() 和 observeOn() 方法。
1. subscribeOn(Schedulers.io())
在 Schedulers.io() 方法中,我们需要先传入一个 Scheduler 调度类,这里是传入了一个调度到 io 子线程的调度类。
1.1 Schedulers#io()
static final Scheduler IO;
public static Scheduler io() {
return RxJavaPlugins.onIoScheduler(IO);
}
static {
...
IO = RxJavaPlugins.initIoScheduler(new IOTask());
}
static final class IOTask implements Callable<Scheduler> {
@Override
public Scheduler call() throws Exception {
return IoHolder.DEFAULT;
}
}
static final class IoHolder {
static final Scheduler DEFAULT = new IoScheduler();
}
Schedulers.io() 方法其实质就是返回了一个 IOScheduler 对象。
1.2 Observable#subscribeOn()
public final Observable<T> subscribeOn(Scheduler scheduler) {
...
return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
}
在 subscribeOn() 方法里面,又将 ObservableCreate 包装成了一个 ObservableSubscribeOn 对象。
1.3 ObservableSubscribeOn
public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
final Scheduler scheduler;
public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
super(source);
this.scheduler = scheduler;
}
@Override
public void subscribeActual(final Observer<? super T> observer) {
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(observer);
observer.onSubscribe(parent);
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}
}
在注释 4 处,内部先创建了一个 SubscribeTask 对象,来看看它的实现。
1.4 ObservableSubscribeOn#SubscribeTask
final class SubscribeTask implements Runnable {
private final SubscribeOnObserver<T> parent;
SubscribeTask(SubscribeOnObserver<T> parent) {
this.parent = parent;
}
@Override
public void run() {
source.subscribe(parent);
}
}
SubscribeTask 是 ObservableSubscribeOn 的内部类,它实质上就是一个任务类,在它的 run 方法中会执行到 source.subscribe(parent) 的订阅方法。
1.5 Scheduler#scheduleDirect()
public Disposable scheduleDirect(@NonNull Runnable run) {
return scheduleDirect(run, 0L, TimeUnit.NANOSECONDS);
}
public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
final Worker w = createWorker();
final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
DisposeTask task = new DisposeTask(decoratedRun, w);
w.schedule(task, delay, unit);
return task;
}
这里最后会执行到上面这个 scheduleDirect() 重载方法。首先,在注释 1 处,会调用 createWorker() 方法创建一个工作者对象 Worker。
1.5.1 IOScheduler#createWorker()
@Override
public Worker createWorker() {
return new EventLoopWorker(pool.get());
}
static final class EventLoopWorker extends Scheduler.Worker {
...
EventLoopWorker(CachedWorkerPool pool) {
this.pool = pool;
this.tasks = new CompositeDisposable();
this.threadWorker = pool.get();
}
}
pool 是一个 CachedWorkerPool 类型的原子引用对象,它的作用就是用于缓存工作者对象 Worker 的。
1.5.2 NewThreadWorker#scheduleActual()
@NonNull
public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {
Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);
Future<?> f;
try {
if (delayTime <= 0) {
f = executor.submit((Callable<Object>)sr);
} else {
f = executor.schedule((Callable<Object>)sr, delayTime, unit);
}
sr.setFuture(f);
} catch (RejectedExecutionException ex) {
if (parent != null) {
parent.remove(sr);
}
RxJavaPlugins.onError(ex);
}
return sr;
}
在 NewThreadWorker 的 scheduleActual() 方法的内部,在注释 1 处首先会新建一个 ScheduledRunnable 对象,将 Runnable 对象和 parent 包装起来了。这里 parent 是一个 DisposableContainer 对象,它实际的实现类是 CompositeDisposable 类,它是一个保存所有事件流是否被切断状态的容器。
2. 为什么多次执行 subscribeOn(),只有第一次有效?
从上面的分析,我们可以很容易了解到被观察者被订阅时是从最外面的一层(ObservableSubscribeOn)通知到里面的一层(ObservableOnSubscribe),当连续执行了到多次 subscribeOn() 的时候,其实就是先执行倒数最后一次执行的 subscribeOn() 方法,这样肯定会覆盖前面的线程切换。
3. observeOn(AndroidSchedulers.mainThread())
public final Observable<T> observeOn(Scheduler scheduler) {
return observeOn(scheduler, false, bufferSize());
}
public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
....
return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
}
可以看到,observeOn() 方法内部最终也是返回了一个 ObservableObserveOn 对象。
3.1 ObservableObserveOn#subscribeActual()
@Override
protected void subscribeActual(Observer<? super T> observer) {
if (scheduler instanceof TrampolineScheduler) {
source.subscribe(observer);
} else {
Scheduler.Worker w = scheduler.createWorker();
source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
}
}
如果不是立即执行调度器,则会在注释 3 处创建一个工作者对象。然后,在注释 4 处创建一个新的 ObserveOnObserver 将 SubscribeOnobserver 对象包装起来。
3.2 ObserveOnObserver
@Override
public void onNext(T t) {
...
if (sourceMode != QueueDisposable.ASYNC) {
queue.offer(t);
}
schedule();
}
去除非主线逻辑的代码,在 ObserveOnObserver 的 onNext() 和 onError()、onComplete() 方法中最后都会调用到 schedule() 方法。接着看 schedule() 方法,其中 onNext() 还会把消息存放到队列中。
3.3 ObserveOnObserver#schedule()
void schedule() {
if (getAndIncrement() == 0) {
worker.schedule(this);
}
}
这里使用了 worker 进行调度 ObserveOnObserver 这个实现了 Runnable 的任务。worker 就是在 AndroidSchedulers.mainThread() 中创建的,内部其实就是 使用 Handler 进行线程切换的。
3.4 ObserveOnObserver#run()
@Override
public void run() {
if (outputFused) {
drainFused();
} else {
drainNormal();
}
}
在注释 1 处会先判断 outputFused 这个标志位,它表示事件流是否被融化掉,默认是 false,所以,最后会执行到 drainNormal() 方法。
3.5 ObserveOnObserver#drainNormal()
void drainNormal() {
int missed = 1;
final SimpleQueue<T> q = queue;
final Observer<? super T> a = downstream;
v = q.poll();
a.onNext(v);
}
在注释 1 处,这里的 downstream 实际上是从外面传进来的 SubscribeOnObserver 对象。在注释 2 处将队列中的消息取出来,接着在注释 3 处调用了 SubscribeOnObserver 的 onNext 方法。最终,会从我们包装类的最外层一直调用到最里面的我们自定义的 Observer 中的 onNext() 方法,所以,在 observeOn() 方法下面的链式代码都会执行到它所指定的线程中。
注意事项与最佳实践
在实际开发中,除了理解核心流程外,还需要注意以下几点:
- 内存泄漏风险:RxJava 的订阅关系需要手动管理生命周期。在 Android 中,建议在 Activity 或 Fragment 销毁时调用
Disposable 的 dispose() 方法,或者使用 CompositeDisposable 统一管理,防止因未取消订阅导致的内存泄漏。
- 背压处理:虽然 RxJava 3 引入了更完善的背压机制,但在生产环境中,对于产生速度远快于消费速度的数据源,应配合
onBackpressureBuffer 或 onBackpressureDrop 等操作符使用,避免 OOM。
- 异常捕获:RxJava 的异常处理机制较为特殊,如果在主线程抛出未捕获的异常可能导致应用崩溃。建议在全局使用
RxJavaPlugins.setErrorHandler 设置全局错误处理器,或在关键节点使用 doOnError 和 onErrorResumeNext 进行容错处理。
- 调试技巧:利用
doOnSubscribe、doOnNext、doOnTerminate 等操作符可以在不改变业务逻辑的前提下插入日志打印,方便追踪数据流向和线程变化。
总结
RxJava 作为响应式编程的核心库,其设计思想深刻影响了现代 Android 开发。通过深入分析订阅流程,我们理解了 Observable 如何通过 Observer 传递数据,以及 Disposable 如何保障资源释放的安全性。而线程切换机制则展示了 RxJava 如何利用 Scheduler 和 Worker 模型优雅地解决多线程协作问题。掌握这些底层原理,有助于开发者编写更高效、更健壮的异步代码。