Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 45 additions & 31 deletions netra/__init__.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import atexit
import logging
import threading
from typing import Any, Dict, List, Optional, Set
from typing import AbstractSet, Any, Dict, List, Optional

from opentelemetry import context as context_api
from opentelemetry import metrics as otel_metrics
Expand All @@ -12,7 +12,11 @@
from netra.dashboard import Dashboard
from netra.evaluation import Evaluation
from netra.instrumentation import init_instrumentations
from netra.instrumentation.instruments import DEFAULT_INSTRUMENTS_FOR_ROOT, NetraInstruments
from netra.instrumentation.instruments import (
DEFAULT_INSTRUMENTS,
DEFAULT_INSTRUMENTS_FOR_ROOT,
NetraInstruments,
)
from netra.logging_utils import configure_package_logging
from netra.meter import MetricsSetup
from netra.meter import get_meter as _get_meter
Expand All @@ -22,13 +26,7 @@
from netra.span_wrapper import ActionModel, SpanType, SpanWrapper, UsageModel
from netra.tracer import Tracer
from netra.usage import Usage

__all__ = [
"Netra",
"UsageModel",
"ActionModel",
"Prompts",
]
from netra.utils import resolve_root_instruments

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -70,12 +68,12 @@ def init(
environment: Optional[str] = None,
enable_scrubbing: Optional[bool] = None,
blocked_spans: Optional[List[str]] = None,
instruments: Optional[Set[NetraInstruments]] = None,
block_instruments: Optional[Set[NetraInstruments]] = None,
instruments: Optional[AbstractSet[NetraInstruments]] = None,
block_instruments: Optional[AbstractSet[NetraInstruments]] = None,
enable_metrics: Optional[bool] = None,
metrics_export_interval_ms: Optional[int] = None,
export_auto_metrics: Optional[bool] = None,
root_instruments: Optional[Set[NetraInstruments]] = None,
root_instruments: Optional[AbstractSet[NetraInstruments]] = None,
) -> None:
"""
Thread-safe initialization of Netra.
Expand All @@ -91,17 +89,25 @@ def init(
environment: Environment to be sent to the server
enable_scrubbing: Whether to enable scrubbing
blocked_spans: List of spans to be blocked
instruments: Set of instruments to be enabled
block_instruments: Set of instruments to be blocked
instruments: Set of instruments to be enabled for non-root spans.
Defaults to ``DEFAULT_INSTRUMENTS`` when ``None``. Pass a set
containing ``NetraInstruments.ALL`` to enable every
instrumentation available in the user's environment (legacy
behaviour).
block_instruments: Set of instruments to be blocked. Defaults to
``None`` (no instruments blocked). Applied to both
``instruments`` (non-root) and ``root_instruments``
independently. Pass a set containing ``NetraInstruments.ALL``
to block every instrumentation.
enable_metrics: Whether to enable OTLP custom metrics export (default: False)
metrics_export_interval_ms: Metrics push interval in milliseconds (default: 60000)
export_auto_metrics: Whether to export OTel auto-instrumented metrics (default: False)
root_instruments: Set of instruments allowed to produce root-level
spans. When a root span is blocked, its entire subtree is
discarded. Resolution priority:
1. Explicit ``root_instruments`` value if provided.
2. The ``instruments`` value if provided (but ``root_instruments`` is not).
3. ``DEFAULT_INSTRUMENTS_FOR_ROOT`` if neither is provided.
spans. Independent of ``instruments``. Defaults to
``DEFAULT_INSTRUMENTS_FOR_ROOT`` when ``None``. When a root
span is blocked, its entire subtree is discarded. Pass a set
containing ``NetraInstruments.ALL`` to allow all
instrumentations to produce root spans (legacy behaviour).

Returns:
None
Expand Down Expand Up @@ -131,14 +137,12 @@ def init(
# Configure logging based on debug mode
configure_package_logging(debug_mode=cfg.debug_mode)

# Resolve root_instruments → set of instrumentation-name strings.
resolved_root: Optional[Set[str]] = None
if root_instruments is not None:
resolved_root = {m.value for m in root_instruments}
elif instruments is not None:
resolved_root = {m.value for m in instruments}
else:
resolved_root = {m.value for m in DEFAULT_INSTRUMENTS_FOR_ROOT}
effective_instruments = instruments if instruments is not None else DEFAULT_INSTRUMENTS

resolved_root = resolve_root_instruments(
root_instruments=root_instruments,
block_instruments=block_instruments,
)

# Initialize tracer (OTLP exporter, span processor, resource)
Tracer(cfg, root_instrument_names=resolved_root)
Expand Down Expand Up @@ -190,7 +194,7 @@ def init(
init_instrumentations(
should_enrich_metrics=True,
base64_image_uploader=None,
instruments=instruments,
instruments=effective_instruments,
block_instruments=block_instruments,
)

Expand All @@ -216,8 +220,8 @@ def init(
pass
logger.info("Netra root span created and attached to context.")

# Ensure cleanup at process exit
atexit.register(cls.shutdown)
# Ensure cleanup at process exit
atexit.register(cls.shutdown)

@classmethod
def shutdown(cls) -> None:
Expand Down Expand Up @@ -459,4 +463,14 @@ def get_trace_id(cls) -> Optional[str]:
return SessionManager.get_trace_id()


__all__ = ["Netra", "UsageModel", "ActionModel", "SpanType", "EvaluationScore", "Prompts", "ConversationType"]
__all__ = [
"Netra",
"UsageModel",
"ActionModel",
"SpanType",
"Prompts",
"ConversationType",
"NetraInstruments",
"DEFAULT_INSTRUMENTS",
"DEFAULT_INSTRUMENTS_FOR_ROOT",
]
177 changes: 123 additions & 54 deletions netra/instrumentation/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,80 +2,149 @@
import os
from contextlib import redirect_stderr, redirect_stdout
from io import StringIO
from typing import Any, Callable, Optional, Set
from typing import AbstractSet, Any, Callable, Optional

from traceloop.sdk import Instruments, Telemetry
from traceloop.sdk.utils.package_check import is_package_installed

from netra.instrumentation.instruments import DEFAULT_INSTRUMENTS, CustomInstruments, NetraInstruments
from netra.instrumentation.instruments import DEFAULT_INSTRUMENTS, CustomInstruments, InstrumentSet, NetraInstruments

_ALL_SENTINEL = InstrumentSet.ALL

# Traceloop instrumentors that Netra always replaces with its own custom
# implementations. These are unconditionally added to the traceloop block
# list regardless of what the user passes.
_TRACELOOP_ALWAYS_BLOCKED: frozenset[Instruments] = frozenset(
{
Instruments.WEAVIATE,
Instruments.QDRANT,
Instruments.GOOGLE_GENERATIVEAI,
Instruments.MISTRAL,
Instruments.OPENAI,
Instruments.GROQ,
Instruments.REDIS,
Instruments.PYMYSQL,
Instruments.REQUESTS,
}
)


def _contains_all(instrument_set: Optional[AbstractSet[NetraInstruments]]) -> bool:
"""Return ``True`` if the set contains the ``InstrumentSet.ALL`` sentinel.

