Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
6fef801
UPLUS-16 fix : git 충돌 해결
starboxxxx Jan 15, 2026
d5867f5
UPLUS-16 feat : Kafka 이벤트 발행을 위해 필요한 Schema 작성
starboxxxx Jan 15, 2026
fdeac3b
UPLUS-16 feat : Kafka 시간 처리를 위한 유틸 클래스 별도로 구성
starboxxxx Jan 15, 2026
931a5c1
UPLUS-16 feat : Kafka 데이터 사용량 집계 로직 구현
starboxxxx Jan 15, 2026
1bf302a
UPLUS-16 feat : 데이터 사용량 집계 및 임계치 초과 여부 확인 Redis Lua 스크립트 작성
starboxxxx Jan 15, 2026
7546536
UPLUS-16 feat : 요금제 변동시 이벤트 발행 및 처리 로직 구현
starboxxxx Jan 15, 2026
e18f1d0
UPLUS-16 fix : git 충돌 해결
starboxxxx Jan 15, 2026
268b4dd
UPLUS-16 feat : Kafka 유실 데이터 처리를 위한 별도의 Producer 임시 구현
starboxxxx Jan 15, 2026
7ab7cd4
UPLUS-16 feat : Test 코드 작성
starboxxxx Jan 15, 2026
87b238e
UPLUS-16 feat : 불필요 스키마 제거
starboxxxx Jan 15, 2026
c3364c5
UPLUS-16 feat : 불필요 의존성 제거
starboxxxx Jan 15, 2026
e712b24
UPLUS-16 feat : 불필요 코드 삭제
starboxxxx Jan 15, 2026
9bc5c1e
UPLUS-16 feat : 임시로 작성된 유실 데이터 Producer 코드 제거
starboxxxx Jan 15, 2026
84c3f1e
UPLUS-16 fix : git 충돌 해결
starboxxxx Jan 15, 2026
e417b82
UPLUS-16 feat : CI checkStyle 에러 해결
starboxxxx Jan 15, 2026
d287357
UPLUS-16 feat : CI checkStyle 에러 해결
starboxxxx Jan 15, 2026
85d5817
UPLUS-16 feat : CI checkStyle 에러 해결
starboxxxx Jan 15, 2026
71b68c5
UPLUS-16 feat : CI checkStyle 에러 해결
starboxxxx Jan 15, 2026
68d5010
UPLUS-16 feat : CI checkStyle 에러 해결
starboxxxx Jan 15, 2026
a8341d1
UPLUS-16 feat : CI checkStyle 에러 해결
starboxxxx Jan 15, 2026
0e8a253
UPLUS-16 feat : CI checkStyle 에러 해결
starboxxxx Jan 15, 2026
6667402
UPLUS-16 feat : Lua Script 별도의 정적 파일로 분리
starboxxxx Jan 15, 2026
38b1e54
UPLUS-16 feat : spotlessApply 에러 해결
starboxxxx Jan 15, 2026
e96770b
UPLUS-16 feat : CI 에러 해결
starboxxxx Jan 15, 2026
74055eb
UPLUS-16 fix : splotless 에러 해결
starboxxxx Jan 19, 2026
5b4fed1
UPLUS-16 fix : checksyle 에러 해결
starboxxxx Jan 19, 2026
ce5ba61
UPLUS-16 fix : sonarCube 에러 해결
starboxxxx Jan 19, 2026
ec7e45b
UPLUS-16 fix : sonarCube 에러 해결
starboxxxx Jan 19, 2026
a46971f
UPLUS-16 fix : sonarCube 에러 해결
starboxxxx Jan 19, 2026
83f9775
UPLUS-16 fix : sonarCube 에러 해결
starboxxxx Jan 19, 2026
3ef6104
UPLUS-16 fix : spotless 에러 해결
starboxxxx Jan 19, 2026
8930e24
UPLUS-16 fix : checkstyle 에러 해결
starboxxxx Jan 19, 2026
bd24aa1
UPLUS-16 fix : spotless 에러 해결
starboxxxx Jan 19, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Empty file modified gradlew
100644 → 100755
Empty file.
53 changes: 53 additions & 0 deletions src/main/java/com/project/consumer/PlanChangeConsumer.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package com.project.consumer;

