Loading...
墨滴

小飞飞

2021/11/24  阅读:23  主题:科技蓝

Java并发系列-AQS详解

Java并发系列-AQS详解

前言

  • AQS核心思想是什么?如何实现的,以及底层的数据结构
  • 线程获取锁失败,之后的处理流程是什么
  • 处于排队等候机制中的线程一直无法获取到锁,需要一直等待嘛,AQS还有用别的策略解决这一问题
  • Lock函数通过Acquire方法进行加锁的时候,具体是如何进行加锁

AQS简介

AQS全称为AbstractQueuedSynchronizer,AQS是用来构建锁和同步器的框架,类似我们常用的ReentrantLock,CountDownLatch,ThreadPoolExecutor等,核心思想主要是:用一个volatle修饰的state表示共享资源是否空闲,如果请求的共享资源空闲,会将当前的请求资源的线程设置为有效工作线程,并且会将共享资源设置为锁定状态。如果被请求的共享资源被占用的话,AQS通过CLH队列实现了线程阻塞等待以及唤醒机制,会将暂时获取不到锁的线程加入到队列中

 private volatile int state;
status
status

state

我们可以看到,这个状态量为private,内部提供了几个访问这个字段的方法(final修饰的,子类无法重写)

//获取State值
 protected final int getState() {
        return state;
    }
//设置State的值
    protected final void setState(int newState) {
        state = newState;
    }
//使用CAS方式更新State
    protected final boolean compareAndSetState(int expect, int update) {  
        return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
    }

独占和共享

通过和state对比,实现独占和共享

/**
*独占
*/

final boolean nonfairTryAcquire(int acquires) {
  final Thread current = Thread.currentThread();
  int c = getState();
  //判断state是否等于0 线程进行独占操作
   if (c == 0) {
       ……
 }  
}
   /**
   *共享
   */

