Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 62 additions & 0 deletions app/infra/kafka/recommendation_producer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
"""Kafka producer for publishing recommendation results. A single instance is reused for the app lifespan."""

from __future__ import annotations

import json
import logging
from typing import Any

from app.core.config import Settings, get_settings
from app.infra.kafka.client_options import build_kafka_client_options

logger = logging.getLogger(__name__)

_producer: Any = None


async def start_recommendation_kafka_producer(settings: Settings | None = None) -> None:
    """Create and start the shared recommendation producer when Kafka is configured.

    The call is a no-op when a producer is already registered, when ``aiokafka``
    cannot be imported, or when ``kafka_bootstrap_servers`` is empty. Any failure
    while building or starting the producer is logged and swallowed so that the
    application keeps starting without a shared producer.

    Args:
        settings: Settings to read the Kafka options from. Falls back to
            ``get_settings()`` when omitted.
    """
    global _producer
    if _producer is not None:
        return
    try:
        from aiokafka import AIOKafkaProducer
    except ImportError:
        logger.info("aiokafka ๋ฏธ์„ค์น˜, ์ถ”์ฒœ์šฉ Kafka producer ์ƒ๋žต")
        return

    s = settings or get_settings()
    bootstrap = (s.kafka_bootstrap_servers or "").strip()
    if not bootstrap:
        logger.info("kafka_bootstrap_servers ๋น„์–ด ์žˆ์Œ, ์ถ”์ฒœ์šฉ Kafka producer ์ƒ๋žต")
        return

    producer = None
    try:
        # Construction is inside the try as well: invalid client options must
        # not abort application startup any more than a failed connection does.
        producer = AIOKafkaProducer(
            value_serializer=lambda v: json.dumps(v, ensure_ascii=False).encode("utf-8"),
            **build_kafka_client_options(s),
        )
        await producer.start()
    except Exception:
        logger.exception("์ถ”์ฒœ์šฉ Kafka producer ์‹œ์ž‘ ์‹คํŒจ (bootstrap=%s)", bootstrap[:80])
        if producer is not None:
            # A producer whose start() failed is still "unclosed"; stop it
            # best-effort so it does not leak connections or warn at GC time.
            try:
                await producer.stop()
            except Exception:
                logger.debug(
                    "recommendation Kafka producer cleanup after failed start raised",
                    exc_info=True,
                )
        return
    _producer = producer
    logger.info("์ถ”์ฒœ์šฉ Kafka producer ์‹œ์ž‘๋จ topic=%s", getattr(s, "kafka_recommendation_topic", ""))


async def stop_recommendation_kafka_producer() -> None:
    """Stop the shared producer, if one is registered, and clear the reference.

    An error raised by ``stop()`` is logged instead of propagated; the
    module-level reference is reset to ``None`` either way.
    """
    global _producer
    if _producer is not None:
        try:
            await _producer.stop()
        except Exception:
            logger.exception("์ถ”์ฒœ์šฉ Kafka producer ์ข…๋ฃŒ ์ค‘ ์˜ค๋ฅ˜")
        finally:
            _producer = None
        logger.info("์ถ”์ฒœ์šฉ Kafka producer ์ข…๋ฃŒ๋จ")


def get_recommendation_kafka_producer() -> Any:
    """Return the started shared producer.

    The result is ``None`` when no producer has been started (for example in
    scripts or environments where Kafka is not configured).
    """
    return _producer
46 changes: 46 additions & 0 deletions app/infra/openai/app_client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
"""AsyncOpenAI client kept alive for the app lifespan (connection pool reuse)."""

from __future__ import annotations

import logging
from typing import Any

from app.core.config import Settings, get_settings

logger = logging.getLogger(__name__)

_client: Any = None


