Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
17 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions util/opentelemetry-util-genai/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## Unreleased

- Add `AgentInvocation` type with `invoke_agent` span lifecycle via `start_invoke_local_agent` / `start_invoke_remote_agent` factory methods and `invoke_local_agent` / `invoke_remote_agent` context managers; add shared `_content.py` helper to deduplicate message serialization between `AgentInvocation` and `InferenceInvocation`; add `tool_definitions`, `cache_creation_input_tokens`, and `cache_read_input_tokens` to `InferenceInvocation`
([#4274](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/4274))
- Add metrics support for EmbeddingInvocation
([#4377](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/4377))
- Add support for workflow in genAI utils handler.
Expand Down
2 changes: 1 addition & 1 deletion util/opentelemetry-util-genai/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ classifiers = [
dependencies = [
"opentelemetry-instrumentation ~= 0.60b0",
"opentelemetry-semantic-conventions ~= 0.60b0",
"opentelemetry-api>=1.39",
"opentelemetry-api ~= 1.39",
]

[project.entry-points.opentelemetry_genai_completion_hook]
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,218 @@
# Copyright The OpenTelemetry Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from __future__ import annotations

from typing import Any

from opentelemetry._logs import Logger
from opentelemetry.semconv._incubating.attributes import (
gen_ai_attributes as GenAI,
)
from opentelemetry.semconv.attributes import server_attributes
from opentelemetry.trace import SpanKind, Tracer
from opentelemetry.util.genai._invocation import (
Error,
GenAIInvocation,
get_content_attributes,
)
from opentelemetry.util.genai.metrics import InvocationMetricsRecorder
from opentelemetry.util.genai.types import (
InputMessage,
MessagePart,
OutputMessage,
ToolDefinition,
)

# Cache-token usage attributes were added to the semconv package recently;
# older installed releases may not define the constants yet. Resolve them
# once at import time, falling back to the literal attribute names.
_GEN_AI_USAGE_CACHE_CREATION_INPUT_TOKENS: str = getattr(
    GenAI,
    "GEN_AI_USAGE_CACHE_CREATION_INPUT_TOKENS",
    "gen_ai.usage.cache_creation.input_tokens",
)
_GEN_AI_USAGE_CACHE_READ_INPUT_TOKENS: str = getattr(
    GenAI,
    "GEN_AI_USAGE_CACHE_READ_INPUT_TOKENS",
    "gen_ai.usage.cache_read.input_tokens",
)


class AgentInvocation(GenAIInvocation):
    """Represents a single agent invocation (invoke_agent span).

    Use handler.start_invoke_local_agent() / handler.start_invoke_remote_agent()
    or the handler.invoke_local_agent() / handler.invoke_remote_agent() context
    managers rather than constructing this directly.

    Reference:
        Client span: https://github.com/open-telemetry/semantic-conventions/blob/main/docs/gen-ai/gen-ai-agent-spans.md#invoke-agent-client-span
        Internal span: https://github.com/open-telemetry/semantic-conventions/blob/main/docs/gen-ai/gen-ai-agent-spans.md#invoke-agent-internal-span
    """

    def __init__(
        self,
        tracer: Tracer,
        metrics_recorder: InvocationMetricsRecorder,
        logger: Logger,
        provider: str,
        *,
        span_kind: SpanKind = SpanKind.INTERNAL,
        request_model: str | None = None,
        server_address: str | None = None,
        server_port: int | None = None,
        attributes: dict[str, Any] | None = None,
        metric_attributes: dict[str, Any] | None = None,
    ) -> None:
        """Use handler.start_invoke_local_agent() or handler.start_invoke_remote_agent() instead of calling this directly."""
        _operation_name = GenAI.GenAiOperationNameValues.INVOKE_AGENT.value
        super().__init__(
            tracer,
            metrics_recorder,
            logger,
            operation_name=_operation_name,
            span_name=_operation_name,
            span_kind=span_kind,
            attributes=attributes,
            metric_attributes=metric_attributes,
        )
        self.provider = provider
        self.request_model = request_model
        self.server_address = server_address
        self.server_port = server_port

        # Agent identity; callers may populate these after construction and
        # before finish(), since the values often only become known mid-call.
        self.agent_name: str | None = None
        self.agent_id: str | None = None
        self.agent_description: str | None = None
        self.agent_version: str | None = None

        self.conversation_id: str | None = None
        self.data_source_id: str | None = None
        self.output_type: str | None = None

        # Request (sampling) parameters.
        self.temperature: float | None = None
        self.top_p: float | None = None
        self.frequency_penalty: float | None = None
        self.presence_penalty: float | None = None
        self.max_tokens: int | None = None
        self.stop_sequences: list[str] | None = None
        self.seed: int | None = None
        self.choice_count: int | None = None

        # Response attributes defined for invoke_agent spans by the GenAI
        # semantic conventions (gen_ai.response.model /
        # gen_ai.response.finish_reasons).
        self.response_model: str | None = None
        self.finish_reasons: list[str] | None = None

        # Token usage, including prompt-cache accounting.
        self.input_tokens: int | None = None
        self.output_tokens: int | None = None
        self.cache_creation_input_tokens: int | None = None
        self.cache_read_input_tokens: int | None = None

        # Captured content; emission is gated by content-capturing mode
        # inside get_content_attributes().
        self.input_messages: list[InputMessage] = []
        self.output_messages: list[OutputMessage] = []
        self.system_instruction: list[MessagePart] = []
        self.tool_definitions: list[ToolDefinition] | None = None

        self._start()

    def _get_common_attributes(self) -> dict[str, Any]:
        """Return attributes shared by the span regardless of request/usage data.

        None-valued entries are dropped so they are never set on the span.
        """
        optional_attrs = (
            (GenAI.GEN_AI_REQUEST_MODEL, self.request_model),
            (server_attributes.SERVER_ADDRESS, self.server_address),
            (server_attributes.SERVER_PORT, self.server_port),
            (GenAI.GEN_AI_AGENT_NAME, self.agent_name),
            (GenAI.GEN_AI_AGENT_ID, self.agent_id),
            (GenAI.GEN_AI_AGENT_DESCRIPTION, self.agent_description),
            (GenAI.GEN_AI_AGENT_VERSION, self.agent_version),
        )
        return {
            GenAI.GEN_AI_OPERATION_NAME: self._operation_name,
            GenAI.GEN_AI_PROVIDER_NAME: self.provider,
            **{k: v for k, v in optional_attrs if v is not None},
        }

    def _get_request_attributes(self) -> dict[str, Any]:
        """Return request-parameter span attributes, omitting unset values."""
        optional_attrs = (
            (GenAI.GEN_AI_CONVERSATION_ID, self.conversation_id),
            (GenAI.GEN_AI_DATA_SOURCE_ID, self.data_source_id),
            (GenAI.GEN_AI_OUTPUT_TYPE, self.output_type),
            (GenAI.GEN_AI_REQUEST_TEMPERATURE, self.temperature),
            (GenAI.GEN_AI_REQUEST_TOP_P, self.top_p),
            (GenAI.GEN_AI_REQUEST_FREQUENCY_PENALTY, self.frequency_penalty),
            (GenAI.GEN_AI_REQUEST_PRESENCE_PENALTY, self.presence_penalty),
            (GenAI.GEN_AI_REQUEST_MAX_TOKENS, self.max_tokens),
            (GenAI.GEN_AI_REQUEST_STOP_SEQUENCES, self.stop_sequences),
            (GenAI.GEN_AI_REQUEST_SEED, self.seed),
            (GenAI.GEN_AI_REQUEST_CHOICE_COUNT, self.choice_count),
        )
        return {k: v for k, v in optional_attrs if v is not None}

    def _get_response_attributes(self) -> dict[str, Any]:
        """Return response span attributes, omitting unset values.

        Covers gen_ai.response.model and gen_ai.response.finish_reasons,
        which the invoke_agent span definition includes.
        """
        optional_attrs = (
            (GenAI.GEN_AI_RESPONSE_MODEL, self.response_model),
            (GenAI.GEN_AI_RESPONSE_FINISH_REASONS, self.finish_reasons),
        )
        return {k: v for k, v in optional_attrs if v is not None}

    def _get_usage_attributes(self) -> dict[str, Any]:
        """Return token-usage span attributes, omitting unset values."""
        optional_attrs = (
            (GenAI.GEN_AI_USAGE_INPUT_TOKENS, self.input_tokens),
            (GenAI.GEN_AI_USAGE_OUTPUT_TOKENS, self.output_tokens),
            (
                _GEN_AI_USAGE_CACHE_CREATION_INPUT_TOKENS,
                self.cache_creation_input_tokens,
            ),
            (
                _GEN_AI_USAGE_CACHE_READ_INPUT_TOKENS,
                self.cache_read_input_tokens,
            ),
        )
        return {k: v for k, v in optional_attrs if v is not None}

    def _get_content_attributes_for_span(self) -> dict[str, Any]:
        """Serialize captured messages/tools for the span via the shared helper."""
        return get_content_attributes(
            input_messages=self.input_messages,
            output_messages=self.output_messages,
            system_instruction=self.system_instruction,
            tool_definitions=self.tool_definitions,
            for_span=True,
        )

    def _get_metric_attributes(self) -> dict[str, Any]:
        """Return the attribute set recorded on duration/token metrics."""
        optional_attrs = (
            (GenAI.GEN_AI_PROVIDER_NAME, self.provider),
            (GenAI.GEN_AI_REQUEST_MODEL, self.request_model),
            (server_attributes.SERVER_ADDRESS, self.server_address),
            (server_attributes.SERVER_PORT, self.server_port),
        )
        attrs: dict[str, Any] = {
            GenAI.GEN_AI_OPERATION_NAME: self._operation_name,
            **{k: v for k, v in optional_attrs if v is not None},
        }
        # Caller-supplied metric attributes take precedence.
        attrs.update(self.metric_attributes)
        return attrs

    def _get_metric_token_counts(self) -> dict[str, int]:
        """Map gen_ai.token.type values to counts for the token-usage metric."""
        counts: dict[str, int] = {}
        if self.input_tokens is not None:
            counts[GenAI.GenAiTokenTypeValues.INPUT.value] = self.input_tokens
        if self.output_tokens is not None:
            counts[GenAI.GenAiTokenTypeValues.OUTPUT.value] = (
                self.output_tokens
            )
        return counts

    def _apply_finish(self, error: Error | None = None) -> None:
        """Finalize the span: set all attributes, record metrics, apply error.

        Attributes are set in one batch at finish time because most values
        (agent identity, usage, outputs) only become known during the call.
        """
        if error is not None:
            self._apply_error_attributes(error)

        # Update span name if agent_name was set after construction
        if self.agent_name:
            self.span.update_name(f"{self._operation_name} {self.agent_name}")

        attributes: dict[str, Any] = {}
        attributes.update(self._get_common_attributes())
        attributes.update(self._get_request_attributes())
        attributes.update(self._get_response_attributes())
        attributes.update(self._get_usage_attributes())
        attributes.update(self._get_content_attributes_for_span())
        # User-provided attributes win over everything derived above.
        attributes.update(self.attributes)
        self.span.set_attributes(attributes)
        self._metrics_recorder.record(self)
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

from __future__ import annotations

from dataclasses import asdict, dataclass, field
from dataclasses import dataclass, field
from typing import Any

from opentelemetry._logs import Logger, LogRecord
Expand All @@ -23,21 +23,34 @@
)
from opentelemetry.semconv.attributes import server_attributes
from opentelemetry.trace import INVALID_SPAN, Span, SpanKind, Tracer
from opentelemetry.util.genai._invocation import Error, GenAIInvocation
from opentelemetry.util.genai._invocation import (
Error,
GenAIInvocation,
get_content_attributes,
)
from opentelemetry.util.genai.metrics import InvocationMetricsRecorder
from opentelemetry.util.genai.types import (
InputMessage,
MessagePart,
OutputMessage,
ToolDefinition,
)
from opentelemetry.util.genai.utils import (
ContentCapturingMode,
gen_ai_json_dumps,
get_content_capturing_mode,
is_experimental_mode,
should_emit_event,
)

_GEN_AI_USAGE_CACHE_CREATION_INPUT_TOKENS: str = getattr(
GenAI,
"GEN_AI_USAGE_CACHE_CREATION_INPUT_TOKENS",
"gen_ai.usage.cache_creation.input_tokens",
)
_GEN_AI_USAGE_CACHE_READ_INPUT_TOKENS: str = getattr(
GenAI,
"GEN_AI_USAGE_CACHE_READ_INPUT_TOKENS",
"gen_ai.usage.cache_read.input_tokens",
)


class InferenceInvocation(GenAIInvocation):
"""Represents a single LLM chat/completion call.
Expand Down Expand Up @@ -113,53 +126,19 @@ def __init__( # pylint: disable=too-many-locals
self.seed = seed
self.server_address = server_address
self.server_port = server_port
self.cache_creation_input_tokens: int | None = None
self.cache_read_input_tokens: int | None = None
self.tool_definitions: list[ToolDefinition] | None = None
self._start()

def _get_message_attributes(self, *, for_span: bool) -> dict[str, Any]:
if not is_experimental_mode():
return {}
mode = get_content_capturing_mode()
allowed_modes = (
(
ContentCapturingMode.SPAN_ONLY,
ContentCapturingMode.SPAN_AND_EVENT,
)
if for_span
else (
ContentCapturingMode.EVENT_ONLY,
ContentCapturingMode.SPAN_AND_EVENT,
)
)
if mode not in allowed_modes:
return {}

def serialize(items: list[Any]) -> Any:
dicts = [asdict(item) for item in items]
return gen_ai_json_dumps(dicts) if for_span else dicts

optional_attrs = (
(
GenAI.GEN_AI_INPUT_MESSAGES,
serialize(self.input_messages)
if self.input_messages
else None,
),
(
GenAI.GEN_AI_OUTPUT_MESSAGES,
serialize(self.output_messages)
if self.output_messages
else None,
),
(
GenAI.GEN_AI_SYSTEM_INSTRUCTIONS,
serialize(self.system_instruction)
if self.system_instruction
else None,
),
return get_content_attributes(
input_messages=self.input_messages,
output_messages=self.output_messages,
system_instruction=self.system_instruction,
tool_definitions=self.tool_definitions,
for_span=for_span,
)
return {
key: value for key, value in optional_attrs if value is not None
}

def _get_finish_reasons(self) -> list[str] | None:
if self.finish_reasons is not None:
Expand Down Expand Up @@ -200,6 +179,14 @@ def _get_attributes(self) -> dict[str, Any]:
(GenAI.GEN_AI_RESPONSE_ID, self.response_id),
(GenAI.GEN_AI_USAGE_INPUT_TOKENS, self.input_tokens),
(GenAI.GEN_AI_USAGE_OUTPUT_TOKENS, self.output_tokens),
(
_GEN_AI_USAGE_CACHE_CREATION_INPUT_TOKENS,
self.cache_creation_input_tokens,
),
(
_GEN_AI_USAGE_CACHE_READ_INPUT_TOKENS,
self.cache_read_input_tokens,
),
)
attrs.update({k: v for k, v in optional_attrs if v is not None})
return attrs
Expand Down
Loading
Loading