import java.util.ArrayList;
import java.util.List;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.project.consumer.util.PlanChangeUtil;
import com.project.consumer.util.RedisUtil;
import com.project.global.exception.ApplicationException;
import com.project.global.exception.code.domain.GlobalErrorCode;
import com.project.producer.schema.CalculatedLimitSchema;
import com.project.producer.schema.PlanChangeSchema;

import lombok.RequiredArgsConstructor;

@Component
@RequiredArgsConstructor
public class PlanChangeConsumer {

private final ObjectMapper objectMapper;
private final PlanChangeUtil planChangeUtil;
private final RedisUtil redisUtil;

@KafkaListener(
id = "plan-change-consumer",
topics = "change_plan",
groupId = "plan-change-consumer",
containerFactory = "batchKafkaListenerContainerFactory")
public void consume(List<ConsumerRecord<String, String>> records, Acknowledgment ack) {
if (records == null || records.isEmpty()) {
ack.acknowledge();
return;
}
List<PlanChangeSchema> events = new ArrayList<>(records.size());
try {
for (ConsumerRecord<String, String> rec : records) {
events.add(objectMapper.readValue(rec.value(), PlanChangeSchema.class));
}

List<CalculatedLimitSchema> limits = planChangeUtil.calculate(events);
redisUtil.writePlanChangeBatch(limits);

ack.acknowledge();
} catch (Exception e) {
throw new ApplicationException(GlobalErrorCode.PLAN_CHANGE_EVENT_PRODUCE_INVALID);
}
}
}
67 changes: 67 additions & 0 deletions src/main/java/com/project/consumer/UsageConsumer.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package com.project.consumer;

import java.util.ArrayList;
import java.util.List;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.project.consumer.util.RedisUtil;
import com.project.global.exception.ApplicationException;
import com.project.global.exception.code.domain.GlobalErrorCode;
import com.project.producer.NotificationProducer;
import com.project.producer.schema.UsageEventSchema;

import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;

@Slf4j
@Component
@RequiredArgsConstructor
public class UsageConsumer {

private final ObjectMapper objectMapper;
private final RedisUtil redisUtil;
private final NotificationProducer notificationProducer;

@KafkaListener(
id = "usage-batch-consumer",
topics = "usage-data",
groupId = "usage-consumer",
containerFactory = "batchKafkaListenerContainerFactory")
public void consume(List<ConsumerRecord<String, String>> records, Acknowledgment ack) {

if (records == null || records.isEmpty()) {
ack.acknowledge();
return;
}

List<UsageEventSchema> events = new ArrayList<>(records.size());

try {

for (ConsumerRecord<String, String> rec : records) {
UsageEventSchema schema =
objectMapper.readValue(rec.value(), UsageEventSchema.class);
events.add(schema);
}

List<String> notifications = redisUtil.applyUsageBatch(events);

// 임계치 넘은 것만 notification_topic으로 발행
for (String notification : notifications) {
notificationProducer.sendNotification(notification);
}

// 성공한 배치 ack
ack.acknowledge();
} catch (Exception e) {
log.error("usage batch failed", e);
// 여기서 ack 안 하면 같은 배치가 재시도됨 (at-least-once)
throw new ApplicationException(GlobalErrorCode.NOTIFICATION_EVENT_PRODUCE_INVALID);
}
}
}
29 changes: 29 additions & 0 deletions src/main/java/com/project/consumer/util/LuaScriptLoader.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package com.project.consumer.util;

import java.io.InputStream;
import java.nio.charset.StandardCharsets;

import com.project.global.exception.ApplicationException;
import com.project.global.exception.code.domain.GlobalErrorCode;