def start_openai_client(settings: Settings | None = None) -> None:
    """Build the shared ``AsyncOpenAI`` client when an API key is configured.

    Does nothing if a client already exists. When ``openai_api_key`` is missing
    or blank, the client is not created and an info message is logged.
    This is a plain synchronous function; it only constructs the client object.

    Args:
        settings: Settings to read ``openai_api_key`` from. Falls back to
            ``get_settings()`` when omitted.
    """
    global _client
    if _client is not None:
        return

    cfg = settings if settings else get_settings()
    raw_key = getattr(cfg, "openai_api_key", "")
    key = raw_key.strip() if raw_key else ""
    if key:
        # Imported lazily so the package is only needed when a key is set.
        from openai import AsyncOpenAI

        _client = AsyncOpenAI(api_key=key)
        logger.info("๊ณต์œ  AsyncOpenAI ํด๋ผ์ด์–ธํŠธ ์ƒ์„ฑ๋จ")
    else:
        logger.info("OPENAI_API_KEY ๋น„์–ด ์žˆ์Œ, ๊ณต์œ  AsyncOpenAI ํด๋ผ์ด์–ธํŠธ ์ƒ๋žต")


async def stop_openai_client() -> None:
    """Close the shared ``AsyncOpenAI`` client, if any, and clear the reference.

    An error raised by ``close()`` is logged instead of propagated; the
    module-level reference is reset to ``None`` either way.
    """
    global _client
    if _client is not None:
        try:
            await _client.close()
        except Exception:
            logger.exception("AsyncOpenAI close ์ค‘ ์˜ค๋ฅ˜")
        finally:
            _client = None
        logger.info("๊ณต์œ  AsyncOpenAI ํด๋ผ์ด์–ธํŠธ ์ข…๋ฃŒ๋จ")


def get_openai_client() -> Any:
    """Return the client initialised during the app lifespan.

    The result is ``None`` when no client was created (for example in scripts
    or when the app started without an API key).
    """
    return _client
18 changes: 17 additions & 1 deletion app/realtime/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,15 @@

from app.core.config import get_settings
from app.core.logging import configure_logging
from app.infra.pinpoint import build_fastapi_pinpoint_middleware
from app.infra.kafka.recommendation_producer import (
start_recommendation_kafka_producer,
stop_recommendation_kafka_producer,
)
from app.infra.openai.app_client import (
get_openai_client,
start_openai_client,
stop_openai_client,
)
from app.realtime.api.router import api_router
from app.services.cdc_analysis_service import CdcAnalysisService

Expand Down Expand Up @@ -42,9 +50,17 @@ async def lifespan(application: FastAPI):
runtime_settings.effective_cdc_analysis_enabled,
)

start_openai_client(runtime_settings)
application.state.openai_client = get_openai_client()

await start_recommendation_kafka_producer(runtime_settings)

try:
yield
finally:
await stop_recommendation_kafka_producer()
await stop_openai_client()
application.state.openai_client = None
if cdc_service is not None:
await cdc_service.stop()

Expand Down
17 changes: 17 additions & 0 deletions app/services/recommendation/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
"""Personalized recommendation pipeline (context loading, retrieval, LLM, Kafka)."""

from .kafka_publish import publish_recommendation_to_kafka
from .service import (
RecommendationService,
get_recommendation,
run_recommendation_and_publish_to_kafka,
)
from .weights import compute_product_type_weights

# Names this package exposes as its public API; each one is imported from a
# submodule above.
__all__ = [
    "RecommendationService",
    "compute_product_type_weights",
    "get_recommendation",
    "publish_recommendation_to_kafka",
    "run_recommendation_and_publish_to_kafka",
]
126 changes: 126 additions & 0 deletions app/services/recommendation/constants.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
"""SQL statements and constants for the recommendation pipeline."""

from sqlalchemy import bindparam, text

# Selects the listed columns of member_llm_context for a single member.
# Bind parameter: member_id.
FETCH_MEMBER_LLM_CONTEXT_SQL = text("""
SELECT
member_id, membership, age_group, join_months, children_count,
family_group_num, family_role, persona_code, segment,
current_subscriptions, current_product_types, product_type_clicks, current_data_usage_ratio,
data_usage_pattern, churn_score, churn_tier, recent_counseling,
recent_viewed_tags_top_3, contract_expiry_within_3m, updated_at
FROM member_llm_context
WHERE member_id = :member_id
""")

# Price, sale price and product type for the products in :ids.
# `ids` is an expanding bind parameter, i.e. it is bound to a sequence of ids.
FETCH_SUBSCRIPTION_PRICES_SQL = text("""
SELECT product_id, price, sale_price, product_type
FROM product
WHERE product_id IN :ids
""").bindparams(bindparam("ids", expanding=True))

