c

c鲨鱼辣椒

V1

2023/02/13阅读:21主题:默认主题

AQS(AbstractQueuedSynchronizer)详解

前言

AQS,AbstractQueuedSynchronizer,中文名称抽象队列同步器,是一个用于实现锁和同步器的工具类,我们熟知的 ReentrantLock、CountDownLatch、Semaphore 等就借助了 AQS 实现。通过 AQS,我们可以非常方便的实现一个自定义的同步器。简单来说,AQS 通过一个双向的先进先出(FIFO)队列(下称同步队列)来管理等待线程,如果某个线程发现前驱的线程释放了锁,便会获得锁。乍看之下,AQS 是公平锁,但实际上,线程加入队列前会先尝试获取一次锁,失败后才会加入队列按序等待。所以,在没有特殊处理的情况下,AQS 的锁分配并不具备公平性,这与 synchronized 是十分相似的。

在进入 AQS 之前,我们先来了解一下这几个锁类型:自旋锁、Ticket Lock、MCS 锁与 CLH 锁。

自旋锁、Ticket Lock、MCS 锁与 CLH 锁

一般来说,当一个线程持有了锁,那么其他尝试加锁的线程都将被阻塞。通常,线程的阻塞可以通过挂起线程来实现。但挂起线程的消耗比较大,于是,另一种方法出现了:让线程空转等待,也就是自旋:

    while(!tryAcquire());

通过自旋来阻塞线程的锁叫做自旋锁:

class SpinLock {

    private AtomicBoolean state = new AtomicBoolean(false);

    public void lock() {
        while (!state.compareAndSet(falsetrue)) ;
    }

    public void unlock() {
        state.compareAndSet(truefalse);
    }
}

自旋锁可以避免线程频繁挂起与恢复,但却无法保证公平性,可能存在一个线程长时间无法获得锁的现象。为了解决公平性的问题,各种排队自旋锁出现了,如 Ticket Lock、MCS 锁与 CLH 锁等等。 Ticket Lock 类似于排队叫号业务。线程按取号,按照号码等待执行。

public class TicketLock {
    private AtomicInteger serviceNum = new AtomicInteger(); // 服务号
    private AtomicInteger ticketNum = new AtomicInteger(); // 排队号
    private static final ThreadLocal<Integer> myNum = new ThreadLocal<>();

    public void lock() {
        // 获得排队号
        myNum.set(ticketNum.getAndIncrement());
        while (serviceNum.get() != myNum.get());
    }

    public void unlock() {
        serviceNum.compareAndSet(myNum.get(), serviceNum.get() + 1)
    }
}

虽然 Ticket Lock 保证了公平性,但 serviceNum 与 ticketNum 作为多个线程的共享变量,会被频繁进行读写,开销很大。为了解决这个问题,MCS 锁出现了。使用 MCS 锁则可以减少因为使用共享变量而造成的额外开销。 具体来说,MCS 锁使用队列来使线程排队获得锁。当一个线程释放锁时,将会通知自己的后继线程获得锁:

class MCSLock {
    // 队列尾部
    volatile MCSNode tail;

    private volatile ThreadLocal<MCSNode> curNode=new ThreadLocal<>();

    static class MCSNode {
        // 是否被阻塞
        volatile boolean isBlock = true;
        MCSNode next;
    }

    public void lock() {
        MCSNode mcsNode = new MCSNode();
        // 使用CAS添加至同步队列
        while (true) {
            MCSNode oldTail = tail;
            if (TAIL.compareAndSet(this, oldTail, mcsNode)) {
                oldTail.next = mcsNode;
                break;
            }
        }
        // 等待前驱结点通知
        while (mcsNode.isBlock) ;
    }

    public void unlock() {
        // 通知后继结点
        curNode.get().next.isBlock = false;
    }
}

CLH 锁与 MCS 锁很类似,不同的是,MCS 中自旋轮询己身状态来等待前驱结点的通知,而 CLH 锁则自旋轮询前驱结点的状态来判断是否结束自旋。

class CLHLock {
    volatile CLHNdoe tail;

    private volatile ThreadLocal<CLHNdoe> curNode=new ThreadLocal<>();

