Skip to content

Commit ef34494

Browse files
author
yinsu.zs
committed
feat: history/queue align with ComfyUI (outputs_to_execute, extra_data, GET /queue);
Change-Id: I1f9890aa249a2aaff0fdbc10bcda3e130b70a4e3 Co-developed-by: Cursor <noreply@cursor.com> Made-with: Cursor
1 parent c168aab commit ef34494

9 files changed

Lines changed: 284 additions & 53 deletions

File tree

src/code/agent/routes/gateway_routes.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ def __init__(self):
4747
self.prompt_handler = PromptHandler(task_manager)
4848
self.serverless_handler = ServerlessHandler()
4949
self.history_handler = HistoryHandler()
50-
self.interrupt_handler = InterruptHandler(task_manager)
50+
self.interrupt_handler = InterruptHandler()
5151
self.userdata_handler = UserdataHandler()
5252
self.ws_handler = WsHandler()
5353
self.serverless_ws_handler = ServerlessWsHandler(constants.GPU_FUNCTION_URL)
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,3 @@
1-
from .fc_stop_client import stop_async_task
1+
from .fc_client import stop_async_task
22

33
__all__ = ["stop_async_task"]
Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,102 @@
1+
"""
2+
FC OpenAPI 客户端
3+
用于在 CPU 侧调用函数计算 OpenAPI,操作 GPU 上的异步任务。
4+
使用实例 RAM 角色凭证:优先从请求头(FC 注入)获取,否则从环境变量获取。
5+
"""
6+
from flask import request
7+
8+
from alibabacloud_tea_openapi.client import Client as OpenApiClient
9+
from alibabacloud_tea_openapi import models as open_api_models
10+
from alibabacloud_tea_util import models as util_models
11+
12+
import constants
13+
from utils.logger import log
14+
15+
16+
def _get_credentials():
17+
"""
18+
获取阿里云凭证(与 ServerlessApiService.get_credentials 一致)。
19+
优先从请求头获取(FC 实例 RAM 角色注入),否则从环境变量获取。
20+
Returns:
21+
tuple: (access_key_id, access_key_secret, security_token)
22+
"""
23+
ak = ""
24+
sk = ""
25+
sts = ""
26+
try:
27+
ak = request.headers.get(constants.HEADER_KEY_ACCESS_KEY_ID, "")
28+
sk = request.headers.get(constants.HEADER_KEY_ACCESS_KEY_SECRET, "")
29+
sts = request.headers.get(constants.HEADER_KEY_SECURITY_TOKEN, "")
30+
except Exception as e:
31+
log("WARNING", f"get credentials from header failed: {e}")
32+
if not ak or not sk:
33+
ak = getattr(constants, "ALIBABA_CLOUD_ACCESS_KEY_ID", "") or ""
34+
sk = getattr(constants, "ALIBABA_CLOUD_ACCESS_KEY_SECRET", "") or ""
35+
sts = getattr(constants, "ALIBABA_CLOUD_SECURITY_TOKEN", "") or ""
36+
return ak, sk, sts
37+
38+
39+
def _create_client(ak: str, sk: str, sts: str, endpoint: str) -> OpenApiClient:
40+
"""
41+
构造 FC OpenApiClient,供各 API 调用方复用,避免重复构建 Config。
42+
"""
43+
config = open_api_models.Config(
44+
access_key_id=ak,
45+
access_key_secret=sk,
46+
security_token=sts or None,
47+
)
48+
config.endpoint = endpoint
49+
return OpenApiClient(config)
50+
51+
52+
def stop_async_task(task_id: str) -> bool:
53+
"""
54+
调用 FC OpenAPI StopAsyncTask 停止指定异步任务(带签名,使用实例 RAM 角色凭证)。
55+
56+
Args:
57+
task_id: 异步任务 ID(与我们的 task_id/prompt_id 一致)。
58+
59+
Returns:
60+
bool: 请求成功且为 2xx 返回 True,否则 False。
61+
"""
62+
account_id = getattr(constants, "FC_ACCOUNT_ID", "") or ""
63+
region = getattr(constants, "FC_REGION", "cn-hangzhou") or "cn-hangzhou"
64+
cpu_function_name = getattr(constants, "FC_FUNCTION_NAME", "") or ""
65+
66+
if not account_id or not cpu_function_name:
67+
return False
68+
69+
ak, sk, sts = _get_credentials()
70+
if not ak or not sk:
71+
log("WARNING", "StopAsyncTask: no credentials (header or env)")
72+
return False
73+
74+
gpu_function_name = cpu_function_name.replace("-gw", "")
75+
endpoint = f"{account_id}.{region}.fc.aliyuncs.com"
76+
77+
try:
78+
client = _create_client(ak, sk, sts, endpoint)
79+
80+
params = open_api_models.Params(
81+
action="StopAsyncTask",
82+
version="2023-03-30",
83+
protocol="HTTPS",
84+
method="PUT",
85+
auth_type="AK",
86+
style="FC",
87+
pathname=f"/2023-03-30/functions/{gpu_function_name}/async-tasks/{task_id}/stop",
88+
req_body_type="json",
89+
body_type="json",
90+
)
91+
req = open_api_models.OpenApiRequest(query={"qualifier": "LATEST"})
92+
runtime = util_models.RuntimeOptions(read_timeout=5000, connect_timeout=5000)
93+
94+
resp = client.call_api(params, req, runtime)
95+
status_code = resp.get("statusCode") or 0
96+
if status_code >= 200 and status_code < 300:
97+
return True
98+
log("WARNING", f"StopAsyncTask failed: task_id={task_id}, status={status_code}, body={resp.get('body', '')[:200]}")
99+
return False
100+
except Exception as e:
101+
log("WARNING", f"StopAsyncTask request error: task_id={task_id}, error={e}")
102+
return False
Lines changed: 11 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,31 +1,21 @@
11
"""
22
Interrupt Handler
3-
处理 POST /api/interrupt 请求,与 ComfyUI 行为对齐
3+
处理 POST /api/interrupt 请求
44
"""
5-
from flask import Response
6-
7-
from utils.logger import log
5+
from flask import jsonify
86

