Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,23 +1,49 @@
package site.holliverse.customer.application.usecase.log;

import lombok.extern.slf4j.Slf4j;
import lombok.RequiredArgsConstructor;
import org.springframework.context.annotation.Profile;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
import site.holliverse.customer.integration.external.AdminLogFeaturesClient;
import site.holliverse.customer.persistence.entity.UserLogAdminDispatchOutbox;

/**
* admin log-feature 별도 executor로 분리
*/
@Slf4j
@Service
@Profile("customer")
@RequiredArgsConstructor
public class AdminLogFeatureDispatchService {

private final AdminLogFeaturesClient adminLogFeaturesClient;
private final UserLogAdminDispatchOutboxStateService stateService;

@Async("adminLogFeatureTaskExecutor")
public void dispatch(Long memberId, UserLogEventName eventName, String timestamp) {
adminLogFeaturesClient.sendLogFeature(memberId, eventName, timestamp);
public void dispatch(Long eventId) {
stateService.get(eventId).ifPresentOrElse(
this::dispatchClaimedRow,
() -> log.warn("[UserLog][Outbox] claimed row not found eventId={}", eventId)
);
}

private void dispatchClaimedRow(UserLogAdminDispatchOutbox row) {
try {
var result = adminLogFeaturesClient.sendLogFeature(
row.getMemberId(),
UserLogEventName.from(row.getEventName()),
row.getEventTimestamp().toString()
);

if (result.success()) {
stateService.markAcked(row.getEventId());
return;
}

stateService.markRetry(row.getEventId(), result.errorMessage());
} catch (Exception e) {
stateService.markRetry(row.getEventId(), e.getClass().getSimpleName() + ":" + e.getMessage());
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
package site.holliverse.customer.application.usecase.log;

import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Profile;
import org.springframework.core.task.TaskRejectedException;
import org.springframework.dao.DataIntegrityViolationException;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import site.holliverse.customer.persistence.entity.UserLogAdminDispatchOutbox;
import site.holliverse.customer.persistence.entity.UserLogDispatchStatus;
import site.holliverse.customer.persistence.repository.UserLogAdminDispatchOutboxRepository;
import site.holliverse.customer.web.dto.log.UserLogRequest;
import site.holliverse.shared.monitoring.CustomerMetrics;

import java.time.Instant;
import java.util.List;

@Slf4j
@Service
@Profile("customer")
@RequiredArgsConstructor
public class UserLogAdminDispatchOutboxService {

private final UserLogAdminDispatchOutboxRepository repository;
private final UserLogAdminDispatchOutboxStateService stateService;
private final AdminLogFeatureDispatchService dispatchService;
private final ObjectMapper objectMapper;
private final CustomerMetrics customerMetrics;

@Transactional
public void enqueue(Long eventId, Long memberId, UserLogEventName eventName, UserLogRequest request) {
try {
UserLogPayload payload = new UserLogPayload(
eventId,
request.timestamp(),
request.event(),
eventName.value(),
memberId,
request.eventProperties()
);

repository.saveAndFlush(UserLogAdminDispatchOutbox.builder()
.eventId(eventId)
.memberId(memberId)
.eventName(eventName.value())
.eventType(request.event())
.eventTimestamp(Instant.parse(request.timestamp()))
.payload(objectMapper.valueToTree(payload))
.status(UserLogDispatchStatus.READY)
.attemptCount(0)
.build());
customerMetrics.recordAdminLogFeatureOutbox("stored");
} catch (DataIntegrityViolationException e) {
customerMetrics.recordAdminLogFeatureOutbox("duplicate");
} catch (Exception e) {
customerMetrics.recordAdminLogFeatureOutbox("store_error");
}
Comment on lines +57 to +59
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

모든 예외(Exception)를 catch하면서 메트릭만 기록하고 에러 로그를 남기지 않고 있습니다. DB 연결 오류나 직렬화 실패 등 예기치 못한 장애 발생 시 원인 파악이 매우 어려울 수 있습니다. 최소한 에러 메시지와 스택 트레이스를 로깅하도록 개선이 필요합니다.

        } catch (Exception e) {
            log.error("[UserLog][Outbox] Failed to enqueue admin dispatch outbox eventId={}", eventId, e);
            customerMetrics.recordAdminLogFeatureOutbox("store_error");
        }

}

public void dispatchReadyBatch(int batchSize) {
List<Long> claimedIds = stateService.claimReadyBatch(batchSize);
for (Long eventId : claimedIds) {
try {
dispatchService.dispatch(eventId);
customerMetrics.recordAdminLogFeatureDispatch("enqueued");
} catch (TaskRejectedException e) {
customerMetrics.recordAdminLogFeatureDispatch("rejected");
stateService.markRetry(eventId, "dispatch_rejected: " + e.getClass().getSimpleName());
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
package site.holliverse.customer.application.usecase.log;

import lombok.RequiredArgsConstructor;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Profile;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;
import site.holliverse.customer.persistence.entity.UserLogAdminDispatchOutbox;
import site.holliverse.customer.persistence.repository.UserLogAdminDispatchOutboxRepository;
import site.holliverse.shared.monitoring.CustomerMetrics;

import java.time.Instant;
import java.util.List;
import java.util.Optional;

@Service
@Profile("customer")
@RequiredArgsConstructor
public class UserLogAdminDispatchOutboxStateService {

private final UserLogAdminDispatchOutboxRepository repository;
private final CustomerMetrics customerMetrics;

@Value("${app.userlog.admin-dispatch.retry-delay-ms:10000}")
private long retryDelayMs;

@Value("${app.userlog.admin-dispatch.max-attempts:5}")
private int maxAttempts;

@Transactional
public List<Long> claimReadyBatch(int batchSize) {
List<Long> ids = repository.findReadyEventIdsForUpdate(batchSize);
if (ids.isEmpty()) {
return ids;
}

List<UserLogAdminDispatchOutbox> rows = repository.findAllById(ids);
rows.forEach(UserLogAdminDispatchOutbox::markProcessing);
repository.saveAll(rows);
Comment on lines +33 to +40
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

현재 claimReadyBatch 로직은 ID 조회 후 다시 전체 엔티티를 조회(findAllById)하고 상태를 변경하여 저장하는 3단계 DB 라운드트립이 발생합니다. 성능 최적화를 위해 네이티브 쿼리에서 엔티티 리스트를 직접 반환받거나, 벌크 업데이트 쿼리를 사용하는 방안을 권장합니다.

return ids;
}

@Transactional(readOnly = true)
public Optional<UserLogAdminDispatchOutbox> get(Long eventId) {
return repository.findById(eventId);
}

@Transactional(propagation = Propagation.REQUIRES_NEW)
public void markAcked(Long eventId) {
repository.findById(eventId).ifPresent(row -> {
row.markAcked();
repository.save(row);
customerMetrics.recordAdminLogFeatureOutbox("acked");
});
}

@Transactional(propagation = Propagation.REQUIRES_NEW)
public void markRetry(Long eventId, String errorMessage) {
repository.findById(eventId).ifPresent(row -> {
if (row.getAttemptCount() + 1 >= maxAttempts) {
row.markDead(errorMessage);
customerMetrics.recordAdminLogFeatureOutbox("dead");
} else {
row.markRetry(errorMessage, Instant.now().plusMillis(retryDelayMs));
customerMetrics.recordAdminLogFeatureOutbox("retry");
}
repository.save(row);
});
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package site.holliverse.customer.application.usecase.log;

import lombok.RequiredArgsConstructor;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Profile;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;

@Service
@Profile("customer")
@RequiredArgsConstructor
public class UserLogAdminDispatchScheduler {

private final UserLogAdminDispatchOutboxService outboxService;

@Value("${app.userlog.admin-dispatch.batch-size:100}")
private int batchSize;

@Scheduled(fixedDelayString = "${app.userlog.admin-dispatch.fixed-delay-ms:1000}")
public void dispatch() {
outboxService.dispatchReadyBatch(batchSize);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
import org.springframework.context.annotation.Profile;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.scheduling.annotation.Async;
import org.springframework.core.task.TaskRejectedException;
import org.springframework.stereotype.Service;

import com.github.f4b6a3.tsid.Tsid;
Expand All @@ -27,7 +26,7 @@ public class UserLogService {

private final KafkaTemplate<String, String> kafkaTemplate;
private final ObjectMapper objectMapper;
private final AdminLogFeatureDispatchService adminLogFeatureDispatchService;
private final UserLogAdminDispatchOutboxService userLogAdminDispatchOutboxService;
private final CustomerMetrics customerMetrics;

@Value("${app.topic.client-events}")
Expand Down Expand Up @@ -122,15 +121,15 @@ private void sendAdminTarget(Long memberId, UserLogRequest request) {
}

try {
adminLogFeatureDispatchService.dispatch(
long eventId = decodeTsidToLong(request.tsid());
userLogAdminDispatchOutboxService.enqueue(
eventId,
memberId,
eventName,
request.timestamp()
request
);
customerMetrics.recordAdminLogFeatureDispatch("enqueued");
} catch (TaskRejectedException e) {
log.warn("[UserLog] Admin log-feature dispatch rejected. memberId={}, eventName={}", memberId, eventName);
customerMetrics.recordAdminLogFeatureDispatch("rejected");
} catch (IllegalArgumentException e) {
customerMetrics.recordAdminLogFeatureOutbox("invalid_tsid");
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,12 @@ public AdminLogFeaturesClient(

/**
* log-features API 호출. baseUrl이 비어 있으면 호출하지 않음(no-op).
* 실패 시 로깅만 하고 예외 전파하지 않음(배치 처리 방해 방지).
* 실패 시 결과 객체로 반환하고 예외 전파하지 않음(재시도 경로 위임).
*/
public void sendLogFeature(long memberId, UserLogEventName eventType, String timeStamp) {
public DispatchResult sendLogFeature(long memberId, UserLogEventName eventType, String timeStamp) {
if (baseUrl == null || baseUrl.isBlank()) {
return DispatchResult.ok();
}

String path = logFeaturesPath;
String url = baseUrl + path;
Expand All @@ -58,13 +61,25 @@ public void sendLogFeature(long memberId, UserLogEventName eventType, String tim
customerMetrics.stopAdminLogFeatureDuration(timerSample, "non_2xx");
log.warn("[AdminLogFeatures] POST {} memberId={} eventType={} status={}",
url, memberId, eventType.value(), response.getStatusCode());
return;
return DispatchResult.fail("non_2xx:" + response.getStatusCode().value());
}
customerMetrics.stopAdminLogFeatureDuration(timerSample, "success");
return DispatchResult.ok();
} catch (RestClientException e) {
customerMetrics.stopAdminLogFeatureDuration(timerSample, "error");
log.warn("[AdminLogFeatures] POST {} memberId={} eventType={} failed",
url, memberId, eventType.value(), e);
return DispatchResult.fail(e.getClass().getSimpleName() + ":" + e.getMessage());
}
}

public record DispatchResult(boolean success, String errorMessage) {
public static DispatchResult ok() {
return new DispatchResult(true, null);
}

public static DispatchResult fail(String errorMessage) {
return new DispatchResult(false, errorMessage);
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
package site.holliverse.customer.persistence.entity;

import com.fasterxml.jackson.databind.JsonNode;
import jakarta.persistence.Column;
import jakarta.persistence.Entity;
import jakarta.persistence.EnumType;
import jakarta.persistence.Enumerated;
import jakarta.persistence.Id;
import jakarta.persistence.Table;
import lombok.AccessLevel;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Getter;
import lombok.NoArgsConstructor;
import org.hibernate.annotations.CreationTimestamp;
import org.hibernate.annotations.JdbcTypeCode;
import org.hibernate.annotations.UpdateTimestamp;
import org.hibernate.type.SqlTypes;
import site.holliverse.shared.persistence.BaseEntity;

import java.time.Instant;

@Entity
@Table(name = "user_log_admin_dispatch_outbox")
@Getter
@Builder
@AllArgsConstructor(access = AccessLevel.PRIVATE)
@NoArgsConstructor(access = AccessLevel.PROTECTED)
public class UserLogAdminDispatchOutbox extends BaseEntity {

@Id
@Column(name = "event_id", nullable = false)
private Long eventId;

@Column(name = "member_id", nullable = false)
private Long memberId;

@Column(name = "event_name", nullable = false, length = 200)
private String eventName;

@Column(name = "event_type", nullable = false, length = 100)
private String eventType;

@Column(name = "event_timestamp", nullable = false)
private Instant eventTimestamp;

@JdbcTypeCode(SqlTypes.JSON)
@Column(name = "payload", nullable = false, columnDefinition = "jsonb")
private JsonNode payload;

@Enumerated(EnumType.STRING)
@Column(name = "status", nullable = false, length = 20)
private UserLogDispatchStatus status;

@Column(name = "attempt_count", nullable = false)
private int attemptCount;

@Column(name = "next_retry_at")
private Instant nextRetryAt;

@Column(name = "last_error", columnDefinition = "TEXT")
private String lastError;

public void markProcessing() {
this.status = UserLogDispatchStatus.PROCESSING;
this.lastError = null;
}

public void markAcked() {
this.status = UserLogDispatchStatus.ACKED;
this.nextRetryAt = null;
this.lastError = null;
}

public void markRetry(String errorMessage, Instant nextRetryAt) {
this.status = UserLogDispatchStatus.RETRY;
this.attemptCount += 1;
this.nextRetryAt = nextRetryAt;
this.lastError = errorMessage;
}

public void markDead(String errorMessage) {
this.status = UserLogDispatchStatus.DEAD;
this.attemptCount += 1;
this.nextRetryAt = null;
this.lastError = errorMessage;
}
}
Loading
Loading