Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
40264dc
[HSC-195] chore: Add GitHub Actions workflow for central release disp…
tkv00 Mar 5, 2026
d470a90
Merge pull request #12 from one-year-gap/release/HSC-206
tkv00 Mar 5, 2026
452896d
Merge pull request #25 from one-year-gap/release/HSC-251
tkv00 Mar 9, 2026
118cca7
Merge pull request #31 from one-year-gap/release/HSC-288
tkv00 Mar 12, 2026
052f6fc
Merge pull request #36 from one-year-gap/release/HSC-294
tkv00 Mar 13, 2026
d57db30
Merge pull request #44 from one-year-gap/release/HSC-345
tkv00 Mar 17, 2026
da6e155
Merge pull request #47 from one-year-gap/release/HSC-348
tkv00 Mar 17, 2026
e9325c8
Merge pull request #54 from one-year-gap/refactor/HSC-350
tkv00 Mar 17, 2026
95ad423
[HSC-364] fix: analysis server cdc on으로 설정
tkv00 Mar 18, 2026
54fb06e
Merge pull request #56 from one-year-gap/fix/HSC-364
tkv00 Mar 18, 2026
098a6b1
Merge pull request #60 from one-year-gap/release/HSC-398
tkv00 Mar 19, 2026
027a367
config: Pinpoint 설정 추가
tkv00 Mar 28, 2026
8b0ce16
Merge branch 'main' into config/HSC-418
tkv00 Mar 28, 2026
baa939a
Merge pull request #62 from one-year-gap/config/HSC-418
tkv00 Mar 28, 2026
c95a562
[HSC-419] feat: 로그
rettooo Mar 31, 2026
2a84d4a
Merge pull request #64 from one-year-gap/hotfix/HSC-419
rettooo Mar 31, 2026
bbcba23
[HSC-421] chore: traceId 로그 추가
rettooo Mar 31, 2026
74a61ee
Merge pull request #67 from one-year-gap/hotfix/HSC-421
rettooo Mar 31, 2026
70c8bff
[HSC-422] chore: 로깅 traceId 추가
rettooo Mar 31, 2026
5ba5177
[HSC-422] feat: DB 한번에 실행 unionALL
rettooo Mar 31, 2026
5b843da
[HSC-422] refactor: producer 생성, start stop 앱 생성시 관리
rettooo Mar 31, 2026
8a5bb14
[HSC-422] refactor: main에 producer start, stop
rettooo Mar 31, 2026
817451e
[HSC-422] refactor: 공유 producer가 있으면 send_and_wait만 호출 ( 매번 start/sto…
rettooo Mar 31, 2026
d0306d7
[HSC-422] refactor: OpenAI 클라이언트 연결 풀
rettooo Mar 31, 2026
21d1565
[HSC-422] refactor: start openai client 한번 생성
rettooo Mar 31, 2026
d91ae2e
[HSC-422] chore: constants, utils, weights 모듈 분리
rettooo Mar 31, 2026
2379d61
[HSC-422] chore: context_loader, retrieval 분리
rettooo Mar 31, 2026
90f816b
[HSC-422] chore: LLM, 임베딩 파이프라인 분리
rettooo Mar 31, 2026
1604772
[HSC-422] chore: service, Kafka, 공개 API 및 recommendation service 래퍼
rettooo Mar 31, 2026
964d48d
Merge branch 'main' into hotfix/HSC-422
rettooo Mar 31, 2026
5e3c24d
Merge pull request #69 from one-year-gap/hotfix/HSC-422
rettooo Mar 31, 2026
e0cea71
[HSC-422] fix: 핀포인트 제거
rettooo Mar 31, 2026
7df777c
[HSC-422] fix: 프롬프트 길이 줄이기
rettooo Mar 31, 2026
7b81527
[HSC-425] chore: redeploy
rettooo Mar 31, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 12 additions & 1 deletion app/core/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,12 @@
class Settings(BaseSettings):
app_name: str = "intelligence-server"
app_env: str = "local"
app_mode: str = Field(default="", validation_alias=AliasChoices("APP_MODE"))
debug: bool = False
cdc_analysis_enabled: bool = False
cdc_analysis_enabled: bool | None = Field(
default=None,
validation_alias=AliasChoices("CDC_ANALYSIS_ENABLED"),
)

api_v1_prefix: str = "/api/v1"
log_level: str = "INFO"
Expand Down Expand Up @@ -119,6 +123,13 @@ def effective_database_url(self) -> str:
return dsn.replace("postgres://", "postgresql+asyncpg://", 1)
return dsn

@property
def effective_cdc_analysis_enabled(self) -> bool:
if self.cdc_analysis_enabled is not None:
return self.cdc_analysis_enabled

return self.app_mode in {"server", "analysis-server"}


@lru_cache
def get_settings() -> Settings:
Expand Down
62 changes: 62 additions & 0 deletions app/infra/kafka/recommendation_producer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
"""추천 결과 발행용 Kafka producer. 앱 lifespan 동안 1개 인스턴스를 재사용한다."""

from __future__ import annotations

import json
import logging
from typing import Any

from app.core.config import Settings, get_settings
from app.infra.kafka.client_options import build_kafka_client_options

logger = logging.getLogger(__name__)

_producer: Any = None


async def start_recommendation_kafka_producer(settings: Settings | None = None) -> None:
"""bootstrap이 설정된 경우에만 producer를 생성·시작한다. 실패 시 로그만 남기고 계속 기동."""
global _producer
if _producer is not None:
return
try:
from aiokafka import AIOKafkaProducer
except ImportError:
logger.info("aiokafka 미설치, 추천용 Kafka producer 생략")
return

s = settings or get_settings()
bootstrap = (s.kafka_bootstrap_servers or "").strip()
if not bootstrap:
logger.info("kafka_bootstrap_servers 비어 있음, 추천용 Kafka producer 생략")
return

producer = AIOKafkaProducer(
value_serializer=lambda v: json.dumps(v, ensure_ascii=False).encode("utf-8"),
**build_kafka_client_options(s),
)
try:
await producer.start()
except Exception:
logger.exception("추천용 Kafka producer 시작 실패 (bootstrap=%s)", bootstrap[:80])
return
_producer = producer
logger.info("추천용 Kafka producer 시작됨 topic=%s", getattr(s, "kafka_recommendation_topic", ""))


async def stop_recommendation_kafka_producer() -> None:
global _producer
if _producer is None:
return
try:
await _producer.stop()
except Exception:
logger.exception("추천용 Kafka producer 종료 중 오류")
finally:
_producer = None
logger.info("추천용 Kafka producer 종료됨")


def get_recommendation_kafka_producer() -> Any:
"""시작된 공유 producer. 없으면 None (스크립트·미설정 환경)."""
return _producer
46 changes: 46 additions & 0 deletions app/infra/openai/app_client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
"""앱 lifespan에서 유지하는 AsyncOpenAI 클라이언트 (연결 풀 재사용)."""

from __future__ import annotations

import logging
from typing import Any

from app.core.config import Settings, get_settings

logger = logging.getLogger(__name__)

_client: Any = None


def start_openai_client(settings: Settings | None = None) -> None:
"""OPENAI_API_KEY가 있을 때만 공유 클라이언트를 만든다. 동기 함수(내부 I/O 없음)."""
global _client
if _client is not None:
return
s = settings or get_settings()
api_key = (getattr(s, "openai_api_key", "") or "").strip()
if not api_key:
logger.info("OPENAI_API_KEY 비어 있음, 공유 AsyncOpenAI 클라이언트 생략")
return
from openai import AsyncOpenAI

_client = AsyncOpenAI(api_key=api_key)
logger.info("공유 AsyncOpenAI 클라이언트 생성됨")


async def stop_openai_client() -> None:
global _client
if _client is None:
return
try:
await _client.close()
except Exception:
logger.exception("AsyncOpenAI close 중 오류")
finally:
_client = None
logger.info("공유 AsyncOpenAI 클라이언트 종료됨")


def get_openai_client() -> Any:
"""lifespan에서 초기화된 클라이언트. 없으면 None (스크립트·키 미설정 기동)."""
return _client
25 changes: 23 additions & 2 deletions app/realtime/api/v1/recommendation.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
from fastapi import APIRouter, BackgroundTasks, Depends
import logging
import time
import uuid

from fastapi import APIRouter, BackgroundTasks, Depends, Request
from fastapi.responses import Response
from sqlalchemy.ext.asyncio import AsyncSession

Expand All @@ -7,18 +11,35 @@
from app.services.recommendation_service import run_recommendation_and_publish_to_kafka

router = APIRouter()
logger = logging.getLogger(__name__)


@router.post("/recommendations", status_code=202)
async def post_recommendations(
body: RecommendationRequest,
background_tasks: BackgroundTasks,
request: Request,
session: AsyncSession = Depends(get_db_session),
) -> Response:
"""
202 Accepted 즉시 반환. 백그라운드에서 추천 생성 후 Kafka recommendation-topic 발행.
Spring이 Kafka consume → persona_recommendation 적재 → CompletableFuture.complete(결과).
"""
_ = session
background_tasks.add_task(run_recommendation_and_publish_to_kafka, body.member_id)
t0 = time.monotonic()
trace_id = (
(request.headers.get("x-trace-id") or "").strip()
or uuid.uuid4().hex[:12]
)
member_id = body.member_id

logger.info("[REC][trace_id=%s][member_id=%s] fastapi_start", trace_id, member_id)
background_tasks.add_task(run_recommendation_and_publish_to_kafka, member_id, trace_id)
total_ms = int((time.monotonic() - t0) * 1000)
logger.info(
"[REC][trace_id=%s][member_id=%s] fastapi_end total_ms=%s",
trace_id,
member_id,
total_ms,
)
return Response(status_code=202, content=None)
36 changes: 32 additions & 4 deletions app/realtime/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,15 @@

from app.core.config import get_settings
from app.core.logging import configure_logging
from app.infra.kafka.recommendation_producer import (
start_recommendation_kafka_producer,
stop_recommendation_kafka_producer,
)
from app.infra.openai.app_client import (
get_openai_client,
start_openai_client,
stop_openai_client,
)
from app.realtime.api.router import api_router
from app.services.cdc_analysis_service import CdcAnalysisService

Expand All @@ -30,29 +39,48 @@ async def lifespan(application: FastAPI):
runtime_settings = get_settings()
cdc_service = None

if runtime_settings.cdc_analysis_enabled:
if runtime_settings.effective_cdc_analysis_enabled:
cdc_service = CdcAnalysisService(runtime_settings)
await cdc_service.start()
application.state.cdc_service = cdc_service
logging.info("CDC analysis service enabled inside unified intelligence runtime.")
logging.info("APP_MODE: %s", runtime_settings.app_mode or "(unset)")
logging.info(
"CDC analysis enabled: %s",
runtime_settings.effective_cdc_analysis_enabled,
)
Comment on lines +46 to +50
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

애플리케이션의 주요 설정 정보(DB 연결 대상, APP_MODE, CDC 활성화 여부)는 effective_cdc_analysis_enabled 값과 관계없이 항상 로그로 남기는 것이 운영 및 디버깅 측면에서 유리합니다. 또한, 아래 on_event("startup") 블록에서 중복으로 수행하던 로깅을 이곳으로 통합하여 관리하는 것을 권장합니다.

Suggested change
logging.info("APP_MODE: %s", runtime_settings.app_mode or "(unset)")
logging.info(
"CDC analysis enabled: %s",
runtime_settings.effective_cdc_analysis_enabled,
)
logging.info("DB 연결 대상: %s", _mask_database_url(runtime_settings.effective_database_url))
logging.info("APP_MODE: %s", runtime_settings.app_mode or "(unset)")
logging.info("CDC analysis enabled: %s", runtime_settings.effective_cdc_analysis_enabled)
if runtime_settings.effective_cdc_analysis_enabled:
cdc_service = CdcAnalysisService(runtime_settings)
await cdc_service.start()
application.state.cdc_service = cdc_service


start_openai_client(runtime_settings)
application.state.openai_client = get_openai_client()

await start_recommendation_kafka_producer(runtime_settings)

try:
yield
finally:
await stop_recommendation_kafka_producer()
await stop_openai_client()
application.state.openai_client = None
if cdc_service is not None:
await cdc_service.stop()


def create_app() -> FastAPI:
application = FastAPI(title=settings.app_name, lifespan=lifespan)
application = FastAPI(
title=settings.app_name,
lifespan=lifespan,
)
application.include_router(api_router, prefix=settings.api_v1_prefix)

@application.on_event("startup")
async def log_db_target() -> None:
runtime_settings = get_settings()
url = runtime_settings.effective_database_url
logging.info("DB 연결 대상: %s", _mask_database_url(url))
logging.info("CDC analysis enabled: %s", runtime_settings.cdc_analysis_enabled)
logging.info("APP_MODE: %s", runtime_settings.app_mode or "(unset)")
logging.info(
"CDC analysis enabled: %s",
runtime_settings.effective_cdc_analysis_enabled,
)
Comment on lines 74 to +83
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

FastAPI에서 @application.on_event("startup")은 더 이상 권장되지 않는(deprecated) 방식입니다. 이미 lifespan 핸들러가 구현되어 있으므로, 해당 로직(DB 연결 대상 및 설정 로깅)을 lifespan 내부로 이동시키고 이 블록은 삭제하는 것이 좋습니다. (이전 코멘트의 lifespan 수정 제안 참고)


@application.get("/")
async def root() -> dict[str, str]:
Expand Down
68 changes: 66 additions & 2 deletions app/services/persona_recommendation_prompts.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,10 @@
명세 8장: segment가 추천 제약, persona가 마케팅·reason 스타일 결정.
"""

from __future__ import annotations

import re


SEGMENT_SYSTEM_PROMPTS = {
"CHURN_RISK": """당신은 통신사 고객 이탈 방지 전문 추천 AI예요.
Expand Down Expand Up @@ -81,6 +85,62 @@ def get_persona_style_prompt(persona_code: str | None) -> str:
return PERSONA_STYLE_PROMPTS.get(code, PERSONA_STYLE_PROMPTS["SPACE_EXPLORER"])


def _extract_block(raw: str, label: str) -> str:
# [라벨] 다음 줄부터 다음 [라벨] 이전까지를 추출한다.
pattern = rf"{re.escape(label)}\s*(.*?)(?=\n\[[^\]]+\]|\Z)"
m = re.search(pattern, raw, flags=re.DOTALL)
return (m.group(1).strip() if m else "")


def _compact_embedding_text(text: str, max_chars: int = 250, max_target_reasons: int = 2) -> str:
raw = (text or "").strip()
if not raw:
return ""

name = _extract_block(raw, "[상품명]")
price = _extract_block(raw, "[가격]")
ptype = _extract_block(raw, "[상품유형]")
tags = _extract_block(raw, "[태그]")
target_section = _extract_block(raw, "[추천 대상/상황]")

compact_lines: list[str] = []
if name:
compact_lines.append(f"[상품명] {name}")
if price:
compact_lines.append(f"[가격] {price}")
if ptype:
compact_lines.append(f"[상품유형] {ptype}")
if tags:
tag_items = [t.strip() for t in tags.split(",") if t.strip()]
if tag_items:
compact_lines.append(f"[태그] {', '.join(tag_items[:2])}")

if target_section:
pairs: list[str] = []
chunks = [c.strip() for c in target_section.split("|") if c.strip()]
for chunk in chunks:
t = re.search(r"\[대상\]\s*([^\[\]|·]+)", chunk)
r = re.search(r"\[추천 이유\]\s*([^\[\]|]+)", chunk)
target = (t.group(1).strip() if t else "")
reason = (r.group(1).strip() if r else "")
if target and reason:
pairs.append(f"[대상/이유] {target} - {reason}")
if len(pairs) >= max_target_reasons:
break
compact_lines.extend(pairs)

compact = "\n".join(compact_lines).strip()
if compact:
return compact[:max_chars]

# 구조화 태그가 없는 경우 문장 기반 폴백
chunks = [s.strip() for s in re.split(r"[.!?\n]+", raw) if s and s.strip()]
fallback = ". ".join(chunks[:3]).strip()
if fallback and not fallback.endswith("."):
fallback += "."
return fallback[:max_chars]


def format_products(products: list[dict]) -> str:
"""LLM 유저 프롬프트에 넣을 상품 목록 문자열. 명세 8장 포맷."""
lines = []
Expand All @@ -94,7 +154,11 @@ def format_products(products: list[dict]) -> str:
tags_str = ", ".join(str(t) for t in tags)
else:
tags_str = str(tags) if tags else ""
embedding_text = (p.get("embedding_text") or "").strip()
embedding_text = _compact_embedding_text(
(p.get("embedding_text") or "").strip(),
max_chars=250,
max_target_reasons=2,
)
lines.append(
f"{i}. product_id: {p.get('product_id')}\n"
f" product_name: {product_name}\n"
Expand Down Expand Up @@ -128,7 +192,7 @@ def build_user_prompt(ctx: dict, products_text: str) -> str:
]
if recent_counseling:
blocks.append("## 최근 상담 이력")
blocks.append(recent_counseling[:800])
blocks.append(recent_counseling[:300])
blocks.append("## 후보 상품 목록 (이 중에서 최대 3개를 골라 추천하고, 각 product_id에 대해 reason을 작성하세요)")
blocks.append(products_text)

Expand Down
17 changes: 17 additions & 0 deletions app/services/recommendation/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
"""개인화 추천 파이프라인 (컨텍스트 로드·검색·LLM·Kafka)."""

from .kafka_publish import publish_recommendation_to_kafka
from .service import (
RecommendationService,
get_recommendation,
run_recommendation_and_publish_to_kafka,
)
from .weights import compute_product_type_weights

__all__ = [
"RecommendationService",
"compute_product_type_weights",
"get_recommendation",
"publish_recommendation_to_kafka",
"run_recommendation_and_publish_to_kafka",
]
Loading
Loading