实时的用户输入、多个网络请求的响应,再加上数据库的频繁更新,很容易让你的代码变得混乱不堪。而这,正是 Kotlin Flow 要帮你解决的问题。作为基于协程构建的响应式流 API,Kotlin Flow 让你可以用声明式的方式优雅地处理异步数据流。但要想真正发挥它的强大能力,关键在于熟练掌握各种操作符。
本指南就是为你量身打造的速查手册——聚焦最核心的操作符,清晰说明它们的适用场景和使用方法,助你高效驾驭 Kotlin Flow。
1、一对一转换
1.1 map
将 Flow 中发出的每个元素转换为新元素,实现一对一映射。
1 | fun main() = runBlocking { |
output:
Length of 'Kotlin' is 6
Length of 'Flow' is 4
✦ 适用场景:需要将每个项转换为不同类型或格式,例如在 ViewModel 中将 Date 对象格式化为可显示的 String。
1.2 filter
1 | fun main() = runBlocking { |
✦ 适用场景:根据条件丢弃不需要的值,例如过滤空搜索词或无效数据。
1.3 take
限制性操作符,仅发出 Flow 的前 n 个元素,随后取消 Flow 执行。
1 | fun main() = runBlocking { |
2、累积值
2.1 reduce
终端操作符,从 Flow 的第一个元素开始累积值,并对当前累加器与每个元素执行操作。若 Flow 为空则抛出异常。
1 | fun main() = runBlocking { |
2.2 fold
类似于 reduce,但 fold 需要一个初始值,是更安全的选择。即使 Flow 为空也会返回初始值。
1 | fun main() = runBlocking { |
2.3 runningReduce / scan
中间操作符,在每个元素处理时都发出当前累积值。中间操作符不会触发 Flow 的收集,所以需要 collect 触发 Flow 的收集,而 reduce 这种终端操作符会触发 Flow 的收集。 scan 是更通用的版本,支持指定初始种子值。
1 | fun main() = runBlocking { |
✦ 适用场景:他们会返回每一步的结果,而 reduce / fold 只返回最终结果。适用场景:计算累计总和、跟踪进度,或在状态机中展示状态历史。
3、发出多个值
3.1 transform
高度灵活的操作符,可为每个输入元素发出零个、一个或多个值,对输出流有更强控制力。
1 | fun main() = runBlocking { |
✦ 适用场景:执行复杂转换、引入副作用(如日志记录),或根据单个输入有条件地发出多个值。
4、扁平化嵌套
4.1 flatMapConcat
将每个元素转换为一个 Flow,然后依次连接这些 Flow,只有当前一个 Flow 完成后,下一个才开始。
1 | fun getNumbersFlow(id: Int): Flow<String> = flow { |
✦ 适用场景:仔细看,第一个流的每个数字都会参与到第二个流中。顺序敏感的操作,例如依次上传多个文件,或执行依赖型网络请求。
4.2 flatMapMerge
并发合并由转换函数生成的多个 Flow,可通过 concurrency 参数控制并发数量。
1 | fun getNumbersFlow(id: Int): Flow<String> = flow { |
✦ 适用场景:从结果上注意区分 flatMapConcat,flatMapMerge 不会保证合并的顺序。适用场景:顺序无关的并行操作,例如同时从多个数据源获取数据。
4.3 flatMapLatest
当新元素发出时,立即取消上一个元素对应的 Flow。
1 | val searchQuery = flowOf("search", "search with new term").onEach { delay(200) } |
✦ 适用场景:实时搜索功能,或任何只需关注最新事件结果的场景。
5、上下文与缓冲
5.1 flowOn
更改用于执行上游 Flow 的 CoroutineContext,是 Flow 中切换调度器的正确方式。
1 | fun heavyWork(): Flow<Int> = flow { |
5.2 buffer
通过解耦生产者与消费者实现并发执行:生产者将项放入缓冲区,消费者从中取出。
1 | suspend fun main() { |
✦ 适用场景:得益于 buffer,最后整个数据的收集时间要小于 (200 + 300) * 3。使用场景:当生产者与消费者处理速度不一致时,提升性能。
5.3 conflate
一种缓冲形式,当收集器处理太慢时会丢弃中间值,确保始终获取最新值。
1 | suspend fun main() { |
✦ 适用场景:UI 更新中无需显示中间状态,如股票行情或 GPS 位置更新。
5.4 collectLatest
终端操作符,当新值发出时,取消对前一个值的收集逻辑。
1 | suspend fun main() { |
✦ 适用场景:注意看这里的结果,Finished collecting 只收集了最后一次的值,一定要注意这个特性。适用场景:某项操作耗时较长,且应在新项到达时被取消,例如将用户输入保存到数据库。
6、合并
6.1 zip
等待两个 Flow 各自发出一项后进行组合。任一源 Flow 结束,结果 Flow 即结束。
1 | suspend fun main() { |
6.2 combine
组合两个 Flow 的最新值。只要任一源 Flow 发出新值(且双方至少各发出过一次),就会触发一次发射。
1 | suspend fun main() { |
✦ 适用场景:响应多个数据源的变化。
6.3 merge
将多个 Flow 合并为一个,按发出顺序交错输出所有值。
1 | suspend fun main() { |
✦ 适用场景:将来自不同 UI 组件的多个事件流合并为单一处理流。
7、错误与完成处理
7.1 catch
捕获上游 Flow(即 catch 之前的操作符)中发生的异常,但不捕获下游收集器中的异常。
1 | suspend fun main() { |
✦ 适用场景:优雅地处理错误、提供默认值或记录失败信息。
7.2 onCompletion
在 Flow 完成时(无论成功或异常)执行指定操作。成功时 cause 为 null。
1 | suspend fun main() { |
7.3 retryWhen
在发生异常时根据谓词(包含异常原因和重试次数)决定是否重试。
1 | suspend fun main() { |
retryWhen 的回调有两个参数:
- cause:导致 Flow 失败的异常(Throwable 类型)。
- attempt:当前是第几次重试,从 0 开始计数。
8、工具与副作用
8.1 onEach
对 Flow 中每个元素执行指定操作,但不修改元素本身。
1 | suspend fun main() { |
✦ 适用场景:用于日志、调试或埋点等副作用场景,在不改变数据的前提下观察 Flow。
8.2 debounce
过滤在指定超时内被新值取代的值,仅发出“突发”中的最后一个值。
1 | suspend fun main() { |
✦ 适用场景:处理快速用户输入(如搜索框),避免每次按键都触发 API 请求。
8.3 distinctUntilChanged
抑制与前一个值相同的重复发射。
1 | suspend fun main() { |
✦ 适用场景:防止 UI 因状态未变而进行不必要的重组或更新。
9、核心决策指南
- 数据转换:用于修改流中的值:
map,filter,distinctUntilChanged。 - 累积值:用于跟踪状态或计算持续结果:
scan,runningReduce,fold,reduce。 - 从单一输入发出多个值:用于自定义发射逻辑:
transform。 - 处理嵌套或动态 Flow:根据内部 Flow 行为选择:
flatMapConcat(顺序执行)、flatMapMerge(并发执行)、flatMapLatest(取消旧任务,保留最新)。 - 性能与背压控制:用于优化收集效率与响应性:
buffer,conflate,collectLatest,flowOn。 - 合并多个流:用于组合多个数据源:
zip(按顺序配对)、combine(组合最新值)、merge(交错合并)。 - 错误处理与完成逻辑:用于应对异常和生命周期事件:
catch,onCompletion,retryWhen。 - 调试与副作用:用于插入日志或副作用操作:
onEach。 - 处理高频发射:用于抑制过快的发射频率:
debounce。 - 触发 Flow 执行:终端操作符:
collect,collectLatest,first,single,toList,reduce,fold。