Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
60 commits
Select commit Hold shift + click to select a range
fba19ce
python(feature): sift_client low level wrapper for exports
wei-qlu Mar 7, 2026
c3a40de
python(feat): data export api for sift_client
wei-qlu Mar 10, 2026
82f7138
python(fix): updated sift_type export docstrings to handle channel_re…
wei-qlu Mar 10, 2026
3fb1962
python(fix): enforce channel_ids or calculated_channel_config for exp…
wei-qlu Mar 11, 2026
894a70d
python(fix): rename internal grpc module to _grpc to avoid shadowing …
wei-qlu Mar 12, 2026
e3aab9b
python(fix): add input validation and job status checks to export met…
wei-qlu Mar 12, 2026
f86a3fc
python(fix): added timestamp checks and unit tests for exports
wei-qlu Mar 12, 2026
bccbb7c
python(fix): updated docstring args to match export options in UI
wei-qlu Mar 12, 2026
5547dd3
python(fix): added assert for calc configs result
wei-qlu Mar 12, 2026
8a9f983
python(fix): updated sync stubs
wei-qlu Mar 12, 2026
99536f4
python(refactor): exports API accepts domain objects alongside raw IDs
wei-qlu Mar 15, 2026
32fae43
python(refactor): exports API returns job, separate high-level/low-le…
wei-qlu Mar 15, 2026
5698a6a
python(fix): low-level-client accepts ExportOutputFormat enum
wei-qlu Mar 15, 2026
8f0afb3
python(fix): linting
wei-qlu Mar 15, 2026
42d20b1
python(refactor): merge low-level export methods into single method, …
wei-qlu Mar 16, 2026
078d401
python(fix): add assertions for datetime to resolve mypy errors
wei-qlu Mar 16, 2026
7ec7792
linting
wei-qlu Mar 16, 2026
96c3719
python(refactor): remove redundant code
wei-qlu Mar 16, 2026
fd67bbb
python(refactor): removed use_legacy_format as a possible field for e…
wei-qlu Mar 16, 2026
e6422c7
python(feat): return file path from export job, update test
wei-qlu Mar 17, 2026
1d7be3f
python(fix): resolve calc channel name-based identifiers to uuid for …
wei-qlu Mar 17, 2026
dc2f0d7
python(fix): add assertions for export tests
wei-qlu Mar 17, 2026
5bb216e
linting
wei-qlu Mar 17, 2026
c987db0
python(refactor): simplified resolve_calculated_channel logic
wei-qlu Mar 17, 2026
00b5807
mypy fix
wei-qlu Mar 17, 2026
19546ff
python(fix): update export tests to check equality
wei-qlu Mar 18, 2026
6b47152
pyright
wei-qlu Mar 18, 2026
c2c9134
python(fix): return paths to extracted_files in wait_until_complete
wei-qlu Mar 18, 2026
3404e7f
python(refactor): rename wait_until_complete to download_when_complet…
wei-qlu Mar 18, 2026
d7d9e21
python(refactor): rename to wait_and_download and regenerate stubs
wei-qlu Mar 18, 2026
4b09d4f
python(refactor): revert _grpc to grpc and updated stubs
wei-qlu Mar 18, 2026
c368af5
python(fix): updated unit tests to use the renamed function
wei-qlu Mar 18, 2026
c9e6309
python(refactor): shared _export helper to deduplicate export methods
wei-qlu Mar 18, 2026
be77f1a
python(fix): use asyncio.get_running_loop() instead of deprecated get…
wei-qlu Mar 18, 2026
128dfff
python(fix): add integration tests and combined duplicate unit tests
wei-qlu Mar 19, 2026
cc2ab5b
pyright fix
wei-qlu Mar 19, 2026
763e40f
move _resolve_calculated_channels and download_and_extract_zip to _in…
wei-qlu Mar 19, 2026
fe6d4fe
add extract parameter to wait_and_download can keep the zip without e…
wei-qlu Mar 19, 2026
d15f661
refactored export methods to a single entry point
wei-qlu Mar 19, 2026
722d61a
mypy fix
wei-qlu Mar 19, 2026
58dc781
python(refactor): use rest_client instead of raw request
wei-qlu Mar 19, 2026
2b4e84f
python(refactor): added dict support for calc channels
wei-qlu Mar 19, 2026
cab9888
python(refactor): class rename to DataExportsAPI
wei-qlu Mar 19, 2026
5ea51ea
mypy fix
wei-qlu Mar 19, 2026
d724ebc
pyright fix
wei-qlu Mar 19, 2026
b7705ed
timeout increased
wei-qlu Mar 19, 2026
8aebbc5
python(refactor): split up the download_and_extract function and rena…
wei-qlu Mar 20, 2026
63acd04
python(fix): scoped the integration export jobs to 10s
wei-qlu Mar 20, 2026
7e0fbba
python(fix): add sync wrapper tests for run_in_executer loop scenarios
wei-qlu Mar 20, 2026
058fcaa
python(refactor): add a run_sync_function util for running blocking c…
wei-qlu Mar 20, 2026
7bbbda0
python(refactor): move wait_and_download to JobsAPI and add the metho…
wei-qlu Mar 20, 2026
80cac44
python(fix): scoped the export_by_asset test to use one channel
wei-qlu Mar 21, 2026
25ae348
python(fix): updated export_by_asset integration tests with ingested …
wei-qlu Mar 21, 2026
eb9f9c5
python(refactor): removed redundant tests, following existing test st…
wei-qlu Mar 23, 2026
1a26784
python(refactor): renamed arguments to be more descriptive, added try…
wei-qlu Mar 23, 2026
eee4b52
python(refactor): use run_sync_function util instead of reimplementin…
wei-qlu Mar 23, 2026
9a17655
python(refactor): allow wait_and_download to handle non-zip files
wei-qlu Mar 23, 2026
040b5c8
python(refactor): moved job parameter to a positional argument
wei-qlu Mar 23, 2026
e971623
python(refactor): relocate tests to proper scoping
wei-qlu Mar 23, 2026
3c92f8e
python(refactor): use ingestionAPI instead of raw HTTP in export test…
wei-qlu Mar 23, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
154 changes: 154 additions & 0 deletions python/lib/sift_client/_internal/low_level_wrappers/exports.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
from __future__ import annotations

