ReentrantLock源码(JDK1.8)
ReentrantLock源码学习(JDK1.8)
April 24, 2024
0 words
0 read

ReentrantLock介绍

ReentrantLock为互斥锁,跟synchronized一样。

如果竞争比较激烈推荐使用ReentrantLock锁。如果几乎没有竞争使用synchronized。 synchronized锁有锁升级,当升级到重量级锁无法降级,重量级锁会设置到线程挂起,所以效率低。

Lock锁是公平 + 非公平锁,synchronized是非公平锁 Lock锁还提供了 reentrantLock.lockInterruptibly();允许线程在获取锁的期间被中断 Lock锁基于AQS和CAS实现,synchronized基于对象实现

ReentrantLock使用方式:

public static void main(String[] args) {  
    ReentrantLock reentrantLock = new ReentrantLock();  
    reentrantLock.lock();  
    try {  
        System.out.println("进入锁");// do something  
    } finally {  
        reentrantLock.unlock();  
    }  
}

ReentrantLock的lock()方法源码(JDK1.8)

公平&非公平

//公平锁的lock方法
final void lock() {
    //直接调用acquire
    acquire(1);
}
//非公平锁的lock方法
final void lock() {
    //非公平锁尝试将State从0变为1,如果成功代表获取锁资源,如果没成功调用acquire
    if (compareAndSetState(0, 1))
        setExclusiveOwnerThread(Thread.currentThread());
    else
        acquire(1);
}

State是什么? AQS#private volatile int state; volatile修饰的int类型变量,多个线程会通过CAS修饰这个变量,在并发情况下只会有一个线程修改成功

如果修改state失败会怎样?如果线程没有拿到锁资源,会到AQS的双向链表中排队等待(此期间,线程可能会挂起)

AQS的双向链表是什么? 基于内部类Node实现,Node中包含上一个、下一个、线程;Node类型的头和尾在AQS中维护

public abstract class AbstractQueuedSynchronizer  
    extends AbstractOwnableSynchronizer  
    implements java.io.Serializable {
    
    private transient volatile Node head; 头
    private transient volatile Node tail; 尾
    
    static final class Node {
    //前一个
    volatile Node prev;
    //下一个
    volatile Node next;
    //当前线程(没拿到锁的线程信息)
    volatile Thread thread;
    }
}

AQS和Node图示:

acquire方法

