性能提升
Backpressure
Observable 作为事件的生产者,当消费者消耗事件的速度远小于其生产速度,如果将累积的事件缓存起来,无限制的堆积必然会产生 OOM。为了避免,需要对上游的数据流进行控制,所以就有了 backpressure 的概念。
1 | enum BackpressureMode { |
Hot/Cold source
Observable and Flowable
在 RxJava 1 中,所有 type 都隐含 backpressure 的概念,但并不是都支持此操作,所以在某些场景容易被错误的使用产生 crash (MissingBackpressureException)。RxJava 2 将原来耦合 Observable 里的 backpressure 概念操作提取出来 (Flowable),让代码的语义更明确。
Example
RxJava 1.x1
2
3
4
5Observable<MotionEvent> events
= RxView.touches(paintView);
Observable<Row> rows
= db.createQuery("SELECT * …");
RxJava 2.x
1 | Observable<MotionEvent> events |
一个不支持 backpressure 的 Observable.(RxJava 1.x )1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26Observable.<Integer>create(emitter -> {
emitter.onNext(1);
emitter.onNext(2);
}, Emitter.BackpressureMode.ERROR).subscribe(new Subscriber<Integer>() {
public void onStart() {
super.onStart();
request(1);
}
public void onCompleted() {
System.out.println("complete");
}
public void onError(Throwable e) {
e.printStackTrace();
}
public void onNext(Integer integer) {
System.out.println(integer);
}
});
如何选择
When to use Observable
- 最多不超过 1000 个元素,不会产生 OOM。
- 处理 GUI 事件(鼠标/点击事件)这种很难用 backpressure 描述,并且也不是很频繁的操作。可以处理事件频率小于 1000 Hz 也可以考虑用 sampling/debouncing 限制 Observable.
- 你的数据流本质上是同步的,但是你的平台不支持 Java Stream API,或者出于某些原因不能使用这个 feature. Observable 比 Flowable 具有更高的性能优势。
When to use Flowable
- 有 10k+ 的元素会被生成,使用 Flowable 可以限制数据产生的速度。
- Cold source (Read file/database/Network).
- Many blocking and/or pull-based data sources which may eventually get a non-blocking reactive API/driver in the future.
新的订阅/取消接口
1 |
|
subscribe()/subscribeWith()
Flowable:
Observable:
新的工具
- Single:
fromrx.Single.SingleSubscriber<T>
toio.reactivex.SingleObserver<T>
1
2
3
4
5interface SingleObserver<T> {
void onSubscribe(Disposable d);
void onSuccess(T value);
void onError(Throwable error);
} - Completable:
fromrx.Completable.CompletableSubscriber
toio.reactivex.CompletableObserver
1
2
3
4
5interface CompletableObserver<T> {
void onSubscribe(Disposable d);
void onComplete();
void onError(Throwable error);
} - Maybe:相当于
Single
和Completable
的结合体1
2
3
4
5
6Maybe.just(1)
.map(v -> v + 1)
.filter(v -> v == 1)
.defaultIfEmpty(2)
.test()
.assertResult(2);Example
1
2
3
4
5
6
7
8
9
10
11interface UserManager {
Observable<User> getUser();
void setName(String name);
void setAge(int age);
}
interface UserManager {
Observable<User> getUser();
Completable setName(String name);
Completable setAge(int age);
}
创建数据源
目前可以监听用户的 unsubscribes
操作:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16OkHttpClient client = // …
Request request = // …
Observable.create(e -> {
Call call = client.newCall(request);
e.setCancelation(() -> call.cancel());
call.enqueue(new Callback() {
public void onResponse(Response r) throws IOException {
e.onNext(r.body().string());
e.onComplete();
}A
public void onFailure(IOException e) {
e.onError(e);
}
});
});
关于 Nulls
RxJava 2.x no longer accepts null values and the following will yield NullPointerException immediately or as a signal to downstream。
…