97

108
class InterruptHandler:
11-
"""处理 POST /api/interrupt:中断当前用户正在执行的任务,并通知 GPU/后端真正停止"""
12-
13-
def __init__(self, task_manager):
14-
self.task_manager = task_manager
9+
"""处理 POST /api/interrupt"""
1510

1611
def handle_post(self):
1712
"""
18-
处理 POST /api/interrupt 请求
19-
1) 先对当前用户每个 RUNNING task_id 调用 FC StopAsyncTask,在平台侧停止 GPU 任务
20-
2) 再取消当前用户所有 RUNNING 任务(停轮询、清理本地状态)
21-
返回 200(与 ComfyUI POST /interrupt 对齐)
13+
POST /api/interrupt 暂不支持:FC StopAsyncTask 会带来非预期体验,
14+
禁止客户端中止正在运行的任务,返回 403。
2215
"""
23-
# 函数计算 StopAsyncTask 存在 bug,等 FC 修复后再上线
24-
# from services.fc_openapi.fc_stop_client import stop_async_task
25-
# running_ids = self.task_manager.get_current_user_running_task_ids()
26-
# for task_id in running_ids:
27-
# stop_async_task(task_id)
28-
count = self.task_manager.interrupt_current_user()
29-
if count > 0:
30-
log("INFO", f"Interrupted {count} running task(s) for current user")
31-
return Response(status=200)
16+
return jsonify({
17+
"error": {
18+
"type": "not_supported",
19+
"message": "Interrupting a running task is not supported.",
20+
}
21+
}), 403

src/code/agent/services/gateway/handlers/queue_handler.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,7 @@ def handle_post_request(self):
9999
if "clear" in request_data and request_data["clear"]:
100100
# 清空队列:先对当前用户 PENDING 任务并行调 StopAsyncTask(数量上限 + 总超时),再清本地 PENDING(与 ComfyUI 只清 pending 对齐,不停 RUNNING)
101101
log("INFO", f"Clearing task queue")
102-
from services.fc_openapi.fc_stop_client import stop_async_task
102+
from services.fc_openapi.fc_client import stop_async_task
103103

