-
Notifications
You must be signed in to change notification settings - Fork 33
feat: add Data Fabric tool support #726
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,69 @@ | ||
| """Registry for resource types that contribute init-time context. | ||
|
|
||
| Resource modules self-register by calling ``register_init_context_provider`` | ||
| at module level. The INIT node calls ``gather_init_context`` to collect | ||
| additional context from all registered providers, without needing to know | ||
| which resource types participate. | ||
| """ | ||
|
|
||
| import logging | ||
| from typing import Protocol, Sequence | ||
|
|
||
| from uipath.agent.models.agent import BaseAgentResourceConfig | ||
|
|
||
| logger = logging.getLogger(__name__) | ||
|
|
||
|
|
||
| class InitContextProvider(Protocol): | ||
| """Contract for a resource type's init-time context builder.""" | ||
|
|
||
| async def __call__( | ||
| self, | ||
| resources: Sequence[BaseAgentResourceConfig], | ||
| ) -> str | None: ... | ||
|
|
||
|
|
||
| _registry: dict[str, InitContextProvider] = {} | ||
|
|
||
|
|
||
| def register_init_context_provider( | ||
| name: str, | ||
| provider: InitContextProvider, | ||
| ) -> None: | ||
| """Register a provider that contributes init-time context. | ||
|
|
||
| Args: | ||
| name: Identifier for logging and deduplication. | ||
| provider: Async callable matching ``InitContextProvider``. | ||
| """ | ||
| if name in _registry: | ||
| raise ValueError(f"Init context provider '{name}' is already registered") | ||
| _registry[name] = provider | ||
| logger.debug("Registered init context provider: %s", name) | ||
|
|
||
|
|
||
| async def gather_init_context( | ||
| resources: Sequence[BaseAgentResourceConfig], | ||
| ) -> str | None: | ||
| """Call all registered providers and merge their context contributions. | ||
|
|
||
| Args: | ||
| resources: The agent's resource configs. | ||
|
|
||
| Returns: | ||
| Merged context string, or None if no provider contributed. | ||
| """ | ||
| parts: list[str] = [] | ||
| for name, provider in _registry.items(): | ||
| try: | ||
| result = await provider(resources) | ||
| if result: | ||
| parts.append(result) | ||
| logger.info( | ||
| "Init context provider '%s' contributed %d chars", | ||
| name, | ||
| len(result), | ||
| ) | ||
| except Exception: | ||
| logger.exception("Init context provider '%s' failed; skipping", name) | ||
| return "\n\n".join(parts) if parts else None | ||
|
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Make this structured rather than free-form text: expose it as a Pydantic model for the caller to consume. |
||
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| @@ -1,5 +1,6 @@ | ||||||||||||||||||||||||||||||||||||||||||||||
| """State initialization node for the ReAct Agent graph.""" | ||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||
| import logging | ||||||||||||||||||||||||||||||||||||||||||||||
| from typing import Any, Callable, Sequence | ||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||
| from langchain_core.messages import HumanMessage, SystemMessage | ||||||||||||||||||||||||||||||||||||||||||||||
|
|
@@ -10,20 +11,38 @@ | |||||||||||||||||||||||||||||||||||||||||||||
| get_job_attachments, | ||||||||||||||||||||||||||||||||||||||||||||||
| parse_attachments_from_conversation_messages, | ||||||||||||||||||||||||||||||||||||||||||||||
| ) | ||||||||||||||||||||||||||||||||||||||||||||||
| from .types import AgentResources | ||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||
| logger = logging.getLogger(__name__) | ||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||
| def create_init_node( | ||||||||||||||||||||||||||||||||||||||||||||||
| messages: Sequence[SystemMessage | HumanMessage] | ||||||||||||||||||||||||||||||||||||||||||||||
| | Callable[[Any], Sequence[SystemMessage | HumanMessage]], | ||||||||||||||||||||||||||||||||||||||||||||||
| | Callable[..., Sequence[SystemMessage | HumanMessage]], | ||||||||||||||||||||||||||||||||||||||||||||||
| input_schema: type[BaseModel] | None, | ||||||||||||||||||||||||||||||||||||||||||||||
| is_conversational: bool = False, | ||||||||||||||||||||||||||||||||||||||||||||||
| resources_for_init: AgentResources | None = None, | ||||||||||||||||||||||||||||||||||||||||||||||
| ): | ||||||||||||||||||||||||||||||||||||||||||||||
| def graph_state_init(state: Any) -> Any: | ||||||||||||||||||||||||||||||||||||||||||||||
| async def graph_state_init(state: Any) -> Any: | ||||||||||||||||||||||||||||||||||||||||||||||
| # --- Gather init-time context from registered providers --- | ||||||||||||||||||||||||||||||||||||||||||||||
| additional_context: str | None = None | ||||||||||||||||||||||||||||||||||||||||||||||
| if resources_for_init: | ||||||||||||||||||||||||||||||||||||||||||||||
| from .init_context_registry import gather_init_context | ||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||
| additional_context = await gather_init_context(resources_for_init) | ||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||
| # --- Resolve messages --- | ||||||||||||||||||||||||||||||||||||||||||||||
| resolved_messages: Sequence[SystemMessage | HumanMessage] | Overwrite | ||||||||||||||||||||||||||||||||||||||||||||||
| if callable(messages): | ||||||||||||||||||||||||||||||||||||||||||||||
| resolved_messages = list(messages(state)) | ||||||||||||||||||||||||||||||||||||||||||||||
| if additional_context: | ||||||||||||||||||||||||||||||||||||||||||||||
| resolved_messages = list( | ||||||||||||||||||||||||||||||||||||||||||||||
| messages(state, additional_context=additional_context) | ||||||||||||||||||||||||||||||||||||||||||||||
| ) | ||||||||||||||||||||||||||||||||||||||||||||||
| else: | ||||||||||||||||||||||||||||||||||||||||||||||
| resolved_messages = list(messages(state)) | ||||||||||||||||||||||||||||||||||||||||||||||
|
Comment on lines
36
to
+42
|
||||||||||||||||||||||||||||||||||||||||||||||
| else: | ||||||||||||||||||||||||||||||||||||||||||||||
| resolved_messages = list(messages) | ||||||||||||||||||||||||||||||||||||||||||||||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
If Useful? React with 👍 / 👎.
|
||||||||||||||||||||||||||||||||||||||||||||||
| resolved_messages = list(messages) | |
| resolved_messages = list(messages) | |
| # When using a static sequence of messages, inject any init-time context | |
| # into the system prompt so provider output (e.g., Data Fabric schemas) | |
| # is not silently ignored. | |
| if additional_context: | |
| # Try to append the additional context to the first SystemMessage. | |
| system_msg_index = next( | |
| (i for i, m in enumerate(resolved_messages) if isinstance(m, SystemMessage)), | |
| None, | |
| ) | |
| if system_msg_index is not None: | |
| system_msg = resolved_messages[system_msg_index] | |
| # Safely append to existing content, assuming string content. | |
| existing_content = str(system_msg.content) | |
| updated_content = f"{existing_content}\n\n{additional_context}" | |
| resolved_messages[system_msg_index] = SystemMessage( | |
| content=updated_content, additional_kwargs=system_msg.additional_kwargs | |
| ) | |
| else: | |
| # No SystemMessage present; prepend a new one with the additional context. | |
| resolved_messages.insert(0, SystemMessage(content=additional_context)) |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -12,6 +12,7 @@ | |
| from uipath.agent.models.agent import ( | ||
| AgentContextResourceConfig, | ||
| AgentContextRetrievalMode, | ||
| AgentContextType, | ||
| AgentToolArgumentArgumentProperties, | ||
| AgentToolArgumentProperties, | ||
| ) | ||
|
|
@@ -134,16 +135,27 @@ def is_static_query(resource: AgentContextResourceConfig) -> bool: | |
| return resource.settings.query.variant.lower() == "static" | ||
|
|
||
|
|
||
| def create_context_tool(resource: AgentContextResourceConfig) -> StructuredTool: | ||
| tool_name = sanitize_tool_name(resource.name) | ||
| def create_context_tool( | ||
| resource: AgentContextResourceConfig, | ||
| ) -> StructuredTool | BaseTool: | ||
| assert resource.context_type is not None | ||
|
|
||
| if resource.context_type == AgentContextType.DATA_FABRIC_ENTITY_SET: | ||
| from .datafabric_tool import create_datafabric_query_tool | ||
|
|
||
| return create_datafabric_query_tool() | ||
|
|
||
| assert resource.settings is not None | ||
|
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Put this behind the isIndexTool flag. |
||
| tool_name = sanitize_tool_name(resource.name) | ||
| retrieval_mode = resource.settings.retrieval_mode.lower() | ||
|
|
||
| if retrieval_mode == AgentContextRetrievalMode.DEEP_RAG.value.lower(): | ||
| return handle_deep_rag(tool_name, resource) | ||
| elif retrieval_mode == AgentContextRetrievalMode.BATCH_TRANSFORM.value.lower(): | ||
|
|
||
| if retrieval_mode == AgentContextRetrievalMode.BATCH_TRANSFORM.value.lower(): | ||
| return handle_batch_transform(tool_name, resource) | ||
| else: | ||
| return handle_semantic_search(tool_name, resource) | ||
|
|
||
| return handle_semantic_search(tool_name, resource) | ||
|
|
||
|
|
||
| def handle_semantic_search( | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,50 @@ | ||
| """Data Fabric tool module for entity-based SQL queries.""" | ||
|
|
||
| import logging | ||
| from typing import Sequence | ||
|
|
||
| from uipath.agent.models.agent import BaseAgentResourceConfig | ||
|
|
||
| from uipath_langchain.agent.react.init_context_registry import ( | ||
| register_init_context_provider, | ||
| ) | ||
|
|
||
| from .datafabric_tool import ( | ||
| create_datafabric_query_tool, | ||
| fetch_entity_schemas, | ||
| get_datafabric_contexts, | ||
| get_datafabric_entity_identifiers_from_resources, | ||
| ) | ||
| from .schema_context import format_schemas_for_context | ||
|
|
||
| __all__ = [ | ||
| "create_datafabric_query_tool", | ||
| "fetch_entity_schemas", | ||
| "format_schemas_for_context", | ||
| "get_datafabric_contexts", | ||
| "get_datafabric_entity_identifiers_from_resources", | ||
| ] | ||
|
|
||
| _logger = logging.getLogger(__name__) | ||
|
|
||
|
|
||
| # --- Init-time context self-registration --- | ||
|
|
||
|
|
||
| async def _datafabric_init_context_provider( | ||
|
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Move this into datafabric_tool rather than exposing it as a module-level function. |
||
| resources: Sequence[BaseAgentResourceConfig], | ||
| ) -> str | None: | ||
| """Fetch and format DataFabric entity schemas for system prompt injection.""" | ||
| entity_identifiers = get_datafabric_entity_identifiers_from_resources(resources) | ||
| if not entity_identifiers: | ||
| return None | ||
|
|
||
| _logger.info( | ||
| "Fetching Data Fabric schemas for %d identifier(s)", | ||
| len(entity_identifiers), | ||
| ) | ||
| entities = await fetch_entity_schemas(entity_identifiers) | ||
| return format_schemas_for_context(entities) | ||
|
|
||
|
|
||
| register_init_context_provider("datafabric", _datafabric_init_context_provider) | ||
|
Comment on lines
+34
to
+50
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Redundant whitespace. Can you run ruff with a whitespace check, if there is a rule for that? I don't like these extra spaces that agents generate.