Coroutines

CoroutineScope

协程作用域,任何协程都必须运行在 CoroutineScope 中。

  • GlobalScope:全局作用域,非结构化并发,持有 EmptyCoroutineContext
  • runBlocking:全局作用域,一般用于 main() 或测试中,会阻塞当前线程。
  • coroutineScope:局部作用域,结构化并发,当所有子协程执行完毕才会返回,并且等待过程中不会阻塞当前线程;其中一个子协程发生异常,所有其他子协程都会取消。
  • supervisorScope:局部作用域,结构化并发,当所有子协程执行完毕才会返回,并且等待过程中不会阻塞当前线程;其中一个子协程发生异常,不会影响其它子协程。
  • viewModelScope:Android ktx 提供的用于 ViewModel 中启动协程的作用域,结构化并发,会在 ViewModel#clear() 中自动取消。
  • CoroutineScope:可以自己实现该接口,实现自定义作用域。内部有个 CoroutineContext 属性。

Coroutine builders

  • launch:创建一个不会阻塞当前线程的新协程,并返回给该协程一个 Job 引用。
  • runBlocking:创建一个新协程,该协程执行完毕前会阻塞当前线程,并返回协程执行结果。
  • async:创建一个不会阻塞当前线程的新协程,并返回协程执行结果类型的 Deferred 对象。
  • withContext:改变当前协程的 CoroutineContext,一般配合 Dispatchers 使用。

Coroutine context

协程运行的上下文,以键值对的方式存储各种不同元素。

  • EmptyCoroutineContext:空实现,不会改变协程的行为,类似空 Map。
  • CoroutineName:为了便于调试可设置协程名称。
  • Job:协程的生命周期,可用于取消协程。协程与其子协程通过 Job 联系在一起,它会等待所有子协程都执行完毕,并在其中一个发生异常时取消所有的子协程(想要各个子协程不相互影响可以使用 SupervisorJob)。取消父协程的 Job 会取消该父协程的所有子协程 Job;其中一个子协程执行失败或抛出 CancellationException 也会导致父协程被取消。
  • CoroutineExceptionHandler:处理协程内部未被捕获的异常。
  • ContinuationInterceptor:协程恢复拦截器,主要在 dispatchers 中使用。

Coroutine dispatchers

协程调度器,有以下几种实现:

  • Dispatchers.Default:每次都切换到线程池的一个不同线程,一般用于 CPU 密集型任务。
  • Dispatchers.Main:平台相关的主线程,默认不可用,需要在各自平台实现。
  • Dispatchers.IO:用于执行一些阻塞的 IO 操作,如网络请求、数据库读写,文件操作等。
  • Dispatchers.Unconfined:总是使用第一个可用线程,具有不确定性,性能最优。
  • newSingleThreadContext:实验性,使用一个单线程创建新的协程上下文。
  • newFixedThreadPoolContext:实验性,使用一个指定大小的线程池创建新的协程上下文。

Channels

通道,类似 BlockingQueue,提供非阻塞的 send()receive,可实现生产者–消费者模型。可以关闭通道。

  • 扇出(Fan-out):多个协程也许会接收相同的通道,在它们之间进行分布式工作。
  • 扇入(Fan-in):多个协程可以发送到同一个通道。
fun CoroutineScope.produceSquares(): ReceiveChannel<Int> = produce {
    for (x in 1..5) send(x * x)
}

val squares = produceSquares()
squares.consumeEach { println(it) } //1, 4, 9, 16, 25

Sequence builder

val childNumbers = sequence {
  yield(1)
  print("AAA")
  yieldAll(listOf(2, 3))
}
childNumbers.forEach { print(it) } //1AAA23

val nums = childNumbers.joinToString() // AAA
print(nums) // 1, 2, 3

Deal with shared state

共享的可变状态与并发。

  • AtomicInteger:可使用原子基本类型。线程安全的。
  • AtomicReference<V>:可使用原子引用类型。线程安全的。
  • newSingleThreadContext:使用单线程模型。
  • Mutex:互斥,类似 synchronizedReentranLock。给关键代码块加锁,确保不会被同时执行。
    private val mutex = Mutex()
    mutex.withLock { /**/ }
    

Actors

actor 协程构建器是一个双重的 produce 协程构建器。一个 actor 与它接收消息的通道相关联,而一个 producer 与它发送元素的通道相关联。
在高负载下比锁更有效,因为在这种情况下它总是有工作要做,而且根本不需要切换到不同的上下文。

// 计数器 Actor 的各种类型
sealed class CounterMsg
object IncCounter : CounterMsg() // 递增计数器的单向消息
object PrintCounter : CounterMsg() // 打印的单向消息
class GetCounter(val response: CompletableDeferred<Int>) : CounterMsg() // 携带回复的请求

// 这个函数启动一个新的计数器 actor
fun CoroutineScope.counterActor() = actor<CounterMsg> {
    var counter = 0 // actor 状态
    for (msg in channel) { // 即将到来消息的迭代器
        when (msg) {
            is IncCounter -> counter++
            is PrintCounter -> print(counter)
            is GetCounter -> msg.response.complete(counter)
        }
    }
}

Reference

联系

我是 xiaobailong24,您可以通过以下平台找到我: