因为ReentrantLock是互斥锁,如果有一个操作是读多写少,同时还需要保证线程安全。那么使用ReentrantLock的效率会比较低
因为多个线程对同一个数据进行读操作时,也不会造成线程安全问题。
读锁是共享,多个线程可以同时拿到读锁。
写锁是互斥的,A线程拿到写锁,其他线程拿不到,需要等待A释放锁
读读操作共享 写写操作互斥 读写操作互斥 写读操作互斥,单个线程获取写锁后再次获取读锁,可以拿到.(写读可重入) 单个线程获取读锁后,再次获取写锁,拿不到. (读写不可重入,先拿到读锁再拿写锁,不行,拿不到!)
static ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
//写锁
static ReentrantReadWriteLock.WriteLock writeLock = lock.writeLock();
//读锁
static ReentrantReadWriteLock.ReadLock readLock = lock.readLock();
public static void main(String[] args) {
//写读可重入
writeLock.lock();
System.out.println("拿到写锁");
readLock.lock();
System.out.println("拿到读写");
}
读写不可重入,下面代码死锁了. ![[Pasted image 20231211223328.png]]
ReentrantReadWriteLock还是基于AQS实现的,很多功能的实现跟ReentrantLock类似. 还是根据AQS的state来确定当前线程是否拿到锁资源
**state 为int类型,二进制是32位
state表示读锁: 将state的高16位作为读锁标识
state表示写锁:将state的低16为作为写锁标识*
写锁重入:因为写操作和其他操作是互斥的,代表同一时间,只有一个线程可以持有写锁,只要锁重入,就对低位加1.而且锁重入的限制,从原来的2^31 - 1,变为了2^16 - 1.变短了,锁重入次数变少了.
读锁重入:读锁的重入,不能仿照写锁的方式,因为写锁是互斥锁,同一时间只有一个线程持有锁,但是读锁是共享锁,同一时间会有多个线程持有.所以每个获取到读锁的线程,记录锁重入的方式都是基于自己的ThreadLocal
存储锁重入次数.
读锁重入的时候就不操作state了吗?不对.每次锁重入还是要修改state,只是记录当前线程重入的次数,需要基于ThreadLocal记录
00000000 00000000 00000000 00000000 state 00000000 00000000 00000000 00000001 线程A获取写锁 -> 低十六位加1 00000000 00000000 00000000 00000010 线程A再次获取写锁 -> 低十六位再加1
读锁:获取不到,线程去排队,当前state状态 00000000 00000000 00000000 00000010 写锁全部释放,(唤醒刚才排队的线程);state状态 00000000 00000000 00000000 00000000
线程获取读锁;state状态 00000000 00000001 00000000 0000000 在int的高16位加1
如果A线程获取了读锁,B线程也获取了读锁,假如此时的state状态是3,那么就要根据ThreadLocal去判断到底是那个线程重入了两次
每个读操作的线程,在获取锁的时候都需要开辟一个ThradLocal,读写锁位了优化,做了两手操作
第一个拿到读锁的线程,不用ThradLocal记录重入次数,在读写锁内部有一个成员变量firstRead记录重入数;ThreadLocal 可以理解为一个map, key是线程,value是重入次数
private transient Thread firstReader; //第一个获取锁的线程
private transient int firstReaderHoldCount;//记录第一个线程重入的次数
还记录了最后一个拿到锁的线程的重入次数.
private transient HoldCounter cachedHoldCounter;
public final void acquire(int arg) {
//尝试获取锁资源,成功返回,不成功执行addWaiter()
if (!tryAcquire(arg) &&
//addWaiter 将当前没拿到thread封装位node.
//acquireQueued 当前排队的线程能否竞争锁资源,不能挂起线程阻塞
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
addWaiter3
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
protected final boolean tryAcquire(int acquires) {
//获取到当前线程
Thread current = Thread.currentThread();
//获取到state
int c = getState();
//拿到写锁的低16位标识
int w = exclusiveCount(c);
//c != 0 ,要么有读操作拿着锁,要么有写操作拿着锁
if (c != 0) {
// w == 0 表示没有写锁,拿不到,返回
// w != 0,表示有写锁,后面接续判断占用写锁的是否是当前线程,如果不是返回false
if (w == 0 || current != getExclusiveOwnerThread())
return false;
//肯定是写锁,对低位+1看是否 重入次数大于最大值,抛异常
if (w + exclusiveCount(acquires) > MAX_COUNT)
throw new Error("Maximum lock count exceeded");
//如果没有超过锁重入次数 + 1,返回true,拿到锁资源
setState(c + acquires);
return true;
}
//逻辑到这.说明c==0,读锁写锁都没有,这时候开始抢
//读写锁也分为公平锁和非公平锁
//公平:看下是否有排队的
//非公平:直接抢
if (writerShouldBlock() ||
//CAS将state从0改为1
!compareAndSetState(c, c + acquires))
return false;
//设置当前拥有独占访问权限的线程
//将当前持有互斥锁的线程改为自己.
setExclusiveOwnerThread(current);
return true;
}
//公平:看下是否有排队的方法
public final boolean hasQueuedPredecessors() {
Node h = head;
Node s;
return h != t &&
((s = h.next) == null || s.thread != Thread.currentThread());
}
state 高16 :读, 低16:写 00000000 00000000 00000000 00000000
00000000 00000001 00000000 00000000 = SHARED_UNIT
00000000 00000000 11111111 11111111 = MAX_COUNT
00000000 00000000 11111111 11111111 = EXCLUSIVE_MASK & 00000000 00000001 00000000 00000001
static final int SHARED_SHIFT = 16;
static final int SHARED_UNIT = (1 << SHARED_SHIFT);
static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1;
static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;
//只拿表示读锁的高16位
static int sharedCount(int c) { return c >>> SHARED_SHIFT; }
//只拿到表示写锁的低16位
static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }
读写锁的释放锁跟ReentrantLock一致,这是需要单独获取低16位,判断是否为0,为0释放成功
public final boolean release(int arg) {
//tryRelease是读写锁重新实现的方法
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
tryRelease
protected final boolean tryRelease(int releases) {
//现在想释放锁的线程是否是持有锁的线程,不是抛出异常
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
//state - 1
int nextc = getState() - releases;
//拿nextc获取低16位的只,判断是否为0
boolean free = exclusiveCount(nextc) == 0;
//为0返回true
if (free)
//将持有互斥锁的线程信息置为null
setExclusiveOwnerThread(null);
//将减1之后的state赋值给state
setState(nextc);
return free;
}
public final void acquireShared(int arg) {
//tryAcquireShared 是尝试获取锁资源,获取到返回1,没获取到返回-1
if (tryAcquireShared(arg) < 0)
//前面没拿到锁,这边需要排队~
doAcquireShared(arg);
}
protected final int tryAcquireShared(int unused) {
//获取当前线程
Thread current = Thread.currentThread();
//获取state
int c = getState();
//获取低16位的值(state,也就是写锁标识) ,不等于0,代表有写锁
if (exclusiveCount(c) != 0 &&
//如果持有写锁的不是当前线程,排队去
getExclusiveOwnerThread() != current)
return -1;
//没有写锁
//获取读锁信息
int r = sharedCount(c);
//公平锁:有人排队直接拜拜,返回true,没人排队,返回false
//非公平锁:正常逻辑是非公平直接抢,因为是读锁,每次抢占只要CAS成功,必然成功,这就会出现问题:写操作无法在读锁的情况下抢占资源,导致写线程饥饿,一致阻塞
//非公平锁会查看next是否是写锁的,如果是,返回true,如果不是返回false
if (!readerShouldBlock() &&
//查看读锁是否已经达到了最大限制,没有达到,返回true,往下走
r < MAX_COUNT &&
//CAS对state高16位加1
compareAndSetState(c, c + SHARED_UNIT)) {
///拿到锁资源成功
if (r == 0) {
//第一个拿到锁资源的线程,用first存储
firstReader = current;
//重入次数
firstReaderHoldCount = 1;
} else if (firstReader == current) {
//锁重入,就是第一个拿到读锁的线程,直接对fiestReaderHoldCount++记录重入次数
firstReaderHoldCount++;
} else {
//不是第一个拿到锁资源的
//先拿到cachedHoldCounter,最后一个线程的重入次数
HoldCounter rh = cachedHoldCounter;
//rh == null,我是第二个拿到读锁的
//或者,发现之前有最后一个来的,但是不是当前线程,将我设置为最后一个线程.
if (rh == null || rh.tid != getThreadId(current))
//获取自己的重入次数,并赋值给cacheHoldCounter.
cachedHoldCounter = rh = readHolds.get();
//之前拿过, 现在如果为0,赋值为ThreadLocal
else if (rh.count == 0)
readHolds.set(rh);
//重入次数 +1
//第一个:可能是第一次拿
//第二个:可能是重入操作
rh.count++;
}
return 1;
}
return fullTryAcquireShared(current);
}
final int fullTryAcquireShared(Thread current) {
HoldCounter rh = null;
for (;;) {
//拿state
int c = getState();
//现在有互斥锁,不是自己,拜拜
if (exclusiveCount(c) != 0) {
if (getExclusiveOwnerThread() != current)
return -1;
//公平:有排队,进入,没排队,过
//非公平:head的next是不是写锁,是拜拜,不是,过
} else if (readerShouldBlock()) {
// 这个逻辑里不会让你拿到锁,做阻塞前的准备
if (firstReader == current) {
//什么都不做
} else {
//获取最后一个拿到读锁资源的
if (rh == null) {
rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current)) {
//拿到我自己的纪录重入次数的
rh = readHolds.get();
//如果我是0,绝对不是重入擦欧总
if (rh.count == 0)
//将我ThreadLocal中的值移除掉
readHolds.remove();
}
}
//
if (rh.count == 0)
//返回-1,等待阻塞
return -1;
}
}
//超过读锁最大值,抛异常
if (sharedCount(c) == MAX_COUNT)
throw new Error("Maximum lock count exceeded");
//CAS竞争锁资源
if (compareAndSetState(c, c + SHARED_UNIT)) {
if (sharedCount(c) == 0) {
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
if (rh == null)
rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
cachedHoldCounter = rh; // cache for release
}
return 1;
}
}
}
private void doAcquireShared(int arg) {
//将当前线程封装为node,当前node为共享锁,并添加到队列
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
//获取上一个节点
final Node p = node.predecessor();
if (p == head) {
//如果我的上一个是head,尝试再次获取锁资源
int r = tryAcquireShared(arg);
if (r >= 0) {
//如果r > 0,代表获取锁成功
//如果AQS中
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
protected final boolean tryReleaseShared(int unused) {
//拿到当前线程
Thread current = Thread.currentThread();
if (firstReader == current) {
// assert firstReaderHoldCount > 0;
if (firstReaderHoldCount == 1)
firstReader = null;
else
firstReaderHoldCount--;
} else {
//拿到cache最后的线程
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
//不是!从ThreadLocal中拿
rh = readHolds.get();
//拿到当前线程条数
int count = rh.count;
//必然释放干净了
if (count <= 1) {
//干净就移除ThreadLocal中的条数
readHolds.remove();
if (count <= 0)
throw unmatchedUnlockException();
}
--rh.count;
}
//到这,对state操作
for (;;) {
// 获取现在的state
int c = getState();
//减高16位的1
int nextc = c - SHARED_UNIT;
//以CAS赋值
if (compareAndSetState(c, nextc))
//减完之后,state如果为0,返回true,代表完全释放OK
return nextc == 0;
}
}