2017-05-01 | learn

Rxjava / RxJava 2

性能提升

CnFLAFWWgAAuCNc.jpg

Backpressure

Observable 作为事件的生产者,当消费者消耗事件的速度远小于其生产速度,如果将累积的事件缓存起来,无限制的堆积必然会产生 OOM。为了避免,需要对上游的数据流进行控制,所以就有了 backpressure 的概念。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
enum BackpressureMode {
/**
* No backpressure is applied as the onNext calls pass through the Emitter;
* note that this may cause {@link rx.exceptions.MissingBackpressureException} or {@link IllegalStateException}
* somewhere downstream.
*/
NONE,
/**
* Signals a {@link rx.exceptions.MissingBackpressureException} if the downstream can't keep up.
*/
ERROR,
/**
* Buffers (unbounded) all onNext calls until the downstream can consume them.
*/
BUFFER,
/**
* Drops the incoming onNext value if the downstream can't keep up.
*/
DROP,
/**
* Keeps the latest onNext value and overwrites it with newer ones until the downstream
* can consume it.
*/
LATEST
}

Hot/Cold source

b51e7a6e-671e-11e4-986a-1ab38c5e4e88.png

Observable and Flowable

在 RxJava 1 中,所有 type 都隐含 backpressure 的概念,但并不是都支持此操作,所以在某些场景容易被错误的使用产生 crash (MissingBackpressureException)。RxJava 2 将原来耦合 Observable 里的 backpressure 概念操作提取出来 (Flowable),让代码的语义更明确。

Example

RxJava 1.x

1
2
3
4
5
Observable<MotionEvent> events
= RxView.touches(paintView);

Observable<Row> rows
= db.createQuery("SELECT * …");

RxJava 2.x

1
2
3
4
5
Observable<MotionEvent> events
= RxView.touches(paintView);

Flowable<Row> rows
= db.createQuery("SELECT * …");

一个不支持 backpressure 的 Observable.(RxJava 1.x )

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
Observable.<Integer>create(emitter -> {
emitter.onNext(1);
emitter.onNext(2);
}, Emitter.BackpressureMode.ERROR).subscribe(new Subscriber<Integer>() {
@Override
public void onStart() {
super.onStart();
request(1);
}

@Override
public void onCompleted() {
System.out.println("complete");
}

@Override
public void onError(Throwable e) {
e.printStackTrace();
}

@Override
public void onNext(Integer integer) {
System.out.println(integer);
}
});

如何选择

When to use Observable
- 最多不超过  1000 个元素,不会产生 OOM。
- 处理 GUI 事件(鼠标/点击事件)这种很难用 backpressure 描述,并且也不是很频繁的操作。可以处理事件频率小于 1000 Hz 也可以考虑用 sampling/debouncing 限制 Observable.
- 你的数据流本质上是同步的,但是你的平台不支持 Java Stream API,或者出于某些原因不能使用这个 feature. Observable 比 Flowable 具有更高的性能优势。
When to use Flowable
- 有 10k+ 的元素会被生成,使用 Flowable 可以限制数据产生的速度。
- Cold source (Read file/database/Network).
- Many blocking and/or pull-based data sources which may eventually get a non-blocking reactive API/driver in the future.

新的订阅/取消接口

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27

Flowable:
interface Subscriber<T> {
void onNext(T t);
void onComplete();
void onError(Throwable t);
void onSubscribe(Subscription s);
}

interface Subscription {
void cancel();
void request(long r);
}

---

Observable:
interface Observer<T> {
void onNext(T t);
void onComplete();
void onError(Throwable t);
void onSubscribe(Disposable d);
}

interface Disposable {
void dispose();
}
subscribe()/subscribeWith()

Flowable:
flowable.png
Observable:
observable.png


新的工具

  • Single:
    from rx.Single.SingleSubscriber<T> to io.reactivex.SingleObserver<T>
    1
    2
    3
    4
    5
    interface SingleObserver<T> {
    void onSubscribe(Disposable d);
    void onSuccess(T value);
    void onError(Throwable error);
    }
  • Completable:
    from rx.Completable.CompletableSubscriber to io.reactivex.CompletableObserver
    1
    2
    3
    4
    5
    interface CompletableObserver<T> {
    void onSubscribe(Disposable d);
    void onComplete();
    void onError(Throwable error);
    }
  • Maybe:相当于 SingleCompletable 的结合体
    1
    2
    3
    4
    5
    6
    Maybe.just(1)
    .map(v -> v + 1)
    .filter(v -> v == 1)
    .defaultIfEmpty(2)
    .test()
    .assertResult(2);
    Example
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    interface UserManager {
    Observable<User> getUser();
    void setName(String name);
    void setAge(int age);
    }

    interface UserManager {
    Observable<User> getUser();
    Completable setName(String name);
    Completable setAge(int age);
    }

创建数据源

目前可以监听用户的 unsubscribes 操作:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
OkHttpClient client = // …
Request request = // …

Observable.create(e -> {
Call call = client.newCall(request);
e.setCancelation(() -> call.cancel());
call.enqueue(new Callback() {
@Override public void onResponse(Response r) throws IOException {
e.onNext(r.body().string());
e.onComplete();
}A
@Override public void onFailure(IOException e) {
e.onError(e);
}
});
});

关于 Nulls

RxJava 2.x no longer accepts null values and the following will yield NullPointerException immediately or as a signal to downstream。


参考