-
Notifications
You must be signed in to change notification settings - Fork 2
refactor: queue clear #76
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from all commits
Commits
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -8,4 +8,5 @@ | |
| .vscode/settings.json | ||
| reference | ||
| test | ||
| .cursor | ||
| .cursor | ||
| **/docs | ||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,3 @@ | ||
| from .fc_client import stop_async_task | ||
|
|
||
| __all__ = ["stop_async_task"] |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,102 @@ | ||
| """ | ||
| FC OpenAPI 客户端 | ||
| 用于在 CPU 侧调用函数计算 OpenAPI,操作 GPU 上的异步任务。 | ||
| 使用实例 RAM 角色凭证:优先从请求头(FC 注入)获取,否则从环境变量获取。 | ||
| """ | ||
| from flask import request | ||
|
|
||
| from alibabacloud_tea_openapi.client import Client as OpenApiClient | ||
| from alibabacloud_tea_openapi import models as open_api_models | ||
| from alibabacloud_tea_util import models as util_models | ||
|
|
||
| import constants | ||
| from utils.logger import log | ||
|
|
||
|
|
||
| def _get_credentials(): | ||
| """ | ||
| 获取阿里云凭证(与 ServerlessApiService.get_credentials 一致)。 | ||
| 优先从请求头获取(FC 实例 RAM 角色注入),否则从环境变量获取。 | ||
| Returns: | ||
| tuple: (access_key_id, access_key_secret, security_token) | ||
| """ | ||
| ak = "" | ||
| sk = "" | ||
| sts = "" | ||
| try: | ||
| ak = request.headers.get(constants.HEADER_KEY_ACCESS_KEY_ID, "") | ||
| sk = request.headers.get(constants.HEADER_KEY_ACCESS_KEY_SECRET, "") | ||
| sts = request.headers.get(constants.HEADER_KEY_SECURITY_TOKEN, "") | ||
| except Exception as e: | ||
| log("WARNING", f"get credentials from header failed: {e}") | ||
| if not ak or not sk: | ||
| ak = getattr(constants, "ALIBABA_CLOUD_ACCESS_KEY_ID", "") or "" | ||
| sk = getattr(constants, "ALIBABA_CLOUD_ACCESS_KEY_SECRET", "") or "" | ||
| sts = getattr(constants, "ALIBABA_CLOUD_SECURITY_TOKEN", "") or "" | ||
| return ak, sk, sts | ||
|
|
||
|
|
||
| def _create_client(ak: str, sk: str, sts: str, endpoint: str) -> OpenApiClient: | ||
| """ | ||
| 构造 FC OpenApiClient,供各 API 调用方复用,避免重复构建 Config。 | ||
| """ | ||
| config = open_api_models.Config( | ||
| access_key_id=ak, | ||
| access_key_secret=sk, | ||
| security_token=sts or None, | ||
| ) | ||
| config.endpoint = endpoint | ||
| return OpenApiClient(config) | ||
|
|
||
|
|
||
| def stop_async_task(task_id: str) -> bool: | ||
| """ | ||
| 调用 FC OpenAPI StopAsyncTask 停止指定异步任务(带签名,使用实例 RAM 角色凭证)。 | ||
|
|
||
| Args: | ||
| task_id: 异步任务 ID(与我们的 task_id/prompt_id 一致)。 | ||
|
|
||
| Returns: | ||
| bool: 请求成功且为 2xx 返回 True,否则 False。 | ||
| """ | ||
| account_id = getattr(constants, "FC_ACCOUNT_ID", "") or "" | ||
| region = getattr(constants, "FC_REGION", "cn-hangzhou") or "cn-hangzhou" | ||
| cpu_function_name = getattr(constants, "FC_FUNCTION_NAME", "") or "" | ||
|
|
||
| if not account_id or not cpu_function_name: | ||
| return False | ||
|
|
||
| ak, sk, sts = _get_credentials() | ||
| if not ak or not sk: | ||
| log("WARNING", "StopAsyncTask: no credentials (header or env)") | ||
| return False | ||
|
|
||
| gpu_function_name = cpu_function_name.replace("-gw", "") | ||
| endpoint = f"{account_id}.{region}.fc.aliyuncs.com" | ||
|
|
||
| try: | ||
| client = _create_client(ak, sk, sts, endpoint) | ||
|
|
||
| params = open_api_models.Params( | ||
| action="StopAsyncTask", | ||
| version="2023-03-30", | ||
| protocol="HTTPS", | ||
| method="PUT", | ||
| auth_type="AK", | ||
| style="FC", | ||
| pathname=f"/2023-03-30/functions/{gpu_function_name}/async-tasks/{task_id}/stop", | ||
| req_body_type="json", | ||
| body_type="json", | ||
| ) | ||
| req = open_api_models.OpenApiRequest(query={"qualifier": "LATEST"}) | ||
| runtime = util_models.RuntimeOptions(read_timeout=5000, connect_timeout=5000) | ||
|
|
||
| resp = client.call_api(params, req, runtime) | ||
| status_code = resp.get("statusCode") or 0 | ||
| if status_code >= 200 and status_code < 300: | ||
| return True | ||
| log("WARNING", f"StopAsyncTask failed: task_id={task_id}, status={status_code}, body={resp.get('body', '')[:200]}") | ||
| return False | ||
| except Exception as e: | ||
| log("WARNING", f"StopAsyncTask request error: task_id={task_id}, error={e}") | ||
| return False |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
21 changes: 21 additions & 0 deletions
21
src/code/agent/services/gateway/handlers/interrupt_handler.py
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,21 @@ | ||
| """ | ||
| Interrupt Handler | ||
| 处理 POST /api/interrupt 请求 | ||
| """ | ||
| from flask import jsonify | ||
|
|
||
|
|
||
| class InterruptHandler: | ||
| """处理 POST /api/interrupt""" | ||
|
|
||
| def handle_post(self): | ||
| """ | ||
| POST /api/interrupt 暂不支持:FC StopAsyncTask 会带来非预期体验, | ||
| 禁止客户端中止正在运行的任务,返回 403。 | ||
| """ | ||
| return jsonify({ | ||
| "error": { | ||
| "type": "not_supported", | ||
| "message": "Interrupting a running task is not supported.", | ||
| } | ||
| }), 403 |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Code Review — refactor/queue-handler
严重问题 (Blocker)
fc_stop_client.py 与 fc_client.py 的逻辑 100% 重复,唯一区别是没有提取 _create_client 辅助函数。init.py 已经正确地从 fc_client 导出,queue_handler 也使用 fc_client,fc_stop_client 是孤立的死文件。必须删除,否则后续维护两套代码。
第 248-249 行
except Exception:
timed_out = True
as_completed(..., timeout=...) 超时抛出的是 concurrent.futures.TimeoutError,但这里捕获了所有 Exception,若 as_completed 内部出现其他异常(如 RuntimeError),也会被误认为超时。应改为:
from concurrent.futures import TimeoutError as FuturesTimeoutError
except FuturesTimeoutError:
timed_out = True
3. queue_handler.py 中 stop_async_task 是函数内延迟导入,违反编码规范
第 218 行(函数体内)
from services.fc_openapi.fc_client import stop_async_task
项目规则 1.2 要求导入统一放在模块顶部,这里放在函数体内会导致每次调用时才解析模块(虽有缓存,但逻辑不清晰),应移至文件顶部。
中等问题 (Major)
4. history_manager.py 中"解析 prompt_body"逻辑重复了两遍
_build_history_item(第 337-354 行)和 late_init_history_item(第 371-384 行)中几乎完全相同的一段解析代码:
outputs_to_execute = []
prompt_dict = prompt_body or {}
if isinstance(prompt_dict, dict):
if "prompt" in prompt_dict and ...:
outputs_to_execute = prompt_dict.get("outputs_to_execute", [])
prompt_dict = prompt_dict["prompt"]
if not outputs_to_execute and isinstance(prompt_dict, dict):
outputs_to_execute = infer_outputs_to_execute(prompt_dict)
raw = prompt_body if isinstance(prompt_body, dict) else {}
extra_data = dict(raw.get("extra_data", {})) ...
应提取为一个私有方法 _parse_prompt_body(prompt_body, client_id) -> (prompt_dict, extra_data, outputs_to_execute) 消除重复。
TaskManager 中实现了这两个方法,但 InterruptHandler 直接返回 403,完全没有调用。若这个功能是 "暂不支持",这两个方法不应该存在于此次 PR 中,要么删除,要么在 PR 描述中明确说明"为后续 interrupt 功能预留"。
item = self._history_manager.get_history_item(prompt_id) # 加锁读
if item and item.get("user_id") == user_id:
if self._history_manager.remove_history_item(prompt_id): # 加锁写
两次操作各自持锁,但中间无原子性保证,另一线程可能在 get 和 remove 之间删除或修改该 item。应在 HistoryManager 中提供一个原子的 remove_if_owned(prompt_id, user_id) 方法。
轻微问题 (Minor)
7. queue_handler.py 中 _build_task_info 有永远为真的冗余判断
prompt_dict = prompt_body if isinstance(prompt_body, dict) else {}
if isinstance(prompt_dict, dict): # ← 永远为真
if "prompt" in prompt_dict and isinstance(prompt_dict.get("prompt"), dict):
第二个 isinstance(prompt_dict, dict) 可直接删除。
ex = ThreadPoolExecutor(max_workers=workers)
try:
...
finally:
ex.shutdown(wait=not timed_out)
若 ex.submit(...) 之前(futs = {ex.submit(...)} 这行)抛异常,finally 不会执行,线程池泄漏。若用 with ThreadPoolExecutor(...) as ex:,则 exit 保证一定会 shutdown。超时时不想等待的需求,可以通过先 cancel 再 ex.shutdown(wait=False) 实现,不需要手动管理。
alibabacloud_tea_util>=0.3.0
\ No newline at end of file
两个新依赖行末都没有换行,应补充 \n,避免工具解析问题。