1、异常操作符
1.1 emit 的 try-catch 禁区
在协程中,Flow 极大地简化了异步数据流的处理。然而,很多开发者在处理异常时,往往会习惯性地在 collect 或 flow { ... } 构建块中使用经典的 try-catch。当你尝试在 emit 周围包裹 try-catch 时,你可能已经踏入了一个被称为 “下游异常泄露” 的陷阱。比如,以下的错误示范:在构建器中拦截一切。
1 | fun main(): Unit = runBlocking { |
在上面的代码中,下游 collect 抛出的 IllegalStateException 会被上游的 catch 捕获。这破坏了流的封装性,导致下游的逻辑错误被上游悄悄地「消化」或错误处理了。
1.1.1 反直觉的实质
函数嵌套调用
为什么上游的
try-catch会捕获下游的异常?这是一个反直觉又非常深刻的问题,触及了 Kotlin Flow 设计的核心:emit本质上是一个同步的挂起函数调用。我们需要拆解
emit与collect之间的协作协议。在 Flow 的底层实现中,emit并不是把数据扔进一个异步队列,而是直接调用了下游传进来的collector函数。当你写下这段代码时:
1
2
3
4
5
6
7
8
9
10// 简化后的伪代码,展示调用链
flow {
try {
emit(1) // 这里的执行流会跳转到 collect 块
} catch (e: Exception) {
println("上游捕获了: $e")
}
}.collect { value ->
throw IllegalStateException("下游崩溃")
}其内部逻辑在运行时等同于:
1
2
3
4
5
6
7
8
9
10
11
12
13
14// 逻辑展开
val collector = object : FlowCollector<Int> {
override suspend fun emit(value: Int) {
// 这就是你写在 .collect { ... } 里的逻辑
throw IllegalStateException("下游崩溃")
}
}
// 执行 flow 块
try {
collector.emit(1) // 直接调用了上面的重写函数
} catch (e: Exception) {
println("上游捕获了: $e")
}原理总结:从 CPU 执行栈的角度看,下游的
collect逻辑实际上是作为emit函数的扩展延伸在运行。因此,下游抛出的异常会沿着调用栈直接向上传递回emit的调用处。内存模型与协程栈
在协程中,
emit是一个suspend函数。当emit被调用时,当前的协程并不会切换线程(除非使用了flowOn),它只是在同一个协程作用域内执行了下游的代码块。- 上游(Producer):处于调用栈的底部。
- emit:处于中间。
- 下游(Consumer):处于调用栈的顶部。
当顶部发生未捕获异常时,根据 JVM/Kotlin 的异常冒泡机制,它必然会穿过
emit。如果emit被包裹在try-catch中,这个异常就会被当成「上游自己的异常」拦截掉。
1.1.2 手动双重嵌套
《函数嵌套调用》 和 《内存模型与协程栈》 的解释好像有点儿道理,但总感觉怪怪的。直觉敏锐的你终于回过味儿来了,你在 collect 内部也套了一层 try-catch。
1 | fun main(): Unit = runBlocking { |
从代码执行的物理链路来看,异常确实会在下游被优先捕获,而不会向上冒泡给上游,你的逻辑是通的。但从架构设计和工程实践来看,这种写法实际上把代码变成了一团「防御性地雷」,让我们深入剖析一下这种写法的本质:
为什么「双重 try-catch」能生效?
在 JVM 的异常处理机制中,异常会沿着调用栈寻找最近的匹配
catch块。当
collect块内部抛出异常时,当前的调用栈如下:①
collect { ... }逻辑内部 ← 异常发生地②
emit(1)③
flow { ... }构建块由于你在第一层(
collect内部)就准备好了try-catch,异常在还没来得及逃出collect作用域进入emit内部时,就已经被处理掉了。因此,上游的catch根本感知不到任何异常发生。这种写法的“坏味道”
虽然代码能跑通,但它带来了三个严重的问题:
A. 职责边界模糊
在
flow { ... }里写try-catch包裹emit,本质上是在表达:“我不信任我的消费者,我要替它收尸。”这违反了解耦原则。一个好的 Producer(上游)应该只负责产出数据,至于 Consumer(下游)拿到数据是画图还是存库、执行成功还是失败,上游不应该干涉。
B. 吞掉致命错误
如果你在上游使用了宽泛的
catch (e: Exception),你不仅捕获了业务异常,还可能捕获了:CancellationException(协程取消信号,被捕获会导致协程无法正常停止)。- 下游由于 OOM 或其他运行时不可恢复的逻辑错误,这会导致流在一种「僵尸状态」下继续运行。
C. 代码冗余与不可维护
如果每个
emit都要套try-catch,每个collect也要套try-catch,代码的逻辑密度会极低。正确的分层防御模型
在 Flow 中,我们追求的是 “谁污染,谁治理”:
异常发生位置 处理策略 推荐工具 数据源头(如 API 报错) 上游处理或转换成 Result 对象 .catch { ... } 异常结束时为具体的错误实例处理过程(如 map 逻辑) 算子链捕获 .catch { ... }业务消费(如 UI 渲染) 下游自己处理 collect { try-catch }
1.1.3 异常透明原则
Kotlin Flow 的设计遵循 「异常透明性」 原则。其核心哲学是流的逻辑应该能够预测异常的来源。这意味着:流的构建器(Producer)不应该捕获发生在下游操作符或终止操作(Consumer)中的异常。
「手动双重嵌套」章节中的方案在执行逻辑上是正确的,但它是一种「过度防御」。如果你确定某些异常是下游业务逻辑的一部分(比如输入校验失败),那么在 collect 里 try-catch 是完全正确的。但请务必把上游 emit 周围那个 try-catch 拆掉。像下面这样:
1 | fun main(): Unit = runBlocking { |
事实上官方提供了颗粒度更细、更符合直觉的操作符来捕获异常,下面通过你的方案和官方方案进行一次深度的对比。
你的方案(手动双重嵌套):
1
2
3
4
5flow {
try { emit(1) } catch (e: Exception) { /* 防御下游 */ }
}.collect {
try { /* 业务逻辑 */ } catch (e: Exception) { /* 处理自己 */ }
}官方推荐方案(异常透明):
1
2
3
4
5
6
7
8
9
10
11
12flow {
emit(1) // 保持纯粹,异常直接抛给下游
}
.catch { e ->
// 只负责上游(数据源)的兜底
emit(DEFAULT_VALUE)
}
.collect { value ->
// 下游的异常由下游自己决定是 catch 还是直接崩掉
runCatching { doSomething(value) }
.onFailure { handle(it) }
}
1.2 catch 操作符
catch 是一个中间操作符,专门用于拦截并处理在该操作符 之前(上游) 发生的异常。
1.2.1 功能定位
- 捕获异常:拦截上游抛出的
Throwable。 - 恢复流:通过
emit发射一个默认值或兜底数据。 - 转换异常:重新抛出一个自定义的业务异常。
- 终止流:处理完逻辑后,让流正常结束。
1.2.2 代码示例
上游优先原则,是 catch 最著名的特性:它对下游发生的异常视而不见。
场景 A:捕获上游异常(成功)
1
2
3
4
5
6
7
8
9flow {
emit("A")
throw RuntimeException("上游炸了")
}.catch { e ->
emit("Backup Data") // 成功捕获并恢复
}.collect {
println(it)
}
// 输出: A, Backup Data场景 B:捕获下游异常(失败)
1
2
3
4
5
6flowOf("A", "B")
.catch { e -> println("捕获到了: $e") } // 它在 collect 之前
.collect {
check(it != "A") { "下游炸了" }
}
// 结果: 程序崩溃,抛出 IllegalStateException。catch 没起作用!
为什么设计成这样?这就是异常透明性。Flow 设计者认为:操作符不应该静默地吞掉由于下游处理逻辑(如 UI 渲染、数据库写入)导致的错误。如果下游坏了,那是消费者的责任。
1.2.3 整合异常链路
如果你希望统一处理整个链路的异常,常见的模式是 onEach + catch 组合,将业务逻辑从 collect 挪到 onEach 中:
1 | flowOf("Data") |
1.2.3 工作原理
你可能会好奇:当异常冒泡时,catch 怎么知道这个异常是来自上面的 map,还是下面的 collect?
在 Flow 内部,所有的发射动作都通过 FlowCollector 完成。catch 内部使用了一个特殊的封装类。当它检测到异常抛出时,会对比 异常发生的 Collector 实例。
当你调用 .catch { ... } 时,底层实际进入了 catchImpl 内部函数。这里的函数嵌套调用逻辑与「反直觉的实质」章节类似:
1 | // 简化后的官方源码逻辑 (kotlinx-coroutines-core) |
从源码中可以发现,如果异常是从下游 collect 传回来的,它会直接重新抛出,绝不拦截。这里有个小细节,catchImpl 会主动把 CancellationException 重新抛出,确保协程取消机制不被破坏。
2、retry 操作符
retry 和 retryWhen 是针对上游 Flow 发生异常时的处理操作符。它们与 catch 一起,构成了 Flow 异常处理的基石。不同于 catch 的「兜底」和「转换」,重试操作符的核心使命是尝试修复:在检测到异常后,试图重新启动出错的流,以期待在下一次尝试中能够成功完成。
2.1 retry():无条件的“无限重试”
API 语义:
retry操作符静静地「守候」在 Flow 链条中。只有当上游 Flow 抛出了异常,并且这个异常沿着链条传递到了retry节点时,它才会被激活。核心逻辑:重启上游。当接管异常后,它并不像
catch那样直接向下方发射新数据或发出流完成信号,而是执行一项具有颠覆性的操作——重启上游整个 Flow 链条。重启概念:所谓重启,在 Flow 的底层实现中意味着:对上游的 Flow 对象重新调用
collect函数。- 这会启动一个全新的数据收集流程。
- 如果你的上游是一个通过
flow { ... }构建器创建的流,这意味着flow代码块中的代码将从头开始重新执行。
下游隔离:一个非常重要且巧妙的设计是,下游对上游的重启过程完全感知不到。
retry的作用范围严格限制在其上游 Flow 链条。- 对于
retry之后的操作符和最终的收集者(Collector)来说,数据流看起来是连续的。它们只是持续地接收数据,而不知道这些数据是来自第一次尝试,还是来自某次重试。这种隔离性保持了下游逻辑的简洁。
算子兼容:如果
retry与最初的数据源 Flow 之间存在诸如map、filter等中间操作符,异常仍会逐级向下传递,直至被retry捕获。当retry重启上游时,它实际上是重新collect了紧邻其上游的操作符(比如map)返回的 Flow。这个操作符会依次重新collect它的上游,最终触发整个链条的重新执行。代码示例:
1
2
3
4
5
6
7flow {
emit(1)
throw RuntimeException("Oops!")
}.retry() // 无限重试
.collect { println(it) }
// 输出结果:1, 1, 1, 1, 1 ... (死循环)循环陷阱:当我们调用不带任何参数的
retry()时,其默认逻辑是只要有异常到达,就立即重启上游 Flow。这引入了一个潜在的危险 — 无限循环现象。循环成因:代码示例的上游 Flow 的逻辑是固定在发送数据 1 后抛出一个异常。
① 流开始,发送 1,然后抛出异常。
②
retry()捕获异常,重启上游。③ 上游再次从头开始,发送 1,又抛出同一个异常。
④
retry()再次重启……结果:这个过程会无休止地进行下去,直到程序被外部终止。
下游视角:下游将看到一个无限的数据序列:1, 1, 1, 1, …,仿佛这个 Flow 始终在正常运行,永远不会结束。
适用场景:极少数情况,通常用于必须成功的后台任务,且你确定异常是暂时性的。
危险警告:若上游异常是持续性的,会导致 CPU 占用过高或日志爆满。
2.2 retry(retries: Long):固定次数的重试
在生产环境中几乎很少直接使用无参版本,因为它缺乏对异常情况的克制。为了防止无限循环,我们通常会限制重试的次数。
- 参数含义:
retries表示额外重试的次数。 - 执行流程:
- 初始运行 1 次,发生异常。
- 触发第 1 次重试,又发生异常。
- .. 直到完成第
n次重试。 - 如果第
n+1次依然抛出异常,retry将不再拦截,异常会直接“穿透”并向下游抛出。
- 代码示例:通过传入一个长整型参数,你可以限制重试的总次数。
1
2
3
4
5
6
7
8
9
10
11
12
13
14var count = 0
flow {
println("开始尝试...")
if (count++ < 2) throw IOException("网络错误")
emit("成功获取数据")
}.retry(2) // 最多额外重试 2 次
.collect { println(it) }
输出:
-------------------------
开始尝试... (初始执行,失败)
开始尝试... (第1次重试,失败)
开始尝试... (第2次重试,成功)
成功获取数据 - 核心逻辑:如果第 3 次尝试(初始 1 + 重试 2)仍然失败,异常将不再被拦截,直接抛给下游。
2.3 retry(retries, predicate):精准打击特定异常
有时候我们只想针对特定的错误进行重试(比如 IOException),而对于逻辑错误(比如 NullPointerException)则希望直接崩溃。
- 判定逻辑:这是一个双重约束。只有当「次数未满」且「判定函数返回 true」时,重试才会发生。
- 灵活应用:你可以在
predicate块中通过异常的message或类型进行精细化过滤。 - 代码示例:
1
2
3
4
5
6
7
8
9flow {
emit(1)
throw IllegalArgumentException("参数错误")
}.retry(3) { cause ->
// 只有 IO 异常才重试,逻辑错误(如参数错误)直接崩溃
cause is IOException
}.collect { println(it) }
// 输出:1,然后直接抛出 IllegalArgumentException 并崩溃 - 穿透机制:一旦超过重试次数或者异常类型不匹配,
retry就不再拦截,异常会继续向下传递,直至触发catch或导致程序崩溃。
2.4 retryWhen:高阶重试逻辑
如果你觉得 retry 的参数写起来不够灵活,retryWhen 提供了更高维度的控制。它将异常原因和当前重试次数(从 0 开始)封装进了一个 lambda 中。
- 代码示例:
1
2
3
4
5
6
7
8
9flow.retryWhen { cause, attempt ->
// attempt 从 0 开始计数
if (cause is NetworkException && attempt < 3) {
delay(1000) // 重试前歇 1 秒,这是 retry 做不到的便捷
true // 返回 true 表示继续重试
} else {
false // 放弃重试,异常向下抛出
}
} - 等价逻辑:
retry(n) { ... }实际上就是retryWhen { e, i -> i < n && predicate(e) }的简化版。
2.5 retry 与 catch 的区别
这是最容易产生误区的地方。请记住这张对比表:
| 特性 | retry |
catch |
|---|---|---|
| 角色定位 | 试图修复(修理工) | 善后处理(清洁工) |
| 上游状态 | 整链重启:从头开始执行代码块 | 链路死亡:上游不再运行 |
| 下游数据 | 呈现连续性,无感知 | 收到 catch 块中 emit 的新数据 |
| 典型位置 | 紧跟在易出错的算子之后 | 放置在流的末尾进行收尾 |
3、生命周期操作符
在掌握了 Flow 的重试机制后,理解其全流程监听操作符(onStart、onCompletion、onEmpty)是构建商业级响应式代码的最后一块拼图。这三个 API 能够让你精准地在数据流的“生、老、病、死”各个阶段注入逻辑。
3.1 onStart:流的“初始化”哨兵
onStart 是在下游开始收集数据之前触发的操作符。它最常用于 UI 开发中开启 Loading 动画。
- 触发时机:当终端操作符(如
collect)被调用,且上游 Flow 正式生产数据之前执行。 - 逆序执行:如果链条中有多个
onStart,它们是从下往上触发的。这与普通操作符(如map)完全相反。 - 异常处理:传统的
try-catch无法捕获onStart内部的崩溃,但 Flow 的catch操作符可以。 - 代码示例:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16flow {
emit("数据请求成功")
}
.onStart {
// 这里适合做初始化:展示进度条、重置状态
println("1. 靠近上游的 onStart")
}
.onStart {
println("2. 靠近下游的 onStart")
}
.collect { println(it) }
// 输出顺序:
// 2. 靠近下游的 onStart
// 1. 靠近上游的 onStart
// 数据请求成功
3.2 onCompletion:流的“谢幕”记录员
onCompletion 相当于 Java 中的 finally 块。无论 Flow 是正常跑完还是中途崩溃,它都会被调用。
- 触发条件:所有数据发送完毕(正常结束)或 抛出异常(异常结束)。
- 参数含义:接收一个
Throwable?参数。正常结束时为null;异常结束时为具体的错误实例。 - 与 catch 的区别:它是“旁观者”。它能看到异常,但不会拦截异常。异常在
onCompletion执行完后,依然会继续向下游传播。 - 代码示例:
1
2
3
4
5
6
7
8
9
10
11flow {
emit(1)
throw RuntimeException("网络意外中断")
}
.onCompletion { cause ->
// 适合做清理工作:隐藏进度条、关闭数据库连接
if (cause != null) println("流因异常结束: ${cause.message}")
else println("流正常结束")
}
.catch { /* 真正拦截并处理异常的地方 */ }
.collect()
3.3 onEmpty:流的“空状态”处理器
这是一个非常实用的「特种」操作符,专门处理那些「虽然成功了但没给数据」的情况。
- 触发条件:必须同时满足——Flow 正常结束 且 从未发送过任何数据。
- 拦截逻辑:如果 Flow 是因为异常而提前终止的,即便之前没发过数据,
onEmpty也不会触发。 - 补救能力:你可以在
onEmpty的代码块中使用emit()来补发一些默认值。 - 代码示例:
1
2
3
4
5
6
7
8
9
10flow<String> {
// 模拟搜索结果为空
}
.onEmpty {
println("搜索结果为空,发送占位提醒")
emit("暂无相关内容")
}
.collect { println(it) }
// 输出:暂无相关内容
3.4 API 对比与选型指南
为了方便记忆,我们可以通过下表快速区分这几个 API:
| API | 触发场景 | 异常感知 | 能否拦截异常 | 典型用途 |
|---|---|---|---|---|
onStart |
collect 刚启动时 |
不能捕获自身异常 | 否 | 显示 Loading、准备资源 |
onCompletion |
结束时(正常/异常) | 能(通过参数) | 否 | 隐藏 Loading、释放资源 |
onEmpty |
正常结束且 0 发射 | 否 | N/A | 展示缺省页、发默认值 |
catch |
仅异常结束时 | 能 | 是 | 错误兜底、转换异常 |
3.5 最佳实践:生命周期组合拳
在 Android 架构(如 MVVM/MVI)中,我们通常将这些 API 组合使用,以实现严丝合缝的状态管理:
1 | repository.getData() |
理解了这三个 API 的精确触发点,你就能像控制交响乐团一样,指挥 Flow 在正确的时刻做出正确的响应。
在 Android 真实的开发架构(如 MVVM 或 MVI)中,将 Flow 的生命周期操作符与 UI 状态管理结合,可以极大地减少 Activity 或 Fragment 中的逻辑堆砌。
下面是一个模拟真实网络请求的完整示例,虽然部分 API 超纲,但不影响理解。我们定义一个 UiState 包装类,并利用 onStart、onEmpty、catch 和 onCompletion 来驱动 UI 变化。
定义统一的 UI 状态
首先,我们需要一个密封类来代表 UI 的各种阶段。
1
2
3
4
5
6sealed class UiState<out T> {
object Loading : UiState<Nothing>()
object Empty : UiState<Nothing>()
data class Success<T>(val data: T) : UiState<T>()
data class Error(val exception: Throwable) : UiState<Nothing>()
}ViewModel 中的逻辑编排
在 ViewModel 中,我们调用 Repository 获取数据,并使用操作符将原始数据流转换为
UiState流。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36class UserViewModel(private val repository: UserRepository) : ViewModel() {
private val _userState = MutableStateFlow<UiState<List<User>>>(UiState.Loading)
val userState = _userState.asStateFlow()
fun fetchUsers() {
viewModelScope.launch {
repository.getRemoteUsers()
// 1. 开始请求时,立即发射 Loading 状态
.onStart {
_userState.value = UiState.Loading
}
// 2. 如果请求成功但返回了空列表,转换为 Empty 状态
.onEmpty {
_userState.value = UiState.Empty
}
// 3. 发生网络异常或解析异常时,捕获并转换为 Error 状态
.catch { e ->
_userState.value = UiState.Error(e)
}
// 4. 无论成功还是失败,都可以在这里做最后的清理(如打点统计)
.onCompletion { cause ->
println("请求结束,是否异常:${cause != null}")
}
// 5. 正常的成功数据处理
.collect { users ->
if (users.isNotEmpty()) {
_userState.value = UiState.Success(users)
} else {
// 注意:如果上游用了 flowOf() 但没发数据,onEmpty 会触发
// 如果发了空列表,这里手动处理或在 repository 层控制
}
}
}
}
}Repository 层的配合
Repository 负责提供数据源。为了触发
onEmpty,我们需要确保在没有数据时,流是正常结束且没有调用emit的。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15class UserRepository(private val apiService: ApiService) {
fun getRemoteUsers(): Flow<List<User>> = flow {
val response = apiService.getUsers() // 假设这是个挂起函数
if (response.isSuccessful) {
val body = response.body()
if (body.isNullOrEmpty()) {
// 不调用 emit,直接结束流,从而触发下游的 onEmpty
} else {
emit(body)
}
} else {
throw HttpException(response) // 抛出异常触发下游 catch
}
}.flowOn(Dispatchers.IO) // 切换到 IO 线程执行
}UI 层的响应
在 Activity 或 Fragment 中,我们只需要观察这一个
StateFlow。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21lifecycleScope.launch {
repeatOnLifecycle(Lifecycle.State.STARTED) {
viewModel.userState.collect { state ->
when (state) {
is UiState.Loading -> showLoading(true)
is UiState.Success -> {
showLoading(false)
adapter.submitList(state.data)
}
is UiState.Empty -> {
showLoading(false)
showEmptyLayout()
}
is UiState.Error -> {
showLoading(false)
showErrorMsg(state.exception.message)
}
}
}
}
}
- 为什么这是最佳实践?
- 职责分离:ViewModel 负责逻辑调度,UI 只负责根据状态渲染。
- 代码简洁:不再需要手动在
try-catch前后写loading.value = true/false,所有生命周期被操作符自动化。 - 单向数据流:状态的起点是
onStart,终点是collect后的 UI 更新,路径唯一且清晰。 - 异常安全:由于使用了
catch操作符,即使网络层崩溃,APP 也能优雅地展示错误页,而不是直接闪退。
API 进阶玩法
在真实的网络请求中,我们通常不希望一出错就直接显示错误页,而是希望它能静默重试几次(比如网络抖动)。如果重试 3 次还是不行,再抛出异常交给
catch处理。这就是 「指数退避算法」 :每次重试的等待时间加倍,给服务器和网络留出喘息机会。进阶版:带自动重试的网络请求流
我们将之前的逻辑进行升级,在
onStart之后、catch之前插入retryWhen。核心逻辑实现
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29repository.getRemoteUsers()
.onStart {
// 1. 订阅开始,UI 转圈
_userState.value = UiState.Loading
}
.retryWhen { cause, attempt ->
// 2. 只有特定异常(如网络超时)且重试次数 < 3 时才重试
if (cause is IOException && attempt < 3) {
// 指数退避:第一次等 1s,第二次 2s,第三次 4s
val delayTime = (2.0.pow(attempt.toDouble()) * 1000).toLong()
println("请求失败,正在进行第 ${attempt + 1} 次重试,等待 ${delayTime}ms...")
delay(delayTime)
true // 返回 true,Flow 会重启上游(重新调用 Repository 的方法)
} else {
false // 超过次数或非网络异常,放弃重试,异常向下传递
}
}
.onEmpty {
// 3. 重试都结束了,结果还是空
_userState.value = UiState.Empty
}
.catch { e ->
// 4. 重试耗尽或中途遇到不可修复异常,展示错误 UI
_userState.value = UiState.Error(e)
}
.collect { users ->
// 5. 最终成功拿到数据
_userState.value = UiState.Success(users)
}为什么这是“最佳实践”中的天花板?
这种写法解决了移动端开发中几个非常棘手的痛点:
- 静默失败:用户可能只是进电梯时断了下网,
retryWhen在后台默默搞定了一切,用户感知不到报错,体验极佳。 - 避免“重试风暴”:如果服务器宕机,瞬间发起的数万次立即重试会形成 DDoS 攻击。指数退避(
delay)优雅地拉开了请求间隔。 - 状态隔离:
onStart只负责开启 Loading。retryWhen负责在后台“折腾”。collect只负责接收最后那个“修成正果”的数据。- 下游 UI 层完全不需要写
if (retryCount < 3)这种丑陋的补丁代码。
- 静默失败:用户可能只是进电梯时断了下网,
避坑指南
关于 onStart 的位置:
务必把
onStart放在retryWhen之前。- 如果放在
retryWhen之后,每次重试都会触发一遍onStart(比如导致进度条闪烁)。 - 放在之前,由于 Flow 的「隔离性」,
onStart只会在最初订阅时触发一次。
- 如果放在
Repository 的幂等性:
确保你的 Repository 方法是「幂等」的(即多次调用不会产生副作用),因为
retryWhen会实实在在地重新跑一遍上游代码。UI 状态重置:
如果你的
UiState中包含错误信息,记得在onStart里把它清空,否则重试成功后,UI 上可能还残留着上一次报错的痕迹。
现在,你已经拥有了一套完整的、具备自愈能力的数据流模板:
启动 (
onStart) → 尝试 → 失败重试 (retryWhen) → 判空 (onEmpty) → 最终容错 (catch) → 渲染 (collect) → 释放资源 (onCompletion)。这套逻辑不仅适用于网络请求,也同样适用于蓝牙连接、传感器监听、数据库同步等任何不稳定的异步场景。
4、flowOn 操作符
Flow 是 Kotlin 协程中用于处理异步数据流的核心工具。在构建数据流时,我们经常需要精细控制不同阶段代码执行的线程环境,这在 Flow 中是通过 flowOn 操作符实现的。
4.1 flowOn:核心的上游调度器
flowOn 是 Flow 中最频繁使用的 API。它的唯一任务是:改变其上游操作符执行时的协程上下文。
- 作用范围:仅影响
flowOn之前的代码(上游),不影响之后的代码(下游)。 - 设计哲学:上游(生产者)决定自己的工作环境,下游(消费者)保持不受干扰。
- 代码示例:
1
2
3
4
5
6
7
8flow {
emit(DB.query()) // 在 IO 线程执行
}
.map { it.toUpperCase() } // 在 IO 线程执行
.flowOn(Dispatchers.IO) // <--- 作用于上方所有逻辑
.collect {
updateUI(it) // 仍然在 collect 所在的线程(如 Main)执行
}
4.2 flowOn 与 withContext 的区别
在普通协程中,withContext 是切换线程的标准方式。但在 Flow 中,它有两个截然不同的使用场景:
合法场景:计算逻辑内切换
在
map或transform内部,如果你只想让某一段代码跑在特定线程(比如算力密集型任务),可以使用它。1
2
3
4
5.map { data ->
withContext(Dispatchers.Default) {
complexCalculation(data) // 只有这一行在 Default 执行
}
}违规场景:包裹
emit绝对不要用
withContext包裹emit调用。这会触发IllegalStateException,因为 Flow 内部有「透明性检查」,要求emit必须在原始收集中上下文中调用。
4.3 flowOn 融合机制
当你在链式调用中连续写了多个 flowOn 时,Kotlin 编译器并不会傻傻地创建多个中转站,而是会进行融合。
融合规则:很多人以为
flowOn像普通 Context 组合一样是「后者覆盖前者」,但在 Flow 的连续链式调用中,为了保护数据源的执行环境,Kotlin 采用了先入为主的融合策略——最靠近flow { }块的那个flowOn才是真正的赢家。源码解读:
当你在一个已经是 ChannelFlow 的对象上再次调用
flowOn时,它不会包装新层,而是执行以下fuse逻辑:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15// 摘自 kotlinx.coroutines.flow.internal.ChannelFlow
public override fun fuse(context: CoroutineContext, capacity: Int, onBufferOverflow: BufferOverflow): Flow<T> {
// 注意这里的合并顺序!
// context 是当前(下方/右侧)flowOn 传入的上下文
// this.context 是上游(上方/左侧)flowOn 已经持有的上下文
val newContext = context + this.context
val newCapacity = when {
onBufferOverflow != BufferOverflow.SUSPEND -> capacity
else -> ... // 缓冲容量的融合逻辑
}
// 返回一个融合后的新对象,使用的是 newContext
return FusedDispatchChannelFlow(upstream, newContext, newCapacity, onBufferOverflow)
}根据源码
ChannelFlow.kt的实现,当多个flowOn融合时,执行的是新Context + 旧Context。1
NewContext = Downstream Context + Upstream Context
因为在 Kotlin 协程中,
+号右边的上下文会覆盖左边的同类项。所以,上游(Upstream)声明的 Dispatcher 永远站在+号的右边,它才是那个最后的赢家。代码示例:
1
2
3
4
5
6
7
8
9flow { // 原始 Flow
println("CoroutineContext : ${currentCoroutineContext()}")
emit(1)
}
.flowOn(Dispatchers.IO) // context1 (上方:生成的 ChannelFlow,其 this.context = IO)
.flowOn(Dispatchers.Default) // context2 (下方:调用 fuse,传入 context = Default)
.collect()
// 输出 CoroutineContext : [ProducerCoroutine{Active}@ce9d8b5, Dispatchers.IO]上面的例子中,所有上游逻辑最终都会直接在
Dispatchers.IO中执行。这减少了不必要的协程创建和线程切换开销。注意:如果中间夹杂了其他操作符(如
map),则不会融合,而是各自管辖各自的区间。融合过程:源码中的合并顺序是:
当前(下方)的 context + 上游(上方)的 this.contextflowOn(IO)包装了原始的flow。- 当代码执行到
.flowOn(Dispatchers.Default)时,它会检查上游。 - 上游是一个已经包含了
Dispatchers.IO的FusibleFlow。 - 执行合并
newContext = Dispatchers.Default + Dispatchers.IO。 - 在协程中,
+号右侧的元素优先级更高。 Dispatchers.IO覆盖了Dispatchers.Default。
综合结论:由于合并是
Default + IO,根据协程上下文的覆盖规则,IO 覆盖了 Default。- 物理结果:最上面的
flowOn(IO)赢了。 - 直观感受:上面的覆盖了下面的。
- 底层逻辑:下方的
flowOn尝试融合上方的上下文,但因为+操作符的覆盖特性,原本在上面的(更靠近数据的)优先级更高。
- 物理结果:最上面的
设计哲学:为什么设计成「上方优先」?这其实非常符合逻辑。我们来看这个场景:
1
2
3
4
5
6
7
8// 库作者写的代码
fun getRemoteData() = flow { emit(fetch()) }.flowOn(Dispatchers.IO)
// 你作为调用者写的代码
getRemoteData()
.map { process(it) }
.flowOn(Dispatchers.Default)
.collect()- 库作者:明确知道
fetch()必须在IO执行。 - 调用者:希望后续处理在
Default执行。 - 融合结果:
fetch()依然运行在IO上。这保护了库作者对「数据源生产环境」的声明,不被外部的flowOn轻易篡改。
- 库作者:明确知道
分段式 flowOn:只有连续的
flowOn才会融合。 如果中间插入了普通操作符,它们就各司其职:1
2
3
4
5flow { emit(1) } // 运行在 IO
.flowOn(Dispatchers.IO)
.map { it + 1 } // 运行在 Default (因为这一段被下面的 flowOn 管控)
.flowOn(Dispatchers.Default)
.collect() // 运行在 Main在这种情况下,不存在覆盖问题,而是分段管理。
4.4 channelFlow:flowOn 的底层亲戚
当你使用 flowOn 时,系统底层其实创建了一个 Channel。这就是为什么 flowOn 能够实现并发:上游在 A 线程生产并丢进 Channel,下游在 B 线程从 Channel 拿数据。
- API 关系:
flowOn实际上是ChannelFlowOperator的一种封装。 - 适用场景:如果你的逻辑本身就需要跨线程并发生产(比如同时启动多个协程发射数据),直接使用
channelFlow会比flow { ... }.flowOn()更高效且灵活。
4.5 下游 CoroutineContext 的定制方案
既然 flowOn 管不了下游,那我想让 collect 里的代码跑在特定线程该怎么办?
有四个方案,按推荐程度排序:
使用
launchIn(官方推荐):替代传统的collect。它是官方推荐的简洁写法,无性能差异。1
2flow.onEach { ... } // 数据收集块
.launchIn(GlobalScope.launch(Dispatchers.IO) {}) // 在指定协程中收集,并启动使用
onEach+flowOn实现统一: 将收集逻辑移入onEach,再对其应用flowOn。1
2
3flow.onEach { ... } // 这里的 collect 块移到了这里
.flowOn(Dispatchers.IO) // 定制 onEach 内部的上下文
.collect() // 这里只剩下启动 Flow使用
withContext包裹collect():前提是不含新的emit。1
2
3withContext(Dispatchers.IO) { // 确保不涉及发射新的 Flow
flow.collect { ... }
}直接在调用处指定:将
flow.collect { ... }置于目标上下文的协程中。1
2
3GlobalScope.launch(Dispatchers.Main) { // 目标上下文
flow.collect { ... }
}
5、buffer 操作符
在 Kotlin Flow 的高阶开发中,buffer 是一个绕不开的核心概念。很多开发者对它的理解仅停留在「暂存数据」上,但实际上,它是打破 Flow 线性执行限制、实现异步并发处理的底层引擎。
5.1 buffer:打破串行约束的“蓄水池”
buffer 是 Flow 中最基础的功能性操作符,用于为数据流添加缓冲能力。
- 核心功能:默认情况下,Flow 是双维度串行的——不仅单条数据内操作串行,多条数据之间也必须等前一条
collect完,后一条才开始生产。buffer允许上游在下游处理期间提前生产后续数据。 - 本质辨析:
buffer的出现,本质上是在生产和消费之间挖了一个“蓄水池”。- 解耦执行:它会让上游和下游运行在不同的协程中。上游可以开足马力生产,把数据丢进缓冲区就走;下游则按自己的节奏从缓冲区拿数据。
- 性能飞跃:原本总耗时 = (生产时间 + 消费时间) × 总数;有了
buffer后,总耗时 ≈ Max(生产总时间, 消费总时间)。
- API 参数:
capacity:缓冲区容量。onBufferOverflow:溢出策略(挂起、丢弃最旧、丢弃最新)。
- 底层原理:基于
Channel实现。它会启动一个新的协程来运行上游代码,通过 Channel 与下游通信。 - 代码示例:默认情况下,Flow 是同步的。加上
buffer后,上游生产和下游消费将运行在不同的协程中。运行效果:上游不会等下游处理完才生产下一个。你会看到上游快速连续产出 3 个数据(存入缓存),而下游慢悠悠地处理。1
2
3
4
5
6
7
8
9
10
11
12flow {
for (i in 1..3) {
delay(100) // 模拟生产耗时
emit(i)
println("上游:生产了 $i")
}
}
.buffer(2) // 添加容量为 2 的缓冲区
.collect { value ->
delay(300) // 模拟下游消费很慢
println("下游:处理了 $value")
}
5.2 buffer 与 flowOn:同一枚硬币的两面
你可能发现 flowOn 也能加速 Flow。其实,flowOn 的底层和 buffer 共享同一个实现类:ChannelFlow。
flowOn:侧重于协程上下文切换(比如切到 IO 线程),但它会隐式开启缓冲区。buffer:侧重于容量配置(capacity)和溢出策略(onBufferOverflow)。代码示例:这是最常用的 API,用于改变上游数据生产的上下文环境。
1
2
3
4
5
6
7
8database.getUsersFlow()
.filter { it.isActive } // 在 IO 线程执行
.map { it.toUiModel() } // 在 IO 线程执行
.flowOn(Dispatchers.IO) // 改变上游线程
.collect { uiModels ->
// 在调用 collect 的线程(通常是 Main)执行
updateUI(uiModels)
}关键点:
flowOn只影响它上方的操作。如果你在collect之后再写一个map,那个map是不受flowOn影响的。
5.3 buffer:融合规则
与 flowOn 类似,当多个 buffer 连用时,它不会简单地叠加出无数个中转站,而是会触发操作符融合。
操作符融合的核心规则表
场景 融合规则 最终结果 策略冲突 右侧优先覆盖 如果右侧显式指定了 DROP_OLDEST或DROP_LATEST,左侧的所有配置(容量+策略)全失效。容量融合 相加原则 如果双方策略都是默认的 SUSPEND,capacity会累加。上下文融合 合并原则 连续的 flowOn会将CoroutineContext合并。深度拆解:三个典型的融合案例
案例 A:容量累加(默认策略)
1
flow.buffer(2).buffer(3).collect { ... }
- 逻辑:双方都没有指定
onBufferOverflow,默认为SUSPEND。 - 结果:触发融合,最终得到一个容量为 5(2 + 3)的缓冲区。
案例 B:策略覆盖(右侧优先)
1
flow.buffer(10).buffer(1, onBufferOverflow = DROP_OLDEST).collect { ... }
- 逻辑:右侧操作符明确要求了「丢弃旧数据」的特殊策略。
- 结果:左侧的
buffer(10)被彻底抛弃。最终效果等同于conflate(),缓冲区容量仅为 1。
案例 C:混合双打(buffer + flowOn)
1
flow.flowOn(Dispatchers.IO).buffer(20).collect { ... }
- 逻辑:
flowOn默认自带 64 容量的SUSPEND缓冲。 - 结果:融合成一个在
Dispatchers.IO执行、容量为 84(64 + 20)的ChannelFlow。
- 逻辑:双方都没有指定
背后的逻辑:为什么这么设计?
Flow 的底层基石是
ChannelFlow。每当你调用一次buffer或flowOn,系统都会尝试检查上一个操作符是不是也是ChannelFlow:- 如果是:直接修改上一个对象的参数,不创建新对象(融合)。
- 如果不是:创建一个新的
ChannelFlow包装起来。
这种设计保证了性能开销最小化。即使你写了十个
buffer,只要策略相容,最后也只有一个协程切换和缓冲区。配置建议
在复杂的 Android 业务中(比如从 Room 数据库读取数据并进行耗时计算),推荐的配置方式是:
- 先定环境,再定容量
1
2
3
4// 推荐写法
flow.flowOn(Dispatchers.IO)
.buffer(Channel.CONFLATED) // 显式覆盖策略,只拿最新
.collect { ... } - 避免隐式覆盖:如果你已经在上游库里定义了
buffer,下游再使用带有DROP_策略的buffer时要格外小心,因为它会杀掉你上游苦心经营的所有缓冲容量。
- 先定环境,再定容量
5.4 conflate:只取最新的“跳帧”器
conflate 是 buffer 的一种便捷变体,专门用于处理生产速率远高于消费速率的场景。
- 等价关系:它完全等价于
buffer(capacity = 1, onBufferOverflow = DROP_OLDEST)。 - 应用场景:比如股票价格实时刷新、传感器数值显示。下游只关心此刻最新的状态,中间由于处理太慢而积压的数据可以直接丢弃。
- 行为特性:它永远不会挂起生产者,只会不断用新值覆盖缓冲区中的旧值。
- 代码示例:当你只关心最新状态(如 UI 刷新、传感器数值),中间积压的数据可以直接丢弃。 运行效果:由于 UI 处理慢,缓冲区里旧的股价会被新来的直接覆盖。下游可能只收到了 100, 104, 109,中间的数值被自动跳过了。
1
2
3
4
5
6
7
8
9
10
11
12
13val stockPriceFlow = flow {
repeat(10) { i ->
emit(100 + i) // 模拟快速波动的股价
delay(50)
}
}
stockPriceFlow
.conflate() // 等价于 buffer(1, DROP_OLDEST)
.collect { price ->
println("UI 更新股价:$price")
delay(200) // 模拟 UI 渲染较慢
}
5.5 mapLatest 与 collectLatest:追新弃旧
这两个 API 是基于缓冲机制实现的高级逻辑封装。
mapLatest:底层利用了容量为 0 的缓冲触发机制。当新数据到达时,如果旧的数据转换还没做完,它会立即取消旧协程,启动新转换。collectLatest:本质上是mapLatest的一种特殊组合。它确保了每次只处理最新的生产结果。关键点:它们依赖
buffer提供的异步能力来感知“新数据”的压迫,从而实现中断逻辑。代码示例:
1
2
3
4
5
6
7
8
9searchFlow
.mapLatest { query ->
// 模拟网络搜索请求
delay(500)
api.search(query)
}
.collect { results ->
showResults(results)
}运行效果:如果用户在 500ms 内连续输入了 “A”, “AB”, “ABC”,
mapLatest会自动取消掉 “A” 和 “AB” 的搜索请求,只保留并完成 “ABC” 的搜索。代码示例:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19val simpleFlow = flow {
repeat(3) { i ->
delay(100) // 生产很快
emit(i)
}
}
simpleFlow.collectLatest { value ->
println("开始处理数据: $value")
delay(500) // 处理很慢
println("完成处理数据: $value")
}
输出
-------------
开始处理数据: 0
开始处理数据: 1
开始处理数据: 2
完成处理数据: 2运行逻辑:
- 数据
0刚开始处理,1就来了。由于collectLatest的特性,处理0的协程被立即取消(所以没打印“完成处理数据: 0”)。 - 数据
1同理,还没处理完就被2给顶掉了。 - 只有最后一条数据
2因为后面没有新数据了,才得以平安地跑完整个流程。
- 数据
5.6 onBufferOverflow:精准的溢出控制
当蓄水池满了,新来的数据怎么办?buffer 提供了三种 onBufferOverflow 方案:
SUSPEND(默认):最稳健。池子满了上游就挂起等一等。
DROP_OLDEST:最果断。丢掉最老的数据,把最新的存进来。这在状态更新(比如最新的股价)场景中非常常用。
DROP_LATEST:最固执。丢掉当前进不来的最新数据,保住池子里的。
- 代码示例:
1
2
3
4
5
6
7
8
9
10
11
12
13val fastFlow = flow {
repeat(100) { emit(it) }
}
fastFlow
.buffer(
capacity = 10,
onBufferOverflow = BufferOverflow.DROP_LATEST // 满了就丢掉最新的,保住旧的
)
.collect {
delay(100)
println("处理:$it")
}
6、合并操作符
实际开发中,我们经常需要处理来自不同数据源的流——比如同时从数据库和网络获取数据,或者需要将多个传感器的数值进行聚合。Kotlin 提供了两类核心 API 来解决这些需求:一类是「展开式合并」(把多个流揉成一个流),另一类是「配对式合并」(把多个流的数据拿来做计算)。
6.1 展开式合并:从“多路”到“单路”
6.1.1 merge:穿插发送
merge 是最直接的合并方式。它把多个独立的 Flow 放在一起,谁先发出数据,它就转发谁。
- 合并行为:完全异步,数据在时间轴上穿插。
- 代码示例:
1
2
3
4val flow1 = flowOf(1, 2).onEach { delay(100) }
val flow2 = flowOf("A", "B").onEach { delay(150) }
merge(flow1, flow2).collect { println(it) }
// 输出可能是: 1, A, 2, B (取决于实际发射时间)
6.1.2 flattenConcat:顺序展开-铺平嵌套流
针对 Flow<Flow<T>>(嵌套流),它会按顺序处理。只有第一个子流全部结束,才会开启第二个。
- 合并行为:串行展开,保证顺序。
- 代码示例:假设你有两个任务,任务 A 发送 1, 2, 3,任务 B 发送 4, 5, 6。即便任务 B 的生产速度更快,
flattenConcat也会让它老老实实在后面排队。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
fun main() = runBlocking {
// 1. 创建一个嵌套流 Flow<Flow<Int>>
val nestedFlow: Flow<Flow<Int>> = flow {
emit(flow {
emit(1); delay(100); emit(2); delay(100); emit(3)
}) // 子流 A
emit(flow {
emit(4); delay(10); emit(5); delay(10); emit(6)
}) // 子流 B(虽然延迟小,但必须排队)
}
println("开始顺序展开...")
// 2. 使用 flattenConcat 铺平
nestedFlow.flattenConcat().collect { value ->
println("收到数据: $value")
}
}
输出
-------------
开始顺序展开...
收到数据: 1
收到数据: 2
收到数据: 3
收到数据: 4
收到数据: 5
收到数据: 6
6.1.3 flattenMerge:并发展开-铺平嵌套流
针对 Flow<Flow<T>>(嵌套流),它会同时启动多个子流的收集工作,并将它们的数据按实际发射的时间顺序汇聚到主流中。
- 合并行为:并发展开,穿插转发。数据是「谁快谁先出」。如果子流 A 慢、子流 B 快,你会看到 B 的数据穿插在 A 的数据之间。
- 容量控制:通过
concurrency参数控制同时运行的子流数量(默认值为 16)。concurrency = 1:退化为flattenConcat,变成纯串行。concurrency = n:同时最多允许 n 个子流处于活动状态。如果上游发出了第 n+1 个流,它会挂起并等待前面的流结束。
- 代码示例:假设你有多个下载任务,每个任务都是一个 Flow。我们希望它们同时开始,而不是一个接一个下载。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
fun main() = runBlocking {
val taskFlow: Flow<Flow<String>> = flow {
emit(flow {
delay(200); emit("任务 A - 进度 50%"); delay(200); emit("任务 A - 完成")
})
emit(flow {
delay(100); emit("任务 B - 进度 50%"); delay(100); emit("任务 B - 完成")
})
}
println("开始并发展开(flattenMerge)...")
// concurrency 参数决定同时并行的流数量
taskFlow.flattenMerge(concurrency = 2).collect { value ->
println("收到信号: $value")
}
}
输出
-------------
开始并发展开(flattenMerge)...
收到信号: 任务 B - 进度 50% // B 延迟小,先出数据
收到信号: 任务 A - 进度 50%
收到信号: 任务 B - 完成
收到信号: 任务 A - 完成
6.1.4 flatMapConcat:严谨的“接力赛”
它将上游发出的每个元素转换(Map)成一个新的 Flow,并以串行的方式将这些 Flow 的数据收集起来。
- 行为特性
- 顺序执行:它非常守规矩。上游发出第一个元素并触发转换后的 Flow,
flatMapConcat会一直等待这个 Flow 完全结束,才会去处理上游的第二个元素。 - 无并发:它内部没有并行机制。无论上游发多快,下游的处理始终是一个接一个的。
- 背压支持:由于它是串行的,如果子流处理很慢,上游的生产协程会自动挂起,天然支持背压。
- 顺序执行:它非常守规矩。上游发出第一个元素并触发转换后的 Flow,
- 代码示例:顺序执行网络请求。假设你需要先获取「用户 ID」,拿到后再去获取「用户详情」,最后获取「用户订单」。这三个任务必须有先后顺序。
1 |
|
6.1.5 flatMapMerge:火力全开的“并发处理器”
它将上游发出的每个元素转换(Map)成一个新的 Flow,并并发地收集这些子 Flow 的数据。
- 行为特性
- 并发执行:它不会等待前一个子流结束。只要上游发出了新元素,
flatMapMerge就会立即启动一个新的协程来处理转换后的子流。 - 数据穿插:因为是并发执行,子流之间的数据发送顺序取决于它们的耗时。执行快的子流数据会先到达下游。
- 并发数控制:通过
concurrency参数限制同时运行的任务数量,防止瞬时并发过高撑爆内存或线程池。
- 并发执行:它不会等待前一个子流结束。只要上游发出了新元素,
- 代码示例:并发请求多个用户信息。假设我们有一组用户 ID,我们需要同时获取他们的详情,而不是一个接一个地等。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
fun main() = runBlocking {
val userIds = flowOf(1, 2, 3) // 上游:多个用户 ID
println("开始并发处理用户信息...")
val startTime = System.currentTimeMillis()
userIds.flatMapMerge(concurrency = 3) { id ->
// 将每个 ID 转换为并发的异步请求 Flow
fetchUserDetail(id)
}.collect { detail ->
println("收到详情: $detail (耗时: ${System.currentTimeMillis() - startTime}ms)")
}
}
fun fetchUserDetail(id: Int): Flow<String> = flow {
// 模拟不同接口耗时不同,用户 1 最慢
val delayTime = if (id == 1) 400L else 200L
delay(delayTime)
emit("用户 $id 的数据")
}
输出
-------------
开始并发处理用户信息...
收到详情: 用户 2 的数据 (耗时: 296ms)
收到详情: 用户 3 的数据 (耗时: 307ms)
收到详情: 用户 1 的数据 (耗时: 475ms)
6.1.6 flatMapLatest:追新弃旧的“搜索神器”
它将上游发出的每个元素转换(Map)成一个新的 Flow。但关键在于,一旦上游发出了新元素,它会立即取消之前由旧元素生成的子流,并转而收集新流的数据。
- 行为特性
- 立即中断:它对上游信号极其敏感。只要新信号一到,旧的异步任务(如网络请求、数据库查询)会被立刻掐断。
- 单流运行:在任何给定的时间点,下游只会活跃地收集一个子流的数据。
- 资源节约:通过及时取消过时的请求,极大地节省了 CPU、内存和网络带宽。
- 代码示例:搜索框联想。这是
flatMapLatest最经典的实战案例。用户在输入框打字时,我们只想要最后一次完整输入的搜索结果。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
fun main() = runBlocking {
// 模拟用户的连续输入流
val searchQuery = flow {
emit("A"); delay(100)
emit("AB"); delay(100)
emit("ABC")
}
println("开始实时搜索处理...")
searchQuery.flatMapLatest { query ->
// 将搜索词映射为网络请求流
searchRemoteApi(query)
}.collect { result ->
println("展示结果: $result")
}
}
fun searchRemoteApi(query: String): Flow<String> = flow {
println(">>> 正在为 '$query' 发起网络请求...")
delay(500) // 模拟网络延迟
emit("'$query' 的搜索结果")
println("<<< '$query' 请求成功完成")
}
输出
-------------
开始实时搜索处理...
>>> 正在为 'A' 发起网络请求...
>>> 正在为 'AB' 发起网络请求... (此时 'A' 的请求被取消)
>>> 正在为 'ABC' 发起网络请求... (此时 'AB' 的请求被取消)
<<< 'ABC' 请求成功完成
展示结果: 'ABC' 的搜索结果
6.2 配对式合并:数据间的“化学反应”
这类操作符不仅仅是转发数据,更侧重于多路数据的协同计算。
6.2.1 combine:感知全局的“状态同步器”
它同时监听两个或多个 Flow。只要其中任何一个 Flow 发出了新数据,它就会取出所有流的最新值,通过你定义的变换函数计算出一个新结果。
- 行为特性
- 最新值驱动:它不要求所有流同步发射。哪怕 Flow A 没动,只要 Flow B 动了,
combine也会立刻用 “A 的旧值 + B 的新值” 算一遍。 - 元素复用性:这是它与
zip的最大区别。一个流发出的值可以被多次复用,直到该流发出下一个新值为止。 - 等待机制:在初始阶段,
combine会等待所有参与的 Flow 都至少发出过一个值后,才会触发第一次计算。
- 最新值驱动:它不要求所有流同步发射。哪怕 Flow A 没动,只要 Flow B 动了,
- 代码示例:登录按钮的可用性校验。这是 Android 实战中最经典、最优雅的
combine用法。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35fun main() = runBlocking {
val phoneFlow = MutableStateFlow("") // 手机号输入
val codeFlow = MutableStateFlow("") // 验证码输入
val protocolFlow = MutableStateFlow(false) // 是否勾选协议
// 使用 combine 聚合三个状态
val isLoginBtnEnabled = combine(phoneFlow, codeFlow, protocolFlow) { phone, code, accepted ->
// 只有三个条件同时满足,才返回 true
phone.length == 11 && code.length == 6 && accepted
}
// 模拟 UI 订阅
val job = launch {
isLoginBtnEnabled.collect { enabled ->
println(">>> 登录按钮状态: ${if (enabled) "可用" else "禁用"}")
}
}
// 模拟用户操作
delay(100); phoneFlow.value = "13800138000"
delay(100); codeFlow.value = "123456"
delay(100); protocolFlow.value = true // 此时按钮才应该亮起
delay(100); protocolFlow.value = false // 勾选取消,按钮再次禁用
delay(500)
job.cancel()
}
输出
-------------
>>> 登录按钮状态: 禁用 (初始等待中...)
>>> 登录按钮状态: 禁用 (手机号填了)
>>> 登录按钮状态: 禁用 (验证码填了)
>>> 登录按钮状态: 可用 (协议勾选,全部满足!)
>>> 登录按钮状态: 禁用 (协议取消)
6.2.2 zip:严格对齐的“拉链”
它将两个 Flow 的元素按索引顺序一一配对。第一个流的第 n 个元素,必须等到第二个流的第 n 个元素发射后,两者才会结合并向下游发射。
- 行为特性
- 两两配对:它遵循严格的「拉链」逻辑(Zip)。
flowA[0]只跟flowB[0]结合,flowA[1]只跟flowB[1]结合。 - 不可复用性:一旦两个元素配对成功并转换发出,它们就从各自的等待队列中消失了。这与
combine的「复用旧值」截然不同。 - 短板效应:合并后的 Flow 会在任意一个子流结束时立即结束。如果 Flow A 发了 10 个值,Flow B 只发了 2 个,那么 zip 后的流只会发出 2 个结果,剩下的 8 个 A 元素会被无情抛弃。
- 两两配对:它遵循严格的「拉链」逻辑(Zip)。
- 代码示例:双接口并行请求汇总。在 Android 中,我们经常需要同时请求两个接口,且只有当两个接口都返回数据时,渲染界面才有意义。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19fun main() = runBlocking {
val names = flowOf("张三", "李四", "王五").onEach { delay(100) }
val scores = flowOf(95, 88).onEach { delay(300) } // 注意:比 names 少一个
println("开始拉链合并 (zip)...")
names.zip(scores) { name, score ->
"$name 的成绩是: $score"
}.collect { result ->
println(">>> 最终结果: $result")
}
}
输出
-------------
开始拉链合并 (zip)...
>>> 最终结果: 张三 的成绩是: 95 (等待了 300ms)
>>> 最终结果: 李四 的成绩是: 88 (又等待了 300ms)
// 王五被抛弃了,因为 scores 已经结束,zip 随之终止
6.2.3 combineTransform:自由度极高的“逻辑转换器”
它结合了 combine 的多流监听能力和 transform 的灵活发射能力。它允许你在多个流的最新值到达时,手动控制发射(emit)的时机和次数。
- 行为特性
- 手动发射控制:与
combine自动返回闭包结果不同,combineTransform要求你显式调用emit()。你可以不发、发一次、或在一个循环里发多次。 - 异步友好:在转换块内部,你可以直接调用
delay或其他挂起函数,非常适合在合并数据时顺便做个网络请求或耗时计算。 - 最新值触发:继承了
combine的特性,任何一个上游流更新,都会触发这个转换块重新执行。
- 手动发射控制:与
- 代码示例:带缓存检查的复杂搜索。假设你有两个流:一个是“搜索词”,一个是“网络状态”。只有当网络在线时才发起搜索,且如果搜索词太短,我们直接提示错误。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39fun main() = runBlocking {
val queryFlow = MutableStateFlow("")
val networkFlow = MutableStateFlow(true) // 模拟网络状态
println("开始执行 combineTransform...")
combineTransform(queryFlow, networkFlow) { query, isOnline ->
// 1. 逻辑过滤:如果网络离线,不发请求,直接发提示
if (!isOnline) {
emit("当前网络不可用,请检查设置")
return@combineTransform
}
// 2. 条件判断:搜索词太短不触发异步逻辑
if (query.length < 2) {
emit("请输入至少2个字符")
} else {
// 3. 异步操作:模拟合并时的耗时转换
emit("正在搜索: $query...")
delay(500)
emit("'$query' 的搜索结果已加载")
}
}.collect { status ->
println(">>> UI 收到状态: $status")
}
// 模拟操作
delay(100); queryFlow.value = "A"
delay(1000); queryFlow.value = "Android"
delay(1000); networkFlow.value = false // 模拟掉线
}
输出
-------------
>>> UI 收到状态: 请输入至少2个字符
>>> UI 收到状态: 请输入至少2个字符 (A 发射了)
>>> UI 收到状态: 正在搜索: Android...
>>> UI 收到状态: 'Android' 的搜索结果已加载
>>> UI 收到状态: 当前网络不可用,请检查设置
6.2.4 combine vs combineTransform:怎么选?
| 特性 | combine |
combineTransform |
|---|---|---|
| 输出形式 | 必须有返回值(1:1 映射) | 手动 emit(1:N 或 1:0) |
| 灵活性 | 较低,适合简单的状态组合 | 极高,适合复杂的异步转换逻辑 |
| 代码可读性 | 简洁直观 | 逻辑较重时可能稍显复杂 |
7、终端转换操作符
在 Kotlin Flow 中,终端操作符是流的「终点站」。所有的中间操作(如 map, filter)都是懒加载的,只有当你调用了终端操作符,Flow 才会真正开始生产数据并执行逻辑。终端操作符本质上都是 collect 的特化实现。下面为你逐一拆解这些「终点站」API。
7.1 first 与 firstOrNull
它们只关心流发出的第一个元素。一旦拿到符合条件的第一个值,就会立即取消上游流的收集,从而节省资源。
行为特性:拿到第一个满足条件的数就立刻「掐断」流(Cancel)。
代码示例:
1
2
3
4
5val flow = flowOf(1, 2, 3).onEach { println("生产: $it") }
// 拿到第一个大于 1 的数就走
val result = flow.first { it > 1 }
println("最终结果: $result")运行效果:你会发现只打印了「生产: 1」和「生产: 2」,数字 3 根本不会被生产,因为 first 拿到 2 后就直接 Cancel 了。
注意:如果流为空,
first会抛出NoSuchElementException;而firstOrNull会安全地返回null。
7.2 last 与 lastOrNull
与 first 相反,它会坚持到流的最后一刻,只返回最后一个发出的元素。
- 行为特性:必须等流全部走完,才给你最后一个。
- 代码示例: 注意:不要对“无限流”(如位置监听、传感器长连接)调用
1
2val result = flowOf("A", "B", "C").last()
println("最后一个是: $result") // 输出: Clast,否则协程会永久挂起,因为流永远不会结束。
7.3 single 与 singleOrNull
这两个 API 带有强烈的校验逻辑:它要求流中有且仅有一个元素。
- 行为特性:这是一种“严格校验”。它要求流里有且仅有一个元素。多一个或少一个都会抛异常。
- 代码示例:
1
2
3
4// 正常情况
val singleValue = flowOf(100).single()
// 异常情况:如果 flowOf(100, 200).single() 则会抛出 IllegalStateException - 适用场景:适用于通过唯一 ID 查询数据库,逻辑上预期结果必须只有一个的场景。
7.4 count
它统计流中符合条件的元素个数,而不必把所有数据都存入内存。
- 行为特性:在
collect的过程中计数。 - 代码示例:
1
2
3val errorCount = flowOf(200, 404, 500, 200)
.count { it >= 400 }
println("错误请求数: $errorCount") // 输出: 2 - 适用场景:统计日志中错误出现的次数。
flow.count { it.level == ERROR }。它比toList().size优雅得多,因为它是流式处理,不需要在内存里开辟空间存整个列表。
7.5 toList 与 toSet
将异步的数据流固化为内存中的集合,方便后续的随机访问。
- 行为特性:把流里的每一滴水都装进桶里。
- 代码示例:
1
2val list = flowOf(1, 2, 2, 3).toList() // [1, 2, 2, 3]
val set = flowOf(1, 2, 2, 3).toSet() // [1, 2, 3] (去重) - 风险提示:在 Android 中处理大数据量时,慎用
toList,防止 OOM(内存溢出)。
7.6 toCollection
它是一个挂起函数,通过调用 collect 收集 Flow 中的所有元素,并将其依次添加到你提供的目标集合中,最后返回该集合。
- 行为特性:
- 对象复用:与
toList每次都创建一个新列表不同,toCollection强制要求你传入一个已有的MutableCollection。这在需要减少内存抖动、复用大对象时非常有用。 - 类型灵活:只要是
MutableCollection的子类(如ArrayList、HashSet、LinkedHashSet甚至自定义集合),都可以作为目标。 - 阻塞收集:它会持续收集直到上游 Flow 完成(
onComplete)。如果上游是无限流,此函数将永远挂起。
- 对象复用:与
- 代码示例:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15fun main() = runBlocking {
// 1. 准备一个现有的目标集合
val myArchive = mutableListOf<String>("初始数据")
val flow = flowOf("Kotlin", "Coroutine", "Flow").onEach { delay(50) }
println("开始填充集合...")
// 2. 将 Flow 数据收集并填充到 myArchive 中
flow.toCollection(myArchive)
// 3. 此时 myArchive 已经被修改
println("最终集合内容: $myArchive")
// 输出: [初始数据, Kotlin, Coroutine, Flow]
} - 实战技巧:配合
HashSet去重。如果你从数据库查出了一批数据,又从网络查出了一批数据,想要快速合并并去重,可以这样做:1
2
3
4
5
6
7
8val uniqueTags = hashSetOf<String>()
// 先收集第一波
tagsFlow1.toCollection(uniqueTags)
// 再收集第二波,同一个对象会自动处理去重
tagsFlow2.toCollection(uniqueTags)
updateUI(uniqueTags)
7.7 produceIn
这是唯一一个非挂起的终端操作符。它会将 Flow 转换为一个 ReceiveChannel。
- 行为特性:它是唯一一个非挂起的终端操作符。它会启动一个新协程,把 Flow 的数据塞进一个
Channel(通道)里。 - 代码示例:
1
2
3
4// 需要传入一个 CoroutineScope
val channel = flowOf(1, 2, 3).produceIn(viewModelScope)
// 之后可以在任何地方通过 channel.receive() 获取数据 - 适用场景:当你需要将 Flow 的数据提供给一些不方便直接写
collect的旧代码或特定异步组件时。