React编程入门示例:RxJava深度解析

React编程入门示例:RxJava深度解析
在这里插入图片描述

文章目录

在这里插入图片描述

4.1 RxJava示例

RxJava是ReactiveX在Java虚拟机上的实现,它使用可观察序列来构建异步和基于事件的程序。RxJava提供了丰富的操作符来处理异步数据流,使开发者能够以声明式的方式组合异步操作。

在这里插入图片描述

4.1.1 创建Observable

Observable是RxJava中的基本构建块,代表一个可观察的数据源,能够发射0到N个数据项,然后可能以一个完成或错误通知终止。

基本创建方式

Observable.never():创建一个不发射任何数据也不终止的Observable

Observable<String> observable =Observable.never();

Observable.error():创建一个不发射任何数据但以错误终止的Observable

Observable<String> observable =Observable.error(newRuntimeException("Error occurred"));

Observable.empty():创建一个不发射任何数据但正常终止的Observable

Observable<String> observable =Observable.empty();

Observable.create():自定义Observable创建

Observable<String> observable =Observable.create(emitter ->{ emitter.onNext("First"); emitter.onNext("Second"); emitter.onComplete();});

Observable.interval():创建一个按固定时间间隔发射整数序列的Observable

Observable<Long> observable =Observable.interval(1,TimeUnit.SECONDS);// 0, 1, 2... 每秒

Observable.range():创建一个发射特定整数序列的Observable

Observable<Integer> observable =Observable.range(1,5);// 1, 2, 3, 4, 5

Observable.fromIterable():从集合创建Observable

List<String> list =Arrays.asList("Apple","Banana","Cherry");Observable<String> observable =Observable.fromIterable(list);

Observable.just():创建一个发射指定值的Observable

Observable<String> observable =Observable.just("Hello","World");
高级创建方式
在这里插入图片描述

Observable.zip():组合多个Observable

Observable<String> zipped =Observable.zip(first, second,(f, s)-> f + s);

Observable.concat():顺序连接多个Observable

Observable<String> concatenated =Observable.concat(first, second);

Observable.merge():合并多个Observable

Observable<String> first =Observable.just("A","B","C");Observable<String> second =Observable.just("1","2","3");Observable<String> merged =Observable.merge(first, second);

Observable.generate():同步生成复杂流

