RxJava 和 RxAndroid 五(线程调度)补充研究
线程调度文章:
研究结论:默认情况下,不指定生产者线程和消费者线程,运行在当前线程,
但是如果emitter在发射的时候放在一个新的线程里,那么加工线程和消费线程保持和这个新的线程一致
测试代码:
public class MainDemo {
public static void main(String[] args){
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(final ObservableEmitter<Integer> emitter) throws Exception {
new Thread(new Runnable() {
@Override
public void run() {
emitter.onNext(1);
emitter.onComplete();
}
}).start();
System.out.println( "rx_call_1:" + Thread.currentThread().getName() );
}
})
. map(new Function<Integer, Integer>() {
@Override
public Integer apply(Integer cmsResponse) throws Exception {
System.out.println( "rx_call_map:" + Thread.currentThread().getName() );
return 2;
}
}).
subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Integer integer) {
System.out.println( "rx_call_2:" + Thread.currentThread().getName() );
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
}
}
下面虽然指定里生产者线程 subscribeOn(Schedulers.io()),但是加工线程和消费线程还是跟随emiter.onnext执行的线程
public class MainDemo { public static void main(String[] args){ Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(final ObservableEmitter<Integer> emitter) throws Exception { new Thread(new Runnable() { @Override public void run() { emitter.onNext(1); emitter.onComplete(); } }).start(); System.out.println( "rx_call_1:" + Thread.currentThread().getName() ); } }) . map(new Function<Integer, Integer>() { @Override public Integer apply(Integer cmsResponse) throws Exception { System.out.println( "rx_call_map:" + Thread.currentThread().getName() ); return 2; } }) .subscribeOn(Schedulers.io()) .subscribe(new Observer<Integer>() { @Override public void onSubscribe(Disposable d) { } @Override public void onNext(Integer integer) { System.out.println( "rx_call_2:" + Thread.currentThread().getName() ); } @Override public void onError(Throwable e) { } @Override public void onComplete() { } }); } }