Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -62,5 +62,8 @@ GenieData/
.opencode/
.kilocode/
.worktrees/
CLAUDE.md
.claude/


dashboard/bun.lock
5 changes: 4 additions & 1 deletion astrbot/api/all.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,4 +51,7 @@

from astrbot.core.platform.register import register_platform_adapter

from .message_components import *
from .message_components import *

# tracing
from .trace import span_context, span_record, get_current_span # noqa: F401
45 changes: 45 additions & 0 deletions astrbot/api/trace.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
"""Public tracing API for AstrBot plugins.

Plugin authors can import from this module to instrument their code with
trace spans that automatically appear in the AstrBot trace dashboard.

Quick start::

from astrbot.api.trace import span_record, span_context

class MyPlugin(Star):

@command("weather")
@span_record("plugin.weather", span_type="plugin_call", record_input=True)
async def get_weather(self, event: AstrMessageEvent, city: str):
result = await self._fetch(city)
yield event.plain_result(result)

async def _fetch(self, city: str):
async with span_context("http_fetch", span_type="io_call") as s:
s.set_input(city=city)
data = await httpx.get(f"https://wttr.in/{city}?format=3")
s.set_output(status=data.status_code)
return data.text

All spans created this way are automatically attached to the trace for the
currently-processed request (via a ``contextvars.ContextVar``) and will show
up in the span tree on the Trace page. When tracing is disabled in the
dashboard settings, all functions are called with zero overhead.
"""

from astrbot.core.utils.trace import (
TraceSpan,
_NullSpan,
get_current_span,
span_context,
span_record,
)

__all__ = [
"span_context", # async with span_context("name", span_type="io_call") as s:
"span_record", # @span_record("name", span_type="plugin_call")
"get_current_span", # TraceSpan | None — manual span manipulation
"TraceSpan", # type hint
"_NullSpan", # type hint (returned when tracing is disabled)
]
135 changes: 124 additions & 11 deletions astrbot/core/astr_agent_run_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import traceback
from collections.abc import AsyncGenerator

from astrbot.core import logger
from astrbot.core import astrbot_config, logger
from astrbot.core.agent.message import Message
from astrbot.core.agent.runners.tool_loop_agent_runner import ToolLoopAgentRunner
from astrbot.core.astr_agent_context import AstrAgentContext
Expand All @@ -19,6 +19,7 @@
)
from astrbot.core.provider.entities import LLMResponse
from astrbot.core.provider.provider import TTSProvider
from astrbot.core.utils.trace import TraceSpan, get_current_span

AgentRunner = ToolLoopAgentRunner[AstrAgentContext]

