ReentrantLock 中的 AQS
2025年1月12日大约 6 分钟
ReentrantLock 中的 AQS
在很多八股文中都是把 ReentrantLock 和 Synchronized 对比着讲解,但是大多八股文都是只讲解到它俩特点上的区别,显得比较空,所以我整理了一些关于 ReentrantLock 代码层面的东西,希望对你有所帮助。
使用样例
先给大家写一个简关于 ReentrantLock 的使用代码案例,先大致看一下 ReentrantLock 的流程后再往下深入了解。
public class ProducerConsumerExample {
    private static final int MAX_SIZE = 5;
    private final Queue<Integer> queue = new LinkedList<>();
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition notFull = lock.newCondition();
    private final Condition notEmpty = lock.newCondition();
    // 生产者线程类
    private class Producer implements Runnable {
        @Override
        public void run() {
            int value = 0;
            while (true) {
                lock.lock();
                try {
                    while (queue.size() == MAX_SIZE) {
                        System.out.println("队列已满,生产者等待...");
                        // await()会释放当前线程获得的锁资源
                        notFull.await();
                    }
                    value++;
                    queue.add(value);
                    System.out.println("生产者生产: " + value);
                    notEmpty.signal();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    lock.unlock();
                }
            }
        }
    }
    // 消费者线程类
    private class Consumer implements Runnable {
        @Override
        public void run() {
            while (true) {
                lock.lock();
                try {
                    while (queue.isEmpty()) {
                        System.out.println("队列为空,消费者等待...");
                        notEmpty.await();
                    }
                    int value = queue.poll();
                    System.out.println("消费者消费: " + value);
                    notFull.signal();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    lock.unlock();
                }
            }
        }
    }
}ReentrantLock 的挂起逻辑
在我的Synchronized讲解中提到了Synchronized的挂起逻辑,其实对比着看,就会发现 ReentrantLock 的挂起逻辑也和它差不多。WaitSet 就相当于 Condition 单向链表,EntryList 就相当于 AQS 队列。先来介绍一下ReentrantLock的相关的数据结构吧!
ReentrantLock 的结构
ReentrantLock 是基于 AQS 实现的,在 AQS 中有一个双向链表(结点为 Node 对象),还有一个 Condition 的单向链表。其是如下定义的:
// extends 和 implements 省略
public abstract class AbstractQueuedSynchronizer{
    static final class Node {
        volatile Node prev;
        volatile Node next;
        // 注意结点状态,下面会用
        /** waitStatus value to indicate thread has cancelled */
        static final int CANCELLED =  1;
        /** waitStatus value to indicate successor's thread needs unparking */
        static final int SIGNAL    = -1;
        /** waitStatus value to indicate thread is waiting on condition */
        static final int CONDITION = -2;
        /**
         * waitStatus value to indicate the next acquireShared should
         * unconditionally propagate
         */
        static final int PROPAGATE = -3;
        volatile Thread thread;
    }
    // 用这俩构成个双向链表
    private transient volatile Node head;
    private transient volatile Node tail;
    // AQS中内部类
    public class ConditionObject implements Condition {
        // 与上面是同一个Node
        // 虽然定义了前驱节点,但是并没有前序遍历过,所以还是个单向的
        private transient Node firstWaiter;
        private transient Node lastWaiter;
    }
}上面介绍的 AQS 双向链表是用于存放没有获得锁资源的线程,而 Condition 单向链表存放的是等待被唤醒的线程。下面是相关代码:
- 首先看看挂起读逻辑
 
    public class ConditionObject implements Condition {
        public final void await() throws InterruptedException {
            // 如果线程被中断了
            if (Thread.interrupted())
                throw new InterruptedException();
            // 将当前线程封装成Node,加入到Condition单向链表的末尾
            Node node = addConditionWaiter();
            // 释放当前线程的锁资源!!!! 返回线程重入次数
            int savedState = fullyRelease(node);
            // int interruptMode = 0;
            // 如果这个线程不在AQS双向链表中,因为可能会出现并发问题,所以检查一下
            while (!isOnSyncQueue(node)) {
                // 挂起
                LockSupport.park(this); // 阻塞在这里等待被唤醒
                // 三种情况
                // 1. 被signal() 唤醒 interruptMode == 0
                // 2. 被interrupt() 唤醒 修改Node状态,并放入到AQS双向链表
                // 3. 被signal() 唤醒后,又 interrupt()了 确保当前node放入AQS双向链表,再执行后面逻辑
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    break;
            }
            // acquireQueued(node, savedState) 尝试获取锁
            // 省略
        }
        private Node addConditionWaiter() {
            Node t = lastWaiter;
            // If lastWaiter is cancelled, clean out.
            if (t != null && t.waitStatus != Node.CONDITION) {
                unlinkCancelledWaiters();
                t = lastWaiter;
            }
            Node node = new Node(Thread.currentThread(), Node.CONDITION);
            if (t == null)
                firstWaiter = node;
            else // Condition单向链表加到末尾
                t.nextWaiter = node;
            lastWaiter = node;
            return node;
        }
    }- 下面时唤醒的逻辑
 
