-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmain.py
More file actions
138 lines (120 loc) · 5.93 KB
/
main.py
File metadata and controls
138 lines (120 loc) · 5.93 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from datetime import datetime, timezone
import os, openai, asyncio
from .database import database
from .models import schedules, metadata
from sqlalchemy import create_engine
from .tasks import publish_task
# Initialize OpenAI key if present
OPENAI_KEY = os.environ.get("OPENAI_API_KEY")
if OPENAI_KEY:
openai.api_key = OPENAI_KEY
DATABASE_URL = os.environ.get("DATABASE_URL", "sqlite:///./creator.db")
engine = create_engine(DATABASE_URL, connect_args={"check_same_thread": False})
metadata.create_all(engine)
app = FastAPI(title="CreatorCommand Backend (with Celery)")
class ScheduleIn(BaseModel):
platform: str
caption: str
scheduled_time: datetime # client should send ISO datetime in UTC or with tz info
@app.on_event("startup")
async def startup():
await database.connect()
@app.on_event("shutdown")
async def shutdown():
await database.disconnect()
@app.post("/generate/hashtags")
async def generate_hashtags(caption: str, limit: int = 8):
# Use OpenAI if available, else fallback heuristic
try:
if OPENAI_KEY:
prompt = f"""Generate {limit} creative, localized and platform-friendly hashtags for this caption. Caption: '{caption}'"""
resp = openai.ChatCompletion.create(
model="gpt-4o-mini",
messages=[{"role":"user","content":prompt}],
max_tokens=200,
)
text = resp['choices'][0]['message']['content'].strip()
# expect newline-separated or comma-separated list; split heuristically
tags = [t.strip() for t in text.replace('\n',',').split(',') if t.strip()][:limit]
return {"hashtags": tags}
else:
# fallback heuristic
words = [w.strip(".,!?:;#").lower() for w in caption.split() if len(w)>2]
candidates = []
for w in words:
if w.isalpha() and len(w) > 3:
candidates.append("#" + w)
generic = ["#foryou", "#viral", "#trending", "#shorts", "#kenya"]
tags = list(dict.fromkeys(candidates + generic))[:limit]
return {"hashtags": tags}
except Exception as e:
# fallback on error
words = [w.strip(".,!?:;#").lower() for w in caption.split() if len(w)>2]
candidates = []
for w in words:
if w.isalpha() and len(w) > 3:
candidates.append("#" + w)
generic = ["#foryou", "#viral", "#trending", "#shorts", "#kenya"]
tags = list(dict.fromkeys(candidates + generic))[:limit]
return {"hashtags": tags, "note": "openai_error"}
@app.post("/schedule")
async def schedule_post(item: ScheduleIn):
# Insert into DB and enqueue Celery task with ETA
query = schedules.insert().values(
platform=item.platform,
caption=item.caption,
scheduled_time=item.scheduled_time,
status='scheduled',
created_at=datetime.now(timezone.utc)
)
schedule_id = await database.execute(query)
# Enqueue Celery task with ETA equal to scheduled_time
# Celery expects naive UTC or timezone-aware datetime - we provide ISO
eta = item.scheduled_time.isoformat()
# Note: We pass eta as 'eta' argument to apply_async. Using string ISO will be parsed.
# Use publish_task.apply_async(args=[schedule_id, item.platform, item.caption], eta=item.scheduled_time)
publish_task.apply_async(args=[schedule_id, item.platform, item.caption], eta=item.scheduled_time)
return {"status":"scheduled", "schedule_id": schedule_id}
@app.get("/schedules")
async def list_schedules():
query = schedules.select().order_by(schedules.c.scheduled_time.desc())
rows = await database.fetch_all(query)
return [dict(r) for r in rows]
from fastapi.responses import RedirectResponse
from fastapi import Request
# --- OAuth stubs (placeholders) for TikTok and YouTube ---
# These endpoints demonstrate the OAuth redirect and callback flow.
# In production you must securely store client IDs/secrets and implement proper token exchange.
@app.get("/auth/tiktok")
def tiktok_auth():
# Redirect user to TikTok auth URL (example placeholder)
client_id = "TIKTOK_CLIENT_ID"
redirect_uri = os.environ.get("TIKTOK_REDIRECT_URI", "http://localhost:8000/auth/tiktok/callback")
# scope and state could be added
auth_url = f"https://open.tiktokapis.com/v2/oauth/authorize?client_key={client_id}&response_type=code&scope=user.info.basic&redirect_uri={redirect_uri}"
return RedirectResponse(url=auth_url)
@app.get("/auth/tiktok/callback")
async def tiktok_callback(request: Request):
# Placeholder to capture 'code' from query params and simulate token exchange
code = request.query_params.get('code')
if not code:
raise HTTPException(status_code=400, detail="Missing code in callback")
# In real app: exchange code for access token with TikTok token endpoint
# Save token securely and associate with user
return {"status":"ok", "platform":"tiktok", "note":"received_code", "code": code}
@app.get("/auth/youtube")
def youtube_auth():
client_id = os.environ.get("YOUTUBE_CLIENT_ID", "YOUTUBE_CLIENT_ID")
redirect_uri = os.environ.get("YOUTUBE_REDIRECT_URI", "http://localhost:8000/auth/youtube/callback")
scope = "https://www.googleapis.com/auth/youtube.upload https://www.googleapis.com/auth/youtube.readonly"
auth_url = f"https://accounts.google.com/o/oauth2/v2/auth?client_id={client_id}&response_type=code&scope={scope}&redirect_uri={redirect_uri}&access_type=offline&prompt=consent"
return RedirectResponse(url=auth_url)
@app.get("/auth/youtube/callback")
async def youtube_callback(request: Request):
code = request.query_params.get('code')
if not code:
raise HTTPException(status_code=400, detail="Missing code in callback")
# In real app: exchange code for tokens at Google's token endpoint
return {"status":"ok", "platform":"youtube", "note":"received_code", "code": code}