4、深入 ReentrantReadWriteLock
4.1 为什么要出现读写锁
synchronized 和 ReentrantLock 都是互斥锁。
如果一个操作是读多写少,且需要保证线程安全,使用上述两种互斥锁效率会很低。
在这种情况下,可以使用 ReentrantReadWriteLock 读写锁去实现。
读读之间是不互斥的,可以并发执行。
但是如果涉及到了写操作,那么必须是互斥的操作。
static ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
static ReentrantReadWriteLock.WriteLock writeLock = lock.writeLock();
static ReentrantReadWriteLock.ReadLock readLock = lock.readLock();
public static void main(String[] args) throws InterruptedException {
new Thread(() -> {
readLock.lock();
try {
System.out.println("子线程!");
try {
Thread.sleep(500000);
} catch (InterruptedException e) {
e.printStackTrace();
}
} finally {
readLock.unlock();
}
}).start();
Thread.sleep(1000);
writeLock.lock();
try {
System.out.println("主线程!");
} finally {
writeLock.unlock();
}
}
4.2 读写锁的实现原理
ReentrantReadWriteLock 还是基于 AQS 实现的,对 state 进行操作,拿到锁资源就去干活,如果没有拿到,依然去 AQS 队列中排队。
- 读锁操作:基于 state 的高 16 位进行操作。
- 写锁操作:基于 state 的低 16 位进行操作。
- 可重入性:ReentrantReadWriteLock 依然是可重入锁。
写锁重入:读写锁中的写锁的重入方式基本和 ReentrantLock 一致,没有什么区别,依然是对 state 进行 +1 操作即可,只要确认持有锁资源的线程是当前写锁线程即可。只不过之前 ReentrantLock 的重入次数是 state 的正数取值范围,但是读写锁中写锁范围就变小了。
读锁重入:因为读锁是共享锁。读锁在获取锁资源操作时,是要对 state 的高 16 位进行 +1 操作。因为读锁是共享锁,所以同一时间会有多个读线程持有读锁资源。这样一来,多个读操作在持有读锁时,无法确认每个线程读锁重入的次数。为了去记录读锁重入的次数,每个读操作的线程,都会有一个 ThreadLocal 记录锁重入的次数。
写锁的饥饿问题:读锁是共享锁,当有线程持有读锁资源时,再来一个线程想要获取读锁,直接对 state 修改即可。在读锁资源先被占用后,来了一个写锁资源,此时,大量的需要获取读锁的线程来请求锁资源,如果可以绕过写锁,直接拿资源,会造成写锁长时间无法获取到写锁资源。
读锁在拿到锁资源后,如果再有读线程需要获取读锁资源,需要去 AQS 队列排队。如果队列的前面需要写锁资源的线程,那么后续读线程是无法拿到锁资源的。持有读锁的线程,只会让写锁线程之前的读线程拿到锁资源。
4.3 写锁分析
4.3.1 写锁加锁流程概述
(此处省略流程图)
4.3.2 写锁加锁源码分析
写锁加锁流程
public void lock() {
sync.acquire(1);
}
public final void acquire(int arg) {
if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
protected final boolean tryAcquire(int acquires) {
Thread current = Thread.currentThread();
int c = getState();
int w = exclusiveCount(c);
if (c != 0) {
if (w == 0 || current != getExclusiveOwnerThread())
return false;
if (w + exclusiveCount(acquires) > MAX_COUNT)
throw new Error("Maximum lock count exceeded");
setState(c + acquires);
return true;
}
(writerShouldBlock() || !compareAndSetState(c, c + acquires))
;
setExclusiveOwnerThread(current);
;
}
4.3.3 写锁释放锁流程概述 & 释放锁源码
释放的流程和 ReentrantLock 一致,只是在判断释放是否干净时,判断低 16 位的值。
protected final boolean tryRelease(int releases) {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
int nextc = getState() - releases;
boolean free = exclusiveCount(nextc) == 0;
if (free)
setExclusiveOwnerThread(null);
setState(nextc);
return free;
}
4.4 读锁分析
4.4.1 读锁加锁流程概述
- 分析读锁加速的基本流程
- 分析读锁的可重入锁实现以及优化
- 解决 ThreadLocal 内存泄漏问题
- 读锁获取锁自后,如果唤醒 AQS 中排队的读线程
4.4.1.1 基础读锁流程
针对上述简单逻辑的源码分析
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
protected final int tryAcquireShared(int unused) {
Thread current = Thread.currentThread();
int c = getState();
if (exclusiveCount(c) != 0 && getExclusiveOwnerThread() != current)
return -1;
int r = sharedCount(c);
if (!readerShouldBlock() &&
r < MAX_COUNT &&
compareAndSetState(c, c + SHARED_UNIT)) {
return ;
}
fullTryAcquireShared(current);
}
{
Node h, s;
(h = head) != &&
(s = h.next) != &&
!s.isShared() &&
s.thread != ;
}
4.4.1.2 读锁重入流程
\=============重入操作
前面阐述过,读锁为了记录锁重入的次数,需要让每个读线程用 ThreadLocal 存储重入次数
ReentrantReadWriteLock 对读锁重入做了一些优化操作
\============记录重入次数的核心
ReentrantReadWriteLock 在内部对 ThreadLocal 做了封装,基于 HoldCount 的对象存储重入次数,在内部有个 count 属性记录,而且每个线程都是自己的 ThreadLocalHoldCounter,所以可以直接对内部的 count 进行++操作。
\=============第一个获取读锁资源的重入次数记录方式
第一个拿到读锁资源的线程,不需要通过 ThreadLocal 存储,内部提供了两个属性来记录第一个拿到读锁资源线程的信息
内部提供了 firstReader 记录第一个拿到读锁资源的线程,firstReaderHoldCount 记录 firstReader 的锁重入次数
\==============最后一个获取读锁资源的重入次数记录方式
最后一个拿到读锁资源的线程,也会缓存他的重入次数,这样++起来更方便
基于 cachedHoldCounter 缓存最后一个拿到锁资源现行的重入次数
\==============最后一个获取读锁资源的重入次数记录方式
重入次数的流程执行方式:
1、判断当前线程是否是第一个拿到读锁资源的:如果是,直接将 firstReader 以及 firstReaderHoldCount 设置为当前线程的信息
2、判断当前线程是否是 firstReader:如果是,直接对 firstReaderHoldCount++ 即可。
3、跟 firstReader 没关系了,先获取 cachedHoldCounter,判断是否是当前线程。
3.1、如果不是,获取当前线程的重入次数,将 cachedHoldCounter 设置为当前线程。
3.2、如果是,判断当前重入次数是否为 0,重新设置当前线程的锁从入信息到 readHolds(ThreadLocal)中,算是初始化操作,重入次数是 0
3.3、前面两者最后都做 count++
上述逻辑源码分析
protected final int tryAcquireShared(int unused) {
Thread current = Thread.currentThread();
int c = getState();
if (exclusiveCount(c) != 0 && getExclusiveOwnerThread() != current)
return -1;
int r = sharedCount(c);
if (!readerShouldBlock() && r < MAX_COUNT && compareAndSetState(c, c + SHARED_UNIT)) {
if (r == 0) {
firstReader = current;
firstReaderHoldCount = 1;
}
else if (firstReader == current) {
firstReaderHoldCount++;
}
else {
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
cachedHoldCounter = rh = readHolds.get();
else (rh.count == )
readHolds.set(rh);
rh.count++;
}
;
}
fullTryAcquireShared(current);
}
4.4.1.3 读锁加锁的后续逻辑 fullTryAcquireShared
final int fullTryAcquireShared(Thread current) {
HoldCounter rh = null;
for (;;) {
int c = getState();
if (exclusiveCount(c) != 0) {
if (getExclusiveOwnerThread() != current)
return -1;
}
else if (readerShouldBlock()) {
if (firstReader == current) {
} else {
if (rh == null) {
rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current)) {
rh = readHolds.get();
if (rh.count == 0)
readHolds.remove();
}
}
(rh.count == )
-;
}
}
(sharedCount(c) == MAX_COUNT)
();
(compareAndSetState(c, c + SHARED_UNIT)) {
(sharedCount(c) == ) {
firstReader = current;
firstReaderHoldCount = ;
} (firstReader == current) {
firstReaderHoldCount++;
} {
(rh == )
rh = cachedHoldCounter;
(rh == || rh.tid != getThreadId(current))
rh = readHolds.get();
(rh.count == )
readHolds.set(rh);
rh.count++;
cachedHoldCounter = rh;
}
;
}
}
}
4.4.1.4 读线程在 AQS 队列获取锁资源的后续操作
1、正常如果都是读线程来获取读锁资源,不需要使用到 AQS 队列的,直接 CAS 操作即可
2、如果写线程持有着写锁,这是读线程就需要进入到 AQS 队列排队,可能会有多个读线程在 AQS 中。
当写锁释放资源后,会唤醒 head 后面的读线程,当 head 后面的读线程拿到锁资源后,还需要查看 next 节点是否也是读线程在阻塞,如果是,直接唤醒
源码分析
private void doAcquireShared(int arg) {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null;
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
private {
head;
setHead(node);
(propagate > || h == || h.waitStatus < || (h = head) == || h.waitStatus < ) {
node.next;
(s == || s.isShared())
doReleaseShared();
}
}
4.4.2 读锁的释放锁流程
1、处理重入以及 state 的值
2、唤醒后续排队的 Node
源码分析
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
protected final boolean tryReleaseShared(int unused) {
Thread current = Thread.currentThread();
if (firstReader == current) {
assert firstReaderHoldCount > 0;
if (firstReaderHoldCount == 1)
firstReader = null;
else
firstReaderHoldCount--;
}
else {
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
rh = readHolds.get();
int count = rh.count;
(count <= ) {
readHolds.remove();
(count <= )
unmatchedUnlockException();
}
rh.count--;
}
(;;) {
getState();
c - SHARED_UNIT;
(compareAndSetState(c, nextc))
nextc == ;
}
}
{
(;;) {
head;
(h != && h != tail) {
h.waitStatus;
(ws == Node.SIGNAL) {
(!compareAndSetWaitStatus(h, Node.SIGNAL, ))
;
unparkSuccessor(h);
}
(ws == && !compareAndSetWaitStatus(h, , Node.PROPAGATE))
;
}
(h == head)
;
}
}