-
Notifications
You must be signed in to change notification settings - Fork 0
[UPLUS-134] Kafka Consumer 성능 개선 및 로직 구현 #3
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
070fb54
502b624
6eafcc6
7fdd483
b8dbf34
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||||||
|---|---|---|---|---|---|---|---|---|---|---|
| @@ -0,0 +1,21 @@ | ||||||||||
| package com.project.global.config; | ||||||||||
|
|
||||||||||
| import java.util.List; | ||||||||||
|
|
||||||||||
| import org.springframework.context.annotation.Bean; | ||||||||||
| import org.springframework.context.annotation.Configuration; | ||||||||||
| import org.springframework.core.io.ClassPathResource; | ||||||||||
| import org.springframework.data.redis.core.script.DefaultRedisScript; | ||||||||||
| import org.springframework.data.redis.core.script.RedisScript; | ||||||||||
|
|
||||||||||
| @Configuration | ||||||||||
| public class RedisLuaConfig { | ||||||||||
|
|
||||||||||
| @Bean | ||||||||||
| public RedisScript<List> dedupBatchScript() { | ||||||||||
| DefaultRedisScript<List> script = new DefaultRedisScript<>(); | ||||||||||
|
Comment on lines
+15
to
+16
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||||||
| script.setLocation(new ClassPathResource("redis/dedup_batch.lua")); | ||||||||||
| script.setResultType(List.class); | ||||||||||
| return script; | ||||||||||
| } | ||||||||||
| } | ||||||||||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,10 +1,12 @@ | ||
| package com.project.notification.consumer; | ||
|
|
||
| import java.time.LocalDateTime; | ||
| import java.time.ZoneId; | ||
| import java.util.ArrayList; | ||
| import java.util.List; | ||
| import java.util.Map; | ||
|
|
||
| import org.apache.kafka.clients.consumer.ConsumerRecord; | ||
| import org.springframework.context.annotation.Profile; | ||
| import org.springframework.kafka.annotation.KafkaListener; | ||
| import org.springframework.kafka.support.Acknowledgment; | ||
| import org.springframework.stereotype.Component; | ||
|
|
@@ -24,34 +26,65 @@ public class NotificationConsumer { | |
| private final ObjectMapper objectMapper; | ||
| private final UsageNotificationMessageFormatter formatter; | ||
|
|
||
| @KafkaListener( | ||
| topics = "notification-usage", | ||
| containerFactory = "kafkaListenerContainerFactory") | ||
| @Profile("notification-worker") | ||
| public void consume(ConsumerRecord<String, String> record, Acknowledgment ack) { | ||
| log.info("CONSUME START offset={}, value={}", record.offset(), record.value()); | ||
| @KafkaListener(topics = "usage-noti", containerFactory = "kafkaListenerContainerFactory") | ||
| public void consume(List<ConsumerRecord<String, String>> records, Acknowledgment ack) { | ||
|
|
||
| try { | ||
| UsageNotificationEvent event = | ||
| objectMapper.readValue(record.value(), UsageNotificationEvent.class); | ||
| final String threadName = Thread.currentThread().getName(); | ||
| final int batchSize = records.size(); | ||
|
|
||
| log.info("[BATCH START] thread={}, records={}", threadName, batchSize); | ||
|
|
||
| long batchStart = System.currentTimeMillis(); | ||
|
|
||
| String eventId = event.eventId().toString(); | ||
| // 1️⃣ eventId 수집 | ||
| List<UsageNotificationEvent> events = new ArrayList<>(batchSize); | ||
| List<String> eventIds = new ArrayList<>(batchSize); | ||
|
|
||
| if (!dedupService.tryAcquire(eventId)) { | ||
| log.info("[SKIP] duplicated eventId={}", eventId); | ||
| ack.acknowledge(); | ||
| return; | ||
| for (ConsumerRecord<String, String> record : records) { | ||
| try { | ||
| UsageNotificationEvent event = | ||
| objectMapper.readValue(record.value(), UsageNotificationEvent.class); | ||
| events.add(event); | ||
| eventIds.add(event.eventId().toString()); | ||
| } catch (Exception e) { | ||
| log.warn("[DESERIALIZE FAIL] offset={}", record.offset(), e); | ||
| } | ||
|
Comment on lines
+49
to
51
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. |
||
| } | ||
|
|
||
| String format = formatter.format(event, LocalDateTime.now()); | ||
| // redis Lua dedup (단 1회 호출) | ||
| List<Boolean> dedupResults = dedupService.tryAcquireBatch(eventIds); | ||
|
|
||
| SendNotificationLogger.write(format); | ||
| int processed = 0; | ||
| int skipped = 0; | ||
|
|
||
| ack.acknowledge(); | ||
| } catch (Exception e) { | ||
| log.error("[CONSUME FAIL]", e); | ||
| ack.acknowledge(); // 지금 구조상 스킵이 맞음 | ||
| // 3️⃣ 결과 기반 처리 | ||
| for (int i = 0; i < events.size(); i++) { | ||
| if (!dedupResults.get(i)) { | ||
| skipped++; | ||
| continue; | ||
| } | ||
|
|
||
| UsageNotificationEvent event = events.get(i); | ||
| String message = formatter.format(event, LocalDateTime.now(ZoneId.of("Asia/Seoul"))); | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. |
||
|
|
||
| SendNotificationLogger.write(message); | ||
| processed++; | ||
| } | ||
|
|
||
| ack.acknowledge(); | ||
|
|
||
| long elapsedMs = System.currentTimeMillis() - batchStart; | ||
| double tps = processed / (elapsedMs / 1000.0); | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. |
||
|
|
||
| log.info( | ||
| "[BATCH DONE] thread={}, batchSize={}, processed={}, skipped={}, elapsed={}ms," | ||
| + " tps={}", | ||
| threadName, | ||
| batchSize, | ||
| processed, | ||
| skipped, | ||
| elapsedMs, | ||
| String.format("%.0f", tps)); | ||
| } | ||
|
|
||
| @SuppressWarnings("unchecked") | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,21 +1,34 @@ | ||
| package com.project.notification.consumer; | ||
|
|
||
| import java.time.Duration; | ||
| import java.util.List; | ||
|
|
||
| import org.springframework.data.redis.core.StringRedisTemplate; | ||
| import org.springframework.data.redis.core.script.RedisScript; | ||
| import org.springframework.stereotype.Service; | ||
|
|
||
| import lombok.RequiredArgsConstructor; | ||
|
|
||
| @Service | ||
| @RequiredArgsConstructor | ||
| public class NotificationSendDedupService { | ||
|
|
||
| private final StringRedisTemplate redisTemplate; | ||
|
|
||
| private static final Duration TTL = Duration.ofDays(7); | ||
|
|
||
| public boolean tryAcquire(String eventId) { | ||
| Boolean success = | ||
| redisTemplate.opsForValue().setIfAbsent("notification:event:" + eventId, "1", TTL); | ||
| return Boolean.TRUE.equals(success); | ||
| private final RedisScript<List> dedupBatchScript; | ||
|
|
||
| public List<Boolean> tryAcquireBatch(List<String> eventIds) { | ||
|
|
||
| List<String> keys = eventIds.stream().map(id -> "notification:event:" + id).toList(); | ||
|
|
||
| @SuppressWarnings("unchecked") | ||
| List<Long> results = | ||
| (List<Long>) | ||
| redisTemplate.execute( | ||
| dedupBatchScript, keys, String.valueOf(TTL.toSeconds())); | ||
|
|
||
| return results.stream().map(v -> v == 1L).toList(); | ||
| } | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,14 @@ | ||
| -- redis/dedup_batch.lua | ||
| local ttl = tonumber(ARGV[1]) | ||
| local results = {} | ||
|
|
||
| for i, key in ipairs(KEYS) do | ||
| if redis.call('SETNX', key, 1) == 1 then | ||
| redis.call('EXPIRE', key, ttl) | ||
| results[i] = 1 | ||
| else | ||
| results[i] = 0 | ||
| end | ||
|
Comment on lines
+6
to
+11
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
| end | ||
|
|
||
| return results | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Kafka 컨슈머의 동시성(concurrency)을
6으로 하드코딩하셨습니다. 이 값은 실행 환경에 따라 튜닝이 필요할 수 있으므로,application.yml파일에서 설정으로 관리하는 것이 유지보수 측면에서 더 좋습니다.Environment객체를 사용하여 프로퍼티 값을 주입받는 것을 권장합니다. 예를 들어application.yml에spring.kafka.listener.concurrency: 6과 같이 설정할 수 있습니다.