import lombok.extern.slf4j.Slf4j;

@Slf4j
public class LuaScriptLoader {

public static String load(String path) {
try (InputStream is = LuaScriptLoader.class.getClassLoader().getResourceAsStream(path)) {

if (is == null) {
throw new IllegalStateException("Lua script not found: " + path);
}

return new String(is.readAllBytes(), StandardCharsets.UTF_8);
} catch (Exception e) {
log.error("Failed to load lua script");
throw new ApplicationException(GlobalErrorCode.LUA_SCRIPT_LOAD_INVALID);
}
}

private LuaScriptLoader() {}
}
108 changes: 108 additions & 0 deletions src/main/java/com/project/consumer/util/PlanChangeUtil.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
package com.project.consumer.util;

import static com.project.consumer.util.UsageTimeUtil.toYearMonth;

import java.time.Duration;
import java.time.OffsetDateTime;
import java.time.YearMonth;
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.List;

import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Service;

import com.project.producer.schema.CalculatedLimitSchema;
import com.project.producer.schema.PlanChangeSchema;
import com.project.producer.test.PlanUnit;

import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;

@Slf4j
@Service
@RequiredArgsConstructor
public class PlanChangeUtil {

private final StringRedisTemplate redisTemplate;

public List<CalculatedLimitSchema> calculate(List<PlanChangeSchema> events) {

List<CalculatedLimitSchema> results = new ArrayList<>();

String yearMonth;
long finalLimit;
String processedKey;
Long added;

for (PlanChangeSchema event : events) {
yearMonth = toYearMonth(event.changedAt().toString());

processedKey = "processed:plan:" + yearMonth + ":" + event.subscriptionId();
added = redisTemplate.opsForSet().add(processedKey, event.eventId());

redisTemplate.expire(
processedKey,
Duration.ofSeconds(
UsageTimeUtil.ttlToNextMonthWithBufferSec(
event.changedAt().toString(), 2)));

if (added == null || added == 0L) {
// 이미 처리된 요금제 변경 이벤트 → skip
continue;
}

String unitKey = "plan:unit:" + event.subscriptionId();
String prevUnit = redisTemplate.opsForValue().get(unitKey);

if (event.unit().equals(PlanUnit.ULTIMATE)) {
finalLimit = -1L;
} else if (event.unit().equals(PlanUnit.DAY)) {
finalLimit = event.allowanceAmount();
} else {
finalLimit = getMonthFinalLimit(yearMonth, event, prevUnit);
}

long ttlSec =
UsageTimeUtil.ttlToNextMonthWithBufferSec(event.changedAt().toString(), 2);

results.add(
new CalculatedLimitSchema(
event.subscriptionId(), yearMonth, finalLimit, ttlSec, event.unit()));
}

return results;
}

private long getPreviousLimit(String key) {
String value = redisTemplate.opsForValue().get(key);
return value == null ? 0L : Long.parseLong(value);
}

// 사용자가 월 기준 중도에 요금제를 변경했을 경우 사용 가능 데이터 집계 로직
private long getMonthFinalLimit(String yearMonth, PlanChangeSchema event, String prevUnit) {
OffsetDateTime kstTime =
event.changedAt().atZoneSameInstant(ZoneId.of("Asia/Seoul")).toOffsetDateTime();

YearMonth month = YearMonth.from(kstTime);
int totalDays = month.lengthOfMonth();
int changeDay = kstTime.getDayOfMonth();

int daysBefore = changeDay - 1;
int daysAfter = totalDays - daysBefore;

// 만약 전 요금제가 무제한 or DAY 요금제 였다면 새로 추가된 요금제의 데이터 양만 계산
if (!"MONTH".equals(prevUnit)) {
return event.allowanceAmount() * daysAfter / totalDays;
}

String limitKey = "limit:" + yearMonth + ":" + event.subscriptionId();

long prevLimit = getPreviousLimit(limitKey);

long allowanceBefore = prevLimit * daysBefore / totalDays;
long allowanceAfter = event.allowanceAmount() * daysAfter / totalDays;

return allowanceBefore + allowanceAfter;
}
}
90 changes: 90 additions & 0 deletions src/main/java/com/project/consumer/util/RedisUtil.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
package com.project.consumer.util;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

