RxJava 笔记

记录一些概念和一个小重点。

设计

取自 RxJava 官方 wiki

To use RxJava you create Observables (which emit data items), transform those Observables in various ways to get the precise data items that interest you (by using Observable operators), and then observe and react to these sequences of interesting items (by implementing Observers or Subscribers and then subscribing them to the resulting transformed Observables).

设计上分为两部分,我们用 RxJava 要做的事情全都在这里了。

  • 获取数据并且转换成你想要的样子(发布者做的事情)
    • 各种操作符
    • 产生数据,或者产生新的发布者
  • 订阅并且响应这些数据(订阅者做的事情)
    • 线程切换
    • 取消订阅

取消订阅

多线程环境下,可能一个 Observable 的操作没有完成,用户就退出了 activity,所以需要对这个 Observable 取消订阅。

RxJava2 中,Observable#subscribe 会返回一个 Disposable 对象,这个对象可以取消对 Observable 的订阅。

1
2
3
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError) {
return this.subscribe(onNext, onError, Functions.EMPTY_ACTION, Functions.emptyConsumer());
}
1
2
3
4
5
6
7
8
private Disposable subscribe;
@Override
protected void onDestroy() {
if(subscribe != null) {
subscribe.dispose();
}
super.onDestroy();
}

或者使用 CompositeDisposable 对象收集 Disposable 对象,统一处理。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
private CompositeDisposable compositeDisposable = new CompositeDisposable();
compositeDisposable.add(Observable.just(1).subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer data) {
// do some thing with data
}
}));

@Override
protected void onDestroy() {
if(compositeDisposable != null) {
compositeDisposable.dispose();
}
super.onDestroy();
}

也可以使用 RxLifeCycle 来解决这个问题。


附录