Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions src/uipath/_cli/_evals/_evaluate.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
from __future__ import annotations

from uipath.core.tracing import UiPathTraceManager
from uipath.runtime import (
UiPathRuntimeFactoryProtocol,
Expand All @@ -6,19 +8,22 @@

from uipath._cli._evals._runtime import UiPathEvalContext, UiPathEvalRuntime
from uipath._events._event_bus import EventBus
from uipath.tracing import LlmOpsHttpExporter


async def evaluate(
runtime_factory: UiPathRuntimeFactoryProtocol,
trace_manager: UiPathTraceManager,
eval_context: UiPathEvalContext,
event_bus: EventBus,
live_tracking_exporter: LlmOpsHttpExporter,
) -> UiPathRuntimeResult:
async with UiPathEvalRuntime(
factory=runtime_factory,
context=eval_context,
trace_manager=trace_manager,
event_bus=event_bus,
live_tracking_exporter=live_tracking_exporter,
) as eval_runtime:
results = await eval_runtime.execute()
await event_bus.wait_for_all(timeout=10)
Expand Down
371 changes: 32 additions & 339 deletions src/uipath/_cli/_evals/_progress_reporter.py

Large diffs are not rendered by default.

147 changes: 89 additions & 58 deletions src/uipath/_cli/_evals/_runtime.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,9 @@
from time import time
from typing import (
Any,
Awaitable,
Iterable,
Iterator,
Protocol,
Sequence,
Tuple,
runtime_checkable,
)

Expand Down Expand Up @@ -185,41 +182,79 @@ def __init__(
)

def _upsert_span_async(
self, span: Span | ReadableSpan, status_override: int | None = None
self,
span: Span | ReadableSpan,
status_override: int | None = None,
wait: bool = False,
) -> None:
"""Run upsert_span in a background thread without blocking.
"""Run upsert_span in a background thread.

Submits the upsert task to the thread pool. For parent spans, waits for
completion to ensure correct ordering without blocking evaluation logic.

Submits the upsert task to the thread pool and returns immediately.
The thread pool handles execution with max_workers cap to prevent
resource exhaustion.
Args:
span: The span to upsert
status_override: Optional status to override (e.g., RUNNING)
wait: If True, blocks until upsert completes (for parent spans)
"""

def _upsert():
try:
span_type = (
span.attributes.get("span_type") if span.attributes else "unknown"
)
span_name = span.name if hasattr(span, "name") else "unknown"
exporter_trace_id = getattr(self.exporter, "trace_id", None)
logger.debug(
f"[TraceID] Upserting span '{span_name}' (type={span_type}) "
f"with exporter.trace_id = {exporter_trace_id}"
)
if status_override:
self.exporter.upsert_span(span, status_override=status_override)
else:
self.exporter.upsert_span(span)
except Exception as e:
logger.debug(f"Failed to upsert span: {e}")

# Submit to thread pool and return immediately (non-blocking)
# The timeout parameter is reserved for shutdown operations
self.executor.submit(_upsert)
# Submit to thread pool
future = self.executor.submit(_upsert)

# For parent spans, wait for completion to ensure ordering
if wait:
try:
future.result(timeout=5.0) # 5 second timeout
except Exception as e:
logger.debug(f"Failed to wait for span upsert: {e}")

def on_start(
    self, span: Span, parent_context: context_api.Context | None = None
) -> None:
    """Called when span starts - upsert with RUNNING status.

    Only evaluation-related spans (as decided by ``self._is_eval_span``) that
    carry attributes are tracked; all others are ignored.

    Parent spans (``span_type`` of ``eval_set_run`` or ``evaluation``) wait
    for the upsert to complete to ensure correct ordering. Child spans are
    non-blocking for performance.

    Args:
        span: The span that has just started.
        parent_context: Accepted for the span-processor interface; not used
            by this method.
    """
    # Only track evaluation-related spans
    if not (span.attributes and self._is_eval_span(span)):
        return

    span_type = span.attributes.get("span_type")
    self._upsert_span_async(
        span,
        status_override=self.span_status.RUNNING,
        # Wait for parent spans to ensure ordering
        wait=span_type in ("eval_set_run", "evaluation"),
    )

def on_end(self, span: ReadableSpan) -> None:
    """Called when span ends - upsert with final status.

    Only evaluation-related spans (as decided by ``self._is_eval_span``) that
    carry attributes are tracked; all others are ignored. No status override
    is passed, so the exporter receives the span as it ended.

    Parent spans (``span_type`` of ``eval_set_run`` or ``evaluation``) wait
    for the upsert to complete to ensure correct ordering. Child spans are
    non-blocking for performance.

    Args:
        span: The span that has just ended.
    """
    # Only track evaluation-related spans
    if not (span.attributes and self._is_eval_span(span)):
        return

    span_type = span.attributes.get("span_type")
    # Wait for parent spans to ensure ordering
    self._upsert_span_async(
        span, wait=span_type in ("eval_set_run", "evaluation")
    )

def _is_eval_span(self, span: Span | ReadableSpan) -> bool:
"""Check if span is evaluation-related."""
Expand Down Expand Up @@ -308,6 +343,7 @@ def __init__(
factory: UiPathRuntimeFactoryProtocol,
trace_manager: UiPathTraceManager,
event_bus: EventBus,
live_tracking_exporter: LlmOpsHttpExporter,
):
self.context: UiPathEvalContext = context
# Wrap the factory to support model settings overrides
Expand All @@ -323,8 +359,12 @@ def __init__(
self.trace_manager.tracer_provider.add_span_processor(span_processor)

# Live tracking processor for real-time span updates
live_tracking_exporter = LlmOpsHttpExporter()
live_tracking_processor = LiveTrackingSpanProcessor(live_tracking_exporter)
self.live_tracking_exporter = live_tracking_exporter
logger.info(
f"[TraceID] UiPathEvalRuntime initialized with live_tracking_exporter.trace_id = "
f"{getattr(live_tracking_exporter, 'trace_id', None)}"
)
live_tracking_processor = LiveTrackingSpanProcessor(self.live_tracking_exporter)
self.trace_manager.tracer_span_processors.append(live_tracking_processor)
self.trace_manager.tracer_provider.add_span_processor(live_tracking_processor)

Expand Down Expand Up @@ -369,23 +409,38 @@ def _mocker_cache(self) -> Iterator[None]:
cache_manager.flush()
cache_manager_context.set(None)

async def initiate_evaluation(
self,
runtime: UiPathRuntimeProtocol,
) -> Tuple[
EvaluationSet,
list[BaseEvaluator[Any, Any, Any]],
Iterable[Awaitable[EvaluationRunResult]],
]:
async def execute(self) -> UiPathRuntimeResult:
logger.info("=" * 80)
logger.info("EVAL RUNTIME: Starting evaluation execution")
logger.info(f"EVAL RUNTIME: Execution ID: {self.execution_id}")
logger.info(f"EVAL RUNTIME: Job ID: {self.context.job_id}")
logger.info(f"EVAL RUNTIME: Resume mode: {self.context.resume}")
if self.context.resume:
logger.info(
"🟢 EVAL RUNTIME: RESUME MODE ENABLED - Will resume from suspended state"
)
logger.info("=" * 80)

# Configure model settings override before creating runtime
await self._configure_model_settings_override()

runtime = await self.factory.new_runtime(
entrypoint=self.context.entrypoint or "",
runtime_id=self.execution_id,
)

# CRITICAL: Load eval set and publish CREATE_EVAL_SET_RUN event BEFORE creating any spans
# This ensures eval_set_run_id is created and trace_id is set on the exporter early
if self.context.eval_set is None:
raise ValueError("eval_set must be provided for evaluation runs")

# Load eval set (path is already resolved in cli_eval.py)
evaluation_set, _ = EvalHelpers.load_eval_set(
self.context.eval_set, self.context.eval_ids
)
evaluators = await self._load_evaluators(evaluation_set, runtime)

# Publish CREATE_EVAL_SET_RUN event and WAIT for it to complete
# This creates the eval_set_run_id in progress reporter and sets trace_id on exporter
await self.event_bus.publish(
EvaluationEvents.CREATE_EVAL_SET_RUN,
EvalSetRunCreatedEvent(
Expand All @@ -398,33 +453,9 @@ async def initiate_evaluation(
),
)

return (
evaluation_set,
evaluators,
(
self._execute_eval(eval_item, evaluators, runtime)
for eval_item in evaluation_set.evaluations
),
)

async def execute(self) -> UiPathRuntimeResult:
logger.info("=" * 80)
logger.info("EVAL RUNTIME: Starting evaluation execution")
logger.info(f"EVAL RUNTIME: Execution ID: {self.execution_id}")
logger.info(f"EVAL RUNTIME: Job ID: {self.context.job_id}")
logger.info(f"EVAL RUNTIME: Resume mode: {self.context.resume}")
if self.context.resume:
logger.info(
"🟢 EVAL RUNTIME: RESUME MODE ENABLED - Will resume from suspended state"
)
logger.info("=" * 80)

# Configure model settings override before creating runtime
await self._configure_model_settings_override()

runtime = await self.factory.new_runtime(
entrypoint=self.context.entrypoint or "",
runtime_id=self.execution_id,
logger.info(
f"[TraceID] After CREATE_EVAL_SET_RUN event, exporter.trace_id = "
f"{getattr(self.live_tracking_exporter, 'trace_id', None)}"
)
try:
with self._mocker_cache():
Expand All @@ -444,11 +475,11 @@ async def execute(self) -> UiPathRuntimeResult:
"Evaluation Set Run", attributes=span_attributes
) as span:
try:
(
evaluation_set,
evaluators,
evaluation_iterable,
) = await self.initiate_evaluation(runtime)
# Evaluation set and evaluators already loaded and event published above
evaluation_iterable = (
self._execute_eval(eval_item, evaluators, runtime)
for eval_item in evaluation_set.evaluations
)
workers = self.context.workers or 1
assert workers >= 1
eval_run_result_list = await execute_parallel(
Expand Down
26 changes: 23 additions & 3 deletions src/uipath/_cli/cli_eval.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
import ast
import asyncio
import logging
import os
from typing import Any

import click
from uipath.core.tracing import UiPathTraceManager
from uipath.runtime import UiPathRuntimeContext, UiPathRuntimeFactoryRegistry

logger = logging.getLogger(__name__)

from uipath._cli._evals._console_progress_reporter import ConsoleProgressReporter
from uipath._cli._evals._evaluate import evaluate
from uipath._cli._evals._progress_reporter import StudioWebProgressReporter
Expand Down Expand Up @@ -203,8 +206,20 @@ def eval(
async def execute_eval():
event_bus = EventBus()

# Always create live tracking exporter (needed by runtime for live span tracking)
live_tracking_exporter = LlmOpsHttpExporter()

# Set trace_id early if eval_set_run_id is already known
if eval_context.eval_set_run_id:
live_tracking_exporter.trace_id = eval_context.eval_set_run_id
logger.info(
f"[TraceID] Set live_tracking_exporter.trace_id = {eval_context.eval_set_run_id} (user-provided via --eval-set-run-id)"
)

if should_register_progress_reporter:
progress_reporter = StudioWebProgressReporter(LlmOpsHttpExporter())
progress_reporter = StudioWebProgressReporter(
live_tracking_exporter
)
await progress_reporter.subscribe_to_eval_runtime_events(event_bus)

console_reporter = ConsoleProgressReporter()
Expand All @@ -224,7 +239,7 @@ async def execute_eval():
eval_context.job_id = ctx.job_id

if ctx.job_id:
trace_manager.add_span_exporter(LlmOpsHttpExporter())
trace_manager.add_span_exporter(live_tracking_exporter)

if trace_file:
trace_manager.add_span_exporter(
Expand All @@ -247,11 +262,16 @@ async def execute_eval():
trace_manager,
eval_context,
event_bus,
live_tracking_exporter,
)
else:
# Fall back to execution without overwrites
ctx.result = await evaluate(
runtime_factory, trace_manager, eval_context, event_bus
runtime_factory,
trace_manager,
eval_context,
event_bus,
live_tracking_exporter,
)
finally:
if runtime_factory:
Expand Down
Loading
Loading