Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
379 changes: 377 additions & 2 deletions docs/java/concurrent/aqs.md
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,93 @@ AQS 定义两种资源共享方式:`Exclusive`(独占,只有一个线程

一般来说,自定义同步器的共享方式要么是独占,要么是共享,他们也只需实现`tryAcquire-tryRelease`、`tryAcquireShared-tryReleaseShared`中的一种即可。但 AQS 也支持自定义同步器同时实现独占和共享两种方式,如`ReentrantReadWriteLock`。

### 独占模式与共享模式的深入对比

上面简要介绍了 AQS 的两种资源共享方式,下面从多个维度对独占模式和共享模式进行系统对比,帮助更深入地理解二者的差异。

#### 特性对比

| 对比维度 | 独占模式(Exclusive) | 共享模式(Share) |
| --- | --- | --- |
| **并发度** | 同一时刻只有一个线程能获取到资源 | 同一时刻可以有多个线程同时获取到资源 |
| **获取资源入口** | `acquire(int arg)` | `acquireShared(int arg)` |
| **释放资源入口** | `release(int arg)` | `releaseShared(int arg)` |
| **需要重写的模板方法** | `tryAcquire(int)` / `tryRelease(int)` | `tryAcquireShared(int)` / `tryReleaseShared(int)` |
| **tryXxx 返回值** | `boolean`,`true` 表示获取/释放成功 | `int`(获取时),负数表示失败,0 表示成功但无剩余资源,正数表示成功且有剩余资源;`boolean`(释放时) |
| **唤醒后继节点** | 释放资源时唤醒一个后继节点 | 获取资源成功后,如果还有剩余资源,会继续唤醒后续节点(传播唤醒) |
| **Node 类型标识** | `Node.EXCLUSIVE`(`null`) | `Node.SHARED`(一个静态的 `Node` 实例) |
| **典型实现** | `ReentrantLock`、`ReentrantReadWriteLock` 的写锁 | `Semaphore`、`CountDownLatch`、`ReentrantReadWriteLock` 的读锁 |

#### `state` 在不同同步器中的语义

AQS 中的 `state` 是一个通用的同步状态变量,不同的同步器赋予它不同的含义:

| 同步器 | 模式 | `state` 的语义 |
| --- | --- | --- |
| `ReentrantLock` | 独占 | 表示锁的重入次数。`state == 0` 表示锁空闲;`state > 0` 表示锁被持有,值为重入次数 |
| `ReentrantReadWriteLock` | 独占 + 共享 | 高 16 位表示读锁的持有数量(共享),低 16 位表示写锁的重入次数(独占) |
| `Semaphore` | 共享 | 表示可用许可证的数量。每次 `acquire()` 减少,`release()` 增加 |
| `CountDownLatch` | 共享 | 表示需要等待的计数。每次 `countDown()` 减 1,到 0 时唤醒所有等待线程 |

下面通过一个代码示例来直观感受独占模式和共享模式在使用上的区别:

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.locks.ReentrantLock;

public class ExclusiveVsSharedDemo {
public static void main(String[] args) {
// 独占模式:同一时刻只有 1 个线程能进入临界区
ReentrantLock lock = new ReentrantLock();

// 共享模式:同一时刻最多 3 个线程能进入临界区
Semaphore semaphore = new Semaphore(3);

// 独占模式示例
Runnable exclusiveTask = () -> {
lock.lock();
try {
System.out.println(Thread.currentThread().getName()
+ " 获取到独占锁,正在执行...");
Thread.sleep(500);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
lock.unlock();
}
};

// 共享模式示例
Runnable sharedTask = () -> {
try {
semaphore.acquire();
System.out.println(Thread.currentThread().getName()
+ " 获取到许可证,正在执行...");
Thread.sleep(500);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
semaphore.release();
}
};

System.out.println("=== 独占模式(ReentrantLock)===");
for (int i = 0; i < 5; i++) {
new Thread(exclusiveTask, "独占线程-" + i).start();
}

try { Thread.sleep(3000); } catch (InterruptedException e) { }

System.out.println("\n=== 共享模式(Semaphore)===");
for (int i = 0; i < 5; i++) {
new Thread(sharedTask, "共享线程-" + i).start();
}
}
}
```

运行上面的代码可以观察到:独占模式下 5 个线程严格按顺序一个一个执行,而共享模式下最多有 3 个线程同时执行。

### AQS 资源获取源码分析(独占模式)

AQS 中以独占模式获取资源的入口方法是 `acquire()` ,如下:
Expand Down Expand Up @@ -929,9 +1016,296 @@ protected final boolean tryReleaseShared(int releases) {

`doReleaseShared()` 方法在前文获取资源(共享模式)的部分已进行了详细的源码分析,此处不再重复。

## 常见同步工具类
### Condition 条件队列的工作机制

前面在 `waitStatus` 状态表格中提到过 `CONDITION`(值为 -2)状态,表示节点在 Condition 条件队列中等待。这里系统讲解 Condition 条件队列的工作机制。

#### 什么是 Condition?

`Condition` 是 `java.util.concurrent.locks` 包中定义的接口,它提供了类似于 `Object.wait()` / `Object.notify()` 的线程等待/通知机制,但功能更加强大和灵活。`Condition` 必须与 `Lock` 配合使用,就像 `wait/notify` 必须与 `synchronized` 配合使用一样。

与 `Object` 的 `wait/notify` 相比,`Condition` 的主要优势在于:

- **支持多个等待队列**:一个 `Lock` 可以创建多个 `Condition` 实例,不同的线程可以在不同的条件上等待,实现更精细的线程协作。而 `synchronized` 只有一个等待队列。
- **支持不响应中断的等待**:`Condition` 提供了 `awaitUninterruptibly()` 方法。
- **支持超时等待**:`Condition` 提供了 `awaitNanos(long)` 和 `await(long, TimeUnit)` 方法,可以设定等待的截止时间。

#### AQS 中的两种队列

在 AQS 内部实际上维护了 **两种队列**:

1. **同步队列(CLH 变体队列)**:就是前面详细分析过的双向队列,用于存放获取资源失败而等待的线程节点。
2. **条件队列(Condition Queue)**:是一个单向链表,用于存放调用了 `Condition.await()` 方法而等待的线程节点。每个 `Condition` 实例维护一个独立的条件队列。

条件队列中的节点使用 `Node` 的 `nextWaiter` 指针来链接下一个节点,形成单向链表。条件队列的头节点为 `firstWaiter`,尾节点为 `lastWaiter`。

#### Condition 的核心工作流程

AQS 的内部类 `ConditionObject` 实现了 `Condition` 接口,其核心方法为 `await()` 和 `signal()`。

**`await()` 方法的工作流程:**

1. 将当前线程封装为 `Node` 节点(`waitStatus` 设置为 `CONDITION`),加入到条件队列的尾部。
2. 完全释放当前线程持有的锁(即将 `state` 值置为 0),并保存释放前的 `state` 值。
3. 阻塞当前线程,等待被 `signal()` 唤醒或被中断。
4. 被唤醒后,重新通过 `acquireQueued()` 进入同步队列竞争锁,并恢复之前保存的 `state` 值(重入次数)。

**`signal()` 方法的工作流程:**

1. 检查调用 `signal()` 的线程是否持有锁(不持有则抛出 `IllegalMonitorStateException`)。
2. 将条件队列中第一个等待的节点从条件队列移除。
3. 将该节点的 `waitStatus` 从 `CONDITION` 修改为 `0`,并通过 `enq()` 方法将其加入到同步队列的尾部。
4. 如果同步队列中前驱节点的状态异常(`CANCELLED`)或者 CAS 设置前驱节点状态为 `SIGNAL` 失败,则直接唤醒该线程。

`signalAll()` 方法与 `signal()` 类似,区别在于它会将条件队列中的 **所有** 节点都转移到同步队列中。

下面的代码示例展示了 `Condition` 的典型用法——实现一个简单的有界阻塞队列:

```java
import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

public class SimpleBlockingQueue<T> {
private final Queue<T> queue = new LinkedList<>();
private final int capacity;
private final ReentrantLock lock = new ReentrantLock();
// 两个不同的条件队列:分别用于"队列不满"和"队列不空"
private final Condition notFull = lock.newCondition();
private final Condition notEmpty = lock.newCondition();

public SimpleBlockingQueue(int capacity) {
this.capacity = capacity;
}

/**
* 向队列中添加元素,如果队列已满则等待。
*/
public void put(T item) throws InterruptedException {
lock.lock();
try {
// 队列满时,在 notFull 条件上等待
while (queue.size() == capacity) {
notFull.await();
}
queue.offer(item);
// 添加元素后,通知在 notEmpty 条件上等待的消费者线程
notEmpty.signal();
} finally {
lock.unlock();
}
}

/**
* 从队列中取出元素,如果队列为空则等待。
*/
public T take() throws InterruptedException {
lock.lock();
try {
// 队列空时,在 notEmpty 条件上等待
while (queue.isEmpty()) {
notEmpty.await();
}
T item = queue.poll();
// 取出元素后,通知在 notFull 条件上等待的生产者线程
notFull.signal();
return item;
} finally {
lock.unlock();
}
}

public static void main(String[] args) {
SimpleBlockingQueue<Integer> blockingQueue = new SimpleBlockingQueue<>(5);

// 生产者线程
Thread producer = new Thread(() -> {
try {
for (int i = 0; i < 10; i++) {
blockingQueue.put(i);
System.out.println("生产: " + i);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}, "Producer");

// 消费者线程
Thread consumer = new Thread(() -> {
try {
for (int i = 0; i < 10; i++) {
int item = blockingQueue.take();
System.out.println("消费: " + item);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}, "Consumer");

producer.start();
consumer.start();
}
}
```

下面介绍几个基于 AQS 的常见同步工具类。
在上面的例子中,`notFull` 和 `notEmpty` 是两个独立的 `Condition` 实例,分别维护各自的条件队列。生产者在队列满时在 `notFull` 上等待,消费者在队列空时在 `notEmpty` 上等待。这种分离等待条件的设计,避免了不必要的线程唤醒,比 `synchronized` + `wait/notifyAll` 更加高效。

#### `await()` 核心源码分析

```java
// AQS 内部类 ConditionObject
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
// 1、将当前线程封装为 Node 节点,加入条件队列
Node node = addConditionWaiter();
// 2、完全释放锁,并保存释放前的 state 值
int savedState = fullyRelease(node);
int interruptMode = 0;
// 3、如果节点不在同步队列中,则阻塞当前线程
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
// 4、被唤醒后,重新进入同步队列竞争锁
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null)
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
```

`await()` 方法中有两个关键操作:

- `fullyRelease(node)`:完全释放锁(而不是只释放一次),这样即使线程重入了多次锁,也能在等待期间让其他线程获取到锁。被唤醒后会通过 `acquireQueued(node, savedState)` 恢复之前的重入次数。
- `isOnSyncQueue(node)`:判断节点是否已经被转移到同步队列。当其他线程调用 `signal()` 时,节点会从条件队列转移到同步队列,此时 `isOnSyncQueue()` 返回 `true`,线程退出 `while` 循环,开始竞争锁。

### 公平锁与非公平锁的性能差异分析

前面的源码分析中,以 `ReentrantLock` 的非公平锁为例讲解了 `tryAcquire()` 的实现。实际上 `ReentrantLock` 同时支持公平锁和非公平锁两种模式。这里深入分析二者的实现差异及其对性能的影响。

#### 源码层面的差异

`ReentrantLock` 默认使用非公平锁,通过构造参数可以切换为公平锁:

```java
// 非公平锁(默认)
ReentrantLock unfairLock = new ReentrantLock();
// 公平锁
ReentrantLock fairLock = new ReentrantLock(true);
```

二者的核心差异在于 `tryAcquire()` 方法的实现。非公平锁的 `nonfairTryAcquire()` 前面已经分析过,下面看公平锁的实现:

```java
// ReentrantLock.FairSync
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
// 关键差异:先调用 hasQueuedPredecessors() 判断同步队列中是否有等待更久的线程
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
```

**唯一的区别** 就是公平锁在 CAS 修改 `state` 之前多了一个 `hasQueuedPredecessors()` 判断:

```java
// AQS
public final boolean hasQueuedPredecessors() {
Node t = tail;
Node h = head;
Node s;
return h != t &&
((s = h.next) == null || s.thread != Thread.currentThread());
}
```

这个方法用于判断当前线程之前是否有其他线程在排队。如果有,则当前线程不能直接获取锁,必须排队等待,从而保证了 **FIFO** 的公平性。

而非公平锁没有这个判断,当锁刚好释放时,新来的线程可以直接通过 CAS 抢到锁,即使同步队列中已经有其他线程在等待。

#### 性能差异对比

| 对比维度 | 非公平锁(默认) | 公平锁 |
| --- | --- | --- |
| **吞吐量** | 更高。新线程有机会直接获取锁,减少了线程上下文切换 | 较低。所有线程都必须排队,增加了上下文切换的开销 |
| **线程饥饿** | 可能发生。极端情况下某些线程长时间无法获取锁 | 不会发生。严格按照请求顺序分配锁 |
| **上下文切换** | 较少。持有锁的线程释放锁后,新到达的线程可能直接获取锁,不需要唤醒队列中的线程 | 较多。每次释放锁都需要唤醒队列中的下一个线程 |
| **适用场景** | 大多数场景(对响应时间和吞吐量要求较高) | 对公平性有严格要求的场景(如资源分配、任务调度) |

**为什么非公平锁性能通常更好?**

关键原因在于 **减少了线程上下文切换的次数**。当持有锁的线程 A 释放锁后:

- **非公平锁**:此时如果恰好有线程 B 正在尝试获取锁(还没有进入同步队列),线程 B 可以直接通过 CAS 获取到锁并立即执行,省去了唤醒队列中线程的开销。而队列中等待的线程被唤醒后发现锁被占用,会重新阻塞,虽然看起来"浪费"了一次唤醒,但总体上减少了线程切换次数。
- **公平锁**:线程 B 必须排到队列尾部,然后唤醒队列头部的线程。从线程被唤醒到真正开始执行之间,存在一段 **调度延迟**(线程状态从阻塞切换到运行),在这段延迟期间锁处于空闲状态,降低了锁的利用率。

Doug Lea 在 `ReentrantLock` 的文档中指出:使用公平锁的程序在多线程环境下的总体吞吐量通常低于使用非公平锁的程序(即更慢),因此 `ReentrantLock` 默认使用非公平模式。但在需要保证请求处理顺序或避免线程饥饿的场景中(如连接池分配),公平锁是更好的选择。

下面通过代码示例来演示公平锁与非公平锁在行为上的差异:

```java
import java.util.concurrent.locks.ReentrantLock;

public class FairVsUnfairLockDemo {
// 分别测试公平锁和非公平锁
private static void testLock(ReentrantLock lock, String lockType) {
System.out.println("=== " + lockType + " ===");
Runnable task = () -> {
for (int i = 0; i < 2; i++) {
lock.lock();
try {
System.out.println(Thread.currentThread().getName() + " 获取到锁");
} finally {
lock.unlock();
}
}
};

Thread[] threads = new Thread[5];
for (int i = 0; i < 5; i++) {
threads[i] = new Thread(task, lockType + "-线程-" + i);
}
for (Thread t : threads) {
t.start();
}
for (Thread t : threads) {
try { t.join(); } catch (InterruptedException e) { }
}
System.out.println();
}

public static void main(String[] args) {
// 非公平锁:同一个线程可能连续多次获取到锁
testLock(new ReentrantLock(false), "非公平锁");

// 公平锁:线程按请求顺序交替获取锁
testLock(new ReentrantLock(true), "公平锁");
}
}
```

运行上面的代码可以观察到:非公平锁模式下,同一个线程可能连续多次获取到锁(因为它释放锁后立即又去竞争,有很大概率在队列中的线程被唤醒之前就抢到了锁);而公平锁模式下,线程获取锁的顺序更加均匀,不会出现某个线程连续霸占锁的情况。

## 常见同步工具类

### Semaphore(信号量)

Expand Down Expand Up @@ -1610,3 +1984,4 @@ threadnum:7is finish
- 从 ReentrantLock 的实现看 AQS 的原理及应用:<https://tech.meituan.com/2019/12/05/aqs-theory-and-apply.html>

<!-- @include: @article-footer.snippet.md -->
````
Loading