from typing import TYPE_CHECKING, cast

from sift.calculated_channels.v2.calculated_channels_pb2 import (
CalculatedChannelAbstractChannelReference,
)
from sift.exports.v1.exports_pb2 import (
AssetsAndTimeRange,
CalculatedChannelConfig,
ExportDataRequest,
ExportDataResponse,
ExportOptions,
GetDownloadUrlRequest,
GetDownloadUrlResponse,
RunsAndTimeRange,
TimeRange,
)
from sift.exports.v1.exports_pb2_grpc import ExportServiceStub

from sift_client._internal.low_level_wrappers.base import LowLevelClientBase
from sift_client._internal.util.timestamp import to_pb_timestamp
from sift_client.sift_types.calculated_channel import CalculatedChannel, CalculatedChannelCreate
from sift_client.transport import WithGrpcClient

if TYPE_CHECKING:
from datetime import datetime

from sift_client.sift_types.export import ExportOutputFormat
from sift_client.transport.grpc_transport import GrpcClient


def _build_calc_channel_configs(
    calculated_channels: list[CalculatedChannel | CalculatedChannelCreate] | None,
) -> list[CalculatedChannelConfig]:
    """Translate calculated channel objects into ``CalculatedChannelConfig`` protos.

    Args:
        calculated_channels: Calculated channels (existing ones or create
            requests) to convert. ``None`` or an empty list yields an empty result.

    Returns:
        One ``CalculatedChannelConfig`` per input channel, in input order.
    """
    protos = []
    for channel in calculated_channels or []:
        # A create request keeps its (optional) references under a different
        # attribute than an already existing calculated channel does.
        if isinstance(channel, CalculatedChannelCreate):
            references = channel.expression_channel_references or []
        else:
            references = channel.channel_references

        proto_references = [
            CalculatedChannelAbstractChannelReference(
                channel_reference=reference.channel_reference,
                channel_identifier=reference.channel_identifier,
            )
            for reference in references
        ]
        protos.append(
            CalculatedChannelConfig(
                name=channel.name,
                # The expression may be unset on the source object; send an empty string then.
                expression=channel.expression or "",
                channel_references=proto_references,
                units=channel.units,
            )
        )
    return protos