# Up to :k product ids that have an embedding and are not in :exclude_ids,
# ordered ascending by `embedding_vector <#> :query_vec`.
# NOTE(review): `<#>` looks like pgvector's negative-inner-product operator
# (smaller means more similar) — confirm against the database extension in use.
SEARCH_SIMILAR_SQL = text("""
SELECT product_id
FROM product
WHERE embedding_vector IS NOT NULL
AND (NOT (product_id = ANY(:exclude_ids)))
ORDER BY embedding_vector <#> :query_vec
LIMIT :k
""")

# Same filter and limit as the plain similarity search, but the sort key is the
# `<#>` value minus a per-type boost: :boost1 for :boost_type1, :boost2 for
# :boost_type2, 0 otherwise. With ascending order, a positive boost moves
# products of that type earlier in the result.
SEARCH_SIMILAR_WITH_TYPE_BOOST_SQL = text("""
SELECT product_id
FROM product
WHERE embedding_vector IS NOT NULL
AND (NOT (product_id = ANY(:exclude_ids)))
ORDER BY (embedding_vector <#> :query_vec)
- (CASE
WHEN product_type = :boost_type1 THEN :boost1
WHEN product_type = :boost_type2 THEN :boost2
ELSE 0
END)
LIMIT :k
""")

# Per-type top-N search: products whose type is in :main_types (expanding bind)
# and which are not in :exclude_ids are numbered within each product_type by
# ascending `embedding_vector <#> :query_vec`; only rows with rank
# <= :per_type_k are returned, together with their type and rank (rn).
SEARCH_SIMILAR_MAIN_TYPES_WINDOW_SQL = text("""
SELECT product_id, product_type, rn
FROM (
SELECT
product_id,
product_type,
ROW_NUMBER() OVER (
PARTITION BY product_type
ORDER BY embedding_vector <#> :query_vec
) AS rn
FROM product
WHERE embedding_vector IS NOT NULL
AND product_type IN :main_types
AND (NOT (product_id = ANY(:exclude_ids)))
) ranked
WHERE ranked.rn <= :per_type_k
""").bindparams(bindparam("main_types", expanding=True))

# Full product rows for the ids in :ids (expanding bind). data_amount comes
# from mobile_plan when a matching row exists there, otherwise from
# tab_watch_plan; both are LEFT JOINed, so it is NULL when neither matches.
FETCH_PRODUCTS_FULL_SQL = text("""
SELECT
p.product_id, p.name, p.product_type, p.price, p.sale_price, p.tags, p.embedding_text,
COALESCE(mp.data_amount, tw.data_amount) AS data_amount
FROM product p
LEFT JOIN mobile_plan mp ON p.product_id = mp.product_id
LEFT JOIN tab_watch_plan tw ON p.product_id = tw.product_id
WHERE p.product_id IN :ids
""").bindparams(bindparam("ids", expanding=True))

# Default product list: the first :k products (by product_id) that have a
# non-null embedding_text. The column set matches the full-product query, but
# data_amount is always NULL here because no plan tables are joined.
FETCH_DEFAULT_PRODUCTS_SQL = text("""
SELECT
p.product_id, p.name, p.product_type, p.price, p.sale_price, p.tags, p.embedding_text,
NULL::integer AS data_amount
FROM product p
WHERE p.embedding_text IS NOT NULL
ORDER BY p.product_id
LIMIT :k
""")

# Presumably the candidate count (:k) passed to the similarity queries —
# TODO confirm against callers.
RETRIEVAL_CANDIDATES_K = 50

# Product types treated as "main" types; presumably the value bound to
# :main_types in the per-type window query — TODO confirm against callers.
MAIN_PRODUCT_TYPES: tuple[str, ...] = (
    "MOBILE_PLAN",
    "INTERNET",
    "IPTV",
    "TAB_WATCH_PLAN",
    "ADDON",
)
# Presumably the per-type row limit (:per_type_k) of the window query —
# TODO confirm against callers.
RETRIEVAL_PER_TYPE_K = 10

# Query text for retrieval; presumably used when no member-specific query can
# be built — TODO confirm against callers.
DEFAULT_RETRIEVAL_QUERY = "ํ†ต์‹  ์š”๊ธˆ์ œ, ๋ฐ์ดํ„ฐ ์š”๊ธˆ์ œ, ๋ถ€๊ฐ€์„œ๋น„์Šค ์ถ”์ฒœ"

