Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
133 changes: 121 additions & 12 deletions python/lib/sift_client/_internal/low_level_wrappers/ingestion.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
import hashlib
import logging
from collections import namedtuple
from typing import TYPE_CHECKING, Iterable, cast
from typing import TYPE_CHECKING, Any, Iterable, cast

from sift.ingestion_configs.v2.ingestion_configs_pb2 import (
ListIngestionConfigFlowsRequest,
Expand All @@ -32,6 +32,9 @@

logger = logging.getLogger(__name__)

DEFAULT_INGESTION_CONFIG_PAGE_SIZE = 100
"""Default page size for ingestion config and flow list calls (flow configs can be large)."""

if TYPE_CHECKING:
from datetime import datetime

Expand Down Expand Up @@ -94,26 +97,132 @@ def __init__(self, grpc_client: GrpcClient):
"""
super().__init__(grpc_client=grpc_client)

async def get_ingestion_config_flows(self, ingestion_config_id: str) -> list[FlowConfig]:
"""Get the flows for an ingestion config."""
async def list_ingestion_configs(
self,
filter_query: str | None = None,
page_size: int | None = DEFAULT_INGESTION_CONFIG_PAGE_SIZE,
page_token: str | None = None,
order_by: str | None = None,
) -> tuple[list[IngestionConfig], str]:
"""List ingestion configs (single page).

Args:
filter_query: The CEL filter query.
page_size: Number of results per page.
page_token: Token for the next page.
order_by: Unused; accepted for _handle_pagination compatibility.

Returns:
A tuple of (list of IngestionConfig, next_page_token).
"""
request_kwargs: dict[str, Any] = {}

if page_size is not None:
request_kwargs["page_size"] = page_size
if page_token is not None:
request_kwargs["page_token"] = page_token
if filter_query is not None:
request_kwargs["filter"] = filter_query
if order_by is not None:
request_kwargs["order_by"] = order_by

request = ListIngestionConfigsRequest(**request_kwargs)
res = await self._grpc_client.get_stub(IngestionConfigServiceStub).ListIngestionConfigs(
request
)
res = cast("ListIngestionConfigsResponse", res)
configs = [IngestionConfig._from_proto(config) for config in res.ingestion_configs]
return configs, res.next_page_token

async def list_all_ingestion_configs(
self,
filter_query: str | None = None,
page_size: int | None = DEFAULT_INGESTION_CONFIG_PAGE_SIZE,
max_results: int | None = None,
) -> list[IngestionConfig]:
"""List all ingestion configs matching the filter, using pagination.

Args:
filter_query: The CEL filter query.
page_size: Number of results per page.
max_results: Maximum total results to return; None for no limit.

Returns:
A list of all matching IngestionConfigs.
"""
return await self._handle_pagination(
self.list_ingestion_configs,
kwargs={"filter_query": filter_query},
page_size=page_size,
max_results=max_results,
)

async def list_ingestion_config_flows(
self,
ingestion_config_id: str,
page_size: int | None = DEFAULT_INGESTION_CONFIG_PAGE_SIZE,
page_token: str | None = None,
order_by: str | None = None,
filter_query: str | None = None,
) -> tuple[list[FlowConfig], str]:
"""List ingestion config flows (single page).

Args:
ingestion_config_id: The ingestion config ID.
page_size: Number of results per page.
page_token: Token for the next page.
order_by: Unused; accepted for _handle_pagination compatibility.
filter_query: Optional CEL filter for flows.

Returns:
A tuple of (list of FlowConfig, next_page_token).
"""
request_kwargs: dict[str, Any] = {"ingestion_config_id": ingestion_config_id}

if page_size is not None:
request_kwargs["page_size"] = page_size
if page_token is not None:
request_kwargs["page_token"] = page_token
if filter_query is not None:
request_kwargs["filter"] = filter_query
if order_by is not None:
request_kwargs["order_by"] = order_by

request = ListIngestionConfigFlowsRequest(**request_kwargs)
res = await self._grpc_client.get_stub(IngestionConfigServiceStub).ListIngestionConfigFlows(
ListIngestionConfigFlowsRequest(ingestion_config_id=ingestion_config_id)
request
)
res = cast("ListIngestionConfigFlowsResponse", res)
return [FlowConfig._from_proto(flow) for flow in res.flows]
flows = [FlowConfig._from_proto(flow) for flow in res.flows]
return flows, res.next_page_token

async def list_ingestion_configs(self, filter_query: str) -> list[IngestionConfig]:
"""List ingestion configs."""
res = await self._grpc_client.get_stub(IngestionConfigServiceStub).ListIngestionConfigs(
ListIngestionConfigsRequest(filter=filter_query)
async def get_ingestion_config_flows(
self,
ingestion_config_id: str,
page_size: int | None = DEFAULT_INGESTION_CONFIG_PAGE_SIZE,
max_results: int | None = None,
) -> list[FlowConfig]:
"""Get all flows for an ingestion config, using pagination.

Args:
ingestion_config_id: The ingestion config ID.
page_size: Number of results per page.
max_results: Maximum total results to return; None for no limit.

Returns:
A list of all FlowConfigs for the ingestion config.
"""
return await self._handle_pagination(
self.list_ingestion_config_flows,
kwargs={"ingestion_config_id": ingestion_config_id},
page_size=page_size,
max_results=max_results,
)
res = cast("ListIngestionConfigsResponse", res)
return [IngestionConfig._from_proto(config) for config in res.ingestion_configs]

async def get_ingestion_config_id_from_client_key(self, client_key: str) -> str | None:
"""Get the ingestion config id."""
filter_query = cel.equals("client_key", client_key)
ingestion_configs = await self.list_ingestion_configs(filter_query)
ingestion_configs = await self.list_all_ingestion_configs(filter_query)
if not ingestion_configs:
return None
if len(ingestion_configs) > 1:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ async def test_get_ingestion_config_flows(ingestion_low_level_client, sift_clien
"""
# First, we need to find an ingestion config to test with
# We'll list ingestion configs and use the first one's client_key
ingestion_configs = await ingestion_low_level_client.list_ingestion_configs("")
ingestion_configs = await ingestion_low_level_client.list_all_ingestion_configs()

if not ingestion_configs:
pytest.skip("No ingestion configs available for testing")
Expand Down
Loading