2017-10-07 | learn

kotlin-coroutines

Source:https://kotlinlang.org/docs/reference/coroutines.html

耗时操作我们见得多了,网络 IO,文件 IO,CPU/GPU 密集计算等等等等。这些操作执行完之前,其运行线程处于阻塞状态。Coroutines 为我们提供了一种更轻量,更可控的方法应对这些业务:suspension of a coroutine

Coroutines 将杂活部分抽出到依赖库, 以此简化异步程序。使用 coroutines 将允许我们以连续的形式表示异步逻辑,底层库会处理异步。它会将相关代码封装成 callback,并订阅相关事件,将任务分配到不同线程(甚至是不同机器)上,待其执行完毕恢复原有线程,顺序执行。

许多其他语言的异步机制在 kotlin 中都可以使用 coroutines 实现。包括 async/await from C# and ECMAScript, channels and select from Go, and generators/yield from C# and Python.更多相关内容可以参见.

Blocking vs Suspending

本质上来说,coroutines 指的是那些可以被挂起从而避免阻塞 thread 的操作。阻塞线程通常意味着很大的性能消耗,尤其是在高负载的场景下,仅有一小部分 thread 可以被执行,如果其中一个 thread 被阻塞会导致一些重要的 work 被 delay。

而 coroutine suspension 基本没有额外的性能消耗。没有上下文(context)切换也不需要其他额外的 OS 支持。更重要的是 suspension 在很大程度上可以由用户来控制:作为一个库的维护人员,我们可以决定 suspension and optimize/log/intercept 的行为。

另一个不同是 coroutines 不能在随机指令处挂起,只能在 suspension points 处调用,所以需要用 suspend 标记一个函数。

Suspending functions

1
2
3
suspend fun doSomething(foo: Foo): Bar {
...
}

上面这样定义的方法叫做 suspending funtion,意味着调用此方法有可能会挂起一个 coroutines(当结果返回时,library 可以取消挂起)。
Suspending function 也可以有参数和返回值,但只能在 coroutines 和其他 suspending functions 中调用。实际上我们至少需要一个 suspending 函数来启动 coroutines,通常是一个匿名函数(比如一个 suspending lambda)。一个例子:

1
fun <T> async(block: suspend () -> T)

async()只是个普通函数,但是它可以接收一个 suspending lambda 做参数。这样我们就能传入一个 lambda 并调用它。

1
2
3
4
async {
doSomething(foo)
...
}

同上,await() 也可以是一个 suspending function( 当然也要在 async() 块内调用),它可以暂时挂起,直到计算结果返回。

1
2
3
4
5
async {
...
val result = computation.await()
...
}

More information on how actual async/await functions work in kotlinx.coroutines can be found here.

suspending function 不能再函数中调用。

另外,suspending function 可以是 virtual 的(写在接口/抽象类里)。实现这些方法时也要加上 suspend modifier.

1
2
3
4
5
6
7
interface Base {
suspend fun foo()
}

class Derived: Base {
override suspend fun foo() { ... }
}

@RestrictsSuspension annotation

扩展函数(还有 lambda)也可以标记为 suspend,这将允许以 DSL 的形式创建或者其他 API 的用户可以对其进行扩展。当然, library 的作者也可能需要禁止用户添加新的方式 suspend 一个 coroutines。

@RestrictsSuspension 注解就是为此而生的。使用这个注解标注的 class 或 interface R,其 subspending 扩展必须 delegate R 的所有成员或者 R 的其他扩展。又因为扩展函数之间不能无限的代理(程序会死循环),这就保证了所有的 suspensions 都是由 R 的成员发起的。

少数情况下,我们的 library 需要使用某个特殊的方式处理每个 suspension。举个例子,使用 buildSequence() 函数时,我们需要确保 coroutine 中的所有挂起的调用都要以 yield() 或 yieldAll() 结束。所以 这里用 RestrictsSuspension 标注。

1
2
3
4
@RestrictsSuspension
public abstract class SequenceBuilder<in T> {
...
}

The inner workings of coroutines

