-
Notifications
You must be signed in to change notification settings - Fork 0
UPLUS-126 RDB 기반 사용량 배치 시스템 보완 #6
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
3c08cff
a4c4052
799c84b
65b403a
d82ca85
834fc41
67ae527
9deb486
bc58836
d78b888
1e73890
64a8b72
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,29 @@ | ||
| package com.project.controller; | ||
|
|
||
| import org.springframework.http.ResponseEntity; | ||
| import org.springframework.web.bind.annotation.PostMapping; | ||
| import org.springframework.web.bind.annotation.RequestMapping; | ||
| import org.springframework.web.bind.annotation.RestController; | ||
|
|
||
| import com.project.rdb.batch.orchestrator.UsageOrchestrator; | ||
|
|
||
| import lombok.RequiredArgsConstructor; | ||
|
|
||
| @RestController | ||
| @RequestMapping("/api/batch") | ||
| @RequiredArgsConstructor | ||
| public class BatchController { | ||
|
|
||
| private final UsageOrchestrator usageOrchestrator; | ||
|
|
||
| @PostMapping("/usage/run") | ||
| public ResponseEntity<String> runUsagePipeline() { | ||
| try { | ||
| usageOrchestrator.run(); | ||
| return ResponseEntity.ok("✅ usage batch pipeline started"); | ||
| } catch (Exception e) { | ||
| return ResponseEntity.internalServerError() | ||
| .body("❌ batch execution failed: " + e.getMessage()); | ||
| } | ||
| } | ||
| } | ||
This file was deleted.
This file was deleted.
This file was deleted.
This file was deleted.
This file was deleted.
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,11 @@ | ||
| package com.project.rdb.batch.notificationsend.dto; | ||
|
|
||
| import java.util.Map; | ||
| import java.util.UUID; | ||
|
|
||
| public record NotificationMessage( | ||
| UUID eventId, | ||
| Long id, | ||
| Long templateGroupId, | ||
| Map<String, Object> subscriptionInfo, | ||
| Map<String, Object> variables) {} |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,8 @@ | ||
| package com.project.rdb.batch.notificationsend.dto; | ||
|
|
||
| import java.util.concurrent.CompletableFuture; | ||
|
|
||
| import org.springframework.kafka.support.SendResult; | ||
|
|
||
| public record NotificationSendTask( | ||
| NotificationMessage event, CompletableFuture<SendResult<String, String>> future) {} |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,32 +1,40 @@ | ||
| package com.project.rdb.batch.notificationsend.processor; | ||
|
|
||
| import java.util.Map; | ||
| import java.util.UUID; | ||
|
|
||
| import org.springframework.batch.item.ItemProcessor; | ||
| import org.springframework.stereotype.Component; | ||
|
|
||
| import com.project.rdb.batch.model.dto.UsageNotificationEvent; | ||
| import com.project.rdb.batch.model.dto.UsageNotificationOutboxRow; | ||
| import com.project.rdb.batch.notificationsend.dto.NotificationMessage; | ||
| import com.project.rdb.batch.usagenotification.dto.UsageNotificationOutboxRow; | ||
|
|
||
| import lombok.extern.slf4j.Slf4j; | ||
|
|
||
| @Component | ||
| @Slf4j | ||
| public class NotificationSendProcessor | ||
| implements ItemProcessor<UsageNotificationOutboxRow, UsageNotificationEvent> { | ||
| implements ItemProcessor<UsageNotificationOutboxRow, NotificationMessage> { | ||
|
|
||
| @Override | ||
| public UsageNotificationEvent process(UsageNotificationOutboxRow item) { | ||
| public NotificationMessage process(UsageNotificationOutboxRow item) { | ||
|
|
||
| return new UsageNotificationEvent( | ||
| return new NotificationMessage( | ||
| UUID.randomUUID(), | ||
| item.id(), | ||
| item.subId(), | ||
| item.period(), | ||
| item.unit(), | ||
| item.threshold(), | ||
| item.percent(), | ||
| item.totalUsedMb(), | ||
| item.allotmentMb()); | ||
| 101L, | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. |
||
| Map.of( | ||
| "subId", item.subId(), | ||
| "phoneNumber", item.phoneNumber(), | ||
| "email", item.email()), | ||
| Map.of( | ||
| "period", item.period(), | ||
| "threshold", item.threshold(), | ||
| "percent", item.percent(), | ||
| "totalUsedMb", item.totalUsedMb(), | ||
| "allotmentMb", item.allotmentMb(), | ||
| "phoneNumber", item.phoneNumber(), | ||
| "email", item.email(), | ||
| "createdAt", item.createdAt())); | ||
|
Comment on lines
+30
to
+38
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Map.of(
"period", item.period(),
"threshold", item.threshold(),
"percent", item.percent(),
"totalUsedMb", item.totalUsedMb(),
"allotmentMb", item.allotmentMb(),
"createdAt", item.createdAt())); |
||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change | ||||
|---|---|---|---|---|---|---|
|
|
@@ -13,26 +13,26 @@ | |||||
| import org.springframework.stereotype.Component; | ||||||
|
|
||||||
| import com.fasterxml.jackson.databind.ObjectMapper; | ||||||
| import com.project.rdb.batch.model.dto.NotificationSendTask; | ||||||
| import com.project.rdb.batch.model.dto.UsageNotificationEvent; | ||||||
| import com.project.rdb.batch.model.repository.UsageNotificationOutboxRepository; | ||||||
| import com.project.rdb.batch.notificationsend.dto.NotificationMessage; | ||||||
| import com.project.rdb.batch.notificationsend.dto.NotificationSendTask; | ||||||
|
|
||||||
| import lombok.RequiredArgsConstructor; | ||||||
|
|
||||||
| @Component | ||||||
| @RequiredArgsConstructor | ||||||
| public class NotificationSendWriter implements ItemWriter<UsageNotificationEvent> { | ||||||
| public class NotificationSendWriter implements ItemWriter<NotificationMessage> { | ||||||
|
|
||||||
| private final KafkaTemplate<String, String> kafkaTemplate; | ||||||
| private final UsageNotificationOutboxRepository repository; | ||||||
| private final ObjectMapper objectMapper; | ||||||
| private static final int MAX_FAILURE_REASON_LENGTH = 255; | ||||||
|
|
||||||
| @Override | ||||||
| public void write(Chunk<? extends UsageNotificationEvent> items) { | ||||||
| public void write(Chunk<? extends NotificationMessage> items) { | ||||||
|
|
||||||
| // usageNotificationOutboxId 추출 | ||||||
| List<Long> ids = items.getItems().stream().map(UsageNotificationEvent::id).toList(); | ||||||
| List<Long> ids = items.getItems().stream().map(NotificationMessage::id).toList(); | ||||||
|
|
||||||
| // PROCESSING Status 먼저 DB 반영 | ||||||
| repository.markProcessing(ids); | ||||||
|
|
@@ -42,12 +42,13 @@ public void write(Chunk<? extends UsageNotificationEvent> items) { | |||||
|
|
||||||
| Map<Long, String> failedReasons = new HashMap<>(); | ||||||
|
|
||||||
| for (UsageNotificationEvent event : items) { | ||||||
| for (NotificationMessage event : items) { | ||||||
| try { | ||||||
| String payload = objectMapper.writeValueAsString(event); | ||||||
| String key = String.valueOf(event.subscriptionInfo().get("subId")); | ||||||
|
|
||||||
| CompletableFuture<SendResult<String, String>> future = | ||||||
| kafkaTemplate.send("notification-usage", event.subId().toString(), payload); | ||||||
| kafkaTemplate.send("noti-tp", key, payload); | ||||||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Kafka 메시지를 발행하는 토픽 이름이
Suggested change
|
||||||
|
|
||||||
| tasks.add(new NotificationSendTask(event, future)); | ||||||
| } catch (Exception e) { | ||||||
|
|
||||||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,41 @@ | ||
| package com.project.rdb.batch.orchestrator; | ||
|
|
||
| import java.time.LocalDateTime; | ||
| import java.util.Map; | ||
|
|
||
| import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate; | ||
| import org.springframework.stereotype.Service; | ||
| import org.springframework.transaction.annotation.Transactional; | ||
|
|
||
| import lombok.RequiredArgsConstructor; | ||
|
|
||
| @Service | ||
| @RequiredArgsConstructor | ||
| public class BatchTimeWindowService { | ||
|
|
||
| private final NamedParameterJdbcTemplate jdbc; | ||
|
|
||
| public LocalDateTime resolve(String jobName) { | ||
| return jdbc.queryForObject( | ||
| """ | ||
| SELECT last_processed_at | ||
| FROM batch_watermark | ||
| WHERE job_name = :jobName | ||
| """, | ||
| Map.of("jobName", jobName), | ||
| LocalDateTime.class); | ||
| } | ||
|
|
||
| @Transactional | ||
| public void update(String jobName, LocalDateTime newFrom) { | ||
| jdbc.update( | ||
| """ | ||
| UPDATE batch_watermark | ||
| SET last_processed_at = :newFrom | ||
| WHERE job_name = :jobName | ||
| """, | ||
| Map.of( | ||
| "jobName", jobName, | ||
| "newFrom", newFrom)); | ||
| } | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
runUsagePipeline()메서드는 배치 작업을 동기적으로 실행합니다. 배치 작업은 완료하는 데 시간이 오래 걸릴 수 있으므로, HTTP 요청이 타임아웃될 가능성이 매우 높습니다.JobLauncher를 비동기적으로 실행하도록 설정하거나, 별도의 스레드 풀(ExecutorService)을 사용하여 비동기적으로 작업을 시작하고 즉시202 Accepted와 같은 응답을 반환하는 것이 좋습니다.예를 들어,
SimpleAsyncTaskExecutor를 사용하여JobLauncher를 구성할 수 있습니다.이렇게 구성된
asyncJobLauncher를 주입받아 사용하면jobLauncher.run()호출이 즉시 반환됩니다.