119 changes: 118 additions & 1 deletion src/uipath/_cli/_evals/_runtime.py
@@ -192,6 +192,8 @@ class UiPathEvalContext:
enable_mocker_cache: bool = False
report_coverage: bool = False
model_settings_id: str = "default"
resume: bool = False
job_id: str | None = None


class UiPathEvalRuntime:
@@ -218,7 +220,8 @@ def __init__(
self.trace_manager.tracer_provider.add_span_processor(span_processor)

self.logs_exporter: ExecutionLogsExporter = ExecutionLogsExporter()
self.execution_id = str(uuid.uuid4())
# Use job_id if available (for single runtime runs), otherwise generate UUID
self.execution_id = context.job_id or str(uuid.uuid4())
self.coverage = coverage.Coverage(branch=True)

async def __aenter__(self) -> "UiPathEvalRuntime":
@@ -296,6 +299,17 @@ async def initiate_evaluation(
)

async def execute(self) -> UiPathRuntimeResult:
logger.info("=" * 80)
logger.info("EVAL RUNTIME: Starting evaluation execution")
logger.info(f"EVAL RUNTIME: Execution ID: {self.execution_id}")
logger.info(f"EVAL RUNTIME: Job ID: {self.context.job_id}")
logger.info(f"EVAL RUNTIME: Resume mode: {self.context.resume}")
if self.context.resume:
logger.info(
"🟢 EVAL RUNTIME: RESUME MODE ENABLED - Will resume from suspended state"
)
logger.info("=" * 80)

# Configure model settings override before creating runtime
await self._configure_model_settings_override()

@@ -381,9 +395,41 @@ async def execute(self) -> UiPathRuntimeResult:
wait_for_completion=False,
)

# Collect triggers from all evaluation runs (pass-through from inner runtime)
logger.info("=" * 80)
logger.info(
"EVAL RUNTIME: Collecting triggers from all evaluation runs"
)
all_triggers = []
for eval_run_result in results.evaluation_set_results:
if (
eval_run_result.agent_execution_output
and eval_run_result.agent_execution_output.result
):
runtime_result = (
eval_run_result.agent_execution_output.result
)
if runtime_result.trigger:
all_triggers.append(runtime_result.trigger)
if runtime_result.triggers:
all_triggers.extend(runtime_result.triggers)

if all_triggers:
logger.info(
f"EVAL RUNTIME: ✅ Passing through {len(all_triggers)} trigger(s) to top-level result"
)
for i, trigger in enumerate(all_triggers, 1):
logger.info(
f"EVAL RUNTIME: Pass-through trigger {i}: {trigger.model_dump(by_alias=True)}"
)
else:
logger.info("EVAL RUNTIME: No triggers to pass through")
logger.info("=" * 80)

result = UiPathRuntimeResult(
output={**results.model_dump(by_alias=True)},
status=UiPathRuntimeStatus.SUCCESSFUL,
Reviewer comment (Member): if the runtime is suspended, you need to pass the result as suspended to the serverless executor (see the sketch after this hunk).
triggers=all_triggers if all_triggers else None,
)
return result
except Exception as e:
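
A minimal sketch of the change the reviewer is asking for above, assuming (not confirmed in this diff) that the presence of collected pass-through triggers means the top-level evaluation run is itself suspended:

```python
# Sketch only: propagate SUSPENDED to the top-level result when any inner
# run produced resume triggers, so the serverless executor can persist them
# and resume later. Names (UiPathRuntimeResult, UiPathRuntimeStatus,
# all_triggers, results) are taken from the surrounding diff.
result = UiPathRuntimeResult(
    output={**results.model_dump(by_alias=True)},
    status=UiPathRuntimeStatus.SUSPENDED
    if all_triggers
    else UiPathRuntimeStatus.SUCCESSFUL,
    triggers=all_triggers if all_triggers else None,
)
return result
```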
@@ -448,6 +494,14 @@ async def _execute_eval(
agent_execution_output = await self.execute_runtime(
eval_item, execution_id, runtime
)

logger.info(
f"DEBUG: Agent execution result status: {agent_execution_output.result.status}"
)
logger.info(
f"DEBUG: Agent execution result trigger: {agent_execution_output.result.trigger}"
)

except Exception as e:
if self.context.verbose:
if isinstance(e, EvaluationRuntimeException):
@@ -483,6 +537,69 @@ async def _execute_eval(
)
raise

# Check if execution was suspended (e.g., waiting for RPA job completion)
if (
agent_execution_output.result.status
== UiPathRuntimeStatus.SUSPENDED
):
# For suspended executions, we don't run evaluators yet
# The serverless executor should save the triggers and resume later
logger.info("=" * 80)
logger.info(
f"🔴 EVAL RUNTIME: DETECTED SUSPENSION for eval '{eval_item.name}' (id: {eval_item.id})"
)
logger.info("EVAL RUNTIME: Agent returned SUSPENDED status")

# Extract triggers from result
triggers = []
if agent_execution_output.result.trigger:
triggers.append(agent_execution_output.result.trigger)
if agent_execution_output.result.triggers:
triggers.extend(agent_execution_output.result.triggers)

logger.info(
f"EVAL RUNTIME: Extracted {len(triggers)} trigger(s) from suspended execution"
)
for i, trigger in enumerate(triggers, 1):
logger.info(
f"EVAL RUNTIME: Trigger {i}: {trigger.model_dump(by_alias=True)}"
)
logger.info("=" * 80)

# IMPORTANT: Always include execution output with triggers when suspended
# This ensures triggers are visible in the output JSON for serverless executor
evaluation_run_results.agent_execution_output = (
convert_eval_execution_output_to_serializable(
agent_execution_output
)
)

# Publish suspended status event
await self.event_bus.publish(
EvaluationEvents.UPDATE_EVAL_RUN,
EvalRunUpdatedEvent(
execution_id=execution_id,
eval_item=eval_item,
eval_results=[],
success=True, # Not failed, just suspended
agent_output={
"status": "suspended",
"triggers": [
t.model_dump(by_alias=True) for t in triggers
],
},
agent_execution_time=agent_execution_output.execution_time,
spans=agent_execution_output.spans,
logs=agent_execution_output.logs,
exception_details=None,
),
wait_for_completion=False,
)

# Return partial results with trigger information
# The evaluation will be completed when resumed
return evaluation_run_results

if self.context.verbose:
evaluation_run_results.agent_execution_output = (
convert_eval_execution_output_to_serializable(
12 changes: 12 additions & 0 deletions src/uipath/_cli/cli_eval.py
@@ -106,6 +106,12 @@ def setup_reporting_prereq(no_report: bool) -> bool:
type=click.Path(exists=False),
help="File path where traces will be written in JSONL format",
)
@click.option(
"--resume",
is_flag=True,
default=False,
help="Resume execution from a previous suspended state",
)
def eval(
entrypoint: str | None,
eval_set: str | None,
@@ -118,6 +124,7 @@ def eval(
report_coverage: bool,
model_settings_id: str,
trace_file: str | None,
resume: bool,
) -> None:
"""Run an evaluation set against the agent.

@@ -131,6 +138,7 @@
enable_mocker_cache: Enable caching for LLM mocker responses
report_coverage: Report evaluation coverage
model_settings_id: Model settings ID to override agent settings
resume: Resume execution from a previous suspended state
"""
should_register_progress_reporter = setup_reporting_prereq(no_report)

@@ -166,6 +174,7 @@ def eval(
eval_context.eval_ids = eval_ids
eval_context.report_coverage = report_coverage
eval_context.model_settings_id = model_settings_id
eval_context.resume = resume

try:

@@ -189,6 +198,9 @@ async def execute_eval():
trace_manager=trace_manager,
command="eval",
) as ctx:
# Set job_id in eval context for single runtime runs
eval_context.job_id = ctx.job_id

if ctx.job_id:
trace_manager.add_span_exporter(LlmOpsHttpExporter())

87 changes: 87 additions & 0 deletions src/uipath/functions/runtime.py
@@ -22,6 +22,11 @@
UiPathErrorContract,
Reviewer comment (Member): you need to undo everything in this file (this is used for arbitrary python code, no langgraph deps).

UiPathRuntimeError,
)
from uipath.runtime.resumable.trigger import (
UiPathResumeTrigger,
UiPathResumeTriggerName,
UiPathResumeTriggerType,
)
from uipath.runtime.schema import UiPathRuntimeSchema

from .schema_gen import get_type_schema
@@ -124,6 +129,71 @@ async def _execute_function(

return convert_from_class(result) if result is not None else {}

def _detect_langgraph_interrupt(
self, output: dict[str, Any]
) -> UiPathResumeTrigger | None:
"""Detect LangGraph __interrupt__ field and extract InvokeProcess trigger.

LangGraph's interrupt() creates an __interrupt__ field in the output dict:
{
"query": "...",
"final_result": "",
"__interrupt__": [Interrupt(value=InvokeProcess(...), id="...")]
}

We extract the InvokeProcess from the interrupt and convert it to a UiPath trigger.
"""
try:
if not isinstance(output, dict):
return None

# Check for LangGraph's __interrupt__ field
if "__interrupt__" not in output:
return None

interrupts = output["__interrupt__"]
if not interrupts or not isinstance(interrupts, list):
logger.warning("__interrupt__ field exists but is not a list")
return None

# Extract first interrupt
interrupt_obj = interrupts[0]
if not hasattr(interrupt_obj, "value"):
logger.warning("Interrupt object missing 'value' attribute")
return None

invoke_process = interrupt_obj.value

# Check if it's an InvokeProcess object (has name and input_arguments)
if not (
hasattr(invoke_process, "name")
and hasattr(invoke_process, "input_arguments")
):
logger.warning(
f"Interrupt value is not InvokeProcess (type: {type(invoke_process)})"
)
return None

logger.info(
f"Detected LangGraph interrupt - suspending execution for process: {invoke_process.name}"
)

# Convert InvokeProcess to UiPath trigger
return UiPathResumeTrigger(
trigger_type=UiPathResumeTriggerType.JOB,
trigger_name=UiPathResumeTriggerName.JOB,
item_key=f"job-{uuid.uuid4()}", # Generate unique job key
folder_path=getattr(invoke_process, "process_folder_path", "Shared"),
payload={
"process_name": invoke_process.name,
"input_arguments": invoke_process.input_arguments or {},
"folder_key": getattr(invoke_process, "process_folder_key", None),
},
)
except Exception as e:
logger.warning(f"Failed to detect LangGraph interrupt: {e}")
return None

async def execute(
self,
input: dict[str, Any] | None = None,
@@ -134,6 +204,23 @@
func = self._load_function()
output = await self._execute_function(func, input or {})

logger.info(
f"Output type: {type(output)}, has __interrupt__: {'__interrupt__' in output if isinstance(output, dict) else False}"
)

# Check if output represents a LangGraph interrupt (suspend)
trigger = self._detect_langgraph_interrupt(output)
logger.info(f"Trigger detected: {trigger}")
if trigger:
logger.info(
f"Detected LangGraph interrupt - suspending execution with trigger: {trigger.item_key}"
)
return UiPathRuntimeResult(
output=None, # No final output yet (suspended)
status=UiPathRuntimeStatus.SUSPENDED,
trigger=trigger,
)

return UiPathRuntimeResult(
output=output,
status=UiPathRuntimeStatus.SUCCESSFUL,
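
For readers tracing the interrupt-detection path added above, here is a rough illustration of the output shape it expects. The `FakeInterrupt` and `FakeInvokeProcess` classes are hypothetical stand-ins for the real LangGraph `Interrupt` and the `InvokeProcess` payload; only the attributes the detector actually reads are modeled.

```python
from dataclasses import dataclass
from typing import Any


@dataclass
class FakeInvokeProcess:
    # Stand-in for the InvokeProcess payload: only the attributes read by
    # _detect_langgraph_interrupt (name, input_arguments, optional folder fields).
    name: str
    input_arguments: dict[str, Any]
    process_folder_path: str = "Shared"
    process_folder_key: str | None = None


@dataclass
class FakeInterrupt:
    # Stand-in for LangGraph's Interrupt object: the detector only touches .value.
    value: FakeInvokeProcess


# Shape of the output dict produced by a LangGraph graph that called interrupt():
output = {
    "query": "approve invoice 42?",
    "final_result": "",
    "__interrupt__": [
        FakeInterrupt(FakeInvokeProcess("ProcessInvoice", {"invoiceId": 42}))
    ],
}

# With `runtime` being an instance of the class that defines
# _detect_langgraph_interrupt, the call below would return a UiPathResumeTrigger
# whose payload carries the process name and input arguments, and execute()
# would then wrap it in a SUSPENDED UiPathRuntimeResult instead of a SUCCESSFUL one.
# trigger = runtime._detect_langgraph_interrupt(output)
```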