    static class CLHNdoe {
        CLHNdoe prev;
        volatile boolean isLock = true;
    }

    public void lock() {
        CLHNdoe clhNdoe = new CLHNdoe();
        curNode.set(clhNdoe);
        // 使用CAS添加至同步队列
        while (true) {
            CLHNdoe oldTail = tail;
            if (TAIL.compareAndSet(this, oldTail, clhNdoe)) {
                clhNdoe.prev = oldTail;
                break;
            }
        }
        // 前驱结点是否释放了锁
        while (clhNdoe.prev.isLock) ;
    }

    public void unlock() {
        // 释放锁
        curNode.get().isLock = false;
    }
}

AQS 的同步队列便采取了 CLH 锁的模式。

基础概念

state

state 表示当前的共享资源状态,通过 getState(), setState(), compareAndSetState()三个方法维护。在实现自定义同步器的时候,只需要实现对 state 的获取和释放,而无需维护等待队列的入队、出队等操作。

waitStatus

用来表示每个结点的状态,有这么几种类型:

  • 0,入队列的默认状态
  • 1,cancelled,取消
  • -1,signal,后继结点等待唤醒
  • -2,condition,表示结点等待在 condition 上
  • -3,propagate,结点与后继结点无条件传播

独占模式

acquire():获取资源

    public final void acquire(int arg) {
        if (!tryAcquire(arg) && // 尝试获取资源,也就是是新线程加入队列之前,会首先尝试争抢资源
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) // 独占模式(EXCLUSIVE),将线程结点加入队列尾部(addWaiter),之后尝试获得锁(acquireQueued)
            selfInterrupt();
    }

里面主要有 3 个方法,分别是 tryAcquire,addWaiter 和 acquireQueued,分别来看一下。

tryAcquire:尝试获取资源

    protected boolean tryAcquire(int arg) {
        throw new UnsupportedOperationException();
    }

直接抛出异常……事实上,这个方法就是自定义同步器需要实现的方法了。用户需要通过改变 state 的状态实现方法,从而完成资源的释放与获取。此类方法一共有 5 个,分别是

  • tryAcquire:尝试获取资源(独占模式)
  • tryRelease:尝试释放资源(独占模式)
  • tryAcquireShared:尝试获取资源(共享模式)
  • tryReleaseShared:尝试释放资源(共享模式)
  • isHeldExclusively:是否独占

addWaiter:将结点加入队列

private Node addWaiter(Node mode) {
        Node node = new Node(mode);

        for (;;) {
            Node oldTail = tail;
            if (oldTail != null) {
                node.setPrevRelaxed(oldTail); // 设置前驱结点
                if (compareAndSetTail(oldTail, node)) { // 通过CAS将结点加入队列
                    oldTail.next = node; // 加入成功,则设置next连接
                    return node;
                }
            } else {
                initializeSyncQueue(); // 初始化队列:设置头结点Head与尾结点tail
            }
        }
    }

acquireQueued:排队获取资源

    final boolean acquireQueued(final Node node, int arg) {
        boolean interrupted = false;
        try {
            // 尝试获得锁,失败则阻塞(park),等待恢复(unpark)
            for (;;) {
                final Node p = node.predecessor(); // 获取前驱结点
                if (p == head && tryAcquire(arg)) { // 如果前驱是头结点,那么便有资格去尝试获取资源(可能是前驱结点唤醒自己的)
                    setHead(node); // 设置头结点
                    p.next = null// help GC,前驱结点出队
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node)) // 判断是否需要park
                    interrupted |= parkAndCheckInterrupt();
            }
        } catch (Throwable t) {
            cancelAcquire(node);
            if (interrupted)
                selfInterrupt();
            throw t;
        }
    }

    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;
        if (ws == Node.SIGNAL) // 前驱处于SIGNAL态,直接返回true,自身park等待唤醒
            return true;
        if (ws > 0) { // 前驱被取消
            // 向前遍历,直到找到没被取消的结点,排队
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            // 设置成SIGNAL
            pred.compareAndSetWaitStatus(ws, Node.SIGNAL);
        }
        return false;
    }

