Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 29 additions & 0 deletions src/main/java/com/project/controller/BatchController.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package com.project.controller;

import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import com.project.rdb.batch.orchestrator.UsageOrchestrator;

import lombok.RequiredArgsConstructor;

@RestController
@RequestMapping("/api/batch")
@RequiredArgsConstructor
public class BatchController {

private final UsageOrchestrator usageOrchestrator;

@PostMapping("/usage/run")
public ResponseEntity<String> runUsagePipeline() {
try {
usageOrchestrator.run();
return ResponseEntity.ok("✅ usage batch pipeline started");
} catch (Exception e) {
return ResponseEntity.internalServerError()
.body("❌ batch execution failed: " + e.getMessage());
}
}
Comment on lines +20 to +28
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

runUsagePipeline() 메서드는 배치 작업을 동기적으로 실행합니다. 배치 작업은 완료하는 데 시간이 오래 걸릴 수 있으므로, HTTP 요청이 타임아웃될 가능성이 매우 높습니다. JobLauncher를 비동기적으로 실행하도록 설정하거나, 별도의 스레드 풀(ExecutorService)을 사용하여 비동기적으로 작업을 시작하고 즉시 202 Accepted와 같은 응답을 반환하는 것이 좋습니다.

예를 들어, SimpleAsyncTaskExecutor를 사용하여 JobLauncher를 구성할 수 있습니다.

// In a config class
@Bean
public JobLauncher asyncJobLauncher(JobRepository jobRepository) throws Exception {
    SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
    jobLauncher.setJobRepository(jobRepository);
    jobLauncher.setTaskExecutor(new SimpleAsyncTaskExecutor());
    jobLauncher.afterPropertiesSet();
    return jobLauncher;
}

이렇게 구성된 asyncJobLauncher를 주입받아 사용하면 jobLauncher.run() 호출이 즉시 반환됩니다.

}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@ public enum GlobalErrorCode implements BaseErrorCode {
HttpStatus.BAD_REQUEST, "COMMON_011", "UsageNotification Produce 과정에서 에러가 발생하였습니다"),
USAGE_OUTBOX_WRITER_FAILED(
HttpStatus.BAD_REQUEST, "COMMON_012", "UsageOutbox Writer 배치 시스템이 실패하였습니다"),
PLAN_NOT_VALID(HttpStatus.BAD_REQUEST, "COMMON_013", "존재하지 않는 요금제입니다");
PLAN_NOT_VALID(HttpStatus.BAD_REQUEST, "COMMON_013", "존재하지 않는 요금제입니다"),
BATCH_NOT_FINISHED(HttpStatus.BAD_REQUEST, "COMMON_014", "이전 배치 작업이 끝나지 않았습니다");

private final HttpStatus httpStatus;
private final String customCode;
Expand Down
55 changes: 0 additions & 55 deletions src/main/java/com/project/rdb/BatchJobRunner.java

This file was deleted.

This file was deleted.

This file was deleted.

