ReentrantReadWriteLock读写锁源码
ReentrantReadWriteLock读写锁源码学习
April 24, 2024
0 words
0 read

一 为什么出现读写锁

因为ReentrantLock是互斥锁,如果有一个操作是读多写少,同时还需要保证线程安全。那么使用ReentrantLock的效率会比较低

因为多个线程对同一个数据进行读操作时,也不会造成线程安全问题。

读锁是共享,多个线程可以同时拿到读锁。

写锁是互斥的,A线程拿到写锁,其他线程拿不到,需要等待A释放锁

读读操作共享 写写操作互斥 读写操作互斥 写读操作互斥,单个线程获取写锁后再次获取读锁,可以拿到.(写读可重入) 单个线程获取读锁后,再次获取写锁,拿不到. (读写不可重入,先拿到读锁再拿写锁,不行,拿不到!)

使用方式:

static ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); 
//写锁  
static ReentrantReadWriteLock.WriteLock writeLock = lock.writeLock();  
//读锁  
static ReentrantReadWriteLock.ReadLock readLock = lock.readLock();
public static void main(String[] args) {  
    //写读可重入  
    writeLock.lock();  
    System.out.println("拿到写锁");  
    readLock.lock();  
    System.out.println("拿到读写");
}

读写不可重入,下面代码死锁了. ![[Pasted image 20231211223328.png]]

读写锁的核心思想

ReentrantReadWriteLock还是基于AQS实现的,很多功能的实现跟ReentrantLock类似. 还是根据AQS的state来确定当前线程是否拿到锁资源

**state 为int类型,二进制是32位

state表示读锁: 将state的高16位作为读锁标识

state表示写锁:将state的低16为作为写锁标识*

锁重入问题

  • 写锁重入:因为写操作和其他操作是互斥的,代表同一时间,只有一个线程可以持有写锁,只要锁重入,就对低位加1.而且锁重入的限制,从原来的2^31 - 1,变为了2^16 - 1.变短了,锁重入次数变少了.

  • 读锁重入:读锁的重入,不能仿照写锁的方式,因为写锁是互斥锁,同一时间只有一个线程持有锁,但是读锁是共享锁,同一时间会有多个线程持有.所以每个获取到读锁的线程,记录锁重入的方式都是基于自己的ThreadLocal存储锁重入次数.

  • 读锁重入的时候就不操作state了吗?不对.每次锁重入还是要修改state,只是记录当前线程重入的次数,需要基于ThreadLocal记录

00000000 00000000 00000000 00000000 state 00000000 00000000 00000000 00000001 线程A获取写锁 -> 低十六位加1 00000000 00000000 00000000 00000010 线程A再次获取写锁 -> 低十六位再加1

读锁:获取不到,线程去排队,当前state状态 00000000 00000000 00000000 00000010 写锁全部释放,(唤醒刚才排队的线程);state状态 00000000 00000000 00000000 00000000

线程获取读锁;state状态 00000000 00000001 00000000 0000000 在int的高16位加1

如果A线程获取了读锁,B线程也获取了读锁,假如此时的state状态是3,那么就要根据ThreadLocal去判断到底是那个线程重入了两次

每个读操作的线程,在获取锁的时候都需要开辟一个ThradLocal,读写锁位了优化,做了两手操作

  • 第一个拿到读锁的线程,不用ThradLocal记录重入次数,在读写锁内部有一个成员变量firstRead记录重入数;ThreadLocal 可以理解为一个map, key是线程,value是重入次数

private transient Thread firstReader;   //第一个获取锁的线程
private transient int firstReaderHoldCount;//记录第一个线程重入的次数

  • 还记录了最后一个拿到锁的线程的重入次数.

  • private transient HoldCounter cachedHoldCounter;

写锁的操作

写锁加锁还是走的acquire方法

