2017-09-23 | learn

kotlin-coroutines-basic

协程可以看成是一个轻量的线程
One can think of a coroutine as a light-weight thread

像线程一样可以并行执行或相互通信
Like threads, coroutines can run in parallel, wait for each other and communicate.

最大的区别在于 coroutines 的代价非常小,相当于没有:我们可以写个几百上千的 coroutines ,都不会对性能造成特别大的影响。
The biggest difference is that coroutines are very cheap, almost free: we can create thousands of them, and pay very little in terms of performance.

而创建并维持一个 true thread 的开销很大,想要达到如此数量的 true thread 对当前的设备也是一个很严峻的挑战 。
True threads, on the other hand, are expensive to start and keep around. A thousand threads can be a serious challenge for a modern machine.

My first coroutine

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
import kotlinx.coroutines.experimental.*

fun main(args: Array<String>) {
println("Start")

// Start a coroutine
launch(CommonPool) {
delay(1000)
println("Hello")
}

Thread.sleep(2000) // wait for 2 seconds
println("Stop")
}

CommonPool 是一个 object 对象,内部为反射调用 ForkJoinPool.commonPool() 方法。
This starts a new coroutine on a given thread pool. In this case we are using CommonPool that uses ForkJoinPool.commonPool().

所以实际上 Threads 仍然存在于使用 coroutines 的程序中,不同的是一个 thread 可以跑很多 coroutines。
Threads still exist in a program based on coroutines, but one thread can run many coroutines, so there’s no need for too many threads.

代码中 corotines 使用 delay(1000),但实际上并没有线程被阻塞,只有 coroutines 自己被挂起。此时 thread 被归还给线程池,当 coroutines 的等待结束后,coroutines 会从线程池拿一个新的线程恢复运行。
We are using the delay() function that’s like Thread.sleep(), but better: it doesn’t block a thread, but only suspends the coroutine itself. The thread is returned to the pool while the coroutine is waiting, and when the waiting is done, the coroutine resumes on a free thread in the pool.

验证:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
import kotlinx.coroutines.experimental.*

fun main(args: Array<String>) {
println("Start")

// Start a coroutine
launch(CommonPool) {
println("Hello"+Thread.currentThread())
delay(1000)
println("Hello"+Thread.currentThread())
}

launch(CommonPool) {
println("Helloo"+Thread.currentThread())
delay(1000)
println("Helloo"+Thread.currentThread())
}


Thread.sleep(2000) // wait for 2 seconds
println("Stop")
println(Thread.currentThread())
}

此段代码输出:

1
2
3
4
5
6
7
Start
HelloThread[ForkJoinPool.commonPool-worker-1,5,main]
HellooThread[ForkJoinPool.commonPool-worker-2,5,main]
HelloThread[ForkJoinPool.commonPool-worker-2,5,main]
HellooThread[ForkJoinPool.commonPool-worker-1,5,main]
Stop
Thread[main,5,main]

delay() 函数不能直接在 main() 里调用,编译会报错:
If we try to use the same non-blocking delay() function directly inside main(), we’ll get a compiler error:

Suspend functions are only allowed to be called from a coroutine or another suspend function

实在想用可以这样:

1
2
3
runBlocking {
delay(2000)
}

Let’s run a lot of them

如何证明 coroutines 性能优于 thread 呢

1
2
3
4
5
6
7
8
9
val c = AtomicInteger()

for (i in 1..1_000_000)
thread(start = true) {
c.addAndGet(i)
}

println(c.get())

1
2
3
4
5
6
7
8
9
val c = AtomicInteger()

for (i in 1..1_000_000)
launch(CommonPool) {
c.addAndGet(i)
}

println(c.get())

对比下速度就一目了然了,用 thread 例子要等一分多,coroutines 就快多了,不到 1 秒。
当然,这个程序还有其它问题。主线程可能会在其他任务执行完之前结束。CountdownLatch 不失为一种办法,不过我们还是来看一下更好的方式。
This runs a 1’000’000 threads each of which adds to a common counter. My patience runs out before this program completes on my machine (definitely over a minute).
This example completes in less than a second for me, but it prints some arbitrary number, because some coroutines don’t finish before main() prints the result. Let’s fix that.
We could use the same means of synchronization that are applicable to threads (a CountDownLatch is what crosses my mind in this case), but let’s take a safer and cleaner path.

Async: returning a value from a coroutine

我们可以用 async {} 启动 coroutines。和 launch {}有点像,不过它会返回一个 Deferred<T> 类型的返回值,内部是一个 await() 函数,这个函数返回 coroutines 的执行结果。Deferred 是一个基本的 future(你也可以使用 jdk 提供的其他 future,我们这里简便起见限定使用 Deferred)。

Another way of starting a coroutine is async {}. It is like launch {}, but returns an instance of Deferred, which has an await() function that returns the result of the coroutine. Deferred is a very basic future (fully-fledged JDK futures are also supported, but here we’ll confine ourselves to Deferred for now).

我们来继续搞事情

1
2
3
4
5
val deferred = (1..1_000_000).map { n ->
async (CommonPool) {
n
}
}

此时所有任务均已开启,下面求和

1
val sum = deferred.sumBy { it.await() }

我们拿到所有任务并等待其返回,然后又看到了之前那个报错

Suspend functions are only allowed to be called from a coroutine or another suspend function

await()也不能在 coroutines 之外调用,

1
2
3
4
runBlocking {
val sum = deferred.sumBy { it.await() }
println("Sum: $sum")
}

Now it prints something sensible: 1784293664, because all coroutines complete.

接下来,我们验证 coroutines 真的是并行运行的:

1
2
3
4
5
6
val deferred = (1..1_000_000).map { n ->
async (CommonPool) {
delay(1000)
n
}
}

This takes about 10 seconds on my machine, so yes, coroutines do run in parallel.

Suspending functions

coroutines 最大的价值在于其可以不阻塞 thread,想实现这点需要编译器对其注入一些特殊代码。因此我们需要显示标注 coroutines 函数。

那么,如何定义一个 suspending 函数呢?使用 suspend 关键字。

1
2
3
4
suspend fun workload(n: Int): Int {
delay(1000)
return n
}

over~