//AQS提供的业务方法,会调用具体处理方法
public final void acquire(int arg) {
    //调用tryAcquire方法:尝试获取锁资源(公平、非公平),拿到锁资源,返回true,结束,没有拿到锁资源,需要执行&&后的逻辑
    if (!tryAcquire(arg) &&
        //当没有获取锁资源,会先调用addWaiter(),会将没有获取锁资源的线程封装为Node对象,并且插入到AQS队列的末尾作为tail.
        //继续调用acquireQueued方法,查看当前排队的Node是否在队列的前面,如果在前面(head的next),尝试获取锁资源,如果没在前面尝试挂起线程,阻塞
        //Node.EXCLUSIVE 表示互斥锁
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

tryAcquire方法

两种实现:公平和非公平

  1. 如果state为0,尝试获取锁资源

  2. 如果state不为0,看一下是否是锁重入

- 非公平锁实现

final boolean nonfairTryAcquire(int acquires) {  
    //拿到当前线程
    final Thread current = Thread.currentThread();  
    //获取到核心变量state
    int c = getState();
    //如果==0,表示没有线程占用当前资源
    if (c == 0) {  
        //没人占用锁资源,直接抢一波锁,不管有没有线程排队
        if (compareAndSetState(0, acquires)) {
            //将当前占用这个互斥锁的线程属性设置为当前线程  
            setExclusiveOwnerThread(current);  
            //返回true,拿锁成功
            return true;  
        }  
    }  
    //当前state不等于0,说明有线程占用锁资源
    //判断拿着锁的线程是否是当前线程(锁重入)
    else if (current == getExclusiveOwnerThread()) {
        // 将state再次加1
        int nextc = c + acquires;  
        //锁重入是否超过最大值
        if (nextc < 0)
        //01111111 11111111 11111111 11111111  int类型正数最大值
        //+ 1
        //10000000 00000000 00000000 00000000   高位是符号位 加1成为复数的话代表超过最大值所以抛出异常
            throw new Error("Maximum lock count exceeded");  
        //没超过,则设置为state
        setState(nextc);  
        //拿到锁,返回true
        return true;  
    }  
    //否则返回false
    return false;  
}


- 公平锁实现

protected final boolean tryAcquire(int acquires) {  
//拿到当前线程
    final Thread current = Thread.currentThread();  
    //获取到核心变量state
    int c = getState();  
     //如果==0,表示没有线程占用当前资源
    if (c == 0) {  
        //判断是否有线程排队,如果有线程排队,返回true,配上前面的!,那会直接不执行返回最外层的false
        if (!hasQueuedPredecessors() &&
            //如果没有线程排队,直接CAS尝试获取锁资源  
            compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);  
            return true;  
        }  
    }  
    else if (current == getExclusiveOwnerThread()) {  
        int nextc = c + acquires;  
        if (nextc < 0)  
            throw new Error("Maximum lock count exceeded");  
        setState(nextc);  
        return true;  
    }  
    return false;  
}


判断是否排队的代码

public final boolean hasQueuedPredecessors() {  
    //获取尾
    Node t = tail;
    //获取头
    Node h = head;  
    Node s;  
    //头不等于尾
    return h != t &&  
        //头节点下一个是null      线程不是当前线程
        ((s = h.next) == null || s.thread != Thread.currentThread());  
}

addWaiter方法

在获取锁失败,需要将当前线程封装为Node对象,并且插入到AQS的末尾

private Node addWaiter(Node mode) {  
    //将当前线程封装为Node对象,mode为null,代表互斥锁
    Node node = new Node(Thread.currentThread(), mode);  
    // pred是tail节点
    Node pred = tail;  
    //如果pred不为null,有线程正在排队
    if (pred != null) {  
        // 新封装的node的上一个指向尾节点
        node.prev = pred;  
        //CAS交换当前节点变为tail节点
        if (compareAndSetTail(pred, node)) {
            //之前的tail的next只想当前节点  
            pred.next = node;  
            return node;  
        }  
    }  
    //添加的流程为:自己prev指向    tail指向自己   前节点next指向我
    //整个方法只执行一次,如果上述CAS失败了,就基于enq()方法添加到AQS队列
    enq(node);  
    return node;  
}

enq方法

private Node enq(final Node node) {  
    //死循环,一直放。直到成功
    for (;;) {  
        Node t = tail;  
        //如果tail为null,说明当前没有Node在队列
        if (t == null) {
            //创建一个新的node作为头节点,并将tail和head执行一个Node
            if (compareAndSetHead(new Node()))
                tail = head;  
        } else {  
            //和addWaiter中代码一致
            node.prev = t;  
            if (compareAndSetTail(t, node)) {
                t.next = node;  
                return t;  
            }  
        }  
    }  
}


PS:没有排队的话AQS队列的情况是这样的: head == tail == null

acquireQueued源码

acquireQueued方法会查看当前排队的Node是否是head的next,如果是就尝试获取锁资源,如果不是那么就尝试将当前Node的线程挂起(unsafe.[ark()])

// arg = 1
final boolean acquireQueued(final Node node, int arg) {  
    //声明一个变量,标识,默认是true,代表获取锁资源失败
    boolean failed = true;  
    try {  
        boolean interrupted = false;  
        for (;;) {
            // predecessor();获取当前节点的上一个节点,如果上一个节点为null,抛出异常
            final Node p = node.predecessor();  
            if (p == head &&  //说明当前节点是head的next
                //tryAcquire方法竞争锁,成功:true,失败:false
                tryAcquire(arg)) {  
                //进来说明拿到锁资源成功
                //当前节点置为head,thread和前一个节点置为null
                setHead(node);  
                p.next = null; // help GC  指向null,可达性分析找不到,会被回收
                //设置获取锁资源标志成功
                failed = false;  
                return interrupted;  
            }  
            //获取锁资源失败,走下面,尝试将锁挂起
            //第一:当前节点的上一个节点的状态正常
            //第二:挂起线程
            if (shouldParkAfterFailedAcquire(p, node) &&  
                parkAndCheckInterrupt())  
                interrupted = true;  
        }  
    } finally {
        if (failed)
            //当前节点状态改为CANCELLED
            cancelAcquire(node);  
    }  
}


shouldParkAfterFailedAcquire源码

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {  
    //拿到上一个节点的状态
    int ws = pred.waitStatus;
    //如果上一个节点为-1
    if (ws == Node.SIGNAL)
        //返回true,挂起线程  
         return true;  
    //如果上一个节点是取消状态    
    if (ws > 0) {  
            //循环往前找,找到一个专题柜台小于等于0的节点
           do {  
            node.prev = pred = pred.prev;  
        } while (pred.waitStatus > 0);  
        pred.next = node;  
    } else {  
        //将小于等于0的节点状态改为-1
       compareAndSetWaitStatus(pred, ws, Node.SIGNAL);  
    }  
    return false;  
}


ReentrantLock的unlock方法

释放锁资源

  • 将state减1

  • 如果state减为0了,唤醒在队列中排队的node(一定唤醒离head最近的)

release释放锁方法

释放锁不分公平或者非公平

public final boolean release(int arg) {  
    //核心释放锁源码
    if (tryRelease(arg)) {  
        //代表释放锁资源释放干净了。
        Node h = head;  //拿到头节点
        //头节点不为null,并且头节点状态不为0,唤醒排队的线程
        if (h != null && h.waitStatus != 0)  
            //唤醒节点的后续节点(不为null的情况下)
            unparkSuccessor(h);  
        return true;  
    }  
    //
    return false;  
}


tryRelease 源码-> 核心释放锁源码

protected final boolean tryRelease(int releases) {  
    //state - 1
    int c = getState() - releases;  
    //判断当前线程是否是占用锁的线程?
    if (Thread.currentThread() != getExclusiveOwnerThread())  
        throw new IllegalMonitorStateException();  
    //是否成功将锁资源释放完毕   state == 0    
    boolean free = false;  
    //锁释放干净
    if (c == 0) {  
        free = true;
        //将占用锁资源的属性设置为null  
        setExclusiveOwnerThread(null);  
    }  
    //将state置为0
    setState(c);
    //返回true,代表锁释放干净了
    return free;  
}

private void unparkSuccessor(Node node) {  
    //拿到头节点状态
    int ws = node.waitStatus;
    //如果头节点不为0,换为0
    if (ws < 0)  
        compareAndSetWaitStatus(node, ws, 0);  
      //
     Node s = node.next;  
     //如果头节点为null或者大于0
    if (s == null || s.waitStatus > 0) {  
        //next节点不需要唤醒,需要唤醒next的next节点
        s = null;  
        //从尾部往前找,找到状态正确的节点(小于等于0代表正常状态)
        for (Node t = tail; t != null && t != node; t = t.prev)  
            if (t.waitStatus <= 0)  
                s = t;  
    }  
    //如果拿到状态正常的节点,并且不为null,唤醒线程
    if (s != null)  
        LockSupport.unpark(s.thread);  
}


节点状态是1表示CANCELLED,是不正常的,节点状态小于等于0是正常的

unpark 唤醒线程

public static void unpark(Thread thread) {  
    if (thread != null)  
        UNSAFE.unpark(thread);  
}

为什么唤醒线程的时候要从尾部往前找?不是从前往后找?

从前往后找有可能会出现前一个节点的next指针还没有指向后一个节点,有可能会丢失节点,导致这个节点不会被唤醒。 addWaiter中是先将当前Node指针指向前面的节点然后将tail赋值给当前Node.最后才是将上一个节点的next指针指向Node

More Articles
See All