Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 34 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,40 @@ async for event in agent.query_stream("do something"):
print(f"Done: {text}")
```

### Observability with Langfuse

Optional tracing and observability with [Langfuse](https://langfuse.com):

```bash
# Install with Langfuse support
uv add bu-agent-sdk[langfuse]

# Set environment variables
export LANGFUSE_SECRET_KEY=sk-...
export LANGFUSE_PUBLIC_KEY=pk-...
```

```python
from bu_agent_sdk import observe_langfuse, is_langfuse_available, flush_langfuse

@observe_langfuse(name="my_tool", as_type="span")
async def my_tool(query: str) -> str:
return await process(query)

@observe_langfuse(name="llm_call", as_type="generation")
async def call_llm(prompt: str) -> str:
return await llm.generate(prompt)

# Check if Langfuse is available
if is_langfuse_available():
print("Langfuse tracing enabled")

# Flush traces before exit
flush_langfuse()
```

Alternative: Use [Laminar](https://www.lmnr.ai/) with `uv add bu-agent-sdk[observability]` and the `@observe` decorator.

## Claude Code in 100 Lines

A sandboxed coding assistant with dependency injection:
Expand Down
14 changes: 14 additions & 0 deletions bu_agent_sdk/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,24 @@ async def add(a: int, b: int) -> int:

from bu_agent_sdk.agent import Agent
from bu_agent_sdk.observability import Laminar, observe, observe_debug
from bu_agent_sdk.langfuse_observability import (
LangfuseClient,
observe_langfuse,
observe_langfuse_debug,
is_langfuse_available,
flush_langfuse,
)

__all__ = [
"Agent",
# Laminar observability
"Laminar",
"observe",
"observe_debug",
# Langfuse observability
"LangfuseClient",
"observe_langfuse",
"observe_langfuse_debug",
"is_langfuse_available",
"flush_langfuse",
]
322 changes: 322 additions & 0 deletions bu_agent_sdk/langfuse_observability.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,322 @@
"""
Observability module for bu_agent_sdk with optional Langfuse integration.

This module provides:
- `observe_langfuse` decorator for tracing functions
- `observe_langfuse_debug` decorator that only traces in debug mode
- `LangfuseClient` wrapper for Langfuse initialization

If langfuse is not installed, all decorators become no-ops.

Usage:
from bu_agent_sdk.langfuse_observability import observe_langfuse, LangfuseClient

@observe_langfuse(name="my_function")
async def my_function():
...

# With async generators
@observe_langfuse(name="stream_response")
async def stream_response():
for chunk in get_chunks():
yield chunk

Environment Variables:
LANGFUSE_SECRET_KEY: Your Langfuse secret key
LANGFUSE_PUBLIC_KEY: Your Langfuse public key
LANGFUSE_HOST: Optional Langfuse host URL (default: https://cloud.langfuse.com)
"""

import logging
import os
from collections.abc import Callable
from functools import wraps
from typing import Any, Literal, TypeVar, cast

logger = logging.getLogger(__name__)

# Type definitions
F = TypeVar("F", bound=Callable[..., Any])

_LANGFUSE_AVAILABLE = False
_langfuse_observe = None
LangfuseClient = None

# Try to import langfuse
try:
from langfuse import Langfuse as LangfuseClient # type: ignore
from langfuse import observe as _langfuse_observe # type: ignore

_LANGFUSE_AVAILABLE = True
logger.debug("Langfuse is available for observability")
except ImportError:
logger.debug(
"Langfuse not installed - observability decorators will be no-ops"
)
_LANGFUSE_AVAILABLE = False


def _is_debug_mode() -> bool:
"""Check if we're in debug mode based on environment variables."""
langfuse_debug_mode = os.getenv("LANGFUSE_DEBUG", "").lower()
if langfuse_debug_mode in ("1", "true", "yes", "debug"):
return True
# Also check BU_DEBUG for convenience
if os.getenv("BU_DEBUG", "").lower() in ("1", "true", "yes"):
return True
return False


def _create_no_op_decorator(
name: str | None = None,
capture_input: bool = True,
capture_output: bool = True,
as_type: Literal["span", "generation"] | None = None,
**kwargs: Any,
) -> Callable[[F], F]:
"""Create a no-op decorator that accepts langfuse observe parameters but does nothing."""
import asyncio
import inspect

def decorator(func: F) -> F:
# Check for async generators first (async def with yield)
if inspect.isasyncgenfunction(func):

@wraps(func)
async def async_gen_wrapper(*args, **kwargs):
async for item in func(*args, **kwargs):
yield item

return cast(F, async_gen_wrapper)
elif asyncio.iscoroutinefunction(func):

@wraps(func)
async def async_wrapper(*args, **kwargs):
return await func(*args, **kwargs)

return cast(F, async_wrapper)
else:

@wraps(func)
def sync_wrapper(*args, **kwargs):
return func(*args, **kwargs)

return cast(F, sync_wrapper)

return decorator


def _create_async_gen_observe_decorator(
name: str | None = None,
capture_input: bool = True,
capture_output: bool = True,
as_type: Literal["span", "generation"] | None = None,
**kwargs: Any,
) -> Callable[[F], F]:
"""Create a decorator for async generators that wraps them in a Langfuse span."""

def decorator(func: F) -> F:
@wraps(func)
async def async_gen_wrapper(*args, **inner_kwargs):
span_name = name or func.__name__

# For async generators, we use context manager approach
if LangfuseClient is not None:
try:
from langfuse import get_client
langfuse = get_client()

with langfuse.start_as_current_observation(
name=span_name,
as_type=as_type or "span",
input={"args": str(args)[:500], "kwargs": str(inner_kwargs)[:500]} if capture_input else None,
) as span:
try:
async for item in func(*args, **inner_kwargs):
yield item
except Exception as e:
if capture_output:
span.update(output={"error": str(e)})
raise
except Exception:
# If Langfuse context fails, just run the function
async for item in func(*args, **inner_kwargs):
yield item
else:
async for item in func(*args, **inner_kwargs):
yield item

return cast(F, async_gen_wrapper)

return decorator


def observe_langfuse(
name: str | None = None,
capture_input: bool = True,
capture_output: bool = True,
as_type: Literal["span", "generation"] | None = None,
**kwargs: Any,
) -> Callable[[F], F]:
"""
Observability decorator that traces function execution when langfuse is available.

This decorator will use langfuse's observe decorator if langfuse is installed,
otherwise it will be a no-op.

Args:
name: Name of the span/trace
capture_input: Whether to capture function input parameters in tracing
capture_output: Whether to capture function output in tracing
as_type: Type of observation ('span' or 'generation')
**kwargs: Additional parameters passed to langfuse observe

Example:
@observe_langfuse(name="my_function")
async def my_function(param1, param2):
return param1 + param2

@observe_langfuse(name="llm_call", as_type="generation")
async def call_llm(prompt: str):
return await llm.generate(prompt)
"""
import inspect

decorator_kwargs = {
"name": name,
"capture_input": capture_input,
"capture_output": capture_output,
"as_type": as_type,
**kwargs,
}

def decorator(func: F) -> F:
# Async generators need special handling - use manual span wrapper
if inspect.isasyncgenfunction(func):
if _LANGFUSE_AVAILABLE and LangfuseClient is not None:
return _create_async_gen_observe_decorator(**decorator_kwargs)(func)
else:
return _create_no_op_decorator(**decorator_kwargs)(func)

if _LANGFUSE_AVAILABLE and _langfuse_observe:
# Map our parameters to langfuse's observe decorator parameters
langfuse_kwargs = {}
if name:
langfuse_kwargs["name"] = name
if not capture_input:
langfuse_kwargs["capture_input"] = False
if not capture_output:
langfuse_kwargs["capture_output"] = False
if as_type:
langfuse_kwargs["as_type"] = as_type
# Pass any additional kwargs
langfuse_kwargs.update(kwargs)

return cast(F, _langfuse_observe(**langfuse_kwargs)(func))
else:
return _create_no_op_decorator(**decorator_kwargs)(func)

return decorator


def observe_langfuse_debug(
name: str | None = None,
capture_input: bool = True,
capture_output: bool = True,
as_type: Literal["span", "generation"] | None = None,
**kwargs: Any,
) -> Callable[[F], F]:
"""
Debug-only observability decorator that only traces when in debug mode.

This decorator will use langfuse's observe decorator if both langfuse is installed
AND we're in debug mode, otherwise it will be a no-op.

Debug mode is enabled by:
- LANGFUSE_DEBUG=1/true/yes/debug
- BU_DEBUG=1/true/yes

Args:
name: Name of the span/trace
capture_input: Whether to capture function input parameters in tracing
capture_output: Whether to capture function output in tracing
as_type: Type of observation ('span' or 'generation')
**kwargs: Additional parameters passed to langfuse observe

Example:
@observe_langfuse_debug(name="debug_function")
async def debug_function():
...
"""
import inspect

decorator_kwargs = {
"name": name,
"capture_input": capture_input,
"capture_output": capture_output,
"as_type": as_type,
**kwargs,
}

def decorator(func: F) -> F:
# Async generators need special handling - use manual span wrapper
if inspect.isasyncgenfunction(func):
if _LANGFUSE_AVAILABLE and LangfuseClient is not None and _is_debug_mode():
return _create_async_gen_observe_decorator(**decorator_kwargs)(func)
else:
return _create_no_op_decorator(**decorator_kwargs)(func)

if _LANGFUSE_AVAILABLE and _langfuse_observe and _is_debug_mode():
# Map our parameters to langfuse's observe decorator parameters
langfuse_kwargs = {}
if name:
langfuse_kwargs["name"] = name
if not capture_input:
langfuse_kwargs["capture_input"] = False
if not capture_output:
langfuse_kwargs["capture_output"] = False
if as_type:
langfuse_kwargs["as_type"] = as_type
langfuse_kwargs.update(kwargs)

return cast(F, _langfuse_observe(**langfuse_kwargs)(func))
else:
return _create_no_op_decorator(**decorator_kwargs)(func)

return decorator


# Convenience functions
def is_langfuse_available() -> bool:
"""Check if langfuse is available for tracing."""
return _LANGFUSE_AVAILABLE


def is_debug_mode() -> bool:
"""Check if we're currently in debug mode."""
return _is_debug_mode()


def get_langfuse_status() -> dict[str, bool]:
"""Get the current status of Langfuse observability features."""
return {
"langfuse_available": _LANGFUSE_AVAILABLE,
"debug_mode": _is_debug_mode(),
"observe_active": _LANGFUSE_AVAILABLE,
"observe_debug_active": _LANGFUSE_AVAILABLE and _is_debug_mode(),
}


def flush_langfuse() -> None:
"""Flush any pending Langfuse traces.

Call this before your application exits to ensure all traces are sent.
"""
if _LANGFUSE_AVAILABLE and LangfuseClient is not None:
try:
from langfuse import get_client
client = get_client()
client.flush()
logger.debug("Langfuse traces flushed successfully")
except Exception as e:
logger.warning(f"Failed to flush Langfuse traces: {e}")
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ anthropic = ["anthropic>=0.40.0"]
openai = ["openai>=1.50.0"]
google = ["google-genai>=1.0.0"]
observability = ["lmnr>=0.4.0"]
langfuse = ["langfuse>=2.0.0"]

[project.urls]
Homepage = "https://github.com/browser-use/bu-agent-sdk"
Expand Down