Observable<Integer> observable =Observable.generate(()->0,// 初始状态(state, emitter)->{ emitter.onNext(state);if(state ==10){ emitter.onComplete();}return state +1;});

Observable.fromFuture():从Future创建

Future<String> future =Executors.newSingleThreadExecutor().submit(()->"Result from Future");Observable<String> observable =Observable.fromFuture(future);

Observable.fromCallable():从Callable创建,适合可能有异常抛出的场景

Observable<String> observable =Observable.fromCallable(()->{if(Math.random()>0.5){thrownewIOException("Random error");}return"Success";});

Observable.defer():延迟创建,直到有观察者订阅

Observable<Long> observable =Observable.defer(()->Observable.just(System.currentTimeMillis()));
特殊用途Observable
在这里插入图片描述
  1. Subject:既是Observable又是Observer

ConnectableObservable:需要调用connect()才开始发射数据

ConnectableObservable<String> connectable =Observable.just("A","B","C").publish(); connectable.subscribe(System.out::println); connectable.connect();// 此时才开始发射数据

AsyncSubject:只在完成时发送最后一个事件

AsyncSubject<String> subject =AsyncSubject.create(); subject.onNext("Hello"); subject.onNext("World"); subject.subscribe(System.out::println);// 不会立即收到 subject.onComplete();// 会打印"World"

ReplaySubject:向所有订阅者重放所有事件

ReplaySubject<String> subject =ReplaySubject.create(); subject.onNext("Hello"); subject.subscribe(System.out::println);// 会收到"Hello"

BehaviorSubject:向新订阅者发送最近的一个事件

BehaviorSubject<String> subject =BehaviorSubject.createDefault("Initial"); subject.onNext("First"); subject.subscribe(System.out::println);// 立即收到"First"

PublishSubject:向所有订阅者广播所有后续事件

PublishSubject<String> subject =PublishSubject.create(); subject.onNext("Hello"); subject.subscribe(System.out::println);// 不会收到"Hello" subject.onNext("World");// 会打印"World"

4.1.2 订阅Observer

在这里插入图片描述


Observer是RxJava中的消费者,用于接收Observable发射的数据和通知。

基本订阅方式

使用Disposable控制订阅

Disposable disposable =Observable.interval(1,TimeUnit.SECONDS).subscribe(System.out::println);// 在需要时取消订阅 disposable.dispose();

使用Observer接口

Observer<String> observer =newObserver<String>(){@OverridepublicvoidonSubscribe(Disposable d){System.out.println("Subscribed");}@OverridepublicvoidonNext(String s){System.out.println("Received: "+ s);}@OverridepublicvoidonError(Throwable e){System.err.println("Error: "+ e);}@OverridepublicvoidonComplete(){System.out.println("Completed");}};Observable.just("Hello","World").subscribe(observer);

带Consumer的订阅

Observable.just("Hello").subscribe( value ->System.out.println("Received: "+ value),// onNext error ->System.err.println("Error: "+ error),// onError()->System.out.println("Completed")// onComplete);

简单订阅

Observable.just("Hello").subscribe();
背压处理
在这里插入图片描述


RxJava 2.x引入了Flowable来处理背压(Backpressure),当数据生产速度大于消费速度时:

    • BUFFER:缓冲所有数据(可能导致OOM)
    • DROP:丢弃无法处理的数据
    • LATEST:只保留最新数据
    • ERROR:抛出MissingBackpressureException
    • MISSING:不实现背压,由下游处理

背压策略

Flowable.interval(1,TimeUnit.MILLISECONDS).onBackpressureDrop(dropped ->System.out.println("Dropped: "+ dropped)).observeOn(Schedulers.computation()).subscribe(System.out::println);

Flowable基本使用

Flowable.range(1,1000).onBackpressureBuffer()// 缓冲策略.observeOn(Schedulers.io()).subscribe(System.out::println);
调度器控制

RxJava使用Schedulers控制线程:

  1. 常用调度器
    • Schedulers.io():适合I/O操作(无界线程池)
    • Schedulers.computation():适合计算操作(固定大小线程池)
    • Schedulers.newThread():为每个任务创建新线程
    • Schedulers.single():单一线程顺序执行
    • Schedulers.trampoline():在当前线程排队执行
    • Schedulers.from(Executor):自定义Executor

observeOn:指定Observer接收数据的线程

Observable.range(1,5).observeOn(Schedulers.computation())// 在计算线程接收.subscribe(System.out::println);

subscribeOn:指定Observable操作执行的线程

Observable.just("Hello").subscribeOn(Schedulers.io())// 在IO线程执行.subscribe(System.out::println);
组合订阅
在这里插入图片描述

资源管理

Observable.using(()->newFileInputStream("file.txt"),// 创建资源 inputStream ->Observable.just(readFile(inputStream)),// 使用资源 inputStream -> inputStream.close()// 释放资源).subscribe(System.out::println);

条件订阅

Observable<String> source =Observable.just("Hello","World","Error"); source.flatMap(s ->{if("Error".equals(s)){returnObservable.error(newRuntimeException("Error encountered"));}returnObservable.just(s.toUpperCase());}).subscribe(System.out::println,System.err::println);

合并多个订阅

Observable<String> first =Observable.just("A","B","C");Observable<String> second =Observable.just("1","2","3");Observable.merge(first, second).subscribe(System.out::println);

4.1.3 使用操作符

RxJava提供了数百个操作符来处理Observable流,下面介绍最常用的几类操作符。

转换操作符

groupBy:按条件分组

Observable.just("Apple","Banana","Cherry","Date").groupBy(s -> s.length()).flatMapSingle(group -> group.toList().map(list -> group.getKey()+": "+ list)).subscribe(System.out::println);

scan:累加器函数

Observable.range(1,5).scan((sum, item)-> sum + item).subscribe(System.out::println);// 1, 3, 6, 10, 15

cast:强制类型转换

Observable<Object> objObs =Observable.just("Hello");Observable<String> strObs = objObs.cast(String.class);

switchMap:只保留最新的Observable

Observable.just("Hello","World").switchMap(s ->Observable.interval(100,TimeUnit.MILLISECONDS).map(i -> s +" "+ i).take(5)).subscribe(System.out::println);

concatMap:类似flatMap但保持顺序

Observable.just("Hello","World").concatMap(s ->Observable.fromArray(s.split(""))).subscribe(System.out::println);

flatMap:将每个元素转换为Observable并合并

Observable.just("Hello","World").flatMap(s ->Observable.fromArray(s.split(""))).subscribe(System.out::println);

map:对每个元素应用函数

Observable.just("Hello","World").map(String::toUpperCase).subscribe(System.out::println);
过滤操作符
在这里插入图片描述

debounce/throttleWithTimeout:防抖动

Observable.create(emitter ->{ emitter.onNext("H");Thread.sleep(100); emitter.onNext("He");Thread.sleep(200); emitter.onNext("Hel");Thread.sleep(300); emitter.onNext("Hell");Thread.sleep(400); emitter.onNext("Hello");}).debounce(350,TimeUnit.MILLISECONDS).subscribe(System.out::println);// 只输出"Hello"

sample/throttleLast:定期采样

Observable.interval(100,TimeUnit.MILLISECONDS).sample(1,TimeUnit.SECONDS).subscribe(System.out::println);// 大约每秒一个数

elementAt:取指定位置的元素

Observable.range(1,10).elementAt(5)// 索引从0开始.subscribe(System.out::println);// 6

first/last:取第一个/最后一个元素

Observable.range(1,10).first(0)// 默认值.subscribe(System.out::println);// 1

distinctUntilChanged:过滤连续重复

Observable.just(1,1,2,2,3,1,1,4).distinctUntilChanged().subscribe(System.out::println);// 1, 2, 3, 1, 4

distinct:去重

Observable.just(1,2,2,3,1,4).distinct().subscribe(System.out::println);// 1, 2, 3, 4

skip:跳过前N个元素

Observable.range(1,10).skip(5).subscribe(System.out::println);// 6, 7, 8, 9, 10

take:取前N个元素

Observable.interval(1,TimeUnit.SECONDS).take(5).subscribe(System.out::println);// 0, 1, 2, 3, 4

filter:基于条件过滤

Observable.range(1,10).filter(i -> i %2==0).subscribe(System.out::println);// 2, 4, 6, 8, 10
组合操作符
在这里插入图片描述

switchOnNext:切换Observable流

Observable<Observable<String>> observables =Observable.just(Observable.interval(100,TimeUnit.MILLISECONDS).map(i ->"A"+ i),Observable.interval(150,TimeUnit.MILLISECONDS).map(i ->"B"+ i));Observable.switchOnNext(observables.delay(500,TimeUnit.MILLISECONDS)).subscribe(System.out::println);

startWith:在Observable开始前插入数据

Observable.just("World").startWith("Hello").subscribe(System.out::println);// Hello, World

withLatestFrom:类似combineLatest但由主Observable触发

letters.withLatestFrom(numbers,(l, n)-> l + n).subscribe(System.out::println);

combineLatest:当任一Observable发射时组合最新值

Observable<String> letters =Observable.interval(1,TimeUnit.SECONDS).map(i ->"Letter"+(char)(i +65));Observable<Integer> numbers =Observable.interval(750,TimeUnit.MILLISECONDS).map(i -> i +1);Observable.combineLatest(letters, numbers,(l, n)-> l + n).subscribe(System.out::println);

zip:组合多个Observable

Observable<String> letters =Observable.just("A","B","C");Observable<Integer> numbers =Observable.just(1,2,3);Observable.zip(letters, numbers,(l, n)-> l + n).subscribe(System.out::println);// A1, B2, C3

concat:顺序连接多个Observable

Observable.concat(Observable.just("First","Second"),Observable.just("Third","Fourth")).subscribe(System.out::println);// First, Second, Third, Fourth

merge:合并多个Observable

Observable<String> first =Observable.interval(1,TimeUnit.SECONDS).map(i ->"First: "+ i);Observable<String> second =Observable.interval(750,TimeUnit.MILLISECONDS).map(i ->"Second: "+ i);Observable.merge(first, second).subscribe(System.out::println);
错误处理操作符
在这里插入图片描述

retryWhen:条件重试

Observable.error(newRuntimeException("Error")).retryWhen(errors -> errors.zipWith(Observable.range(1,3),(e, i)-> i).flatMap(i ->{System.out.println("Retry #"+ i);returnObservable.timer(i,TimeUnit.SECONDS);})).subscribe(System.out::println,System.err::println);

retry:重试

Observable.create(emitter ->{System.out.println("Attempting"); emitter.onError(newRuntimeException("Failed"));}).retry(3).subscribe(System.out::println,System.err::println);

onErrorResumeNext:出错时切换到另一个Observable

Observable.error(newRuntimeException("Error")).onErrorResumeNext(Observable.just("A","B","C")).subscribe(System.out::println);// A, B, C

onErrorReturn:出错时返回默认值

Observable.error(newRuntimeException("Error")).onErrorReturn(e ->"Default").subscribe(System.out::println);// Default
条件操作符

sequenceEqual:比较两个Observable序列

Observable.sequenceEqual(Observable.just(1,2,3),Observable.just(1,2,3)).subscribe(System.out::println);// true

defaultIfEmpty:如果为空提供默认值

Observable.empty().defaultIfEmpty("Default").subscribe(System.out::println);// Default

isEmpty:是否为空

Observable.empty().isEmpty().subscribe(System.out::println);// true

contains:是否包含指定元素

Observable.just("A","B","C").contains("B").subscribe(System.out::println);// true

all:是否所有元素满足条件

Observable.range(1,5).all(i -> i <10).subscribe(System.out::println);// true
数学和聚合操作符
在这里插入图片描述

sum/average/max/min:数学运算

Observable.range(1,5).map(Integer::doubleValue).average().subscribe(System.out::println);// 3.0

toList/toMap/toSet:转换为集合

Observable.just("A","B","A").toSet().subscribe(System.out::println);// [A, B]

collect:收集到容器

Observable.range(1,5).collect(ArrayList::new,List::add).subscribe(System.out::println);// [1, 2, 3, 4, 5]

reduce:累积计算

Observable.range(1,5).reduce((sum, i)-> sum + i).subscribe(System.out::println);// 15

count:计数

Observable.range(1,10).count().subscribe(System.out::println);// 10
实用操作符
在这里插入图片描述

replay:重放给后续订阅者

ConnectableObservable<Long> replay =Observable.interval(1,TimeUnit.SECONDS).take(5).replay(); replay.connect();// 开始发射Thread.sleep(3000); replay.subscribe(System.out::println);// 从开始重放

cache:缓存发射的数据

Observable<Long> cached =Observable.interval(1,TimeUnit.SECONDS).take(5).cache(); cached.subscribe(System.out::println);// 开始发射Thread.sleep(3000); cached.subscribe(System.out::println);// 从缓存中获取

timeInterval/timestamp:添加时间信息

Observable.interval(1,TimeUnit.SECONDS).take(3).timeInterval().subscribe(ti ->System.out.println("Value: "+ ti.value()+", Interval: "+ ti.time()+"ms"));

materialize/dematerialize:将通知转换为对象

Observable.just("Hello").materialize().subscribe(notification ->{if(notification.isOnNext()){System.out.println("Value: "+ notification.getValue());}elseif(notification.isOnComplete()){System.out.println("Completed");}});

doOnNext/doOnError/doOnComplete:副作用操作

Observable.just("Hello").doOnNext(s ->System.out.println("About to emit: "+ s)).doOnComplete(()->System.out.println("Completed")).subscribe();
实际应用示例
在这里插入图片描述

事件总线

publicclassRxEventBus{privatefinalPublishSubject<Object> subject =PublishSubject.create();publicvoidpost(Object event){ subject.onNext(event);}public<T>Observable<T>observe(Class<T> eventClass){return subject.ofType(eventClass);}}

批量处理

Observable.fromIterable(hugeList).buffer(100)// 每100个一批.flatMap(batch ->processBatch(batch).subscribeOn(Schedulers.io())).subscribe(result ->aggregateResult(result));

轮询检查

Observable.interval(5,TimeUnit.SECONDS).flatMap(i ->checkStatus().retryWhen(errors -> errors.delay(1,TimeUnit.SECONDS))).distinctUntilChanged().subscribe(status ->updateStatus(status));

搜索建议

RxTextView.textChanges(searchInput).debounce(300,TimeUnit.MILLISECONDS).filter(text -> text.length()>2).distinctUntilChanged().switchMap(query -> searchService.suggest(query.toString()).onErrorResumeNext(Observable.empty())).observeOn(AndroidSchedulers.mainThread()).subscribe(suggestions ->updateSuggestions(suggestions));

网络请求组合

Observable<Profile> profileObservable = userService.getProfile(userId);Observable<List<Friend>> friendsObservable = userService.getFriends(userId);Observable.zip(profileObservable, friendsObservable,(profile, friends)->newUserData(profile, friends)).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribe(userData ->updateUI(userData), error ->showError(error));

RxJava的操作符非常丰富,掌握这些操作符能够帮助开发者高效处理各种异步数据流场景。实际开发中,应根据具体需求选择合适的操作符组合,同时注意线程调度和资源管理,以构建高效可靠的响应式应用。

Read more

人工智能多模态模型开发与应用:跨越文本、图像与语音的融合实践

人工智能多模态模型开发与应用:跨越文本、图像与语音的融合实践

一、人工智能多模态模型开发与应用:跨越文本、图像与语音的融合实践 1.1 本章学习目标与重点 💡 掌握多模态模型的核心概念与技术原理,理解文本、图像、语音等不同模态数据的融合逻辑; 💡 熟练运用主流多模态框架(Hugging Face Transformers、MMEngine、LangChain Multimodal),实现跨模态理解与生成任务; 💡 精通多模态模型的开发流程,包括数据预处理、模型选型、训练微调、部署落地等关键环节; 💡 通过真实场景案例(图文生成、跨模态问答、语音助手),掌握多模态技术从原型到产品的端到端落地能力。 ⚠️ 重点关注:多模态数据的对齐与预处理、模型训练的显存优化、生成内容的一致性与准确性、以及不同部署场景下的性能适配。 1.2 多模态模型基础:概念、技术与生态 随着人工智能技术的发展,单一模态(如纯文本、纯图像)模型已难以满足复杂场景需求。多模态模型通过融合文本、图像、语音、视频等多种模态数据,实现更全面的理解与更灵活的生成,成为当前

By Ne0inhk
OpenClaw接入企业微信全攻略:从0到1打通企业AI协作通道

OpenClaw接入企业微信全攻略:从0到1打通企业AI协作通道

摘要:本文详细介绍了将OpenClaw AI框架接入企业微信的完整方案。通过两种主流接入方式(API模式机器人和自建应用),企业可以快速实现智能问答、流程自动化等AI能力落地。文章重点讲解了从前期准备、核心接入流程到生产环境部署的全套实操步骤,包括权限配置、网络设置、参数对接等关键环节。同时提供了进阶优化建议,如后台守护、HTTPS加固、权限管控等企业级功能配置,以及常见问题排查方法。该方案能有效解决企业信息孤岛问题,将AI能力无缝嵌入员工日常办公场景,在保障数据安全的同时显著提升工作效率。 目录 一、前言:为什么要将OpenClaw接入企业微信? 二、接入前置准备 OpenClaw介绍 接入准备工作 三、核心接入流程(两种方案任选) 方案一:API模式机器人接入(新手首选,快速上手) 步骤1:企业微信后台创建API模式机器人 步骤2:OpenClaw安装企微插件并配置参数 步骤3:完成机器人创建并测试联调 方案二:企业微信自建应用接入(企业级进阶方案) 步骤1:企业微信创建自建应用并获取核心凭证 步骤2:OpenClaw配置自建应用核心参数 步骤3:启用应

By Ne0inhk
『AI开发工具』Pencil.dev:AI 时代开发者必备的设计工具,从安装到实战教学

『AI开发工具』Pencil.dev:AI 时代开发者必备的设计工具,从安装到实战教学

📣读完这篇文章里你能收获到 1. 📁 掌握Pencil.dev的核心理念与适用场景 2. 🐍 完成Pencil.dev的完整安装与配置流程 3. 🌐 通过实战案例学习从设计到生产代码的完整工作流 4. 🖥️ 对比传统开发流程与Pencil.dev新流程的效率差异 文章目录 * 前言 * 一、核心概念与环境准备 * 1.1 Pencil.dev是什么? * 1.2 解决的核心问题 * 1.3 适用人群 * 1.4 环境要求 * 二、安装配置步骤 * 2.1 安装VS Code扩展 * 2.1.1 打开插件商店搜索Pencil安装 * 2.1.2 查看MCP自动安装 * 2.2 注册账户 * 2.3 验证MCP配置 * 2.

By Ne0inhk
【保姆级教程】爆火开源项目 Next AI Draw.io 上手指南:一句话画流程图

【保姆级教程】爆火开源项目 Next AI Draw.io 上手指南:一句话画流程图

目录 一、部署方式选择说明(先看这个) 二、部署前准备(非常重要) 三、方式一:Docker 一行命令启动(最推荐) 四、方式二:源码本地运行(适合二次开发) 五、配置API_Key 六、案例展示 七、写到最后 最近一个开源项目 Next AI Draw.io 在 GitHub 上迅速走红,只需要一句自然语言,就能自动生成流程图、架构图,甚至是完整的 AWS / GCP / Azure 云架构示意图,引发了不少开发者和产品经理的关注。它将大模型能力与 draw.io 深度结合,把“画图”这件原本又慢又累的事情,直接变成了“对话即出图”,无论是技术方案梳理、

By Ne0inhk