Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion api/summary.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
from fastapi import APIRouter
from service.summary import call_assistant_summary, AssistantSummary
from service.summary import call_assistant_summary
from config import settings
import requests
import json
from typing import List
from datetime import datetime
from pydantic import ValidationError
from fastapi import HTTPException
from pydantic import BaseModel, Field

router = APIRouter(tags=["Summary"])

Expand Down
1 change: 1 addition & 0 deletions main.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from fastapi import FastAPI
from api import health, icebreaking, summary, embedding
from service import auto

app = FastAPI(root_path="/ai")

Expand Down
4 changes: 3 additions & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,6 @@ pydantic
pydantic-settings
requests
openai==1.82.0
sentence-transformers
sentence-transformers
schedule
APScheduler>=3.11.0
358 changes: 358 additions & 0 deletions service/auto.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,358 @@
import requests
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.triggers.cron import CronTrigger
from datetime import datetime, timedelta
from service.summary import (
extract_plain_reply,
get_summary,
AssistantSummary,
)
import requests
import os
import json
from config import settings
from openai import OpenAI
from datetime import datetime
from dotenv import load_dotenv

scheduler = AsyncIOScheduler()


# 전체 채팅방 리스트 요청
def fetch_roomList() -> list[dict]:
url = f"{settings.CHAT_SERVER_URL}/room/list/all"
response = requests.get(url)
response.raise_for_status()
rooms = response.json()

summary_scheduler(rooms)


# 전체 채팅방 스케줄링 요약
def summary_scheduler(rooms):
print(rooms)

for room in rooms:
room_id = room.get("_id")
print(room_id)

# 채팅 서버에 요약본 요청
raw_summary = get_summary(room_id)

# 프롬프팅
parsing_json_response = AssistantSummary.model_validate(raw_summary.json())
messages = summary_prompt(parsing_json_response)

# gpt 요약 요청
gpt_response_str = get_reply(messages)

# print(messages)
# print(gpt_response_str)

# 채팅 서버로 요약본 업데이트
post_summary_update(room_id, gpt_response_str, parsing_json_response)


# 매일 채팅방 업데이트 post 함수
def post_summary_update(
room_id: str, summary_json_str: str, parsed_summary: AssistantSummary
) -> None:
try:
payload = json.loads(summary_json_str)

except json.JSONDecodeError as e:
raise ValueError(f"요약 JSON 파싱 실패: {e}")

body = {
"roomId": parsed_summary.roomId,
"persona": [persona.model_dump() for persona in parsed_summary.persona],
"summaries": payload,
}

url = f"{settings.CHAT_SERVER_URL}/summary/update"
resp = requests.post(url, json=body, timeout=5)

resp.raise_for_status()
print(f"[{room_id}] 요약 업데이트 성공", [{resp}], resp.status_code)


def summary_prompt(response) -> list[dict]:

# open ai에 질의 메시지
main_prompt_content = f"""
### ROLE
너는 오늘 채팅방의 내용을 매우 심도있게 분석하여 요약하는 것이 역할이다.
두 사람이 오늘 나눈 대화 내역(today_messages)을 6개의 주제에 대화 내역에서 추출하여 각각 요약해야한다.
6개의 주제는 다음과 같다.
1. health_status
2. hobbies_and_interests
3. emotional_and_mental_state
4. relationship_context
5. social_activitie
6. intimacy

### 요약
{response.summaries}

### 오늘 나눈 대화 내역
{response.today_messages}


### TASK
1) 오늘 나눈 대화 내역을 읽고, 6개의 주제에 대한 내용을 추출하여 한 문장으로 요약한다.
2) 만일 일부 주제에 대한 내용을 추출할 내용을 없을 시에는 "주제에 대한 대화 없음"이라고 저장한다.
3) 반드시 기존에 있는 요약 내용에 추가하는 것이다. 대답할 때 오늘 나눈 대화 내역에서 각 주제에 추출한 내용을 이미 있는 내용에 day리스트 맨 뒤에 추가만 할 것.
4) {datetime.now()}가 현재 시각임으로 last_update에 반영할 것.
5) 금기) 반드시 요약 JSON 구조를 지키며 수정하지 말아라.
6) 금기) 반드시 기존의 요약에 day 리스트 맨 뒤에 요약 내용을 추가할 것.
6-1) 예시) 추가할 때의 형태는 ★{datetime.today().strftime("%m-%d")}: "요약 내용" 형태로 추가할 것.
"""

