Observable

empty()

返回一个不向 observe 发送任何项目的 Observable 对象,并且立即调用它的 onComplete() 方法

onErrorReturn()

当发生错误的时候,指示一个 ObservableSource 发送一个数据,而不是去调用 onError

onErrorReturn()

默认情况下,当遇到一个错误时,将从数据流中拦截这个错误,并发送到 Observer,接着调用其 Observer 的 onError() 方法,之后在不调用 Observer 其他方法的情况下退出,onErrorReturn() 方法将会改变这个行为,如果传递一个函数 (resumeFunction) 给 ObservableSource 的 onErrorReturn() 方法,当再次遇到错误的时候,不再是调用 Observer 的 onError() 方法,它将会发送 resumeFunction 的返回值

该方法可以用于阻止错误的传播以及提供发生错误时需要的备用数据

Schedulers

用于返回 Schedulers 实例的静态工厂方法
可以通过以下四个方法设置 Schedulers 的初始值:

  • RxJavaPlugins.setInitComputationSchedulerHandler()
  • RxJavaPlugins. setInitIoSchedulerHandler()
  • RxJavaPlugins. setInitNewThreadSchedulerHandler()
  • RxJavaPlugins. setInitSingleSchedulerHandler()

以下方法设置 Schedulers 运行时的值:

  • RxJavaPlugins.setComputationSchedulerHandler()
  • RxJavaPlugins. setIoSchedulerHandler()
  • RxJavaPlugins. setNewThreadSchedulerHandler()
  • RxJavaPlugins. setSingleSchedulerHandler()

computation()

该方法返回一个用于计算工作的默认且共享的 Scheduler 实例,它可以用于循环事件,处理回调和其他的计算工作,不推荐用于执行阻塞 IO 的工作

默认实例具有一个等于 JVM 可用处理器数量的单线程 ScheduledExecutorService 实例的后备池

没有处理的错误将会发送到 scheduler 线程的 Thread.UncaughtExceptionHandler

该 scheduler 对于 Scheduler.Worker 实例的泄漏不敏感,如果没有取消 timed/delayed 任务,执行 and/or 任务可能会导致资源泄漏

如果 RxJavaPlugins.setFailOnNonBlockingScheduler(boolean) 设置了 true,在 scheduler 运行过程中执行阻止的操作符,会导致 IllegalStateException

在使用 Scheduler 之前,可以通过必须设置的系统属性控制标准的 scheduler 的某些属性,支持的系统属性(通过 System.getProperty() 获取):

  • rx2.computation-threads (int):设置 Computation() 的 Scheduler 的线程数,默认是可用 CPU 数
  • rx2.computation-priority (int):设置 Computation() 的 Scheduler 的线程优先级,默认是 Thread.NORM_PRIORITY,也就是一般的优先级

前面已经提到了,可以通过 RxJavaPlugins.setInitComputationSchedulerHandler() 方法覆盖 scheduler 的初始化默认值,需要注意的是,由于初始化的生命周期,在 Scheduler 初始化完成前调用其他返回 Scheduler 实例的方法都会抛出 NullPointerException,一旦 Scheduler 初始化完成,可以通过 RxJavaPlugins.setComputationSchedulerHandler() 方法覆盖返回的 Scheduler 实例

通过 RxJavaPlugins.createComputationScheduler(ThreadFactory) 方法可以利用一个自定义的 ThreadFactory 创建一个新的实例,需要注意的是该自定义实例需要手动调用 Scheduler.shutdown() 允许 JVM 退出或者 J2EE 容器正确卸载

使用该 scheduler 作为基础的运算符都用 @SchedulerSupport(COMPUTATION) 注解标识

Disposable

isDisposed()

如果资源已经被处理,则返回 true

dispose()

处理资源,该操作应该是幂等的

Error handing

RxJava 2.x 中一个重要的设计需求就是异常不应该被忽略,这意味着当下游的生命周期到达了终点或者下游取消了即将发生错误的序列,将不会发生错误

