Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RestController;

import com.project.notification.consumer.UsageNotificationEvent;
import com.project.notification.consumer.NotificationEvent;
import com.project.notification.service.MessageSendService;

import lombok.RequiredArgsConstructor;
Expand All @@ -34,11 +34,11 @@ public String sendTest(@RequestBody Map<String, Object> request) {
Map<String, Object> variables = (Map<String, Object>) request.get("variables");

// 2. 가짜 이벤트 객체(UsageNotificationEvent) 생성
UsageNotificationEvent event =
new UsageNotificationEvent(
NotificationEvent event =
new NotificationEvent(
UUID.randomUUID(), // 임의의 Event ID 생성
templateGroupId,
new UsageNotificationEvent.SubscriptionInfo(subId, phoneNumber, email),
new NotificationEvent.SubscriptionInfo(subId, phoneNumber, email),
variables);

log.info("[TEST TRIGGER] subId={}, groupId={}", subId, templateGroupId);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
package com.project.notification.consumer;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.project.notification.service.MessageSendService;

import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.Timer;

import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;

@Slf4j
@Component
@RequiredArgsConstructor
public class InvoiceConsumer {

private final NotificationSendDedupService dedupService;
private final ObjectMapper objectMapper;
private final MessageSendService messageSendService;
private final Counter kafkaBatchProcessedCounter;
private final Timer kafkaBatchProcessingTimer;

@KafkaListener(
topics = "invoice",
containerFactory = "kafkaListenerContainerFactory",
autoStartup = "true")
public void consume(List<ConsumerRecord<String, String>> records, Acknowledgment ack) {

final String threadName = Thread.currentThread().getName();
final int batchSize = records.size();

log.info("[BATCH START] thread={}, records={}", threadName, batchSize);

long batchStart = System.currentTimeMillis();

List<NotificationEvent> events = new ArrayList<>(batchSize);
List<String> eventIds = new ArrayList<>(batchSize);

for (ConsumerRecord<String, String> record : records) {
try {
NotificationEvent event =
objectMapper.readValue(record.value(), NotificationEvent.class);
events.add(event);
eventIds.add(event.eventId().toString());
} catch (Exception e) {
log.warn("[DESERIALIZE FAIL] offset={}", record.offset(), e);
}
}

List<Boolean> dedupResults = dedupService.tryAcquireBatch(eventIds);

int processed = 0;
int skipped = 0;

for (int i = 0; i < events.size(); i++) {
if (!dedupResults.get(i)) {
skipped++;
continue;
}

try {
NotificationEvent event = events.get(i);
messageSendService.processEvent(event);
processed++;
} catch (Exception e) {
log.error("[PROCESS FAIL] eventId={}", eventIds.get(i), e);
}
}

ack.acknowledge();

long elapsedMs = System.currentTimeMillis() - batchStart;
double tps = elapsedMs > 0 ? processed / (elapsedMs / 1000.0) : 0;

// Record metrics
kafkaBatchProcessedCounter.increment(processed);
kafkaBatchProcessingTimer.record(elapsedMs, TimeUnit.MILLISECONDS);

log.info(
"[BATCH DONE] thread={}, batchSize={}, processed={}, skipped={}, elapsed={}ms,"
+ " tps={}",
threadName,
batchSize,
processed,
skipped,
elapsedMs,
String.format("%.0f", tps));
}
}
Comment on lines +24 to +98
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

새로 추가된 InvoiceConsumer의 로직이 NotificationConsumer의 로직과 거의 동일합니다. 유일한 차이점은 @KafkaListenertopicsautoStartup 속성뿐입니다. 이렇게 코드가 중복되면 향후 로직 변경 시 두 군데를 모두 수정해야 하므로 유지보수가 어려워집니다.

공통 로직을 처리하는 추상 클래스를 만들고, InvoiceConsumerNotificationConsumer가 이를 상속받아 @KafkaListener 어노테이션만 각자 정의하도록 리팩토링하는 것을 강력히 권장합니다. 이렇게 하면 코드 중복을 제거하고 유지보수성을 높일 수 있습니다.

Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,10 @@ public class NotificationConsumer {
private final Counter kafkaBatchProcessedCounter;
private final Timer kafkaBatchProcessingTimer;

@KafkaListener(topics = "usage-noti", containerFactory = "kafkaListenerContainerFactory")
@KafkaListener(
topics = "usage",
containerFactory = "kafkaListenerContainerFactory",
autoStartup = "false")
public void consume(List<ConsumerRecord<String, String>> records, Acknowledgment ack) {

final String threadName = Thread.currentThread().getName();
Expand All @@ -39,13 +42,13 @@ public void consume(List<ConsumerRecord<String, String>> records, Acknowledgment

long batchStart = System.currentTimeMillis();

List<UsageNotificationEvent> events = new ArrayList<>(batchSize);
List<NotificationEvent> events = new ArrayList<>(batchSize);
List<String> eventIds = new ArrayList<>(batchSize);

for (ConsumerRecord<String, String> record : records) {
try {
UsageNotificationEvent event =
objectMapper.readValue(record.value(), UsageNotificationEvent.class);
NotificationEvent event =
objectMapper.readValue(record.value(), NotificationEvent.class);
events.add(event);
eventIds.add(event.eventId().toString());
} catch (Exception e) {
Expand All @@ -65,7 +68,7 @@ public void consume(List<ConsumerRecord<String, String>> records, Acknowledgment
}

try {
UsageNotificationEvent event = events.get(i);
NotificationEvent event = events.get(i);
messageSendService.processEvent(event);
processed++;
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import java.util.Map;
import java.util.UUID;

public record UsageNotificationEvent(
public record NotificationEvent(
UUID eventId,
Long templateGroupId,
SubscriptionInfo subscriptionInfo,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,11 @@
import org.springframework.stereotype.Service;

import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;

@Service
@RequiredArgsConstructor
@Slf4j
public class NotificationSendDedupService {

private final StringRedisTemplate redisTemplate;
Expand All @@ -26,6 +28,7 @@ public boolean tryAcquire(String eventId) {

public List<Boolean> tryAcquireBatch(List<String> eventIds) {
if (eventIds == null || eventIds.isEmpty()) {
log.info("evenIds is null or empty");
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

로그 메시지에 오타가 있습니다. 'evenIds'를 'eventIds'로 수정해야 합니다.

Suggested change
log.info("evenIds is null or empty");
log.info("eventIds is null or empty");

return List.of();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@

import com.project.global.util.AesUtil;
import com.project.global.util.MaskingUtil;
import com.project.notification.consumer.UsageNotificationEvent;
import com.project.notification.consumer.NotificationEvent;
import com.project.notification.dto.EmailSendRequest;
import com.project.notification.dto.SendResponse;
import com.project.notification.dto.SmsSendRequest;
Expand Down Expand Up @@ -56,10 +56,15 @@ public class MessageSendService {
private final Timer smsProcessingTimer;

// variables 중 암호화된 필드 키 목록
private static final Set<String> ENCRYPTED_KEYS = Set.of("phone_number", "email");
private static final Set<String> ENCRYPTED_KEYS = Set.of("phoneNumber", "email");

@Transactional
public void processEvent(UsageNotificationEvent event) {
public void processEvent(NotificationEvent event) {

if (event.subscriptionInfo().subId() != 1001 && event.subscriptionInfo().subId() != 1002) {
return;
}
Comment on lines +64 to +66
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

subId가 1001 또는 1002인 경우에만 이벤트를 처리하도록 하드코딩되어 있습니다. 이러한 '매직 넘버'는 코드의 의도를 파악하기 어렵게 만들고, 향후 요구사항 변경 시 유연하게 대처하기 어렵게 합니다.

만약 특정 구독자만 처리하기 위한 로직이라면, 이 ID 목록을 외부 설정(예: application.yml)으로 분리하고, 코드에서는 설정 값을 읽어 사용하도록 변경하는 것이 좋습니다. 또한, 상수를 사용하여 1001, 1002와 같은 값에 의미를 부여하는 것을 고려해 보세요. 만약 테스트를 위한 임시 코드였다면, 병합 전에 반드시 제거해야 합니다.


long startTime = System.currentTimeMillis();

Map<String, Object> maskedVariables = prepareVariablesForSending(event.variables());
Expand Down Expand Up @@ -94,6 +99,8 @@ private Map<String, Object> prepareVariablesForSending(Map<String, Object> rawVa
processed.forEach(
(key, value) -> {
if (ENCRYPTED_KEYS.contains(key)) {
log.info("Encrypted key: {}", key);
log.info("Encrypted value: {}", value);
Comment on lines +102 to +103
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

암호화된 키와 값을 INFO 레벨로 로깅하고 있습니다. 비록 값이 암호화되어 있더라도, 프로덕션 환경에서 민감할 수 있는 정보를 INFO 레벨로 기록하는 것은 과도한 로그를 유발하고 잠재적인 보안 위험을 초래할 수 있습니다.

이 로그는 디버깅 목적으로 추가된 것으로 보입니다. 디버깅이 필요한 경우에만 볼 수 있도록 로그 레벨을 DEBUGTRACE로 낮추거나, 더 이상 필요하지 않다면 제거하는 것이 좋습니다.

Suggested change
log.info("Encrypted key: {}", key);
log.info("Encrypted value: {}", value);
log.debug("Encrypted key: {}", key);
log.debug("Encrypted value: {}", value);

try {
// 복호화 시도
String decrypted = aesUtil.decrypt(value.toString());
Expand All @@ -110,9 +117,9 @@ private Map<String, Object> prepareVariablesForSending(Map<String, Object> rawVa
}

private boolean tryEmailSend(
UsageNotificationEvent event, Map<String, Object> maskedVariables, long startTime) {
NotificationEvent event, Map<String, Object> maskedVariables, long startTime) {
long emailStartTime = System.currentTimeMillis();
UsageNotificationEvent.SubscriptionInfo subInfo = event.subscriptionInfo();
NotificationEvent.SubscriptionInfo subInfo = event.subscriptionInfo();

String email = aesUtil.decrypt(subInfo.email());
if (email == null || email.isBlank()) {
Expand Down Expand Up @@ -222,9 +229,9 @@ private boolean tryEmailSend(
}

private void trySmsSend(
UsageNotificationEvent event, boolean isFallback, Map<String, Object> maskedVariables) {
NotificationEvent event, boolean isFallback, Map<String, Object> maskedVariables) {
long smsStartTime = System.currentTimeMillis();
UsageNotificationEvent.SubscriptionInfo subInfo = event.subscriptionInfo();
NotificationEvent.SubscriptionInfo subInfo = event.subscriptionInfo();

String phoneNumber = aesUtil.decrypt(subInfo.phoneNumber());
if (phoneNumber == null || phoneNumber.isBlank()) {
Expand Down Expand Up @@ -321,15 +328,15 @@ private void trySmsSend(
}

private void saveMessageLog(
UsageNotificationEvent event,
NotificationEvent event,
Map<String, Object> maskedVariables,
Long templateVersionId,
Channel channel,
MessageStatus status,
String errorMessage,
Long processingTimeMs) {

UsageNotificationEvent.SubscriptionInfo subInfo = event.subscriptionInfo();
NotificationEvent.SubscriptionInfo subInfo = event.subscriptionInfo();
String recipientEnc = channel == Channel.EMAIL ? subInfo.email() : subInfo.phoneNumber();

Map<String, Object> payload = new HashMap<>();
Expand Down
Loading