Args:
instrument_set: The set of instruments to check.

Returns:
True if the set contains ``InstrumentSet.ALL``, False otherwise.
"""
return instrument_set is not None and _ALL_SENTINEL in instrument_set


def _classify(
members: AbstractSet[NetraInstruments],
) -> tuple[set[Instruments], set[CustomInstruments]]:
"""Partition ``InstrumentSet`` members into traceloop and custom buckets.

Each member's ``origin`` attribute determines whether it maps to a
``traceloop.sdk.Instruments`` member or a ``CustomInstruments`` member.
The ``ALL`` sentinel is silently skipped.

Args:
members: Set of ``InstrumentSet`` members to classify.

Returns:
A ``(traceloop, custom)`` tuple of concrete enum member sets.
"""
traceloop: set[Instruments] = set()
custom: set[CustomInstruments] = set()
for instrument in members:
if instrument is _ALL_SENTINEL:
continue
if instrument.origin == CustomInstruments:
member = getattr(CustomInstruments, instrument.name, None)
if member is not None:
custom.add(member)
else:
logging.warning("Unknown custom instrument: %s", instrument.name)
else:
member = getattr(Instruments, instrument.name, None)
if member is not None:
traceloop.add(member)
else:
logging.warning("Unknown traceloop instrument: %s", instrument.name)
return traceloop, custom


def init_instrumentations(
should_enrich_metrics: bool,
base64_image_uploader: Optional[Callable[[str, str, str], str]],
instruments: Optional[Set[NetraInstruments]] = None,
block_instruments: Optional[Set[NetraInstruments]] = None,
instruments: Optional[AbstractSet[NetraInstruments]] = None,
block_instruments: Optional[AbstractSet[NetraInstruments]] = None,
) -> None:
"""Initialize all requested instrumentations.

