Flow 操作符是处理异步数据流的核心工具。它们遵循函数式编程范式,允许你以链式调用的方式对数据进行变换、过滤、组合和缓冲。为了方便理解,我们可以将 Flow 操作符分为 中间操作符、末端操作符、组合操作符、线程上下文与生命周期 四大类:
中间操作符
这类操作符的作用是「转换」。它们会接收一个 Flow,进行处理后返回一个新的 Flow。它们是冷操作符,意味着在调用末端操作符之前,这些逻辑不会执行。
- 过滤类:
filter: 保留符合条件的元素。filterNotNull: 剔除所有null值,并自动收紧泛型类型(从T?变为T)。distinctUntilChanged: 过滤掉连续重复的值。
- 限流类:
take: 仅取前 n 个元素。drop: 跳过前 n 个元素。debounce: 防抖处理,仅在指定时间内没有新值发射时才转发最后一个值(常用于搜索框输入)。
- 变换类:
map: 对每个发射的值进行转换(如将Int转为String)。transform: 比map更灵活,可以在 Lambda 中多次调用emit(),甚至不发射。
- 过滤类:
末端操作符
这类操作符会触发 Flow 的执行(激活冷流)。它们通常是挂起函数(
suspend),并返回一个结果或完成整个流的处理。collect: 最基础的末端操作符,开始收集流中的值。first/last: 获取流中的第一个或最后一个元素(若为空则抛异常)。toList/toSet: 将流中的所有元素收集到集合中。reduce/fold: 对流中的值进行累加计算。fold需要提供初始值。
组合操作符
当你需要处理多个数据源,或者在一个 Flow 中触发另一个 Flow 时,会用到这些操作符。
zip: 将两个流的元素「对齐」合并。只有当两个流都发射了对应索引的值时,才会产出结果。combine: 只要其中一个流有新值,就取两个流中最新的值进行组合。- 展平流
flatMapConcat: 按顺序连接流(处理完第一个再处理第二个)。flatMapMerge: 并发处理多个流。flatMapLatest: 当新元素到来时,立即取消之前的子流处理,只处理最新的。
线程上下文与生命周期
在 Android 开发中,切换线程和观察声明周期至关重要。
flowOn: 指定上游代码块运行的调度器(如Dispatchers.IO)。它只改变它之前的操作符所在的上下文。catch: 优雅地处理异常,而不是直接崩溃。onStart/onCompletion: 分别在 Flow 开始(收集之前)和完成(不论成功与否)时执行操作。retry: 在发生错误时重新运行上游代码,可指定重试次数和条件。
1、filter 操作符
在 Kotlin Flow 的开发中,过滤操作符是我们处理数据流的第一道防线。它们决定了哪些数据可以继续传递,哪些数据该被拦截。
1.1 filter:通用过滤器
filter 是所有过滤操作的基础。它接收一个返回 Boolean 的 Lambda 表达式。
API 语义:如果 Lambda 返回
true,则保留元素;返回false,则丢弃。代码示例:
1
2
3flowOf(1, 2, 3, 4, 5)
.filter { it % 2 == 0 } // 只保留偶数
.collect { println(it) } // 输出: 2, 4适用场景:最基础的条件筛选,如过滤价格大于 100 的商品。
1.2 filterNot:逻辑取反
filterNot 是 filter 的反向版本。虽然逻辑上等同于 filter { !condition },但它能让代码更符合自然语言的阅读习惯。
API 语义:如果 Lambda 返回
true,则丢弃元素;返回false,则保留。代码示例:
1
2val userFlow = flowOf(User(name = "A", isAdmin = true), User(name = "B", isAdmin = false))
userFlow.filterNot { it.isAdmin } // 语义明确:过滤掉管理员适用场景:当你更关注「排除哪些数据」而不是「保留哪些数据」时。
1.3 filterNotNull:空安全处理
在 Android 开发中,我们经常处理 Flow<T?>。如果后续操作符需要非空对象,手动判断 null 会非常繁琐。
API 语义:剔除流中所有的
null元素。类型转换:这是该 API 最强大的地方。它能将
Flow<T?>转换为Flow<T>,在编译器层级实现类型收紧。代码示例:
1
2val nullableFlow: Flow<String?> = flowOf("A", null, "B")
val nonNullFlow: Flow<String> = nullableFlow.filterNotNull() // 类型已进化适用场景:从数据库或 API 获取到可空数据流,需要清理
null值以供 UI 渲染。
1.4 filterIsInstance:运行时类型筛选
当你处理混合类型的流(如 Flow<Any>)时,你可能只对某种特定类型的数据感兴趣。
API 语义:利用 Kotlin 的
reified(类型实化)关键字,筛选出属于指定类及其子类的元素。代码示例:
1
2
3
4
5
6val values = flowOf("A", 1, "B")
values.filterIsInstance<String>() // 仅保留 String 类型
.collect { /* 此时 it 的类型已自动推断为 String */ }
values.filterIsInstance(String::class) // 仅保留 String 类型
.collect { /* 此时 it 的类型已自动推断为 String */ }泛型陷阱:这是许多开发者最容易忽略的 JVM 底层限制。虽然
reified允许我们写出filterIsInstance<List<String>>(),但由于 JVM 类型擦除,运行时系统实际上只知道它是一个List,无法区分它是List<String>还是List<Int>。结论:
filterIsInstance只能精确识别最外层的类,无法穿透嵌套泛型。如果你必须过滤出
List<String>,你需要采用「先探测外层,再手动校验内层」的组合策略:1
2flow.filterIsInstance<List<*>>() // 1. 先确保它是 List
.filter { it.all { element -> element is String } } // 2. 手动校验内部元素类型适用场景:处理多类型 UI 布局、状态机事件流。
2、distinctUntilChanged 操作符
处理频繁发射的数据流时,「去重」是优化性能、减少 UI 冗余刷新的关键手段。distinctUntilChanged 系列操作符正是为此而生。
2.1 distinctUntilChanged:默认去重
这是最基础、最常用的去重 API。它不需要任何参数,直接作用于数据流。
API 语义:对比当前发射的元素与上一个元素,若相等则丢弃,若不同则转发。
代码示例:
1
2
3flowOf(1, 2, 2, 3, 3, 1)
.distinctUntilChanged()
.collect { println(it) } // 输出: 1, 2, 3, 1判断机制:使用 Kotlin 的
==操作符。在底层,这对应于equals()方法的调用。空安全特性:该实现天然兼容
null。在 Kotlin 中null == null返回true,因此连续的null会被视为重复并过滤。
2.2 distinctUntilChanged { old, new -> … }:自定义去重
有时候默认的 equals() 逻辑太死板,无法满足特定的业务需求。此时你可以提供一个自定义的谓词。
API 语义:自定义「相等」的逻辑。Lambda 接收旧值和新值,返回
true表示相等(过滤),返回false表示不同(保留)。代码示例:
1
2
3
4// 只有当新旧值差值大于 0.1 时才发射
flow.distinctUntilChanged { old, new ->
abs(new - old) < 0.1
}实战场景:处理浮点数精度。例如,当两次位置经纬度的差值小于 0.0001 时,我们认为位置没有显著变化,不需要刷新地图。
2.3 distinctUntilChangedBy { it.property }
在处理复杂对象(如 User 或 Order)时,我们往往只想根据某个核心字段来决定是否去重,而不是比较整个对象。
API 语义:接收一个
keySelector函数,从原始数据中提取出一个「键(Key)」。操作符会根据这个 Key 的连续变化来决定是否过滤原始数据。关键特性:不改变源数据。
keySelector提取的 Key 仅用于比较逻辑,不会修改发送给下游的原始对象。代码示例:忽略大小写去重。
1
2
3val flow = flowOf("hello", "HELLO", "world")
flow.distinctUntilChangedBy { it.uppercase() }
.collect { println(it) } // 输出: "hello", "world"注意:虽然比较时用了大写,但输出的依然是原始的小写 “hello”,这保证了数据的完整性。
2.4 如何精准选型
- 基础类型(Int, String)或 Data Class:直接使用
distinctUntilChanged(),代码最简洁。 - 复杂对象且只需根据 ID 去重:使用
distinctUntilChangedBy { it.id },性能最高且不破坏数据结构。 - 模糊匹配或范围判断:使用带 Lambda 的
distinctUntilChanged { old, new -> ... }。
3、时间操作符
对「时间」的掌控力往往决定了代码的质量。Kotlin Flow 提供的三个时间相关操作符:timeout、sample 和 debounce,分别对应了三种完全不同的流控哲学。
3.1 timeout:节奏的“监视器”
timeout 的核心逻辑是:“上游发射太慢,我就及时止损。” 它通过监控相邻两发数据之间的时间间隔,确保流的实时性。
核心机制
- 计时启动:从
collect开始,或从上一条数据发射后立即重新计时。 - 抛出异常:若在预设时间内未等到下条数据,抛出
TimeoutCancellationException。
- 计时启动:从
代码示例
1
2
3
4
5
6
7
8
9
10
11
12
13
fun observeInputStatus(inputFlow: Flow<String>) = scope.launch {
inputFlow
.timeout(5.seconds) // 设置 5 秒超时
.catch { e ->
if (e is TimeoutCancellationException) {
emit("已停止输入") // 超时后切换状态
}
}
.collect {
println("用户正在输入: $it")
}
}
3.2 sample:固定频率的“采样员”
sample 的哲学是:“不管上游发多快,我只按我的表来。” 它将连续的流离散化,只捕捉特定时刻的快照。
核心机制
- 固定窗口:开启周期性“掐表”计时(如每 1s 一次)。
- 最新获胜:在一个周期内,无论上游发了多少条数据,只保留最新的一条。
代码示例
1
2
3
4
5
6
7
8
9
10
11
12
13
14// 模拟高频变化的股票价格
val stockPriceFlow = flow {
while (true) {
emit(Random.nextDouble(100.0, 150.0))
delay(10) // 每 10 毫秒发射一次
}
}
stockPriceFlow
.sample(1000) // 每 1 秒采样一次,降低 UI 刷新频率
.onEach { price ->
updateChart(price)
}
.launchIn(viewModelScope)
3.3 debounce:动中取静的“消音器”
debounce 的逻辑是:“等你不折腾了,我再取最后的值。” 它非常擅长处理那些“高频震荡但最终会趋于稳定”的场景。
核心机制
- 动态计时:每收到新数据,旧计时器立即作废,重新开始计时。
- 静默发射:只有当上游在指定时间内(静默期)没有新数据,才发射最后缓存的数据。
代码示例:
1
2
3
4
5
6
7
8
9
10
11searchQueryFlow
.filter { it.isNotBlank() }
.debounce(500) // 只有用户停顿 500ms 才会发起搜索
.distinctUntilChanged() // 避免重复请求
.flatMapLatest { query ->
searchApi.search(query)
}
.onEach { results ->
showResults(results)
}
.launchIn(viewModelScope)避坑指南:不要在按钮点击事件中使用 Debounce。
debounce会引入感知延迟。针对按钮防抖,我们通常需要立即执行第一次并忽略后续点击。推荐方案:自定义
throttleFirst1
2
3
4
5
6
7
8
9
10fun <T> Flow<T>.throttleFirst(windowDuration: Long): Flow<T> = flow {
var lastEmissionTime = 0L
collect { value ->
val currentTime = System.currentTimeMillis()
if (currentTime - lastEmissionTime > windowDuration) {
lastEmissionTime = currentTime
emit(value)
}
}
}
4、限流操作符 drop 和 take
在处理数据流(Flow)时,我们经常需要处理「前缀」数据。Kotlin 提供了四种核心操作符来处理这种需求。它们的共同特点是:一旦判定逻辑触发(达到数量或条件不符),后续的判定行为就会彻底停止。
4.1 drop(n: Int):跳过前 N 个
这是最简单的操作符。它像一个计数器,前 n 个元素进入时会被拦截并丢弃。
- API 语义:前 n 条数据直接舍弃,从第 n+1 条开始,开启「全量转发」模式。
- 代码示例:
1
2flowOf(1, 2, 3, 4, 5).drop(2).collect { println(it) }
// 输出: 3, 4, 5 - 应用场景:当你订阅一个
SharedFlow,但不想处理重播(Replay)出来的旧缓存,只想看之后的新数据时。
4.2 dropWhile { predicate }:条件触发跳过
dropWhile 更加灵活,它不是看数量,而是看条件。
- 关键行为:它会逐一检查流入的数据。只要
predicate返回true,就丢弃;一旦遇到第一个返回false的数据,它会立即转发该数据,并永久关闭判定逻辑。 - 特别注意:一旦判定停止,后面即便再出现符合条件的数据,也不会被丢弃了。
- 代码示例:
1
2
3// 目标:跳过所有小于 3 的前缀数据
flowOf(1, 2, 3, 1, 5).dropWhile { it < 3 }.collect { println(it) }
// 输出: 3, 1, 5 (注意:第二个 1 被转发了,因为判定在遇到 3 时已终止)
4.3 take(n: Int):只取前 N 个
take 的逻辑与 drop 相反,它是为了“尽早结束”。
- API 语义:仅转发前 n 个元素。发完第 n 个后,它会主动发出一个取消信号,终止上游 Flow。
- 代码示例:
1
2flowOf(1, 2, 3, 4, 5).take(2).collect { println(it) }
// 输出: 1, 2 (流在此处结束,3、4、5 不会被生产) - 性能意义:它是「响应式」的,能告诉上游:“我够了,你可以停工了。”这在处理无限循环流时至关重要。
4.4 takeWhile { predicate }:条件触发截断
这是最具「破坏性」的操作符,用于维护流的连续有效性。
- 关键行为:只要条件满足(
true),数据就转发;一旦遇到第一个返回false的数据,该数据被丢弃,且整个 Flow 立即宣告结束。 - 代码示例:
1
2
3// 目标:只要数字小于 3 就继续转发
flowOf(1, 2, 3, 1, 5).takeWhile { it < 3 }.collect { println(it) }
// 输出: 1, 2 (遇到 3 时不符合条件,流直接掐断,后面的 1 和 5 永远见不到了)
5、map 操作符
在响应式编程中,map 是最基本的操作,但在复杂的业务场景下,单纯的映射往往不够用。为此,Kotlin Flow 提供了三个维度各异的 API 来应对同步转换、空值过滤和异步竞争。
5.1 map:基础同步映射
map 是最原始的「加工厂」,它对数据流进行一对一的转换。
- 工作模式:同步且线性。它会严格按照上游发送数据的顺序进行处理。只有当前面的数据转换完成并发送给下游后,才会处理下一条数据。
- 行为逻辑:
- 输入 1 个,输出 1 个。
- 即使转换结果是
null,也会原样转发给下游。
- 代码示例:
1
2flowOf(1, 2, 3).map { it * 2 }.collect { println(it) }
// 输出: 2, 4, 6 - 适用场景:简单的格式转换、非耗时的计算逻辑。
5.2 mapNotNull:映射与空值过滤
在实际开发中,转换逻辑往往伴随着「有效性检查」。如果转换失败,我们通常希望这条数据直接消失。
- 工作模式:转换 + 质检。它在执行完转换逻辑后,会自动检查结果。
- 行为逻辑:
- 等价于
map { ... }.filter { it != null }。 - 如果闭包返回
null,这条数据会被流静默丢弃,下游不会收到任何信号。
- 等价于
- 代码示例:
1
2
3val data = flowOf("1", "2", "abc", "3")
data.mapNotNull { it.toIntOrNull() }.collect { println(it) }
// 输出: 1, 2, 3 ("abc" 转换结果为 null,被自动过滤) - 适用场景:数据清洗、从混合列表中提取特定类型、尝试解析可能失败的 JSON。
5.3 mapLatest:异步中断映射
这是处理高频触发任务的神器。它的核心逻辑不是「顺序」,而是「最新优先」。
工作模式:异步且可抢占。当新数据到达时,如果旧的数据转换任务还在进行,
mapLatest会毫不留情地「取消」旧任务,转而启动新任务。行为逻辑:
- 它利用了 Kotlin 协程的取消机制。
- 不保证每一个输入都有输出,只保证「最新」的一个输入能最终完成转换(如果之后没有新数据干扰)。
经典案例:搜索建议,当用户在搜索框输入“K”、“Ko”、“Kot”时,每一个字母都会触发 API 请求。
- 输入“K”,启动请求 R1。
- 还没等 R1 回来,用户输入了“Ko”。
mapLatest立即取消 R1,启动 R2。
这样可以极大节省资源,并确保 UI 显示的内容永远匹配用户最后一次的操作。
5.4 mapLatest:去重映射
在实际开发(尤其是搜索框、过滤器联动等场景)中,单纯使用 mapLatest 有时还是会造成资源浪费。配合 distinctUntilChanged 可以实现更高级的性能优化。mapLatest 虽然能取消旧任务,但它无法阻止重复任务的开启。
- 场景:用户在搜索框输入了 “Kotlin”,然后误触删除了 “n” 又飞快补了一个 “n”。
- 问题:此时上游发出的内容序列是
["Kotlin", "Kotli", "Kotlin"]。对于mapLatest来说,最后的 “Kotlin” 是一个新信号,它会取消之前的请求并重新发起一个新的 “Kotlin” 搜索请求。 - 浪费:明明搜索词没变,却重新请求了一次网络。
- 代码示例:过滤掉与上一次发出的值重复的数据。
1
2
3
4
5
6
7searchQueryFlow
.debounce(300) // 防抖:停顿300ms才向下发送
.distinctUntilChanged() // 去重:如果内容没变,不触发后续逻辑
.mapLatest { query -> // 异步:只处理最新的请求
searchApi.doSearch(query)
}
.collect { ... }
6、transform 操作符
如果说 map 和 filter 是精美的「成品家具」,那么 transform 系列就是你手中的「木材与电锯」——它允许你从底层开始,随心所欲地构建数据流转换逻辑。
6.1 transform:万能的发射控制器
transform 是所有转换操作符的鼻祖。事实上,map 在底层就是通过调用 transform 实现的。
- 核心逻辑:它不再要求你「返回」一个值,而是给你一个
FlowCollector上下文。你必须手动调用emit()来发送数据。 - API 特性:
- 灵活性:你可以不发射(过滤)、发射一次(映射)、或发射多次(一对多)。
- 上下文:大括号内的
this指向收集器,允许你自由控制发射时机。
- 代码示例:
1
2
3
4
5
6flowOf(1, 2, 3).transform { value ->
emit("开始处理 $value")
if (value % 2 == 0) {
emit("发现偶数: $value") // 一对多发射
}
}.collect { println(it) } - 适用场景:需要根据上游的一个信号产生多个结果,或者需要复杂的嵌套逻辑决定是否发射数据。
6.2 transformWhile:带“自毁”机制的转换
当你需要在满足某个条件时立即掐断整个 Flow,transformWhile 是最高效的工具。它完美结合了 transform 的灵活性和 takeWhile 的截断能力。
- 核心逻辑:闭包必须返回一个
Boolean。只要返回true,流就继续;一旦返回false,整个 Flow 立即宣告结束。 - API 特性:
- 执行时序:先执行大括号内的代码,最后根据返回值决定是否继续。
- 注意事项:如果你在代码块里先
emit(data)然后return false,那么这条data依然会被发送出去,流会在发送完这一条后关闭。
- 代码示例:
1
2
3
4flowOf(1, 2, 3, 4, 5).transformWhile { value ->
emit(value)
value < 3 // 当 value 等于 3 时返回 false,Flow 停止,4 和 5 永远不会被处理
}.collect { println(it) } - 适用场景:监控数据流,当发现某个异常信号或达到某个阈值时,立即停止所有后续处理。
6.3 transformLatest:最新优先的异步加工
这是处理「高频更新」场景的终极利器。它是 mapLatest 的底层实现,核心在于任务的可取消性。
- 核心逻辑:当上游发送新数据时,如果上一个数据的
transformLatest闭包还在运行(比如正在进行耗时的网络请求或delay),它会立即取消旧任务。 - API 特性:
- 竞争机制:它不保证每个输入都有产出。如果输入太快,中间过程会被直接“掐掉”。
- 异步友好:非常适合闭包内包含挂起函数(
suspend)的场景。
- 代码示例:
1
2
3
4
5
6
7
8
9flow {
emit(1)
delay(50)
emit(2)
}.transformLatest { value ->
delay(100) // 模拟耗时处理
emit("结果: $value")
}.collect { println(it) }
// 输出:由于 2 在 50ms 时到达,此时 1 的 100ms 处理还没完,1 被取消,最终只输出 "结果: 2" - 适用场景:搜索框实时联想、UI 状态快速连续切换时的资源释放。
7、withIndex 操作符
在真实开发过程中,我们经常需要知道某个数据是流中的第几个元素。虽然我们可以手动维护一个外部计数器,但 Kotlin 提供了一个更加优雅的解决方案:withIndex 操作符。
7.1 withIndex:流的序号化
withIndex 是一个中间操作符,它的唯一使命就是为流中的每个元素按顺序编号。
功能定义:它会将原始流中的每一个元素 T 包装成一个带序号的复合对象。
序号规则:从 0 开始递增。
输入输出:
- 输入:
Flow<T> - 输出:
Flow<IndexedValue<T>>
- 输入:
核心结构:
IndexedValue<T>当你使用了
withIndex后,下游接收到的不再是裸数据,而是一个IndexedValue类型的 Data Class。它在 Kotlin 标准库中的结构大致如下:
1
data class IndexedValue<out T>(val index: Int, val value: T)
index:这是系统自动分配的索引,代表它是流中的第几个元素。value:这是你原来的数据。
代码示例:
1
2
3
4
5
6
7
8
9
10
11
12flowOf("A", "B", "C")
.withIndex()
.collect { item ->
println("第 ${item.index} 个元素是 ${item.value}")
}
flowOf("A", "B", "C")
.withIndex()
.collect { (index, value) ->
// 直接使用 index 和 value 变量
println("第 $index 个元素是 $value")
}
7.2 collectIndexed:终端收集的“计数器”
collectIndexed 是一个终端操作符(Terminal Operator)。这意味着当你调用它时,整个数据流才会真正开始启动并产生数据。
功能定义:它在收集上游发出的每一个元素时,会自动附带一个从 0 开始的整数索引。
函数签名:
1
2
3suspend inline fun <T> Flow<T>.collectIndexed(
crossinline action: suspend (index: Int, value: T) -> Unit
)返回值:
Unit。由于它是终结操作,它不会返回新的 Flow,而是直接消费数据。核心语法:双参数 Lambda。
与普通
collect不同,collectIndexed的闭包提供两个参数,顺序固定为:(index, value)。index:Int类型,表示当前元素在流中的序号(0, 1, 2…)。value:T类型,表示上游发出的原始数据。
代码示例:
1
2
3
4
5
6
7
8
9
10
11val flow = flowOf("Kotlin", "Java", "Python")
flow.collectIndexed { index, name ->
println("第 ${index + 1} 个录入的语言是: $name")
}
输出:
------------------------
第 1 个录入的语言是: Kotlin
第 2 个录入的语言是: Java
第 3 个录入的语言是: Python关键特性:直接性与终端性
collectIndexed的设计初衷是为了简化终点的处理逻辑。- 无需解构:不同于
withIndex().collect { (i, v) -> ... },collectIndexed的参数直接就是分离的,不需要从IndexedValue对象中解构,语法更加直截了当。 - 触发执行:作为一个终端操作符,它是 Flow 的「点火开关」。一旦调用,上游的
flow { ... }块才会开始运行。
- 无需解构:不同于
7.3 withIndex 与 collectIndexed 的差异
这是面试或开发中最容易混淆的两个 API。它们都能提供索引,但定位完全不同:
| 特性 | withIndex |
collectIndexed |
|---|---|---|
| 角色定位 | 中间转换操作符 | 终端收集操作符 |
| 参数形式 | 包装成 IndexedValue 对象 |
直接提供两个参数 (index, value) |
| 链式调用 | 后面可以继续接 filter,map等 |
它是终点,后面不能再接任何东西 |
| 灵活性 | 极高:可以在流的中间阶段利用索引过滤 | 一般:索引仅供最终处理使用 |
8、reduce 和 fold 操作符
当我们谈论「把一堆数据变成一个结果」时,reduce 和 fold 是绕不开的双子星。这套操作符体系其实是在解决两个核心问题:“从哪开始?” 以及 “要不要过程?”
8.1 reduce:自动初始化的归约器
reduce 是最纯粹的聚合操作,它不依赖外部初始值,完全利用数据流内部的元素进行合并。
- API 特性:
- 初始值:默认取集合或流的第一个元素作为初始累加器(Accumulator)。
- 计算顺序:从第二个元素开始参与计算。
- 类型限制:累加结果的类型必须与元素类型一致(例如
Int只能聚合成Int)。
- Flow 中的表现:
- 属于终端操作符(Terminal Operator),是挂起函数。
- 调用后会启动 Flow 收集,直到流结束才返回最终的单一结果。
- 代码示例:
1
2// 1+2+3+4+5 = 15
val result = flowOf(1, 2, 3, 4, 5).reduce { acc, value -> acc + value }
8.2 fold:带初始值的灵活聚合
如果你需要一个「底座」,或者希望聚合后的结果与原数据类型不同,fold 是唯一的选择。
- API 特性:
- 初始值:必须显式提供一个
initial参数。 - 计算顺序:从第一个元素开始就与
initial进行运算。 - 类型自由:支持跨类型聚合(例如将
Int流聚合成一个String)。
- 初始值:必须显式提供一个
- Flow 中的表现:
- 同样是终端操作符和挂起函数。
- 即使上游流为空,它也会安全地返回你设定的初始值,而不会像
reduce那样抛出异常。
- 代码示例:
1
2// 初始值 "Result:", 结果为 "Result:123"
val result = flowOf(1, 2, 3).fold("Result:") { acc, value -> acc + value }
8.3 runningReduce:记录过程的累加流
当你不仅想要「终点」,还想要「沿途的风景」时,runningReduce 就派上用场了。
- API 特性:
- 它不是终端操作,而是一个中间转换操作符。
- 它会记录每一次
reduce计算的中间结果。
- Flow 中的表现:
- 非挂起函数,返回的是一个新的
Flow。 - 上游每发射一个新数据,它都会把当前的累加值作为一个新数据发射给下游。
- 非挂起函数,返回的是一个新的
- 代码示例:
1
2// 输入[1,2,3],输出 1,3,6(即 1, 1+2, 3+3)
flowOf(1, 2, 3).runningReduce { acc, value -> acc + value }.collect { println(it) }
8.4 runningFold:状态流的基石
这是响应式编程中最强大的操作符之一。在 Kotlin Flow 中,它有一个更响亮的别名:scan。
- API 特性:
- 结合了
fold的初始值和running的过程记录。 - 返回结果中,第一个元素永远是初始值。
- 结合了
- Flow 中的表现:
- 中间操作符,返回新
Flow。 - 它是处理「状态积累」的核心工具。比如在 Android 中累积用户的点击次数或合并 UI 状态。
- 中间操作符,返回新
- 代码示例:
1
2// 输出: 0, 1, 3, 6
flowOf(1, 2, 3).scan(0) { acc, value -> acc + value }.collect { println(it) }
9、onEach 操作符
onEach 是一个中间操作符。它的本质非常纯粹:数据透传。它接收什么,就向下游发送什么。如果你在 onEach 里打印数据,数据本身不会受到任何影响。
功能定义:每当上游 Flow 发射出一个数据项时,
onEach闭包内的代码块就会被触发执行。非修改型:它是 「非转换型」 的。无论你在
onEach闭包里做了什么(只要不抛出异常),它返回的都是一个与上游完全相同的新 Flow。数据流经它之后,会原封不动地传给下游。API 签名:
1
fun <T> Flow<T>.onEach(action: suspend (T) -> Unit): Flow<T>
代码示例:
1
2
3flowOf(1, 2, 3)
.onEach { println("监听到数据:$it") } // 副作用操作:打印
.collect() // 触发流的收集核心机制:执行时机与数据流向
理解
onEach的关键在于掌握它的执行顺序。当一条数据从上游产生后,它会首先进入
onEach的代码块。在代码块执行完毕后,该数据才会继续流向下一个操作符或最后的收集器(Collector)。- 参数传递:
onEach闭包中包含一个参数,即当前正在流经的数据项。 - 前置响应:它确保在下游收到数据之前,你的监听逻辑已经执行完毕。这在调试时非常有用,你可以清晰地看到数据进入某个处理环节前的原始状态。
- 参数传递:
链式调用:多重监听
由于
onEach遵循「输入什么,输出什么」的原则,它天然支持连续调用。你可以根据业务逻辑的职责,将不同的监听任务拆分到多个onEach中。执行逻辑:数据会像通过流水线岗哨一样,依次触发每一个
onEach。代码示例:
1
2
3flow.onEach { logToConsole(it) } // 第一个岗哨:打日志
.onEach { saveToCache(it) } // 第二个岗哨:存缓存
.collect()
这种写法比将所有逻辑堆在同一个
collect块中更加解耦、易于维护。组合使用:与 filter 的协作
onEach的执行频率取决于它在链条中的位置以及上游操作符的行为。如果你在filter之后接onEach,那么只有符合过滤条件的数据才会触发监听。案例逻辑解析:
1
2
3
4flowOf(1, 2, 3, 4, 5)
.filter { it % 2 == 0 } // 只保留偶数
.onEach { println("通过过滤的数据: $it") }
.collect()- 数据
1:在filter处被截断,不会触发后面的onEach。 - 数据
2:通过filter,触发onEach打印,最后进入collect。
- 数据
适用场景:与 filter 的协作
调试:在流的各个环节插入
println,定位数据是在哪一步变掉或丢掉的。UI 反馈:数据每来一个,就让进度条动一下,但不要影响数据的后续计算。
解耦副作用:将日志记录、埋点等非核心业务逻辑从
collect或map中抽离出来。
10、chunked 操作符
在高性能的流式编程中,我们经常需要处理「批处理」需求。比如,你不想每产生一条日志就发起一次网络请求,也不想每产生一条数据就执行一次数据库事务。这时,chunked 就是一个打包员。它在流水线上放了一个「储物箱」,只有当箱子装满了,或者上游已经没东西发了,它才会把整个箱子封好,发给下游。
核心任务:
chunked是一个中间转换操作符,它的核心任务是改变数据流的“传输粒度”。功能定义:它会将上游发出的单个元素进行缓存,直到积累到指定的数量,然后将这些元素打包成一个
List发射给下游。类型转换:
- 输入:
Flow<T> - 输出:
Flow<List<T>>
- 输入:
API 签名:
1
2
fun <T> Flow<T>.chunked(size: Int): Flow<List<T>>逻辑规范:
chunked唯一接受的参数就是size,它决定了每个「包」里最多容纳多少个元素。- 满载发射:当内部缓存的元素数量达到
size时,立即打包成List发出。 - 余块发射:当上游 Flow 正常结束(Complete)时,如果缓存中还有剩余元素(哪怕只有 1 个,不足
size),也会被封装成最后一个List发出。
- 满载发射:当内部缓存的元素数量达到
代码示例:
1
2
3
4
5
6
7
8flowOf(1, 2, 3, 4, 5)
.chunked(2)
.collect { println(it) }
/* 输出:
[1, 2]
[3, 4]
[5] <-- 即使不足2个,也会独立成块
*/实战价值:为什么我们需要把流变“碎”为“整”?主要基于以下三个维度的考量:
- 减少 IO 开销
在 Android 数据库(如 Room)操作中,批量插入insertAll(list)的效率远高于循环插入insert(item)。chunked(50)可以让原本 100 次的磁盘 IO 降低到 2 次。 - 降低请求频率
对于埋点上报,通过chunked积累一定数量后再发送,可以有效合并 HTTP 请求,节省用户流量并降低服务器压力。 - 提升渲染性能
如果流的发送频率极高(如传感器数据),UI 刷新可能会造成卡顿。通过chunked缓冲一小部分数据后再统一更新 UI 列表,可以显著提高帧率。
- 减少 IO 开销
核心建议:使用
chunked时要平衡 “实时性” 和 “效率”。size越大,处理效率越高,但数据到达下游的延迟也会随之增加。