Expand Down Expand Up @@ -124,6 +125,17 @@ async def run_agent(
step_idx = 0
astr_event = agent_runner.run_context.context.event
tool_name_by_call_id: dict[str, str] = {}
# Trace: parent span for all step spans.
# Prefer the ContextVar (set by internal.py before calling run_agent) so
# this function works correctly even when called without an event attribute.
_trace_on = astrbot_config.get("trace_enable", False)
_llm_parent = get_current_span() or getattr(
astr_event, "_llm_agent_span", astr_event.trace
)
# Per-step span and per-tool-call spans
_step_span = None
_tool_spans: dict[str, TraceSpan] = {} # call_id -> TraceSpan

buffered_llm_chains: list[MessageChain] = []
can_buffer_llm_result = _should_buffer_llm_result(
buffer_intermediate_messages,
Expand All @@ -149,6 +161,20 @@ async def run_agent(
)
)

# Create a span for this LLM iteration
if _trace_on:
_step_span = _llm_parent.child(
f"llm_step_{step_idx}",
span_type="llm_call",
model=agent_runner.provider.get_model()
if agent_runner.provider
else "",
)
_step_span.set_input(
message_count=len(agent_runner.run_context.messages),
)
_tool_spans = {}

stop_watcher = asyncio.create_task(
_watch_agent_stop_signal(agent_runner, astr_event),
)
Expand Down Expand Up @@ -177,20 +203,29 @@ async def run_agent(
pass
astr_event.set_extra("agent_user_aborted", True)
astr_event.set_extra("agent_stop_requested", False)
if (
_trace_on
and _step_span is not None
and _step_span.finished_at is None
):
_step_span.finish(status="error", reason="aborted")
return

if _should_stop_agent(astr_event):
continue

if resp.type == "tool_call_result":
msg_chain = resp.data["chain"]
result_text = msg_chain.get_plain_text(with_other_comps_mark=True)

astr_event.trace.record(
"agent_tool_result",
tool_result=msg_chain.get_plain_text(
with_other_comps_mark=True
),
)
# Finish matching tool span
if _trace_on:
result_data = _extract_chain_json_data(msg_chain)
call_id = str(result_data.get("id", "")) if result_data else ""
tool_span = _tool_spans.pop(call_id, None)
if tool_span is not None and tool_span.finished_at is None:
tool_span.set_output(result=result_text[:4000])
tool_span.finish()

if msg_chain.type == "tool_direct_result":
# tool_direct_result 用于标记 llm tool 需要直接发送给用户的内容
Expand Down Expand Up @@ -218,10 +253,41 @@ async def run_agent(
yield MessageChain(chain=[], type="break")

tool_info = _extract_chain_json_data(resp.data["chain"])
astr_event.trace.record(
"agent_tool_call",
tool_name=tool_info if tool_info else "unknown",
)
# Create tool call span
if _trace_on and _step_span is not None:
tool_name = (
tool_info.get("name", "unknown")
if isinstance(tool_info, dict)
else "unknown"
)
tool_call_id = (
str(tool_info.get("id", ""))
if isinstance(tool_info, dict)
else ""
)
tool_args = (
tool_info.get("arguments", {})
if isinstance(tool_info, dict)
else {}
)
_ts = _step_span.child(tool_name, span_type="tool_call")
_ts.set_input(
**(
tool_args
if isinstance(tool_args, dict)
else {"args": tool_args}
)
)
# Attach plugin attribution to the tool_call span so the
# originating plugin is visible even outside the plugin_handler
# subtree (e.g. @llm_tool registered by a third-party plugin).
_tool_plugin_meta = _resolve_tool_plugin_meta(
agent_runner, tool_name
)
if _tool_plugin_meta:
_ts.set_meta(**_tool_plugin_meta)
if tool_call_id:
_tool_spans[tool_call_id] = _ts
_record_tool_call_name(tool_info, tool_name_by_call_id)

if astr_event.get_platform_name() == "webchat":
Expand All @@ -245,6 +311,21 @@ async def run_agent(
if stream_to_general and resp.type == "streaming_delta":
continue

# Finish step span on llm_result
if _trace_on and resp.type == "llm_result" and _step_span is not None:
resp_chain = resp.data.get("chain")
completion = resp_chain.get_plain_text() if resp_chain else ""
_step_span.set_output(completion=completion[:2000])
stats = agent_runner.stats
if stats and stats.token_usage:
_step_span.set_meta(
input_tokens=stats.token_usage.input,
output_tokens=stats.token_usage.output,
cached_tokens=stats.token_usage.input_cached,
)
if _step_span.finished_at is None:
_step_span.finish()

if stream_to_general or not agent_runner.streaming:
if can_buffer_llm_result and resp.type == "llm_result":
buffered_llm_chains.append(resp.data["chain"])
Expand Down Expand Up @@ -288,6 +369,9 @@ async def run_agent(
await stop_watcher
except asyncio.CancelledError:
pass
# Finish step span if not already done (e.g. streaming case)
if _trace_on and _step_span is not None and _step_span.finished_at is None:
_step_span.finish()
if agent_runner.done():
# send agent stats to webchat
if astr_event.get_platform_name() == "webchat":
Expand Down Expand Up @@ -339,6 +423,35 @@ async def run_agent(
return


def _resolve_tool_plugin_meta(agent_runner: AgentRunner, tool_name: str) -> dict | None:
"""Return plugin attribution meta for a tool call span.

Looks up the tool by name in the agent runner's tool set, then resolves
the originating plugin via star_map using the tool's handler_module_path.
Returns None for MCP tools or when attribution cannot be determined.
"""
try:
from astrbot.core.star.star import star_map

req = agent_runner.req
if req is None or req.func_tool is None:
return None
tool = req.func_tool.get_tool(tool_name)
if tool is None or not tool.handler_module_path:
# MCP tools and built-in framework tools have no handler_module_path
return None
md = star_map.get(tool.handler_module_path)
if md is None:
return None
return {
"plugin": md.name,
"plugin_type": "builtin" if md.reserved else "third_party",
}
except Exception as e:
logger.debug(f"[trace] Failed to resolve tool plugin meta: {e}")
return None


async def _watch_agent_stop_signal(agent_runner: AgentRunner, astr_event) -> None:
while not agent_runner.done():
if _should_stop_agent(astr_event):
Expand Down
59 changes: 47 additions & 12 deletions astrbot/core/astr_agent_tool_exec.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import mcp

from astrbot import logger
from astrbot.core import astrbot_config as _astrbot_config
from astrbot.core.agent.handoff import HandoffTool
from astrbot.core.agent.mcp_client import MCPTool
from astrbot.core.agent.message import Message
Expand Down Expand Up @@ -49,6 +50,7 @@
from astrbot.core.utils.history_saver import persist_agent_history
from astrbot.core.utils.image_ref_utils import is_supported_image_ref
from astrbot.core.utils.string_utils import normalize_and_dedupe_strings
from astrbot.core.utils.trace import _current_span as _trace_current_span


class FunctionToolExecutor(BaseFunctionToolExecutor[AstrAgentContext]):
Expand Down Expand Up @@ -351,18 +353,51 @@ async def _execute_handoff(
prov_settings: dict = ctx.get_config(umo=umo).get("provider_settings", {})
agent_max_step = int(prov_settings.get("max_agent_step", 30))
stream = prov_settings.get("streaming_response", False)
llm_resp = await ctx.tool_loop_agent(
event=event,
chat_provider_id=prov_id,
prompt=input_,
image_urls=image_urls,
system_prompt=tool.agent.instructions,
tools=toolset,
contexts=contexts,
max_steps=agent_max_step,
tool_call_timeout=run_context.tool_call_timeout,
stream=stream,
)
# ── Trace: create a dedicated llm_agent span for this subagent ──────
_subagent_span = None
_subagent_token = None
if _astrbot_config.get("trace_enable", False):
_span_parent = _trace_current_span.get()
if _span_parent is not None:
_subagent_span = _span_parent.child(
f"LLMAgent [{tool.agent.name}]",
span_type="llm_agent",
)
_subagent_span.set_input(
subagent=tool.agent.name,
prompt=(input_ or "")[:500],
system_prompt=(tool.agent.instructions or "")[:300],
)
_subagent_token = _trace_current_span.set(_subagent_span)
# ─────────────────────────────────────────────────────────────────────
try:
llm_resp = await ctx.tool_loop_agent(
event=event,
chat_provider_id=prov_id,
prompt=input_,
image_urls=image_urls,
system_prompt=tool.agent.instructions,
tools=toolset,
contexts=contexts,
max_steps=agent_max_step,
tool_call_timeout=run_context.tool_call_timeout,
stream=stream,
)
if _subagent_span is not None and _subagent_span.finished_at is None:
_subagent_span.set_output(
response=(llm_resp.completion_text or "")[:2000]
if llm_resp
else "",
)
_subagent_span.finish()
except Exception:
if _subagent_span is not None and _subagent_span.finished_at is None:
_subagent_span.finish(status="error")
raise
finally:
if _subagent_token is not None:
_trace_current_span.reset(_subagent_token)
# ─────────────────────────────────────────────────────────────────────
yield mcp.types.CallToolResult(
content=[mcp.types.TextContent(type="text", text=llm_resp.completion_text)]
)
Expand Down
5 changes: 3 additions & 2 deletions astrbot/core/astr_main_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -569,11 +569,12 @@ async def _ensure_persona_and_skills(
if router_prompt:
req.system_prompt += f"\n{router_prompt}\n"
try:
event.trace.record(
"sel_persona",
persona_span = event.trace.child("sel_persona", span_type="pipeline_stage")
persona_span.set_input(
persona_id=persona_id,
persona_toolset=persona_toolset.names(),
)
persona_span.finish()
except Exception:
pass

Expand Down
Loading