跳到主要内容
Java 并发常见问题总结 | 极客日志
Java java 算法
Java 并发常见问题总结 综述由AI生成 总结了 Java 并发编程中的核心问题。首先解释了线程异常不影响进程退出的原因及 isAlive() 方法的陷阱。接着讨论了 int 赋值与对象创建的原子性问题,以及 JMM 中 happens-before 和 as-if-serial 的区别。对比了 ForkJoinPool 与 ThreadPoolExecutor 的工作模式及适用场景,并详细解析了 CountDownLatch、CyclicBarrier 和 Semaphore 的同步机制。深入分析了 CompletableFuture 相对于 FutureTask 的优势及其底层实现。最后阐述了 CAS 的硬件原子性保证、读写锁原理以及 AQS 框架的核心设计思想,包括同步队列与条件队列的实现细节。
PentesterX 发布于 2026/3/30 更新于 2026/5/24 22 浏览Java 线程出现异常,进程为啥不会退出?
因为 Java 是采用线程独立模型,各个线程之间互相独立,有各自的上下文,当一个线程出现错误的时候,只会影响到这个线程自己本身,不会影响到其它的线程,更不会导致程序退出。
不过我们这里介绍的异常更多是 Exception,如果是 Error 级别的,通常意味着硬件层面不够,才有可能 会导致退出。
此外 Exception 我们是可以通过捕获的,捕获了的话也不会导致线程直接死掉。
Java 是如何判断一个线程是否存活的?需要注意什么吗?
通过 isAlive() 方法:
public class Main {
public static void main (String[] args) throws InterruptedException {
Thread t1 = new Thread (() -> {
System.out.println("t1 begin" );
try { Thread.sleep(1000 ); }
catch (InterruptedException e) { }
System.out.println("t1 end" );
});
t1.start();
System.out.println("t1.isAlive()=" + t1.isAlive());
t1.join();
System.out.println("t1.isAlive()=" + t1.isAlive());
}
}
但是没那么简单,可以看一下以下代码的输出:
public class Main {
public static void main (String[] args) throws InterruptedException {
Thread t1 = new Thread (() -> {
System.out.println("t1 begin" );
{ Thread.sleep( ); }
(InterruptedException e) { }
System.out.println( );
});
(() -> {
(t1) {
System.out.println( );
{ Thread.sleep( ); }
(InterruptedException e) { }
System.out.println( );
System.out.println( + t1.isAlive());
}
});
t1.start();
t2.start();
}
}
try
1000
catch
"t1 end"
Thread
t2
=
new
Thread
synchronized
"t2 begin"
try
2000
catch
"t2 end"
"t1 isAlive:"
t1 begin
t2 begin
t1 end
t2 end
t1 isAlive:true
出现这样的原因就是当一个线程执行完成之后,需要先拿到线程对象(t1)的锁才能去修改线程状态,但是因为这个时候这把锁被线程 2 拿着,所以线程 1 没办法修改状态。那导致的现象 t1 线程已经退出,但是状态始终是存活。
int a=1 是原子性操作吗?User a = new User(); 是原子性操作吗? 在单线程的情况下可以认为是原子性的,这个语句本身只是声明一个变量并赋值为 1。
但是在多线程情况下就不一定能做到原子性,可能会有其它的赋值操作导致覆盖问题。
在多线程环境下,我们无法保证线程执行的先后顺序,所以无法做到原子性。
为了做到原子性,一般会使用锁机制或是原子类(CAS)。
User a = new User(); 不是原子操作。
它底层其实需要先初始化一个内存空间、在内存空间创建对象、变量指向内存空间这几步。
但是在多线程环境下且伴随指令重排的话,可能会导致某一个线程拿到的其实是没初始化好的对象。
happens-before 和 as-if-serial 有啥区别和联系? happens-before 主要是解决多线程 环境下线程之间的可见性 问题,比如线程 A 对加了 volatile 变量进行修改,且 A 线程执行顺序是快于线程 B 的,那么执行线程 B 的时候一定要能看见线程 A 的最新修改。
as-if-serial 主要解决单线程 环境下的有序性 问题,也就是单线程环境下,无论底层的指令经过怎么样的重排序,最终的结果都应该和代码预期的结果一致。
ForkJoinPool 和 ThreadPoolExecutor 区别是什么?
工作方式(算法) :ThreadPoolExecutor 是任务分配,ForkJoinPool 是工作窃取。
线程数量 :ThreadPoolExecutor 需要手动设置,ForkJoinPool 自动扩缩容。
任务队列 :ThreadPoolExecutor 所有线程共享一个全局队列,ForkJoinPool 每个线程都有自己的队列,然后还有一个全局队列。
适合类型 :ThreadPoolExecutor 比较适合那种 IO 比较密集且任务不可分割,ForkJoinPool 适合任务可以拆分的计算类型任务。
任务分配就是每个线程到全局队列领取任务,如果领取不到就会阻塞等待。
工作窃取就是每个线程会维护自己的一个工作队列,然后执行完之后会主动去看一下其它线程是否执行完成,如果没执行完会偷一些任务过来执行。
ThreadPoolExecutor 和 ForkJoinPool 的全局队列存储的其实都是外部提交给线程池大的任务。
只不过 ForkJoinPool 内部会对大的任务进行拆分成一个个的小任务,然后每个线程会自己 fork 一部分小任务到自己的工作队列中去执行。
CompletableFuture 为啥默认用 ForkJoinPool?
因为 CompletableFuture 的场景主要是将多个线程的执行结果进行合并或是用来编排线程的执行顺序,更像是拆分执行完再合并,和 ForkJoinPool 的能力比较契合。
但是对于一些 IO 密集型的任务,最好还是不要直接默认 ForkJoinPool,因为 IO 阻塞住某一个线程的话,反而会卡慢整个任务进度。
那说白了 ForkJoinPool 其实就是把任务拆分成很多的小任务交给多个线程去执行,本质就是压榨 cpu 的性能,比较适合计算比较多的场景。
CountDownLatch、CyclicBarrier、Semaphore 区别? 这三个都是多线程情况下的同步工具,或者说是通信工具,但是在具体的功能和实现上还是有一些区别的。
public class Main {
public static void main (String[] args) throws InterruptedException {
CountDownLatch countDownLatch = new CountDownLatch (3 );
for (int i = 0 ; i < 3 ; i++) {
new Thread (() -> {
try {
TimeUnit.SECONDS.sleep(1 );
} catch (Exception e){}
System.out.println("线程执行" );
countDownLatch.countDown();
}).start();
}
countDownLatch.await();
System.out.println("主线程执行" );
}
}
CountDownLatch 其实是一个计数器,主要提供了两个 api:
countDownLatch.countDown() 实现将计数器减 1
countDownLatch.await() 会阻塞等到计数器的数量变成 0 才执行
这个计数器主要的作用就是实现线程等待,比如一个线程或是多个线程的执行需要等待其它的线程执行完才能执行,就可以使用这个计数器。
底层具体实现:实现了 AQS,并且使用 state 表示数量。
public class Main {
public static void main (String[] args) throws InterruptedException {
CyclicBarrier barrier = new CyclicBarrier (3 , () -> {
System.out.println("所有线程执行到这" );
});
for (int i = 0 ; i < 3 ; i++) {
int idx = i;
new Thread (() -> {
System.out.println("线程执行" + idx + "到这" );
try {
barrier.await();
} catch (Exception e) {}
System.out.println("线程执行" + idx + "继续执行" );
}).start();
}
}
}
线程执行 0 到这
线程执行 2 到这
线程执行 1 到这
所有线程执行到这
线程执行 2 继续执行
线程执行 0 继续执行
线程执行 1 继续执行
这个的作用同样和计数器有点像,不过它主要是用来实现同步屏障。
底层其实维护了一个 count,每调用一次 barrier.await(),count 就会减 1,然后会判断,当减少到 0 的时候会执行初始化的时候塞入的任务;如果减少后不为 0 会阻塞线程。
不直接继承 AQS,而是通过 ReentrantLock + Condition 实现(ReentrantLock 底层还是 AQS)。
public class Main {
public static void main (String[] args) throws InterruptedException {
Semaphore semaphore = new Semaphore (3 , true );
for (int i = 0 ; i < 10 ; i++) {
int idx = i;
new Thread (() -> {
try {
System.out.println("线程" + idx + "准备拿许可证" );
semaphore.acquire();
System.out.println("线程" + idx + "拿到拿许可证并执行" );
} catch (InterruptedException e) {
throw new RuntimeException (e);
} finally {
semaphore.release();
System.out.println("线程" + idx + "释放许可证" );
}
}).start();
}
}
}
这个同样和计数器很像,不过它更像是一种信号量的方式,初始化的时候指定许可证数量并指定是否要公平模式。
每次 semaphore.acquire() 会尝试去拿许可证,拿到会将许可证减 1,拿不到就阻塞。
semaphore.release() 会释放一个许可证。
Semaphore 底层主要是依赖 AQS 来实现的。
CountDownLatch、CyclicBarrier、Semaphore 这三个其实说白了都是通过计数的方式实现线程同步的。
CountDownLatch 就是简单的一个计数器,调用 countDownLatch.countDown() 实现将计数器减 1,countDownLatch.await() 会阻塞等到计数器的数量变成 0 才执行。
CyclicBarrier 则是一个屏障,调用 barrier.await() 会在底层将 count-- 并判断是否已经为 0,为 0 的话会唤醒和放行前面阻塞的线程,不为 0 则阻塞。
Semaphore 是一个信号量机制,初始化的时候我们就要执行许可证的数量,调用 semaphore.acquire() 会尝试去拿许可证,semaphore.release() 会释放一个许可证。
CountDownLatch 适用于一个线程等待多个线程完成操作的情况。
CyclicBarrier 适用于多个线程在同一个屏障处等待。
Semaphore 适用于一个线程需要等待获取许可证才能访问共享资源。
CompletableFuture 对比 FutureTask 有什么优势? CompletableFuture 的使用方式可参考官方文档或相关教程。
首先就是 FutureTask 只能支持同步获取 ,也就是 get() 方法。
而 CompletableFuture 可以支持同步和异步两种方式 ,主要就是带 Async 的那几个,比如 thenAcceptAsync,以及 handle 之类的。
CompletableFuture 支持链式编程,而且提供了很多的方法,而 FutureTask 只能通过 get() 拿到结果之后再去进行操作,而且很多还要自己手动实现,比如异常捕获。
CompletableFuture 支持对多任务的编排,而 FutureTask 不支持多任务的操作,只能我们自己手动去编排。
CompletableFuture 光是对异常的处理就有好几个 API,而且有些 API 支持只有异常出现才会回调,相当于把异常单独拎出来处理,而 FutureTask 只能通过 try-catch 去包裹 get() 方法才能处理异常。
总结下来 FutureTask 的设计初衷是让我们能对任务的执行结果,异常信息,以及对任务状态的判断和取消,主要是初步的完成这些功能,而 CompletableFuture 相当于一个全能版,是对 FutureTask 的一个全面升级,除了 FutureTask 的基本功能之外,还多考虑了更优雅的处理执行结果和异常的方式,以及支持异步操作避免阻塞主线程,并且重点集成了对多线程编排的功能。
CompletableFuture 的底层是如何实现的? 那我们为了了解 CompletableFuture,可以从下面这三个维度入手。
CompletableFuture 对比 Future 有什么优势?为什么要封装 CompletableFuture?
Future 其实是 jdk8 以前用异编程的工具,主要就是提交一个异步任务 + 获取结果。
但是 Future 具有很多缺点,这些缺点都在 CompletableFuture 得到解决。
无法手动设置值,只能等任务执行完成,在一些失败场景不好处理。
没办法显示处理异常,只能通过 get() 的时候抛出异常才去处理。
多任务同步等待的时候需要借助 CountDownLatch 才能实现。
get() 方法是阻塞的。
Future 无链式调用,代码看起来会比较乱。
总结就是 Future 主要实现了提交异步任务并获取执行结果的基本需求,而 CompletableFuture 实现了 异步任务编排、异常处理、任务组合、非阻塞回调。
CompletableFuture 常见的 API 是什么?有什么区别?
public class Main {
public static void main (String[] args) throws InterruptedException {
CompletableFuture<Object> future1 = CompletableFuture.supplyAsync(() -> {
System.out.println("111" );
return null ;
});
CompletableFuture<Object> future2 = CompletableFuture.supplyAsync(() -> {
System.out.println("111" );
return null ;
}, Executors.newFixedThreadPool(10 ));
CompletableFuture<Void> future3 = CompletableFuture.runAsync(() -> {
System.out.println("111" );
});
CompletableFuture<Void> future4 = CompletableFuture.runAsync(() -> {
System.out.println("111" );
}, Executors.newFixedThreadPool(10 ));
}
}
supplyAsync 有返回值
runAsync 无返回值
然后执行时使用的线程池都是,有指定时使用指定的线程池,无指定时使用默认的 ForkJoinPool。
public class Main {
public static void main (String[] args) throws InterruptedException {
CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {
System.out.println("111" );
return 222 ;
}).thenApply(res -> {
System.out.println(res);
return res;
});
CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> {
System.out.println("111" );
return 222 ;
}).thenApplyAsync(res -> {
System.out.println(res);
return res;
});
}
}
无论是 thenApplyAsync 还是 thenApply,其实本质上就是拿到上一个 future 处理之后 return 的结果,然后进行处理,只不过是使用同步还是异步的方式而已,使用异步的话,如果没有指定线程池,同样是用默认的 ForkJoinPool。
public class Main {
public static void main (String[] args) throws InterruptedException {
CompletableFuture.supplyAsync(() -> {
System.out.println("111" );
return 222 ;
}).thenAccept(res -> {
System.out.println(res);
});
CompletableFuture.supplyAsync(() -> {
System.out.println("111" );
return 222 ;
}).thenAccept(res -> {
System.out.println(res);
});
}
}
注意:消费和处理的区别,消费就是消费完之后到这里就结束了,而处理是处理完之后继续返回出去。
在上一步执行完成之后继续执行接下去的逻辑(无返回值)
public class Main {
public static void main (String[] args) throws InterruptedException {
CompletableFuture<Void> future1 = CompletableFuture.supplyAsync(() -> {
System.out.println("111" );
return 222 ;
}).thenRun(() -> {
System.out.println("222" );
});
CompletableFuture<Void> future2 = CompletableFuture.supplyAsync(() -> {
System.out.println("111" );
return 222 ;
}).thenRunAsync(() -> {
System.out.println("222" );
});
}
}
不关心上一步的结果,而是直接执行下一步,适合做一些收尾动作。
public class Main {
public static void main (String[] args) throws InterruptedException {
CompletableFuture<Object> completableFuture = CompletableFuture.supplyAsync(() -> {
System.out.println("111" );
return 222 ;
}).thenCompose(res -> {
System.out.println(res);
return null ;
});
}
}
串行的执行多个任务,不过一般用的不多,要串行没必要使用这个。
CompletableFuture<Void> allFutures = CompletableFuture.allOf(noticeDetails.stream()
.map(detail -> CompletableFuture.supplyAsync(() -> {
notice(detail);
return null ;
})).toArray(CompletableFuture[]::new ));
allFutures.whenComplete((v, e) -> {
if (e == null ) {
noticeOrder.setState("SUCCESS" );
Boolean res = noticeOrderService.updateById(noticeOrder);
Assert.isTrue(res, "update failed" );
} else {
log.error("notice failed" , e);
}
});
CompletableFuture 底层怎么实现的?为什么要这么实现?
CompletableFuture 内部采用了一种链式的结构 来处理异步计算的结果,每个 CompletableFuture 都有一个与之关联的 Completion 链,它可以包含多个 Completion 阶段,每个阶段都代表一个异步操作 ,并且可以指定它所依赖的前一个阶段的计算结果。
CompletableFuture 还使用了一种事件驱动 的机制来处理异步计算的完成事件。在一个 CompletableFuture 对象上注册的 Completion 阶段完成后,它会触发一个完成事件,然后 CompletableFuture 对象会执行与之关联的下一个 Completion 阶段。
CompletableFuture 的异步计算是通过线程池来实现的。CompletableFuture 在内部使用了一个 ForkJoinPool 线程池来执行异步任务。当我们创建一个 CompletableFuture 对象时,它会在内部创建一个任务,并提交到 ForkJoinPool 中去执行。
CAS 在操作系统层面是如何保证原子性的? 在 x86 架构的 cpu 中,通常是使用 cmpxchg 指令对总线进行锁住,防止在这个的期间有其它 cpu 来访问,并且在这个期间还会禁止 cpu 出现中断,执行完才会释放。
所以可以总结一下就是通过底层的指令对数据进行上锁,然后执行完释放。
此外因为 cmpxchg 是基于 cpu 缓存一致性协议实现的,所以还能做到可见性。
CAS 一定有自旋吗? 因为 CAS 是比较并交换,多线程同时操作的情况下成功率较低,所以一般会采用自旋的方式提高成功率。
短时间的自旋性能是不错的,至少比直接阻塞线程,然后等待唤醒要好,但是如果长时间自旋的话就不适合了,自旋是一直占用 cpu 的,cpu 没办法干别的事情,导致系统性能下降。
读写锁了解吗? 了解一点。主要是 ReentrantReadWriteLock。
public static class WriteLock implements Lock , java.io.Serializable {
private static final long serialVersionUID = -4992448646407690164L ;
private final Sync sync;
protected WriteLock (ReentrantReadWriteLock lock) {
sync = lock.sync;
}
public void lock () { sync.acquire(1 ); }
public void lockInterruptibly () throws InterruptedException { sync.acquireInterruptibly(1 ); }
public boolean tryLock () { return sync.tryWriteLock(); }
public boolean tryLock (long timeout, TimeUnit unit) throws InterruptedException { return sync.tryAcquireNanos(1 , unit.toNanos(timeout)); }
public void unlock () { sync.release(1 ); }
public Condition newCondition () { return sync.newCondition(); }
public String toString () { Thread o = sync.getOwner(); return super .toString() + ((o == null ) ? "[Unlocked]" : "[Locked by thread " + o.getName() + "]" ); }
public boolean isHeldByCurrentThread () { return sync.isHeldExclusively(); }
public int getHoldCount () { return sync.getWriteHoldCount(); }
}
public static class ReadLock implements Lock , java.io.Serializable {
private static final long serialVersionUID = -5992448646407690164L ;
private final Sync sync;
protected ReadLock (ReentrantReadWriteLock lock) {
sync = lock.sync;
}
public void lock () { sync.acquireShared(1 ); }
public void lockInterruptibly () throws InterruptedException { sync.acquireSharedInterruptibly(1 ); }
public boolean tryLock () { return sync.tryReadLock(); }
public boolean tryLock (long timeout, TimeUnit unit) throws InterruptedException { return sync.tryAcquireSharedNanos(1 , unit.toNanos(timeout)); }
public void unlock () { sync.releaseShared(1 ); }
public Condition newCondition () { throw new UnsupportedOperationException (); }
public String toString () { int r = sync.getReadLockCount(); return super .toString() + "[Read locks = " + r + "]" ; }
}
state 字段的高 16 位表示读锁的重入次数,低 16 位表示写锁的重入次数。
static int sharedCount (int c) { return c >>> SHARED_SHIFT; }
static int exclusiveCount (int c) { return c & EXCLUSIVE_MASK; }
@ReservedStackAccess protected final boolean tryAcquire (int acquires) {
Thread current = Thread.currentThread();
int c = getState();
int w = exclusiveCount(c);
if (c != 0 ) {
if (w == 0 || current != getExclusiveOwnerThread()) return false ;
if (w + exclusiveCount(acquires) > MAX_COUNT) throw new Error ("Maximum lock count exceeded" );
setState(c + acquires);
return true ;
}
if (writerShouldBlock() || !compareAndSetState(c, c + acquires)) return false ;
setExclusiveOwnerThread(current);
return true ;
}
判断当前是否有人持有锁(读锁/写锁)
当前有人持有
判断是否是写锁且持有者为自己
不是直接返回失败
是就判断重入次数是否溢出,不溢出就重入次数 +1
当前无人持有
CAS 尝试加锁并设置持有线程为自己
可以看见,当一个线程已经拥有读锁的时候,并没有办法升级为写锁,主要是因为读锁是共享的,升级的话可能造成死锁。
@ReservedStackAccess protected final int tryAcquireShared (int unused) {
Thread current = Thread.currentThread();
int c = getState();
if (exclusiveCount(c) != 0 && getExclusiveOwnerThread() != current) return -1 ;
int r = sharedCount(c);
if (!readerShouldBlock() && r < MAX_COUNT && compareAndSetState(c, c + SHARED_UNIT)) {
if (r == 0 ) {
firstReader = current;
firstReaderHoldCount = 1 ;
}
else if (firstReader == current) {
firstReaderHoldCount++;
}
else {
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != LockSupport.getThreadId(current)) cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0 ) readHolds.set(rh);
rh.count++;
}
return 1 ;
}
return fullTryAcquireShared(current);
}
判断是否有人加了写锁且该锁属于自己
不是,直接快速失败
如果是说明允许继续拿读锁
判断自己是否是第一个拿读锁的线程,是的话,重入次数 +1
将自己的线程信息和重入次数信息进来起来
@ReservedStackAccess protected final boolean tryReleaseShared (int unused) {
Thread current = Thread.currentThread();
if (firstReader == current) {
if (firstReaderHoldCount == 1 ) firstReader = null ;
else firstReaderHoldCount--;
}
else {
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != LockSupport.getThreadId(current)) rh = readHolds.get();
int count = rh.count;
if (count <= 1 ) {
readHolds.remove();
if (count <= 0 ) throw unmatchedUnlockException();
}
--rh.count;
}
for (;;) {
int c = getState();
int nextc = c - SHARED_UNIT;
if (compareAndSetState(c, nextc))
return nextc == 0 ;
}
}
第一个线程是通过变量的方式存储,只有第二个线程才是 ThreadLocal 存储。
HoldCounter 主要就是记录 线程 id 和重入次数。
如何理解 AQS? AQS 就是抽象队列同步器,是 JUC 下的一个同步器基础框架,常见的 Semaphore,ReentrantLock、CountDownLatch 都是基于 AQS 实现的。
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java .io.Serializable {
在 AQS 内部,维护了一个 FIFO 队列和一个 volatile 的 int 类型的 state 变量。在 state=1 的时候表示当前对象锁已经被占有了,state 变量的值修改的动作通过 CAS 来完成。
private volatile int state;
protected final int getState () { return state; }
protected final void setState (int newState) { state = newState; }
protected final boolean compareAndSetState (int expect, int update) {
return U.compareAndSetInt(this , STATE, expect, update);
}
static final class Node {
volatile int waitStatus;
volatile Node prev;
volatile Node next;
volatile Thread thread;
}
private transient volatile Node head;
private transient volatile Node tail;
private Node enq (final Node node) {
for (;;) {
Node t = tail;
if (t == null ) {
if (compareAndSetHead(new Node ())) tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
可以发现底层的队列其实是通过Node 节点组成的双向链表实现的(Node 是 AQS 的一个内部类) 。
当线程抢锁失败就会被封装成一个 Node 节点并放入这条链表的尾部,也就是存放进队尾 。
然后无论是公平锁还是非公平锁,唤醒的逻辑都是一样的,都是唤醒队头节点。
但是公平锁就是唤醒之后,头节点会 调用 tryAcquire 时,hasQueuedPredecessors() 会返回 false,那么就会直接把锁给到头节点,而新来的线程自然就抢不到。
非公平就是唤醒之后还要通过 CAS 去抢夺,因为这个时候可能会有新的线程进来,不一定能抢过。
为什么公平模式下,基本等价于把锁给队头节点,那为何还要调用 tryAcquire 去进行 CAS 抢锁呢?
因为 state 是 volatile 修饰的,它的目的是做到可见性,并且对 state 的修改只能通过 CAS 实现,这样才能保证多线程下的可见性、有序性以及原子性。
所以直接通过这种形式上的 CAS 抢夺反而是更加合理的。
排它锁就是存在多线程竞争同一共享资源时,同一时刻只允许一个线程访问该共享资源 ,也就是多个线程中只能有一个线程获得锁资源,比如 Lock 中的 ReentrantLock 重入锁实现就是用到了 AQS 中的排它锁功能。
共享锁也称为读锁,就是在同一时刻允许多个线程同时获得锁资源 ,比如 CountDownLatch 和 Semaphore 都是用到了 AQS 中的共享锁功能。
AQS 总共有两种队列,一种是同步队列 ,用于实现锁的获取和释放。还有一种是条件队列 ,条件队列也是一个 FIFO 队列,用于在特定条件下管理线程的等待和唤醒 。
AQS 提供了两种模式来支持不同类型的同步器:独占模式和共享模式。
AQS 为什么采用双向链表? 因为数组实现队列的话移除操作比较多,数组的移除性能不是很好,此外就是数组需要扩容,所以使用队列比较合适。
双链表能支持双向遍历,无论是进行数据插入还是删除都会更加的灵活。
但是对于需要在链表中进行元素的插入和删除的场景来说,双向链表无疑是更加适合的 。
**获取同步状态,**AQS 提供了两种 API,一种是支持中断,一种不支持。
public final void acquire (int arg) {
if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt();
}
public final void acquireInterruptibly (int arg) throws InterruptedException {
if (Thread.interrupted()) throw new InterruptedException ();
if (!tryAcquire(arg)) doAcquireInterruptibly(arg);
}
private void doAcquireInterruptibly (int arg) throws InterruptedException {
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true ;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null ;
failed = false ;
return ;
}
if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) throw new InterruptedException ();
}
} finally {
if (failed) cancelAcquire(node);
}
}
在这个方法的 finally 中执行了一个 cancelAcquire 的动作,也就是说在 acquire 的过程中,如果出现了线程中断异常,那么就会执行这个方法,他的主要作用就是将中断的线程节点从 AQS 同步队列中移除。
而涉及到将具体某一个节点移除,使用双向链表会更加的合适,单向链表还需要从头遍历找到这个节点才能删除。
private static boolean shouldParkAfterFailedAcquire (Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
return true ;
if (ws > 0 ) {
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0 );
pred.next = node;
} else {
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false ;
}
当一个线程获取锁失败后,就需要根据前面的节点来判断自己接下来的操作,是被挂起,还是继续尝试。
可以发现上面的这个过程就需要查询前一个节点的状态来决定当前节点的状态,所以需要使用双向链表,如果是单向链表还要从头开始遍历。
public final boolean isQueued (Thread thread) {
if (thread == null ) throw new NullPointerException ();
for (Node p = tail; p != null ; p = p.prev)
if (p.waiter == thread) return true ;
return false ;
}
这里是采用从后向前遍历,因为新加入的线程会塞入队尾,从后向前遍历可能可以更快的找到(虽然概率不高)。
可以发现这里需要支持从后向前遍历,所以比如使用双向链表。
AQS 还支持条件变量,这允许线程在特定条件满足之前挂起。条件队列需要能够从等待队列中移动节点到条件队列,以及反向操作 。双向链表使得这种操作更加直接和高效。
AQS 的核心组成介绍一下?分别有什么用?
1 个同步状态
1 个同步队列
2 套模板方法
若干工具方法
private volatile int state;
加 volatile 是为了保证有序性 + 可见性,搭配上 CAS 的原子性,能适配高并发场景。
独占锁:表示资源是否被占用
共享锁:表示资源数量
读写锁:高 16 位表示读锁重入数,低 16 位表示写锁重入数
一个同步队列是指 Node 节点堆起来的双向链表实现的 FIFO 队列
abstract static class Node {
volatile Node prev;
volatile Node next;
Thread waiter;
volatile int status;
}
主要就是标识封装的线程以及这个节点的前后驱节点以及节点状态。
同步队列主要是用来存储抢锁失败的那些线程,它们会被阻塞起来并封装成节点塞入队尾等待被唤醒重新抢锁。
2 套模板方法主要是指 AQS 对共享和独占的具体实现
final int acquire (Node node, int arg, boolean shared, boolean interruptible, boolean timed, long time) {
Thread current = Thread.currentThread();
byte spins = 0 , postSpins = 0 ;
boolean interrupted = false , first = false ;
Node pred = null ;
for (;;) {
if (!first && (pred = (node == null ) ? null : node.prev) != null && !(first = (head == pred))) {
if (pred.status < 0 ) {
cleanQueue();
} else if (pred.prev == null ) {
Thread.onSpinWait();
}
}
if (first || pred == null ) {
boolean acquired;
try {
if (shared) acquired = (tryAcquireShared(arg) >= 0 );
else acquired = tryAcquire(arg);
} catch (Throwable ex) {
cancelAcquire(node, interrupted, false );
throw ex;
}
if (acquired) {
if (first) {
node.prev = null ;
head = node;
pred.next = null ;
node.waiter = null ;
if (shared) signalNextIfShared(node);
if (interrupted) current.interrupt();
}
return 1 ;
}
}
if (node == null ) {
if (shared) node = new SharedNode ();
else node = new ExclusiveNode ();
} else if (pred == null ) {
node.waiter = current;
Node t = tail;
node.setPrevRelaxed(t);
if (t == null ) tryInitializeHead();
else if (!casTail(t, node)) node.setPrevRelaxed(null );
else t.next = node;
} else if (first && spins != 0 ) {
--spins;
Thread.onSpinWait();
} else if (node.status == 0 ) {
node.status = WAITING;
} else {
long nanos;
spins = postSpins = (byte )((postSpins << 1 ) | 1 );
if (!timed) LockSupport.park(this );
else if ((nanos = time - System.nanoTime()) > 0L ) LockSupport.parkNanos(this , nanos);
else break ;
node.clearStatus();
if ((interrupted |= Thread.interrupted()) && interruptible) break ;
}
}
return cancelAcquire(node, interrupted, interruptible);
}
独占 acquire()、acquireInterruptibly()、release()。
共享 acquireShared()、releaseShared()。
最较新的 jdk 版本中大部分逻辑已经抽象封装成一套接口,在接口内部进行判断处理,比如 acquire 方法。
主要就是一些调用底层的 Unsafe 实现的方法,比如 CAS 以及阻塞和唤醒线程。
AQS 的核心设计思想了解吗? AQS 复制封装一些模板方法,AQS 本身是一个抽象类,子类继承之后通过对方法进行重写,从而实现自己想要的逻辑。
AQS 对公平锁和非公平锁的实现说一下? 具体的区别就是在每次唤醒的时候都是唤醒队头元素,不过公平实现的情况下,CAS 的最终结果会是队头元素成功获取到锁,在这个过程中如果有新来的线程就会因为获取不到而进入同步队列等待。
非公平就是 CAS 的时候有可能会让新来的线程成功拿到锁。
AQS 的 acquire 的流程聊一下?和 acquireInterruptibly 有什么区别吗? 为什么 acquire 要忽略中断? public final void acquire (int arg) {
if (!tryAcquire(arg))
acquire(null , arg, false , false , false , 0L );
}
final int acquire (Node node, int arg, boolean shared, boolean interruptible, boolean timed, long time) {
Thread current = Thread.currentThread();
byte spins = 0 , postSpins = 0 ;
boolean interrupted = false , first = false ;
Node pred = null ;
for (;;) {
if (!first && (pred = (node == null ) ? null : node.prev) != null && !(first = (head == pred))) {
if (pred.status < 0 ) {
cleanQueue();
continue ;
}
else if (pred.prev == null ) {
Thread.onSpinWait();
continue ;
}
}
if (first || pred == null ) {
boolean acquired;
try {
if (shared) acquired = (tryAcquireShared(arg) >= 0 );
else acquired = tryAcquire(arg);
} catch (Throwable ex) {
cancelAcquire(node, interrupted, false );
throw ex;
}
if (acquired) {
if (first) {
node.prev = null ;
head = node;
pred.next = null ;
node.waiter = null ;
if (shared)
signalNextIfShared(node);
if (interrupted)
current.interrupt();
}
return 1 ;
}
}
if (node == null ) {
if (shared) node = new SharedNode ();
else node = new ExclusiveNode ();
}
else if (pred == null ) {
node.waiter = current;
Node t = tail;
node.setPrevRelaxed(t);
if (t == null ) tryInitializeHead();
else if (!casTail(t, node))
node.setPrevRelaxed(null );
else t.next = node;
}
else if (first && spins != 0 ) {
--spins;
Thread.onSpinWait();
}
else if (node.status == 0 ) {
node.status = WAITING;
}
else {
spins = postSpins = (byte )((postSpins << 1 ) | 1 );
if (!timed)
LockSupport.park(this );
else if ((nanos = time - System.nanoTime()) > 0L )
LockSupport.parkNanos(this , nanos);
else
break ;
node.clearStatus();
if ((interrupted |= Thread.interrupted()) && interruptible) break ;
}
}
return cancelAcquire(node, interrupted, interruptible);
}
尝试获取锁,如果成功直接返回,失败进入 AQS 队列的逻辑。
将当前线程封装成一个 Node 节点,加入队列的尾部。
进入自旋,判断自己是不是队列的第二个节点(head 的下一个节点,也可以认为是第一个等待节点),如果出现异常就会将当前节点设置为 CANCELLED 并结束。
如果是,说明有资格抢锁。
不是,准备阻塞。
有资格抢锁的话会去抢锁,成功直接返回,失败同样等待阻塞。
失败,将节点设置为 WAITING,并调用 park() 将当前线程阻塞住。
被唤醒后,继续抢锁,如果成功就返回。
可以发现以上的逻辑,在出现异常的时候 interrupted |= Thread.interrupted()) && interruptible 只是记录,只有当 interruptible 为 true 时在退出循环,但是这个值调用方传递的是 false,也就是只要没拿到锁或不抛出异常就不会退出。
public final void acquireInterruptibly (int arg) throws InterruptedException {
if (Thread.interrupted() || (!tryAcquire(arg) && acquire(null , arg, false , true , false , 0L ) < 0 )) throw new InterruptedException ();
}
其实底层调用的逻辑和前面是一样的,只不过 interruptible 参数传递的是 true,这样就能做到出现异常的时候会及时的抛出,不会忽略。
acquire 之所以要忽略中断是因为语义决定的,这类锁的特点,我们更加倾向于获取到锁,而不是出现中断退出,当然 AQS 的实现类基本对两种方式都有实现,例如 ReentranLock。
AQS 是如何实现线程的等待和唤醒的? 这个主要是两个方法,Java 中的 park() 和 unpark(),这两个都是本地方法,依赖底层的实现。
park() 负责阻塞线程,unpark() 负责唤醒线程。
当一个线程尝试获取锁或者同步器时,如果获取失败,AQS 会将该线程封装成一个 Node 并添加到等待队列中,然后通过 LockSupport.park() 将该线程阻塞。
当一个线程释放锁或者同步器时,AQS 会通过 LockSupport.unpark() 方法将等待队列中的第一个线程唤醒,并让其重新尝试获取锁或者同步器。
除了基本的等待和唤醒机制,AQS 还提供了条件变量(Condition)的实现,用于在某些条件不满足时让线程等待,并在条件满足时唤醒线程。具体实现是通过创建一个等待队列,将等待的线程封装成 Node 并添加到队列中,然后将这些线程从同步队列中移除,并在条件满足时将等待队列中的所有线程唤醒。
AQS 的同步队列和条件队列原理? AQS 中提供了两种队列,同步队列主要是为了实现锁机制,而条件队列则是为了实现线程间的协调和通信。
目的不同 :同步队列主要用于管理锁的获取和释放,而条件队列用于等待特定条件的满足。
使用方式不同 :同步队列是 AQS 自动管理的,开发者通常不需要直接与之交互;而条件队列是通过 Condition 接口暴露给开发者的,需要显式地调用等待(await)和通知(signal/signalAll)方法。
队列类型不同 :虽然它们都是队列结构,但同步队列是所有基于 AQS 同步器共享的,每个同步器实例只有一个同步队列;条件队列是每个 Condition 实例特有的,一个同步器可以有多个 Condition 对象,因此也就有多个条件队列。
同步队列主要用于实现锁的获取和释放 。如我们常用的 ReentrantLock,就是基于同步队列来实现的。
队列是先进先出的实现逻辑。
同步队列的逻辑就是,当一个线程去尝试获取锁失败的时候会封装成一个 Node 节点,塞入队列的尾部。
然后当持有锁的线程将锁释放的时候会唤醒队头元素起来进行抢锁行为。
static final class Node {
Node prev;
Node next;
Thread thread;
int waitStatus;
}
private Node addWaiter (Node mode) {
Node node = new Node (Thread.currentThread(), mode);
Node pred = tail;
if (pred != null ) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
private Node enq (final Node node) {
for (;;) {
Node t = tail;
if (t == null ) {
if (compareAndSetHead(new Node ())) tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
条件队列其实是让开发人员自己来制定规则,能做到当线程不满足一定的条件时就进入条件队列,等待被唤醒。
条件队列与同步队列不同,它是基于 Condition 接口实现 的,用于管理那些因为某些条件未满足而等待的线程。当条件满足时,这些线程可以被唤醒。每个 Condition 对象都有自己的一个条件队列 。
ConditionObject 是 AQS 的一个内部类(实现了Condition 接口),用于实现条件变量。条件变量是并发编程中一种用于线程间通信的机制,它允许一个或多个线程在特定条件成立之前等待,同时释放相关的锁。这在某种程度上类似于对象监视器模式中的 wait() 和 notify() 方法,但提供了更灵活和更强大的控制。
public class ConditionObject implements Condition , java.io.Serializable {
private transient Node firstWaiter;
private transient Node lastWaiter;
}
主要的原理:await():使当前线程释放锁并进入等待队列 ,直到被另一个线程的 signal() 或 signalAll() 方法唤醒,或被中断。
ConditionObject 的原理就是把拿到锁,但是不满足特定条件的线程从同步队列移动到条件队列中,等到其满足特定条件之后再从条件队列移动会同步队列重新进行抢锁操作。
同步队列为双向链表,只存等锁的线程,而条件队列是一个单项链表,主要是存储那些拿到锁但是不满足特定条件的线程。
await() 方法就是为已经持有锁,但是不满足我们指定的条件,且不在同步队列的线程创建一个新节点并放入条件队列。
signal() 方法是将当前线程从条件队列移动到同步队列中。
这里最主要的方法就是 await() 以及 signal(),接下来就拆解一下。
**拆解 await() 方法:**它的具体目标就是 释放掉所有锁——进入条件队列——等待被唤醒 。
public final void await () throws InterruptedException {
if (Thread.interrupted()) throw new InterruptedException ();
Node node = addConditionWaiter();
int savedState = fullyRelease(node);
int interruptMode = 0 ;
while (!isOnSyncQueue(node)) {
LockSupport.park(this );
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0 ) break ;
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT;
if (node.nextWaiter != null ) unlinkCancelledWaiters();
if (interruptMode != 0 ) reportInterruptAfterWait(interruptMode);
}
先判断调用 await() 前是否就已经被中断了,如果中断就直接抛异常。
将当前节点包装成 ConditionNode 并将状态记录为 CONDITION 表示等待条件满足,然后该节点会塞到链表尾部 。
将当前节点拥有的锁全部释放 ,避免其它线程拿不到锁导致死锁现象。
初始化几个变量,记录在这个过程中是否出现了中断以及中断的类型 (后续是补一下补丁就行还是抛异常)。
**(循环阻塞)**开一个循环进行检查,如果发现当前节点在同步队列中,说明条件满足或是已经出现中断,这两种情况都会跳出循环,否则就继续通过 park 方法进行阻塞(阻塞的线程只能通过 signal()/中断 唤醒)。
跳出循环后就会尝试去抢锁 ,如果出现不需要抛异常的中断就会记录需要打补丁。
遍历清理掉条件队列中状态为 CANCELLED 的 Node (比如超时/中断的节点)。
根据上述过程中记录的中断判断是需要打补丁还是抛异常 。
拆解 signal() 方法:把条件队列的 第一个有效线程 ,移回同步队列,唤醒它让它抢锁。
public final void signal () {
if (!isHeldExclusively()) throw new IllegalMonitorStateException ();
Node first = firstWaiter;
if (first != null ) doSignal(first);
}
private void doSignal (Node first) {
do {
if ( (firstWaiter = first.nextWaiter) == null ) lastWaiter = null ;
first.nextWaiter = null ;
} while (!transferForSignal(first) && (first = firstWaiter) != null );
}
先检查当前线程是否拿到锁 了,没拿到直接抛出异常 ,避免没拿锁的线程乱唤醒。
将条件队列的头节点拿出来执行 doSignal() 进行唤醒 。
doSignal() 的逻辑就是将条件队列的头节点移除 ,如果移除完发现整个条件队列空了,就将 lastWaiter 也置 null。
执行 transferForSignal(),从头节点开始尝试将一个节点放到同步队列中,如果失败就继续下一个 (因为有些接节点可能已经被取消了,但是来不及清除)。
为什么 await()要用 while 循环判断 isOnSyncQueue(node)?
防止'虚假唤醒':线程可能被中断、超时等非 signal 原因唤醒,此时节点还没到同步队列,必须重新 park;
保证语义:只有节点到了同步队列,才能去抢锁,否则继续等。
fullyRelease(node)为什么要'完全释放'锁?
如果是重入锁(比如线程拿了 3 次锁,state=3),必须把 state 置 0,否则其他线程永远拿不到锁;
释放失败会标记节点为 CANCELLED,避免线程一直等。
条件队列:只需要'从头取、从尾加',单向链表足够,且节省内存;
同步队列:需要频繁移除取消的节点(中断/超时),双向链表通过 prev 指针能快速找到前驱,O(1) 移除,效率更高。
signal()为什么只唤醒头节点,不是所有节点?
避免'惊群效应':唤醒所有线程会导致多个线程同时抢锁,只有一个能拿到,其余继续阻塞,浪费 CPU;
符合 FIFO 语义:条件队列也是先来后到,唤醒头节点最公平。
条件队列的常见用法:基本不用自己去造,直接基于继承了 AQS 的类去实现就行。
class ConditionObjectUsage {
private final ReentrantLock lock = new ReentrantLock ();
private final Condition condition = lock.newCondition();
private boolean isPaid = false ;
public void waitForPayment () throws InterruptedException {
lock.lock();
try {
while (!isPaid) {
condition.await();
}
System.out.println("订单已支付,执行发货逻辑" );
} finally {
lock.unlock();
}
}
public void signalPayment () {
lock.lock();
try {
isPaid = true ;
condition.signal();
} finally {
lock.unlock();
}
}
public static void main (String[] args) throws InterruptedException {
ConditionObjectUsage demo = new ConditionObjectUsage ();
new Thread (() -> {
try {
demo.waitForPayment();
} catch (Exception e) {
e.printStackTrace();
}
}).start();
Thread.sleep(1000 );
new Thread (demo::signalPayment).start();
}
}
说白了无论是哪个 AQS 的实现类,底层其实都是只有一把锁,就是基于 AQS 的 statue 实现的。
那么这里的 ReentrantLock 也不例外,其实就是两个线程来回的抢锁并进行操作。
线程 1 抢到之后发现自己的条件不满足,只能调用 condition.await(); 释放锁的同时将当前线程阻塞住并塞入条件队列。
条件 2 抢到锁之后会修改条件,并唤醒一个条件队列的线程,再释放锁。
在线程 1 中虽然有两步能释放锁,但是没办法同时执行,因为执行了 condition.await(); 释放锁的同时线程也被阻塞住了,不会向下执行,如果没执行到这个方法的,则通过 lock.unlock(); 去释放。
然后可能有些人还是会疑惑,同步队列和条件队列存储哪些线程?
同步队列存储那些抢锁失败的,条件队列存储抢锁成功但是不满足条件。
首先无论是新来的节点还是在同步队列中被唤醒的节点,它第一步会尝试 CAS 获取锁,如果获取成功,就判断是否满足条件,满足就执行逻辑,不满足就放到条件队列中。
注意:想要存储到条件队列的前提是获取锁成功 + 条件不满足 + 不在同步队列中。
不过只要你是获取锁成功的一定不会在同步队列中,即使原本在也会被移除。
相关免费在线工具 Keycode 信息 查找任何按下的键的javascript键代码、代码、位置和修饰符。 在线工具,Keycode 信息在线工具,online
Escape 与 Native 编解码 JavaScript 字符串转义/反转义;Java 风格 \uXXXX(Native2Ascii)编码与解码。 在线工具,Escape 与 Native 编解码在线工具,online
JavaScript / HTML 格式化 使用 Prettier 在浏览器内格式化 JavaScript 或 HTML 片段。 在线工具,JavaScript / HTML 格式化在线工具,online
JavaScript 压缩与混淆 Terser 压缩、变量名混淆,或 javascript-obfuscator 高强度混淆(体积会增大)。 在线工具,JavaScript 压缩与混淆在线工具,online
加密/解密文本 使用加密算法(如AES、TripleDES、Rabbit或RC4)加密和解密文本明文。 在线工具,加密/解密文本在线工具,online
Gemini 图片去水印 基于开源反向 Alpha 混合算法去除 Gemini/Nano Banana 图片水印,支持批量处理与下载。 在线工具,Gemini 图片去水印在线工具,online