Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
212 changes: 212 additions & 0 deletions python/lib/sift_client/_internal/low_level_wrappers/data_imports.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,212 @@
from __future__ import annotations

import logging
from typing import TYPE_CHECKING, cast

from sift.data_imports.v2.data_imports_pb2 import (
CreateDataImportFromUploadRequest,
CreateDataImportFromUploadResponse,
CreateDataImportFromUrlRequest,
CreateDataImportFromUrlResponse,
DetectConfigRequest,
DetectConfigResponse,
GetDataImportRequest,
GetDataImportResponse,
ListDataImportsRequest,
ListDataImportsResponse,
RetryDataImportRequest,
)
from sift.data_imports.v2.data_imports_pb2_grpc import DataImportServiceStub

from sift_client._internal.low_level_wrappers.base import LowLevelClientBase
from sift_client._internal.util.executor import run_sync_function
from sift_client.sift_types.data_import import CsvImportConfig, DataImport
from sift_client.transport import WithGrpcClient, WithRestClient

if TYPE_CHECKING:
from pathlib import Path

from sift.data_imports.v2.data_imports_pb2 import DataTypeKey

from sift_client.transport.grpc_transport import GrpcClient
from sift_client.transport.rest_transport import RestClient

# Union of all supported config types. Extend this as new formats are added.
# Currently only CSV is supported, so this is a plain alias rather than a Union.
ImportConfig = CsvImportConfig


def _set_config_on_request(
    request: CreateDataImportFromUploadRequest | CreateDataImportFromUrlRequest,
    config: ImportConfig,
) -> None:
    """Copy the format-specific config from ``config`` onto the proto ``request``.

    Args:
        request: The create-data-import request to populate.
        config: The import configuration (currently only CSV is supported).

    Raises:
        TypeError: If ``config`` is not a supported config type.
    """
    # Guard clause: reject unknown config types up front so new formats
    # fail loudly until a branch is added here.
    if not isinstance(config, CsvImportConfig):
        raise TypeError(f"Unsupported import config type: {type(config).__name__}")
    request.csv_config.CopyFrom(config._to_proto())


# Module-level logger for this wrapper (not referenced elsewhere in this file;
# available for future diagnostics).
logger = logging.getLogger(__name__)