release():释放资源

    public final boolean release(int arg) {
        if (tryRelease(arg)) { // 尝试释放锁
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h); // unpark 后继线程
            return true;
        }
        return false;
    }

上文所列举的方法是不支持相应中断(不抛出 InterruptedException 异常)的,而实际上,AQS 除了支持不响应中断的方式之外,还支持相应中断和限时抢占两种模式:

    // 响应中断
    public final void acquireInterruptibly(int arg)
            throws InterruptedException 
{
        if (Thread.interrupted())
            throw new InterruptedException();
        if (!tryAcquire(arg))
            doAcquireInterruptibly(arg);
    }

    // 限时抢占
    public final boolean tryAcquireNanos(int arg, long nanosTimeout)
            throws InterruptedException 
{
        if (Thread.interrupted())
            throw new InterruptedException();
        return tryAcquire(arg) ||
            doAcquireNanos(arg, nanosTimeout);
    }

例子:ReentrantLock

我们熟知的ReentrantLock的加锁和解锁就是通过AQS的独占模式所实现的。具体来说,ReentrantLock内部实现了一个同步器Sync,维护了state的状态。当state等于0时,说明资源是可抢占状态。当state=n>0时,说明持有了n个资源——也就是重入了多少次。

Sync是抽象类,其下有两个子类,分别是FairSync与NonFairSync。顾名思义,一个是公平锁,一个是非公平锁。因为AQS的抢占策略是进入排队前先尝试抢占一次,所以是非公平性的。公平锁在tryAcquire()方法里多了一个判断,只有当前线程没有前驱结点的时候才会返回true,因此避免了排队前的抢占,实现了公平性。

共享模式

除了独占模式,AQS 而支持共享模式,也就是多个线程在同时取到锁。与独占锁一样,共享锁也支持不响应中断、响应中断、限时抢占三种方式。

acquireShared:获取资源(共享模式)

与独占模式大同小异:

    public final void acquireShared(int arg) {
        if (tryAcquireShared(arg) < 0// 子类需重写:负数失败,0表示成功但后续线程不能继续获取资源,正数表示成功且仍有剩余资源
            doAcquireShared(arg);
    }

    private void doAcquireShared(int arg) {
        final Node node = addWaiter(Node.SHARED); // 结点是share模式
        boolean interrupted = false;
        try {
            for (;;) {
                final Node p = node.predecessor();
                if (p == head) {
                    int r = tryAcquireShared(arg);
                    if (r >= 0) { // 获取资源成功
                        setHeadAndPropagate(node, r); // 设置头结点并且唤醒后继
                        p.next = null// help GC
                        return;
                    }
                }
                if (shouldParkAfterFailedAcquire(p, node))
                    interrupted |= parkAndCheckInterrupt();
            }
        } catch (Throwable t) {
            cancelAcquire(node);
            throw t;
        } finally {
            if (interrupted)
                selfInterrupt();
        }
    }

    private void setHeadAndPropagate(Node node, int propagate) {
        Node h = head;
        setHead(node);
        // 仍有资源,继续唤醒
        if (propagate > 0 || h == null || h.waitStatus < 0 ||
            (h = head) == null || h.waitStatus < 0) {
            Node s = node.next;
            if (s == null || s.isShared())
                doReleaseShared();
        }
    }

releaseShared:释放资源(共享模式)

    public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) { // 子类需重写
            doReleaseShared();
            return true;
        }
        return false;
    }

    private void doReleaseShared() {
        for (;;) {
            Node h = head;
            if (h != null && h != tail) {
                int ws = h.waitStatus;
                if (ws == Node.SIGNAL) { // 后继等待唤醒
                    if (!h.compareAndSetWaitStatus(Node.SIGNAL, 0))
                        continue;
                    unparkSuccessor(h); // unpark head的后继结点
                }
                else if (ws == 0 &&
                         !h.compareAndSetWaitStatus(0, Node.PROPAGATE))
                    continue;
            }
            if (h == head)
                break;
        }
    }

例子:Semaphore、CountDownLatch与CyclicBarrir

Semaphore信号量和CountDownLatch就是利用共享模式所实现的。

