Skip to content

Commit f72235b

Browse files
committed
refactor: ♻️ 对es服务中的实现进行一定的优化
1 parent 36a446b commit f72235b

File tree

5 files changed

+224
-199
lines changed

5 files changed

+224
-199
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,163 @@
1+
package com.stephen.cloud.common.elasticsearch.sync;
2+
3+
import co.elastic.clients.elasticsearch.ElasticsearchClient;
4+
import co.elastic.clients.elasticsearch._types.Refresh;
5+
import co.elastic.clients.elasticsearch.core.BulkRequest;
6+
import co.elastic.clients.elasticsearch.core.BulkResponse;
7+
import co.elastic.clients.elasticsearch.core.bulk.BulkResponseItem;
8+
import com.fasterxml.jackson.databind.DeserializationFeature;
9+
import com.fasterxml.jackson.databind.JavaType;
10+
import com.fasterxml.jackson.databind.JsonNode;
11+
import com.fasterxml.jackson.databind.ObjectMapper;
12+
import jakarta.annotation.Resource;
13+
import lombok.extern.slf4j.Slf4j;
14+
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
15+
import org.springframework.stereotype.Service;
16+
17+
import java.io.IOException;
18+
import java.util.List;
19+
20+
@Slf4j
21+
@Service
22+
@ConditionalOnBean(ElasticsearchClient.class)
23+
public class ElasticsearchIndexSyncService {
24+
25+
private static final int BULK_BATCH_SIZE = 500;
26+
27+
private final ObjectMapper objectMapper = new ObjectMapper()
28+
.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
29+
30+
@Resource
31+
private ElasticsearchClient elasticsearchClient;
32+
33+
public void syncSingle(String index, Class<?> clazz, String operation, Long dataId, String dataContent)
34+
throws IOException {
35+
String op = normalizeOperation(operation);
36+
37+
if ("delete".equals(op)) {
38+
elasticsearchClient.delete(d -> d.index(index).id(String.valueOf(dataId)));
39+
log.info("[EsIndexSync] ES 删除成功, index: {}, id: {}", index, dataId);
40+
return;
41+
}
42+
if (!("upsert".equals(op) || "create".equals(op))) {
43+
log.debug("[EsIndexSync] 忽略的操作类型: {}", op);
44+
return;
45+
}
46+
47+
JsonNode root = objectMapper.readTree(dataContent);
48+
if (isMarkedDeleted(root)) {
49+
elasticsearchClient.delete(d -> d.index(index).id(String.valueOf(dataId)));
50+
log.info("[EsIndexSync] 业务数据标记为已逻辑删除,同步执行 ES 物理删除: id={}", dataId);
51+
return;
52+
}
53+
Object dto = objectMapper.treeToValue(root, clazz);
54+
elasticsearchClient.index(i -> i.index(index).id(String.valueOf(dataId)).document(dto));
55+
log.info("[EsIndexSync] ES 同步索引成功: index={}, id={}", index, dataId);
56+
}
57+
58+
public void syncBatch(String index, Class<?> clazz, String operation, List<String> dataContentList)
59+
throws IOException {
60+
if (dataContentList == null || dataContentList.isEmpty()) {
61+
log.debug("[EsIndexSync] 批量同步数据项为空,跳过");
62+
return;
63+
}
64+
boolean isDelete = "delete".equals(normalizeOperation(operation));
65+
int batchSize = Math.max(1, BULK_BATCH_SIZE);
66+
67+
BulkRequest.Builder br = newBulkRequestBuilder();
68+
int inChunk = 0;
69+
long totalLines = dataContentList.size();
70+
71+
for (String content : dataContentList) {
72+
boolean added = appendBulkLine(br, index, clazz, content, isDelete);
73+
if (!added) {
74+
continue;
75+
}
76+
inChunk++;
77+
if (inChunk >= batchSize) {
78+
flushBulk(br, index, inChunk);
79+
br = newBulkRequestBuilder();
80+
inChunk = 0;
81+
}
82+
}
83+
if (inChunk > 0) {
84+
flushBulk(br, index, inChunk);
85+
}
86+
log.info("[EsIndexSync] 批量同步处理完成: index={}, lines={}", index, totalLines);
87+
}
88+
89+
private BulkRequest.Builder newBulkRequestBuilder() {
90+
return new BulkRequest.Builder().refresh(Refresh.False);
91+
}
92+
93+
private boolean appendBulkLine(BulkRequest.Builder br, String index, Class<?> clazz, String content,
94+
boolean isDelete) throws IOException {
95+
JsonNode root = objectMapper.readTree(content);
96+
JsonNode idNode = root.get("id");
97+
if (idNode == null || idNode.isNull()) {
98+
return false;
99+
}
100+
long idLong = idNode.isNumber() ? idNode.longValue() : Long.parseLong(idNode.asText());
101+
String idStr = String.valueOf(idLong);
102+
103+
if (isDelete) {
104+
br.operations(op -> op.delete(d -> d.index(index).id(idStr)));
105+
return true;
106+
}
107+
108+
if (isMarkedDeleted(root)) {
109+
br.operations(op -> op.delete(d -> d.index(index).id(idStr)));
110+
return true;
111+
}
112+
113+
JavaType javaType = objectMapper.getTypeFactory().constructType(clazz);
114+
Object dto = objectMapper.readValue(content, javaType);
115+
br.operations(op -> op.index(i -> i.index(index).id(idStr).document(dto)));
116+
return true;
117+
}
118+
119+
private static boolean isMarkedDeleted(JsonNode root) {
120+
JsonNode n = root.get("isDelete");
121+
if (n == null || n.isNull()) {
122+
return false;
123+
}
124+
if (n.isNumber()) {
125+
return n.intValue() == 1;
126+
}
127+
if (n.isTextual()) {
128+
return "1".equals(n.asText());
129+
}
130+
return false;
131+
}
132+
133+
private void flushBulk(BulkRequest.Builder br, String index, int opCount) throws IOException {
134+
BulkRequest request = br.build();
135+
if (request.operations() == null || request.operations().isEmpty()) {
136+
return;
137+
}
138+
BulkResponse result = elasticsearchClient.bulk(request);
139+
if (result.errors()) {
140+
log.error("[EsIndexSync] 批量同步存在失败项, index={}", index);
141+
logBulkFailures(result);
142+
} else {
143+
log.info("[EsIndexSync] 批量同步子批次成功: index={}, ops={}", index, opCount);
144+
}
145+
}
146+
147+
private void logBulkFailures(BulkResponse result) {
148+
for (BulkResponseItem item : result.items()) {
149+
if (item.error() != null) {
150+
log.error("[EsIndexSync] 失败项: id={}, reason={}", item.id(), item.error().reason());
151+
} else if (item.status() >= 400) {
152+
log.error("[EsIndexSync] 失败项: id={}, status={}", item.id(), item.status());
153+
}
154+
}
155+
}
156+
157+
private static String normalizeOperation(String operation) {
158+
if (operation == null) {
159+
return "";
160+
}
161+
return operation.trim().toLowerCase();
162+
}
163+
}