class DataImportsLowLevelClient(LowLevelClientBase, WithGrpcClient, WithRestClient):
    """Low-level client for the DataImportService.

    A thin async wrapper around the autogenerated gRPC bindings for the
    Data Imports API, plus a REST helper for uploading files to presigned URLs.
    """

    def __init__(self, grpc_client: GrpcClient, rest_client: RestClient):
        WithGrpcClient.__init__(self, grpc_client=grpc_client)
        WithRestClient.__init__(self, rest_client=rest_client)

    def _stub(self) -> DataImportServiceStub:
        """Return the gRPC stub for the DataImportService."""
        return self._grpc_client.get_stub(DataImportServiceStub)

    async def create_from_upload(self, config: ImportConfig) -> tuple[str, str]:
        """Create a data import and get back a presigned upload URL.

        Args:
            config: The import configuration.

        Returns:
            A tuple of (data_import_id, upload_url).
        """
        req = CreateDataImportFromUploadRequest()
        _set_config_on_request(req, config)
        resp = cast(
            "CreateDataImportFromUploadResponse",
            await self._stub().CreateDataImportFromUpload(req),
        )
        return resp.data_import_id, resp.upload_url

    async def upload_file(self, upload_url: str, file_path: Path) -> None:
        """Upload a file to a presigned URL.

        The synchronous HTTP POST runs in a thread pool so the event loop
        is not blocked while the file streams.

        Args:
            upload_url: The presigned URL to upload to.
            file_path: Path to the file to upload.
        """
        # Bind the client locally so the closure does not capture `self`.
        client = self._rest_client

        def _post_file() -> None:
            # Stream the file handle directly as the request body.
            with open(file_path, "rb") as fh:
                client.post(upload_url, data=fh).raise_for_status()

        await run_sync_function(_post_file)

    async def create_from_url(self, url: str, config: ImportConfig) -> str:
        """Create a data import from a remote URL.

        Args:
            url: The URL to import from (HTTP or S3).
            config: The import configuration.

        Returns:
            The data_import_id.
        """
        req = CreateDataImportFromUrlRequest(url=url)
        _set_config_on_request(req, config)
        resp = cast(
            "CreateDataImportFromUrlResponse",
            await self._stub().CreateDataImportFromUrl(req),
        )
        return resp.data_import_id

    async def get(self, data_import_id: str) -> DataImport:
        """Get a data import by ID.

        Args:
            data_import_id: The ID of the data import.

        Returns:
            The DataImport.
        """
        resp = cast(
            "GetDataImportResponse",
            await self._stub().GetDataImport(
                GetDataImportRequest(data_import_id=data_import_id)
            ),
        )
        return DataImport._from_proto(resp.data_import)

    async def list_(
        self,
        *,
        page_size: int | None = None,
        page_token: str | None = None,
        query_filter: str = "",
        order_by: str = "",
    ) -> tuple[list[DataImport], str]:
        """List data imports with optional filtering and pagination.

        Args:
            page_size: Maximum number of results per page.
            page_token: Token for the next page of results.
            query_filter: CEL filter string.
            order_by: Ordering string (e.g. "created_date desc").

        Returns:
            A tuple of (list of DataImports, next_page_token).
        """
        req = ListDataImportsRequest(filter=query_filter, order_by=order_by)
        # Proto scalar fields cannot hold None, so set these only when provided.
        if page_size is not None:
            req.page_size = page_size
        if page_token:
            req.page_token = page_token

        resp = cast(
            "ListDataImportsResponse",
            await self._stub().ListDataImports(req),
        )
        imports = [DataImport._from_proto(d) for d in resp.data_imports]
        return imports, resp.next_page_token

    async def list_all(
        self,
        *,
        query_filter: str = "",
        order_by: str = "",
        max_results: int | None = None,
    ) -> list[DataImport]:
        """List all data imports, handling pagination automatically.

        Args:
            query_filter: CEL filter string.
            order_by: Ordering string (e.g. "created_date desc").
            max_results: Maximum total results to return.

        Returns:
            A list of all matching DataImports.
        """
        return await self._handle_pagination(
            func=self.list_,
            kwargs={"query_filter": query_filter, "order_by": order_by},
            max_results=max_results,
        )

    async def retry(self, data_import_id: str) -> None:
        """Retry a failed data import.

        Only works for URL-based imports in a failed state.

        Args:
            data_import_id: The ID of the data import to retry.
        """
        await self._stub().RetryDataImport(
            RetryDataImportRequest(data_import_id=data_import_id)
        )

    async def detect_config(
        self, data: bytes, data_type_key: DataTypeKey.ValueType
    ) -> DetectConfigResponse:
        """Call the DetectConfig RPC to auto-detect import configuration.

        Args:
            data: A sample of the file content (e.g. the first 64 KiB).
            data_type_key: The file type hint.

        Returns:
            The raw DetectConfigResponse proto. The caller (resource API)
            is responsible for converting to a sift_type.
        """
        req = DetectConfigRequest(data=data, type=data_type_key)
        return cast("DetectConfigResponse", await self._stub().DetectConfig(req))
7 changes: 7 additions & 0 deletions python/lib/sift_client/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
ChannelsAPIAsync,
DataExportAPI,
DataExportAPIAsync,
DataImportAPI,
DataImportAPIAsync,
FileAttachmentsAPI,
FileAttachmentsAPIAsync,
IngestionAPIAsync,
Expand Down Expand Up @@ -110,6 +112,9 @@ class SiftClient(
data_export: DataExportAPI
"""Instance of the Data Export API for making synchronous requests."""

data_import: DataImportAPI
"""Instance of the Data Import API for making synchronous requests."""

async_: AsyncAPIs
"""Accessor for the asynchronous APIs. All asynchronous APIs are available as attributes on this accessor."""

Expand Down Expand Up @@ -159,6 +164,7 @@ def __init__(
self.tags = TagsAPI(self)
self.test_results = TestResultsAPI(self)
self.data_export = DataExportAPI(self)
self.data_import = DataImportAPI(self)

# Accessor for the asynchronous APIs
self.async_ = AsyncAPIs(
Expand All @@ -175,6 +181,7 @@ def __init__(
tags=TagsAPIAsync(self),
test_results=TestResultsAPIAsync(self),
data_export=DataExportAPIAsync(self),
data_import=DataImportAPIAsync(self),
)

@property
Expand Down
4 changes: 4 additions & 0 deletions python/lib/sift_client/resources/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,7 @@ async def main():
from sift_client.resources.runs import RunsAPIAsync
from sift_client.resources.tags import TagsAPIAsync
from sift_client.resources.test_results import TestResultsAPIAsync
from sift_client.resources.data_imports import DataImportAPIAsync
from sift_client.resources.exports import DataExportAPIAsync

# ruff: noqa All imports needs to be imported before sync_stubs to avoid circular import
Expand All @@ -178,6 +179,7 @@ async def main():
TestResultsAPI,
FileAttachmentsAPI,
DataExportAPI,
DataImportAPI,
)

import sys
Expand Down Expand Up @@ -215,4 +217,6 @@ async def main():
"TracingConfig",
"DataExportAPI",
"DataExportAPIAsync",
"DataImportAPI",
"DataImportAPIAsync",
]
Loading
Loading