private void doAcquireShared(int arg) {
  for (;;) {
        ……
    // 尝试获取一定数量的锁
    int r = tryAcquireShared(arg);
    ////判断state是否大于0进行共享操作
        if (r >= 0) {
     // 获取锁成功,而且还有剩余资源,就设置当前结点为head,并继续唤醒下一个线程
     setHeadAndPropagate(node, r);
        ……
    }
}

自定义的同步器要么实现独占,要么实现共享,我们在重写的时候,只需要实现tryAcquire-tryRelease、tryAcquireShared-tryReleaseShared中的一种即可。接下来我们非公平锁为例子,了解一下队列

队列

多线程来请求共享资源,如果线程获取锁失败(获取锁的方式分为公平和非公平方式),就需要加入等待队列中,带着下面两个问题,我们看下队列相关的知识点

  • 获取锁失败的线程何时加入队列
  • 如何加入队列
何时加入队列

我们借助ReentrantLock来看下AQS相关队列的东西,平常开发过程中我们使用ReentrantLock的时候,一般是这样使用的

  Lock lock = new ReentrantLock();
   try {
          lock.lock();
             ……
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }

ReentrantLock底层是AQS来实现的,ReentrantLock内部有分为公平锁和非公平锁,

//默认是非公平锁
public ReentrantLock() {
        sync = new NonfairSync();
    }
 public ReentrantLock(boolean fair) {
        sync = fair ? new FairSync() : new NonfairSync();
}

//非公平锁调用Lock
static final class NonfairSync extends Sync {
        private static final long serialVersionUID = 7316153563782823691L;
        final void lock() {
            if (compareAndSetState(01))
              //获取成功,不再往下执行
                setExclusiveOwnerThread(Thread.currentThread());
            else
              //失败,将线程放入等待队列中
                acquire(1);
        }
        protected final boolean tryAcquire(int acquires) {
            return nonfairTryAcquire(acquires);
        }
    }
   //调用AQS中再次尝试获取锁,获取失败,调用addWait加入等待队列中
   public final void acquire(int arg) {
        if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

从上面源代码中,我们可以总结出第一个问题的答案,当线程执行Acquire(1)尝试获取锁的时候,如果获取失败,调用addWaiter加入等待队列中

如何加入队列

获取锁失败之后,会调用addWaiter方法,将线程封装成Node节点加入队列中,具体实现方法如下

//java.util.concurrent.locks.AbstractQueuedSynchronizer#addWaiter
private Node addWaiter(Node mode) {
  //当前节点新建一个节点 
  Node node = new Node(Thread.currentThread(), mode);
   // 节点的前驱节点执行CLH尾结点pred
    Node pred = tail;
    if (pred != null) {
      //完成尾结点设置
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
          //pred指的是尾节点
            pred.next = node;
            return node;
        }
    }
    enq(node);
    return node;
}
新来的节点
新来的节点

如图所示,

  • 新节点来的之后,首先将新节点的前驱节点指向尾节点
  • 通过cas(pred,node)方法完成尾结点指向新插入的节点这个操作
    • 为啥用CAS方式设置尾部节点,原因就是如果我们在设置途中,如果别的线程比如T2先这个节点插入尾部,这个时候Pred指针指的是老的位置,Tail这个时候应该指向的是新T2线程的Node节点位置,这个时候CAS失败,就需要走enq方法完成尾节点设置。
    private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            if (t == null) { // Must initialize
              //节点为空,初始化节点出来
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
              //节点不为空,就是对应设置尾结点失败走addWaiter()方法那一套完成尾结点设置
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }
啥时候出队列

这线程以Node节点的方式添加到等待队列中了,那什么时候出去呢,接下来我们带着这两个问题一起了解下啥时候出队列

  • 何时出队列(添加节点成功之后 队列中的元素就会一直尝试获取锁)
  • 如何出队列(节点获取锁成功,将节点设置为取消状态,节点出队列)

上面我们讲的addWait方法,会将线程以Node节点的数据结构添加到双端队列中,这个方法其实是返回一个包含改线程的Node,Node作为参数进入到acquireQueued()方法。acquireQueued()就是对排队的线程获取锁操作,这玩意会一直把放入队列中的线程不断的获取锁,一直到获取成功或者不再获取(线程被中断)。我们一起看下**acquireQueued()**方法

   public final void acquire(int arg) {
        if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
             //线程设置中断,当前线程挂起
            selfInterrupt();
    }

    final boolean acquireQueued(final Node node, int arg) {
      //标记是否成功拿到资源 
      boolean failed = true;
        try {
          //中断标识
            boolean interrupted = false;
            for (;;) {
              //开始自旋,等待的线程要么获取到锁,要么就中断,才会跳出for循环
               //获取前驱节点
                final Node p = node.predecessor();
                //当前节点的前驱节点是头节点,说明当前节点在真实数据的首部,说明当前节点前面没有节点等待获取锁资源,则可以尝试获取锁
                if (p == head && tryAcquire(arg)) {
                   //获取锁成功,将当前节点设置为头结点
                    setHead(node);
                    p.next = null// help GC
                    failed = false;
                    return interrupted;
                }
              /**
               * 头节点没有获取到锁两种情况 1:p节点不是头节点 2:非公平锁,被别的锁捷足先登被强占
               *  出现上述两种情况,判断当前Node是否需要被阻塞(防止无限循环浪费资源) 这里判断方法主要根据前驱节点状态去判断是否挂起,方法主要是                          * shouldParkAfterFailedAcquire()会将前驱节点的waitStatus变为了SIGNAL=-1, parkAndCheckInterrupt会挂起当前线程,然后判断线程是否        * 被中断,中断就重新跑CAS自旋
               */

                if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())
                  //将当前线程挂起,阻止调用栈
                    interrupted = true;
            }
        } finally {
             // 拿到了锁,failed=false,没有拿到锁则将此线程对应的node的waitStatus改为CANCEL(1),CANCEL状态表示当前结点已取消调度。当timeout或被中断(响应中断的情况,下)会触发变更为此状态,进入该状态后的结点将不会再变化。 
            if (failed)
                cancelAcquire(node);
        }
    }
 /**
 *java.util.concurrent.locks.AbstractQueuedSynchronizer#shouldParkAfterFailedAcquire
 * 根据前驱节点判断当前线程是否应该被阻塞
 */

    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        //获取前驱节点的状态
        int ws = pred.waitStatus;
        if (ws == Node.SIGNAL)
          //前置节点的状态,如果状态是SIGNAL(SIGNAL表示释放锁之后,立马会唤醒后继节点),我等着被唤醒
            return true;
        if (ws > 0) {
          //前驱节点取消状态,表明前驱节点已经超时或者中断,需要从同步队列中删除该前驱节点
            do {
                //循环向前查找,把取消的节点从队列中去除
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
           //将前取节点的状态设置不再是取消状态,将前驱节点设置为-1,准备好状态,当前线程不阻塞等待这被唤醒
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }
 //挂起当前线程
 private final boolean parkAndCheckInterrupt() {
       /**
       *线程被唤醒的情况有两种,一是被unpark(),二是被interrupt(),
       *如果是第二种情况的话,需要返回被中断的标志,然后在acquire顶层方法自我中断补上
       */

        LockSupport.park(this);
        return Thread.interrupted();
    }

// psacquireQueued我有个疑问,这样循环之后节点状态不就成这样了 …->就序(-1)-->当前的挂起…… 

简单总结一下上面的流程

  • CAS自旋的方式判断传入的Node节点的前节点状态,如果是前节点是head节点,Node就尝试获取锁,获取成功的话就将Node节点设置为Head节点,以前的Head等于null,然后跳出自旋,final中不执行
  • 如果前节点不是head获取加锁失败,就调用shouldParkAfterFailedAcquire,函数会将前驱节点的waitStatus变为了SIGNAL=-1,调用 parkAndCheckInterrupt会挂起当前线程,Node等待前面节点唤醒。

看完上面的源码,我们可能会想到一下新的问题

  • shouldParkAfterFailedAcquire()中取消节点是什么时候生成的?
  • 线程被挂起的时候什么时候被通知()
cancelled节点生成

获取锁失败(什么时候失败?中断,获取锁超时?),则将此线程对应的node的waitStatus改为CANCEL,会在final里面将节点设置为取消状态,总结三种情况

  • 要取消的节点是尾节点
    • 当前节点如果是尾结点,从后往前设置第一个非取消状态设置为未节点
  • 要取消的节点是Head的后继节点
    • 唤醒当前节点的后继节点
  • 要取消的节点在链表的中间
    • 把当前节点的前驱节点的后继指针指向当前节点的后继节点
    private void cancelAcquire(Node node) {
        //节点为空的筛掉
        if (node == null)
            return;
      //当前节点设置不关联任何线程
        node.thread = null;
        Node pred = node.prev;
        while (pred.waitStatus > 0)
          //通过前驱节点跳过取消状态的Node,取消的是以前设置的取消状态
            node.prev = pred = pred.prev;
        //获取前驱节点的后继节点(我感觉是为了将当前节点设置为取消状态挂到这个节点上,不同意见小伙伴可以关注下公众号程序员fly,我们一起探讨下,期待跟大神一块     学习)
        Node predNext = pred.next;
        //当前节点设置为取消状态
        node.waitStatus = Node.CANCELLED;
        if (node == tail && compareAndSetTail(node, pred)) {
           //当前节点如果是尾结点,从后往前设置第一个非取消状态设置为未节点
           compareAndSetNext(pred, predNext, null);
        } else {
          /**既不是头也不是尾,当前节点在中间
          *  判断 当前节点的前驱节点是否为signal(唤醒状态)||不是尝试把前驱节点设置为唤醒状态
          *  两个添加满足其一,再&&当前节点的前驱节点是不是为Null
          *  将当前前驱节点的后继执行尝试指向当前线程的后继节点
          */

            int ws;
            if (pred != head && ((ws = pred.waitStatus) == Node.SIGNAL ||
                 (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
                pred.thread != null) {
                Node next = node.next;
                if (next != null && next.waitStatus <= 0)
                    compareAndSetNext(pred, predNext, next);
            } else {
              //当前节点是head的后继节点,或者上面条件都不满足,唤醒当前节点的后继节点
                unparkSuccessor(node);
            }
            node.next = node; // help GC
        }
    }

如何解锁

AQS中释放锁的方法是release(),如果state=0(彻底释放)。就会唤醒等待队列中的其他线程来获取资源

   //独占模式下释放锁
   public void unlock() {
        sync.release(1);
    }
   public final boolean release(int arg) {
        if (tryRelease(arg)) {
          //锁没有被线程持有,获取头节点
            Node h = head;
            if (h != null && h.waitStatus != 0)
              //头节点不为空并且头结点的waitStatus不是初始化节点情况,解除线程挂起状态 
              unparkSuccessor(h);
            return true;
        }
        return false;
    }
   //返回true说明该锁没有被任何线程持有
  protected final boolean tryRelease(int releases) {
            //减少可重入次数
         int c = getState() - releases;
        //当前线程不是持有持有锁的线程,抛出一次
            if (Thread.currentThread() != getExclusiveOwnerThread())
                throw new IllegalMonitorStateException();
            boolean free = false;
            if (c == 0) {
             //持有线程全部释放,将当前独占锁所有线程设置为null,更新state 
                free = true;
                setExclusiveOwnerThread(null);
            }
            setState(c);
            return free;
        }
    //释放锁之后当前线程要做的就是唤醒CLH队列中第一个在等待资源的线程,也就是head结点后面的线程,此时调用的方法是unparkSuccessor(),
    private void unparkSuccessor(Node node) {
        int ws = node.waitStatus;
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);
        // 获取节点的下一个节点
        Node s = node.next;
       //下一个节点是null||下一个节点为cancelled,找到队列中最开始的非cancell节点
        if (s == null || s.waitStatus > 0) {
            s = null;
          //从尾部节点开始找,找到队列第一个waitStatus<0的节点
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
      //当前节点的下个节点不为空,而且状态<=0,当前节点unpark
        if (s != null)
          //唤醒
            LockSupport.unpark(s.thread);
    }
总结
  • tryRelease方法会减去state对于的值,等于0的话会彻底释放资源,完全释放资源之后,会调用unparkSuccessor唤醒CLH队列中第一个等待资源的线程

公平锁VS非公平锁

   /**
   * ReentrantLock公平锁 
   */

   static final class FairSync extends Sync {
        final void lock() {
            acquire(1);
        }
   //AQS部分  
   public final void acquire(int arg) {
        if (!tryAcquire(arg) &&acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }
    protected final boolean tryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
              //公平平锁在获取锁之前会先判断等待队列是否为空或者自己是否位于队列头部,该条件通过才能继续获取锁
                if (!hasQueuedPredecessors() &&compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0)
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }
    }
  /**
   *ReentrantLock非公平锁
   */

   static final class NonfairSync extends Sync {
        final void lock() {
          //非公平锁在尝试获取锁时,不会调用hasQueuedPredecessors
            if (compareAndSetState(01))
              //获取锁成功
                setExclusiveOwnerThread(Thread.currentThread());
            else
                acquire(1);
        }
        protected final boolean tryAcquire(int acquires) {
            return nonfairTryAcquire(acquires);
        }
    }

问题

我们尝试说下这几个问题的回答点

  • AQS核心思想是什么?如何实现的,以及底层的数据结构
    • state以及CLH队列
  • 线程获取锁失败,之后的处理流程是什么
    • 队列排队等候,线程继续等待,保留获取锁的可能,获取锁的流程仍然继续
  • 处于排队等候机制中的线程一直无法获取到锁,需要一直等待嘛,AQS还有用别的策略解决这一问题
    • 节点变为取消状态(finally执行的),节点从链表中摘掉
  • Lock函数通过Acquire方法进行加锁的时候,具体是如何进行加锁
    • AQS的Acquire会调用tryAcquire方法,tryAcquire由各个自定义同步器实现(公平非公平),通过tryAcquire完成加锁过程,

巨人肩膀

  • 文章主要参考自美团技术文章从ReentrantLock的实现看AQS的原理及应用,在此基础上做了增改

  • https://juejin.cn/post/7006895386103119908

  • https://tech.meituan.com/2019/12/05/aqs-theory-and-apply.html

  • https://juejin.cn/post/6896272531808845837#heading-14

闲谈

感觉有帮助的同学还请点赞关注,这将对我是很大的鼓励~,公众号有自己开始总结的一系列文章,需要的小伙伴还请关注下个人公众号程序员fly呀,干货多多,湿货也不少(∩_∩)。

扫码关注一下呗
扫码关注一下呗

小飞飞

2021/11/24  阅读:23  主题:科技蓝

作者介绍

小飞飞