class ExportsLowLevelClient(LowLevelClientBase, WithGrpcClient):
    """Low-level client for the DataExportAPI.

    A thin layer over the autogenerated gRPC bindings of the export service: it
    assembles the request protos, issues the calls and unwraps the responses.
    """

    def __init__(self, grpc_client: GrpcClient):
        """Create the low-level exports client.

        Args:
            grpc_client: The gRPC client used for every API call.
        """
        super().__init__(grpc_client)

    async def export_data(
        self,
        *,
        output_format: ExportOutputFormat,
        run_ids: list[str] | None = None,
        asset_ids: list[str] | None = None,
        start_time: datetime | None = None,
        stop_time: datetime | None = None,
        channel_ids: list[str] | None = None,
        calculated_channels: list[CalculatedChannel | CalculatedChannelCreate] | None = None,
        simplify_channel_names: bool = False,
        combine_runs: bool = False,
        split_export_by_asset: bool = False,
        split_export_by_run: bool = False,
    ) -> str:
        """Start a background data export and return its job ID.

        Builds an ``ExportDataRequest`` and sends it through the export service
        stub. No validation happens here; the request is sent as assembled.

        Args:
            output_format: Export format; its ``.value`` is placed in the request.
            run_ids: When not ``None``, a runs-and-time-range selection is set.
            asset_ids: When not ``None``, an assets-and-time-range selection is set.
            start_time: Optional lower bound copied onto the chosen selection.
            stop_time: Optional upper bound copied onto the chosen selection.
            channel_ids: Channel IDs to export; ``None`` is sent as an empty list.
            calculated_channels: Calculated channels converted to proto configs.
            simplify_channel_names: Forwarded as-is into ``ExportOptions``.
            combine_runs: Forwarded as-is into ``ExportOptions``.
            split_export_by_asset: Forwarded as-is into ``ExportOptions``.
            split_export_by_run: Forwarded as-is into ``ExportOptions``.

        Returns:
            The job ID for the background export.
        """
        options = ExportOptions(
            use_legacy_format=False,
            simplify_channel_names=simplify_channel_names,
            combine_runs=combine_runs,
            split_export_by_asset=split_export_by_asset,
            split_export_by_run=split_export_by_run,
        )
        request = ExportDataRequest(
            output_format=output_format.value,
            export_options=options,
            channel_ids=channel_ids or [],
            calculated_channel_configs=_build_calc_channel_configs(calculated_channels),
        )

        def apply_time_bounds(
            selection: RunsAndTimeRange | AssetsAndTimeRange | TimeRange,
        ) -> None:
            # Only bounds that were actually provided are written to the message.
            if start_time:
                selection.start_time.CopyFrom(to_pb_timestamp(start_time))
            if stop_time:
                selection.stop_time.CopyFrom(to_pb_timestamp(stop_time))

        # NOTE(review): these selections are described as members of one
        # ``time_selection`` oneof, so when both run_ids and asset_ids are given
        # the assets selection (set last) presumably replaces the runs one - confirm.
        if run_ids is not None:
            by_runs = RunsAndTimeRange(run_ids=run_ids)
            apply_time_bounds(by_runs)
            request.runs_and_time_range.CopyFrom(by_runs)

        if asset_ids is not None:
            by_assets = AssetsAndTimeRange(asset_ids=asset_ids)
            apply_time_bounds(by_assets)
            request.assets_and_time_range.CopyFrom(by_assets)

        if run_ids is None and asset_ids is None:
            # Neither runs nor assets were given: fall back to a plain time range.
            by_time = TimeRange()
            apply_time_bounds(by_time)
            request.time_range.CopyFrom(by_time)

        stub = self._grpc_client.get_stub(ExportServiceStub)
        export_response = cast("ExportDataResponse", await stub.ExportData(request))
        return export_response.job_id

    async def get_download_url(self, job_id: str) -> str:
        """Fetch the presigned download URL of a background export job.

        Args:
            job_id: The job ID returned from export_data.

        Returns:
            The presigned URL taken from the service response.
        """
        stub = self._grpc_client.get_stub(ExportServiceStub)
        url_response = cast(
            "GetDownloadUrlResponse",
            await stub.GetDownloadUrl(GetDownloadUrlRequest(job_id=job_id)),
        )
        return url_response.presigned_url
54 changes: 54 additions & 0 deletions python/lib/sift_client/_internal/util/channels.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
from __future__ import annotations

from typing import TYPE_CHECKING

