ViewModel 中的 StateFlow 和 SharedFlow:使用建议以及单元测试
StateFlow 和 SharedFlow 都是 Kotlin Coroutines 中的数据流类型,用于在异步环境中传递数据。理解它们的区别并正确应用于 Android 开发中,对于构建响应式、可测试的 MVVM 架构至关重要。
核心概念简介
StateFlow:
- 一个状态容器式可观察数据流,可以向其收集器发出当前状态和新状态。
- 是热数据流(Hot Flow),始终持有最新值。
- 适用于 UI 状态管理,确保订阅者总能获取到最新的状态快照。
SharedFlow:
- SharedFlow 是 StateFlow 的可配置性极高的泛化数据流(StateFlow 继承于 SharedFlow)。
- 支持更灵活的重放策略(replay)和背压处理(backpressure)。
- 适用于事件通知、一次性操作或需要控制重播次数的场景。
为什么在 ViewModel 中暴露热流而不是冷流?
在 MVVM 模型中,ViewModel 中暴露出来的 StateFlow 或 SharedFlow 应该是 UI 层中唯一的可信数据来源。如果我们暴露出的是普通的冷流(Cold Flow),会导致每次有新的流收集者时就会触发一次 emit,造成资源浪费(例如重复的网络请求)。
如果 Repository 提供的只有简单的冷流,可以通过以下两种方式将其转换为热流:
- 手动转换:正常收集冷流,收集到一个数据就往另外构建的 StateFlow 或 SharedFlow 发送。
- 使用拓展函数:直接使用
stateIn或shareIn拓展函数转换成热流。这是官方推荐的方式。
使用 stateIn 示例
private const val DEFAULT_TIMEOUT = 500L
@HiltViewModel
class MyViewModel @Inject constructor(
private val userRepository: UserRepository
) : ViewModel() {
// 将冷流转换为热流 StateFlow
val userFlow: StateFlow<UiState> = userRepository
.getUsers()
.asResult() // 此处返回 Flow<Result<User>>
.map { result ->
when (result) {
is Result.Loading -> UiState.Loading
is Result.Success -> UiState.Success(result.data)
is Result.Error -> UiState.Error(result.exception)
}
}
.stateIn(
scope = viewModelScope,
initialValue = UiState.Loading,
started = SharingStarted.WhileSubscribed(DEFAULT_TIMEOUT)
)
}
参数说明:
scope: 通常传入viewModelScope,确保生命周期绑定。initialValue: 初始状态,UI 加载时立即显示。started: 控制何时开始收集。WhileSubscribed表示当没有订阅者时停止收集,并在超时后清理,防止内存泄漏。
多数据源合并
在一些业务复杂的页面,比如首页,通常会有多个数据来源。为了保证单一可靠数据源原则,我们可以使用 combine 函数将多个 flow 组成一个 flow,然后再使用 stateIn 函数转换成 StateFlow。
val combinedState = combine(userFlow, settingsFlow) { user, settings ->
CombinedState(user, settings)
}.stateIn(viewModelScope, SharingStarted.WhileSubscribed(), CombinedState.EMPTY)
StateFlow 与 SharedFlow 如何选择?
虽然两者都是热流,但在具体场景下选择有所不同。选择时主要考虑以下问题:
-
我真的需要在特定的时间、位置获取 Flow 的最新状态吗?
- 如果不需要(例如事件通知),那考虑 SharedFlow。
- 如果需要(例如 UI 状态),StateFlow 更合适,因为它保证有初始值且总是持有最新值。
-
我需要重复发射和收集同样的值吗?
- 如果需要,那考虑 SharedFlow。因为 StateFlow 会忽略连续两次重复的值(基于 equals 比较)。
-
当有新的订阅者订阅的时候,我需要发射最近的多个值吗?
- 如果需要,那考虑 SharedFlow,可以配置
replay参数。StateFlow 默认 replay 为 1。
- 如果需要,那考虑 SharedFlow,可以配置
SharedFlow 的高级配置
SharedFlow 允许配置 replay 和 onBufferOverflow:
replay: 新订阅者能接收到的历史消息数量。onBufferOverflow: 当缓冲区满时的行为(如DROP_OLDEST,CATCH,SUSPEND)。
JetPack Compose 中收集流的方式
关于在 UI 层收集 ViewModel 层的热流方式,官方文档已有介绍,但在 JetPack Compose 中推荐使用生命周期感知的方法。
先添加依赖:
implementation 'androidx.lifecycle:lifecycle-runtime-compose:2.6.0-alpha03'
代码示例
// 收集 StateFlow
val uiState by viewModel.userFlow.collectAsStateWithLifecycle()
// 收集 SharedFlow,区别在于需要赋初始值
val sharedState by viewModel.eventFlow.collectAsStateWithLifecycle(
initialValue = UiState.Loading
)
when (uiState) {
is UiState.Loading -> LoadingView()
is UiState.Success -> SuccessView(uiState.data)
is UiState.Error -> ErrorView(uiState.exception)
}
注意: 使用 collectAsStateWithLifecycle() 可以保证流的收集操作只发生在应用位于前台的时候,避免造成资源浪费。这与 collectAsState 不同,后者不会自动暂停/恢复。
单元测试实践
由于我们会在 ViewModel 中使用到 viewModelScope,首先可以定义一个 MainDispatcherRule,用于设置 MainDispatcher,确保协程在测试中同步执行。
1. 设置测试调度器
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.test.TestDispatcher
import kotlinx.coroutines.test.UnconfinedTestDispatcher
import kotlinx.coroutines.test.resetMain
import kotlinx.coroutines.test.setMain
import org.junit.rules.TestWatcher
import org.junit.runner.Description
class MainDispatcherRule(
val testDispatcher: TestDispatcher = UnconfinedTestDispatcher()
) : TestWatcher() {
override fun starting(description: Description) {
super.starting(description)
Dispatchers.setMain(testDispatcher)
}
override fun finished(description: Description) {
super.finished(description)
Dispatchers.resetMain()
}
}
将规则应用到测试类中:
class MyViewModelTest {
@get:Rule
val mainDispatcherRule = MainDispatcherRule()
// ... 测试方法
}
2. 测试 StateFlow
对于 StateFlow,可以直接访问 .value 属性来获取当前状态,无需等待收集完成。
@OptIn(ExperimentalCoroutinesApi::class)
@Test
fun `when initialized, repository emits loading and data`() = runTest {
// arrange
val viewModel = MyViewModel(repository)
val users = listOf(User("Alice"), User("Bob"))
// 初始值应该是 UiState.Loading,因为 stateFlow 可以直接获取最新值
assertEquals(UiState.Loading, viewModel.userFlow.value)
// action
repository.sendUsers(users)
viewModel.onRefresh()
// check
assertEquals(UiState.Success(users), viewModel.userFlow.value)
}
Mock Repository 实现:
class TestUserRepository : UserRepository {
private val usersFlow = MutableSharedFlow<List<User>>(replay = 1, onBufferOverflow = BufferOverflow.DROP_OLDEST)
override fun getUsers(): Flow<List<User>> = usersFlow
suspend fun sendUsers(users: List<User>) {
usersFlow.emit(users)
}
}
使用 stateIn 时的测试:
当 ViewModel 内部使用 stateIn 时,外部无法直接通过 .value 捕获所有中间状态,需要在测试中启动收集。
@OptIn(ExperimentalCoroutinesApi::class)
@Test
fun `test stateIn with external collection`() = runTest {
val viewModel = MainWithStateinViewModel(repository)
val users = listOf(User("Alice"))
// 启动收集以触发 stateIn 的逻辑
val collectJob = launch(UnconfinedTestDispatcher(testScheduler)) {
viewModel.userFlow.collect()
}
assertEquals(UiState.Loading, viewModel.userFlow.value)
repository.sendUsers(users)
assertEquals(UiState.Success(users), viewModel.userFlow.value)
collectJob.cancel()
}
3. 测试 SharedFlow
测试 SharedFlow 可以使用 Turbine 库,它提供了更流畅的断言 API。
@OptIn(ExperimentalCoroutinesApi::class)
@Test
fun `test SharedFlow events`() = runTest {
val viewModel = MainWithShareInViewModel(repository)
val users = listOf(User("Alice"))
repository.sendUsers(users)
viewModel.userFlow.test {
val firstItem = awaitItem()
assertEquals(UiState.Loading, firstItem)
val secondItem = awaitItem()
assertEquals(UiState.Success(users), secondItem)
}
}
Turbine 的 test 扩展函数会自动处理协程作用域,简化了测试代码。
最佳实践总结
- 统一数据源:ViewModel 应作为 UI 的唯一可信数据源,避免 View 直接调用 Repository。
- 生命周期安全:始终使用
viewModelScope和SharingStarted.WhileSubscribed防止内存泄漏。 - 状态不可变:尽量使用
Immutable对象作为 StateFlow 的类型,避免意外修改。 - 错误处理:在
asResult等转换层统一处理异常,不要将原始 Exception 暴露给 UI。 - 测试覆盖:针对冷流转热流、边界条件、取消操作编写单元测试,确保逻辑健壮。
通过合理使用 StateFlow 和 SharedFlow,并结合完善的单元测试,可以显著提升 Android 应用的稳定性和可维护性。

