Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
8004001
FD-74: Add channels(ingest and fetch) and rules to client.
ian-sift Jun 26, 2025
fc42efd
Add arrow data type export. Clean up channel data cache function docs…
ian-sift Jul 25, 2025
933af52
Make pyarrow pin more flexible to older python.
ian-sift Jul 25, 2025
c1c1cd3
ignore imports for pyarrow
ian-sift Jul 25, 2025
e8b36ff
Make ingestion run quickly without requiring wait_for_ingestion.
ian-sift Jul 26, 2025
96fd272
Double underscore to single. Cleanup 2 todos
ian-sift Jul 28, 2025
4779925
Ailin pr fb + discovered and fixed rule descriptions being reset if n…
ian-sift Aug 5, 2025
ba8c270
Split data and ingestion functionality to separate files. Other PR fb.
ian-sift Aug 6, 2025
8bdefbf
Merge branch 'main' into FD-74
ian-sift Aug 6, 2025
f15339a
Update default recovery strategy.
ian-sift Aug 6, 2025
103ef87
id -> id_. Use latest sift stream bindings types.
ian-sift Aug 11, 2025
0935fdd
Time instead of datetimte for regulating metrics.
ian-sift Aug 13, 2025
a4e1375
Removed sync ingestion client. Fixed first time flow handling. ingest…
ian-sift Aug 14, 2025
da033fd
Rename stop event to deconflict w/ internal Threading attributes in e…
ian-sift Aug 18, 2025
56bb8b9
Deps.
ian-sift Aug 19, 2025
76e88fb
PR fb.
ian-sift Aug 22, 2025
61a2b52
pr fb
ian-sift Aug 26, 2025
a3dd47e
mypy
ian-sift Aug 26, 2025
1477841
fmt
ian-sift Aug 26, 2025
3c8e26a
timer rename
ian-sift Aug 26, 2025
9ccc075
Merge remote-tracking branch 'origin/main' into FD-74
ian-sift Aug 27, 2025
3b63c20
lint
ian-sift Aug 27, 2025
2697cbf
Require Flow on ingest()
ian-sift Aug 28, 2025
30cc6b4
Remove type hint
ian-sift Aug 28, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/python_ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ jobs:
- name: Pip install
run: |
python -m pip install --upgrade pip
pip install '.[development,openssl,tdms,rosbags,hdf5]'
pip install '.[development,openssl,tdms,rosbags,hdf5,sift-stream]'
- name: Lint
run: |
ruff check
Expand Down
12 changes: 12 additions & 0 deletions python/lib/sift_client/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -203,10 +203,22 @@ async def get_asset_async():

"""

import logging
import sys

from sift_client.client import SiftClient
from sift_client.transport import SiftConnectionConfig

__all__ = [
"SiftClient",
"SiftConnectionConfig",
]

logger = logging.getLogger("sift_client")
logging.basicConfig(
level=logging.ERROR, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)


handler = logging.StreamHandler(sys.stdout)
logger.addHandler(handler)
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,18 @@
from sift_client._internal.low_level_wrappers.calculated_channels import (
CalculatedChannelsLowLevelClient,
)
from sift_client._internal.low_level_wrappers.channels import ChannelsLowLevelClient
from sift_client._internal.low_level_wrappers.ingestion import IngestionLowLevelClient
from sift_client._internal.low_level_wrappers.ping import PingLowLevelClient
from sift_client._internal.low_level_wrappers.rules import RulesLowLevelClient
from sift_client._internal.low_level_wrappers.runs import RunsLowLevelClient

__all__ = [
"AssetsLowLevelClient",
"CalculatedChannelsLowLevelClient",
"ChannelsLowLevelClient",
"IngestionLowLevelClient",
"PingLowLevelClient",
"RulesLowLevelClient",
"RunsLowLevelClient",
]
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
from sift.calculated_channels.v2.calculated_channels_pb2_grpc import CalculatedChannelServiceStub

from sift_client._internal.low_level_wrappers.base import LowLevelClientBase
from sift_client.transport.grpc_transport import GrpcClient
from sift_client.transport import GrpcClient, WithGrpcClient
from sift_client.types.calculated_channel import (
CalculatedChannel,
CalculatedChannelUpdate,
Expand All @@ -33,7 +33,7 @@
logger = logging.getLogger(__name__)


class CalculatedChannelsLowLevelClient(LowLevelClientBase):
class CalculatedChannelsLowLevelClient(LowLevelClientBase, WithGrpcClient):
"""
Low-level client for the CalculatedChannelsAPI.