import org.springframework.data.redis.connection.RedisStringCommands;
import org.springframework.data.redis.core.RedisCallback;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.core.script.DefaultRedisScript;
import org.springframework.data.redis.core.types.Expiration;
import org.springframework.stereotype.Service;

import com.project.producer.schema.CalculatedLimitSchema;
import com.project.producer.schema.UsageEventSchema;

import lombok.RequiredArgsConstructor;

@Service
@RequiredArgsConstructor
public class RedisUtil {

private final StringRedisTemplate redisTemplate;

private final DefaultRedisScript<List<String>> script =
new DefaultRedisScript<>(
LuaScriptLoader.load("lua/usage_batch.lua"),
(Class<List<String>>) (Class<?>) List.class);

public List<String> applyUsageBatch(List<UsageEventSchema> events) {
if (events == null || events.isEmpty()) {
return List.of();
}

List<String> args = new ArrayList<>();
args.add(String.valueOf(events.size()));

for (UsageEventSchema e : events) {

args.add(String.valueOf(e.subscriptionId()));
args.add(e.eventId());
args.add(String.valueOf(e.usageBytes()));
args.add(e.timeStamp());
}

Object result = redisTemplate.execute(script, Collections.emptyList(), args.toArray());
return result == null ? List.of() : (List<String>) result;
}

public void writePlanChangeBatch(List<CalculatedLimitSchema> limits) {
redisTemplate.executePipelined(
(RedisCallback<Void>)
connection -> {
for (CalculatedLimitSchema limit : limits) {
String ke =
"limit:" + limit.yearMonth() + ":" + limit.subscriptionId();

byte[] key = redisTemplate.getStringSerializer().serialize(ke);
byte[] value =
redisTemplate
.getStringSerializer()
.serialize(String.valueOf(limit.limit()));

connection
.stringCommands()
.set(
key,
value,
Expiration.seconds(limit.ttlSec()),
RedisStringCommands.SetOption.UPSERT);

String unitKey = "plan:unit:" + limit.subscriptionId();
byte[] uk = redisTemplate.getStringSerializer().serialize(unitKey);
byte[] uv =
redisTemplate
.getStringSerializer()
.serialize(limit.unit().name());

connection
.stringCommands()
.set(
uk,
uv,
Expiration.seconds(limit.ttlSec()),
RedisStringCommands.SetOption.UPSERT);
}
return null;
});
}
}
30 changes: 30 additions & 0 deletions src/main/java/com/project/consumer/util/UsageTimeUtil.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package com.project.consumer.util;

import java.time.Duration;
import java.time.LocalTime;
import java.time.OffsetDateTime;
import java.time.format.DateTimeFormatter;

public class UsageTimeUtil {

private static final DateTimeFormatter ISO = DateTimeFormatter.ISO_OFFSET_DATE_TIME;

private UsageTimeUtil() {}

public static String toYearMonth(String isoTs) {
OffsetDateTime odt = OffsetDateTime.parse(isoTs, ISO);
return odt.format(DateTimeFormatter.ofPattern("yyyyMM"));
}

public static long ttlToNextMonthWithBufferSec(String isoTs, int bufferDays) {
OffsetDateTime now = OffsetDateTime.parse(isoTs, ISO);

OffsetDateTime nextMonthStart =
now.withDayOfMonth(1).with(LocalTime.MIDNIGHT).plusMonths(1);

Duration base = Duration.between(now, nextMonthStart);
Duration buffer = Duration.ofDays(bufferDays);

return Math.max(base.plus(buffer).getSeconds(), 3600);
}
}
Loading
Loading