Skip to content
This repository was archived by the owner on Mar 4, 2026. It is now read-only.

Commit 429836e

Browse files
committed
feat: event bus, triggers
1 parent fd9e827 commit 429836e

8 files changed

Lines changed: 289 additions & 5 deletions

File tree

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[project]
22
name = "uipath-core"
3-
version = "0.5.1"
3+
version = "0.5.2"
44
description = "UiPath Core abstractions"
55
readme = { file = "README.md", content-type = "text/markdown" }
66
requires-python = ">=3.11"

src/uipath/core/events/__init__.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
"""This module contains the event bus implementation."""
2+
3+
from uipath.core.events._event_bus import EventBus
4+
5+
__all__ = ["EventBus"]
Lines changed: 157 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,157 @@
1+
"""Event bus implementation for runtime events."""
2+
3+
import asyncio
4+
import logging
5+
from typing import Any, Callable, TypeVar
6+
7+
logger = logging.getLogger(__name__)
8+
9+
T = TypeVar("T")
10+
11+
12+
class EventBus:
13+
"""Event bus for publishing and subscribing to events."""
14+
15+
def __init__(self) -> None:
16+
"""Initialize a new EventBus instance."""
17+
self._subscribers: dict[str, list[Callable[[Any], Any]]] = {}
18+
self._running_tasks: set[asyncio.Task[Any]] = set()
19+
20+
def subscribe(self, topic: str, handler: Callable[[Any], Any]) -> None:
21+
"""Subscribe a handler method/function to a topic.
22+
23+
Args:
24+
topic: The topic name to subscribe to.
25+
handler: The async handler method/function that will handle events for this topic.
26+
"""
27+
if topic not in self._subscribers:
28+
self._subscribers[topic] = []
29+
self._subscribers[topic].append(handler)
30+
logger.debug(f"Handler registered for topic: {topic}")
31+
32+
def unsubscribe(self, topic: str, handler: Callable[[Any], Any]) -> None:
33+
"""Unsubscribe a handler from a topic.
34+
35+
Args:
36+
topic: The topic name to unsubscribe from.
37+
handler: The handler to remove.
38+
"""
39+
if topic in self._subscribers:
40+
try:
41+
self._subscribers[topic].remove(handler)
42+
if not self._subscribers[topic]:
43+
del self._subscribers[topic]
44+
logger.debug(f"Handler unregistered from topic: {topic}")
45+
except ValueError:
46+
logger.warning(f"Handler not found for topic: {topic}")
47+
48+
def _cleanup_completed_tasks(self) -> None:
49+
completed_tasks = {task for task in self._running_tasks if task.done()}
50+
self._running_tasks -= completed_tasks
51+
52+
async def publish(
53+
self, topic: str, payload: T, wait_for_completion: bool = True
54+
) -> None:
55+
"""Publish an event to all handlers of a topic.
56+
57+
Args:
58+
topic: The topic name to publish to.
59+
payload: The event payload to publish.
60+
wait_for_completion: Whether to wait for the event to be processed.
61+
"""
62+
if topic not in self._subscribers:
63+
logger.debug(f"No handlers for topic: {topic}")
64+
return
65+
66+
self._cleanup_completed_tasks()
67+
68+
tasks = []
69+
for subscriber in self._subscribers[topic]:
70+
try:
71+
task = asyncio.create_task(subscriber(payload))
72+
tasks.append(task)
73+
self._running_tasks.add(task)
74+
except Exception as e:
75+
logger.error(f"Error creating task for subscriber {subscriber}: {e}")
76+
77+
if tasks and wait_for_completion:
78+
try:
79+
await asyncio.gather(*tasks, return_exceptions=True)
80+
except Exception as e:
81+
logger.error(f"Error during event processing for topic {topic}: {e}")
82+
finally:
83+
# Clean up the tasks we just waited for
84+
for task in tasks:
85+
self._running_tasks.discard(task)
86+
87+
def get_running_tasks_count(self) -> int:
88+
"""Get the number of currently running subscriber tasks.
89+
90+
Returns:
91+
Number of running tasks.
92+
"""
93+
self._cleanup_completed_tasks()
94+
return len(self._running_tasks)
95+
96+
async def wait_for_all(self, timeout: float | None = None) -> None:
97+
"""Wait for all currently running subscriber tasks to complete.
98+
99+
Args:
100+
timeout: Maximum time to wait in seconds. If None, waits indefinitely.
101+
"""
102+
self._cleanup_completed_tasks()
103+
104+
if not self._running_tasks:
105+
logger.debug("No running tasks to wait for")
106+
return
107+
108+
logger.debug(
109+
f"Waiting for {len(self._running_tasks)} EventBus tasks to complete..."
110+
)
111+
112+
try:
113+
tasks_to_wait = list(self._running_tasks)
114+
115+
if timeout:
116+
await asyncio.wait_for(
117+
asyncio.gather(*tasks_to_wait, return_exceptions=True),
118+
timeout=timeout,
119+
)
120+
else:
121+
await asyncio.gather(*tasks_to_wait, return_exceptions=True)
122+
123+
logger.debug("All EventBus tasks completed")
124+
125+
except asyncio.TimeoutError:
126+
logger.warning(f"Timeout waiting for EventBus tasks after {timeout}s")
127+
for task in tasks_to_wait:
128+
if not task.done():
129+
task.cancel()
130+
except Exception as e:
131+
logger.error(f"Error waiting for EventBus tasks: {e}")
132+
finally:
133+
self._cleanup_completed_tasks()
134+
135+
def get_subscribers_count(self, topic: str) -> int:
136+
"""Get the number of subscribers for a topic.
137+
138+
Args:
139+
topic: The topic name.
140+
141+
Returns:
142+
Number of handlers for the topic.
143+
"""
144+
return len(self._subscribers.get(topic, []))
145+
146+
def clear_subscribers(self, topic: str | None = None) -> None:
147+
"""Clear subscribers for a topic or all topics.
148+
149+
Args:
150+
topic: The topic to clear. If None, clears all topics.
151+
"""
152+
if topic is None:
153+
self._subscribers.clear()
154+
logger.debug("All handlers cleared")
155+
elif topic in self._subscribers:
156+
del self._subscribers[topic]
157+
logger.debug(f"Handlers cleared for topic: {topic}")
Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
"""Serialization utilities for converting Python objects to various formats."""
22