public final void acquire(int arg) {  
    //尝试获取锁资源,成功返回,不成功执行addWaiter()
    if (!tryAcquire(arg) &&  
        //addWaiter 将当前没拿到thread封装位node.
        //acquireQueued 当前排队的线程能否竞争锁资源,不能挂起线程阻塞
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))  
        selfInterrupt();  
}

addWaiter3

private Node addWaiter(Node mode) {  
    Node node = new Node(Thread.currentThread(), mode);  
    // Try the fast path of enq; backup to full enq on failure  
    Node pred = tail;  
    if (pred != null) {  
        node.prev = pred;  
        if (compareAndSetTail(pred, node)) {  
            pred.next = node;  
            return node;  
        }  
    }  
    enq(node);  
    return node;  
}

acquireQueued

final boolean acquireQueued(final Node node, int arg) {  
    boolean failed = true;  
    try {  
        boolean interrupted = false;  
        for (;;) {  
            final Node p = node.predecessor();  
            if (p == head && tryAcquire(arg)) {  
                setHead(node);  
                p.next = null; // help GC  
                failed = false;  
                return interrupted;  
            }  
            if (shouldParkAfterFailedAcquire(p, node) &&  
                parkAndCheckInterrupt())  
                interrupted = true;  
        }  
    } finally {  
        if (failed)  
            cancelAcquire(node);  
    }  
}

tryAcquire

protected final boolean tryAcquire(int acquires) {  
    //获取到当前线程
    Thread current = Thread.currentThread();
    //获取到state  
    int c = getState();
    //拿到写锁的低16位标识
    int w = exclusiveCount(c);  
    //c != 0 ,要么有读操作拿着锁,要么有写操作拿着锁
    if (c != 0) {  
        // w == 0 表示没有写锁,拿不到,返回
        // w != 0,表示有写锁,后面接续判断占用写锁的是否是当前线程,如果不是返回false
        if (w == 0 || current != getExclusiveOwnerThread())  
            return false;  
        //肯定是写锁,对低位+1看是否 重入次数大于最大值,抛异常
        if (w + exclusiveCount(acquires) > MAX_COUNT)  
            throw new Error("Maximum lock count exceeded");  
        //如果没有超过锁重入次数  + 1,返回true,拿到锁资源
        setState(c + acquires);  
        return true;  
    }  
    //逻辑到这.说明c==0,读锁写锁都没有,这时候开始抢
    //读写锁也分为公平锁和非公平锁
    //公平:看下是否有排队的
    //非公平:直接抢
    if (writerShouldBlock() ||  
        //CAS将state从0改为1
        !compareAndSetState(c, c + acquires))  
        return false;  
    //设置当前拥有独占访问权限的线程
    //将当前持有互斥锁的线程改为自己.
    setExclusiveOwnerThread(current);  
    return true;  
}

//公平:看下是否有排队的方法
public final boolean hasQueuedPredecessors() {  
    Node h = head;  
    Node s;  
    return h != t &&  
        ((s = h.next) == null || s.thread != Thread.currentThread());  
}

前置知识

state 高16 :读, 低16:写 00000000 00000000 00000000 00000000

00000000 00000001 00000000 00000000 = SHARED_UNIT

00000000 00000000 11111111 11111111 = MAX_COUNT

00000000 00000000 11111111 11111111 = EXCLUSIVE_MASK & 00000000 00000001 00000000 00000001

static final int SHARED_SHIFT   = 16;  
static final int SHARED_UNIT    = (1 << SHARED_SHIFT);  
static final int MAX_COUNT      = (1 << SHARED_SHIFT) - 1;  
static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;  
  
//只拿表示读锁的高16位
static int sharedCount(int c)    { return c >>> SHARED_SHIFT; }  
//只拿到表示写锁的低16位
static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }

释放锁

读写锁的释放锁跟ReentrantLock一致,这是需要单独获取低16位,判断是否为0,为0释放成功

