Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/nitpick-exceptions.ini
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ py-class=
fastapi.applications.FastAPI
starlette.applications.Starlette
_contextvars.Token
opentelemetry.util.genai.types._BaseAgent

any=
; API
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
OpenTelemetry GenAI Utils — create_agent Demo
===============================================

This example shows how to combine ``VertexAIInstrumentor`` (automatic
instrumentation of Vertex AI SDK calls) with manual
``TelemetryHandler.create_agent()`` spans from ``opentelemetry-util-genai``
to extend existing instrumentation with agent lifecycle spans.

The demo performs the following steps:

1. **LLM call** — generates content via the ``google-genai`` SDK (API key).
Vertex AI SDK calls are auto-instrumented by ``VertexAIInstrumentor``.
2. **Local agent creation** — creates a ``LanggraphAgent`` wrapped in a
``create_agent`` span.
3. **Deploy to Agent Engine** — deploys the agent to Vertex AI Agent Engine,
also wrapped in a ``create_agent`` span.
4. **Query the remote agent** — sends a currency exchange query.
5. **Cleanup** — deletes the deployed agent.

Prerequisites
-------------

- A GCP project with the **Vertex AI API** enabled.
- **Application Default Credentials** (ADC) configured:
``gcloud auth application-default login``
- A **Google API key** for LLM calls (set as ``GOOGLE_API_KEY``).
- A **GCS staging bucket** for agent deployment:

::

gsutil mb -p <PROJECT_ID> -l us-central1 -b on gs://<PROJECT_ID>-agent-staging/
gsutil pap set enforced gs://<PROJECT_ID>-agent-staging/

Setup
-----

An OTLP-compatible endpoint should be listening for traces and logs on
http://localhost:4317. If not, update ``OTEL_EXPORTER_OTLP_ENDPOINT``.

Set up a virtual environment:

::

python3 -m venv .venv
source .venv/bin/activate
pip install -r requirements.txt
pip install -e ../../../../util/opentelemetry-util-genai/

Run
---

::

export GOOGLE_API_KEY="AIza..."
export GCP_PROJECT="your-project-id"
python main.py

The deploy step takes 3-5 minutes. You should see two ``create_agent``
spans printed to the console and exported to your OTLP endpoint (one for
local agent creation, one for the deploy), along with the agent's response
to the currency exchange query.

Environment Variables
---------------------