algorithm-service/algorithm-ai-service/src/main/java/com/stephen/cloud/ai/service/impl/KeywordSearchServiceImpl.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import com.stephen.cloud.ai.knowledge.retrieval.ElasticsearchFilterExpressionConverter;
1212
import com.stephen.cloud.ai.knowledge.retrieval.RagDocumentHelper;
1313
import com.stephen.cloud.ai.service.KeywordSearchService;
14+
import com.stephen.cloud.api.search.constant.EsIndexConstant;
1415
import jakarta.annotation.Resource;
1516
import lombok.extern.slf4j.Slf4j;
1617
import org.apache.commons.lang3.StringUtils;
@@ -24,6 +25,7 @@
2425
import java.util.List;
2526
import java.util.Map;
2627

28+
/** chunk 索引 BM25,字段与索引名同 ChunkEsDTO / EsIndexConstant(与 search 写入一致)。 */
2729
@Service
2830
@Slf4j
2931
public class KeywordSearchServiceImpl implements KeywordSearchService {
@@ -48,13 +50,12 @@ public List<Document> bm25Search(String query, Integer topK, Filter.Expression f
4850
int finalTopK = topK == null || topK <= 0 ? defaultTopK : topK;
4951

5052
BoolQuery.Builder boolBuilder = new BoolQuery.Builder();
53+
boolBuilder.filter(f -> f.term(t -> t.field("isDelete").value(0)));
5154
boolBuilder.must(m -> m.multiMatch(mm -> mm
52-
.fields("content^3", "metadata.sectionTitle^2.5", "metadata.sectionPath^1.5", "metadata.documentName^2")
55+
.fields("content")
5356
.query(query)
5457
.type(co.elastic.clients.elasticsearch._types.query_dsl.TextQueryType.BestFields)
5558
));
56-
57-
// 应用 Spring AI 表达式过滤
5859
if (filterExpression != null) {
5960
Query filterQuery = filterConverter.convert(filterExpression);
6061
if (filterQuery != null) {
@@ -63,7 +64,7 @@ public List<Document> bm25Search(String query, Integer topK, Filter.Expression f
6364
}
6465

6566
SearchRequest searchRequest = new SearchRequest.Builder()
66-
.index(ragRetrievalProperties.getIndexName())
67+
.index(EsIndexConstant.CHUNK_INDEX)
6768
.query(boolBuilder.build()._toQuery())
6869
.size(finalTopK)
6970
.build();
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
package com.stephen.cloud.search.elasticsearch;
2+
3+
import com.stephen.cloud.api.search.constant.EsIndexConstant;
4+
import com.stephen.cloud.api.search.model.entity.ChunkEsDTO;
5+
import com.stephen.cloud.api.search.model.entity.PostEsDTO;
6+
import com.stephen.cloud.api.search.model.entity.UserEsDTO;
7+
import com.stephen.cloud.common.rabbitmq.enums.EsSyncDataTypeEnum;
8+
9+
import java.util.HashMap;
10+
import java.util.Map;
11+
12+
/**
13+
* ES 同步消息在消费端的 dataType 与索引名、文档 DTO 类型映射。
14+
*
15+
* @author StephenQiu30
16+
*/
17+
public final class EsSyncDocumentTypes {
18+
19+
private static final Map<EsSyncDataTypeEnum, Class<?>> DATA_TYPE_CLASS_MAP = new HashMap<>();
20+
21+
static {
22+
DATA_TYPE_CLASS_MAP.put(EsSyncDataTypeEnum.POST, PostEsDTO.class);
23+
DATA_TYPE_CLASS_MAP.put(EsSyncDataTypeEnum.USER, UserEsDTO.class);
24+
DATA_TYPE_CLASS_MAP.put(EsSyncDataTypeEnum.CHUNK, ChunkEsDTO.class);
25+
}
26+
27+
private EsSyncDocumentTypes() {
28+
}
29+
30+
public static String indexOf(EsSyncDataTypeEnum type) {
31+
if (type == null) {
32+
return null;
33+
}
34+
return switch (type) {
35+
case POST -> EsIndexConstant.POST_INDEX;
36+
case USER -> EsIndexConstant.USER_INDEX;
37+
case CHUNK -> EsIndexConstant.CHUNK_INDEX;
38+
};
39+
}
40+
41+
public static Class<?> classOf(EsSyncDataTypeEnum type) {
42+
return type == null ? null : DATA_TYPE_CLASS_MAP.get(type);
43+
}
44+
}

algorithm-service/algorithm-search-service/src/main/java/com/stephen/cloud/search/mq/handler/EsSyncBatchHandler.java

Lines changed: 6 additions & 110 deletions
Original file line numberDiff line numberDiff line change
@@ -1,59 +1,24 @@
11
package com.stephen.cloud.search.mq.handler;
22

3-
import cn.hutool.core.collection.CollUtil;
4-
import cn.hutool.json.JSONObject;
5-
import cn.hutool.json.JSONUtil;
6-
import co.elastic.clients.elasticsearch.ElasticsearchClient;
7-
import co.elastic.clients.elasticsearch.core.BulkRequest;
8-
import co.elastic.clients.elasticsearch.core.BulkResponse;
9-
import co.elastic.clients.elasticsearch.core.bulk.BulkResponseItem;
10-
import com.stephen.cloud.api.search.constant.EsIndexConstant;
11-
import com.stephen.cloud.api.search.model.entity.BaseEsDTO;
12-
import com.stephen.cloud.api.search.model.entity.PostEsDTO;
13-
import com.stephen.cloud.api.search.model.entity.UserEsDTO;
14-
import com.stephen.cloud.api.search.model.entity.ChunkEsDTO;
3+
import com.stephen.cloud.common.elasticsearch.sync.ElasticsearchIndexSyncService;
154
import com.stephen.cloud.common.rabbitmq.consumer.RabbitMqDedupeLock;
165
import com.stephen.cloud.common.rabbitmq.consumer.RabbitMqHandler;
176
import com.stephen.cloud.common.rabbitmq.enums.EsSyncDataTypeEnum;
187
import com.stephen.cloud.common.rabbitmq.enums.MqBizTypeEnum;
198
import com.stephen.cloud.common.rabbitmq.model.EsSyncBatchMessage;
209
import com.stephen.cloud.common.rabbitmq.model.RabbitMessage;
10+
import com.stephen.cloud.search.elasticsearch.EsSyncDocumentTypes;
2111
import jakarta.annotation.Resource;
2212
import lombok.extern.slf4j.Slf4j;
2313
import org.springframework.stereotype.Component;
2414

25-
import java.io.IOException;
26-
import java.util.HashMap;
27-
import java.util.List;
28-
import java.util.Map;
29-
30-
/**
31-
* ES 批量数据同步处理器
32-
* <p>
33-
* 针对大规模业务变更(如全量重索引),利用 Elasticsearch Bulk API 开启批处理优化。
34-
* 通过 {@link RabbitMqDedupeLock} 锁定批次 MsgId,防范网络抖动在重试期间产生的重复写入压力。
35-
* </p>
36-
*
37-
* @author StephenQiu30
38-
*/
3915
@Slf4j
4016
@Component
4117
@RabbitMqDedupeLock(prefix = "mq:es:sync:batch")
4218
public class EsSyncBatchHandler implements RabbitMqHandler<EsSyncBatchMessage> {
4319

4420
@Resource
45-
private ElasticsearchClient elasticsearchClient;
46-
47-
/**
48-
* 数据类型与实体 DTO 的静态映射表
49-
*/
50-
private static final Map<EsSyncDataTypeEnum, Class<?>> DATA_TYPE_CLASS_MAP = new HashMap<>();
51-
52-
static {
53-
DATA_TYPE_CLASS_MAP.put(EsSyncDataTypeEnum.POST, PostEsDTO.class);
54-
DATA_TYPE_CLASS_MAP.put(EsSyncDataTypeEnum.USER, UserEsDTO.class);
55-
DATA_TYPE_CLASS_MAP.put(EsSyncDataTypeEnum.CHUNK, ChunkEsDTO.class);
56-
}
21+
private ElasticsearchIndexSyncService elasticsearchIndexSyncService;
5722

5823
@Override
5924
public String getBizType() {
@@ -67,82 +32,13 @@ public void onMessage(EsSyncBatchMessage msg, RabbitMessage rabbitMessage) throw
6732
log.warn("[EsSyncBatchHandler] 收到未定义的数据类型: {}", msg.getDataType());
6833
return;
6934
}
70-
71-
Class<?> clazz = DATA_TYPE_CLASS_MAP.get(dataTypeEnum);
72-
String index = getIndexByType(dataTypeEnum);
73-
35+
Class<?> clazz = EsSyncDocumentTypes.classOf(dataTypeEnum);
36+
String index = EsSyncDocumentTypes.indexOf(dataTypeEnum);
7437
if (clazz == null || index == null) {
7538
log.warn("[EsSyncBatchHandler] 处理器暂未支持该数据类型同步: {}", msg.getDataType());
7639
return;
7740
}
78-
79-
List<String> dataList = msg.getDataContentList();
80-
if (CollUtil.isEmpty(dataList)) {
81-
log.debug("[EsSyncBatchHandler] 批量同步数据项为空,跳过");
82-
return;
83-
}
84-
85-
processBatchSync(index, clazz, msg.getOperation(), dataList);
86-
}
87-
88-
/**
89-
* 构建并执行 Bulk 请求逻辑。
90-
*
91-
* @param index 索引名称
92-
* @param clazz DTO 类型
93-
* @param operation 操作类型 (delete/upsert/create)
94-
* @param dataList 待同步的数据 JSON 列表
95-
* @throws IOException ES 传输异常
96-
*/
97-
private void processBatchSync(String index, Class<?> clazz, String operation, List<String> dataList)
98-
throws IOException {
99-
boolean isDelete = "delete".equals(operation);
100-
BulkRequest.Builder br = new BulkRequest.Builder();
101-
102-
for (String content : dataList) {
103-
JSONObject jsonObject = JSONUtil.parseObj(content);
104-
Long id = jsonObject.getLong("id");
105-
if (id == null)
106-
continue;
107-
108-
if (isDelete) {
109-
br.operations(op -> op.delete(d -> d.index(index).id(String.valueOf(id))));
110-
} else {
111-
Object dto = JSONUtil.toBean(content, clazz);
112-
// 物理删除兜底:逻辑删除的数据不应停留在索引中
113-
if (dto instanceof BaseEsDTO baseEsDTO && Integer.valueOf(1).equals(baseEsDTO.getIsDelete())) {
114-
br.operations(op -> op.delete(d -> d.index(index).id(String.valueOf(id))));
115-
} else {
116-
br.operations(op -> op.index(i -> i.index(index).id(String.valueOf(id)).document(dto)));
117-
}
118-
}
119-
}
120-
121-
BulkResponse result = elasticsearchClient.bulk(br.build());
122-
if (result.errors()) {
123-
log.error("[EsSyncBatchHandler] 批量同步执行完毕,但存在失败项");
124-
for (BulkResponseItem item : result.items()) {
125-
if (item.error() != null) {
126-
log.error("[EsSyncBatchHandler] 失败项: id={}, reason={}", item.id(), item.error().reason());
127-
} else if (item.status() >= 400) {
128-
log.error("[EsSyncBatchHandler] 失败项: id={}, status={}", item.id(), item.status());
129-
}
130-
}
131-
} else {
132-
log.info("[EsSyncBatchHandler] 批量同步执行成功: index={}, count={}", index, dataList.size());
133-
}
134-
}
135-
136-
/**
137-
* 索引名称动态路由转发
138-
*/
139-
private String getIndexByType(EsSyncDataTypeEnum type) {
140-
return switch (type) {
141-
case POST -> EsIndexConstant.POST_INDEX;
142-
case USER -> EsIndexConstant.USER_INDEX;
143-
case CHUNK -> EsIndexConstant.CHUNK_INDEX;
144-
default -> null;
145-
};
41+
elasticsearchIndexSyncService.syncBatch(index, clazz, msg.getOperation(), msg.getDataContentList());
14642
}
14743

14844
@Override

0 commit comments

Comments
 (0)