This file was deleted.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public class UsageNotificationOutbox {
private int threshold;

@Column(nullable = false)
private int percent;
private double percent;

@Column(name = "total_used_mb", nullable = false)
private Long totalUsedMb;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,10 @@
import org.springframework.context.annotation.Configuration;
import org.springframework.transaction.PlatformTransactionManager;

import com.project.rdb.batch.model.dto.UsageNotificationEvent;
import com.project.rdb.batch.model.dto.UsageNotificationOutboxRow;
import com.project.rdb.batch.notificationsend.dto.NotificationMessage;
import com.project.rdb.batch.notificationsend.processor.NotificationSendProcessor;
import com.project.rdb.batch.notificationsend.writer.NotificationSendWriter;
import com.project.rdb.batch.usagenotification.dto.UsageNotificationOutboxRow;

import lombok.RequiredArgsConstructor;

Expand Down Expand Up @@ -44,7 +44,7 @@ public Job notificationSendJob() {
@Bean
public Step notificationSendStep() {
return new StepBuilder("notificationSendStep", jobRepository)
.<UsageNotificationOutboxRow, UsageNotificationEvent>chunk(CHUNK_SIZE, txManager)
.<UsageNotificationOutboxRow, NotificationMessage>chunk(CHUNK_SIZE, txManager)
.reader(outboxReader)
.processor(notificationSendProcessor)
.writer(notificationSendWriter)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package com.project.rdb.batch.notificationsend.dto;

import java.util.Map;
import java.util.UUID;

public record NotificationMessage(
UUID eventId,
Long id,
Long templateGroupId,
Map<String, Object> subscriptionInfo,
Map<String, Object> variables) {}
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package com.project.rdb.batch.notificationsend.dto;

import java.util.concurrent.CompletableFuture;

import org.springframework.kafka.support.SendResult;

public record NotificationSendTask(
NotificationMessage event, CompletableFuture<SendResult<String, String>> future) {}
Original file line number Diff line number Diff line change
@@ -1,32 +1,40 @@
package com.project.rdb.batch.notificationsend.processor;

import java.util.Map;
import java.util.UUID;

import org.springframework.batch.item.ItemProcessor;
import org.springframework.stereotype.Component;

import com.project.rdb.batch.model.dto.UsageNotificationEvent;
import com.project.rdb.batch.model.dto.UsageNotificationOutboxRow;
import com.project.rdb.batch.notificationsend.dto.NotificationMessage;
import com.project.rdb.batch.usagenotification.dto.UsageNotificationOutboxRow;

import lombok.extern.slf4j.Slf4j;

@Component
@Slf4j
public class NotificationSendProcessor
implements ItemProcessor<UsageNotificationOutboxRow, UsageNotificationEvent> {
implements ItemProcessor<UsageNotificationOutboxRow, NotificationMessage> {

@Override
public UsageNotificationEvent process(UsageNotificationOutboxRow item) {
public NotificationMessage process(UsageNotificationOutboxRow item) {

return new UsageNotificationEvent(
return new NotificationMessage(
UUID.randomUUID(),
item.id(),
item.subId(),
item.period(),
item.unit(),
item.threshold(),
item.percent(),
item.totalUsedMb(),
item.allotmentMb());
101L,
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

templateGroupId101L로 하드코딩되어 있습니다. 이러한 값은 변경될 가능성이 있으므로, 설정 파일(application.yml)이나 데이터베이스에서 값을 읽어오도록 하여 유연성을 높이는 것이 좋습니다.

Map.of(
"subId", item.subId(),
"phoneNumber", item.phoneNumber(),
"email", item.email()),
Map.of(
"period", item.period(),
"threshold", item.threshold(),
"percent", item.percent(),
"totalUsedMb", item.totalUsedMb(),
"allotmentMb", item.allotmentMb(),
"phoneNumber", item.phoneNumber(),
"email", item.email(),
"createdAt", item.createdAt()));
Comment on lines +30 to +38
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

NotificationMessage를 생성할 때 phoneNumberemail 필드가 subscriptionInfo 맵과 variables 맵 양쪽에 중복으로 포함되고 있습니다. 이것이 의도된 동작이 아니라면, 데이터를 한 곳에만 포함시켜 불필요한 데이터 전송을 줄이는 것이 좋습니다. subscriptionInfo는 구독자 식별 정보, variables는 템플릿에 사용될 변수로 역할을 분리하는 것이 명확해 보입니다.

                Map.of(
                        "period", item.period(),
                        "threshold", item.threshold(),
                        "percent", item.percent(),
                        "totalUsedMb", item.totalUsedMb(),
                        "allotmentMb", item.allotmentMb(),
                        "createdAt", item.createdAt()));

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import com.project.rdb.batch.model.dto.UsageNotificationOutboxRow;
import com.project.rdb.batch.usagenotification.dto.UsageNotificationOutboxRow;

@Configuration
public class NotificationSendReaderConfig {
Expand All @@ -19,17 +19,24 @@ public JdbcCursorItemReader<UsageNotificationOutboxRow> notificationSendOutboxRe
String sql =
"""
SELECT
id,
sub_id,
period,
unit,
threshold,
percent,
total_used_mb,
allotment_mb
FROM usage_notification_outbox
WHERE status = 'PENDING'
ORDER BY id
o.id,
o.sub_id,
o.period,
o.plan_name,
o.threshold,
o.percent,
o.total_used_mb,
o.allotment_mb,
s.phone_number,
c.email_enc,
o.created_at
FROM usage_notification_outbox o
JOIN subscription s
ON s.sub_id = o.sub_id
JOIN customer c
ON c.customer_id = s.customer_id
WHERE o.status = 'PENDING'
ORDER BY o.id
""";

return new JdbcCursorItemReaderBuilder<UsageNotificationOutboxRow>()
Expand All @@ -43,11 +50,14 @@ public JdbcCursorItemReader<UsageNotificationOutboxRow> notificationSendOutboxRe
rs.getLong("id"),
rs.getLong("sub_id"),
rs.getString("period"),
rs.getString("unit"),
rs.getString("plan_name"),
rs.getInt("threshold"),
rs.getInt("percent"),
rs.getDouble("percent"),
rs.getLong("total_used_mb"),
rs.getLong("allotment_mb")))
rs.getLong("allotment_mb"),
rs.getString("phone_number"),
rs.getString("email_enc"),
rs.getTimestamp("created_at").toLocalDateTime()))
.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,26 +13,26 @@
import org.springframework.stereotype.Component;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.project.rdb.batch.model.dto.NotificationSendTask;
import com.project.rdb.batch.model.dto.UsageNotificationEvent;
import com.project.rdb.batch.model.repository.UsageNotificationOutboxRepository;
import com.project.rdb.batch.notificationsend.dto.NotificationMessage;
import com.project.rdb.batch.notificationsend.dto.NotificationSendTask;

import lombok.RequiredArgsConstructor;

@Component
@RequiredArgsConstructor
public class NotificationSendWriter implements ItemWriter<UsageNotificationEvent> {
public class NotificationSendWriter implements ItemWriter<NotificationMessage> {

private final KafkaTemplate<String, String> kafkaTemplate;
private final UsageNotificationOutboxRepository repository;
private final ObjectMapper objectMapper;
private static final int MAX_FAILURE_REASON_LENGTH = 255;

@Override
public void write(Chunk<? extends UsageNotificationEvent> items) {
public void write(Chunk<? extends NotificationMessage> items) {

// usageNotificationOutboxId 추출
List<Long> ids = items.getItems().stream().map(UsageNotificationEvent::id).toList();
List<Long> ids = items.getItems().stream().map(NotificationMessage::id).toList();

// PROCESSING Status 먼저 DB 반영
repository.markProcessing(ids);
Expand All @@ -42,12 +42,13 @@ public void write(Chunk<? extends UsageNotificationEvent> items) {

Map<Long, String> failedReasons = new HashMap<>();

for (UsageNotificationEvent event : items) {
for (NotificationMessage event : items) {
try {
String payload = objectMapper.writeValueAsString(event);
String key = String.valueOf(event.subscriptionInfo().get("subId"));

CompletableFuture<SendResult<String, String>> future =
kafkaTemplate.send("notification-usage", event.subId().toString(), payload);
kafkaTemplate.send("noti-tp", key, payload);
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

critical

Kafka 메시지를 발행하는 토픽 이름이 noti-tp로 지정되어 있습니다. 하지만 NotificationSendConsumernotification-usage 토픽을 구독하고 있습니다. 토픽 이름이 일치하지 않아 메시지가 소비되지 않습니다. 두 곳의 토픽 이름을 일치시켜야 합니다. 컨슈머의 토픽 이름이 변경되지 않았으므로, 여기서 토픽 이름을 notification-usage로 유지하는 것이 좋아 보입니다.

Suggested change
kafkaTemplate.send("noti-tp", key, payload);
kafkaTemplate.send("notification-usage", key, payload);


tasks.add(new NotificationSendTask(event, future));
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package com.project.rdb.batch.orchestrator;

import java.time.LocalDateTime;
import java.util.Map;

import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

import lombok.RequiredArgsConstructor;

@Service
@RequiredArgsConstructor
public class BatchTimeWindowService {

private final NamedParameterJdbcTemplate jdbc;

public LocalDateTime resolve(String jobName) {
return jdbc.queryForObject(
"""
SELECT last_processed_at
FROM batch_watermark
WHERE job_name = :jobName
""",
Map.of("jobName", jobName),
LocalDateTime.class);
}

@Transactional
public void update(String jobName, LocalDateTime newFrom) {
jdbc.update(
"""
UPDATE batch_watermark
SET last_processed_at = :newFrom
WHERE job_name = :jobName
""",
Map.of(
"jobName", jobName,
"newFrom", newFrom));
}
}
Loading
Loading