3-
from .json import serialize_defaults, serialize_json
3+
from .json import serialize_defaults, serialize_json, serialize_object
44

5-
__all__ = ["serialize_defaults", "serialize_json"]
5+
__all__ = ["serialize_defaults", "serialize_json", "serialize_object"]

src/uipath/core/serialization/json.py

Lines changed: 39 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,9 @@
11
"""JSON serialization utilities for converting Python objects to JSON formats."""
22

33
import json
4+
import uuid
45
from dataclasses import asdict, is_dataclass
5-
from datetime import datetime, timezone
6+
from datetime import date, datetime, time, timezone
67
from enum import Enum
78
from typing import Any, cast
89
from zoneinfo import ZoneInfo
@@ -156,3 +157,40 @@ def serialize_json(obj: Any) -> str:
156157
'{"name": "Review PR", "created": "2024-01-15T10:30:00"}'
157158
"""
158159
return json.dumps(obj, default=serialize_defaults)
160+
161+
162+
def serialize_object(obj):
163+
"""Recursively serializes an object and all its nested components."""
164+
# Handle Pydantic models
165+
if hasattr(obj, "model_dump"):
166+
return serialize_object(obj.model_dump(by_alias=True))
167+
elif hasattr(obj, "dict"):
168+
return serialize_object(obj.dict())
169+
elif hasattr(obj, "to_dict"):
170+
return serialize_object(obj.to_dict())
171+
# Special handling for UiPathBaseRuntimeErrors
172+
elif hasattr(obj, "as_dict"):
173+
return serialize_object(obj.as_dict)
174+
elif isinstance(obj, (datetime, date, time)):
175+
return obj.isoformat()
176+
# Handle dictionaries
177+
elif isinstance(obj, dict):
178+
return {k: serialize_object(v) for k, v in obj.items()}
179+
# Handle lists
180+
elif isinstance(obj, list):
181+
return [serialize_object(item) for item in obj]
182+
# Handle exceptions
183+
elif isinstance(obj, Exception):
184+
return str(obj)
185+
# Handle other iterable objects (convert to dict first)
186+
elif hasattr(obj, "__iter__") and not isinstance(obj, (str, bytes)):
187+
try:
188+
return serialize_object(dict(obj))
189+
except (TypeError, ValueError):
190+
return obj
191+
# UUIDs must be serialized explicitly
192+
elif isinstance(obj, uuid.UUID):
193+
return str(obj)
194+
# Return primitive types as is
195+
else:
196+
return obj
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
"""Module containing UiPath trigger definitions."""
2+
3+
__all__ = [
4+
"UiPathResumeTrigger",
5+
"UiPathResumeTriggerType",
6+
"UiPathApiTrigger",
7+
"UiPathResumeTriggerName",
8+
]
9+
10+
from uipath.core.triggers.trigger import (
11+
UiPathApiTrigger,
12+
UiPathResumeTrigger,
13+
UiPathResumeTriggerName,
14+
UiPathResumeTriggerType,
15+
)
Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
"""Module defining resume trigger types and data models."""
2+
3+
from enum import Enum
4+
from typing import Any
5+
6+
from pydantic import BaseModel, ConfigDict, Field
7+
8+
9+
class UiPathResumeTriggerType(str, Enum):
10+
"""Constants representing different types of resume job triggers in the system."""
11+
12+
NONE = "None"
13+
QUEUE_ITEM = "QueueItem"
14+
JOB = "Job"
15+
TASK = "Task"
16+
TIMER = "Timer"
17+
INBOX = "Inbox"
18+
API = "Api"
19+
DEEP_RAG = "DeepRag"
20+
BATCH_RAG = "BatchRag"
21+
INDEX_INGESTION = "IndexIngestion"
22+
IXP_EXTRACTION = "IxpExtraction"
23+
IXP_VS_ESCALATION = "IxpVsEscalation"
24+
25+
26+
class UiPathResumeTriggerName(str, Enum):
27+
"""Constants representing specific names for resume job triggers in the system."""
28+
29+
UNKNOWN = "Unknown"
30+
QUEUE_ITEM = "QueueItem"
31+
JOB = "Job"
32+
TASK = "Task"
33+
ESCALATION = "Escalation"
34+
TIMER = "Timer"
35+
INBOX = "Inbox"
36+
API = "Api"
37+
DEEP_RAG = "DeepRag"
38+
BATCH_RAG = "BatchRag"
39+
INDEX_INGESTION = "IndexIngestion"
40+
EXTRACTION = "Extraction"
41+
IXP_VS_ESCALATION = "IxpVsEscalation"
42+
43+
44+
class UiPathApiTrigger(BaseModel):
45+
"""API resume trigger request."""
46+
47+
inbox_id: str | None = Field(default=None, alias="inboxId")
48+
request: Any = None
49+
50+
model_config = ConfigDict(validate_by_name=True)
51+
52+
53+
class UiPathResumeTrigger(BaseModel):
54+
"""Information needed to resume execution."""
55+
56+
interrupt_id: str | None = Field(default=None, alias="interruptId")
57+
trigger_type: UiPathResumeTriggerType = Field(
58+
default=UiPathResumeTriggerType.API, alias="triggerType"
59+
)
60+
trigger_name: UiPathResumeTriggerName = Field(
61+
default=UiPathResumeTriggerName.UNKNOWN, alias="triggerName", exclude=True
62+
)
63+
item_key: str | None = Field(default=None, alias="itemKey")
64+
api_resume: UiPathApiTrigger | None = Field(default=None, alias="apiResume")
65+
folder_path: str | None = Field(default=None, alias="folderPath")
66+
folder_key: str | None = Field(default=None, alias="folderKey")
67+
payload: Any | None = Field(default=None, alias="interruptObject", exclude=True)
68+
69+
model_config = ConfigDict(validate_by_name=True)

uv.lock

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)