StateFlow 和 SharedFlow 都是 Kotlin 协程中的数据流类型,属于热数据流(Hot Flow)。官方概念简介如下:
StateFlow:一个状态容器式可观察数据流,可以向其收集器发出当前状态和新状态。它始终包含一个值,适合表示 UI 状态。
SharedFlow:SharedFlow 是 StateFlow 的可配置性极高的泛化数据流(StateFlow 继承于 SharedFlow),适合事件通知或需要重放历史值的场景。
对于两者的基本使用以及区别,本文会给出一些关于如何在业务中选择合适热流的建议,以及详细的单元测试代码实现。
StateFlow 的一般用法
以读取数据库数据为例,Repository 负责从数据库读取相应数据并返回一个 flow,在 ViewModel 收集这个 flow 中的数据并更新状态(StateFlow)。在 MVVM 模型中,ViewModel 中暴露出来的 StateFlow 应该是 UI 层中唯一的可信数据来源,注意是唯一,这点跟使用 LiveData 的时候不同。
为什么要在 ViewModel 中暴露热流而不是冷流?
如果我们如果暴露出的是普通的冷流(Cold Flow),会导致每次有新的流收集者时就会触发一次 emit,造成资源浪费。例如,当用户切换 Tab 导致 Activity 重建时,冷流会重新执行查询逻辑,可能导致重复请求网络或数据库。
所以如果 Repository 提供的只有简单的冷流怎么办?很简单,将之转换成热流就好了!通常可以采用以下两种方式:
- 手动转换:还是正常收集冷流,收集到一个数据就往另外构建的 MutableStateFlow 或 MutableSharedFlow 发送。
- 使用拓展函数:使用
stateIn 或 shareIn 拓展函数转换成热流。
既然官方给我们提供了拓展函数,那肯定是直接使用这个方案最好,使用方式如下:
private const val DEFAULT_TIMEOUT = 500L
@HiltViewModel
class MyViewModel @Inject constructor(
userRepository: UserRepository
): ViewModel() {
val userFlow: StateFlow<UiState> = userRepository
.getUsers()
.asResult()
.map { result ->
when(result) {
is Result.Loading -> UiState.Loading
is Result.Success -> UiState.Success(result.data)
is Result.Error -> UiState.Error(result.exception)
}
}
.stateIn(
scope = viewModelScope,
initialValue = UiState.Loading,
started = SharingStarted.WhileSubscribed(DEFAULT_TIMEOUT)
)
}
其中 started 参数保证了当配置改变(如屏幕旋转)时不会重新触发订阅,避免不必要的计算。DEFAULT_TIMEOUT 定义了停止订阅后多久清除缓存的状态。
在一些业务复杂的页面,比如首页,通常会有多个数据来源,也就有多个 flow,为了保证单一可靠数据源原则,我们可以使用 combine 函数将多个 flow 组成一个 flow,然后再使用 stateIn 函数转换成 StateFlow。
shareIn 拓展函数使用方式也是类似的,只不过没有初始值 initialValue 参数,且默认行为是立即开始共享,需配合 replay 参数控制重放数量。
StateFlow 与 SharedFlow 如何选择?
上文说到,我们应该在 ViewModel 中暴露出热流,现在我们有两个热流-StateFlow 和 SharedFlow,如何选择?没什么特定的规则,选择的时候只需要想一下以下几个问题:
-
我真的需要在特定的时间、位置获取 Flow 的最新状态吗?
如果不需要,那考虑 SharedFlow,比如常用的事件通知功能(如 Toast 提示、导航跳转)。
-
我需要重复发射和收集同样的值吗?
如果需要,那考虑 SharedFlow,因为 StateFlow 会忽略连续两次重复的值(基于 equals 比较),而 SharedFlow 可以配置重放策略。
-
当有新的订阅者订阅的时候,我需要发射最近的多个值吗?
如果需要,那考虑 SharedFlow,可以配置 replay 参数。StateFlow 只能重放最后一个值。
-
是否需要初始值?
StateFlow 必须有初始值,SharedFlow 可选。UI 状态通常需要初始值来渲染加载态。
JetPack Compose 中收集流的方式
关于在 UI 层收集 ViewModel 层的热流方式,官方文档已经有介绍,但是没有补充在 JetPack Compose 中的收集流方式,下面补充一下。
先添加依赖 implementation 'androidx.lifecycle:lifecycle-runtime-compose:2.6.0-alpha03'(版本号视项目实际情况而定)。
val uiState by viewModel.userFlow.collectAsStateWithLifecycle()
val uiState by viewModel.userFlow.collectAsStateWithLifecycle(
initialValue = UiState.Loading
)
when(uiState) {
is UiState.Loading -> {
}
is UiState.Success -> {
}
is UiState.Error -> {
}
}
使用 collectAsStateWithLifecycle() 也是可以保证流的收集操作只发生在应用位于前台的时候,避免造成资源浪费。这比普通的 collectAsState() 更安全,因为它会自动处理 Lifecycle 状态。
单元测试实践
由于我们会在 ViewModel 中使用到 viewModelScope,首先可以定义一个 MainDispatcherRule,用于设置 MainDispatcher,确保测试在单线程下运行,便于调试。
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.test.TestDispatcher
import kotlinx.coroutines.test.UnconfinedTestDispatcher
import kotlinx.coroutines.test.resetMain
import kotlinx.coroutines.test.setMain
import org.junit.rules.TestRule
import org.junit.rules.TestWatcher
import org.junit.runner.Description
class MainDispatcherRule(
val testDispatcher: TestDispatcher = UnconfinedTestDispatcher()
) : TestWatcher() {
override fun starting(description: Description) {
super.starting(description)
Dispatchers.setMain(testDispatcher)
}
override fun finished(description: Description) {
super.finished(description)
Dispatchers.resetMain()
}
}
将 MainDispatcherRule 用于 ViewModel 单元测试代码中:
class MyViewModelTest {
@get:Rule
val mainDispatcherRule = MainDispatcherRule()
}
1. 测试 StateFlow
现在我们有一个业务 ViewModel 如下:
@HiltViewModel
class MyViewModel @Inject constructor(
private val userRepository: UserRepository
) : ViewModel() {
private val _userFlow = MutableStateFlow<UiState>(UiState.Loading)
val userFlow: StateFlow<UiState> = _userFlow.asStateFlow()
fun onRefresh() {
viewModelScope.launch {
userRepository
.getUsers().asResult()
.collect { result ->
_userFlow.update {
when (result) {
is Result.Loading -> UiState.Loading
is Result.Success -> UiState.Success(result.data)
is Result.Error -> UiState.Error(result.exception)
}
}
}
}
}
}
单元测试代码如下:
class MyViewModelTest{
@get:Rule
val mainDispatcherRule = MainDispatcherRule()
private val repository = TestUserRepository()
@OptIn(ExperimentalCoroutinesApi::class)
@Test
fun `when initialized, repository emits loading and data`() = runTest {
val viewModel = MyViewModel(repository)
val users = listOf(User("id1"), User("id2"))
assertEquals(UiState.Loading, viewModel.userFlow.value)
repository.sendUsers(users)
viewModel.onRefresh()
assertEquals(UiState.Success(users), viewModel.userFlow.value)
}
}
class TestUserRepository : UserRepository {
private val usersFlow =
MutableSharedFlow<List<User>>(replay = 1, onBufferOverflow = BufferOverflow.DROP_OLDEST)
override fun getUsers(): Flow<List<User>> {
return usersFlow
}
suspend fun sendUsers(users: List<>) {
usersFlow.emit(users)
}
}
如果 ViewModel 中使用的是 stateIn 拓展函数:
@OptIn(ExperimentalCoroutinesApi::class)
@Test
fun `when initialized, repository emits loading and data`() = runTest {
val viewModel = MainWithStateinViewModel(repository)
val users = listOf(User("id1"))
val collectJob = launch(UnconfinedTestDispatcher(testScheduler)) {
viewModel.userFlow.collect()
}
assertEquals(UiState.Loading, viewModel.userFlow.value)
repository.sendUsers(users)
assertEquals(UiState.Success(users), viewModel.userFlow.value)
collectJob.cancel()
}
2. 测试 SharedFlow
测试 SharedFlow 可以使用一个开源库,Turbine 是一个用于测试 Flow 的小型开源库,它能提供更清晰的断言接口。
测试使用 sharedIn 拓展函数的 SharedFlow:
@OptIn(ExperimentalCoroutinesApi::class)
@Test
fun `when initialized, repository emits loading and data`() = runTest {
val viewModel = MainWithShareInViewModel(repository)
val users = listOf(User("id1"))
repository.sendUsers(users)
viewModel.userFlow.test {
val firstItem = awaitItem()
assertEquals(UiState.Loading, firstItem)
val secondItem = awaitItem()
assertEquals(UiState.Success(users), secondItem)
}
}
使用 Turbine 的 .test {} 块可以自动管理协程作用域,并且提供 awaitItem()、awaitComplete()、awaitError() 等方法来验证流的行为。
最佳实践与常见陷阱
- 内存泄漏:确保在 ViewModel 销毁时取消所有协程。使用
viewModelScope 可以自动处理,但手动启动协程时需小心。
- 生命周期感知:在 Compose 中优先使用
collectAsStateWithLifecycle(),避免在非前台状态下消耗资源。
- 并发控制:在测试中务必使用
runTest 和自定义 TestDispatcher,否则异步操作可能导致测试不稳定。
- 错误处理:确保
Flow 中的异常被正确捕获并转换为 UiState.Error,避免应用崩溃。
- 初始值:StateFlow 必须提供初始值,防止 UI 在首次加载前处于空白状态。