Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[project]
name = "uipath-runtime"
version = "0.10.0"
version = "0.10.1"
description = "Runtime abstractions and interfaces for building agents and automation scripts in the UiPath ecosystem"
readme = { file = "README.md", content-type = "text/markdown" }
requires-python = ">=3.11"
Expand Down
87 changes: 61 additions & 26 deletions src/uipath/runtime/logging/_interceptor.py
Original file line number Diff line number Diff line change
Expand Up @@ -148,9 +148,16 @@ def setup(self) -> None:
# logger.propagate remains True (default)
self.patched_loggers.add(logger_name)

# Child executions should redirect stdout/stderr to their own handler
# This ensures print statements are captured per execution
self._redirect_stdout_stderr()
# Register our handler on stdout/stderr loggers so that
# print() output routed through the master's LoggerWriter
# is captured per-execution via filters.
# We do NOT replace sys.stdout/sys.stderr — the master owns those.
if not isinstance(sys.stdout, LoggerWriter):
self.logger.warning(
"Child interceptor set up without a master LoggerWriter on sys.stdout. "
"print() output will not be captured for this execution context."
)
self._register_stdout_stderr_handlers()
else:
# Master execution mode: remove all handlers and add only ours
self._clean_all_handlers(self.root_logger)
Expand All @@ -165,28 +172,33 @@ def setup(self) -> None:
# Master redirects stdout/stderr
self._redirect_stdout_stderr()

def _register_stdout_stderr_handlers(self) -> None:
"""Register our handler on stdout/stderr loggers without replacing the streams."""
stdout_logger = logging.getLogger("stdout")
stderr_logger = logging.getLogger("stderr")

stdout_logger.propagate = False
stderr_logger.propagate = False

if self.log_handler not in stdout_logger.handlers:
stdout_logger.addHandler(self.log_handler)
if self.log_handler not in stderr_logger.handlers:
stderr_logger.addHandler(self.log_handler)

def _redirect_stdout_stderr(self) -> None:
"""Redirect stdout and stderr to the logging system."""
# Set up stdout and stderr loggers
"""Redirect stdout and stderr to the logging system.

Only called by master execution mode. Replaces sys.stdout/sys.stderr
with LoggerWriter instances that route output through the logging system.
"""
stdout_logger = logging.getLogger("stdout")
stderr_logger = logging.getLogger("stderr")

if self.execution_id:
# Child execution: add our handler to stdout/stderr loggers
stdout_logger.propagate = False
stderr_logger.propagate = False

if self.log_handler not in stdout_logger.handlers:
stdout_logger.addHandler(self.log_handler)
if self.log_handler not in stderr_logger.handlers:
stderr_logger.addHandler(self.log_handler)
else:
# Master execution: clean and set up handlers
stdout_logger.propagate = False
stderr_logger.propagate = False
stdout_logger.propagate = False
stderr_logger.propagate = False

self._clean_all_handlers(stdout_logger)
self._clean_all_handlers(stderr_logger)
self._clean_all_handlers(stdout_logger)
self._clean_all_handlers(stderr_logger)

# Use the min_level in the LoggerWriter to filter messages
sys.stdout = LoggerWriter(
Expand All @@ -197,16 +209,37 @@ def _redirect_stdout_stderr(self) -> None:
)

def teardown(self) -> None:
"""Restore original logging configuration."""
# Clear the context variable
"""Restore original logging configuration.

IMPORTANT: The ordering below is critical. Flushing must happen before
clearing the context variable and before removing handlers. Otherwise:
- If context is cleared first, the execution filter won't match the
flushed records and they'll be dropped.
- If handlers are removed first, the flushed records have no destination.
"""
# Step 1: Flush LoggerWriter buffers while context and handlers are still active.
# Child mode: flush only this context's buffer from the shared LoggerWriter.
# Master mode: flush ALL remaining buffers before restoring streams.
if self.execution_id:
if isinstance(sys.stdout, LoggerWriter):
sys.stdout.flush()
if isinstance(sys.stderr, LoggerWriter):
sys.stderr.flush()
else:
if isinstance(sys.stdout, LoggerWriter):
sys.stdout.flush_all()
if isinstance(sys.stderr, LoggerWriter):
sys.stderr.flush_all()

# Step 2: Clear the context variable (after flush used it)
if self.execution_id:
current_execution_id.set(None)

# Restore the original disable level
# Step 3: Restore the original disable level
if not self.execution_id:
logging.disable(self.original_disable_level)

# Remove our handler and filter
# Step 4: Remove our handler and filter
if self.execution_filter:
self.log_handler.removeFilter(self.execution_filter)

Expand Down Expand Up @@ -240,8 +273,10 @@ def teardown(self) -> None:
if self._owns_handler:
self.log_handler.close()

# Only restore streams if we redirected them
if self.original_stdout and self.original_stderr:
# Step 5: Only master restores streams. Children never replaced
# sys.stdout/sys.stderr (they only registered handlers on the loggers),
# so there is nothing for them to restore here.
if not self.execution_id and self.original_stdout and self.original_stderr:
sys.stdout = self.original_stdout
sys.stderr = self.original_stderr

