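Elasticsearch Core Concepts and Java Client in Practice
Elasticsearch builds distributed full-text search on top of Lucene. This article covers cluster architecture, how sharding works, and Java client configuration, including the choice between RestHighLevelClient and Spring Data Elasticsearch. It also covers index design, query optimization, bulk operations, and monitoring and alerting. Two case studies, e-commerce search and log analysis, illustrate performance tuning and troubleshooting, along with best practices for JVM settings, shard strategy, and hot/cold data management.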
Author: 神经兮兮
Common Challenges in Production
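In production, Elasticsearch deployments commonly run into performance and stability problems. An e-commerce system may return inaccurate results right after launch because the analyzer is misconfigured. A cluster pinned at 100% CPU during a big sales event often traces back to misuse of wildcard queries. In a logging system that stores terabytes of data, a badly chosen shard count creates heavy management overhead. In vector search workloads, memory leaks in the Java High Level Client are resolved by closing the BulkProcessor properly.
Experience shows that developers who understand how ES works internally can avoid planting these landmines in their search systems.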
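Summary
Elasticsearch is a distributed search engine built on Lucene that uses an inverted index to answer queries in milliseconds. This article examines ES cluster architecture, sharding, and query optimization, and walks through best practices for the Java clients. A complete e-commerce search example compares the performance of different query styles and addresses index design, query optimization, and cluster monitoring. It includes enterprise-grade configuration templates, performance tuning data, and a troubleshooting guide.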
1. Why Elasticsearch?
1.1 The Pain of Searching in a Database
A typical search implemented in MySQL looks like this:
SELECT * FROM products WHERE name LIKE '%手机%' OR description LIKE '%手机%' OR tags LIKE '%手机%' ORDER BY create_time DESC LIMIT 100 OFFSET 0;
Figure 1: Problems with MySQL search

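Pain points of MySQL search:
- LIKE '%xxx%' has a leading wildcard, so it cannot use a B-tree index and forces a full table scan
- OR conditions across multiple fields perform very poorly
- No support for complex relevance scoring and ranking
- No support for tokenization, synonyms, or pinyin search
1.2 Advantages of Elasticsearch
The inverted index is the core of ES: it maps each term to the list of documents that contain it.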
public class InvertedIndex {
Map<String, List<Posting>> index = new HashMap<>();
}
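The snippet above only hints at the data structure. As a minimal sketch of the idea (not how Lucene actually stores postings), the following toy index maps each term to a sorted set of document ids and answers AND queries by intersecting those sets. The class name `TinyInvertedIndex` and the whitespace tokenizer are assumptions made for illustration; a real analyzer such as IK does far more.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

/** Toy inverted index (hypothetical): term -> sorted set of document ids. */
final class TinyInvertedIndex {
    private final Map<String, TreeSet<Integer>> postings = new HashMap<>();

    /** Tokenizes by whitespace and lower-cases each term before indexing it. */
    void add(int docId, String text) {
        for (String term : text.toLowerCase().split("\\s+")) {
            if (!term.isEmpty()) {
                postings.computeIfAbsent(term, t -> new TreeSet<>()).add(docId);
            }
        }
    }

    /** Returns the ids of documents that contain every query term (AND semantics). */
    List<Integer> search(String query) {
        Set<Integer> result = null;
        for (String term : query.toLowerCase().split("\\s+")) {
            if (term.isEmpty()) {
                continue;
            }
            Set<Integer> docs = postings.getOrDefault(term, new TreeSet<>());
            if (result == null) {
                result = new TreeSet<>(docs);   // first term seeds the candidate set
            } else {
                result.retainAll(docs);         // later terms narrow it down
            }
        }
        if (result == null) {
            return new ArrayList<>();
        }
        return new ArrayList<>(result);
    }
}
```

A lookup touches only the posting lists of the query terms, which is why it avoids the full scan that `LIKE '%xxx%'` triggers.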
Figure 2: MySQL vs. ES search flow

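| Scenario | MySQL | Elasticsearch | Difference |
|---|---|---|---|
| Single-field fuzzy query | 3200ms | 45ms | 71x faster |
| Multi-field OR query | 8500ms | 65ms | 130x faster |
| Complex conditions + sorting | 12000ms | 85ms | 141x faster |
| Memory usage | 4.2GB | 1.8GB | 57% lower |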
2. ES Core Architecture
2.1 Cluster Architecture
Figure 3: ES cluster architecture

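- Master node: manages cluster state and shard allocation
- Data node: stores data and executes CRUD operations
- Coordinating node: routes requests and merges results
- Ingest node: pre-processes documents before indexing
2.2 Indices and Shards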
@Configuration
public class IndexConfig {
public void createProductIndex(RestHighLevelClient client) throws IOException {
CreateIndexRequest request = new CreateIndexRequest("products");
request.settings(Settings.builder()
.put("index.number_of_shards", 3)
.put("index.number_of_replicas", 1)
.put("index.refresh_interval", "1s")
.put("analysis.analyzer.default.type", "ik_max_word"));
XContentBuilder mapping = JsonXContent.contentBuilder()
.startObject()
.startObject("properties")
.startObject("id")
.field("type", "keyword")
.endObject()
.startObject("name")
.field("type", "text")
.field("analyzer", "ik_max_word")
.field("search_analyzer", "ik_smart")
.startObject("fields")
.startObject("pinyin")
.field("type", "text")
.field("analyzer", "pinyin_analyzer")
.endObject()
.startObject("keyword")
.field("type", "keyword")
.field("ignore_above", 256)
.endObject()
.endObject()
.endObject()
.startObject("price")
.field("type", "double")
.endObject()
.startObject("category_id")
.field("type", "integer")
.endObject()
.startObject("sales")
.field("type", "integer")
.endObject()
.startObject("create_time")
.field("type", "date")
.field("format", "yyyy-MM-dd HH:mm:ss||epoch_millis")
.endObject()
.endObject()
.endObject();
request.mapping(mapping);
CreateIndexResponse response = client.indices().create(request, RequestOptions.DEFAULT);
if (response.isAcknowledged()) {
log.info("Index created successfully");
}
}
}
Figure 4: Shard and replica distribution

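- Keep each shard under 50GB
- Number of primary shards = total data size / 50GB, rounded up
- Number of replicas = number of nodes - 1 (at least 1)
- Avoid over-sharding (every shard carries overhead)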
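The sizing rules above reduce to a little arithmetic. The sketch below is only an illustration of those rules of thumb, with a hypothetical `ShardPlanner` class. It rounds the shard count up and applies the "nodes - 1, at least 1" replica rule literally.

```java
/** Hypothetical helper that applies the sizing rules of thumb listed above. */
final class ShardPlanner {
    /** 50GB ceiling per shard, taken from the list above. */
    static final long MAX_SHARD_BYTES = 50L * 1024 * 1024 * 1024;

    /** Primary shards = ceil(totalBytes / 50GB), never less than 1. */
    static int primaryShards(long totalBytes) {
        if (totalBytes <= 0) {
            return 1;
        }
        return (int) ((totalBytes + MAX_SHARD_BYTES - 1) / MAX_SHARD_BYTES);
    }

    /** Replicas = data nodes - 1, with a floor of 1. */
    static int replicas(int dataNodes) {
        return Math.max(1, dataNodes - 1);
    }

    /** Total shard copies (primaries plus replicas) the cluster must host. */
    static int totalShardCopies(long totalBytes, int dataNodes) {
        return primaryShards(totalBytes) * (1 + replicas(dataNodes));
    }
}
```

For 120GB of data on three data nodes this gives 3 primaries, 2 replicas each, and 9 shard copies in total. On a single-node cluster the floor of 1 replica cannot be allocated, so the cluster would stay yellow; treat the rule as a starting point rather than a fixed formula.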
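3. Java Client in Practice
3.1 Client Comparison
| Client | Pros | Cons | Recommended for |
|---|---|---|---|
| RestHighLevelClient | Officially maintained, full-featured | Heavyweight, complex API | New projects that need full functionality |
| Java Low Level Client | Lightweight, flexible | JSON must be handled manually | Simple queries, performance-sensitive paths |
| Spring Data Elasticsearch | Concise, integrates with Spring | Version compatibility issues | Spring Boot projects |
| JestClient | Simple to use | No longer maintained | Not recommended for new projects |
Our choice: RestHighLevelClient for new projects, Spring Data Elasticsearch for Spring Boot projects. Note that Elastic deprecated RestHighLevelClient in 7.15 in favor of the Java API Client; the examples in this article assume a 7.x client.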
3.2 RestHighLevelClient Configuration
@Configuration
@Slf4j
public class ElasticsearchConfig {
@Value("${elasticsearch.hosts:localhost:9200}")
private String hosts;
@Value("${elasticsearch.username:}")
private String username;
@Value("${elasticsearch.password:}")
private String password;
@Bean
public RestHighLevelClient restHighLevelClient() {
String[] hostArray = hosts.split(",");
HttpHost[] httpHosts = new HttpHost[hostArray.length];
for (int i = 0; i < hostArray.length; i++) {
String[] hostPort = hostArray[i].split(":");
httpHosts[i] = new HttpHost(
hostPort[0].trim(),
Integer.parseInt(hostPort[1].trim()),
"http");
}
RestClientBuilder builder = RestClient.builder(httpHosts)
.setRequestConfigCallback(requestConfigBuilder -> requestConfigBuilder
.setConnectTimeout(5000)
.setSocketTimeout(60000)
.setConnectionRequestTimeout(1000))
.setHttpClientConfigCallback(httpClientBuilder -> {
httpClientBuilder.setMaxConnTotal(100)
.setMaxConnPerRoute(50)
.setKeepAliveStrategy((response, context) -> 60000);
if (StringUtils.isNotBlank(username) && StringUtils.isNotBlank(password)) {
CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
credentialsProvider.setCredentials(
AuthScope.ANY,
new UsernamePasswordCredentials(username, password));
httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
}
return httpClientBuilder;
})
.setFailureListener(new RestClient.FailureListener() {
@Override
public void onFailure(Node node) {
log.error("Failed to connect to ES node: {}", node.getHost());
}
});
return new RestHighLevelClient(builder);
}
@Scheduled(fixedDelay = 30000)
public void checkClusterHealth() {
try {
ClusterHealthRequest request = new ClusterHealthRequest();
ClusterHealthResponse response = restHighLevelClient().cluster().health(request, RequestOptions.DEFAULT);
ClusterHealthStatus status = response.getStatus();
if (status != ClusterHealthStatus.GREEN) {
log.warn("ES cluster status abnormal: {}, details: {}", status, response.toString());
alertService.sendAlert("ES cluster status abnormal", "Status: " + status + ", details: " + response.toString());
}
} catch (Exception e) {
log.error("ES cluster health check failed", e);
}
}
}
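The host parsing in the configuration above assumes every entry has the form `host:port` and would throw on a bare hostname. As a hedged sketch, the hypothetical `EsHosts` helper below tolerates a missing port by falling back to 9200 and skips empty entries. It does not handle IPv6 literals.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical parser for a comma-separated "host[:port]" list. */
final class EsHosts {
    static final int DEFAULT_PORT = 9200;

    /** Simple immutable host/port pair. */
    static final class HostPort {
        final String host;
        final int port;

        HostPort(String host, int port) {
            this.host = host;
            this.port = port;
        }
    }

    /** Parses entries such as "es1:9200, es2"; entries without a port use DEFAULT_PORT. */
    static List<HostPort> parse(String hosts) {
        List<HostPort> result = new ArrayList<>();
        for (String raw : hosts.split(",")) {
            String entry = raw.trim();
            if (entry.isEmpty()) {
                continue; // tolerate trailing or doubled commas
            }
            int colon = entry.lastIndexOf(':');
            if (colon < 0) {
                result.add(new HostPort(entry, DEFAULT_PORT));
            } else {
                String host = entry.substring(0, colon).trim();
                int port = Integer.parseInt(entry.substring(colon + 1).trim());
                result.add(new HostPort(host, port));
            }
        }
        return result;
    }
}
```

Each parsed pair could then be passed to `new HttpHost(host, port, "http")` as in the bean definition above.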
3.3 Spring Data Elasticsearch Configuration
@Configuration
@EnableElasticsearchRepositories(basePackages = "com.example.repository")
@Slf4j
public class SpringDataESConfig {
@Value("${elasticsearch.hosts:localhost:9200}")
private String hosts;
// Reuse the RestHighLevelClient bean from ElasticsearchConfig instead of creating a second client.
@Bean
public ElasticsearchRestTemplate elasticsearchTemplate(RestHighLevelClient client) {
return new ElasticsearchRestTemplate(client);
}
@Document(indexName = "products", createIndex = false)
@Setting(settingPath = "/settings/product-settings.json")
@Mapping(mappingPath = "/mappings/product-mapping.json")
@Data
@NoArgsConstructor
@AllArgsConstructor
public static class Product {
@Id
private String id;
// Declare the main field once, inside @MultiField, so the search analyzer is not lost.
@MultiField(mainField = @Field(type = FieldType.Text, analyzer = "ik_max_word", searchAnalyzer = "ik_smart"), otherFields = {
@InnerField(suffix = "pinyin", type = FieldType.Text, analyzer = "pinyin"),
@InnerField(suffix = "keyword", type = FieldType.Keyword)
})
private String name;
@Field(type = FieldType.Double)
private Double price;
@Field(type = FieldType.Integer)
private Integer categoryId;
@Field(type = FieldType.Integer)
private Integer sales;
@Field(type = FieldType.Date, format = DateFormat.date_hour_minute_second)
private Date createTime;
@GeoPointField
private GeoPoint location;
}
public interface ProductRepository extends ElasticsearchRepository<Product, String> {
List<Product> findByName(String name);
List<Product> findByPriceBetween(Double minPrice, Double maxPrice);
Page<Product> findByCategoryId(Integer categoryId, Pageable pageable);
@Query("{\"match\": {\"name\": \"?0\"}}")
List<Product> findByNameCustom(String name);
@Query("{\"bool\": {\"must\": [" +
"{\"match\": {\"name\": \"?0\"}}," +
"{\"range\": {\"price\": {\"gte\": ?1, \"lte\": ?2}}}" +
"]}}")
List<Product> searchByNameAndPriceRange(String name, Double minPrice, Double maxPrice);
}
}
4. Index Design Best Practices
4.1 Index Lifecycle Management
@Component
@Slf4j
public class IndexLifecycleManager {
public String getDailyIndex(String indexPrefix) {
String date = LocalDate.now().format(DateTimeFormatter.ofPattern("yyyy.MM.dd"));
return indexPrefix + "-" + date;
}
public void createIndexWithAlias(String indexName, String alias) throws IOException {
CreateIndexRequest request = new CreateIndexRequest(indexName);
request.settings(getIndexSettings());
CreateIndexResponse response = client.indices().create(request, RequestOptions.DEFAULT);
if (response.isAcknowledged()) {
IndicesAliasesRequest aliasRequest = new IndicesAliasesRequest();
AliasActions aliasAction = new AliasActions(AliasActions.Type.ADD)
.index(indexName)
.alias(alias);
aliasRequest.addAliasAction(aliasAction);
client.indices().updateAliases(aliasRequest, RequestOptions.DEFAULT);
log.info("Index created: {}, alias: {}", indexName, alias);
}
}
@Scheduled(cron = "0 0 0 * * ?")
public void rolloverIndex() throws IOException {
String todayIndex = getDailyIndex("logs");
String alias = "logs-current";
GetAliasesRequest request = new GetAliasesRequest(alias);
GetAliasesResponse response = client.indices().getAlias(request, RequestOptions.DEFAULT);
Map<String, Set<AliasMetadata>> aliases = response.getAliases();
if (aliases.isEmpty()) {
createIndexWithAlias(todayIndex, alias);
} else {
String currentIndex = aliases.keySet().iterator().next();
IndexStats stats = getIndexStats(currentIndex);
if (stats.getStoreSizeInBytes() > 50L * 1024 * 1024 * 1024 || isIndexOlderThan(currentIndex, 7)) {
createIndexWithAlias(todayIndex, alias);
switchAlias(alias, currentIndex, todayIndex);
closeOldIndex(currentIndex);
}
}
}
public void manageIndexLifecycle() throws IOException {
closeIndicesOlderThan(30);
deleteIndicesOlderThan(90);
forceMergeIndicesOlderThan(7);
}
public void createIndexTemplate() throws IOException {
PutIndexTemplateRequest request = new PutIndexTemplateRequest("logs-template");
request.patterns(Arrays.asList("logs-*"));
request.settings(getIndexSettings());
request.mapping(getIndexMapping());
request.alias(new Alias("logs-all"));
request.order(1);
request.version(1);
PutIndexTemplateResponse response = client.indices()
.putTemplate(request, RequestOptions.DEFAULT);
if (response.isAcknowledged()) {
log.info("Index template created successfully");
}
}
}
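The lifecycle manager above calls `isIndexOlderThan` without showing it. One possible implementation, sketched here under the assumption that index names always end in a `-yyyy.MM.dd` suffix as produced by `getDailyIndex`, derives the age from the name itself. The class `DailyIndexNames` and its methods are hypothetical; the current date is passed in so the logic stays testable.

```java
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.time.temporal.ChronoUnit;

/** Hypothetical helper for date-suffixed index names such as "logs-2024.03.05". */
final class DailyIndexNames {
    private static final DateTimeFormatter SUFFIX = DateTimeFormatter.ofPattern("yyyy.MM.dd");

    /** Builds the index name for a given day. */
    static String indexFor(String prefix, LocalDate day) {
        return prefix + "-" + day.format(SUFFIX);
    }

    /** Extracts the date from the part of the name after the last dash. */
    static LocalDate dateOf(String indexName) {
        int dash = indexName.lastIndexOf('-');
        return LocalDate.parse(indexName.substring(dash + 1), SUFFIX);
    }

    /** True when the index date is more than the given number of days before today. */
    static boolean isOlderThan(String indexName, int days, LocalDate today) {
        return ChronoUnit.DAYS.between(dateOf(indexName), today) > days;
    }
}
```

In the scheduled job, `isOlderThan(currentIndex, 7, LocalDate.now())` would play the role of `isIndexOlderThan(currentIndex, 7)`.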
4.2 Mapping Design Tips
{
"mappings": {
"dynamic": "strict",
"_source": {
"enabled": true,
"excludes": ["big_field"]
},
"properties": {
"id": {
"type": "keyword",
"ignore_above": 256
},
"title": {
"type": "text",
"analyzer": "ik_max_word",
"search_analyzer": "ik_smart",
"fields": {
"pinyin": {
"type": "text",
"analyzer": "pinyin_analyzer"
},
"keyword": {
"type": "keyword",
"ignore_above": 256
},
"completion": {
"type": "completion",
"analyzer": "simple"
}
}
},
"price": {
"type": "scaled_float",
"scaling_factor": 100
},
"location": {
"type": "geo_point"
},
"tags": {
"type": "text",
"analyzer": "whitespace"
},
"attributes": {
"type": "nested",
"properties": {
"key": {
"type": "keyword"
},
"value": {
"type": "keyword"
}
}
},
"vector": {
"type": "dense_vector",
"dims": 128
}
}
}
}
5. Query Optimization in Practice
5.1 Query Types Compared
@Service
@Slf4j
public class ProductSearchService {
public SearchResponse searchByMatch(String keyword) throws IOException {
SearchRequest request = new SearchRequest("products");
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
sourceBuilder.query(QueryBuilders.matchQuery("name", keyword)
.analyzer("ik_smart")
.operator(Operator.AND)
.minimumShouldMatch("75%"));
sourceBuilder.from(0);
sourceBuilder.size(20);
sourceBuilder.sort(SortBuilders.scoreSort());
request.source(sourceBuilder);
return client.search(request, RequestOptions.DEFAULT);
}
public SearchResponse searchByMultiMatch(String keyword) throws IOException {
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
sourceBuilder.query(QueryBuilders.multiMatchQuery(keyword)
.fields("name^3", "name.pinyin^2", "description")
.type(MultiMatchQueryBuilder.Type.BEST_FIELDS)
.tieBreaker(0.3f));
return executeSearch(sourceBuilder);
}
public SearchResponse searchByTerm(String category) throws IOException {
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
sourceBuilder.query(QueryBuilders.termQuery("category.keyword", category));
return executeSearch(sourceBuilder);
}
public SearchResponse searchByBool(String keyword, Double minPrice, Double maxPrice, String category) throws IOException {
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
BoolQueryBuilder boolQuery = QueryBuilders.boolQuery();
if (StringUtils.isNotBlank(keyword)) {
boolQuery.must(QueryBuilders.matchQuery("name", keyword));
}
if (minPrice != null || maxPrice != null) {
RangeQueryBuilder rangeQuery = QueryBuilders.rangeQuery("price");
if (minPrice != null) {
rangeQuery.gte(minPrice);
}
if (maxPrice != null) {
rangeQuery.lte(maxPrice);
}
boolQuery.filter(rangeQuery);
}
if (StringUtils.isNotBlank(category)) {
boolQuery.should(QueryBuilders.termQuery("category.keyword", category))
.minimumShouldMatch(1);
}
boolQuery.mustNot(QueryBuilders.termQuery("status", "deleted"));
sourceBuilder.query(boolQuery);
return executeSearch(sourceBuilder);
}
public SearchResponse searchWithAggregation() throws IOException {
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
sourceBuilder.query(QueryBuilders.matchAllQuery());
sourceBuilder.size(0);
AggregationBuilder priceAgg = AggregationBuilders
.range("price_ranges")
.field("price")
.addRange(0, 100)
.addRange(100, 500)
.addRange(500, 1000)
.addRange(1000, 5000)
.addUnboundedFrom(5000);
AggregationBuilder categoryAgg = AggregationBuilders
.terms("categories")
.field("category.keyword")
.size(10);
AggregationBuilder nestedAgg = AggregationBuilders
.nested("attributes", "attributes")
.subAggregation(AggregationBuilders
.terms("attr_keys")
.field("attributes.key")
.size(10));
sourceBuilder.aggregation(priceAgg);
sourceBuilder.aggregation(categoryAgg);
sourceBuilder.aggregation(nestedAgg);
return executeSearch(sourceBuilder);
}
public Suggest searchSuggest(String prefix) throws IOException {
SearchRequest request = new SearchRequest("products");
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
sourceBuilder.size(0);
CompletionSuggestionBuilder suggestion = SuggestBuilders.completionSuggestion("name.completion")
.prefix(prefix, Fuzziness.AUTO)
.skipDuplicates(true)
.size(10);
SuggestBuilder suggestBuilder = new SuggestBuilder();
suggestBuilder.addSuggestion("product_suggest", suggestion);
sourceBuilder.suggest(suggestBuilder);
request.source(sourceBuilder);
SearchResponse response = client.search(request, RequestOptions.DEFAULT);
return response.getSuggest();
}
}
5.2 Performance Optimization Tips
@Component
@Slf4j
public class QueryOptimizer {
public SearchResponse searchWithPagination(String keyword, int page, int size) throws IOException {
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
if (page > 1) {
Object[] searchAfter = getSearchAfterValues(page - 1, size);
sourceBuilder.searchAfter(searchAfter);
}
sourceBuilder.size(size);
sourceBuilder.sort(SortBuilders.fieldSort("_id"));
return executeSearch(sourceBuilder);
}
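public SearchResponse optimizedSearch(String keyword) throws IOException {
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
// Alternatives to a wildcard query, attached as should clauses.
// The name.ngram and name.edge_ngram sub-fields are assumed to exist in the mapping.
BoolQueryBuilder boolQuery = QueryBuilders.boolQuery()
.should(QueryBuilders.matchQuery("name.ngram", keyword))
.should(QueryBuilders.matchQuery("name.edge_ngram", keyword));
if (keyword.length() < 10) {
// Short inputs can also use a prefix query on the keyword sub-field.
boolQuery.should(QueryBuilders.prefixQuery("name.keyword", keyword));
}
boolQuery.minimumShouldMatch(1);
sourceBuilder.query(boolQuery);
return executeSearch(sourceBuilder);
}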
public SearchResponse searchWithFilterCache() throws IOException {
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
BoolQueryBuilder boolQuery = QueryBuilders.boolQuery();
boolQuery.filter(QueryBuilders.termQuery("status", "active"));
boolQuery.filter(QueryBuilders.rangeQuery("price")
.gte(100).lte(1000));
boolQuery.must(QueryBuilders.matchQuery("name", "手机"));
sourceBuilder.query(boolQuery);
return executeSearch(sourceBuilder);
}
public SearchResponse searchWithFieldData() throws IOException {
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
sourceBuilder.sort(SortBuilders.fieldSort("name.keyword"));
sourceBuilder.docValueField("name.keyword");
return executeSearch(sourceBuilder);
}
public SearchResponse searchWithTimeout(String keyword, long timeoutMs) throws IOException {
SearchRequest request = new SearchRequest("products");
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
sourceBuilder.timeout(new TimeValue(timeoutMs, TimeUnit.MILLISECONDS));
sourceBuilder.query(QueryBuilders.matchQuery("name", keyword));
request.source(sourceBuilder);
RequestOptions.Builder options = RequestOptions.DEFAULT.toBuilder();
options.setHttpAsyncResponseConsumerFactory(
new HttpAsyncResponseConsumerFactory.HeapBufferedResponseConsumerFactory(30 * 1024 * 1024));
return client.search(request, options.build());
}
public List<SearchResponse> parallelSearch(List<String> keywords) throws InterruptedException, ExecutionException {
List<CompletableFuture<SearchResponse>> futures = new ArrayList<>();
for (String keyword : keywords) {
CompletableFuture<SearchResponse> future = CompletableFuture.supplyAsync(() -> {
try {
return searchByMatch(keyword);
} catch (IOException e) {
throw new RuntimeException(e);
}
}, executor);
futures.add(future);
}
CompletableFuture<Void> allFutures = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
allFutures.get();
return futures.stream()
.map(CompletableFuture::join)
.collect(Collectors.toList());
}
}
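`searchWithPagination` uses `search_after` rather than `from`/`size` because of how deep pages are assembled: with `from`/`size`, every shard returns its top `from + size` entries and the coordinating node merges all of them, while `search_after` only needs `size` entries per shard. The arithmetic can be sketched as follows; `PaginationCost` is a hypothetical helper, and 10,000 is the default value of `index.max_result_window`.

```java
/** Hypothetical calculator for the merge cost of deep pagination. */
final class PaginationCost {
    /** Default of index.max_result_window; from + size may not exceed it. */
    static final int DEFAULT_MAX_RESULT_WINDOW = 10_000;

    /** Entries the coordinating node merges for a from/size request (1-based page). */
    static long fromSizeEntries(int page, int size, int shards) {
        long from = (long) (page - 1) * size;
        return (from + size) * shards;
    }

    /** Entries merged for a search_after request, independent of page depth. */
    static long searchAfterEntries(int size, int shards) {
        return (long) size * shards;
    }

    /** True when from + size for this page would exceed the default result window. */
    static boolean exceedsDefaultWindow(int page, int size) {
        return (long) page * size > DEFAULT_MAX_RESULT_WINDOW;
    }
}
```

On a 3-shard index with 20 hits per page, page 500 merges 30,000 entries with `from`/`size` but only 60 with `search_after`, and page 501 would be rejected outright under the default window.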
6. Bulk Operations and Real-Time Behavior
6.1 Bulk Operations
@Component
@Slf4j
public class BulkOperationService {
public void bulkIndexProducts(List<Product> products) throws IOException {
BulkRequest request = new BulkRequest();
for (Product product : products) {
IndexRequest indexRequest = new IndexRequest("products")
.id(product.getId())
.source(JsonUtils.toJson(product), XContentType.JSON);
request.add(indexRequest);
}
BulkResponse response = client.bulk(request, RequestOptions.DEFAULT);
if (response.hasFailures()) {
log.error("Bulk operation failed: {}", response.buildFailureMessage());
for (BulkItemResponse item : response.getItems()) {
if (item.isFailed()) {
log.error("Document {} failed: {}", item.getId(), item.getFailureMessage());
retryFailedItem(item);
}
}
} else {
log.info("Bulk operation succeeded, {} documents processed", products.size());
}
}
public void bulkWithListener(List<Product> products) throws InterruptedException {
BulkProcessor.Listener listener = new BulkProcessor.Listener() {
@Override
public void beforeBulk(long executionId, BulkRequest request) {
log.info("Starting bulk with {} requests", request.numberOfActions());
}
@Override
public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
if (response.hasFailures()) {
log.error("Bulk execution failed: {}", response.buildFailureMessage());
} else {
log.info("Bulk execution succeeded, took {}ms", response.getTook().getMillis());
}
}
@Override
public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
log.error("Bulk execution error", failure);
}
};
BulkProcessor.Builder builder = BulkProcessor.builder(
(request, bulkListener) -> client.bulkAsync(request, RequestOptions.DEFAULT, bulkListener),
listener);
builder.setBulkActions(1000)
.setBulkSize(new ByteSizeValue(5, ByteSizeUnit.MB))
.setFlushInterval(TimeValue.timeValueSeconds(5))
.setConcurrentRequests(2)
.setBackoffPolicy(BackoffPolicy
.exponentialBackoff(TimeValue.timeValueMillis(100), 3));
BulkProcessor processor = builder.build();
for (Product product : products) {
IndexRequest request = new IndexRequest("products")
.id(product.getId())
.source(JsonUtils.toJson(product), XContentType.JSON);
processor.add(request);
}
processor.awaitClose(30, TimeUnit.SECONDS);
}
public void bulkUpdateProducts(List<Product> products) throws IOException {
BulkRequest request = new BulkRequest();
for (Product product : products) {
UpdateRequest updateRequest = new UpdateRequest("products", product.getId())
.doc(JsonUtils.toJson(product), XContentType.JSON)
.docAsUpsert(true);
request.add(updateRequest);
}
client.bulk(request, RequestOptions.DEFAULT);
}
public void bulkDeleteProducts(List<String> ids) throws IOException {
BulkRequest request = new BulkRequest();
for (String id : ids) {
DeleteRequest deleteRequest = new DeleteRequest("products", id);
request.add(deleteRequest);
}
client.bulk(request, RequestOptions.DEFAULT);
}
public MultiGetResponse bulkGetProducts(List<String> ids) throws IOException {
MultiGetRequest request = new MultiGetRequest();
for (String id : ids) {
request.add(new MultiGetRequest.Item("products", id));
}
return client.mget(request, RequestOptions.DEFAULT);
}
}
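`bulkIndexProducts` puts the whole input list into a single `BulkRequest`, which can grow too large for big imports. A common approach, sketched here with a hypothetical `BulkBatcher` helper, is to split the input into fixed-size batches first and send one bulk request per batch. The batch size of 1000 used in the usage note mirrors the `setBulkActions(1000)` setting above and is otherwise arbitrary.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper that splits a list into batches for separate bulk requests. */
final class BulkBatcher {
    /** Returns consecutive sublists of at most batchSize elements, preserving order. */
    static <T> List<List<T>> partition(List<T> items, int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        List<List<T>> batches = new ArrayList<>();
        for (int start = 0; start < items.size(); start += batchSize) {
            int end = Math.min(start + batchSize, items.size());
            // Copy the view so each batch is independent of the source list.
            batches.add(new ArrayList<>(items.subList(start, end)));
        }
        return batches;
    }
}
```

A caller could loop over `BulkBatcher.partition(products, 1000)` and pass each batch to `bulkIndexProducts`.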
6.2 Controlling Real-Time Visibility
@Component
@Slf4j
public class RealtimeControlService {
public void indexWithRefresh(Product product) throws IOException {
IndexRequest request = new IndexRequest("products")
.id(product.getId())
.source(JsonUtils.toJson(product), XContentType.JSON)
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
client.index(request, RequestOptions.DEFAULT);
}
public void indexWithWaitFor(Product product) throws IOException {
IndexRequest request = new IndexRequest("products")
.id(product.getId())
.source(JsonUtils.toJson(product), XContentType.JSON)
.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL);
client.index(request, RequestOptions.DEFAULT);
}
public void manualRefresh() throws IOException {
RefreshRequest request = new RefreshRequest("products");
RefreshResponse response = client.indices().refresh(request, RequestOptions.DEFAULT);
log.info("Manual refresh complete, shards: {}", response.getTotalShards());
}
public void forceMerge() throws IOException {
ForceMergeRequest request = new ForceMergeRequest("products");
// max_num_segments and only_expunge_deletes are mutually exclusive; this call merges down to one segment.
request.maxNumSegments(1);
ForceMergeResponse response = client.indices().forcemerge(request, RequestOptions.DEFAULT);
log.info("Force merge complete, shards: {}", response.getTotalShards());
}
public void updateRefreshInterval(int seconds) throws IOException {
UpdateSettingsRequest request = new UpdateSettingsRequest("products");
Settings settings = Settings.builder()
.put("index.refresh_interval", seconds + "s")
.build();
request.settings(settings);
AcknowledgedResponse response = client.indices()
.putSettings(request, RequestOptions.DEFAULT);
if (response.isAcknowledged()) {
log.info("Refresh interval updated to {}s", seconds);
}
}
public SearchResponse realtimeSearch(String keyword) throws IOException {
SearchResponse response = searchByMatch(keyword);
if (response.getHits().getTotalHits().value == 0) {
try {
Thread.sleep(1000);
response = searchByMatch(keyword);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
return response;
}
}
7. Enterprise Case Studies
7.1 E-commerce Product Search
@Service
@Slf4j
public class ECommerceSearchService {
public SearchResult<Product> searchProducts(ProductSearchRequest request) throws IOException {
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
BoolQueryBuilder boolQuery = QueryBuilders.boolQuery();
if (StringUtils.isNotBlank(request.getKeyword())) {
handleKeywordSearch(boolQuery, request.getKeyword());
}
if (request.getCategoryId() != null) {
boolQuery.filter(QueryBuilders.termQuery("category_id", request.getCategoryId()));
}
if (request.getMinPrice() != null || request.getMaxPrice() != null) {
RangeQueryBuilder priceRange = QueryBuilders.rangeQuery("price");
if (request.getMinPrice() != null) {
priceRange.gte(request.getMinPrice());
}
if (request.getMaxPrice() != null) {
priceRange.lte(request.getMaxPrice());
}
boolQuery.filter(priceRange);
}
if (CollectionUtils.isNotEmpty(request.getBrandIds())) {
boolQuery.filter(QueryBuilders.termsQuery("brand_id", request.getBrandIds()));
}
if (CollectionUtils.isNotEmpty(request.getAttributes())) {
handleAttributesFilter(boolQuery, request.getAttributes());
}
if (request.getLocation() != null && request.getDistance() != null) {
handleGeoFilter(boolQuery, request.getLocation(), request.getDistance());
}
sourceBuilder.query(boolQuery);
handleSorting(sourceBuilder, request.getSortBy(), request.getSortOrder());
sourceBuilder.from((request.getPage() - 1) * request.getSize())
.size(request.getSize());
if (StringUtils.isNotBlank(request.getKeyword())) {
HighlightBuilder highlightBuilder = new HighlightBuilder();
highlightBuilder.field("name")
.preTags("<em>")
.postTags("</em>")
.fragmentSize(200)
.numOfFragments(3);
sourceBuilder.highlighter(highlightBuilder);
}
addAggregations(sourceBuilder);
SearchResponse response = executeSearch(sourceBuilder);
return processSearchResult(response, request);
}
private void handleKeywordSearch(BoolQueryBuilder boolQuery, String keyword) {
BoolQueryBuilder keywordQuery = QueryBuilders.boolQuery();
keywordQuery.should(QueryBuilders.matchPhraseQuery("name", keyword)
.boost(3.0f));
keywordQuery.should(QueryBuilders.matchQuery("name.pinyin", keyword)
.boost(2.0f));
keywordQuery.should(QueryBuilders.matchQuery("name", keyword)
.boost(1.0f));
keywordQuery.should(QueryBuilders.matchQuery("description", keyword)
.boost(0.5f));
keywordQuery.should(QueryBuilders.matchQuery("tags", keyword)
.boost(0.3f));
boolQuery.must(keywordQuery);
}
private void handleAttributesFilter(BoolQueryBuilder boolQuery, List<AttributeFilter> attributes) {
for (AttributeFilter attr : attributes) {
NestedQueryBuilder nestedQuery = QueryBuilders.nestedQuery(
"attributes",
QueryBuilders.boolQuery()
.must(QueryBuilders.termQuery("attributes.key", attr.getKey()))
.must(QueryBuilders.termsQuery("attributes.value", attr.getValues())),
ScoreMode.None);
boolQuery.filter(nestedQuery);
}
}
private void handleGeoFilter(BoolQueryBuilder boolQuery, GeoPoint location, String distance) {
GeoDistanceQueryBuilder geoQuery = QueryBuilders.geoDistanceQuery("location")
.point(location.getLat(), location.getLon())
.distance(distance);
boolQuery.filter(geoQuery);
}
private void handleSorting(SearchSourceBuilder sourceBuilder, String sortBy, String sortOrder) {
if ("price".equals(sortBy)) {
sourceBuilder.sort("price", "desc".equals(sortOrder) ? SortOrder.DESC : SortOrder.ASC);
} else if ("sales".equals(sortBy)) {
sourceBuilder.sort("sales", SortOrder.DESC);
} else if ("score".equals(sortBy)) {
sourceBuilder.sort(SortBuilders.scoreSort());
} else {
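// In a sort script the relevance score is read as _score, not doc['_score'].
Script script = new Script(
"_score * 0.6 + " +
"doc['sales'].value * 0.3 + " +
"(10000 - doc['price'].value) * 0.1");
// Higher composite values rank first.
sourceBuilder.sort(SortBuilders.scriptSort(script, ScriptSortBuilder.ScriptSortType.NUMBER)
.order(SortOrder.DESC));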
}
}
private void addAggregations(SearchSourceBuilder sourceBuilder) {
AggregationBuilder priceAgg = AggregationBuilders
.range("price_agg")
.field("price")
.addRange(0, 100)
.addRange(100, 300)
.addRange(300, 500)
.addRange(500, 1000)
.addRange(1000, 5000);
AggregationBuilder brandAgg = AggregationBuilders
.terms("brand_agg")
.field("brand_id")
.size(20);
AggregationBuilder categoryAgg = AggregationBuilders
.terms("category_agg")
.field("category_id")
.size(10);
AggregationBuilder attributeAgg = AggregationBuilders
.nested("attributes_agg", "attributes")
.subAggregation(AggregationBuilders
.terms("attribute_keys")
.field("attributes.key")
.size(10)
.subAggregation(AggregationBuilders
.terms("attribute_values")
.field("attributes.value")
.size(10)));
sourceBuilder.aggregation(priceAgg);
sourceBuilder.aggregation(brandAgg);
sourceBuilder.aggregation(categoryAgg);
sourceBuilder.aggregation(attributeAgg);
}
private SearchResult<Product> processSearchResult(SearchResponse response, ProductSearchRequest request) {
SearchResult<Product> result = new SearchResult<>();
result.setTotal(response.getHits().getTotalHits().value);
List<Product> products = new ArrayList<>();
for (SearchHit hit : response.getHits()) {
Product product = JsonUtils.fromJson(hit.getSourceAsString(), Product.class);
product.setScore(hit.getScore());
if (hit.getHighlightFields() != null) {
HighlightField highlight = hit.getHighlightFields().get("name");
if (highlight != null && highlight.getFragments().length > 0) {
product.setHighlightName(highlight.getFragments()[0].string());
}
}
products.add(product);
}
result.setData(products);
processAggregations(result, response.getAggregations());
if (StringUtils.isNotBlank(request.getKeyword()) && products.isEmpty()) {
result.setSuggestions(getSuggestions(request.getKeyword()));
}
return result;
}
}
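The default branch of `handleSorting` ranks by a weighted sum of relevance, sales, and price. To see how the three terms behave, the same formula can be evaluated in plain Java; `RankScore` is a hypothetical class, and the weights 0.6, 0.3, 0.1 and the constant 10000 are copied from the script.

```java
/** Hypothetical plain-Java version of the composite sort formula. */
final class RankScore {
    /** Individual contributions: {relevance part, sales part, price part}. */
    static double[] parts(double score, double sales, double price) {
        return new double[] {
            score * 0.6,
            sales * 0.3,
            (10000 - price) * 0.1
        };
    }

    /** Composite value used for sorting; higher ranks first. */
    static double composite(double score, double sales, double price) {
        double[] p = parts(score, sales, price);
        return p[0] + p[1] + p[2];
    }
}
```

For a relevance score of 10, 1000 sales, and a price of 500, the three parts are roughly 6, 300, and 950. Because the inputs are not normalized, the sales and price terms dominate the relevance term, which is worth keeping in mind when choosing the weights.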
7.2 Log Analysis System
@Service
@Slf4j
public class LogAnalysisService {
public LogSearchResult searchLogs(LogSearchRequest request) throws IOException {
String index = getLogIndex(request.getStartTime(), request.getEndTime());
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
RangeQueryBuilder timeRange = QueryBuilders.rangeQuery("@timestamp")
.gte(request.getStartTime())
.lte(request.getEndTime())
.format("yyyy-MM-dd HH:mm:ss");
BoolQueryBuilder boolQuery = QueryBuilders.boolQuery()
.filter(timeRange);
if (StringUtils.isNotBlank(request.getKeyword())) {
boolQuery.must(QueryBuilders.queryStringQuery(request.getKeyword())
.field("message")
.field("level")
.field("logger_name")
.defaultOperator(Operator.AND));
}
if (CollectionUtils.isNotEmpty(request.getLevels())) {
boolQuery.filter(QueryBuilders.termsQuery("level", request.getLevels()));
}
if (StringUtils.isNotBlank(request.getApplication())) {
boolQuery.filter(QueryBuilders.termQuery("application", request.getApplication()));
}
sourceBuilder.query(boolQuery);
sourceBuilder.sort(SortBuilders.fieldSort("@timestamp")
.order(SortOrder.DESC));
sourceBuilder.from((request.getPage() - 1) * request.getSize())
.size(request.getSize());
addLogAggregations(sourceBuilder, request);
SearchResponse response = executeSearch(index, sourceBuilder);
return processLogSearchResult(response, request);
}
public ErrorStats getErrorStats(Date startTime, Date endTime, String application) throws IOException {
String index = getLogIndex(startTime, endTime);
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
RangeQueryBuilder timeRange = QueryBuilders.rangeQuery("@timestamp")
.gte(startTime)
.lte(endTime);
BoolQueryBuilder boolQuery = QueryBuilders.boolQuery()
.filter(timeRange)
.filter(QueryBuilders.termQuery("level", "ERROR"));
if (StringUtils.isNotBlank(application)) {
boolQuery.filter(QueryBuilders.termQuery("application", application));
}
sourceBuilder.query(boolQuery);
sourceBuilder.size(0);
AggregationBuilder countAgg = AggregationBuilders
.cardinality("error_count")
.field("trace_id");
AggregationBuilder appAgg = AggregationBuilders
.terms("by_application")
.field("application")
.size(20)
.subAggregation(AggregationBuilders
.cardinality("app_error_count")
.field("trace_id"));
AggregationBuilder timeAgg = AggregationBuilders
.dateHistogram("by_time")
.field("@timestamp")
.calendarInterval(DateHistogramInterval.HOUR)
.format("yyyy-MM-dd HH:00")
.minDocCount(0)
.extendedBounds(
new LongBounds(startTime.getTime(), endTime.getTime()))
.subAggregation(AggregationBuilders
.cardinality("hour_error_count")
.field("trace_id"));
sourceBuilder.aggregation(countAgg);
sourceBuilder.aggregation(appAgg);
sourceBuilder.aggregation(timeAgg);
SearchResponse response = executeSearch(index, sourceBuilder);
return processErrorStats(response);
}
public SlowQueryStats analyzeSlowQueries(Date startTime, Date endTime) throws IOException {
String index = getLogIndex(startTime, endTime);
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
BoolQueryBuilder boolQuery = QueryBuilders.boolQuery()
.filter(QueryBuilders.rangeQuery("@timestamp")
.gte(startTime)
.lte(endTime))
.filter(QueryBuilders.termQuery("logger_name", "slow_query"))
.filter(QueryBuilders.rangeQuery("duration").gte(1000));
sourceBuilder.query(boolQuery);
sourceBuilder.size(0);
AggregationBuilder sqlTypeAgg = AggregationBuilders
.terms("by_sql_type")
.field("sql_type")
.size(10)
.subAggregation(AggregationBuilders
.avg("avg_duration")
.field("duration"))
.subAggregation(AggregationBuilders
.max("max_duration")
.field("duration"))
.subAggregation(AggregationBuilders
.percentiles("duration_percentiles")
.field("duration")
.percentiles(50, 90, 95, 99));
AggregationBuilder tableAgg = AggregationBuilders
.terms("by_table")
.field("table_name")
.size(20)
.subAggregation(AggregationBuilders
.avg("avg_duration")
.field("duration"));
sourceBuilder.aggregation(sqlTypeAgg);
sourceBuilder.aggregation(tableAgg);
SearchResponse response = executeSearch(index, sourceBuilder);
return processSlowQueryStats(response);
}
}
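All three methods above depend on `getLogIndex(startTime, endTime)`, which is not shown. One way to implement it, assuming the daily `logs-yyyy.MM.dd` naming from section 4.1, is to enumerate the indices covered by the time range and join them with commas, since a search request accepts a comma-separated index list. The `LogIndexResolver` class below is a hypothetical sketch that works on `LocalDate` values rather than `Date`.

```java
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical resolver from a date range to daily index names. */
final class LogIndexResolver {
    private static final DateTimeFormatter SUFFIX = DateTimeFormatter.ofPattern("yyyy.MM.dd");

    /** Lists one index name per day from start to end, both inclusive. */
    static List<String> indicesBetween(String prefix, LocalDate start, LocalDate end) {
        List<String> names = new ArrayList<>();
        for (LocalDate day = start; !day.isAfter(end); day = day.plusDays(1)) {
            names.add(prefix + "-" + day.format(SUFFIX));
        }
        return names;
    }

    /** Joins the names into a comma-separated multi-index expression. */
    static String indexExpression(String prefix, LocalDate start, LocalDate end) {
        return String.join(",", indicesBetween(prefix, start, end));
    }
}
```

Restricting the search to the indices that overlap the time range avoids touching shards that cannot contain matching documents. Days for which no index exists would need `ignore_unavailable` set on the request, or the `logs-all` alias from the template could be used instead.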
8. Performance Tuning and Monitoring
8.1 Performance Tuning
@Component
@Slf4j
public class PerformanceTuner {
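public void tuneJvm() {
// JVM options are set on the ES nodes (config/jvm.options), not from client code.
// Common guidance: set -Xms equal to -Xmx, use at most half of RAM, and stay below about 32GB.
}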
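    // Settings biased toward indexing throughput at the cost of freshness and durability.
    public Settings getOptimizedSettings() {
        return Settings.builder()
                .put("index.number_of_shards", 3)
                .put("index.number_of_replicas", 1)
                // New documents become searchable after up to 30s instead of the default 1s.
                .put("index.refresh_interval", "30s")
                // Async translog: writes acknowledged within the last sync_interval can be lost on a crash.
                .put("index.translog.durability", "async")
                .put("index.translog.sync_interval", "5s")
                .put("index.translog.flush_threshold_size", "512mb")
                // A single merge thread suits spinning disks; SSD-backed nodes can usually keep the default.
                .put("index.merge.scheduler.max_thread_count", 1)
                .put("index.merge.scheduler.max_merge_count", 6)
                // Wait 5 minutes before reallocating shards of a node that left the cluster.
                .put("index.unassigned.node_left.delayed_timeout", "5m")
                .build();
    }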
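    public SearchSourceBuilder tuneSearch(SearchSourceBuilder builder) {
        // SearchSourceBuilder holds a single query, so all clauses belong in one bool query.
        // The scoring clause goes in must; exact-match conditions go in filter, which skips scoring and can be cached.
        builder.query(QueryBuilders.boolQuery()
                .must(QueryBuilders.matchQuery("name", "手机"))
                .filter(QueryBuilders.termQuery("status", "active"))
                .filter(QueryBuilders.termQuery("category", "electronics")));
        // The request cache is enabled on the SearchRequest (searchRequest.requestCache(true)), not on the source builder.
        // Give up on a shard after 5 seconds and return partial results.
        builder.timeout(TimeValue.timeValueSeconds(5));
        // Return only the fields the caller needs.
        builder.fetchSource(new String[]{"id", "name", "price"}, null);
        return builder;
    }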
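    // Runs 60 seconds after the previous run completes.
    // The node-stats and index-stats calls below are kept in their transport-client style.
    // RestHighLevelClient may not expose nodes() or indices().stats(); in that case the same data is
    // available through client.getLowLevelClient().performRequest(new Request("GET", "/_nodes/stats")).
    @Scheduled(fixedDelay = 60000)
    public void collectMetrics() throws IOException {
        // Cluster-level health: status, node count, unassigned shards.
        ClusterHealthRequest healthRequest = new ClusterHealthRequest();
        ClusterHealthResponse healthResponse = client.cluster()
                .health(healthRequest, RequestOptions.DEFAULT);
        // Node-level stats: index activity, OS, JVM and thread pools.
        NodesStatsRequest nodesRequest = new NodesStatsRequest();
        nodesRequest.indices(true);
        nodesRequest.os(true);
        nodesRequest.jvm(true);
        nodesRequest.threadPool(true);
        NodesStatsResponse nodesResponse = client.nodes()
                .stats(nodesRequest, RequestOptions.DEFAULT);
        // Index-level stats for every index.
        IndicesStatsRequest indicesRequest = new IndicesStatsRequest();
        indicesRequest.all();
        IndicesStatsResponse indicesResponse = client.indices()
                .stats(indicesRequest, RequestOptions.DEFAULT);
        sendToMonitoringSystem(healthResponse, nodesResponse, indicesResponse);
        checkAndScale(healthResponse, nodesResponse, indicesResponse);
    }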
    // Transport-style stats call; with the REST client this may require the low-level client.
    public void identifyHotShards() throws IOException {
        IndicesStatsRequest request = new IndicesStatsRequest();
        request.all();
        IndicesStatsResponse response = client.indices()
                .stats(request, RequestOptions.DEFAULT);
        Map<String, IndexStats> indices = response.getIndices();
        for (Map.Entry<String, IndexStats> entry : indices.entrySet()) {
            String index = entry.getKey();
            IndexStats stats = entry.getValue();
            ShardStats[] shardStats = stats.getShards();
            for (ShardStats shardStat : shardStats) {
                // The query count is cumulative since the shard started, so 10000 is a rough absolute threshold.
                long queryCount = shardStat.getStats().getSearch().getTotal().getQueryCount();
                if (queryCount > 10000) {
                    log.warn("Hot shard: {}/{}, query count: {}", index, shardStat.getShardRouting().getId(), queryCount);
                    rerouteHotShard(index, shardStat);
                }
            }
        }
    }
}
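The `tuneJvm()` method in the listing above has no body, since heap size is configured on each node rather than through the client. As a minimal sketch of the commonly cited sizing rule (equal `-Xms` and `-Xmx`, at most half of physical RAM, and below the compressed-pointer limit), the helper below derives the two flags from the machine's memory. The class name `HeapSizing` and the 31 GB ceiling are assumptions for illustration; the exact compressed-pointer threshold varies by JVM and should be checked on the target nodes.

```java
final class HeapSizing {
    // Assumed ceiling that keeps the JVM on compressed object pointers; verify it for your JVM.
    static final long COMPRESSED_OOPS_CEILING_MB = 31L * 1024;

    /** Suggested heap in MB: half of physical RAM, capped at the ceiling above. */
    static long recommendedHeapMb(long totalRamMb) {
        if (totalRamMb <= 0) {
            throw new IllegalArgumentException("totalRamMb must be positive");
        }
        return Math.min(totalRamMb / 2, COMPRESSED_OOPS_CEILING_MB);
    }

    /** Formats the result as the two flags that would go into config/jvm.options. */
    static String jvmOptions(long totalRamMb) {
        long heapMb = recommendedHeapMb(totalRamMb);
        return "-Xms" + heapMb + "m -Xmx" + heapMb + "m";
    }

    public static void main(String[] args) {
        System.out.println(jvmOptions(16L * 1024));  // a 16 GB node
        System.out.println(jvmOptions(128L * 1024)); // a 128 GB node hits the ceiling
    }
}
```

The other half of RAM is deliberately left to the operating system, because Lucene relies on the filesystem cache for fast segment access.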
8.2 Monitoring and Alerting
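# prometheus.yml: scrape the metrics endpoint exposed by an Elasticsearch exporter plugin.
scrape_configs:
  - job_name: 'elasticsearch'
    metrics_path: '/_prometheus/metrics'
    static_configs:
      - targets: ['localhost:9200']

# Alerting rules go in a separate rule file that prometheus.yml lists under rule_files.
# Metric names depend on the exporter in use; check them against your exporter's output.
groups:
  - name: elasticsearch
    rules:
      - alert: ClusterHealthRed
        expr: elasticsearch_cluster_health_status{color="red"} == 1
        for: 5m
        labels:
          severity: critical
        annotations:
          summary: "ES cluster status is RED"
      - alert: HighCpuUsage
        # Assumed to be a 0-100 gauge, so it is averaged over time rather than passed to rate().
        expr: avg_over_time(elasticsearch_process_cpu_percent[5m]) > 80
        for: 2m
        labels:
          severity: warning
        annotations:
          summary: "ES node CPU usage is too high"
      - alert: HighHeapUsage
        expr: elasticsearch_jvm_memory_used_bytes / elasticsearch_jvm_memory_max_bytes > 0.8
        for: 2m
        labels:
          severity: warning
        annotations:
          summary: "ES node heap usage is too high"
      - alert: HighDiskUsage
        expr: elasticsearch_filesystem_data_used_bytes / elasticsearch_filesystem_data_size_bytes > 0.8
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "ES node disk usage is too high"
      - alert: HighSearchLatency
        # Assumes the exporter publishes a latency histogram. If it only exposes cumulative
        # time and count counters, divide their rates to get an average latency instead.
        expr: histogram_quantile(0.95, rate(elasticsearch_indices_search_query_time_seconds_bucket[5m])) > 1
        for: 2m
        labels:
          severity: warning
        annotations:
          summary: "ES search latency is too high"
      - alert: UnassignedShards
        expr: elasticsearch_cluster_health_unassigned_shards > 0
        for: 5m
        labels:
          severity: critical
        annotations:
          summary: "ES has unassigned shards"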
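Every rule above carries a `for:` duration, which means the alert fires only once its expression has been true continuously for that long; a single false evaluation resets the clock. The sketch below models that behavior in plain Java to make the semantics concrete. The class name `PendingAlert` and its millisecond-based API are hypothetical, and the model is simplified: it ignores evaluation intervals, labels and multiple series.

```java
final class PendingAlert {
    private final long holdForMs;  // the rule's "for:" duration
    private Long pendingSinceMs;   // null while the condition is false

    PendingAlert(long holdForMs) {
        this.holdForMs = holdForMs;
    }

    /** Feeds one evaluation result and returns true when the alert should be firing. */
    boolean evaluate(long nowMs, boolean conditionTrue) {
        if (!conditionTrue) {
            pendingSinceMs = null; // any false evaluation resets the pending timer
            return false;
        }
        if (pendingSinceMs == null) {
            pendingSinceMs = nowMs; // condition just became true: start pending
        }
        return nowMs - pendingSinceMs >= holdForMs;
    }

    public static void main(String[] args) {
        PendingAlert unassignedShards = new PendingAlert(5 * 60_000L); // for: 5m
        long[] times = {0, 120_000, 300_000, 360_000, 420_000};
        boolean[] condition = {true, true, true, false, true};
        for (int i = 0; i < times.length; i++) {
            System.out.println(times[i] / 1000 + "s firing=" + unassignedShards.evaluate(times[i], condition[i]));
        }
    }
}
```

This is why a short `for:` on noisy metrics such as CPU still filters out brief spikes, while critical conditions such as a RED cluster use a longer hold to avoid paging during a routine node restart.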
9. Troubleshooting Guide
9.1 Diagnosing Common Problems
@Component
@Slf4j
public class TroubleshootingGuide {
    // Assumes a RestHighLevelClient bean is configured elsewhere in the application.
    @Autowired
    private RestHighLevelClient client;

    public void diagnoseSlowSearch(String index, String query) throws IOException {
        SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
        sourceBuilder.query(QueryBuilders.matchQuery("name", query));
        // Ask every shard to record per-component timings for this search.
        sourceBuilder.profile(true);
        SearchRequest request = new SearchRequest(index);
        request.source(sourceBuilder);
        SearchResponse response = client.search(request, RequestOptions.DEFAULT);
        Map<String, ProfileShardResult> profileResults = response.getProfileResults();
        for (Map.Entry<String, ProfileShardResult> entry : profileResults.entrySet()) {
            log.info("Profile results for shard {}:", entry.getKey());
            for (QueryProfileShardResult queryProfile : entry.getValue().getQueryProfileResults()) {
                // Query name and time are reported per ProfileResult; getTime() is in nanoseconds.
                for (ProfileResult profileResult : queryProfile.getQueryResults()) {
                    log.info("Query type: {}, time: {} ms", profileResult.getQueryName(), profileResult.getTime() / 1_000_000);
                    log.info("Details: {}", profileResult);
                }
            }
        }
    }
    public void diagnoseSlowIndexing(String index) throws IOException {
        // includeDefaults(true) makes the value available even when the index never overrode it.
        GetSettingsRequest settingsRequest = new GetSettingsRequest()
                .indices(index)
                .names("index.refresh_interval")
                .includeDefaults(true);
        GetSettingsResponse settingsResponse = client.indices()
                .getSettings(settingsRequest, RequestOptions.DEFAULT);
        log.info("Refresh interval: {}", settingsResponse.getSetting(index, "index.refresh_interval"));
        // Transport-style stats call; with the REST client this may require the low-level client.
        // clear() switches every metric off so that only the four below are collected.
        IndicesStatsRequest statsRequest = new IndicesStatsRequest()
                .indices(index)
                .clear()
                .docs(true)
                .store(true)
                .indexing(true)
                .search(true);
        IndicesStatsResponse statsResponse = client.indices()
                .stats(statsRequest, RequestOptions.DEFAULT);
        IndexStats indexStats = statsResponse.getIndex(index);
        log.info("Index stats: {}", indexStats);
    }
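    // Transport-style stats call; with the REST client this may require the low-level client.
    public void diagnoseHighMemory() throws IOException {
        NodesStatsRequest request = new NodesStatsRequest();
        request.jvm(true);
        // Field data stats live under the indices section, so request it as well.
        request.indices(true);
        NodesStatsResponse response = client.nodes()
                .stats(request, RequestOptions.DEFAULT);
        for (NodeStats nodeStats : response.getNodes()) {
            JvmStats jvmStats = nodeStats.getJvm();
            JvmStats.Mem mem = jvmStats.getMem();
            log.info("Node {} heap usage: {}/{}", nodeStats.getNode().getName(), mem.getHeapUsed(), mem.getHeapMax());
            // getCollectors() returns an array, so log each collector (typically young and old) by name.
            for (JvmStats.GarbageCollector gc : jvmStats.getGc().getCollectors()) {
                log.info("Node {} GC {}: {} collections", nodeStats.getNode().getName(), gc.getName(), gc.getCollectionCount());
            }
            if (nodeStats.getIndices() != null && nodeStats.getIndices().getFieldData() != null) {
                log.info("Fielddata memory: {}", nodeStats.getIndices().getFieldData().getMemorySize());
            }
        }
    }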
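    // Transport-style calls; with the REST client these may require the low-level client.
    public void diagnoseUnassignedShards() throws IOException {
        // With no shard specified, the API explains the first unassigned shard it finds.
        ClusterAllocationExplainRequest request = new ClusterAllocationExplainRequest();
        ClusterAllocationExplainResponse response = client.cluster()
                .allocationExplain(request, RequestOptions.DEFAULT);
        log.info("Shard allocation explanation: {}", response.getExplanation());
        // Full disks are a frequent cause of unassigned shards, so check every data path.
        NodesStatsRequest nodesRequest = new NodesStatsRequest();
        nodesRequest.fs(true);
        NodesStatsResponse nodesResponse = client.nodes()
                .stats(nodesRequest, RequestOptions.DEFAULT);
        for (NodeStats nodeStats : nodesResponse.getNodes()) {
            FsInfo fsInfo = nodeStats.getFs();
            for (FsInfo.Path path : fsInfo) {
                // Sizes are ByteSizeValue objects; convert to bytes before doing arithmetic.
                long totalBytes = path.getTotal().getBytes();
                long availableBytes = path.getAvailable().getBytes();
                long usedPercent = totalBytes > 0 ? (totalBytes - availableBytes) * 100 / totalBytes : 0;
                log.info("Node {} disk: {} available of {} ({}% used)", nodeStats.getNode().getName(), path.getAvailable(), path.getTotal(), usedPercent);
            }
        }
    }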
    // Transport-style stats call; with the REST client this may require the low-level client.
    public void diagnoseHotNodes() throws IOException {
        NodesStatsRequest request = new NodesStatsRequest();
        request.indices(true);
        request.os(true);
        NodesStatsResponse response = client.nodes()
                .stats(request, RequestOptions.DEFAULT);
        for (NodeStats nodeStats : response.getNodes()) {
            // Both counters are cumulative since the node started, not per-interval rates.
            long queryCount = nodeStats.getIndices().getSearch()
                    .getTotal().getQueryCount();
            long indexCount = nodeStats.getIndices().getIndexing()
                    .getTotal().getIndexCount();
            double cpuPercent = nodeStats.getOs().getCpu().getPercent();
            log.info("Node {} load - queries: {}, indexing ops: {}, CPU: {}%", nodeStats.getNode().getName(), queryCount, indexCount, cpuPercent);
            if (cpuPercent > 80 || queryCount > 10000) {
                log.warn("Node {} may be a hot node", nodeStats.getNode().getName());
            }
        }
    }
}
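The hot-node check above compares a cumulative query counter against a fixed threshold, so a long-running node will eventually cross it regardless of current load. One possible refinement, sketched below under that assumption, is to compare two snapshots taken one collection interval apart and flag only the nodes whose counter grew by more than the threshold. `HotNodeDetector`, the node names and the numbers are hypothetical; the maps stand in for query counts extracted from two consecutive stats responses.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class HotNodeDetector {
    /** Returns, in name order, the nodes whose query counter grew by more than threshold. */
    static List<String> hotNodes(Map<String, Long> previous, Map<String, Long> current, long threshold) {
        List<String> hot = new ArrayList<>();
        for (Map.Entry<String, Long> entry : new TreeMap<>(current).entrySet()) {
            Long before = previous.get(entry.getKey());
            if (before == null) {
                continue; // node joined since the last snapshot: no baseline yet
            }
            long delta = entry.getValue() - before;
            if (delta < 0) {
                delta = entry.getValue(); // counter went backwards: the node restarted
            }
            if (delta > threshold) {
                hot.add(entry.getKey());
            }
        }
        return hot;
    }

    public static void main(String[] args) {
        Map<String, Long> previous = Map.of("node-1", 1_000_000L, "node-2", 5_000L);
        Map<String, Long> current = Map.of("node-1", 1_000_500L, "node-2", 30_000L);
        // node-1 has the larger lifetime total, but only node-2 was busy in this interval.
        System.out.println(hotNodes(previous, current, 10_000L));
    }
}
```

The same delta approach would apply to the per-shard query counts used for hot-shard detection in section 8.1.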
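10. Technology Selection and Summary
10.1 ES vs. Alternative Solutions
| Option | Strengths | Weaknesses | Best suited for |
|---|---|---|---|
| Elasticsearch | Full-featured, strong ecosystem, excellent performance | Resource-hungry, complex to operate | Full-text search, log analytics |
| Solr | Mature and stable, feature-rich | Declining community activity, weak real-time behavior | Document search, enterprise search |
| OpenSearch | Open-source fork of ES, backed by AWS | Smaller ecosystem than ES | AWS environments, projects that must be fully open source |
| MeiliSearch | Lightweight, fast, easy to use | Relatively limited feature set | Small applications, simple search |
| PostgreSQL | Transaction support, SQL queries | Weak search features, poor search performance | Existing PG deployments with simple search needs |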
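10.2 Best Practices
- Size shards sensibly: keep each shard under 50GB
- Design mappings strictly: disable dynamic mapping and declare field types explicitly
- Optimize queries: avoid wildcard queries and put exact-match conditions in filter context
- Monitor broadly: cluster health, performance metrics, and business metrics
- Plan capacity: schedule expansion ahead of time and set disk watermarks
- Back up regularly: take periodic snapshots and test restoring them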
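The first guideline can be turned into a quick sizing calculation. The sketch below is one minimal way to do it, assuming the expected index size is known in whole gigabytes; the class name `ShardPlanner` and the example sizes are illustrative only, and real sizing should also account for growth, query load and node count.

```java
final class ShardPlanner {
    static final long MAX_SHARD_SIZE_GB = 50; // upper bound taken from the guideline above

    /** Smallest number of primary shards that keeps every shard at or below the limit. */
    static int primaryShards(long expectedIndexSizeGb) {
        if (expectedIndexSizeGb < 0) {
            throw new IllegalArgumentException("size must not be negative");
        }
        long shards = (expectedIndexSizeGb + MAX_SHARD_SIZE_GB - 1) / MAX_SHARD_SIZE_GB; // ceiling division
        return (int) Math.max(1, shards); // an index always has at least one primary
    }

    /** Total shard copies the cluster has to host: primaries plus their replicas. */
    static long totalShards(int primaries, int replicas) {
        return (long) primaries * (1 + replicas);
    }

    public static void main(String[] args) {
        int primaries = primaryShards(120); // a hypothetical 120 GB index
        System.out.println("primaries=" + primaries + ", total with 1 replica=" + totalShards(primaries, 1));
    }
}
```

Because the primary shard count is fixed when an index is created, it is worth running this kind of estimate against the projected size rather than the current one.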
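Summary
Elasticsearch is a powerful search engine, but it is not a silver bullet. Getting good results from it takes an understanding of how it works, a sound design, and continuous monitoring.
In practice, teams commonly run into failures caused by poorly chosen shard counts, unoptimized queries, and missing monitoring.
Remember: ES is a tool, not magic. The right approach is to design for your workload and keep monitoring and tuning it.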
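Final advice: start with simple scenarios and move to more complex designs only once the fundamentals are clear. Set up monitoring, choose reasonable shard and replica counts, and review queries regularly. Search optimization is an ongoing process, not a one-off task.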