Coroutines 完全是由编译技术实现的(无 VM 或 OS限制),suspension 生效是通过代码转换。每个 suspension 函数(可能会涉及优化,这里不做过多解释)都会被转换成一个状态机,所有状态都对应 suspending 调用。在挂起前,next state 储存于一个编译器生成类的字段内,以及相关局部变量等。等到 coroutines 需要继续时,所有局部变量都会被恢复,状态机从挂起时的状态继续运行。

被挂起的 coroutines 可以以 object 的形式传递或保存。这个对象是 Continuation,这里描述的所有代码转换实际相当于典型的 Continuation-passing style(中文介绍)[https://www.zhihu.com/question/20259086]。所以 suspending 函数背后其实需要额外的 Continuation 类型参数。

更多细节可以在设计文档里找到。

Experimental status of coroutines

当前 coroutines 处于试验状态,也就是说在正式版发布前可能还会有更改。kotlin 1.1 编译会有警告,opt-inflag可以移去这个。

当前 coroutines api 位于 kotlin.coroutines.experimental 包内。正式版会放到 kotlin.coroutines

Standard APIs

三个主要构成

  • language support (i.s. suspending functions, as described above);
  • low-level core API in the Kotlin Standard Library;
  • high-level APIs that can be used directly in the user code.

Low-level API: kotlin.coroutines

低级 API 只用于实现高级 library 时,包含两个主要的 package:

  • kotlin.coroutines.experimental 比如
    • createCoroutine(),
    • startCoroutine(),
    • suspendCoroutine();
  • kotlin.coroutines.experimental.intrinsics 低级内联函数,比如 suspendCoroutineOrReturn

更多 API

Generators API in kotlin.coroutines

kotlin.coroutines.experimental package 里唯一的应用级 api

  • buildSequence()
  • buildIterator()

因为与 sequence 相关,所以位于 kotlin-stdlib 内。以上函数实现了 generators,比如用它方便的生成一个 lazy sequence:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
val fibonacciSeq = buildSequence {
var a = 0
var b = 1

yield(1)

while (true) {
yield(a + b)

val tmp = a + b
a = b
b = tmp
}
}

通过不断调用 yield() 函数产生斐波那切数列,并且它是 lazy 的。我们可以用这个 sequence 拿有限个数的数字。比如这样 fibonacciSeq.take(8).toList() results in [1, 1, 2, 3, 5, 8, 13, 21]. 同时 coroutines cheap enough 用于实践。

实验:验证 lazy

1
2
3
4
5
6
7
8
9
10
11
12
13
14
val lazySeq = buildSequence {
print("START ")
for (i in 1..5) {
yield(i)
print("STEP ")
}
print("END")
}

// Print the first three elements of the sequence
lazySeq.take(3).forEach { print("$it ") }

// output:
// START 1 STEP 2 STEP 3

更多例子:

1
2
3
4
5
6
7
8
val lazySeq = buildSequence {
yield(0)
yieldAll(1..10)
}

lazySeq.forEach { print("$it ") }
// output
0 1 2 3 4 5 6 7 8 9 10

The buildIterator() works similarly to buildSequence(), but returns a lazy iterator.

你可以给 SequenceBuilder 写扩展函数为 buildSequence() 添加自定义 yield 逻辑( 还记得 @RestrictsSuspension 吗):

1
2
3
4
5
6
7
suspend fun SequenceBuilder<Int>.yieldIfOdd(x: Int) {
if (x % 2 != 0) yield(x)
}

val lazySeq = buildSequence {
for (i in 1..10) yieldIfOdd(i)
}

Other high-level APIs: kotlinx.coroutines

只有和 coroutines 相关的 core api 位于 kotlin 标准库。大部分由 core primitives 和很可能会被基于 coroutines 的库使用到的 interface 组成 。

大部分应用级 API 位于一个单独的 library:kotlinx.coroutines。它包括:

  • 平台无关的异步编码 kotlinx-coroutines-core
    • 这个模块有类似 Go 的 channels ,支持 select 和其他方便的原语
    • 完全指南在此
  • 基于 JDK 8 中 CompletableFuture 的 API:kotlinx-coroutines-jdk8
  • 基于 JDK 7 and higher 的非阻塞 IO (NIO):kotlinx-coroutines-nio
  • Support for Swing (kotlinx-coroutines-swing) and JavaFx (kotlinx-coroutines-javafx);
  • Support for RxJava: kotlinx-coroutines-rx