-
Notifications
You must be signed in to change notification settings - Fork 0
[UPLUS-143] 알림 이벤트 Consumer 로직 수정 #6
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,98 @@ | ||
| package com.project.notification.consumer; | ||
|
|
||
| import java.util.ArrayList; | ||
| import java.util.List; | ||
| import java.util.concurrent.TimeUnit; | ||
|
|
||
| import org.apache.kafka.clients.consumer.ConsumerRecord; | ||
| import org.springframework.kafka.annotation.KafkaListener; | ||
| import org.springframework.kafka.support.Acknowledgment; | ||
| import org.springframework.stereotype.Component; | ||
|
|
||
| import com.fasterxml.jackson.databind.ObjectMapper; | ||
| import com.project.notification.service.MessageSendService; | ||
|
|
||
| import io.micrometer.core.instrument.Counter; | ||
| import io.micrometer.core.instrument.Timer; | ||
|
|
||
| import lombok.RequiredArgsConstructor; | ||
| import lombok.extern.slf4j.Slf4j; | ||
|
|
||
| @Slf4j | ||
| @Component | ||
| @RequiredArgsConstructor | ||
| public class InvoiceConsumer { | ||
|
|
||
| private final NotificationSendDedupService dedupService; | ||
| private final ObjectMapper objectMapper; | ||
| private final MessageSendService messageSendService; | ||
| private final Counter kafkaBatchProcessedCounter; | ||
| private final Timer kafkaBatchProcessingTimer; | ||
|
|
||
| @KafkaListener( | ||
| topics = "invoice", | ||
| containerFactory = "kafkaListenerContainerFactory", | ||
| autoStartup = "true") | ||
| public void consume(List<ConsumerRecord<String, String>> records, Acknowledgment ack) { | ||
|
|
||
| final String threadName = Thread.currentThread().getName(); | ||
| final int batchSize = records.size(); | ||
|
|
||
| log.info("[BATCH START] thread={}, records={}", threadName, batchSize); | ||
|
|
||
| long batchStart = System.currentTimeMillis(); | ||
|
|
||
| List<NotificationEvent> events = new ArrayList<>(batchSize); | ||
| List<String> eventIds = new ArrayList<>(batchSize); | ||
|
|
||
| for (ConsumerRecord<String, String> record : records) { | ||
| try { | ||
| NotificationEvent event = | ||
| objectMapper.readValue(record.value(), NotificationEvent.class); | ||
| events.add(event); | ||
| eventIds.add(event.eventId().toString()); | ||
| } catch (Exception e) { | ||
| log.warn("[DESERIALIZE FAIL] offset={}", record.offset(), e); | ||
| } | ||
| } | ||
|
|
||
| List<Boolean> dedupResults = dedupService.tryAcquireBatch(eventIds); | ||
|
|
||
| int processed = 0; | ||
| int skipped = 0; | ||
|
|
||
| for (int i = 0; i < events.size(); i++) { | ||
| if (!dedupResults.get(i)) { | ||
| skipped++; | ||
| continue; | ||
| } | ||
|
|
||
| try { | ||
| NotificationEvent event = events.get(i); | ||
| messageSendService.processEvent(event); | ||
| processed++; | ||
| } catch (Exception e) { | ||
| log.error("[PROCESS FAIL] eventId={}", eventIds.get(i), e); | ||
| } | ||
| } | ||
|
|
||
| ack.acknowledge(); | ||
|
|
||
| long elapsedMs = System.currentTimeMillis() - batchStart; | ||
| double tps = elapsedMs > 0 ? processed / (elapsedMs / 1000.0) : 0; | ||
|
|
||
| // Record metrics | ||
| kafkaBatchProcessedCounter.increment(processed); | ||
| kafkaBatchProcessingTimer.record(elapsedMs, TimeUnit.MILLISECONDS); | ||
|
|
||
| log.info( | ||
| "[BATCH DONE] thread={}, batchSize={}, processed={}, skipped={}, elapsed={}ms," | ||
| + " tps={}", | ||
| threadName, | ||
| batchSize, | ||
| processed, | ||
| skipped, | ||
| elapsedMs, | ||
| String.format("%.0f", tps)); | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -8,9 +8,11 @@ | |
| import org.springframework.stereotype.Service; | ||
|
|
||
| import lombok.RequiredArgsConstructor; | ||
| import lombok.extern.slf4j.Slf4j; | ||
|
|
||
| @Service | ||
| @RequiredArgsConstructor | ||
| @Slf4j | ||
| public class NotificationSendDedupService { | ||
|
|
||
| private final StringRedisTemplate redisTemplate; | ||
|
|
@@ -26,6 +28,7 @@ public boolean tryAcquire(String eventId) { | |
|
|
||
| public List<Boolean> tryAcquireBatch(List<String> eventIds) { | ||
| if (eventIds == null || eventIds.isEmpty()) { | ||
| log.info("evenIds is null or empty"); | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. |
||
| return List.of(); | ||
| } | ||
|
|
||
|
|
||
| Original file line number | Diff line number | Diff line change | ||||||||
|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -12,7 +12,7 @@ | |||||||||
|
|
||||||||||
| import com.project.global.util.AesUtil; | ||||||||||
| import com.project.global.util.MaskingUtil; | ||||||||||
| import com.project.notification.consumer.UsageNotificationEvent; | ||||||||||
| import com.project.notification.consumer.NotificationEvent; | ||||||||||
| import com.project.notification.dto.EmailSendRequest; | ||||||||||
| import com.project.notification.dto.SendResponse; | ||||||||||
| import com.project.notification.dto.SmsSendRequest; | ||||||||||
|
|
@@ -56,10 +56,15 @@ public class MessageSendService { | |||||||||
| private final Timer smsProcessingTimer; | ||||||||||
|
|
||||||||||
| // variables 중 암호화된 필드 키 목록 | ||||||||||
| private static final Set<String> ENCRYPTED_KEYS = Set.of("phone_number", "email"); | ||||||||||
| private static final Set<String> ENCRYPTED_KEYS = Set.of("phoneNumber", "email"); | ||||||||||
|
|
||||||||||
| @Transactional | ||||||||||
| public void processEvent(UsageNotificationEvent event) { | ||||||||||
| public void processEvent(NotificationEvent event) { | ||||||||||
|
|
||||||||||
| if (event.subscriptionInfo().subId() != 1001 && event.subscriptionInfo().subId() != 1002) { | ||||||||||
| return; | ||||||||||
| } | ||||||||||
|
Comment on lines
+64
to
+66
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
만약 특정 구독자만 처리하기 위한 로직이라면, 이 ID 목록을 외부 설정(예: |
||||||||||
|
|
||||||||||
| long startTime = System.currentTimeMillis(); | ||||||||||
|
|
||||||||||
| Map<String, Object> maskedVariables = prepareVariablesForSending(event.variables()); | ||||||||||
|
|
@@ -94,6 +99,8 @@ private Map<String, Object> prepareVariablesForSending(Map<String, Object> rawVa | |||||||||
| processed.forEach( | ||||||||||
| (key, value) -> { | ||||||||||
| if (ENCRYPTED_KEYS.contains(key)) { | ||||||||||
| log.info("Encrypted key: {}", key); | ||||||||||
| log.info("Encrypted value: {}", value); | ||||||||||
|
Comment on lines
+102
to
+103
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 암호화된 키와 값을 이 로그는 디버깅 목적으로 추가된 것으로 보입니다. 디버깅이 필요한 경우에만 볼 수 있도록 로그 레벨을
Suggested change
|
||||||||||
| try { | ||||||||||
| // 복호화 시도 | ||||||||||
| String decrypted = aesUtil.decrypt(value.toString()); | ||||||||||
|
|
@@ -110,9 +117,9 @@ private Map<String, Object> prepareVariablesForSending(Map<String, Object> rawVa | |||||||||
| } | ||||||||||
|
|
||||||||||
| private boolean tryEmailSend( | ||||||||||
| UsageNotificationEvent event, Map<String, Object> maskedVariables, long startTime) { | ||||||||||
| NotificationEvent event, Map<String, Object> maskedVariables, long startTime) { | ||||||||||
| long emailStartTime = System.currentTimeMillis(); | ||||||||||
| UsageNotificationEvent.SubscriptionInfo subInfo = event.subscriptionInfo(); | ||||||||||
| NotificationEvent.SubscriptionInfo subInfo = event.subscriptionInfo(); | ||||||||||
|
|
||||||||||
| String email = aesUtil.decrypt(subInfo.email()); | ||||||||||
| if (email == null || email.isBlank()) { | ||||||||||
|
|
@@ -222,9 +229,9 @@ private boolean tryEmailSend( | |||||||||
| } | ||||||||||
|
|
||||||||||
| private void trySmsSend( | ||||||||||
| UsageNotificationEvent event, boolean isFallback, Map<String, Object> maskedVariables) { | ||||||||||
| NotificationEvent event, boolean isFallback, Map<String, Object> maskedVariables) { | ||||||||||
| long smsStartTime = System.currentTimeMillis(); | ||||||||||
| UsageNotificationEvent.SubscriptionInfo subInfo = event.subscriptionInfo(); | ||||||||||
| NotificationEvent.SubscriptionInfo subInfo = event.subscriptionInfo(); | ||||||||||
|
|
||||||||||
| String phoneNumber = aesUtil.decrypt(subInfo.phoneNumber()); | ||||||||||
| if (phoneNumber == null || phoneNumber.isBlank()) { | ||||||||||
|
|
@@ -321,15 +328,15 @@ private void trySmsSend( | |||||||||
| } | ||||||||||
|
|
||||||||||
| private void saveMessageLog( | ||||||||||
| UsageNotificationEvent event, | ||||||||||
| NotificationEvent event, | ||||||||||
| Map<String, Object> maskedVariables, | ||||||||||
| Long templateVersionId, | ||||||||||
| Channel channel, | ||||||||||
| MessageStatus status, | ||||||||||
| String errorMessage, | ||||||||||
| Long processingTimeMs) { | ||||||||||
|
|
||||||||||
| UsageNotificationEvent.SubscriptionInfo subInfo = event.subscriptionInfo(); | ||||||||||
| NotificationEvent.SubscriptionInfo subInfo = event.subscriptionInfo(); | ||||||||||
| String recipientEnc = channel == Channel.EMAIL ? subInfo.email() : subInfo.phoneNumber(); | ||||||||||
|
|
||||||||||
| Map<String, Object> payload = new HashMap<>(); | ||||||||||
|
|
||||||||||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
새로 추가된
InvoiceConsumer의 로직이NotificationConsumer의 로직과 거의 동일합니다. 유일한 차이점은@KafkaListener의topics와autoStartup속성뿐입니다. 이렇게 코드가 중복되면 향후 로직 변경 시 두 군데를 모두 수정해야 하므로 유지보수가 어려워집니다.공통 로직을 처리하는 추상 클래스를 만들고,
InvoiceConsumer와NotificationConsumer가 이를 상속받아@KafkaListener어노테이션만 각자 정의하도록 리팩토링하는 것을 강력히 권장합니다. 이렇게 하면 코드 중복을 제거하고 유지보수성을 높일 수 있습니다.