Conversation
…a, GET /queue); Change-Id: I1f9890aa249a2aaff0fdbc10bcda3e130b70a4e3 Co-developed-by: Cursor <noreply@cursor.com> Made-with: Cursor
ef34494 to
91f155a
Compare
| @@ -0,0 +1,96 @@ | |||
| """ | |||
There was a problem hiding this comment.
fc_client.py 和 fc_stop_client.py 是重复的,是不是只保留一个就可以?
There was a problem hiding this comment.
Code Review — refactor/queue-handler
严重问题 (Blocker)
- fc_stop_client.py 和 fc_stop_client_test.py 是冗余的文件
fc_stop_client.py 与 fc_client.py 的逻辑 100% 重复,唯一区别是没有提取 _create_client 辅助函数。init.py 已经正确地从 fc_client 导出,queue_handler 也使用 fc_client,fc_stop_client 是孤立的死文件。必须删除,否则后续维护两套代码。
- queue_handler.py 中 except Exception 吞掉了非超时异常
第 248-249 行
except Exception:
timed_out = True
as_completed(..., timeout=...) 超时抛出的是 concurrent.futures.TimeoutError,但这里捕获了所有 Exception,若 as_completed 内部出现其他异常(如 RuntimeError),也会被误认为超时。应改为:
from concurrent.futures import TimeoutError as FuturesTimeoutError
except FuturesTimeoutError:
timed_out = True
3. queue_handler.py 中 stop_async_task 是函数内延迟导入,违反编码规范
第 218 行(函数体内)
from services.fc_openapi.fc_client import stop_async_task
项目规则 1.2 要求导入统一放在模块顶部,这里放在函数体内会导致每次调用时才解析模块(虽有缓存,但逻辑不清晰),应移至文件顶部。
中等问题 (Major)
4. history_manager.py 中"解析 prompt_body"逻辑重复了两遍
_build_history_item(第 337-354 行)和 late_init_history_item(第 371-384 行)中几乎完全相同的一段解析代码:
outputs_to_execute = []
prompt_dict = prompt_body or {}
if isinstance(prompt_dict, dict):
if "prompt" in prompt_dict and ...:
outputs_to_execute = prompt_dict.get("outputs_to_execute", [])
prompt_dict = prompt_dict["prompt"]
if not outputs_to_execute and isinstance(prompt_dict, dict):
outputs_to_execute = infer_outputs_to_execute(prompt_dict)
raw = prompt_body if isinstance(prompt_body, dict) else {}
extra_data = dict(raw.get("extra_data", {})) ...
应提取为一个私有方法 _parse_prompt_body(prompt_body, client_id) -> (prompt_dict, extra_data, outputs_to_execute) 消除重复。
- interrupt_current_user 和 get_current_user_running_task_ids 是死代码
TaskManager 中实现了这两个方法,但 InterruptHandler 直接返回 403,完全没有调用。若这个功能是 "暂不支持",这两个方法不应该存在于此次 PR 中,要么删除,要么在 PR 描述中明确说明"为后续 interrupt 功能预留"。
- task_manager.py 中 delete_history_items 存在 TOCTOU race condition
item = self._history_manager.get_history_item(prompt_id) # 加锁读
if item and item.get("user_id") == user_id:
if self._history_manager.remove_history_item(prompt_id): # 加锁写
两次操作各自持锁,但中间无原子性保证,另一线程可能在 get 和 remove 之间删除或修改该 item。应在 HistoryManager 中提供一个原子的 remove_if_owned(prompt_id, user_id) 方法。
轻微问题 (Minor)
7. queue_handler.py 中 _build_task_info 有永远为真的冗余判断
prompt_dict = prompt_body if isinstance(prompt_body, dict) else {}
if isinstance(prompt_dict, dict): # ← 永远为真
if "prompt" in prompt_dict and isinstance(prompt_dict.get("prompt"), dict):
第二个 isinstance(prompt_dict, dict) 可直接删除。
- ThreadPoolExecutor 未使用 with 语法,资源管理不健壮
ex = ThreadPoolExecutor(max_workers=workers)
try:
...
finally:
ex.shutdown(wait=not timed_out)
若 ex.submit(...) 之前(futs = {ex.submit(...)} 这行)抛异常,finally 不会执行,线程池泄漏。若用 with ThreadPoolExecutor(...) as ex:,则 exit 保证一定会 shutdown。超时时不想等待的需求,可以通过先 cancel 再 ex.shutdown(wait=False) 实现,不需要手动管理。
- requirements.txt 末尾缺少换行符
alibabacloud_tea_util>=0.3.0
\ No newline at end of file
两个新依赖行末都没有换行,应补充 \n,避免工具解析问题。
No description provided.