v1.2.0#69
Conversation
- httpx 연결 풀 재사용
There was a problem hiding this comment.
Code Review
This pull request refactors the recommendation service into a modular package and introduces centralized lifecycle management for Kafka and OpenAI clients. Key changes include the reorganization of recommendation logic into specialized modules and the integration of client initialization into the application's lifespan. Review feedback points out a potential resource leak regarding unclosed OpenAI clients and suggests improving code consistency and safety by using local variables and safe attribute access for model configurations.
| client = get_openai_client() | ||
| if client is None: | ||
| client = AsyncOpenAI(api_key=api_key) | ||
| service = RecommendationService(settings=settings, client=client) | ||
| try: | ||
| return await service.recommend_for_member(member_id) | ||
| except Exception as e: | ||
| logger.exception( | ||
| "get_recommendation: OpenAI 또는 추천 파이프라인 예외 member_id=%s error_type=%s error=%s", | ||
| member_id, | ||
| type(e).__name__, | ||
| e, | ||
| ) | ||
| return RecommendationResponse( | ||
| segment=Segment.normal, | ||
| cached_llm_recommendation="[일시 오류] 추천을 생성하지 못했습니다. 잠시 후 다시 시도해 주세요.(openai)", | ||
| recommended_products=[], | ||
| source="LIVE", | ||
| updated_at=utc_now_iso(), | ||
| ) |
There was a problem hiding this comment.
get_openai_client()가 None을 반환할 때 생성되는 AsyncOpenAI 인스턴스가 명시적으로 닫히지 않아 리소스 누수가 발생할 수 있습니다. owned 플래그와 finally 블록을 사용하여 생성된 클라이언트를 안전하게 닫아주어야 합니다.
client = get_openai_client()
owned = False
if client is None:
client = AsyncOpenAI(api_key=api_key)
owned = True
try:
service = RecommendationService(settings=settings, client=client)
return await service.recommend_for_member(member_id)
except Exception as e:
logger.exception(
"get_recommendation: OpenAI 또는 추천 파이프라인 예외 member_id=%s error_type=%s error=%s",
member_id,
type(e).__name__,
e,
)
return RecommendationResponse(
segment=Segment.normal,
cached_llm_recommendation="[일시 오류] 추천을 생성하지 못했습니다. 잠시 후 다시 시도해 주세요.(openai)",
recommended_products=[],
source="LIVE",
updated_at=utc_now_iso(),
)
finally:
if owned:
await client.close()|
|
||
| try: | ||
| emb_resp = await client.embeddings.create( | ||
| model=settings.openai_embedding_model, |
There was a problem hiding this comment.
| ) | ||
| try: | ||
| resp = await client.chat.completions.create( | ||
| model=settings.openai_chat_model, |
| (DEFAULT_RETRIEVAL_QUERY or "")[:60] + ("..." if len(DEFAULT_RETRIEVAL_QUERY or "") > 60 else ""), | ||
| ) | ||
| emb_resp = await client.embeddings.create( | ||
| model=settings.openai_embedding_model, |
| logger.info("recommendation: 폴백 LLM reason 생성 요청 상품=%d", len(summaries)) | ||
| reasons = await generate_recommendation_reasons( | ||
| client, | ||
| settings.openai_chat_model, |
📝작업 내용
Retrieval (DB)
MAIN_PRODUCT_TYPES별 벡터 검색을 순차 5회 쿼리 → 단일 윈도우 SQL (ROW_NUMBER() OVER (PARTITION BY product_type …))로 통합해 왕복 횟수를 줄였습니다.
구현 위치: app/services/recommendation/constants.py (SEARCH_SIMILAR_MAIN_TYPES_WINDOW_SQL), app/services/recommendation/retrieval.py.
Kafka
app/infra/kafka/recommendation_producer.py: 앱 lifespan에서 추천용 AIOKafkaProducer 시작/종료 (bootstrap 미설정·시작 실패 시 스킵/로그).
app/realtime/main.py: lifespan에 producer start/stop 연동.
kafka_publish: 공유 producer가 있으면 send_and_wait만 호출; 없으면 기존처럼 일회성 producer로 폴백 (스크립트·로컬 호환).
페이로드에 traceId 옵션 (publish_recommendation_to_kafka(..., trace_id=...)), 기본 빈 문자열.
OpenAI
app/infra/openai/app_client.py: OPENAI_API_KEY가 있을 때 공유 AsyncOpenAI 1개 생성, shutdown 시 close.
app/realtime/main.py: application.state.openai_client에도 참조 저장.
get_recommendation: 공유 클라이언트 우선, 없으면 기존처럼 요청 단위 생성.
구조 리팩터
app/services/recommendation/ 패키지로 분리:
constants, utils, weights, context_loader, retrieval, llm_recommendation, kafka_publish, service, init.py
app/services/recommendation_service.py: 위 패키지를 re-export하는 얇은 래퍼 (외부 import 유지).
기타
chore: 로깅 traceId 추가 커밋 포함 (추적용 로깅 보강).
retrieval_query_builder.py: 소규모 포맷 정리만 포함 (이 PR diff 기준).
👀변경 사항
🎫 Jira Ticket
#️⃣관련 이슈