104104
pending_ids = self.task_manager.get_current_user_pending_task_ids()
105105
ids_to_stop = pending_ids[:MAX_PENDING_STOP_ON_CLEAR]
Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
"""
2+
FC OpenAPI 客户端单测
3+
"""
4+
import pytest
5+
from unittest.mock import patch, MagicMock
6+
7+
import constants
8+
from services.fc_openapi.fc_client import stop_async_task
9+
10+
11+
class TestStopAsyncTaskConfig:
12+
"""配置缺失时不应发请求"""
13+
14+
def test_returns_false_when_fc_account_id_empty(self):
15+
with patch.object(constants, "FC_ACCOUNT_ID", ""), \
16+
patch.object(constants, "FC_FUNCTION_NAME", "my-func-gw"), \
17+
patch("services.fc_openapi.fc_client.OpenApiClient") as mock_client:
18+
result = stop_async_task("task-123")
19+
assert result is False
20+
mock_client.assert_not_called()
21+
22+
def test_returns_false_when_fc_function_name_empty(self):
23+
with patch.object(constants, "FC_ACCOUNT_ID", "123456"), \
24+
patch.object(constants, "FC_FUNCTION_NAME", ""), \
25+
patch("services.fc_openapi.fc_client.OpenApiClient") as mock_client:
26+
result = stop_async_task("task-123")
27+
assert result is False
28+
mock_client.assert_not_called()
29+
30+
def test_returns_false_when_no_credentials(self):
31+
with patch.object(constants, "FC_ACCOUNT_ID", "123456"), \
32+
patch.object(constants, "FC_FUNCTION_NAME", "my-func-gw"), \
33+
patch("services.fc_openapi.fc_client._get_credentials", return_value=("", "", "")), \
34+
patch("services.fc_openapi.fc_client.OpenApiClient") as mock_client:
35+
result = stop_async_task("task-123")
36+
assert result is False
37+
mock_client.assert_not_called()
38+
39+
40+
class TestStopAsyncTaskRequest:
41+
"""配置与凭证齐全时调用 OpenAPI 且 path/query 正确"""
42+
43+
@pytest.fixture(autouse=True)
44+
def set_config(self):
45+
with patch.object(constants, "FC_ACCOUNT_ID", "1338904783509062"), \
46+
patch.object(constants, "FC_REGION", "cn-hangzhou"), \
47+
patch.object(constants, "FC_FUNCTION_NAME", "art-speed-test-mpbx-gw-prod"), \
48+
patch("services.fc_openapi.fc_client._get_credentials", return_value=("ak", "sk", "sts")):
49+
yield
50+
51+
def test_put_url_and_qualifier(self, set_config):
52+
mock_client_instance = MagicMock()
53+
mock_client_instance.call_api.return_value = {"statusCode": 200}
54+
with patch("services.fc_openapi.fc_client.OpenApiClient", return_value=mock_client_instance):
55+
result = stop_async_task("task-123")
56+
57+
assert result is True
58+
mock_client_instance.call_api.assert_called_once()
59+
call_args = mock_client_instance.call_api.call_args
60+
params = call_args[0][0]
61+
req = call_args[0][1]
62+
assert "2023-03-30/functions/art-speed-test-mpbx-prod/async-tasks/task-123/stop" in (params.pathname or "")
63+
assert getattr(req, "query", {}).get("qualifier") == "LATEST"
64+
65+
def test_gpu_function_name_strips_gw(self, set_config):
66+
mock_client_instance = MagicMock()
67+
mock_client_instance.call_api.return_value = {"statusCode": 200}
68+
with patch("services.fc_openapi.fc_client.OpenApiClient", return_value=mock_client_instance):
69+
stop_async_task("tid")
70+
params = mock_client_instance.call_api.call_args[0][0]
71+
pathname = params.pathname or ""
72+
assert "art-speed-test-mpbx-prod" in pathname
73+
assert "art-speed-test-mpbx-gw-prod" not in pathname or "/functions/art-speed-test-mpbx-prod/" in pathname
Lines changed: 25 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -1,36 +1,37 @@
11
"""
22
Interrupt Handler 单元测试
3-
测试 POST /api/interrupt 与 ComfyUI 对齐
3+
POST /api/interrupt 暂不支持,返回 403
44
"""
55
import pytest
6-
from unittest.mock import Mock, patch
6+
from unittest.mock import Mock
7+
from flask import Flask
78

89
from services.gateway.handlers.interrupt_handler import InterruptHandler
910

1011

1112
@pytest.fixture
12-
def mock_task_manager():
13-
return Mock()
13+
def app():
14+
return Flask(__name__)
1415

1516

1617
@pytest.fixture
17-
def interrupt_handler(mock_task_manager):
18-
return InterruptHandler(mock_task_manager)
19-
20-
21-
def test_handle_post_calls_interrupt_current_user_and_returns_200(interrupt_handler, mock_task_manager):
22-
"""POST /api/interrupt 调用 interrupt_current_user 并返回 200"""
23-
mock_task_manager.get_current_user_running_task_ids.return_value = []
24-
mock_task_manager.interrupt_current_user.return_value = 1
25-
with patch("services.fc_openapi.fc_stop_client.stop_async_task"):
26-
response = interrupt_handler.handle_post()
27-
assert response.status_code == 200
28-
mock_task_manager.interrupt_current_user.assert_called_once()
29-
30-
31-
def test_handle_post_calls_interrupt_current_user_when_running_exists(interrupt_handler, mock_task_manager):
32-
"""POST /api/interrupt 调用 interrupt_current_user 并返回 200(StopAsyncTask 临时注释期间仅本地中断)"""
33-
mock_task_manager.interrupt_current_user.return_value = 2
34-
response = interrupt_handler.handle_post()
35-
assert response.status_code == 200
36-
mock_task_manager.interrupt_current_user.assert_called_once()
18+
def interrupt_handler():
19+
return InterruptHandler()
20+
21+
22+
def test_handle_post_returns_403(interrupt_handler, app):
23+
"""POST /api/interrupt 返回 403,不允许客户端中止正在运行的任务"""
24+
with app.test_request_context("/api/interrupt", method="POST"):
25+
response, status_code = interrupt_handler.handle_post()
26+
assert status_code == 403
27+
data = response.get_json()
28+
assert data["error"]["type"] == "not_supported"
29+
30+
31+
def test_handle_post_returns_error_message(interrupt_handler, app):
32+
"""POST /api/interrupt 错误体包含可读的 message"""
33+
with app.test_request_context("/api/interrupt", method="POST"):
34+
response, status_code = interrupt_handler.handle_post()
35+
data = response.get_json()
36+
assert "message" in data["error"]
37+
assert len(data["error"]["message"]) > 0

