React编程入门示例:RxJava深度解析

文章目录

4.1 RxJava示例
RxJava是ReactiveX在Java虚拟机上的实现,它使用可观察序列来构建异步和基于事件的程序。RxJava提供了丰富的操作符来处理异步数据流,使开发者能够以声明式的方式组合异步操作。

4.1.1 创建Observable
Observable是RxJava中的基本构建块,代表一个可观察的数据源,能够发射0到N个数据项,然后可能以一个完成或错误通知终止。
基本创建方式
Observable.never():创建一个不发射任何数据也不终止的Observable
Observable<String> observable =Observable.never();Observable.error():创建一个不发射任何数据但以错误终止的Observable
Observable<String> observable =Observable.error(newRuntimeException("Error occurred"));Observable.empty():创建一个不发射任何数据但正常终止的Observable
Observable<String> observable =Observable.empty();Observable.create():自定义Observable创建
Observable<String> observable =Observable.create(emitter ->{ emitter.onNext("First"); emitter.onNext("Second"); emitter.onComplete();});Observable.interval():创建一个按固定时间间隔发射整数序列的Observable
Observable<Long> observable =Observable.interval(1,TimeUnit.SECONDS);// 0, 1, 2... 每秒Observable.range():创建一个发射特定整数序列的Observable
Observable<Integer> observable =Observable.range(1,5);// 1, 2, 3, 4, 5Observable.fromIterable():从集合创建Observable
List<String> list =Arrays.asList("Apple","Banana","Cherry");Observable<String> observable =Observable.fromIterable(list);Observable.just():创建一个发射指定值的Observable
Observable<String> observable =Observable.just("Hello","World");高级创建方式

