LangChain4j Concurrency Handling and Thread Safety: An In-Depth Analysis | 极客日志
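Summary (AI-generated): This article takes an in-depth look at concurrency handling and thread safety in the LangChain4j framework. It covers the multi-level concurrency control architecture (application layer, HTTP layer, API layer) and explains HTTP client configuration, model-level concurrency control, and support for asynchronous and reactive programming. It focuses on the thread safety of core components, including the difference between stateless and stateful components and concrete approaches for ChatMemory and EmbeddingModel. It also gives best practices for connection pool and thread pool configuration, methods for performance monitoring and stress testing, and a summary of the key interview points on concurrency along with solutions to common thread-safety problems.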
Author: 王者
1. LangChain4j Concurrency Architecture Design
1.1 Overview of the Concurrency Model
Concurrency in LangChain4j is controlled at several levels:
┌───────────────────────────────────────────────────┐
│ Application-level concurrency control             │
├───────────────────────────────────────────────────┤
│ Thread pool | Queue | Timeout | Circuit breaker   │
├───────────────────────────────────────────────────┤
│ HTTP client concurrency control                   │
├───────────────────────────────────────────────────┤
│ Pooling | Reuse | Multiplexing | Flow control     │
├───────────────────────────────────────────────────┤
│ Provider API concurrency limits                   │
├───────────────────────────────────────────────────┤
│ Rate limit | Token limit | Quota | Priority queue │
└───────────────────────────────────────────────────┘
1.2 Core Concurrency Components
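import dev.langchain4j.data.message.AiMessage;
import dev.langchain4j.data.message.ChatMessage;
import dev.langchain4j.model.chat.ChatLanguageModel;
import dev.langchain4j.model.output.Response;
import reactor.core.publisher.Flux;
import java.util.List;
import java.util.concurrent.CompletableFuture;
// Conceptual extension points for concurrent use (illustrative, not part of the LangChain4j API)
public interface ConcurrentModel {
    // Asynchronous: returns immediately and completes when the model answers
    interface AsyncChatModel extends ChatLanguageModel {
        CompletableFuture<String> generateAsync(String prompt);
        CompletableFuture<Response<AiMessage>> generateAsync(List<ChatMessage> messages);
    }
    // Streaming: emits tokens as they are produced
    interface StreamingChatModel {
        Flux<String> generateStream(List<ChatMessage> messages);
    }
    // Batch: several independent conversations in one call, results in input order
    interface BatchChatModel {
        List<Response<AiMessage>> generateBatch(List<List<ChatMessage>> batches);
    }
}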
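One way to implement the `AsyncChatModel` idea is to run a blocking call on a bounded executor and expose it as a `CompletableFuture`. In this sketch, `BlockingModel` is a hypothetical stand-in for a synchronous `generate(String)` call and `AsyncModelAdapter` is an illustrative name, not a LangChain4j class. `orTimeout` (Java 9+) fails the returned future after the deadline but does not stop the underlying call.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for a synchronous model call such as generate(String)
interface BlockingModel {
    String generate(String prompt);
}

// Illustrative adapter: turns a blocking model into a CompletableFuture-based one
final class AsyncModelAdapter implements AutoCloseable {
    private final BlockingModel delegate;
    private final ExecutorService executor;

    AsyncModelAdapter(BlockingModel delegate, int maxConcurrency) {
        this.delegate = delegate;
        // Fixed-size pool: at most maxConcurrency calls run at once, the rest wait in the queue
        this.executor = Executors.newFixedThreadPool(maxConcurrency);
    }

    CompletableFuture<String> generateAsync(String prompt, long timeout, TimeUnit unit) {
        return CompletableFuture
                .supplyAsync(() -> delegate.generate(prompt), executor)
                // Completes exceptionally with TimeoutException if the call takes too long;
                // the task itself keeps running on its pool thread
                .orTimeout(timeout, unit);
    }

    @Override
    public void close() {
        executor.shutdown(); // stop accepting work; running calls are allowed to finish
    }
}
```

Callers can then compose results with `thenApply`/`thenCombine` instead of blocking a request thread per model call.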
2. Concurrent Request Handling in Detail
2.1 HTTP Client Concurrency Configuration
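package com.example.concurrent;
import com.google.common.util.concurrent.RateLimiter;
import dev.langchain4j.model.openai.OpenAiChatModel;
import okhttp3.*;
import okhttp3.dnsoverhttps.DnsOverHttps;
import java.io.IOException;
import java.util.Arrays;
import java.util.concurrent.TimeUnit;
public class ConcurrentHttpClientConfig {
    public static OkHttpClient createHighPerformanceClient() {
        // Keep up to 100 idle connections, each for at most 5 minutes, so calls reuse TCP/TLS connections
        ConnectionPool connectionPool = new ConnectionPool(100, 5, TimeUnit.MINUTES);
        // At most 200 concurrent calls in total and 50 per host
        // (OkHttp's Dispatcher applies these limits to asynchronous enqueue() calls)
        Dispatcher dispatcher = new Dispatcher();
        dispatcher.setMaxRequests(200);
        dispatcher.setMaxRequestsPerHost(50);
        OkHttpClient.Builder builder = new OkHttpClient.Builder()
                .connectTimeout(30, TimeUnit.SECONDS)
                .writeTimeout(60, TimeUnit.SECONDS)
                .readTimeout(120, TimeUnit.SECONDS)   // model responses can be slow
                .callTimeout(300, TimeUnit.SECONDS)   // bound for the whole call, interceptor retries included
                .connectionPool(connectionPool)
                .dispatcher(dispatcher);
        // Application interceptors run in the order added: retry wraps rate limiting, which wraps logging
        builder.addInterceptor(new RetryInterceptor(3));
        builder.addInterceptor(new RateLimitInterceptor());
        builder.addInterceptor(new LoggingInterceptor());
        builder.connectionSpecs(Arrays.asList(
                ConnectionSpec.MODERN_TLS,
                ConnectionSpec.COMPATIBLE_TLS
        ));
        // DNS over HTTPS (okhttp-dnsoverhttps artifact) needs a bootstrap client to reach the resolver
        builder.dns(new DnsOverHttps.Builder()
                .client(new OkHttpClient())
                .url(HttpUrl.get("https://cloudflare-dns.com/dns-query"))
                .build());
        return builder.build();
    }
    // Retries 408, 429 and 5xx responses and I/O errors with exponential backoff (1 s, 2 s, 4 s)
    static class RetryInterceptor implements Interceptor {
        private final int maxRetries;
        public RetryInterceptor(int maxRetries) {
            this.maxRetries = maxRetries;
        }
        @Override
        public Response intercept(Chain chain) throws IOException {
            Request request = chain.request();
            IOException exception = null;
            for (int retryCount = 0; retryCount <= maxRetries; retryCount++) {
                try {
                    Response response = chain.proceed(request);
                    if (shouldRetry(response, retryCount) && retryCount < maxRetries) {
                        response.close(); // release the connection before trying again
                        waitBeforeRetry(retryCount);
                        continue;
                    }
                    // Success, a non-retryable status, or the last attempt: return it unclosed
                    return response;
                } catch (IOException e) {
                    exception = e;
                    if (retryCount < maxRetries) {
                        waitBeforeRetry(retryCount);
                    }
                }
            }
            throw exception != null ? exception : new IOException("Request failed");
        }
        private boolean shouldRetry(Response response, int retryCount) {
            int code = response.code();
            return code == 429 ||   // Too Many Requests (rate limited)
                   code >= 500 ||   // server errors
                   code == 408;     // Request Timeout
        }
        private void waitBeforeRetry(int retryCount) {
            try {
                long waitTime = (long) Math.pow(2, retryCount) * 1000;
                Thread.sleep(waitTime);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }
    // Client-side throttle using Guava's RateLimiter: at most 10 requests per second
    static class RateLimitInterceptor implements Interceptor {
        private final RateLimiter rateLimiter;
        public RateLimitInterceptor() {
            this.rateLimiter = RateLimiter.create(10.0);
        }
        @Override
        public Response intercept(Chain chain) throws IOException {
            rateLimiter.acquire(); // blocks the calling thread until a permit is available
            return chain.proceed(chain.request());
        }
    }
    // Minimal request logger used above (a simple assumed implementation)
    static class LoggingInterceptor implements Interceptor {
        @Override
        public Response intercept(Chain chain) throws IOException {
            Request request = chain.request();
            long start = System.nanoTime();
            Response response = chain.proceed(request);
            long tookMs = (System.nanoTime() - start) / 1_000_000;
            System.out.println(request.method() + " " + request.url() + " -> " + response.code() + " (" + tookMs + " ms)");
            return response;
        }
    }
}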
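`RetryInterceptor` waits exactly 1 s, 2 s, 4 s between attempts. When many clients receive a 429 at the same moment, identical delays make them retry in lockstep. A common refinement is capped exponential backoff with "full jitter": pick a random delay between zero and the exponential bound. `BackoffPolicy` is an illustrative helper, not an OkHttp API, and the base and cap values are assumptions.

```java
import java.util.Random;

// Illustrative retry-delay policy: capped exponential backoff with full jitter
final class BackoffPolicy {
    private final long baseMillis;
    private final long capMillis;
    private final Random random;

    BackoffPolicy(long baseMillis, long capMillis, Random random) {
        this.baseMillis = baseMillis;
        this.capMillis = capMillis;
        this.random = random;
    }

    // Upper bound for attempt n (0-based): min(cap, base * 2^n), doubling only until the cap is reached
    long maxDelayMillis(int attempt) {
        long delay = baseMillis;
        for (int i = 0; i < attempt && delay < capMillis; i++) {
            delay *= 2;
        }
        return Math.min(delay, capMillis);
    }

    // Full jitter: uniform in [0, maxDelay], so clients that failed together retry at different times
    long delayMillis(int attempt) {
        long max = maxDelayMillis(attempt);
        return (long) (random.nextDouble() * (max + 1));
    }
}
```

Using `policy.delayMillis(retryCount)` inside `waitBeforeRetry` would keep the original 1 s, 2 s, 4 s values as upper bounds while spreading retries out.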
2.2 Model-Level Concurrency Control
package com.example.concurrent;
import dev.langchain4j.data.message.AiMessage;
import dev.langchain4j.data.message.ChatMessage;
import dev.langchain4j.model.chat.ChatLanguageModel;
import dev.langchain4j.model.output.Response;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.*;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
// Decorator adding a concurrency limit, timeouts and a circuit breaker to any ChatLanguageModel
// (LangChain4j 0.x API: generate(List<ChatMessage>) returns Response<AiMessage>)
public class ConcurrentChatModel implements ChatLanguageModel {
    private final ChatLanguageModel delegate;
    private final ExecutorService executorService;
    private final Semaphore concurrencyLimiter;
    private final int maxConcurrency;
    private final AtomicLong requestCounter = new AtomicLong();
    private final AtomicLong completedRequests = new AtomicLong();
    private final AtomicLong totalDurationNanos = new AtomicLong();
    private final CircuitBreaker circuitBreaker;
    public ConcurrentChatModel(ChatLanguageModel delegate, int maxConcurrency, int queueSize) {
        this.delegate = delegate;
        this.maxConcurrency = maxConcurrency;
        this.executorService = new ThreadPoolExecutor(
                maxConcurrency,
                maxConcurrency * 2,                   // extra threads are only created once the queue is full
                60L, TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(queueSize), // bounded queue
                new ThreadFactory() {
                    private final AtomicInteger counter = new AtomicInteger();
                    @Override
                    public Thread newThread(Runnable r) {
                        Thread t = new Thread(r, "ChatModel-Worker-" + counter.incrementAndGet());
                        t.setDaemon(true);
                        return t;
                    }
                },
                new ThreadPoolExecutor.CallerRunsPolicy() // queue full: run in the caller's thread (back-pressure)
        );
        this.concurrencyLimiter = new Semaphore(maxConcurrency);
        // Opens after 5 recorded failures; lets requests through again after 10 s
        this.circuitBreaker = new CircuitBreaker(5, 10000, 0.5);
    }
    // generate(String) is inherited: the default method wraps the prompt in a UserMessage
    @Override
    public Response<AiMessage> generate(List<ChatMessage> messages) {
        CompletableFuture<Response<AiMessage>> future = generateAsync(messages);
        try {
            return future.get(120, TimeUnit.SECONDS);
        } catch (TimeoutException e) {
            circuitBreaker.recordFailure();
            throw new RuntimeException("Request timeout", e);
        } catch (ExecutionException e) {
            // the failure was already recorded by callWithLimits
            throw new RuntimeException("Request failed", e.getCause());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException("Interrupted while waiting for the model", e);
        }
    }
    // Non-blocking variant; the call itself runs on the bounded pool (no nested submission)
    public CompletableFuture<Response<AiMessage>> generateAsync(List<ChatMessage> messages) {
        return CompletableFuture.supplyAsync(() -> callWithLimits(messages), executorService);
    }
    // Not part of ChatLanguageModel: n independent completions for the same messages
    public List<Response<AiMessage>> generate(List<ChatMessage> messages, int n) {
        List<CompletableFuture<Response<AiMessage>>> futures = IntStream.range(0, n)
                .mapToObj(i -> generateAsync(messages))
                .collect(Collectors.toList());
        return futures.stream().map(CompletableFuture::join).collect(Collectors.toList());
    }
    // Token streaming is not part of ChatLanguageModel; LangChain4j exposes it through the
    // separate StreamingChatLanguageModel interface (see the reactive wrapper in 2.3)
    public CompletableFuture<List<Response<AiMessage>>> generateBatchAsync(List<List<ChatMessage>> batches) {
        List<CompletableFuture<Response<AiMessage>>> futures = batches.stream()
                .map(this::generateAsync)
                .collect(Collectors.toList());
        return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
                .thenApply(v -> futures.stream()
                        .map(CompletableFuture::join)
                        .collect(Collectors.toList()));
    }
    // Runs on a pool thread: circuit-breaker check, concurrency permit, then the real call
    private Response<AiMessage> callWithLimits(List<ChatMessage> messages) {
        if (!circuitBreaker.allowRequest()) {
            throw new IllegalStateException("Circuit breaker is open");
        }
        long startTime = System.nanoTime();
        boolean acquired = false;
        try {
            acquired = concurrencyLimiter.tryAcquire(30, TimeUnit.SECONDS);
            if (!acquired) {
                throw new IllegalStateException("Concurrency limit timeout");
            }
            requestCounter.incrementAndGet();
            Response<AiMessage> response = delegate.generate(messages);
            circuitBreaker.recordSuccess();
            totalDurationNanos.addAndGet(System.nanoTime() - startTime);
            completedRequests.incrementAndGet();
            return response;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for a permit", e);
        } catch (RuntimeException e) {
            circuitBreaker.recordFailure();
            throw e;
        } finally {
            if (acquired) {
                concurrencyLimiter.release(); // release only a permit that was actually acquired
            }
        }
    }
    // Aggregated counters instead of one map entry per request, so memory use stays constant
    public Map<String, Object> getMetrics() {
        ThreadPoolExecutor pool = (ThreadPoolExecutor) executorService;
        Map<String, Object> metrics = new HashMap<>();
        metrics.put("totalRequests", requestCounter.get());
        // availablePermits() counts free permits, so in-flight calls = max - available
        metrics.put("activeRequests", maxConcurrency - concurrencyLimiter.availablePermits());
        metrics.put("queueSize", pool.getQueue().size());
        metrics.put("activeThreads", pool.getActiveCount());
        long completed = completedRequests.get();
        if (completed > 0) {
            metrics.put("avgResponseTimeMs", totalDurationNanos.get() / 1_000_000.0 / completed);
        }
        return metrics;
    }
    // Simple breaker without a half-open state: after recoveryTimeout it closes fully again
    private static class CircuitBreaker {
        private final int failureThreshold;
        private final long recoveryTimeout;
        private final double failureRateThreshold;
        private final AtomicInteger failureCount = new AtomicInteger();
        private final AtomicInteger successCount = new AtomicInteger();
        private volatile long lastFailureTime = 0;
        private volatile boolean isOpen = false;
        public CircuitBreaker(int failureThreshold, long recoveryTimeout, double failureRateThreshold) {
            this.failureThreshold = failureThreshold;
            this.recoveryTimeout = recoveryTimeout;
            this.failureRateThreshold = failureRateThreshold;
        }
        public boolean allowRequest() {
            if (!isOpen) {
                return true;
            }
            if (System.currentTimeMillis() - lastFailureTime > recoveryTimeout) {
                isOpen = false;
                failureCount.set(0);
                successCount.set(0);
                return true;
            }
            return false;
        }
        public void recordSuccess() {
            successCount.incrementAndGet();
            double failureRate = calculateFailureRate();
            if (failureRate < failureRateThreshold) {
                failureCount.set(0);
            }
        }
        public void recordFailure() {
            int failures = failureCount.incrementAndGet();
            lastFailureTime = System.currentTimeMillis();
            if (failures >= failureThreshold) {
                isOpen = true;
            }
        }
        private double calculateFailureRate() {
            int total = successCount.get() + failureCount.get();
            return total > 0 ? (double) failureCount.get() / total : 0.0;
        }
    }
}
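The `CircuitBreaker` above has no half-open state: once the recovery timeout passes it closes fully and every waiting request goes through at once. A common refinement admits a single trial request first and only closes if that trial succeeds. `SimpleCircuitBreaker` is a standalone, hypothetical sketch (libraries such as Resilience4j provide production implementations); the clock is injectable so the transitions can be checked deterministically.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.LongSupplier;

// Illustrative three-state breaker: CLOSED -> OPEN after N consecutive failures,
// OPEN -> HALF_OPEN after a cool-down (one trial request), then CLOSED or OPEN again
final class SimpleCircuitBreaker {
    enum State { CLOSED, OPEN, HALF_OPEN }

    private final int failureThreshold;
    private final long openMillis;
    private final LongSupplier clockMillis; // e.g. System::currentTimeMillis
    private final AtomicReference<State> state = new AtomicReference<>(State.CLOSED);
    private final AtomicInteger consecutiveFailures = new AtomicInteger();
    private volatile long openedAt;

    SimpleCircuitBreaker(int failureThreshold, long openMillis, LongSupplier clockMillis) {
        this.failureThreshold = failureThreshold;
        this.openMillis = openMillis;
        this.clockMillis = clockMillis;
    }

    boolean allowRequest() {
        State current = state.get();
        if (current == State.CLOSED) {
            return true;
        }
        if (current == State.OPEN && clockMillis.getAsLong() - openedAt >= openMillis) {
            // Only the caller that wins this CAS gets the single trial request
            return state.compareAndSet(State.OPEN, State.HALF_OPEN);
        }
        return false; // still cooling down, or a trial request is already in flight
    }

    void recordSuccess() {
        consecutiveFailures.set(0);
        state.set(State.CLOSED);
    }

    void recordFailure() {
        if (state.get() == State.HALF_OPEN
                || consecutiveFailures.incrementAndGet() >= failureThreshold) {
            openedAt = clockMillis.getAsLong(); // write the timestamp before publishing OPEN
            state.set(State.OPEN);
        }
    }

    State currentState() {
        return state.get();
    }
}
```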
2.3 Asynchronous and Reactive Programming Support
package com.example.concurrent;
import dev.langchain4j.data.message.AiMessage;
import dev.langchain4j.data.message.ChatMessage;
import dev.langchain4j.model.chat.ChatLanguageModel;
import dev.langchain4j.model.output.Response;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
import java.time.Duration;
import java.util.List;
public class ReactiveChatModel {
    private final ChatLanguageModel delegate;
    private final Scheduler scheduler;
    public ReactiveChatModel(ChatLanguageModel delegate) {
        this.delegate = delegate;
        // Dedicated pool for blocking model calls: at most 100 threads and 1000 queued tasks,
        // idle threads evicted after 60 s, daemon threads
        this.scheduler = Schedulers.newBoundedElastic(100, 1000, "ReactiveChatModel", 60, true);
    }
    // Runs the blocking call on the bounded-elastic scheduler, never on a non-blocking thread
    public Mono<Response<AiMessage>> generateReactive(List<ChatMessage> messages) {
        return Mono.fromCallable(() -> delegate.generate(messages))
                .subscribeOn(scheduler)
                .timeout(Duration.ofSeconds(120))
                .onErrorMap(e -> new RuntimeException("Generation failed", e));
    }
    // Simulated stream: the fixed chunks stand in for tokens from a real streaming model
    public Flux<String> generateStreamReactive(List<ChatMessage> messages) {
        return Flux.<String>create(fluxSink -> {
            try {
                String[] chunks = {"Chunk1", "Chunk2", "Chunk3"};
                for (String chunk : chunks) {
                    if (fluxSink.isCancelled()) {
                        break;
                    }
                    fluxSink.next(chunk);
                    try {
                        Thread.sleep(100);
                    } catch (InterruptedException e) {
                        fluxSink.error(e);
                        return;
                    }
                }
                fluxSink.complete();
            } catch (Exception e) {
                fluxSink.error(e);
            }
        }).subscribeOn(scheduler);
    }
    // Fans out across rails; results arrive in completion order, not input order
    public Flux<Response<AiMessage>> generateBatchReactive(List<List<ChatMessage>> batches) {
        return Flux.fromIterable(batches)
                .parallel()
                .runOn(Schedulers.parallel())
                .flatMap(this::generateReactive)
                .sequential();
    }
    // Buffers up to 100 pending inputs (overflow signals an error) and runs at most 10 calls at once
    public Flux<Response<AiMessage>> generateWithBackpressure(Flux<List<ChatMessage>> messageFlux) {
        return messageFlux
                .onBackpressureBuffer(100)
                .flatMap(this::generateReactive, 10);
    }
}
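`generateStreamReactive` only emits fixed chunks as a simulation. With a real streaming model, tokens typically arrive through a callback interface invoked on the HTTP client's threads rather than the caller's. The hypothetical `TokenHandler` below mirrors that callback shape (onNext / onComplete / onError); `CollectingTokenHandler` shows one way to gather tokens safely across threads and hand the final text back as a `CompletableFuture`. Inside `Flux.create`, the same callbacks would forward to `fluxSink.next`, `complete` and `error` instead.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical callback interface, shaped like a streaming-token handler
interface TokenHandler {
    void onNext(String token);
    void onComplete();
    void onError(Throwable error);
}

// Collects streamed tokens into one String. Callbacks may run on a different thread than the
// caller, so buffer access is synchronized and the future safely publishes the final text.
final class CollectingTokenHandler implements TokenHandler {
    private final StringBuilder buffer = new StringBuilder();
    private final CompletableFuture<String> result = new CompletableFuture<>();

    @Override
    public synchronized void onNext(String token) {
        buffer.append(token);
    }

    @Override
    public synchronized void onComplete() {
        result.complete(buffer.toString());
    }

    @Override
    public void onError(Throwable error) {
        result.completeExceptionally(error);
    }

    CompletableFuture<String> result() {
        return result;
    }
}
```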
3. Thread Safety in Depth
3.1 Thread-Safety Analysis of Core LangChain4j Components
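package com.example.threadsafe;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Function;
public class ThreadSafetyAnalysis {
    // Stateless: no fields to share, so concurrent calls are safe
    public static class StatelessComponent {
        public String process(String input) {
            return input.toUpperCase();
        }
    }
    public static class StatefulComponent {
        private int counter = 0;
        // NOT thread-safe: counter++ is read-modify-write, so concurrent calls can lose updates
        public int increment() {
            counter++;
            return counter;
        }
        private final AtomicInteger safeCounter = new AtomicInteger(0);
        // Thread-safe: a single atomic increment
        public int incrementSafely() {
            return safeCounter.incrementAndGet();
        }
    }
    // Read-write lock over a plain HashMap: readers run in parallel, writers are exclusive.
    // (A ConcurrentHashMap is already thread-safe by itself; wrapping it in a lock adds nothing.)
    public static class ThreadSafeCache<K, V> {
        private final Map<K, V> cache = new HashMap<>();
        private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
        public V get(K key) {
            lock.readLock().lock();
            try {
                return cache.get(key);
            } finally {
                lock.readLock().unlock();
            }
        }
        public void put(K key, V value) {
            lock.writeLock().lock();
            try {
                cache.put(key, value);
            } finally {
                lock.writeLock().unlock();
            }
        }
        // Check-then-act under the write lock, so the mapping function runs at most once per key
        public V computeIfAbsent(K key, Function<? super K, ? extends V> mappingFunction) {
            V existing = get(key); // fast path under the read lock
            if (existing != null) {
                return existing;
            }
            lock.writeLock().lock();
            try {
                return cache.computeIfAbsent(key, mappingFunction);
            } finally {
                lock.writeLock().unlock();
            }
        }
    }
    // Minimal session type used below (an assumed placeholder)
    public static class Session implements AutoCloseable {
        private final String id;
        public Session() { this(UUID.randomUUID().toString()); }
        public Session(String id) { this.id = id; }
        public String id() { return id; }
        @Override
        public void close() {
            // release resources held by the session
        }
    }
    public static class SessionManager {
        // Per-thread session. Pooled threads are reused, so clear it at the end of each task;
        // otherwise the next task on that thread sees a stale session and the object leaks.
        private static final ThreadLocal<Session> threadLocalSession = new ThreadLocal<>();
        public static Session currentSession() {
            Session session = threadLocalSession.get();
            if (session == null) {
                session = new Session();
                threadLocalSession.set(session);
            }
            return session;
        }
        public static void clearCurrentSession() {
            Session session = threadLocalSession.get(); // null if none was created, nothing to close
            if (session != null) {
                session.close();
            }
            threadLocalSession.remove();
        }
        // Shared sessions keyed by id: computeIfAbsent creates each one exactly once
        private final ConcurrentHashMap<String, Session> sessionMap = new ConcurrentHashMap<>();
        public Session getSession(String sessionId) {
            return sessionMap.computeIfAbsent(sessionId, Session::new);
        }
    }
    // Hypothetical tool abstraction for the examples below
    public interface Tool {
        String name();
        Object execute(Object input);
    }
    public static class ToolExecutor {
        private final List<Tool> tools;
        private final ReentrantLock toolLock = new ReentrantLock();
        private final ConcurrentHashMap<String, ReentrantLock> toolLocks = new ConcurrentHashMap<>();
        public ToolExecutor(List<Tool> tools) {
            this.tools = List.copyOf(tools);
        }
        // Coarse-grained: one global lock, so all tool calls run one at a time
        public Object executeTool(String toolName, Object input) {
            toolLock.lock();
            try {
                return findTool(toolName).execute(input);
            } finally {
                toolLock.unlock();
            }
        }
        // Fine-grained: one lock per tool name, so different tools can run in parallel
        public Object executeToolWithFineGrainedLock(String toolName, Object input) {
            ReentrantLock lock = toolLocks.computeIfAbsent(toolName, k -> new ReentrantLock());
            lock.lock();
            try {
                return findTool(toolName).execute(input);
            } finally {
                lock.unlock();
            }
        }
        private Tool findTool(String toolName) {
            return tools.stream()
                    .filter(tool -> tool.name().equals(toolName))
                    .findFirst()
                    .orElseThrow(() -> new IllegalArgumentException("Unknown tool: " + toolName));
        }
    }
}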
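`ThreadSafeCache.computeIfAbsent` and `SessionManager.getSession` both rely on an atomic check-then-act. The small experiment below (an illustrative `LoaderRace` class) counts how often an expensive loader runs when several threads miss the same key at the same moment: `ConcurrentHashMap.computeIfAbsent` runs it once, while a plain `get`-then-`put` may run it several times; the exact number varies between runs.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Illustrative experiment: how many times does the loader run for one missing key?
final class LoaderRace {
    static int countLoads(boolean useComputeIfAbsent, int threads) {
        ConcurrentHashMap<String, String> cache = new ConcurrentHashMap<>();
        AtomicInteger loads = new AtomicInteger();
        CountDownLatch startGate = new CountDownLatch(1); // releases all workers together
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int i = 0; i < threads; i++) {
            pool.execute(() -> {
                awaitQuietly(startGate);
                if (useComputeIfAbsent) {
                    // Atomic per key: the function runs at most once; other callers wait for its result
                    cache.computeIfAbsent("key", k -> load(loads));
                } else {
                    // Check-then-act: several threads can all see null and all load
                    if (cache.get("key") == null) {
                        cache.put("key", load(loads));
                    }
                }
            });
        }
        startGate.countDown();
        pool.shutdown();
        try {
            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("workers did not finish");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return loads.get();
    }

    private static String load(AtomicInteger loads) {
        loads.incrementAndGet();
        try {
            Thread.sleep(20); // simulates a slow call; real code should keep computeIfAbsent functions short
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return "value";
    }

    private static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```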
3.2 Thread Safety of LangChain4j's Built-in Components
3.2.1 Thread Safety of ChatMemory
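package com.example.threadsafe;
import dev.langchain4j.data.message.ChatMessage;
import dev.langchain4j.data.message.ChatMessageDeserializer;
import dev.langchain4j.data.message.ChatMessageSerializer;
import dev.langchain4j.memory.ChatMemory;
import dev.langchain4j.memory.chat.MessageWindowChatMemory;
import org.springframework.dao.DataAccessException;
import org.springframework.data.redis.core.RedisOperations;
import org.springframework.data.redis.core.SessionCallback;
import org.springframework.data.redis.core.StringRedisTemplate;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.ReentrantLock;
public class ChatMemoryThreadSafety {
    public static void analyzeInMemoryChatMemory() {
        // add() reads the stored messages, appends, evicts and writes the list back;
        // concurrent add() calls on the same memory can interleave, so guard it externally
        ChatMemory memory = MessageWindowChatMemory.withMaxMessages(10);
        ChatMemory synchronizedMemory = synchronizeChatMemory(memory);
    }
    // Decorator: every operation runs under one lock; messages() returns a copy
    public static ChatMemory synchronizeChatMemory(ChatMemory delegate) {
        return new ChatMemory() {
            private final Object lock = new Object();
            @Override
            public Object id() {
                synchronized (lock) {
                    return delegate.id();
                }
            }
            @Override
            public void add(ChatMessage message) {
                synchronized (lock) {
                    delegate.add(message);
                }
            }
            @Override
            public List<ChatMessage> messages() {
                synchronized (lock) {
                    return new ArrayList<>(delegate.messages());
                }
            }
            @Override
            public void clear() {
                synchronized (lock) {
                    delegate.clear();
                }
            }
        };
    }
    // Conversation stored as JSON in Redis. WATCH/MULTI/EXEC makes the read-modify-write atomic
    // across application instances (optimistic locking); add() retries if another writer won.
    public static class RedisChatMemory implements ChatMemory {
        private static final int MAX_RETRIES = 5;
        private final String sessionId;
        private final String key;
        private final StringRedisTemplate redisTemplate;
        // Serializes writers inside one JVM so they do not keep invalidating each other's WATCH
        private final ReentrantLock lock = new ReentrantLock();
        public RedisChatMemory(String sessionId, StringRedisTemplate redisTemplate) {
            this.sessionId = sessionId;
            this.key = "chat:memory:" + sessionId;
            this.redisTemplate = redisTemplate;
        }
        @Override
        public Object id() {
            return sessionId;
        }
        @Override
        public void add(ChatMessage message) {
            lock.lock();
            try {
                for (int attempt = 0; attempt < MAX_RETRIES; attempt++) {
                    List<Object> result = redisTemplate.execute(new SessionCallback<List<Object>>() {
                        @Override
                        @SuppressWarnings("unchecked")
                        public <K, V> List<Object> execute(RedisOperations<K, V> operations) throws DataAccessException {
                            RedisOperations<String, String> ops = (RedisOperations<String, String>) operations;
                            ops.watch(key);
                            List<ChatMessage> messages = readMessages(ops.opsForValue().get(key));
                            messages.add(message);
                            ops.multi();
                            ops.opsForValue().set(key, ChatMessageSerializer.messagesToJson(messages));
                            return ops.exec();
                        }
                    });
                    // Assumption: an aborted transaction (key changed after WATCH) yields null or an empty list
                    if (result != null && !result.isEmpty()) {
                        return;
                    }
                }
                throw new IllegalStateException("Concurrent updates to " + key + ": retries exhausted");
            } finally {
                lock.unlock();
            }
        }
        @Override
        public List<ChatMessage> messages() {
            return readMessages(redisTemplate.opsForValue().get(key));
        }
        @Override
        public void clear() {
            redisTemplate.delete(key);
        }
        private static List<ChatMessage> readMessages(String json) {
            return json == null ? new ArrayList<>() : new ArrayList<>(ChatMessageDeserializer.messagesFromJson(json));
        }
    }
}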
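The synchronized wrapper serializes every call on one lock. When reads (building a prompt from the history) far outnumber writes, a read-write lock lets readers proceed in parallel. `WindowMemory` below is a hypothetical sketch that stores plain strings instead of `ChatMessage` objects; it is not LangChain4j's `MessageWindowChatMemory`, only an illustration of doing append-and-evict under one write lock so the window can never exceed its limit.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Illustrative sliding-window memory guarded by a read-write lock
final class WindowMemory {
    private final int maxMessages;
    private final Deque<String> window = new ArrayDeque<>();
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();

    WindowMemory(int maxMessages) {
        if (maxMessages <= 0) {
            throw new IllegalArgumentException("maxMessages must be positive");
        }
        this.maxMessages = maxMessages;
    }

    void add(String message) {
        lock.writeLock().lock();
        try {
            window.addLast(message);
            while (window.size() > maxMessages) {
                window.removeFirst(); // evict the oldest message in the same critical section
            }
        } finally {
            lock.writeLock().unlock();
        }
    }

    List<String> messages() {
        lock.readLock().lock();
        try {
            return new ArrayList<>(window); // defensive copy: later writes never leak to callers
        } finally {
            lock.readLock().unlock();
        }
    }
}
```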
3.2.2 Thread Safety of EmbeddingModel
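package com.example.threadsafe;
import com.google.common.util.concurrent.RateLimiter;
import dev.langchain4j.data.embedding.Embedding;
import dev.langchain4j.data.segment.TextSegment;
import dev.langchain4j.model.embedding.EmbeddingModel;
import dev.langchain4j.model.output.Response;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.*;
import java.util.stream.Collectors;
public class EmbeddingModelConcurrency {
    // Decorator that bounds concurrency and request rate; the wrapped model is usually stateless
    // (LangChain4j 0.x API: embedAll takes a List<TextSegment>)
    public static class ThreadSafeEmbeddingModel implements EmbeddingModel {
        private final EmbeddingModel delegate;
        private final ExecutorService executorService;
        private final RateLimiter rateLimiter;
        public ThreadSafeEmbeddingModel(EmbeddingModel delegate, int maxConcurrency) {
            this.delegate = delegate;
            this.executorService = new ThreadPoolExecutor(
                    maxConcurrency,
                    maxConcurrency * 2,
                    60L, TimeUnit.SECONDS,
                    new LinkedBlockingQueue<>(1000),
                    new ThreadPoolExecutor.CallerRunsPolicy()
            );
            this.rateLimiter = RateLimiter.create(100.0); // Guava: 100 permits (texts) per second
        }
        @Override
        public Response<Embedding> embed(String text) {
            List<Embedding> embeddings = embedAll(Collections.singletonList(TextSegment.from(text))).content();
            return Response.from(embeddings.get(0));
        }
        // One request per segment, fanned out over the pool (simple, but ignores the provider's batch API)
        @Override
        public Response<List<Embedding>> embedAll(List<TextSegment> segments) {
            if (segments.isEmpty()) {
                return Response.from(Collections.<Embedding>emptyList()); // acquire(0) would throw
            }
            rateLimiter.acquire(segments.size());
            List<CompletableFuture<Embedding>> futures = segments.stream()
                    .map(segment -> CompletableFuture.supplyAsync(() -> delegate.embed(segment).content(), executorService))
                    .collect(Collectors.toList());
            // Joining in list order keeps each embedding aligned with its input segment
            List<Embedding> embeddings = futures.stream()
                    .map(CompletableFuture::join)
                    .collect(Collectors.toList());
            return Response.from(embeddings);
        }
        // Batched variant: fewer round trips and one rate-limit acquisition per batch.
        // parallelStream() runs on the common ForkJoinPool; a dedicated executor is safer for blocking calls.
        public Response<List<Embedding>> embedAllOptimized(List<TextSegment> segments) {
            int batchSize = 100;
            List<List<TextSegment>> batches = new ArrayList<>();
            for (int i = 0; i < segments.size(); i += batchSize) {
                batches.add(segments.subList(i, Math.min(i + batchSize, segments.size())));
            }
            List<Embedding> allEmbeddings = batches.parallelStream()
                    .flatMap(batch -> {
                        rateLimiter.acquire(batch.size());
                        return delegate.embedAll(batch).content().stream();
                    })
                    .collect(Collectors.toList()); // ordered stream: results keep the input order
            return Response.from(allEmbeddings);
        }
    }
}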
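Both `embedAll` variants above depend on one property: the returned embeddings must line up with the input texts. The generic helper below (a hypothetical `OrderedBatcher`, JDK only) makes that explicit. It splits the input into batches, runs at most `parallelism` batches at once on a dedicated pool instead of the common ForkJoinPool, and joins the futures in submission order. `embedBatch` stands in for a batch call such as `delegate.embedAll(batch)`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;

// Illustrative helper: batch, run a bounded number of batches in parallel, keep input order
final class OrderedBatcher {
    static <I, O> List<O> processInBatches(List<I> inputs, int batchSize, int parallelism,
                                           Function<List<I>, List<O>> embedBatch) {
        if (batchSize <= 0 || parallelism <= 0) {
            throw new IllegalArgumentException("batchSize and parallelism must be positive");
        }
        ExecutorService pool = Executors.newFixedThreadPool(parallelism);
        try {
            List<CompletableFuture<List<O>>> futures = new ArrayList<>();
            for (int i = 0; i < inputs.size(); i += batchSize) {
                // Copy the slice so the task does not depend on the caller's list staying unchanged
                List<I> batch = new ArrayList<>(inputs.subList(i, Math.min(i + batchSize, inputs.size())));
                futures.add(CompletableFuture.supplyAsync(() -> {
                    List<O> out = embedBatch.apply(batch);
                    if (out.size() != batch.size()) {
                        throw new IllegalStateException("batch result size mismatch");
                    }
                    return out;
                }, pool));
            }
            List<O> results = new ArrayList<>(inputs.size());
            for (CompletableFuture<List<O>> future : futures) {
                results.addAll(future.join()); // joined in submission order, so input order is preserved
            }
            return results;
        } finally {
            pool.shutdown();
        }
    }
}
```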
4. Concurrency Best Practices and Patterns
4.1 Connection Pool Best Practices
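package com.example.bestpractice;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.DefaultConnectionKeepAliveStrategy;
import org.apache.http.impl.client.DefaultHttpRequestRetryHandler;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
import org.apache.http.pool.PoolStats;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;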
public class ConnectionPoolBestPractice {
public static CloseableHttpClient createOptimalHttpClient() {
PoolingHttpClientConnectionManager connectionManager = new PoolingHttpClientConnectionManager();
connectionManager.setMaxTotal(200);
connectionManager.setDefaultMaxPerRoute(50);
connectionManager.setValidateAfterInactivity(30000);
RequestConfig requestConfig = RequestConfig.custom()
.setConnectTimeout(30000)
.setSocketTimeout(60000)
.setConnectionRequestTimeout(5000)
.build();
return HttpClientBuilder.create()
.setConnectionManager(connectionManager)
.setDefaultRequestConfig(requestConfig)
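.setRetryHandler(new DefaultHttpRequestRetryHandler(3, true)) // true also retries requests already sent, including POSTs, which can duplicate (and bill) an LLM call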
.disableCookieManagement()
.setKeepAliveStrategy(new DefaultConnectionKeepAliveStrategy())
.build();
}
public static class ConnectionPoolMonitor {
private final PoolingHttpClientConnectionManager connectionManager;
private final ScheduledExecutorService monitorExecutor;
public ConnectionPoolMonitor(PoolingHttpClientConnectionManager connectionManager) {
this.connectionManager = connectionManager;
this.monitorExecutor = Executors.newSingleThreadScheduledExecutor();
startMonitoring();
}
private void startMonitoring() {
monitorExecutor.scheduleAtFixedRate(() -> {
try {
PoolStats stats = connectionManager.getTotalStats();
System.out.println("=== Connection Pool Stats ===");
System.out.println("Available: " + stats.getAvailable());
System.out.println("Leased: " + stats.getLeased());
System.out.println("Pending: " + stats.getPending());
System.out.println("Max: " + stats.getMax());
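// getAvailable() counts idle pooled connections, so a low value is normal under load;
// pending > 0 means threads are blocked waiting for a connection from an exhausted pool
if (stats.getPending() > 0) {
System.err.println("WARNING: requests are waiting for connections (pool exhausted)!");
}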
connectionManager.closeExpiredConnections();
connectionManager.closeIdleConnections(60, TimeUnit.SECONDS);
} catch (Exception e) {
System.err.println("Monitor error: " + e.getMessage());
}
}, 0, 30, TimeUnit.SECONDS);
}
}
}
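To choose values such as `setDefaultMaxPerRoute(50)`, Little's law gives a quick estimate: the average number of in-flight requests L equals the arrival rate λ times the average time W each request holds a connection. With assumed figures of 20 requests per second and a 3-second average model latency:

```latex
L = \lambda W = 20\,\tfrac{\text{req}}{\text{s}} \times 3\,\text{s} = 60 > 50 \;(\texttt{DefaultMaxPerRoute}),
\qquad
\lambda_{\max} = \frac{50}{3\,\text{s}} \approx 16.7\,\tfrac{\text{req}}{\text{s}} < 20\,\tfrac{\text{req}}{\text{s}}
```

Because every call to one provider goes to the same host, the per-route limit rather than `setMaxTotal(200)` is usually the binding constraint. In this example the pool can complete at most about 16.7 requests per second, so waiting requests pile up (visible as `getPending() > 0`) until `setConnectionRequestTimeout(5000)` rejects them.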
4.2 Asynchronous Programming Best Practices
package com.example.bestpractice;
import reactor.core.publisher.BufferOverflowStrategy;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import reactor.util.retry.Retry;
import java.time.Duration;
import java.util.List;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
import java.util.stream.Collectors;
public class AsyncBestPractice {
    // Sized from the CPU count as a starting point; blocking LLM calls usually need more threads
    public static ExecutorService createOptimalThreadPool() {
        int corePoolSize = Runtime.getRuntime().availableProcessors();
        int maxPoolSize = corePoolSize * 2;
        return new ThreadPoolExecutor(
                corePoolSize,
                maxPoolSize,
                60L, TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(10000), // bounded: caps the memory used by waiting tasks
                new ThreadFactory() {
                    private final AtomicInteger counter = new AtomicInteger();
                    @Override
                    public Thread newThread(Runnable r) {
                        Thread t = new Thread(r, "LLM-Worker-" + counter.incrementAndGet());
                        t.setDaemon(true);
                        // Only sees exceptions from execute(); submit() stores them in the returned Future
                        t.setUncaughtExceptionHandler((thread, throwable) ->
                                System.err.println("Uncaught exception in " + thread.getName() + ": " + throwable.getMessage()));
                        return t;
                    }
                },
                new ThreadPoolExecutor.AbortPolicy() // queue full: reject with RejectedExecutionException
        );
    }
    public static class CompletableFutureBestPractice {
        // Java 9+: copy() leaves the caller's future untouched, and orTimeout fails the copy with a
        // TimeoutException without parking an extra sleeping thread. The underlying work is not cancelled.
        public CompletableFuture<String> processWithTimeout(CompletableFuture<String> future, long timeout, TimeUnit unit) {
            return future.copy().orTimeout(timeout, unit);
        }
        // Limits how many tasks RUN at once: each task is a Supplier that starts only after taking a permit.
        // Already-created futures could not be limited this way, because they would be running already.
        public CompletableFuture<List<String>> processBatchWithConcurrencyLimit(
                List<Supplier<String>> tasks, int maxConcurrency, Executor executor) {
            Semaphore semaphore = new Semaphore(maxConcurrency);
            List<CompletableFuture<String>> controlledFutures = tasks.stream()
                    .map(task -> CompletableFuture.supplyAsync(() -> {
                        try {
                            semaphore.acquire();
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                            throw new CompletionException(e);
                        }
                        try {
                            return task.get();
                        } finally {
                            semaphore.release(); // reached only after a successful acquire
                        }
                    }, executor))
                    .collect(Collectors.toList());
            return CompletableFuture.allOf(controlledFutures.toArray(new CompletableFuture[0]))
                    .thenApply(v -> controlledFutures.stream()
                            .map(CompletableFuture::join)
                            .collect(Collectors.toList()));
        }
    }
    public static class ReactiveBestPractice {
        // Buffers up to 1000 items; when the buffer is full, newly arriving items are dropped
        public Flux<String> processWithBackpressure(Flux<String> source) {
            return source
                    .onBackpressureBuffer(1000, BufferOverflowStrategy.DROP_LATEST)
                    .publishOn(Schedulers.parallel())
                    .doOnNext(item -> {
                        // e.g. record per-item metrics
                    })
                    .doOnError(error -> {
                        // e.g. log the failure
                    })
                    .doOnComplete(() -> {
                        // e.g. log completion
                    });
        }
        // Up to 3 retries with exponential backoff (1 s base, 10 s cap, 50% jitter); 30 s overall timeout
        public Mono<String> processWithRetry(Mono<String> mono) {
            return mono
                    .retryWhen(Retry.backoff(3, Duration.ofSeconds(1))
                            .maxBackoff(Duration.ofSeconds(10))
                            .jitter(0.5)
                            .doAfterRetry(retrySignal -> {
                                // e.g. log retrySignal.totalRetries()
                            }))
                    .timeout(Duration.ofSeconds(30))
                    .onErrorResume(error -> Mono.just("fallback"));
        }
    }
}
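The pools above use daemon threads, so the JVM may exit while model calls are still in flight. An explicit two-phase shutdown, the pattern shown in the `ExecutorService` Javadoc, lets queued work finish and then interrupts whatever remains. `PoolShutdown` is an illustrative helper name.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

// Illustrative two-phase shutdown: stop accepting work, wait, then interrupt stragglers
final class PoolShutdown {
    static boolean shutdownGracefully(ExecutorService pool, long timeout, TimeUnit unit) {
        pool.shutdown(); // reject new tasks, let queued and running ones finish
        try {
            if (!pool.awaitTermination(timeout, unit)) {
                pool.shutdownNow(); // interrupt running tasks (e.g. threads blocked on slow responses)
                return pool.awaitTermination(timeout, unit);
            }
            return true;
        } catch (InterruptedException e) {
            pool.shutdownNow();
            Thread.currentThread().interrupt(); // preserve the caller's interrupt status
            return false;
        }
    }
}
```

Calling it from an application shutdown hook gives in-flight requests a bounded grace period instead of being cut off abruptly.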
5. Performance Optimization and Monitoring
5.1 Performance Monitoring Implementation
package com.example.monitoring;
import io.micrometer.core.instrument.*;
import io.micrometer.core.instrument.binder.jvm.*;
import io.micrometer.core.instrument.binder.system.*;
import io.micrometer.prometheus.PrometheusConfig;
import io.micrometer.prometheus.PrometheusMeterRegistry;
import java.util.concurrent.*;
import java.util.concurrent.atomic.*;
public class PerformanceMonitor {
private final MeterRegistry meterRegistry;
private final ConcurrentHashMap<String, Timer> timers;
private final AtomicLong totalRequests;
private final AtomicLong failedRequests;
private final AtomicLong activeRequests;
public PerformanceMonitor() {
this.meterRegistry = new PrometheusMeterRegistry(PrometheusConfig.DEFAULT);
this.timers = new ConcurrentHashMap<>();
this.totalRequests = new AtomicLong();
this.failedRequests = new AtomicLong();
this.activeRequests = new AtomicLong();
registerMetrics();
}
    private void registerMetrics() {
        new JvmMemoryMetrics().bindTo(meterRegistry);
        new JvmGcMetrics().bindTo(meterRegistry);
        new JvmThreadMetrics().bindTo(meterRegistry);
        new ProcessorMetrics().bindTo(meterRegistry);
        new UptimeMetrics().bindTo(meterRegistry);
        // Monotonic totals are exposed as counters; the number of in-flight requests as a gauge
        FunctionCounter.builder("langchain4j.requests.total", totalRequests, AtomicLong::get)
                .description("Total number of requests")
                .register(meterRegistry);
        Gauge.builder("langchain4j.requests.active", activeRequests, AtomicLong::get)
                .description("Number of requests in flight")
                .register(meterRegistry);
        FunctionCounter.builder("langchain4j.requests.failed", failedRequests, AtomicLong::get)
                .description("Number of failed requests")
                .register(meterRegistry);
    }
    public Timer.Sample startTimer(String operation) {
        activeRequests.incrementAndGet();
        return Timer.start(meterRegistry);
    }
    public void recordSuccess(Timer.Sample sample, String operation) {
        totalRequests.incrementAndGet();
        activeRequests.decrementAndGet();
        sample.stop(operationTimer(operation, "success", "none"));
    }
    // Pass a low-cardinality error value (e.g. an exception class name): each distinct value is a new time series
    public void recordFailure(Timer.Sample sample, String operation, String error) {
        totalRequests.incrementAndGet();
        failedRequests.incrementAndGet();
        activeRequests.decrementAndGet();
        sample.stop(operationTimer(operation, "error", error));
    }
    // One Timer per (operation, status, error). Every meter with this name has the same tag keys,
    // which the Prometheus registry expects.
    private Timer operationTimer(String operation, String status, String error) {
        return timers.computeIfAbsent(operation + '|' + status + '|' + error, key ->
                Timer.builder("langchain4j.operation.duration")
                        .tag("operation", operation)
                        .tag("status", status)
                        .tag("error", error)
                        .publishPercentiles(0.5, 0.95, 0.99)
                        .register(meterRegistry));
    }
public void recordTokenUsage(int inputTokens, int outputTokens) {
Counter.builder("langchain4j.tokens.total")
.tag("type", "input")
.register(meterRegistry)
.increment(inputTokens);
Counter.builder("langchain4j.tokens.total")
.tag("type", "output")
.register(meterRegistry)
.increment(outputTokens);
}
public String getMetrics() {
return ((PrometheusMeterRegistry) meterRegistry).scrape();
}
}
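Micrometer's `Timer` already records latency distributions. For a quick in-process view without a registry, or to see roughly what such a timer does internally, the hypothetical `LatencyHistogram` below counts durations into fixed buckets using `LongAdder`, which stays cheap when many threads record at the same time. The bucket bounds are arbitrary example values, and reading the counters while writers are active gives an approximate rather than a consistent snapshot.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.LongAdder;

// Illustrative lock-free latency histogram with fixed upper bounds in milliseconds
final class LatencyHistogram {
    private final long[] upperBoundsMillis; // ascending, e.g. {100, 500, 1000}
    private final LongAdder[] buckets;      // the extra last bucket counts everything above the largest bound
    private final LongAdder totalMillis = new LongAdder();
    private final LongAdder count = new LongAdder();

    LatencyHistogram(long... upperBoundsMillis) {
        this.upperBoundsMillis = upperBoundsMillis.clone();
        this.buckets = new LongAdder[upperBoundsMillis.length + 1];
        for (int i = 0; i < buckets.length; i++) {
            buckets[i] = new LongAdder();
        }
    }

    void record(long duration, TimeUnit unit) {
        long millis = unit.toMillis(duration);
        int i = 0;
        while (i < upperBoundsMillis.length && millis > upperBoundsMillis[i]) {
            i++; // first bucket whose upper bound is >= millis
        }
        buckets[i].increment();
        totalMillis.add(millis);
        count.increment();
    }

    long bucketCount(int index) {
        return buckets[index].sum();
    }

    double meanMillis() {
        long n = count.sum();
        return n == 0 ? 0.0 : (double) totalMillis.sum() / n;
    }
}
```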
5.2 Stress Testing and Performance Analysis
package com.example.testing;
import dev.langchain4j.model.chat.ChatLanguageModel;
import dev.langchain4j.model.openai.OpenAiChatModel;
import org.openjdk.jmh.annotations.*;
import org.openjdk.jmh.runner.Runner;
import org.openjdk.jmh.runner.RunnerException;
import org.openjdk.jmh.runner.options.Options;
import org.openjdk.jmh.runner.options.OptionsBuilder;
import java.util.concurrent.*;
import java.util.concurrent.atomic.*;
// Each invocation sends 100 real requests: expect API costs and provider rate limits (429s) during runs
@State(Scope.Benchmark)
@BenchmarkMode(Mode.Throughput)
@OutputTimeUnit(TimeUnit.SECONDS)
@Warmup(iterations = 3, time = 5)
@Measurement(iterations = 5, time = 10)
@Fork(2)
@Threads(10)
public class ConcurrentPerformanceTest {
    private ChatLanguageModel model;
    private ExecutorService executorService;
    private AtomicInteger successCount;
    private AtomicInteger failureCount;
    @Setup
    public void setup() {
        model = createChatModel();
        executorService = Executors.newFixedThreadPool(50);
        successCount = new AtomicInteger();
        failureCount = new AtomicInteger();
    }
    // Assumed setup: an OpenAI model whose API key comes from the OPENAI_API_KEY environment variable
    private ChatLanguageModel createChatModel() {
        return OpenAiChatModel.builder()
                .apiKey(System.getenv("OPENAI_API_KEY"))
                .build();
    }
    @Benchmark
    @BenchmarkMode({Mode.Throughput, Mode.AverageTime})
    public void testConcurrentRequests() throws InterruptedException {
        int concurrentRequests = 100;
        // Local latch: this state object is shared by all 10 benchmark threads, so a field would be overwritten
        CountDownLatch latch = new CountDownLatch(concurrentRequests);
        for (int i = 0; i < concurrentRequests; i++) {
            executorService.submit(() -> {
                try {
                    model.generate("Test message " + Thread.currentThread().getId());
                    successCount.incrementAndGet();
                } catch (Exception e) {
                    failureCount.incrementAndGet();
                } finally {
                    latch.countDown();
                }
            });
        }
        // Requests still running after 30 s are not waited for
        latch.await(30, TimeUnit.SECONDS);
    }
    @TearDown
    public void tearDown() {
        executorService.shutdown();
        int success = successCount.get();
        int failure = failureCount.get();
        System.out.println("Success: " + success);
        System.out.println("Failure: " + failure);
        if (success + failure > 0) {
            System.out.println("Success rate: " + (double) success / (success + failure));
        }
    }
    public static void main(String[] args) throws RunnerException {
        Options options = new OptionsBuilder()
                .include(ConcurrentPerformanceTest.class.getSimpleName())
                .build();
        new Runner(options).run();
    }
}
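JMH targets microbenchmarks; for a quick end-to-end check of success rate and throughput, a plain JDK loop is often enough. `LoadTest` below is a hypothetical helper: `call` stands in for `model.generate(...)`, and the check after it simulates a failure on every tenth request instead of calling a real API.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

final class LoadTestResult {
    final int success;
    final int failure;
    final long elapsedNanos;

    LoadTestResult(int success, int failure, long elapsedNanos) {
        this.success = success;
        this.failure = failure;
        this.elapsedNanos = elapsedNanos;
    }

    double successRate() {
        int total = success + failure;
        return total == 0 ? 0.0 : (double) success / total;
    }

    double throughputPerSecond() {
        return elapsedNanos == 0 ? 0.0 : (success + failure) * 1_000_000_000.0 / elapsedNanos;
    }
}

// Illustrative load generator: `requests` calls through `concurrency` threads, outcomes counted
final class LoadTest {
    static LoadTestResult run(int requests, int concurrency, Function<Integer, String> call) {
        ExecutorService pool = Executors.newFixedThreadPool(concurrency);
        CountDownLatch done = new CountDownLatch(requests); // one latch per run, never shared
        AtomicInteger success = new AtomicInteger();
        AtomicInteger failure = new AtomicInteger();
        long start = System.nanoTime();
        for (int i = 0; i < requests; i++) {
            final int id = i;
            pool.execute(() -> {
                try {
                    call.apply(id);
                    success.incrementAndGet();
                } catch (RuntimeException e) {
                    failure.incrementAndGet();
                } finally {
                    done.countDown();
                }
            });
        }
        try {
            if (!done.await(60, TimeUnit.SECONDS)) {
                throw new IllegalStateException("load test did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            pool.shutdownNow();
        }
        return new LoadTestResult(success.get(), failure.get(), System.nanoTime() - start);
    }
}
```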
6. Interview Key Points
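6.1 Core Points of Concurrency Handling in LangChain4j
- Multi-level concurrency control:
  - Application layer: thread pools, queues, timeouts
  - HTTP layer: connection pooling, connection reuse
  - API layer: rate limiting, quota management
- Thread-safety analysis:
  - Stateless components: inherently thread-safe
  - Stateful components: need synchronization
  - Shared resources: need atomic operations or lock protection
- Thread safety of built-in components:
  - ChatMemory: needs external synchronization or a thread-safe implementation
  - EmbeddingModel: usually stateless and safe to share, but calls still need concurrency and rate control
  - VectorStore: needs transaction support or optimistic locking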
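Optimistic locking, listed for VectorStore above, means: read a version, compute the change, and commit only if the version is still the same, otherwise retry. The in-memory `VersionedStore` below is an illustrative stand-in, not a LangChain4j `EmbeddingStore` API; `ConcurrentHashMap.replace(key, expected, updated)` plays the role of the conditional commit. Because `Entry` does not override `equals`, the comparison is by reference, so any concurrent replacement makes the commit fail and triggers a retry.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.UnaryOperator;

// Illustrative optimistic concurrency: versioned entries, compare-and-set commits, retry on conflict
final class VersionedStore {
    static final class Entry {
        final String text;
        final long version;

        Entry(String text, long version) {
            this.text = text;
            this.version = version;
        }
    }

    private final ConcurrentHashMap<String, Entry> entries = new ConcurrentHashMap<>();

    void insert(String id, String text) {
        entries.putIfAbsent(id, new Entry(text, 0));
    }

    void update(String id, UnaryOperator<String> change) {
        while (true) {
            Entry current = entries.get(id);
            if (current == null) {
                throw new IllegalArgumentException("unknown id: " + id);
            }
            Entry next = new Entry(change.apply(current.text), current.version + 1);
            if (entries.replace(id, current, next)) {
                return; // nobody changed the entry since we read it
            }
            // another writer committed first: re-read and retry
        }
    }

    Entry get(String id) {
        return entries.get(id);
    }
}
```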
6.2 Common Thread-Safety Problems
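// Problem 1: volatile guarantees visibility, not atomicity
private volatile int counter = 0;
public void increment() {
    counter++; // read, add, write: concurrent increments can be lost
}
// Fix: use an atomic variable
private AtomicInteger counter = new AtomicInteger(0);
public void increment() {
    counter.incrementAndGet();
}
// Problem 2: deadlock when threads take the same two locks in opposite orders
synchronized(lockA) {
    synchronized(lockB) {
        // thread 1 holds A and waits for B while thread 2 holds B and waits for A
    }
}
// Fix: acquire locks in one global order. identityHashCode is stable per object (hashCode() may be
// overridden), but two objects can share a value, so ties go through an extra shared tieLock
int hashA = System.identityHashCode(lockA);
int hashB = System.identityHashCode(lockB);
Object firstLock = hashA < hashB ? lockA : lockB;
Object secondLock = firstLock == lockA ? lockB : lockA;
if (hashA != hashB) {
    synchronized(firstLock) {
        synchronized(secondLock) {
            // critical section
        }
    }
} else {
    synchronized(tieLock) {
        synchronized(firstLock) {
            synchronized(secondLock) {
                // critical section
            }
        }
    }
}
// Problem 3: check-then-act race; several threads can see !initialized and initialize twice
if (!initialized) {
    initialize();
    initialized = true;
}
// Fix: double-checked locking; only safe when `initialized` is declared volatile
if (!initialized) {
    synchronized(this) {
        if (!initialized) {
            initialize();
            initialized = true;
        }
    }
}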
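The first problem is easy to reproduce. The illustrative `LostUpdateDemo` below increments a `volatile int` and an `AtomicInteger` from several threads. The atomic counter always ends at threads × increments; the volatile counter usually ends lower because concurrent `counter++` operations overwrite each other, although how much lower varies from run to run and an occasional run may show no loss at all.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Illustrative demo: counter++ on a volatile field vs. AtomicInteger.incrementAndGet()
final class LostUpdateDemo {
    private volatile int volatileCounter = 0;
    private final AtomicInteger atomicCounter = new AtomicInteger();

    // Returns {volatile result, atomic result}
    static int[] run(int threads, int incrementsPerThread) {
        LostUpdateDemo demo = new LostUpdateDemo();
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < incrementsPerThread; j++) {
                    demo.volatileCounter++;               // not atomic: updates may be lost
                    demo.atomicCounter.incrementAndGet(); // atomic CAS-based increment
                }
            });
            workers[i].start();
        }
        for (Thread w : workers) {
            try {
                w.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return new int[] {demo.volatileCounter, demo.atomicCounter.get()};
    }
}
```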
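6.3 Best Practices Summary
- Connection pool configuration:
  - Size the pool to the expected concurrency
  - Monitor the pool's state
  - Evict idle connections periodically
- Thread pool configuration:
  - Base the thread count on the number of CPU cores for CPU-bound work; blocking I/O such as model calls needs more threads
  - Use bounded queues to avoid running out of memory
  - Configure a suitable rejection policy
- Error handling:
  - Implement a retry mechanism
  - Use circuit breakers to prevent cascading failures
  - Monitor error rates and raise alerts
- Performance monitoring:
  - Monitor request latency and throughput
  - Track resource usage
  - Establish performance baselines
- Testing and verification: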
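Sizing threads from the core count fits CPU-bound work, but model calls mostly wait on the network. *Java Concurrency in Practice* (section 8.2) gives a formula that accounts for waiting: threads = cores × target CPU utilization × (1 + wait time / compute time). The helper below applies it; the figures in the check are assumptions, and in practice provider rate limits usually cap useful concurrency long before the thread count does.

```java
// Illustrative pool-sizing helper: cores * utilization * (1 + wait / compute)
final class PoolSizing {
    static int threadsFor(int cores, double targetUtilization, double waitMillis, double computeMillis) {
        if (cores <= 0 || targetUtilization <= 0 || targetUtilization > 1
                || waitMillis < 0 || computeMillis <= 0) {
            throw new IllegalArgumentException("invalid sizing parameters");
        }
        return (int) Math.ceil(cores * targetUtilization * (1 + waitMillis / computeMillis));
    }
}
```

For example, 8 cores at full utilization, with about 2 s of waiting and 20 ms of CPU work per call, gives `threadsFor(8, 1.0, 2000, 20)` = 808 threads, far more than the `corePoolSize * 2` used in 4.2.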
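6.4 Key Points for Interview Answers
Q: How does LangChain4j handle concurrent requests?
A: Through multi-level concurrency control:
- The application layer manages concurrency with thread pools and queues
- The HTTP layer optimizes network requests with connection pooling
- The API layer applies rate limiting and quota management
- Asynchronous and reactive programming models are supported
Q: Does LangChain4j have thread-safety problems?
A: It depends on the component:
- Stateless components (such as model clients) are usually thread-safe
- Stateful components (such as ChatMemory) need additional synchronization
- Shared resources need appropriate concurrency control
- Prefer thread-safe data structures and atomic operations
Q: How can the concurrent performance of LangChain4j be improved?
A: From several angles:
- Tune connection pool and thread pool parameters
- Batch and cache requests
- Use asynchronous, non-blocking I/O
- Monitor and adjust system resources
- Adapt the concurrency strategy dynamically to the load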