|
1 | | -import typer |
2 | | -import os |
3 | | -import threading |
4 | | -import json |
5 | | -from rich.console import Console |
6 | | -from rich.table import Table |
7 | | -from datetime import datetime, timedelta, timezone |
8 | | -from dotenv import load_dotenv |
9 | | - |
10 | | -from src.api import get_user_id, get_user_profile, get_user_posts, get_post_insights, fetch_all_posts, create_post, get_post_replies, get_post_replies_count |
11 | | -from src.utils import convert_to_locale |
12 | | - |
13 | | -app = typer.Typer() |
14 | | -console = Console() |
15 | | -load_dotenv() |
16 | | - |
17 | | -ACCESS_TOKEN = os.getenv("ACCESS_TOKEN") |
18 | | -HEADERS = { |
19 | | - 'Authorization': f'Bearer {ACCESS_TOKEN}' |
20 | | -} |
21 | | -DRAFTS_FILE = 'drafts.json' |
22 | | -SERVER_PROCESS_TIME = 10 |
23 | | - |
24 | | -@app.command() |
25 | | -def get_profile(): |
26 | | - """ |
27 | | - Retrieve and display user profile information, including the last post made by the user. |
28 | | - """ |
29 | | - user_id = get_user_id(HEADERS) |
30 | | - profile = get_user_profile(user_id, HEADERS) |
31 | | - last_post = get_user_posts(user_id, HEADERS, limit=1)[0] |
32 | | - |
33 | | - profile_table = Table(title=f'{profile["username"]}\'s Profile') |
34 | | - profile_table.add_column("Field", style="cyan", no_wrap=True) |
35 | | - profile_table.add_column("Value", style="magenta") |
36 | | - |
37 | | - profile_table.add_row("ID", profile.get("id", "N/A")) |
38 | | - profile_table.add_row("Username", profile.get("username", "N/A")) |
39 | | - profile_table.add_row("Profile Picture URL", profile.get("threads_profile_picture_url", "N/A")) |
40 | | - profile_table.add_row("Biography", profile.get("threads_biography", "N/A")) |
41 | | - if last_post: |
42 | | - profile_table.add_row("Last Post ID", last_post.get("id", "N/A")) |
43 | | - profile_table.add_row("Post Type", last_post.get("media_type", "N/A")) |
44 | | - profile_table.add_row("Post Text", last_post.get("text", "N/A")) |
45 | | - profile_table.add_row("Post Permalink", last_post.get("permalink", "N/A")) |
46 | | - profile_table.add_row("Post Timestamp", convert_to_locale(last_post.get("timestamp", "N/A"))) |
47 | | - else: |
48 | | - profile_table.add_row("Message", "No posts found") |
49 | | - |
50 | | - console.print(profile_table) |
51 | | - |
52 | | -@app.command() |
53 | | -def get_recent_posts(limit: int = 5): |
54 | | - """ |
55 | | - Retrieve the most recent posts. |
56 | | - """ |
57 | | - user_id = get_user_id(HEADERS) |
58 | | - posts = get_user_posts(user_id, HEADERS, limit=limit) |
59 | | - |
60 | | - table = Table(title="Recent Posts") |
61 | | - table.add_column("ID", style="cyan", no_wrap=True) |
62 | | - table.add_column("Username", style="cyan", no_wrap=True) |
63 | | - table.add_column("Timestamp", style="magenta") |
64 | | - table.add_column("Type", style="green") |
65 | | - table.add_column("Text", style="yellow") |
66 | | - table.add_column("Permalink", style="blue") |
67 | | - table.add_column("Replies", style="red") |
68 | | - |
69 | | - for post in posts: |
70 | | - if post.get('media_type') == 'REPOST_FACADE': |
71 | | - continue |
72 | | - timestamp = convert_to_locale(post.get('timestamp', 'N/A')) |
73 | | - replies_count = get_post_replies_count(post['id'], HEADERS) |
74 | | - table.add_row( |
75 | | - post.get('id', 'N/A'), |
76 | | - post.get('username', 'N/A'), |
77 | | - timestamp, |
78 | | - post.get('media_type', 'N/A'), |
79 | | - post.get('text', 'N/A'), |
80 | | - post.get('permalink', 'N/A'), |
81 | | - str(replies_count) |
82 | | - ) |
83 | | - |
84 | | - console.print(table) |
85 | | - |
86 | | -@app.command() |
87 | | -def get_top_liked_posts(limit: int = 5, time_range: str = None): |
88 | | - """ |
89 | | - Retrieve the top liked posts of all time or within a specific time range. |
90 | | - """ |
91 | | - user_id = get_user_id(HEADERS) |
92 | | - all_posts = fetch_all_posts(user_id, HEADERS) |
93 | | - |
94 | | - if time_range: |
95 | | - now = datetime.now(timezone.utc) |
96 | | - if time_range.endswith('w'): |
97 | | - weeks = int(time_range[:-1]) |
98 | | - start_time = now - timedelta(weeks=weeks) |
99 | | - elif time_range.endswith('d'): |
100 | | - days = int(time_range[:-1]) |
101 | | - start_time = now - timedelta(days=days) |
102 | | - elif time_range.endswith('h'): |
103 | | - hours = int(time_range[:-1]) |
104 | | - start_time = now - timedelta(hours=hours) |
105 | | - elif time_range.endswith('m'): |
106 | | - months = int(time_range[:-1]) |
107 | | - start_time = now - timedelta(days=30 * months) |
108 | | - else: |
109 | | - typer.echo("Invalid time range format. Use '2w' for 2 weeks, '7d' for 7 days, '24h' for 24 hours, or '7m' for 7 months.") |
110 | | - return |
111 | | - |
112 | | - all_posts = [post for post in all_posts if datetime.strptime(post['timestamp'], '%Y-%m-%dT%H:%M:%S%z') >= start_time] |
113 | | - |
114 | | - posts_with_likes = [] |
115 | | - for post in all_posts: |
116 | | - if post.get('media_type') == 'REPOST_FACADE': |
117 | | - continue |
118 | | - insights = get_post_insights(post['id'], HEADERS) |
119 | | - if 'likes' in insights: |
120 | | - posts_with_likes.append((post, insights['likes'])) |
121 | | - |
122 | | - posts_with_likes.sort(key=lambda x: x[1], reverse=True) |
123 | | - top_liked_posts = posts_with_likes[:limit] |
124 | | - |
125 | | - table = Table(title="Top Liked Posts") |
126 | | - table.add_column("Username", style="cyan", no_wrap=True) |
127 | | - table.add_column("Timestamp", style="magenta") |
128 | | - table.add_column("Type", style="green") |
129 | | - table.add_column("Text", style="yellow") |
130 | | - table.add_column("Permalink", style="blue") |
131 | | - table.add_column("Likes", style="red") |
132 | | - table.add_column("Replies", style="green") |
133 | | - table.add_column("Reposts", style="blue") |
134 | | - table.add_column("Quotes", style="yellow") |
135 | | - table.add_column("Views", style="cyan") |
136 | | - |
137 | | - for post, likes in top_liked_posts: |
138 | | - timestamp = convert_to_locale(post.get('timestamp', 'N/A')) |
139 | | - insights = get_post_insights(post['id'], HEADERS) |
140 | | - table.add_row( |
141 | | - post.get('username', 'N/A'), |
142 | | - timestamp, |
143 | | - post.get('media_type', 'N/A'), |
144 | | - post.get('text', 'N/A'), |
145 | | - post.get('permalink', 'N/A'), |
146 | | - str(insights.get('likes', 'N/A')), |
147 | | - str(insights.get('replies', 'N/A')), |
148 | | - str(insights.get('reposts', 'N/A')), |
149 | | - str(insights.get('quotes', 'N/A')), |
150 | | - str(insights.get('views', 'N/A')) |
151 | | - ) |
152 | | - |
153 | | - console.print(table) |
154 | | - |
155 | | -@app.command() |
156 | | -def create_text_post(text: str): |
157 | | - """ |
158 | | - Create a post with text. |
159 | | - """ |
160 | | - user_id = get_user_id(HEADERS) |
161 | | - payload = { |
162 | | - "media_type": "TEXT", |
163 | | - "text": text |
164 | | - } |
165 | | - post = create_post(user_id, HEADERS, payload) |
166 | | - typer.echo(f"Post created with ID: {post['id']}") |
167 | | - |
168 | | -@app.command() |
169 | | -def create_image_post(text: str, image_url: str): |
170 | | - """ |
171 | | - Create a post with an image. |
172 | | - """ |
173 | | - user_id = get_user_id(HEADERS) |
174 | | - payload = { |
175 | | - "media_type": "IMAGE", |
176 | | - "image_url": image_url, |
177 | | - "text": text |
178 | | - } |
179 | | - post = create_post(user_id, HEADERS, payload) |
180 | | - typer.echo(f"Post created with ID: {post['id']}") |
181 | | - |
182 | | -@app.command() |
183 | | -def get_latest_replies(media_id: str, limit: int = 5): |
184 | | - """ |
185 | | - Retrieve the latest replies for a specific media post. |
186 | | - """ |
187 | | - replies = get_post_replies(media_id, HEADERS, limit=limit) |
188 | | - |
189 | | - table = Table(title="Latest Replies") |
190 | | - table.add_column("Username", style="cyan", no_wrap=True) |
191 | | - table.add_column("Media ID", style="cyan", no_wrap=True) |
192 | | - table.add_column("Timestamp", style="magenta") |
193 | | - table.add_column("Text", style="yellow") |
194 | | - table.add_column("Permalink", style="blue") |
195 | | - |
196 | | - for reply in replies: |
197 | | - timestamp = convert_to_locale(reply.get('timestamp', 'N/A')) |
198 | | - table.add_row( |
199 | | - reply.get('username', 'N/A'), |
200 | | - reply.get('id', 'N/A'), |
201 | | - timestamp, |
202 | | - reply.get('text', 'N/A'), |
203 | | - reply.get('permalink', 'N/A') |
204 | | - ) |
205 | | - |
206 | | - console.print(table) |
207 | | - |
208 | | -@app.command() |
209 | | -def send_reply(media_id: str, text: str): |
210 | | - """ |
211 | | - Send a reply to a specific media post. |
212 | | - """ |
213 | | - user_id = get_user_id(HEADERS) |
214 | | - payload = { |
215 | | - "media_type": "TEXT", |
216 | | - "text": text, |
217 | | - "reply_to_id": media_id |
218 | | - } |
219 | | - reply = create_post(user_id, HEADERS, payload) |
220 | | - typer.echo(f"Reply created with ID: {reply['id']}") |
221 | | - |
222 | | -def job_create_text_post(text: str): |
223 | | - """ |
224 | | - Job function to create a post with text. |
225 | | - """ |
226 | | - create_text_post(text) |
227 | | - |
228 | | -@app.command() |
229 | | -def schedule_post(text: str, post_time: str): |
230 | | - """ |
231 | | - Schedule a post with text at a specific time. |
232 | | - """ |
233 | | - post_time_dt = datetime.strptime(post_time, '%Y-%m-%d %H:%M:%S') |
234 | | - current_time = datetime.now() |
235 | | - delay = (post_time_dt - current_time).total_seconds() |
236 | | - |
237 | | - if delay <= 0: |
238 | | - typer.echo("Scheduled time must be in the future.") |
239 | | - return |
240 | | - |
241 | | - timer = threading.Timer(delay, job_create_text_post, [text]) |
242 | | - timer.start() |
243 | | - typer.echo(f"Post scheduled for {post_time} with text: '{text}'") |
244 | | - |
245 | | -@app.command() |
246 | | -def create_draft(text: str, drafts_file: str = DRAFTS_FILE): |
247 | | - ''' |
248 | | - Create a draft with the given text and save it to the drafts file. |
249 | | - ''' |
250 | | - if os.path.exists(drafts_file): |
251 | | - with open(drafts_file, 'r') as file: |
252 | | - drafts = json.load(file) |
253 | | - else: |
254 | | - drafts = [] |
255 | | - |
256 | | - next_id = max([draft['id'] for draft in drafts], default=0) + 1 |
257 | | - |
258 | | - draft = { |
259 | | - "id": next_id, |
260 | | - "text": text, |
261 | | - "timestamp": datetime.now().strftime('%Y-%m-%d %H:%M:%S') |
262 | | - } |
263 | | - drafts.append(draft) |
264 | | - |
265 | | - with open(drafts_file, 'w') as file: |
266 | | - json.dump(drafts, file, indent=4) |
267 | | - |
268 | | - typer.echo(f"Draft created with ID: {next_id}") |
269 | | - |
270 | | -@app.command() |
271 | | -def get_drafts(drafts_file: str = DRAFTS_FILE): |
272 | | - ''' |
273 | | - Get all drafts from the drafts file. |
274 | | - ''' |
275 | | - if not os.path.exists(drafts_file): |
276 | | - typer.echo("No drafts found.") |
277 | | - return |
278 | | - |
279 | | - with open(drafts_file, 'r') as file: |
280 | | - drafts = json.load(file) |
281 | | - |
282 | | - table = Table(title="Drafts") |
283 | | - table.add_column("ID", style="cyan", no_wrap=True) |
284 | | - table.add_column("Text", style="yellow") |
285 | | - table.add_column("Timestamp", style="magenta") |
286 | | - |
287 | | - for draft in drafts: |
288 | | - table.add_row( |
289 | | - str(draft['id']), |
290 | | - draft['text'], |
291 | | - draft['timestamp'] |
292 | | - ) |
293 | | - |
294 | | - console.print(table) |
295 | | - |
296 | | -@app.command() |
297 | | -def send_draft(draft_id: int, drafts_file: str = DRAFTS_FILE): |
298 | | - ''' |
299 | | - Send a draft with the given ID and remove it from the drafts file. |
300 | | - ''' |
301 | | - if not os.path.exists(drafts_file): |
302 | | - typer.echo("No drafts found.") |
303 | | - raise typer.Exit(1) |
304 | | - |
305 | | - with open(drafts_file, 'r') as file: |
306 | | - drafts = json.load(file) |
307 | | - |
308 | | - draft = next((draft for draft in drafts if draft['id'] == draft_id), None) |
309 | | - |
310 | | - if draft is None: |
311 | | - typer.echo(f"Draft with ID {draft_id} not found.") |
312 | | - raise typer.Exit(1) |
313 | | - |
314 | | - create_text_post(draft['text']) |
315 | | - |
316 | | - drafts = [draft for draft in drafts if draft['id'] != draft_id] |
317 | | - |
318 | | - with open(drafts_file, 'w') as file: |
319 | | - json.dump(drafts, file, indent=4) |
320 | | - |
321 | | - typer.echo(f"Draft with ID {draft_id} sent and removed from drafts.") |
| 1 | +import src.env |
| 2 | +from src.app import app |
322 | 3 |
|
323 | 4 | def main(): |
324 | 5 | app() |
|
0 commit comments