Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
175 changes: 124 additions & 51 deletions python/packages/core/agent_framework/observability.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
from __future__ import annotations

import contextlib
import contextvars
import json
import logging
import os
Expand Down Expand Up @@ -65,6 +66,7 @@
GeneratedEmbeddings,
Message,
ResponseStream,
UsageDetails,
)

ResponseModelBoundT = TypeVar("ResponseModelBoundT", bound=BaseModel)
Expand Down Expand Up @@ -93,6 +95,18 @@
logger = logging.getLogger("agent_framework")


# Context-local set naming the response fields that an inner chat telemetry span has
# already recorded. The default of None means no tracking is active, in which case
# _mark_inner_response_telemetry_captured returns without recording anything.
INNER_RESPONSE_TELEMETRY_CAPTURED_FIELDS: Final[contextvars.ContextVar[set[str] | None]] = contextvars.ContextVar(
"inner_response_telemetry_captured_fields", default=None
)
# Marker strings stored in the captured-fields set above: one for the response id,
# one for token usage.
INNER_RESPONSE_ID_CAPTURED_FIELD: Final[str] = "response_id"
INNER_USAGE_CAPTURED_FIELD: Final[str] = "usage"

# Tracks accumulated token usage from all inner chat completion spans within an agent invoke.
# The default of None means no accumulation is in progress.
INNER_ACCUMULATED_USAGE: Final[contextvars.ContextVar[UsageDetails | None]] = contextvars.ContextVar(
"inner_accumulated_usage", default=None
)