src/code/agent/test/unit/services/gateway/queue_handler_test.py

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ def test_handle_post_clear_calls_stop_async_task_per_pending_then_clear_queue(qu
3333
"""POST /api/queue clear 先对每个 PENDING 调 stop_async_task,再 clear_queue"""
3434
mock_task_manager.get_current_user_pending_task_ids.return_value = ["tid1", "tid2"]
3535
mock_task_manager.clear_queue.return_value = 2
36-
with patch("services.fc_openapi.fc_stop_client.stop_async_task") as mock_stop:
36+
with patch("services.fc_openapi.fc_client.stop_async_task") as mock_stop:
3737
with app.test_request_context("/api/queue", method="POST", json={"clear": True}):
3838
response = queue_handler.handle_post_request()
3939
assert response.status_code == 200
@@ -47,7 +47,7 @@ def test_handle_post_clear_with_no_pending_still_clears_queue(queue_handler, moc
4747
"""POST /api/queue clear 若无 PENDING 仍会调用 clear_queue"""
4848
mock_task_manager.get_current_user_pending_task_ids.return_value = []
4949
mock_task_manager.clear_queue.return_value = 0
50-
with patch("services.fc_openapi.fc_stop_client.stop_async_task") as mock_stop:
50+
with patch("services.fc_openapi.fc_client.stop_async_task") as mock_stop:
5151
with app.test_request_context("/api/queue", method="POST", json={"clear": True}):
5252
response = queue_handler.handle_post_request()
5353
assert response.status_code == 200
@@ -61,7 +61,7 @@ def test_handle_post_clear_caps_stop_async_task_calls(queue_handler, mock_task_m
6161
pending_ids = [f"tid_{i}" for i in range(over_limit)]
6262
mock_task_manager.get_current_user_pending_task_ids.return_value = pending_ids
6363
mock_task_manager.clear_queue.return_value = over_limit
64-
with patch("services.fc_openapi.fc_stop_client.stop_async_task") as mock_stop:
64+
with patch("services.fc_openapi.fc_client.stop_async_task") as mock_stop:
6565
with app.test_request_context("/api/queue", method="POST", json={"clear": True}):
6666
response = queue_handler.handle_post_request()
6767
assert response.status_code == 200
@@ -77,7 +77,7 @@ def test_handle_post_clear_logs_warning_when_stop_partially_fails(queue_handler,
7777
def stop_side_effect(task_id):
7878
return task_id == "tid1"
7979

80-
with patch("services.fc_openapi.fc_stop_client.stop_async_task", side_effect=stop_side_effect):
80+
with patch("services.fc_openapi.fc_client.stop_async_task", side_effect=stop_side_effect):
8181
with patch("services.gateway.handlers.queue_handler.log") as mock_log:
8282
with app.test_request_context("/api/queue", method="POST", json={"clear": True}):
8383
response = queue_handler.handle_post_request()
@@ -93,7 +93,7 @@ def test_handle_post_clear_no_warning_when_all_stop_succeed(queue_handler, mock_
9393
mock_task_manager.get_current_user_pending_task_ids.return_value = ["tid1", "tid2"]
9494
mock_task_manager.clear_queue.return_value = 2
9595

96-
with patch("services.fc_openapi.fc_stop_client.stop_async_task", return_value=True):
96+
with patch("services.fc_openapi.fc_client.stop_async_task", return_value=True):
9797
with patch("services.gateway.handlers.queue_handler.log") as mock_log:
9898
with app.test_request_context("/api/queue", method="POST", json={"clear": True}):
9999
response = queue_handler.handle_post_request()

0 commit comments

Comments
 (0)