from sift_client.sift_types.calculated_channel import CalculatedChannel, CalculatedChannelCreate
from sift_client.sift_types.channel import ChannelReference

if TYPE_CHECKING:
from sift_client.resources.channels import ChannelsAPIAsync


async def resolve_calculated_channels(
    calculated_channels: list[CalculatedChannel | CalculatedChannelCreate] | None,
    channels_api: ChannelsAPIAsync,
) -> list[CalculatedChannel | CalculatedChannelCreate] | None:
    """Replace name-based channel reference identifiers with channel UUIDs.

    Every reference identifier is looked up as a channel name through
    ``channels_api.find``. A hit swaps the identifier for the found channel's
    ID; a miss leaves the reference untouched, on the assumption that it
    already holds a UUID.

    Args:
        calculated_channels: Calculated channels whose references should be resolved.
        channels_api: Async channels API used for the name lookups.

    Returns:
        ``None`` when the input is ``None`` or empty. Otherwise a list with one
        ``CalculatedChannelCreate`` per input channel, carrying the name,
        expression, units and resolved references.
    """
    if not calculated_channels:
        return None

    output: list[CalculatedChannel | CalculatedChannelCreate] = []
    for calc in calculated_channels:
        # Create requests and existing channels expose their references differently.
        if isinstance(calc, CalculatedChannelCreate):
            source_refs = calc.expression_channel_references or []
        else:
            source_refs = calc.channel_references

        updated_refs: list[ChannelReference] = []
        for source_ref in source_refs:
            match = await channels_api.find(
                name=source_ref.channel_identifier,
                assets=calc.asset_ids,
            )
            if match is None:
                # No channel with that name: keep the identifier as given.
                updated_refs.append(source_ref)
                continue
            updated_refs.append(
                ChannelReference(
                    channel_reference=source_ref.channel_reference,
                    channel_identifier=match._id_or_error,
                )
            )

        output.append(
            CalculatedChannelCreate(
                name=calc.name,
                expression=calc.expression,
                expression_channel_references=updated_refs,
                units=calc.units or None,
            )
        )
    return output
10 changes: 10 additions & 0 deletions python/lib/sift_client/_internal/util/executor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
from __future__ import annotations

import asyncio
import functools
from typing import Any, Callable


async def run_sync_function(fn: Callable[..., Any], *args: Any, **kwargs: Any) -> Any:
    """Run a synchronous function in a thread pool to avoid blocking the event loop.

    Args:
        fn: The blocking callable to execute on the loop's default executor.
        *args: Positional arguments forwarded to ``fn``.
        **kwargs: Keyword arguments forwarded to ``fn``.

    Returns:
        Whatever ``fn`` returns. Exceptions raised by ``fn`` propagate to the caller.
    """
    loop = asyncio.get_running_loop()
    # run_in_executor only forwards positional arguments, so keyword arguments
    # have to be bound to the callable beforehand.
    call = functools.partial(fn, **kwargs) if kwargs else fn
    return await loop.run_in_executor(None, call, *args)
63 changes: 63 additions & 0 deletions python/lib/sift_client/_internal/util/file.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
from __future__ import annotations

import warnings
import zipfile
from typing import TYPE_CHECKING

from sift_client.errors import SiftWarning

if TYPE_CHECKING:
from pathlib import Path

from sift_client.transport.rest_transport import RestClient


def download_file(
    signed_url: str,
    output_path: Path,
    *,
    rest_client: RestClient,
    chunk_size: int = 4 * 1024 * 1024,
) -> Path:
    """Download a file from a presigned URL in streamed chunks (4 MiB by default).

    If the transfer fails after the output file has been opened, the partially
    written file is removed before the error is re-raised.

    Args:
        signed_url: The presigned URL to download from.
        output_path: Path where the file will be saved. Parent directories are created if needed.
        rest_client: The SDK rest client to use for the download.
        chunk_size: Number of bytes requested per streamed chunk.

    Returns:
        The path to the downloaded file.

    Raises:
        requests.HTTPError: If the download request fails.
    """
    output_path.parent.mkdir(parents=True, exist_ok=True)
    # Strip the session's default Authorization header, presigned URLs carry their own auth
    with rest_client.get(signed_url, stream=True, headers={"Authorization": None}) as response:
        response.raise_for_status()
        # Opened outside the try block: if opening fails, nothing was written
        # and a pre-existing file at this path must not be deleted.
        file = output_path.open("wb")
        try:
            with file:
                for chunk in response.iter_content(chunk_size=chunk_size):
                    if chunk:
                        file.write(chunk)
        except BaseException:
            # The file is closed at this point. Remove it so a truncated
            # download cannot be mistaken for a complete one.
            try:
                output_path.unlink()
            except OSError:
                # Best-effort cleanup; the original failure is what the caller needs to see.
                pass
            raise
    return output_path


