Skip to content

Commit ef076f1

Browse files
committed
Stabilize WeCom channel runtime without dragging in unrelated PR scope
Rebuild the WeCom runtime fix on top of the current mainline so the channel-state and session-isolation changes can merge independently. This keeps the scope to runtime connection status, disconnect handling, official SDK payload parsing, group-vs-p2p session keys, and the minimal UI feedback needed for save and disconnect actions. Constraint: WeCom AI Bot traffic uses official SDK fields like from_userid, chattype, and chatid Constraint: The fix must stand alone on upstream/main without pulling in org-sync or password-reset work Rejected: Reuse the old PR branch history directly | earlier iterations mixed in unrelated changes and made scope review harder Rejected: Add a proxy dependency for the websocket path | forcing proxy=None is sufficient for the local SDK path Confidence: high Scope-risk: moderate Reversibility: clean Directive: Keep channel runtime behavior and ChannelConfig feedback changes together; splitting them will reintroduce silent disconnect failures Tested: cd backend && .venv/bin/python -m pytest tests/test_wecom_stream.py tests/test_wecom_channel_api.py Tested: cd backend && .venv/bin/python -m ruff check app/api/wecom.py app/services/wecom_stream.py tests/test_wecom_channel_api.py tests/test_wecom_stream.py Tested: cd frontend && npm run build Not-tested: End-to-end live single-chat and group-chat exchanges against a running WeCom bot
1 parent 441f41b commit ef076f1

7 files changed

Lines changed: 349 additions & 36 deletions

File tree

backend/app/api/wecom.py

Lines changed: 25 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -5,14 +5,11 @@
55

66
import base64
77
import hashlib
8-
import re
9-
import socket
108
import struct
11-
import time
129
import uuid
1310
import xml.etree.ElementTree as ET
1411

