
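Elasticsearch Core Concepts and the Java Client in Practice

Elasticsearch implements distributed full-text search on top of Lucene. This article explains cluster architecture, how sharding works, and Java client configuration, including the choice between RestHighLevelClient and Spring Data Elasticsearch. It covers index design, query optimization, bulk operations, and monitoring and alerting. E-commerce search and log analysis serve as case studies for performance tuning and troubleshooting, with best practices for JVM settings, shard strategy, and hot/cold data management.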

Posted by 神经兮兮 on 2026/3/21, updated 2026/4/26

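Common Challenges in Production

In production, Elasticsearch frequently runs into performance and stability problems. An e-commerce system may return inaccurate results right after launch because an analyzer is misconfigured, and a cluster pinned at 100% CPU during a big sales event is often the result of misused wildcard queries. In a logging system holding terabytes of data, a poorly chosen shard count creates enormous management overhead. In vector-search workloads, a memory leak in the Java High Level REST Client had to be fixed by closing the BulkProcessor correctly.

Experience shows that developers who understand how ES works internally avoid planting time bombs in their search stack.

Abstract

Elasticsearch is a distributed search engine built on Lucene that uses an inverted index to answer queries in milliseconds. This article examines ES cluster architecture, how shards work, and how queries are optimized, and lays out best practices for the Java clients. A complete e-commerce search example compares the performance of different query styles and addresses core problems in index design, query optimization, and cluster monitoring, with enterprise-grade configuration templates, tuning data, and a troubleshooting guide.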

1. Why Elasticsearch?

1.1 The Pain of Searching in a Database

A typical MySQL search looks like this:

-- MySQL fuzzy (LIKE) query
SELECT * FROM products WHERE name LIKE '%手机%' OR description LIKE '%手机%' OR tags LIKE '%手机%' ORDER BY create_time DESC LIMIT 100 OFFSET 0;

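Figure 1: Problems with search in MySQL

Pain points of searching in MySQL:

  1. LIKE '%xxx%' starts with a wildcard, so no B-tree index can be used and the query scans the whole table
  2. Multi-field OR queries perform very poorly
  3. No support for sophisticated relevance scoring and ranking
  4. LIKE-based search offers no word segmentation, synonyms, or pinyin search

1.2 Advantages of Elasticsearch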

At the core of ES is the inverted index:

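// Inverted index structure (illustration)
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class InvertedIndex {
    // term -> IDs of the documents that contain it (the posting list)
    Map<String, List<Integer>> index = new HashMap<>();
    // Document 1: "华为手机很好用" (Huawei phones are great to use)
    // Document 2: "小米手机性价比高" (Xiaomi phones are great value for money)
    // Resulting inverted index after word segmentation:
    // "华为" (Huawei)            -> [1]
    // "手机" (phone)             -> [1, 2]
    // "小米" (Xiaomi)            -> [2]
    // "很好用" (great to use)    -> [1]
    // "性价比" (value for money) -> [2]
}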

Listing 2: How an inverted index works
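The same idea can be run end to end. The following is a minimal sketch in plain Java, with no Elasticsearch dependency, that builds a posting list per term and answers an AND query by intersecting sorted posting lists instead of scanning every row, which is what `LIKE '%手机%'` has to do. It assumes the text has already been segmented into terms (in ES an analyzer such as IK does this); the romanized tokens and the names `ToyInvertedIndex` and `InvertedIndexDemo` are illustrative only.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Toy inverted index: term -> sorted list of document IDs (the posting list).
class ToyInvertedIndex {
    private final Map<String, List<Integer>> postings = new TreeMap<>();

    // Terms are assumed to come from an analyzer already; doc IDs must arrive in increasing order
    void addDocument(int docId, List<String> terms) {
        for (String term : terms) {
            List<Integer> list = postings.computeIfAbsent(term, t -> new ArrayList<>());
            // Record each document at most once per term
            if (list.isEmpty() || list.get(list.size() - 1) != docId) {
                list.add(docId);
            }
        }
    }

    List<Integer> lookup(String term) {
        return postings.getOrDefault(term, List.of());
    }

    // AND query: intersect posting lists instead of scanning every document
    List<Integer> searchAll(List<String> terms) {
        if (terms.isEmpty()) {
            return List.of();
        }
        List<Integer> result = lookup(terms.get(0));
        for (int i = 1; i < terms.size(); i++) {
            result = intersect(result, lookup(terms.get(i)));
        }
        return result;
    }

    // Linear merge of two sorted lists
    private static List<Integer> intersect(List<Integer> a, List<Integer> b) {
        List<Integer> out = new ArrayList<>();
        int i = 0;
        int j = 0;
        while (i < a.size() && j < b.size()) {
            int x = a.get(i);
            int y = b.get(j);
            if (x == y) {
                out.add(x);
                i++;
                j++;
            } else if (x < y) {
                i++;
            } else {
                j++;
            }
        }
        return out;
    }
}

class InvertedIndexDemo {
    public static void main(String[] args) {
        ToyInvertedIndex index = new ToyInvertedIndex();
        // Romanized stand-ins for the segmented documents 1 and 2 above
        index.addDocument(1, List.of("huawei", "phone", "great"));
        index.addDocument(2, List.of("xiaomi", "phone", "value"));
        System.out.println(index.lookup("phone"));                       // [1, 2]
        System.out.println(index.searchAll(List.of("xiaomi", "phone"))); // [2]
    }
}
```

Each lookup touches only the posting lists of the query terms, so the cost grows with the number of matching documents rather than with the size of the table.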

Comparing the search process:

Figure 2: MySQL vs. ES search flow

Benchmark (10 million products):

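Scenario | MySQL | Elasticsearch | Difference
Single-field fuzzy query | 3200 ms | 45 ms | 71x faster
Multi-field OR query | 8500 ms | 65 ms | 130x faster
Complex conditions + sorting | 12000 ms | 85 ms | 141x faster
Memory usage | 4.2 GB | 1.8 GB | 57% less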

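2. ES Core Architecture

2.1 Cluster Architecture

Figure 3: ES cluster architecture

Node types:

  • Master node: manages cluster state and shard allocation
  • Data node: stores data and executes CRUD and search operations
  • Coordinating node: routes requests and merges the results
  • Ingest node: pre-processes documents before they are indexed

2.2 Indices and Shards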
// Index creation example (RestHighLevelClient 7.x)
@Configuration
@Slf4j
public class IndexConfig {
    public void createProductIndex(RestHighLevelClient client) throws IOException {
        CreateIndexRequest request = new CreateIndexRequest("products");
        // Index settings
        request.settings(Settings.builder()
            .put("index.number_of_shards", 3) // primary shards (fixed once the index exists)
            .put("index.number_of_replicas", 1) // replicas per primary (can be changed later)
            .put("index.refresh_interval", "1s") // refresh interval
            .put("analysis.analyzer.default.type", "ik_max_word")); // default analyzer (IK plugin)
        // Mapping definition
        // "pinyin_analyzer" must also be declared in the index analysis settings (pinyin plugin)
        XContentBuilder mapping = JsonXContent.contentBuilder()
            .startObject()
            .startObject("properties")
            .startObject("id")
            .field("type", "keyword")
            .endObject()
            .startObject("name")
            .field("type", "text")
            .field("analyzer", "ik_max_word")
            .field("search_analyzer", "ik_smart")
            // Multi-fields are written inline; a separate builder cannot be passed to field()
            .startObject("fields")
            .startObject("pinyin")
            .field("type", "text")
            .field("analyzer", "pinyin_analyzer")
            .endObject()
            .startObject("keyword")
            .field("type", "keyword")
            .field("ignore_above", 256)
            .endObject()
            .endObject() // end of fields
            .endObject() // end of name
            .startObject("price")
            .field("type", "double")
            .endObject()
            .startObject("category_id")
            .field("type", "integer")
            .endObject()
            .startObject("sales")
            .field("type", "integer")
            .endObject()
            .startObject("create_time")
            .field("type", "date")
            .field("format", "yyyy-MM-dd HH:mm:ss||epoch_millis")
            .endObject()
            .endObject() // end of properties
            .endObject();
        request.mapping(mapping);
        CreateIndexResponse response = client.indices().create(request, RequestOptions.DEFAULT);
        if (response.isAcknowledged()) {
            log.info("Index created");
        }
    }
}

Listing 3: Index creation configuration

How shards work:

Figure 4: Shard and replica distribution
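The distribution in the figure is driven by document routing. The Elasticsearch documentation describes the target shard as `shard_num = hash(_routing) % num_primary_shards`, where `_routing` defaults to the document `_id`. The sketch below models that formula under a loud simplification: it uses `String.hashCode()` where ES uses Murmur3 (plus an internal routing factor), so the shard numbers will not match a real cluster. What it does show is why `number_of_shards` is fixed at creation time: changing the modulus sends most documents to a different shard. `ShardRouter` is a hypothetical name.

```java
// Simplified model of document routing; NOT Elasticsearch's real hash function (Murmur3).
class ShardRouter {
    private final int numberOfPrimaryShards;

    ShardRouter(int numberOfPrimaryShards) {
        if (numberOfPrimaryShards <= 0) {
            throw new IllegalArgumentException("need at least one primary shard");
        }
        this.numberOfPrimaryShards = numberOfPrimaryShards;
    }

    // _routing defaults to the document _id; floorMod keeps the result non-negative
    int shardFor(String routing) {
        return Math.floorMod(routing.hashCode(), numberOfPrimaryShards);
    }
}

class ShardRoutingDemo {
    public static void main(String[] args) {
        ShardRouter three = new ShardRouter(3);
        ShardRouter four = new ShardRouter(4);
        int moved = 0;
        for (int i = 0; i < 1000; i++) {
            String id = "product-" + i;
            if (three.shardFor(id) != four.shardFor(id)) {
                moved++;
            }
        }
        // Going from 3 to 4 primaries would relocate most documents,
        // which is why the primary shard count cannot simply be changed
        System.out.println("documents that would change shard: " + moved + " / 1000");
    }
}
```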

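Shard design principles:

  1. Keep each shard under 50 GB
  2. Number of primary shards ≈ total data volume / 50 GB, rounded up
  3. Replicas per primary ≤ number of data nodes - 1, because a replica is never allocated on the same node as its primary (use at least 1)
  4. Avoid over-sharding (every shard carries its own memory and file-handle overhead)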
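The rules of thumb above reduce to simple arithmetic. A sketch, assuming the 50 GB cap from the text and capping replicas by the number of data nodes (the `ShardPlan` class is hypothetical, not an ES API):

```java
// Rough shard planning that applies the rules of thumb above.
class ShardPlan {
    // Per-shard size cap taken from the text; a guideline, not an Elasticsearch limit
    static final double MAX_SHARD_GB = 50.0;

    final int primaryShards;
    final int replicas;

    ShardPlan(double totalDataGb, int dataNodes, int desiredReplicas) {
        if (dataNodes < 1) {
            throw new IllegalArgumentException("need at least one data node");
        }
        // Round up so no shard exceeds the cap, and always create at least one shard
        this.primaryShards = Math.max(1, (int) Math.ceil(totalDataGb / MAX_SHARD_GB));
        // A replica is never placed on the node holding its primary, so more than
        // (dataNodes - 1) replicas could never be assigned
        this.replicas = Math.max(0, Math.min(desiredReplicas, dataNodes - 1));
    }

    // Primaries plus all of their replicas
    int totalShardCopies() {
        return primaryShards * (1 + replicas);
    }
}

class ShardPlanDemo {
    public static void main(String[] args) {
        // 420 GB / 50 GB = 8.4, rounded up to 9 primaries; 9 * (1 + 1) = 18 shard copies
        ShardPlan plan = new ShardPlan(420, 3, 1);
        System.out.println(plan.primaryShards + " primaries, " + plan.replicas
            + " replica(s), " + plan.totalShardCopies() + " shard copies");
    }
}
```

On a single-node cluster the plan drops to zero replicas, which matches why such a cluster reports yellow health when an index asks for one replica.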

3. Java Client in Practice

3.1 Choosing a Client
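Client | Pros | Cons | Recommended for
RestHighLevelClient | Official client, full-featured | Heavyweight, complex API | New projects needing full functionality
Java Low Level REST Client | Lightweight, flexible | JSON must be built and parsed by hand | Simple queries, performance-sensitive code
Spring Data Elasticsearch | Concise, integrated with Spring | Version-compatibility issues | Spring Boot projects
JestClient | Easy to use | No longer maintained | Not recommended for new projects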

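Our choice: RestHighLevelClient for new projects and Spring Data Elasticsearch for Spring Boot projects. Note that RestHighLevelClient has been deprecated since Elasticsearch 7.15 in favor of the Elasticsearch Java API Client, so the client code in this article targets 7.x.

3.2 RestHighLevelClient Configuration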
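import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.client.Node;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.cluster.health.ClusterHealthStatus;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.Scheduled;

// The health check only runs if @EnableScheduling is present on some configuration class
@Configuration
@Slf4j
public class ElasticsearchConfig {
    @Value("${elasticsearch.hosts:localhost:9200}")
    private String hosts;
    @Value("${elasticsearch.username:}")
    private String username;
    @Value("${elasticsearch.password:}")
    private String password;

    // Application-specific alerting component (assumed to exist, not shown)
    @Autowired
    private AlertService alertService;

    // close() releases the connection pool and I/O threads on shutdown
    @Bean(destroyMethod = "close")
    public RestHighLevelClient restHighLevelClient() {
        // Parse "host:port,host:port"; the port defaults to 9200 when omitted
        String[] hostArray = hosts.split(",");
        HttpHost[] httpHosts = new HttpHost[hostArray.length];
        for (int i = 0; i < hostArray.length; i++) {
            String[] hostPort = hostArray[i].trim().split(":");
            int port = hostPort.length > 1 ? Integer.parseInt(hostPort[1].trim()) : 9200;
            httpHosts[i] = new HttpHost(hostPort[0].trim(), port, "http");
        }
        // Configure the underlying low-level RestClient
        RestClientBuilder builder = RestClient.builder(httpHosts)
            .setRequestConfigCallback(requestConfigBuilder -> requestConfigBuilder
                .setConnectTimeout(5000) // TCP connect timeout: 5 s
                .setSocketTimeout(60000) // socket (read) timeout: 60 s
                .setConnectionRequestTimeout(1000)) // max wait to lease a pooled connection: 1 s
            .setHttpClientConfigCallback(httpClientBuilder -> {
                // Connection pool
                httpClientBuilder.setMaxConnTotal(100) // max connections in total
                    .setMaxConnPerRoute(50) // max connections per node
                    .setKeepAliveStrategy((response, context) -> 60000); // keep-alive: 60 s
                // Basic authentication, only when credentials are configured
                if (StringUtils.isNotBlank(username) && StringUtils.isNotBlank(password)) {
                    CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
                    credentialsProvider.setCredentials(
                        AuthScope.ANY,
                        new UsernamePasswordCredentials(username, password));
                    httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
                }
                return httpClientBuilder;
            })
            .setFailureListener(new RestClient.FailureListener() {
                @Override
                public void onFailure(Node node) {
                    log.error("ES node connection failed: {}", node.getHost());
                }
            });
        return new RestHighLevelClient(builder);
    }

    // Cluster health check every 30 seconds
    @Scheduled(fixedDelay = 30000)
    public void checkClusterHealth() {
        try {
            ClusterHealthRequest request = new ClusterHealthRequest();
            // In a @Configuration class this call returns the singleton bean, not a new client
            ClusterHealthResponse response = restHighLevelClient().cluster().health(request, RequestOptions.DEFAULT);
            ClusterHealthStatus status = response.getStatus();
            if (status != ClusterHealthStatus.GREEN) {
                log.warn("ES cluster status is {}, details: {}", status, response);
                // Send an alert
                alertService.sendAlert("ES cluster status abnormal", "status: " + status + ", details: " + response);
            }
        } catch (Exception e) {
            log.error("ES cluster health check failed", e);
        }
    }
}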

Listing 4: ES client configuration
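The split-based parsing in `restHighLevelClient()` assumes every entry is `host:port` over plain HTTP, so an entry such as `https://es3:9243` breaks it. If the property needs to accept schemes and default ports, one option is a small standalone parser like the hypothetical `EsHost` below (plain Java, assuming scheme `http` and port 9200 as defaults; IPv6 literals are not handled). Each parsed entry would then feed `new HttpHost(h.host, h.port, h.scheme)`.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical parser for the comma-separated `elasticsearch.hosts` property.
// Accepts "host", "host:port" and "scheme://host:port"; defaults to http and 9200.
final class EsHost {
    final String scheme;
    final String host;
    final int port;

    EsHost(String scheme, String host, int port) {
        this.scheme = scheme;
        this.host = host;
        this.port = port;
    }

    static List<EsHost> parseAll(String hosts) {
        List<EsHost> result = new ArrayList<>();
        for (String raw : hosts.split(",")) {
            String s = raw.trim();
            if (s.isEmpty()) {
                continue; // tolerate stray commas
            }
            String scheme = "http";
            int schemeEnd = s.indexOf("://");
            if (schemeEnd >= 0) {
                scheme = s.substring(0, schemeEnd);
                s = s.substring(schemeEnd + 3);
            }
            String host = s;
            int port = 9200;
            int colon = s.lastIndexOf(':');
            if (colon >= 0) {
                host = s.substring(0, colon);
                port = Integer.parseInt(s.substring(colon + 1).trim());
            }
            result.add(new EsHost(scheme, host.trim(), port));
        }
        return result;
    }

    @Override
    public String toString() {
        return scheme + "://" + host + ":" + port;
    }
}

class EsHostDemo {
    public static void main(String[] args) {
        // Prints http://es1:9200, http://es2:9200 and https://es3:9243 on separate lines
        for (EsHost h : EsHost.parseAll("es1:9200, es2 ,https://es3:9243")) {
            System.out.println(h);
        }
    }
}
```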

3.3 Spring Data Elasticsearch Configuration
@Configuration
@EnableElasticsearchRepositories(basePackages = "com.example.repository")
@Slf4j
public class SpringDataESConfig {
    @Value("${elasticsearch.hosts:localhost:9200}")
    private String hosts;

    @Bean
    public RestHighLevelClient elasticsearchClient() {
        // Built the same way as restHighLevelClient() in ElasticsearchConfig (code not repeated here)
        return restHighLevelClient();
    }

    @Bean
    public ElasticsearchRestTemplate elasticsearchTemplate() {
        return new ElasticsearchRestTemplate(elasticsearchClient());
    }

    // Entity mapping; settings and mapping are loaded from JSON files on the classpath
    @Document(indexName = "products", createIndex = false)
    @Setting(settingPath = "/settings/product-settings.json")
    @Mapping(mappingPath = "/mappings/product-mapping.json")
    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    public static class Product {
        @Id
        private String id;

        // @MultiField already carries the main field's settings, so no separate @Field is needed
        @MultiField(
            mainField = @Field(type = FieldType.Text, analyzer = "ik_max_word", searchAnalyzer = "ik_smart"),
            otherFields = {
                @InnerField(suffix = "pinyin", type = FieldType.Text, analyzer = "pinyin_analyzer"),
                @InnerField(suffix = "keyword", type = FieldType.Keyword)
            })
        private String name;

        @Field(type = FieldType.Double)
        private Double price;

        // name maps the Java property to the snake_case field used in the index mapping
        @Field(name = "category_id", type = FieldType.Integer)
        private Integer categoryId;

        @Field(type = FieldType.Integer)
        private Integer sales;

        @Field(name = "create_time", type = FieldType.Date, format = DateFormat.date_hour_minute_second)
        private Date createTime;

        // Geo field
        @GeoPointField
        private GeoPoint location;
    }

    // Repository interface
    public interface ProductRepository extends ElasticsearchRepository<Product, String> {
        // Queries derived from the method name
        List<Product> findByName(String name);
        List<Product> findByPriceBetween(Double minPrice, Double maxPrice);
        Page<Product> findByCategoryId(Integer categoryId, Pageable pageable);

        // Query DSL in the @Query annotation; ?0 is replaced by the first argument
        @Query("{\"match\": {\"name\": \"?0\"}}")
        List<Product> findByNameCustom(String name);

        // Raw bool query combining a match clause and a price range
        @Query("{\"bool\": {\"must\": [" +
            "{\"match\": {\"name\": \"?0\"}}," +
            "{\"range\": {\"price\": {\"gte\": ?1, \"lte\": ?2}}}" +
            "]}}")
        List<Product> searchByNameAndPriceRange(String name, Double minPrice, Double maxPrice);
    }
}

Listing 5: Spring Data ES configuration

4. Index Design Best Practices

4.1 Index Lifecycle Management
@Component
@Slf4j
public class IndexLifecycleManager {
    // Assumed members (not shown): an injected RestHighLevelClient `client` and the helpers
    // getIndexSettings(), getIndexMapping(), getIndexStats(), isIndexOlderThan(), createIndex(),
    // switchAlias(), closeIndicesOlderThan(), deleteIndicesOlderThan(), forceMergeIndicesOlderThan()

    // 1. Time-based index names, e.g. logs-2026.03.21
    public String getDailyIndex(String indexPrefix) {
        String date = LocalDate.now().format(DateTimeFormatter.ofPattern("yyyy.MM.dd"));
        return indexPrefix + "-" + date;
    }

    // 2. Create an index and attach an alias to it
    public void createIndexWithAlias(String indexName, String alias) throws IOException {
        CreateIndexRequest request = new CreateIndexRequest(indexName);
        // Index settings
        request.settings(getIndexSettings());
        // Create the index
        CreateIndexResponse response = client.indices().create(request, RequestOptions.DEFAULT);
        if (response.isAcknowledged()) {
            // Add the alias
            IndicesAliasesRequest aliasRequest = new IndicesAliasesRequest();
            AliasActions aliasAction = new AliasActions(AliasActions.Type.ADD)
                .index(indexName)
                .alias(alias);
            aliasRequest.addAliasAction(aliasAction);
            client.indices().updateAliases(aliasRequest, RequestOptions.DEFAULT);
            log.info("Created index {} with alias {}", indexName, alias);
        }
    }

    // 3. Rollover policy
    @Scheduled(cron = "0 0 0 * * ?") // every day at midnight
    public void rolloverIndex() throws IOException {
        String todayIndex = getDailyIndex("logs");
        String alias = "logs-current";
        // Find the index the alias currently points to
        GetAliasesRequest request = new GetAliasesRequest(alias);
        GetAliasesResponse response = client.indices().getAlias(request, RequestOptions.DEFAULT);
        Map<String, Set<AliasMetadata>> aliases = response.getAliases();
        if (aliases.isEmpty()) {
            // First run: create the index together with the alias
            createIndexWithAlias(todayIndex, alias);
        } else {
            // Check the size of the current index
            String currentIndex = aliases.keySet().iterator().next();
            IndexStats stats = getIndexStats(currentIndex);
            // Roll over when the index exceeds 50 GB or is older than 7 days
            if (stats.getStoreSizeInBytes() > 50L * 1024 * 1024 * 1024 || isIndexOlderThan(currentIndex, 7)) {
                // Create the new index without the alias...
                createIndex(todayIndex);
                // ...then move the alias with one atomic remove + add request,
                // so it never points at two indices at once
                switchAlias(alias, currentIndex, todayIndex);
                // The old index stays open and searchable; manageIndexLifecycle()
                // closes it once it is 30 days old
            }
        }
    }

    // 4. Closing and deleting indices
    public void manageIndexLifecycle() throws IOException {
        // Close indices older than 30 days
        closeIndicesOlderThan(30);
        // Delete indices older than 90 days
        deleteIndicesOlderThan(90);
        // Force-merge segments of indices older than 7 days (no longer written to)
        forceMergeIndicesOlderThan(7);
    }

    // 5. Index template (legacy template API)
    public void createIndexTemplate() throws IOException {
        PutIndexTemplateRequest request = new PutIndexTemplateRequest("logs-template");
        request.patterns(Arrays.asList("logs-*"));
        request.settings(getIndexSettings());
        request.mapping(getIndexMapping());
        // Alias added to every index created from this template
        request.alias(new Alias("logs-all"));
        // Order: when several templates match, higher orders override lower ones
        request.order(1);
        // Version number for your own bookkeeping
        request.version(1);
        AcknowledgedResponse response = client.indices()
            .putTemplate(request, RequestOptions.DEFAULT);
        if (response.isAcknowledged()) {
            log.info("Index template created");
        }
    }
}

Listing 6: Index lifecycle management
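`rolloverIndex()` relies on helpers such as `isIndexOlderThan()` that are not shown. Below is a minimal sketch of how the date-based parts could be implemented for the `prefix-yyyy.MM.dd` naming produced by `getDailyIndex()`. The class `DailyIndexNames` and its methods are hypothetical, and the store size is passed in rather than fetched from the index stats API.

```java
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeParseException;
import java.time.temporal.ChronoUnit;
import java.util.Optional;

// Hypothetical helpers for daily index names such as "logs-2026.03.21".
final class DailyIndexNames {
    static final DateTimeFormatter FORMAT = DateTimeFormatter.ofPattern("yyyy.MM.dd");
    static final long MAX_BYTES = 50L * 1024 * 1024 * 1024; // 50 GB, as in rolloverIndex()

    static String indexFor(String prefix, LocalDate day) {
        return prefix + "-" + day.format(FORMAT);
    }

    // Date suffix of the index name; empty if the name does not follow the scheme
    static Optional<LocalDate> dateOf(String indexName) {
        int dash = indexName.lastIndexOf('-');
        if (dash < 0) {
            return Optional.empty();
        }
        try {
            return Optional.of(LocalDate.parse(indexName.substring(dash + 1), FORMAT));
        } catch (DateTimeParseException e) {
            return Optional.empty();
        }
    }

    // Undated names (e.g. an alias) are never considered old
    static boolean isOlderThan(String indexName, int days, LocalDate today) {
        return dateOf(indexName)
            .map(d -> ChronoUnit.DAYS.between(d, today) > days)
            .orElse(false);
    }

    // Same condition as rolloverIndex(): over 50 GB or older than 7 days
    static boolean shouldRollOver(String indexName, long storeSizeBytes, LocalDate today) {
        return storeSizeBytes > MAX_BYTES || isOlderThan(indexName, 7, today);
    }
}

class DailyIndexNamesDemo {
    public static void main(String[] args) {
        LocalDate today = LocalDate.of(2026, 3, 21);
        System.out.println(DailyIndexNames.indexFor("logs", today));                      // logs-2026.03.21
        System.out.println(DailyIndexNames.shouldRollOver("logs-2026.03.13", 0, today)); // true (8 days old)
    }
}
```

Elasticsearch also provides a server-side rollover API and index lifecycle management (ILM) policies that express the same size and age conditions, which avoids running this logic in the application.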

4.2 Mapping Design Tips
{
  "mappings": {
    "dynamic": "strict",
    // reject documents that contain fields not declared in the mapping
    "_source": {
      "enabled": true,
      "excludes": ["big_field"]
    },
    "properties": {
      "id": {
        "type": "keyword",
        "ignore_above": 256
      },
      "title": {
        "type": "text",
        "analyzer": "ik_max_word",
        "search_analyzer": "ik_smart",
        "fields": {
          "pinyin": {
            "type": "text",
            "analyzer": "pinyin_analyzer"
          },
          "keyword": {
            "type": "keyword",
            "ignore_above": 256
          },
          "completion": {
            "type": "completion",
            "analyzer": "simple"
          }
        }
      },
      "price": {
        "type": "scaled_float",
        "scaling_factor": 100
      },
      "location": {
        "type": "geo_point"
      },
      "tags": {
        "type": "text",
        "analyzer": "whitespace"
      },
      "attributes": {
        "type": "nested",
        // nested type: each key/value object is indexed and queried as its own document
        "properties": {
          "key": {
            "type": "keyword"
          },
          "value": {
            "type": "keyword"
          }
        }
      },
      "vector": {
        "type": "dense_vector",
        // dense vector field for vector search
        "dims": 128
      }
    }
  }
}

Listing 7: Mapping design example
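The `price` field above uses `scaled_float` with `scaling_factor: 100`. Per the Elasticsearch mapping documentation, such a value is stored as a `long` equal to the value multiplied by the scaling factor and rounded. The sketch below (hypothetical `ScaledFloat` class, plain Java, using `Math.round` as the rounding step) models that encoding to show what a factor of 100 means for prices: two decimal places survive, anything finer is rounded away.

```java
// Model of scaled_float storage: round(value * scalingFactor) kept as a long.
final class ScaledFloat {
    private final double scalingFactor;

    ScaledFloat(double scalingFactor) {
        this.scalingFactor = scalingFactor;
    }

    long encode(double value) {
        return Math.round(value * scalingFactor);
    }

    double decode(long stored) {
        return stored / scalingFactor;
    }
}

class ScaledFloatDemo {
    public static void main(String[] args) {
        ScaledFloat price = new ScaledFloat(100);
        System.out.println(price.encode(19.99)); // 1999
        System.out.println(price.decode(1999));  // 19.99
        System.out.println(price.encode(0.004)); // 0 (below the 0.01 resolution)
    }
}
```

Storing a long instead of a double is what makes this type compact on disk; the price is that the resolution is fixed at 1 / scaling_factor.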

5. Query Optimization in Practice

5.1 Comparing Query Types
@Service
@Slf4j
public class ProductSearchService {
    // Assumed members (not shown): an injected RestHighLevelClient `client` and
    // executeSearch(SearchSourceBuilder), which runs the builder against "products"

    // 1. Match query (the most common)
    public SearchResponse searchByMatch(String keyword) throws IOException {
        SearchRequest request = new SearchRequest("products");
        SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
        sourceBuilder.query(QueryBuilders.matchQuery("name", keyword)
            .analyzer("ik_smart") // analyzer applied to the search text
            // At least 75% of the query terms must match. Operator.AND would require every
            // term and make minimum_should_match irrelevant, so the two are not combined.
            .minimumShouldMatch("75%"));
        sourceBuilder.from(0);
        sourceBuilder.size(20);
        sourceBuilder.sort(SortBuilders.scoreSort());
        request.source(sourceBuilder);
        return client.search(request, RequestOptions.DEFAULT);
    }

    // 2. MultiMatch query (several fields)
    public SearchResponse searchByMultiMatch(String keyword) throws IOException {
        SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
        sourceBuilder.query(QueryBuilders.multiMatchQuery(keyword)
            .field("name", 3.0f) // per-field boosts
            .field("name.pinyin", 2.0f)
            .field("description")
            .type(MultiMatchQueryBuilder.Type.BEST_FIELDS) // score comes from the best-matching field
            .tieBreaker(0.3f)); // plus 0.3 x the scores of the other matching fields
        return executeSearch(sourceBuilder);
    }

    // 3. Term query (exact match on a keyword field)
    public SearchResponse searchByTerm(String category) throws IOException {
        SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
        sourceBuilder.query(QueryBuilders.termQuery("category.keyword", category));
        return executeSearch(sourceBuilder);
    }

    // 4. Bool query (combining clauses)
    public SearchResponse searchByBool(String keyword, Double minPrice, Double maxPrice, String category) throws IOException {
        SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
        BoolQueryBuilder boolQuery = QueryBuilders.boolQuery();
        // MUST: has to match and contributes to the score
        if (StringUtils.isNotBlank(keyword)) {
            boolQuery.must(QueryBuilders.matchQuery("name", keyword));
        }
        // FILTER: has to match, but is not scored
        if (minPrice != null || maxPrice != null) {
            RangeQueryBuilder rangeQuery = QueryBuilders.rangeQuery("price");
            if (minPrice != null) {
                rangeQuery.gte(minPrice);
            }
            if (maxPrice != null) {
                rangeQuery.lte(maxPrice);
            }
            boolQuery.filter(rangeQuery);
        }
        // SHOULD: optional; a match only raises the score. Adding minimumShouldMatch(1)
        // next to a must clause would turn it into a required condition.
        if (StringUtils.isNotBlank(category)) {
            boolQuery.should(QueryBuilders.termQuery("category.keyword", category));
        }
        // MUST_NOT: must not match
        boolQuery.mustNot(QueryBuilders.termQuery("status", "deleted"));
        sourceBuilder.query(boolQuery);
        return executeSearch(sourceBuilder);
    }

    // 5. Aggregations
    public SearchResponse searchWithAggregation() throws IOException {
        SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
        // Match all documents
        sourceBuilder.query(QueryBuilders.matchAllQuery());
        sourceBuilder.size(0); // return only aggregations, no hits
        // Price range buckets
        AggregationBuilder priceAgg = AggregationBuilders
            .range("price_ranges")
            .field("price")
            .addRange(0, 100)
            .addRange(100, 500)
            .addRange(500, 1000)
            .addRange(1000, 5000)
            .addUnboundedFrom(5000); // 5000 and above
        // Category buckets
        AggregationBuilder categoryAgg = AggregationBuilders
            .terms("categories")
            .field("category.keyword")
            .size(10);
        // Nested aggregation over the attribute keys
        AggregationBuilder nestedAgg = AggregationBuilders
            .nested("attributes", "attributes")
            .subAggregation(AggregationBuilders
                .terms("attr_keys")
                .field("attributes.key")
                .size(10));
        sourceBuilder.aggregation(priceAgg);
        sourceBuilder.aggregation(categoryAgg);
        sourceBuilder.aggregation(nestedAgg);
        return executeSearch(sourceBuilder);
    }

    // 6. Search suggestions (autocomplete)
    public Suggest searchSuggest(String prefix) throws IOException {
        SearchRequest request = new SearchRequest("products");
        SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
        sourceBuilder.size(0);
        // Completion suggester on a completion-type sub-field
        CompletionSuggestionBuilder suggestion = SuggestBuilders.completionSuggestion("name.completion")
            .prefix(prefix, Fuzziness.AUTO)
            .skipDuplicates(true)
            .size(10);
        SuggestBuilder suggestBuilder = new SuggestBuilder();
        suggestBuilder.addSuggestion("product_suggest", suggestion);
        sourceBuilder.suggest(suggestBuilder);
        request.source(sourceBuilder);
        SearchResponse response = client.search(request, RequestOptions.DEFAULT);
        return response.getSuggest();
    }
}

Listing 8: Implementing the different query types
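`searchByMatch()` uses `minimumShouldMatch("75%")`. The Elasticsearch `minimum_should_match` documentation says a percentage is applied to the number of optional clauses (for a match query, the analyzed query terms) and rounded down, and that the result is never lower than 1 or greater than the number of clauses. A sketch of that arithmetic for plain integers and percentages only; the conditional `3<90%` syntax is not handled, and `MinimumShouldMatch` is a hypothetical helper.

```java
// How a minimum_should_match spec turns into a required clause count (simplified).
final class MinimumShouldMatch {
    static int resolve(int optionalClauses, String spec) {
        if (optionalClauses <= 0) {
            return 0;
        }
        String s = spec.trim();
        int required;
        if (s.endsWith("%")) {
            int percent = Integer.parseInt(s.substring(0, s.length() - 1));
            // The count derived from the percentage is rounded down
            int count = (int) Math.floor(optionalClauses * Math.abs(percent) / 100.0);
            // A negative percentage says how many clauses may be missing
            required = percent < 0 ? optionalClauses - count : count;
        } else {
            int n = Integer.parseInt(s);
            required = n < 0 ? optionalClauses + n : n;
        }
        // Never below 1, never above the number of optional clauses
        return Math.max(1, Math.min(optionalClauses, required));
    }
}

class MinimumShouldMatchDemo {
    public static void main(String[] args) {
        // A query analyzed into 3 terms with "75%": 3 * 0.75 = 2.25, rounded down to 2
        System.out.println(MinimumShouldMatch.resolve(3, "75%")); // 2
        System.out.println(MinimumShouldMatch.resolve(4, "75%")); // 3
    }
}
```

In practice this means a three-term query under "75%" tolerates one missing term, while a four-term query also tolerates only one.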

5.2 Performance Optimization Tips
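@Component
@Slf4j
public class QueryOptimizer {
    // Assumed members (not shown): an injected RestHighLevelClient `client`, executeSearch(),
    // an Executor `executor`, and searchByMatch() as defined in ProductSearchService

    // 1. Pagination
    // from/size gets slower the deeper you page (every shard must collect from + size hits)
    // and is capped by index.max_result_window (10,000 by default).
    // Recommended: search_after, passing the sort values of the previous page's last hit
    // (hits[hits.length - 1].getSortValues()); pass null for the first page
    public SearchResponse searchWithPagination(String keyword, Object[] lastSortValues, int size) throws IOException {
        SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
        sourceBuilder.query(QueryBuilders.matchQuery("name", keyword));
        // Regular pagination (slow for deep pages):
        // sourceBuilder.from((page - 1) * size).size(size);
        if (lastSortValues != null) {
            sourceBuilder.searchAfter(lastSortValues);
        }
        sourceBuilder.size(size);
        // search_after requires a sort, with a unique field ("id") as tie-breaker.
        // Sorting on _id is discouraged because it needs fielddata on _id.
        sourceBuilder.sort(SortBuilders.scoreSort());
        sourceBuilder.sort(SortBuilders.fieldSort("id"));
        return executeSearch(sourceBuilder);
    }

    // 2. Query rewriting
    public SearchResponse optimizedSearch(String keyword) throws IOException {
        SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
        // The original wildcard query (very slow: a leading * must walk the whole term dictionary)
        // QueryBuilders.wildcardQuery("name", "*" + keyword + "*");
        // The alternatives below assume "name.ngram" and "name.edge_ngram" sub-fields indexed
        // with ngram / edge_ngram analyzers; they are not part of the mapping in Listing 3
        BoolQueryBuilder query = QueryBuilders.boolQuery()
            // Option 1: ngram sub-field (matches substrings)
            .should(QueryBuilders.matchQuery("name.ngram", keyword))
            // Option 3: edge_ngram sub-field (matches prefixes, cost paid at index time)
            .should(QueryBuilders.matchQuery("name.edge_ngram", keyword));
        // Option 2: prefix query on the keyword sub-field, only for short inputs
        if (keyword.length() < 10) {
            query.should(QueryBuilders.prefixQuery("name.keyword", keyword));
        }
        // With only should clauses, at least one of them has to match
        sourceBuilder.query(query);
        return executeSearch(sourceBuilder);
    }

    // 3. Filter caching
    public SearchResponse searchWithFilterCache() throws IOException {
        SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
        BoolQueryBuilder boolQuery = QueryBuilders.boolQuery();
        // Frequently used conditions go in filter context: not scored, and cacheable
        boolQuery.filter(QueryBuilders.termQuery("status", "active"));
        boolQuery.filter(QueryBuilders.rangeQuery("price")
            .gte(100).lte(1000));
        // Full-text search condition (scored)
        boolQuery.must(QueryBuilders.matchQuery("name", "手机"));
        sourceBuilder.query(boolQuery);
        return executeSearch(sourceBuilder);
    }

    // 4. Sorting and field data
    public SearchResponse searchWithFieldData() throws IOException {
        SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
        // Do not sort on a text field (that needs fielddata, which is disabled by default)
        // sourceBuilder.sort(SortBuilders.fieldSort("name")); // wrong
        // Sort on the keyword sub-field, which is backed by doc values
        sourceBuilder.sort(SortBuilders.fieldSort("name.keyword"));
        // Optionally read the value from doc values instead of _source
        sourceBuilder.docValueField("name.keyword");
        return executeSearch(sourceBuilder);
    }

    // 5. Query timeout
    public SearchResponse searchWithTimeout(String keyword, long timeoutMs) throws IOException {
        SearchRequest request = new SearchRequest("products");
        SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
        // Best-effort per-shard time limit; when it is hit, partial results are returned
        sourceBuilder.timeout(new TimeValue(timeoutMs, TimeUnit.MILLISECONDS));
        sourceBuilder.query(QueryBuilders.matchQuery("name", keyword));
        request.source(sourceBuilder);
        // Request options: cap the buffered response at 30 MB (a size limit, not a timeout;
        // the HTTP socket timeout is configured on the client)
        RequestOptions.Builder options = RequestOptions.DEFAULT.toBuilder();
        options.setHttpAsyncResponseConsumerFactory(
            new HttpAsyncResponseConsumerFactory.HeapBufferedResponseConsumerFactory(30 * 1024 * 1024));
        return client.search(request, options.build());
    }

    // 6. Parallel queries
    public List<SearchResponse> parallelSearch(List<String> keywords) throws InterruptedException, ExecutionException {
        List<CompletableFuture<SearchResponse>> futures = new ArrayList<>();
        for (String keyword : keywords) {
            CompletableFuture<SearchResponse> future = CompletableFuture.supplyAsync(() -> {
                try {
                    return searchByMatch(keyword);
                } catch (IOException e) {
                    throw new RuntimeException(e);
                }
            }, executor);
            futures.add(future);
        }
        // Wait for all queries to finish
        CompletableFuture<Void> allFutures = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
        allFutures.get(); // blocks until every query has completed
        return futures.stream()
            .map(CompletableFuture::join)
            .collect(Collectors.toList());
    }
}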

Listing 9: Query performance optimization
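`searchWithPagination()` relies on `search_after`: each page starts strictly after the sort values of the previous page's last hit, so the sort must be total, which is why a unique tie-breaker is needed. An in-memory model of that cursor logic (plain Java; `SearchAfterPager` and its `Hit` type are hypothetical, and a real cluster seeks in the index rather than scanning a list):

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// In-memory model of search_after: hits sorted by (price, id); the next page starts
// strictly after the sort values of the previous page's last hit.
final class SearchAfterPager {
    static final class Hit {
        final String id;
        final double price;

        Hit(String id, double price) {
            this.id = id;
            this.price = price;
        }

        @Override
        public String toString() {
            return id;
        }
    }

    // The unique id breaks ties between equal prices, making the order total
    static final Comparator<Hit> ORDER =
        Comparator.comparingDouble((Hit h) -> h.price).thenComparing(h -> h.id);

    private final List<Hit> sorted;

    SearchAfterPager(List<Hit> hits) {
        this.sorted = new ArrayList<>(hits);
        this.sorted.sort(ORDER);
    }

    // `after` is null for the first page, otherwise the last hit of the previous page
    List<Hit> page(Hit after, int size) {
        List<Hit> out = new ArrayList<>();
        for (Hit h : sorted) {
            if (after != null && ORDER.compare(h, after) <= 0) {
                continue; // already returned on an earlier page
            }
            out.add(h);
            if (out.size() == size) {
                break;
            }
        }
        return out;
    }
}

class SearchAfterDemo {
    public static void main(String[] args) {
        SearchAfterPager pager = new SearchAfterPager(List.of(
            new SearchAfterPager.Hit("a", 10), new SearchAfterPager.Hit("b", 5),
            new SearchAfterPager.Hit("c", 10), new SearchAfterPager.Hit("d", 1)));
        List<SearchAfterPager.Hit> first = pager.page(null, 2);
        System.out.println(first);                                            // [d, b]
        System.out.println(pager.page(first.get(first.size() - 1), 2));       // [a, c]
    }
}
```

The model also shows the trade-off: there is no way to jump straight to page N without walking through pages 1 to N-1, so search_after suits infinite scrolling and exports rather than numbered page links.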

6. Bulk Operations and Near-Real-Time Search

6.1 Bulk Operations
@Component
@Slf4j
public class BulkOperationService {
    // Assumed members (not shown): an injected RestHighLevelClient `client`, a JSON
    // helper JsonUtils, and retryFailedItem()

    // 1. Plain bulk request
    public void bulkIndexProducts(List<Product> products) throws IOException {
        BulkRequest request = new BulkRequest();
        for (Product product : products) {
            IndexRequest indexRequest = new IndexRequest("products")
                .id(product.getId())
                .source(JsonUtils.toJson(product), XContentType.JSON);
            request.add(indexRequest);
        }
        BulkResponse response = client.bulk(request, RequestOptions.DEFAULT);
        if (response.hasFailures()) {
            log.error("Bulk request had failures: {}", response.buildFailureMessage());
            // Handle the failed items individually
            for (BulkItemResponse item : response.getItems()) {
                if (item.isFailed()) {
                    log.error("Document {} failed: {}", item.getId(), item.getFailureMessage());
                    // Retry logic
                    retryFailedItem(item);
                }
            }
        } else {
            log.info("Bulk request succeeded, {} documents processed", products.size());
        }
    }

    // 2. BulkProcessor with a listener
    public void bulkWithListener(List<Product> products) {
        BulkProcessor.Listener listener = new BulkProcessor.Listener() {
            @Override
            public void beforeBulk(long executionId, BulkRequest request) {
                log.info("Executing bulk with {} actions", request.numberOfActions());
            }

            @Override
            public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
                if (response.hasFailures()) {
                    log.error("Bulk had failures: {}", response.buildFailureMessage());
                } else {
                    log.info("Bulk succeeded in {} ms", response.getTook().getMillis());
                }
            }

            @Override
            public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
                log.error("Bulk request failed", failure);
            }
        };
        BulkProcessor.Builder builder = BulkProcessor.builder(
            (request, bulkListener) -> client.bulkAsync(request, RequestOptions.DEFAULT, bulkListener),
            listener);
        // Flush thresholds: whichever is reached first triggers a bulk request
        builder.setBulkActions(1000) // every 1000 actions
            .setBulkSize(new ByteSizeValue(5, ByteSizeUnit.MB)) // or every 5 MB
            .setFlushInterval(TimeValue.timeValueSeconds(5)) // or every 5 seconds
            .setConcurrentRequests(2) // bulk requests allowed in flight at the same time
            .setBackoffPolicy(BackoffPolicy
                .exponentialBackoff(TimeValue.timeValueMillis(100), 3)); // retry rejected bulks up to 3 times
        BulkProcessor processor = builder.build();
        try {
            // Add documents
            for (Product product : products) {
                IndexRequest request = new IndexRequest("products")
                    .id(product.getId())
                    .source(JsonUtils.toJson(product), XContentType.JSON);
                processor.add(request);
            }
        } finally {
            // Always close the processor: this flushes pending requests and stops its
            // flush scheduler; processors that are never closed leak memory
            try {
                processor.awaitClose(30, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

    // 3. Bulk update
    public void bulkUpdateProducts(List<Product> products) throws IOException {
        BulkRequest request = new BulkRequest();
        for (Product product : products) {
            UpdateRequest updateRequest = new UpdateRequest("products", product.getId())
                .doc(JsonUtils.toJson(product), XContentType.JSON)
                .docAsUpsert(true); // insert the document if it does not exist yet
            request.add(updateRequest);
        }
        client.bulk(request, RequestOptions.DEFAULT);
    }

    // 4. Bulk delete
    public void bulkDeleteProducts(List<String> ids) throws IOException {
        BulkRequest request = new BulkRequest();
        for (String id : ids) {
            DeleteRequest deleteRequest = new DeleteRequest("products", id);
            request.add(deleteRequest);
        }
        client.bulk(request, RequestOptions.DEFAULT);
    }

    // 5. Multi-get (fetch many documents by id)
    public MultiGetResponse bulkGetProducts(List<String> ids) throws IOException {
        MultiGetRequest request = new MultiGetRequest();
        for (String id : ids) {
            request.add(new MultiGetRequest.Item("products", id));
        }
        return client.mget(request, RequestOptions.DEFAULT);
    }
}

Listing 10: Bulk operations
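`BulkProcessor` sends a batch whenever one of its thresholds (`setBulkActions`, `setBulkSize`, `setFlushInterval`) is reached, and `awaitClose` flushes whatever is left. The sketch below models only the count and byte thresholds plus the final flush, in plain Java; `BulkBatcher` is hypothetical, not the Elasticsearch class, and time-based flushing, concurrency and backoff are omitted.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Simplified model of BulkProcessor's size-based flushing: a batch is handed to the
// sender when it reaches maxActions documents or maxBytes of payload, whichever comes first.
final class BulkBatcher implements AutoCloseable {
    private final int maxActions;
    private final long maxBytes;
    private final Consumer<List<String>> sender;
    private final List<String> buffer = new ArrayList<>();
    private long bufferedBytes = 0;

    BulkBatcher(int maxActions, long maxBytes, Consumer<List<String>> sender) {
        this.maxActions = maxActions;
        this.maxBytes = maxBytes;
        this.sender = sender;
    }

    void add(String jsonDocument) {
        buffer.add(jsonDocument);
        bufferedBytes += jsonDocument.getBytes(StandardCharsets.UTF_8).length;
        if (buffer.size() >= maxActions || bufferedBytes >= maxBytes) {
            flush();
        }
    }

    void flush() {
        if (buffer.isEmpty()) {
            return;
        }
        sender.accept(new ArrayList<>(buffer)); // hand over a copy, then reset
        buffer.clear();
        bufferedBytes = 0;
    }

    // Flushing on close plays the role of awaitClose(): without it the tail is lost
    @Override
    public void close() {
        flush();
    }
}

class BulkBatcherDemo {
    public static void main(String[] args) {
        List<Integer> batchSizes = new ArrayList<>();
        try (BulkBatcher batcher = new BulkBatcher(3, 5 * 1024 * 1024, batch -> batchSizes.add(batch.size()))) {
            for (int i = 0; i < 7; i++) {
                batcher.add("{\"id\":" + i + "}");
            }
        }
        System.out.println(batchSizes); // [3, 3, 1]
    }
}
```

Using try-with-resources guarantees the final partial batch is sent, which is the same reason the Java client code above closes the processor in a finally block.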

6.2 Controlling Visibility (Refresh)
@Component
@Slf4j
public class RealtimeControlService {
    // Assumed members (not shown): an injected RestHighLevelClient `client`, JsonUtils,
    // and searchByMatch() as defined in ProductSearchService

    // 1. Refresh immediately
    public void indexWithRefresh(Product product) throws IOException {
        IndexRequest request = new IndexRequest("products")
            .id(product.getId())
            .source(JsonUtils.toJson(product), XContentType.JSON)
            .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
        // Forces a refresh after the write: visible at once, but expensive under heavy writes
        client.index(request, RequestOptions.DEFAULT);
    }

    // 2. Wait for refresh
    public void indexWithWaitFor(Product product) throws IOException {
        IndexRequest request = new IndexRequest("products")
            .id(product.getId())
            .source(JsonUtils.toJson(product), XContentType.JSON)
            .setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL);
        // Returns only after a scheduled refresh has made the write visible
        client.index(request, RequestOptions.DEFAULT);
    }

    // 3. Manual refresh
    public void manualRefresh() throws IOException {
        RefreshRequest request = new RefreshRequest("products");
        RefreshResponse response = client.indices().refresh(request, RequestOptions.DEFAULT);
        log.info("Manual refresh finished, shards: {}", response.getTotalShards());
    }

    // 4. Force merge
    public void forceMerge() throws IOException {
        ForceMergeRequest request = new ForceMergeRequest("products");
        // Merge each shard down to 1 segment; best run on indices that are no longer written.
        // onlyExpungeDeletes(true) is a separate mode (only purge deleted documents) and is
        // not combined with maxNumSegments
        request.maxNumSegments(1);
        ForceMergeResponse response = client.indices().forcemerge(request, RequestOptions.DEFAULT);
        log.info("Force merge finished, shards: {}", response.getTotalShards());
    }

    // 5. Changing the refresh interval
    public void updateRefreshInterval(int seconds) throws IOException {
        UpdateSettingsRequest request = new UpdateSettingsRequest("products");
        Settings settings = Settings.builder()
            .put("index.refresh_interval", seconds + "s")
            .build();
        request.settings(settings);
        AcknowledgedResponse response = client.indices()
            .putSettings(request, RequestOptions.DEFAULT);
        if (response.isAcknowledged()) {
            log.info("Refresh interval updated to {} s", seconds);
        }
    }

    // 6. "Real-time" search workaround
    public SearchResponse realtimeSearch(String keyword) throws IOException {
        // Search first
        SearchResponse response = searchByMatch(keyword);
        // If nothing is found, wait for the next refresh and retry once.
        // This is a crude workaround; RefreshPolicy.WAIT_UNTIL on the write side is more reliable
        if (response.getHits().getTotalHits().value == 0) {
            try {
                Thread.sleep(1000); // wait 1 s (the default refresh interval)
                response = searchByMatch(keyword);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return response;
    }
}

Listing 11: Refresh and visibility control
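Refresh is what makes writes visible to search. Here is a toy, single-threaded model of the policies above (hypothetical `NearRealtimeShard`, plain Java): `index()` corresponds to the default `NONE` policy, `indexAndRefresh()` to `IMMEDIATE`, and `refresh()` to what `index.refresh_interval` (1s by default) triggers periodically. `WAIT_UNTIL` would block the caller until the next `refresh()`, which a single-threaded model cannot show.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of near-real-time search: indexed documents sit in an in-memory buffer and
// become searchable only when a refresh publishes the buffer as a new searchable segment.
final class NearRealtimeShard {
    private final List<String> buffer = new ArrayList<>();
    private final List<String> searchable = new ArrayList<>();

    // RefreshPolicy.NONE: write and return; visibility waits for the next refresh
    void index(String doc) {
        buffer.add(doc);
    }

    // RefreshPolicy.IMMEDIATE: write, then force a refresh (costly under heavy writes)
    void indexAndRefresh(String doc) {
        index(doc);
        refresh();
    }

    // What the periodic refresh does
    void refresh() {
        searchable.addAll(buffer);
        buffer.clear();
    }

    boolean isVisibleToSearch(String doc) {
        return searchable.contains(doc);
    }
}

class NearRealtimeDemo {
    public static void main(String[] args) {
        NearRealtimeShard shard = new NearRealtimeShard();
        shard.index("p1");
        System.out.println(shard.isVisibleToSearch("p1")); // false
        shard.refresh();
        System.out.println(shard.isVisibleToSearch("p1")); // true
    }
}
```

Fetching a document by id with the GET API is real-time by default, so it sees a write even before any refresh; only search depends on refresh, which is why the sleep-and-retry in `realtimeSearch()` is needed at all.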

7. Enterprise Case Studies

7.1 E-commerce Product Search
@Service
@Slf4j
public class ECommerceSearchService {
    // 商品搜索
    public SearchResult<Product> searchProducts(ProductSearchRequest request) throws IOException {
        SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
        // Build the bool query
        BoolQueryBuilder boolQuery = QueryBuilders.boolQuery();
        // Keyword search (scored clause)
        if (StringUtils.isNotBlank(request.getKeyword())) {
            handleKeywordSearch(boolQuery, request.getKeyword());
        }
        // Category filter (filter context: not scored, cacheable)
        if (request.getCategoryId() != null) {
            boolQuery.filter(QueryBuilders.termQuery("category_id", request.getCategoryId()));
        }
        // Price range
        if (request.getMinPrice() != null || request.getMaxPrice() != null) {
            RangeQueryBuilder priceRange = QueryBuilders.rangeQuery("price");
            if (request.getMinPrice() != null) {
                priceRange.gte(request.getMinPrice());
            }
            if (request.getMaxPrice() != null) {
                priceRange.lte(request.getMaxPrice());
            }
            boolQuery.filter(priceRange);
        }
        // Brand filter
        if (CollectionUtils.isNotEmpty(request.getBrandIds())) {
            boolQuery.filter(QueryBuilders.termsQuery("brand_id", request.getBrandIds()));
        }
        // Attribute filter (nested documents)
        if (CollectionUtils.isNotEmpty(request.getAttributes())) {
            handleAttributesFilter(boolQuery, request.getAttributes());
        }
        // Geo-distance filter
        if (request.getLocation() != null && request.getDistance() != null) {
            handleGeoFilter(boolQuery, request.getLocation(), request.getDistance());
        }
        sourceBuilder.query(boolQuery);
        // Sorting
        handleSorting(sourceBuilder, request.getSortBy(), request.getSortOrder());
        // Pagination: from + size may not exceed index.max_result_window (10,000 by default)
        sourceBuilder.from((request.getPage() - 1) * request.getSize())
            .size(request.getSize());
        // Count every hit; by default the reported total is only exact up to 10,000
        sourceBuilder.trackTotalHits(true);
        // Highlighting
        if (StringUtils.isNotBlank(request.getKeyword())) {
            HighlightBuilder highlightBuilder = new HighlightBuilder();
            highlightBuilder.field("name")
                .preTags("<em>")
                .postTags("</em>")
                .fragmentSize(200)
                .numOfFragments(3);
            sourceBuilder.highlighter(highlightBuilder);
        }
        // Facet aggregations
        addAggregations(sourceBuilder);
        // Execute the search (executeSearch is a helper not shown here)
        SearchResponse response = executeSearch(sourceBuilder);
        // Map hits and aggregations to the result object
        return processSearchResult(response, request);
    }

    private void handleKeywordSearch(BoolQueryBuilder boolQuery, String keyword) {
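        // Combine several matching strategies, each with its own boost
        BoolQueryBuilder keywordQuery = QueryBuilders.boolQuery();
        // 1. Phrase match on the name (highest weight)
        keywordQuery.should(QueryBuilders.matchPhraseQuery("name", keyword)
            .boost(3.0f));
        // 2. Pinyin match (assumes a name.pinyin subfield indexed with a pinyin analyzer)
        keywordQuery.should(QueryBuilders.matchQuery("name.pinyin", keyword)
            .boost(2.0f));
        // 3. Analyzed (tokenized) match on the name
        keywordQuery.should(QueryBuilders.matchQuery("name", keyword)
            .boost(1.0f));
        // 4. Description match
        keywordQuery.should(QueryBuilders.matchQuery("description", keyword)
            .boost(0.5f));
        // 5. Tag match
        keywordQuery.should(QueryBuilders.matchQuery("tags", keyword)
            .boost(0.3f));
        // With only should clauses, at least one must match; scores of matching clauses are summed
        boolQuery.must(keywordQuery);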
    }

    private void handleAttributesFilter(BoolQueryBuilder boolQuery, List<AttributeFilter> attributes) {
        for (AttributeFilter attr : attributes) {
            NestedQueryBuilder nestedQuery = QueryBuilders.nestedQuery(
                "attributes", 
                QueryBuilders.boolQuery()
                    .must(QueryBuilders.termQuery("attributes.key", attr.getKey()))
                    .must(QueryBuilders.termsQuery("attributes.value", attr.getValues())), 
                ScoreMode.None);
            boolQuery.filter(nestedQuery);
        }
    }

    private void handleGeoFilter(BoolQueryBuilder boolQuery, GeoPoint location, String distance) {
        GeoDistanceQueryBuilder geoQuery = QueryBuilders.geoDistanceQuery("location")
            .point(location.getLat(), location.getLon())
            .distance(distance);
        boolQuery.filter(geoQuery);
    }

    private void handleSorting(SearchSourceBuilder sourceBuilder, String sortBy, String sortOrder) {
        if ("price".equals(sortBy)) {
            sourceBuilder.sort("price", "desc".equals(sortOrder) ? SortOrder.DESC : SortOrder.ASC);
        } else if ("sales".equals(sortBy)) {
            sourceBuilder.sort("sales", SortOrder.DESC);
        } else if ("score".equals(sortBy)) {
            sourceBuilder.sort(SortBuilders.scoreSort());
        } else {
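            // Composite sort: score * 0.6 + sales * 0.3 + (10000 - price) * 0.1, highest first.
            // Sort scripts read relevance via _score; doc['_score'] is not a document field.
            Script script = new Script(
                "_score * 0.6 + " +
                "doc['sales'].value * 0.3 + " +
                "(10000 - doc['price'].value) * 0.1");
            // Script sorts default to ascending order, so request descending explicitly
            sourceBuilder.sort(SortBuilders.scriptSort(script, ScriptSortBuilder.ScriptSortType.NUMBER)
                .order(SortOrder.DESC));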
        }
    }

    private void addAggregations(SearchSourceBuilder sourceBuilder) {
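        // Price-range facet; each range is [from, to): from inclusive, to exclusive
        AggregationBuilder priceAgg = AggregationBuilders
            .range("price_agg")
            .field("price")
            .addRange(0, 100)
            .addRange(100, 300)
            .addRange(300, 500)
            .addRange(500, 1000)
            .addRange(1000, 5000)
            .addUnboundedFrom(5000); // 5000 and above; otherwise these items fall outside every bucket
        // Brand facet
        AggregationBuilder brandAgg = AggregationBuilders
            .terms("brand_agg")
            .field("brand_id")
            .size(20);
        // Category facet
        AggregationBuilder categoryAgg = AggregationBuilders
            .terms("category_agg")
            .field("category_id")
            .size(10);
        // Attribute facet: nested terms on the attribute key, then on its values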
        AggregationBuilder attributeAgg = AggregationBuilders
            .nested("attributes_agg", "attributes")
            .subAggregation(AggregationBuilders
                .terms("attribute_keys")
                .field("attributes.key")
                .size(10)
                .subAggregation(AggregationBuilders
                    .terms("attribute_values")
                    .field("attributes.value")
                    .size(10)));
        sourceBuilder.aggregation(priceAgg);
        sourceBuilder.aggregation(brandAgg);
        sourceBuilder.aggregation(categoryAgg);
        sourceBuilder.aggregation(attributeAgg);
    }

    private SearchResult<Product> processSearchResult(SearchResponse response, ProductSearchRequest request) {
        SearchResult<Product> result = new SearchResult<>();
        // Total hits (exact only when total-hit tracking is enabled on the request)
        result.setTotal(response.getHits().getTotalHits().value);
        // Documents on the current page
        List<Product> products = new ArrayList<>();
        for (SearchHit hit : response.getHits()) {
            Product product = JsonUtils.fromJson(hit.getSourceAsString(), Product.class);
            // NaN when sorting by a field unless trackScores(true) is set
            product.setScore(hit.getScore());
            // Highlighting: use the first fragment of the name field
            if (hit.getHighlightFields() != null) {
                HighlightField highlight = hit.getHighlightFields().get("name");
                if (highlight != null && highlight.getFragments().length > 0) {
                    product.setHighlightName(highlight.getFragments()[0].string());
                }
            }
            products.add(product);
        }
        result.setData(products);
        // Aggregation (facet) results; processAggregations is not shown
        processAggregations(result, response.getAggregations());
        // Offer suggestions when a keyword search returns nothing
        if (StringUtils.isNotBlank(request.getKeyword()) && products.isEmpty()) {
            result.setSuggestions(getSuggestions(request.getKeyword()));
        }
        return result;
    }
}

Code Listing 12: E-commerce product search system
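The composite sort in handleSorting() adds a relevance score to raw sales counts and to a price term. For a query like this the score is often in the single digits or low tens, while the price term can reach the thousands. The plain-Java sketch below re-evaluates the same expression outside ES. The class name and all sample numbers are invented for illustration. It shows how easily the raw fields outweigh relevance:

```java
// Re-evaluates the composite sort expression from handleSorting() outside Elasticsearch.
// All sample values are invented for illustration.
class CompositeSortCheck {

    // Same weights as the Painless script: _score * 0.6 + sales * 0.3 + (10000 - price) * 0.1
    static double compositeScore(double score, long sales, double price) {
        return score * 0.6 + sales * 0.3 + (10000 - price) * 0.1;
    }

    public static void main(String[] args) {
        // A highly relevant but expensive product
        double relevantExpensive = compositeScore(12.0, 50, 4999);
        // A barely relevant but cheap product with the same sales
        double barelyRelevantCheap = compositeScore(0.5, 50, 99);
        // The cheap item wins (about 1005.4 vs 522.3) because the price term dominates
        System.out.printf("relevant/expensive=%.1f, barely-relevant/cheap=%.1f%n",
                relevantExpensive, barelyRelevantCheap);
    }
}
```

One common remedy is to normalize the business signals before weighting them. For example, a function_score query can use field_value_factor with a log1p modifier on sales. This is a suggestion, not part of the original design.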

7.2 Log Analysis System
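import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.Operator;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.query.RangeQueryBuilder;
import org.elasticsearch.search.aggregations.AggregationBuilder;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval;
import org.elasticsearch.search.aggregations.bucket.histogram.LongBounds;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.sort.SortBuilders;
import org.elasticsearch.search.sort.SortOrder;
import org.springframework.stereotype.Service;

import java.io.IOException;
import java.util.Date;

// LogSearchRequest, LogSearchResult, ErrorStats and SlowQueryStats are project classes not shown here
@Service
@Slf4j
public class LogAnalysisService {
    // Log search
    public LogSearchResult searchLogs(LogSearchRequest request) throws IOException {
        // Resolve the log indices covering the time range (getLogIndex is not shown)
        String index = getLogIndex(request.getStartTime(), request.getEndTime());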
        SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
        // Time-range filter; format() tells ES how to parse the gte/lte date strings
        RangeQueryBuilder timeRange = QueryBuilders.rangeQuery("@timestamp")
            .gte(request.getStartTime())
            .lte(request.getEndTime())
            .format("yyyy-MM-dd HH:mm:ss");
        BoolQueryBuilder boolQuery = QueryBuilders.boolQuery()
            .filter(timeRange);
        // Keyword search. query_string parses Lucene syntax, so raw user input containing
        // reserved characters can fail to parse (simpleQueryStringQuery is more forgiving)
        if (StringUtils.isNotBlank(request.getKeyword())) {
            boolQuery.must(QueryBuilders.queryStringQuery(request.getKeyword())
                .field("message")
                .field("level")
                .field("logger_name")
                .defaultOperator(Operator.AND));
        }
        // Level filter
        if (CollectionUtils.isNotEmpty(request.getLevels())) {
            boolQuery.filter(QueryBuilders.termsQuery("level", request.getLevels()));
        }
        // Application filter
        if (StringUtils.isNotBlank(request.getApplication())) {
            boolQuery.filter(QueryBuilders.termQuery("application", request.getApplication()));
        }
        sourceBuilder.query(boolQuery);
        // Newest first
        sourceBuilder.sort(SortBuilders.fieldSort("@timestamp")
            .order(SortOrder.DESC));
        // Pagination
        sourceBuilder.from((request.getPage() - 1) * request.getSize())
            .size(request.getSize());
        // Aggregations (addLogAggregations is not shown)
        addLogAggregations(sourceBuilder, request);
        // Execute the search
        SearchResponse response = executeSearch(index, sourceBuilder);
        return processLogSearchResult(response, request);
    }

    // Error log statistics
    public ErrorStats getErrorStats(Date startTime, Date endTime, String application) throws IOException {
        String index = getLogIndex(startTime, endTime);
        SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
        // Time range
        RangeQueryBuilder timeRange = QueryBuilders.rangeQuery("@timestamp")
            .gte(startTime)
            .lte(endTime);
        BoolQueryBuilder boolQuery = QueryBuilders.boolQuery()
            .filter(timeRange)
            .filter(QueryBuilders.termQuery("level", "ERROR"));
        if (StringUtils.isNotBlank(application)) {
            boolQuery.filter(QueryBuilders.termQuery("application", application));
        }
        sourceBuilder.query(boolQuery);
        // Aggregations only; no hits needed
        sourceBuilder.size(0);
        // Distinct trace_id values among the errors. cardinality is approximate (HyperLogLog++)
        // and counts traces rather than error lines; the raw line count is the hit total
        AggregationBuilder countAgg = AggregationBuilders
            .cardinality("error_count")
            .field("trace_id");
        // Per application
        AggregationBuilder appAgg = AggregationBuilders
            .terms("by_application")
            .field("application")
            .size(20)
            .subAggregation(AggregationBuilders
                .cardinality("app_error_count")
                .field("trace_id"));
        // Per hour; minDocCount(0) plus extendedBounds keep error-free hours as empty buckets
        AggregationBuilder timeAgg = AggregationBuilders
            .dateHistogram("by_time")
            .field("@timestamp")
            .calendarInterval(DateHistogramInterval.HOUR)
            .format("yyyy-MM-dd HH:00")
            .minDocCount(0)
            .extendedBounds(
                new LongBounds(startTime.getTime(), endTime.getTime()))
            .subAggregation(AggregationBuilders
                .cardinality("hour_error_count")
                .field("trace_id"));
        sourceBuilder.aggregation(countAgg);
        sourceBuilder.aggregation(appAgg);
        sourceBuilder.aggregation(timeAgg);
        SearchResponse response = executeSearch(index, sourceBuilder);
        return processErrorStats(response);
    }

    // Slow query analysis
    public SlowQueryStats analyzeSlowQueries(Date startTime, Date endTime) throws IOException {
        String index = getLogIndex(startTime, endTime);
        SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
        // Slow-query log entries in the time range
        BoolQueryBuilder boolQuery = QueryBuilders.boolQuery()
            .filter(QueryBuilders.rangeQuery("@timestamp")
                .gte(startTime)
                .lte(endTime))
            .filter(QueryBuilders.termQuery("logger_name", "slow_query"))
            .filter(QueryBuilders.rangeQuery("duration").gte(1000)); // over 1 second, assuming duration is in ms
        sourceBuilder.query(boolQuery);
        sourceBuilder.size(0);
        // Per SQL type: average, maximum and percentile durations
        AggregationBuilder sqlTypeAgg = AggregationBuilders
            .terms("by_sql_type")
            .field("sql_type")
            .size(10)
            .subAggregation(AggregationBuilders
                .avg("avg_duration")
                .field("duration"))
            .subAggregation(AggregationBuilders
                .max("max_duration")
                .field("duration"))
            .subAggregation(AggregationBuilders
                .percentiles("duration_percentiles")
                .field("duration")
                .percentiles(50, 90, 95, 99));
        // Per table: average duration
        AggregationBuilder tableAgg = AggregationBuilders
            .terms("by_table")
            .field("table_name")
            .size(20)
            .subAggregation(AggregationBuilders
                .avg("avg_duration")
                .field("duration"));
        sourceBuilder.aggregation(sqlTypeAgg);
        sourceBuilder.aggregation(tableAgg);
        SearchResponse response = executeSearch(index, sourceBuilder);
        return processSlowQueryStats(response);
    }
}

Code Listing 13: Log analysis system
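In getErrorStats(), minDocCount(0) combined with extendedBounds makes the by_time histogram return a bucket for every hour in the requested range, including hours with no errors. This keeps dashboards continuous. The sketch below reproduces that bucketing in plain Java. It assumes UTC hour boundaries and counts events, whereas the listing's hour_error_count sub-aggregation counts distinct trace_id values. The class name and sample timestamps are illustrative:

```java
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Mimics a 1-hour date_histogram with minDocCount(0) and extendedBounds:
// every hour in [start, end] gets a bucket, even when no event falls into it.
// Assumes UTC hour boundaries; counts events, not distinct trace_ids.
class HourlyBuckets {

    static Map<Instant, Integer> bucketByHour(List<Instant> events, Instant start, Instant end) {
        Map<Instant, Integer> buckets = new LinkedHashMap<>();
        // Pre-create one zero bucket per hour (the extendedBounds effect)
        for (Instant h = start.truncatedTo(ChronoUnit.HOURS); !h.isAfter(end); h = h.plus(1, ChronoUnit.HOURS)) {
            buckets.put(h, 0);
        }
        for (Instant e : events) {
            // Events outside the range would have been excluded by the range filter
            if (e.isBefore(start) || e.isAfter(end)) {
                continue;
            }
            // Key each event by the start of its hour
            buckets.merge(e.truncatedTo(ChronoUnit.HOURS), 1, Integer::sum);
        }
        return buckets;
    }

    public static void main(String[] args) {
        List<Instant> errors = List.of(
            Instant.parse("2024-05-01T10:05:00Z"),
            Instant.parse("2024-05-01T10:30:00Z"),
            Instant.parse("2024-05-01T12:15:00Z"));
        Map<Instant, Integer> buckets = bucketByHour(errors,
            Instant.parse("2024-05-01T10:00:00Z"), Instant.parse("2024-05-01T12:59:59Z"));
        // Expected buckets: 10:00 -> 2, 11:00 -> 0 (kept although empty), 12:00 -> 1
        buckets.forEach((hour, count) -> System.out.println(hour + " -> " + count));
    }
}
```

Without minDocCount(0), the 11:00 bucket would simply be missing from the response. A chart would then silently connect 10:00 to 12:00.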

8. Performance Optimization and Monitoring

8.1 Performance Tuning
@Component
@Slf4j
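public class PerformanceTuner {
    // Client used by the monitoring methods below (injected by Spring)
    private final RestHighLevelClient client;

    public PerformanceTuner(RestHighLevelClient client) {
        this.client = client;
    }

    // 1. JVM tuning
    public void tuneJvm() {
        // Typical ES JVM settings; these belong in the node's jvm.options, not in application code
        // -Xms4g -Xmx4g
        //   Equal min/max heap; keep the heap at most ~50% of RAM and below the
        //   compressed-oops cutoff (just under 32GB)
        // -XX:+UseG1GC
        //   G1 garbage collector
        // -XX:MaxGCPauseMillis=200
        // -XX:G1ReservePercent=25
        // -XX:InitiatingHeapOccupancyPercent=30
    }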

    // 2. Index settings
    public Settings getOptimizedSettings() {
        return Settings.builder()
            .put("index.number_of_shards", 3) // size to data volume; fixed at creation (changing it needs shrink/split or reindex)
            .put("index.number_of_replicas", 1)
            .put("index.refresh_interval", "30s") // raise for write-heavy indices; new docs become searchable later
            .put("index.translog.durability", "async") // faster writes, but up to sync_interval of acknowledged writes can be lost on a crash
            .put("index.translog.sync_interval", "5s")
            .put("index.translog.flush_threshold_size", "512mb")
            .put("index.merge.scheduler.max_thread_count", 1) // for spinning disks
            .put("index.merge.scheduler.max_merge_count", 6)
            .put("index.unassigned.node_left.delayed_timeout", "5m")
            .build();
    }

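    // 3. Query tuning
    public SearchRequest tuneSearch(String index) {
        SearchSourceBuilder builder = new SearchSourceBuilder();
        // Scored condition in must; exact conditions in filter (not scored, eligible for the node query cache)
        builder.query(QueryBuilders.boolQuery()
            .must(QueryBuilders.matchQuery("name", "手机"))
            .filter(QueryBuilders.termQuery("status", "active")));
        // Per-request timeout
        builder.timeout(TimeValue.timeValueSeconds(5));
        // Return only the fields the caller needs
        builder.fetchSource(new String[]{"id", "name", "price"}, null);
        // When relevance scores are not needed, constant_score skips scoring, e.g.
        // QueryBuilders.constantScoreQuery(QueryBuilders.termQuery("category", "electronics"));
        // (calling builder.query() a second time would replace the query set above)
        SearchRequest request = new SearchRequest(index).source(builder);
        // The shard request cache is a SearchRequest option; by default it caches only size=0 requests
        request.requestCache(true);
        return request;
    }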

    // 4. Collect metrics every minute
    @Scheduled(fixedDelay = 60000)
    public void collectMetrics() throws IOException {
        // Cluster health
        ClusterHealthRequest healthRequest = new ClusterHealthRequest();
        ClusterHealthResponse healthResponse = client.cluster()
            .health(healthRequest, RequestOptions.DEFAULT);
        // Node stats. If your client version has no typed wrapper for node or index stats,
        // call GET /_nodes/stats and GET /_stats through client.getLowLevelClient() instead
        NodesStatsRequest nodesRequest = new NodesStatsRequest();
        nodesRequest.indices(true);
        nodesRequest.os(true);
        nodesRequest.jvm(true);
        nodesRequest.threadPool(true);
        NodesStatsResponse nodesResponse = client.nodes()
            .stats(nodesRequest, RequestOptions.DEFAULT);
        // Index stats
        IndicesStatsRequest indicesRequest = new IndicesStatsRequest();
        indicesRequest.all();
        IndicesStatsResponse indicesResponse = client.indices()
            .stats(indicesRequest, RequestOptions.DEFAULT);
        // Push to the monitoring system (helper not shown)
        sendToMonitoringSystem(healthResponse, nodesResponse, indicesResponse);
        // Decide whether to scale out (helper not shown)
        checkAndScale(healthResponse, nodesResponse, indicesResponse);
    }

    // 5. Hot shard detection
    public void identifyHotShards() throws IOException {
        IndicesStatsRequest request = new IndicesStatsRequest();
        request.all();
        IndicesStatsResponse response = client.indices()
            .stats(request, RequestOptions.DEFAULT);
        Map<String, IndexStats> indices = response.getIndices();
        for (Map.Entry<String, IndexStats> entry : indices.entrySet()) {
            String index = entry.getKey();
            IndexStats stats = entry.getValue();
            ShardStats[] shardStats = stats.getShards();
            for (ShardStats shardStat : shardStats) {
                // query_count is cumulative since the shard copy started, so a fixed threshold
                // mostly flags long-lived shards; comparing two samples gives a real rate
                long queryCount = shardStat.getStats().getSearch().getTotal().getQueryCount();
                if (queryCount > 10000) { // example threshold
                    log.warn("Hot shard: {}/{}, query count: {}", index, shardStat.getShardRouting().getId(), queryCount);
                    // Move the shard to another node (rerouteHotShard is not shown)
                    rerouteHotShard(index, shardStat);
                }
            }
        }
    }
}

Code Listing 14: Performance tuning
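identifyHotShards() compares each shard's query_count with a fixed threshold. That counter is cumulative since the shard copy started, so an old, quiet shard can look hotter than a new, busy one. The alternative is to take two stats snapshots a known interval apart and convert the difference into queries per second. The minimal sketch below does that. The "index/shard" key format, the interval, and the threshold are all hypothetical:

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Flags shards whose query rate between two stats snapshots exceeds a threshold.
// Shard keys, the interval and the threshold are illustrative assumptions.
class HotShardDetector {

    static List<String> hotShards(Map<String, Long> previous, Map<String, Long> current,
                                  double intervalSeconds, double qpsThreshold) {
        return current.entrySet().stream()
            .filter(e -> {
                // A shard missing from the first snapshot gets a delta of 0
                long before = previous.getOrDefault(e.getKey(), e.getValue());
                long delta = e.getValue() - before;
                // A negative delta means the counter restarted (relocation or node restart)
                return delta >= 0 && delta / intervalSeconds > qpsThreshold;
            })
            .map(Map.Entry::getKey)
            .sorted()
            .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Cumulative query_count per "index/shard", sampled 60 seconds apart
        Map<String, Long> t0 = Map.of("products/0", 900_000L, "products/1", 20_000L);
        Map<String, Long> t1 = Map.of("products/0", 906_000L, "products/1", 80_000L);
        // products/0: 6,000 queries / 60s = 100 qps; products/1: 60,000 / 60s = 1,000 qps
        System.out.println(hotShards(t0, t1, 60, 500)); // [products/1]
    }
}
```

Here products/0 has the larger lifetime count but is the quieter shard, which a cumulative threshold would misjudge.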

8.2 Monitoring and Alerting
# prometheus.yml: scrape elasticsearch_exporter (default port 9114, path /metrics)
scrape_configs:
  - job_name: 'elasticsearch'
    static_configs:
      - targets: ['localhost:9114']
    metrics_path: '/metrics'

rule_files:
  - 'es_alerts.yml'

# es_alerts.yml: alerting rules (metric names as exposed by elasticsearch_exporter)
groups:
  - name: elasticsearch
    rules:
      - alert: ClusterHealthRed
        expr: elasticsearch_cluster_health_status{color="red"} == 1
        for: 5m
        labels:
          severity: critical
        annotations:
          summary: "ES cluster status is RED"
      - alert: HighCpuUsage
        # Gauge in percent (0-100); compare it directly instead of applying rate()
        expr: elasticsearch_process_cpu_percent > 80
        for: 2m
        labels:
          severity: warning
        annotations:
          summary: "ES node CPU usage is high"
      - alert: HighHeapUsage
        expr: elasticsearch_jvm_memory_used_bytes{area="heap"} / elasticsearch_jvm_memory_max_bytes{area="heap"} > 0.8
        for: 2m
        labels:
          severity: warning
        annotations:
          summary: "ES node heap usage is high"
      - alert: HighDiskUsage
        expr: 1 - elasticsearch_filesystem_data_available_bytes / elasticsearch_filesystem_data_size_bytes > 0.8
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "ES node disk usage is high"
      - alert: HighSearchLatency
        # Average query time per search over 5 minutes (the exporter exposes counters, not a histogram)
        expr: rate(elasticsearch_indices_search_query_time_seconds[5m]) / rate(elasticsearch_indices_search_query_total[5m]) > 1
        for: 2m
        labels:
          severity: warning
        annotations:
          summary: "ES search latency is high"
      - alert: UnassignedShards
        expr: elasticsearch_cluster_health_unassigned_shards > 0
        for: 5m
        labels:
          severity: critical
        annotations:
          summary: "ES has unassigned shards"

Code Listing 15: Monitoring and alerting configuration
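The HighDiskUsage rule fires at 80% used. That leaves some headroom before Elasticsearch's own disk watermarks take effect. The sketch below classifies a node's usage against the default watermark values: low 85%, high 90%, and flood stage 95%, set by cluster.routing.allocation.disk.watermark.*. Clusters that override these settings would need different constants. The class itself is illustrative:

```java
// Classifies disk usage against the alert level used above and Elasticsearch's
// default disk watermarks. Constants assume the defaults have not been overridden.
class DiskWatermarks {

    enum Level { OK, ALERT, LOW, HIGH, FLOOD_STAGE }

    static final double ALERT = 0.80;       // HighDiskUsage alert threshold
    static final double LOW = 0.85;         // above this, ES stops allocating most new shards to the node
    static final double HIGH = 0.90;        // above this, ES tries to relocate shards away
    static final double FLOOD_STAGE = 0.95; // above this, indices with a shard here get a read-only block

    static Level classify(long availableBytes, long totalBytes) {
        double used = 1.0 - (double) availableBytes / totalBytes;
        if (used > FLOOD_STAGE) return Level.FLOOD_STAGE;
        if (used > HIGH) return Level.HIGH;
        if (used > LOW) return Level.LOW;
        if (used > ALERT) return Level.ALERT;
        return Level.OK;
    }

    public static void main(String[] args) {
        long gib = 1L << 30;
        // 880 GiB used of 1000 GiB = 88%: past the low watermark
        System.out.println(classify(120 * gib, 1000 * gib)); // LOW
    }
}
```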

9. Troubleshooting Guide

9.1 Diagnosing Common Problems
@Component
@Slf4j
public class TroubleshootingGuide {
    // Client used by the diagnostics below (injected by Spring)
    private final RestHighLevelClient client;

    public TroubleshootingGuide(RestHighLevelClient client) {
        this.client = client;
    }

    // 1. Slow searches
    public void diagnoseSlowSearch(String index, String query) throws IOException {
        // Turn on the Profile API for this request
        SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
        sourceBuilder.query(QueryBuilders.matchQuery("name", query));
        sourceBuilder.profile(true);
        SearchRequest request = new SearchRequest(index);
        request.source(sourceBuilder);
        SearchResponse response = client.search(request, RequestOptions.DEFAULT);
        // Walk the profile results, one entry per shard
        Map<String, ProfileShardResult> profileResults = response.getProfileResults();
        for (Map.Entry<String, ProfileShardResult> entry : profileResults.entrySet()) {
            log.info("Profile for shard {}:", entry.getKey());
            for (QueryProfileShardResult queryProfile : entry.getValue().getQueryProfileResults()) {
                // Each ProfileResult is a top-level node of the profiled Lucene query;
                // getTime() is in nanoseconds and includes its children
                for (ProfileResult profileResult : queryProfile.getQueryResults()) {
                    log.info("Query type: {}, time: {}ms, Lucene query: {}", profileResult.getQueryName(),
                        profileResult.getTime() / 1_000_000.0, profileResult.getLuceneDescription());
                }
            }
        }
    }

    // 2. Slow indexing
    public void diagnoseSlowIndexing(String index) throws IOException {
        // Check the refresh interval (frequent refreshes create many small segments)
        GetSettingsRequest settingsRequest = new GetSettingsRequest()
            .indices(index)
            .names("index.refresh_interval")
            .includeDefaults(true); // otherwise an unset value (default 1s) comes back as null
        GetSettingsResponse settingsResponse = client.indices()
            .getSettings(settingsRequest, RequestOptions.DEFAULT);
        log.info("Refresh interval: {}", settingsResponse.getSetting(index, "index.refresh_interval"));
        // Index stats: doc counts, store size, indexing and search timings
        IndicesStatsRequest statsRequest = new IndicesStatsRequest()
            .indices(index)
            .clear()
            .docs(true)
            .store(true)
            .indexing(true)
            .search(true);
        IndicesStatsResponse statsResponse = client.indices()
            .stats(statsRequest, RequestOptions.DEFAULT);
        IndexStats indexStats = statsResponse.getIndex(index);
        log.info("Index stats: {}", indexStats);
    }

    // 3. High memory usage
    public void diagnoseHighMemory() throws IOException {
        NodesStatsRequest request = new NodesStatsRequest();
        request.jvm(true);
        request.indices(true); // needed for the fielddata check below
        NodesStatsResponse response = client.nodes()
            .stats(request, RequestOptions.DEFAULT);
        for (NodeStats nodeStats : response.getNodes()) {
            JvmStats jvmStats = nodeStats.getJvm();
            JvmStats.Mem mem = jvmStats.getMem();
            log.info("Node {} heap: {}/{}", nodeStats.getNode().getName(), mem.getHeapUsed(), mem.getHeapMax());
            // GC stats are a list of collectors (typically "young" and "old"), not a map
            for (JvmStats.GarbageCollector gc : jvmStats.getGc()) {
                log.info("  GC {}: {} collections", gc.getName(), gc.getCollectionCount());
            }
            // Fielddata memory (aggregating or sorting on text fields is a common cause of heap pressure)
            if (nodeStats.getIndices() != null && nodeStats.getIndices().getFieldData() != null) {
                log.info("Fielddata memory: {}", nodeStats.getIndices().getFieldData().getMemorySize());
            }
        }
    }

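    // 4. Unassigned shards
    public void diagnoseUnassignedShards() throws IOException {
        // Allocation explain: with no shard specified, ES picks an unassigned shard to explain
        // and returns an error if there is none
        ClusterAllocationExplainRequest request = new ClusterAllocationExplainRequest();
        ClusterAllocationExplainResponse response = client.cluster()
            .allocationExplain(request, RequestOptions.DEFAULT);
        log.info("Allocation explanation: {}", response.getExplanation());
        // Check disk space (disk watermarks are a frequent cause of unassigned shards)
        NodesStatsRequest nodesRequest = new NodesStatsRequest();
        nodesRequest.fs(true);
        NodesStatsResponse nodesResponse = client.nodes()
            .stats(nodesRequest, RequestOptions.DEFAULT);
        for (NodeStats nodeStats : nodesResponse.getNodes()) {
            FsInfo fsInfo = nodeStats.getFs();
            for (FsInfo.Path path : fsInfo) {
                // getTotal()/getAvailable() return ByteSizeValue; convert to bytes before arithmetic
                long total = path.getTotal().getBytes();
                long available = path.getAvailable().getBytes();
                log.info("Node {} disk available/total: {}/{} ({}% used)", nodeStats.getNode().getName(), path.getAvailable(), path.getTotal(), (total - available) * 100 / total);
            }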
        }
    }

    // 5. Hot nodes
    public void diagnoseHotNodes() throws IOException {
        NodesStatsRequest request = new NodesStatsRequest();
        request.indices(true);
        request.os(true);
        NodesStatsResponse response = client.nodes()
            .stats(request, RequestOptions.DEFAULT);
        for (NodeStats nodeStats : response.getNodes()) {
            // Search load (cumulative since node start; compare two samples for a rate)
            long queryCount = nodeStats.getIndices().getSearch()
                .getTotal().getQueryCount();
            // Indexing load (also cumulative)
            long indexCount = nodeStats.getIndices().getIndexing()
                .getTotal().getIndexCount();
            // CPU usage in percent
            double cpuPercent = nodeStats.getOs().getCpu().getPercent();
            log.info("Node {} load - queries: {}, indexed docs: {}, CPU: {}%", nodeStats.getNode().getName(), queryCount, indexCount, cpuPercent);
            if (cpuPercent > 80 || queryCount > 10000) {
                log.warn("Node {} may be a hot node", nodeStats.getNode().getName());
            }
        }
    }
}

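Code Listing 16: Troubleshooting tools

10. Choosing a Solution and Summary

10.1 ES vs. Other Options

| Option | Strengths | Weaknesses | Typical use cases |
|---|---|---|---|
| Elasticsearch | Full-featured, rich ecosystem, strong performance | High resource usage, complex operations | Full-text search, log analytics |
| Solr | Mature and stable, feature-rich | Declining community activity, weaker near-real-time behavior | Document search, enterprise search |
| OpenSearch | Open-source fork of ES, backed by AWS | Smaller ecosystem than ES | AWS environments, fully open-source requirements |
| MeiliSearch | Lightweight, fast, easy to use | Comparatively limited features | Small applications, simple search |
| PostgreSQL | Transactions, SQL queries | Weaker search features and performance | Teams already on PG with simple search needs |
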
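10.2 Best Practices Summary
  1. Size shards sensibly: keep each shard at roughly 10 to 50GB.
  2. Design mappings deliberately: disable dynamic mapping and declare field types explicitly.
  3. Optimize queries: avoid wildcard queries (especially leading wildcards) and put non-scoring conditions in filter context.
  4. Monitor comprehensively: cluster health, performance metrics, and business metrics.
  5. Plan capacity: scale out ahead of demand and configure disk watermarks.
  6. Back up regularly: take periodic snapshots and test restores.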
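The shard-size guideline in item 1 turns into simple arithmetic when an index is created. The sketch below assumes a 50GB per-shard ceiling and an estimated growth factor; both inputs are choices, not fixed rules. Sizes count primary data only, since replicas are separate copies. The class and method names are illustrative:

```java
// Derives a primary shard count from expected primary data size.
// The growth factor and maximum shard size are caller-supplied assumptions.
class ShardSizing {

    static int primaryShards(double expectedSizeGb, double growthFactor, double maxShardGb) {
        double projectedGb = expectedSizeGb * growthFactor;
        // Round up so no shard exceeds maxShardGb, and always create at least one shard
        return Math.max(1, (int) Math.ceil(projectedGb / maxShardGb));
    }

    public static void main(String[] args) {
        // 200GB today, expected to grow 50%: 300GB / 50GB per shard = 6 primaries
        System.out.println(primaryShards(200, 1.5, 50)); // 6
    }
}
```

The primary count is fixed at creation; changing it requires shrink, split, or reindex. For that reason, time-based data such as logs is usually better served by rolling over to new indices than by over-provisioning shards up front.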

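Summary

Elasticsearch is a powerful search engine, but it is not a silver bullet. Using it well takes an understanding of how it works, careful design, and continuous monitoring.

In practice, teams often run into poorly chosen shard counts, unoptimized queries, and outages caused by missing monitoring.

Remember: ES is a tool, not magic. Design around the characteristics of your business, and invest in monitoring and tuning.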

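Final advice: start with simple scenarios, and attempt complex designs only once you understand the fundamentals. Monitor the cluster, choose sensible shard and replica counts, and review queries regularly. Search optimization is an ongoing process, not a one-time task.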
