Elasticsearch Core Concepts and Java Client in Practice: Building a High-Performance Search Service
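🎯 First, the time ES "wrecked" me
The first time we used ES for product search in our e-commerce system, it fell over on launch day. Users searched for "手机" (mobile phone) and got "手纸" (toilet paper) back, because the analyzer was misconfigured. Better still, during one big sales event the ES cluster sat at 100% CPU, and the culprit turned out to be someone running a wildcard query: `*手机*`.
Last year we built a logging system on ES that stored several TB of logs a day, and the disk alarms went off. The shard count was wrong: a single index had 200 shards, and the cluster-management overhead was enormous.
Last month we built real-time recommendations on ES vector search and hit a memory leak around the Java High Level Client. It took three days to trace it to a BulkProcessor that was never closed properly.
These incidents taught me one thing. A programmer who doesn't understand how ES works is planting mines with a search engine, and sooner or later one goes off.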
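✨ Summary
Elasticsearch is a distributed search engine built on Lucene that uses inverted indexes to return results in milliseconds. This article covers:
- ES cluster architecture, how sharding works, and how queries are optimized.
- Best practices for the Java client.
- A complete e-commerce search example that compares the performance of different query styles.
- Solutions for index design, query optimization, and cluster monitoring.
- Enterprise configuration templates, performance-tuning data, and a troubleshooting guide.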
1. Why Elasticsearch?
1.1 Starting with the pain of database search
Here is a typical problem with doing search in MySQL:
```sql
-- MySQL fuzzy query
SELECT * FROM products
WHERE name LIKE '%手机%'
   OR description LIKE '%手机%'
   OR tags LIKE '%手机%'
ORDER BY create_time DESC
LIMIT 100 OFFSET 0;
```
Listing 1: MySQL fuzzy query
The problem as a diagram:

Figure 1: Problems with search in MySQL
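Pain points of search in MySQL:
- `LIKE '%xxx%'` forces a full table scan, because a leading wildcard cannot use a B-tree index.
- Multi-field OR queries perform very poorly.
- Complex relevance scoring and ranking are not supported.
- Word segmentation, synonyms, and pinyin search are not supported out of the box.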
1.2 Advantages of Elasticsearch
The core of ES is the inverted index:
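```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Inverted index structure example
public class InvertedIndex {
    // One posting: a document ID plus the term's positions in that document
    record Posting(int docId, List<Integer> positions) {}

    // term -> list of postings
    Map<String, List<Posting>> index = new HashMap<>();

    // Doc 1: "华为手机很好用" (Huawei phone, very easy to use)
    // Doc 2: "小米手机性价比高" (Xiaomi phone, great value for money)
    // Inverted index:
    // "华为"   -> [Doc 1]
    // "手机"   -> [Doc 1, Doc 2]
    // "小米"   -> [Doc 2]
    // "很好用" -> [Doc 1]
    // "性价比" -> [Doc 2]
}
```
Listing 2: How an inverted index works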
Comparing the two search processes:

Figure 2: MySQL vs. ES search flow
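Here is a minimal in-memory sketch of the ES side of that comparison. Each term maps to a sorted posting list. An AND query intersects the lists and an OR query unions them, so no document is ever scanned.

The class name `TinyInvertedIndex` is hypothetical. The sketch assumes documents are already segmented into terms, which is the job IK does in ES. The terms are English stand-ins for the Chinese words above. Real Lucene postings are also compressed and carry frequencies and positions for scoring.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical toy inverted index: term -> sorted set of document IDs
class TinyInvertedIndex {
    private final Map<String, TreeSet<Integer>> postings = new HashMap<>();

    // Index a document that has already been split into terms
    void add(int docId, List<String> terms) {
        for (String term : terms) {
            postings.computeIfAbsent(term, t -> new TreeSet<>()).add(docId);
        }
    }

    // AND query: intersect the posting lists of all query terms
    Set<Integer> searchAll(List<String> terms) {
        TreeSet<Integer> result = null;
        for (String term : terms) {
            TreeSet<Integer> docs = postings.getOrDefault(term, new TreeSet<>());
            if (result == null) {
                result = new TreeSet<>(docs);
            } else {
                result.retainAll(docs);
            }
        }
        return result == null ? new TreeSet<>() : result;
    }

    // OR query: union of the posting lists
    Set<Integer> searchAny(List<String> terms) {
        TreeSet<Integer> result = new TreeSet<>();
        for (String term : terms) {
            result.addAll(postings.getOrDefault(term, new TreeSet<>()));
        }
        return result;
    }

    public static void main(String[] args) {
        TinyInvertedIndex index = new TinyInvertedIndex();
        index.add(1, List.of("huawei", "phone", "easy-to-use")); // doc 1 above
        index.add(2, List.of("xiaomi", "phone", "good-value"));  // doc 2 above
        System.out.println(index.searchAll(List.of("phone")));           // [1, 2]
        System.out.println(index.searchAll(List.of("huawei", "phone"))); // [1]
    }
}
```

Lookup cost depends on the length of the posting lists for the query terms, not on the total number of rows. This is where the gap with `LIKE '%...%'` comes from.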
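Performance comparison (10 million products):

| Scenario | MySQL | Elasticsearch | Difference |
|---|---|---|---|
| Single-field fuzzy query | 3200 ms | 45 ms | 71x faster |
| Multi-field OR query | 8500 ms | 65 ms | 130x faster |
| Complex conditions + sorting | 12000 ms | 85 ms | 141x faster |
| Memory usage | 4.2 GB | 1.8 GB | 57% less |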
2. ES Core Architecture
2.1 Cluster architecture

Figure 3: ES cluster architecture
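Node types:
- Master node: manages cluster state and shard allocation.
- Data node: stores data and executes CRUD operations.
- Coordinating node: routes requests and merges the results.
- Ingest node: preprocesses documents before they are indexed.
2.2 Indices and shards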
```java
// Index creation example (requires the IK and pinyin analysis plugins)
@Configuration
@Slf4j
public class IndexConfig {

    public void createProductIndex(RestHighLevelClient client) throws IOException {
        CreateIndexRequest request = new CreateIndexRequest("products");

        // Index settings
        request.settings(Settings.builder()
                .put("index.number_of_shards", 3)                     // primary shards
                .put("index.number_of_replicas", 1)                   // replicas per primary
                .put("index.refresh_interval", "1s")                  // refresh interval
                .put("analysis.analyzer.default.type", "ik_max_word") // default analyzer
                // The mapping references pinyin_analyzer, so it must be defined here
                .put("analysis.analyzer.pinyin_analyzer.tokenizer", "my_pinyin")
                .put("analysis.tokenizer.my_pinyin.type", "pinyin"));

        // Mapping definition
        XContentBuilder mapping = JsonXContent.contentBuilder()
            .startObject()
                .startObject("properties")
                    .startObject("id").field("type", "keyword").endObject()
                    .startObject("name")
                        .field("type", "text")
                        .field("analyzer", "ik_max_word")
                        .field("search_analyzer", "ik_smart")
                        // Multi-fields are a nested object, not a separate builder
                        .startObject("fields")
                            .startObject("pinyin")
                                .field("type", "text")
                                .field("analyzer", "pinyin_analyzer")
                            .endObject()
                            .startObject("keyword")
                                .field("type", "keyword")
                                .field("ignore_above", 256)
                            .endObject()
                        .endObject()
                    .endObject()
                    .startObject("price").field("type", "double").endObject()
                    .startObject("category_id").field("type", "integer").endObject()
                    .startObject("sales").field("type", "integer").endObject()
                    .startObject("create_time")
                        .field("type", "date")
                        .field("format", "yyyy-MM-dd HH:mm:ss||epoch_millis")
                    .endObject()
                .endObject()
            .endObject();
        request.mapping(mapping);

        CreateIndexResponse response = client.indices().create(request, RequestOptions.DEFAULT);
        if (response.isAcknowledged()) {
            log.info("Index created");
        }
    }
}
```
Listing 3: Index creation configuration
How sharding works:

Figure 4: Distribution of shards and replicas
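Shard design principles:
- Keep each shard under 50GB.
- Number of primary shards = total data size / 50GB, rounded up.
- Replica count: at least 1, and at most (number of nodes - 1). A replica is never allocated on the same node as its primary, so any higher value cannot be assigned.
- Avoid over-sharding, because every shard carries its own overhead.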
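The first three rules reduce to simple arithmetic. The sketch below uses hypothetical class and method names. It rounds the primary-shard count up so that no shard exceeds 50GB. It also clamps the replica count to the range the cluster can actually allocate.

```java
// Hypothetical helper that turns the shard-sizing rules into arithmetic
class ShardPlanner {
    static final long MAX_SHARD_BYTES = 50L * 1024 * 1024 * 1024; // 50GB per shard

    // Primary shards = ceil(totalBytes / 50GB), at least 1
    static int primaryShards(long totalBytes) {
        if (totalBytes <= 0) {
            return 1;
        }
        return (int) ((totalBytes + MAX_SHARD_BYTES - 1) / MAX_SHARD_BYTES);
    }

    // Replicas: aim for at least 1, but never more than nodes - 1
    // (on a single node no replica can be assigned, so the result is 0)
    static int replicas(int requested, int nodes) {
        int max = Math.max(0, nodes - 1);
        return Math.min(Math.max(requested, 1), max);
    }

    public static void main(String[] args) {
        long gb = 1024L * 1024 * 1024;
        System.out.println(primaryShards(120 * gb)); // 3
        System.out.println(replicas(1, 3));          // 1
        System.out.println(replicas(1, 1));          // 0
    }
}
```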
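3. Java Client in Practice
3.1 Choosing a client

| Client | Pros | Cons | Recommended for |
|---|---|---|---|
| RestHighLevelClient | Official, full-featured | Heavyweight, verbose API; deprecated since 7.15 in favor of the Elasticsearch Java API Client | New projects that need full functionality |
| Java Low Level REST Client | Lightweight, flexible | JSON must be handled by hand | Simple queries, performance-sensitive paths |
| Spring Data Elasticsearch | Concise, integrates with Spring | Version compatibility issues | Spring Boot projects |
| JestClient | Easy to use | No longer maintained | Not recommended for new projects |

Our choice: RestHighLevelClient for new projects on 7.x clusters, and Spring Data Elasticsearch for Spring Boot projects.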
3.2 RestHighLevelClient configuration
```java
@Configuration
@EnableScheduling
@Slf4j
public class ElasticsearchConfig {

    @Value("${elasticsearch.hosts:localhost:9200}")
    private String hosts;

    @Value("${elasticsearch.username:}")
    private String username;

    @Value("${elasticsearch.password:}")
    private String password;

    @Autowired
    private AlertService alertService; // project-specific alerting service (assumed)

    // destroyMethod = "close" releases the connection pool and I/O threads on shutdown
    @Bean(destroyMethod = "close")
    public RestHighLevelClient restHighLevelClient() {
        // Parse the "host:port,host:port" list
        String[] hostArray = hosts.split(",");
        HttpHost[] httpHosts = new HttpHost[hostArray.length];
        for (int i = 0; i < hostArray.length; i++) {
            String[] hostPort = hostArray[i].split(":");
            httpHosts[i] = new HttpHost(hostPort[0].trim(),
                    Integer.parseInt(hostPort[1].trim()), "http");
        }

        RestClientBuilder builder = RestClient.builder(httpHosts)
                .setRequestConfigCallback(requestConfigBuilder -> requestConfigBuilder
                        .setConnectTimeout(5000)            // TCP connect timeout: 5s
                        .setSocketTimeout(60000)            // socket read timeout: 60s
                        .setConnectionRequestTimeout(1000)) // wait for a pooled connection: 1s
                .setHttpClientConfigCallback(httpClientBuilder -> {
                    // Connection pool
                    httpClientBuilder.setMaxConnTotal(100) // max connections in total
                            .setMaxConnPerRoute(50)        // max connections per node
                            .setKeepAliveStrategy((response, context) -> 60000); // keep-alive 60s
                    // Basic authentication
                    if (StringUtils.isNotBlank(username) && StringUtils.isNotBlank(password)) {
                        CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
                        credentialsProvider.setCredentials(AuthScope.ANY,
                                new UsernamePasswordCredentials(username, password));
                        httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
                    }
                    return httpClientBuilder;
                })
                .setFailureListener(new RestClient.FailureListener() {
                    @Override
                    public void onFailure(Node node) {
                        log.error("ES node connection failed: {}", node.getHost());
                    }
                });

        return new RestHighLevelClient(builder);
    }

    // Health check every 30s; calling the @Bean method here returns the singleton
    @Scheduled(fixedDelay = 30000)
    public void checkClusterHealth() {
        try {
            ClusterHealthResponse response = restHighLevelClient().cluster()
                    .health(new ClusterHealthRequest(), RequestOptions.DEFAULT);
            ClusterHealthStatus status = response.getStatus();
            if (status != ClusterHealthStatus.GREEN) {
                log.warn("ES cluster status abnormal: {}, details: {}", status, response);
                // Send an alert
                alertService.sendAlert("ES cluster status abnormal",
                        "Status: " + status + ", details: " + response);
            }
        } catch (Exception e) {
            log.error("ES cluster health check failed", e);
        }
    }
}
```
Listing 4: ES client configuration
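The host parsing in Listing 4 assumes every entry in `elasticsearch.hosts` is exactly `host:port` over plain HTTP. A missing port throws `ArrayIndexOutOfBoundsException`. An `https://` prefix makes `split(":")` hand `//host` to `Integer.parseInt`.

A more forgiving parser might look like the sketch below. `HostListParser` and its `Host` record are hypothetical. The default port 9200 is the ES HTTP default. Each `Host` maps onto `new HttpHost(hostname, port, scheme)`.

```java
import java.net.URI;
import java.util.ArrayList;
import java.util.List;

// Hypothetical tolerant parser for "host[:port]" lists with an optional scheme
class HostListParser {
    // Stand-in for org.apache.http.HttpHost so the sketch needs only the JDK
    record Host(String scheme, String hostname, int port) {}

    static List<Host> parse(String hosts) {
        List<Host> result = new ArrayList<>();
        for (String raw : hosts.split(",")) {
            String entry = raw.trim();
            if (entry.isEmpty()) {
                continue; // tolerate trailing commas
            }
            if (!entry.contains("://")) {
                entry = "http://" + entry; // default scheme
            }
            URI uri = URI.create(entry);
            if (uri.getHost() == null) {
                throw new IllegalArgumentException("Bad host entry: " + raw);
            }
            int port = uri.getPort() == -1 ? 9200 : uri.getPort(); // default ES HTTP port
            result.add(new Host(uri.getScheme(), uri.getHost(), port));
        }
        if (result.isEmpty()) {
            throw new IllegalArgumentException("No hosts configured");
        }
        return result;
    }

    public static void main(String[] args) {
        for (Host h : parse("es1:9200, https://es2:9243,es3")) {
            System.out.println(h);
        }
    }
}
```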
3.3 Spring Data Elasticsearch configuration
```java
// SpringDataESConfig.java
@Configuration
@EnableElasticsearchRepositories(basePackages = "com.example.repository")
public class SpringDataESConfig {

    // Reuses the RestHighLevelClient bean from Listing 4; the bean name
    // "elasticsearchTemplate" is what @EnableElasticsearchRepositories looks up by default
    @Bean
    public ElasticsearchRestTemplate elasticsearchTemplate(RestHighLevelClient client) {
        return new ElasticsearchRestTemplate(client);
    }
}

// Product.java: entity mapping (a top-level class, not an inner class of the config)
@Document(indexName = "products", createIndex = false)
@Setting(settingPath = "/settings/product-settings.json")
@Mapping(mappingPath = "/mappings/product-mapping.json")
@Data
@NoArgsConstructor
@AllArgsConstructor
public class Product {
    @Id
    private String id;

    // Main field plus pinyin and keyword sub-fields; @MultiField alone is enough
    @MultiField(
        mainField = @Field(type = FieldType.Text, analyzer = "ik_max_word", searchAnalyzer = "ik_smart"),
        otherFields = {
            @InnerField(suffix = "pinyin", type = FieldType.Text, analyzer = "pinyin"),
            @InnerField(suffix = "keyword", type = FieldType.Keyword)
        })
    private String name;

    @Field(type = FieldType.Double)
    private Double price;

    // name = ... maps camelCase properties to the snake_case fields in the index mapping
    @Field(name = "category_id", type = FieldType.Integer)
    private Integer categoryId;

    @Field(type = FieldType.Integer)
    private Integer sales;

    // The pattern must match the create_time format in the index mapping
    @Field(name = "create_time", type = FieldType.Date,
           format = DateFormat.custom, pattern = "yyyy-MM-dd HH:mm:ss")
    private Date createTime;

    // Geo field
    @GeoPointField
    private GeoPoint location;
}

// ProductRepository.java
public interface ProductRepository extends ElasticsearchRepository<Product, String> {
    // Queries derived from method names
    List<Product> findByName(String name);
    List<Product> findByPriceBetween(Double minPrice, Double maxPrice);
    Page<Product> findByCategoryId(Integer categoryId, Pageable pageable);

    // @Query annotation
    @Query("{\"match\": {\"name\": \"?0\"}}")
    List<Product> findByNameCustom(String name);

    // Raw bool query
    @Query("{\"bool\": {\"must\": ["
            + "{\"match\": {\"name\": \"?0\"}},"
            + "{\"range\": {\"price\": {\"gte\": ?1, \"lte\": ?2}}}"
            + "]}}")
    List<Product> searchByNameAndPriceRange(String name, Double minPrice, Double maxPrice);
}
```
Listing 5: Spring Data ES configuration
4. Index Design Best Practices
4.1 Index lifecycle management
```java
@Component
@Slf4j
@RequiredArgsConstructor
public class IndexLifecycleManager {

    private final RestHighLevelClient client;
    // getIndexSettings(), getIndexMapping(), getIndexStats(), isIndexOlderThan(), switchAlias(),
    // closeIndicesOlderThan(), deleteIndicesOlderThan(), forceMergeIndicesOlderThan()
    // are project helpers (not shown)

    // 1. Time-based index name, e.g. logs-2024.01.31
    public String getDailyIndex(String indexPrefix) {
        String date = LocalDate.now().format(DateTimeFormatter.ofPattern("yyyy.MM.dd"));
        return indexPrefix + "-" + date;
    }

    // 2. Create an index and optionally attach an alias (pass null for no alias)
    public void createIndexWithAlias(String indexName, String alias) throws IOException {
        CreateIndexRequest request = new CreateIndexRequest(indexName);
        request.settings(getIndexSettings());
        CreateIndexResponse response = client.indices().create(request, RequestOptions.DEFAULT);

        if (response.isAcknowledged() && alias != null) {
            IndicesAliasesRequest aliasRequest = new IndicesAliasesRequest();
            aliasRequest.addAliasAction(new AliasActions(AliasActions.Type.ADD)
                    .index(indexName)
                    .alias(alias));
            client.indices().updateAliases(aliasRequest, RequestOptions.DEFAULT);
            log.info("Index created: {}, alias: {}", indexName, alias);
        }
    }

    // 3. Rollover strategy
    @Scheduled(cron = "0 0 0 * * ?") // every day at midnight
    public void rolloverIndex() throws IOException {
        String todayIndex = getDailyIndex("logs");
        String alias = "logs-current";

        // Find the index the alias currently points to
        GetAliasesResponse response = client.indices()
                .getAlias(new GetAliasesRequest(alias), RequestOptions.DEFAULT);
        Map<String, Set<AliasMetadata>> aliases = response.getAliases();

        if (aliases.isEmpty()) {
            // First run
            createIndexWithAlias(todayIndex, alias);
        } else {
            // Check the size of the current index
            String currentIndex = aliases.keySet().iterator().next();
            IndexStats stats = getIndexStats(currentIndex);

            // Roll over when the index exceeds 50GB or is older than 7 days
            if (stats.getStoreSizeInBytes() > 50L * 1024 * 1024 * 1024
                    || isIndexOlderThan(currentIndex, 7)) {
                // Create the new index without the alias, then move the alias in one
                // request (remove old + add new) so it never points at two indices
                createIndexWithAlias(todayIndex, null);
                switchAlias(alias, currentIndex, todayIndex);
                // Leave the old index open: it still holds recent logs.
                // manageIndexLifecycle() closes it once it is 30 days old.
            }
        }
    }

    // 4. Close and delete old indices
    public void manageIndexLifecycle() throws IOException {
        closeIndicesOlderThan(30);     // close indices older than 30 days
        deleteIndicesOlderThan(90);    // delete indices older than 90 days
        forceMergeIndicesOlderThan(7); // force-merge segments of indices older than 7 days
    }

    // 5. Index template (legacy _template API)
    public void createIndexTemplate() throws IOException {
        PutIndexTemplateRequest request = new PutIndexTemplateRequest("logs-template");
        request.patterns(Arrays.asList("logs-*"));
        request.settings(getIndexSettings());
        request.mapping(getIndexMapping());
        request.alias(new Alias("logs-all")); // alias added to every matching index
        request.order(1);                     // precedence among templates
        request.version(1);                   // template version

        AcknowledgedResponse response = client.indices()
                .putTemplate(request, RequestOptions.DEFAULT);
        if (response.isAcknowledged()) {
            log.info("Index template created");
        }
    }
}
```
Listing 6: Index lifecycle management
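The rollover and retention rules above are easy to get subtly wrong. One way to guard against that is to keep the decision logic as pure functions and test it without a cluster.

The sketch below is hypothetical: `LifecyclePlanner` is not part of any client. It reads the index date from names like `logs-2024.01.31`. Elasticsearch also has a built-in rollover API and index lifecycle management (ILM), which can apply the same size and age conditions server-side.

```java
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.time.temporal.ChronoUnit;

// Hypothetical pure-function version of the rollover and retention rules
class LifecyclePlanner {
    enum Action { KEEP, FORCE_MERGE, CLOSE, DELETE }

    static final DateTimeFormatter DAY = DateTimeFormatter.ofPattern("yyyy.MM.dd");
    static final long MAX_BYTES = 50L * 1024 * 1024 * 1024; // 50GB

    // "logs-2024.01.31" -> 2024-01-31
    static LocalDate indexDate(String indexName) {
        return LocalDate.parse(indexName.substring(indexName.lastIndexOf('-') + 1), DAY);
    }

    static long ageDays(String indexName, LocalDate today) {
        return ChronoUnit.DAYS.between(indexDate(indexName), today);
    }

    // Roll over when the write index exceeds 50GB or is older than 7 days
    static boolean shouldRollover(String indexName, long sizeBytes, LocalDate today) {
        return sizeBytes > MAX_BYTES || ageDays(indexName, today) > 7;
    }

    // Retention: delete after 90 days, close after 30, force-merge after 7
    static Action actionFor(String indexName, LocalDate today) {
        long age = ageDays(indexName, today);
        if (age > 90) return Action.DELETE;
        if (age > 30) return Action.CLOSE;
        if (age > 7) return Action.FORCE_MERGE;
        return Action.KEEP;
    }

    public static void main(String[] args) {
        LocalDate today = LocalDate.of(2024, 5, 1);
        String[] names = {"logs-2024.04.28", "logs-2024.04.20", "logs-2024.03.01", "logs-2024.01.01"};
        for (String name : names) {
            System.out.println(name + " -> " + actionFor(name, today));
        }
    }
}
```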
4.2 Mapping design tips
```json
{
  "mappings": {
    "dynamic": "strict",            // reject documents with unmapped fields
    "_source": {
      "enabled": true,
      "excludes": ["big_field"]     // keep big_field out of the stored _source
    },
    "properties": {
      "id": { "type": "keyword", "ignore_above": 256 },
      "title": {
        "type": "text",
        "analyzer": "ik_max_word",
        "search_analyzer": "ik_smart",
        "fields": {
          "pinyin":     { "type": "text", "analyzer": "pinyin_analyzer" },
          "keyword":    { "type": "keyword", "ignore_above": 256 },
          "completion": { "type": "completion", "analyzer": "simple" }
        }
      },
      "price":    { "type": "scaled_float", "scaling_factor": 100 },
      "location": { "type": "geo_point" },
      "tags":     { "type": "text", "analyzer": "whitespace" },
      "attributes": {
        "type": "nested",           // nested type
        "properties": {
          "key":   { "type": "keyword" },
          "value": { "type": "keyword" }
        }
      },
      "vector": {
        "type": "dense_vector",     // vector field
        "dims": 128
      }
    }
  }
}
```
Listing 7: Mapping design example
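`scaled_float` suits prices well. As far as I know, ES stores `round(value × scaling_factor)` as a long and divides by the factor when reading. With `scaling_factor: 100`, prices therefore keep two decimal places and anything finer is rounded away. The sketch below mimics that encoding in plain Java. The class name is hypothetical and this is not the ES implementation.

```java
// Hypothetical illustration of scaled_float encoding with scaling_factor = 100
class ScaledFloat {
    // Stored form: value * factor, rounded to the nearest long
    static long encode(double value, double scalingFactor) {
        return Math.round(value * scalingFactor);
    }

    // Value seen by queries and aggregations
    static double decode(long stored, double scalingFactor) {
        return stored / scalingFactor;
    }

    public static void main(String[] args) {
        System.out.println(encode(12.34, 100));               // 1234
        System.out.println(decode(encode(19.999, 100), 100)); // 20.0
    }
}
```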
5. Query Optimization in Practice
5.1 Query types compared
```java
@Service
@Slf4j
@RequiredArgsConstructor
public class ProductSearchService {

    private final RestHighLevelClient client;
    // executeSearch(SearchSourceBuilder) is a helper (not shown) that runs the source against "products"

    // 1. Match query (the most common)
    public SearchResponse searchByMatch(String keyword) throws IOException {
        SearchRequest request = new SearchRequest("products");
        SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
        sourceBuilder.query(QueryBuilders.matchQuery("name", keyword)
                .analyzer("ik_smart") // analyzer applied to the query text
                // OR + "75%": at least 75% of the query terms (rounded down) must match.
                // With Operator.AND every term is required and minimum_should_match has no effect.
                .operator(Operator.OR)
                .minimumShouldMatch("75%"));
        sourceBuilder.from(0);
        sourceBuilder.size(20);
        sourceBuilder.sort(SortBuilders.scoreSort());
        request.source(sourceBuilder);
        return client.search(request, RequestOptions.DEFAULT);
    }

    // 2. MultiMatch query (several fields)
    public SearchResponse searchByMultiMatch(String keyword) throws IOException {
        SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
        sourceBuilder.query(QueryBuilders.multiMatchQuery(keyword)
                .field("name", 3.0f)                           // field boosts
                .field("name.pinyin", 2.0f)
                .field("description")
                .type(MultiMatchQueryBuilder.Type.BEST_FIELDS) // score of the best-matching field
                .tieBreaker(0.3f));                            // + 0.3 x the other matching fields
        return executeSearch(sourceBuilder);
    }

    // 3. Term query (exact match, the input is not analyzed)
    public SearchResponse searchByTerm(String category) throws IOException {
        SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
        sourceBuilder.query(QueryBuilders.termQuery("category.keyword", category));
        return executeSearch(sourceBuilder);
    }

    // 4. Bool query (combined conditions)
    public SearchResponse searchByBool(String keyword, Double minPrice, Double maxPrice,
                                       String category) throws IOException {
        SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
        BoolQueryBuilder boolQuery = QueryBuilders.boolQuery();

        // MUST: must match, contributes to the score
        if (StringUtils.isNotBlank(keyword)) {
            boolQuery.must(QueryBuilders.matchQuery("name", keyword));
        }
        // FILTER: must match, no scoring
        if (minPrice != null || maxPrice != null) {
            RangeQueryBuilder rangeQuery = QueryBuilders.rangeQuery("price");
            if (minPrice != null) {
                rangeQuery.gte(minPrice);
            }
            if (maxPrice != null) {
                rangeQuery.lte(maxPrice);
            }
            boolQuery.filter(rangeQuery);
        }
        // SHOULD: optional, only raises the score of matching documents
        // (adding minimumShouldMatch(1) would turn it into a hard requirement)
        if (StringUtils.isNotBlank(category)) {
            boolQuery.should(QueryBuilders.termQuery("category.keyword", category));
        }
        // MUST_NOT: must not match
        boolQuery.mustNot(QueryBuilders.termQuery("status", "deleted"));

        sourceBuilder.query(boolQuery);
        return executeSearch(sourceBuilder);
    }

    // 5. Aggregations
    public SearchResponse searchWithAggregation() throws IOException {
        SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
        sourceBuilder.query(QueryBuilders.matchAllQuery()); // match everything
        sourceBuilder.size(0);                              // no hits, aggregations only

        // Price ranges
        AggregationBuilder priceAgg = AggregationBuilders
                .range("price_ranges")
                .field("price")
                .addRange(0, 100)
                .addRange(100, 500)
                .addRange(500, 1000)
                .addRange(1000, 5000)
                .addUnboundedFrom(5000); // 5000 and above

        // Categories
        AggregationBuilder categoryAgg = AggregationBuilders
                .terms("categories")
                .field("category.keyword")
                .size(10);

        // Nested aggregation over attributes
        AggregationBuilder nestedAgg = AggregationBuilders
                .nested("attributes", "attributes")
                .subAggregation(AggregationBuilders
                        .terms("attr_keys")
                        .field("attributes.key")
                        .size(10));

        sourceBuilder.aggregation(priceAgg);
        sourceBuilder.aggregation(categoryAgg);
        sourceBuilder.aggregation(nestedAgg);
        return executeSearch(sourceBuilder);
    }

    // 6. Search suggestions (requires a completion sub-field on name)
    public Suggest searchSuggest(String prefix) throws IOException {
        SearchRequest request = new SearchRequest("products");
        SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
        sourceBuilder.size(0);

        // Completion suggester
        CompletionSuggestionBuilder suggestion = SuggestBuilders
                .completionSuggestion("name.completion")
                .prefix(prefix, Fuzziness.AUTO)
                .skipDuplicates(true)
                .size(10);

        SuggestBuilder suggestBuilder = new SuggestBuilder();
        suggestBuilder.addSuggestion("product_suggest", suggestion);
        sourceBuilder.suggest(suggestBuilder);
        request.source(sourceBuilder);

        SearchResponse response = client.search(request, RequestOptions.DEFAULT);
        return response.getSuggest();
    }
}
```
Listing 8: Implementing the different query types
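The `minimumShouldMatch("75%")` setting in the match query is easiest to reason about with numbers. Per the Elasticsearch documentation, a positive percentage is applied to the number of optional clauses and rounded down. Three query terms therefore need two matches.

The sketch below is a simplified model of a match query with `operator: or`. The class is hypothetical, it handles positive percentages only, and it does no scoring.

```java
import java.util.List;
import java.util.Set;

// Simplified model of a match query with operator OR and a percentage minimum_should_match
class MinimumShouldMatch {
    // "75%" -> floor(clauses * 75 / 100), clamped to [0, clauses]
    static int required(int clauses, String spec) {
        int pct = Integer.parseInt(spec.substring(0, spec.length() - 1).trim());
        int n = (int) Math.floor(clauses * pct / 100.0);
        return Math.max(0, Math.min(n, clauses));
    }

    // A document matches when it contains at least max(1, required) of the query terms
    static boolean matches(List<String> queryTerms, Set<String> docTerms, String spec) {
        long hits = queryTerms.stream().filter(docTerms::contains).count();
        return hits >= Math.max(1, required(queryTerms.size(), spec));
    }

    public static void main(String[] args) {
        List<String> query = List.of("huawei", "phone", "5g");
        System.out.println(required(3, "75%"));                               // 2
        System.out.println(matches(query, Set.of("huawei", "phone"), "75%")); // true
        System.out.println(matches(query, Set.of("phone"), "75%"));           // false
    }
}
```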
5.2 Performance optimization techniques
```java
@Component
@Slf4j
@RequiredArgsConstructor
public class QueryOptimizer {

    private final RestHighLevelClient client;
    private final ProductSearchService productSearchService;
    private final Executor executor; // thread pool for parallel searches (assumed bean)

    // 1. Pagination
    // lastSortValues: hit.getSortValues() of the last hit on the previous page, null for page 1
    public SearchResponse searchWithPagination(String keyword, Object[] lastSortValues, int size)
            throws IOException {
        SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
        sourceBuilder.query(QueryBuilders.matchQuery("name", keyword));

        // Regular pagination (deep pages perform poorly):
        // sourceBuilder.from((page - 1) * size).size(size);

        // Recommended: search_after, continuing from the previous page's last hit
        if (lastSortValues != null) {
            sourceBuilder.searchAfter(lastSortValues);
        }
        sourceBuilder.size(size);
        // A sort is mandatory and must end with a unique tiebreaker; use the "id" keyword
        // field rather than _id (sorting on _id is deprecated and memory-hungry)
        sourceBuilder.sort(SortBuilders.scoreSort());
        sourceBuilder.sort(SortBuilders.fieldSort("id"));
        return executeSearch(sourceBuilder);
    }

    // 2. Query rewriting
    public SearchResponse optimizedSearch(String keyword) throws IOException {
        SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
        // Original wildcard query (very slow, especially with a leading *):
        // QueryBuilders.wildcardQuery("name", "*" + keyword + "*");

        // The alternatives assume name.ngram and name.edge_ngram sub-fields in the mapping
        BoolQueryBuilder query = QueryBuilders.boolQuery()
                // Option 1: ngram sub-field (infix matching)
                .should(QueryBuilders.matchQuery("name.ngram", keyword))
                // Option 3: edge ngram sub-field (prefixes prepared at index time)
                .should(QueryBuilders.matchQuery("name.edge_ngram", keyword));
        // Option 2: prefix query on the keyword field, only for short input
        if (keyword.length() < 10) {
            query.should(QueryBuilders.prefixQuery("name.keyword", keyword));
        }
        sourceBuilder.query(query);
        return executeSearch(sourceBuilder);
    }

    // 3. Filter cache
    public SearchResponse searchWithFilterCache() throws IOException {
        SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
        BoolQueryBuilder boolQuery = QueryBuilders.boolQuery();
        // Frequently reused conditions go in filter context, where they can be cached
        boolQuery.filter(QueryBuilders.termQuery("status", "active"));
        boolQuery.filter(QueryBuilders.rangeQuery("price").gte(100).lte(1000));
        // Scoring condition
        boolQuery.must(QueryBuilders.matchQuery("name", "手机"));
        sourceBuilder.query(boolQuery);
        return executeSearch(sourceBuilder);
    }

    // 4. Field data
    public SearchResponse searchWithFieldData() throws IOException {
        SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
        // Do not sort on a text field (fielddata is disabled on text by default):
        // sourceBuilder.sort(SortBuilders.fieldSort("name")); // wrong
        // Sort on the keyword sub-field, which is backed by doc values
        sourceBuilder.sort(SortBuilders.fieldSort("name.keyword"));
        // Read the value from doc values instead of _source
        sourceBuilder.docValueField("name.keyword");
        return executeSearch(sourceBuilder);
    }

    // 5. Timeouts
    public SearchResponse searchWithTimeout(String keyword, long timeoutMs) throws IOException {
        SearchRequest request = new SearchRequest("products");
        SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
        // Best-effort server-side search timeout: shards that run out of time return
        // what they collected and the response has timed_out=true
        sourceBuilder.timeout(new TimeValue(timeoutMs, TimeUnit.MILLISECONDS));
        sourceBuilder.query(QueryBuilders.matchQuery("name", keyword));
        request.source(sourceBuilder);

        // Client side: cap the response buffer at 30MB (default 100MB). This limits memory,
        // not time; HTTP timeouts come from the RestClientBuilder configuration.
        RequestOptions.Builder options = RequestOptions.DEFAULT.toBuilder();
        options.setHttpAsyncResponseConsumerFactory(
                new HttpAsyncResponseConsumerFactory
                        .HeapBufferedResponseConsumerFactory(30 * 1024 * 1024));
        return client.search(request, options.build());
    }

    // 6. Parallel queries (a single _msearch request is an alternative)
    public List<SearchResponse> parallelSearch(List<String> keywords)
            throws InterruptedException, ExecutionException {
        List<CompletableFuture<SearchResponse>> futures = new ArrayList<>();
        for (String keyword : keywords) {
            futures.add(CompletableFuture.supplyAsync(() -> {
                try {
                    return productSearchService.searchByMatch(keyword);
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            }, executor));
        }
        // Wait for all searches to complete
        CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).get();
        return futures.stream()
                .map(CompletableFuture::join)
                .collect(Collectors.toList());
    }
}
```
Listing 9: Query performance optimization
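`search_after` differs from `from/size` in that it pages by cursor. Each request asks for the hits that sort strictly after the last hit already returned, which is why the sort must end with a unique tiebreaker.

The in-memory sketch below uses hypothetical names. It sorts by score descending, then by `id`. In ES the cursor is the `getSortValues()` array of the last hit; here it is simply the hit itself.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical in-memory model of search_after cursor paging
class SearchAfterDemo {
    record Hit(String id, double score) {}

    // Score descending, then id ascending as the unique tiebreaker
    static final Comparator<Hit> ORDER =
            Comparator.comparingDouble(Hit::score).reversed().thenComparing(Hit::id);

    // Next page of hits that sort strictly after the cursor (null cursor = first page)
    static List<Hit> page(List<Hit> all, Hit after, int size) {
        List<Hit> sorted = new ArrayList<>(all);
        sorted.sort(ORDER);
        List<Hit> out = new ArrayList<>();
        for (Hit h : sorted) {
            if (after != null && ORDER.compare(h, after) <= 0) {
                continue; // already returned on an earlier page
            }
            out.add(h);
            if (out.size() == size) {
                break;
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Hit> all = List.of(new Hit("a", 0.9), new Hit("b", 0.9), new Hit("c", 0.5),
                new Hit("d", 0.1), new Hit("e", 0.5));
        Hit cursor = null;
        List<Hit> current;
        do {
            current = page(all, cursor, 2);
            System.out.println(current);
            if (!current.isEmpty()) {
                cursor = current.get(current.size() - 1);
            }
        } while (current.size() == 2);
    }
}
```

Documents with equal scores ("a" and "b", "c" and "e") never get skipped or duplicated across pages, because the `id` tiebreaker makes the order total.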
6. Bulk Operations and Real-Time Behavior
6.1 Bulk operations
```java
@Component
@Slf4j
@RequiredArgsConstructor
public class BulkOperationService {

    private final RestHighLevelClient client;

    // 1. Simple bulk indexing
    public void bulkIndexProducts(List<Product> products) throws IOException {
        BulkRequest request = new BulkRequest();
        for (Product product : products) {
            request.add(new IndexRequest("products")
                    .id(product.getId())
                    .source(JsonUtils.toJson(product), XContentType.JSON));
        }
        BulkResponse response = client.bulk(request, RequestOptions.DEFAULT);
        if (response.hasFailures()) {
            log.error("Bulk request had failures: {}", response.buildFailureMessage());
            // Handle the failed items
            for (BulkItemResponse item : response.getItems()) {
                if (item.isFailed()) {
                    log.error("Document {} failed: {}", item.getId(), item.getFailureMessage());
                    retryFailedItem(item); // retry logic (not shown)
                }
            }
        } else {
            log.info("Bulk request succeeded, {} documents", products.size());
        }
    }

    // 2. BulkProcessor with a listener
    public void bulkWithListener(List<Product> products) {
        BulkProcessor.Listener listener = new BulkProcessor.Listener() {
            @Override
            public void beforeBulk(long executionId, BulkRequest request) {
                log.info("Executing bulk with {} actions", request.numberOfActions());
            }

            @Override
            public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
                if (response.hasFailures()) {
                    log.error("Bulk had failures: {}", response.buildFailureMessage());
                } else {
                    log.info("Bulk succeeded in {}ms", response.getTook().getMillis());
                }
            }

            @Override
            public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
                log.error("Bulk request failed", failure);
            }
        };

        BulkProcessor processor = BulkProcessor.builder(
                        (request, bulkListener) ->
                                client.bulkAsync(request, RequestOptions.DEFAULT, bulkListener),
                        listener)
                .setBulkActions(1000)                               // flush every 1000 actions
                .setBulkSize(new ByteSizeValue(5, ByteSizeUnit.MB)) // or every 5MB
                .setFlushInterval(TimeValue.timeValueSeconds(5))    // or every 5 seconds
                .setConcurrentRequests(2)                           // concurrent in-flight bulks
                .setBackoffPolicy(BackoffPolicy.exponentialBackoff(
                        TimeValue.timeValueMillis(100), 3))         // retry rejected bulks up to 3 times
                .build();

        try {
            for (Product product : products) {
                processor.add(new IndexRequest("products")
                        .id(product.getId())
                        .source(JsonUtils.toJson(product), XContentType.JSON));
            }
        } finally {
            // Always close: this flushes pending requests and stops the flush scheduler.
            // A processor that is never closed keeps its resources alive.
            try {
                if (!processor.awaitClose(30, TimeUnit.SECONDS)) {
                    log.warn("BulkProcessor did not finish within 30s");
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

    // 3. Bulk update
    public void bulkUpdateProducts(List<Product> products) throws IOException {
        BulkRequest request = new BulkRequest();
        for (Product product : products) {
            request.add(new UpdateRequest("products", product.getId())
                    .doc(JsonUtils.toJson(product), XContentType.JSON)
                    .docAsUpsert(true)); // insert if the document does not exist
        }
        client.bulk(request, RequestOptions.DEFAULT);
    }

    // 4. Bulk delete
    public void bulkDeleteProducts(List<String> ids) throws IOException {
        BulkRequest request = new BulkRequest();
        for (String id : ids) {
            request.add(new DeleteRequest("products", id));
        }
        client.bulk(request, RequestOptions.DEFAULT);
    }

    // 5. Multi-get
    public MultiGetResponse bulkGetProducts(List<String> ids) throws IOException {
        MultiGetRequest request = new MultiGetRequest();
        for (String id : ids) {
            request.add(new MultiGetRequest.Item("products", id));
        }
        return client.mget(request, RequestOptions.DEFAULT);
    }
}
```
Listing 10: Bulk operations
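The `BulkProcessor` is configured with three flush triggers: action count, byte size, and interval. It also has to be closed. Both points are easier to see in a toy version.

The class below is hypothetical and single-threaded. It models only the count and size triggers, plus the final flush on `close()`. That last flush carries the tail batch, which is lost when a processor is never closed.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical toy model of BulkProcessor's flush rules (not the ES class)
class MiniBulkBuffer implements AutoCloseable {
    private final int maxActions;
    private final long maxBytes;
    private final Consumer<List<String>> sink; // receives each flushed batch
    private final List<String> pending = new ArrayList<>();
    private long pendingBytes = 0;

    MiniBulkBuffer(int maxActions, long maxBytes, Consumer<List<String>> sink) {
        this.maxActions = maxActions;
        this.maxBytes = maxBytes;
        this.sink = sink;
    }

    // Buffer one document; flush when either the count or the size limit is reached
    void add(String json) {
        pending.add(json);
        pendingBytes += json.getBytes(StandardCharsets.UTF_8).length;
        if (pending.size() >= maxActions || pendingBytes >= maxBytes) {
            flush();
        }
    }

    void flush() {
        if (pending.isEmpty()) {
            return;
        }
        sink.accept(new ArrayList<>(pending));
        pending.clear();
        pendingBytes = 0;
    }

    // Without this final flush the last partial batch would never be sent
    @Override
    public void close() {
        flush();
    }

    public static void main(String[] args) {
        List<Integer> batchSizes = new ArrayList<>();
        try (MiniBulkBuffer buffer = new MiniBulkBuffer(3, 1024, batch -> batchSizes.add(batch.size()))) {
            for (int i = 0; i < 7; i++) {
                buffer.add("{\"id\":" + i + "}");
            }
        }
        System.out.println(batchSizes); // [3, 3, 1]
    }
}
```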
6.2 Controlling real-time behavior
```java
@Component
@Slf4j
@RequiredArgsConstructor
public class RealtimeControlService {

    private final RestHighLevelClient client;
    private final ProductSearchService productSearchService;

    // 1. Refresh immediately (visible at once, but costly under heavy write load)
    public void indexWithRefresh(Product product) throws IOException {
        IndexRequest request = new IndexRequest("products")
                .id(product.getId())
                .source(JsonUtils.toJson(product), XContentType.JSON)
                .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
        client.index(request, RequestOptions.DEFAULT);
    }

    // 2. Wait for refresh (the call blocks until the next scheduled refresh makes the doc visible)
    public void indexWithWaitFor(Product product) throws IOException {
        IndexRequest request = new IndexRequest("products")
                .id(product.getId())
                .source(JsonUtils.toJson(product), XContentType.JSON)
                .setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL);
        client.index(request, RequestOptions.DEFAULT);
    }

    // 3. Manual refresh
    public void manualRefresh() throws IOException {
        RefreshResponse response = client.indices()
                .refresh(new RefreshRequest("products"), RequestOptions.DEFAULT);
        log.info("Manual refresh done, shards: {}", response.getTotalShards());
    }

    // 4. Force merge (expensive; run off-peak on indices that are no longer written to)
    public void forceMerge() throws IOException {
        ForceMergeRequest request = new ForceMergeRequest("products");
        // maxNumSegments and onlyExpungeDeletes are alternatives; set only one of them
        request.maxNumSegments(1);           // merge down to a single segment
        // request.onlyExpungeDeletes(true); // or: only purge deleted documents
        ForceMergeResponse response = client.indices().forcemerge(request, RequestOptions.DEFAULT);
        log.info("Force merge done, shards: {}", response.getTotalShards());
    }

    // 5. Refresh interval
    public void updateRefreshInterval(int seconds) throws IOException {
        UpdateSettingsRequest request = new UpdateSettingsRequest("products");
        request.settings(Settings.builder()
                .put("index.refresh_interval", seconds + "s")
                .build());
        AcknowledgedResponse response = client.indices()
                .putSettings(request, RequestOptions.DEFAULT);
        if (response.isAcknowledged()) {
            log.info("Refresh interval updated: {}s", seconds);
        }
    }

    // 6. "Real-time" search workaround: if nothing is found, wait for a refresh and retry once.
    // (A GET by id is real-time and does not need a refresh.)
    public SearchResponse realtimeSearch(String keyword) throws IOException {
        SearchResponse response = productSearchService.searchByMatch(keyword);
        if (response.getHits().getTotalHits().value == 0) {
            try {
                Thread.sleep(1000); // wait 1 second (the default refresh interval)
                response = productSearchService.searchByMatch(keyword);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return response;
    }
}
```
Listing 11: Controlling real-time behavior
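A freshly indexed document may not be found yet because ES search is near-real-time. New documents only become searchable after a refresh publishes them as a new segment.

The toy model below is hypothetical and does not mirror Lucene's actual data structures. It keeps indexed documents in a buffer until `refresh()` is called. `forceMerge()` collapses all segments into one.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical toy model of near-real-time visibility
class NrtToy {
    private final List<String> buffer = new ArrayList<>();         // indexed, not yet searchable
    private final List<List<String>> segments = new ArrayList<>(); // searchable segments

    void index(String doc) {
        buffer.add(doc);
    }

    // A refresh turns the buffer into a new searchable segment
    void refresh() {
        if (!buffer.isEmpty()) {
            segments.add(new ArrayList<>(buffer));
            buffer.clear();
        }
    }

    // Merge all segments into one (what maxNumSegments(1) aims for)
    void forceMerge() {
        List<String> merged = new ArrayList<>();
        segments.forEach(merged::addAll);
        segments.clear();
        if (!merged.isEmpty()) {
            segments.add(merged);
        }
    }

    boolean search(String doc) {
        return segments.stream().anyMatch(segment -> segment.contains(doc));
    }

    int segmentCount() {
        return segments.size();
    }

    public static void main(String[] args) {
        NrtToy shard = new NrtToy();
        shard.index("p1");
        System.out.println(shard.search("p1")); // false: not refreshed yet
        shard.refresh();                        // RefreshPolicy.IMMEDIATE forces this after the write
        System.out.println(shard.search("p1")); // true
    }
}
```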
7. Enterprise Case Studies
7.1 E-commerce product search system
```java
@Service
@Slf4j
public class ECommerceSearchService {

    // executeSearch(...), processAggregations(...) and getSuggestions(...) are helpers (not shown)

    // Product search
    public SearchResult<Product> searchProducts(ProductSearchRequest request) throws IOException {
        SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
        BoolQueryBuilder boolQuery = QueryBuilders.boolQuery();

        // Keyword search
        if (StringUtils.isNotBlank(request.getKeyword())) {
            handleKeywordSearch(boolQuery, request.getKeyword());
        }
        // Category filter
        if (request.getCategoryId() != null) {
            boolQuery.filter(QueryBuilders.termQuery("category_id", request.getCategoryId()));
        }
        // Price range
        if (request.getMinPrice() != null || request.getMaxPrice() != null) {
            RangeQueryBuilder priceRange = QueryBuilders.rangeQuery("price");
            if (request.getMinPrice() != null) {
                priceRange.gte(request.getMinPrice());
            }
            if (request.getMaxPrice() != null) {
                priceRange.lte(request.getMaxPrice());
            }
            boolQuery.filter(priceRange);
        }
        // Brand filter
        if (CollectionUtils.isNotEmpty(request.getBrandIds())) {
            boolQuery.filter(QueryBuilders.termsQuery("brand_id", request.getBrandIds()));
        }
        // Attribute filter
        if (CollectionUtils.isNotEmpty(request.getAttributes())) {
            handleAttributesFilter(boolQuery, request.getAttributes());
        }
        // Geo-distance filter
        if (request.getLocation() != null && request.getDistance() != null) {
            handleGeoFilter(boolQuery, request.getLocation(), request.getDistance());
        }
        sourceBuilder.query(boolQuery);

        // Sorting
        handleSorting(sourceBuilder, request.getSortBy(), request.getSortOrder());

        // Pagination
        sourceBuilder.from((request.getPage() - 1) * request.getSize())
                .size(request.getSize());

        // Highlighting
        if (StringUtils.isNotBlank(request.getKeyword())) {
            HighlightBuilder highlightBuilder = new HighlightBuilder();
            highlightBuilder.field("name")
                    .preTags("<em>")
                    .postTags("</em>")
                    .fragmentSize(200)
                    .numOfFragments(3);
            sourceBuilder.highlighter(highlightBuilder);
        }

        // Aggregations
        addAggregations(sourceBuilder);

        SearchResponse response = executeSearch(sourceBuilder);
        return processSearchResult(response, request);
    }

    private void handleKeywordSearch(BoolQueryBuilder boolQuery, String keyword) {
        // Several matching strategies combined, each with its own weight
        BoolQueryBuilder keywordQuery = QueryBuilders.boolQuery();
        // 1. Phrase match (highest weight)
        keywordQuery.should(QueryBuilders.matchPhraseQuery("name", keyword).boost(3.0f));
        // 2. Pinyin match
        keywordQuery.should(QueryBuilders.matchQuery("name.pinyin", keyword).boost(2.0f));
        // 3. Term-by-term match
        keywordQuery.should(QueryBuilders.matchQuery("name", keyword).boost(1.0f));
        // 4. Description match
        keywordQuery.should(QueryBuilders.matchQuery("description", keyword).boost(0.5f));
        // 5. Tag match
        keywordQuery.should(QueryBuilders.matchQuery("tags", keyword).boost(0.3f));
        boolQuery.must(keywordQuery);
    }

    private void handleAttributesFilter(BoolQueryBuilder boolQuery,
                                        List<AttributeFilter> attributes) {
        for (AttributeFilter attr : attributes) {
            NestedQueryBuilder nestedQuery = QueryBuilders.nestedQuery("attributes",
                    QueryBuilders.boolQuery()
                            .must(QueryBuilders.termQuery("attributes.key", attr.getKey()))
                            .must(QueryBuilders.termsQuery("attributes.value", attr.getValues())),
                    ScoreMode.None);
            boolQuery.filter(nestedQuery);
        }
    }

    private void handleGeoFilter(BoolQueryBuilder boolQuery, GeoPoint location, String distance) {
        GeoDistanceQueryBuilder geoQuery = QueryBuilders.geoDistanceQuery("location")
                .point(location.getLat(), location.getLon())
                .distance(distance);
        boolQuery.filter(geoQuery);
    }

    private void handleSorting(SearchSourceBuilder sourceBuilder, String sortBy, String sortOrder) {
        if ("price".equals(sortBy)) {
            sourceBuilder.sort("price", "desc".equals(sortOrder) ? SortOrder.DESC : SortOrder.ASC);
        } else if ("sales".equals(sortBy)) {
            sourceBuilder.sort("sales", SortOrder.DESC);
        } else if ("score".equals(sortBy)) {
            sourceBuilder.sort(SortBuilders.scoreSort());
        } else {
            // Composite ranking: score*0.6 + sales*0.3 + (10000 - price)*0.1.
            // Sort scripts read the relevance score via _score (doc['_score'] does not exist).
            Script script = new Script(
                    "_score * 0.6 + "
                    + "doc['sales'].value * 0.3 + "
                    + "(10000 - doc['price'].value) * 0.1");
            sourceBuilder.sort(SortBuilders.scriptSort(script,
                    ScriptSortBuilder.ScriptSortType.NUMBER).order(SortOrder.DESC));
        }
    }

    private void addAggregations(SearchSourceBuilder sourceBuilder) {
        // Price ranges
        AggregationBuilder priceAgg = AggregationBuilders
                .range("price_agg")
                .field("price")
                .addRange(0, 100)
                .addRange(100, 300)
                .addRange(300, 500)
                .addRange(500, 1000)
                .addRange(1000, 5000);
        // Brands
        AggregationBuilder brandAgg = AggregationBuilders
                .terms("brand_agg")
                .field("brand_id")
                .size(20);
        // Categories
        AggregationBuilder categoryAgg = AggregationBuilders
                .terms("category_agg")
                .field("category_id")
                .size(10);
        // Attributes: keys, then values per key
        AggregationBuilder attributeAgg = AggregationBuilders
                .nested("attributes_agg", "attributes")
                .subAggregation(AggregationBuilders
                        .terms("attribute_keys")
                        .field("attributes.key")
                        .size(10)
                        .subAggregation(AggregationBuilders
                                .terms("attribute_values")
                                .field("attributes.value")
                                .size(10)));
        sourceBuilder.aggregation(priceAgg);
        sourceBuilder.aggregation(brandAgg);
        sourceBuilder.aggregation(categoryAgg);
        sourceBuilder.aggregation(attributeAgg);
    }

    private SearchResult<Product> processSearchResult(SearchResponse response,
                                                      ProductSearchRequest request) {
        SearchResult<Product> result = new SearchResult<>();
        // Total hit count
        result.setTotal(response.getHits().getTotalHits().value);

        // Current page
        List<Product> products = new ArrayList<>();
        for (SearchHit hit : response.getHits()) {
            Product product = JsonUtils.fromJson(hit.getSourceAsString(), Product.class);
            product.setScore(hit.getScore());
            // Highlighting
            HighlightField highlight = hit.getHighlightFields().get("name");
            if (highlight != null && highlight.getFragments().length > 0) {
                product.setHighlightName(highlight.getFragments()[0].string());
            }
            products.add(product);
        }
        result.setData(products);

        // Aggregation results
        processAggregations(result, response.getAggregations());

        // Suggestions when a keyword search found nothing
        if (StringUtils.isNotBlank(request.getKeyword()) && products.isEmpty()) {
            result.setSuggestions(getSuggestions(request.getKeyword()));
        }
        return result;
    }
}
```
Listing 12: E-commerce product search system
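`searchProducts` pages with `from = (page - 1) * size`. Elasticsearch rejects requests where `from + size` exceeds `index.max_result_window`, which is 10,000 by default. With 20 results per page, the deepest reachable page is therefore 500.

A hypothetical guard like the one below fails fast with a clear message instead of surfacing a server error. Beyond that window, the usual answer is `search_after`.

```java
// Hypothetical guard for from/size pagination against index.max_result_window
class PageWindow {
    static final int MAX_RESULT_WINDOW = 10_000; // ES default for index.max_result_window

    // from = (page - 1) * size, rejected when from + size would exceed the window
    static int from(int page, int size) {
        if (page < 1 || size < 1) {
            throw new IllegalArgumentException("page and size must be >= 1");
        }
        long from = (long) (page - 1) * size;
        if (from + size > MAX_RESULT_WINDOW) {
            throw new IllegalArgumentException("from + size = " + (from + size)
                    + " exceeds " + MAX_RESULT_WINDOW + "; use search_after instead");
        }
        return (int) from;
    }

    // Deepest page reachable with from/size for a given page size
    static int maxPage(int size) {
        return MAX_RESULT_WINDOW / size;
    }

    public static void main(String[] args) {
        System.out.println(maxPage(20)); // 500
        System.out.println(from(500, 20)); // 9980
    }
}
```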
7.2 Log analysis system
```java
@Service
@Slf4j
public class LogAnalysisService {

    // Log search
    public LogSearchResult searchLogs(LogSearchRequest request) throws IOException {
        String index = getLogIndex(request.getStartTime(), request.getEndTime());
        SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();

        // Time range (format() tells ES how to parse string bounds)
        RangeQueryBuilder timeRange = QueryBuilders.rangeQuery("@timestamp")
                .gte(request.getStartTime())
                .lte(request.getEndTime())
                .format("yyyy-MM-dd HH:mm:ss");

        BoolQueryBuilder boolQuery = QueryBuilders.boolQuery()
                .filter(timeRange);

        // Keyword search (query_string rejects malformed syntax;
        // simple_query_string is more forgiving for raw user input)
        if (StringUtils.isNotBlank(request.getKeyword())) {
            boolQuery.must(QueryBuilders.queryStringQuery(request.getKeyword())
                    .field("message")
                    .field("level")
                    .field("logger_name")
                    .defaultOperator(Operator.AND));
        }

        // Filter by log level
        if (CollectionUtils.isNotEmpty(request.getLevels())) {
            boolQuery.filter(QueryBuilders.termsQuery("level", request.getLevels()));
        }

        // Filter by application
        if (StringUtils.isNotBlank(request.getApplication())) {
            boolQuery.filter(QueryBuilders.termQuery("application", request.getApplication()));
        }

        sourceBuilder.query(boolQuery);

        // Newest first
        sourceBuilder.sort(SortBuilders.fieldSort("@timestamp")
                .order(SortOrder.DESC));

        // Pagination (from + size may not exceed index.max_result_window, 10,000 by default)
        sourceBuilder.from((request.getPage() - 1) * request.getSize())
                .size(request.getSize());

        // Aggregations
        addLogAggregations(sourceBuilder, request);

        // Run the search
        SearchResponse response = executeSearch(index, sourceBuilder);
        return processLogSearchResult(response, request);
    }

    // Error log statistics
    public ErrorStats getErrorStats(Date startTime, Date endTime, String application) throws IOException {
        String index = getLogIndex(startTime, endTime);
        SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();

        // Time range
        RangeQueryBuilder timeRange = QueryBuilders.rangeQuery("@timestamp")
                .gte(startTime)
                .lte(endTime);

        BoolQueryBuilder boolQuery = QueryBuilders.boolQuery()
                .filter(timeRange)
                .filter(QueryBuilders.termQuery("level", "ERROR"));

        if (StringUtils.isNotBlank(application)) {
            boolQuery.filter(QueryBuilders.termQuery("application", application));
        }

        sourceBuilder.query(boolQuery);
        sourceBuilder.size(0);               // aggregations only, no hits
        sourceBuilder.trackTotalHits(true);  // exact number of ERROR lines (default stops counting at 10,000)

        // Distinct trace IDs that logged errors (cardinality is approximate)
        AggregationBuilder countAgg = AggregationBuilders
                .cardinality("error_count")
                .field("trace_id");

        // Group by application
        AggregationBuilder appAgg = AggregationBuilders
                .terms("by_application")
                .field("application")
                .size(20)
                .subAggregation(AggregationBuilders
                        .cardinality("app_error_count")
                        .field("trace_id"));

        // Group by hour; empty hours are kept so the timeline has no gaps
        AggregationBuilder timeAgg = AggregationBuilders
                .dateHistogram("by_time")
                .field("@timestamp")
                .calendarInterval(DateHistogramInterval.HOUR)
                .format("yyyy-MM-dd HH:00")
                .minDocCount(0)
                .extendedBounds(new LongBounds(startTime.getTime(), endTime.getTime()))
                .subAggregation(AggregationBuilders
                        .cardinality("hour_error_count")
                        .field("trace_id"));

        sourceBuilder.aggregation(countAgg);
        sourceBuilder.aggregation(appAgg);
        sourceBuilder.aggregation(timeAgg);

        SearchResponse response = executeSearch(index, sourceBuilder);
        return processErrorStats(response);
    }

    // Slow query analysis
    public SlowQueryStats analyzeSlowQueries(Date startTime, Date endTime) throws IOException {
        String index = getLogIndex(startTime, endTime);
        SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();

        // Find slow-query log entries
        BoolQueryBuilder boolQuery = QueryBuilders.boolQuery()
                .filter(QueryBuilders.rangeQuery("@timestamp")
                        .gte(startTime)
                        .lte(endTime))
                .filter(QueryBuilders.termQuery("logger_name", "slow_query"))
                .filter(QueryBuilders.rangeQuery("duration").gte(1000)); // over 1 second (duration in ms)

        sourceBuilder.query(boolQuery);
        sourceBuilder.size(0);

        // Group by SQL type
        AggregationBuilder sqlTypeAgg = AggregationBuilders
                .terms("by_sql_type")
                .field("sql_type")
                .size(10)
                .subAggregation(AggregationBuilders.avg("avg_duration").field("duration"))
                .subAggregation(AggregationBuilders.max("max_duration").field("duration"))
                .subAggregation(AggregationBuilders
                        .percentiles("duration_percentiles")
                        .field("duration")
                        .percentiles(50, 90, 95, 99));

        // Group by table
        AggregationBuilder tableAgg = AggregationBuilders
                .terms("by_table")
                .field("table_name")
                .size(20)
                .subAggregation(AggregationBuilders.avg("avg_duration").field("duration"));

        sourceBuilder.aggregation(sqlTypeAgg);
        sourceBuilder.aggregation(tableAgg);

        SearchResponse response = executeSearch(index, sourceBuilder);
        return processSlowQueryStats(response);
    }
}
```
Code Listing 13: Log analysis system
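Listing 13 calls a `getLogIndex(startTime, endTime)` helper that the article does not show. One common approach, assumed here rather than taken from the original, is one index per day with a hypothetical `app-logs-yyyy.MM.dd` naming pattern, so a time range maps to the list of daily indices it covers. A minimal sketch:

```java
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for getLogIndex(): assumes one index per day named <prefix>yyyy.MM.dd
final class LogIndexNames {
    private static final DateTimeFormatter DAY = DateTimeFormatter.ofPattern("yyyy.MM.dd");

    private LogIndexNames() {}

    // Returns one index name per calendar day in [start, end], both inclusive
    static List<String> between(String prefix, LocalDate start, LocalDate end) {
        if (end.isBefore(start)) {
            throw new IllegalArgumentException("end must not be before start");
        }
        List<String> names = new ArrayList<>();
        for (LocalDate day = start; !day.isAfter(end); day = day.plusDays(1)) {
            names.add(prefix + day.format(DAY));
        }
        return names;
    }
}
```

For example, `LogIndexNames.between("app-logs-", LocalDate.of(2024, 1, 30), LocalDate.of(2024, 2, 1))` yields `app-logs-2024.01.30`, `app-logs-2024.01.31` and `app-logs-2024.02.01`, which can be passed to `new SearchRequest(names.toArray(new String[0]))`. Because a day without logs has no index, lenient options such as `IndicesOptions.lenientExpandOpen()` keep the search from failing on a missing index. Converting the `Date` arguments to `LocalDate` requires choosing a time zone, which the sketch leaves to the caller.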
8. Performance Optimization and Monitoring
8.1 Performance Tuning
```java
@Component
@Slf4j
@RequiredArgsConstructor
public class PerformanceTuner {

    private final RestHighLevelClient client;
    private final ObjectMapper objectMapper = new ObjectMapper();

    // 1. JVM tuning (these flags belong in config/jvm.options, not in application code)
    public void tuneJvm() {
        // -Xms4g -Xmx4g                          // same min and max heap; keep it below 32 GB
        // -XX:+UseG1GC                           // G1 garbage collector
        // -XX:MaxGCPauseMillis=200
        // -XX:G1ReservePercent=25
        // -XX:InitiatingHeapOccupancyPercent=30
    }

    // 2. Index settings for write-heavy workloads
    public Settings getOptimizedSettings() {
        return Settings.builder()
                .put("index.number_of_shards", 3)                    // size to the data volume
                .put("index.number_of_replicas", 1)
                .put("index.refresh_interval", "30s")                // raise when writes are heavy
                .put("index.translog.durability", "async")           // async fsync: a crash can lose up to sync_interval of writes
                .put("index.translog.sync_interval", "5s")
                .put("index.translog.flush_threshold_size", "512mb")
                .put("index.merge.scheduler.max_thread_count", 1)    // for spinning disks
                .put("index.merge.scheduler.max_merge_count", 6)
                .put("index.unassigned.node_left.delayed_timeout", "5m")
                .build();
    }

    // 3. Query tuning
    public SearchRequest tuneSearch(String index) {
        SearchSourceBuilder builder = new SearchSourceBuilder()
                // Pure filters need no scoring: filter clauses skip scoring and are cacheable,
                // so only the match clause is scored ("手机" means "mobile phone")
                .query(QueryBuilders.boolQuery()
                        .must(QueryBuilders.matchQuery("name", "手机"))
                        .filter(QueryBuilders.termQuery("status", "active"))
                        .filter(QueryBuilders.termQuery("category", "electronics")))
                .timeout(TimeValue.timeValueSeconds(5))                  // on timeout, results gathered so far are returned
                .fetchSource(new String[]{"id", "name", "price"}, null); // return only the fields needed

        // The shard request cache is a SearchRequest option, not a SearchSourceBuilder one;
        // by default it only caches size=0 requests (aggregations and hit counts)
        return new SearchRequest(index)
                .source(builder)
                .requestCache(true);
    }

    // 4. Metrics collection (requires @EnableScheduling)
    @Scheduled(fixedDelay = 60000)
    public void collectMetrics() throws IOException {
        // Cluster health is available in the high-level client
        ClusterHealthResponse health = client.cluster()
                .health(new ClusterHealthRequest(), RequestOptions.DEFAULT);

        // RestHighLevelClient has no nodes-stats or indices-stats API,
        // so those go through the low-level client
        JsonNode nodesStats = getJson(new Request("GET", "/_nodes/stats/indices,os,jvm,thread_pool"));
        JsonNode indicesStats = getJson(new Request("GET", "/_stats"));

        // Push to the monitoring system, then decide whether to scale out (helpers not shown)
        sendToMonitoringSystem(health, nodesStats, indicesStats);
        checkAndScale(health, nodesStats, indicesStats);
    }

    // 5. Hot shard detection
    public void identifyHotShards() throws IOException {
        Request request = new Request("GET", "/_stats/search");
        request.addParameter("level", "shards");
        JsonNode indices = getJson(request).path("indices");

        for (Map.Entry<String, JsonNode> indexEntry : entries(indices)) {
            String index = indexEntry.getKey();
            for (Map.Entry<String, JsonNode> shardEntry : entries(indexEntry.getValue().path("shards"))) {
                for (JsonNode copy : shardEntry.getValue()) {            // one element per shard copy
                    // query_total is cumulative since the shard started
                    long queryCount = copy.path("search").path("query_total").asLong();
                    if (queryCount > 10000) {                             // threshold
                        String node = copy.path("routing").path("node").asText();
                        log.warn("Hot shard: {}/{} on node {}, query count: {}",
                                index, shardEntry.getKey(), node, queryCount);
                        // Trigger shard relocation (helper not shown)
                        rerouteHotShard(index, Integer.parseInt(shardEntry.getKey()), node);
                    }
                }
            }
        }
    }

    private JsonNode getJson(Request request) throws IOException {
        Response response = client.getLowLevelClient().performRequest(request);
        try (InputStream body = response.getEntity().getContent()) {
            return objectMapper.readTree(body);
        }
    }

    private static Iterable<Map.Entry<String, JsonNode>> entries(JsonNode node) {
        return node::fields;
    }
}
```
Code Listing 14: Performance tuning
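The `-Xms4g -Xmx4g` line in `tuneJvm()` is only an example value. Elastic's guidance is to give min and max heap the same value, use no more than half of physical RAM (the rest feeds the filesystem cache Lucene relies on), and stay below the compressed-oops threshold near 32 GB. A minimal sketch of that rule of thumb; the 31 GB cap is an assumed safety margin, and the node's startup log line `compressed ordinary object pointers [true]` is what actually confirms the setting works:

```java
// Rule-of-thumb heap sizing for an ES node (sketch; the 31 GB cap is an assumed margin)
final class HeapSizing {
    // Stay below the ~32 GB compressed-oops threshold with some headroom (assumption)
    static final long COMPRESSED_OOPS_CAP_GB = 31;

    private HeapSizing() {}

    // Half of physical RAM, capped, and never below 1 GB
    static long recommendedHeapGb(long physicalRamGb) {
        if (physicalRamGb <= 0) {
            throw new IllegalArgumentException("physical RAM must be positive");
        }
        return Math.max(1, Math.min(physicalRamGb / 2, COMPRESSED_OOPS_CAP_GB));
    }

    // Equal -Xms and -Xmx so the heap never resizes at runtime
    static String jvmOptions(long physicalRamGb) {
        long heapGb = recommendedHeapGb(physicalRamGb);
        return "-Xms" + heapGb + "g -Xmx" + heapGb + "g";
    }
}
```

An 8 GB machine gets `-Xms4g -Xmx4g`, the value used in `tuneJvm()`; a 64 GB machine is capped at 31 GB rather than 32 GB, leaving the remaining memory to the page cache.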
8.2 Monitoring and Alerting
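```yaml
# prometheus.yml
# Assumes the prometheus-community elasticsearch_exporter on its default port 9114;
# the alert expressions below use that exporter's metric names
rule_files:
  - "es_alerts.yml"

scrape_configs:
  - job_name: 'elasticsearch'
    static_configs:
      - targets: ['localhost:9114']

---
# es_alerts.yml: alert rules live in a separate rule file, under "groups"
groups:
  - name: elasticsearch
    rules:
      - alert: ClusterHealthRed
        expr: elasticsearch_cluster_health_status{color="red"} == 1
        for: 5m
        labels:
          severity: critical
        annotations:
          summary: "ES cluster status is RED"
      - alert: HighCpuUsage
        # A gauge already expressed in percent, so compare directly instead of using rate()
        expr: elasticsearch_process_cpu_percent > 80
        for: 2m
        labels:
          severity: warning
        annotations:
          summary: "ES node CPU usage is too high"
      - alert: HighHeapUsage
        expr: elasticsearch_jvm_memory_used_bytes{area="heap"} / elasticsearch_jvm_memory_max_bytes{area="heap"} > 0.8
        for: 2m
        labels:
          severity: warning
        annotations:
          summary: "ES node heap usage is too high"
      - alert: HighDiskUsage
        expr: 1 - elasticsearch_filesystem_data_available_bytes / elasticsearch_filesystem_data_size_bytes > 0.8
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "ES node disk usage is too high"
      - alert: HighSearchLatency
        # The exporter exposes counters, not a histogram: average seconds per query over 5m
        expr: rate(elasticsearch_indices_search_query_time_seconds[5m]) / rate(elasticsearch_indices_search_query_total[5m]) > 1
        for: 2m
        labels:
          severity: warning
        annotations:
          summary: "ES search latency is too high"
      - alert: UnassignedShards
        expr: elasticsearch_cluster_health_unassigned_shards > 0
        for: 5m
        labels:
          severity: critical
        annotations:
          summary: "ES has unassigned shards"
```
Code Listing 15: Monitoring and alerting configuration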
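The `HighDiskUsage` alert fires above 80% disk usage, which leaves headroom before Elasticsearch's own disk-based allocation thresholds. With the default percentage settings, `cluster.routing.allocation.disk.watermark.low` (85%) stops ES from allocating further shards to the node (primaries of newly created indices excepted), `high` (90%) makes ES try to relocate shards away, and `flood_stage` (95%) puts a read-only-allow-delete block on indices with a shard on that node. A small classifier sketch, fed with the free/total figures from `GET /_nodes/stats/fs` and assuming the defaults are unchanged (they can also be configured as absolute byte values):

```java
// Classifies a node's disk usage against ES's default disk watermarks (sketch; assumes defaults)
final class DiskWatermarks {
    enum Level { OK, ALERT, LOW, HIGH, FLOOD_STAGE }

    static final double ALERT = 0.80;       // threshold of the HighDiskUsage alert rule
    static final double LOW = 0.85;         // default watermark.low
    static final double HIGH = 0.90;        // default watermark.high
    static final double FLOOD_STAGE = 0.95; // default watermark.flood_stage

    private DiskWatermarks() {}

    static double usedRatio(long totalBytes, long availableBytes) {
        if (totalBytes <= 0 || availableBytes < 0 || availableBytes > totalBytes) {
            throw new IllegalArgumentException("invalid disk figures");
        }
        return (double) (totalBytes - availableBytes) / totalBytes;
    }

    // Returns the most severe threshold that the usage has exceeded
    static Level classify(long totalBytes, long availableBytes) {
        double used = usedRatio(totalBytes, availableBytes);
        if (used > FLOOD_STAGE) return Level.FLOOD_STAGE;
        if (used > HIGH) return Level.HIGH;
        if (used > LOW) return Level.LOW;
        if (used > ALERT) return Level.ALERT;
        return Level.OK;
    }
}
```

A node at 83% used is classified as `ALERT`: the Prometheus rule has fired, but ES still allocates shards to it, which is exactly the window in which to add capacity.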
9. Troubleshooting Guide
9.1 Diagnosing Common Problems
```java
@Component
@Slf4j
@RequiredArgsConstructor
public class TroubleshootingGuide {

    private final RestHighLevelClient client;
    private final ObjectMapper objectMapper = new ObjectMapper();

    // 1. Slow searches: rerun the query with the Profile API enabled
    public void diagnoseSlowSearch(String index, String query) throws IOException {
        SearchSourceBuilder sourceBuilder = new SearchSourceBuilder()
                .query(QueryBuilders.matchQuery("name", query))
                .profile(true);
        SearchResponse response = client.search(
                new SearchRequest(index).source(sourceBuilder), RequestOptions.DEFAULT);

        // Walk the profile results (7.x response types)
        for (Map.Entry<String, ProfileShardResult> entry : response.getProfileResults().entrySet()) {
            log.info("Profile results for shard {}:", entry.getKey());
            for (QueryProfileShardResult queryProfile : entry.getValue().getQueryProfileResults()) {
                for (ProfileResult result : queryProfile.getQueryResults()) {
                    // getTime() is in nanoseconds; sub-queries are in result.getProfiledChildren()
                    log.info("Query type: {}, description: {}, time: {} ms",
                            result.getQueryName(), result.getLuceneDescription(),
                            result.getTime() / 1_000_000.0);
                }
            }
        }
    }

    // 2. Slow indexing
    public void diagnoseSlowIndexing(String index) throws IOException {
        // Check the refresh interval (includeDefaults reports 1s when it was never set)
        GetSettingsRequest settingsRequest = new GetSettingsRequest()
                .indices(index)
                .names("index.refresh_interval")
                .includeDefaults(true);
        GetSettingsResponse settingsResponse = client.indices()
                .getSettings(settingsRequest, RequestOptions.DEFAULT);
        log.info("Refresh interval: {}",
                settingsResponse.getSetting(index, "index.refresh_interval"));

        // Indexing, refresh, merge and translog stats via the low-level client
        JsonNode stats = getJson(new Request("GET", "/" + index + "/_stats/indexing,refresh,merge,translog"));
        log.info("Index stats (primaries): {}", stats.path("_all").path("primaries"));
    }

    // 3. High memory usage
    public void diagnoseHighMemory() throws IOException {
        JsonNode nodes = getJson(new Request("GET", "/_nodes/stats/jvm,indices")).path("nodes");
        for (Map.Entry<String, JsonNode> e : entries(nodes)) {
            JsonNode node = e.getValue();
            JsonNode jvm = node.path("jvm");
            log.info("Node {} heap: {}/{} bytes, young GC count: {}",
                    node.path("name").asText(),
                    jvm.path("mem").path("heap_used_in_bytes").asLong(),
                    jvm.path("mem").path("heap_max_in_bytes").asLong(),
                    jvm.path("gc").path("collectors").path("young").path("collection_count").asLong());

            // Fielddata (sorting/aggregating on text fields) lives on the heap
            log.info("Fielddata memory: {} bytes",
                    node.path("indices").path("fielddata").path("memory_size_in_bytes").asLong());
        }
    }

    // 4. Unassigned shards
    public void diagnoseUnassignedShards() throws IOException {
        // Without a body the API explains the first unassigned shard it finds,
        // and answers 400 when there is none
        try {
            JsonNode explain = getJson(new Request("GET", "/_cluster/allocation/explain"));
            log.info("Shard allocation explanation: {}", explain);
        } catch (ResponseException e) {
            log.info("No unassigned shard to explain: {}", e.getMessage());
        }

        // Check disk space: allocation stops once a node passes the disk watermarks
        JsonNode nodes = getJson(new Request("GET", "/_nodes/stats/fs")).path("nodes");
        for (Map.Entry<String, JsonNode> e : entries(nodes)) {
            JsonNode fsTotal = e.getValue().path("fs").path("total");
            long total = fsTotal.path("total_in_bytes").asLong();
            long available = fsTotal.path("available_in_bytes").asLong();
            if (total > 0) {
                log.info("Node {} disk: {} of {} bytes free ({}% used)",
                        e.getValue().path("name").asText(), available, total,
                        (total - available) * 100 / total);
            }
        }
    }

    // 5. Hot nodes
    public void diagnoseHotNodes() throws IOException {
        JsonNode nodes = getJson(new Request("GET", "/_nodes/stats/indices,os")).path("nodes");
        for (Map.Entry<String, JsonNode> e : entries(nodes)) {
            JsonNode node = e.getValue();
            String name = node.path("name").asText();
            // Query and indexing counters are cumulative since the node started
            long queryCount = node.path("indices").path("search").path("query_total").asLong();
            long indexCount = node.path("indices").path("indexing").path("index_total").asLong();
            int cpuPercent = node.path("os").path("cpu").path("percent").asInt();

            log.info("Node {} load - queries: {}, index ops: {}, CPU: {}%",
                    name, queryCount, indexCount, cpuPercent);

            if (cpuPercent > 80 || queryCount > 10000) {
                log.warn("Node {} may be a hot node", name);
            }
        }
    }

    private JsonNode getJson(Request request) throws IOException {
        Response response = client.getLowLevelClient().performRequest(request);
        try (InputStream body = response.getEntity().getContent()) {
            return objectMapper.readTree(body);
        }
    }

    private static Iterable<Map.Entry<String, JsonNode>> entries(JsonNode node) {
        return node::fields;
    }
}
```
Code Listing 16: Troubleshooting toolkit
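Both `identifyHotShards()` and `diagnoseHotNodes()` compare `query_total` against a fixed 10000, but that counter is cumulative since the node or shard started, so any long-running node eventually crosses a fixed threshold. A more robust variant, sketched here as an assumption rather than the article's method, samples the counters periodically and compares per-second rates between samples, flagging nodes that run well above the cluster average:

```java
import java.util.HashMap;
import java.util.Map;

// Turns cumulative query_total counters into per-second rates between samples (sketch)
final class QueryRateTracker {
    private final Map<String, Long> lastCount = new HashMap<>();
    private long lastSampleMillis = -1;

    // Records a sample (node id -> cumulative query_total) and returns per-node rates
    // since the previous sample; the first call only sets the baseline and returns an empty map
    Map<String, Double> sample(Map<String, Long> cumulative, long nowMillis) {
        Map<String, Double> rates = new HashMap<>();
        if (lastSampleMillis >= 0 && nowMillis > lastSampleMillis) {
            double seconds = (nowMillis - lastSampleMillis) / 1000.0;
            for (Map.Entry<String, Long> e : cumulative.entrySet()) {
                Long previous = lastCount.get(e.getKey());
                // A restarted node reports a smaller counter; skip it for this round
                if (previous != null && e.getValue() >= previous) {
                    rates.put(e.getKey(), (e.getValue() - previous) / seconds);
                }
            }
        }
        lastCount.clear();
        lastCount.putAll(cumulative);
        lastSampleMillis = nowMillis;
        return rates;
    }

    // A node is "hot" when its rate exceeds factor x the average rate across nodes
    static boolean isHot(Map<String, Double> rates, String node, double factor) {
        double avg = rates.values().stream().mapToDouble(Double::doubleValue).average().orElse(0);
        return avg > 0 && rates.getOrDefault(node, 0.0) > factor * avg;
    }
}
```

Fed once a minute from `/_nodes/stats/indices`, a node that went from 1,000 to 7,000 queries while its peer went from 1,000 to 1,600 shows 100 vs. 10 queries per second, and only the first is flagged with a factor of 1.5. The same tracker works for shard copies if the key is `index/shard/node`.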
10. Choosing a Solution and Summary
10.1 ES vs. Other Options
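| Option | Pros | Cons | Best for |
|---|---|---|---|
| Elasticsearch | Full-featured, rich ecosystem, strong performance | Resource-hungry, complex to operate | Full-text search, log analytics |
| Solr | Mature and stable, feature-rich | Declining community activity, weaker real-time indexing | Document search, enterprise search |
| OpenSearch | Open-source fork of ES, backed by AWS | Smaller ecosystem than ES | AWS environments, teams that need a fully open-source license |
| MeiliSearch | Lightweight, fast, easy to use | Relatively limited features | Small applications, simple search |
| PostgreSQL | Transactions, SQL queries | Weak search features, slower at scale | Teams already on PG with simple search needs |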
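10.2 My "ES Ground Rules"
- Design shards sensibly: keep each shard under 50 GB
- Be strict with mappings: disable dynamic mapping and declare every field type explicitly
- Optimize queries: avoid wildcard queries and use filter context wherever scoring is not needed
- Monitor everything: cluster health, performance metrics, and business metrics
- Plan capacity: plan scale-outs in advance and set disk watermarks
- Back up regularly: take scheduled snapshots and test restores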
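The 50 GB ceiling from the first rule becomes a quick calculation at index-creation time, since the primary shard count cannot be changed later without a reindex, shrink, or split. A minimal sketch, assuming the expected size is the primary data (before replicas) the index will hold by the end of its retention period:

```java
// Estimates shard counts from an expected index size (sketch; sizing inputs are assumptions)
final class ShardPlanner {
    static final double MAX_SHARD_GB = 50.0; // upper bound from the shard rule

    private ShardPlanner() {}

    // Primary shards so that no shard exceeds maxShardGb; expectedPrimaryGb excludes replicas
    static int primaryShards(double expectedPrimaryGb, double maxShardGb) {
        if (expectedPrimaryGb < 0 || maxShardGb <= 0) {
            throw new IllegalArgumentException("size must be non-negative and the cap positive");
        }
        return (int) Math.max(1, Math.ceil(expectedPrimaryGb / maxShardGb));
    }

    // Total shard copies the cluster must host: primaries x (1 + replicas)
    static int totalShards(int primaries, int replicas) {
        return primaries * (1 + replicas);
    }
}
```

An index expected to reach 120 GB of primary data needs `primaryShards(120, MAX_SHARD_GB) = 3` primaries, or 6 shard copies with one replica. A 10 GB index needs only one, which is how the 200-shard log index from the opening story could have been avoided.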
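11. Final Words
Elasticsearch is a powerful search engine, but it is not a silver bullet. You only get the most out of it by understanding how it works, designing carefully, and monitoring continuously.
I have seen too many teams stumble here: some chose the wrong shard count, some never optimized their queries, and some ran without monitoring until an outage hit.
Remember: ES is a tool, not magic. The right path is to design around your business's characteristics and keep monitoring and tuning.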
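📚 Recommended Reading
Official documentation
- Elasticsearch official documentation - the most complete ES reference
- Java client documentation - detailed Java client documentation
Source code
- Elasticsearch source code - the official source
- Lucene source code - the underlying search library
Best practices
- Elasticsearch best practices - official best practices
- E-commerce search architecture - designing search for e-commerce
Monitoring tools
- Kibana monitoring - the official ES monitoring tool
- Prometheus monitoring - metrics monitoring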
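One last piece of advice: start with simple scenarios, and move to complex designs only once you understand the fundamentals. Set up monitoring, choose sensible shard and replica counts, and review your queries regularly. Remember: search optimization is an ongoing process, not a one-time task.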