return [
{
"role": "system",
"content": "너는 한국어 어르신 대화 코치이다. 두 어르신의 건강을 고려해 한 문장으로 답한다.",
},
{"role": "user", "content": main_prompt_content},
# {"role": "user", "content": few_shot_example},
]


def weekly_summary_scheduler() -> None:
rooms = requests.get(f"{settings.CHAT_SERVER_URL}/room/list/all", timeout=5).json()

for room in rooms:
room_id = room["_id"]
resp = get_summary(room_id) # Response
summary_obj: AssistantSummary = AssistantSummary.model_validate(resp.json())

topics = [
"health_status",
"hobbies_and_interests",
"emotional_and_mental_state",
"relationship_context",
"social_activities",
"intimacy",
]

today = datetime.now()
week_start = (today - timedelta(days=6)).strftime("%m-%d")
week_end = today.strftime("%m-%d")
week_of_month = (today.day - 1) // 7 + 1
week_label = f"{today.month}월 {week_of_month}주차"
# 4) GPT 프롬프트 구성
prompt = f"""
지난 일주일({week_start} ~ {week_end}) 각 주제별로 day 리스트에 쌓인 항목을 읽고,
각 주제에 대해 한 문장으로 요약해라라
- 입력 데이터(딕셔너리):
{{
"""
for t in topics:
day_list = getattr(summary_obj.summaries, t).day
prompt += f' "{t}": {day_list}\n'
prompt += (
"}\n\n반드시 한 문장으로 요약하고, 요약 결과를 문자열로만 반환해 주세요.\n"
)

# 5) GPT 호출
messages = [
{"role": "system", "content": "너는 숙련된 요약 봇이다."},
{"role": "user", "content": prompt},
]
week_summaries = {}
for t in topics:

resp_text = extract_plain_reply(
get_reply(
[messages[0], {"role": "user", "content": f"{prompt}\n주제: {t}"}]
)
)
week_summaries[t] = f"★ {week_label}: {resp_text}"

for t in topics:
# week 리스트에 추가
getattr(summary_obj.summaries, t).week.append(week_summaries[t])
# day 리스트는 비워기기
getattr(summary_obj.summaries, t).day.clear()

summaries_json_str = summary_obj.summaries.model_dump_json()

summaries_dict = json.loads(summaries_json_str)

body = {
"roomId": summary_obj.roomId,
"persona": [p.model_dump() for p in summary_obj.persona],
"summaries": summaries_dict,
}
requests.post(
f"{settings.CHAT_SERVER_URL}/summary/update", json=body, timeout=5
).raise_for_status()
print(f"[{room_id}] 주간 요약 업데이트 완료")


def monthly_summary_scheduler() -> None:
"""
매월 1일 새벽 2시에, 지난 한 달간 week 리스트를
month 리스트에 한 문장 요약으로 추가하고
week 리스트는 비웁니다.
"""
rooms = requests.get(f"{settings.CHAT_SERVER_URL}/room/list/all", timeout=5).json()

# 오늘의 월 레이블 (예: 5월)
today = datetime.now()
month_label = f"{today.month}월"

topics = [
"health_status",
"hobbies_and_interests",
"emotional_and_mental_state",
"relationship_context",
"social_activities",
"intimacy",
]

for room in rooms:
room_id = room["_id"]
resp = get_summary(room_id)
summary_obj: AssistantSummary = AssistantSummary.model_validate(resp.json())

