Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Empty file added logs/notification-preview.log
Empty file.
13 changes: 11 additions & 2 deletions src/main/java/com/project/global/config/KafkaConfig.java
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
package com.project.global.config;

import java.util.Arrays;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.env.Environment;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.listener.ContainerProperties;

Expand All @@ -12,12 +16,17 @@
public class KafkaConfig {

@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(
ConsumerFactory<String, String> consumerFactory) {
public KafkaListenerContainerFactory<?> kafkaListenerContainerFactory(
ConsumerFactory<String, String> consumerFactory, Environment env) {
ConcurrentKafkaListenerContainerFactory<String, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();

factory.setConsumerFactory(consumerFactory);

boolean isWorker = Arrays.asList(env.getActiveProfiles()).contains("notification-worker");

factory.setAutoStartup(isWorker);

factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);

return factory;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,18 +1,16 @@
package com.project.notification.consumer;

import java.time.LocalDateTime;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Profile;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.project.notification.dto.NotificationRequestEvent;
import com.project.notification.service.NotificationService;

import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
Expand All @@ -22,44 +20,37 @@
@RequiredArgsConstructor
public class NotificationConsumer {

private final NotificationService notificationService;
private final NotificationSendDedupService dedupService;
private final ObjectMapper objectMapper;

@Value("${notification.consumer.concurrency:1}")
private int concurrency;
private final UsageNotificationMessageFormatter formatter;

@KafkaListener(
id = "notification-consumer",
topics = "notification_topic",
groupId = "notification-consumer-group",
topics = "notification-usage",
containerFactory = "kafkaListenerContainerFactory")
@Profile("notification-worker")
public void consume(ConsumerRecord<String, String> record, Acknowledgment ack) {
log.info(
"Received notification message. topic: {}, partition: {}, offset: {}",
record.topic(),
record.partition(),
record.offset());
log.info("CONSUME START offset={}, value={}", record.offset(), record.value());

try {
Map<String, Object> rawPayload =
objectMapper.readValue(record.value(), new TypeReference<>() {});
UsageNotificationEvent event =
objectMapper.readValue(record.value(), UsageNotificationEvent.class);

NotificationRequestEvent event = parseEvent(rawPayload);
String eventId = event.eventId().toString();

if (event == null) {
log.error("Failed to parse notification event. value: {}", record.value());
if (!dedupService.tryAcquire(eventId)) {
log.info("[SKIP] duplicated eventId={}", eventId);
ack.acknowledge();
return;
}

notificationService.processNotification(event, rawPayload);
String format = formatter.format(event, LocalDateTime.now());

SendNotificationLogger.write(format);

} catch (JsonProcessingException e) {
log.error("Failed to deserialize message. value: {}", record.value(), e);
} catch (Exception e) {
log.error("Failed to process notification. value: {}", record.value(), e);
} finally {
ack.acknowledge();
} catch (Exception e) {
log.error("[CONSUME FAIL]", e);
ack.acknowledge(); // 지금 구조상 스킵이 맞음
}
Comment on lines +51 to 54
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

예외 발생 시 메시지를 ack 처리하면 해당 메시지는 유실되어 알림이 누락될 수 있습니다. 이는 데이터 정합성에 심각한 문제를 야기할 수 있습니다.

일시적인 오류(예: DB 접속 불가)의 경우 재시도를 통해 문제를 해결할 수 있어야 하며, 영구적인 오류(예: 메시지 포맷 오류)의 경우 메시지를 Dead Letter Queue(DLQ)로 보내 나중에 분석할 수 있도록 하는 것이 좋습니다.

ack.acknowledge()를 호출하는 대신, 예외를 다시 던지거나 SeekToCurrentErrorHandler와 같은 스프링 카프카의 오류 처리 메커니즘을 활용하여 재시도 및 DLQ 전송을 구현하는 것을 강력히 권장합니다.

}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package com.project.notification.consumer;

import java.time.Duration;

import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Service;

import lombok.RequiredArgsConstructor;

@Service
@RequiredArgsConstructor
public class NotificationSendDedupService {
private final StringRedisTemplate redisTemplate;
private static final Duration TTL = Duration.ofDays(7);

public boolean tryAcquire(String eventId) {
Boolean success =
redisTemplate.opsForValue().setIfAbsent("notification:event:" + eventId, "1", TTL);
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Redis 키의 접두사 "notification:event:" 와 값 "1" 이 하드코딩되어 있습니다. 가독성과 유지보수성을 높이기 위해 private static final 상수로 추출하는 것을 권장합니다.

예시:

private static final String DEDUP_KEY_PREFIX = "notification:event:";
private static final String DEDUP_KEY_VALUE = "1";

// ...
redisTemplate.opsForValue().setIfAbsent(DEDUP_KEY_PREFIX + eventId, DEDUP_KEY_VALUE, TTL);

return Boolean.TRUE.equals(success);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package com.project.notification.consumer;

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.time.LocalDateTime;

import lombok.extern.slf4j.Slf4j;

@Slf4j
public class SendNotificationLogger {

private static final Path LOG_PATH = Path.of("logs/notification-preview.log");

public static void write(String content) {
try {
Files.createDirectories(LOG_PATH.getParent());

Files.writeString(
LOG_PATH,
"""
===============================
%s
%s
===============================

"""
.formatted(LocalDateTime.now(), content),
StandardOpenOption.CREATE,
StandardOpenOption.WRITE,
StandardOpenOption.APPEND);
} catch (IOException e) {
log.error("Failed to write notification preview file", e);
}
}
Comment on lines +14 to +36
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

SendNotificationLogger 구현에 대해 몇 가지 개선점을 제안합니다.

  1. 하드코딩된 경로: 로그 파일 경로(logs/notification-preview.log)가 하드코딩되어 있습니다. 환경에 따라 경로를 변경할 수 있도록 application.yml에서 설정을 주입받는 것이 더 유연합니다.
  2. 직접 파일 쓰기: Files.writeString을 사용하는 방식은 초안에서는 괜찮지만, 운영 환경에서는 스레드 안전성, 성능, 로그 관리(로테이션 등) 측면에서 불리합니다.

운영 환경을 고려하여, 이 클래스를 Spring Bean으로 만들고 Logback/Log4j2 같은 로깅 프레임워크의 FileAppender를 사용하도록 리팩토링하는 것을 권장합니다. 경로 설정도 외부에서 주입받도록 변경하면 더욱 견고하고 유연한 코드가 될 것입니다.

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package com.project.notification.consumer;

import java.util.UUID;

public record UsageNotificationEvent(
UUID eventId,
Long id,
Long subId,
String period,
String unit,
int threshold,
int percent,
long totalUsedMb,
long allotmentMb) {}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package com.project.notification.consumer;

import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

import org.springframework.stereotype.Component;

@Component
public class UsageNotificationMessageFormatter {

public String format(UsageNotificationEvent event, LocalDateTime now) {

String providedGb = formatGb(event.allotmentMb());
String usedGb = formatGb(event.totalUsedMb());
String time = now.format(DateTimeFormatter.ofPattern("MM/dd HH:mm:ss"));

return """
[LG U+]
[Web발신]
[LG U+] 이번 달 데이터 사용량 안내

고객님, 「유쓰 5G 데이터 플러스」
요금제의 기본 데이터 사용량을 안내해 드립니다.

▶ 데이터 사용량 안내
- 제공량: %sGB
- 사용량: %d%% %sGB
※ %s 기준

"""
.formatted(providedGb, event.percent(), usedGb, time);
}

private String formatGb(long mb) {
return String.format("%.2f", mb / 1024.0);
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

MB를 GB로 변환할 때 사용된 1024.0은 '매직 넘버'입니다. 코드의 가독성을 높이고 의도를 명확히 하기 위해 private static final double MB_PER_GB = 1024.0; 와 같이 상수로 정의하여 사용하는 것을 권장합니다.

}
}
67 changes: 52 additions & 15 deletions src/main/resources/application.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,25 +9,19 @@ spring:
application:
name: api-message

kafka:
bootstrap-servers: ${KAFKA_BOOTSTRAP_SERVERS:localhost:9092}
consumer:
group-id: notification-consumer-group
auto-offset-reset: earliest
enable-auto-commit: false
max-poll-records: 1
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

datasource:
url: jdbc:postgresql://${POSTGRES_HOST:localhost}:${POSTGRES_PORT:5432}/${POSTGRES_DATABASE:app}
username: ${POSTGRES_USER:postgres}
password: ${POSTGRES_PASSWORD:postgres}
driver-class-name: org.postgresql.Driver

batch:
job:
enabled: false

jpa:
hibernate:
ddl-auto: update
ddl-auto: none
show-sql: true
database-platform: org.hibernate.dialect.PostgreSQLDialect
properties:
Expand All @@ -42,6 +36,52 @@ spring:
redis:
url: redis://${REDIS_USERNAME:default}:${REDIS_PASSWORD:}@${REDIS_ENDPOINT:localhost:6379}

kafka:
bootstrap-servers: ${KAFKA_BOOTSTRAP_SERVERS}

properties:
security.protocol: SASL_SSL
sasl.mechanism: PLAIN
sasl.jaas.config: >
org.apache.kafka.common.security.plain.PlainLoginModule required
username='${KAFKA_API_KEY}'
password='${KAFKA_API_SECRET}';

producer:
retries: 2147483647
batch-size: 65536
compression-type: lz4
buffer-memory: 67108864
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer

properties:
enable-idempotence: true
linger.ms: 20
compression.type: lz4
max.in.flight.requests.per.connection: 5

consumer:
group-id: usage-notification-worker
auto-offset-reset: earliest
enable-auto-commit: false

key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

max-poll-records: 1
max-poll-interval-ms: 300000
session-timeout-ms: 10000
heartbeat-interval-ms: 3000

properties:
spring.json.trusted.packages: com.project.rdb.batch.model.dto

listener:
auto-startup: false



server:
port: 8080

Expand Down Expand Up @@ -85,9 +125,6 @@ logging:
org.springframework.web: INFO
org.hibernate.SQL: WARN


ureca:
secret-key: ${AES_SECRET_KEY:12345678901234567890123456789012}

notification:
consumer:
concurrency: ${CONSUMER_CONCURRENCY:1}