Skip to content

refactor: queue clear #76

Open
KoalaBryson wants to merge 2 commits intocapfrom
refactor/queue-handler
Open

refactor: queue clear #76
KoalaBryson wants to merge 2 commits intocapfrom
refactor/queue-handler

Conversation

@KoalaBryson
Copy link
Collaborator

No description provided.

…a, GET /queue);

Change-Id: I1f9890aa249a2aaff0fdbc10bcda3e130b70a4e3
Co-developed-by: Cursor <noreply@cursor.com>
Made-with: Cursor
@KoalaBryson KoalaBryson force-pushed the refactor/queue-handler branch from ef34494 to 91f155a Compare March 12, 2026 02:27
@@ -0,0 +1,96 @@
"""
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fc_client.py 和 fc_stop_client.py 是重复的,是不是只保留一个就可以?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review — refactor/queue-handler
严重问题 (Blocker)

  1. fc_stop_client.py 和 fc_stop_client_test.py 是冗余的文件

fc_stop_client.py 与 fc_client.py 的逻辑 100% 重复,唯一区别是没有提取 _create_client 辅助函数。init.py 已经正确地从 fc_client 导出,queue_handler 也使用 fc_client,fc_stop_client 是孤立的死文件。必须删除,否则后续维护两套代码。

  1. queue_handler.py 中 except Exception 吞掉了非超时异常

第 248-249 行

except Exception:
timed_out = True
as_completed(..., timeout=...) 超时抛出的是 concurrent.futures.TimeoutError,但这里捕获了所有 Exception,若 as_completed 内部出现其他异常(如 RuntimeError),也会被误认为超时。应改为:

from concurrent.futures import TimeoutError as FuturesTimeoutError
except FuturesTimeoutError:
timed_out = True
3. queue_handler.py 中 stop_async_task 是函数内延迟导入,违反编码规范

第 218 行(函数体内)

from services.fc_openapi.fc_client import stop_async_task
项目规则 1.2 要求导入统一放在模块顶部,这里放在函数体内会导致每次调用时才解析模块(虽有缓存,但逻辑不清晰),应移至文件顶部。

中等问题 (Major)
4. history_manager.py 中"解析 prompt_body"逻辑重复了两遍

_build_history_item(第 337-354 行)和 late_init_history_item(第 371-384 行)中几乎完全相同的一段解析代码:

outputs_to_execute = []
prompt_dict = prompt_body or {}
if isinstance(prompt_dict, dict):
if "prompt" in prompt_dict and ...:
outputs_to_execute = prompt_dict.get("outputs_to_execute", [])
prompt_dict = prompt_dict["prompt"]
if not outputs_to_execute and isinstance(prompt_dict, dict):
outputs_to_execute = infer_outputs_to_execute(prompt_dict)
raw = prompt_body if isinstance(prompt_body, dict) else {}
extra_data = dict(raw.get("extra_data", {})) ...
应提取为一个私有方法 _parse_prompt_body(prompt_body, client_id) -> (prompt_dict, extra_data, outputs_to_execute) 消除重复。

  1. interrupt_current_user 和 get_current_user_running_task_ids 是死代码

TaskManager 中实现了这两个方法,但 InterruptHandler 直接返回 403,完全没有调用。若这个功能是 "暂不支持",这两个方法不应该存在于此次 PR 中,要么删除,要么在 PR 描述中明确说明"为后续 interrupt 功能预留"。

  1. task_manager.py 中 delete_history_items 存在 TOCTOU race condition

item = self._history_manager.get_history_item(prompt_id) # 加锁读
if item and item.get("user_id") == user_id:
if self._history_manager.remove_history_item(prompt_id): # 加锁写
两次操作各自持锁,但中间无原子性保证,另一线程可能在 get 和 remove 之间删除或修改该 item。应在 HistoryManager 中提供一个原子的 remove_if_owned(prompt_id, user_id) 方法。

轻微问题 (Minor)
7. queue_handler.py 中 _build_task_info 有永远为真的冗余判断

prompt_dict = prompt_body if isinstance(prompt_body, dict) else {}
if isinstance(prompt_dict, dict): # ← 永远为真
if "prompt" in prompt_dict and isinstance(prompt_dict.get("prompt"), dict):
第二个 isinstance(prompt_dict, dict) 可直接删除。

  1. ThreadPoolExecutor 未使用 with 语法,资源管理不健壮

ex = ThreadPoolExecutor(max_workers=workers)
try:
...
finally:
ex.shutdown(wait=not timed_out)
若 ex.submit(...) 之前(futs = {ex.submit(...)} 这行)抛异常,finally 不会执行,线程池泄漏。若用 with ThreadPoolExecutor(...) as ex:,则 exit 保证一定会 shutdown。超时时不想等待的需求,可以通过先 cancel 再 ex.shutdown(wait=False) 实现,不需要手动管理。

  1. requirements.txt 末尾缺少换行符

alibabacloud_tea_util>=0.3.0
\ No newline at end of file
两个新依赖行末都没有换行,应补充 \n,避免工具解析问题。

Change-Id: I8cf86dfcb1be18cd4a9b5308367b90f590768a11
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants