Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions python/lib/sift_client/_internal/low_level_wrappers/channels.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
from typing import TYPE_CHECKING, Any, cast

from sift.channels.v3.channels_pb2 import (
BatchArchiveChannelsRequest,
BatchUnarchiveChannelsRequest,
GetChannelRequest,
GetChannelResponse,
ListChannelsRequest,
Expand Down Expand Up @@ -117,3 +119,21 @@ async def list_all_channels(
order_by=order_by,
max_results=max_results,
)

async def batch_archive_channels(self, channel_ids: list[str]) -> None:
"""Batch archive channels by setting active to false.

Args:
channel_ids: The channel IDs to archive.
"""
request = BatchArchiveChannelsRequest(channel_ids=channel_ids)
await self._grpc_client.get_stub(ChannelServiceStub).BatchArchiveChannels(request)

async def batch_unarchive_channels(self, channel_ids: list[str]) -> None:
"""Batch unarchive channels by setting active to true.

Args:
channel_ids: The channel IDs to unarchive.
"""
request = BatchUnarchiveChannelsRequest(channel_ids=channel_ids)
await self._grpc_client.get_stub(ChannelServiceStub).BatchUnarchiveChannels(request)
83 changes: 68 additions & 15 deletions python/lib/sift_client/_tests/resources/test_channels.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,12 @@
- Error handling and edge cases
"""

import asyncio
import uuid
from urllib.parse import urljoin

import pytest
import requests

from sift_client import SiftClient
from sift_client.resources import ChannelsAPI, ChannelsAPIAsync
Expand Down Expand Up @@ -183,21 +188,6 @@ async def test_list_with_limit(self, channels_api_async):
assert isinstance(channels_3, list)
assert len(channels_3) <= 3

# TODO: active channel test
# @pytest.mark.asyncio
# async def test_list_include_archived(self, channels_api_async):
# """Test channel listing with archived channels included."""
# # Test without archived channels (default)
# channels_active = await channels_api_async.list_(limit=5, include_archived=False)
# assert isinstance(channels_active, list)
#
# # Test with archived channels included
# channels_all = await channels_api_async.list_(limit=5, include_archived=True)
# assert isinstance(channels_all, list)
#
# # Should have at least as many channels when including archived
# assert len(channels_all) >= len(channels_active)

@pytest.mark.asyncio
async def test_list_with_time_filters(self, channels_api_async):
"""Test channel listing with time-based filters."""
Expand Down Expand Up @@ -240,6 +230,69 @@ async def test_find_multiple_raises_error(self, channels_api_async):
with pytest.raises(ValueError, match="Multiple"):
await channels_api_async.find(name_contains="test", limit=5)

class TestArchive:
"""Tests for the async archive method."""

@pytest.mark.asyncio
async def test_create_archive_unarchive_flow(self, channels_api_async, test_channel):
"""Create a channel via REST schemaless ingest, then archive/unarchive via channels API; verify at each step with find."""
asset_name = test_channel.asset.name
asset_id = test_channel.asset_id
unique_name = f"archive-test-channel-{uuid.uuid4().hex}"

rest_client = channels_api_async.client.rest_client
rest_url = urljoin(rest_client.base_url, "api/v2/ingest")
api_key = rest_client._config.api_key

# Create the channel by ingesting a single data point (schemaless).
#
# This is currently the simplest way to create a channel. Simply
# creating a channel schema is not sufficient since schemaless channels
# that have no data are filtered out of the `ListChannels` response.
payload = {
"asset_name": asset_name,
"data": [
{
"timestamp": "2024-11-06T10:27:20-07:00",
"values": [
{"channel": unique_name, "value": 1},
],
}
],
}
resp = requests.post(
rest_url,
headers={
"Authorization": f"Bearer {api_key}",
"Content-Type": "application/json",
},
json=payload,
timeout=30,
)
resp.raise_for_status()

# Retry find until the channel is visible.
created = None
for _ in range(20):
created = await channels_api_async.find(name=unique_name, asset=asset_id)
if created is not None:
break
await asyncio.sleep(0.5)
assert created is not None, f"Channel {unique_name} did not appear after ingest"

await channels_api_async.archive([created])
found_archived = await channels_api_async.find(
name=unique_name, asset=asset_id, archived=True
)
assert found_archived is not None

await channels_api_async.unarchive([created])
found_active = await channels_api_async.find(name=unique_name, asset=asset_id)
assert found_active is not None

# Cleanup by archiving the channel again
await channels_api_async.archive([created])

# TODO: data retrieval tests
# class TestGetData:
# """Tests for the async get_data method."""
Expand Down
54 changes: 49 additions & 5 deletions python/lib/sift_client/resources/channels.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,30 @@
from sift_client.sift_types.channel import Channel


def _channel_ids_from_list(items: list[str | Channel]) -> list[str]:
"""Resolve a list of channel IDs or Channel objects to a list of channel IDs.

Args:
items: List of channel IDs (str) or Channel objects.

Returns:
List of channel ID strings.

Raises:
ValueError: If any Channel object has no id set.
"""
ids: list[str] = []
for item in items:
if isinstance(item, str):
ids.append(item)
else:
try:
ids.append(item._id_or_error)
except ValueError:
raise ValueError("One or more Channel objects have no id set.") from None
return ids


class ChannelsAPIAsync(ResourceBase):
"""High-level API for interacting with channels.

Expand Down Expand Up @@ -75,7 +99,7 @@ async def list_(
run: Run | str | None = None,
# common filters
description_contains: str | None = None,
include_archived: bool | None = None,
archived: bool | None = None,
filter_query: str | None = None,
order_by: str | None = None,
limit: int | None = None,
Expand All @@ -96,7 +120,7 @@ async def list_(
assets: Filter channels associated with these Assets or asset IDs.
run: Filter channels associated with this Run or run ID.
description_contains: Partial description of the channel.
include_archived: If True, include archived channels in results.
archived: If True, searches for archived channels.
filter_query: Explicit CEL query to filter channels.
order_by: Field and direction to order results by.
limit: Maximum number of channels to return. If None, returns all matches.
Expand All @@ -117,7 +141,6 @@ async def list_(
*self._build_common_cel_filters(
description_contains=description_contains,
filter_query=filter_query,
include_archived=include_archived,
),
]
if channel_ids:
Expand All @@ -133,9 +156,10 @@ async def list_(
if run is not None:
run_id = run.id_ if isinstance(run, Run) else run
filter_parts.append(cel.equals("run_id", run_id))

# This is opposite of usual archived state
if include_archived is not None:
filter_parts.append(cel.equals("active", not include_archived))
if archived is not None:
filter_parts.append(cel.equals("active", not archived))

query_filter = cel.and_(*filter_parts)

Expand Down Expand Up @@ -163,6 +187,26 @@ async def find(self, **kwargs) -> Channel | None:
return channels[0]
return None

async def archive(self, channels: list[str | Channel]) -> None:
"""Batch archive channels by setting active to false.

Args:
channels: List of channel IDs or Channel objects to archive. If a Channel
has no id set, raises ValueError.
"""
channel_ids = _channel_ids_from_list(channels)
await self._low_level_client.batch_archive_channels(channel_ids)

async def unarchive(self, channels: list[str | Channel]) -> None:
"""Batch unarchive channels by setting active to true.

Args:
channels: List of channel IDs or Channel objects to unarchive. If a Channel
has no id set, raises ValueError.
"""
channel_ids = _channel_ids_from_list(channels)
await self._low_level_client.batch_unarchive_channels(channel_ids)

def _ensure_data_low_level_client(self):
"""Ensure that the data low level client is initialized. Separated out like this to not require large dependencies (pandas/pyarrow) for the client if not fetching data."""
if self._data_low_level_client is None:
Expand Down
22 changes: 20 additions & 2 deletions python/lib/sift_client/resources/sync_stubs/__init__.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -407,6 +407,15 @@ class ChannelsAPI:
...

def _run(self, coro): ...
def archive(self, channels: list[str | Channel]) -> None:
"""Batch archive channels by setting active to false.

Args:
channels: List of channel IDs or Channel objects to archive. If a Channel
has no id set, raises ValueError.
"""
...

def find(self, **kwargs) -> Channel | None:
"""Find a single channel matching the given query. Takes the same arguments as `list`. If more than one channel is found,
raises an error.
Expand Down Expand Up @@ -484,7 +493,7 @@ class ChannelsAPI:
assets: list[str | Asset] | None = None,
run: Run | str | None = None,
description_contains: str | None = None,
include_archived: bool | None = None,
archived: bool | None = None,
filter_query: str | None = None,
order_by: str | None = None,
limit: int | None = None,
Expand All @@ -505,7 +514,7 @@ class ChannelsAPI:
assets: Filter channels associated with these Assets or asset IDs.
run: Filter channels associated with this Run or run ID.
description_contains: Partial description of the channel.
include_archived: If True, include archived channels in results.
archived: If True, searches for archived channels.
filter_query: Explicit CEL query to filter channels.
order_by: Field and direction to order results by.
limit: Maximum number of channels to return. If None, returns all matches.
Expand All @@ -515,6 +524,15 @@ class ChannelsAPI:
"""
...

def unarchive(self, channels: list[str | Channel]) -> None:
"""Batch unarchive channels by setting active to true.

Args:
channels: List of channel IDs or Channel objects to unarchive. If a Channel
has no id set, raises ValueError.
"""
...

class FileAttachmentsAPI:
"""Sync counterpart to `FileAttachmentsAPIAsync`.

Expand Down
Loading