[HCR-425] 새로 배포#70
Conversation
…atch This workflow triggers a dispatch event to the central orchestrator on push to the main branch.
[HCR-226] analysis-server-v0.0.5
[HCR-292] recommendation-server-v0.0.3
[HSC-345] analysis-server-v0.0.7
[HSC-348] recommendation-server-v0.0.4
[HSC-350] unify intelligence-server runtime
[HCS-364] analysis server cdc on으로 설정
[HSC-398] v1.1.0
Pinpoint 설정 및 의존성 추가
[HCR-419] 로그
[HCR-421] v1.1.2
- httpx 연결 풀 재사용
There was a problem hiding this comment.
Code Review
This pull request refactors the recommendation service into a modular package and introduces centralized lifecycle management for Kafka and OpenAI clients within the FastAPI lifespan. It also adds application mode configurations and enhances logging and tracing. Key feedback includes resolving a function signature mismatch for background tasks, ensuring locally created OpenAI clients are properly closed to prevent resource leaks, and migrating deprecated startup event handlers to the lifespan utility for improved maintainability.
| async def run_recommendation_and_publish_to_kafka(member_id: int) -> None: | ||
| try: | ||
| resp = await get_recommendation(session=None, member_id=member_id) | ||
| await publish_recommendation_to_kafka(member_id, resp) | ||
| except Exception as e: | ||
| logger.error("recommendation: 백그라운드 추천/Kafka 실패 member_id=%s: %s", member_id, e, exc_info=True) |
There was a problem hiding this comment.
함수 시그니처와 호출부 간의 인자 개수 불일치로 인해 런타임 오류가 발생할 수 있습니다. app/realtime/api/v1/recommendation.py에서는 trace_id를 포함하여 두 개의 인자를 전달하고 있으나, 현재 정의된 함수는 member_id 하나만 받도록 되어 있습니다. 또한, 전달받은 trace_id를 publish_recommendation_to_kafka 함수에도 전달하여 Kafka 페이로드에 포함되도록 수정해야 합니다.
| async def run_recommendation_and_publish_to_kafka(member_id: int) -> None: | |
| try: | |
| resp = await get_recommendation(session=None, member_id=member_id) | |
| await publish_recommendation_to_kafka(member_id, resp) | |
| except Exception as e: | |
| logger.error("recommendation: 백그라운드 추천/Kafka 실패 member_id=%s: %s", member_id, e, exc_info=True) | |
| async def run_recommendation_and_publish_to_kafka(member_id: int, trace_id: str | None = None) -> None: | |
| try: | |
| resp = await get_recommendation(session=None, member_id=member_id) | |
| await publish_recommendation_to_kafka(member_id, resp, trace_id=trace_id) | |
| except Exception as e: | |
| logger.error("recommendation: 백그라운드 추천/Kafka 실패 member_id=%s trace_id=%s: %s", member_id, trace_id, e, exc_info=True) |
| logging.info("APP_MODE: %s", runtime_settings.app_mode or "(unset)") | ||
| logging.info( | ||
| "CDC analysis enabled: %s", | ||
| runtime_settings.effective_cdc_analysis_enabled, | ||
| ) |
There was a problem hiding this comment.
애플리케이션의 주요 설정 정보(DB 연결 대상, APP_MODE, CDC 활성화 여부)는 effective_cdc_analysis_enabled 값과 관계없이 항상 로그로 남기는 것이 운영 및 디버깅 측면에서 유리합니다. 또한, 아래 on_event("startup") 블록에서 중복으로 수행하던 로깅을 이곳으로 통합하여 관리하는 것을 권장합니다.
| logging.info("APP_MODE: %s", runtime_settings.app_mode or "(unset)") | |
| logging.info( | |
| "CDC analysis enabled: %s", | |
| runtime_settings.effective_cdc_analysis_enabled, | |
| ) | |
| logging.info("DB 연결 대상: %s", _mask_database_url(runtime_settings.effective_database_url)) | |
| logging.info("APP_MODE: %s", runtime_settings.app_mode or "(unset)") | |
| logging.info("CDC analysis enabled: %s", runtime_settings.effective_cdc_analysis_enabled) | |
| if runtime_settings.effective_cdc_analysis_enabled: | |
| cdc_service = CdcAnalysisService(runtime_settings) | |
| await cdc_service.start() | |
| application.state.cdc_service = cdc_service |
| @application.on_event("startup") | ||
| async def log_db_target() -> None: | ||
| runtime_settings = get_settings() | ||
| url = runtime_settings.effective_database_url | ||
| logging.info("DB 연결 대상: %s", _mask_database_url(url)) | ||
| logging.info("CDC analysis enabled: %s", runtime_settings.cdc_analysis_enabled) | ||
| logging.info("APP_MODE: %s", runtime_settings.app_mode or "(unset)") | ||
| logging.info( | ||
| "CDC analysis enabled: %s", | ||
| runtime_settings.effective_cdc_analysis_enabled, | ||
| ) |
| if client is None: | ||
| client = AsyncOpenAI(api_key=api_key) |
There was a problem hiding this comment.
get_openai_client()가 None을 반환하여 새로운 AsyncOpenAI 클라이언트를 생성하는 경우, 해당 클라이언트는 이 함수 내에서만 사용되는 리소스이므로 사용이 끝난 후 await client.close()를 호출하여 커넥션 풀을 명시적으로 닫아주어야 합니다. 그렇지 않으면 리소스 누수가 발생할 수 있습니다.
client = get_openai_client()
owned_client = False
if client is None:
client = AsyncOpenAI(api_key=api_key)
owned_client = True
service = RecommendationService(settings=settings, client=client)
try:
return await service.recommend_for_member(member_id)
except Exception as e:
logger.exception(
"get_recommendation: OpenAI 또는 추천 파이프라인 예외 member_id=%s error_type=%s error=%s",
member_id,
type(e).__name__,
e,
)
return RecommendationResponse(
segment=Segment.normal,
cached_llm_recommendation="[일시 오류] 추천을 생성하지 못했습니다. 잠시 후 다시 시도해 주세요.(openai)",
recommended_products=[],
source="LIVE",
updated_at=utc_now_iso(),
)
finally:
if owned_client:
await client.close()
📝작업 내용
👀변경 사항
🎫 Jira Ticket
#️⃣관련 이슈