LangChain4j 并发处理与线程安全深度解析
一、LangChain4j 并发架构设计
1.1 并发模型概述
LangChain4j 采用多层级并发控制策略:
┌───────────────────────────────────────────────────┐
│ 应用层并发控制 │
├───────────────────────────────────────────────────┤
│ 线程池配置 | 请求队列 | 超时控制 | 熔断机制 │
├───────────────────────────────────────────────────┤
│ HTTP 客户端并发控制 │
├───────────────────────────────────────────────────┤
│ 连接池管理 | 连接复用 | 请求复用 | 流控机制 │
├───────────────────────────────────────────────────┤
│ 供应商 API 并发限制 │
├───────────────────────────────────────────────────┤
│ 速率限制 | 令牌限制 | 配额管理 | 优先级队列 │
└───────────────────────────────────────────────────┘
1.2 核心并发组件
// Umbrella interface grouping the three concurrency-oriented model abstractions
// used throughout this article: async (CompletableFuture), streaming (Reactor Flux)
// and batch invocation.
public interface ConcurrentModel {
// Asynchronous variant of ChatLanguageModel: each call returns immediately with a
// CompletableFuture that completes when the LLM response arrives.
interface AsyncChatModel extends ChatLanguageModel {
CompletableFuture<Response<String>> generateAsync(String prompt);
CompletableFuture<Response<String>> generateAsync(List<ChatMessage> messages);
}
// Streaming variant: emits the response incrementally as a Reactor Flux.
interface StreamingChatModel {
Flux<Response<String>> generateStream(List<ChatMessage> messages);
}
// Batch variant: processes several conversations in a single call and returns
// one response per input message list.
interface BatchChatModel {
List<Response<String>> generateBatch(List<List<ChatMessage>> batches);
}
}
二、并发请求处理机制详解
2.1 HTTP 客户端并发配置
package com.example.concurrent;
import dev.langchain4j.model.openai.OpenAiChatModel;
import okhttp3.*;
import java.util.concurrent.*;
public class ConcurrentHttpClientConfig {

    /**
     * Builds an OkHttpClient tuned for high-throughput LLM API traffic:
     * large connection pool, generous per-call timeouts (LLM responses are slow),
     * retry with exponential backoff, client-side rate limiting and DNS-over-HTTPS.
     */
    public static OkHttpClient createHighPerformanceClient() {
        // Keep up to 100 idle connections alive for 5 minutes so request bursts
        // reuse existing TLS sessions instead of re-handshaking.
        ConnectionPool connectionPool = new ConnectionPool(100, 5, TimeUnit.MINUTES);

        // Dispatcher caps in-flight requests globally and per host.
        Dispatcher dispatcher = new Dispatcher();
        dispatcher.setMaxRequests(200);
        dispatcher.setMaxRequestsPerHost(50);

        OkHttpClient.Builder builder = new OkHttpClient.Builder()
                .connectTimeout(30, TimeUnit.SECONDS)
                .writeTimeout(60, TimeUnit.SECONDS)
                // LLM generation can take minutes; read/call timeouts are deliberately long.
                .readTimeout(120, TimeUnit.SECONDS)
                .callTimeout(300, TimeUnit.SECONDS)
                .connectionPool(connectionPool)
                .dispatcher(dispatcher);

        builder.addInterceptor(new RetryInterceptor(3));
        builder.addInterceptor(new RateLimitInterceptor());
        // NOTE(review): LoggingInterceptor is assumed to be defined elsewhere in the project.
        builder.addInterceptor(new LoggingInterceptor());

        builder.connectionSpecs(Arrays.asList(
                ConnectionSpec.MODERN_TLS,
                ConnectionSpec.COMPATIBLE_TLS));

        // DNS-over-HTTPS avoids resolver hijacking; requires the okhttp-dnsoverhttps artifact.
        builder.dns(new DnsOverHttps.Builder()
                .url(HttpUrl.get("https://cloudflare-dns.com/dns-query"))
                .build());

        return builder.build();
    }

    /** Retries transient failures (429/408/5xx status codes and IOExceptions) with exponential backoff. */
    static class RetryInterceptor implements Interceptor {
        private final int maxRetries;