Expand All @@ -47,7 +47,7 @@ def __init__(self, grpc_client: GrpcClient):
Args:
grpc_client: The gRPC client to use for making API calls.
"""
self._grpc_client = grpc_client
super().__init__(grpc_client)

async def get_calculated_channel(
self,
Expand Down
127 changes: 127 additions & 0 deletions python/lib/sift_client/_internal/low_level_wrappers/channels.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
from __future__ import annotations

import logging
from typing import Any, cast

from sift.channels.v3.channels_pb2 import (
GetChannelRequest,
GetChannelResponse,
ListChannelsRequest,
ListChannelsResponse,
)
from sift.channels.v3.channels_pb2_grpc import ChannelServiceStub

from sift_client._internal.low_level_wrappers.base import LowLevelClientBase
from sift_client.transport import WithGrpcClient
from sift_client.transport.grpc_transport import GrpcClient
from sift_client.types.channel import Channel

# Configure logging
logger = logging.getLogger(__name__)

CHANNELS_DEFAULT_PAGE_SIZE = 10_000


class ChannelsLowLevelClient(LowLevelClientBase, WithGrpcClient):
"""
Low-level client for the ChannelsAPI.

This class provides a thin wrapper around the autogenerated bindings for the ChannelsAPI.
"""

def __init__(self, grpc_client: GrpcClient):
"""
Initialize the ChannelsLowLevelClient.

Args:
grpc_client: The gRPC client to use for making API calls.
"""
super().__init__(grpc_client)

async def get_channel(self, channel_id: str) -> Channel:
"""
Get a channel by channel_id.

Args:
channel_id: The channel ID to get.

Returns:
The Channel.

Raises:
ValueError: If channel_id is not provided.
"""

request = GetChannelRequest(channel_id=channel_id)
response = await self._grpc_client.get_stub(ChannelServiceStub).GetChannel(request)
grpc_channel = cast(GetChannelResponse, response).channel
channel = Channel._from_proto(grpc_channel)
return channel

async def list_channels(
self,
*,
Comment thread
alexluck-sift marked this conversation as resolved.
page_size: int | None = None,
page_token: str | None = None,
Comment thread
alexluck-sift marked this conversation as resolved.
query_filter: str | None = None,
order_by: str | None = None,
) -> tuple[list[Channel], str]:
"""
List channels with optional filtering and pagination.

Args:
page_size: The maximum number of channels to return.
page_token: A page token for pagination.
query_filter: A CEL filter string.
order_by: How to order the retrieved channels.

Returns:
A tuple of (channels, next_page_token).
"""

request_kwargs: dict[str, Any] = {}
if query_filter:
request_kwargs["filter"] = query_filter
if order_by:
request_kwargs["order_by"] = order_by
if page_size:
request_kwargs["page_size"] = page_size
if page_token:
request_kwargs["page_token"] = page_token

request = ListChannelsRequest(**request_kwargs)
response = await self._grpc_client.get_stub(ChannelServiceStub).ListChannels(request)
response = cast(ListChannelsResponse, response)

channels = [Channel._from_proto(channel) for channel in response.channels]
return channels, response.next_page_token

async def list_all_channels(
self,
*,
query_filter: str | None = None,
order_by: str | None = None,
max_results: int | None = None,
Comment thread
alexluck-sift marked this conversation as resolved.
) -> list[Channel]:
"""
List all channels with optional filtering.

Args:
query_filter: A CEL filter string.
order_by: How to order the retrieved channels.
max_results: Maximum number of results to return.

Returns:
A list of all matching channels.
"""
# Channels default page size is 10,000 so lower it if we're passing max_results
page_size = None
if max_results is not None and max_results <= CHANNELS_DEFAULT_PAGE_SIZE:
page_size = max_results
return await self._handle_pagination(
self.list_channels,
kwargs={"query_filter": query_filter},
page_size=page_size,
order_by=order_by,
max_results=max_results,
)
Loading
Loading