Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 2 additions & 6 deletions src/main/java/site/holliverse/admin/config/SolapiConfig.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package site.holliverse.admin.config;

import io.micrometer.core.instrument.MeterRegistry;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
Expand All @@ -9,21 +8,18 @@
import org.springframework.http.client.SimpleClientHttpRequestFactory;
import org.springframework.web.client.RestTemplate;
import site.holliverse.admin.integration.sms.SolapiSmsClient;
import site.holliverse.shared.monitoring.http.ObservedRestTemplateInterceptor;

@Profile("admin")
@Configuration
@EnableConfigurationProperties(SolapiProperties.class)
public class SolapiConfig {

@Bean
public RestTemplate solapiRestTemplate(SolapiProperties properties, MeterRegistry meterRegistry) {
public RestTemplate solapiRestTemplate(SolapiProperties properties) {
SimpleClientHttpRequestFactory factory = new SimpleClientHttpRequestFactory();
factory.setConnectTimeout(properties.connectTimeoutMs());
factory.setReadTimeout(properties.readTimeoutMs());
RestTemplate restTemplate = new RestTemplate(factory);
restTemplate.getInterceptors().add(new ObservedRestTemplateInterceptor(meterRegistry, "solapi"));
return restTemplate;
return new RestTemplate(factory);
}

@Bean
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,54 +2,33 @@

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.github.f4b6a3.tsid.Tsid;
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.DistributionSummary;
import io.micrometer.core.instrument.MeterRegistry;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Profile;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
import site.holliverse.customer.error.CustomerErrorCode;
import site.holliverse.customer.error.CustomerException;

import com.github.f4b6a3.tsid.Tsid;
import site.holliverse.customer.integration.external.AdminLogFeaturesClient;
import site.holliverse.customer.web.dto.log.UserLogRequest;
import site.holliverse.customer.error.CustomerErrorCode;
import site.holliverse.customer.error.CustomerException;
import site.holliverse.shared.monitoring.CustomerMetrics;

import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

@Slf4j
@Service
@Profile("customer")
@RequiredArgsConstructor
public class UserLogService {

private final KafkaTemplate<String, String> kafkaTemplate;
private final ObjectMapper objectMapper;
private final AdminLogFeaturesClient adminLogFeaturesClient;
private final CustomerMetrics customerMetrics;
private final MeterRegistry meterRegistry;
private final DistributionSummary batchSizeSummary;
private final Map<String, Counter> requestCounters = new ConcurrentHashMap<>();
private final Map<String, Counter> resultCounters = new ConcurrentHashMap<>();

public UserLogService(KafkaTemplate<String, String> kafkaTemplate,
ObjectMapper objectMapper,
AdminLogFeaturesClient adminLogFeaturesClient,
CustomerMetrics customerMetrics,
MeterRegistry meterRegistry) {
this.kafkaTemplate = kafkaTemplate;
this.objectMapper = objectMapper;
this.adminLogFeaturesClient = adminLogFeaturesClient;
this.customerMetrics = customerMetrics;
this.meterRegistry = meterRegistry;
this.batchSizeSummary = DistributionSummary.builder("holliverse.userlog.batch.size")
.description("Batch size of user log submissions")
.register(meterRegistry);
}

@Value("${app.topic.client-events}")
private String topic;
Expand All @@ -60,22 +39,26 @@ public void publishBatch(Long memberId, List<UserLogRequest> requests) {
return;
}
customerMetrics.recordUserLogBatchSize(requests.size());
batchSizeSummary.record(requests.size());
requestCounter("batch").increment();
for (UserLogRequest request : requests) {
doPublish(memberId, request);
}

// 이벀트 전달
requests.forEach(request -> sendAdminTarget(memberId, request));
}

@Async("userLogTaskExecutor")
public void publish(Long memberId, UserLogRequest request) {
requestCounter("single").increment();
doPublish(memberId, request);

// 이벀트 전달
sendAdminTarget(memberId, request);
}

/**
* 단일 둜그λ₯Ό Kafka둜 전솑. {@link #publish}, {@link #publishBatch}μ—μ„œλ§Œ 호좜.
* self-invocation μ‹œ @Asyncκ°€ μ μš©λ˜μ§€ μ•ŠμœΌλ―€λ‘œ 곡톡 λ‘œμ§μ„ private둜 뢄리함.
*/
private void doPublish(Long memberId, UserLogRequest request) {
UserLogEventName eventName = UserLogEventName.from(request.eventName());

Expand All @@ -84,7 +67,6 @@ private void doPublish(Long memberId, UserLogRequest request) {
eventId = decodeTsidToLong(request.tsid());
} catch (IllegalArgumentException e) {
customerMetrics.recordUserLogPublish(request.eventName(), "invalid_tsid");
resultCounter("invalid_tsid").increment();
throw new CustomerException(CustomerErrorCode.INVALID_USER_LOG_EVENT_ID);
}

Expand All @@ -102,7 +84,6 @@ private void doPublish(Long memberId, UserLogRequest request) {
json = objectMapper.writeValueAsString(payload);
} catch (JsonProcessingException e) {
customerMetrics.recordUserLogPublish(eventName.value(), "serialization_error");
resultCounter("serialization_error").increment();
log.warn("[UserLog] 직렬화 μ‹€νŒ¨ memberId={}", memberId, e);
return;
}
Expand All @@ -111,24 +92,29 @@ private void doPublish(Long memberId, UserLogRequest request) {
.whenComplete((result, ex) -> {
if (ex != null) {
customerMetrics.recordUserLogPublish(eventName.value(), "kafka_error");
resultCounter("kafka_error").increment();
log.warn("[UserLog] Kafka 전솑 μ‹€νŒ¨ memberId={} eventName={}",
memberId, eventName.value(), ex);
return;
}
customerMetrics.recordUserLogPublish(eventName.value(), "success");
resultCounter("kafka_success").increment();
});
}

/**
* ν”„λ‘ νŠΈμ—μ„œ μ „λ‹¬ν•œ TSID λ¬Έμžμ—΄μ„ tsid-creator 라이브러리둜 λ””μ½”λ”©ν•œλ‹€.
*/
private static long decodeTsidToLong(String tsid) {
if (tsid == null || tsid.isBlank()) {
throw new IllegalArgumentException("TSID must not be null or blank");
}
// Tsid.from(...) λ‚΄λΆ€μ—μ„œ ν˜•μ‹Β·κΈΈμ΄Β·μ•ŒνŒŒλ²³ 검증을 μˆ˜ν–‰ν•œλ‹€.
Tsid parsed = Tsid.from(tsid);
return parsed.toLong();
}

/**
* Admin 이벀트 λ³€ν™˜.
*/
private void sendAdminTarget(Long memberId, UserLogRequest request) {
UserLogEventName eventName = UserLogEventName.from(request.eventName());
if (!isAdminTarget(eventName)) {
Expand All @@ -142,25 +128,12 @@ private void sendAdminTarget(Long memberId, UserLogRequest request) {
);
}

/**
* λŒ€μƒ 이벀트.
*/
private boolean isAdminTarget(UserLogEventName eventName) {
return eventName == UserLogEventName.CLICK_COMPARE
|| eventName == UserLogEventName.CLICK_PENALTY
|| eventName == UserLogEventName.CLICK_CHANGE;
}

private Counter requestCounter(String mode) {
return requestCounters.computeIfAbsent(mode, ignored ->
Counter.builder("holliverse.userlog.requests")
.description("User log request count by mode")
.tag("mode", mode)
.register(meterRegistry));
}

private Counter resultCounter(String result) {
return resultCounters.computeIfAbsent(result, ignored ->
Counter.builder("holliverse.userlog.publish")
.description("User log publish result count")
.tag("result", result)
.register(meterRegistry));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
package site.holliverse.customer.application.usecase.recommendation;

import lombok.RequiredArgsConstructor;
import org.springframework.context.annotation.Profile;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import site.holliverse.customer.integration.kafka.dto.RecommendationMessagePayload;
import site.holliverse.customer.persistence.entity.PersonaRecommendation;
import site.holliverse.customer.persistence.entity.RecommendedProductItem;
import site.holliverse.customer.persistence.repository.PersonaRecommendationRepository;

import java.time.Instant;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;

/**
* recommendation-topic λ©”μ‹œμ§€μ˜ 도메인 처리 λ‹΄λ‹Ή.
* - persona_recommendation upsert
* - λŒ€κΈ° 쀑인 Future μ™„λ£Œ
*/
@Service
@Profile("customer")
@RequiredArgsConstructor
public class RecommendationKafkaConsumeUseCase {

private final PersonaRecommendationRepository personaRecommendationRepository;
private final RecommendationPendingFutureRegistry pendingFutureRegistry;

@Transactional
public RecommendationKafkaConsumeResult execute(RecommendationMessagePayload message) {
List<RecommendedProductItem> products = message.recommendedProducts() == null
? Collections.emptyList()
: message.recommendedProducts().stream()
.map(p -> new RecommendedProductItem(
p.rank(),
p.productId(),
p.productName(),
p.productType(),
p.productPrice(),
p.salePrice(),
p.tags() != null ? p.tags() : Collections.emptyList(),
p.reason()
))
.toList();

String cachedText = message.cachedLlmRecommendation() != null ? message.cachedLlmRecommendation() : "";

PersonaRecommendation saved = personaRecommendationRepository
.findById(message.memberId())
.map(entity -> {
entity.updateRecommendation(message.segment(), cachedText, products);
return personaRecommendationRepository.save(entity);
})
.orElseGet(() -> personaRecommendationRepository.save(
PersonaRecommendation.builder()
.memberId(message.memberId())
.segment(message.segment())
.cachedLlmRecommendation(cachedText)
.recommendedProducts(products)
.build()));

String outcome = "stored_without_waiter";
CompletableFuture<RecommendationResult> future = pendingFutureRegistry.remove(message.memberId());
if (future != null) {
future.complete(RecommendationResult.fromEntity(saved, RecommendationResult.RecommendationSource.FASTAPI));
outcome = "completed_pending";
}

return new RecommendationKafkaConsumeResult(
message.memberId(),
message.segment().name(),
products.size(),
saved.getUpdatedAt(),
outcome
);
}

public void completeExceptionally(Long memberId, Exception e) {
if (memberId == null) {
return;
}
CompletableFuture<RecommendationResult> future = pendingFutureRegistry.remove(memberId);
if (future != null) {
future.completeExceptionally(e);
}
}

public record RecommendationKafkaConsumeResult(
Long memberId,
String segment,
int productCount,
Instant updatedAt,
String outcome
) {}
}
Original file line number Diff line number Diff line change
@@ -1,11 +1,8 @@
package site.holliverse.customer.application.usecase.recommendation;

import io.micrometer.core.instrument.Gauge;
import io.micrometer.core.instrument.MeterRegistry;
import org.springframework.context.annotation.Profile;
import org.springframework.stereotype.Component;

import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

Expand All @@ -19,12 +16,6 @@ public class RecommendationPendingFutureRegistry {

private final ConcurrentHashMap<Long, CompletableFuture<RecommendationResult>> pending = new ConcurrentHashMap<>();

public RecommendationPendingFutureRegistry(MeterRegistry meterRegistry) {
Gauge.builder("holliverse.recommendation.pending.size", pending, Map::size)
.description("Current number of pending recommendation futures")
.register(meterRegistry);
}

/**
* ν•΄λ‹Ή νšŒμ›μ— λŒ€ν•œ λŒ€κΈ° Futureλ₯Ό λ°˜ν™˜ν•˜κ±°λ‚˜, μ—†μœΌλ©΄ μƒˆλ‘œ 생성해 등둝 ν›„ λ°˜ν™˜.
*/
Expand Down
Loading
Loading