在并发编程中,最让人头疼的就是多线程共享变量导致的锁竞争与数据不同步。Kotlin 协程借鉴了 Go 语言的哲学:“不要通过共享内存来通信,而要通过通信来共享内存。”
实现这一哲学的核心工具,就是 Channel(通道)。通过 Channel,我们避免了复杂的 synchronized 或 Lock,将并发竞争转换成了简单的流式处理。
1、什么是 Channel
Channel 是「协程间」传递数据的底层协作工具。你可以把 Channel 想象成工厂里的自动化传送带:
- 生产者:负责把生产的零件放上传送带,对应
send方法。 - 消费者:负责从传送带另一头取走零件,对应
receive方法。 - 挂起非阻塞:如果传送带满了,生产者会自动停下(挂起)等空位;如果传送带空了,消费者会自动停下(挂起)等零件。
2、Channel 与 Flow
在 Kotlin 协程的并发模型中,Channel 和 Flow 是处理多数据流的核心工具,但它们的定位和工作模式有着本质的区别。
简单来说:Channel 是「热」的(类似电台),Flow 是「冷」的(类似点播)。 Flow 在很多底层实现上依赖 Channel 进行数据分发。而 Flow 根据应用场景又划分为三类逻辑模型:数据流、事件流、状态流。
| 模型 | 组件 | 用途 | 依赖关系 |
|---|---|---|---|
| 状态流 | StateFlow |
保存并监听应用当前的状态。 | StateFlow 是 SharedFlow 的特化版本。 |
| 事件流 | SharedFlow |
一次性的、通知类的动作。 | SharedFlow 内部由 Flow 实现。 |
| 数据流 | Flow |
基础的数据传输。 | Flow 在很多底层实现上依赖 Channel 进行数据分发。 |
2.1 Channel:协程间的通信管道(热流)
Channel 遵循的是 CSP(通信顺序进程) 并发模型:不要通过共享内存来通信,而要通过通信来共享内存。
核心特性
- 热流:无论有没有接收者,
Channel都在运作。一旦数据被发送,它就在管道里,如果没有人拿走,它可能就会积压或导致发送端挂起。 - 点对点:数据一旦被一个消费者接收,就会从管道中消失。
- 协作性:
send和receive都是挂起函数。
- 热流:无论有没有接收者,
代码示例
1
2
3
4
5
6
7
8
9
10
11
12
13
14fun main(): Unit = runBlocking {
val scope = CoroutineScope(EmptyCoroutineContext)
val channel = Channel<List<Github.Contributor>>()
scope.launch {
channel.send(github.contributors("square", "retrofit"))
}
scope.launch {
while (isActive) {
val contributors = channel.receive()
println(contributors)
}
}
delay(10000)
}
2.2 Flow:异步数据流(冷流)
Flow 是为了解决异步计算中产生多个值的问题,它的设计非常接近于 Java 的 Stream 或 RxJava。
核心特性
- 冷流:只有当调用「末端操作符」(如
collect)时,flow { ... }块才开执行。这就是「冷流」的本质——没有消费者,就没有生产。 - 惰性执行:每次调用
collect,整个flow { ... }块会重新执行一遍。这与Channel不同,Channel的数据被收走就没了。 - 透明性:
emit负责把数据推出去,collect负责接住,中间可以插入filter或map等操作符进行加工。 - 声明式:拥有丰富的操作符(
map,filter,zip等)。
- 冷流:只有当调用「末端操作符」(如
代码结构
Flow 采用了 「声明式」 的设计思想。
- 生产端(
flow { ... }):定义了数据的产生规则,但它并不关心谁来收,也不关心什么时候收。 - 中转站(
map, filter):定义了数据的加工规则。 - 消费端(
collect):触发整个流水线的开关。
- 生产端(
代码示例
1
2
3
4
5
6
7
8
9
10
11
12
13
14fun main(): Unit = runBlocking {
val flow = flow {
for (i in 0..2) {
emit(i)
}
}
val scope = CoroutineScope(EmptyCoroutineContext)
scope.launch {
flow.collect {
println(it)
}
}
delay(10000)
}
2.3 进阶:SharedFlow 与 StateFlow(热 Flow)
为了让 Flow 也能拥有类似 Channel 的「热」特性(即可以跨协程广播,不需要每次 collect 都重启),Kotlin 推出了 SharedFlow 和 StateFlow。
SharedFlow(事件流)
- 适合发送「动作」或「指令」(如点击按钮后弹个提示)。
- 支持多个订阅者,可以配置缓存。
StateFlow(状态流)
- 它是 SharedFlow 的特殊变体,专门用于保存状态。
- 类似 LiveData,必须有一个初始值,且只发最新的值给新订阅者。
2.4 选型指南
用 Channel 的场景:
- 任务队列:你需要把任务分发给多个消费者处理。
- 跨协程长连接:比如 A 协程负责下载,B 协程负责解析。
- 单次事件:确保消息必须被处理且只处理一次。
用 Flow 的场景:
- 数据库查询:UI 观察数据库中数据的变化。
- 网络请求链:请求 A 拿到结果后转换成请求 B。
- 连续的数据处理:对一系列异步产生的数据进行
map/filter等操作。
用 StateFlow 的场景:
- MVVM 架构:在 ViewModel 中持有 UI 状态,供
Activity/Fragment观察。
- MVVM 架构:在 ViewModel 中持有 UI 状态,供
总结一句话:如果你需要协同(多个协程一起干活),选 Channel;如果你需要响应式地处理异步数据序列,选 Flow。
3、Channel 工作模式
3.1 async/await 的局限
要理解 Channel,我们得先看它的「前辈」—async/await。
async 和 launch 的区别在于:async 启动一个协程,并返回一个 Deferred<T>,它承诺给你一个一次性的结果。通过 await,你可以挂起等待直到结果出现。这很美好,但它有一个致命的单次通信局限:
- 只能发一次:
async就像一次性快递,送到即结束。 - 无法持续推送:如果你需要处理实时数据(比如股票软件持续刷新股价、传感器数据流),
async/await根本转不起来。你总不能为了获取 100 次更新,去写 100 个async?
1 | fun main(): Unit = runBlocking { |
我们需要的,是一个能够持续发送、持续接收的多值通信模型。这就是 Channel 登场的时刻。
1 | fun main(): Unit = runBlocking { |
3.2 Channel 的双重面孔
在很多人的直觉里,发送(Send)和接收(Receive)应该是两个完全不同的对象。但在 Kotlin 的设计中,它们其实是一体的。
channel() 并不是构造函数,而是一个工厂函数。它返回的是 Channel 接口的具体实现(如 BufferedChannel 或 ConflatedChannel)。这意味着 Channel 接口同时继承了 SendChannel 和 ReceiveChannel,它天生具备「双向通行」的能力,只是在不同场景下被赋予了不同的角色。
1 | public interface Channel<E> : SendChannel<E>, ReceiveChannel<E> { |
3.3 单订阅者与事件瓜分
这是很多开发者在使用 Channel 时常会陷入一个思维陷阱:误以为它是发布/订阅模式下的广播机制。事实恰恰相反,Channel 是典型的 点对点(Unicast) 通信模型。
当多个消费者(协程)同时监听同一个 Channel 时,数据会遵循「抢占式分发”原则:
- Channel(负载均衡):一条数据仅会被其中一个消费者消费,其余消费者无法获取。它适用于任务分发或工作池场景。
- 广播(Pub/Sub):一条数据会被所有订阅者接收(如
SharedFlow或StateFlow)。它适用于状态变更通知或事件总线场景。
在下面的示例中,即便有两个接收协程,它们也不会各自获得完整的数据流,而是 瓜分(负载均衡) 了这 11 条数据。
1 | fun main(): Unit = runBlocking { |
总结建议:
- 如果你需要负载均衡(一个任务只需被处理一次):请放心使用
Channel。 - 如果你需要广播机制(一个事件需要通知所有组件):请弃用
Channel,改用SharedFlow或StateFlow。
4、Channel API
1 | public fun <E> Channel( |
4.1 管道的发送与接收
Channel 的核心是一个并发安全的队列。它有两个关键操作:send() 和 receive()。这两个函数都是挂起函数,这意味着当管道满或空时,协程会交出控制权,而不会阻塞线程。
遍历的艺术:for 循环与迭代
虽然手动调用
receive()可以获取数据,但 Kotlin 提供了更符合直觉的语法:- 挂起式 for 循环:这是推荐的遍历方式。当 Channel 中没有数据时,循环会挂起;当 Channel 调用
close()且缓冲区清空后,循环自动结束。 - 消费函数
consumeEach:一种内联扩展,确保在处理完所有元素或发生异常时关闭 Channel。
- 挂起式 for 循环:这是推荐的遍历方式。当 Channel 中没有数据时,循环会挂起;当 Channel 调用
4.2 缓冲区与容量控制
Channel 的行为高度依赖其「容量(Capacity)」配置。理解这一点是掌握协程背压(Backpressure)的关键。
RENDEZVOUS(约会模式 / 0容量)
这是 Channel 的默认模式。
- 语义:发送者和接收者必须「见一面」才能完成数据传递。
- 行为:如果没有接收者在等待,
send()会立即挂起;如果没有发送者在发送,receive()也会挂起。
预设缓冲区策略
类型 容量值 行为描述 UNLIMITED Int.MAX_VALUE理论上无限大。 send永远不挂起,直到耗尽内存(OOM 风险)。BUFFERED 默认 64 指定固定大小的缓冲区。满了才挂起 send。CONFLATED 1 合并模式。只保留最新元素,新数据会覆盖旧数据, send永远不挂起。基础遍历与 Rendezvous(约会)模式
默认情况下,Channel 的容量为 0。发送和接收必须「手拉手」才能完成。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21fun main(): Unit = runBlocking {
val channel = Channel<Int>() // 默认 RENDEZVOUS
launch {
for (x in 1..3) {
println("发送端: 准备发送 $x")
channel.send(x) // 如果没有接收者,这里会挂起
println("发送端: $x 已送达")
}
channel.close() // 记得关闭,否则接收端 for 循环不会结束
}
launch {
// 使用 for 循环优雅地遍历,内部会自动处理挂起和关闭逻辑
for (y in channel) {
println("接收端: 收到 $y")
delay(100) // 模拟耗时操作
}
println("接收端: 管道已关闭,遍历结束")
}
}
4.3 缓冲溢出策略
当缓冲区满了,除了默认的挂起(SUSPEND)外,我们还可以自定义处理逻辑:
- SUSPEND(默认):缓冲区满时,发送端挂起。
- DROP_OLDEST:丢弃队列中最老的一个元素,插入新元素。这在 UI 状态更新(只看最新)或日志采样中非常有用。
- DROP_LATEST:丢弃当前准备发送的新元素,保留缓冲区里的旧数据。
1 | fun main(): Unit = runBlocking { |
4.4 资源防泄露
这是 Channel 构造函数中一个非常实用的参数。
- 场景:假设你发送了一个
FileDescriptor或Bitmap到 Channel,但在被接收前,协程被取消了。 - 解决:在
onUndeliveredElement回调中执行资源的close()或recycle(),确保即使在异常关闭的情况下也不会发生内存泄漏。
1 | class Resource(val id: Int) { |
4.5 关闭与取消
理解 close() 与 cancel() 的语义差异,能避免很多线上 Bug。
| 执行 | 操作 | 影响 isClosedForSend |
影响 isClosedForReceive |
缓冲区数据 |
|---|---|---|---|---|
| 发送端 | close() |
立即为 true |
缓冲区空了后才为 true |
保留,直到被收完 |
| 接收端 | cancel() |
立即为 true |
立即为 true |
立即清空/丢弃 |
发送端关闭
close()- 行为:调用后,
isClosedForSend变为true。此时再send()会抛出异常。 - 接收端影响:非常重要! 接收端仍可以继续获取缓冲区中剩余的数据。只有当数据全部取完后,
isClosedForReceive才变为true。
- 行为:调用后,
接收端取消
cancel()- 行为:这是一种「双向切断」。缓冲区内的数据会被立即丢弃,所有后续的收发操作都会失败,并抛出
CancellationException。
- 行为:这是一种「双向切断」。缓冲区内的数据会被立即丢弃,所有后续的收发操作都会失败,并抛出
4.6 非挂起操作
在某些无法挂起的地方(如 Android 的 OnClickListener),你可以使用 try... 系列函数。它们会立即返回一个 ChannelResult,通过 isSuccess 或 getOrNull() 来判断操作是否成功,而不会挂起协程。
trySend() / tryReceive():瞬时返回ChannelResult,绝不挂起。ChannelResult:通过isSuccess、isFailure字段判断状态,或使用getOrNull()安全取值。
如果你不想在 Channel 关闭时处理抛出的异常,可以使用 receiveCatching()。它会将成功、失败、以及关闭状态(包含异常信息)封装在结果对象中。
1 | fun main(): Unit = runBlocking { |
4.7 总结与建议
- 简单通知:用默认的
RENDEZVOUS。 - 高频 UI 更新:用
CONFLATED(容量 1 +DROP_OLDEST)。 - 资源密集型操作:务必在构造函数中传入
onUndeliveredElement。 - 跨层级异常传递:优先使用
receiveCatching()而不是原生的receive()。
5、produce()
在 Kotlin 协程中,produce() 是一个非常方便的协程构建器,专门用于实现 生产者-消费者模式。它本质上是一个「语法糖」,将协程的启动与 Channel 的创建逻辑打包在了一起。
5.1 什么是 produce()
produce 构建器启动一个协程,并返回一个 ReceiveChannel。在这个协程内部,你可以使用 send() 方法发送数据。当你调用 produce { ... } 时,其大括号内的 this 是 ProducerScope。有趣的是:
ProducerScope实现了SendChannel接口。produce的返回值是ReceiveChannel。- 事实上,
ProducerScope和ReceiveChannel底层指向的是同一个对象。
这就好比 launch 函数中 this (CoroutineScope) 与返回的 Job 对象的关系,它们是同一事物的不同视角。
它最大的价值在于:自动管理生命周期。 当你手动使用 Channel 时,必须记得在生产者结束时调用 channel.close(),否则消费者会一直等待(导致挂起泄漏)。而 produce 会在协程代码块执行完毕或发生异常时,自动帮你关闭 Channel。
1 |
|
5.2 produce() 的本质
produce 构建器启动一个协程,并返回一个 ReceiveChannel,允许你通过它持续获取数据。produce 的本质是一个双重身份的容器:
- 它是一个
CoroutineScope:你可以像在launch中一样启动子协程、抛异常、管理生命周期。 - 它是一个
SendChannel:它提供了一个send()方法,允许你随时向外输出数据。
通过查看 produce 的源码,我们可以发现 block 参数的 receiver 类型是 ProducerScope,
1 |
|
而 ProducerScope 同时继承了 CoroutineScope 和 SendChannel<E> 接口,这意味着 ProducerScope 不仅可以被当成 CoroutineScope 使用,还可以在作用域内直接调用 SendChannel<E> 的 send() 方法。
1 | /** |
6、actor()
produce 是将 Channel 的创建和消息发送进行了合并。相反,actor 是将 Channel 的创建和消息的接收进行了合并。
6.1 什么是 actor()
在 Kotlin 协程中,Actor 是一种基于 消息传递 的并发模型。它的核心思想是:一个特定状态的变量只能由一个专门的协程(即 Actor)来维护。其他协程通过向这个 Actor 发送「邮件-消息」来间接修改状态,从而彻底避免了线程锁(Lock)和原子变量(Atomic)带来的复杂性。
一个 Actor 本质上是 一个状态 + 一个 Channel + 一个协程 的组合:
- 私有状态:封装在 Actor 内部,外部无法直接访问。
- 邮箱(Channel):外界发送消息的入口。
- 顺序处理:Actor 内部会通过循环逐一处理消息,天然不存在并发冲突。
actor 构建器启动一个协程,并返回一个 SendChannel,允许你通过它持续发送数据。
1 |
|
6.2 actor 的优势与局限
优势
- 无锁并发:通过消息队列将并行转为串行,性能在高竞争下通常优于锁。
- 状态隔离:状态只存在于 Actor 协程内部,逻辑更加清晰,易于调试。
- 天然背压:由于 Actor 底层是 Channel,可以配置缓冲区(Capacity),防止消息挤压。
局限与替代方案
目前官方更倾向于让开发者根据需求自行组合:
- 单纯的状态共享:推荐使用
Mutex(互斥锁)配合普通变量。 - 响应式状态传递:推荐使用
MutableStateFlow。 - 自定义 Actor:可以手动定义一个
Channel并在协程中while(isActive)循环处理。
- 单纯的状态共享:推荐使用
6.3 actor 的容量配置
就像普通的 Channel 一样,actor 也可以配置缓冲区:
- RENDEZVOUS (0):默认值,发送方会挂起直到 Actor 开始处理这条消息。
- UNLIMITED:发送方不挂起,消息无限堆积(慎用)。
- CONFLATED:只处理最新的一条消息,旧的还没来得及处理的消息会被覆盖。
actor 是处理复杂内部状态的最佳实践。当你发现多个协程在争抢同一个变量,且操作逻辑较为复杂时,与其在代码里写满 synchronized 或 Mutex,不如试着把这个变量丢进一个 actor 里,让它通过消息安静地排队处理。