
关于
Android 和 KMP 的 Kotlin 协程和 Flow 模式——结构化并发、Flow 操作符、StateFlow 等。
name: kotlin-coroutines-flows description: Kotlin 协程和 Flow 模式,适用于 Android 和 KMP — 结构化并发、Flow 操作符、StateFlow、错误处理和测试。 origin: ECC
Kotlin 协程与 Flows
适用于 Android 和 Kotlin 多平台项目的结构化并发、基于 Flow 的响应式流和协程测试模式。
何时激活
- 使用 Kotlin 协程编写异步代码
- 使用 Flow、StateFlow 或 SharedFlow 进行响应式数据处理
- 处理并发操作(并行加载、防抖、重试)
- 测试协程和 Flows
- 管理协程作用域和取消
结构化并发
作用域层级
Application
└── viewModelScope (ViewModel)
└── coroutineScope { } (structured child)
├── async { } (concurrent task)
└── async { } (concurrent task)
始终使用结构化并发 — 绝不使用 GlobalScope:
// BAD
GlobalScope.launch { fetchData() }
// GOOD — scoped to ViewModel lifecycle
viewModelScope.launch { fetchData() }
// GOOD — scoped to composable lifecycle
LaunchedEffect(key) { fetchData() }
并行分解
使用 coroutineScope + async 进行并行工作:
suspend fun loadDashboard(): Dashboard = coroutineScope {
val items = async { itemRepository.getRecent() }
val stats = async { statsRepository.getToday() }
val profile = async { userRepository.getCurrent() }
Dashboard(
items = items.await(),
stats = stats.await(),
profile = profile.await()
)
}
SupervisorScope
当子任务失败不应取消兄弟任务时使用 supervisorScope:
suspend fun syncAll() = supervisorScope {
launch { syncItems() } // failure here won't cancel syncStats
launch { syncStats() }
launch { syncSettings() }
}
Flow 模式
冷 Flow — 一次性到流的转换
fun observeItems(): Flow<List<Item>> = flow {
itemDao.observeAll()
.map { entities -> entities.map { it.toDomain() } }
.collect { emit(it) }
}
StateFlow 用于 UI 状态
class DashboardViewModel(
observeProgress: ObserveUserProgressUseCase
) : ViewModel() {
val progress: StateFlow<UserProgress> = observeProgress()
.stateIn(
scope = viewModelScope,
started = SharingStarted.WhileSubscribed(5_000),
initialValue = UserProgress.EMPTY
)
}
WhileSubscribed(5_000) 在最后一个订阅者离开后保持上游活跃 5 秒 — 在配置更改时不重启。
组合多个 Flows
val uiState: StateFlow<HomeState> = combine(
itemRepository.observeItems(),
settingsRepository.observeTheme(),
userRepository.observeProfile()
) { items, theme, profile ->
HomeState(items = items, theme = theme, profile = profile)
}.stateIn(viewModelScope, SharingStarted.WhileSubscribed(5_000), HomeState())
Flow 操作符
// Debounce search input
searchQuery
.debounce(300)
.distinctUntilChanged()
.flatMapLatest { query -> repository.search(query) }
.catch { emit(emptyList()) }
.collect { results -> _state.update { it.copy(results = results) } }
// Retry with exponential backoff
fun fetchWithRetry(): Flow<Data> = flow { emit(api.fetch()) }
.retryWhen { cause, attempt ->
if (cause is IOException && attempt < 3) {
delay(1000L * (1 shl attempt.toInt()))
true
} else {
false
}
}
SharedFlow 用于一次性事件
class ItemListViewModel : ViewModel() {
private val _effects = MutableSharedFlow<Effect>()
val effects: SharedFlow<Effect> = _effects.asSharedFlow()
sealed interface Effect {
data class ShowSnackbar(val message: String) : Effect
data class NavigateTo(val route: String) : Effect
}
private fun deleteItem(id: String) {
viewModelScope.launch {
repository.delete(id)
_effects.emit(Effect.ShowSnackbar("Item deleted"))
}
}
}
// Collect in Composable
LaunchedEffect(Unit) {
viewModel.effects.collect { effect ->
when (effect) {
is Effect.ShowSnackbar -> snackbarHostState.showSnackbar(effect.message)
is Effect.NavigateTo -> navController.navigate(effect.route)
}
}
}
调度器
// CPU-intensive work
withContext(Dispatchers.Default) { parseJson(largePayload) }
// IO-bound work
withContext(Dispatchers.IO) { database.query() }
// Main thread (UI) — default in viewModelScope
withContext(Dispatchers.Main) { updateUi() }
在 KMP 中,使用 Dispatchers.Default 和 Dispatchers.Main(所有平台可用)。Dispatchers.IO 仅限 JVM/Android — 在其他平台使用 Dispatchers.Default 或通过 DI 提供。
取消
协作式取消
长时间运行的循环必须检查取消状态
兼容工具
Claude CodeCursor
标签
移动端