When ``InstrumentSet.ALL`` is present in *instruments*, the curated
default list is bypassed and every instrumentation available in the
user's environment is enabled — matching the legacy behaviour before
default instrument lists were introduced.

Args:
should_enrich_metrics: Whether to enrich metrics.
base64_image_uploader: Optional callback for image uploads.
instruments: Set of instruments to enable. ``None`` falls back to the
curated default; a set containing ``InstrumentSet.ALL`` enables
everything available.
block_instruments: Set of instruments to explicitly block. A set
containing ``InstrumentSet.ALL`` blocks every instrumentation.
"""
from traceloop.sdk.tracing.tracing import init_instrumentations

# When the user does not pass instruments, fall back to the curated default set.
resolved_instruments = instruments if instruments is not None else DEFAULT_INSTRUMENTS

traceloop_instruments = set()
traceloop_block_instruments = set()
netra_custom_instruments = set()
netra_custom_block_instruments = set()
if resolved_instruments:
for instrument in resolved_instruments:
if instrument.origin == CustomInstruments: # type: ignore[attr-defined]
netra_custom_instruments.add(getattr(CustomInstruments, instrument.name))
else:
traceloop_instruments.add(getattr(Instruments, instrument.name))
if block_instruments:
for instrument in block_instruments:
if instrument.origin == CustomInstruments: # type: ignore[attr-defined]
netra_custom_block_instruments.add(getattr(CustomInstruments, instrument.name))
else:
traceloop_block_instruments.add(getattr(Instruments, instrument.name))

# If no instruments in traceloop are provided for instrumentation
if resolved_instruments and not traceloop_instruments and not traceloop_block_instruments:
traceloop_block_instruments = set(Instruments)

# If no custom instruments in netra are provided for instrumentation
if resolved_instruments and not netra_custom_instruments and not netra_custom_block_instruments:
netra_custom_block_instruments = set(CustomInstruments)

netra_custom_instruments = netra_custom_instruments - netra_custom_block_instruments
traceloop_instruments = traceloop_instruments - traceloop_block_instruments
if not traceloop_instruments:
traceloop_instruments = None # type:ignore[assignment]

traceloop_block_instruments.update(
{
Instruments.WEAVIATE,
Instruments.QDRANT,
Instruments.GOOGLE_GENERATIVEAI,
Instruments.MISTRAL,
Instruments.OPENAI,
Instruments.GROQ,
Instruments.REDIS,
Instruments.PYMYSQL,
Instruments.REQUESTS,
}
)
enable_all = _contains_all(instruments)
block_all = _contains_all(block_instruments)

resolved = None if enable_all else (instruments or DEFAULT_INSTRUMENTS)

# Classify user-requested instruments into traceloop / custom buckets
if resolved is not None:
traceloop_instruments, netra_custom_instruments = _classify(resolved)
else:
traceloop_instruments, netra_custom_instruments = set(), set()

# Classify blocked instruments
if block_all:
traceloop_block, netra_custom_block = set(Instruments), set(CustomInstruments)
elif block_instruments:
traceloop_block, netra_custom_block = _classify(block_instruments)
else:
traceloop_block, netra_custom_block = set(), set()

# When user specified instruments but none mapped to a category,
# block that entire category to avoid silently enabling everything.
if resolved is not None:
if not traceloop_instruments and not traceloop_block:
traceloop_block = set(Instruments)
if not netra_custom_instruments and not netra_custom_block:
netra_custom_block = set(CustomInstruments)

# Apply blocking and prepare traceloop arguments
traceloop_instruments -= traceloop_block
traceloop_block |= _TRACELOOP_ALWAYS_BLOCKED

os.environ["TRACELOOP_TELEMETRY"] = "false"
with redirect_stdout(StringIO()), redirect_stderr(StringIO()):
init_instrumentations(
should_enrich_metrics=should_enrich_metrics,
base64_image_uploader=base64_image_uploader,
instruments=traceloop_instruments,
block_instruments=traceloop_block_instruments,
instruments=traceloop_instruments or None,
block_instruments=traceloop_block,
)

netra_custom_instruments = netra_custom_instruments or set(CustomInstruments)
netra_custom_instruments = netra_custom_instruments - netra_custom_block_instruments
# When ALL is requested, default to every custom instrument if none
# were explicitly classified.
if enable_all:
netra_custom_instruments = netra_custom_instruments or set(CustomInstruments)
netra_custom_instruments -= netra_custom_block

# Initialize Groq instrumentation.
if CustomInstruments.GROQ in netra_custom_instruments:
Expand Down
Loading