Python: Feature: telemetry for tracking provider latency #3631
@@ -100,6 +100,32 @@

```python
    40.96,
    81.92,
)
TIME_TO_FIRST_CHUNK_BUCKET_BOUNDARIES: Final[tuple[float, ...]] = (
    0.01,
    0.02,
    0.04,
    0.08,
    0.16,
    0.32,
    0.64,
    1.28,
    2.56,
    5.12,
    10.24,
)
TIME_PER_OUTPUT_CHUNK_BUCKET_BOUNDARIES: Final[tuple[float, ...]] = (
    0.001,
    0.002,
    0.004,
    0.008,
    0.016,
    0.032,
    0.064,
    0.128,
    0.256,
    0.512,
    1.024,
)

# We're recording multiple events for the chat history, some of them are emitted within (hundreds of)
```
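Both new constants follow the same base-2 geometric progression as the existing `OPERATION_DURATION_BUCKET_BOUNDARIES`, which keeps relative bucket resolution roughly constant across three orders of magnitude of latency. A minimal sketch (not part of the PR) showing how such a sequence can be generated and checked:

```python
# Sketch only: derive base-2 geometric bucket boundaries like the
# constants above, instead of spelling them out by hand.
def geometric_boundaries(start: float, count: int, factor: float = 2.0) -> tuple[float, ...]:
    """Return `count` boundaries: start, start*factor, start*factor**2, ..."""
    return tuple(round(start * factor**i, 6) for i in range(count))

assert geometric_boundaries(0.01, 11) == (
    0.01, 0.02, 0.04, 0.08, 0.16, 0.32, 0.64, 1.28, 2.56, 5.12, 10.24,
)
assert geometric_boundaries(0.001, 11)[-1] == 1.024
```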
@@ -1039,6 +1065,33 @@ def _get_token_usage_histogram() -> "metrics.Histogram":

```python
    )


def _get_time_to_first_chunk_histogram() -> "metrics.Histogram":
    return get_meter().create_histogram(
        name="gen_ai.client.operation.time_to_first_chunk",  # TODO(Brian Henson): Match semantic conventions
        unit=OtelAttr.DURATION_UNIT,
        description="Time from request start to first content chunk arrival",
        explicit_bucket_boundaries_advisory=TIME_TO_FIRST_CHUNK_BUCKET_BOUNDARIES,
    )


def _get_time_per_output_chunk_histogram() -> "metrics.Histogram":
    return get_meter().create_histogram(
        name="gen_ai.client.operation.time_per_output_chunk",  # TODO(Brian Henson): Match semantic conventions
        unit=OtelAttr.DURATION_UNIT,
        description="Average time between chunks after the first chunk",
        explicit_bucket_boundaries_advisory=TIME_PER_OUTPUT_CHUNK_BUCKET_BOUNDARIES,
    )


def _get_client_operation_duration_histogram() -> "metrics.Histogram":
    return get_meter().create_histogram(
        name="gen_ai.client.operation.duration",
        unit=OtelAttr.DURATION_UNIT,
        description="Total time for the entire streaming operation from start to completion",
        explicit_bucket_boundaries_advisory=OPERATION_DURATION_BUCKET_BOUNDARIES,
    )


# region ChatClientProtocol
```
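These factories lean on the `explicit_bucket_boundaries_advisory` argument of OpenTelemetry's `create_histogram`, which hints the preferred buckets to the SDK without hard-coding them into views. A self-contained sketch of the same call against the plain OTel API (the meter name, model attribute, and in-memory reader are illustrative; the PR's `get_meter()` and `OtelAttr` helpers are not used here):

```python
# Requires an opentelemetry-sdk recent enough to support bucket advisories.
from opentelemetry import metrics
from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.sdk.metrics.export import InMemoryMetricReader

reader = InMemoryMetricReader()
metrics.set_meter_provider(MeterProvider(metric_readers=[reader]))
meter = metrics.get_meter("latency-demo")  # illustrative meter name

ttfc = meter.create_histogram(
    name="gen_ai.client.operation.time_to_first_chunk",
    unit="s",
    description="Time from request start to first content chunk arrival",
    explicit_bucket_boundaries_advisory=[0.01, 0.02, 0.04, 0.08, 0.16, 0.32],
)
ttfc.record(0.05, attributes={"gen_ai.request.model": "demo-model"})
print(reader.get_metrics_data())  # shows the histogram with the advised buckets
```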
@@ -1172,6 +1225,14 @@ async def trace_get_streaming_response(

```python
    self.additional_properties["token_usage_histogram"] = _get_token_usage_histogram()
if "operation_duration_histogram" not in self.additional_properties:
    self.additional_properties["operation_duration_histogram"] = _get_duration_histogram()
if "time_to_first_chunk_histogram" not in self.additional_properties:
    self.additional_properties["time_to_first_chunk_histogram"] = _get_time_to_first_chunk_histogram()
if "time_per_output_chunk_histogram" not in self.additional_properties:
    self.additional_properties["time_per_output_chunk_histogram"] = _get_time_per_output_chunk_histogram()
if "client_operation_duration_histogram" not in self.additional_properties:
    self.additional_properties["client_operation_duration_histogram"] = (
        _get_client_operation_duration_histogram()
    )

options = options or {}
model_id = kwargs.get("model_id") or options.get("model_id") or getattr(self, "model_id", None) or "unknown"
```
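The guards above create each histogram at most once per client and cache it in `additional_properties`. The `in`-check idiom matters here: `dict.setdefault` would look shorter but evaluates its default eagerly, creating a throwaway instrument on every call. A quick illustration (the `props` dict and factory are hypothetical, not framework code):

```python
props: dict[str, object] = {}

def make_histogram() -> object:
    print("histogram created")  # side effect we want at most once
    return object()

# The PR's idiom: the factory runs only when the key is missing.
if "ttfc_histogram" not in props:
    props["ttfc_histogram"] = make_histogram()  # prints once

# setdefault keeps the cached value but still calls the factory each time:
props.setdefault("ttfc_histogram", make_histogram())  # prints again, result discarded
```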
@@ -1197,15 +1258,53 @@ async def trace_get_streaming_response(

```python
    system_instructions=options.get("instructions"),
)
start_time_stamp = perf_counter()
first_chunk_time: float | None = None
previous_chunk_time: float | None = None
chunk_count = 0
total_inter_chunk_time = 0.0
end_time_stamp: float | None = None
try:
    async for update in func(self, messages=messages, options=options, **kwargs):
        current_time = perf_counter()
        if first_chunk_time is None:
            # First chunk arrived
            first_chunk_time = current_time
            previous_chunk_time = current_time
            chunk_count = 1
        else:
            # Subsequent chunk - track inter-chunk timing
            if previous_chunk_time is not None:
                inter_chunk_time = current_time - previous_chunk_time
                total_inter_chunk_time += inter_chunk_time
                chunk_count += 1
            previous_chunk_time = current_time
        all_updates.append(update)
        yield update
    end_time_stamp = perf_counter()
except Exception as exception:
    end_time_stamp = perf_counter()
    capture_exception(span=span, exception=exception, timestamp=time_ns())
    # Record metrics even if exception occurred (if we got at least one chunk)
    if first_chunk_time is not None:
        with contextlib.suppress(Exception):
            _record_streaming_metrics(
                span=span,
                attributes=attributes,
                start_time=start_time_stamp,
                first_chunk_time=first_chunk_time,
                end_time=end_time_stamp,
                chunk_count=chunk_count,
                total_inter_chunk_time=total_inter_chunk_time,
                time_to_first_chunk_histogram=self.additional_properties["time_to_first_chunk_histogram"],
                time_per_output_chunk_histogram=self.additional_properties["time_per_output_chunk_histogram"],
                client_operation_duration_histogram=self.additional_properties["client_operation_duration_histogram"],
            )
    raise
else:
    duration = (end_time_stamp or perf_counter()) - start_time_stamp
```
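The loop measures two things per update: the time from request start to the first chunk, and the gap between consecutive chunks thereafter. Stripped of the framework plumbing, the same bookkeeping looks like this (the generator and delays are hypothetical):

```python
import asyncio
from time import perf_counter

async def fake_stream():
    # Hypothetical stand-in for a provider's streaming response.
    for text in ("Hello", ", ", "world"):
        await asyncio.sleep(0.01)
        yield text

async def main() -> None:
    start = perf_counter()
    first_chunk_time: float | None = None
    previous: float | None = None
    chunk_count = 0
    total_inter_chunk_time = 0.0
    async for _ in fake_stream():
        now = perf_counter()
        if first_chunk_time is None:
            first_chunk_time = now
            chunk_count = 1
        else:
            total_inter_chunk_time += now - previous
            chunk_count += 1
        previous = now
    print(f"time to first chunk: {first_chunk_time - start:.3f}s")
    if chunk_count > 1:
        # Average gap between consecutive chunks after the first one.
        print(f"avg inter-chunk time: {total_inter_chunk_time / (chunk_count - 1):.3f}s")

asyncio.run(main())
```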
@@ -1220,6 +1319,25 @@ async def trace_get_streaming_response(

```python
    operation_duration_histogram=self.additional_properties["operation_duration_histogram"],
)

# Record streaming-specific metrics
if first_chunk_time is not None:
    _record_streaming_metrics(
        span=span,
        attributes=attributes,
        start_time=start_time_stamp,
        first_chunk_time=first_chunk_time,
        end_time=end_time_stamp,
        chunk_count=chunk_count,
        total_inter_chunk_time=total_inter_chunk_time,
        time_to_first_chunk_histogram=self.additional_properties["time_to_first_chunk_histogram"],
        time_per_output_chunk_histogram=self.additional_properties["time_per_output_chunk_histogram"],
        client_operation_duration_histogram=self.additional_properties["client_operation_duration_histogram"],
    )

if OBSERVABILITY_SETTINGS.SENSITIVE_DATA_ENABLED and response.messages:
    _capture_messages(
        span=span,
```
@@ -1850,6 +1968,56 @@ def _capture_response(

```python
    operation_duration_histogram.record(duration, attributes=attrs)


def _record_streaming_metrics(
    span: trace.Span,
```
Suggested change:

```diff
-    span: trace.Span,
+    span: trace.Span,  # Span parameter is currently unused; retained for future span-based metrics and API consistency.
```
Copilot AI (Feb 3, 2026):
The docstring states that 'chunk_count' is "The number of chunks after the first chunk", but this is imprecise. It's actually the number of inter-chunk intervals (i.e., transitions between chunks after the first one). For 3 total chunks, this value would be 2. Consider updating the docstring to say "The number of inter-chunk intervals (chunks received after the first chunk)" to be more precise.
Suggested change:

```diff
-        chunk_count: The number of chunks after the first chunk.
+        chunk_count: The number of inter-chunk intervals (chunks received after the first chunk).
```
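The diff truncates the body of `_record_streaming_metrics` after its first parameter. Based on the parameter list and the histogram descriptions, a plausible reconstruction of what it records is sketched below; the actual body in the PR may differ:

```python
# Hedged reconstruction - the real body is not shown in this diff.
def _record_streaming_metrics(
    span,  # unused today; see the review suggestion above
    attributes,
    start_time,
    first_chunk_time,
    end_time,
    chunk_count,
    total_inter_chunk_time,
    time_to_first_chunk_histogram,
    time_per_output_chunk_histogram,
    client_operation_duration_histogram,
):
    # Latency until the first streamed chunk arrived.
    time_to_first_chunk_histogram.record(first_chunk_time - start_time, attributes=attributes)
    # Average gap between consecutive chunks, if there was more than one.
    if chunk_count > 1:
        time_per_output_chunk_histogram.record(
            total_inter_chunk_time / (chunk_count - 1), attributes=attributes
        )
    # Wall-clock duration of the whole streaming operation.
    if end_time is not None:
        client_operation_duration_histogram.record(end_time - start_time, attributes=attributes)
```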
The PR's second file contains the tests:
```diff
@@ -1,7 +1,7 @@
 # Copyright (c) Microsoft. All rights reserved.

 import logging
-from collections.abc import MutableSequence
+from collections.abc import AsyncIterable, MutableSequence
 from typing import Any
 from unittest.mock import Mock
```
@@ -831,6 +831,91 @@ def test_create_resource_with_custom_attributes(monkeypatch):

```python
    assert resource.attributes["another_attr"] == 123


# region Test Streaming Metrics


@pytest.fixture
def mock_timed_streaming_chat_client():
    """Create a mock chat client for streaming testing with timing."""

    class MockTimedStreamingChatClient(BaseChatClient):
        def service_url(self):
            return "https://test.example.com"

        async def _inner_get_response(self, **kwargs):
            pass

        async def _inner_get_streaming_response(
            self, *, messages: MutableSequence[ChatMessage], options: dict[str, Any], **kwargs: Any
        ):
            import asyncio

            # Simulate delays to ensure timing metrics are non-zero
            await asyncio.sleep(0.01)
            yield ChatResponseUpdate(text="Chunk 1", role=Role.ASSISTANT)
            await asyncio.sleep(0.01)
            yield ChatResponseUpdate(text="Chunk 2", role=Role.ASSISTANT)
            await asyncio.sleep(0.01)
            yield ChatResponseUpdate(text="Chunk 3", role=Role.ASSISTANT)

    return MockTimedStreamingChatClient


async def test_streaming_metrics_recorded(mock_timed_streaming_chat_client, span_exporter: InMemorySpanExporter):
    """Test that streaming specific metrics are recorded correctly."""
    client = use_instrumentation(mock_timed_streaming_chat_client)()
    messages = [ChatMessage(role=Role.USER, text="Test")]
    span_exporter.clear()

    updates = []
    async for update in client.get_streaming_response(messages=messages, model_id="TestStreaming"):
        updates.append(update)

    assert len(updates) == 3
    spans = span_exporter.get_finished_spans()
    assert len(spans) == 1
    span = spans[0]
    # Check that execution completed successfully and span was created
    assert span.name == "chat TestStreaming"
    assert span.attributes[OtelAttr.OPERATION.value] == OtelAttr.CHAT_COMPLETION_OPERATION
```
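As written, the test asserts only on the exported span. To assert that the histograms themselves received data, the suite could also read from an `InMemoryMetricReader`; a sketch under the assumption that the test harness exposes such a reader as `metric_reader` (it is not shown in this diff):

```python
# Sketch: assumes a MeterProvider backed by an InMemoryMetricReader
# is installed by the test fixtures and available as `metric_reader`.
def assert_histogram_recorded(metric_reader, name: str) -> None:
    data = metric_reader.get_metrics_data()
    recorded = {
        metric.name
        for resource_metrics in data.resource_metrics
        for scope_metrics in resource_metrics.scope_metrics
        for metric in scope_metrics.metrics
    }
    assert name in recorded, f"{name} was not recorded"

# Usage: assert_histogram_recorded(metric_reader, "gen_ai.client.operation.time_to_first_chunk")
```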
```python
@pytest.fixture
def mock_error_streaming_chat_client():
    """Create a mock chat client that fails during streaming."""

    class MockErrorStreamingChatClient(BaseChatClient):
        def service_url(self):
            return "https://test.example.com"

        async def _inner_get_response(self, **kwargs):
            pass

        async def _inner_get_streaming_response(
            self, *, messages: MutableSequence[ChatMessage], options: dict[str, Any], **kwargs: Any
        ):
            yield ChatResponseUpdate(text="Chunk 1", role=Role.ASSISTANT)
            raise ValueError("Stream interrupted")

    return MockErrorStreamingChatClient


async def test_streaming_metrics_with_error(mock_error_streaming_chat_client, span_exporter: InMemorySpanExporter):
    """Test that metrics are recorded even if the stream fails after the first chunk."""
    client = use_instrumentation(mock_error_streaming_chat_client)()
    messages = [ChatMessage(role=Role.USER, text="Test")]
    span_exporter.clear()

    with pytest.raises(ValueError, match="Stream interrupted"):
        async for _ in client.get_streaming_response(messages=messages, model_id="TestError"):
            pass

    spans = span_exporter.get_finished_spans()
    assert len(spans) == 1
    span = spans[0]
    assert span.attributes[OtelAttr.OPERATION.value] == OtelAttr.CHAT_COMPLETION_OPERATION


# region Test _create_otlp_exporters
```
@@ -2216,3 +2301,46 @@ def test_capture_response(span_exporter: InMemorySpanExporter):

```python
    # Verify attributes were set on the span
    assert spans[0].attributes.get(OtelAttr.INPUT_TOKENS) == 100
    assert spans[0].attributes.get(OtelAttr.OUTPUT_TOKENS) == 50


class ErrorChatClient(BaseChatClient):
    """A chat client that raises an error during streaming."""

    OTEL_PROVIDER_NAME = "error_provider"

    def service_url(self):
        return "https://error.example.com"

    async def _inner_get_response(self, messages, options, **kwargs):
        raise NotImplementedError

    async def _inner_get_streaming_response(
        self, *, messages: list[ChatMessage], options: dict[str, Any], **kwargs: Any
    ) -> AsyncIterable[ChatResponseUpdate]:
        # Yield one chunk so metrics recording is triggered
        yield ChatResponseUpdate(text="Chunk 1", role=Role.ASSISTANT)
        # Then raise an exception
        raise ValueError("Original Application Error")


async def test_streaming_error_with_metric_recording_failure(span_exporter: InMemorySpanExporter):
    """Test that an exception during metric recording does not mask the original application error."""
    from unittest.mock import patch

    client = use_instrumentation(ErrorChatClient)()
    messages = [ChatMessage(role=Role.USER, text="Test")]
    span_exporter.clear()

    # Mock _record_streaming_metrics to raise an exception
    with patch("agent_framework.observability._record_streaming_metrics") as mock_metrics:
        mock_metrics.side_effect = Exception("Metric Recording Failed")

        # We expect the ORIGINAL ValueError, not the "Metric Recording Failed" exception
        with pytest.raises(ValueError, match="Original Application Error"):
            async for _ in client.get_streaming_response(messages=messages):
                pass

        # Verify that _record_streaming_metrics was actually called
        assert mock_metrics.called
```
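The behavior this test pins down comes from the `contextlib.suppress(Exception)` wrapper in the except path of `trace_get_streaming_response`: a telemetry failure is swallowed so the bare `raise` can re-raise the original error. The mechanism in isolation:

```python
import contextlib

def record_metrics():
    # Stand-in for _record_streaming_metrics blowing up.
    raise RuntimeError("Metric Recording Failed")

def stream():
    try:
        raise ValueError("Original Application Error")
    except ValueError:
        with contextlib.suppress(Exception):
            record_metrics()  # failure is swallowed here
        raise  # the original ValueError propagates unchanged

try:
    stream()
except Exception as exc:
    print(type(exc).__name__, "-", exc)  # ValueError - Original Application Error
```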
Review comment: The variable name 'chunk_count' is misleading because it actually represents the number of inter-chunk intervals (which is one less than the total number of chunks). For example, if 3 chunks are received, chunk_count will be 2. Consider renaming this to 'inter_chunk_count' or 'chunk_intervals_count' to make the intent clearer and avoid confusion.