这样的错误将路由到 RxJavaPlugins.onError 进行处理,RxJavaPlugins.setErrorHandler(Consumer<Throwable>) 这个方法可以重写这个错误处理器,如果没有一个明确的错误处理器,RxJava 默认会打印异常的堆栈信息到控制台并且调用当前线程未捕获异常的处理器

在 Java 程序中,后面的处理器对 ExecutorService 支持的 Scheduler 无效,程序可以继续保持运行。然而,Android 程序更加的严格,这样的未捕获异常会导致程序终止

这种行为是否是合适的可以进一步的讨论,但无论如何,如果想要避免调用未捕获异常的处理器,使用 RxJava 2.x 的程序应该设置一个无操作的处理器:

1
2
3
4
5
// If Java 8 lambdas are supported
RxJavaPlugins.setErrorHandler(e -> { });

// If no Retrolambda or Jack
RxJavaPlugins.setErrorHandler(Functions.<Throwable>emptyConsumer());

不推荐中间库在测试环境之外更改错误处理器,不幸的是,RxJava 2.x 无法告诉某个生命周期遇到无法传递的异常时,是否该让程序崩溃。识别这些异常的来源和原因很让人讨厌,特别是它们来源于将链路路由到 RxJavaPlugins.onError 的源中

因此,2.0.6引入了一个特定的异常包装器来帮助区分和追踪异常信息:

  • OnErrorNotImplementedException:当用户忘记给 subscribe() 添加错误处理时,重新引入检测
  • ProtocolViolationException:表示操作中的错误
  • UndeliverableException:包装由于 Subscriber/Observer 的生命周期限制而无法传递的原始异常。它由 RxJavaPlugins.onError 自动用于完整的堆栈跟踪,它有助于帮助找到是那个操作符重新路由了原始错误

如果一个未传递的异常是 NullPointerExceptionIllegalStateExceptionIllegalArgumentExceptionCompositeExceptionMissingBackpressureException or OnErrorNotImplementedException 的实例或者子类,这些异常将不会包装成 UndeliverableException

此外,一些第三方的库,当它们被一个 cancel/dispose 调用中断时,很多情况下会导致未传递的异常,2.0.6版本内部的改变是在 cancelling/disposing 一个 task 或者 worker(会导致目标线程中断)之前,会一直 cancel 或者 dispose Subscription/Disposable

1
2
3
4
5
6
7
8
9
10
// in some library
try {
doSomethingBlockingly()
} catch (InterruptedException ex) {
// check if the interrupt is due to cancellation
// if so, no need to signal the InterruptedException
if (!disposable.isDisposed()) {
observer.onError(ex);
}
}

如果库或者代码中已经这么做了,现在应该停止无法传递的 InterruptedExceptions,如果之前没有使用这个模式,我们提倡更新相关的代码或库

如果决定添加一个非空的全局错误处理,这里有一个例子,可以根据它们是否代表一个 bug 或者一个可忽略的程序或网络状态来管理典型的未传递的异常:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
RxJavaPlugins.setErrorHandler(e -> {
if (e instanceof UndeliverableException) {
e = e.getCause();
}
if ((e instanceof IOException) || (e instanceof SocketException)) {
// fine, irrelevant network problem or API that throws on cancellation
return;
}
if (e instanceof InterruptedException) {
// fine, some blocking code was interrupted by a dispose call
return;
}
if ((e instanceof NullPointerException) || (e instanceof IllegalArgumentException)) {
// that's likely a bug in the application
Thread.currentThread().getUncaughtExceptionHandler()
.handleException(Thread.currentThread(), e);
return;
}
if (e instanceof IllegalStateException) {
// that's a bug in RxJava or in a custom operator
Thread.currentThread().getUncaughtExceptionHandler()
.handleException(Thread.currentThread(), e);
return;
}
Log.warning("Undeliverable exception received, not sure what to do", e);
});