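Common Challenges in Production
In production, Elasticsearch regularly runs into performance and stability problems. In an e-commerce system, a misconfigured analyzer in the first weeks after launch can make search results inaccurate. During a big sales event, a cluster pinned at 100% CPU is often traced back to misused wildcard queries. In a logging system storing terabytes of data, a badly chosen shard count creates enormous management overhead. In a vector-search scenario, a memory leak in the Java High Level REST Client had to be fixed by closing BulkProcessor instances properly.
The lesson from experience: developers who understand how ES works under the hood avoid planting these landmines in their search systems.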
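Abstract
Elasticsearch is a distributed search engine built on Lucene. It uses inverted indexes to answer queries in milliseconds. This article explains ES cluster architecture, how sharding works, and how queries are optimized, and it presents best practices for the Java clients. A complete e-commerce search example compares the performance of different query styles. The article also offers solutions for index design, query optimization, and cluster monitoring, together with enterprise-grade configuration templates, tuning data, and a troubleshooting guide.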
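1. Why Elasticsearch?
1.1 Starting from the Pain of Searching in a Database
A typical attempt to search with MySQL looks like this:
```sql
-- MySQL fuzzy search ('手机' means "mobile phone")
SELECT * FROM products
WHERE name LIKE '%手机%'
   OR description LIKE '%手机%'
   OR tags LIKE '%手机%'
ORDER BY create_time DESC
LIMIT 100 OFFSET 0;
```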
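Figure 1: Problems with search in MySQL

Pain points of searching with MySQL:
- LIKE '%xxx%' starts with a wildcard, so no B-tree index can be used and every query becomes a full table scan
- OR conditions across multiple fields perform very poorly
- No support for sophisticated relevance scoring and ranking
- No support for word segmentation (tokenization), synonyms, or pinyin search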
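1.2 Advantages of Elasticsearch
The core of ES is the inverted index:
```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Example structure of an inverted index
public class InvertedIndex {
    // term -> posting list (IDs of the documents that contain the term)
    Map<String, List<Integer>> index = new HashMap<>();
    // Doc 1: "华为手机很好用" (Huawei phones are great to use)
    // Doc 2: "小米手机性价比高" (Xiaomi phones are great value for money)
    // Inverted index after word segmentation:
    // "华为" (Huawei)        -> [doc 1]
    // "手机" (phone)         -> [doc 1, doc 2]
    // "小米" (Xiaomi)        -> [doc 2]
    // "很好用" (easy to use) -> [doc 1]
    // "性价比" (value)       -> [doc 2]
}
```
Listing 2: How an inverted index works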
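To make the structure concrete, here is a minimal runnable sketch of an inverted index, assuming documents are already tokenized into space-separated terms. The tokenization stands in for an analyzer such as ik_max_word. The sketch answers an AND query by intersecting posting lists. `SimpleInvertedIndex` and its methods are illustrative names, not part of any ES API, and real Lucene posting lists are compressed on-disk structures.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Minimal in-memory inverted index (illustrative only).
class SimpleInvertedIndex {
    // term -> sorted set of IDs of the documents containing the term
    private final Map<String, TreeSet<Integer>> postings = new HashMap<>();

    // Index a document whose text is already tokenized (space-separated terms).
    void add(int docId, String tokenizedText) {
        for (String term : tokenizedText.trim().split("\\s+")) {
            postings.computeIfAbsent(term, t -> new TreeSet<>()).add(docId);
        }
    }

    // IDs of the documents containing ALL query terms, found by intersecting posting lists.
    List<Integer> searchAll(String... terms) {
        TreeSet<Integer> result = null;
        for (String term : terms) {
            Set<Integer> docs = postings.getOrDefault(term, new TreeSet<>());
            if (result == null) {
                result = new TreeSet<>(docs);
            } else {
                result.retainAll(docs);
            }
        }
        return result == null ? new ArrayList<>() : new ArrayList<>(result);
    }
}
```

Suppose the two documents above are indexed as "华为 手机 很好用" and "小米 手机 性价比 高". Then `searchAll("手机")` returns [1, 2] and `searchAll("手机", "小米")` returns [2]. Each lookup reads only the posting lists of the query terms and never scans the documents themselves.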
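Comparison of the search process:
Figure 2: MySQL vs. ES search flow

Performance comparison (10 million product records):
| Scenario | MySQL | Elasticsearch | Difference |
|---|---|---|---|
| Single-field fuzzy query | 3200 ms | 45 ms | 71× faster |
| Multi-field OR query | 8500 ms | 65 ms | 130× faster |
| Complex conditions + sorting | 12000 ms | 85 ms | 141× faster |
| Memory usage | 4.2 GB | 1.8 GB | 57% less |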
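2. ES Core Architecture
2.1 Cluster Architecture
Figure 3: ES cluster architecture

Node roles:
- Master node: manages cluster state and shard allocation
- Data node: stores data and executes CRUD, search, and aggregation requests
- Coordinating node: routes requests to the relevant shards and merges their results
- Ingest node: preprocesses documents before they are indexed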
2.2 Indices and Shards
```java
// Index creation example.
// Assumes the IK (ik_max_word / ik_smart) and pinyin analysis plugins are installed.
@Slf4j
@Configuration
public class IndexConfig {

    public void createProductIndex(RestHighLevelClient client) throws IOException {
        CreateIndexRequest request = new CreateIndexRequest("products");
        // Index settings
        request.settings(Settings.builder()
                .put("index.number_of_shards", 3)      // primary shards (fixed at creation)
                .put("index.number_of_replicas", 1)    // replicas per primary shard
                .put("index.refresh_interval", "1s")   // refresh interval
                .put("index.analysis.analyzer.default.type", "ik_max_word") // default analyzer
                // Custom analyzer used by name.pinyin (the pinyin plugin provides a "pinyin" tokenizer)
                .put("index.analysis.analyzer.pinyin_analyzer.type", "custom")
                .put("index.analysis.analyzer.pinyin_analyzer.tokenizer", "pinyin"));
        // Mapping definition
        XContentBuilder mapping = JsonXContent.contentBuilder()
            .startObject()
                .startObject("properties")
                    .startObject("id")
                        .field("type", "keyword")
                    .endObject()
                    .startObject("name")
                        .field("type", "text")
                        .field("analyzer", "ik_max_word")   // index-time analyzer
                        .field("search_analyzer", "ik_smart") // search-time analyzer
                        // Multi-fields are a nested object, not a separate builder
                        .startObject("fields")
                            .startObject("pinyin")
                                .field("type", "text")
                                .field("analyzer", "pinyin_analyzer")
                            .endObject()
                            .startObject("keyword")
                                .field("type", "keyword")
                                .field("ignore_above", 256)
                            .endObject()
                        .endObject()
                    .endObject()
                    .startObject("price")
                        .field("type", "double")
                    .endObject()
                    .startObject("category_id")
                        .field("type", "integer")
                    .endObject()
                    .startObject("sales")
                        .field("type", "integer")
                    .endObject()
                    .startObject("create_time")
                        .field("type", "date")
                        .field("format", "yyyy-MM-dd HH:mm:ss||epoch_millis")
                    .endObject()
                .endObject()
            .endObject();
        request.mapping(mapping);
        CreateIndexResponse response = client.indices().create(request, RequestOptions.DEFAULT);
        if (response.isAcknowledged()) {
            log.info("Index created");
        }
    }
}
```
Listing 3: Index creation configuration
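How sharding works:
Figure 4: Distribution of primary shards and replicas

Shard design principles:
- Keep each shard under 50 GB
- Number of primary shards ≈ total data volume / 50 GB, rounded up
- Replicas: at least 1 and at most (number of nodes - 1), since a replica is never allocated to the same node as its primary
- Avoid oversharding: every shard consumes heap, file handles, and cluster-state overhead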
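The sizing rules above reduce to simple arithmetic. The sketch below assumes the 50 GB-per-shard target from the list. For routing it uses `String.hashCode()` as a stand-in for the Murmur3 hash that ES actually applies to the `_routing` value (the document `_id` by default), so concrete shard numbers will differ from a real cluster. The sketch does show why `number_of_shards` cannot simply be changed on an existing index: a document's shard depends on the primary shard count. Changing that count requires a reindex or the split/shrink APIs. `ShardPlanner` is a hypothetical helper.

```java
// Shard sizing and routing sketch (assumptions noted in the comments).
class ShardPlanner {
    static final long GB = 1024L * 1024 * 1024;
    static final long MAX_SHARD_BYTES = 50 * GB; // target size from the rules above

    // Primary shards so that no shard exceeds 50 GB: ceil(total / 50 GB), at least 1.
    static int primaryShards(long totalBytes) {
        long shards = (totalBytes + MAX_SHARD_BYTES - 1) / MAX_SHARD_BYTES;
        return (int) Math.max(1, shards);
    }

    // Replicas: at least 1, but never more than nodes - 1, because a replica is not
    // allocated to the node holding its primary (on a single node this yields 0).
    static int replicas(int nodes, int desired) {
        int max = Math.max(0, nodes - 1);
        return Math.min(Math.max(1, desired), max);
    }

    // Simplified routing: shard = hash(_routing) mod number_of_primary_shards.
    // hashCode() is a stand-in for the Murmur3 hash ES really uses.
    static int shardFor(String routing, int primaryShards) {
        return Math.floorMod(routing.hashCode(), primaryShards);
    }
}
```

For 120 GB of data, `primaryShards` returns 3. On a 3-node cluster, `replicas(3, 5)` is capped at 2.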
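3. Java Clients in Practice
3.1 Choosing a Client
| Client | Pros | Cons | Recommended for |
|---|---|---|---|
| RestHighLevelClient | Official, full-featured | Heavyweight, complex API; deprecated since 7.15 in favor of the Java API Client | Projects that need the full feature set |
| Java Low Level REST Client | Lightweight, flexible | JSON must be handled manually | Simple, performance-sensitive queries |
| Spring Data Elasticsearch | Concise, integrates with Spring | Version compatibility issues | Spring Boot projects |
| JestClient | Simple to use | No longer maintained | Not recommended for new projects |
Our choice: RestHighLevelClient for new projects on ES 7.x clusters, and Spring Data Elasticsearch for Spring Boot projects.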
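3.2 Configuring RestHighLevelClient
```java
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.client.Node;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.cluster.health.ClusterHealthStatus;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.Scheduled;

@Configuration
@Slf4j
public class ElasticsearchConfig {
    @Value("${elasticsearch.hosts:localhost:9200}")
    private String hosts;
    @Value("${elasticsearch.username:}")
    private String username;
    @Value("${elasticsearch.password:}")
    private String password;

    // Your own alerting component, used by the health check below
    @Autowired
    private AlertService alertService;

    @Bean
    public RestHighLevelClient restHighLevelClient() {
        // Parse the host list "host1:port1,host2:port2" (every entry must include a port)
        String[] hostArray = hosts.split(",");
        HttpHost[] httpHosts = new HttpHost[hostArray.length];
        for (int i = 0; i < hostArray.length; i++) {
            String[] hostPort = hostArray[i].split(":");
            httpHosts[i] = new HttpHost(
                    hostPort[0].trim(),
                    Integer.parseInt(hostPort[1].trim()),
                    "http");
        }
        // Configure the builder
        RestClientBuilder builder = RestClient.builder(httpHosts)
                .setRequestConfigCallback(requestConfigBuilder -> requestConfigBuilder
                        .setConnectTimeout(5000)            // TCP connect timeout: 5 s
                        .setSocketTimeout(60000)            // socket (read) timeout: 60 s
                        .setConnectionRequestTimeout(1000)) // max wait for a pooled connection: 1 s
                .setHttpClientConfigCallback(httpClientBuilder -> {
                    // Connection pool
                    httpClientBuilder.setMaxConnTotal(100)  // max connections in total
                            .setMaxConnPerRoute(50)         // max connections per node
                            .setKeepAliveStrategy((response, context) -> 60000); // keep-alive: 60 s
                    // Basic authentication
                    if (StringUtils.isNotBlank(username) && StringUtils.isNotBlank(password)) {
                        CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
                        credentialsProvider.setCredentials(
                                AuthScope.ANY,
                                new UsernamePasswordCredentials(username, password));
                        httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
                    }
                    return httpClientBuilder;
                })
                .setFailureListener(new RestClient.FailureListener() {
                    @Override
                    public void onFailure(Node node) {
                        log.error("ES node connection failed: {}", node.getHost());
                    }
                });
        // Spring calls the client's close() on shutdown (inferred destroy method)
        return new RestHighLevelClient(builder);
    }

    // Cluster health check every 30 s (requires @EnableScheduling)
    @Scheduled(fixedDelay = 30000)
    public void checkClusterHealth() {
        try {
            ClusterHealthRequest request = new ClusterHealthRequest();
            // Inside a @Configuration class this returns the singleton bean, not a new client
            ClusterHealthResponse response = restHighLevelClient().cluster().health(request, RequestOptions.DEFAULT);
            ClusterHealthStatus status = response.getStatus();
            if (status != ClusterHealthStatus.GREEN) {
                log.warn("ES cluster status abnormal: {}, details: {}", status, response);
                // Send an alert
                alertService.sendAlert("ES cluster status abnormal", "Status: " + status + ", details: " + response);
            }
        } catch (Exception e) {
            log.error("ES cluster health check failed", e);
        }
    }
}
```
Listing 4: ES client configuration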
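The `hosts.split(...)` loop in Listing 4 throws `ArrayIndexOutOfBoundsException` whenever an entry omits the port. A more tolerant parser is sketched below in plain Java. `HostSpec` and `HostParser` are hypothetical names. The defaults (scheme `http`, port 9200, the standard ES HTTP port) are assumptions, and IPv6 literals are not handled. In the bean method, each parsed entry would become `new HttpHost(spec.host, spec.port, spec.scheme)`.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical value type for one parsed host entry.
final class HostSpec {
    final String scheme;
    final String host;
    final int port;

    HostSpec(String scheme, String host, int port) {
        this.scheme = scheme;
        this.host = host;
        this.port = port;
    }
}

// Hypothetical parser for lists such as "es1:9200, es2, https://es3:9243".
// Assumed defaults: scheme "http", port 9200. IPv6 literals are not handled.
final class HostParser {
    static List<HostSpec> parseHosts(String hosts) {
        List<HostSpec> result = new ArrayList<>();
        for (String raw : hosts.split(",")) {
            String entry = raw.trim();
            if (entry.isEmpty()) {
                continue; // tolerate stray or trailing commas
            }
            String scheme = "http";
            int schemeEnd = entry.indexOf("://");
            if (schemeEnd >= 0) {
                scheme = entry.substring(0, schemeEnd);
                entry = entry.substring(schemeEnd + 3);
            }
            int port = 9200;
            int colon = entry.lastIndexOf(':');
            if (colon >= 0) {
                port = Integer.parseInt(entry.substring(colon + 1).trim());
                entry = entry.substring(0, colon).trim();
            }
            result.add(new HostSpec(scheme, entry, port));
        }
        return result;
    }
}
```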
3.3 Configuring Spring Data Elasticsearch
```java
@Configuration
@EnableElasticsearchRepositories(basePackages = "com.example.repository")
@Slf4j
public class SpringDataESConfig {
    @Value("${elasticsearch.hosts:localhost:9200}")
    private String hosts;

    @Bean
    public RestHighLevelClient elasticsearchClient() {
        // Build the client exactly as in Listing 4; buildClient() stands for that logic.
        // Do not register both configurations, or there will be two client beans.
        return buildClient(hosts);
    }

    @Bean
    public ElasticsearchRestTemplate elasticsearchTemplate() {
        return new ElasticsearchRestTemplate(elasticsearchClient());
    }

    // Entity mapping (the index itself is created from the JSON files, not from these annotations)
    @Document(indexName = "products", createIndex = false)
    @Setting(settingPath = "/settings/product-settings.json")
    @Mapping(mappingPath = "/mappings/product-mapping.json")
    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    public static class Product {
        @Id
        private String id;

        // Use @MultiField on its own; combining it with @Field on the same property is ambiguous
        @MultiField(
                mainField = @Field(type = FieldType.Text, analyzer = "ik_max_word", searchAnalyzer = "ik_smart"),
                otherFields = {
                        @InnerField(suffix = "pinyin", type = FieldType.Text, analyzer = "pinyin_analyzer"),
                        @InnerField(suffix = "keyword", type = FieldType.Keyword)
                })
        private String name;

        @Field(type = FieldType.Double)
        private Double price;

        // Map camelCase properties to the snake_case field names used in the index
        @Field(name = "category_id", type = FieldType.Integer)
        private Integer categoryId;

        @Field(type = FieldType.Integer)
        private Integer sales;

        // The date format must match the one declared in product-mapping.json
        @Field(name = "create_time", type = FieldType.Date, format = DateFormat.date_hour_minute_second)
        private Date createTime;

        // Geo field
        @GeoPointField
        private GeoPoint location;
    }

    // Repository interface (in a real project, put it in com.example.repository so it gets scanned)
    public interface ProductRepository extends ElasticsearchRepository<Product, String> {
        // Queries derived from method names
        List<Product> findByName(String name);
        List<Product> findByPriceBetween(Double minPrice, Double maxPrice);
        Page<Product> findByCategoryId(Integer categoryId, Pageable pageable);

        // @Query annotation: ?0 is replaced by the first method argument
        @Query("{\"match\": {\"name\": \"?0\"}}")
        List<Product> findByNameCustom(String name);

        // Native bool query: name match plus a price range
        @Query("{\"bool\": {\"must\": [" +
                "{\"match\": {\"name\": \"?0\"}}," +
                "{\"range\": {\"price\": {\"gte\": ?1, \"lte\": ?2}}}" +
                "]}}")
        List<Product> searchByNameAndPriceRange(String name, Double minPrice, Double maxPrice);
    }
}
```
Listing 5: Spring Data ES configuration
4. Index Design Best Practices
4.1 Index Lifecycle Management
```java
// getIndexSettings(), getIndexMapping(), getIndexStats(), isIndexOlderThan(), createIndex(),
// switchAlias() and the *OlderThan() methods are helpers not shown here
@Component
@Slf4j
public class IndexLifecycleManager {

    @Autowired
    private RestHighLevelClient client;

    // 1. Time-based (daily) index names, e.g. logs-2024.01.31
    public String getDailyIndex(String indexPrefix) {
        String date = LocalDate.now().format(DateTimeFormatter.ofPattern("yyyy.MM.dd"));
        return indexPrefix + "-" + date;
    }

    // 2. Create an index and attach an alias to it
    public void createIndexWithAlias(String indexName, String alias) throws IOException {
        CreateIndexRequest request = new CreateIndexRequest(indexName);
        // Index settings
        request.settings(getIndexSettings());
        // Create the index
        CreateIndexResponse response = client.indices().create(request, RequestOptions.DEFAULT);
        if (response.isAcknowledged()) {
            // Add the alias
            IndicesAliasesRequest aliasRequest = new IndicesAliasesRequest();
            AliasActions aliasAction = new AliasActions(AliasActions.Type.ADD)
                    .index(indexName)
                    .alias(alias);
            aliasRequest.addAliasAction(aliasAction);
            client.indices().updateAliases(aliasRequest, RequestOptions.DEFAULT);
            log.info("Created index {} with alias {}", indexName, alias);
        }
    }

    // 3. Rollover (ES also provides this natively through the _rollover API and ILM policies)
    @Scheduled(cron = "0 0 0 * * ?") // every day at midnight
    public void rolloverIndex() throws IOException {
        String todayIndex = getDailyIndex("logs");
        String alias = "logs-current";
        // Which index does the alias currently point to?
        GetAliasesRequest request = new GetAliasesRequest(alias);
        GetAliasesResponse response = client.indices().getAlias(request, RequestOptions.DEFAULT);
        Map<String, Set<AliasMetadata>> aliases = response.getAliases();
        if (aliases.isEmpty()) {
            // First run: create the index together with the alias
            createIndexWithAlias(todayIndex, alias);
        } else {
            // Check the size of the current index
            String currentIndex = aliases.keySet().iterator().next();
            IndexStats stats = getIndexStats(currentIndex);
            // Roll over if the index exceeds 50 GB or is older than 7 days
            if (stats.getStoreSizeInBytes() > 50L * 1024 * 1024 * 1024 || isIndexOlderThan(currentIndex, 7)) {
                // Create the new index WITHOUT the alias; otherwise the alias briefly points
                // to two indices and writes through it fail
                createIndex(todayIndex);
                // Move the alias: remove + add in a single IndicesAliasesRequest is atomic
                switchAlias(alias, currentIndex, todayIndex);
                // The old index stays open; closing and deletion happen in manageIndexLifecycle()
            }
        }
    }

    // 4. Force-merge, close and delete old indices
    public void manageIndexLifecycle() throws IOException {
        // Force-merge segments of indices older than 7 days (only possible while they are open)
        forceMergeIndicesOlderThan(7);
        // Close indices older than 30 days
        closeIndicesOlderThan(30);
        // Delete indices older than 90 days
        deleteIndicesOlderThan(90);
    }

    // 5. Index template (legacy _template API)
    public void createIndexTemplate() throws IOException {
        PutIndexTemplateRequest request = new PutIndexTemplateRequest("logs-template");
        request.patterns(Arrays.asList("logs-*"));
        request.settings(getIndexSettings());
        request.mapping(getIndexMapping());
        // Every matching index also joins the logs-all alias
        request.alias(new Alias("logs-all"));
        // Order (precedence when several templates match)
        request.order(1);
        // Version
        request.version(1);
        // In the 7.x high-level client, putTemplate returns an AcknowledgedResponse
        AcknowledgedResponse response = client.indices()
                .putTemplate(request, RequestOptions.DEFAULT);
        if (response.isAcknowledged()) {
            log.info("Index template created");
        }
    }
}
```
Listing 6: Index lifecycle management
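The rollover condition in `rolloverIndex()` (over 50 GB, or older than 7 days) and the daily naming scheme can be written as pure functions, which makes them easy to unit test. In the sketch below, `RolloverPolicy` is a hypothetical name, and the index creation date is passed in rather than read from the index settings. In production, the ES rollover API with `max_size` / `max_age` conditions, or an ILM policy, covers the same logic.

```java
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.time.temporal.ChronoUnit;

// Hypothetical pure-function version of the rollover decision.
final class RolloverPolicy {
    static final long MAX_BYTES = 50L * 1024 * 1024 * 1024; // 50 GB
    static final int MAX_AGE_DAYS = 7;
    private static final DateTimeFormatter DAILY = DateTimeFormatter.ofPattern("yyyy.MM.dd");

    // Same naming scheme as getDailyIndex(): prefix-yyyy.MM.dd
    static String dailyIndex(String prefix, LocalDate date) {
        return prefix + "-" + date.format(DAILY);
    }

    // Roll over when the index exceeds 50 GB or is more than 7 days old.
    static boolean shouldRollover(long storeSizeBytes, LocalDate created, LocalDate today) {
        long ageDays = ChronoUnit.DAYS.between(created, today);
        return storeSizeBytes > MAX_BYTES || ageDays > MAX_AGE_DAYS;
    }
}
```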
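4.2 Mapping Design Tips
Key points of the mapping below: "dynamic": "strict" rejects any document containing a field that is not mapped. big_field is excluded from the stored _source, but under strict mode it must still be declared under properties, or documents containing it are rejected. title has pinyin, keyword, and completion sub-fields. attributes uses the nested type so each key/value pair is matched as a unit. vector is a 128-dimensional dense_vector field for vector search. pinyin_analyzer is a custom analyzer that must be defined in the index settings.
```json
{
  "mappings": {
    "dynamic": "strict",
    "_source": {
      "enabled": true,
      "excludes": ["big_field"]
    },
    "properties": {
      "id": {
        "type": "keyword",
        "ignore_above": 256
      },
      "title": {
        "type": "text",
        "analyzer": "ik_max_word",
        "search_analyzer": "ik_smart",
        "fields": {
          "pinyin": {
            "type": "text",
            "analyzer": "pinyin_analyzer"
          },
          "keyword": {
            "type": "keyword",
            "ignore_above": 256
          },
          "completion": {
            "type": "completion",
            "analyzer": "simple"
          }
        }
      },
      "price": {
        "type": "scaled_float",
        "scaling_factor": 100
      },
      "location": {
        "type": "geo_point"
      },
      "tags": {
        "type": "text",
        "analyzer": "whitespace"
      },
      "attributes": {
        "type": "nested",
        "properties": {
          "key": {
            "type": "keyword"
          },
          "value": {
            "type": "keyword"
          }
        }
      },
      "vector": {
        "type": "dense_vector",
        "dims": 128
      }
    }
  }
}
```
Listing 7: Mapping design example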
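Why `scaled_float` with `scaling_factor: 100` suits prices: ES stores such a value as a long, namely the value multiplied by the scaling factor and rounded to the nearest long. Prices therefore keep exactly two decimals while being stored and compressed as integers. The sketch below mimics that encode/decode rule in plain Java. It illustrates the storage rule only and is not ES source code.

```java
// Mimics how a scaled_float value is stored: round(value * scalingFactor) as a long.
final class ScaledFloat {
    // encode: multiply by the scaling factor and round to the nearest long
    static long encode(double value, double scalingFactor) {
        return Math.round(value * scalingFactor);
    }

    // decode: stored long divided by the scaling factor
    static double decode(long stored, double scalingFactor) {
        return stored / scalingFactor;
    }
}
```

With a factor of 100, 19.99 is stored as 1999 and reads back as 19.99. A value with more precision, such as 19.999, is stored as 2000 and reads back as 20.0.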
5. Query Optimization in Practice
5.1 Query Types Compared
```java
@Service
@Slf4j
public class ProductSearchService {

    @Autowired
    private RestHighLevelClient client;

    // 1. match query (the most common)
    public SearchResponse searchByMatch(String keyword) throws IOException {
        SearchRequest request = new SearchRequest("products");
        SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
        sourceBuilder.query(QueryBuilders.matchQuery("name", keyword)
                .analyzer("ik_smart")          // search-time analyzer
                .minimumShouldMatch("75%"));   // at least 75% of the query terms must match
        // .operator(Operator.AND) would instead require every term; together with
        // minimumShouldMatch it leaves no optional terms, so use one or the other
        sourceBuilder.from(0);
        sourceBuilder.size(20);
        sourceBuilder.sort(SortBuilders.scoreSort());
        request.source(sourceBuilder);
        return client.search(request, RequestOptions.DEFAULT);
    }

    // 2. multi_match query (several fields)
    public SearchResponse searchByMultiMatch(String keyword) throws IOException {
        SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
        sourceBuilder.query(QueryBuilders.multiMatchQuery(keyword)
                .field("name", 3.0f)                            // field boosts
                .field("name.pinyin", 2.0f)
                .field("description")
                .type(MultiMatchQueryBuilder.Type.BEST_FIELDS)  // best-matching field's score...
                .tieBreaker(0.3f));                             // ...plus 0.3 x the other matching fields
        return executeSearch(sourceBuilder);
    }

    // 3. term query (exact match; assumes a category field with a keyword sub-field)
    public SearchResponse searchByTerm(String category) throws IOException {
        SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
        sourceBuilder.query(QueryBuilders.termQuery("category.keyword", category));
        return executeSearch(sourceBuilder);
    }

    // 4. bool query (combined conditions)
    public SearchResponse searchByBool(String keyword, Double minPrice, Double maxPrice, String category) throws IOException {
        SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
        BoolQueryBuilder boolQuery = QueryBuilders.boolQuery();
        // MUST: must match and contributes to the score
        if (StringUtils.isNotBlank(keyword)) {
            boolQuery.must(QueryBuilders.matchQuery("name", keyword));
        }
        // FILTER: must match, not scored, cacheable
        if (minPrice != null || maxPrice != null) {
            RangeQueryBuilder rangeQuery = QueryBuilders.rangeQuery("price");
            if (minPrice != null) {
                rangeQuery.gte(minPrice);
            }
            if (maxPrice != null) {
                rangeQuery.lte(maxPrice);
            }
            boolQuery.filter(rangeQuery);
        }
        // SHOULD: optional, raises the score when it matches
        // (setting minimumShouldMatch(1) here would make it a required condition)
        if (StringUtils.isNotBlank(category)) {
            boolQuery.should(QueryBuilders.termQuery("category.keyword", category));
        }
        // MUST_NOT: must not match
        boolQuery.mustNot(QueryBuilders.termQuery("status", "deleted"));
        sourceBuilder.query(boolQuery);
        return executeSearch(sourceBuilder);
    }

    // 5. Aggregations
    public SearchResponse searchWithAggregation() throws IOException {
        SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
        // Match all documents
        sourceBuilder.query(QueryBuilders.matchAllQuery());
        sourceBuilder.size(0); // return aggregations only, no hits
        // Price ranges ("from" inclusive, "to" exclusive)
        AggregationBuilder priceAgg = AggregationBuilders
                .range("price_ranges")
                .field("price")
                .addRange(0, 100)
                .addRange(100, 500)
                .addRange(500, 1000)
                .addRange(1000, 5000)
                .addUnboundedFrom(5000); // 5000 and above
        // Category buckets
        AggregationBuilder categoryAgg = AggregationBuilders
                .terms("categories")
                .field("category.keyword")
                .size(10);
        // Nested aggregation over the attributes objects
        AggregationBuilder nestedAgg = AggregationBuilders
                .nested("attributes", "attributes")
                .subAggregation(AggregationBuilders
                        .terms("attr_keys")
                        .field("attributes.key")
                        .size(10));
        sourceBuilder.aggregation(priceAgg);
        sourceBuilder.aggregation(categoryAgg);
        sourceBuilder.aggregation(nestedAgg);
        return executeSearch(sourceBuilder);
    }

    // 6. Search suggestions
    public Suggest searchSuggest(String prefix) throws IOException {
        SearchRequest request = new SearchRequest("products");
        SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
        sourceBuilder.size(0);
        // Completion suggester on the name.completion sub-field
        CompletionSuggestionBuilder suggestion = SuggestBuilders.completionSuggestion("name.completion")
                .prefix(prefix, Fuzziness.AUTO) // tolerate small typos in the prefix
                .skipDuplicates(true)
                .size(10);
        SuggestBuilder suggestBuilder = new SuggestBuilder();
        suggestBuilder.addSuggestion("product_suggest", suggestion);
        sourceBuilder.suggest(suggestBuilder);
        request.source(sourceBuilder);
        SearchResponse response = client.search(request, RequestOptions.DEFAULT);
        // Suggestions come back inside the search response
        return response.getSuggest();
    }

    // Shared helper: run a search against the products index
    private SearchResponse executeSearch(SearchSourceBuilder sourceBuilder) throws IOException {
        SearchRequest request = new SearchRequest("products").source(sourceBuilder);
        return client.search(request, RequestOptions.DEFAULT);
    }
}
```
Listing 8: Implementations of different query types
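Two of the parameters above are easy to misjudge, so here is the arithmetic behind them, sketched in plain Java. For a positive percentage such as "75%", the ES documentation says the number of optional terms that must match is computed from the percentage and rounded down. For `multi_match` of type `best_fields` with a `tie_breaker`, the score is the best field's score plus `tie_breaker` times the scores of the other matching fields. `ScoringMath` is a hypothetical helper, and the inputs are assumed to be already-boosted per-field scores.

```java
// Arithmetic behind minimum_should_match and best_fields/tie_breaker (illustrative only).
final class ScoringMath {
    // minimum_should_match with a positive percentage such as "75%":
    // optionalClauses * percent / 100, rounded down.
    static int minShouldMatch(int optionalClauses, int percent) {
        return optionalClauses * percent / 100; // integer division rounds down here
    }

    // best_fields: best field score + tieBreaker * (sum of the other fields' scores).
    // fieldScores are already boosted (name^3 contributes 3 x its raw score).
    static double bestFields(double[] fieldScores, double tieBreaker) {
        double best = 0;
        double sum = 0;
        for (double s : fieldScores) {
            best = Math.max(best, s);
            sum += s;
        }
        return best + tieBreaker * (sum - best);
    }
}
```

For a 3-term query with "75%", 2 terms must match (3 × 75 / 100 = 2.25, rounded down). A document scoring 6.0 on the boosted name field, 2.0 on name.pinyin, and 1.0 on description gets 6.0 + 0.3 × 3.0 = 6.9. With a tie_breaker of 0, only the best field would count.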
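5.2 Performance Tips
```java
@Component
@Slf4j
public class QueryOptimizer {

    @Autowired
    private RestHighLevelClient client;
    @Autowired
    private ProductSearchService productSearchService; // provides searchByMatch() (Listing 8)
    // Example thread pool for parallel searches; shut it down when the application stops
    private final ExecutorService executor = Executors.newFixedThreadPool(8);

    // 1. Pagination.
    // lastSortValues: sort values of the previous page's last hit (SearchHit.getSortValues()),
    // or null for the first page. search_after cannot jump to an arbitrary page number.
    public SearchResponse searchWithPagination(String keyword, Object[] lastSortValues, int size) throws IOException {
        SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
        sourceBuilder.query(QueryBuilders.matchQuery("name", keyword));
        // from/size paging gets slower with depth, and from + size is capped by
        // index.max_result_window (10,000 by default):
        // sourceBuilder.from((page - 1) * size).size(size);
        // Preferred for deep paging: search_after
        if (lastSortValues != null) {
            sourceBuilder.searchAfter(lastSortValues);
        }
        sourceBuilder.size(size);
        // search_after needs a sort ending in a unique tiebreaker; use the id keyword
        // field rather than _id (fielddata on _id is discouraged)
        sourceBuilder.sort(SortBuilders.scoreSort());
        sourceBuilder.sort(SortBuilders.fieldSort("id"));
        return executeSearch(sourceBuilder);
    }

    // 2. Query rewriting
    public SearchResponse optimizedSearch(String keyword) throws IOException {
        SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
        // Original wildcard query (very slow: a leading * forces ES to check every term)
        // QueryBuilders.wildcardQuery("name", "*" + keyword + "*");
        // Options 1 and 3 assume name.ngram / name.edge_ngram sub-fields mapped with
        // ngram / edge_ngram analyzers (not part of the mappings above)
        BoolQueryBuilder query = QueryBuilders.boolQuery();
        // Option 1: ngram sub-field (matches substrings)
        query.should(QueryBuilders.matchQuery("name.ngram", keyword));
        // Option 2: prefix query on the keyword field, for short inputs only
        if (keyword.length() < 10) {
            query.should(QueryBuilders.prefixQuery("name.keyword", keyword));
        }
        // Option 3: edge_ngram sub-field (matches term prefixes)
        query.should(QueryBuilders.matchQuery("name.edge_ngram", keyword));
        sourceBuilder.query(query);
        return executeSearch(sourceBuilder);
    }

    // 3. Filter cache
    public SearchResponse searchWithFilterCache() throws IOException {
        SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
        BoolQueryBuilder boolQuery = QueryBuilders.boolQuery();
        // Frequently used conditions go in filter context: not scored, and cacheable
        boolQuery.filter(QueryBuilders.termQuery("status", "active"));
        boolQuery.filter(QueryBuilders.rangeQuery("price")
                .gte(100).lte(1000));
        // Full-text condition (scored)
        boolQuery.must(QueryBuilders.matchQuery("name", "手机"));
        sourceBuilder.query(boolQuery);
        return executeSearch(sourceBuilder);
    }

    // 4. Sorting on doc values
    public SearchResponse searchWithFieldData() throws IOException {
        SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
        // Don't sort on a text field (it needs fielddata, which is disabled by default)
        // sourceBuilder.sort(SortBuilders.fieldSort("name")); // wrong
        // Sort on the keyword sub-field, which has doc values
        sourceBuilder.sort(SortBuilders.fieldSort("name.keyword"));
        // Optionally return the value from doc values instead of _source
        sourceBuilder.docValueField("name.keyword");
        return executeSearch(sourceBuilder);
    }

    // 5. Timeouts and response limits
    public SearchResponse searchWithTimeout(String keyword, long timeoutMs) throws IOException {
        SearchRequest request = new SearchRequest("products");
        SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
        // Best-effort search timeout; when it is hit, SearchResponse.isTimedOut() is true
        sourceBuilder.timeout(new TimeValue(timeoutMs, TimeUnit.MILLISECONDS));
        sourceBuilder.query(QueryBuilders.matchQuery("name", keyword));
        request.source(sourceBuilder);
        // Cap the client-side response buffer at 30 MB (default 100 MB). This limits size,
        // not time; HTTP timeouts come from the client configuration (Listing 4).
        RequestOptions.Builder options = RequestOptions.DEFAULT.toBuilder();
        options.setHttpAsyncResponseConsumerFactory(
                new HttpAsyncResponseConsumerFactory.HeapBufferedResponseConsumerFactory(30 * 1024 * 1024));
        return client.search(request, options.build());
    }

    // 6. Parallel searches
    public List<SearchResponse> parallelSearch(List<String> keywords) throws InterruptedException, ExecutionException {
        List<CompletableFuture<SearchResponse>> futures = new ArrayList<>();
        for (String keyword : keywords) {
            CompletableFuture<SearchResponse> future = CompletableFuture.supplyAsync(() -> {
                try {
                    return productSearchService.searchByMatch(keyword);
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            }, executor);
            futures.add(future);
        }
        // Wait for all searches (throws ExecutionException if any of them failed)
        CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).get();
        return futures.stream()
                .map(CompletableFuture::join)
                .collect(Collectors.toList());
    }

    // Shared helper: run a search against the products index
    private SearchResponse executeSearch(SearchSourceBuilder sourceBuilder) throws IOException {
        return client.search(new SearchRequest("products").source(sourceBuilder), RequestOptions.DEFAULT);
    }
}
```
Listing 9: Query performance optimization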
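`search_after` pages with a cursor instead of an offset. Each request asks for hits whose sort values come strictly after the last hit of the previous page, so the cost per page does not grow with depth the way `from + size` does. The in-memory sketch below simulates that behavior over a sorted list, sorting by sales descending with `id` ascending as the unique tiebreaker. `SearchAfterDemo`, `Hit`, and `nextPage` are hypothetical names, not client APIs. The linear scan only reproduces the semantics and says nothing about how ES executes the query.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// In-memory simulation of search_after semantics (hypothetical names).
final class SearchAfterDemo {
    static final class Hit {
        final String id;
        final int sales;

        Hit(String id, int sales) {
            this.id = id;
            this.sales = sales;
        }
    }

    // Sort: sales descending, then id ascending as a unique tiebreaker.
    static final Comparator<Hit> ORDER =
            Comparator.comparingInt((Hit h) -> h.sales).reversed()
                    .thenComparing((Hit h) -> h.id);

    // Up to `size` hits that sort strictly after `after` (null = first page).
    static List<Hit> nextPage(List<Hit> all, Hit after, int size) {
        List<Hit> sorted = new ArrayList<>(all);
        sorted.sort(ORDER);
        List<Hit> page = new ArrayList<>();
        for (Hit h : sorted) {
            if (after != null && ORDER.compare(h, after) <= 0) {
                continue; // already returned on an earlier page
            }
            page.add(h);
            if (page.size() == size) {
                break;
            }
        }
        return page;
    }
}
```

The unique tiebreaker matters. Without it, two hits with equal sales (b and c in the test data) could be skipped or repeated at a page boundary.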
6. Bulk Operations and Near-Real-Time Behavior
6.1 Bulk Operations
```java
// JsonUtils (your JSON serializer) and retryFailedItem() are helpers not shown here
@Component
@Slf4j
public class BulkOperationService {

    @Autowired
    private RestHighLevelClient client;

    // 1. Simple bulk request
    public void bulkIndexProducts(List<Product> products) throws IOException {
        BulkRequest request = new BulkRequest();
        for (Product product : products) {
            IndexRequest indexRequest = new IndexRequest("products")
                    .id(product.getId())
                    .source(JsonUtils.toJson(product), XContentType.JSON);
            request.add(indexRequest);
        }
        BulkResponse response = client.bulk(request, RequestOptions.DEFAULT);
        if (response.hasFailures()) {
            // A bulk request can fail partially, so inspect every item
            log.error("Bulk request had failures: {}", response.buildFailureMessage());
            for (BulkItemResponse item : response.getItems()) {
                if (item.isFailed()) {
                    log.error("Document {} failed: {}", item.getId(), item.getFailureMessage());
                    // Retry logic
                    retryFailedItem(item);
                }
            }
        } else {
            log.info("Bulk request succeeded, {} documents", products.size());
        }
    }

    // 2. BulkProcessor with a listener.
    // In production, create one BulkProcessor (e.g. as a bean) and close it on shutdown;
    // a processor that is never closed keeps its buffered requests and resources alive.
    public void bulkWithListener(List<Product> products) throws InterruptedException {
        BulkProcessor.Listener listener = new BulkProcessor.Listener() {
            @Override
            public void beforeBulk(long executionId, BulkRequest request) {
                log.info("Executing bulk with {} requests", request.numberOfActions());
            }

            @Override
            public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
                if (response.hasFailures()) {
                    log.error("Bulk execution had failures: {}", response.buildFailureMessage());
                } else {
                    log.info("Bulk executed in {} ms", response.getTook().getMillis());
                }
            }

            @Override
            public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
                log.error("Bulk execution threw an exception", failure);
            }
        };
        BulkProcessor.Builder builder = BulkProcessor.builder(
                (request, bulkListener) -> client.bulkAsync(request, RequestOptions.DEFAULT, bulkListener),
                listener);
        // A flush happens when ANY of these thresholds is reached
        builder.setBulkActions(1000)                               // every 1000 requests
                .setBulkSize(new ByteSizeValue(5, ByteSizeUnit.MB))  // or every 5 MB
                .setFlushInterval(TimeValue.timeValueSeconds(5))     // or every 5 seconds
                .setConcurrentRequests(2)                            // bulk requests in flight at once
                .setBackoffPolicy(BackoffPolicy                      // retry rejected requests:
                        .exponentialBackoff(TimeValue.timeValueMillis(100), 3)); // 100 ms start, 3 retries
        BulkProcessor processor = builder.build();
        // Add documents
        for (Product product : products) {
            IndexRequest request = new IndexRequest("products")
                    .id(product.getId())
                    .source(JsonUtils.toJson(product), XContentType.JSON);
            processor.add(request);
        }
        // Flush what is left and wait up to 30 s for in-flight requests
        if (!processor.awaitClose(30, TimeUnit.SECONDS)) {
            log.warn("BulkProcessor did not finish within 30 s");
        }
    }

    // 3. Bulk update (check response.hasFailures() as in bulkIndexProducts())
    public void bulkUpdateProducts(List<Product> products) throws IOException {
        BulkRequest request = new BulkRequest();
        for (Product product : products) {
            UpdateRequest updateRequest = new UpdateRequest("products", product.getId())
                    .doc(JsonUtils.toJson(product), XContentType.JSON)
                    .docAsUpsert(true); // insert the document if it does not exist
            request.add(updateRequest);
        }
        client.bulk(request, RequestOptions.DEFAULT);
    }

    // 4. Bulk delete
    public void bulkDeleteProducts(List<String> ids) throws IOException {
        BulkRequest request = new BulkRequest();
        for (String id : ids) {
            DeleteRequest deleteRequest = new DeleteRequest("products", id);
            request.add(deleteRequest);
        }
        client.bulk(request, RequestOptions.DEFAULT);
    }

    // 5. Multi-get
    public MultiGetResponse bulkGetProducts(List<String> ids) throws IOException {
        MultiGetRequest request = new MultiGetRequest();
        for (String id : ids) {
            request.add(new MultiGetRequest.Item("products", id));
        }
        return client.mget(request, RequestOptions.DEFAULT);
    }
}
```
Listing 10: Bulk operations
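BulkProcessor flushes as soon as any threshold is reached (`setBulkActions`, `setBulkSize`, `setFlushInterval`). The count and size part of that decision is sketched below in plain Java, so it can be reasoned about without a cluster. `BulkBuffer` is a hypothetical class. Time-based flushing, concurrency, and retries are left out, and flushed batches go to a callback instead of to ES. As with `awaitClose()`, closing the buffer flushes the remainder so the tail of the data is not lost.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical batch buffer modeled on BulkProcessor's action-count and byte-size thresholds.
final class BulkBuffer implements AutoCloseable {
    private final int maxActions;
    private final long maxBytes;
    private final Consumer<List<String>> sink; // receives each flushed batch
    private final List<String> pending = new ArrayList<>();
    private long pendingBytes = 0;

    BulkBuffer(int maxActions, long maxBytes, Consumer<List<String>> sink) {
        this.maxActions = maxActions;
        this.maxBytes = maxBytes;
        this.sink = sink;
    }

    // Buffer one JSON document; flush as soon as either threshold is reached.
    void add(String jsonDoc) {
        pending.add(jsonDoc);
        pendingBytes += jsonDoc.getBytes(StandardCharsets.UTF_8).length;
        if (pending.size() >= maxActions || pendingBytes >= maxBytes) {
            flush();
        }
    }

    // Hand the current batch to the sink and reset the counters.
    void flush() {
        if (pending.isEmpty()) {
            return;
        }
        sink.accept(new ArrayList<>(pending));
        pending.clear();
        pendingBytes = 0;
    }

    // Flush whatever is left, like awaitClose().
    @Override
    public void close() {
        flush();
    }
}
```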
6.2 Controlling Near-Real-Time Visibility
```java
// JsonUtils and searchByMatch() (as in Listing 8) are helpers not shown here
@Component
@Slf4j
public class RealtimeControlService {

    @Autowired
    private RestHighLevelClient client;

    // 1. IMMEDIATE: refresh right after this request (visible at once, but costly under heavy writes)
    public void indexWithRefresh(Product product) throws IOException {
        IndexRequest request = new IndexRequest("products")
                .id(product.getId())
                .source(JsonUtils.toJson(product), XContentType.JSON)
                .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
        client.index(request, RequestOptions.DEFAULT);
    }

    // 2. WAIT_UNTIL: respond only after a regular refresh has made the document visible
    public void indexWithWaitFor(Product product) throws IOException {
        IndexRequest request = new IndexRequest("products")
                .id(product.getId())
                .source(JsonUtils.toJson(product), XContentType.JSON)
                .setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL);
        client.index(request, RequestOptions.DEFAULT);
    }

    // 3. Manual refresh
    public void manualRefresh() throws IOException {
        RefreshRequest request = new RefreshRequest("products");
        RefreshResponse response = client.indices().refresh(request, RequestOptions.DEFAULT);
        log.info("Manual refresh done, shards: {}", response.getTotalShards());
    }

    // 4. Force merge (only for indices that no longer receive writes)
    public void forceMerge() throws IOException {
        ForceMergeRequest request = new ForceMergeRequest("products");
        request.maxNumSegments(1); // merge each shard down to a single segment
        // Alternative: onlyExpungeDeletes(true) only purges deleted documents.
        // Do not combine it with maxNumSegments; the two options are mutually exclusive.
        ForceMergeResponse response = client.indices().forcemerge(request, RequestOptions.DEFAULT);
        log.info("Force merge done, shards: {}", response.getTotalShards());
    }

    // 5. Change the refresh interval (a longer interval reduces refresh overhead during heavy writes)
    public void updateRefreshInterval(int seconds) throws IOException {
        UpdateSettingsRequest request = new UpdateSettingsRequest("products");
        Settings settings = Settings.builder()
                .put("index.refresh_interval", seconds + "s")
                .build();
        request.settings(settings);
        AcknowledgedResponse response = client.indices()
                .putSettings(request, RequestOptions.DEFAULT);
        if (response.isAcknowledged()) {
            log.info("Refresh interval updated to {} s", seconds);
        }
    }

    // 6. "Real-time" search by retrying once.
    // For read-your-own-writes, prefer RefreshPolicy.WAIT_UNTIL on the write instead.
    public SearchResponse realtimeSearch(String keyword) throws IOException {
        // First attempt
        SearchResponse response = searchByMatch(keyword);
        // No results: wait about one default refresh interval and retry
        if (response.getHits().getTotalHits().value == 0) {
            try {
                Thread.sleep(1000); // wait 1 s
                response = searchByMatch(keyword);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return response;
    }
}
```
Listing 11: Controlling near-real-time visibility
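Why a freshly indexed document may not be searchable yet: indexed documents first go to an in-memory buffer. They become visible to search only after a refresh opens a new segment. That happens every `refresh_interval` (1s by default) or on demand through IMMEDIATE, WAIT_UNTIL, or the refresh API. The toy model below captures only that visibility rule. `NrtIndex` and its methods are hypothetical, and the model ignores the translog, merging, and the fact that GET by id is real-time.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of near-real-time visibility (hypothetical; not how Lucene stores data).
final class NrtIndex {
    private final List<String> buffer = new ArrayList<>();     // indexed, not yet searchable
    private final List<String> searchable = new ArrayList<>(); // visible to search

    // Indexing only appends to the in-memory buffer.
    void index(String doc) {
        buffer.add(doc);
    }

    // A refresh makes everything buffered so far searchable (like opening a new segment).
    void refresh() {
        searchable.addAll(buffer);
        buffer.clear();
    }

    // Search only sees refreshed documents.
    boolean search(String doc) {
        return searchable.contains(doc);
    }

    // Models RefreshPolicy.IMMEDIATE: index, then refresh before returning.
    void indexImmediate(String doc) {
        index(doc);
        refresh();
    }
}
```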
7. Enterprise Case Studies
7.1 E-commerce Product Search
```java
// executeSearch(), processAggregations() and getSuggestions() are helpers not shown here
@Service
@Slf4j
public class ECommerceSearchService {

    // Product search
    public SearchResult<Product> searchProducts(ProductSearchRequest request) throws IOException {
        SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
        // Build the bool query
        BoolQueryBuilder boolQuery = QueryBuilders.boolQuery();
        // Keyword search (the only scored part)
        if (StringUtils.isNotBlank(request.getKeyword())) {
            handleKeywordSearch(boolQuery, request.getKeyword());
        }
        // Category filter
        if (request.getCategoryId() != null) {
            boolQuery.filter(QueryBuilders.termQuery("category_id", request.getCategoryId()));
        }
        // Price range
        if (request.getMinPrice() != null || request.getMaxPrice() != null) {
            RangeQueryBuilder priceRange = QueryBuilders.rangeQuery("price");
            if (request.getMinPrice() != null) {
                priceRange.gte(request.getMinPrice());
            }
            if (request.getMaxPrice() != null) {
                priceRange.lte(request.getMaxPrice());
            }
            boolQuery.filter(priceRange);
        }
        // Brand filter
        if (CollectionUtils.isNotEmpty(request.getBrandIds())) {
            boolQuery.filter(QueryBuilders.termsQuery("brand_id", request.getBrandIds()));
        }
        // Attribute filters
        if (CollectionUtils.isNotEmpty(request.getAttributes())) {
            handleAttributesFilter(boolQuery, request.getAttributes());
        }
        // Geo-distance filter
        if (request.getLocation() != null && request.getDistance() != null) {
            handleGeoFilter(boolQuery, request.getLocation(), request.getDistance());
        }
        sourceBuilder.query(boolQuery);
        // Sorting
        handleSorting(sourceBuilder, request.getSortBy(), request.getSortOrder());
        // Pagination (from + size must stay within index.max_result_window, 10,000 by default)
        sourceBuilder.from((request.getPage() - 1) * request.getSize())
                .size(request.getSize());
        // Highlighting
        if (StringUtils.isNotBlank(request.getKeyword())) {
            HighlightBuilder highlightBuilder = new HighlightBuilder();
            highlightBuilder.field("name")
                    .preTags("<em>")
                    .postTags("</em>")
                    .fragmentSize(200)
                    .numOfFragments(3);
            sourceBuilder.highlighter(highlightBuilder);
        }
        // Facet aggregations
        addAggregations(sourceBuilder);
        // Run the search
        SearchResponse response = executeSearch(sourceBuilder);
        // Convert the response
        return processSearchResult(response, request);
    }

    private void handleKeywordSearch(BoolQueryBuilder boolQuery, String keyword) {
        // Combine several matching strategies; at least one should clause must match
        BoolQueryBuilder keywordQuery = QueryBuilders.boolQuery();
        // 1. Phrase match (highest boost)
        keywordQuery.should(QueryBuilders.matchPhraseQuery("name", keyword)
                .boost(3.0f));
        // 2. Pinyin match
        keywordQuery.should(QueryBuilders.matchQuery("name.pinyin", keyword)
                .boost(2.0f));
        // 3. Analyzed (tokenized) match
        keywordQuery.should(QueryBuilders.matchQuery("name", keyword)
                .boost(1.0f));
        // 4. Description match
        keywordQuery.should(QueryBuilders.matchQuery("description", keyword)
                .boost(0.5f));
        // 5. Tag match
        keywordQuery.should(QueryBuilders.matchQuery("tags", keyword)
                .boost(0.3f));
        boolQuery.must(keywordQuery);
    }

    private void handleAttributesFilter(BoolQueryBuilder boolQuery, List<AttributeFilter> attributes) {
        for (AttributeFilter attr : attributes) {
            // nested query: key and value must match within the same attribute object
            NestedQueryBuilder nestedQuery = QueryBuilders.nestedQuery(
                    "attributes",
                    QueryBuilders.boolQuery()
                            .must(QueryBuilders.termQuery("attributes.key", attr.getKey()))
                            .must(QueryBuilders.termsQuery("attributes.value", attr.getValues())),
                    ScoreMode.None);
            boolQuery.filter(nestedQuery);
        }
    }

    private void handleGeoFilter(BoolQueryBuilder boolQuery, GeoPoint location, String distance) {
        GeoDistanceQueryBuilder geoQuery = QueryBuilders.geoDistanceQuery("location")
                .point(location.getLat(), location.getLon())
                .distance(distance);
        boolQuery.filter(geoQuery);
    }

    private void handleSorting(SearchSourceBuilder sourceBuilder, String sortBy, String sortOrder) {
        if ("price".equals(sortBy)) {
            sourceBuilder.sort("price", "desc".equals(sortOrder) ? SortOrder.DESC : SortOrder.ASC);
        } else if ("sales".equals(sortBy)) {
            sourceBuilder.sort("sales", SortOrder.DESC);
        } else if ("score".equals(sortBy)) {
            sourceBuilder.sort(SortBuilders.scoreSort());
        } else {
            // Composite ranking: score * 0.6 + sales * 0.3 + (10000 - price) * 0.1.
            // In sort scripts the relevance score is the _score variable (doc['_score'] does
            // not exist), and script sorts are ascending by default, so sort descending.
            Script script = new Script(
                    "_score * 0.6 + " +
                    "doc['sales'].value * 0.3 + " +
                    "(10000 - doc['price'].value) * 0.1");
            sourceBuilder.sort(SortBuilders.scriptSort(script, ScriptSortBuilder.ScriptSortType.NUMBER)
                    .order(SortOrder.DESC));
        }
    }

    private void addAggregations(SearchSourceBuilder sourceBuilder) {
        // Price ranges
        AggregationBuilder priceAgg = AggregationBuilders
                .range("price_agg")
                .field("price")
                .addRange(0, 100)
                .addRange(100, 300)
                .addRange(300, 500)
                .addRange(500, 1000)
                .addRange(1000, 5000);
        // Brands
        AggregationBuilder brandAgg = AggregationBuilders
                .terms("brand_agg")
                .field("brand_id")
                .size(20);
        // Categories
        AggregationBuilder categoryAgg = AggregationBuilders
                .terms("category_agg")
                .field("category_id")
                .size(10);
        // Attributes: keys, and the values under each key
        AggregationBuilder attributeAgg = AggregationBuilders
                .nested("attributes_agg", "attributes")
                .subAggregation(AggregationBuilders
                        .terms("attribute_keys")
                        .field("attributes.key")
                        .size(10)
                        .subAggregation(AggregationBuilders
                                .terms("attribute_values")
                                .field("attributes.value")
                                .size(10)));
        sourceBuilder.aggregation(priceAgg);
        sourceBuilder.aggregation(brandAgg);
        sourceBuilder.aggregation(categoryAgg);
        sourceBuilder.aggregation(attributeAgg);
    }

    private SearchResult<Product> processSearchResult(SearchResponse response, ProductSearchRequest request) {
        SearchResult<Product> result = new SearchResult<>();
        // Total hits (exact only up to 10,000 unless track_total_hits is enabled)
        result.setTotal(response.getHits().getTotalHits().value);
        // Hits on the current page
        List<Product> products = new ArrayList<>();
        for (SearchHit hit : response.getHits()) {
            Product product = JsonUtils.fromJson(hit.getSourceAsString(), Product.class);
            product.setScore(hit.getScore());
            // Highlighted name, if any
            if (hit.getHighlightFields() != null) {
                HighlightField highlight = hit.getHighlightFields().get("name");
                if (highlight != null && highlight.getFragments().length > 0) {
                    product.setHighlightName(highlight.getFragments()[0].string());
                }
            }
            products.add(product);
        }
        result.setData(products);
        // Aggregation results (facets)
        processAggregations(result, response.getAggregations());
        // No hits for a keyword: offer search suggestions
        if (StringUtils.isNotBlank(request.getKeyword()) && products.isEmpty()) {
            result.setSuggestions(getSuggestions(request.getKeyword()));
        }
        return result;
    }
}
```
Listing 12: E-commerce product search
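A quick check of the default composite sort shows a scaling issue worth knowing about before relying on it. The weights are applied to raw values, and sales and price are usually orders of magnitude larger than a relevance score. The sketch below evaluates the same formula as the Painless script in plain Java. The inputs are made-up example values, not measurements, and `CompositeScore` is a hypothetical helper.

```java
// Evaluates the same formula as the sort script in Listing 12:
// _score * 0.6 + sales * 0.3 + (10000 - price) * 0.1
final class CompositeScore {
    static double composite(double score, long sales, double price) {
        return score * 0.6 + sales * 0.3 + (10000 - price) * 0.1;
    }
}
```

Product A (score 12, 50 sales, price 3000) gets 7.2 + 15 + 700 = 722.2. Product B (score 2, 900 sales, price 5000) gets 1.2 + 270 + 500 = 771.2. B therefore outranks the far more relevant A, because the relevance term contributes only a few points while sales and price contribute hundreds. Any product priced above 10000 also receives a negative price term. Normalizing each signal first, or moving the logic into a `function_score` query, are common alternatives.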
7.2 日志分析系统
@Service
@Slf4j
public class LogAnalysisService {
// 日志搜索
public LogSearchResult searchLogs(LogSearchRequest request) throws IOException {
String index = getLogIndex(request.getStartTime(), request.getEndTime());
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
// 时间范围查询
RangeQueryBuilder timeRange = QueryBuilders.rangeQuery("@timestamp")
.gte(request.getStartTime())
.lte(request.getEndTime())
.format("yyyy-MM-dd HH:mm:ss");
BoolQueryBuilder boolQuery = QueryBuilders.boolQuery()
.filter(timeRange);
// 关键词搜索
if (StringUtils.isNotBlank(request.getKeyword())) {
boolQuery.must(QueryBuilders.queryStringQuery(request.getKeyword())
.field("message")
.field("level")
.field("logger_name")
.defaultOperator(Operator.AND));
}
// 级别过滤
if (CollectionUtils.isNotEmpty(request.getLevels())) {
boolQuery.filter(QueryBuilders.termsQuery("level", request.getLevels()));
}
// 应用过滤
if (StringUtils.isNotBlank(request.getApplication())) {
boolQuery.filter(QueryBuilders.termQuery("application", request.getApplication()));
}
sourceBuilder.query(boolQuery);
// 排序
sourceBuilder.sort(SortBuilders.fieldSort("@timestamp")
.order(SortOrder.DESC));
// 分页
sourceBuilder.from((request.getPage() - 1) * request.getSize())
.size(request.getSize());
// 聚合
addLogAggregations(sourceBuilder, request);
// 执行搜索
SearchResponse response = executeSearch(index, sourceBuilder);
return processLogSearchResult(response, request);
}
// 错误日志统计
public ErrorStats getErrorStats(Date startTime, Date endTime, String application) throws IOException {
String index = getLogIndex(startTime, endTime);
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
// 时间范围
RangeQueryBuilder timeRange = QueryBuilders.rangeQuery("@timestamp")
.gte(startTime)
.lte(endTime);
BoolQueryBuilder boolQuery = QueryBuilders.boolQuery()
.filter(timeRange)
.filter(QueryBuilders.termQuery("level", "ERROR"));
if (StringUtils.isNotBlank(application)) {
boolQuery.filter(QueryBuilders.termQuery("application", application));
}
sourceBuilder.query(boolQuery);
sourceBuilder.size(0);
// 错误数量聚合
AggregationBuilder countAgg = AggregationBuilders
.cardinality("error_count")
.field("trace_id");
// 按应用分组
AggregationBuilder appAgg = AggregationBuilders
.terms("by_application")
.field("application")
.size(20)
.subAggregation(AggregationBuilders
.cardinality("app_error_count")
.field("trace_id"));
// 按时间分组
AggregationBuilder timeAgg = AggregationBuilders
.dateHistogram("by_time")
.field("@timestamp")
.calendarInterval(DateHistogramInterval.HOUR)
.format("yyyy-MM-dd HH:00")
.minDocCount(0)
.extendedBounds(
new LongBounds(startTime.getTime(), endTime.getTime()))
.subAggregation(AggregationBuilders
.cardinality("hour_error_count")
.field("trace_id"));
sourceBuilder.aggregation(countAgg);
sourceBuilder.aggregation(appAgg);
sourceBuilder.aggregation(timeAgg);
SearchResponse response = executeSearch(index, sourceBuilder);
return processErrorStats(response);
}
// Slow query analysis
public SlowQueryStats analyzeSlowQueries(Date startTime, Date endTime) throws IOException {
String index = getLogIndex(startTime, endTime);
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
// Find slow query log entries
BoolQueryBuilder boolQuery = QueryBuilders.boolQuery()
.filter(QueryBuilders.rangeQuery("@timestamp")
.gte(startTime)
.lte(endTime))
.filter(QueryBuilders.termQuery("logger_name", "slow_query"))
.filter(QueryBuilders.rangeQuery("duration").gte(1000)); // 1 second or more, assuming duration is stored in milliseconds
sourceBuilder.query(boolQuery);
sourceBuilder.size(0);
// Group by SQL type
AggregationBuilder sqlTypeAgg = AggregationBuilders
.terms("by_sql_type")
.field("sql_type")
.size(10)
.subAggregation(AggregationBuilders
.avg("avg_duration")
.field("duration"))
.subAggregation(AggregationBuilders
.max("max_duration")
.field("duration"))
.subAggregation(AggregationBuilders
.percentiles("duration_percentiles")
.field("duration")
.percentiles(50, 90, 95, 99));
// Group by table
AggregationBuilder tableAgg = AggregationBuilders
.terms("by_table")
.field("table_name")
.size(20)
.subAggregation(AggregationBuilders
.avg("avg_duration")
.field("duration"));
sourceBuilder.aggregation(sqlTypeAgg);
sourceBuilder.aggregation(tableAgg);
SearchResponse response = executeSearch(index, sourceBuilder);
return processSlowQueryStats(response);
}
}
Listing 13: Log analysis system
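The log search above computes `from` as `(page - 1) * size`. Elasticsearch rejects a request when `from + size` exceeds `index.max_result_window`, which defaults to 10000, so deep pages fail at query time. The sketch below shows a guard you could run before building the request. `PageWindow` and its method are hypothetical names, and the sketch assumes the default window has not been changed on the index.

```java
/** Hypothetical helper: converts a 1-based page into from/size and checks max_result_window. */
final class PageWindow {
    /** Default value of index.max_result_window in Elasticsearch. */
    static final int DEFAULT_MAX_RESULT_WINDOW = 10_000;

    final int from;
    final int size;

    private PageWindow(int from, int size) {
        this.from = from;
        this.size = size;
    }

    /** Rejects windows Elasticsearch would refuse (from + size > maxResultWindow). */
    static PageWindow of(int page, int size, int maxResultWindow) {
        if (page < 1 || size < 1) {
            throw new IllegalArgumentException("page and size must be >= 1");
        }
        long from = (long) (page - 1) * size; // long avoids int overflow for very large pages
        if (from + size > maxResultWindow) {
            throw new IllegalArgumentException("from + size = " + (from + size)
                + " exceeds max_result_window " + maxResultWindow
                + "; use search_after or scroll for deep pagination");
        }
        return new PageWindow((int) from, size);
    }
}
```

In the log search method you would then call `sourceBuilder.from(w.from).size(w.size)`. Failing fast in the application gives a clearer error than the server-side rejection. For pages beyond the window, `search_after` with a sort tiebreaker is the usual replacement for `from`/`size`.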
8. Performance Optimization and Monitoring
8.1 Performance Tuning
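@Component
@Slf4j
@RequiredArgsConstructor
public class PerformanceTuner {
private final RestHighLevelClient client;
// 1. JVM tuning (these flags belong in config/jvm.options, not in application code)
public void tuneJvm() {
// Recommended JVM settings for ES:
// -Xms4g -Xmx4g
// Set Xms equal to Xmx; use no more than 50% of physical RAM for the heap,
// and stay below the compressed-oops threshold (just under 32GB)
// -XX:+UseG1GC
// G1 garbage collector
// -XX:MaxGCPauseMillis=200
// -XX:G1ReservePercent=25
// -XX:InitiatingHeapOccupancyPercent=30
}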
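// 2. Index settings tuning
public Settings getOptimizedSettings() {
return Settings.builder()
.put("index.number_of_shards", 3) // size to the data volume; cannot be changed in place after creation
.put("index.number_of_replicas", 1)
.put("index.refresh_interval", "30s") // raise for write-heavy indices, at the cost of search freshness
.put("index.translog.durability", "async") // async fsync: a crash can lose up to sync_interval of acknowledged writes
.put("index.translog.sync_interval", "5s")
.put("index.translog.flush_threshold_size", "512mb")
.put("index.merge.scheduler.max_thread_count", 1) // for spinning disks only; keep the default on SSDs
.put("index.merge.scheduler.max_merge_count", 6)
.put("index.unassigned.node_left.delayed_timeout", "5m") // delay reallocation when a node leaves briefly
.build();
}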
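// 3. Query tuning
public SearchSourceBuilder tuneSearch(SearchSourceBuilder builder) {
// Only the text match needs scoring. Exact-value conditions go in filter context,
// which skips scoring and is eligible for the node query cache.
// A second builder.query(...) call would replace the first, so combine everything in one bool query.
builder.query(QueryBuilders.boolQuery()
.must(QueryBuilders.matchQuery("name", "手机"))
.filter(QueryBuilders.termQuery("status", "active"))
.filter(QueryBuilders.termQuery("category", "electronics")));
// Bound search time; on timeout, partial results are returned with timed_out=true
builder.timeout(TimeValue.timeValueSeconds(5));
// Return only the fields the caller needs
builder.fetchSource(new String[]{"id", "name", "price"}, null);
// The shard request cache is enabled on SearchRequest, not on SearchSourceBuilder:
// new SearchRequest(index).source(builder).requestCache(true);
// By default it only caches size=0 (aggregation-only) requests.
return builder;
}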
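// 4. Collect monitoring metrics every 60 seconds
// Note: NodesStatsRequest and IndicesStatsRequest are transport-layer request classes. Depending on the
// client version, RestHighLevelClient may not expose nodes() or indices().stats(); in that case, call
// GET /_nodes/stats and GET /_stats through client.getLowLevelClient() instead.
@Scheduled(fixedDelay = 60000)
public void collectMetrics() throws IOException {
// Cluster health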
ClusterHealthRequest healthRequest = new ClusterHealthRequest();
ClusterHealthResponse healthResponse = client.cluster()
.health(healthRequest, RequestOptions.DEFAULT);
// Node stats
NodesStatsRequest nodesRequest = new NodesStatsRequest();
nodesRequest.indices(true);
nodesRequest.os(true);
nodesRequest.jvm(true);
nodesRequest.threadPool(true);
NodesStatsResponse nodesResponse = client.nodes()
.stats(nodesRequest, RequestOptions.DEFAULT);
// Index stats
IndicesStatsRequest indicesRequest = new IndicesStatsRequest();
indicesRequest.all();
IndicesStatsResponse indicesResponse = client.indices()
.stats(indicesRequest, RequestOptions.DEFAULT);
// Push to the monitoring system (helper not shown)
sendToMonitoringSystem(healthResponse, nodesResponse, indicesResponse);
// Check whether the cluster needs to scale out (helper not shown)
checkAndScale(healthResponse, nodesResponse, indicesResponse);
}
// 5. Hot shard identification
// getQueryCount() is cumulative since the shard copy started, so a fixed threshold also flags
// long-lived shards; comparing deltas between runs gives a real load signal.
public void identifyHotShards() throws IOException {
IndicesStatsRequest request = new IndicesStatsRequest();
request.all();
IndicesStatsResponse response = client.indices()
.stats(request, RequestOptions.DEFAULT);
Map<String, IndexStats> indices = response.getIndices();
for (Map.Entry<String, IndexStats> entry : indices.entrySet()) {
String index = entry.getKey();
IndexStats stats = entry.getValue();
ShardStats[] shardStats = stats.getShards();
for (ShardStats shardStat : shardStats) {
long queryCount = shardStat.getStats().getSearch().getTotal().getQueryCount();
// Check query load; 10000 is an example threshold
if (queryCount > 10000) {
log.warn("Hot shard: {}/{}, query count: {}", index, shardStat.getShardRouting().getId(), queryCount);
// Trigger shard reallocation (helper not shown)
rerouteHotShard(index, shardStat);
}
}
}
}
}
Listing 14: Performance tuning
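In `identifyHotShards`, the shard query counter is cumulative, so a busy shard and an old, quiet shard can both pass a fixed threshold. The sketch below is one way to get a rate: sample the counter for each shard on every scheduled run and divide the change in count by the elapsed time. `ShardRateTracker`, its key format and the threshold are hypothetical. The sketch assumes samples are taken from the same shard copy each time.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical tracker: turns cumulative per-shard query counters into queries per second. */
final class ShardRateTracker {
    /** One observation of a shard copy's cumulative query counter. */
    private static final class Sample {
        final long queryCount;
        final long timestampMillis;

        Sample(long queryCount, long timestampMillis) {
            this.queryCount = queryCount;
            this.timestampMillis = timestampMillis;
        }
    }

    private final Map<String, Sample> lastSamples = new HashMap<>();
    private final double hotQueriesPerSecond;

    ShardRateTracker(double hotQueriesPerSecond) {
        this.hotQueriesPerSecond = hotQueriesPerSecond;
    }

    /**
     * Stores the new sample and returns the query rate since the previous sample for the
     * same key, or -1 if there is no previous sample or the counter went backwards
     * (the shard copy restarted and its counter began again from zero).
     */
    double record(String shardKey, long queryCount, long timestampMillis) {
        Sample previous = lastSamples.put(shardKey, new Sample(queryCount, timestampMillis));
        if (previous == null) {
            return -1;
        }
        long deltaQueries = queryCount - previous.queryCount;
        long deltaMillis = timestampMillis - previous.timestampMillis;
        if (deltaQueries < 0 || deltaMillis <= 0) {
            return -1;
        }
        return deltaQueries * 1000.0 / deltaMillis;
    }

    /** True when a measured rate reaches the configured hot-shard threshold. */
    boolean isHot(double queriesPerSecond) {
        return queriesPerSecond >= hotQueriesPerSecond;
    }
}
```

Each copy of a shard keeps its own counter. The key should therefore identify the copy, for example the index, the shard ID and `ShardRouting.currentNodeId()`, and rerouting would only be triggered when `isHot(rate)` returns true.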
8.2 Monitoring and Alerting
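# prometheus.yml
# The alert expressions below use metric names from the prometheus-community
# elasticsearch_exporter (a separate process, default port 9114, path /metrics),
# so Prometheus scrapes the exporter rather than Elasticsearch itself.
scrape_configs:
  - job_name: 'elasticsearch'
    static_configs:
      - targets: ['localhost:9114']
rule_files:
  - 'es_alerts.yml'
# es_alerts.yml: Prometheus loads alerting rules from rule files, organized in groups
groups:
  - name: elasticsearch
    rules:
      - alert: ClusterHealthRed
        expr: elasticsearch_cluster_health_status{color="red"} == 1
        for: 5m
        labels:
          severity: critical
        annotations:
          summary: "ES cluster health is RED"
      - alert: HighCpuUsage
        # process_cpu_percent is a 0-100 gauge, so average it rather than applying rate()
        expr: avg_over_time(elasticsearch_process_cpu_percent[5m]) > 80
        for: 2m
        labels:
          severity: warning
        annotations:
          summary: "ES node CPU usage is high"
      - alert: HighHeapUsage
        expr: elasticsearch_jvm_memory_used_bytes{area="heap"} / elasticsearch_jvm_memory_max_bytes{area="heap"} > 0.8
        for: 2m
        labels:
          severity: warning
        annotations:
          summary: "ES node heap usage is high"
      - alert: HighDiskUsage
        expr: 1 - elasticsearch_filesystem_data_available_bytes / elasticsearch_filesystem_data_size_bytes > 0.8
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "ES node disk usage is high"
      - alert: HighSearchLatency
        # The exporter exposes cumulative counters rather than histogram buckets,
        # so alert on average seconds per query over 5m instead of a p95
        expr: rate(elasticsearch_indices_search_query_time_seconds[5m]) / rate(elasticsearch_indices_search_query_total[5m]) > 1
        for: 2m
        labels:
          severity: warning
        annotations:
          summary: "ES search latency is high"
      - alert: UnassignedShards
        expr: elasticsearch_cluster_health_unassigned_shards > 0
        for: 5m
        labels:
          severity: critical
        annotations:
          summary: "ES has unassigned shards"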
Listing 15: Monitoring and alerting configuration
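The HighSearchLatency rule is meant to catch slow searches. The prometheus-community elasticsearch_exporter exports search time as two cumulative counters, `elasticsearch_indices_search_query_time_seconds` and `elasticsearch_indices_search_query_total`, not as histogram buckets. The usable signal is therefore average latency per query over a window, which is what `rate(time) / rate(count)` computes. The Java sketch below reproduces that arithmetic on two counter samples. `SearchLatency` is a hypothetical name. Unlike Prometheus `rate()`, which compensates for counter resets, this helper just returns NaN when a counter goes backwards.

```java
/** Hypothetical helper mirroring rate(time[w]) / rate(count[w]) on two counter samples. */
final class SearchLatency {
    private SearchLatency() {}

    /**
     * Average seconds per query between two samples of cumulative counters.
     * Returns NaN when no queries ran in the window or a counter went backwards (node restart).
     */
    static double averageSeconds(double timeSecondsBefore, double timeSecondsAfter,
                                 long queriesBefore, long queriesAfter) {
        double deltaTime = timeSecondsAfter - timeSecondsBefore;
        long deltaQueries = queriesAfter - queriesBefore;
        if (deltaQueries <= 0 || deltaTime < 0) {
            return Double.NaN;
        }
        return deltaTime / deltaQueries;
    }
}
```

For example, 60 extra seconds of query time spread over 600 queries is 0.1s per query, well under the 1s alert threshold.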
9. Troubleshooting Guide
9.1 Common Problems
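@Component
@Slf4j
@RequiredArgsConstructor
public class TroubleshootingGuide {
// Note: the node stats, index stats and allocation-explain calls below use transport-layer request classes;
// if your RestHighLevelClient version does not expose them, call the corresponding REST endpoints
// (GET /_nodes/stats, GET /<index>/_stats, GET /_cluster/allocation/explain) through client.getLowLevelClient().
private final RestHighLevelClient client;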
// 1. Slow searches: rerun the query with the Profile API enabled
public void diagnoseSlowSearch(String index, String query) throws IOException {
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
sourceBuilder.query(QueryBuilders.matchQuery("name", query));
sourceBuilder.profile(true);
SearchRequest request = new SearchRequest(index);
request.source(sourceBuilder);
SearchResponse response = client.search(request, RequestOptions.DEFAULT);
// Analyze the profile results (one entry per shard)
Map<String, ProfileShardResult> profileResults = response.getProfileResults();
for (Map.Entry<String, ProfileShardResult> entry : profileResults.entrySet()) {
log.info("Profile result for shard {}:", entry.getKey());
for (QueryProfileShardResult queryProfile : entry.getValue().getQueryProfileResults()) {
// Each ProfileResult is a node of the rewritten Lucene query tree; getTime() is in nanoseconds
for (ProfileResult profileResult : queryProfile.getQueryResults()) {
log.info("Query type: {}, description: {}, took: {} ms", profileResult.getQueryName(), profileResult.getLuceneDescription(), profileResult.getTime() / 1_000_000.0);
}
}
}
}
// 2. Slow indexing
public void diagnoseSlowIndexing(String index) throws IOException {
// Check the refresh interval; with includeDefaults(true), getSetting() falls back to the
// default (1s) when the setting was never set explicitly on the index
GetSettingsRequest settingsRequest = new GetSettingsRequest()
.indices(index)
.names("index.refresh_interval")
.includeDefaults(true);
GetSettingsResponse settingsResponse = client.indices()
.getSettings(settingsRequest, RequestOptions.DEFAULT);
log.info("Refresh interval: {}", settingsResponse.getSetting(index, "index.refresh_interval"));
// Check indexing, merge, refresh and translog statistics
IndicesStatsRequest statsRequest = new IndicesStatsRequest()
.indices(index)
.clear()
.docs(true)
.store(true)
.indexing(true)
.merge(true)
.refresh(true)
.translog(true)
.search(true);
IndicesStatsResponse statsResponse = client.indices()
.stats(statsRequest, RequestOptions.DEFAULT);
IndexStats indexStats = statsResponse.getIndex(index);
log.info("Index stats: {}", indexStats);
}
// 3. High memory usage
public void diagnoseHighMemory() throws IOException {
NodesStatsRequest request = new NodesStatsRequest();
request.jvm(true);
request.indices(true); // needed for fielddata stats; otherwise getIndices() is null
NodesStatsResponse response = client.nodes()
.stats(request, RequestOptions.DEFAULT);
for (NodeStats nodeStats : response.getNodes()) {
JvmStats jvmStats = nodeStats.getJvm();
JvmStats.Mem mem = jvmStats.getMem();
log.info("Node {} heap used: {}/{}", nodeStats.getNode().getName(), mem.getHeapUsed(), mem.getHeapMax());
// getGc() is iterable over the collectors (e.g. "young", "old"), not a map
for (JvmStats.GarbageCollector gc : jvmStats.getGc()) {
log.info("GC {}: count={}, time={}", gc.getName(), gc.getCollectionCount(), gc.getCollectionTime());
}
// Check fielddata memory
if (nodeStats.getIndices() != null && nodeStats.getIndices().getFieldData() != null) {
log.info("Fielddata memory: {}", nodeStats.getIndices().getFieldData().getMemorySize());
}
}
}
// 4. Unassigned shards
public void diagnoseUnassignedShards() throws IOException {
// With no index or shard specified, the API explains the first unassigned shard it finds
// (and returns an error if there is none)
ClusterAllocationExplainRequest request = new ClusterAllocationExplainRequest();
ClusterAllocationExplainResponse response = client.cluster()
.allocationExplain(request, RequestOptions.DEFAULT);
log.info("Shard allocation explanation: {}", response.getExplanation());
// Check disk space: nodes above the disk watermarks stop receiving shards
NodesStatsRequest nodesRequest = new NodesStatsRequest();
nodesRequest.fs(true);
NodesStatsResponse nodesResponse = client.nodes()
.stats(nodesRequest, RequestOptions.DEFAULT);
for (NodeStats nodeStats : nodesResponse.getNodes()) {
FsInfo fsInfo = nodeStats.getFs();
for (FsInfo.Path path : fsInfo) {
long total = path.getTotal().getBytes();
long used = total - path.getAvailable().getBytes();
log.info("Node {} disk used: {}/{} ({}%)", nodeStats.getNode().getName(), new ByteSizeValue(used), path.getTotal(), total == 0 ? 0 : used * 100 / total);
}
}
}
// 5. Hot nodes
public void diagnoseHotNodes() throws IOException {
NodesStatsRequest request = new NodesStatsRequest();
request.indices(true);
request.os(true);
NodesStatsResponse response = client.nodes()
.stats(request, RequestOptions.DEFAULT);
for (NodeStats nodeStats : response.getNodes()) {
// Search load (cumulative since node start; compare deltas between runs for a rate)
long queryCount = nodeStats.getIndices().getSearch()
.getTotal().getQueryCount();
// Indexing load (also cumulative)
long indexCount = nodeStats.getIndices().getIndexing()
.getTotal().getIndexCount();
// Current OS-level CPU usage, in percent
double cpuPercent = nodeStats.getOs().getCpu().getPercent();
log.info("Node {} load - queries: {}, index ops: {}, CPU: {}%", nodeStats.getNode().getName(), queryCount, indexCount, cpuPercent);
if (cpuPercent > 80 || queryCount > 10000) {
log.warn("Node {} may be a hot node", nodeStats.getNode().getName());
}
}
}
}
Listing 16: Troubleshooting tools
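`diagnoseUnassignedShards` checks disk usage because Elasticsearch's disk-based shard allocator refuses to place shards on nodes that are too full. The default watermarks are `cluster.routing.allocation.disk.watermark.low` = 85%, `high` = 90% and `flood_stage` = 95%. The sketch below maps a node's disk usage to the watermark it has crossed. `DiskWatermark` is a hypothetical name, and the sketch assumes the percentage defaults have not been overridden.

```java
/** Hypothetical classifier against Elasticsearch's default percentage disk watermarks. */
enum DiskWatermark {
    OK,          // at or below 85% used
    LOW,         // over 85%: no further shards allocated here (primaries of new indices excepted)
    HIGH,        // over 90%: Elasticsearch tries to relocate shards away from this node
    FLOOD_STAGE; // over 95%: indices with a shard here get index.blocks.read_only_allow_delete

    /** Classifies usage computed as (total - available) / total, as in diagnoseUnassignedShards. */
    static DiskWatermark classify(long totalBytes, long availableBytes) {
        if (totalBytes <= 0) {
            throw new IllegalArgumentException("totalBytes must be > 0");
        }
        double usedPercent = (totalBytes - availableBytes) * 100.0 / totalBytes;
        if (usedPercent > 95.0) {
            return FLOOD_STAGE;
        }
        if (usedPercent > 90.0) {
            return HIGH;
        }
        if (usedPercent > 85.0) {
            return LOW;
        }
        return OK;
    }
}
```

A node reported as LOW or above explains why replicas stay unassigned even though the allocation explanation lists the node as a candidate.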
10. Selection and Summary
10.1 ES vs. Other Options
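| Option | Strengths | Weaknesses | Best fit |
|---|---|---|---|
| Elasticsearch | Full-featured, rich ecosystem, strong performance | Resource-hungry, complex to operate | Full-text search, log analytics |
| Solr | Mature and stable, feature-rich | Declining community activity, weaker real-time indexing | Document search, enterprise search |
| OpenSearch | Apache 2.0 fork of ES 7.10, backed by AWS | Smaller ecosystem than ES | AWS deployments, fully open-source requirements |
| MeiliSearch | Lightweight, fast, easy to use | Limited feature set | Small applications, simple search |
| PostgreSQL | Transactions, SQL queries | Limited search features, slower at large scale | Teams already on PostgreSQL with simple search needs |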
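10.2 Best Practices
- Size shards sensibly: keep each shard under 50GB
- Define mappings strictly: disable dynamic mapping and declare every field type explicitly
- Optimize queries: avoid wildcard queries and put exact-match conditions in filter context
- Monitor comprehensively: cluster health, performance metrics and business metrics
- Plan capacity: scale out ahead of demand and configure disk watermarks
- Back up regularly: take scheduled snapshots and test restores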
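For the "keep each shard under 50GB" rule, the minimum primary-shard count follows from the expected index size: ceil(expectedSize / 50GB). The sketch below does that ceiling division. `ShardPlanner` is a hypothetical name. The sketch uses binary gigabytes and assumes the size estimate already includes growth until the next rollover. Replicas are not counted, because each replica mirrors a primary's size.

```java
/** Hypothetical planner for the "each primary shard under 50GB" rule of thumb. */
final class ShardPlanner {
    static final long GB = 1024L * 1024 * 1024;
    static final long MAX_SHARD_BYTES = 50 * GB;

    private ShardPlanner() {}

    /** Minimum number of primary shards so that no primary exceeds maxShardBytes. */
    static int minPrimaryShards(long expectedIndexBytes, long maxShardBytes) {
        if (expectedIndexBytes <= 0) {
            return 1;
        }
        long shards = (expectedIndexBytes + maxShardBytes - 1) / maxShardBytes; // ceiling division
        return (int) Math.max(1, shards);
    }
}
```

An index expected to reach 120GB needs at least 3 primaries, and that value goes into `index.number_of_shards` when the index is created. Because the primary count cannot be changed in place later, it pays to round up rather than down.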
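Summary
Elasticsearch is a powerful search engine, but not a silver bullet. Using it well takes an understanding of its internals, careful design and continuous monitoring.
In practice, teams most often run into poorly chosen shard counts, unoptimized queries and outages caused by missing monitoring.
ES is a tool, not magic: design around your business requirements, then keep monitoring and optimizing.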
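Final advice: start with simple scenarios and move to more complex designs once you understand the internals. Set up monitoring, choose sensible shard and replica counts, and review queries regularly. Search optimization is an ongoing process, not a one-off task.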