public final boolean release(int arg) {  
    //tryRelease是读写锁重新实现的方法
    if (tryRelease(arg)) {  
        Node h = head;  
        if (h != null && h.waitStatus != 0)  
            unparkSuccessor(h);  
        return true;  
    }  
    return false;  
}

tryRelease

protected final boolean tryRelease(int releases) {  
    //现在想释放锁的线程是否是持有锁的线程,不是抛出异常
    if (!isHeldExclusively())  
        throw new IllegalMonitorStateException();  
    //state - 1
    int nextc = getState() - releases;  
    //拿nextc获取低16位的只,判断是否为0
    boolean free = exclusiveCount(nextc) == 0;  
    //为0返回true
    if (free)  
        //将持有互斥锁的线程信息置为null
        setExclusiveOwnerThread(null);
     //将减1之后的state赋值给state    
    setState(nextc);  
    return free;  
}

读锁的操作

读写锁的读锁加锁操作

public final void acquireShared(int arg) {  
    //tryAcquireShared 是尝试获取锁资源,获取到返回1,没获取到返回-1
    if (tryAcquireShared(arg) < 0)  
        //前面没拿到锁,这边需要排队~
        doAcquireShared(arg);  
}

tryAcquireShared


protected final int tryAcquireShared(int unused) {  
    //获取当前线程
    Thread current = Thread.currentThread();  
    //获取state
    int c = getState();
    //获取低16位的值(state,也就是写锁标识) ,不等于0,代表有写锁
    if (exclusiveCount(c) != 0 &&  
        //如果持有写锁的不是当前线程,排队去
        getExclusiveOwnerThread() != current)  
        return -1;  
     //没有写锁
     //获取读锁信息
    int r = sharedCount(c);
    //公平锁:有人排队直接拜拜,返回true,没人排队,返回false
    //非公平锁:正常逻辑是非公平直接抢,因为是读锁,每次抢占只要CAS成功,必然成功,这就会出现问题:写操作无法在读锁的情况下抢占资源,导致写线程饥饿,一致阻塞
    //非公平锁会查看next是否是写锁的,如果是,返回true,如果不是返回false
    if (!readerShouldBlock() &&  
        //查看读锁是否已经达到了最大限制,没有达到,返回true,往下走
        r < MAX_COUNT &&  
        //CAS对state高16位加1
        compareAndSetState(c, c + SHARED_UNIT)) {  
        ///拿到锁资源成功
        if (r == 0) {  
            //第一个拿到锁资源的线程,用first存储
            firstReader = current;  
            //重入次数
            firstReaderHoldCount = 1;  
        } else if (firstReader == current) {
            //锁重入,就是第一个拿到读锁的线程,直接对fiestReaderHoldCount++记录重入次数
            firstReaderHoldCount++;  
        } else {  
            //不是第一个拿到锁资源的
            //先拿到cachedHoldCounter,最后一个线程的重入次数
            HoldCounter rh = cachedHoldCounter;  
            //rh == null,我是第二个拿到读锁的
            //或者,发现之前有最后一个来的,但是不是当前线程,将我设置为最后一个线程.
            if (rh == null || rh.tid != getThreadId(current))  
                //获取自己的重入次数,并赋值给cacheHoldCounter.
                cachedHoldCounter = rh = readHolds.get();  
            //之前拿过, 现在如果为0,赋值为ThreadLocal
            else if (rh.count == 0)  
                readHolds.set(rh);  
                //重入次数 +1
                //第一个:可能是第一次拿
                //第二个:可能是重入操作
            rh.count++;  
        }  
        return 1;  
    }  
    return fullTryAcquireShared(current);  
}