        public RetryInterceptor(int maxRetries) {
            this.maxRetries = maxRetries;
        }

        @Override
        public Response intercept(Chain chain) throws IOException {
            Request request = chain.request();
            IOException lastException = null;
            for (int attempt = 0; attempt <= maxRetries; attempt++) {
                try {
                    Response response = chain.proceed(request);
                    // BUG FIX: the previous version closed the response and then returned
                    // the *closed* object once retries were exhausted. Now the response is
                    // closed only when another attempt will actually be made.
                    if (shouldRetry(response) && attempt < maxRetries) {
                        response.close();
                        waitBeforeRetry(attempt);
                        continue;
                    }
                    return response;
                } catch (IOException e) {
                    lastException = e;
                    if (attempt < maxRetries) {
                        waitBeforeRetry(attempt);
                    }
                }
            }
            throw lastException != null ? lastException : new IOException("Request failed");
        }

        /** Retryable statuses: rate limit, request timeout, any server error. */
        private boolean shouldRetry(Response response) {
            int code = response.code();
            return code == 429      // Too Many Requests
                || code == 408      // Request Timeout
                || code >= 500;     // server-side errors
        }

        /** Exponential backoff: 1s, 2s, 4s, ... Re-interrupts on InterruptedException. */
        private void waitBeforeRetry(int retryCount) {
            try {
                long waitTime = (long) Math.pow(2, retryCount) * 1000;
                Thread.sleep(waitTime);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /** Client-side throttle using a Guava RateLimiter (10 permits/second). */
    static class RateLimitInterceptor implements Interceptor {
        private final RateLimiter rateLimiter;

        public RateLimitInterceptor() {
            // FIX: RateLimiter.create(double, long) does not exist in Guava; the warmup
            // overload requires a TimeUnit. Use the simple permits-per-second factory.
            this.rateLimiter = RateLimiter.create(10.0);
        }

        @Override
        public Response intercept(Chain chain) throws IOException {
            rateLimiter.acquire(); // blocks until a permit is available
            return chain.proceed(chain.request());
        }
    }
}
2.2 模型层并发控制
package com.example.concurrent;
import dev.langchain4j.model.chat.ChatLanguageModel;
import dev.langchain4j.model.output.Response;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.*;
public class ConcurrentChatModel implements ChatLanguageModel {
private final ChatLanguageModel delegate;
private final ExecutorService executorService;
private final Semaphore concurrencyLimiter;
private final AtomicLong requestCounter;
private final ConcurrentHashMap<String, AtomicLong> requestMetrics;
private final CircuitBreaker circuitBreaker;
public ConcurrentChatModel(ChatLanguageModel delegate, int maxConcurrency, int queueSize) {
this.delegate = delegate;
this.executorService = new ThreadPoolExecutor(
maxConcurrency,
maxConcurrency * 2,
60L, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(queueSize),
new ThreadFactory() {
private ();
Thread {
(r, + counter.incrementAndGet());
t.setDaemon();
t;
}
},
.CallerRunsPolicy()
);
.concurrencyLimiter = (maxConcurrency);
.requestCounter = ();
.requestMetrics = <>();
.circuitBreaker = (, , );
}
Response<String> {
generate(Collections.singletonList(UserMessage.from(prompt)));
}
Response<String> {
(!circuitBreaker.allowRequest()) {
();
}
System.nanoTime();
UUID.randomUUID().toString();
{
(!concurrencyLimiter.tryAcquire(, TimeUnit.SECONDS)) {
();
}
CompletableFuture<Response<String>> future = CompletableFuture.supplyAsync(() -> {
{
recordRequestStart(requestId);
delegate.generate(messages);
} (Exception e) {
circuitBreaker.recordFailure();
e;
}
}, executorService);
Response<String> response = future.get(, TimeUnit.SECONDS);
circuitBreaker.recordSuccess();
recordRequestSuccess(requestId, startTime);
response;
} (TimeoutException e) {
circuitBreaker.recordFailure();
(, e);
} (ExecutionException e) {
circuitBreaker.recordFailure();
(, e.getCause());
} (Exception e) {
circuitBreaker.recordFailure();
(, e);
} {
concurrencyLimiter.release();
}
}
List<Response<String>> {
IntStream.range(, n).parallel()
.mapToObj(i -> generate(messages))
.collect(Collectors.toList());
}
Stream<Response<String>> {
StreamSupport.stream(
Spliterators.spliteratorUnknownSize( <Response<String>>() {
;
{
hasNext;
}
Response<String> {
;
}
}, Spliterator.ORDERED), );
}
CompletableFuture<Response<String>> {
CompletableFuture.supplyAsync(() -> generate(messages), executorService);
}
CompletableFuture<List<Response<String>>> {
List<CompletableFuture<Response<String>>> futures = batches.stream()
.map(::generateAsync)
.collect(Collectors.toList());
CompletableFuture.allOf(futures.toArray( []))
.thenApply(v -> futures.stream()
.map(CompletableFuture::join)
.collect(Collectors.toList()));
}
{
requestCounter.incrementAndGet();
requestMetrics.put(requestId, (System.nanoTime()));
}
{
System.nanoTime() - startTime;
requestMetrics.get(requestId);
(counter != ) {
counter.set(duration);
}
}
Map<String, Object> {
Map<String, Object> metrics = <>();
metrics.put(, requestCounter.get());
metrics.put(, concurrencyLimiter.availablePermits());
metrics.put(, ((ThreadPoolExecutor) executorService).getQueue().size());
metrics.put(, ((ThreadPoolExecutor) executorService).getActiveCount());
List<Long> durations = requestMetrics.values().stream()
.map(AtomicLong::get)
.filter(v -> v > )
.collect(Collectors.toList());
(!durations.isEmpty()) {
durations.stream()
.mapToLong(Long::longValue)
.average()
.orElse() / ;
metrics.put(, avgDuration);
}
metrics;
}
{
failureThreshold;
recoveryTimeout;
failureRateThreshold;
();
();
;
;
{
.failureThreshold = failureThreshold;
.recoveryTimeout = recoveryTimeout;
.failureRateThreshold = failureRateThreshold;
}
{
(!isOpen) {
;
}
(System.currentTimeMillis() - lastFailureTime > recoveryTimeout) {
isOpen = ;
failureCount.set();
successCount.set();
;
}
;
}
{
successCount.incrementAndGet();
calculateFailureRate();
(failureRate < failureRateThreshold) {
failureCount.set();
}
}
{
failureCount.incrementAndGet();
lastFailureTime = System.currentTimeMillis();
(failures >= failureThreshold) {
isOpen = ;
}
}
{
successCount.get() + failureCount.get();
total > ? () failureCount.get() / total : ;
}
}
}
2.3 异步和响应式编程支持
package com.example.concurrent;
import dev.langchain4j.model.chat.ChatLanguageModel;
import dev.langchain4j.model.output.Response;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import java.util.List;
import java.util.concurrent.CompletableFuture;
public class ReactiveChatModel {
private final ChatLanguageModel delegate;
private final Scheduler scheduler;
public ReactiveChatModel(ChatLanguageModel delegate) {
this.delegate = delegate;
this.scheduler = Schedulers.newBoundedElastic(100, 1000, "ReactiveChatModel", 60, true);
}
public Mono<Response<String>> generateReactive(List<ChatMessage> messages) {
return Mono.fromCallable(() -> delegate.generate(messages))
.subscribeOn(scheduler)
.timeout(Duration.ofSeconds(120))
.onErrorResume(e -> {
return Mono.error(new RuntimeException("Generation failed", e));
});
}
public Flux<String> generateStreamReactive(List<ChatMessage> messages) {
Flux.create(fluxSink -> {
{
String[] chunks = {, , };
(String chunk : chunks) {
(fluxSink.isCancelled()) {
;
}
fluxSink.next(chunk);
{
Thread.sleep();
} (InterruptedException e) {
fluxSink.error(e);
;
}
}
fluxSink.complete();
} (Exception e) {
fluxSink.error(e);
}
}).subscribeOn(scheduler);
}
Flux<Response<String>> {
Flux.fromIterable(batches)
.parallel()
.runOn(Schedulers.parallel())
.flatMap(::generateReactive)
.sequential();
}
Flux<Response<String>> {
messageFlux
.onBackpressureBuffer()
.flatMap(::generateReactive, );
}
}
三、线程安全问题深度分析
3.1 LangChain4j 核心组件线程安全分析
package com.example.threadsafe;
import java.util.concurrent.atomic.*;
import java.util.concurrent.locks.*;
public class ThreadSafetyAnalysis {
public static class StatelessComponent {
public String process(String input) {
return input.toUpperCase();
}
}
public static class StatefulComponent {
private int counter = 0;
public int increment() {
counter++;
return counter;
}
private AtomicInteger safeCounter = new AtomicInteger(0);
public int incrementSafely() {
return safeCounter.incrementAndGet();
}
}
public <K, V> {
ConcurrentHashMap<K, V> cache = <>();
();
V {
lock.readLock().lock();
{
cache.get(key);
} {
lock.readLock().unlock();
}
}
{
lock.writeLock().lock();
{
cache.put(key, value);
} {
lock.writeLock().unlock();
}
}
V {
cache.computeIfAbsent(key, mappingFunction);
}
}
{
ThreadLocal<Session> threadLocalSession = <>();
ThreadLocal<Session> managedThreadLocalSession = <Session>() {
Session {
();
}
{
.get().close();
.remove();
}
};
ConcurrentHashMap<String, Session> sessionMap = <>();
Session {
sessionMap.computeIfAbsent(sessionId, id -> (id));
}
}
{
List<Tool> tools;
();
Object {
toolLock.lock();
{
findTool(toolName);
tool.execute(input);
} {
toolLock.unlock();
}
}
ConcurrentHashMap<String, ReentrantLock> toolLocks = <>();
Object {
toolLocks.computeIfAbsent(toolName, k -> ());
lock.lock();
{
findTool(toolName);
tool.execute(input);
} {
lock.unlock();
}
}
}
}
3.2 LangChain4j 内置组件的线程安全分析
3.2.1 ChatMemory 的线程安全性
package com.example.threadsafe;
import dev.langchain4j.memory.ChatMemory;
import dev.langchain4j.memory.chat.MessageWindowChatMemory;
public class ChatMemoryThreadSafety {
public static void analyzeInMemoryChatMemory() {
ChatMemory memory = MessageWindowChatMemory.withMaxMessages(10);
ChatMemory synchronizedMemory = synchronizeChatMemory(memory);
}
public static ChatMemory synchronizeChatMemory(ChatMemory delegate) {
return new ChatMemory() {
private final Object lock = new Object();
@Override
public String id() {
synchronized (lock) {
return delegate.id();
}
}
{
(lock) {
delegate.add(message);
}
}
List<ChatMessage> {
(lock) {
<>(delegate.messages());
}
}
{
(lock) {
delegate.clear();
}
}
};
}
{
String sessionId;
RedisTemplate<String, Object> redisTemplate;
();
{
.sessionId = sessionId;
.redisTemplate = redisTemplate;
}
{
+ sessionId;
redisTemplate.execute( <Object>() {
Object DataAccessException {
operations.watch(key);
List<ChatMessage> messages = (List<ChatMessage>) operations.opsForValue().get(key);
(messages == ) {
messages = <>();
}
messages.add(message);
operations.multi();
operations.opsForValue().set(key, messages);
operations.exec();
}
});
}
}
}
3.2.2 EmbeddingModel 的线程安全性
package com.example.threadsafe;
import dev.langchain4j.model.embedding.EmbeddingModel;
import dev.langchain4j.model.output.Response;
import java.util.List;
import java.util.concurrent.*;
public class EmbeddingModelConcurrency {
public static class ThreadSafeEmbeddingModel implements EmbeddingModel {
private final EmbeddingModel delegate;
private final ExecutorService executorService;
private final RateLimiter rateLimiter;
public ThreadSafeEmbeddingModel(EmbeddingModel delegate, int maxConcurrency) {
this.delegate = delegate;
this.executorService = new ThreadPoolExecutor(
maxConcurrency,
maxConcurrency * 2,
60L, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(1000),
new ThreadPoolExecutor.CallerRunsPolicy()
);
this.rateLimiter = RateLimiter.create(100.0);
}
@Override
public Response<Embedding> {
embedAll(Collections.singletonList(text)).content().get();
}
Response<List<Embedding>> {
rateLimiter.acquire(texts.size());
List<CompletableFuture<Response<Embedding>>> futures = texts.stream()
.map(text -> CompletableFuture.supplyAsync(() -> delegate.embed(text), executorService))
.collect(Collectors.toList());
List<Embedding> embeddings = futures.stream()
.map(CompletableFuture::join)
.map(Response::content)
.collect(Collectors.toList());
Response.from(embeddings);
}
Response<List<Embedding>> {
;
List<List<String>> batches = <>();
( ; i < texts.size(); i += batchSize) {
batches.add(texts.subList(i, Math.min(i + batchSize, texts.size())));
}
List<Embedding> allEmbeddings = batches.parallelStream()
.flatMap(batch -> {
rateLimiter.acquire(batch.size());
delegate.embedAll(batch).content().stream();
})
.collect(Collectors.toList());
Response.from(allEmbeddings);
}
}
}
四、并发最佳实践和模式
4.1 连接池最佳实践
package com.example.bestpractice;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
public class ConnectionPoolBestPractice {
public static CloseableHttpClient createOptimalHttpClient() {
PoolingHttpClientConnectionManager connectionManager = new PoolingHttpClientConnectionManager();
connectionManager.setMaxTotal(200);
connectionManager.setDefaultMaxPerRoute(50);
connectionManager.setValidateAfterInactivity(30000);
RequestConfig requestConfig = RequestConfig.custom()
.setConnectTimeout(30000)
.setSocketTimeout(60000)
.setConnectionRequestTimeout(5000)
.build();
return HttpClientBuilder.create()
.setConnectionManager(connectionManager)
.setDefaultRequestConfig(requestConfig)
.setRetryHandler(new DefaultHttpRequestRetryHandler(3, ))
.disableCookieManagement()
.setKeepAliveStrategy( ())
.build();
}
{
PoolingHttpClientConnectionManager connectionManager;
ScheduledExecutorService monitorExecutor;
{
.connectionManager = connectionManager;
.monitorExecutor = Executors.newSingleThreadScheduledExecutor();
startMonitoring();
}
{
monitorExecutor.scheduleAtFixedRate(() -> {
{
connectionManager.getTotalStats();
System.out.println();
System.out.println( + stats.getAvailable());
System.out.println( + stats.getLeased());
System.out.println( + stats.getPending());
System.out.println( + stats.getMax());
(stats.getAvailable() < ) {
System.err.println();
}
connectionManager.closeExpiredConnections();
connectionManager.closeIdleConnections(, TimeUnit.SECONDS);
} (Exception e) {
System.err.println( + e.getMessage());
}
}, , , TimeUnit.SECONDS);
}
}
}
4.2 异步编程最佳实践
package com.example.bestpractice;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
public class AsyncBestPractice {
public static ExecutorService createOptimalThreadPool() {
int corePoolSize = Runtime.getRuntime().availableProcessors();
int maxPoolSize = corePoolSize * 2;
return new ThreadPoolExecutor(
corePoolSize,
maxPoolSize,
60L, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(10000),
new ThreadFactory() {
private final AtomicInteger counter = new AtomicInteger();
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r, "LLM-Worker-" + counter.incrementAndGet());
t.setDaemon(true);
t.setUncaughtExceptionHandler((thread, throwable) -> {
System.err.println( + thread.getName() + + throwable.getMessage());
});
t;
}
},
.AbortPolicy()
);
}
{
CompletableFuture<String> {
CompletableFuture<String> timeoutFuture = <>();
CompletableFuture.runAsync(() -> {
{
Thread.sleep(unit.toMillis(timeout));
timeoutFuture.completeExceptionally( ());
} (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
future.applyToEither(timeoutFuture, Function.identity());
}
CompletableFuture<List<String>> {
(maxConcurrency);
List<CompletableFuture<String>> controlledFutures = futures.stream()
.map(future -> CompletableFuture.supplyAsync(() -> {
{
semaphore.acquire();
future.get();
} (InterruptedException e) {
Thread.currentThread().interrupt();
(e);
} (ExecutionException e) {
(e);
} {
semaphore.release();
}
}))
.collect(Collectors.toList());
CompletableFuture.allOf(controlledFutures.toArray( []))
.thenApply(v -> controlledFutures.stream()
.map(CompletableFuture::join)
.collect(Collectors.toList()));
}
}
{
Flux<String> {
source
.onBackpressureBuffer(, BufferOverflowStrategy.DROP_LATEST)
.publishOn(Schedulers.parallel())
.doOnNext(item -> {
})
.doOnError(error -> {
})
.doOnComplete(() -> {
});
}
Mono<String> {
mono
.retryWhen(Retry.backoff(, Duration.ofSeconds())
.maxBackoff(Duration.ofSeconds())
.jitter()
.doAfterRetry(retrySignal -> {
}))
.timeout(Duration.ofSeconds())
.onErrorResume(error -> {
Mono.just();
});
}
}
}
五、性能优化和监控
5.1 性能监控实现
package com.example.monitoring;
import io.micrometer.core.instrument.*;
import io.micrometer.core.instrument.binder.jvm.*;
import io.micrometer.core.instrument.binder.system.*;
import io.micrometer.prometheus.PrometheusConfig;
import io.micrometer.prometheus.PrometheusMeterRegistry;
import java.util.concurrent.*;
import java.util.concurrent.atomic.*;
public class PerformanceMonitor {
private final MeterRegistry meterRegistry;
private final ConcurrentHashMap<String, Timer> timers;
private final AtomicLong totalRequests;
private final AtomicLong failedRequests;
private final AtomicLong activeRequests;
public PerformanceMonitor() {
this.meterRegistry = new PrometheusMeterRegistry(PrometheusConfig.DEFAULT);
this.timers = new ConcurrentHashMap<>();
this.totalRequests = new AtomicLong();
this.failedRequests = new AtomicLong();
this.activeRequests = new AtomicLong();
registerMetrics();
}
private void registerMetrics() {
().bindTo(meterRegistry);
().bindTo(meterRegistry);
().bindTo(meterRegistry);
().bindTo(meterRegistry);
().bindTo(meterRegistry);
().bindTo(meterRegistry);
Gauge.builder(, totalRequests, AtomicLong::get)
.description()
.register(meterRegistry);
Gauge.builder(, activeRequests, AtomicLong::get)
.description()
.register(meterRegistry);
Gauge.builder(, failedRequests, AtomicLong::get)
.description()
.register(meterRegistry);
}
Timer.Sample {
activeRequests.incrementAndGet();
Timer.start(meterRegistry);
}
{
totalRequests.incrementAndGet();
activeRequests.decrementAndGet();
timers.computeIfAbsent(operation, op ->
Timer.builder()
.tag(, op)
.tag(, )
.publishPercentiles(, , )
.register(meterRegistry)
);
sample.stop(timer);
}
{
totalRequests.incrementAndGet();
failedRequests.incrementAndGet();
activeRequests.decrementAndGet();
timers.computeIfAbsent(operation + , op ->
Timer.builder()
.tag(, operation)
.tag(, )
.tag(, error)
.register(meterRegistry)
);
sample.stop(timer);
}
{
Counter.builder()
.tag(, )
.register(meterRegistry)
.increment(inputTokens);
Counter.builder()
.tag(, )
.register(meterRegistry)
.increment(outputTokens);
}
String {
((PrometheusMeterRegistry) meterRegistry).scrape();
}
}
5.2 压力测试和性能分析
package com.example.testing;
import dev.langchain4j.model.chat.ChatLanguageModel;
import org.openjdk.jmh.annotations.*;
import org.openjdk.jmh.runner.Runner;
import org.openjdk.jmh.runner.RunnerException;
import org.openjdk.jmh.runner.options.Options;
import org.openjdk.jmh.runner.options.OptionsBuilder;
import java.util.concurrent.*;
import java.util.concurrent.atomic.*;
@State(Scope.Benchmark)
@BenchmarkMode(Mode.Throughput)
@OutputTimeUnit(TimeUnit.SECONDS)
@Warmup(iterations = 3, time = 5)
@Measurement(iterations = 5, time = 10)
@Fork(2)
@Threads(10)
public class ConcurrentPerformanceTest {
private ChatLanguageModel model;
private ExecutorService executorService;
private CountDownLatch latch;
private AtomicInteger successCount;
private AtomicInteger failureCount;
@Setup
public void setup() {
model = createChatModel();
executorService = Executors.newFixedThreadPool(50);
successCount = new AtomicInteger();
failureCount = new AtomicInteger();
}
@Benchmark
@BenchmarkMode({Mode.Throughput, Mode.AverageTime})
public void InterruptedException {
;
latch = (concurrentRequests);
( ; i < concurrentRequests; i++) {
executorService.submit(() -> {
{
model.generate( + Thread.currentThread().getId());
successCount.incrementAndGet();
} (Exception e) {
failureCount.incrementAndGet();
} {
latch.countDown();
}
});
}
latch.await(, TimeUnit.SECONDS);
}
{
executorService.shutdown();
System.out.println( + successCount.get());
System.out.println( + failureCount.get());
System.out.println( + () successCount.get() / (successCount.get() + failureCount.get()));
}
RunnerException {
()
.include(ConcurrentPerformanceTest.class.getSimpleName())
.build();
(options).run();
}
}
六、面试要点总结
6.1 LangChain4j 并发处理核心要点
- 多层并发控制:
- 应用层:线程池、队列、超时控制
- HTTP 层:连接池、连接复用
- API 层:速率限制、配额管理
- 线程安全性分析:
- 无状态组件:天然线程安全
- 有状态组件:需要同步机制
- 共享资源:需要原子操作或锁保护
- 内置组件线程安全:
- ChatMemory:需要外部同步或使用线程安全实现
- EmbeddingModel:通常无状态,但需要并发控制
- VectorStore:需要事务支持或乐观锁
6.2 常见的线程安全问题
内存可见性问题(注意:volatile 只保证可见性,不能使 counter++ 这种“读-改-写”操作变为原子操作;下面依次给出错误示例与基于 AtomicInteger 的修复):
private volatile int counter = 0;
public void increment() {
counter++;
}
private AtomicInteger counter = new AtomicInteger(0);
public void increment() {
counter.incrementAndGet();
}
死锁风险:
synchronized(lockA) {
synchronized(lockB) {
}
}
Object firstLock = lockA.hashCode() < lockB.hashCode() ? lockA : lockB;
Object secondLock = firstLock == lockA ? lockB : lockA;
synchronized(firstLock) {
synchronized(secondLock) {
}
}
竞态条件:
if (!initialized) {
initialize();
initialized = true;
}
if (!initialized) {
synchronized(this) {
if (!initialized) {
initialize();
initialized = true;
}
}
}
6.3 最佳实践总结
- 连接池配置:
- 根据并发需求调整连接数
- 监控连接池状态
- 定期清理空闲连接
- 线程池配置:
- 根据 CPU 核心数设置线程数
- 使用有界队列避免内存溢出
- 配置合理的拒绝策略
- 错误处理:
- 实现重试机制
- 使用熔断器防止雪崩
- 监控错误率并告警
- 性能监控:
- 监控请求延迟和吞吐量
- 跟踪资源使用情况
- 设置性能基线
- 测试验证:
- 使用 JMH 等工具进行基准测试,量化吞吐量与延迟
- 通过压力测试验证并发上限与熔断、降级行为
6.4 面试问题回答要点
Q: LangChain4j 如何处理并发请求?
A: LangChain4j 通过多层级并发控制机制:
- 应用层使用线程池和队列管理并发
- HTTP 层使用连接池优化网络请求
- API 层实现速率限制和配额管理
- 支持异步和响应式编程模型
Q: LangChain4j 是否存在线程安全问题?
A: 需要具体情况分析:
- 无状态组件(如模型)通常是线程安全的
- 有状态组件(如 ChatMemory)需要额外同步
- 共享资源需要适当的并发控制
- 建议使用线程安全的数据结构和原子操作
Q: 如何优化 LangChain4j 的并发性能?
A: 可以从以下几个方面优化:
- 合理配置连接池和线程池参数
- 实现请求批处理和缓存
- 使用异步非阻塞 IO
- 监控和调整系统资源
- 根据负载动态调整并发策略