Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions src/uipath/platform/common/interrupt_models.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,19 @@ class WaitDeepRag(BaseModel):
index_folder_key: str | None = None


class CreateEphemeralIndex(BaseModel):
"""Model representing a Jit Index task creation."""

usage: str
attachments: list[str]


class WaitEphemeralIndex(BaseModel):
"""Model representing a wait Jit Index task."""

id: str


class CreateBatchTransform(BaseModel):
"""Model representing a Batch Transform task creation."""

Expand Down
2 changes: 2 additions & 0 deletions src/uipath/platform/context_grounding/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
DeepRagCreationResponse,
DeepRagResponse,
DeepRagStatus,
IndexStatus,
)
from .context_grounding_index import ContextGroundingIndex
from .context_grounding_payloads import (
Expand Down Expand Up @@ -52,6 +53,7 @@
"DeepRagCreationResponse",
"DeepRagResponse",
"DeepRagStatus",
"IndexStatus",
"DropboxDataSource",
"DropboxSourceConfig",
"GoogleDriveDataSource",
Expand Down
102 changes: 62 additions & 40 deletions src/uipath/platform/context_grounding/_context_grounding_service.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
import uuid
from pathlib import Path
from typing import Annotated, Any, Dict, List, Optional, Tuple, Union

import httpx
from pydantic import Field, TypeAdapter
from typing_extensions import deprecated

from ..._utils import Endpoint, RequestSpec, header_folder, resource_override
from ..._utils._ssl_context import get_httpx_client_kwargs
Expand Down Expand Up @@ -33,10 +33,12 @@
)
from .context_grounding_index import ContextGroundingIndex
from .context_grounding_payloads import (
AttachmentsDataSource,
BucketDataSource,
BucketSourceConfig,
ConfluenceDataSource,
ConfluenceSourceConfig,
CreateEphemeralIndexPayload,
CreateIndexPayload,
DropboxDataSource,
DropboxSourceConfig,
Expand Down Expand Up @@ -276,31 +278,19 @@ async def retrieve_async(
raise Exception("ContextGroundingIndex not found") from e

@traced(name="contextgrounding_retrieve_by_id", run_type="uipath")
@deprecated("Use retrieve instead")
def retrieve_by_id(
self,
id: str,
folder_key: Optional[str] = None,
folder_path: Optional[str] = None,
) -> Any:
def retrieve_by_id(self, id: str) -> Any:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

shouldn't we keep the folder params?

"""Retrieve context grounding index information by its ID.
This method provides direct access to a context index using its unique
identifier, which can be more efficient than searching by name.
Args:
id (str): The unique identifier of the context index.
folder_key (Optional[str]): The key of the folder where the index resides.
folder_path (Optional[str]): The path of the folder where the index resides.
Returns:
Any: The index information, including its configuration and metadata.
"""
spec = self._retrieve_by_id_spec(
id,
folder_key=folder_key,
folder_path=folder_path,
)
spec = self._retrieve_by_id_spec(id)

return self.request(
spec.method,
Expand All @@ -309,31 +299,19 @@ def retrieve_by_id(
).json()

@traced(name="contextgrounding_retrieve_by_id", run_type="uipath")
@deprecated("Use retrieve_async instead")
async def retrieve_by_id_async(
self,
id: str,
folder_key: Optional[str] = None,
folder_path: Optional[str] = None,
) -> Any:
async def retrieve_by_id_async(self, id: str) -> Any:
"""Retrieve asynchronously context grounding index information by its ID.
This method provides direct access to a context index using its unique
identifier, which can be more efficient than searching by name.
Args:
id (str): The unique identifier of the context index.
folder_key (Optional[str]): The key of the folder where the index resides.
folder_path (Optional[str]): The path of the folder where the index resides.
Returns:
Any: The index information, including its configuration and metadata.
"""
spec = self._retrieve_by_id_spec(
id,
folder_key=folder_key,
folder_path=folder_path,
)
spec = self._retrieve_by_id_spec(id)

response = await self.request_async(
spec.method,
Expand Down Expand Up @@ -398,6 +376,28 @@ def create_index(

return ContextGroundingIndex.model_validate(response.json())

@resource_override(resource_type="index")
@traced(name="contextgrounding_create_ephemeral_index", run_type="uipath")
def create_ephemeral_index(
self,
usage: str,
attachments: list[uuid.UUID],
) -> ContextGroundingIndex:
"""Create a new context ephemeral grounding index."""
spec = self._create_ephemeral_spec(
usage,
attachments,
)

response = self.request(
spec.method,
spec.endpoint,
json=spec.json,
headers=spec.headers,
)

return ContextGroundingIndex.model_validate(response.json())

@resource_override(resource_type="index")
@traced(name="contextgrounding_create_index", run_type="uipath")
async def create_index_async(
Expand Down Expand Up @@ -1197,6 +1197,37 @@ def _create_spec(
},
)

def _create_ephemeral_spec(
self,
usage: str,
attachments: list[uuid.UUID] = None,
) -> RequestSpec:
"""Create request spec for index creation."""
data_source_dict = self._build_ephemeral_data_source(attachments)

payload = CreateEphemeralIndexPayload(
usage=usage,
data_source=data_source_dict,
)

return RequestSpec(
method="POST",
endpoint=Endpoint("/ecs_/v2/indexes/createephemeral"),
json=payload.model_dump(by_alias=True, exclude_none=True),
headers={},
)

def _build_ephemeral_data_source(
self, attachments: list[uuid.UUID]
) -> Dict[str, Any]:
data_source: AttachmentsDataSource
data_source = AttachmentsDataSource(attachments=attachments)
return data_source.model_dump(
by_alias=True,
exclude_none=True,
mode="json",
)

def _build_data_source(self, source: SourceConfig) -> Dict[str, Any]:
"""Build data source configuration from typed source config.
Expand Down Expand Up @@ -1265,20 +1296,11 @@ def _build_data_source(self, source: SourceConfig) -> Dict[str, Any]:

return data_source.model_dump(by_alias=True, exclude_none=True)

def _retrieve_by_id_spec(
self,
id: str,
folder_key: Optional[str] = None,
folder_path: Optional[str] = None,
) -> RequestSpec:
folder_key = self._resolve_folder_key(folder_key, folder_path)

def _retrieve_by_id_spec(self, id: str) -> RequestSpec:
return RequestSpec(
method="GET",
endpoint=Endpoint(f"/ecs_/v2/indexes/{id}"),
headers={
**header_folder(folder_key, None),
},
headers={},
)

def _delete_by_id_spec(
Expand Down
9 changes: 9 additions & 0 deletions src/uipath/platform/context_grounding/context_grounding.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,15 @@ class DeepRagStatus(str, Enum):
FAILED = "Failed"


class IndexStatus(str, Enum):
"""Enum representing possible index tasks status."""

QUEUED = "Queued"
IN_PROGRESS = "InProgress"
SUCCESSFUL = "Successful"
FAILED = "Failed"


class Citation(BaseModel):
"""Model representing a deep RAG citation."""

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
"""Payload models for context grounding index creation and configuration."""

import re
import uuid
from typing import Any, Dict, Literal, Optional, Union

from pydantic import BaseModel, ConfigDict, Field, model_validator
Expand Down Expand Up @@ -82,6 +83,10 @@ class ConfluenceDataSource(DataSourceBase):
space_id: str = Field(alias="spaceId", description="Space ID")


class AttachmentsDataSource(BaseModel):
attachments: list[uuid.UUID] = Field(description="List of attachment ids")


class Indexer(BaseModel):
"""Configuration for periodic indexing of data sources."""

Expand Down Expand Up @@ -136,6 +141,17 @@ class CreateIndexPayload(BaseModel):
model_config = ConfigDict(populate_by_name=True)


class CreateEphemeralIndexPayload(BaseModel):
""" """

usage: str = Field(description="Index usage")
data_source: Dict[str, Any] = Field(
alias="dataSource", description="Data source configuration"
)

model_config = ConfigDict(populate_by_name=True)


# user-facing source configuration models
class BaseSourceConfig(BaseModel):
"""Base configuration for all source types."""
Expand Down
59 changes: 58 additions & 1 deletion src/uipath/platform/resume_triggers/_protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,14 @@
WaitJob,
WaitTask,
)
from uipath.platform.context_grounding import DeepRagStatus
from uipath.platform.common.interrupt_models import (
CreateEphemeralIndex,
WaitEphemeralIndex,
)
from uipath.platform.context_grounding import DeepRagStatus, IndexStatus
from uipath.platform.context_grounding.context_grounding_index import (
ContextGroundingIndex,
)
from uipath.platform.errors import (
BatchTransformNotCompleteException,
ExtractionNotCompleteException,
Expand Down Expand Up @@ -226,6 +233,33 @@ async def read_trigger(self, trigger: UiPathResumeTrigger) -> Any | None:

return trigger_response

case UiPathResumeTriggerType.EPHEMERAL_INDEX:
if trigger.item_key:
index = uipath.context_grounding.retrieve_by_id(trigger.item_key)

ephemeral_index = ContextGroundingIndex(**index)

ephemeral_index_status = ephemeral_index.last_ingestion_status

if ephemeral_index_status in (
IndexStatus.QUEUED,
IndexStatus.IN_PROGRESS,
):
raise UiPathPendingTriggerError(
ErrorCategory.SYSTEM,
f"Index ingestion is not finished yet. Current status: {ephemeral_index_status}",
)

if ephemeral_index_status != IndexStatus.SUCCESSFUL:
raise UiPathFaultedTriggerError(
ErrorCategory.USER,
f"Index ingestion '{ephemeral_index.name}' did not finish successfully.",
)

trigger_response = ephemeral_index

return trigger_response

case UiPathResumeTriggerType.BATCH_RAG:
if trigger.item_key:
destination_path = self._extract_field(
Expand Down Expand Up @@ -354,6 +388,10 @@ async def create_trigger(self, suspend_value: Any) -> UiPathResumeTrigger:
await self._handle_deep_rag_job_trigger(
suspend_value, resume_trigger, uipath
)
case UiPathResumeTriggerType.EPHEMERAL_INDEX:
await self._handle_ephemeral_index_job_trigger(
suspend_value, resume_trigger, uipath
)
case UiPathResumeTriggerType.BATCH_RAG:
await self._handle_batch_rag_job_trigger(
suspend_value, resume_trigger, uipath
Expand Down Expand Up @@ -391,6 +429,8 @@ def _determine_trigger_type(self, value: Any) -> UiPathResumeTriggerType:
return UiPathResumeTriggerType.JOB
if isinstance(value, (CreateDeepRag, WaitDeepRag)):
return UiPathResumeTriggerType.DEEP_RAG
if isinstance(value, (CreateEphemeralIndex, WaitEphemeralIndex)):
return UiPathResumeTriggerType.EPHEMERAL_INDEX
if isinstance(value, (CreateBatchTransform, WaitBatchTransform)):
return UiPathResumeTriggerType.BATCH_RAG
if isinstance(value, (DocumentExtraction, WaitDocumentExtraction)):
Expand All @@ -415,6 +455,8 @@ def _determine_trigger_name(self, value: Any) -> UiPathResumeTriggerName:
return UiPathResumeTriggerName.JOB
if isinstance(value, (CreateDeepRag, WaitDeepRag)):
return UiPathResumeTriggerName.DEEP_RAG
if isinstance(value, (CreateEphemeralIndex, WaitEphemeralIndex)):
return UiPathResumeTriggerName.EPHEMERAL_INDEX
if isinstance(value, (CreateBatchTransform, WaitBatchTransform)):
return UiPathResumeTriggerName.BATCH_RAG
if isinstance(value, (DocumentExtraction, WaitDocumentExtraction)):
Expand Down Expand Up @@ -479,6 +521,21 @@ async def _handle_deep_rag_job_trigger(
raise Exception("Failed to start deep rag")
resume_trigger.item_key = deep_rag.id

async def _handle_ephemeral_index_job_trigger(
self, value: Any, resume_trigger: UiPathResumeTrigger, uipath: UiPath
) -> None:
"""Handle ephemeral index"""
if isinstance(value, WaitEphemeralIndex):
resume_trigger.item_key = value.ephemeral_index.id
elif isinstance(value, CreateEphemeralIndex):
ephemeral_index = uipath.context_grounding.create_ephemeral_index(
usage=value.usage,
attachments=value.attachments,
)
if not ephemeral_index:
raise Exception("Failed to start ephemeral index")
resume_trigger.item_key = ephemeral_index.id

async def _handle_batch_rag_job_trigger(
self, value: Any, resume_trigger: UiPathResumeTrigger, uipath: UiPath
) -> None:
Expand Down
Loading