month_entries: dict[str, str] = {}
for topic in topics:
week_list = getattr(summary_obj.summaries, topic).week
prompt = (
f"지난 한 달간({month_label}) {topic} 주간 요약 항목:\n"
f"{week_list}\n"
"위 리스트를 읽고, 하나의 문장으로 요약해주세요."
)
messages = [
{"role": "system", "content": "너는 요약 전문가입니다."},
{"role": "user", "content": prompt},
]
resp_text = extract_plain_reply(get_reply(messages))
month_entries[topic] = f"★ {month_label}: {resp_text}"

for topic in topics:
getattr(summary_obj.summaries, topic).month.append(month_entries[topic])
getattr(summary_obj.summaries, topic).week.clear()

summaries_json = json.loads(summary_obj.summaries.model_dump_json())

body = {
"roomId": summary_obj.roomId,
"persona": [p.model_dump() for p in summary_obj.persona],
"summaries": summaries_json,
}

resp = requests.post(
f"{settings.CHAT_SERVER_URL}/summary/update", json=body, timeout=5
)
resp.raise_for_status()
print(f"[{room_id}] 월간 요약 업데이트 완료")


# OpenAI 호출
def get_reply(messages) -> str:
# open 인스턴스 생성
load_dotenv()
OpenAI.api_key = os.getenv("OPENAI_API_KEY")
client = OpenAI(api_key=OpenAI.api_key)

# gpt에 요청
gpt_response = client.chat.completions.create(
model="gpt-4o",
messages=messages,
temperature=0.5,
)
return extract_plain_reply(gpt_response.choices[0].message.content.strip())


def yearly_summary_scheduler() -> None:
rooms = requests.get(f"{settings.CHAT_SERVER_URL}/room/list/all", timeout=5).json()

today = datetime.now()
year_label = f"{today.year}년"

topics = [
"health_status",
"hobbies_and_interests",
"emotional_and_mental_state",
"relationship_context",
"social_activities",
"intimacy",
]

for room in rooms:
room_id = room["_id"]

resp = get_summary(room_id)
summary_obj: AssistantSummary = AssistantSummary.model_validate(resp.json())

year_entries: dict[str, str] = {}
for topic in topics:
month_list = getattr(summary_obj.summaries, topic).month
prompt = (
f"지난 한 해({year_label}) {topic} 월간 요약 항목:\n"
f"{month_list}\n"
"위 리스트를 읽고, 하나의 문장으로 요약해주세요."
)
messages = [
{"role": "system", "content": "너는 연간 요약 전문가입니다."},
{"role": "user", "content": prompt},
]
resp_text = extract_plain_reply(get_reply(messages))
year_entries[topic] = f"★ {year_label}: {resp_text}"

for topic, entry in year_entries.items():
getattr(summary_obj.summaries, topic).year.append(entry)
getattr(summary_obj.summaries, topic).month.clear()

summaries_json = json.loads(summary_obj.summaries.model_dump_json())

body = {
"roomId": summary_obj.roomId,
"persona": [p.model_dump() for p in summary_obj.persona],
"summaries": summaries_json,
}

# 6) chat 서버에 업데이트
requests.post(
f"{settings.CHAT_SERVER_URL}/summary/update", json=body, timeout=5
).raise_for_status()
print(f"[{room_id}] 연간 요약 업데이트 완료")


###############스케줄러###################

jobs = [
(
"daily_summary_job",
fetch_roomList,
dict(hour=2, minute=0, second=0),
), # 매일 02:00:00
(
"weekly_summary_job",
weekly_summary_scheduler,
dict(day_of_week="mon", hour=3, minute=0),
), # 매주 월 03:00
(
"monthly_summary_job",
monthly_summary_scheduler,
dict(day="1", hour=4, minute=0),
), # 매달 1일 04:00
(
"yearly_summary_job",
yearly_summary_scheduler,
dict(month="1", day="1", hour=5, minute=0),
), # 매년 1월1일 05:00
]

for job_id, func, cron_kwargs in jobs:
trigger = CronTrigger(**cron_kwargs)
scheduler.add_job(
func,
trigger,
id=job_id,
replace_existing=True,
)

scheduler.start()
Loading