public class ConditionObject implements Condition {
    // 唤醒操作
    public final void signal() {
        if (!isHeldExclusively())
            throw new IllegalMonitorStateException();
        Node first = firstWaiter;
        if (first != null)
            // 从头节点开始唤醒
            doSignal(first);
    }
    private void doSignal(Node first) {
        do {
            // 将头节点移出Condition单向链表
            if ( (firstWaiter = first.nextWaiter) == null)
                lastWaiter = null;
            first.nextWaiter = null;
            // 往下看
        } while (!transferForSignal(first) &&(first = firstWaiter) != null);
    }
    final boolean transferForSignal(Node node) {
        // 将Node结点状态由-2变为0
        if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
            return false;
        // 将该node节点放入AQS的双向链表中,(插到尾部)
        Node p = enq(node); // p是插入node后,node的prev
        // 这里主要是为了确保自己的前一个node可以唤醒我
        int ws = p.waitStatus; // ws是p的状态,就是Node的哪几个常量
        // 如果p的状态是1(被取消,无效结点)
        // 或者p不是无效结点的话,将p的状态设置为SIGNAL(-1)如果改失败了才执行if的逻辑
        //      就是提醒p:记得自己拿到锁后,把我(node)唤醒。
        // 就是增强代码健壮性,防止node无人唤醒
        if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
            // 自己唤醒自己
            LockSupport.unpark(node.thread);
        return true;
    }
}可重入锁加锁流程
ReentrantLock.lock() -> NonfairSync.lock() -> AQS.acquire(1) -> NonfairSync.tryAcquire(1) -> Sync.nonfairTryAcquire(1)
    // AQS中
    public final void acquire(int arg) {
        if (!tryAcquire(arg) && // 获取锁失败进入下面的
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }
    private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
        // 加入阻塞队列中(双向链表)
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        // 第一个节点入队的操作
        enq(node);
        return node;
    }    // Sync中
    final boolean nonfairTryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        int c = getState();
        if (c == 0) { // 锁没被占用
            if (compareAndSetState(0, acquires)) {
                setExclusiveOwnerThread(current);
                return true;
            }
        } // 锁被占用,判断获得锁的线程是不是当前线程
        else if (current == getExclusiveOwnerThread()) {
            //  nextc = c + 1
            int nextc = c + acquires;
            if (nextc < 0) // overflow
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
        return false;
    }可重入锁的解锁流程
ReentrantLock.unlock() -> AQS.release(1) -> Sync.tryRelease(1)
    // AbstractQueuedSynchronizer 的 release 方法
    public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                // 唤醒队列中的一个节点(线程)头节点的next
                // 如果第一个唤醒失败,则从尾节点往前遍历依此唤醒
                unparkSuccessor(h);
            return true;
        }
        return false;
    }
    protected final boolean tryRelease(int releases) {
        // 进行 -1 操作
        int c = getState() - releases;
        // 判断当前线程是否是获得这个锁的线程
        if (Thread.currentThread() != getExclusiveOwnerThread())
            throw new IllegalMonitorStateException();
        boolean free = false;
        if (c == 0) {
            free = true;
            setExclusiveOwnerThread(null);
        }
        setState(c);
        return free;
    }轻松一下
- 非公平锁中的
lock()和trylock()方法有什么区别? 
    public void lock() {
        // 未获取到锁,被放到AQS队列中
        sync.lock();
    }
    public boolean tryLock() {
        return sync.nonfairTryAcquire(1);
    }
    final boolean nonfairTryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        int c = getState();
        if (c == 0) {
            // 获取到返回true
            if (compareAndSetState(0, acquires)) {
                setExclusiveOwnerThread(current);
                return true;
            }
        }
        // 锁重入
        else if (current == getExclusiveOwnerThread()) {
            int nextc = c + acquires;
            if (nextc < 0) // overflow
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
        // 获取不到不阻塞,直接返回false
        return false;
    }- 获取锁的方法为什么要加过期时间?
 
    public boolean tryLock(long timeout, TimeUnit unit)
            throws InterruptedException {
        return sync.tryAcquireNanos(1, unit.toNanos(timeout));
    }- 防止死锁
 - 防止线程一直尝试获取锁
 - 对于具有严格时间限制的操作,其可允许线程在无法及时获取锁时(返回 false 时)继续执行替代操作。
 
- 如何实现独占锁和共享锁? 通过重写 AQS 中的
tryAcquire()和tryRelease()方法实现独占锁,重写tryAcquireShared()和tryReleaseShared()方法实现共享锁。