def extract_zip(zip_path: Path, output_dir: Path, *, delete_zip: bool = True) -> list[Path]:
    """Extract a zip file to a directory.

    Args:
        zip_path: Path to the zip file.
        output_dir: Directory to extract contents into. Created if it doesn't exist.
        delete_zip: If True (default), delete the zip file after extraction.

    Returns:
        List of paths to the extracted files as they were written to disk, in
        archive order (excludes directories).

    Raises:
        zipfile.BadZipFile: If the file is not a valid zip.
    """
    # Imported locally because the module only imports ``Path`` for type checking.
    import pathlib

    output_dir.mkdir(parents=True, exist_ok=True)
    extracted_files: list[Path] = []
    with zipfile.ZipFile(zip_path, "r") as zip_file:
        for member in zip_file.infolist():
            # extract() sanitizes member names (leading slashes, ".." parts) and
            # returns where the entry actually landed. Joining the raw archive
            # name onto output_dir could point at a file that was never written.
            written_to = zip_file.extract(member, output_dir)
            if not member.filename.endswith("/"):
                extracted_files.append(pathlib.Path(written_to))
    if delete_zip:
        try:
            zip_path.unlink()
        except OSError:
            # Extraction already succeeded; failing to clean up is only worth a warning.
            warnings.warn(f"Failed to delete zip file '{zip_path}'", SiftWarning, stacklevel=2)
    return extracted_files
58 changes: 58 additions & 0 deletions python/lib/sift_client/_tests/_internal/test_channels.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
from __future__ import annotations

from unittest.mock import AsyncMock, MagicMock

import pytest

from sift_client._internal.util.channels import resolve_calculated_channels
from sift_client.sift_types.calculated_channel import (
CalculatedChannel,
CalculatedChannelCreate,
ChannelReference,
)
from sift_client.sift_types.channel import Channel


class TestResolveCalculatedChannels:
    """Unit tests for ``resolve_calculated_channels`` against a mocked channels API."""

    @pytest.mark.asyncio
    async def test_none_passthrough(self):
        """``None`` input comes back as ``None``."""
        channels_api = MagicMock()
        channels_api.find = AsyncMock(return_value=None)

        resolved = await resolve_calculated_channels(None, channels_api=channels_api)

        assert resolved is None

    @pytest.mark.asyncio
    async def test_resolves_name_to_uuid(self):
        """A reference whose name lookup succeeds gets the found channel's ID."""
        found_channel = MagicMock(spec=Channel)
        found_channel._id_or_error = "resolved-uuid"
        channels_api = MagicMock()
        channels_api.find = AsyncMock(return_value=found_channel)

        calc_channel = MagicMock(spec=CalculatedChannel)
        calc_channel.name = "calc"
        calc_channel.expression = "$1 + 10"
        calc_channel.units = "m/s"
        calc_channel.asset_ids = ["asset-1"]
        calc_channel.channel_references = [
            ChannelReference(channel_reference="$1", channel_identifier="sensor.vel")
        ]

        resolved = await resolve_calculated_channels([calc_channel], channels_api=channels_api)

        assert resolved is not None
        assert len(resolved) == 1
        resolved_refs = resolved[0].expression_channel_references
        assert resolved_refs is not None
        assert resolved_refs[0].channel_identifier == "resolved-uuid"

    @pytest.mark.asyncio
    async def test_keeps_identifier_when_not_found(self):
        """A reference whose name lookup finds nothing is left unchanged."""
        channels_api = MagicMock()
        channels_api.find = AsyncMock(return_value=None)
        unresolved_ref = ChannelReference(channel_reference="$1", channel_identifier="ch-1")
        create_request = CalculatedChannelCreate(
            name="x",
            expression="$1",
            units="m",
            expression_channel_references=[unresolved_ref],
        )

        resolved = await resolve_calculated_channels([create_request], channels_api=channels_api)

        assert resolved is not None
        assert resolved[0] == create_request
Loading
Loading