1、协作与等待机制的演进之路
在并发编程中,协作永远是一个核心命题。当我们需要多个任务按照特定的先后顺序执行,或者主任务必须等待多个子任务全部完成后才能继续时,传统的 Java 线程模型往往需要借助复杂的同步工具。随着 Kotlin 协程的兴起,这种「停住—等待—恢复」的逻辑发生了范式转移。
1.1 为什么我们需要协作与等待
协程天生具有并行性。无论是在同一个作用域下启动的同级协程,还是具有父子关系的协程,它们默认都是并发执行的。但在实际业务场景中,我们经常遇到依赖关系:
- 场景 A:必须等待登录接口(协程1)返回 Token 后,才能请求用户信息(协程2)。
- 场景 B:主界面渲染前,必须等待三个独立的初始化数据接口全部加载完成。
这种「先后依赖」或「一等多」的需求,正是我们要讨论的协作机制。
1.2 Java 的倒计时门闩:CountDownLatch
在 Java 并发包中,CountDownLatch 是处理「一等多」场景的利器。
- 工作原理:它像一个带有计数器的「门闩」。初始化时设定一个
count值,调用await()的线程会被阻塞,直到其他线程调用countDown()将计数器减至零,门闩才会打开。 - 核心痛点:虽然功能强大,但在线程模型下,它缺乏结构化生命周期的支持。如果某个线程意外崩溃而没有调用
countDown(),等待的线程可能会陷入永久阻塞,且处理取消逻辑非常繁琐。 - 代码示例:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28fun main() = runBlocking<Unit> {
// countdown 译为倒计时,latch 是门闩、插销,组合起来就是用于倒计时的插销
val countDownLatch = CountDownLatch(2)
thread {
// await() 会在 CountDownLatch 内的 count 减到 0 时结束等待
countDownLatch.await()
println("递减计数被返回,计数: ${countDownLatch.count}")
}
thread {
sleep(1000)
println("递减计数调用前,计数: ${countDownLatch.count}")
// countDown() 会调用原子操作让 CountDownLatch 内的 count 减 1
countDownLatch.countDown()
}
thread {
sleep(2000)
println("递减计数调用前,计数: ${countDownLatch.count}")
countDownLatch.countDown()
}
}
输出
--------------------
递减计数调用前,计数: 2
递减计数调用前,计数: 1
递减计数被返回,计数: 0 - 逻辑拆解:这段代码主要有三个角色(线程)在协作:
初始化计数器:
val countDownLatch = CountDownLatch(2)这里设置了一个初始值为 2 的计数器。意味着需要调用两次 countDown() 才能“打开门闩”。
等待者(线程 1):
使用
countDownLatch.await()。这个线程运行后会立即进入阻塞状态。它在心里默念:“除非计数器变成 0,否则我绝不往下走。”执行者(线程 2 和 线程 3):
这两个线程分别模拟耗时操作(
sleep),然后调用countDown()。- 线程 2 在 1 秒后把计数从 2 减到 1。
- 线程 3 在 2 秒后把计数从 1 减到 0。
1.3 协程中的等价实现方案
在 Kotlin 协程中,我们不需要总是搬出 CountDownLatch 这样的重型武器。协程提供了多种更符合直觉的等价实现:
1.3.1 Job.join():最直接的等待
对于简单的先后依赖,join() 是最常用的方式。调用 job.join() 会挂起当前协程,直到目标协程执行结束。
- 优势:在协程中,
join()支持结构化取消。如果等待的协程被取消,join()会感知并妥善处理,这比线程级的Thread.join()安全得多。 - 实现“一等多”:你可以简单地在一个主协程里,依次对多个子协程的 Job 调用
join()。 - 代码示例:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26fun main() = runBlocking<Unit> {
// 两个前置任务
val preJob1 = launch {
delay(1000)
println("Job1 结束")
}
val preJob2 = launch {
delay(2000)
println("Job2 结束")
}
// 此协程需要等待两个协程执行之后再运行自己的内容
launch {
preJob1.join()
preJob2.join()
// 等待完前置任务,再做自己的事...
println("Main 开始")
}
}
输出
--------------------
Job1 结束
Job2 结束
Main 开始
1.3.2 async/await:带结果的协作
如果你不仅要等待协程结束,还需要拿到它的返回值,那么 async 启动协程并配合 await() 是最佳选择。
- 模式:
val data = async { ... }.await()。 - 代码示例:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28fun main() = runBlocking<Unit> {
// 两个前置任务
val deferred1 = async {
delay(1000)
println("deferred1 结束")
"结果1"
}
val deferred2 = async {
delay(2000)
println("deferred2 结束")
"结果2"
}
// 此协程需要等待两个协程执行之后再运行自己的内容
launch {
val result1 = deferred1.await()
val result2 = deferred2.await()
// 等待完前置任务,再做自己的事...
println("获取[$result1-$result2],Main 开始")
}
}
输出
--------------------
deferred1 结束
deferred2 结束
获取[结果1-结果2],Main 开始
1.3.3 Channel:最接近语义的模拟
如果你需要实现「不关心具体是哪个协程完成,只需等待固定次数的信号」的逻辑,Channel 是最自然的替代品。
- 构建:创建一个容量等于等待次数的
Channel。 - 计数:任务完成后调用
send()发送信号。 - 等待:主协程通过一个简单的循环调用
receive()来消耗计数。 - 代码示例:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29private fun main() = runBlocking<Unit> {
// 指定 Channel 的容量为 2
val channel = Channel<Unit>(2)
// 由于要等待两次发送数据才能继续执行后续代码,因此要 repeat(2) 接收
launch {
repeat(2) { i ->
println("执行第 ${i + 1} 次 receive()")
channel.receive()
}
println("receive() 执行完毕,Main 开始")
}
launch {
delay(1000)
channel.send(Unit)
}
launch {
delay(2000)
channel.send(Unit)
}
}
输出
--------------------
执行第 1 次 receive()
执行第 2 次 receive()
receive() 执行完毕,Main 开始
2、select:从“全量等待”到“竞速夺魁”
在上一章节中,我们讨论了如何使用 join 或 await 来等待协程任务的完成。那种模式可以被称为「全量等待」:你必须等到所有目标任务都结束,才能继续下一步。
但在真实的业务场景中,我们经常遇到「竞速」需求:比如同时请求两个服务器镜像,哪个先返回就用哪个;或者在等待任务完成时,如果超过 5 秒还没结果就直接走超时逻辑。
这时候,Kotlin 协程为我们提供了一个极其强大的专属武器:select 挂起函数。
2.1 select 的核心语义:先到先得
select 的命名非常直观,即「选择」。它的核心逻辑是在其内部开启一个多协程竞赛。
- 竞速模型:
select会同时监听多个协程操作。 - 胜者为王:谁最先完成,
select就执行谁对应的回调。 - 一锤定音:一旦某个监听器触发并执行完毕,其返回值就是整个
select函数的返回值,其他还在进行的竞争者将被忽略(虽然它们还在后台运行,但其结果不再影响本次select)。 - 代码示例:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35fun main() = runBlocking {
val scope = CoroutineScope(EmptyCoroutineContext)
val job1 = scope.launch {
delay(1000)
println("job1 done")
}
val job2 = scope.launch {
delay(2000)
println("job2 done")
}
val job = scope.launch {
val result = select {
// select 只执行最先结束的 onJoin 回调
job1.onJoin {
1
}
job2.onJoin {
2
}
}
println("result: $result")
}
// job.join()
joinAll(job, job1, job2)
}
输出
--------------------
job1 done
result: 1
job2 done
2.2 select 常用监听器
select 通过一系列带 on 前缀的内部函数来注册监听器,涵盖了协程协作的各种场景。
2.2.1 onJoin:监听 Job 的结束
与普通的 job.join() 挂起并等待不同,onJoin 只是注册一个监听。
- 语义:不论
Job是正常完成还是被取消,只要它结束了,onJoin的回调就会触发。 - 返回值:回调内部的代码逻辑结果,将作为
select的返回值。 - 注意点:
onJoin并不等同于调用join(),它更像是一个生命周期结束的回调钩子。 - 代码示例:当你启动了多个后台任务,只要其中一个执行完毕(无论成功或被取消),你就想执行下一步逻辑时,
onJoin是最佳选择。1
2
3
4
5
6
7
8
9
10val job1 = launch { delay(1000L); println("Job 1 finished") }
val job2 = launch { delay(2000L); println("Job 2 finished") }
val winner = select<String> {
job1.onJoin { "Job 1 was the fastest!" }
job2.onJoin { "Job 2 was the fastest!" }
}
println("Result: $winner")
// 输出: Job 1 finished -> Result: Job 1 was the fastest!
2.2.2 onAwait:带参数的竞速
如果你监听的是 Deferred 任务(由 async 启动),那么 onAwait 就会非常实用。
- 数据传递:与
onJoin不同,onAwait的回调函数接收一个参数,这个参数就是其所监听任务的返回结果。 - 非调用本质:它内部并未实际调用
await(),而是直接捕获结果并注入回调。 - 代码示例:如果你需要从多个异步操作中取最快返回的那个结果,请使用
onAwait。1
2
3
4
5
6
7
8
9
10val deferred1 = async { delay(1500L); "Data from Server A" }
val deferred2 = async { delay(500L); "Data from Server B" }
val fastestData = select<String> {
deferred1.onAwait { result -> "Received: $result" }
deferred2.onAwait { result -> "Received: $result" }
}
println(fastestData)
// 输出: Received: Data from Server B
2.2.3 Channel 监听:onSend 与 onReceive
Channel 在 select 中扮演着重要角色,因为它是实现多路复用(Multiplexing)的关键。
- onSend:监听数据是否发送成功。如果
select里的onSend是最早完成的(比如 Channel 的缓冲区有了空间),那么数据发送逻辑就会胜出。 - onReceive:对称地,监听是否有数据被成功接收。还有
onReceiveCatching版本,用于更优雅地处理 Channel 关闭的情况。 - 代码示例:
select对 Channel 的支持让多路复用变得非常简单。你可以同时监听「发送数据」和「接收数据」的就绪状态。注意:在使用1
2
3
4
5
6
7
8
9
10
11
12
13
14
15val channelA = Channel<String>()
val channelB = Channel<String>()
// 模拟发送端
launch { delay(100L); channelA.send("Message from A") }
launch { delay(200L); channelB.send("Message from B") }
// 消费端:谁先有数据就接谁的
val received = select<String> {
channelA.onReceive { value -> "From A: $value" }
channelB.onReceive { value -> "From B: $value" }
}
println(received)
// 输出: From A: Message from AonSend时,你必须提供要发送的值作为参数,例如channel.onSend(element) { ... }。这在处理「背压」(当 Channel 缓冲区满时,尝试向多个 Channel 发送数据,哪个能发出去就发哪个)的场景中非常有用。
2.3 全量等待 vs 竞速等待:语义对比
为了更直观地理解,我们可以通过下表对比两种协作范式:
| 范式 | 代表 API | 核心目标 | 逻辑模式 |
|---|---|---|---|
| 全量等待 | join(), await(), awaitAll() |
确保所有任务全部完成 | 串行或并行收敛 |
| 竞速等待 | select { ... } |
只要有一个最快者完成 | 先到先得 |
2.4 onTimeout:竞速中的兜底策略
在并发编程中,超时逻辑往往是最难写的。但在 select 中,onTimeout 让这一切变得异常简单。
- 唯一性:通常在一个
select块中只需要一个onTimeout。 - 触发条件:如果在指定的超时时间内(毫秒或 Duration),所有其他的
onJoin、onAwait或onSend/onReceive监听器都没有完成,那么onTimeout就会胜出,执行其回调。 - 竞速意义:
onTimeout实际上是在与任务进行时间赛跑,确保业务逻辑不会无限期地挂起。 - 代码示例:网络请求与超时的竞速。假设我们正在请求一个可能响应很慢的 API,我们希望在 2 秒内得到结果,否则返回默认值或提示错误。 在 Kotlin 1.5+ 中,你还可以使用更具可读性的
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19val remoteDataDeferred = async {
delay(3000L) // 模拟一个耗时 3 秒的长任务
"Real Server Data"
}
val finalResult = select<String> {
// 监听异步任务的结果
remoteDataDeferred.onAwait { data ->
"Success: $data"
}
// 设置 2 秒的兜底策略
onTimeout(2000L) {
"Timed Out: Using cached data instead."
}
}
println(finalResult)
// 输出: Timed Out: Using cached data instead.Duration扩展属性:1
2
3
4
5
6
7
8
9
10import kotlin.time.Duration.Companion.seconds
select<Unit> {
job.onJoin { println("Task completed just in time!") }
// 使用 Duration 语义更清晰
onTimeout(1.5.seconds) {
println("Too slow! Cancelling and moving on.")
}
} - 为什么
onTimeout比withTimeout更好用?
虽然 Kotlin 提供了withTimeout挂起函数,但在复杂的多路复用场景下,select的onTimeout具有独特优势:- 非异常机制:
withTimeout在超时时会抛出TimeoutCancellationException,如果你不希望用「异常」来控制正常的业务流程(例如返回默认值),onTimeout更加自然。 - 多源合并:你可以同时监听 Channel 接收、Job 完成和超时,这在
withTimeout中需要嵌套多层逻辑,而在select中只是多写一行代码的事。
- 非异常机制:
3、互斥锁和共享变量
在多线程或多协程的环境下,我们经常会遇到一个诡异的现象:明明代码逻辑是 number++ 100 万次,再 number-- 100 万次,理论结果应该是 0,但实际运行结果却每次都不同,且由于随机性,往往偏离 0 很大。
这就是并发编程中最臭名昭著的敌人:竞态条件。
3.1 竞态条件
竞态条件指的是:程序的输出结果取决于多个线程执行的时序或交错顺序,而这种顺序是不可控的。简单来说,当两个或多个线程同时访问并尝试修改同一个共享变量,且最终结果取决于线程运行的精确顺序时,就发生了竞态条件。
触发条件:T-R-W。竞态条件通常发生在满足以下三个条件时:
- T(Threads):存在两个或更多的线程并发执行。
- R(Shared Resource):它们访问同一个共享资源(如全局变量、文件、数据库记录)。
- W(Write):至少有一个线程在执行写操作(修改资源)。
竞态本质:Read-Modify-Write(RMW)。竞态条件的本质在于,许多看起来只有一行代码的操作,在底层被拆分成了多个步骤。最经典的就是「自增」操作:
count++。底层 CPU 视角下,这行代码其实是三步:
- Read(读取):从内存把
count的值(假设是 0)读入寄存器。 - Modify(修改):在寄存器里把这个值加 1(变成 1)。
- Write(写入):把寄存器里的 1 写回到内存。
- Read(读取):从内存把
竞态案例:线程 A 执行
count++操作,线程 B 执行count--操作。- 线程 A 读取了 0。
- 在线程 A 还没来得及加 1 时,线程 B 也读取了 0。
- 线程 A 加 1 得到 1,并写入内存。
- 线程 B 减 1 得到 -1,并再次写入内存。
- 结果:线程 A 执行了
count++,线程 B 执行了count--,但内存里的值是 -1,而不是预期的 0。
代码示例:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18fun main() = runBlocking {
var number = 0
val threadA = thread {
repeat(100_0000) {
number++
}
}
val threadB = thread {
repeat(100_0000) {
number--
}
}
threadA.join()
threadB.join()
println("result: $number") // 输出随机数
}本质原因:
本质上是因为
count++和count--不是原子操作。在 CPU 看来,它们是「读-改-写」三个指令。只要这三个指令之间插入了其他线程的操作,数据就会乱套。如何修正:
在 Kotlin/Java 这种典型的 Android 开发环境下,通常有三种方案:
- 加锁(
synchronized或ReentrantLock):强迫 A 完整执行完三个步骤后,B 才能开始。 - 原子类(
AtomicInteger):利用硬件指令(CAS)保证「读-改-写」是一口气完成的,中间不可拆分。 - 协程互斥锁(
Mutex):在协程环境中使用非阻塞的锁。
- 加锁(
3.2 synchronized:最简单的内置锁
3.2.1 Java 中的 synchronized
synchronized 是 Java 提供的一种「互斥锁」机制。它主要用于解决多线程环境下的数据竞态问题,确保在同一时刻,只有一个线程可以执行特定的代码块或方法。
- 核心能力:
- 原子性:确保锁定的代码段作为一个不可分割的单元执行。
- 可见性:保证在释放锁之前,对共享变量的修改对随后获取锁的线程可见。
- 有序性:虽然不禁止指令重排,但在同步块内部,程序的执行结果将遵循单线程语义。
- 方法签名:
- 实例方法同步(Instance Lock)~ 锁对象:
this(当前实例对象)
1
2
3public synchronized void updateData() {
// 线程必须获取该实例的监视器锁才能进入
} - 静态方法同步(Class Lock)~ 锁对象:
ClassName.class(该类的 Class 对象)
1
2
3public static synchronized void globalConfig() {
// 全局范围内,该类的所有实例共享这一把锁
} - 代码块同步(Block Lock)~ 锁对象:开发者自定义的任何对象
obj
1
2
3
4
5public void process() {
synchronized(lockObject) {
// 临界区代码
}
}
- 实例方法同步(Instance Lock)~ 锁对象:
- 代码示例:增加同步代码块,线程 A 执行了
count++,线程 B 执行了count--,输出符合预期的 0。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24fun main() = runBlocking {
var number = 0
val lock = Any()
val threadA = thread {
repeat(100_0000) {
synchronized(lock) {
number++
}
}
}
val threadB = thread {
repeat(100_0000) {
synchronized(lock) {
number--
}
}
}
threadA.join()
threadB.join()
println("result: $number") // 输出 0
}
3.2.3 协程中的 synchronized
同样,用 synchronized(lock) {} 包裹代码块的写法也可以用在协程中。
- 代码示例:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25fun main() = runBlocking {
var number = 0
val lock = Any()
val scope = CoroutineScope(EmptyCoroutineContext)
val jobA = scope.launch {
repeat(100_0000) {
synchronized(lock) {
number++
}
}
}
val jobB = scope.launch {
repeat(100_0000) {
synchronized(lock) {
number--
}
}
}
jobA.join()
jobB.join()
println("result: $number") // 输出 0
}
3.2.3 Kotlin 中的 @synchronized
在 Kotlin 中,虽然也能使用 synchronized(lock) {} 直接包裹代码块。但 Kotlin 有自己的 API,取而代之的是 @Synchronized 注解(Annotation),但它的作用范围仅限于整个方法。如果改写线程的竞态代码,我们需要将对变量的操作提取到独立的方法中:
1 | class Counter { |
3.2.3 synchronized(lock) 与 @Synchronized 的区别
虽然它们都实现了同步,但在底层实现和灵活性上有显著差异:
| 特性 | synchronized(lock) { ... } |
@Synchronized (注解) |
|---|---|---|
| 锁定范围 | 代码块级别。可以精准控制需要同步的最小行数。 | 方法级别。锁定整个方法体,粒度较粗。 |
| 锁对象 | 自定义对象。你可以专门创建一个 val lock = Any()。 |
隐式对象。实例方法锁 this,静态方法锁 Class。 |
| 字节码实现 | 对应 monitorenter 和 monitorexit 指令。 |
在方法的访问标志中添加 ACC_SYNCHRONIZED 标记。 |
3.3 ReentrantLock:显式重入锁
3.3.1 ReentrantLock
ReentrantLock 是 Lock 接口的标准实现,提供了一种比 synchronized 更灵活、更强大的重入互斥锁。它允许开发者显式地控制锁的获取和释放,并提供了公平性选择、非阻塞锁获取以及可中断的锁请求等高级特性。
核心特性:
- 显式控制:必须手动调用
lock()和unlock()。 - 公平性选择:支持“非公平(默认)”和“公平”模式。公平锁会按照线程等待的顺序分配锁。
- 可中断性:线程在等待锁的过程中可以响应中断。
- 条件变量:支持多个
Condition对象,实现比wait/notify更精细的线程间通讯。
- 显式控制:必须手动调用
方法签名:
基础加锁与释放
方法 描述 void lock()获取锁。如果锁被占用,当前线程将阻塞直至成功。 void unlock()释放锁。必须在 finally块中调用以防死锁。高级获取机制
方法 描述 boolean tryLock()非阻塞获取。如果锁当前可用则返回 true,否则立即返回false。boolean tryLock(long time, TimeUnit unit)在指定时间内尝试获取锁,超时则放弃。 void lockInterruptibly()获取锁,但允许在等待时被 Thread.interrupt()中断并抛出异常。状态查询
方法 描述 int getHoldCount()查询当前线程保持此锁的次数(重入次数)。 boolean isHeldByCurrentThread()查询当前线程是否持有此锁。
模版代码:
为了保证异常安全性,
ReentrantLock的使用必须遵循以下try-finally模板:1
2
3
4
5
6
7
8
9
10
11val lock = ReentrantLock()
fun safeMethod() {
lock.lock() // 1. 在 try 之前加锁
try {
// 2. 临界区代码
// number++
} finally {
lock.unlock() // 3. 必须在 finally 中释放,确保不会因异常导致死锁
}
}
3.3.2 ReentrantLock 与 synchronized 的区别
| 特性 | ReentrantLock |
synchronized |
|---|---|---|
| 实现层面 | JDK API(基于 AQS 框架) | JVM 关键字(原生支持) |
| 灵活性 | 手动控制,支持跨方法块 | 自动加锁/释放,粒度固定 |
| 等待可中断 | 支持 (lockInterruptibly) |
不支持 |
| 公平性 | 支持公平与非公平 | 只支持非公平 |
| 性能 | 竞争极度激烈时通常表现更稳 | 锁升级优化后,两者在高并发下表现接近 |
3.4 AtomicInteger:原子更新
事实上,在 Java 并发编程中,还有一个 AtomicInteger 类,利用硬件底层的(CAS)指令保证「读-改-写」操作的原子性,实现了无锁的线程安全。
构造方法:
1
2
3
4
5// 创建一个初始值为 0 的 AtomicInteger
AtomicInteger atomicInt = new AtomicInteger();
// 创建一个初始值为 100 的 AtomicInteger
AtomicInteger atomicIntCustom = new AtomicInteger(100);方法签名:
这些方法是
AtomicInteger的灵魂,它们保证了操作的原子性。方法名 说明 等价的非原子操作 getAndIncrement()先获取当前值,再自增 1 i++incrementAndGet()先自增 1,再获取结果 ++igetAndDecrement()先获取当前值,再自减 1 i--decrementAndGet()先自减 1,再获取结果 --iaddAndGet(int delta)增加指定值,并返回结果 i += delta核心原理:compareAndSet,这是所有原子类的灵魂方法。
1
public fun compareAndSet(expect: Int, update: Int): Boolean
- 逻辑:如果当前值等于
expect,则将其更新为update并返回true;否则不做任何操作并返回false。 - 场景:手动实现自旋锁或复杂的并发控制逻辑。
- 逻辑:如果当前值等于
代码示例:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24fun main() = runBlocking {
// 使用 AtomicInteger 代替 Int
val number = AtomicInteger(0)
val threadA = thread {
repeat(100_0000) {
// 原子性自增
number.incrementAndGet()
}
}
val threadB = thread {
repeat(100_0000) {
// 原子性自减
number.decrementAndGet()
}
}
threadA.join()
threadB.join()
// 使用 get() 获取最终数值
println("result: ${number.get()}") // 输出 0
}
3.5 Mutex:协程时代的锁
在协程环境中使用传统的 synchronized 或 ReentrantLock 会有一个致命缺点:它们会阻塞底层线程。如果协程在等待锁时把线程卡死了,那么该线程上的其他协程也将无法运行,造成极大的资源浪费。
3.4.1 Mutex:挂起式锁
Kotlin 协程引入了原生锁 Mutex。它是协程专用的互斥锁,其优势在于:当锁被占用时,尝试获取锁的协程会挂起(Suspend),让出底层线程去执行其他任务,直到锁被释放。
核心特性:
- 非阻塞挂起:等待锁时不会卡住 CPU 线程,允许线程去处理其他协程任务。
- 协程安全:专门为 Kotlin 协程设计,完美契合
suspend函数。 - 不可重入:默认实现是不可重入的。如果同一个协程在持有锁的情况下再次尝试获取锁,会导致死锁。
- 轻量级:由于不涉及操作系统内核的线程调度,在处理协程间的资源竞争时性能优异。
方法签名:
核心锁定方法
方法 描述 suspend fun lock(owner: Any? = null)挂起函数。获取锁,如果锁被占用则挂起协程。 fun unlock(owner: Any? = null)释放锁。如果 owner与加锁时不一致,会抛出异常。fun tryLock(owner: Any? = null): Boolean尝试立即获取锁。成功返回 true,失败返回false,不挂起。状态查询
方法 描述 val isLocked: Boolean<如果锁已被占用,则返回 true。
代码示例:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31fun main() = runBlocking {
var number = 0
val mutex = Mutex()
val scope = CoroutineScope(EmptyCoroutineContext)
val jobA = scope.launch {
repeat(100_0000) {
try {
mutex.lock()
number++
} finally {
mutex.unlock()
}
}
}
val jobB = scope.launch {
repeat(100_0000) {
try {
mutex.lock()
number--
} finally {
mutex.unlock()
}
}
}
jobA.join()
jobB.join()
println("result: $number") // 输出 0
}
3.4.2 withLock:最佳实践
为了防止忘记释放锁,官方推荐使用扩展函数 withLock。它相当于 try { lock() } finally { unlock() } 的语法糖。
- 代码示例:
1
2
3
4
5
6
7
8
9
10
11
12import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
val mutex = Mutex()
var counter = 0
suspend fun safeIncrement() {
// 自动加锁并在执行完毕后(即使发生异常)释放锁
mutex.withLock {
counter++
}
} - 代码示例:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25fun main() = runBlocking {
var number = 0
val mutex = Mutex()
val scope = CoroutineScope(EmptyCoroutineContext)
val jobA = scope.launch {
repeat(100_0000) {
mutex.withLock {
number++
}
}
}
val jobB = scope.launch {
repeat(100_0000) {
mutex.withLock {
number--
}
}
}
jobA.join()
jobB.join()
println("result: $number") // 输出 0
}
3.5 Semaphore:信号量
Kotlin 还提供了一个与 Java Semaphore 定位相同的 Semaphore,只不过是协程版本。Semaphore 维护了一组虚拟的 许可(Permits)。线程在执行特定操作前必须先获得许可,操作完成后释放许可。它并不强制锁定资源,而是通过计数机制来控制同时访问特定资源的线程数量。
核心能力:
- 并发限流:限制同时处理特定任务的线程上限。
- 资源池管理:常用于实现数据库连接池、线程池或文件句柄池。
- 公平性控制:支持公平模式(按 FIFO 顺序发放许可)和非公平模式(抢占式)。
- 异步释放:与
Lock不同,释放许可的线程不需要是获取许可的那个线程。
方法签名:
核心获取与释放
方法 描述 void acquire()阻塞获取。尝试获取 1 个许可,如果没有则线程进入等待状态。 void release()归还许可。增加 1 个可用许可,并可能唤醒等待中的线程。 void acquire(int n)一次性获取 n 个许可。 void release(int n)一次性归还 n 个许可。 非阻塞与尝试获取
方法 描述 boolean tryAcquire()立即返回。成功获得许可返回 true,否则返回false。boolean tryAcquire(long time, TimeUnit unit)在给定时间内尝试获取许可。 状态检视
方法 描述 int availablePermits()返回当前可用的许可总数。 int drainPermits()获取并返回立即可用的所有许可,将可用数清零。
模版代码:
在 Kotlin 中,为了防止由于逻辑异常导致许可「丢失」(即只领不还),建议配合
try-finally使用:1
2
3
4
5
6
7
8
9
10
11val semaphore = Semaphore(3) // 允许 3 个并发
fun accessResource() {
semaphore.acquire() // 获取许可
try {
// 执行受限资源访问,如:调用第三方 API
println("Thread ${Thread.currentThread().name} is working...")
} finally {
semaphore.release() // 必须释放,否则停车场会“永远客满”
}
}工作原理:计数器与等待队列
- 初始化:设置
state = N(初始许可数)。 - Acquire:检查
state是否 > 0。若是,则通过 CAS 将其减 1;若否,将线程放入等待队列。 - Release:将
state加 1,并通知等待队列中的首个线程前来领取。
- 初始化:设置
4、ThreadLocal
在 Java 并发编程的工具箱中,ThreadLocal 始终扮演着一个独特且关键的角色。它既不像局部变量那样「短命」,也不像静态变量那样「大公无私」。随着 Kotlin 协程的普及,如何在异步挂起环境中继续发挥 ThreadLocal 的余热,成为了每个 Android 和 Java 开发者必须掌握的进阶课题。
4.1 什么是 ThreadLocal
从语法结构上看,它是 Thread(线程) 的 Local Variable(局部变量)。其核心特性可以概括为:线程隔离副本。
这意味着,即便多个线程访问同一个 ThreadLocal 实例,每个线程拿到的都是该变量的一个独立副本。你在 A 线程 set 的值,B 线程永远 get 不到。
- 概念定义:
ThreadLocal是一种作用域限定于单一线程、但能突破方法边界(Method Boundary)的变量机制。它为缺乏结构化管理能力的线程,提供了跨方法传递状态的能力。 - 基本用法:
ThreadLocal通常配合泛型使用。
1 | class ProcessManager { |
代码示例:验证线程隔离性
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38import kotlin.concurrent.thread
object ThreadLocalDemo {
// 1. 声明一个全局唯一的 ThreadLocal 实例
// 在 Kotlin 中通常放在 object 或 companion object 中
val threadContent = ThreadLocal<String>()
fun main(args: Array<String>) {
// 在主线程中设置值
threadContent.set("这是主线程的消息")
println("${Thread.currentThread().name} -> 读取值: ${threadContent.get()}")
// 2. 开启一个子线程
thread(name = "Thread-Sub") {
// 子线程初始读取应该是 null,因为它拿不到主线程的副本
println("${Thread.currentThread().name} -> 初始读取: ${threadContent.get()}")
// 子线程设置属于自己的副本
threadContent.set("子线程的私有秘密")
println("${Thread.currentThread().name} -> 设置后读取: ${threadContent.get()}")
}.join() // 等待子线程执行完毕
// 3. 回到主线程再次验证
// 主线程的值依然保持原样,未被子线程污染
println("${Thread.currentThread().name} -> 最终读取: ${threadContent.get()}")
// 记得清理
threadContent.remove()
}
}
输出
--------------------
main -> 读取值: 这是主线程的消息
Thread-Sub -> 初始读取: null
Thread-Sub -> 设置后读取: 子线程的私有秘密
main -> 最终读取: 这是主线程的消息隔离本质:在 JVM 层面,每一个
Thread对象内部都持有一个ThreadLocalMap。当你调用threadContent.set()时,代码本质上是在做:Thread.currentThread().threadLocals.put(threadContent, value)潜在隐患:这段代码展示了
ThreadLocal强绑定于Thread.currentThread()。这正是为什么当我们在协程中使用Dispatchers.Default或IO进行线程切换时,数据会突然「消失」的根本原因。
4.2 当 ThreadLocal 遇到 Kotlin 协程
在传统的阻塞式编程中,一段逻辑始终运行在一个线程上,ThreadLocal 表现完美。但在协程环境下,情况发生了变化:
线程切换导致的失效
协程的本质是“可挂起的计算”。一个协程可能在线程 A 上开始执行,挂起后在线程 B 上恢复。
风险点:如果你在线程 A 中通过
ThreadLocal.set()存储了值,当协程切换到线程 B 执行get()时,结果将返回null。这种不确定性会导致严重的业务 BUG。协程的等价物:CoroutineContext
在 Kotlin 协程中,
CoroutineContext实际上扮演了类似ThreadLocal的角色,且它天然支持结构化生命周期管理。
4.3 适配方案:asContextElement
当我们必须在协程中使用现有的 Java 库(这些库依赖 ThreadLocal)时,Kotlin 官方提供了一个标准且简洁的解决方案:asContextElement。
关键步骤:
- 转换:调用
threadLocal.asContextElement(value)。 - 封装:将生成的上下文元素注入到
launch、async或withContext中。
- 转换:调用
代码示例:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15val myThreadLocal = ThreadLocal<String>()
fun main() = runBlocking {
// 将 ThreadLocal 转换为协程上下文的一部分
val contextElement = myThreadLocal.asContextElement(value = "Hello Coroutine!")
launch(Dispatchers.Default + contextElement) {
// 无论协程如何切换线程,这里的 get() 始终能拿到值
println("Current value: ${myThreadLocal.get()}")
delay(100) // 模拟挂起和可能的线程切换
println("Value after delay: ${myThreadLocal.get()}")
}
}保障效果:
通过这种方式,协程框架会在每次调度(挂起后恢复)时,自动将上下文中的值重新同步到当前执行线程的
ThreadLocal中。无论内部如何切换线程,只要未脱离该上下文作用域,get()的一致性就能得到保障。