Semaphore和ReentrantLock一样,其中也实现了抽象类Sync和子类FairSync和NonfairSync,但用的是共享模式,state的值就是许可的数量。每次获取资源会判断当前剩余的许可数量available和所需的许可数量acquires,只有available-acquires>=0并且cas(availbale,available-acquires)成功时,才会获取资源成功。

CountDownLatch的计数count即是state的值,每次调用countDown()方法,state就会减一,当state等于0时,最后一个调用countDown()的线程将会唤醒等待线程。 CyclicBarrir内部同样维护了一个count,但却和state毫无关系。它是通过ReentrantLock的condition来实现的。每次调用await,都会使count减1。当count不为0时,将会调用condition.await();当count为0时,将会调用condition.signalAll(),唤醒所有等待线程,并重置count为参与方parties的个数。

Condition

AQS 中实现 Condition 接口的是 ConditionObject。ConditionObject 采用队列来管理线程(以下称为条件队列),firstWaiter 表示头结点,lastWaiter 表示尾结点。

await():等待

await()方法会将结点加至条件队列的结尾、并释放占有的资源:

public final void await() throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException(); // 中断异常
        Node node = addConditionWaiter(); // 添加至条件队列
        int savedState = fullyRelease(node); // 释放node的资源
        int interruptMode = 0;
        while (!isOnSyncQueue(node)) { // node不在同步队列中
            LockSupport.park(this);
            if ((interruptMode = checkInterruptWhileWaiting(node)) != 0// 被中断
                break;
        }
        // 被唤醒后
        if (acquireQueued(node, savedState) && interruptMode != THROW_IE) // to THROW_IE:throw InterruptedException on exit from wait
            interruptMode = REINTERRUPT; // REINTERRUPT: reinterrupt on exit from wait
        if (node.nextWaiter != null// clean up if cancelled
            unlinkCancelledWaiters(); // 遍历全部
        if (interruptMode != 0)
            reportInterruptAfterWait(interruptMode); // 中断处理
    }

    // 是否在同步队列中
    final boolean isOnSyncQueue(Node node) {
        if (node.waitStatus == Node.CONDITION || node.prev == null)
            return false;
        if (node.next != null// If has successor, it must be on queue
            return true;

        return findNodeFromTail(node); // 从尾部遍历,判断是否在同步队列中
    }
}

为什么要从尾部遍历呢?因为加入同步队列的操作,保证 node.prev 不为空,但不保证已经增加至同步队列:

    // ..
        for (;;) {
            Node oldTail = tail;
            if (oldTail != null) {
                node.setPrevRelaxed(oldTail);
                if (compareAndSetTail(oldTail, node)) { // CAS操作不保证成功
                    oldTail.next = node;
                    return oldTail;
                }
            } else {
                initializeSyncQueue();
            }
        }
    // ..

signal()

signal()方法会唤醒条件队列的第一个结点,将其移入同步队列的尾部。

public class ConditionObject implements Conditionjava.io.Serializable {
    private transient Node firstWaiter;
    private transient Node lastWaiter;

    public final void signal() {
        if (!isHeldExclusively()) // 是否独占,子类需重写
            throw new IllegalMonitorStateException();
        Node first = firstWaiter;
        if (first != null)
            doSignal(first); // signal first结点
    }

    private void doSignal(Node first) {
        do {
            if ( (firstWaiter = first.nextWaiter) == null)
                lastWaiter = null;
            first.nextWaiter = null;
        } while (!transferForSignal(first) && // 转移至同步队列
                (first = firstWaiter) != null);
    }

    // 将条件队列中的线程结点转移至同步队列中
    final boolean transferForSignal(Node node) {
        if (!node.compareAndSetWaitStatus(Node.CONDITION, 0))
            return false;

        Node p = enq(node); // 加入同步队列,返回原tail结点
        int ws = p.waitStatus;
        if (ws > 0 || !p.compareAndSetWaitStatus(ws, Node.SIGNAL)) // 被取消或CAS失败。
            LockSupport.unpark(node.thread);
        return true;
    }

分类:

后端

标签:

后端

作者介绍

c
c鲨鱼辣椒
V1