# Presumably the length of the embedding vectors stored in
# product.embedding_vector — TODO confirm.
EMBEDDING_DIMENSION = 1536

# Marker string; presumably matched against product tags to detect
# unlimited-data products — TODO confirm against callers.
UNLIMITED_DATA_TAG_MARKER = "๋ฌด์ œํ•œ"

# Presumably the cap on recommended products per product type — TODO confirm.
MAX_PRODUCTS_PER_TYPE = 2

# User-facing message texts (runtime strings; do not edit casually).
DEFAULT_PRODUCT_REASON_TEXT = "๊ณ ๊ฐ๋‹˜๊ป˜ ๊ฐ€์žฅ ์ ํ•ฉํ•œ ์ƒํ’ˆ์„ ์ถ”์ฒœ๋“œ๋ฆฝ๋‹ˆ๋‹ค!"
NO_CANDIDATE_MESSAGE = "์ถ”์ฒœํ•  ์ˆ˜ ์žˆ๋Š” ์ƒํ’ˆ์ด ์—†์Šต๋‹ˆ๋‹ค."
NO_MATCHED_MESSAGE = "์กฐ๊ฑด์— ๋งž๋Š” ์ถ”์ฒœ ์ƒํ’ˆ์ด ์—†์Šต๋‹ˆ๋‹ค."
FALLBACK_CACHED_MESSAGE = "๊ณ ๊ฐ๋‹˜์˜ ์ด์šฉ ํŒจํ„ด๊ณผ ๊ด€์‹ฌ์‚ฌ๋ฅผ ๋ฐ˜์˜ํ•œ ์š”๊ธˆ์ œยท๋ถ€๊ฐ€์„œ๋น„์Šค ์ถ”์ฒœ์ž…๋‹ˆ๋‹ค."
FALLBACK_VECTOR_SUMMARY = "๊ณ ๊ฐ๋‹˜๊ป˜ ์–ด์šธ๋ฆฌ๋Š” ์ƒํ’ˆ์„ ์ถ”์ฒœํ•ด๋“œ๋ฆด๊ฒŒ์š”!"

# Presumably the maximum allowed ratio of a recommended price to the member's
# current price for churn-risk members — TODO confirm against callers.
CHURN_MAX_PRICE_RATIO = 1.1

# Weight table keyed by segment name. Each segment defines three multipliers:
# "current_type", "click" and "tag".
# NOTE(review): presumably consumed by the product-type weight computation
# (weights for current product types, click counts and viewed tags) — confirm.
SEGMENT_WEIGHT_CONFIG: dict[str, dict[str, float]] = {
    "CHURN_RISK": {
        "current_type": 1.5,
        "click": 0.7,
        "tag": 0.5,
    },
    "UPSELL": {
        "current_type": 1.0,
        "click": 1.5,
        "tag": 1.2,
    },
    "NORMAL": {
        "current_type": 0.8,
        "click": 1.3,
        "tag": 1.3,
    },
}
36 changes: 36 additions & 0 deletions app/services/recommendation/context_loader.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
"""Loading of member_llm_context rows."""

import logging

from sqlalchemy.ext.asyncio import AsyncSession

from .constants import FETCH_MEMBER_LLM_CONTEXT_SQL

logger = logging.getLogger(__name__)


async def load_member_llm_context(session: AsyncSession, member_id: int) -> dict | None:
    """Fetch the ``member_llm_context`` row of one member as a plain dict.

    Args:
        session: Async SQLAlchemy session used to run the query.
        member_id: Id of the member whose context row is loaded.

    Returns:
        The row as a dict keyed by column name, or ``None`` when the member has
        no row or when the lookup fails. On failure the session is rolled back
        best-effort and the error is logged, not raised.
    """
    try:
        ctx_result = await session.execute(
            FETCH_MEMBER_LLM_CONTEXT_SQL,
            {"member_id": member_id},
        )
        row = ctx_result.fetchone()
        if row is None:
            logger.info(
                "recommendation: member_llm_context ํ–‰ ์—†์Œ (member_id=%s). ํ…Œ์ด๋ธ”์€ ์žˆ์œผ๋‚˜ ํ•ด๋‹น ํšŒ์› ๋ฐ์ดํ„ฐ ์—†์Œ โ†’ ํด๋ฐฑ",
                member_id,
            )
            return None
        # Row objects expose columns through `_mapping`; fall back to dict()
        # for row types that are directly mapping-like.
        return dict(row._mapping) if hasattr(row, "_mapping") else dict(row)
    except Exception as e:
        logger.info(
            "recommendation: member_llm_context ์กฐํšŒ ์‹คํŒจ โ†’ ํด๋ฐฑ ์˜ˆ์ • (member_id=%s, error=%s)",
            member_id,
            e,
        )
        try:
            await session.rollback()
        except Exception:
            # Best-effort: rollback can itself fail, e.g. on an already-closed
            # session. Keep going, but leave a trace instead of hiding it.
            logger.debug(
                "recommendation: rollback after member_llm_context failure raised (member_id=%s)",
                member_id,
                exc_info=True,
            )
        return None
