Brooks

珍惜生命中的每一次冲动

0%

Rxjava1.x 升级到 Rxjava2.x

升级缘由

1、官方声明:版本 1.x 最终的寿命为2018 3月31日。不会发生进一步的开发、支持、维护、PR 和更新。 (https://github.com/ReactiveX/RxJava)

2、RxJava1 中 Observable 不能很好地支持 backpressure ,会抛出MissingBackpressureException。在 RxJava2 中 Oberservable 不再支持 backpressure ,而使用新增的 Flowable 来支持 backpressure 。

3、RxJava 2.x 在性能和内存利用率方面普遍优于 1.x (https://github.com/akarnokd/akarnokd-misc/issues/2)

4、项目中存在多个Rxjava2版本和Rxjava1,(三方库依赖Rxjava),客户端使用混乱,重复依赖~

差异对照

0、导包不同

RxJava2 导入包一般多了一个reactivex

1
import io.reactivex.Observable;

1、新增Flowable

RxJava1 中 Observable 不能很好地支持 backpressure ,会抛出MissingBackpressureException。所以在 RxJava2 中 Observable 不再支持 backpressure ,而使用新增的 Flowable 来支持 backpressure 。

2、ActionN 和 FuncN 改名

1
2
3
4
5
6
7
8
9
10
Action0 改名成 Action
Action1 改名成 Consumer
Action2 改名成了 BiConsumer
Action3 - Action9都不再使用了
ActionN 变成了 Consumer

Func1 改名成 Function
Func2 改名成 BiFunction
Func3 - Func9 改名成 Function3 - Function9,
FuncN 由 Function<Object[], R> 取代。对应的抽象方法也有所修改,call需要修改为apply

3、Observable.OnSubscribe -> ObservableOnSubscribe

Observable.OnSubscribe 改成了 ObservableOnSubscribe

4、Observable.Transformer -> ObservableTransformer

1
Observable.Transformer 改成了 ObservableTransformer

5、Observable.from -> Observable.fromIterable

1
Observable.from  -> Observable.fromIterable 

6. Subscription -> Disposable

1
2
3
在 RxJava2 中,由于已经存在了 org.reactivestreams.subscription 这个类,为了避免名字冲突将原先的 rx.Subscription 改名为 io.reactivex.disposables.Disposable。

顺便说下,Disposable必须单次使用,用完就要销毁。

7、SubScriber -> Observer

1
io.reactivex.Observer

8、ObservableOnSubscribe 中使用 ObservableEmitter 发送数据

1
ObservableOnSubscribe 不再使用 Subscriber 而是用 ObservableEmitter 替代。

9、Observer接口中 onCompleted 改成了 onComplete

10、Schedulers.immediate( ) -> Schedulers.trampoline( )

1
Schedulers.immediate( )的作用为在当前线程立即执行任务,功能等同于RxJava2中的Schedulers.trampoline( )

11.RxView.clicks -> RxBase.clicks 自定义处理事件

1
2
com.common.utils.rx.ViewClickOnSubscribe
com.common.utils.rx.ViewLongClickOnSubscribe

12、CompositeSubscription -> CompositeDisposable

1
io.reactivex.disposables.CompositeDisposable

13、Observable.takeFirst -> Observable.takeUntil

1
io.reactivex.Observable#takeUntil(io.reactivex.functions.Predicate<? super T>)

14、Rxjava2 onNext方法不能回调 null对象,否则Emitterf发射器里会直接抛出异常:

1
onNext called with null. Null values are generally not allowed in 2.x operators and sources.

解决办法:使用一个对象包裹接口返回结果,将包装后对象发射出去,再次里面取出结果

15、 Observable.just(null) 替换

Rxjava2对just函数 进行了null校验,不允许发送null值。

1
2
3
4
5
//io.reactivex.Observable#just(T) 
public static <T> Observable<T> just(T item) {
ObjectHelper.requireNonNull(item, "item is null");
return RxJavaPlugins.onAssembly(new ObservableJust<T>(item));
}

16、异常捕获处理 setErrorHandler

The exception could not be delivered to the consumer because it has already canceled/disposed the flow or the exception has nowhere to go to begin with

1
2
3
4
5
6
7
8
9
10
解决方案1:在 Application 初始化时候设置 RxJavaPlugin 的 ErrorHandler。RxJava2的一个重要的设计理念是:不吃掉任何一个异常。产生的问题是,当RxJava2“downStream”取消订阅后,“upStream”仍有可能抛出异常,这时由于已经取消订阅,“downStream”无法处理异常,此时的异常无人处理,便会导致程序崩溃。

RxJavaPlugins.setErrorHandler(new Consumer() {
@Override
public void accept(Throwable throwable) throws Exception {
System.out.println(“err:” + throwable.getMessage());
}
});

// 原文链接:https://blog.csdn.net/qq_36017059/article/details/110140369

17、取消订阅

1
2
3
Observable : doOnUnsubscribe ->  doOnDispose

Flowable: doOnDispose -> doOnCancel

18、背压替换使用 Flowable

1
使用到.onBackpressureDrop() ,onBackpressureBuffer 函数,请替换 Observable为 Flowable。 专门用于处理背压。

欢迎关注我的其它发布渠道