Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import site.holliverse.admin.application.usecase.HandleLogFeatureUseCase;
import site.holliverse.admin.application.usecase.LogFeaturesUseCase;
import site.holliverse.admin.web.dto.log.LogFeatureWebhookRequest;
import site.holliverse.admin.web.dto.log.LogFeaturesRequestDto;

/**
* 실시간 로그 기반 feature customer -> admin 전송 로직
Expand All @@ -20,6 +22,7 @@
@RequiredArgsConstructor
public class InternalLogFeatureController {
private final HandleLogFeatureUseCase useCase;
private final LogFeaturesUseCase batchUseCase;


@PostMapping
Expand All @@ -29,4 +32,11 @@ public ResponseEntity<Void> receive (@RequestBody @Valid LogFeatureWebhookReques
return ResponseEntity.accepted().build();
}

@PostMapping("/batch")
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

🛑 아키텍처 규칙 위반 (Blocking Issue)

[위반 규칙]: Admin API Path Structure
[문제 이유]: 관리자(Admin) 엔드포인트는 반드시 /api/{version}/admin/... 형식을 따라야 하며, 버전 세그먼트가 'admin'보다 앞에 위치해야 합니다. 현재 설정된 /internal/v1/log-features/batch는 이 표준을 위반하며, 이는 관리자 API를 전용 보안 전략으로 보호하려는 설계와 충돌합니다.
[해결 제안]: 컨트롤러의 @RequestMapping/api/v1/admin/log-features로 변경하여 아키텍처 표준을 준수하십시오.

References
  1. Admin API endpoints should follow the path structure /api/{version}/admin/..., with the version preceding the 'admin' segment.

public ResponseEntity<Void> receiveBatch(@RequestBody @Valid LogFeaturesRequestDto request) {
batchUseCase.execute(request);

return ResponseEntity.accepted().build();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
import site.holliverse.customer.integration.external.AdminLogFeaturesClient;
import site.holliverse.customer.persistence.entity.UserLogAdminDispatchOutbox;

import java.util.List;

/**
* admin log-feature 별도 executor로 분리
*/
Expand All @@ -21,29 +23,37 @@ public class AdminLogFeatureDispatchService {
private final UserLogAdminDispatchOutboxStateService stateService;

@Async("adminLogFeatureTaskExecutor")
public void dispatch(Long eventId) {
stateService.get(eventId).ifPresentOrElse(
this::dispatchClaimedRow,
() -> log.warn("[UserLog][Outbox] claimed row not found eventId={}", eventId)
);
}
public void dispatchBatch(List<UserLogAdminDispatchOutbox> rows) {
if (rows == null || rows.isEmpty()) {
return;
}

List<Long> eventIds = rows.stream()
.map(UserLogAdminDispatchOutbox::getEventId)
.toList();

private void dispatchClaimedRow(UserLogAdminDispatchOutbox row) {
try {
var result = adminLogFeaturesClient.sendLogFeature(
row.getMemberId(),
UserLogEventName.from(row.getEventName()),
row.getEventTimestamp().toString()
var result = adminLogFeaturesClient.sendLogFeaturesBatch(
rows.get(0).getMemberId(),
rows.stream()
.map(row -> new AdminLogFeaturesClient.BatchLogEvent(
row.getEventId(),
row.getEventTimestamp().toString(),
row.getEventType(),
row.getEventName(),
row.getPayload().path("event_properties")
))
.toList()
);

if (result.success()) {
stateService.markAcked(row.getEventId());
stateService.markAcked(eventIds);
return;
}

stateService.markRetry(row.getEventId(), result.errorMessage());
stateService.markRetry(eventIds, result.errorMessage());
} catch (Exception e) {
stateService.markRetry(row.getEventId(), e.getClass().getSimpleName() + ":" + e.getMessage());
stateService.markRetry(eventIds, e.getClass().getSimpleName() + ":" + e.getMessage());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,20 @@

import java.util.ArrayList;
import java.time.Instant;
import java.time.LocalDate;
import java.time.ZoneId;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

@Slf4j
@Service
@Profile("customer")
@RequiredArgsConstructor
public class UserLogAdminDispatchOutboxService {

private static final ZoneId BASE_DATE_ZONE = ZoneId.of("Asia/Seoul");

private final UserLogAdminDispatchOutboxStateService stateService;
private final AdminLogFeatureDispatchService dispatchService;
private final ObjectMapper objectMapper;
Expand Down Expand Up @@ -70,14 +76,17 @@ public void enqueue(Long eventId, Long memberId, UserLogEventName eventName, Use
}

public void dispatchReadyBatch(int batchSize) {
List<Long> claimedIds = stateService.claimReadyBatch(batchSize);
for (Long eventId : claimedIds) {
List<UserLogAdminDispatchOutbox> claimedRows = stateService.claimReadyBatchRows(batchSize);
for (List<UserLogAdminDispatchOutbox> rows : groupDispatchRows(claimedRows).values()) {
List<Long> eventIds = rows.stream()
.map(UserLogAdminDispatchOutbox::getEventId)
.toList();
try {
dispatchService.dispatch(eventId);
customerMetrics.recordAdminLogFeatureDispatch("enqueued");
dispatchService.dispatchBatch(rows);
customerMetrics.recordAdminLogFeatureDispatch("enqueued", "batch");
} catch (TaskRejectedException e) {
customerMetrics.recordAdminLogFeatureDispatch("rejected");
stateService.markRetry(eventId, "dispatch_rejected: " + e.getClass().getSimpleName());
customerMetrics.recordAdminLogFeatureDispatch("rejected", "batch");
stateService.markRetry(eventIds, "dispatch_rejected: " + e.getClass().getSimpleName());
}
}
}
Expand Down Expand Up @@ -121,4 +130,24 @@ private static long decodeTsidToLong(String tsid) {
}
return Tsid.from(tsid).toLong();
}

private Map<DispatchGroupKey, List<UserLogAdminDispatchOutbox>> groupDispatchRows(
List<UserLogAdminDispatchOutbox> rows
) {
Map<DispatchGroupKey, List<UserLogAdminDispatchOutbox>> groups = new LinkedHashMap<>();
for (UserLogAdminDispatchOutbox row : rows) {
DispatchGroupKey key = new DispatchGroupKey(
row.getMemberId(),
row.getEventTimestamp().atZone(BASE_DATE_ZONE).toLocalDate()
);
groups.computeIfAbsent(key, ignored -> new ArrayList<>()).add(row);
}
return groups;
}

private record DispatchGroupKey(
Long memberId,
LocalDate baseDate
) {
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@

import java.time.Instant;
import java.util.List;
import java.util.Optional;

@Slf4j
@Service
Expand Down Expand Up @@ -54,43 +53,48 @@ public void store(UserLogAdminDispatchOutbox row) {
}

@Transactional
public List<Long> claimReadyBatch(int batchSize) {
public List<UserLogAdminDispatchOutbox> claimReadyBatchRows(int batchSize) {
List<Long> ids = repository.findReadyEventIdsForUpdate(batchSize);
if (ids.isEmpty()) {
return ids;
return List.of();
}

List<UserLogAdminDispatchOutbox> rows = repository.findAllById(ids);
rows.forEach(UserLogAdminDispatchOutbox::markProcessing);
repository.saveAll(rows);
return ids;
}

@Transactional(readOnly = true)
public Optional<UserLogAdminDispatchOutbox> get(Long eventId) {
return repository.findById(eventId);
return rows;
}

@Transactional(propagation = Propagation.REQUIRES_NEW)
public void markAcked(Long eventId) {
repository.findById(eventId).ifPresent(row -> {
public void markAcked(List<Long> eventIds) {
if (eventIds == null || eventIds.isEmpty()) {
return;
}

List<UserLogAdminDispatchOutbox> rows = repository.findAllById(eventIds);
rows.forEach(row -> {
row.markAcked();
repository.save(row);
customerMetrics.recordAdminLogFeatureOutbox("acked");
});
repository.saveAll(rows);
}

@Transactional(propagation = Propagation.REQUIRES_NEW)
public void markRetry(Long eventId, String errorMessage) {
repository.findById(eventId).ifPresent(row -> {
public void markRetry(List<Long> eventIds, String errorMessage) {
if (eventIds == null || eventIds.isEmpty()) {
return;
}

List<UserLogAdminDispatchOutbox> rows = repository.findAllById(eventIds);
rows.forEach(row -> {
if (row.getAttemptCount() + 1 >= maxAttempts) {
row.markDead(errorMessage);
customerMetrics.recordAdminLogFeatureOutbox("dead");
} else {
row.markRetry(errorMessage, Instant.now().plusMillis(retryDelayMs));
customerMetrics.recordAdminLogFeatureOutbox("retry");
}
repository.save(row);
});
repository.saveAll(rows);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
public record AdminLogFeaturesProperties(
String baseUrl,
String logFeaturesPath,
String logFeaturesBatchPath,
int connectTimeoutMs,
int readTimeoutMs
) {
Expand All @@ -19,6 +20,9 @@ public record AdminLogFeaturesProperties(
if (logFeaturesPath == null || logFeaturesPath.isBlank()) {
logFeaturesPath = "/internal/v1/log-features";
}
if (logFeaturesBatchPath == null || logFeaturesBatchPath.isBlank()) {
logFeaturesBatchPath = logFeaturesPath + "/batch";
}
}

public boolean isEnabled() {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package site.holliverse.customer.integration.external;

import com.fasterxml.jackson.databind.JsonNode;
import lombok.extern.slf4j.Slf4j;
import org.springframework.http.HttpEntity;
import org.springframework.http.HttpHeaders;
Expand All @@ -11,6 +12,8 @@
import site.holliverse.customer.application.usecase.log.UserLogEventName;
import site.holliverse.shared.monitoring.CustomerMetrics;

import java.util.List;

/**
* Admin API POST /internal/v1/log-features 호출용 클라이언트.
* customer 모듈은 admin 패키지를 의존하지 않고 HTTP만 사용(ArchUnit 준수).
Expand All @@ -21,6 +24,7 @@ public class AdminLogFeaturesClient {
private final RestTemplate restTemplate;
private final String baseUrl;
private final String logFeaturesPath;
private final String logFeaturesBatchPath;
private final CustomerMetrics customerMetrics;

public AdminLogFeaturesClient(
Expand All @@ -32,6 +36,7 @@ public AdminLogFeaturesClient(
String url = properties.baseUrl();
this.baseUrl = url;
this.logFeaturesPath = properties.logFeaturesPath();
this.logFeaturesBatchPath = properties.logFeaturesBatchPath();
this.customerMetrics = customerMetrics;
}

Expand All @@ -44,35 +49,76 @@ public DispatchResult sendLogFeature(long memberId, UserLogEventName eventType,
return DispatchResult.ok();
}

String path = logFeaturesPath;
String url = baseUrl + path;
String url = baseUrl + logFeaturesPath;

LogFeatureRequestBody body = new LogFeatureRequestBody(eventType.value(), memberId, timeStamp);
HttpHeaders headers = new HttpHeaders();

headers.setContentType(MediaType.APPLICATION_JSON);
HttpEntity<LogFeatureRequestBody> entity = new HttpEntity<>(body, headers);
var timerSample = customerMetrics.startSample();

try {
ResponseEntity<String> response = restTemplate.postForEntity(url, entity, String.class);
ResponseEntity<String> response = restTemplate.postForEntity(
url,
jsonEntity(body),
String.class
);

if (!response.getStatusCode().is2xxSuccessful()) {
customerMetrics.stopAdminLogFeatureDuration(timerSample, "non_2xx");
customerMetrics.stopAdminLogFeatureDuration(timerSample, "non_2xx", "single");
log.warn("[AdminLogFeatures] POST {} memberId={} eventType={} status={}",
url, memberId, eventType.value(), response.getStatusCode());
return DispatchResult.fail("non_2xx:" + response.getStatusCode().value());
}
customerMetrics.stopAdminLogFeatureDuration(timerSample, "success");
customerMetrics.stopAdminLogFeatureDuration(timerSample, "success", "single");
return DispatchResult.ok();
} catch (RestClientException e) {
customerMetrics.stopAdminLogFeatureDuration(timerSample, "error");
customerMetrics.stopAdminLogFeatureDuration(timerSample, "error", "single");
log.warn("[AdminLogFeatures] POST {} memberId={} eventType={} failed",
url, memberId, eventType.value(), e);
return DispatchResult.fail(e.getClass().getSimpleName() + ":" + e.getMessage());
}
}

public DispatchResult sendLogFeaturesBatch(long memberId, List<BatchLogEvent> events) {
if (baseUrl == null || baseUrl.isBlank()) {
return DispatchResult.ok();
}
if (events == null || events.isEmpty()) {
return DispatchResult.ok();
}

String url = baseUrl + logFeaturesBatchPath;
customerMetrics.recordAdminLogFeatureBatchSize(events.size());
var timerSample = customerMetrics.startSample();

try {
ResponseEntity<String> response = restTemplate.postForEntity(
url,
jsonEntity(new BatchLogFeatureRequestBody(memberId, events)),
String.class
);

if (!response.getStatusCode().is2xxSuccessful()) {
customerMetrics.stopAdminLogFeatureDuration(timerSample, "non_2xx", "batch");
log.warn("[AdminLogFeatures][Batch] POST {} memberId={} size={} status={}",
url, memberId, events.size(), response.getStatusCode());
return DispatchResult.fail("non_2xx:" + response.getStatusCode().value());
}

customerMetrics.stopAdminLogFeatureDuration(timerSample, "success", "batch");
return DispatchResult.ok();
} catch (RestClientException e) {
customerMetrics.stopAdminLogFeatureDuration(timerSample, "error", "batch");
log.warn("[AdminLogFeatures][Batch] POST {} memberId={} size={} failed",
url, memberId, events.size(), e);
return DispatchResult.fail(e.getClass().getSimpleName() + ":" + e.getMessage());
}
}

private HttpEntity<?> jsonEntity(Object body) {
HttpHeaders headers = new HttpHeaders();
headers.setContentType(MediaType.APPLICATION_JSON);
return new HttpEntity<>(body, headers);
}

public record DispatchResult(boolean success, String errorMessage) {
public static DispatchResult ok() {
return new DispatchResult(true, null);
Expand All @@ -92,4 +138,19 @@ public record LogFeatureRequestBody(
String timeStamp
) {
}

public record BatchLogFeatureRequestBody(
long memberId,
List<BatchLogEvent> events
) {
}

public record BatchLogEvent(
long eventId,
String timestamp,
String event,
String eventName,
JsonNode eventProperties
) {
}
}
Loading
Loading