前言
Kotlin coroutines 为我们提供了一种编写良好异步代码的简易 API。在 Kotlin coroutines 中,我们可以自定义 CoroutineScope
,用来管理我们启动的协程的运行位置。需要注意的是,每个 coroutine 都需要运行在 CoroutineScope 中。
Architecture components 为我们提供了在各组件中使用协程的官方支持。
Add KTX dependencies
这些 coroutine scopes 作为 Architecture components 的扩展包提供给开发者使用,它们位于 KTX extensions。通过单独添加以下依赖,我们就可以使用它们。
- ViewModelScope:
androidx.lifecycle:lifecycle-viewmodel-ktx:2.1.0-beta01
或更高 - LifecycleScope:
androidx.lifecycle:lifecycle-runtime-ktx:2.2.0-alpha01
或更高 - LiveData:
androidx.lifecycle:lifecycle-livedata-ktx:2.2.0-alpha01
或更高
最新版本可在 Google Maven 中找到。
Lifecycle-aware coroutine scopes
Architecture components 为我们提供了 ViewModelScope
和 LifecycleScope
用于管理我们在 ViewModel 和 Lifecycle 中启动的协程。
ViewModelScope
Usage
ViewModelScope
是 ViewModel 的一个扩展属性,因此我们可以在每个 ViewModel 的子类中使用它。每个在 ViewModelScope 中启动的协程都会在 ViewModel 销毁(ViewModel#onCleared()
)的时候自动取消(cancel)。如果我们只需要在 ViewModel 存活(active)时做一些逻辑处理,使用协程是一个好的选择。举个栗子,假如我们在 ViewModel 中为 View 层计算一些数据然后将结果显示到 UI 上,我们应该限定这些计算工作在 ViewModel 的生命周期内执行。这样当我们的 ViewModel 销毁的时候,这些计算工作也会自动取消,避免资源浪费和内存泄露。
我们可以使用 ViewModel 的扩展属性 – ViewModelScope
,来限定协程的运行作用域。使用方式如下:
class MyViewModel: ViewModel {
init {
// 在 ViewModelScope 中启动一个 coroutine
viewModelScope.launch {
// 这个 coroutine 会在 ViewModel 销毁时被自动取消。
}
}
}
Source code
下面我们来看下源码是如何实现的。
ViewModel.kt
// 用于存放 viewModelScope 的 key
private const val JOB_KEY = "androidx.lifecycle.ViewModelCoroutineScope.JOB_KEY"
/**
* [CoroutineScope] tied to this [ViewModel].
* This scope will be canceled when ViewModel will be cleared, i.e [ViewModel.onCleared] is called
*
* This scope is bound to [Dispatchers.Main]
*/
val ViewModel.viewModelScope: CoroutineScope
get() {
// 首先从缓存中取 CoroutineScope,若非第一次调用 viewModelScope,则会直接返回
val scope: CoroutineScope? = this.getTag(JOB_KEY)
if (scope != null) {
return scope
}
// 若首次调用,则新建一个 CloseableCoroutineScope,并存在 ViewModel 中。
// 这个 CloseableCoroutineScope 与主线程绑定。
return setTagIfAbsent(JOB_KEY,
CloseableCoroutineScope(SupervisorJob() + Dispatchers.Main))
}
internal class CloseableCoroutineScope(context: CoroutineContext) : Closeable, CoroutineScope {
// 实现 CoroutineScope 接口,设置 CoroutineContext
override val coroutineContext: CoroutineContext = context
// 实现 Closeable 接口
override fun close() {
// 取消该 scope 中启动的所有 coroutines
coroutineContext.cancel()
}
}
ViewModel.java
下面我们带着以下疑问,来看下 ViewModel 的源码。ViewModel 的源码也就几十行。
getTag
是如何实现的?setTagIfAbsent
是如何实现的?- 为什么
viewModelScope
中启动的线程会在 ViewModel 销毁时被自动取消。
public abstract class ViewModel {
// Can't use ConcurrentHashMap, because it can lose values on old apis (see b/37042460)
@Nullable
private final Map<String, Object> mBagOfTags = new HashMap<>();
private volatile boolean mCleared = false;
/**
* This method will be called when this ViewModel is no longer used and will be destroyed.
* <p>
* It is useful when ViewModel observes some data and you need to clear this subscription to
* prevent a leak of this ViewModel.
*/
@SuppressWarnings("WeakerAccess")
protected void onCleared() {
}
@MainThread
final void clear() {
mCleared = true;
// Since clear() is final, this method is still called on mock objects
// and in those cases, mBagOfTags is null. It'll always be empty though
// because setTagIfAbsent and getTag are not final so we can skip
// clearing it
if (mBagOfTags != null) {
synchronized (mBagOfTags) {
for (Object value : mBagOfTags.values()) {
// see comment for the similar call in setTagIfAbsent
closeWithRuntimeException(value);
}
}
}
onCleared();
}
/**
* Sets a tag associated with this viewmodel and a key.
* If the given {@code newValue} is {@link Closeable},
* it will be closed once {@link #clear()}.
* <p>
* If a value was already set for the given key, this calls do nothing and
* returns currently associated value, the given {@code newValue} would be ignored
* <p>
* If the ViewModel was already cleared then close() would be called on the returned object if
* it implements {@link Closeable}. The same object may receive multiple close calls, so method
* should be idempotent.
*/
<T> T setTagIfAbsent(String key, T newValue) {
T previous;
synchronized (mBagOfTags) {
//noinspection unchecked
previous = (T) mBagOfTags.get(key);
if (previous == null) {
mBagOfTags.put(key, newValue);
}
}
T result = previous == null ? newValue : previous;
if (mCleared) {
// It is possible that we'll call close() multiple times on the same object, but
// Closeable interface requires close method to be idempotent:
// "if the stream is already closed then invoking this method has no effect." (c)
closeWithRuntimeException(result);
}
return result;
}
/**
* Returns the tag associated with this viewmodel and the specified key.
*/
@SuppressWarnings("TypeParameterUnusedInFormals")
<T> T getTag(String key) {
//noinspection unchecked
synchronized (mBagOfTags) {
return (T) mBagOfTags.get(key);
}
}
private static void closeWithRuntimeException(Object obj) {
if (obj instanceof Closeable) {
try {
((Closeable) obj).close();
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}
}
可以看到:
mBagOfTags
这个成员变量是一个 HashMap,用于存放键值对,getTag(String key)
就是从mBagOfTags
取出该key
对应的值,并做了强制类型转换。setTagIfAbsent(String key, T newValue)
就是将该newValue
存储到mBagOfTags
中,以便下次取出使用。需要注意的是,如果想要给已经存在的 key 设置一个新值(newValue
),将不会生效,新值会被忽略,然后返回已经存在的旧值(previous
)。并且,如果ViewModel#clear()
已经被系统调用(比如它的 Activity/Fragment 已经销毁)时(mCleared = true
),新存储的值会调用closeWithRuntimeException(Object obj)
。- 在
ViewModel#clear()
中,会遍历mBagOfTags
,然后调用closeWithRuntimeException(Object obj)
。 closeWithRuntimeException(Object obj)
方法中,如果这个obj
是实现了Closeable
接口的类的对象,就会调用它的close
方法。
回到这个问题:为什么 viewModelScope
中启动的线程会在 ViewModel 销毁时被自动取消?
现在就可以有答案了:因为 ViewModel 的扩展属性 viewModelScope
是一个实现了 Closeable
接口的 CloseableCoroutineScope
,并且存放在了 ViewModel 的 mBagOfTags
中。由于 ViewModel#clear()
时会将 mBagOfTags
中所有实现了 Closeable
接口的类的对象关闭(close),所以会回调 CloseableCoroutineScope#close()
方法,并此方法内,取消了所有该 CoroutineScope 中的所有协程。
Test
TestCoroutineDispatcher
由 ViewModel.kt
源码可知 viewModelScope
是运行在主线程中的(CloseableCoroutineScope(SupervisorJob() + Dispatchers.Main)
)。Dispatchers.Main
在 Android 平台是指 UI 线程,它的实现依赖 Looper.getMainLooper()
。因此我们无法使用 Unit tests 测试它们。
幸运的是,官方为我们提供了测试依赖可以替换 Dispatchers.Main
的实现。
implementation 'org.jetbrains.kotlinx:kotlinx-coroutines-test:$coroutines_version'
我们可以使用该库提供的 Dispatchers.setMain(dispatcher: CoroutineDispatcher)
来重写它,并且它也为我们提供了一个默认实现:TestCoroutineDispatcher。TestCoroutineDispatcher
是一个 CoroutineDispatcher
的实现类,我们可以使用它控制 Coroutines 的执行,比如 pause/resume 或控制它的虚拟时钟。该类是在 Kotlin Coroutines v1.2.1 新增的,目前还是一个实验性 API。可以查看官方文档。
我们不应在单元测试时使用 Dispatchers.Unconfined
来替换 Dispatchers.Main
,它不会是我们预想的那样验证结果或耗时。由于单元测试应该运行在一个隔离环境内,不受其它因素的影响,所以每次执行完一个测试,我们要恢复初始状态,可以调用 Dispatchers.resetMain()
重置。
当然,我们可以自定义 Rule 来避免样板代码:
@ExperimentalCoroutinesApi
class CoroutinesTestRule(
val testDispatcher: TestCoroutineDispatcher = TestCoroutineDispatcher()
) : TestWatcher() {
override fun starting(description: Description?) {
super.starting(description)
Dispatchers.setMain(testDispatcher)
}
override fun finished(description: Description?) {
super.finished(description)
Dispatchers.resetMain()
testDispatcher.cleanupTestCoroutines()
}
}
然后我们就可以在测试类中使用这个 Rule:
class MainViewModelUnitTest {
@get:Rule
var coroutinesTestRule = CoroutinesTestRule()
@Test
fun test() {
...
}
}
使用 Mockito 测试 Coroutines
我们一般使用 Mockito 的 verify
方法验证对象的方法是否调用,但这并不是一个完美的方式。我们最好验证我们的逻辑代码是否正确,比如某个元素是否存在。
在验证某个对象的方法是否调用前,我们要确保所有已启动的协程都执行完毕。举个例子:
class MainViewModel(private val dependency: Any): ViewModel {
fun sampleMethod() {
viewModelScope.launch {
val hashCode = dependency.hashCode()
// TODO: do something with hashCode
}
}
class MainViewModelUnitTest {
// Mockito setup goes here
...
@get:Rule
var coroutinesTestRule = CoroutinesTestRule()
@Test
fun test() = coroutinesTestRule.testDispatcher.runBlockingTest {
val subject = MainViewModel(mockObject)
subject.sampleMethod()
// Checks mockObject called the hashCode method that is expected from the coroutine created in sampleMethod
verify(mockObject).hashCode()
}
}
在 MainViewModelUnitTest
测试类中,我们使用了 TestCoroutineDispatcher
提供的 runBlockingTest
函数。由于 TestCoroutineDispatcher
重写了 Dispatchers.Main
,所以 MainViewModel
中的协程将会在这个 Dispatcher 中运行。runBlockingTest
函数可以保证所有测试代码中的协程都会同步执行。因此 verify
方法也将会在所有协程运行完后才会执行验证行为。
LifecycleScope
Usage
LifecycleScope
是 Lifecyle 的一个扩展属性,因此我们可以在任何可以拿到 Lifecyle 的地方(一般是 Activity/Fragment)使用它。每个在 LifecycleScope 中启动的协程都会在 Lifecycle 销毁的时候自动取消(cancel)。我们可以通过 lifecycle.coroutineScope
和 lifecycleOwner.lifecycleScope
使用 Lifecycle 的 LifecycleScope
。
下面这个例子,示范了怎么使用 lifecycleOwner.lifecycleScope
来异步创建预计算的文本。
class MyFragment: Fragment() {
override fun onViewCreated(view: View, savedInstanceState: Bundle?) {
super.onViewCreated(view, savedInstanceState)
viewLifecycleOwner.lifecycleScope.launch {
val params = TextViewCompat.getTextMetricsParams(textView)
val precomputedText = withContext(Dispatchers.Default) {
PrecomputedTextCompat.create(longTextContent, params)
}
TextViewCompat.setPrecomputedText(textView, precomputedText)
}
}
}
Soure code
Lifecyle.kt
/**
* [CoroutineScope] tied to this [Lifecycle].
*
* This scope will be cancelled when the [Lifecycle] is destroyed.
*
* This scope is bound to [Dispatchers.Main]
*/
val Lifecycle.coroutineScope: LifecycleCoroutineScope
get() {
while (true) {
val existing = mInternalScopeRef.get() as LifecycleCoroutineScopeImpl?
if (existing != null) {
return existing
}
val newScope = LifecycleCoroutineScopeImpl(
this,
SupervisorJob() + Dispatchers.Main
)
if (mInternalScopeRef.compareAndSet(null, newScope)) {
newScope.register()
return newScope
}
}
}
/**
* [CoroutineScope] tied to a [Lifecycle] and [Dispatchers.Main]
*
* This scope will be cancelled when the [Lifecycle] is destroyed.
*
* This scope provides specialised versions of `launch`: [launchWhenCreated], [launchWhenStarted],
* [launchWhenResumed]
*/
abstract class LifecycleCoroutineScope internal constructor() : CoroutineScope {
internal abstract val lifecycle: Lifecycle
/**
* Launches and runs the given block when the [Lifecycle] controlling this
* [LifecycleCoroutineScope] is at least in [Lifecycle.State.CREATED] state.
*
* The returned [Job] will be cancelled when the [Lifecycle] is destroyed.
* @see Lifecycle.whenCreated
* @see Lifecycle.coroutineScope
*/
fun launchWhenCreated(block: suspend CoroutineScope.() -> Unit): Job = launch {
lifecycle.whenCreated(block)
}
/**
* Launches and runs the given block when the [Lifecycle] controlling this
* [LifecycleCoroutineScope] is at least in [Lifecycle.State.STARTED] state.
*
* The returned [Job] will be cancelled when the [Lifecycle] is destroyed.
* @see Lifecycle.whenStarted
* @see Lifecycle.coroutineScope
*/
fun launchWhenStarted(block: suspend CoroutineScope.() -> Unit): Job = launch {
lifecycle.whenStarted(block)
}
/**
* Launches and runs the given block when the [Lifecycle] controlling this
* [LifecycleCoroutineScope] is at least in [Lifecycle.State.RESUMED] state.
*
* The returned [Job] will be cancelled when the [Lifecycle] is destroyed.
* @see Lifecycle.whenResumed
* @see Lifecycle.coroutineScope
*/
fun launchWhenResumed(block: suspend CoroutineScope.() -> Unit): Job = launch {
lifecycle.whenResumed(block)
}
}
internal class LifecycleCoroutineScopeImpl(
override val lifecycle: Lifecycle,
override val coroutineContext: CoroutineContext
) : LifecycleCoroutineScope(), LifecycleEventObserver {
init {
// in case we are initialized on a non-main thread, make a best effort check before
// we return the scope. This is not sync but if developer is launching on a non-main
// dispatcher, they cannot be 100% sure anyways.
if (lifecycle.currentState == Lifecycle.State.DESTROYED) {
coroutineContext.cancel()
}
}
fun register() {
// TODO use Main.Immediate once it is graduated out of experimental.
launch(Dispatchers.Main) {
if (lifecycle.currentState >= Lifecycle.State.INITIALIZED) {
lifecycle.addObserver(this@LifecycleCoroutineScopeImpl)
} else {
coroutineContext.cancel()
}
}
}
override fun onStateChanged(source: LifecycleOwner, event: Lifecycle.Event) {
if (lifecycle.currentState <= Lifecycle.State.DESTROYED) {
lifecycle.removeObserver(this)
coroutineContext.cancel()
}
}
}
LifecycleOwner.kt
/**
* [CoroutineScope] tied to this [LifecycleOwner]'s [Lifecycle].
*
* This scope will be cancelled when the [Lifecycle] is destroyed.
*
* This scope is bound to [Dispatchers.Main].
*/
val LifecycleOwner.lifecycleScope: LifecycleCoroutineScope
get() = lifecycle.coroutineScope
Suspend Lifecycle-aware coroutines
尽管 CoroutineScope
为我们提供了一种合适的方式来自动取消耗时操作,但在有些情况下,Lifecycle
不在一个确定的状态时,也就是说是一个暂态,随时可能会转移到其他状态时,我们可能想挂起(suspend
)我们的方法执行。举个栗子,我们必须在 Lifecycle
的状态大于等于 STARTED
时,才可以使用 FragmentTransaction
。对于这些情况,Lifecycle
也提供了对应的扩展方法:lifecycle.whenCreated
、lifecycle.whenStarted
和 lifecycle.whenResumed
。如果 Lifecycle
没有到达期望的最小状态时,运行在这些方法内的所有协程都会被挂起。
下面这个例子展示了至少需要 Lifecycle
达到 STARTED
状态时,才会执行 lifecycle.whenStarted
里面的代码。
class MyFragment: Fragment {
init { // Notice that we can safely launch in the constructor of the Fragment.
lifecycleScope.launch {
whenStarted {
// The block inside will run only when Lifecycle is at least STARTED.
// It will start executing when fragment is started and
// can call other suspend methods.
loadingView.visibility = View.VISIBLE
val canAccess = withContext(Dispatchers.IO) {
checkUserAccess()
}
// When checkUserAccess returns, the next line is automatically
// suspended if the Lifecycle is not *at least* STARTED.
// We could safely run fragment transactions because we know the
// code won't run unless the lifecycle is at least STARTED.
loadingView.visibility = View.GONE
if (canAccess == false) {
findNavController().popBackStack()
} else {
showContent()
}
}
// This line runs only after the whenStarted block above has completed.
}
}
}
当 Lifecyle
销毁时,使用这些 when
方法启动的协程将会被自动取消。下面的例子,一旦 Lifecyle
的状态变为 DESTROYED
, finally
代码块会立即执行。
class MyFragment: Fragment {
init {
lifecycleScope.launchWhenStarted {
try {
// Call some suspend functions.
} finally {
// This line might execute after Lifecycle is DESTROYED.
if (lifecycle.state >= STARTED) {
// Here, since we've checked, it is safe to run any
// Fragment transactions.
}
}
}
}
}
注意:尽管这些
Lifecycle
的扩展属性或扩展方法为我们提供了很大的便利,但在Lifecyle
生命周期内,我们最好在能保证消息有效的情况下使用它们(比如上面的 precomputed text)。
另外需要注意的是,Activity 重启(restart) 时,这些协程不会被重启(restart)。
Use coroutines with LiveData
我们在使用 LiveData
时,经常需要异步获取数据然后设置给 LiveData
。比如,我们可能需要读取用户设置,然后展示到 UI 上。这时,我们可以使用 liveData
扩展构造函数,在该函数内可以调用 suspend
函数获取数据并将结果传递给 LiveData
。
如下所示,loadUser()
是一个 suspend
函数,通过查询数据库返回一个 User
对象。使用 liveData
扩展构造函数,我们可以异步调用 loadUser()
,然后使用 emit()
方法改变 LiveData
的 value
。
val user: LiveData<User> = liveData {
val data = database.loadUser() // loadUser is a suspend function.
emit(data)
}
liveData
扩展构造函数为 Coroutines 和 LiveData
提供了结构化并发(Structured concurrency)的支持。livedata
包含的代码块会在 LiveData
变为 active
时自动执行,并在 LiveData
变为 inactive
时在一个可配置的时间后自动取消。若该代码块在执行完之前就被取消了,那么在 LiveData
再次 active
时,代码块也会重新执行。但若它已经执行完毕,则不会重新执行。并且该代码块只会在被自动取消的情况下才会在 LiveData
再次 active
时重新执行。 如果该代码块由于其他原因(比如抛出 CancelationExeption)被取消了,它也不会重新重新执行。
我们可以在 livedata
扩展构造函数内发射多个值。每次发射(调用 emit()
)都会执行主线程的 suspend
函数,直到该值被设置给 LiveData
。
val user: LiveData<Result> = liveData {
emit(Result.loading())
try {
emit(Result.success(fetchUser())
} catch(ioException: Exception) {
emit(Result.error(ioException))
}
}
我们也可以使用在 Transformations 提供的操作符内使用 liveData
,如下所示:
class MyViewModel: ViewModel() {
private val userId: LiveData<String> = MutableLiveData()
val user = userId.switchMap { id ->
liveData(context = viewModelScope.coroutineContext + Dispatchers.IO) {
emit(database.loadUserById(id))
}
}
}
我们也可以在任何时候使用 emitSource()
函数发射多个值给 LiveData
。但要注意:每次调用 emit()
或 emitSource()
都会清除之前添加的值。 可见源码(LiveDataScope)。
class UserDao: Dao {
@Query("SELECT * FROM User WHERE id = :id")
fun getUser(id: String): LiveData<User>
}
class MyRepository {
fun getUser(id: String) = liveData<User> {
val disposable = emitSource(
userDao.getUser(id).map {
Result.loading(it)
}
)
try {
val user = webservice.fetchUser(id)
// Stop the previous emission to avoid dispatching the updated user
// as `loading`.
disposable.dispose()
// Update the database.
userDao.insert(user)
// Re-establish the emission with success type.
emitSource(
userDao.getUser(id).map {
Result.success(it)
}
)
} catch(exception: IOException) {
// Any call to `emit` disposes the previous one automatically so we don't
// need to dispose it here as we didn't get an updated value.
emitSource(
userDao.getUser(id).map {
Result.error(exception, it)
}
)
}
}
}
Reference
- 【Official】Use Kotlin coroutines with Architecture components
- 【Medium】Easy Coroutines in Android: viewModelScope
- 【GitHub】Kotlin/kotlinx.coroutines
- 【Codelab】Unit Testing with Coroutines
联系
我是 xiaobailong24,您可以通过以下平台找到我: