在 Kotlin 协程的生态中,Flow 通常被视为冷流,即只有在调用 collect 时才开始生产数据。但在实际开发中,我们往往需要更灵活的机制:多个订阅者共享数据源、生产与消费解耦、或者持久化状态。这正是 SharedFlow 与 StateFlow 的用武之地。
1、SharedFlow
普通 Flow 的核心逻辑是收集驱动。每次调用 collect 都会启动一个新的生产流程,这在处理独立的数据请求时没问题,但在处理全局事件时效率低下。
1.1 核心本质:从拉取到订阅
普通的 Flow 是冷流。它的逻辑是按需生产:只有当你调用 collect 时,生产块才会执行。这就像点外卖,你不下单,厨师就不炒菜。
而 SharedFlow 的出现,标志着模型从 「数据流收集」 向 「事件流订阅」 的转换。
- 解耦:生产流程独立于消费流程。
- 广播:一个生产者可以对应多个订阅者(Subscriber)。
- 热流:即使没人听,它也可能在后台默默运行(取决于启动策略)。
1.2 shareIn:将冷流转为热流
1.2.1 函数签名
1 | public fun <T> Flow<T>.shareIn( |
通过 shareIn 操作符,你可以将任何普通 Flow 转换为 SharedFlow。它有三个关键参数:
scope: CoroutineScope(协程作用域)
- 作用:定义上游
Flow开始生产数据并进行广播的协程范围。 - 关键点:
SharedFlow的生命周期与该scope绑定。如果scope被取消,上游生产协程也会随之停止。在 Android 中,通常使用viewModelScope。
- 作用:定义上游
replay: Int(重播缓存)
- 默认值:
0 - 作用:定义发出的最新数据在内存中保留的数量。
- 行为:当新的订阅者调用
collect时,它会立即收到缓存中的这N条历史数据。replay = 0:纯实时流,新订阅者只能看到开始订阅后的新数据。replay = 1:新订阅者能看到“上一条”发出的数据(类似粘性事件)。
- 默认值:
started: SharingStarted(启动策略)
这是
shareIn最精妙的参数,它决定了上游数据流何时「开火」以及何时「熄火」。策略模式 行为描述 适用场景 Eagerly立即启动。无论有没有订阅者,上游立即开始生产。 只要对象存在就必须运行的任务(如后台长连接)。 Lazy延迟启动。第一个订阅者出现时启动,且永不停止。 初始化开销大,但一旦启动就希望保持活跃的任务。 WhileSubscribed按需启动/停止。有订阅者时启动,无订阅者时停止。 Android 推荐方案。节省电力和内存,仅在页面可见时工作。 进阶:
WhileSubscribed的精细控制1
2
3
4SharingStarted.WhileSubscribed(
stopTimeoutMillis = 5000,
replayExpirationMillis = 0
)stopTimeoutMillis (5000ms):当最后一个订阅者消失后,上游会多跑 5 秒才真正关闭。replayExpirationMillis:停止后,缓存的数据保留多久。默认Long.MAX_VALUE表示永久保留。
为什么? 在 Android 中,屏幕旋转会导致 Activity 重建,旧订阅者取消到新订阅者加入会有短暂空窗期。设置 5 秒超时可避免因旋屏导致的上游流频繁重启。
1.2.2 代码示例
1 | // 将一个冷流(如每秒一次的数据库查询)转换为热流 |
1.2.3 行为特征
生产者与消费者解耦
普通 Flow 的collect会触发flow { ... }块的执行;而shareIn返回的SharedFlow仅仅是开启了一个转发器协程。上游在发,你在不在听,它都在那里。引用计数机制
shareIn内部维护着一个订阅者计数器。特别是WhileSubscribed模式下,它通过监听subscriptionCount的变化来触发上游的launch或cancel。返回类型:
SharedFlow<T>
shareIn的返回结果是只读的SharedFlow。如果你需要手动通过emit发送数据,应该使用MutableSharedFlow。
1.3 MutableSharedFlow:手动发送事件
如果说 shareIn 是将现有的冷流「热化」,那么 MutableSharedFlow 就是一个完全自主的广播电台。它不依赖于上游 Flow,允许你在任何协程、任何位置手动发布事件。
1.3.1 构造函数
1 | public fun <T> MutableSharedFlow( |
通过 MutableSharedFlow,你可以构建一个不依赖上游 Flow 的事件流。它有三个关键参数:
- replay (重播缓存)
- 定义:新订阅者接入时,立即重发给它的历史数据条数。
- 本质:它是已消费数据的持久化。
- 场景:设置为
1时,它具有“粘性”,确保 UI 组件在订阅瞬间能拿到最近的一个状态。
- extraBufferCapacity (额外缓冲区)
- 定义:在
replay之外,额外预留的缓冲区容量。 - 公式:总容量
TotalCapacity=replay+extraBufferCapacity。 - 本质:它是为了处理生产与消费速率不匹配的缓冲。
- 定义:在
- onBufferOverflow (溢出策略)
当总容量(replay+extra)满载,且仍有新数据进入时,电台该如何处理:SUSPEND(默认):发送端(emit)挂起,直到有订阅者腾出空间。DROP_OLDEST:丢弃最老的一条数据,存入最新的。这是 StateFlow 的默认逻辑。DROP_LATEST:丢弃当前正要发送的数据,保留缓冲区不动。
1.3.2 关键方法
数据发布:
emitvstryEmitsuspend fun emit(value: T):- 挂起函数:如果缓冲区满了且策略是
SUSPEND,它会一直等。 - 保证送达:只要协程不取消,数据一定能进入缓冲区。
- 挂起函数:如果缓冲区满了且策略是
fun tryEmit(value: T): Boolean:- 普通函数:尝试非阻塞式发送。
- 返回值:成功进入缓冲区返回
true;如果缓冲区满且需要挂起,则直接返回false并不发送。
订阅监听:
collect- 特征:返回类型为
Nothing,意味着它是一个无限循环。 - 停止:只能通过取消所在的协程(Job)来停止监听。
- 特征:返回类型为
只读转换:
asSharedFlow()- 工程实践:为了防止外部恶意调用
emit破坏逻辑,ViewModel通常只暴露转换后的SharedFlow。
- 工程实践:为了防止外部恶意调用
1.3.3 概括总结
- MutableSharedFlow 是
SharedFlow的增强版,因为它实现了FlowCollector接口。 - 它通过
replay提供缓存,通过extraBufferCapacity提供缓冲。 - 它是构建 MVI 架构中 Effect(副作用) 或 事件总线(Event Bus) 的官方首选工具。
1.3.4 代码示例
根据 MVI (Model-View-Intent) 或 干净架构 的原则,事件的触发逻辑通常被封装在 ViewModel 中,而 Activity 通过调用 ViewModel 的方法来「间接」触发事件。
以下是一个完整的双向互动示例:Activity 如何通过点击按钮触发逻辑,并最终收到 SharedFlow 发回的反馈。
ViewModel 层:逻辑中心。ViewModel 负责定义“动作”和“结果”。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26class MainViewModel : ViewModel() {
// 内部私有:用于发送导航或弹窗事件
private val _eventFlow = MutableSharedFlow<UiEvent>(replay = 0)
// 外部只读:Activity 订阅它
val eventFlow = _eventFlow.asSharedFlow()
// 触发逻辑的方法
fun onLoginClicked(username: String) {
viewModelScope.launch {
if (username.isEmpty()) {
// 触发一个“提示错误”的事件
_eventFlow.emit(UiEvent.ShowToast("用户名不能为空"))
} else {
// 模拟网络请求后触发“导航”事件
_eventFlow.emit(UiEvent.NavigateToHome)
}
}
}
// 定义事件类型(密封类)
sealed class UiEvent {
data class ShowToast(val message: String) : UiEvent()
object NavigateToHome : UiEvent()
}
}Activity 层:事件的发送者 + 接收者。Activity 的角色是将用户操作传给 ViewModel,并等待 ViewModel 的指令。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32class MainActivity : AppCompatActivity() {
private val viewModel: MainViewModel by viewModels()
override fun onCreate(savedInstanceState: Bundle?) {
super.onCreate(savedInstanceState)
val binding = ActivityMainBinding.inflate(layoutInflater)
setContentView(binding.root)
// --- 1. 事件的触发 (Trigger) ---
binding.btnLogin.setOnClickListener {
val name = binding.etUsername.text.toString()
// Activity 不直接发 Flow,而是调用 ViewModel 的业务方法
viewModel.onLoginClicked(name)
}
// --- 2. 事件的收集 (Collection) ---
lifecycleScope.launch {
repeatOnLifecycle(Lifecycle.State.STARTED) {
viewModel.eventFlow.collect { event ->
when (event) {
is MainViewModel.UiEvent.ShowToast -> {
Toast.makeText(this@MainActivity, event.message, Toast.LENGTH_SHORT).show()
}
is MainViewModel.UiEvent.NavigateToHome -> {
startActivity(Intent(this@MainActivity, HomeActivity::class.java))
}
}
}
}
}
}
}
1.3.5 架构收益
单向数据流 (UDF):
- Activity → ViewModel:通过普通的函数调用(
onLoginClicked)。 - ViewModel → Activity:通过订阅
Flow(eventFlow.collect)。 - 这种闭环让调试变得非常简单:如果你发现没弹窗,你只需要检查
ViewModel的emit是否执行了。
- Activity → ViewModel:通过普通的函数调用(
业务逻辑脱离 UI:
- ViewModel 不知道 Activity 到底是怎么显示错误的(是弹 Toast 还是 SnackBar)。它只负责发出一句:“嘿,显示个错误!”。
防止重复触发:
- 通过
SharedFlow(replay = 0),用户旋转屏幕时,由于 ViewModel 中的方法并没有被再次调用,且 Flow 缓存为 0,Activity 重启后不会再次收到“登录成功”的旧指令,有效避免了逻辑重跑。
- 通过
1.3.6 解耦通信
虽然不推荐在 UI 层直接持有 MutableSharedFlow,但在某些极特殊的组件间通信场景下,你可以通过一个单例的 EventBus 来触发:
1 | // 极少使用的场景:直接在 Activity 发送 |
注意:这会破坏架构的整洁性,通常仅用于跨模块的解耦通信。
2、StateFlow
如果说 SharedFlow 是一个多功能的广播电台,那么 StateFlow 就是那个电台里永远显示当前时刻的时钟。它是 SharedFlow 的一个高度特化版本,专为管理和观察状态而设计。在 Android 开发中,它几乎完美替代了 LiveData,并深度集成了 Kotlin 协程的特性。
2.1 核心本质:特化版的 SharedFlow
StateFlow 的本质是满足以下条件的 SharedFlow:
replay恒等于 1:新订阅者永远能收到最新的状态。onBufferOverflow恒等于DROP_OLDEST:新状态总是覆盖旧状态。- 强制初始值:状态不能为“空”,必须从某个值开始。
2.2 MutableStateFlow:构造函数
1 | public fun <T> MutableStateFlow(value: T): MutableStateFlow<T> |
value参数:创建时必须传入一个初始值。行为特性:
- 读写合一:既可以作为 Flow 被收集,也可以通过
.value直接读写。 - 防抖机制:默认内置了
distinctUntilChanged逻辑。如果新赋的值与旧值相等(equals为 true),它不会触发任何通知。
- 读写合一:既可以作为 Flow 被收集,也可以通过
代码示例:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25// 定义一个简单的状态包装类
data class UserState(val name: String = "", val age: Int = 0)
// A. 初始化:必须传入一个初始值
// 就像定义变量一样,StateFlow 不允许“真空”状态
val _uiState = MutableStateFlow(UserState(name = "未登录", age = 18))
// B. 暴露只读接口 (最佳实践)
// 这样外部只能 collect(观察),不能直接修改 value
val uiState: StateFlow<UserState> = _uiState.asStateFlow()
fun handleExamples() {
// 1. 直接同步读取当前值 (无需协程)
// 这是 StateFlow 的巨大优势:随时获取快照
val currentData = _uiState.value
println("当前用户:${currentData.name}")
// 2. 直接赋值 (简单覆盖)
// 注意:赋值会立即触发所有正在 collect 的协程
_uiState.value = UserState(name = "张三", age = 25)
// 3. 防抖特性演示
// 如果赋的值与旧值完全相等(equals),collect 端不会收到任何波动
_uiState.value = UserState(name = "张三", age = 25) // 这一次赋值不会触发 UI 刷新
}单一数据源:为什么代码里推荐用
asStateFlow()?在 ViewModel 中,我们通常持有
MutableStateFlow(私有),但对外暴露StateFlow(公有)。这遵循了 单一数据源(SSOT) 原则:只有 ViewModel 有权定义“真相”,Activity 只能作为“观众”。
2.3 核心属性:.value
这是 StateFlow 与其他 Flow 最大的区别。
- 直接访问:在非协程作用域(如普通的函数逻辑)中,你可以随时通过
stateFlow.value获取当前状态,而无需等待collect。 - 线程安全:对
.value的读取和赋值是线程安全的。
2.4 关键操作符:update { … } (原子更新)
这是处理并发状态更新的关键 API。
1 | public inline fun <T> MutableStateFlow<T>.update(function: (state: T) -> T) |
工作原理:内部使用 CAS (Compare And Swap) 乐观锁。它会读取当前值,执行变换逻辑,如果写入时发现值已被其他线程修改,则会自动重试。
应用场景:
- 逻辑依赖旧值:只要你的新状态是基于旧状态计算出来的(如
count + 1,list + item),就必须用update。 - 多协程写入:如果状态可能在不同的
launch块或不同的线程中被修改,请使用update。 - 简单覆盖:如果你的逻辑是完全替换(如
value = NewState()),且不关心旧值是什么,可以直接使用.value = ...。
- 逻辑依赖旧值:只要你的新状态是基于旧状态计算出来的(如
错误示范:非原子更新(有丢数据风险)。当多个协程(如同时处理多个点赞点击)并发执行时,这种写法会导致计数错误。
1
2
3
4
5
6
7
8fun incrementLikesWrong() {
viewModelScope.launch(Dispatchers.Default) {
// 协程 A 读到 10,协程 B 也读到 10
val currentLikes = _uiState.value.likes
// 协程 A 设置为 11,协程 B 随后也设置为 11(导致丢失了一次计数)
_uiState.value = _uiState.value.copy(likes = currentLikes + 1)
}
}正确示范:使用
update(原子安全)。update会确保在写入新值之前,当前值没有被其他线程改动。如果改动了,它会自动重新执行 Lambda 块。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15data class PostState(val likes: Int = 0, val isTrending: Boolean = false)
private val _uiState = MutableStateFlow(PostState())
fun incrementLikesSafe() {
viewModelScope.launch(Dispatchers.Default) {
// update 是原子性的,适合高频并发修改状态
_uiState.update { currentState ->
// 基于当前最新状态返回一个新状态
currentState.copy(
likes = currentState.likes + 1,
isTrending = currentState.likes + 1 > 100 // 逻辑联动
)
}
}
}在 Kotlin 中,给一个类加上
data关键字后,编译器会自动为它生成一些常用方法,包括:equals()、hashCode()、toString()、copy()。其中copy()方法的核心作用是:创建一个与原对象属性几乎一模一样的新对象,但允许你修改其中的某些属性。当你调用
currentState.copy(likes = ...)时,Kotlin 在后台做了两件事:- 克隆:复制当前
currentState对象中所有的原始数据。 - 替换:把你在括号里明确指定的那个属性(例如
likes)替换成新值。
1
2
3
4
5
6
7
8
9_uiState.update { currentState ->
// 假设当前 currentState 是 PostState(likes=10, isTrending=false)
currentState.copy(
likes = currentState.likes + 1, // 把复制出来的副本中的 likes 变成 11
isTrending = currentState.likes + 1 > 100 // 把副本中的 isTrending 根据新值重新计算
)
// 最终返回这个“改过某些地方”的新对象,并把 _uiState 指向它
}- 克隆:复制当前
2.5 衍生操作符:getAndUpdate 与 updateAndGet
有时你不仅想更新状态,还想在更新的同时拿到旧值或新值。
getAndUpdate:先返回当前值,再执行更新(类似i++)。updateAndGet:先执行更新,再返回更新后的新值(类似++i)。
1 | // 示例:更新状态并记录日志 |
2.5 转换 API:stateIn
类似于 shareIn,用于将冷流转换为 StateFlow。
1 | public fun <T> Flow<T>.stateIn( |
initialValue(初始值):强制要求,因为StateFlow不能没有值。started(启动策略):WhileSubscribed(5000):这是 Android UI 状态的最佳实践。当最后一个订阅者消失后,上游会多运行 5 秒(以适应 Activity 旋转屏幕),随后停止生产,节省资源。
- 代码示例:假设你有一个从 Room 数据库读取用户列表的冷流,你希望在 ViewModel 中将其转为
StateFlow供界面使用。1
2
3
4
5
6
7
8
9
10
11
12
13class UserViewModel(private val repository: UserRepository) : ViewModel() {
// 使用 stateIn 将 Cold Flow 转换为 Hot StateFlow
val userListState: StateFlow<List<User>> = repository.getAllUsers()
.map { users -> users.filter { it.isActive } } // 可以进行各种转换操作
.stateIn(
scope = viewModelScope, // 绑定到 ViewModel 生命周期
// 关键策略:WhileSubscribed(5000)
// 当 UI 可见时运行;当 UI 不可见(如切到后台)超过 5 秒后自动停止生产,节省资源
started = SharingStarted.WhileSubscribed(5000),
initialValue = emptyList() // 必须提供初始值
)
} - 核心价值:
- 单一生产源:无论有多少个 Fragment 订阅
userListState,上游的数据库查询只会运行一份。 - 状态保持:即使在 UI 还没准备好时,
StateFlow也会通过initialValue保证有一个合法值,并在数据到来后持久化最新值。 - 生命周期安全:通过配合
viewModelScope和WhileSubscribed,你再也不用手动去在onStop里关闭数据连接了。
- 单一生产源:无论有多少个 Fragment 订阅
3 StateFlow vs LiveData
在 Android 开发的演进过程中,从 LiveData 转向 StateFlow 是目前官方推荐的大趋势。简单来说,LiveData 是为早期的生命周期管理设计的,而 StateFlow 则是为了更现代、更强大的 Kotlin 协程生态而生的。
以下是为什么要更倾向于使用 StateFlow 的核心原因:
3.1 纯粹的 Kotlin 原生支持
- LiveData 是 Android 库的一部分,强依赖于
android.arch.lifecycle。如果你想在 KMP (Kotlin Multiplatform) 项目(如 iOS 和 Android 共享逻辑)中使用它,几乎不可能。 - StateFlow 属于 Kotlin 协程库 (
kotlinx.coroutines)。它不依赖 Android SDK,可以在任何 Kotlin 环境下运行。这意味着你的ViewModel逻辑可以更轻松地进行单元测试。
3.2 更强大的处理能力
LiveData 的变换功能(如 map, switchMap)非常有限。而 StateFlow 作为 Flow 的一种,可以使用协程库中数十种强大的操作符。
| 功能 | LiveData |
StateFlow |
|---|---|---|
| 线程切换 | 相对固定,观察者通常在主线程 | 极其灵活,可使用 flowOn 切换线程 |
| 异步处理 | 较难处理复杂的异步逻辑 | 天生支持挂起函数,处理复杂流非常丝滑 |
| 背压/缓冲 | 不支持 | 通过协程机制完美支持 |
3.3 始终一致的状态
- StateFlow 必须有一个初始值。这确保了无论何时订阅,它总是能立刻拿到一个当前状态。
- LiveData 可以没有初始值(为
null),这有时会导致在 UI 层处理数据时出现不可预期的空指针或空状态。
3.4 解决“数据倒灌”与粘性问题
虽然 StateFlow 也是粘性的(新订阅者会收到最后一次的值),但通过配合 Lifecycle.repeatOnLifecycle API,它可以更优雅地处理生命周期,避免在后台做多余的更新,这比 LiveData 隐式处理生命周期更加透明且可控。
3.5 不要忽略 UI 层的配合
虽然 StateFlow 更好用,但在 Android Activity/Fragment 中收集它时,你需要意识到它不感知生命周期。
- 注意:不能直接用
stateFlow.collect { ... },否则即使 App 在后台,协程也会继续跑,消耗电量。 - 推荐写法: 总之,选择
1
2
3
4
5
6
7viewLifecycleOwner.lifecycleScope.launch {
viewLifecycleOwner.repeatOnLifecycle(Lifecycle.State.STARTED) {
viewModel.uiState.collect { data ->
// 更新 UI
}
}
}StateFlow是为了拥抱更标准、功能更全的 Kotlin 协程生态。它能让你的代码摆脱 Android 平台的束缚,提供更精细的并发控制。
4 实战案例
至此我们已经理解了 StateFlow(存状态) 和 SharedFlow(发事件) 的分工,我们直接上实战:一个标准的 MVI (Model-View-Intent) 极简架构模板。
4.1 经典案例
这个案例模拟了一个常见的社交功能:点击「点赞」按钮,数字增加(状态),同时弹出一个「感谢点赞」的提示(事件)。
ViewModel 层:状态与事件的统一管理
在 MVI 架构中,我们将 UI 的所有状态封装在一个 Data Class 中,将所有动作封装在一个 Sealed Class 中。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32// 1. UI 状态:用 StateFlow 承载
data class LikeUiState(
val count: Int = 0,
val isLoading: Boolean = false
)
// 2. 一次性事件:用 SharedFlow 承载
sealed class LikeUiEvent {
data class ShowToast(val message: String) : LikeUiEvent()
object PlayAnimation : LikeUiEvent()
}
class LikeViewModel : ViewModel() {
private val _uiState = MutableStateFlow(LikeUiState())
val uiState = _uiState.asStateFlow()
private val _eventFlow = MutableSharedFlow<LikeUiEvent>()
val eventFlow = _eventFlow.asSharedFlow()
// 核心逻辑:点赞
fun handleLike() {
viewModelScope.launch {
// A. 更新状态:使用 update 确保多线程安全
_uiState.update { it.copy(count = it.count + 1) }
// B. 发送事件:通知 UI 播放动画或弹窗
_eventFlow.emit(LikeUiEvent.PlayAnimation)
_eventFlow.emit(LikeUiEvent.ShowToast("感谢支持!"))
}
}
}Activity 层:双流并发收集
在 Activity 中,我们需要同时监听这两个流。利用
repeatOnLifecycle,我们可以确保它们在后台时都保持静默。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38class LikeActivity : AppCompatActivity() {
private val viewModel: LikeViewModel by viewModels()
override fun onCreate(savedInstanceState: Bundle?) {
super.onCreate(savedInstanceState)
val binding = ActivityLikeBinding.inflate(layoutInflater)
setContentView(binding.root)
// 触发:用户点击
binding.btnLike.setOnClickListener {
viewModel.handleLike()
}
// 收集:开启生命周期感知的协程
lifecycleScope.launch {
repeatOnLifecycle(Lifecycle.State.STARTED) {
// 使用 launch 并发收集两个流
// 监听状态流 (StateFlow)
launch {
viewModel.uiState.collect { state ->
binding.tvLikeCount.text = "点赞数: ${state.count}"
}
}
// 监听事件流 (SharedFlow)
launch {
viewModel.eventFlow.collect { event ->
when (event) {
is LikeUiEvent.ShowToast -> showToast(event.message)
is LikeUiEvent.PlayAnimation -> startHeartAnimation()
}
}
}
}
}
}
}
4.2 黄金标准
为什么这个模板是“黄金标准”?
- 分工明确:
uiState负责告诉 View “你现在长什么样”;eventFlow负责告诉 View “你现在要做什么”。 - 数据安全:即便用户在
ShowToast发出的一瞬间旋转了屏幕,Activity 重启后,uiState会自动重发最新的数字(因为replay=1),而eventFlow不会重发 Toast(因为replay=0)。 - 原子更新:
update { ... }规避了快速连续点击可能带来的数据错乱风险。