Skip to content
Closed

1.7.3 #273

Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 2 additions & 6 deletions src/main/java/site/holliverse/admin/config/SolapiConfig.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package site.holliverse.admin.config;

import io.micrometer.core.instrument.MeterRegistry;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
Expand All @@ -9,21 +8,18 @@
import org.springframework.http.client.SimpleClientHttpRequestFactory;
import org.springframework.web.client.RestTemplate;
import site.holliverse.admin.integration.sms.SolapiSmsClient;
import site.holliverse.shared.monitoring.http.ObservedRestTemplateInterceptor;

@Profile("admin")
@Configuration
@EnableConfigurationProperties(SolapiProperties.class)
public class SolapiConfig {

@Bean
public RestTemplate solapiRestTemplate(SolapiProperties properties, MeterRegistry meterRegistry) {
public RestTemplate solapiRestTemplate(SolapiProperties properties) {
SimpleClientHttpRequestFactory factory = new SimpleClientHttpRequestFactory();
factory.setConnectTimeout(properties.connectTimeoutMs());
factory.setReadTimeout(properties.readTimeoutMs());
RestTemplate restTemplate = new RestTemplate(factory);
restTemplate.getInterceptors().add(new ObservedRestTemplateInterceptor(meterRegistry, "solapi"));
return restTemplate;
return new RestTemplate(factory);
}

@Bean
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,54 +2,33 @@

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.github.f4b6a3.tsid.Tsid;
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.DistributionSummary;
import io.micrometer.core.instrument.MeterRegistry;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Profile;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
import site.holliverse.customer.error.CustomerErrorCode;
import site.holliverse.customer.error.CustomerException;

import com.github.f4b6a3.tsid.Tsid;
import site.holliverse.customer.integration.external.AdminLogFeaturesClient;
import site.holliverse.customer.web.dto.log.UserLogRequest;
import site.holliverse.customer.error.CustomerErrorCode;
import site.holliverse.customer.error.CustomerException;
import site.holliverse.shared.monitoring.CustomerMetrics;

import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

@Slf4j
@Service
@Profile("customer")
@RequiredArgsConstructor
public class UserLogService {

private final KafkaTemplate<String, String> kafkaTemplate;
private final ObjectMapper objectMapper;
private final AdminLogFeaturesClient adminLogFeaturesClient;
private final CustomerMetrics customerMetrics;
private final MeterRegistry meterRegistry;
private final DistributionSummary batchSizeSummary;
private final Map<String, Counter> requestCounters = new ConcurrentHashMap<>();
private final Map<String, Counter> resultCounters = new ConcurrentHashMap<>();

public UserLogService(KafkaTemplate<String, String> kafkaTemplate,
ObjectMapper objectMapper,
AdminLogFeaturesClient adminLogFeaturesClient,
CustomerMetrics customerMetrics,
MeterRegistry meterRegistry) {
this.kafkaTemplate = kafkaTemplate;
this.objectMapper = objectMapper;
this.adminLogFeaturesClient = adminLogFeaturesClient;
this.customerMetrics = customerMetrics;
this.meterRegistry = meterRegistry;
this.batchSizeSummary = DistributionSummary.builder("holliverse.userlog.batch.size")
.description("Batch size of user log submissions")
.register(meterRegistry);
}

@Value("${app.topic.client-events}")
private String topic;
Expand All @@ -60,22 +39,26 @@ public void publishBatch(Long memberId, List<UserLogRequest> requests) {
return;
}
customerMetrics.recordUserLogBatchSize(requests.size());
batchSizeSummary.record(requests.size());
requestCounter("batch").increment();
for (UserLogRequest request : requests) {
doPublish(memberId, request);
}

// ์ด๋ฒคํŠธ ์ „๋‹ฌ
requests.forEach(request -> sendAdminTarget(memberId, request));
}

@Async("userLogTaskExecutor")
public void publish(Long memberId, UserLogRequest request) {
requestCounter("single").increment();
doPublish(memberId, request);

// ์ด๋ฒคํŠธ ์ „๋‹ฌ
sendAdminTarget(memberId, request);
}

/**
* ๋‹จ์ผ ๋กœ๊ทธ๋ฅผ Kafka๋กœ ์ „์†ก. {@link #publish}, {@link #publishBatch}์—์„œ๋งŒ ํ˜ธ์ถœ.
* self-invocation ์‹œ @Async๊ฐ€ ์ ์šฉ๋˜์ง€ ์•Š์œผ๋ฏ€๋กœ ๊ณตํ†ต ๋กœ์ง์„ private๋กœ ๋ถ„๋ฆฌํ•จ.
*/
private void doPublish(Long memberId, UserLogRequest request) {
UserLogEventName eventName = UserLogEventName.from(request.eventName());

Expand All @@ -84,7 +67,6 @@ private void doPublish(Long memberId, UserLogRequest request) {
eventId = decodeTsidToLong(request.tsid());
} catch (IllegalArgumentException e) {
customerMetrics.recordUserLogPublish(request.eventName(), "invalid_tsid");
resultCounter("invalid_tsid").increment();
throw new CustomerException(CustomerErrorCode.INVALID_USER_LOG_EVENT_ID);
}

Expand All @@ -102,7 +84,6 @@ private void doPublish(Long memberId, UserLogRequest request) {
json = objectMapper.writeValueAsString(payload);
} catch (JsonProcessingException e) {
customerMetrics.recordUserLogPublish(eventName.value(), "serialization_error");
resultCounter("serialization_error").increment();
log.warn("[UserLog] ์ง๋ ฌํ™” ์‹คํŒจ memberId={}", memberId, e);
return;
}
Expand All @@ -111,24 +92,29 @@ private void doPublish(Long memberId, UserLogRequest request) {
.whenComplete((result, ex) -> {
if (ex != null) {
customerMetrics.recordUserLogPublish(eventName.value(), "kafka_error");
resultCounter("kafka_error").increment();
log.warn("[UserLog] Kafka ์ „์†ก ์‹คํŒจ memberId={} eventName={}",
memberId, eventName.value(), ex);
return;
}
customerMetrics.recordUserLogPublish(eventName.value(), "success");
resultCounter("kafka_success").increment();
});
}

/**
* ํ”„๋ก ํŠธ์—์„œ ์ „๋‹ฌํ•œ TSID ๋ฌธ์ž์—ด์„ tsid-creator ๋ผ์ด๋ธŒ๋Ÿฌ๋ฆฌ๋กœ ๋””์ฝ”๋”ฉํ•œ๋‹ค.
*/
private static long decodeTsidToLong(String tsid) {
if (tsid == null || tsid.isBlank()) {
throw new IllegalArgumentException("TSID must not be null or blank");
}
// Tsid.from(...) ๋‚ด๋ถ€์—์„œ ํ˜•์‹ยท๊ธธ์ดยท์•ŒํŒŒ๋ฒณ ๊ฒ€์ฆ์„ ์ˆ˜ํ–‰ํ•œ๋‹ค.
Tsid parsed = Tsid.from(tsid);
return parsed.toLong();
}

/**
* Admin ์ด๋ฒคํŠธ ๋ณ€ํ™˜.
*/
private void sendAdminTarget(Long memberId, UserLogRequest request) {
UserLogEventName eventName = UserLogEventName.from(request.eventName());
if (!isAdminTarget(eventName)) {
Expand All @@ -142,25 +128,12 @@ private void sendAdminTarget(Long memberId, UserLogRequest request) {
);
}

/**
* ๋Œ€์ƒ ์ด๋ฒคํŠธ.
*/
private boolean isAdminTarget(UserLogEventName eventName) {
return eventName == UserLogEventName.CLICK_COMPARE
|| eventName == UserLogEventName.CLICK_PENALTY
|| eventName == UserLogEventName.CLICK_CHANGE;
}

private Counter requestCounter(String mode) {
return requestCounters.computeIfAbsent(mode, ignored ->
Counter.builder("holliverse.userlog.requests")
.description("User log request count by mode")
.tag("mode", mode)
.register(meterRegistry));
}

private Counter resultCounter(String result) {
return resultCounters.computeIfAbsent(result, ignored ->
Counter.builder("holliverse.userlog.publish")
.description("User log publish result count")
.tag("result", result)
.register(meterRegistry));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
package site.holliverse.customer.application.usecase.recommendation;

import lombok.RequiredArgsConstructor;
import org.springframework.context.annotation.Profile;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import site.holliverse.customer.integration.kafka.dto.RecommendationMessagePayload;
import site.holliverse.customer.persistence.entity.PersonaRecommendation;
import site.holliverse.customer.persistence.entity.RecommendedProductItem;
import site.holliverse.customer.persistence.repository.PersonaRecommendationRepository;

import java.time.Instant;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;

/**
* recommendation-topic ๋ฉ”์‹œ์ง€์˜ ๋„๋ฉ”์ธ ์ฒ˜๋ฆฌ ๋‹ด๋‹น.
* - persona_recommendation upsert
* - ๋Œ€๊ธฐ ์ค‘์ธ Future ์™„๋ฃŒ
*/
@Service
@Profile("customer")
@RequiredArgsConstructor
public class RecommendationKafkaConsumeUseCase {

private final PersonaRecommendationRepository personaRecommendationRepository;
private final RecommendationPendingFutureRegistry pendingFutureRegistry;

@Transactional
public RecommendationKafkaConsumeResult execute(RecommendationMessagePayload message) {
List<RecommendedProductItem> products = message.recommendedProducts() == null
? Collections.emptyList()
: message.recommendedProducts().stream()
.map(p -> new RecommendedProductItem(
p.rank(),
p.productId(),
p.productName(),
p.productType(),
p.productPrice(),
p.salePrice(),
p.tags() != null ? p.tags() : Collections.emptyList(),
p.reason()
))
.toList();

String cachedText = message.cachedLlmRecommendation() != null ? message.cachedLlmRecommendation() : "";

PersonaRecommendation saved = personaRecommendationRepository
.findById(message.memberId())
.map(entity -> {
entity.updateRecommendation(message.segment(), cachedText, products);
return personaRecommendationRepository.save(entity);
})
.orElseGet(() -> personaRecommendationRepository.save(
PersonaRecommendation.builder()
.memberId(message.memberId())
.segment(message.segment())
.cachedLlmRecommendation(cachedText)
.recommendedProducts(products)
.build()));

String outcome = "stored_without_waiter";
CompletableFuture<RecommendationResult> future = pendingFutureRegistry.remove(message.memberId());
if (future != null) {
future.complete(RecommendationResult.fromEntity(saved, RecommendationResult.RecommendationSource.FASTAPI));
outcome = "completed_pending";
}

return new RecommendationKafkaConsumeResult(
message.memberId(),
message.segment().name(),
products.size(),
saved.getUpdatedAt(),
outcome
);
}

public void completeExceptionally(Long memberId, Exception e) {
if (memberId == null) {
return;
}
CompletableFuture<RecommendationResult> future = pendingFutureRegistry.remove(memberId);
if (future != null) {
future.completeExceptionally(e);
}
}

public record RecommendationKafkaConsumeResult(
Long memberId,
String segment,
int productCount,
Instant updatedAt,
String outcome
) {}
}
Original file line number Diff line number Diff line change
@@ -1,11 +1,8 @@
package site.holliverse.customer.application.usecase.recommendation;

import io.micrometer.core.instrument.Gauge;
import io.micrometer.core.instrument.MeterRegistry;
import org.springframework.context.annotation.Profile;
import org.springframework.stereotype.Component;

import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

Expand All @@ -19,12 +16,6 @@ public class RecommendationPendingFutureRegistry {

private final ConcurrentHashMap<Long, CompletableFuture<RecommendationResult>> pending = new ConcurrentHashMap<>();

public RecommendationPendingFutureRegistry(MeterRegistry meterRegistry) {
Gauge.builder("holliverse.recommendation.pending.size", pending, Map::size)
.description("Current number of pending recommendation futures")
.register(meterRegistry);
}

/**
* ํ•ด๋‹น ํšŒ์›์— ๋Œ€ํ•œ ๋Œ€๊ธฐ Future๋ฅผ ๋ฐ˜ํ™˜ํ•˜๊ฑฐ๋‚˜, ์—†์œผ๋ฉด ์ƒˆ๋กœ ์ƒ์„ฑํ•ด ๋“ฑ๋ก ํ›„ ๋ฐ˜ํ™˜.
*/
Expand Down
Loading
Loading