Observable.zip():组合多个Observable
Observable<String> zipped =Observable.zip(first, second,(f, s)-> f + s);Observable.concat():顺序连接多个Observable
Observable<String> concatenated =Observable.concat(first, second);Observable.merge():合并多个Observable
Observable<String> first =Observable.just("A","B","C");Observable<String> second =Observable.just("1","2","3");Observable<String> merged =Observable.merge(first, second);Observable.generate():同步生成复杂流
Observable<Integer> observable =Observable.generate(()->0,// 初始状态(state, emitter)->{ emitter.onNext(state);if(state ==10){ emitter.onComplete();}return state +1;});Observable.fromFuture():从Future创建
Future<String> future =Executors.newSingleThreadExecutor().submit(()->"Result from Future");Observable<String> observable =Observable.fromFuture(future);Observable.fromCallable():从Callable创建,适合可能有异常抛出的场景
Observable<String> observable =Observable.fromCallable(()->{if(Math.random()>0.5){thrownewIOException("Random error");}return"Success";});Observable.defer():延迟创建,直到有观察者订阅
Observable<Long> observable =Observable.defer(()->Observable.just(System.currentTimeMillis()));特殊用途Observable

- Subject:既是Observable又是Observer
ConnectableObservable:需要调用connect()才开始发射数据
ConnectableObservable<String> connectable =Observable.just("A","B","C").publish(); connectable.subscribe(System.out::println); connectable.connect();// 此时才开始发射数据AsyncSubject:只在完成时发送最后一个事件
AsyncSubject<String> subject =AsyncSubject.create(); subject.onNext("Hello"); subject.onNext("World"); subject.subscribe(System.out::println);// 不会立即收到 subject.onComplete();// 会打印"World"ReplaySubject:向所有订阅者重放所有事件
ReplaySubject<String> subject =ReplaySubject.create(); subject.onNext("Hello"); subject.subscribe(System.out::println);// 会收到"Hello"BehaviorSubject:向新订阅者发送最近的一个事件
BehaviorSubject<String> subject =BehaviorSubject.createDefault("Initial"); subject.onNext("First"); subject.subscribe(System.out::println);// 立即收到"First"PublishSubject:向所有订阅者广播所有后续事件
PublishSubject<String> subject =PublishSubject.create(); subject.onNext("Hello"); subject.subscribe(System.out::println);// 不会收到"Hello" subject.onNext("World");// 会打印"World"4.1.2 订阅Observer

Observer是RxJava中的消费者,用于接收Observable发射的数据和通知。
基本订阅方式
使用Disposable控制订阅
Disposable disposable =Observable.interval(1,TimeUnit.SECONDS).subscribe(System.out::println);// 在需要时取消订阅 disposable.dispose();使用Observer接口
Observer<String> observer =newObserver<String>(){@OverridepublicvoidonSubscribe(Disposable d){System.out.println("Subscribed");}@OverridepublicvoidonNext(String s){System.out.println("Received: "+ s);}@OverridepublicvoidonError(Throwable e){System.err.println("Error: "+ e);}@OverridepublicvoidonComplete(){System.out.println("Completed");}};Observable.just("Hello","World").subscribe(observer);带Consumer的订阅
Observable.just("Hello").subscribe( value ->System.out.println("Received: "+ value),// onNext error ->System.err.println("Error: "+ error),// onError()->System.out.println("Completed")// onComplete);简单订阅
Observable.just("Hello").subscribe();背压处理

RxJava 2.x引入了Flowable来处理背压(Backpressure),当数据生产速度大于消费速度时:
- BUFFER:缓冲所有数据(可能导致OOM)
- DROP:丢弃无法处理的数据
- LATEST:只保留最新数据
- ERROR:抛出MissingBackpressureException
- MISSING:不实现背压,由下游处理
背压策略
Flowable.interval(1,TimeUnit.MILLISECONDS).onBackpressureDrop(dropped ->System.out.println("Dropped: "+ dropped)).observeOn(Schedulers.computation()).subscribe(System.out::println);Flowable基本使用
Flowable.range(1,1000).onBackpressureBuffer()// 缓冲策略.observeOn(Schedulers.io()).subscribe(System.out::println);调度器控制
RxJava使用Schedulers控制线程:
- 常用调度器
- Schedulers.io():适合I/O操作(无界线程池)
- Schedulers.computation():适合计算操作(固定大小线程池)
- Schedulers.newThread():为每个任务创建新线程
- Schedulers.single():单一线程顺序执行
- Schedulers.trampoline():在当前线程排队执行
- Schedulers.from(Executor):自定义Executor
observeOn:指定Observer接收数据的线程
Observable.range(1,5).observeOn(Schedulers.computation())// 在计算线程接收.subscribe(System.out::println);subscribeOn:指定Observable操作执行的线程
Observable.just("Hello").subscribeOn(Schedulers.io())// 在IO线程执行.subscribe(System.out::println);组合订阅

资源管理
Observable.using(()->newFileInputStream("file.txt"),// 创建资源 inputStream ->Observable.just(readFile(inputStream)),// 使用资源 inputStream -> inputStream.close()// 释放资源).subscribe(System.out::println);条件订阅
Observable<String> source =Observable.just("Hello","World","Error"); source.flatMap(s ->{if("Error".equals(s)){returnObservable.error(newRuntimeException("Error encountered"));}returnObservable.just(s.toUpperCase());}).subscribe(System.out::println,System.err::println);合并多个订阅
Observable<String> first =Observable.just("A","B","C");Observable<String> second =Observable.just("1","2","3");Observable.merge(first, second).subscribe(System.out::println);4.1.3 使用操作符
RxJava提供了数百个操作符来处理Observable流,下面介绍最常用的几类操作符。
转换操作符
groupBy:按条件分组
Observable.just("Apple","Banana","Cherry","Date").groupBy(s -> s.length()).flatMapSingle(group -> group.toList().map(list -> group.getKey()+": "+ list)).subscribe(System.out::println);scan:累加器函数
Observable.range(1,5).scan((sum, item)-> sum + item).subscribe(System.out::println);// 1, 3, 6, 10, 15cast:强制类型转换
Observable<Object> objObs =Observable.just("Hello");Observable<String> strObs = objObs.cast(String.class);switchMap:只保留最新的Observable
Observable.just("Hello","World").switchMap(s ->Observable.interval(100,TimeUnit.MILLISECONDS).map(i -> s +" "+ i).take(5)).subscribe(System.out::println);concatMap:类似flatMap但保持顺序
Observable.just("Hello","World").concatMap(s ->Observable.fromArray(s.split(""))).subscribe(System.out::println);flatMap:将每个元素转换为Observable并合并
Observable.just("Hello","World").flatMap(s ->Observable.fromArray(s.split(""))).subscribe(System.out::println);map:对每个元素应用函数
Observable.just("Hello","World").map(String::toUpperCase).subscribe(System.out::println);过滤操作符

debounce/throttleWithTimeout:防抖动
Observable.create(emitter ->{ emitter.onNext("H");Thread.sleep(100); emitter.onNext("He");Thread.sleep(200); emitter.onNext("Hel");Thread.sleep(300); emitter.onNext("Hell");Thread.sleep(400); emitter.onNext("Hello");}).debounce(350,TimeUnit.MILLISECONDS).subscribe(System.out::println);// 只输出"Hello"sample/throttleLast:定期采样
Observable.interval(100,TimeUnit.MILLISECONDS).sample(1,TimeUnit.SECONDS).subscribe(System.out::println);// 大约每秒一个数elementAt:取指定位置的元素
Observable.range(1,10).elementAt(5)// 索引从0开始.subscribe(System.out::println);// 6first/last:取第一个/最后一个元素
Observable.range(1,10).first(0)// 默认值.subscribe(System.out::println);// 1distinctUntilChanged:过滤连续重复
Observable.just(1,1,2,2,3,1,1,4).distinctUntilChanged().subscribe(System.out::println);// 1, 2, 3, 1, 4distinct:去重
Observable.just(1,2,2,3,1,4).distinct().subscribe(System.out::println);// 1, 2, 3, 4skip:跳过前N个元素
Observable.range(1,10).skip(5).subscribe(System.out::println);// 6, 7, 8, 9, 10take:取前N个元素
Observable.interval(1,TimeUnit.SECONDS).take(5).subscribe(System.out::println);// 0, 1, 2, 3, 4filter:基于条件过滤
Observable.range(1,10).filter(i -> i %2==0).subscribe(System.out::println);// 2, 4, 6, 8, 10组合操作符

switchOnNext:切换Observable流
Observable<Observable<String>> observables =Observable.just(Observable.interval(100,TimeUnit.MILLISECONDS).map(i ->"A"+ i),Observable.interval(150,TimeUnit.MILLISECONDS).map(i ->"B"+ i));Observable.switchOnNext(observables.delay(500,TimeUnit.MILLISECONDS)).subscribe(System.out::println);startWith:在Observable开始前插入数据
Observable.just("World").startWith("Hello").subscribe(System.out::println);// Hello, WorldwithLatestFrom:类似combineLatest但由主Observable触发
letters.withLatestFrom(numbers,(l, n)-> l + n).subscribe(System.out::println);combineLatest:当任一Observable发射时组合最新值
Observable<String> letters =Observable.interval(1,TimeUnit.SECONDS).map(i ->"Letter"+(char)(i +65));Observable<Integer> numbers =Observable.interval(750,TimeUnit.MILLISECONDS).map(i -> i +1);Observable.combineLatest(letters, numbers,(l, n)-> l + n).subscribe(System.out::println);zip:组合多个Observable
Observable<String> letters =Observable.just("A","B","C");Observable<Integer> numbers =Observable.just(1,2,3);Observable.zip(letters, numbers,(l, n)-> l + n).subscribe(System.out::println);// A1, B2, C3concat:顺序连接多个Observable
Observable.concat(Observable.just("First","Second"),Observable.just("Third","Fourth")).subscribe(System.out::println);// First, Second, Third, Fourthmerge:合并多个Observable
Observable<String> first =Observable.interval(1,TimeUnit.SECONDS).map(i ->"First: "+ i);Observable<String> second =Observable.interval(750,TimeUnit.MILLISECONDS).map(i ->"Second: "+ i);Observable.merge(first, second).subscribe(System.out::println);错误处理操作符

retryWhen:条件重试
Observable.error(newRuntimeException("Error")).retryWhen(errors -> errors.zipWith(Observable.range(1,3),(e, i)-> i).flatMap(i ->{System.out.println("Retry #"+ i);returnObservable.timer(i,TimeUnit.SECONDS);})).subscribe(System.out::println,System.err::println);retry:重试
Observable.create(emitter ->{System.out.println("Attempting"); emitter.onError(newRuntimeException("Failed"));}).retry(3).subscribe(System.out::println,System.err::println);onErrorResumeNext:出错时切换到另一个Observable
Observable.error(newRuntimeException("Error")).onErrorResumeNext(Observable.just("A","B","C")).subscribe(System.out::println);// A, B, ConErrorReturn:出错时返回默认值
Observable.error(newRuntimeException("Error")).onErrorReturn(e ->"Default").subscribe(System.out::println);// Default条件操作符
sequenceEqual:比较两个Observable序列
Observable.sequenceEqual(Observable.just(1,2,3),Observable.just(1,2,3)).subscribe(System.out::println);// truedefaultIfEmpty:如果为空提供默认值
Observable.empty().defaultIfEmpty("Default").subscribe(System.out::println);// DefaultisEmpty:是否为空
Observable.empty().isEmpty().subscribe(System.out::println);// truecontains:是否包含指定元素
Observable.just("A","B","C").contains("B").subscribe(System.out::println);// trueall:是否所有元素满足条件
Observable.range(1,5).all(i -> i <10).subscribe(System.out::println);// true数学和聚合操作符

sum/average/max/min:数学运算
Observable.range(1,5).map(Integer::doubleValue).average().subscribe(System.out::println);// 3.0toList/toMap/toSet:转换为集合
Observable.just("A","B","A").toSet().subscribe(System.out::println);// [A, B]collect:收集到容器
Observable.range(1,5).collect(ArrayList::new,List::add).subscribe(System.out::println);// [1, 2, 3, 4, 5]reduce:累积计算
Observable.range(1,5).reduce((sum, i)-> sum + i).subscribe(System.out::println);// 15count:计数
Observable.range(1,10).count().subscribe(System.out::println);// 10实用操作符

replay:重放给后续订阅者
ConnectableObservable<Long> replay =Observable.interval(1,TimeUnit.SECONDS).take(5).replay(); replay.connect();// 开始发射Thread.sleep(3000); replay.subscribe(System.out::println);// 从开始重放cache:缓存发射的数据
Observable<Long> cached =Observable.interval(1,TimeUnit.SECONDS).take(5).cache(); cached.subscribe(System.out::println);// 开始发射Thread.sleep(3000); cached.subscribe(System.out::println);// 从缓存中获取timeInterval/timestamp:添加时间信息
Observable.interval(1,TimeUnit.SECONDS).take(3).timeInterval().subscribe(ti ->System.out.println("Value: "+ ti.value()+", Interval: "+ ti.time()+"ms"));materialize/dematerialize:将通知转换为对象
Observable.just("Hello").materialize().subscribe(notification ->{if(notification.isOnNext()){System.out.println("Value: "+ notification.getValue());}elseif(notification.isOnComplete()){System.out.println("Completed");}});doOnNext/doOnError/doOnComplete:副作用操作
Observable.just("Hello").doOnNext(s ->System.out.println("About to emit: "+ s)).doOnComplete(()->System.out.println("Completed")).subscribe();实际应用示例

事件总线
publicclassRxEventBus{privatefinalPublishSubject<Object> subject =PublishSubject.create();publicvoidpost(Object event){ subject.onNext(event);}public<T>Observable<T>observe(Class<T> eventClass){return subject.ofType(eventClass);}}批量处理
Observable.fromIterable(hugeList).buffer(100)// 每100个一批.flatMap(batch ->processBatch(batch).subscribeOn(Schedulers.io())).subscribe(result ->aggregateResult(result));轮询检查
Observable.interval(5,TimeUnit.SECONDS).flatMap(i ->checkStatus().retryWhen(errors -> errors.delay(1,TimeUnit.SECONDS))).distinctUntilChanged().subscribe(status ->updateStatus(status));搜索建议
RxTextView.textChanges(searchInput).debounce(300,TimeUnit.MILLISECONDS).filter(text -> text.length()>2).distinctUntilChanged().switchMap(query -> searchService.suggest(query.toString()).onErrorResumeNext(Observable.empty())).observeOn(AndroidSchedulers.mainThread()).subscribe(suggestions ->updateSuggestions(suggestions));网络请求组合
Observable<Profile> profileObservable = userService.getProfile(userId);Observable<List<Friend>> friendsObservable = userService.getFriends(userId);Observable.zip(profileObservable, friendsObservable,(profile, friends)->newUserData(profile, friends)).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribe(userData ->updateUI(userData), error ->showError(error));RxJava的操作符非常丰富,掌握这些操作符能够帮助开发者高效处理各种异步数据流场景。实际开发中,应根据具体需求选择合适的操作符组合,同时注意线程调度和资源管理,以构建高效可靠的响应式应用。