c鲨鱼辣椒
2023/02/13阅读:21主题:默认主题
AQS(AbstractQueuedSynchronizer)详解
前言
AQS,AbstractQueuedSynchronizer,中文名称抽象队列同步器,是一个用于实现锁和同步器的工具类,我们熟知的 ReentrantLock、CountDownLatch、Semaphore 等就借助了 AQS 实现。通过 AQS,我们可以非常方便的实现一个自定义的同步器。简单来说,AQS 通过一个双向的先进先出(FIFO)队列(下称同步队列)来管理等待线程,如果某个线程发现前驱的线程释放了锁,便会获得锁。乍看之下,AQS 是公平锁,但实际上,线程加入队列前会先尝试获取一次锁,失败后才会加入队列按序等待。所以,在没有特殊处理的情况下,AQS 的锁分配并不具备公平性,这与 synchronized 是十分相似的。
在进入 AQS 之前,我们先来了解一下这几个锁类型:自旋锁、Ticket Lock、MCS 锁与 CLH 锁。
自旋锁、Ticket Lock、MCS 锁与 CLH 锁
一般来说,当一个线程持有了锁,那么其他尝试加锁的线程都将被阻塞。通常,线程的阻塞可以通过挂起线程来实现。但挂起线程的消耗比较大,于是,另一种方法出现了:让线程空转等待,也就是自旋:
while(!tryAcquire());
通过自旋来阻塞线程的锁叫做自旋锁:
class SpinLock {
private AtomicBoolean state = new AtomicBoolean(false);
public void lock() {
while (!state.compareAndSet(false, true)) ;
}
public void unlock() {
state.compareAndSet(true, false);
}
}
自旋锁可以避免线程频繁挂起与恢复,但却无法保证公平性,可能存在一个线程长时间无法获得锁的现象。为了解决公平性的问题,各种排队自旋锁出现了,如 Ticket Lock、MCS 锁与 CLH 锁等等。 Ticket Lock 类似于排队叫号业务。线程按取号,按照号码等待执行。
public class TicketLock {
private AtomicInteger serviceNum = new AtomicInteger(); // 服务号
private AtomicInteger ticketNum = new AtomicInteger(); // 排队号
private static final ThreadLocal<Integer> myNum = new ThreadLocal<>();
public void lock() {
// 获得排队号
myNum.set(ticketNum.getAndIncrement());
while (serviceNum.get() != myNum.get());
}
public void unlock() {
serviceNum.compareAndSet(myNum.get(), serviceNum.get() + 1)
}
}
虽然 Ticket Lock 保证了公平性,但 serviceNum 与 ticketNum 作为多个线程的共享变量,会被频繁进行读写,开销很大。为了解决这个问题,MCS 锁出现了。使用 MCS 锁则可以减少因为使用共享变量而造成的额外开销。 具体来说,MCS 锁使用队列来使线程排队获得锁。当一个线程释放锁时,将会通知自己的后继线程获得锁:
class MCSLock {
// 队列尾部
volatile MCSNode tail;
private volatile ThreadLocal<MCSNode> curNode=new ThreadLocal<>();
static class MCSNode {
// 是否被阻塞
volatile boolean isBlock = true;
MCSNode next;
}
public void lock() {
MCSNode mcsNode = new MCSNode();
// 使用CAS添加至同步队列
while (true) {
MCSNode oldTail = tail;
if (TAIL.compareAndSet(this, oldTail, mcsNode)) {
oldTail.next = mcsNode;
break;
}
}
// 等待前驱结点通知
while (mcsNode.isBlock) ;
}
public void unlock() {
// 通知后继结点
curNode.get().next.isBlock = false;
}
}
CLH 锁与 MCS 锁很类似,不同的是,MCS 中自旋轮询己身状态来等待前驱结点的通知,而 CLH 锁则自旋轮询前驱结点的状态来判断是否结束自旋。
class CLHLock {
volatile CLHNdoe tail;
private volatile ThreadLocal<CLHNdoe> curNode=new ThreadLocal<>();
static class CLHNdoe {
CLHNdoe prev;
volatile boolean isLock = true;
}
public void lock() {
CLHNdoe clhNdoe = new CLHNdoe();
curNode.set(clhNdoe);
// 使用CAS添加至同步队列
while (true) {
CLHNdoe oldTail = tail;
if (TAIL.compareAndSet(this, oldTail, clhNdoe)) {
clhNdoe.prev = oldTail;
break;
}
}
// 前驱结点是否释放了锁
while (clhNdoe.prev.isLock) ;
}
public void unlock() {
// 释放锁
curNode.get().isLock = false;
}
}
AQS 的同步队列便采取了 CLH 锁的模式。
基础概念
state
state 表示当前的共享资源状态,通过 getState(), setState(), compareAndSetState()三个方法维护。在实现自定义同步器的时候,只需要实现对 state 的获取和释放,而无需维护等待队列的入队、出队等操作。
waitStatus
用来表示每个结点的状态,有这么几种类型:
-
0,入队列的默认状态 -
1,cancelled,取消 -
-1,signal,后继结点等待唤醒 -
-2,condition,表示结点等待在 condition 上 -
-3,propagate,结点与后继结点无条件传播
独占模式
acquire():获取资源
public final void acquire(int arg) {
if (!tryAcquire(arg) && // 尝试获取资源,也就是是新线程加入队列之前,会首先尝试争抢资源
acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) // 独占模式(EXCLUSIVE),将线程结点加入队列尾部(addWaiter),之后尝试获得锁(acquireQueued)
selfInterrupt();
}
里面主要有 3 个方法,分别是 tryAcquire,addWaiter 和 acquireQueued,分别来看一下。
tryAcquire:尝试获取资源
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
直接抛出异常……事实上,这个方法就是自定义同步器需要实现的方法了。用户需要通过改变 state 的状态实现方法,从而完成资源的释放与获取。此类方法一共有 5 个,分别是
-
tryAcquire:尝试获取资源(独占模式) -
tryRelease:尝试释放资源(独占模式) -
tryAcquireShared:尝试获取资源(共享模式) -
tryReleaseShared:尝试释放资源(共享模式) -
isHeldExclusively:是否独占
addWaiter:将结点加入队列
private Node addWaiter(Node mode) {
Node node = new Node(mode);
for (;;) {
Node oldTail = tail;
if (oldTail != null) {
node.setPrevRelaxed(oldTail); // 设置前驱结点
if (compareAndSetTail(oldTail, node)) { // 通过CAS将结点加入队列
oldTail.next = node; // 加入成功,则设置next连接
return node;
}
} else {
initializeSyncQueue(); // 初始化队列:设置头结点Head与尾结点tail
}
}
}
acquireQueued:排队获取资源
final boolean acquireQueued(final Node node, int arg) {
boolean interrupted = false;
try {
// 尝试获得锁,失败则阻塞(park),等待恢复(unpark)
for (;;) {
final Node p = node.predecessor(); // 获取前驱结点
if (p == head && tryAcquire(arg)) { // 如果前驱是头结点,那么便有资格去尝试获取资源(可能是前驱结点唤醒自己的)
setHead(node); // 设置头结点
p.next = null; // help GC,前驱结点出队
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node)) // 判断是否需要park
interrupted |= parkAndCheckInterrupt();
}
} catch (Throwable t) {
cancelAcquire(node);
if (interrupted)
selfInterrupt();
throw t;
}
}
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL) // 前驱处于SIGNAL态,直接返回true,自身park等待唤醒
return true;
if (ws > 0) { // 前驱被取消
// 向前遍历,直到找到没被取消的结点,排队
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
// 设置成SIGNAL
pred.compareAndSetWaitStatus(ws, Node.SIGNAL);
}
return false;
}
release():释放资源
public final boolean release(int arg) {
if (tryRelease(arg)) { // 尝试释放锁
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h); // unpark 后继线程
return true;
}
return false;
}
上文所列举的方法是不支持相应中断(不抛出 InterruptedException 异常)的,而实际上,AQS 除了支持不响应中断的方式之外,还支持相应中断和限时抢占两种模式:
// 响应中断
public final void acquireInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (!tryAcquire(arg))
doAcquireInterruptibly(arg);
}
// 限时抢占
public final boolean tryAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
return tryAcquire(arg) ||
doAcquireNanos(arg, nanosTimeout);
}
例子:ReentrantLock
我们熟知的ReentrantLock的加锁和解锁就是通过AQS的独占模式所实现的。具体来说,ReentrantLock内部实现了一个同步器Sync,维护了state的状态。当state等于0时,说明资源是可抢占状态。当state=n>0时,说明持有了n个资源——也就是重入了多少次。
Sync是抽象类,其下有两个子类,分别是FairSync与NonFairSync。顾名思义,一个是公平锁,一个是非公平锁。因为AQS的抢占策略是进入排队前先尝试抢占一次,所以是非公平性的。公平锁在tryAcquire()方法里多了一个判断,只有当前线程没有前驱结点的时候才会返回true,因此避免了排队前的抢占,实现了公平性。
共享模式
除了独占模式,AQS 而支持共享模式,也就是多个线程在同时取到锁。与独占锁一样,共享锁也支持不响应中断、响应中断、限时抢占三种方式。
acquireShared:获取资源(共享模式)
与独占模式大同小异:
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0) // 子类需重写:负数失败,0表示成功但后续线程不能继续获取资源,正数表示成功且仍有剩余资源
doAcquireShared(arg);
}
private void doAcquireShared(int arg) {
final Node node = addWaiter(Node.SHARED); // 结点是share模式
boolean interrupted = false;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) { // 获取资源成功
setHeadAndPropagate(node, r); // 设置头结点并且唤醒后继
p.next = null; // help GC
return;
}
}
if (shouldParkAfterFailedAcquire(p, node))
interrupted |= parkAndCheckInterrupt();
}
} catch (Throwable t) {
cancelAcquire(node);
throw t;
} finally {
if (interrupted)
selfInterrupt();
}
}
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head;
setHead(node);
// 仍有资源,继续唤醒
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
if (s == null || s.isShared())
doReleaseShared();
}
}
releaseShared:释放资源(共享模式)
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) { // 子类需重写
doReleaseShared();
return true;
}
return false;
}
private void doReleaseShared() {
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) { // 后继等待唤醒
if (!h.compareAndSetWaitStatus(Node.SIGNAL, 0))
continue;
unparkSuccessor(h); // unpark head的后继结点
}
else if (ws == 0 &&
!h.compareAndSetWaitStatus(0, Node.PROPAGATE))
continue;
}
if (h == head)
break;
}
}
例子:Semaphore、CountDownLatch与CyclicBarrir
Semaphore信号量和CountDownLatch就是利用共享模式所实现的。
Semaphore和ReentrantLock一样,其中也实现了抽象类Sync和子类FairSync和NonfairSync,但用的是共享模式,state的值就是许可的数量。每次获取资源会判断当前剩余的许可数量available和所需的许可数量acquires,只有available-acquires>=0并且cas(availbale,available-acquires)成功时,才会获取资源成功。
CountDownLatch的计数count即是state的值,每次调用countDown()方法,state就会减一,当state等于0时,最后一个调用countDown()的线程将会唤醒等待线程。 CyclicBarrir内部同样维护了一个count,但却和state毫无关系。它是通过ReentrantLock的condition来实现的。每次调用await,都会使count减1。当count不为0时,将会调用condition.await();当count为0时,将会调用condition.signalAll(),唤醒所有等待线程,并重置count为参与方parties的个数。
Condition
AQS 中实现 Condition 接口的是 ConditionObject。ConditionObject 采用队列来管理线程(以下称为条件队列),firstWaiter 表示头结点,lastWaiter 表示尾结点。
await():等待
await()方法会将结点加至条件队列的结尾、并释放占有的资源:
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException(); // 中断异常
Node node = addConditionWaiter(); // 添加至条件队列
int savedState = fullyRelease(node); // 释放node的资源
int interruptMode = 0;
while (!isOnSyncQueue(node)) { // node不在同步队列中
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) // 被中断
break;
}
// 被唤醒后
if (acquireQueued(node, savedState) && interruptMode != THROW_IE) // to THROW_IE:throw InterruptedException on exit from wait
interruptMode = REINTERRUPT; // REINTERRUPT: reinterrupt on exit from wait
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters(); // 遍历全部
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode); // 中断处理
}
// 是否在同步队列中
final boolean isOnSyncQueue(Node node) {
if (node.waitStatus == Node.CONDITION || node.prev == null)
return false;
if (node.next != null) // If has successor, it must be on queue
return true;
return findNodeFromTail(node); // 从尾部遍历,判断是否在同步队列中
}
}
为什么要从尾部遍历呢?因为加入同步队列的操作,保证 node.prev 不为空,但不保证已经增加至同步队列:
// ..
for (;;) {
Node oldTail = tail;
if (oldTail != null) {
node.setPrevRelaxed(oldTail);
if (compareAndSetTail(oldTail, node)) { // CAS操作不保证成功
oldTail.next = node;
return oldTail;
}
} else {
initializeSyncQueue();
}
}
// ..
signal()
signal()方法会唤醒条件队列的第一个结点,将其移入同步队列的尾部。
public class ConditionObject implements Condition, java.io.Serializable {
private transient Node firstWaiter;
private transient Node lastWaiter;
public final void signal() {
if (!isHeldExclusively()) // 是否独占,子类需重写
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignal(first); // signal first结点
}
private void doSignal(Node first) {
do {
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
} while (!transferForSignal(first) && // 转移至同步队列
(first = firstWaiter) != null);
}
// 将条件队列中的线程结点转移至同步队列中
final boolean transferForSignal(Node node) {
if (!node.compareAndSetWaitStatus(Node.CONDITION, 0))
return false;
Node p = enq(node); // 加入同步队列,返回原tail结点
int ws = p.waitStatus;
if (ws > 0 || !p.compareAndSetWaitStatus(ws, Node.SIGNAL)) // 被取消或CAS失败。
LockSupport.unpark(node.thread);
return true;
}
作者介绍