15-
from fastapi import APIRouter, Depends, HTTPException, Request, Response, status
12+
from fastapi import APIRouter, Depends, HTTPException, Request, Response
1613
from loguru import logger
1714
from sqlalchemy import select
1815
from sqlalchemy.ext.asyncio import AsyncSession
@@ -138,6 +135,7 @@ async def configure_wecom_channel(
138135
existing.verification_token = token
139136
existing.extra_config = extra_config
140137
existing.is_configured = True
138+
existing.is_connected = False
141139
await db.flush()
142140
config_out = ChannelConfigOut.model_validate(existing)
143141
else:
@@ -150,22 +148,26 @@ async def configure_wecom_channel(
150148
verification_token=token,
151149
extra_config=extra_config,
152150
is_configured=True,
151+
is_connected=False,
153152
)
154153
db.add(config)
155154
await db.flush()
156155
config_out = ChannelConfigOut.model_validate(config)
157156

158-
# Auto-start WebSocket client if bot credentials provided
159-
if has_ws_mode:
160-
try:
161-
from app.services.wecom_stream import wecom_stream_manager
162-
import asyncio
157+
try:
158+
from app.services.wecom_stream import wecom_stream_manager
159+
import asyncio
160+
161+
if has_ws_mode:
163162
asyncio.create_task(
164163
wecom_stream_manager.start_client(agent_id, bot_id, bot_secret)
165164
)
166165
logger.info(f"[WeCom] WebSocket client start triggered for agent {agent_id}")
167-
except Exception as e:
168-
logger.error(f"[WeCom] Failed to start WebSocket client: {e}")
166+
else:
167+
asyncio.create_task(wecom_stream_manager.stop_client(agent_id))
168+
logger.info(f"[WeCom] WebSocket client stop triggered for agent {agent_id}")
169+
except Exception as e:
170+
logger.error(f"[WeCom] Failed to update WebSocket client state: {e}")
169171

170172
return config_out
171173

@@ -186,7 +188,15 @@ async def get_wecom_channel(
186188
config = result.scalar_one_or_none()
187189
if not config:
188190
raise HTTPException(status_code=404, detail="WeCom not configured")
189-
return ChannelConfigOut.model_validate(config)
191+
192+
config_out = ChannelConfigOut.model_validate(config)
193+
if (config.extra_config or {}).get("connection_mode") == "websocket":
194+
from app.services.wecom_stream import wecom_stream_manager
195+
196+
config_out.is_connected = wecom_stream_manager.status().get(str(agent_id), False)
197+
else:
198+
config_out.is_connected = False
199+
return config_out
190200

191201

192202
@router.get("/agents/{agent_id}/wecom-channel/webhook-url")
@@ -227,6 +237,9 @@ async def delete_wecom_channel(
227237
config = result.scalar_one_or_none()
228238
if not config:
229239
raise HTTPException(status_code=404, detail="WeCom not configured")
240+
from app.services.wecom_stream import wecom_stream_manager
241+
242+
await wecom_stream_manager.stop_client(agent_id)
230243
await db.delete(config)
231244

232245

@@ -300,8 +313,6 @@ async def wecom_event_webhook(
300313

301314
token = config.verification_token or ""
302315
encoding_aes_key = config.encrypt_key or ""
303-
corp_id = config.app_id or ""
304-
305316
# Parse encrypted XML body
306317
try:
307318
root = ET.fromstring(body_bytes)
@@ -457,7 +468,6 @@ async def _process_wecom_text(
457468
kf_msg_id: str = None,
458469
):
459470
"""Process an incoming WeCom text message and reply."""
460-
import json
461471
import httpx
462472
from datetime import datetime, timezone
463473
from sqlalchemy import select as _select
@@ -476,7 +486,6 @@ async def _process_wecom_text(
476486
if not agent_obj:
477487
logger.warning(f"[WeCom] Agent {agent_id} not found")
478488
return
479-
creator_id = agent_obj.creator_id
480489
ctx_size = agent_obj.context_window_size if agent_obj else 20
481490

482491
conv_id = f"wecom_p2p_{from_user}"

backend/app/services/wecom_stream.py

Lines changed: 63 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,47 @@
1515
from app.models.channel_config import ChannelConfig
1616

1717

18+
def _disable_wecom_sdk_proxy() -> None:
19+
"""Force the WeCom SDK websocket path to bypass system proxies."""
20+
import wecom_aibot_sdk.ws as sdk_ws
21+
22+
if getattr(sdk_ws.websockets.connect, "__clawith_no_proxy_patch__", False):
23+
return
24+
25+
original_connect = sdk_ws.websockets.connect
26+
27+
def connect_no_proxy(*args, **kwargs):
28+
kwargs.setdefault("proxy", None)
29+
return original_connect(*args, **kwargs)
30+
31+
connect_no_proxy.__clawith_no_proxy_patch__ = True
32+
sdk_ws.websockets.connect = connect_no_proxy
33+
34+
35+
def _extract_wecom_sender_id(body: dict) -> str:
36+
sender = body.get("from")
37+
if isinstance(sender, dict):
38+
sender_id = sender.get("user_id") or sender.get("userid")
39+
if sender_id:
40+
return str(sender_id).strip()
41+
return str(body.get("from_userid") or body.get("userid") or "").strip()
42+
43+
44+
def _extract_wecom_chat_type(body: dict) -> str:
45+
return str(body.get("chattype") or body.get("chat_type") or "single").strip().lower()
46+
47+
48+
def _extract_wecom_chat_id(body: dict) -> str:
49+
return str(body.get("chatid") or body.get("chat_id") or "").strip()
50+
51+
52+
def _build_wecom_conv_id(sender_id: str, chat_id: str, chat_type: str) -> str:
53+
normalized_type = (chat_type or "single").strip().lower()
54+
if normalized_type in {"group", "groupchat", "group_chat"} and chat_id:
55+
return f"wecom_group_{chat_id}"
56+
return f"wecom_p2p_{sender_id}"
57+
58+
1859
class WeComStreamManager:
1960
"""Manages WeCom AI Bot WebSocket clients for all agents."""
2061

@@ -63,6 +104,7 @@ async def _run_client(
63104
return
64105

65106
try:
107+
_disable_wecom_sdk_proxy()
66108
client = WSClient({
67109
"bot_id": bot_id,
68110
"secret": bot_secret,
@@ -80,13 +122,25 @@ async def on_text(frame):
80122
if not user_text:
81123
return
82124

83-
sender = body.get("from", {})
84-
sender_id = sender.get("user_id", "") or sender.get("userid", "")
85-
chat_id = body.get("chatid", "")
86-
chat_type = body.get("chat_type", "single")
125+
sender_id = _extract_wecom_sender_id(body)
126+
chat_id = _extract_wecom_chat_id(body)
127+
chat_type = _extract_wecom_chat_type(body)
128+
msg_id = str(body.get("msgid", "")).strip()
129+
130+
if not sender_id:
131+
logger.error(
132+
"[WeCom Stream] Missing sender_id, skip message",
133+
extra={
134+
"msgid": msg_id,
135+
"chat_type": chat_type,
136+
"chat_id": chat_id,
137+
"body_keys": sorted(body.keys()),
138+
},
139+
)
140+
return
87141

88142
logger.info(
89-
f"[WeCom Stream] Text from {sender_id}: {user_text[:80]}"
143+
f"[WeCom Stream] Text from {sender_id} ({chat_type}, chat_id={chat_id or '-'})"
90144
)
91145

92146
# Process message and get reply
@@ -121,8 +175,7 @@ async def on_text(frame):
121175
async def on_image(frame):
122176
try:
123177
body = frame.body or {}
124-
sender = body.get("from", {})
125-
sender_id = sender.get("user_id", "") or sender.get("userid", "")
178+
sender_id = _extract_wecom_sender_id(body)
126179
logger.info(f"[WeCom Stream] Image message from {sender_id} (not yet handled)")
127180
stream_id = generate_req_id("stream")
128181
await client.reply_stream(
@@ -137,8 +190,7 @@ async def on_image(frame):
137190
async def on_file(frame):
138191
try:
139192
body = frame.body or {}
140-
sender = body.get("from", {})
141-
sender_id = sender.get("user_id", "") or sender.get("userid", "")
193+
sender_id = _extract_wecom_sender_id(body)
142194
logger.info(f"[WeCom Stream] File message from {sender_id} (not yet handled)")
143195
stream_id = generate_req_id("stream")
144196
await client.reply_stream(
@@ -216,7 +268,7 @@ async def start_all(self):
216268
async with async_session() as db:
217269
result = await db.execute(
218270
select(ChannelConfig).where(
219-
ChannelConfig.is_configured == True,
271+
ChannelConfig.is_configured,
220272
ChannelConfig.channel_type == "wecom",
221273
)
222274
)
@@ -274,11 +326,7 @@ async def _process_wecom_stream_message(
274326
return "Agent not found"
275327
ctx_size = agent_obj.context_window_size or 20
276328

277-
# Conversation ID: differentiate single chat vs group chat
278-
if chat_type == "group" and chat_id:
279-
conv_id = f"wecom_group_{chat_id}"
280-
else:
281-
conv_id = f"wecom_p2p_{sender_id}"
329+
conv_id = _build_wecom_conv_id(sender_id, chat_id, chat_type)
282330

283331
# Find or create platform user
284332
wc_username = f"wecom_{sender_id}"
Lines changed: 139 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,139 @@
1+
import uuid
2+
from datetime import UTC, datetime
3+
from types import SimpleNamespace
4+
5+
import pytest
6+
7+
from app.api import wecom as wecom_api
8+
from app.models.channel_config import ChannelConfig
9+
from app.models.user import User
10+
11+
12+
class DummyResult:
13+
def __init__(self, value=None):
14+
self._value = value
15+
16+
def scalar_one_or_none(self):
17+
return self._value
18+
19+
20+
class RecordingDB:
21+
def __init__(self, responses=None):
22+
self.responses = list(responses or [])
23+
self.deleted = []
24+
self.flushed = False
25+
26+
async def execute(self, statement):
27+
if self.responses:
28+
return self.responses.pop(0)
29+
return DummyResult()
30+
31+
def add(self, _obj):
32+
return None
33+
34+
async def flush(self):
35+
self.flushed = True
36+
37+
async def delete(self, obj):
38+
self.deleted.append(obj)
39+
40+
41+
def make_user(**overrides):
42+
values = {
43+
"id": uuid.uuid4(),
44+
"username": "alice",
45+
"email": "alice@example.com",
46+
"password_hash": "old-hash",
47+
"display_name": "Alice",
48+
"role": "member",
49+
"tenant_id": uuid.uuid4(),
50+
"is_active": True,
51+
}
52+
values.update(overrides)
53+
return User(**values)
54+
55+
56+
def make_channel(agent_id: uuid.UUID, *, connection_mode: str = "websocket") -> ChannelConfig:
57+
return ChannelConfig(
58+
id=uuid.uuid4(),
59+
agent_id=agent_id,
60+
channel_type="wecom",
61+
app_id="corp_id",
62+
app_secret="secret",
63+
is_configured=True,
64+
is_connected=False,
65+
extra_config={"connection_mode": connection_mode, "bot_id": "bot_123", "bot_secret": "secret_123"},
66+
created_at=datetime.now(UTC),
67+
)
68+
69+
70+
@pytest.mark.asyncio
71+
async def test_get_wecom_channel_reports_runtime_websocket_status(monkeypatch):
72+
agent_id = uuid.uuid4()
73+
config = make_channel(agent_id, connection_mode="websocket")
74+
db = RecordingDB([DummyResult(config)])
75+
76+
async def fake_check_agent_access(_db, _user, _agent_id):
77+
return object(), None
78+
79+
class FakeManager:
80+
def status(self):
81+
return {str(agent_id): True}
82+
83+
monkeypatch.setattr(wecom_api, "check_agent_access", fake_check_agent_access)
84+
monkeypatch.setattr("app.services.wecom_stream.wecom_stream_manager", FakeManager())
85+
86+
result = await wecom_api.get_wecom_channel(
87+
agent_id=agent_id,
88+
current_user=make_user(),
89+
db=db,
90+
)
91+
92+
assert result.is_connected is True
93+
94+
95+
@pytest.mark.asyncio
96+
async def test_get_wecom_channel_marks_webhook_mode_disconnected(monkeypatch):
97+
agent_id = uuid.uuid4()
98+
config = make_channel(agent_id, connection_mode="webhook")
99+
db = RecordingDB([DummyResult(config)])
100+
101+
async def fake_check_agent_access(_db, _user, _agent_id):
102+
return object(), None
103+
104+
monkeypatch.setattr(wecom_api, "check_agent_access", fake_check_agent_access)
105+
106+
result = await wecom_api.get_wecom_channel(
107+
agent_id=agent_id,
108+
current_user=make_user(),
109+
db=db,
110+
)
111+
112+
assert result.is_connected is False
113+
114+
115+
@pytest.mark.asyncio
116+
async def test_delete_wecom_channel_stops_runtime_client(monkeypatch):
117+
agent_id = uuid.uuid4()
118+
config = make_channel(agent_id)
119+
db = RecordingDB([DummyResult(config)])
120+
stop_calls = []
121+
122+
async def fake_check_agent_access(_db, _user, _agent_id):
123+
return SimpleNamespace(creator_id=creator.id), None
124+
125+
async def fake_stop_client(aid):
126+
stop_calls.append(aid)
127+
128+
creator = make_user()
129+
monkeypatch.setattr(wecom_api, "check_agent_access", fake_check_agent_access)
130+
monkeypatch.setattr("app.services.wecom_stream.wecom_stream_manager.stop_client", fake_stop_client)
131+
132+
await wecom_api.delete_wecom_channel(
133+
agent_id=agent_id,
134+
current_user=creator,
135+
db=db,
136+
)
137+
138+
assert stop_calls == [agent_id]
139+
assert db.deleted == [config]

0 commit comments

Comments
 (0)