Expand Down
52 changes: 42 additions & 10 deletions src/uipath/runtime/logging/_writers.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,15 @@
import logging
from typing import TextIO

from uipath.runtime.logging._context import current_execution_id


class LoggerWriter:
"""Redirect stdout/stderr to logging system."""
"""Redirect stdout/stderr to logging system.

Maintains per-execution-context buffers so that concurrent async tasks
(e.g. parallel eval runs) do not interleave partial lines.
"""

def __init__(
self,
Expand All @@ -18,7 +24,10 @@ def __init__(
self.logger = logger
self.level = level
self.min_level = min_level
self.buffer = ""
# Keyed by current_execution_id (None for master context).
# A single shared buffer would interleave partial lines from
# concurrent async tasks writing to the same sys.stdout.
self._buffers: dict[str | None, str] = {}
self.sys_file = sys_file
self._in_logging = False # Recursion guard

Expand All @@ -35,17 +44,22 @@ def write(self, message: str) -> None:

try:
self._in_logging = True
self.buffer += message
while "\n" in self.buffer:
line, self.buffer = self.buffer.split("\n", 1)
ctx = current_execution_id.get()
buf = self._buffers.get(ctx, "") + message
while "\n" in buf:
line, buf = buf.split("\n", 1)
# Only log if the message is not empty and the level is sufficient
if line and self.level >= self.min_level:
self.logger._log(self.level, line, ())
if buf:
self._buffers[ctx] = buf
else:
self._buffers.pop(ctx, None)
finally:
self._in_logging = False

def flush(self) -> None:
"""Flush any remaining buffered messages to the logger."""
"""Flush the current execution context's buffered messages to the logger."""
if self._in_logging:
if self.sys_file:
try:
Expand All @@ -56,10 +70,28 @@ def flush(self) -> None:

try:
self._in_logging = True
# Log any remaining content in the buffer on flush
if self.buffer and self.level >= self.min_level:
self.logger._log(self.level, self.buffer, ())
self.buffer = ""
ctx = current_execution_id.get()
buf = self._buffers.pop(ctx, "")
if buf and self.level >= self.min_level:
self.logger._log(self.level, buf, ())
finally:
self._in_logging = False

def flush_all(self) -> None:
    """Flush every execution context's buffered messages. Called by master teardown.

    Deliberately ignores current_execution_id and walks all buffers
    directly, so no context's partial lines are dropped when the master
    restores the real streams.
    """
    # Recursion guard: a flush triggered from inside our own logging
    # path must not re-enter and mutate the buffers mid-emit.
    if self._in_logging:
        return

    self._in_logging = True
    try:
        # Level check is loop-invariant; evaluate it once.
        should_emit = self.level >= self.min_level
        for pending in self._buffers.values():
            if pending and should_emit:
                self.logger._log(self.level, pending, ())
        self._buffers.clear()
    finally:
        self._in_logging = False

Expand Down
30 changes: 29 additions & 1 deletion tests/test_executor.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
"""Simple test for runtime factory and executor span capture."""

import logging
import sys
from typing import Any, AsyncGenerator, TypeVar

import pytest
Expand All @@ -13,6 +15,7 @@
UiPathRuntimeProtocol,
)
from uipath.runtime.base import UiPathStreamOptions
from uipath.runtime.logging._interceptor import UiPathRuntimeLogsInterceptor
from uipath.runtime.result import UiPathRuntimeResult, UiPathRuntimeStatus
from uipath.runtime.schema import UiPathRuntimeSchema

Expand Down Expand Up @@ -119,9 +122,32 @@ async def new_runtime(
return self.runtime_class()


@pytest.fixture(autouse=True)
def _isolate_logging():
    """Snapshot global logging/stream state and restore it after each test."""
    root_logger = logging.getLogger()
    saved_level = root_logger.level
    saved_handlers = root_logger.handlers[:]
    saved_streams = (sys.stdout, sys.stderr)
    yield
    root_logger.setLevel(saved_level)
    root_logger.handlers = saved_handlers
    sys.stdout, sys.stderr = saved_streams
    # Undo any logging.disable() a test (or interceptor teardown) left behind.
    logging.disable(logging.NOTSET)


@pytest.mark.asyncio
async def test_multiple_factories_same_executor():
async def test_multiple_factories_same_executor(tmp_path):
"""Test factories using same trace manager, verify spans are captured correctly."""
# Set up a master interceptor so that sys.stdout is a LoggerWriter,
# matching real production usage where UiPathRuntimeContext provides one.
master = UiPathRuntimeLogsInterceptor(
job_id="test-job", dir=str(tmp_path), file="test.log"
)
master.setup()

trace_manager = UiPathTraceManager()

# Create factories for different runtimes
Expand Down Expand Up @@ -228,3 +254,5 @@ async def test_multiple_factories_same_executor():
assert execution_runtime_c.log_handler
assert len(execution_runtime_c.log_handler.buffer) > 0
assert execution_runtime_c.log_handler.buffer[0].msg == "executing {'input': 'c'}"

master.teardown()
Loading
Loading