- ``GOOGLE_API_KEY`` — API key for ``google-genai`` SDK LLM calls.
- ``GCP_PROJECT`` — GCP project ID (default: ``gcp-o11yinframon-nprd-81065``).
- ``GCP_LOCATION`` — GCP region (default: ``us-central1``).
- ``GCP_STAGING_BUCKET`` — GCS bucket for agent staging
(default: ``gs://<GCP_PROJECT>-agent-staging``).
- ``OTEL_EXPORTER_OTLP_ENDPOINT`` — OTLP endpoint
(default: ``http://localhost:4317``).
Original file line number Diff line number Diff line change
@@ -0,0 +1,179 @@
# pylint: skip-file
"""
Vertex AI Agent Engine — create_agent instrumentation demo.

Combines automatic instrumentation (``VertexAIInstrumentor``) with
manual ``TelemetryHandler.create_agent()`` spans from
``opentelemetry-util-genai`` to show how genai-utils extends the
existing Vertex AI instrumentation with agent lifecycle spans.

Also demonstrates deploying an agent to Vertex AI Agent Engine
via ``agent_engines.create()`` (requires GCP project + ADC).

Set environment variables before running:

export GOOGLE_API_KEY="AIza..."
export GCP_PROJECT="your-project-id"
python main.py
"""

import os

import vertexai
from google import genai
from vertexai import agent_engines

# NOTE: OpenTelemetry Python Logs and Events APIs are in beta
from opentelemetry import _logs, trace
from opentelemetry.exporter.otlp.proto.grpc._log_exporter import (
OTLPLogExporter,
)
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import (
OTLPSpanExporter,
)
from opentelemetry.instrumentation.vertexai import VertexAIInstrumentor
from opentelemetry.sdk._logs import LoggerProvider
from opentelemetry.sdk._logs.export import BatchLogRecordProcessor
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import (
BatchSpanProcessor,
ConsoleSpanExporter,
)
from opentelemetry.util.genai.handler import TelemetryHandler
from opentelemetry.util.genai.types import AgentCreation

# Demo configuration — every value is overridable via environment variables.
OTLP_ENDPOINT = os.environ.get(
    "OTEL_EXPORTER_OTLP_ENDPOINT", "http://localhost:4317"
)
# NOTE(review): the fallback project id looks environment-specific — confirm
# it is intended to ship as the public default for this example.
GCP_PROJECT = os.environ.get("GCP_PROJECT", "gcp-o11yinframon-nprd-81065")
GCP_LOCATION = os.environ.get("GCP_LOCATION", "us-central1")
MODEL = "gemini-2.5-flash"
AGENT_NAME = "Currency Exchange Agent"

# configure tracing: export spans both to the console and to the OTLP endpoint
trace.set_tracer_provider(TracerProvider())
trace.get_tracer_provider().add_span_processor(
    BatchSpanProcessor(ConsoleSpanExporter())
)
trace.get_tracer_provider().add_span_processor(
    BatchSpanProcessor(OTLPSpanExporter(endpoint=OTLP_ENDPOINT, insecure=True))
)

# configure logging and events (exported to the OTLP endpoint only)
_logs.set_logger_provider(LoggerProvider())
_logs.get_logger_provider().add_log_record_processor(
    BatchLogRecordProcessor(
        OTLPLogExporter(endpoint=OTLP_ENDPOINT, insecure=True)
    )
)

# Auto-instrument Vertex AI SDK calls (generate_content, etc.)
VertexAIInstrumentor().instrument()


def get_exchange_rate(
    currency_from: str = "USD",
    currency_to: str = "EUR",
    currency_date: str = "latest",
) -> dict:
    """Retrieve the exchange rate between two currencies on a specified date.

    Used as an agent tool; queries the public Frankfurter API.

    Args:
        currency_from: Source currency code (e.g. ``"USD"``).
        currency_to: Target currency code (e.g. ``"EUR"``).
        currency_date: ISO date (``YYYY-MM-DD``) or ``"latest"``.

    Returns:
        The parsed JSON response from the Frankfurter API as a dict.

    Raises:
        requests.RequestException: on connection failure or timeout.
    """
    import requests  # noqa: PLC0415

    response = requests.get(
        f"https://api.frankfurter.app/{currency_date}",
        params={"from": currency_from, "to": currency_to},
        # Fix: requests has no default timeout — without one, a stalled
        # connection would hang the agent tool (and its span) indefinitely.
        timeout=30,
    )
    return response.json()


def main():
    """Run the end-to-end demo: LLM call, local agent, deploy, query, cleanup.

    Requires ADC credentials, ``GOOGLE_API_KEY``, and a GCS staging bucket
    (see the README). Tracing/logging providers and VertexAIInstrumentor are
    configured at module import time.
    """
    # Initialize Vertex AI (needed for LanggraphAgent)
    staging_bucket = os.environ.get(
        "GCP_STAGING_BUCKET",
        f"gs://{GCP_PROJECT}-agent-staging",
    )
    vertexai.init(
        project=GCP_PROJECT,
        location=GCP_LOCATION,
        staging_bucket=staging_bucket,
    )
    # API-key client for LLM calls (uses GOOGLE_API_KEY env var)
    client = genai.Client()
    handler = TelemetryHandler()

    # ----- Step 1: LLM call via google-genai SDK -----
    # This call is auto-instrumented by VertexAIInstrumentor.
    print("[LLM call] Generating content...")
    response = client.models.generate_content(
        model=MODEL,
        contents="Write a short poem on OpenTelemetry.",
    )
    print(f"[LLM call] Response:\n{response.text}\n")

    # ----- Step 2: Create local agent (create_agent span) -----
    # The manual create_agent span wraps the local LanggraphAgent build, so
    # the span duration covers only construction (no remote calls expected).
    with handler.create_agent(
        AgentCreation(
            name=AGENT_NAME,
            provider="gcp_vertex_ai",
            request_model=MODEL,
            description="Local LanggraphAgent for currency exchange",
            server_address=f"{GCP_LOCATION}-aiplatform.googleapis.com",
        )
    ) as _creation:
        agent = agent_engines.LanggraphAgent(
            model=MODEL,
            tools=[get_exchange_rate],
            model_kwargs={
                "temperature": 0.28,
                "max_output_tokens": 1000,
                "top_p": 0.95,
            },
        )
        print(f"[create_agent] Local agent created: {AGENT_NAME}")

    # ----- Step 3: Deploy to Vertex AI Agent Engine (create_agent span) -----
    # A second create_agent span wraps the (slow) remote deployment.
    print("\n[deploy] Deploying agent to Vertex AI Agent Engine...")
    print(" (this may take several minutes...)")
    with handler.create_agent(
        AgentCreation(
            name=AGENT_NAME,
            provider="gcp_vertex_ai",
            request_model=MODEL,
            description="Deploying agent to Vertex AI Agent Engine",
            server_address=f"{GCP_LOCATION}-aiplatform.googleapis.com",
        )
    ) as _deploy_creation:
        remote_agent = agent_engines.create(
            agent_engine=agent,
            display_name=AGENT_NAME,
            requirements=[
                "google-cloud-aiplatform[agent_engines,langchain]",
                "langchain-google-vertexai",
            ],
        )
        print(f"[deploy] Remote agent deployed: {remote_agent.resource_name}")

    # ----- Step 4: Query the remote agent -----
    # Best-effort: a failed query is reported but must not skip cleanup,
    # hence the try/except with deletion in finally.
    try:
        print("\n[query] Querying remote agent...")
        query_response = remote_agent.query(
            input={
                "messages": [
                    (
                        "user",
                        "What is the exchange rate from US dollars to SEK today?",
                    ),
                ]
            }
        )
        print(f"[query] Response: {query_response}\n")
    except Exception as exc:
        print(f"\n[query] Failed — {type(exc).__name__}: {exc}\n")
    finally:
        # ----- Cleanup: delete the deployed agent -----
        # Always delete the deployed agent so demo runs don't accrue cost.
        print("[cleanup] Deleting remote agent...")
        remote_agent.delete(force=True)
        print("[cleanup] Done.")


# Script entry point: run the demo only when executed directly.
if __name__ == "__main__":
    main()
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
google-cloud-aiplatform[agent_engines,langchain]>=1.64
google-genai>=1.0
langchain-google-vertexai
requests

opentelemetry-sdk>=1.29
opentelemetry-exporter-otlp-proto-grpc>=1.29
opentelemetry-instrumentation-vertexai>=2.0b0
opentelemetry-util-genai>=0.4b0.dev0
3 changes: 3 additions & 0 deletions util/opentelemetry-util-genai/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## Unreleased

- Add `_BaseAgent` shared base class and `AgentCreation` type for agent creation lifecycle spans
([#4217](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/4217))

## Version 0.3b0 (2026-02-20)

- Add `gen_ai.tool_definitions` to completion hook ([#4181](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/4181))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,11 +80,16 @@
)
from opentelemetry.util.genai.metrics import InvocationMetricsRecorder
from opentelemetry.util.genai.span_utils import (
_apply_creation_finish_attributes,
_apply_error_attributes,
_apply_llm_finish_attributes,
_maybe_emit_llm_event,
)
from opentelemetry.util.genai.types import Error, LLMInvocation
from opentelemetry.util.genai.types import (
AgentCreation,
Error,
LLMInvocation,
)
from opentelemetry.util.genai.version import __version__


Expand Down Expand Up @@ -208,6 +213,70 @@ def llm(
raise
self.stop_llm(invocation)

# ---- Agent lifecycle ----

def start_agent(
    self,
    agent: AgentCreation,
) -> AgentCreation:
    """Open a span for an agent operation and make it the current context.

    Records the monotonic start time on ``agent`` and stashes both the
    started span and the context-attach token on it, so that a later
    ``stop_agent``/``fail_agent`` call can finalize the operation.
    """
    pending_span = self._tracer.start_span(
        name=f"{agent.operation_name} {agent.name}".strip(),
        kind=SpanKind.CLIENT,
    )
    agent.monotonic_start_s = timeit.default_timer()
    agent.span = pending_span
    agent.context_token = otel_context.attach(
        set_span_in_context(pending_span)
    )
    return agent

def stop_agent(self, agent: AgentCreation) -> AgentCreation:  # pylint: disable=no-self-use
    """Finalize a successful agent operation: set finish attributes,
    detach the attached context, and end the span.

    No-op if the operation was never started (no span / no token).
    """
    if agent.span is None or agent.context_token is None:
        return agent

    _apply_creation_finish_attributes(agent.span, agent)
    otel_context.detach(agent.context_token)
    agent.span.end()
    return agent

def fail_agent(  # pylint: disable=no-self-use
    self, agent: AgentCreation, error: Error
) -> AgentCreation:
    """Finalize a failed agent operation: set finish and error attributes,
    detach the attached context, and end the span with error status.

    No-op if the operation was never started (no span / no token).
    """
    if agent.span is None or agent.context_token is None:
        return agent

    _apply_creation_finish_attributes(agent.span, agent)
    _apply_error_attributes(agent.span, error)
    otel_context.detach(agent.context_token)
    agent.span.end()
    return agent

@contextmanager
def create_agent(
    self, creation: AgentCreation | None = None
) -> Iterator[AgentCreation]:
    """Context manager wrapping an agent-creation lifecycle in a span.

    A fresh ``AgentCreation`` is built when none is supplied. The span is
    started on entry and the creation object is yielded; callers should
    only set data attributes on it and must not touch the span or context
    directly. On clean exit the span is finalized via ``stop_agent``; if
    the body raises, the span is ended with error status via
    ``fail_agent`` and the original exception is re-raised.
    """
    creation = AgentCreation() if creation is None else creation
    self.start_agent(creation)
    try:
        yield creation
    except Exception as err:
        self.fail_agent(creation, Error(message=str(err), type=type(err)))
        raise
    else:
        self.stop_agent(creation)


def get_telemetry_handler(
tracer_provider: TracerProvider | None = None,
Expand Down
Loading