OTEL_METRICS: Final[str] = "__otel_metrics__"
TOKEN_USAGE_BUCKET_BOUNDARIES: Final[tuple[float, ...]] = (
1,
Expand Down Expand Up @@ -1314,6 +1328,7 @@ async def _finalize_stream() -> None:
operation_duration_histogram=getattr(self, "duration_histogram", None),
duration=duration,
)
_mark_inner_response_telemetry_captured(response)
if (
OBSERVABILITY_SETTINGS.SENSITIVE_DATA_ENABLED
and isinstance(response, ChatResponse)
Expand Down Expand Up @@ -1373,6 +1388,7 @@ async def _get_response() -> ChatResponse:
operation_duration_histogram=getattr(self, "duration_histogram", None),
duration=duration,
)
_mark_inner_response_telemetry_captured(response)
if OBSERVABILITY_SETTINGS.SENSITIVE_DATA_ENABLED and response.messages:
finish_reason = cast(
"FinishReason | None",
Expand Down Expand Up @@ -1527,8 +1543,6 @@ def run(
super().run, # type: ignore[misc]
)
provider_name = str(self.otel_provider_name)
capture_usage = bool(getattr(self, "_otel_capture_usage", True))

if not OBSERVABILITY_SETTINGS.ENABLED:
return super_run( # type: ignore[no-any-return]
messages=messages,
Expand Down Expand Up @@ -1557,23 +1571,34 @@ def run(
**merged_client_kwargs,
)

inner_response_telemetry_captured_fields: set[str] = set()
inner_response_telemetry_captured_fields_token = INNER_RESPONSE_TELEMETRY_CAPTURED_FIELDS.set(
inner_response_telemetry_captured_fields
)
inner_accumulated_usage_token = INNER_ACCUMULATED_USAGE.set({})

if stream:
run_result: object = super_run(
messages=messages,
stream=True,
session=session,
compaction_strategy=compaction_strategy,
tokenizer=tokenizer,
function_invocation_kwargs=function_invocation_kwargs,
client_kwargs=client_kwargs,
**kwargs,
)
if isinstance(run_result, ResponseStream):
result_stream: ResponseStream[AgentResponseUpdate, AgentResponse[Any]] = run_result # pyright: ignore[reportUnknownVariableType]
elif isinstance(run_result, Awaitable):
result_stream = ResponseStream.from_awaitable(run_result) # type: ignore[arg-type] # pyright: ignore[reportArgumentType]
else:
raise RuntimeError("Streaming telemetry requires a ResponseStream result.")
try:
run_result: object = super_run(
messages=messages,
stream=True,
session=session,
compaction_strategy=compaction_strategy,
tokenizer=tokenizer,
function_invocation_kwargs=function_invocation_kwargs,
client_kwargs=client_kwargs,
**kwargs,
)
if isinstance(run_result, ResponseStream):
result_stream: ResponseStream[AgentResponseUpdate, AgentResponse[Any]] = run_result # pyright: ignore[reportUnknownVariableType]
elif isinstance(run_result, Awaitable):
result_stream = ResponseStream.from_awaitable(run_result) # type: ignore[arg-type] # pyright: ignore[reportArgumentType]
else:
raise RuntimeError("Streaming telemetry requires a ResponseStream result.")
except Exception:
INNER_RESPONSE_TELEMETRY_CAPTURED_FIELDS.reset(inner_response_telemetry_captured_fields_token)
INNER_ACCUMULATED_USAGE.reset(inner_accumulated_usage_token)
raise

# Create span directly without trace.use_span() context attachment.
# Streaming spans are closed asynchronously in cleanup hooks, which run
Expand Down Expand Up @@ -1613,8 +1638,11 @@ async def _finalize_stream() -> None:
response_attributes = _get_response_attributes(
attributes,
response,
capture_usage=capture_usage,
capture_response_id=INNER_RESPONSE_ID_CAPTURED_FIELD
not in inner_response_telemetry_captured_fields,
capture_usage=INNER_USAGE_CAPTURED_FIELD not in inner_response_telemetry_captured_fields,
)
_apply_accumulated_usage(response_attributes, inner_response_telemetry_captured_fields)
_capture_response(span=span, attributes=response_attributes, duration=duration)
if (
OBSERVABILITY_SETTINGS.SENSITIVE_DATA_ENABLED
Expand All @@ -1630,6 +1658,8 @@ async def _finalize_stream() -> None:
except Exception as exception:
capture_exception(span=span, exception=exception, timestamp=time_ns())
finally:
INNER_RESPONSE_TELEMETRY_CAPTURED_FIELDS.reset(inner_response_telemetry_captured_fields_token)
INNER_ACCUMULATED_USAGE.reset(inner_accumulated_usage_token)
_close_span()

# Register a weak reference callback to close the span if stream is garbage collected
Expand All @@ -1641,41 +1671,52 @@ async def _finalize_stream() -> None:
return wrapped_stream

async def _run() -> AgentResponse:
with _get_span(attributes=attributes, span_name_attribute=OtelAttr.AGENT_NAME) as span:
if OBSERVABILITY_SETTINGS.SENSITIVE_DATA_ENABLED and messages:
_capture_messages(
span=span,
provider_name=provider_name,
messages=messages,
system_instructions=_get_instructions_from_options(merged_options),
)
start_time_stamp = perf_counter()
try:
response: AgentResponse[Any] = await super_run(
messages=messages,
stream=False,
session=session,
compaction_strategy=compaction_strategy,
tokenizer=tokenizer,
function_invocation_kwargs=function_invocation_kwargs,
client_kwargs=client_kwargs,
**kwargs,
)
except Exception as exception:
capture_exception(span=span, exception=exception, timestamp=time_ns())
raise
duration = perf_counter() - start_time_stamp
if response:
response_attributes = _get_response_attributes(attributes, response, capture_usage=capture_usage)
_capture_response(span=span, attributes=response_attributes, duration=duration)
if OBSERVABILITY_SETTINGS.SENSITIVE_DATA_ENABLED and response.messages:
try:
with _get_span(attributes=attributes, span_name_attribute=OtelAttr.AGENT_NAME) as span:
if OBSERVABILITY_SETTINGS.SENSITIVE_DATA_ENABLED and messages:
_capture_messages(
span=span,
provider_name=provider_name,
messages=response.messages,
output=True,
messages=messages,
system_instructions=_get_instructions_from_options(merged_options),
)
return response # type: ignore[return-value,no-any-return]
start_time_stamp = perf_counter()
try:
response: AgentResponse[Any] = await super_run(
messages=messages,
stream=False,
session=session,
compaction_strategy=compaction_strategy,
tokenizer=tokenizer,
function_invocation_kwargs=function_invocation_kwargs,
client_kwargs=client_kwargs,
**kwargs,
)
except Exception as exception:
capture_exception(span=span, exception=exception, timestamp=time_ns())
raise
duration = perf_counter() - start_time_stamp
if response:
response_attributes = _get_response_attributes(
attributes,
response,
capture_response_id=INNER_RESPONSE_ID_CAPTURED_FIELD
not in inner_response_telemetry_captured_fields,
capture_usage=INNER_USAGE_CAPTURED_FIELD not in inner_response_telemetry_captured_fields,
)
_apply_accumulated_usage(response_attributes, inner_response_telemetry_captured_fields)
_capture_response(span=span, attributes=response_attributes, duration=duration)
if OBSERVABILITY_SETTINGS.SENSITIVE_DATA_ENABLED and response.messages:
_capture_messages(
span=span,
provider_name=provider_name,
messages=response.messages,
output=True,
)
return response # type: ignore[return-value,no-any-return]
finally:
INNER_RESPONSE_TELEMETRY_CAPTURED_FIELDS.reset(inner_response_telemetry_captured_fields_token)
INNER_ACCUMULATED_USAGE.reset(inner_accumulated_usage_token)

return _run()

Expand Down Expand Up @@ -1931,14 +1972,46 @@ def _to_otel_part(content: Content) -> dict[str, Any] | None:
return None


def _mark_inner_response_telemetry_captured(response: ChatResponse | AgentResponse) -> None:
    """Note which response metadata an inner chat telemetry span has already recorded.

    Does nothing when no captured-fields set is active in the current context.
    Otherwise the set is mutated in place: the response-id marker is added when the
    response carries a response id, and the usage marker is added when it carries
    usage details. In the latter case the usage is also merged into the running
    total held by ``INNER_ACCUMULATED_USAGE``, provided a total is being tracked.

    Args:
        response: The chat or agent response whose metadata was just captured.
    """
    seen_fields = INNER_RESPONSE_TELEMETRY_CAPTURED_FIELDS.get()
    if seen_fields is None:
        # No tracking is active in this context.
        return

    if response.response_id:
        seen_fields.add(INNER_RESPONSE_ID_CAPTURED_FIELD)

    if not response.usage_details:
        return
    seen_fields.add(INNER_USAGE_CAPTURED_FIELD)

    running_total = INNER_ACCUMULATED_USAGE.get()
    if running_total is None:
        return

    # Imported at call time, as in the original implementation.
    from ._types import add_usage_details

    # NOTE(review): unlike the in-place ``add`` calls above, ``ContextVar.set`` only
    # rebinds the value in the *current* context. If this ever runs in a copied
    # context (e.g. a separate asyncio task), the outer reader may not see the new
    # total -- confirm against how callers schedule inner chat calls.
    INNER_ACCUMULATED_USAGE.set(add_usage_details(running_total, response.usage_details))


def _apply_accumulated_usage(attributes: dict[str, Any], captured_fields: set[str]) -> None:
    """Copy the accumulated inner-span token counts onto a span attribute mapping.

    The mapping is only touched when ``captured_fields`` contains the usage marker
    and the context-local accumulated usage is non-empty. Each token count is
    written only if it is truthy, so missing or zero counts leave any existing
    attribute value untouched.

    Args:
        attributes: Span attributes to update in place.
        captured_fields: Markers for the fields already captured by inner spans.
    """
    if INNER_USAGE_CAPTURED_FIELD in captured_fields:
        totals = INNER_ACCUMULATED_USAGE.get()
        if totals:
            if prompt_tokens := totals.get("input_token_count"):
                attributes[OtelAttr.INPUT_TOKENS] = prompt_tokens
            if completion_tokens := totals.get("output_token_count"):
                attributes[OtelAttr.OUTPUT_TOKENS] = completion_tokens


def _get_response_attributes(
attributes: dict[str, Any],
response: ChatResponse | AgentResponse,
*,
capture_response_id: bool = True,
capture_usage: bool = True,
) -> dict[str, Any]:
"""Get the response attributes from a response."""
if response.response_id:
if capture_response_id and response.response_id:
attributes[OtelAttr.RESPONSE_ID] = response.response_id
finish_reason = getattr(response, "finish_reason", None)
if not finish_reason:
Expand Down
Loading
Loading