55 changes: 55 additions & 0 deletions app/services/recommendation/kafka_publish.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
"""Kafka publishing of recommendation results."""

from __future__ import annotations

import json
import logging

try:
from aiokafka import AIOKafkaProducer
except Exception: # pragma: no cover
AIOKafkaProducer = None # type: ignore[assignment]

from app.core.config import get_settings
from app.infra.kafka.client_options import build_kafka_client_options
from app.infra.kafka.recommendation_producer import get_recommendation_kafka_producer
from app.schemas.recommendation import RecommendationResponse

logger = logging.getLogger(__name__)


async def publish_recommendation_to_kafka(
    member_id: int,
    response: RecommendationResponse,
    trace_id: str | None = None,
) -> None:
    """Publish one recommendation result to the recommendation Kafka topic.

    The shared lifespan producer is used when available. Otherwise a temporary
    producer is created for this call and stopped afterwards. Publishing is
    best-effort: when ``aiokafka`` is missing or no bootstrap servers are
    configured the call is skipped with a warning, and any failure while
    starting the temporary producer or sending is logged, not raised.

    Args:
        member_id: Member the recommendation belongs to; also used (as UTF-8
            text) for the message key.
        response: Recommendation payload; dumped with field aliases and merged
            into the message next to ``traceId`` and ``memberId``.
        trace_id: Optional trace id; sent as an empty string when missing.
    """
    settings = get_settings()
    topic = getattr(settings, "kafka_recommendation_topic", "recommendation")
    # `or ""` guards against the setting being present but None, which would
    # otherwise raise AttributeError on .strip().
    bootstrap = (getattr(settings, "kafka_bootstrap_servers", "") or "").strip()
    if AIOKafkaProducer is None:
        logger.warning("recommendation: aiokafka ๋ฏธ์„ค์น˜, Kafka ๋ฐœํ–‰ ์Šคํ‚ต member_id=%s", member_id)
        return
    if not bootstrap:
        logger.warning("recommendation: Kafka ๋ฏธ์„ค์ •, ๋ฐœํ–‰ ์Šคํ‚ต member_id=%s", member_id)
        return
    tid = (trace_id or "").strip()
    payload = {"traceId": tid, "memberId": member_id, **response.model_dump(by_alias=True)}

    producer = get_recommendation_kafka_producer()
    # Only a producer created here is ours to stop; the shared one is owned by
    # the application lifespan.
    owned = producer is None
    try:
        if owned:
            producer = AIOKafkaProducer(
                value_serializer=lambda v: json.dumps(v, ensure_ascii=False).encode("utf-8"),
                **build_kafka_client_options(settings),
            )
            # Started inside the try so a failed start is handled like a failed
            # send and the producer is still cleaned up below.
            await producer.start()
        await producer.send_and_wait(topic, value=payload, key=str(member_id).encode("utf-8"))
        logger.info("recommendation: Kafka ๋ฐœํ–‰ ์™„๋ฃŒ member_id=%s topic=%s", member_id, topic)
    except Exception as e:
        logger.error("recommendation: Kafka ๋ฐœํ–‰ ์‹คํŒจ member_id=%s: %s", member_id, e, exc_info=True)
    finally:
        if owned and producer is not None:
            try:
                await producer.stop()
            except Exception:
                # A cleanup failure must not turn a completed publish into an error.
                logger.warning(
                    "recommendation: temporary Kafka producer stop failed member_id=%s",
                    member_id,
                    exc_info=True,
                )
Loading
Loading