final int fullTryAcquireShared(Thread current) {
            HoldCounter rh = null;
            for (;;) {
            //拿state
                int c = getState();
                //现在有互斥锁,不是自己,拜拜
                if (exclusiveCount(c) != 0) {
                    if (getExclusiveOwnerThread() != current)
                        return -1;
                   //公平:有排队,进入,没排队,过
                   //非公平:head的next是不是写锁,是拜拜,不是,过
                } else if (readerShouldBlock()) {
                    // 这个逻辑里不会让你拿到锁,做阻塞前的准备
                    if (firstReader == current) {
                      //什么都不做  
                    } else {
                        //获取最后一个拿到读锁资源的
                        if (rh == null) {
                            rh = cachedHoldCounter;
                            
                            if (rh == null || rh.tid != getThreadId(current)) {
                                //拿到我自己的纪录重入次数的
                                rh = readHolds.get();
                                //如果我是0,绝对不是重入擦欧总
                                if (rh.count == 0)
                                    //将我ThreadLocal中的值移除掉
                                    readHolds.remove();
                            }
                        }
                        //
                        if (rh.count == 0)
                            //返回-1,等待阻塞
                            return -1;
                    }
                }
                //超过读锁最大值,抛异常
                if (sharedCount(c) == MAX_COUNT)
                    throw new Error("Maximum lock count exceeded");
                //CAS竞争锁资源
                if (compareAndSetState(c, c + SHARED_UNIT)) {
                    if (sharedCount(c) == 0) {
                        firstReader = current;
                        firstReaderHoldCount = 1;
                    } else if (firstReader == current) {
                        firstReaderHoldCount++;
                    } else {
                        if (rh == null)
                            rh = cachedHoldCounter;
                        if (rh == null || rh.tid != getThreadId(current))
                            rh = readHolds.get();
                        else if (rh.count == 0)
                            readHolds.set(rh);
                        rh.count++;
                        cachedHoldCounter = rh; // cache for release
                    }
                    return 1;
                }
            }
        }

doAcquireShared 没拿到锁,准备挂起,扔到队列

private void doAcquireShared(int arg) {  
    //将当前线程封装为node,当前node为共享锁,并添加到队列
    final Node node = addWaiter(Node.SHARED);  
    boolean failed = true;  
    try {  
        boolean interrupted = false;  
        for (;;) {
            //获取上一个节点  
            final Node p = node.predecessor();  
            if (p == head) {  
                //如果我的上一个是head,尝试再次获取锁资源
                int r = tryAcquireShared(arg);  
                if (r >= 0) {  
                    //如果r > 0,代表获取锁成功
                    //如果AQS中
                    setHeadAndPropagate(node, r);  
                    p.next = null; // help GC  
                    if (interrupted)  
                        selfInterrupt();  
                    failed = false;  
                    return;  
                }  
            }  
            if (shouldParkAfterFailedAcquire(p, node) &&  
                parkAndCheckInterrupt())  
                interrupted = true;  
        }  
    } finally {  
        if (failed)  
            cancelAcquire(node);  
    }  
}

读锁释放锁

protected final boolean tryReleaseShared(int unused) {  
    //拿到当前线程
    Thread current = Thread.currentThread();  
    if (firstReader == current) {  
        // assert firstReaderHoldCount > 0;  
        if (firstReaderHoldCount == 1)  
            firstReader = null;  
        else  
            firstReaderHoldCount--;  
    } else {  
        //拿到cache最后的线程
        HoldCounter rh = cachedHoldCounter;  
        if (rh == null || rh.tid != getThreadId(current))  
            //不是!从ThreadLocal中拿
            rh = readHolds.get();  
            //拿到当前线程条数
        int count = rh.count;
        //必然释放干净了  
        if (count <= 1) {  
            //干净就移除ThreadLocal中的条数
            readHolds.remove();  
            if (count <= 0)  
                throw unmatchedUnlockException();  
        }  
        --rh.count;  
    }  
    //到这,对state操作
    for (;;) {
        // 获取现在的state
        int c = getState();
        //减高16位的1  
        int nextc = c - SHARED_UNIT;
        //以CAS赋值  
        if (compareAndSetState(c, nextc))  
            //减完之后,state如果为0,返回true,代表完全释放OK          
            return nextc == 0;  
    }  
}

More Articles
See All