168 changes: 168 additions & 0 deletions python/packages/core/agent_framework/observability.py
@@ -100,6 +100,32 @@
40.96,
81.92,
)
TIME_TO_FIRST_CHUNK_BUCKET_BOUNDARIES: Final[tuple[float, ...]] = (
0.01,
0.02,
0.04,
0.08,
0.16,
0.32,
0.64,
1.28,
2.56,
5.12,
10.24,
)
TIME_PER_OUTPUT_CHUNK_BUCKET_BOUNDARIES: Final[tuple[float, ...]] = (
0.001,
0.002,
0.004,
0.008,
0.016,
0.032,
0.064,
0.128,
0.256,
0.512,
1.024,
)


# We're recording multiple events for the chat history, some of them are emitted within (hundreds of)
@@ -1039,6 +1065,33 @@ def _get_token_usage_histogram() -> "metrics.Histogram":
)


def _get_time_to_first_chunk_histogram() -> "metrics.Histogram":
return get_meter().create_histogram(
name="gen_ai.client.operation.time_to_first_chunk", # TODO(Brian Henson): Match semantic conventions
unit=OtelAttr.DURATION_UNIT,
description="Time from request start to first content chunk arrival",
explicit_bucket_boundaries_advisory=TIME_TO_FIRST_CHUNK_BUCKET_BOUNDARIES,
)


def _get_time_per_output_chunk_histogram() -> "metrics.Histogram":
return get_meter().create_histogram(
name="gen_ai.client.operation.time_per_output_chunk", # TODO(Brian Henson): Match semantic conventions
unit=OtelAttr.DURATION_UNIT,
description="Average time between chunks after the first chunk",
explicit_bucket_boundaries_advisory=TIME_PER_OUTPUT_CHUNK_BUCKET_BOUNDARIES,
)


def _get_client_operation_duration_histogram() -> "metrics.Histogram":
return get_meter().create_histogram(
name="gen_ai.client.operation.duration",
unit=OtelAttr.DURATION_UNIT,
description="Total time for the entire streaming operation from start to completion",
explicit_bucket_boundaries_advisory=OPERATION_DURATION_BUCKET_BOUNDARIES,
)


# region ChatClientProtocol


Expand Down Expand Up @@ -1172,6 +1225,14 @@ async def trace_get_streaming_response(
self.additional_properties["token_usage_histogram"] = _get_token_usage_histogram()
if "operation_duration_histogram" not in self.additional_properties:
self.additional_properties["operation_duration_histogram"] = _get_duration_histogram()
if "time_to_first_chunk_histogram" not in self.additional_properties:
self.additional_properties["time_to_first_chunk_histogram"] = _get_time_to_first_chunk_histogram()
if "time_per_output_chunk_histogram" not in self.additional_properties:
self.additional_properties["time_per_output_chunk_histogram"] = _get_time_per_output_chunk_histogram()
if "client_operation_duration_histogram" not in self.additional_properties:
self.additional_properties["client_operation_duration_histogram"] = (
_get_client_operation_duration_histogram()
)

options = options or {}
model_id = kwargs.get("model_id") or options.get("model_id") or getattr(self, "model_id", None) or "unknown"
@@ -1197,15 +1258,53 @@ async def trace_get_streaming_response(
system_instructions=options.get("instructions"),
)
start_time_stamp = perf_counter()
first_chunk_time: float | None = None
previous_chunk_time: float | None = None
chunk_count = 0  # counts inter-chunk intervals, i.e. one less than the chunks received
Copilot AI commented on Feb 3, 2026:

The variable name 'chunk_count' is misleading: it counts inter-chunk intervals, which is one less than the total number of chunks. If 3 chunks are received, chunk_count will be 2. Consider renaming it to 'inter_chunk_count' or 'chunk_intervals_count' to make the intent clear and avoid confusion.
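To make the suggested semantics concrete, here is a minimal, self-contained sketch (the names inter_chunk_count and chunk_arrival_times are illustrative only, not from the PR):

    # Illustrative arrival timestamps, as perf_counter() readings would be.
    chunk_arrival_times = [0.00, 0.05, 0.12]  # 3 chunks -> 2 intervals

    inter_chunk_count = 0  # counts intervals, i.e. one less than the chunk total
    total_inter_chunk_time = 0.0
    previous_chunk_time = None

    for current_time in chunk_arrival_times:
        if previous_chunk_time is not None:
            total_inter_chunk_time += current_time - previous_chunk_time
            inter_chunk_count += 1
        previous_chunk_time = current_time

    # The mean interval divides by the interval count, not the chunk count.
    mean_interval = total_inter_chunk_time / inter_chunk_count if inter_chunk_count else 0.0
    assert inter_chunk_count == 2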
total_inter_chunk_time = 0.0
end_time_stamp: float | None = None
try:
async for update in func(self, messages=messages, options=options, **kwargs):
current_time = perf_counter()
if first_chunk_time is None:
# First chunk arrived; intervals start accruing from here
first_chunk_time = current_time
previous_chunk_time = current_time
else:
# Subsequent chunk - accumulate the inter-chunk interval
if previous_chunk_time is not None:
total_inter_chunk_time += current_time - previous_chunk_time
chunk_count += 1
previous_chunk_time = current_time
all_updates.append(update)
yield update
end_time_stamp = perf_counter()
except Exception as exception:
end_time_stamp = perf_counter()
capture_exception(span=span, exception=exception, timestamp=time_ns())
# Record metrics even if exception occurred (if we got at least one chunk)
if first_chunk_time is not None:
with contextlib.suppress(Exception):
_record_streaming_metrics(
span=span,
attributes=attributes,
start_time=start_time_stamp,
first_chunk_time=first_chunk_time,
end_time=end_time_stamp,
chunk_count=chunk_count,
total_inter_chunk_time=total_inter_chunk_time,
time_to_first_chunk_histogram=self.additional_properties[
"time_to_first_chunk_histogram"
],
time_per_output_chunk_histogram=self.additional_properties[
"time_per_output_chunk_histogram"
],
client_operation_duration_histogram=self.additional_properties[
"client_operation_duration_histogram"
],
)
raise
else:
duration = (end_time_stamp or perf_counter()) - start_time_stamp
@@ -1220,6 +1319,25 @@
operation_duration_histogram=self.additional_properties["operation_duration_histogram"],
)

# Record streaming-specific metrics
if first_chunk_time is not None:
_record_streaming_metrics(
span=span,
attributes=attributes,
start_time=start_time_stamp,
first_chunk_time=first_chunk_time,
end_time=end_time_stamp,
chunk_count=chunk_count,
total_inter_chunk_time=total_inter_chunk_time,
time_to_first_chunk_histogram=self.additional_properties["time_to_first_chunk_histogram"],
time_per_output_chunk_histogram=self.additional_properties[
"time_per_output_chunk_histogram"
],
client_operation_duration_histogram=self.additional_properties[
"client_operation_duration_histogram"
],
)
Copilot AI commented on lines +1322 to +1339 (Feb 3, 2026):

Two separate duration metrics are recorded for streaming operations: 'gen_ai.operation.duration' (via _capture_response) and 'gen_ai.client.operation.duration' (via _record_streaming_metrics). If the intent is to provide both a general operation duration and a client-specific streaming duration, document that distinction; otherwise consolidate them, since both are calculated as end_time - start_time over the same time period.
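If consolidation were chosen, one option is a single duration histogram with an attribute distinguishing streaming calls. A sketch only; the gen_ai.client.streaming attribute is hypothetical and not part of this PR or the semantic conventions:

    from opentelemetry import metrics

    meter = metrics.get_meter("sketch")
    # One duration histogram covering both streaming and non-streaming calls.
    duration_histogram = meter.create_histogram(
        name="gen_ai.client.operation.duration",
        unit="s",
        description="Total client operation duration",
    )
    # Hypothetical attribute marking streaming calls instead of a second metric.
    duration_histogram.record(1.23, attributes={"gen_ai.client.streaming": True})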

if OBSERVABILITY_SETTINGS.SENSITIVE_DATA_ENABLED and response.messages:
_capture_messages(
span=span,
@@ -1850,6 +1968,56 @@ def _capture_response(
operation_duration_histogram.record(duration, attributes=attrs)


def _record_streaming_metrics(
span: trace.Span,
Copilot AI commented on Feb 3, 2026:

The 'span' parameter is passed in but never referenced in the function body. Consider removing it if it is not needed, or add a comment explaining why it is retained for future extensibility.

Suggested change:
-    span: trace.Span,
+    span: trace.Span,  # Currently unused; retained for future span-based metrics and API consistency.
attributes: dict[str, Any],
start_time: float,
first_chunk_time: float,
end_time: float | None,
chunk_count: int,
total_inter_chunk_time: float,
time_to_first_chunk_histogram: "metrics.Histogram | None" = None,
time_per_output_chunk_histogram: "metrics.Histogram | None" = None,
client_operation_duration_histogram: "metrics.Histogram | None" = None,
) -> None:
"""Record streaming-specific metrics for client operations.

Args:
span: The span to record metrics for.
attributes: The attributes dictionary containing metric attributes.
start_time: The start time of the streaming operation.
first_chunk_time: The time when the first chunk arrived.
end_time: The end time of the streaming operation.
chunk_count: The number of chunks after the first chunk.
Copilot AI commented on Feb 3, 2026:

Describing 'chunk_count' as "The number of chunks after the first chunk" is imprecise: it is the number of inter-chunk intervals (transitions between chunks after the first one), so 3 total chunks yield a value of 2.

Suggested change:
-        chunk_count: The number of chunks after the first chunk.
+        chunk_count: The number of inter-chunk intervals (chunks received after the first chunk).
total_inter_chunk_time: The sum of inter-chunk intervals.
time_to_first_chunk_histogram: Histogram for time to first chunk metric.
time_per_output_chunk_histogram: Histogram for time per output chunk metric.
client_operation_duration_histogram: Histogram for total duration metric.
"""
if end_time is None:
return

# Keep only the attributes listed in GEN_AI_METRIC_ATTRIBUTES
attrs: dict[str, Any] = {k: v for k, v in attributes.items() if k in GEN_AI_METRIC_ATTRIBUTES}
if OtelAttr.ERROR_TYPE in attributes:
attrs[OtelAttr.ERROR_TYPE] = attributes[OtelAttr.ERROR_TYPE]

# Calculate time to first chunk
time_to_first_chunk = first_chunk_time - start_time
if time_to_first_chunk_histogram:
time_to_first_chunk_histogram.record(time_to_first_chunk, attributes=attrs)

# Calculate time per output chunk (average)
if chunk_count > 0 and time_per_output_chunk_histogram:
time_per_output_chunk = total_inter_chunk_time / chunk_count
time_per_output_chunk_histogram.record(time_per_output_chunk, attributes=attrs)

# Calculate total duration
duration = end_time - start_time
if client_operation_duration_histogram:
client_operation_duration_histogram.record(duration, attributes=attrs)


class EdgeGroupDeliveryStatus(Enum):
"""Enum for edge group delivery status values."""

130 changes: 129 additions & 1 deletion python/packages/core/tests/core/test_observability.py
@@ -1,7 +1,7 @@
# Copyright (c) Microsoft. All rights reserved.

import logging
-from collections.abc import MutableSequence
+from collections.abc import AsyncIterable, MutableSequence
from typing import Any
from unittest.mock import Mock

@@ -831,6 +831,91 @@ def test_create_resource_with_custom_attributes(monkeypatch):
assert resource.attributes["another_attr"] == 123


# region Test Streaming Metrics


@pytest.fixture
def mock_timed_streaming_chat_client():
"""Create a mock chat client for streaming testing with timing."""

class MockTimedStreamingChatClient(BaseChatClient):
def service_url(self):
return "https://test.example.com"

async def _inner_get_response(self, **kwargs):
pass

async def _inner_get_streaming_response(
self, *, messages: MutableSequence[ChatMessage], options: dict[str, Any], **kwargs: Any
):
import asyncio

# Simulate delays to ensure timing metrics are non-zero
await asyncio.sleep(0.01)
yield ChatResponseUpdate(text="Chunk 1", role=Role.ASSISTANT)
await asyncio.sleep(0.01)
yield ChatResponseUpdate(text="Chunk 2", role=Role.ASSISTANT)
await asyncio.sleep(0.01)
yield ChatResponseUpdate(text="Chunk 3", role=Role.ASSISTANT)

return MockTimedStreamingChatClient


async def test_streaming_metrics_recorded(mock_timed_streaming_chat_client, span_exporter: InMemorySpanExporter):
"""Test that streaming specific metrics are recorded correctly."""
client = use_instrumentation(mock_timed_streaming_chat_client)()
messages = [ChatMessage(role=Role.USER, text="Test")]
span_exporter.clear()

updates = []
async for update in client.get_streaming_response(messages=messages, model_id="TestStreaming"):
updates.append(update)

assert len(updates) == 3
spans = span_exporter.get_finished_spans()
assert len(spans) == 1
span = spans[0]
# Check that execution completed successfully and span was created
assert span.name == "chat TestStreaming"
assert span.attributes[OtelAttr.OPERATION.value] == OtelAttr.CHAT_COMPLETION_OPERATION
Copilot AI commented on lines +864 to +880 (Feb 3, 2026):

The test does not verify that the new streaming metrics are actually recorded; it only checks that the span was created with the correct operation attribute. Consider asserting that the time_to_first_chunk, time_per_output_chunk, and client_operation_duration metrics were recorded, ideally with expected values, as sketched below. This would ensure the feature is working as intended.
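A sketch of what those assertions could look like, assuming the suite wires an opentelemetry.sdk.metrics.export.InMemoryMetricReader into the MeterProvider that backs the framework's meter (that wiring is test-harness specific and not shown here):

    from opentelemetry.sdk.metrics import MeterProvider
    from opentelemetry.sdk.metrics.export import InMemoryMetricReader

    reader = InMemoryMetricReader()
    provider = MeterProvider(metric_readers=[reader])  # must back get_meter()

    def recorded_metric_names(reader: InMemoryMetricReader) -> set[str]:
        data = reader.get_metrics_data()
        if data is None:
            return set()
        return {
            metric.name
            for resource_metric in data.resource_metrics
            for scope_metric in resource_metric.scope_metrics
            for metric in scope_metric.metrics
        }

    # After the streaming call under test:
    # names = recorded_metric_names(reader)
    # assert "gen_ai.client.operation.time_to_first_chunk" in names
    # assert "gen_ai.client.operation.time_per_output_chunk" in names
    # assert "gen_ai.client.operation.duration" in names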


@pytest.fixture
def mock_error_streaming_chat_client():
"""Create a mock chat client that fails during streaming."""

class MockErrorStreamingChatClient(BaseChatClient):
def service_url(self):
return "https://test.example.com"

async def _inner_get_response(self, **kwargs):
pass

async def _inner_get_streaming_response(
self, *, messages: MutableSequence[ChatMessage], options: dict[str, Any], **kwargs: Any
):
yield ChatResponseUpdate(text="Chunk 1", role=Role.ASSISTANT)
raise ValueError("Stream interrupted")

return MockErrorStreamingChatClient


async def test_streaming_metrics_with_error(mock_error_streaming_chat_client, span_exporter: InMemorySpanExporter):
"""Test that metrics are recorded even if the stream fails after the first chunk."""
client = use_instrumentation(mock_error_streaming_chat_client)()
messages = [ChatMessage(role=Role.USER, text="Test")]
span_exporter.clear()

with pytest.raises(ValueError, match="Stream interrupted"):
async for _ in client.get_streaming_response(messages=messages, model_id="TestError"):
pass

spans = span_exporter.get_finished_spans()
assert len(spans) == 1
span = spans[0]
assert span.attributes[OtelAttr.OPERATION.value] == OtelAttr.CHAT_COMPLETION_OPERATION
Copilot AI commented on lines +903 to +916 (Feb 3, 2026):

The test does not verify that streaming metrics are recorded when an error occurs. Since at least one chunk is received before the error, it should also assert that time_to_first_chunk and client_operation_duration were recorded, confirming that the error-handling path records partial metrics. See the sketch below.
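With the same in-memory reader sketched above, the error path could also assert a single partial recording (the helper below and the expectation of exactly one histogram point are assumptions about this harness, not verified behavior):

    def histogram_points(reader, name):
        data = reader.get_metrics_data()
        if data is None:
            return []
        return [
            point
            for resource_metric in data.resource_metrics
            for scope_metric in resource_metric.scope_metrics
            for metric in scope_metric.metrics
            if metric.name == name
            for point in metric.data.data_points
        ]

    # points = histogram_points(reader, "gen_ai.client.operation.time_to_first_chunk")
    # assert len(points) == 1 and points[0].count == 1  # recorded despite the failure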


# region Test _create_otlp_exporters


@@ -2216,3 +2301,46 @@ def test_capture_response(span_exporter: InMemorySpanExporter):
# Verify attributes were set on the span
assert spans[0].attributes.get(OtelAttr.INPUT_TOKENS) == 100
assert spans[0].attributes.get(OtelAttr.OUTPUT_TOKENS) == 50


class ErrorChatClient(BaseChatClient):
"""A chat client that raises an error during streaming."""

OTEL_PROVIDER_NAME = "error_provider"

def service_url(self):
return "https://error.example.com"

async def _inner_get_response(self, messages, options, **kwargs):
raise NotImplementedError

async def _inner_get_streaming_response(
self, *, messages: list[ChatMessage], options: dict[str, Any], **kwargs: Any
) -> AsyncIterable[ChatResponseUpdate]:
# Yield one chunk so metrics recording is triggered
yield ChatResponseUpdate(text="Chunk 1", role=Role.ASSISTANT)
# Then raise an exception
raise ValueError("Original Application Error")


async def test_streaming_error_with_metric_recording_failure(span_exporter: InMemorySpanExporter):
"""
Test that an exception during metric recording does not mask the original application error.
"""
from unittest.mock import patch

client = use_instrumentation(ErrorChatClient)()
messages = [ChatMessage(role=Role.USER, text="Test")]
span_exporter.clear()

# Mock _record_streaming_metrics to raise an exception
with patch("agent_framework.observability._record_streaming_metrics") as mock_metrics:
mock_metrics.side_effect = Exception("Metric Recording Failed")

# We expect the ORIGINAL ValueError, not the "Metric Recording Failed" exception
with pytest.raises(ValueError, match="Original Application Error"):
async for _ in client.get_streaming_response(messages=messages):
pass

# Verify that _record_streaming_metrics was actually called
assert mock_metrics.called