在 Kotlin 的世界里,处理数据流就像是装修房子。虽然工具很多(Channel、Flow、Sequence),但选对工具直接决定了你是「游刃有余」还是「徒手拆墙」。很多开发者在面对这些 API 时常常感到困惑:为什么有了 Sequence,还需要 Flow?Channel 到底还要不要用?
本文将针对这些疑惑来拆解一下 Kotlin 数据流 API 的设计哲学,看看 Flow 是如何成为现代 Kotlin 开发中的「终极方案」。
1、从 Channel 到 Flow
曾经,Channel 是 Kotlin 协程中跨协程通信的「当家花旦」。但如果你还在业务开发中直接使用 Channel 来做数据流转,那可能有点「复古」了。
现在的设计原则非常明确:Channel 是「内部零件」,Flow 是「成品引擎」。
- Channel 的底层化:它依然是协程间通信的基石,但在架构设计上,它已被下沉为
FlowAPI 的底层实现。 - Flow 的上层抽象:Flow 提供了更完整、更安全、体验更好的封装。除非你在编写极度底层的并发组件,否则请把
Channel放进工具箱底部,直接使用Flow。
2、数据流与事件流
Flow API 体系并非一成不变,它根据应用场景进化出了三种核心形态:
| 模型 | 组件 | 核心特点 | 适用场景 |
|---|---|---|---|
| 状态流 | StateFlow |
状态订阅,持有最新值 | UI 状态管理,高频订阅 |
| 事件流 | SharedFlow |
一对多,跨协程广播 | 事件通知(如点击、弹窗) |
| 数据流 | Flow |
顺序性,单一订阅者 | 持续的、按顺序的数据处理 |
本质区别在于:Flow 关注的是数据序列(比如从数据库读取一行行记录),而 SharedFlow 关注的是事件广播(比如“用户点击了按钮”这个事件需要被多个模块同时获知)。
3、Sequence
Sequence 提供了一种类似队列的按序访问 API,但两者有着本质区别:队列是一种预先存储所有数据的数据结构,而 Sequence 是一种按需生成数据的机制。它只定义数据的生成规则而不保存数据本身,采用惰性求值的策略,仅在实际访问时才动态计算并产出下一条数据,有效避免了全量数据预加载的开销。
3.1 Sequence 的创建与遍历
在 Kotlin 中创建 Sequence 非常灵活,通常有以下四种方式:
从现有集合转换:使用
.asSequence()1
2
3
4
5
6val list = listOf(1, 2, 3, 4)
val sequence = list.asSequence()
for (element in sequence) {
println(element)
}直接使用元素创建:使用
sequenceOf()1
2
3
4
5val sequence = sequenceOf("a", "b", "c")
for (element in sequence) {
println(element)
}基于规则动态生成:使用
generateSequence()(非常适合生成无限序列)1
2// 生成一个 1, 2, 4, 8, 16... 的无限序列
val powersOfTwo = generateSequence(1) { it * 2 }使用组装器协程(Builder):使用
sequence { ... }结合yield()挂起函数按需“发射”数据。1
2
3
4
5
6
7
8
9
10val fibonacci = sequence {
var a = 0
var b = 1
while (true) {
yield(a) // 产出当前值,然后暂停,等待下一次访问
val next = a + b
a = b
b = next
}
}
3.2 List 的创建与遍历
直接使用元素创建:使用
listOf()1
2
3
4
5val list = listOf("a", "b", "c")
for (element in list) {
println(element)
}
3.3 中间操作与末端操作
Sequence 的操作分为两类,这是理解其「惰性求值」的关键:
- 中间操作:比如
map、filter、take。它们不会触发任何实际的计算,只是将转换规则记录下来,并返回一个新的 Sequence。 - 末端操作:比如
toList()、first()、sum()、count()。只有调用了末端操作,Sequence 才会真正开始按设定的规则去获取和计算数据。
让我们看两段代码在一个经典场景下的代码对比,直观感受它们在底层处理逻辑上的截然不同。场景:找到第一个大于 2 的偶数的平方。
普通集合的做法:
1
2
3
4
5
6
7
8
9
10
11
12
13
14val list = listOf(1, 2, 3, 4)
val result = list
.filter { println("List Filter: $it"); it % 2 == 0 } // 遍历所有元素,产生新 List [2, 4]
.map { println("List Map: $it"); it * it } // 遍历新 List,产生新 List [4, 16]
.first { println("List First: $it"); it > 2 } // 遍历找到第一个符合条件的
// 输出结果:
// List Filter: 1
// List Filter: 2
// List Filter: 3
// List Filter: 4
// List Map: 2
// List Map: 4
// List First: 4缺点:每次中间操作都会创建一个新的临时集合,并且强迫所有元素走完当前步骤。
Sequence 的做法:
1
2
3
4
5
6
7
8
9
10val sequenceResult = list.asSequence()
.filter { println("Seq Filter: $it"); it % 2 == 0 }
.map { println("Seq Map: $it"); it * it }
.first { println("Seq First: $it"); it > 2 }
// 输出结果:
// Seq Filter: 1 (不符合)
// Seq Filter: 2 (符合,交给下一步)
// Seq Map: 2 (变成 4,交给下一步)
// Seq First: 4 (4 > 2 符合!直接返回,停止整个计算)优势:“用一条,算一条”。元素 1 走完发现不匹配被丢弃;元素 2 走完整个链条发现正是我们需要的结果,计算立即终止(短路)。元素 3 和 4 根本不需要参与计算!没有产生任何中间集合。
3.4 Sequence 的应用场景
虽然 Sequence 看上去很完美,但它本身在包装和迭代器的创建上存在微小的开销,因此并不适用于所有场景。
| 场景 | 推荐使用 | 原因 |
|---|---|---|
小数据量且只有1-2步简单操作 (如 list.map{}.filter{}) |
普通集合 (Iterable) | Sequence 的创建开销会大于它省下的性能,直接用 List 更快。 |
| 大数据量(成千上万条记录) | Sequence | 避免创建大量的中间临时集合,极大地节省内存 (OOM 救星)。 |
| 多步链式操作(3步或以上) | Sequence | 减少全量遍历的次数,避免内存抖动。 |
需要用到短路操作(first(), take(), any()) |
Sequence | 可以提前终止计算,跳过后续不需要处理的元素。 |
| 生成无限长度的数据流 | Sequence | 普通集合无法装下无限数据,必须使用惰性的 Sequence。 |
3.5 Sequence 不支持挂起
如果你深入探究过 Sequence,会发现它是一个非常高效的惰性生产工具。但它有一个致命的「短板」:不支持挂起函数(Suspend Function)。
为什么会有这个限制? 这一切都源于一个名为 @RestrictsSuspension 的注解。当你在 sequence { ... } 作用域内写代码时,编译器会施加严苛的限制:你只能调用 yield() 或 yieldAll()。如果你胆敢在里面加一个 delay() 或者发起网络请求,编译器会立刻给你一个“红牌罚下”。
1 |
|
设计初衷:@RestrictsSuspension 的存在是为了隔离作用域。它强制确保 Sequence 的内部实现与外部协程上下文完全脱钩,防止因生命周期混淆导致的结构化并发灾难。简单来说,Sequence 是 CPU 绑定的,它不希望被复杂的协程挂起逻辑污染。
4、Flow
由于 Sequence 的迭代是同步的,其构建器内不允许调用普通的挂起函数。为了解决异步场景下的数据流处理问题,Flow 应运而生。Flow 本质上是基于协程的异步 Sequence,它打破了同步执行的限制,并具备以下关键能力:
原生挂起支持:你可以自由地在
flow { ... }块中调用任意的suspend函数,例如从网络拉取数据:1
2
3
4
5
6
7flow {
while (true) {
val data = api.fetchData() // 完美支持挂起!
emit(data)
delay(1000)
}
}API 的映射:它们的设计语言是一致的,上手几乎没有门槛:
Sequence.yield()→Flow.emit()Sequence.yieldAll()→Flow.emitAll()
无论是 Sequence 还是 Flow,它们都共享「惰性」基因。这意味着只有在终端操作(如 collect)触发时,代码才会执行。这种按需生产的机制,在处理无限数据流(如轮询)或长耗时计算时,能极大降低首条数据的延迟。
1 | fun main(): Unit = runBlocking { |
Flow 不仅仅是一个 API,它是 Kotlin 协程生态对于异步数据流处理的标准答案。它继承了 Sequence 的惰性思维,借用了 Channel 的通信能力,并完美解决了异步挂起的痛点。
4.1 Flow 的核心原理
要理解 Flow,首先要打破传统的「数据结构缓存数据」的思维定势。Flow 的核心原理可以概括为代码迁移模型。
当我们调用 collect 收集数据时,实质上是将 flow {} 代码块内的生产逻辑,整体「迁移」到了 collect 调用的地方执行,而不是预先在某个后台线程运行并把数据缓存起来。
理解
emit的语义替换我们可以把
emit(value)的执行,看作是被直接替换为了collect代码块中的具体处理逻辑。例如,下面的组合:1
2
3
4
5
6
7
8
9
10
11// 生产者
val flow = flow {
emit(1)
delay(100)
emit(2)
}
// 消费者
flow.collect {
println("$it")
}在本质上完全等价于依次执行以下代码:
1
2
3println("1")
delay(100)
println("2")多次
collect的独立性正因为上述的「代码迁移」特性,每次调用
collect都会重新触发一次完整、独立的数据生产流程。 不同的collect调用之间彼此隔离,互不干扰。如果你对同一个 Flow 调用两次collect,第二次依然会从头开始获取全部数据。
4.2 Flow 与 Channel 的区别
在 Kotlin 的异步数据流框架中,经常会把 Flow 和 Channel 拿来做对比。它们最大的区别就在于冷(Cold)与热(Hot)。
- Channel 是 “热” 的(Hot)
- 生产独立于消费:Channel 的数据生产是独立的。只要调用了
send(),它就会立即生产数据,根本不管当前有没有消费者在调用receive()。 - 持续运行:它的生产流程是统一且持续的,状态不随消费者的变化而重置。
- 生产独立于消费:Channel 的数据生产是独立的。只要调用了
- Flow 是 “冷” 的(Cold)
- 生产绑定于消费:
flow {}仅仅是定义了一套「生产规则」。只有当终端操作符(如collect)被调用时,真正的生产过程才会启动。 - 无共享状态:每次
collect都会触发一次全新、独立的生产流程(包含内部的所有延迟、计算等),没有共享状态。
- 生产绑定于消费:
- “热” 和 “冷” 的记忆要点
- 热 = 无论有没有人收集,都在自顾自地生产。
- 冷 = 只有当有人开始收集时,才按需启动生产。
4.3 Flow 的挂起特性与协程约束
Flow 天然融入了 Kotlin 的协程体系,理解其挂起特性非常重要:
collect是挂起函数:调用collect必须在协程作用域内(如launch或viewModelScope.launch)进行。它的执行会挂起当前协程,直到数据流完成,但绝不会阻塞底层线程。flow创建过程非挂起:flow {}构造函数本身并不是挂起函数,你可以在任何普通的同步代码中调用它。因为它只是「封装」了逻辑,并没有「执行」逻辑。flow代码块内允许挂起:flow {}内部的block参数被声明为suspend类型。这意味着你可以在里面自由地调用delay()、网络请求等其他挂起函数(且没有RestrictsSuspension注解的严格限制)。结构化并发:Flow 遵循协程的结构化并发原则。如果收集 Flow 的协程被取消,Flow 的执行也会自动且安全地停止,防止内存泄漏。
天然的背压支持:在传统响应式框架中,如果生产者发送数据的速度快于消费者处理的速度,就需要复杂的背压策略。Flow 通过挂起函数天然解决了这个问题:当消费者正在处理数据时,它可以“挂起”生产者,直到消费者准备好接收下一个数据。
4.4 Flow 的应用场景
Kotlin Flow 在现代 Android 开发和 Kotlin 后端开发中有着广泛的应用,主要用于处理随时间推移而产生的一系列异步事件或数据。
UI 状态管理(MVVM 架构)
在 Android 开发中,
StateFlow(Flow 的一种热流变体)已经成为替代LiveData的标准方案。它可以用来持有 UI 状态,并在状态发生变化时通知 UI 层进行重绘。- 优势:支持协程的所有操作符,独立于 Android 框架(可在纯 Kotlin 模块中使用),并且具有初始默认值。
数据库观察(如 Room 数据库)
Android 的 Room 数据库原生支持 Flow。当你需要实时观察数据库中某个表或某行数据的变化时,可以返回一个 Flow。
- 优势:只要数据库中的底层数据发生更改,Flow 就会自动发射最新的数据集。这对于实现“单一数据源 (Single Source of Truth)”模式非常完美。
网络请求与轮询
当你需要定期向服务器拉取数据(轮询),或者需要按顺序执行一系列网络请求并在中途处理数据时,Flow 是极佳的选择。
- 优势:结合
delay()函数,可以非常轻松地实现重试机制(retry操作符)或定时轮询。
- 优势:结合
复杂的数据流合并与转换
如果你的应用需要同时监听多个数据源(例如,同时监听本地数据库和网络数据),并根据它们的组合结果来决定下一步动作。
- 优势:使用
combine、zip或merge等操作符,可以优雅地将多个异步数据流合并为一个统一的输出,代码逻辑清晰且易于维护。
- 优势:使用
传感器数据或位置更新
对于持续不断产生数据的场景,例如 GPS 位置更新、设备传感器(陀螺仪、加速度计)数据采集。
- 优势:可以使用
callbackFlow将传统的基于回调的 API 转换为现代的 Flow API,并在数据发送过快时使用conflate或buffer操作符来丢弃旧数据或缓冲新数据。
- 优势:可以使用
4.5 Flow 的应用实例
Flow 最核心的适用场景,是当业务需要将数据生产逻辑(如定时轮询、网络请求)与数据消费逻辑(如 UI 更新、日志记录)彻底分离时。 假设我们需要在界面上实时显示天气更新:
生产端(数据模块):专注数据获取逻辑。利用无限循环和延迟,构建一个持续生产数据的源头。
1
2
3
4
5
6fun getWeatherFlow(): Flow<String> = flow {
while(true) {
emit(fetchWeatherFromServer()) // 获取数据
delay(60_000) // 每分钟拉取一次
}
}消费端(UI 模块):仅需持有
Flow<String>,通过collect持续接收并更新界面。它完全不需要关心数据是怎么拉取的、多久拉取一次。1
2
3
4
5
6// 在 UI 层收集
lifecycleScope.launch {
getWeatherFlow().collect { weather ->
updateUI(weather)
}
}
解耦优势:
- 职责清晰:生产者只管“数据怎么来”,消费者只管“数据怎么用”。
- 高复用性:这个
weatherFlow可以被日志模块、UI 模块、缓存模块同时collect,各自独立执行自己的抓取和消费流程。
在本文的天气示例中,那个包含 while(true) 的无限流,本质上就是一个天气更新事件流,而 collect 就是事件订阅操作。由于 collect 是一个会一直执行直到数据流结束的挂起函数,如果面对的是一个无限流(或者长耗时流),它会一直挂起当前的协程,导致 collect 后面的代码无法执行。标准实践方案:将 collect 置于独立的协程中启动。
1 | lifecycleScope.launch { |
这种模式是协程生态中的标准实践,不仅逻辑清晰,在处理取消操作时也比传统的回调式订阅更加安全可控。
4.6 Flow 的创建
在现代 Android 架构(如基于 MVVM 和 Jetpack Compose 的应用)中,Kotlin Flow 已经成为处理异步数据流的核心组件。相比于基础的挂起函数,Flow 提供了更强大的数据流式处理能力。然而,Flow 提供了种类繁多的创建 API,它们在底层机制、数据共享策略以及并发处理上有着本质的区别。
4.6.1 基础创建 API
对于简单的数据源,Kotlin 提供了最直接的便捷封装。
flowOf:静态数据流flowOf是最基础的创建方式,接收一个或多个可变参数,并返回一个依次发送这些值的 Flow。- 用法示例:
flowOf(1, 2, 3) - 底层原理:它本质上是基础
flow { ... }构建器的便捷封装。内部实现会对传入的每个元素依次调用emit(),等价于flow { elements.forEach { emit(it) } }。
- 用法示例:
asFlow:集合与 Sequence 桥接asFlow是一个强大的扩展函数,作用于Iterable(如 List、Set)或Sequence等接口类型。- 对于普通集合(如
list.asFlow()),它会将列表元素依次发送。 - 对于惰性序列(如
sequence.asFlow()),它将同步流转换为挂起的 Flow,同时完美保留了惰性求值的特性。
- 对于普通集合(如
4.6.2 Channel 衍生的 Flow
当我们的数据源是基于 Channel 时,情况就变得复杂了。这类 Flow 均以现有 Channel 为数据源,创建的是热 Flow。这就意味着,多次 collect 会瓜分 Channel 中已发送的数据,而非像冷流那样为每个收集器独立重放完整数据。
针对 Channel 的消费,Kotlin 提供了两种截然不同的 API 来应对不同的业务约束:
receiveAsFlow:允许多个消费者瓜分数据- 特性:创建一个可以被多次
collect的 Flow。 - 行为:每次
collect时,只会消费 Channel 中未被其他 collector 消费过的剩余数据。它适合那种「任务分发」的场景,谁抢到谁处理。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21fun main(): Unit = runBlocking {
val channel = Channel<Int>()
val flow = channel.receiveAsFlow()
val scope = CoroutineScope(EmptyCoroutineContext)
scope.launch {
flow.collect {
println("A - $it")
}
}
scope.launch {
flow.collect {
println("B - $it")
}
}
for (i in 0 until 10) {
channel.send(i)
}
delay(10000)
}- 特性:创建一个可以被多次
consumeAsFlow:严格的单次消费约束- 特性:创建一个单次消费的 Flow。
- 行为:首次
collect正常执行并消费 Channel 数据。但如果尝试第二次调用collect,系统会直接抛出IllegalStateException异常,明确提示 “already consumed”。 - 设计意图:这是一种「运行时防护」机制。它的存在是为了提供安全约束,避免开发者误用 Channel 导致关键事件(如单次 UI 导航事件)被多个订阅者意外分流,导致出现「两个 collect 各漏一半事件」的诡异 Bug。
4.6.3 跨协程生产与回调桥接
在标准的 flow { ... } 构建器中,Kotlin 协程有着严格的上下文保护机制:禁止在内部启动其他协程来跨协程调用 emit()。违背此规则会抛出 Emission from another coroutine is detected 异常。比如,运行以下代码就会抛出对应的异常:
1 | fun main(): Unit = runBlocking { |
为了突破这一限制,我们需要引入基于 Channel 构建的冷流 API:channelFlow 和 callbackFlow。
channelFlow:并发与跨协程的安全沙箱核心特性:虽然底层使用了 Channel,但它依然符合冷 Flow 语义。因为每次调用
collect时,它都会在内部触发一次独立的 Channel 创建与生产流程,各个 collector 之间的数据是完全隔离的。生产逻辑:
- 因为底层是 Channel,发送数据使用的是
send()而不是emit()。 - 支持跨协程:允许在
channelFlow代码块内自由启动子协程(例如launch并发获取多路网络数据后合并),天然支持高并发的流生产。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24fun main(): Unit = runBlocking {
val flow = channelFlow {
launch {
delay(2000)
send(2)
}
delay(1000)
send(1)
}
val scope = CoroutineScope(EmptyCoroutineContext)
scope.launch {
flow.collect {
println("A - $it")
}
}
scope.launch {
flow.collect {
println("B - $it")
}
}
delay(10000)
}- 因为底层是 Channel,发送数据使用的是
回调桥接:
channelFlow可以将第三方基于回调的异步 API 转换为响应式 Flow。转换过程比较简单,需要在代码块末尾手动调用awaitClose()挂起当前协程,并在数据发送完毕后调用close()关闭 Channel。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29fun main(): Unit = runBlocking {
val flow = channelFlow {
val call = github.contributorsCall("square", "retrofit")
call.enqueue(object : Callback<List<Contributor>> {
override fun onResponse(
call: Call<List<Contributor>>,
response: Response<List<Contributor>>
) {
val body = response.body()
trySend(body)
close()
}
override fun onFailure(call: Call<List<Contributor>?>, t: Throwable) {
cancel(CancellationException(t))
}
})
awaitClose()
}
val scope = CoroutineScope(EmptyCoroutineContext)
scope.launch {
flow.collect {
println("$it")
}
}
delay(10000)
}
callbackFlow:专为回调 API 打造的桥梁现实开发中,当我们去对接第三方 API 的时候,更适合我们的是
callbackFlow而非channelFlow。callbackFlow的内部实现完全基于channelFlow,它是我们将第三方基于回调的异步 API 转换为响应式 Flow 的首选利器。强制生命周期管理:对接回调时最怕的就是内存泄漏。
callbackFlow强制要求在代码块末尾调用awaitClose()挂起当前协程。如果忘记调用,甚至会导致运行时异常,从而避免遗漏关闭 Channel 或解绑监听器。一旦外部取消了流的收集,awaitClose内部的代码块就会执行,方便我们注销回调。定位对比:
- 单次异步结果:使用
suspendCancellableCoroutine更轻量。 - 连续的数据流(如传感器数据、流式网络响应):使用
callbackFlow形成持续的数据流管道。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31fun main(): Unit = runBlocking {
val flow = fetchContributorsFlow()
val scope = CoroutineScope(EmptyCoroutineContext)
scope.launch {
flow.collect {
println("$it")
}
}
delay(10000)
}
fun fetchContributorsFlow(): Flow<List<Contributor>> = callbackFlow {
val call = github.contributorsCall("square", "retrofit")
call.enqueue(object : Callback<List<Contributor>> {
override fun onResponse(
call: Call<List<Contributor>>,
response: Response<List<Contributor>>
) {
val body = response.body()
trySend(body!!)
close()
}
override fun onFailure(call: Call<List<Contributor>?>, t: Throwable) {
cancel(CancellationException(t))
}
})
awaitClose {
// 清理工作,在 Flow 被取消时执行
}
}下面再来一个模拟的温度传感器的持续的数据流管道例子。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22fun main(): Unit = runBlocking {
val flow = temperatureSensorFlow()
flow.collect {
println("当前温度:$it")
}
}
fun temperatureSensorFlow(): Flow<Int> = callbackFlow {
// 模拟一个传感器,每隔一段时间发送一次温度读数
val sensorJob = launch(Dispatchers.Default) {
while (isActive) {
val randomTemperature = (0..50).random()
send(randomTemperature)
delay((1000..2000).random().toLong()) // 随机延迟
}
}
awaitClose {
// 清理工作,在 Flow 被取消时执行
sensorJob.cancel()
}
}- 单次异步结果:使用
4.6.4 Flow 严禁跨协程发射
在 「跨协程生产与回调桥接」章节中,我们提到 flow { ... } 构建器中,禁止在内部启动其他协程来跨协程调用 emit(),否则会抛出如下异常:
1 | Exception in thread "main" java.lang.IllegalStateException: Flow invariant is violated: |
简单翻译一下:Flow 严禁在一个协程中构建流,却在另一个协程中发送(emit)数据。 很多开发者会觉得这是框架的「傲慢」,但在深入探究其底层逻辑后,你会发现这是一个极具深意、为了保障代码可预测性而做的架构决策。
核心模型:Collect 的「代码内联」逻辑
要理解为什么要严禁跨协程
emit,我们必须建立一个代码内联的思维模型。在 Kotlin Flow 的设计中,flow { ... }里的代码并不是独立运行的后台任务,它在逻辑上更像是被「注入」到了collect的调用方中。我们可以把 Flow 的收集过程想象成下面这个过程:
当你在
collect里写下逻辑时,你直觉上认为这段逻辑运行在collect所在的协程中。Flow 的设计正是为了捍卫这种直觉。为什么直觉很重要?
假设你在 Android 的主线程(Main 协程)中调用了
collect来更新界面。如果 Flow 允许在上游(例如flow { ... }的内部)通过launch一个 IO 协程来emit数据,那么collect内部的处理逻辑就会被迫在 IO 线程中执行。后果就是:你的 UI 更新操作会随机抛出
CalledFromWrongThreadException,而你却无从排查,因为你根本不知道数据是从哪个「隐蔽」的协程发出来的。跨协程发射的风险:一场不可控的「线程飘移」
来看一段违反 Flow 约束的伪代码:
1
2
3
4
5
6
7
8
9
10
11
12
13// 注意:这是错误的示范!
val flow = flow {
launch(Dispatchers.IO) {
emit("数据") // 试图在 IO 线程发送数据
}
}
// 在 Main 协程中收集
scope.launch(Dispatchers.Main) {
flow.collect { value ->
textView.text = value // 如果允许上述行为,这行代码将随机在 IO 线程崩溃
}
}如果 Flow 允许这种行为,
collect内部的执行环境将变得完全不可预测。开发者被迫去阅读 Flow 上游所有的源码,确认它是如何生产数据的,这直接导致了逻辑封装的失效。Flow 的设计者为了避免这种「隐式并发陷阱」,做了一个果断的决定:直接通过运行时异常,强制所有的数据生产必须遵循协程上下文一致性。
合法逃生舱:ChannelFlow 的设计哲学
当然,现实业务是复杂的,我们需要并发生产数据的能力。这时候,
channelFlow闪亮登场。为什么
channelFlow可以跨协程发送数据?因为它不是在「发送 Flow」,而是在使用 Channel。- Channel 的本质:它是专门为协程间通信(CSP 模型)设计的原语。
- 解耦:在
channelFlow中,发送方(send)和接收方(collect)通过 Channel 实现了逻辑上的隔离。send操作仅仅是将数据放入管道,而collect依然在它自己的协程里「拉取」数据。
这是一种物理层面的解耦,与普通
flow { ... }的「代码内联」逻辑有着本质的区别。总结:一种保护,而非限制
Kotlin Flow 对跨协程
emit的限制,实际上是给了开发者一个明确的API 契约:普通 Flow:保持「单协程」语义,代码简洁、直觉一致、UI 操作安全。
channelFlow/callbackFlow:当且仅当你需要复杂的并发生产模型时,才去使用它们。
与其说这是限制,不如说这是框架在提醒你:如果你的逻辑需要跨协程交互,请明确选择更健壮的工具,而不是在普通流里玩火。
4.6.5 冷流与热流的本质辨析
很多开发者在学习 Flow 时容易被「冷热」的概念绕晕。实际上,判断冷热的定义基准在于:数据生产与消费是否解耦。
纯热(如原生的 Channel 或 SharedFlow):生产与消费完全独立。不管有没有人收,生产者都在发数据。
纯冷(普通
flow { ... }):按需生产。每次collect都会从头触发一次全新的生产流程。缝合怪(
receiveAsFlow/consumeAsFlow):上游数据源是热的(Channel),但被包装成了 Flow。整体对外表现为“热”,因为多个 collector 会竞争/瓜分同一份数据。伪装者(
channelFlow):虽然名字里带 Channel,也用了send(),但因为每次collect都会新建底层的 Channel,生产和消费依然是绑定的,因此它本质上依然是冷流。
4.7 Flow 的收集
除了基础的 collect,Flow 体系还提供了一整套强大的配套 API,用于精准控制执行流和调度机制:
flowOn(全局协程定制):
改变上游(从 Flow 创建到应用flowOn之前的操作符)的执行上下文。关键点:它绝对不会改变collect所在下游的执行协程。launchIn(便捷流启动):
等价于scope.launch { flow.collect() }。它的价值在于消除嵌套,将协程的作用域绑定与数据流的收集整合为一句链式调用,使得代码结构更加扁平清晰。onEach(预处理管道):
在数据最终交付给collect之前执行副作用(如打印日志、更新 Loading 状态)。通常与launchIn搭配,形成极其优雅的数据处理流水线。collectIndexed(带索引收集):
类似于集合操作,为每一条触达收集器的数据自动提供一个递增索引(从 0 开始),非常适合需要根据发射次序做差异化 UI 渲染的场景。collectLatest(最新值收集):
结合了mapLatest与buffer的思想。当新数据快速到来时,如果上一条数据的处理代码(挂起逻辑)尚未执行完毕,它会自动取消旧数据的处理任务,确保系统资源永远只服务于最新发出的状态。