[UPLUS-134] Kafka Consumer 성능 개선 및 로직 구현#3
Conversation
Summary of ChangesHello @starboxxxx, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed! 이 PR은 Kafka Consumer의 전반적인 성능을 개선하고 알림 처리 로직을 최적화하는 데 중점을 둡니다. 주요 변경 사항은 Kafka 메시지 처리 방식을 단일 메시지에서 배치 처리로 전환하고, Redis Lua 스크립트를 활용하여 중복 제거 로직을 효율적으로 구현하는 것입니다. 이를 통해 알림 시스템의 처리량과 안정성이 크게 향상될 것으로 기대됩니다. Highlights
🧠 New Feature in Public Preview: You can now enable Memory to help Gemini Code Assist learn from your team's feedback. This makes future code reviews more consistent and personalized to your project's style. Click here to enable Memory in your admin console. Using Gemini Code AssistThe full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips. Invoking Gemini You can request assistance from Gemini at any point by creating a comment using either
Customization To customize Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a Limitations & Feedback Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counter productive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for Github and other Google products, sign up here. You can also get AI-powered code generation, chat, as well as code reviews directly in the IDE at no cost with the Gemini Code Assist IDE Extension. Footnotes
|
There was a problem hiding this comment.
Code Review
이 PR은 Kafka Consumer의 성능을 개선하기 위해 배치 처리와 Redis Lua 스크립트를 이용한 중복 제거 로직을 도입했습니다. 전반적으로 성능 개선 목표에 부합하는 좋은 변경이라고 생각합니다. 다만, 몇 가지 잠재적인 문제점과 개선 사항이 있어 리뷰 의견을 남깁니다. 주요 내용으로는 에러 처리 중 데이터 유실 가능성, Redis Lua 스크립트의 원자성 문제, 그리고 설정값 하드코딩 등이 있습니다. 아래의 상세 리뷰를 확인해주세요.
| } catch (Exception e) { | ||
| log.warn("[DESERIALIZE FAIL] offset={}", record.offset(), e); | ||
| } |
There was a problem hiding this comment.
| if redis.call('SETNX', key, 1) == 1 then | ||
| redis.call('EXPIRE', key, ttl) | ||
| results[i] = 1 | ||
| else | ||
| results[i] = 0 | ||
| end |
There was a problem hiding this comment.
SETNX와 EXPIRE를 순차적으로 호출하고 있습니다. SETNX 성공 후 EXPIRE가 실행되기 전에 Redis 서버가 다운되거나 스크립트 실행이 중단되면, TTL이 설정되지 않은 키가 영구적으로 남게 될 수 있습니다. 이는 의도치 않은 중복 제거 및 메모리 누수로 이어질 수 있습니다. Redis 2.6.12 버전부터 지원되는 SET key value EX seconds NX 명령어를 사용하면 SETNX와 EXPIRE를 원자적으로 실행할 수 있어 이 문제를 해결할 수 있습니다.
if redis.call('SET', key, 1, 'EX', ttl, 'NX') then
results[i] = 1
else
results[i] = 0
end
| factory.setAutoStartup(isWorker); | ||
|
|
||
| factory.setBatchListener(true); | ||
| factory.setConcurrency(6); // 🔥 핵심 |
There was a problem hiding this comment.
Kafka 컨슈머의 동시성(concurrency)을 6으로 하드코딩하셨습니다. 이 값은 실행 환경에 따라 튜닝이 필요할 수 있으므로, application.yml 파일에서 설정으로 관리하는 것이 유지보수 측면에서 더 좋습니다. Environment 객체를 사용하여 프로퍼티 값을 주입받는 것을 권장합니다. 예를 들어 application.yml에 spring.kafka.listener.concurrency: 6과 같이 설정할 수 있습니다.
| factory.setConcurrency(6); // 🔥 핵심 | |
| factory.setConcurrency(env.getProperty("spring.kafka.listener.concurrency", Integer.class, 6)); // 🔥 핵심 |
| public RedisScript<List> dedupBatchScript() { | ||
| DefaultRedisScript<List> script = new DefaultRedisScript<>(); |
There was a problem hiding this comment.
RedisScript와 DefaultRedisScript에 raw type인 List를 사용하고 있습니다. NotificationSendDedupService에서 List<Long>으로 캐스팅하는 것을 고려할 때, RedisScript<List<Long>>과 같이 타입을 명시해주는 것이 타입 안정성을 높이고 코드 가독성을 향상시킵니다.
| public RedisScript<List> dedupBatchScript() { | |
| DefaultRedisScript<List> script = new DefaultRedisScript<>(); | |
| public RedisScript<List<Long>> dedupBatchScript() { | |
| DefaultRedisScript<List<Long>> script = new DefaultRedisScript<>(); |
| } | ||
|
|
||
| UsageNotificationEvent event = events.get(i); | ||
| String message = formatter.format(event, LocalDateTime.now(ZoneId.of("Asia/Seoul"))); |
| ack.acknowledge(); | ||
|
|
||
| long elapsedMs = System.currentTimeMillis() - batchStart; | ||
| double tps = processed / (elapsedMs / 1000.0); |
There was a problem hiding this comment.
SonarQube Quality Summary (Community)❌ Quality Gate FAILED Branch: Issues
Measures
🔗 Dashboard: https://sonarqube.swthewhite.store/dashboard?id=api-message&branch=feat/UPLUS-134 Generated automatically by GitHub Actions. |
🍀 이슈 번호
UPLUS-134
✅ 작업 사항
Kafka Consumer 성능 개선 및 로직 구현
📋 체크리스트
⌨ 기타