Android RxJava3 核心使用场景与实战指南
前言
RxJava3 是响应式编程库 RxJava 的最新版本,在 Android 开发中广泛用于处理异步操作、事件流和数据转换。相比传统的回调方式,RxJava 提供了更简洁的链式调用语法,能够显著减少嵌套回调(Callback Hell),并增强代码的可读性和可维护性。本文将深入探讨 RxJava3 在 Android 项目中的典型使用场景,包括多任务串行/并行处理、轮询机制、UI 交互优化以及内存泄漏防护。
本文探讨了 RxJava3 在 Android 开发中的核心应用场景,涵盖网络请求的串行与并行处理、定时轮询机制、UI 交互效果实现及内存泄漏防护。通过详解 flatMap、zip、repeatWhen 等操作符的实际用法,结合 RxBinding 优化点击与输入事件,提供了完整的代码示例与最佳实践建议,旨在帮助开发者构建高效、稳定的响应式架构。

RxJava3 是响应式编程库 RxJava 的最新版本,在 Android 开发中广泛用于处理异步操作、事件流和数据转换。相比传统的回调方式,RxJava 提供了更简洁的链式调用语法,能够显著减少嵌套回调(Callback Hell),并增强代码的可读性和可维护性。本文将深入探讨 RxJava3 在 Android 项目中的典型使用场景,包括多任务串行/并行处理、轮询机制、UI 交互优化以及内存泄漏防护。
在实际应用中,我们经常需要调用多个接口。根据业务逻辑的不同,这些接口可能涉及串行依赖、并行合并或定时轮询。以下基于 Retrofit 定义的接口进行说明:
public interface ApiServer {
/**
* 获取文章列表
*/
@GET("article/list/1/json")
Observable<BaseResponse<ArticleListResp>> getArticleList();
/**
* 获取热词
*/
@GET("hotkey/json")
Observable<BaseResponse<List<HotKeyResp.DataBean>>> getHotKey();
}
场景描述:当接口二依赖于接口一的成功返回时(例如必须先登录才能获取详情),或者需要在接口一完成后才触发接口二。如果接口一失败,则不应执行接口二。
实现方案:使用 flatMap 操作符可以将前一个 Observable 的结果转换为新的 Observable,从而实现链式调用。这种方式比嵌套订阅更清晰。
// 接口一
Observable<BaseResponse<ArticleListResp>> articleList = ApiManager.getInstance().getApiService().getArticleList();
// 接口二
Observable<BaseResponse<List<HotKeyResp.DataBean>>> hotKey = ApiManager.getInstance().getApiService().getHotKey();
// 使用 flatMap 实现串行依赖
articleList.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.doOnNext(response -> {
// 处理第一个网络请求的结果
if (response != null) {
mTv.setText(response.toString());
}
})
.observeOn(Schedulers.io())
.flatMap(response -> {
// 将第一个请求的结果转换为第二个请求
return hotKey;
})
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<BaseResponse<List<HotKeyResp.DataBean>>>() {
@Override
public void accept(BaseResponse<List<HotKeyResp.DataBean>> listBaseResponse) throws Throwable {
// 处理第二次网络请求的结果
if (listBaseResponse != null) {
mTvTwo.setText(listBaseResponse.toString());
}
}
}, new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) throws Throwable {
// 异常处理:如 Dialog 关闭、缺省页展示等
// 注意:如果第一个网络请求异常,整个事件流会中断,不会执行第二个请求
LogUtil.e(throwable.getMessage());
}
});
关键点解析:
subscribeOn(Schedulers.io()):指定上游线程为 IO 线程,适合网络请求。observeOn(AndroidSchedulers.mainThread()):指定下游线程为主线程,适合 UI 更新。flatMap:将上游发射的数据映射为新的 Observable,实现逻辑上的'下一步'。场景描述:当两个接口相互独立,但业务逻辑要求必须等待两个接口都返回数据后才能进行后续处理(例如同时加载用户信息和配置信息)。
实现方案:使用 zip 操作符。zip 会将两个 Observable 发射的数据按顺序配对,直到其中一个完成。如果只需要任意一个完成即可,可使用 merge;如果需要严格配对,使用 zip。
private void zipRequest() {
Observable<BaseResponse<ArticleListResp>> articleList = ApiManager.getInstance().getApiService().getArticleList();
Observable<BaseResponse<List<HotKeyResp.DataBean>>> hotKey = ApiManager.getInstance().getApiService().getHotKey();
Observable.zip(articleList, hotKey, (resp1, resp2) -> {
// 合并规则:接收两个参数,返回组合后的结果
if (resp1 != null && resp2 != null) {
return resp1.toString() + resp2.toString();
}
return "";
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<String>() {
@Override
public void onSubscribe(@NonNull Disposable d) {}
@Override
public void onNext(@NonNull String msg) {
if (!TextUtils.isEmpty(msg)) {
mTv.setText(msg);
}
}
@Override
public void onError(@NonNull Throwable e) {}
@Override
public void onComplete() {}
});
}
关键点解析:
zip 操作符要求两个 Observable 发射的数据数量一致,否则会提前终止。轮询机制常用于实时通知、状态检查等场景。RxJava3 提供了多种操作符来实现灵活的轮询策略。
场景描述:每隔一定时间发起一次请求,直到达到指定次数或收到特定成功标志。每次请求需等待上一次完成。
实现方案:结合 repeatWhen 和 takeUntil。repeatWhen 允许在错误发生时决定是否需要重试,而 takeUntil 用于设定终止条件。
private void timedPolling(int pollingTimes) {
AtomicInteger times = new AtomicInteger();
Observable<BaseResponse<ArticleListResp>> articleList = ApiManager.getInstance().getApiService().getArticleList();
articleList.repeatWhen(new Function<Observable<Object>, ObservableSource<?>>() {
@Override
public ObservableSource<?> apply(Observable<Object> objectObservable) throws Throwable {
// 将重复信号转换为延迟发射的信号
return objectObservable.flatMap(o ->
Observable.just(0).delay(2, TimeUnit.SECONDS)
);
}
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
// 终止条件:达到次数 或 接口返回成功码
.takeUntil(response -> times.incrementAndGet() >= pollingTimes || response.getErrorCode() == 0)
.subscribe(new Observer<BaseResponse<ArticleListResp>>() {
@Override
public void onSubscribe(@NonNull Disposable d) {}
@Override
public void onNext(@NonNull BaseResponse<ArticleListResp> response) {
// 处理轮询结果
}
@Override
public void onError(@NonNull Throwable e) {}
@Override
public void onComplete() {}
});
}
场景描述:无论上一次请求是否成功,都按照固定频率发起新请求。常用于心跳检测或实时数据拉取。
实现方案:使用 intervalRange 或 interval 生成时间序列,再通过 flatMap 转换为网络请求。
private void intervalPolling(int pollTimes) {
Observable<BaseResponse<ArticleListResp>> articleList = ApiManager.getInstance().getApiService().getArticleList();
Observable.intervalRange(0, pollTimes, 0, 2000, TimeUnit.MILLISECONDS)
.flatMap(aLong -> articleList)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<BaseResponse<ArticleListResp>>() {
@Override
public void accept(BaseResponse<ArticleListResp> response) throws Throwable {
// 处理结果
// 如需提前停止,可调用 mDisposable.dispose()
}
});
}
场景描述:持续轮询直到满足特定业务条件(如接口返回 code=0)。
实现方案:使用 takeUntil 监听响应内容,一旦满足条件即自动取消订阅。
Observable<BaseResponse<ArticleListResp>> articleList = ApiManager.getInstance().getApiService().getArticleList();
Disposable mDisposable = Observable.interval(0, 2000, TimeUnit.MILLISECONDS)
.flatMap(aLong -> articleList)
.takeUntil(response -> response.getErrorCode() == 0)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<BaseResponse<ArticleListResp>>() {
@Override
public void accept(BaseResponse<ArticleListResp> response) throws Throwable {
// 处理回调
}
});
注意事项:务必在 Activity 或 Fragment 销毁时(如 onDestroy)调用 mDisposable.dispose(),防止内存泄漏。
RxJava3 结合 RxBinding 可以极大地简化 UI 事件的处理,实现倒计时、打字机效果等常见交互。
利用 intervalRange 生成时间序列,配合 map 转换显示文本。
private void countDown(int countDownSeconds) {
Observable.intervalRange(0, countDownSeconds, 0, 1000, TimeUnit.MILLISECONDS)
.map(aLong -> (countDownSeconds - aLong) + "s 后重新获取")
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<String>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
mTv.setEnabled(false);
}
@Override
public void onNext(@NonNull String s) {
mTv.setText(s);
}
@Override
public void onError(@NonNull Throwable e) {
mTv.setEnabled(true);
mTv.setText("获取验证码");
}
@Override
public void onComplete() {
mTv.setText("获取验证码");
mTv.setEnabled(true);
}
});
}
通过控制文本长度逐步绘制,模拟逐字输入的效果。
@RequiresApi(api = Build.VERSION_CODES.M)
public class DaziView extends View {
private TextPaint mTextPaint;
private StaticLayout mStaticLayout;
public DaziView(Context context) {
super(context);
initTextPaint();
}
private void initTextPaint() {
mTextPaint = new TextPaint(Paint.ANTI_ALIAS_FLAG);
mTextPaint.setTextSize(48);
mTextPaint.setColor(Color.parseColor("#000000"));
}
public void drawText(String content) {
if (!TextUtils.isEmpty(content)) {
Observable.intervalRange(0, content.length() + 1, 0, 150, TimeUnit.MILLISECONDS)
.subscribe(new Consumer<Long>() {
@Override
public void accept(Long aLong) throws Throwable {
// 动态改变文本长度
mStaticLayout = StaticLayout.Builder.obtain(
content, 0, aLong.intValue(), mTextPaint, getWidth())
.build();
invalidate();
}
});
}
}
@Override
protected void onDraw(Canvas canvas) {
super.onDraw(canvas);
if (mStaticLayout != null) {
mStaticLayout.draw(canvas);
}
}
}
RxBinding 将 Android View 的事件转换为 Observable,使得 UI 事件处理符合响应式范式。
防止用户快速连续点击导致重复提交。
RxView.clicks(button)
.throttleFirst(1000, TimeUnit.MILLISECONDS)
.subscribe(unit -> {
// 一秒内第一次点击有效
handleButtonClick();
});
对输入流进行节流(debounce)和跳过初始空值处理。
RxTextView.textChanges(editText)
.debounce(1000, TimeUnit.MILLISECONDS) // 1 秒无新输入才发射
.skip(1) // 跳过初始空内容
.subscribeOn(AndroidSchedulers.mainThread())
.subscribe(charSequence -> {
// 执行搜索逻辑
performSearch(charSequence.toString());
});
监听多个输入框的状态变化,只有当所有条件满足时才启用按钮。
Observable<CharSequence> observableEdittext = RxTextView.textChanges(editText).skip(1);
Observable<CharSequence> observableEdittextTwo = RxTextView.textChanges(editText_two).skip(1);
Observable.combineLatest(observableEdittext, observableEdittextTwo, (str1, str2) -> {
return !TextUtils.isEmpty(str1) && !TextUtils.isEmpty(str2);
}).subscribe(enabled -> {
button.setEnabled(enabled);
});
RxJava 订阅若未正确管理生命周期,极易导致 Activity 或 Fragment 无法回收,引发内存泄漏。
推荐将所有订阅关系放入 CompositeDisposable,并在销毁时统一清除。
private CompositeDisposable compositeDisposable = new CompositeDisposable();
// 订阅时添加
compositeDisposable.add(RxView.clicks(button)
.subscribe(unit -> handleButtonClick()));
// 销毁时清除
@Override
protected void onDestroy() {
super.onDestroy();
if (!compositeDisposable.isDisposed()) {
compositeDisposable.clear();
}
}
确保在 observeOn 切换回主线程后再访问 UI 控件,且不要在闭包中强引用 Activity。
对于长生命周期的 Observable(如轮询),务必在业务结束或组件销毁时主动调用 dispose()。
RxJava3 为 Android 异步编程提供了强大的工具集。通过合理使用 flatMap、zip、repeatWhen 等操作符,开发者可以构建出逻辑清晰、易于维护的代码结构。同时,结合 RxBinding 和生命周期管理最佳实践,可以有效提升应用性能和稳定性。在实际项目中,建议遵循以下原则:
onError 处理逻辑。CompositeDisposable 或 AutoDispose 等库管理订阅。掌握这些核心场景与技巧,将有助于开发者更高效地应对复杂的业务需求,打造高质量的 Android 应用。

微信公众号「极客日志」,在微信中扫描左侧二维码关注。展示文案:极客日志 zeeklog
查找任何按下的键的javascript键代码、代码、位置和修饰符。 在线工具,Keycode 信息在线工具,online
JavaScript 字符串转义/反转义;Java 风格 \uXXXX(Native2Ascii)编码与解码。 在线工具,Escape 与 Native 编解码在线工具,online
使用 Prettier 在浏览器内格式化 JavaScript 或 HTML 片段。 在线工具,JavaScript / HTML 格式化在线工具,online
Terser 压缩、变量名混淆,或 javascript-obfuscator 高强度混淆(体积会增大)。 在线工具,JavaScript 压缩与混淆在线工具,online
将字符串编码和解码为其 Base64 格式表示形式即可。 在线工具,Base64 字符串编码/解码在线工具,online
将字符串、文件或图像转换为其 Base64 表示形式。 在线工具,Base64 文件转换器在线工具,online