Java 线程出现异常,进程为啥不会退出?
因为 Java 是采用线程独立模型,各个线程之间互相独立,有各自的上下文,当一个线程出现错误的时候,只会影响到这个线程自己本身,不会影响到其它的线程,更不会导致程序退出。
不过我们这里介绍的异常更多是 Exception,如果是 Error 级别的,通常意味着硬件层面不够,才有可能会导致退出。
此外 Exception 我们是可以通过捕获的,捕获了的话也不会导致线程直接死掉。
总结了 Java 并发编程中的核心问题。首先解释了线程异常不影响进程退出的原因及 isAlive() 方法的陷阱。接着讨论了 int 赋值与对象创建的原子性问题,以及 JMM 中 happens-before 和 as-if-serial 的区别。对比了 ForkJoinPool 与 ThreadPoolExecutor 的工作模式及适用场景,并详细解析了 CountDownLatch、CyclicBarrier 和 Semaphore 的同步机制。深入分析了 CompletableFuture 相对于 FutureTask 的优势及其底层实现。最后阐述了 CAS 的硬件原子性保证、读写锁原理以及 AQS 框架的核心设计思想,包括同步队列与条件队列的实现细节。
因为 Java 是采用线程独立模型,各个线程之间互相独立,有各自的上下文,当一个线程出现错误的时候,只会影响到这个线程自己本身,不会影响到其它的线程,更不会导致程序退出。
不过我们这里介绍的异常更多是 Exception,如果是 Error 级别的,通常意味着硬件层面不够,才有可能会导致退出。
此外 Exception 我们是可以通过捕获的,捕获了的话也不会导致线程直接死掉。
通过 isAlive() 方法:
public class Main {
public static void main(String[] args) throws InterruptedException {
Thread t1 = new Thread(() -> {
System.out.println("t1 begin");
try { Thread.sleep(1000); }
catch (InterruptedException e) { }
System.out.println("t1 end");
});
t1.start();
System.out.println("t1.isAlive()=" + t1.isAlive());
t1.join(); // main 会在这里阻塞等到 t1 执行完
System.out.println("t1.isAlive()=" + t1.isAlive());
}
}
但是没那么简单,可以看一下以下代码的输出:
public class Main {
public static void main(String[] args) throws InterruptedException {
Thread t1 = new Thread(() -> {
System.out.println("t1 begin");
try { Thread.sleep(1000); }
catch (InterruptedException e) { }
System.out.println("t1 end");
});
Thread t2 = new Thread(() -> {
synchronized (t1) {
System.out.println("t2 begin");
try { Thread.sleep(2000); }
catch (InterruptedException e) { }
System.out.println("t2 end");
System.out.println("t1 isAlive:" + t1.isAlive());
}
});
t1.start();
t2.start();
}
}
输出结果可能为:
t1 begin
t2 begin
t1 end
t2 end
t1 isAlive:true
出现这样的原因就是当一个线程执行完成之后,需要先拿到线程对象(t1)的锁才能去修改线程状态,但是因为这个时候这把锁被线程 2 拿着,所以线程 1 没办法修改状态。那导致的现象 t1 线程已经退出,但是状态始终是存活。
int a=1
在单线程的情况下可以认为是原子性的,这个语句本身只是声明一个变量并赋值为 1。
但是在多线程情况下就不一定能做到原子性,可能会有其它的赋值操作导致覆盖问题。
例如:
int a = 0; // 线程 1
a = 1; // 线程 2
a = 2;
在多线程环境下,我们无法保证线程执行的先后顺序,所以无法做到原子性。
为了做到原子性,一般会使用锁机制或是原子类(CAS)。
User a = new User(); 不是原子操作。
它底层其实需要先初始化一个内存空间、在内存空间创建对象、变量指向内存空间这几步。
但是在多线程环境下且伴随指令重排的话,可能会导致某一个线程拿到的其实是没初始化好的对象。
其实这两个是 JMM 中比较重要的两个设计语义。
happens-before 主要是解决多线程环境下线程之间的可见性问题,比如线程 A 对加了 volatile 变量进行修改,且 A 线程执行顺序是快于线程 B 的,那么执行线程 B 的时候一定要能看见线程 A 的最新修改。
as-if-serial 主要解决单线程环境下的有序性问题,也就是单线程环境下,无论底层的指令经过怎么样的重排序,最终的结果都应该和代码预期的结果一致。
主要的区别:
任务分配 VS 工作窃取
ThreadPoolExecutor 和 ForkJoinPool 的全局队列存储的其实都是外部提交给线程池大的任务。
只不过 ForkJoinPool 内部会对大的任务进行拆分成一个个的小任务,然后每个线程会自己 fork 一部分小任务到自己的工作队列中去执行。
CompletableFuture 为啥默认用 ForkJoinPool?
因为 CompletableFuture 的场景主要是将多个线程的执行结果进行合并或是用来编排线程的执行顺序,更像是拆分执行完再合并,和 ForkJoinPool 的能力比较契合。
但是对于一些 IO 密集型的任务,最好还是不要直接默认 ForkJoinPool,因为 IO 阻塞住某一个线程的话,反而会卡慢整个任务进度。
那说白了 ForkJoinPool 其实就是把任务拆分成很多的小任务交给多个线程去执行,本质就是压榨 cpu 的性能,比较适合计算比较多的场景。
这三个都是多线程情况下的同步工具,或者说是通信工具,但是在具体的功能和实现上还是有一些区别的。
CountDownLatch
使用方式:
public class Main {
public static void main(String[] args) throws InterruptedException {
// 初始的时候计时器为 3
CountDownLatch countDownLatch = new CountDownLatch(3);
for (int i = 0; i < 3; i++) {
new Thread(() -> {
try {
TimeUnit.SECONDS.sleep(1);
} catch (Exception e){}
System.out.println("线程执行");
countDownLatch.countDown(); // 计数器减 1
}).start();
}
countDownLatch.await(); // 这里会阻塞等到计数器为 0
System.out.println("主线程执行");
}
}
CountDownLatch 其实是一个计数器,主要提供了两个 api:
countDownLatch.countDown() 实现将计数器减 1countDownLatch.await() 会阻塞等到计数器的数量变成 0 才执行这个计数器主要的作用就是实现线程等待,比如一个线程或是多个线程的执行需要等待其它的线程执行完才能执行,就可以使用这个计数器。
底层具体实现:实现了 AQS,并且使用 state 表示数量。
CyclicBarrier
使用方式:
public class Main {
public static void main(String[] args) throws InterruptedException {
// 初始化的时候执行需要等待多少个线程
CyclicBarrier barrier = new CyclicBarrier(3, () -> {
System.out.println("所有线程执行到这");
});
for (int i = 0; i < 3; i++) {
int idx = i;
new Thread(() -> {
System.out.println("线程执行" + idx + "到这");
try {
// count -- 底层判断到 count--操作后到 0 会放行,否则阻塞
barrier.await();
} catch (Exception e) {}
System.out.println("线程执行" + idx + "继续执行");
}).start();
}
}
}
输出示例:
线程执行 0 到这
线程执行 2 到这
线程执行 1 到这
所有线程执行到这
线程执行 2 继续执行
线程执行 0 继续执行
线程执行 1 继续执行
这个的作用同样和计数器有点像,不过它主要是用来实现同步屏障。
底层其实维护了一个 count,每调用一次 barrier.await(),count 就会减 1,然后会判断,当减少到 0 的时候会执行初始化的时候塞入的任务;如果减少后不为 0 会阻塞线程。
不直接继承 AQS,而是通过 ReentrantLock + Condition 实现(ReentrantLock 底层还是 AQS)。
Semaphore
使用方式:
public class Main {
public static void main(String[] args) throws InterruptedException {
// 第一个参数是许可证数量,第二个参数是是否开启公平模式
Semaphore semaphore = new Semaphore(3, true);
for (int i = 0; i < 10; i++) {
int idx = i;
new Thread(() -> {
try {
System.out.println("线程" + idx + "准备拿许可证");
semaphore.acquire(); // 拿许可证,拿不到会阻塞
System.out.println("线程" + idx + "拿到拿许可证并执行");
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
semaphore.release(); // 释放一个许可证
System.out.println("线程" + idx + "释放许可证");
}
}).start();
}
}
}
这个同样和计数器很像,不过它更像是一种信号量的方式,初始化的时候指定许可证数量并指定是否要公平模式。
semaphore.acquire() 会尝试去拿许可证,拿到会将许可证减 1,拿不到就阻塞。semaphore.release() 会释放一个许可证。Semaphore 底层主要是依赖 AQS 来实现的。
那我们可以总结一下:
CountDownLatch、CyclicBarrier、Semaphore 这三个其实说白了都是通过计数的方式实现线程同步的。
不过具体的用途和功能有区别。
CountDownLatch 就是简单的一个计数器,调用 countDownLatch.countDown() 实现将计数器减 1,countDownLatch.await() 会阻塞等到计数器的数量变成 0 才执行。
CyclicBarrier 则是一个屏障,调用 barrier.await() 会在底层将 count-- 并判断是否已经为 0,为 0 的话会唤醒和放行前面阻塞的线程,不为 0 则阻塞。
Semaphore 是一个信号量机制,初始化的时候我们就要执行许可证的数量,调用 semaphore.acquire() 会尝试去拿许可证,semaphore.release() 会释放一个许可证。
CountDownLatch 适用于一个线程等待多个线程完成操作的情况。 CyclicBarrier 适用于多个线程在同一个屏障处等待。 Semaphore 适用于一个线程需要等待获取许可证才能访问共享资源。
CompletableFuture 的使用方式可参考官方文档或相关教程。
主要的区别从 API 的丰富度就可以看出。
首先就是 FutureTask 只能支持同步获取,也就是 get() 方法。
而 CompletableFuture 可以支持同步和异步两种方式,主要就是带 Async 的那几个,比如 thenAcceptAsync,以及 handle 之类的。
CompletableFuture 支持链式编程,而且提供了很多的方法,而 FutureTask 只能通过 get() 拿到结果之后再去进行操作,而且很多还要自己手动实现,比如异常捕获。
CompletableFuture 支持对多任务的编排,而 FutureTask 不支持多任务的操作,只能我们自己手动去编排。
CompletableFuture 光是对异常的处理就有好几个 API,而且有些 API 支持只有异常出现才会回调,相当于把异常单独拎出来处理,而 FutureTask 只能通过 try-catch 去包裹 get() 方法才能处理异常。
总结下来 FutureTask 的设计初衷是让我们能对任务的执行结果,异常信息,以及对任务状态的判断和取消,主要是初步的完成这些功能,而 CompletableFuture 相当于一个全能版,是对 FutureTask 的一个全面升级,除了 FutureTask 的基本功能之外,还多考虑了更优雅的处理执行结果和异常的方式,以及支持异步操作避免阻塞主线程,并且重点集成了对多线程编排的功能。
那我们为了了解 CompletableFuture,可以从下面这三个维度入手。
CompletableFuture 对比 Future 有什么优势?为什么要封装 CompletableFuture?
Future 其实是 jdk8 以前用异编程的工具,主要就是提交一个异步任务 + 获取结果。
总结就是 Future 主要实现了提交异步任务并获取执行结果的基本需求,而 CompletableFuture 实现了 异步任务编排、异常处理、任务组合、非阻塞回调。
CompletableFuture 常见的 API 是什么?有什么区别?
执行异步任务:
public class Main {
public static void main(String[] args) throws InterruptedException {
// supplyAsync 有返回值
// 不指定线程池执行,默认使用 ForkJoinPool
CompletableFuture<Object> future1 = CompletableFuture.supplyAsync(() -> {
System.out.println("111");
return null;
});
// 指定线程,使用我们自己指定的线程去执行
CompletableFuture<Object> future2 = CompletableFuture.supplyAsync(() -> {
System.out.println("111");
return null;
}, Executors.newFixedThreadPool(10));
// runAsync 无返回值
// 不指定线程池执行,默认使用 ForkJoinPool
CompletableFuture<Void> future3 = CompletableFuture.runAsync(() -> {
System.out.println("111");
});
// 指定线程,使用我们自己指定的线程去执行
CompletableFuture<Void> future4 = CompletableFuture.runAsync(() -> {
System.out.println("111");
}, Executors.newFixedThreadPool(10));
}
}
其实就两个 API,取决于你需不需要执行结果:
然后执行时使用的线程池都是,有指定时使用指定的线程池,无指定时使用默认的 ForkJoinPool。
链式处理 API:
处理前一个结果(有返回值)
public class Main {
public static void main(String[] args) throws InterruptedException {
// 同步回调
CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {
System.out.println("111");
return 222;
}).thenApply(res -> {
System.out.println(res);
return res;
});
// 异步回调
CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> {
System.out.println("111");
return 222;
}).thenApplyAsync(res -> {
System.out.println(res);
return res;
});
}
}
无论是 thenApplyAsync 还是 thenApply,其实本质上就是拿到上一个 future 处理之后 return 的结果,然后进行处理,只不过是使用同步还是异步的方式而已,使用异步的话,如果没有指定线程池,同样是用默认的 ForkJoinPool。
处理前一个结果(无返回值)
public class Main {
public static void main(String[] args) throws InterruptedException {
// 同步回调
CompletableFuture.supplyAsync(() -> {
System.out.println("111");
return 222;
}).thenAccept(res -> {
System.out.println(res);
});
CompletableFuture.supplyAsync(() -> {
System.out.println("111");
return 222;
}).thenAccept(res -> {
System.out.println(res);
});
}
}
这两个 api 的本质区别就是是否异步。
注意:消费和处理的区别,消费就是消费完之后到这里就结束了,而处理是处理完之后继续返回出去。
在上一步执行完成之后继续执行接下去的逻辑(无返回值)
public class Main {
public static void main(String[] args) throws InterruptedException {
// 同步回调
CompletableFuture<Void> future1 = CompletableFuture.supplyAsync(() -> {
System.out.println("111");
return 222;
}).thenRun(() -> {
System.out.println("222");
});
CompletableFuture<Void> future2 = CompletableFuture.supplyAsync(() -> {
System.out.println("111");
return 222;
}).thenRunAsync(() -> {
System.out.println("222");
});
}
}
不关心上一步的结果,而是直接执行下一步,适合做一些收尾动作。
串行执行任务:
public class Main {
public static void main(String[] args) throws InterruptedException {
// 串行执行
CompletableFuture<Object> completableFuture = CompletableFuture.supplyAsync(() -> {
System.out.println("111");
return 222;
}).thenCompose(res -> {
System.out.println(res);
return null;
});
}
}
串行的执行多个任务,不过一般用的不多,要串行没必要使用这个。
并行执行多个任务:
//异步通知下游系统
CompletableFuture<Void> allFutures = CompletableFuture.allOf(noticeDetails.stream()
.map(detail -> CompletableFuture.supplyAsync(() -> {
notice(detail);
return null;
})).toArray(CompletableFuture[]::new));
//所有任务通知成功后,更新代还通知单
allFutures.whenComplete((v, e) -> {
if (e == null) {
noticeOrder.setState("SUCCESS");
Boolean res = noticeOrderService.updateById(noticeOrder);
Assert.isTrue(res, "update failed");
} else {
log.error("notice failed", e);
}
});
CompletableFuture 底层怎么实现的?为什么要这么实现?
CompletableFuture 内部采用了一种链式的结构来处理异步计算的结果,每个 CompletableFuture 都有一个与之关联的 Completion 链,它可以包含多个 Completion 阶段,每个阶段都代表一个异步操作,并且可以指定它所依赖的前一个阶段的计算结果。
CompletableFuture 还使用了一种事件驱动的机制来处理异步计算的完成事件。在一个 CompletableFuture 对象上注册的 Completion 阶段完成后,它会触发一个完成事件,然后 CompletableFuture 对象会执行与之关联的下一个 Completion 阶段。
CompletableFuture 的异步计算是通过线程池来实现的。CompletableFuture 在内部使用了一个 ForkJoinPool 线程池来执行异步任务。当我们创建一个 CompletableFuture 对象时,它会在内部创建一个任务,并提交到 ForkJoinPool 中去执行。
底层是通过硬件层面提供的原子指令实现的。
在 x86 架构的 cpu 中,通常是使用 cmpxchg 指令对总线进行锁住,防止在这个的期间有其它 cpu 来访问,并且在这个期间还会禁止 cpu 出现中断,执行完才会释放。
所以可以总结一下就是通过底层的指令对数据进行上锁,然后执行完释放。
此外因为 cmpxchg 是基于 cpu 缓存一致性协议实现的,所以还能做到可见性。
不一定,但是一般会采用自旋的方式。
因为 CAS 是比较并交换,多线程同时操作的情况下成功率较低,所以一般会采用自旋的方式提高成功率。
短时间的自旋性能是不错的,至少比直接阻塞线程,然后等待唤醒要好,但是如果长时间自旋的话就不适合了,自旋是一直占用 cpu 的,cpu 没办法干别的事情,导致系统性能下降。
最好就是有次数的自旋。
了解一点。主要是 ReentrantReadWriteLock。
它里面封装了写锁和读锁。
public static class WriteLock implements Lock, java.io.Serializable {
private static final long serialVersionUID = -4992448646407690164L;
private final Sync sync;
protected WriteLock(ReentrantReadWriteLock lock) {
sync = lock.sync;
}
public void lock() { sync.acquire(1); }
public void lockInterruptibly() throws InterruptedException { sync.acquireInterruptibly(1); }
public boolean tryLock() { return sync.tryWriteLock(); }
public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException { return sync.tryAcquireNanos(1, unit.toNanos(timeout)); }
public void unlock() { sync.release(1); }
public Condition newCondition() { return sync.newCondition(); }
public String toString() { Thread o = sync.getOwner(); return super.toString() + ((o == null) ? "[Unlocked]" : "[Locked by thread " + o.getName() + "]"); }
public boolean isHeldByCurrentThread() { return sync.isHeldExclusively(); }
public int getHoldCount() { return sync.getWriteHoldCount(); }
}
public static class ReadLock implements Lock, java.io.Serializable {
private static final long serialVersionUID = -5992448646407690164L;
private final Sync sync;
protected ReadLock(ReentrantReadWriteLock lock) {
sync = lock.sync;
}
public void lock() { sync.acquireShared(1); }
public void lockInterruptibly() throws InterruptedException { sync.acquireSharedInterruptibly(1); }
public boolean tryLock() { return sync.tryReadLock(); }
public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException { return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout)); }
public void unlock() { sync.releaseShared(1); }
public Condition newCondition() { throw new UnsupportedOperationException(); }
public String toString() { int r = sync.getReadLockCount(); return super.toString() + "[Read locks = " + r + "]"; }
}
读锁是共享的,写锁是互斥的,读写也是互斥的。
state 字段的高 16 位表示读锁的重入次数,低 16 位表示写锁的重入次数。
static int sharedCount(int c) { return c >>> SHARED_SHIFT; }
static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }
可以看一下写锁和读锁加锁时有什么区别:
写锁:
@ReservedStackAccess protected final boolean tryAcquire(int acquires) {
Thread current = Thread.currentThread();
int c = getState();
int w = exclusiveCount(c);
// 当前确实有人持有锁
if (c != 0) {
// 写锁为 0 或是锁不属于当前线程都无法加锁成功
if (w == 0 || current != getExclusiveOwnerThread()) return false;
// 锁确实是当前线程的,但是重入次数过多,所以需要报错
if (w + exclusiveCount(acquires) > MAX_COUNT) throw new Error("Maximum lock count exceeded");
// 重入次数加 1
setState(c + acquires);
return true;
}
// 当前没人持有任何锁,尝试 CAS 加锁,并将当前持有线程设置为自己
if (writerShouldBlock() || !compareAndSetState(c, c + acquires)) return false;
setExclusiveOwnerThread(current);
return true;
}
流程总结:
可以看见,当一个线程已经拥有读锁的时候,并没有办法升级为写锁,主要是因为读锁是共享的,升级的话可能造成死锁。
读锁:
@ReservedStackAccess protected final int tryAcquireShared(int unused) {
Thread current = Thread.currentThread();
int c = getState();
// 如果已经有人加了写锁并且这个锁的拥有者不是自己,直接返回错误
if (exclusiveCount(c) != 0 && getExclusiveOwnerThread() != current) return -1;
int r = sharedCount(c);
if (!readerShouldBlock() && r < MAX_COUNT && compareAndSetState(c, c + SHARED_UNIT)) {
// 当前线程是第一个加读锁的线程
if (r == 0) {
firstReader = current;
firstReaderHoldCount = 1;
}
// 判断已经拥有读锁的线程是自己,重入次数 +1
else if (firstReader == current) {
firstReaderHoldCount++;
}
// 将当前线程 Id 记录下来并记录重入次数
else {
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != LockSupport.getThreadId(current)) cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0) readHolds.set(rh);
rh.count++;
}
return 1;
}
// 兜底
return fullTryAcquireShared(current);
}
流程总结:
读锁解锁时:
@ReservedStackAccess protected final boolean tryReleaseShared(int unused) {
Thread current = Thread.currentThread();
// 当前线程是第一个加读锁者
if (firstReader == current) {
// assert firstReaderHoldCount > 0;
if (firstReaderHoldCount == 1) firstReader = null;
else firstReaderHoldCount--;
}
// 不是第一个,说明是使用 ThreadLocal 进行存储
// 找到对应的 ThreadLocal 之后进行移除
else {
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != LockSupport.getThreadId(current)) rh = readHolds.get();
int count = rh.count;
if (count <= 1) {
readHolds.remove();
if (count <= 0) throw unmatchedUnlockException();
}
--rh.count;
}
// CAS 减少重入次数
for (;;) {
int c = getState();
int nextc = c - SHARED_UNIT;
if (compareAndSetState(c, nextc)) // Releasing the read lock has no effect on readers,
// but it may allow waiting writers to proceed if
// both read and write locks are now free.
return nextc == 0;
}
}
可以发现重入线程的记录这一块是分开存储的。
第一个线程是通过变量的方式存储,只有第二个线程才是 ThreadLocal 存储。
主要是为了在只有一个线程加锁的情况下进行提速。
HoldCounter 主要就是记录 线程 id 和重入次数。
AQS 就是抽象队列同步器,是 JUC 下的一个同步器基础框架,常见的 Semaphore,ReentrantLock、CountDownLatch 都是基于 AQS 实现的。
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {
在 AQS 内部,维护了一个 FIFO 队列和一个 volatile 的 int 类型的 state 变量。在 state=1 的时候表示当前对象锁已经被占有了,state 变量的值修改的动作通过 CAS 来完成。
源码:
/**
* The synchronization state.
*/
private volatile int state;
/**
* Returns the current value of synchronization state.
* This operation has memory semantics of a {@code volatile} read.
* @return current state value
*/
protected final int getState() { return state; }
/**
* Sets the value of synchronization state.
* This operation has memory semantics of a {@code volatile} write.
* @param newState the new state value
*/
protected final void setState(int newState) { state = newState; }
/**
* Atomically sets synchronization state to the given updated
* value if the current state value equals the expected value.
* This operation has memory semantics of a {@code volatile} read
* and write.
*
* @param expect the expected value
* @param update the new value
* @return {@code true} if successful. False return indicates that the actual
* value was not equal to the expected value.
*/
protected final boolean compareAndSetState(int expect, int update) {
return U.compareAndSetInt(this, STATE, expect, update);
}
// Node 类用于构建队列
static final class Node {
// 标记节点状态。常见状态有 CANCELLED(表示线程取消)、SIGNAL(表示后继节点需要运行)、CONDITION(表示节点在条件队列中)等。
volatile int waitStatus;
// 前驱节点
volatile Node prev;
// 后继节点
volatile Node next;
// 节点中的线程,存储线程引用,指向当前节点所代表的线程。
volatile Thread thread;
}
// 队列头节点,延迟初始化。只在 setHead 时修改
private transient volatile Node head;
// 队列尾节点,延迟初始化。
private transient volatile Node tail;
// 入队操作
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) {
// 必须先初始化
if (compareAndSetHead(new Node())) tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
可以发现底层的队列其实是通过Node 节点组成的双向链表实现的(Node 是 AQS 的一个内部类)。
当线程抢锁失败就会被封装成一个 Node 节点并放入这条链表的尾部,也就是存放进队尾。
同时,当有线程释放锁的时候会唤醒队头节点的线程。
然后无论是公平锁还是非公平锁,唤醒的逻辑都是一样的,都是唤醒队头节点。
tryAcquire 时,hasQueuedPredecessors() 会返回 false,那么就会直接把锁给到头节点,而新来的线程自然就抢不到。为什么公平模式下,基本等价于把锁给队头节点,那为何还要调用 tryAcquire 去进行 CAS 抢锁呢?
因为 state 是 volatile 修饰的,它的目的是做到可见性,并且对 state 的修改只能通过 CAS 实现,这样才能保证多线程下的可见性、有序性以及原子性。
所以直接通过这种形式上的 CAS 抢夺反而是更加合理的。
从本质上看,AQS 提供了两种锁:
分别是排它锁和共享锁。
AQS 总共有两种队列,一种是同步队列,用于实现锁的获取和释放。还有一种是条件队列,条件队列也是一个 FIFO 队列,用于在特定条件下管理线程的等待和唤醒。
AQS 提供了两种模式来支持不同类型的同步器:独占模式和共享模式。
为什么选择链表,而不是数组?
因为数组实现队列的话移除操作比较多,数组的移除性能不是很好,此外就是数组需要扩容,所以使用队列比较合适。
为什么使用双向链表而不是单链表?
双链表能支持双向遍历,无论是进行数据插入还是删除都会更加的灵活。
单链表的优势则是内存比较省。
但是对于需要在链表中进行元素的插入和删除的场景来说,双向链表无疑是更加适合的。
看一下 AQS 里面的具体场景:
**获取同步状态,**AQS 提供了两种 API,一种是支持中断,一种不支持。
public final void acquire(int arg) {
if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt();
}
public final void acquireInterruptibly(int arg) throws InterruptedException {
if (Thread.interrupted()) throw new InterruptedException();
if (!tryAcquire(arg)) doAcquireInterruptibly(arg);
}
/**
* Acquires in exclusive interruptible mode.
* @param arg the acquire argument
*/
private void doAcquireInterruptibly(int arg) throws InterruptedException {
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return;
}
if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) throw new InterruptedException();
}
} finally {
if (failed) cancelAcquire(node);
}
}
在这个方法的 finally 中执行了一个 cancelAcquire 的动作,也就是说在 acquire 的过程中,如果出现了线程中断异常,那么就会执行这个方法,他的主要作用就是将中断的线程节点从 AQS 同步队列中移除。
而涉及到将具体某一个节点移除,使用双向链表会更加的合适,单向链表还需要从头遍历找到这个节点才能删除。
高效的挂起支持
/**
* Checks and updates status for a node that failed to acquire.
* Returns true if thread should block. This is the main signal
* control in all acquire loops. Requires that pred == node.prev.
*
* @param pred node's predecessor holding status
* @param node the node
* @return {@code true} if thread should block
*/
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL) //如果 ws 为 Node.SIGNAL(值为 -1),
//这意味着前驱节点已经处于等待状态,期望在释放同步状态时唤醒后继节点。
//在这种情况下,方法可以直接返回 true,指示当前线程可以安全地挂起。
return true;
if (ws > 0) {
//如果 ws 大于 0,说明前驱节点已被取消。
//此时,循环向前遍历等待队列,跳过所有已取消的节点,直到找到一个未被取消的节点作为新的前驱节点,并更新相应的链接。
//这个过程是为了维护等待队列的健康状态,移除其中的无效节点。
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
//如果前驱节点的等待状态不是 SIGNAL(也即,它是 0 或 PROPAGATE),
//则将前驱节点的等待状态更新为 SIGNAL。
//这是通过 compareAndSetWaitStatus 方法完成的,它安全地修改节点的状态,以指示当前节点(node)在释放同步状态时需要被唤醒。
//这里并不立即挂起当前线程,而是返回 false,让调用者知道它需要再次尝试获取同步状态,在无法获取时再决定挂起。
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
当一个线程获取锁失败后,就需要根据前面的节点来判断自己接下来的操作,是被挂起,还是继续尝试。
而不是直接在一个死循环中无限的等待。
可以发现上面的这个过程就需要查询前一个节点的状态来决定当前节点的状态,所以需要使用双向链表,如果是单向链表还要从头开始遍历。
高效的判断一个线程是否在等待队列中
public final boolean isQueued(Thread thread) {
if (thread == null) throw new NullPointerException();
for (Node p = tail; p != null; p = p.prev)
if (p.waiter == thread) return true;
return false;
}
这里是采用从后向前遍历,因为新加入的线程会塞入队尾,从后向前遍历可能可以更快的找到(虽然概率不高)。
可以发现这里需要支持从后向前遍历,所以比如使用双向链表。
支持条件队列
AQS 还支持条件变量,这允许线程在特定条件满足之前挂起。条件队列需要能够从等待队列中移动节点到条件队列,以及反向操作。双向链表使得这种操作更加直接和高效。
总的来说就是:
1 个同步状态指的就是
private volatile int state;
加 volatile 是为了保证有序性 + 可见性,搭配上 CAS 的原子性,能适配高并发场景。
有什么用?
这个看具体的实现类的场景:
一个同步队列是指 Node 节点堆起来的双向链表实现的 FIFO 队列
abstract static class Node {
volatile Node prev; // initially attached via casTail
volatile Node next; // visibly nonnull when signallable
Thread waiter; // visibly nonnull when enqueued
volatile int status; // written by owner, atomic bit ops by others
}
主要就是标识封装的线程以及这个节点的前后驱节点以及节点状态。
同步队列主要是用来存储抢锁失败的那些线程,它们会被阻塞起来并封装成节点塞入队尾等待被唤醒重新抢锁。
2 套模板方法主要是指 AQS 对共享和独占的具体实现
final int acquire(Node node, int arg, boolean shared, boolean interruptible, boolean timed, long time) {
Thread current = Thread.currentThread();
byte spins = 0, postSpins = 0;
// retries upon unpark of first thread
boolean interrupted = false, first = false;
Node pred = null; // predecessor of node when enqueued
/*
* Repeatedly:
* Check if node now first
* if so, ensure head stable, else ensure valid predecessor
* if node is first or not yet enqueued, try acquiring
* else if node not yet created, create it
* else if not yet enqueued, try once to enqueue
* else if woken from park, retry (up to postSpins times)
* else if WAITING status not set, set and retry
* else park and clear WAITING status, and check cancellation
*/
for (;;) {
if (!first && (pred = (node == null) ? null : node.prev) != null && !(first = (head == pred))) {
if (pred.status < 0) {
cleanQueue(); // predecessor cancelled continue;
} else if (pred.prev == null) {
Thread.onSpinWait(); // JDK9+优化:提示 CPU 当前线程自旋,减少空耗 continue;
}
}
if (first || pred == null) {
boolean acquired;
try {
if (shared) acquired = (tryAcquireShared(arg) >= 0);
else acquired = tryAcquire(arg);
} catch (Throwable ex) {
cancelAcquire(node, interrupted, false);
throw ex;
}
if (acquired) {
if (first) {
node.prev = null;
head = node;
pred.next = null;
node.waiter = null;
if (shared) signalNextIfShared(node);
if (interrupted) current.interrupt();
}
return 1;
}
}
if (node == null) {
// allocate; retry before enqueue
if (shared) node = new SharedNode();
else node = new ExclusiveNode();
} else if (pred == null) {
// try to enqueue
node.waiter = current;
Node t = tail;
node.setPrevRelaxed(t); // avoid unnecessary fence,提升性能
if (t == null) tryInitializeHead();
else if (!casTail(t, node)) node.setPrevRelaxed(null); // back out
else t.next = node;
} else if (first && spins != 0) {
--spins; // reduce unfairness on rewaits
Thread.onSpinWait();
} else if (node.status == 0) {
node.status = WAITING; // enable signal and recheck
} else {
long nanos;
spins = postSpins = (byte)((postSpins << 1) | 1);
if (!timed) LockSupport.park(this);
else if ((nanos = time - System.nanoTime()) > 0L) LockSupport.parkNanos(this, nanos);
else break;
node.clearStatus();
if ((interrupted |= Thread.interrupted()) && interruptible) break;
}
}
return cancelAcquire(node, interrupted, interruptible);
}
独占 acquire()、acquireInterruptibly()、release()。 共享 acquireShared()、releaseShared()。 最较新的 jdk 版本中大部分逻辑已经抽象封装成一套接口,在接口内部进行判断处理,比如 acquire 方法。
若干工具方法
主要就是一些调用底层的 Unsafe 实现的方法,比如 CAS 以及阻塞和唤醒线程。
AQS 复制封装一些模板方法,AQS 本身是一个抽象类,子类继承之后通过对方法进行重写,从而实现自己想要的逻辑。
具体的区别就是在每次唤醒的时候都是唤醒队头元素,不过公平实现的情况下,CAS 的最终结果会是队头元素成功获取到锁,在这个过程中如果有新来的线程就会因为获取不到而进入同步队列等待。
非公平就是 CAS 的时候有可能会让新来的线程成功拿到锁。
acquire()
public final void acquire(int arg) {
if (!tryAcquire(arg)) // 抢锁失败
acquire(null, arg, false, false, false, 0L);
}
/**
* 核心的锁获取方法,是 AQS(抽象队列同步器)的核心实现
* @param node 封装当前线程的节点(初始为 null,后续创建)
* @param arg 获取锁的参数(如重入锁的重入次数,共享锁的许可数)
* @param shared 是否为共享模式(true=共享,false=独占)
* @param interruptible 是否可中断(true=响应中断,false=忽略中断)
* @param timed 是否超时获取(true=有超时时间,false=无限等待)
* @param time 超时时间(纳秒),仅 timed=true 时有效
* @return 1=获取成功;其他值=取消获取(如中断/超时)
*/
final int acquire(Node node, int arg, boolean shared, boolean interruptible, boolean timed, long time) {
// 当前请求锁的线程
Thread current = Thread.currentThread();
// spins:初始自旋次数;postSpins:唤醒后重试自旋次数(用于减少不公平性)
byte spins = 0, postSpins = 0;
// interrupted:标记线程是否被中断;first:标记当前节点是否为队列第二个节点(head 的后继)
boolean interrupted = false, first = false;
// pred:当前节点的前驱节点(入队后赋值)
Node pred = null;
/*
* 核心自旋逻辑,循环执行以下操作:
* 1. 检查当前节点是否成为队列第二个节点(head 的后继)
* 2. 若节点是第二个节点或未入队,尝试获取锁
* 3. 若节点未创建,则创建(共享/独占节点)
* 4. 若节点未入队,则尝试入队
* 5. 若从 park 中唤醒,重试(最多 postSpins 次)
* 6. 若节点状态未设置为 WAITING,则设置并重试
* 7. 否则阻塞线程,并检查取消条件(中断/超时)
*/
for (;;) {
// 无限自旋,直到获取成功/取消获取
// ========== 第一步:检查节点位置和前驱节点有效性 ==========
// !first:还未标记为队列第二个节点
// pred = node.prev:获取当前节点的前驱
// !(first = (head == pred)):判断前驱是否为 head(即当前节点是否为第二个节点)
if (!first && (pred = (node == null) ? null : node.prev) != null && !(first = (head == pred))) {
// 前驱节点状态<0:前驱已取消(CANCELLED),清理队列中无效节点
if (pred.status < 0) {
cleanQueue(); // 清理队列中已取消的节点
continue; // 清理后重新自旋
}
// 前驱节点的前驱为 null(前驱不是 head),说明队列未稳定,自旋等待
else if (pred.prev == null) {
Thread.onSpinWait(); // JDK9+优化:提示 CPU 自旋,优化性能
continue;
}
}
// ========== 第二步:尝试获取锁(节点是第二个/未入队) ==========
if (first || pred == null) {
// 节点是第二个节点 或 未入队(直接尝试获取)
boolean acquired;
// 是否获取成功
try {
// 共享模式:调用 tryAcquireShared(需子类实现,返回>=0 表示成功)
// 独占模式:调用 tryAcquire(需子类实现,返回 true 表示成功)
if (shared) acquired = (tryAcquireShared(arg) >= 0);
else acquired = tryAcquire(arg);
} catch (Throwable ex) {
// 获取过程中抛出异常,取消获取并传播异常
cancelAcquire(node, interrupted, false);
throw ex;
}
// 获取锁成功
if (acquired) {
if (first) {
// 当前节点是队列第二个节点(需要更新 head)
node.prev = null; // 新 head 的前驱置 null
head = node; // 将当前节点设为新的 head
pred.next = null; // 原 head 的后继置 null(帮助 GC)
node.waiter = null; // 清空节点关联的线程(释放引用)
if (shared) // 共享模式:唤醒后续共享节点(传播唤醒)
signalNextIfShared(node);
if (interrupted) // 若线程曾被中断,恢复中断状态(保证中断状态不丢失)
current.interrupt();
}
return 1; // 获取成功,返回 1
}
}
// ========== 第三步:节点未创建则创建 ==========
if (node == null) {
// 首次自旋,节点尚未创建
if (shared) node = new SharedNode(); // 共享模式:创建共享节点
else node = new ExclusiveNode(); // 独占模式:创建独占节点
}
// ========== 第四步:节点未入队则尝试入队 ==========
else if (pred == null) {
// 节点已创建但未入队(pred=null)
node.waiter = current; // 节点关联当前线程
Node t = tail; // 获取队列尾节点
// 松弛设置前驱(避免不必要的内存屏障,提升性能)
node.setPrevRelaxed(t);
if (t == null) tryInitializeHead(); // 队列为空,初始化 head 节点
else if (!casTail(t, node)) // CAS 更新尾节点(保证原子性)
// CAS 失败,说明有其他线程竞争入队,回滚前驱设置
node.setPrevRelaxed(null);
else t.next = node; // CAS 成功,原尾节点的后继指向当前节点
}
// ========== 第五步:自旋等待(减少阻塞,提升性能) ==========
else if (first && spins != 0) {
--spins; // 减少自旋次数(降低不公平性)
Thread.onSpinWait(); // 提示 CPU 自旋,优化性能
}
// ========== 第六步:设置节点状态为 WAITING(允许被唤醒) ==========
else if (node.status == 0) {
node.status = WAITING; // 设置状态为等待,允许后续被 signal 唤醒
}
// ========== 第七步:阻塞当前线程(核心等待逻辑) ==========
else {
// 调整 postSpins(指数级增加,最多到 Byte.MAX_VALUE)
spins = postSpins = (byte)((postSpins << 1) | 1);
if (!timed) // 非超时模式:无限阻塞,直到被 unpark
LockSupport.park(this);
else if ((nanos = time - System.nanoTime()) > 0L) // 超时模式:剩余时间>0,阻塞指定纳秒数
LockSupport.parkNanos(this, nanos);
else // 超时时间已到,退出自旋
break;
node.clearStatus(); // 唤醒后清空节点状态(重置为 0)
// 检查线程是否被中断:
// 1. Thread.interrupted():清除中断状态并返回是否中断
// 2. 若 interruptible=true,中断则退出自旋
if ((interrupted |= Thread.interrupted()) && interruptible) break;
}
}
// 退出自旋(中断/超时),取消获取锁并返回结果
return cancelAcquire(node, interrupted, interruptible);
}
大概的逻辑就是这样:
可以发现以上的逻辑,在出现异常的时候 interrupted |= Thread.interrupted()) && interruptible 只是记录,只有当 interruptible 为 true 时在退出循环,但是这个值调用方传递的是 false,也就是只要没拿到锁或不抛出异常就不会退出。
acquireInterruptibly()
public final void acquireInterruptibly(int arg) throws InterruptedException {
if (Thread.interrupted() || (!tryAcquire(arg) && acquire(null, arg, false, true, false, 0L) < 0)) throw new InterruptedException();
}
其实底层调用的逻辑和前面是一样的,只不过 interruptible 参数传递的是 true,这样就能做到出现异常的时候会及时的抛出,不会忽略。
acquire 之所以要忽略中断是因为语义决定的,这类锁的特点,我们更加倾向于获取到锁,而不是出现中断退出,当然 AQS 的实现类基本对两种方式都有实现,例如 ReentranLock。
这个主要是两个方法,Java 中的 park() 和 unpark(),这两个都是本地方法,依赖底层的实现。
park() 负责阻塞线程,unpark() 负责唤醒线程。
当一个线程尝试获取锁或者同步器时,如果获取失败,AQS 会将该线程封装成一个 Node 并添加到等待队列中,然后通过 LockSupport.park() 将该线程阻塞。
当一个线程释放锁或者同步器时,AQS 会通过 LockSupport.unpark() 方法将等待队列中的第一个线程唤醒,并让其重新尝试获取锁或者同步器。
除了基本的等待和唤醒机制,AQS 还提供了条件变量(Condition)的实现,用于在某些条件不满足时让线程等待,并在条件满足时唤醒线程。具体实现是通过创建一个等待队列,将等待的线程封装成 Node 并添加到队列中,然后将这些线程从同步队列中移除,并在条件满足时将等待队列中的所有线程唤醒。
AQS 中提供了两种队列,同步队列主要是为了实现锁机制,而条件队列则是为了实现线程间的协调和通信。
结论:
同步队列:
同步队列主要用于实现锁的获取和释放。如我们常用的 ReentrantLock,就是基于同步队列来实现的。
static final class Node {
// 前驱和后继节点,构成双向链表
Node prev;
Node next;
// 线程本身
Thread thread;
// 状态信息,表示节点在同步队列中的等待状态
int waitStatus;
// ...
}
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// 尝试快速路径:直接尝试在尾部插入节点
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
// 快速路径失败时,进入完整的入队操作
enq(node);
return node;
}
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) {
// 队列为空,初始化
if (compareAndSetHead(new Node())) tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
同步队列的抢锁逻辑主要看
条件队列:
条件队列其实是让开发人员自己来制定规则,能做到当线程不满足一定的条件时就进入条件队列,等待被唤醒。
条件队列与同步队列不同,它是基于 Condition 接口实现的,用于管理那些因为某些条件未满足而等待的线程。当条件满足时,这些线程可以被唤醒。每个 Condition 对象都有自己的一个条件队列。
ConditionObject 是 AQS 的一个内部类(实现了Condition接口),用于实现条件变量。条件变量是并发编程中一种用于线程间通信的机制,它允许一个或多个线程在特定条件成立之前等待,同时释放相关的锁。这在某种程度上类似于对象监视器模式中的 wait() 和 notify() 方法,但提供了更灵活和更强大的控制。
public class ConditionObject implements Condition, java.io.Serializable {
// 条件队列的首尾节点
private transient Node firstWaiter;
private transient Node lastWaiter;
// ...
}
主要的原理:await():使当前线程释放锁并进入等待队列,直到被另一个线程的 signal() 或 signalAll() 方法唤醒,或被中断。
ConditionObject 的原理就是把拿到锁,但是不满足特定条件的线程从同步队列移动到条件队列中,等到其满足特定条件之后再从条件队列移动会同步队列重新进行抢锁操作。
主要的区别:
这里最主要的方法就是 await() 以及 signal(),接下来就拆解一下。
**拆解 await() 方法:**它的具体目标就是 释放掉所有锁——进入条件队列——等待被唤醒。
public final void await() throws InterruptedException {
// 步骤 1:检查前置中断(响应中断的第一步)
// 含义:如果线程在调用 await() 前已经被中断,直接抛异常(不允许带着中断状态等)
if (Thread.interrupted()) throw new InterruptedException();
// 步骤 2:入条件队列(把当前线程封装成 Node,加入 Condition 的单向链表)
// addConditionWaiter() 逻辑:new 一个 Node(状态为 CONDITION),加到 lastWaiter 后面
Node node = addConditionWaiter();
// 步骤 3:完全释放锁(核心!必须释放锁,不然其他线程拿不到锁)
// fullyRelease():释放 AQS 的 state(同步状态),返回释放前的 state 值(比如重入锁的重入次数)
// 为什么'完全释放'?因为线程要去等条件,必须把锁彻底交出去,不能留重入次数
int savedState = fullyRelease(node);
// 步骤 4:初始化中断模式(记录等待过程中是否被中断、中断的类型)
// 0=未中断,THROW_IE=抛异常,REINTERRUPT=补中断标记
int interruptMode = 0;
// 步骤 5:循环阻塞(只要没回同步队列,就一直等)
// isOnSyncQueue(node):判断当前 Node 是否在 AQS 的同步队列里(不在就继续等)
while (!isOnSyncQueue(node)) {
// 阻塞线程(核心!线程进入休眠,直到被 signal()/中断唤醒)
LockSupport.park(this);
// 检查等待期间是否被中断,更新 interruptMode
// checkInterruptWhileWaiting():返回 0=未中断,THROW_IE=被中断且没被 signal,REINTERRUPT=被中断但已被 signal
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break;
// 被中断了,跳出循环,准备处理中断
}
// 步骤 6:回到同步队列后,重新抢锁
// acquireQueued(node, savedState):就是 AQS 的抢锁逻辑(循环 + 阻塞),返回是否被中断
// 如果抢锁过程中被中断,且不是'signal 后中断',则标记为 REINTERRUPT(后续补中断)
if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT;
// 步骤 7:清理条件队列的无效节点(GC 优化)
// unlinkCancelledWaiters():遍历条件队列,移除状态为 CANCELLED 的 Node(比如超时/中断的节点)
if (node.nextWaiter != null) unlinkCancelledWaiters();
// 步骤 8:处理中断(根据 interruptMode 决定抛异常还是补标记)
// reportInterruptAfterWait():THROW_IE 抛 InterruptedException,REINTERRUPT 调用 selfInterrupt() 补标记
if (interruptMode != 0) reportInterruptAfterWait(interruptMode);
}
具体的流程可以总结一下。
拆解 signal() 方法:把条件队列的第一个有效线程,移回同步队列,唤醒它让它抢锁。
public final void signal() {
// 步骤 1:检查权限(必须持有独占锁才能唤醒,否则抛异常)
// isHeldExclusively():AQS 的方法,子类实现(比如 ReentrantLock 判断当前线程是否持有锁)
// 为什么要检查?防止没拿锁的线程乱唤醒,导致锁语义混乱
if (!isHeldExclusively()) throw new IllegalMonitorStateException();
// 步骤 2:拿到条件队列的头节点(第一个等条件的线程)
Node first = firstWaiter;
if (first != null) doSignal(first); // 真正执行唤醒逻辑
}
private void doSignal(Node first) {
do {
// 步骤 3:移除条件队列的头节点(更新 firstWaiter)
// 如果移除后队列为空,lastWaiter 也置 null
if ( (firstWaiter = first.nextWaiter) == null) lastWaiter = null;
first.nextWaiter = null; // 断开当前节点和条件队列的关联
// 步骤 4:把节点移到同步队列(transferForSignal()是核心)
// transferForSignal 返回 false → 节点已取消,换下一个节点重试
} while (!transferForSignal(first) && (first = firstWaiter) != null);
}
具体的流程可以总结一下:
常见问题:
为什么 await()要用 while 循环判断 isOnSyncQueue(node)?
fullyRelease(node)为什么要'完全释放'锁?
条件队列是单向链表,同步队列是双向链表,为什么?
signal()为什么只唤醒头节点,不是所有节点?
扩展:
条件队列的常见用法:基本不用自己去造,直接基于继承了 AQS 的类去实现就行。
class ConditionObjectUsage {
// 1. 定义基于 AQS 的独占锁(ReentrantLock 底层就是 AQS)
private final ReentrantLock lock = new ReentrantLock();
// 2. 获取 ConditionObject 实例(lock.newCondition() 直接返回 ConditionObject)
private final Condition condition = lock.newCondition();
// 业务条件(比如'订单是否支付成功')
private boolean isPaid = false;
// ===== 线程 1:等待条件满足 =====
public void waitForPayment() throws InterruptedException {
// 第一步:必须先获取锁(调用 await() 前必须持有锁,否则抛异常)
lock.lock();
try {
// 第二步:循环判断条件(避免虚假唤醒,绝对不能用 if)
while (!isPaid) {
// 条件不满足 → 释放锁,进入 ConditionObject 的条件队列等待
condition.await(); // 这里相当于阻塞住了
}
// 第三步:条件满足,执行业务逻辑
System.out.println("订单已支付,执行发货逻辑");
} finally {
// 第四步:释放锁(必须在 finally,防止异常导致锁泄漏)
lock.unlock();
}
}
// ===== 线程 2:满足条件后唤醒等待线程 =====
public void signalPayment() {
lock.lock();
try {
// 第一步:修改业务条件(先满足条件,再唤醒)
isPaid = true;
// 第二步:唤醒条件队列中第一个等待的线程(底层调用 ConditionObject.signal())
condition.signal(); // 如果要唤醒所有线程,用 condition.signalAll();
} finally {
lock.unlock();
}
}
// 测试
public static void main(String[] args) throws InterruptedException {
ConditionObjectUsage demo = new ConditionObjectUsage();
// 启动线程 1:等待支付
new Thread(() -> {
try {
demo.waitForPayment();
} catch (Exception e) {
e.printStackTrace();
}
}).start();
// 模拟 1 秒后支付完成
Thread.sleep(1000);
// 启动线程 2:唤醒等待线程
new Thread(demo::signalPayment).start();
}
}
到这里相信你已经能看懂了。
说白了无论是哪个 AQS 的实现类,底层其实都是只有一把锁,就是基于 AQS 的 statue 实现的。
那么这里的 ReentrantLock 也不例外,其实就是两个线程来回的抢锁并进行操作。
线程 1 抢到之后发现自己的条件不满足,只能调用 condition.await(); 释放锁的同时将当前线程阻塞住并塞入条件队列。
条件 2 抢到锁之后会修改条件,并唤醒一个条件队列的线程,再释放锁。
在线程 1 中虽然有两步能释放锁,但是没办法同时执行,因为执行了 condition.await(); 释放锁的同时线程也被阻塞住了,不会向下执行,如果没执行到这个方法的,则通过 lock.unlock(); 去释放。
然后可能有些人还是会疑惑,同步队列和条件队列存储哪些线程?
同步队列存储那些抢锁失败的,条件队列存储抢锁成功但是不满足条件。
也就是有两条路:
首先无论是新来的节点还是在同步队列中被唤醒的节点,它第一步会尝试 CAS 获取锁,如果获取成功,就判断是否满足条件,满足就执行逻辑,不满足就放到条件队列中。
然后获取失败是存储到同步队列中。
注意:想要存储到条件队列的前提是获取锁成功 + 条件不满足 + 不在同步队列中。
不过只要你是获取锁成功的一定不会在同步队列中,即使原本在也会被移除。

微信公众号「极客日志」,在微信中扫描左侧二维码关注。展示文案:极客日志 zeeklog
查找任何按下的键的javascript键代码、代码、位置和修饰符。 在线工具,Keycode 信息在线工具,online
JavaScript 字符串转义/反转义;Java 风格 \uXXXX(Native2Ascii)编码与解码。 在线工具,Escape 与 Native 编解码在线工具,online
使用 Prettier 在浏览器内格式化 JavaScript 或 HTML 片段。 在线工具,JavaScript / HTML 格式化在线工具,online
Terser 压缩、变量名混淆,或 javascript-obfuscator 高强度混淆(体积会增大)。 在线工具,JavaScript 压缩与混淆在线工具,online
使用加密算法(如AES、TripleDES、Rabbit或RC4)加密和解密文本明文。 在线工具,加密/解密文本在线工具,online
将字符串编码和解码为其 Base64 格式表示形式即可。 在线工具,Base64 字符串编码/解码在线工具,online