-
Notifications
You must be signed in to change notification settings - Fork 8
FD-74: Add channels(ingest and fetch) and rules to client. #275
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Changes from all commits
Commits
Show all changes
24 commits
Select commit
Hold shift + click to select a range
8004001
FD-74: Add channels(ingest and fetch) and rules to client.
ian-sift fc42efd
Add arrow data type export. Clean up channel data cache function docs…
ian-sift 933af52
Make pyarrow pin more flexible to older python.
ian-sift c1c1cd3
ignore imports for pyarrow
ian-sift e8b36ff
Make ingestion run quickly without requiring wait_for_ingestion.
ian-sift 96fd272
Double underscore to single. Cleanup 2 todos
ian-sift 4779925
Ailin pr fb + discovered and fixed rule descriptions being reset if n…
ian-sift ba8c270
Split data and ingestion functionality to separate files. Other PR fb.
ian-sift 8bdefbf
Merge branch 'main' into FD-74
ian-sift f15339a
Update default recovery strategy.
ian-sift 103ef87
id -> id_. Use latest sift stream bindings types.
ian-sift 0935fdd
Time instead of datetimte for regulating metrics.
ian-sift a4e1375
Removed sync ingestion client. Fixed first time flow handling. ingest…
ian-sift da033fd
Rename stop event to deconflict w/ internal Threading attributes in e…
ian-sift 56bb8b9
Deps.
ian-sift 76e88fb
PR fb.
ian-sift 61a2b52
pr fb
ian-sift a3dd47e
mypy
ian-sift 1477841
fmt
ian-sift 3c8e26a
timer rename
ian-sift 9ccc075
Merge remote-tracking branch 'origin/main' into FD-74
ian-sift 3b63c20
lint
ian-sift 2697cbf
Require Flow on ingest()
ian-sift 30cc6b4
Remove type hint
ian-sift File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
127 changes: 127 additions & 0 deletions
127
python/lib/sift_client/_internal/low_level_wrappers/channels.py
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,127 @@ | ||
| from __future__ import annotations | ||
|
|
||
| import logging | ||
| from typing import Any, cast | ||
|
|
||
| from sift.channels.v3.channels_pb2 import ( | ||
| GetChannelRequest, | ||
| GetChannelResponse, | ||
| ListChannelsRequest, | ||
| ListChannelsResponse, | ||
| ) | ||
| from sift.channels.v3.channels_pb2_grpc import ChannelServiceStub | ||
|
|
||
| from sift_client._internal.low_level_wrappers.base import LowLevelClientBase | ||
| from sift_client.transport import WithGrpcClient | ||
| from sift_client.transport.grpc_transport import GrpcClient | ||
| from sift_client.types.channel import Channel | ||
|
|
||
| # Configure logging | ||
| logger = logging.getLogger(__name__) | ||
|
|
||
| CHANNELS_DEFAULT_PAGE_SIZE = 10_000 | ||
|
|
||
|
|
||
| class ChannelsLowLevelClient(LowLevelClientBase, WithGrpcClient): | ||
| """ | ||
| Low-level client for the ChannelsAPI. | ||
|
|
||
| This class provides a thin wrapper around the autogenerated bindings for the ChannelsAPI. | ||
| """ | ||
|
|
||
| def __init__(self, grpc_client: GrpcClient): | ||
| """ | ||
| Initialize the ChannelsLowLevelClient. | ||
|
|
||
| Args: | ||
| grpc_client: The gRPC client to use for making API calls. | ||
| """ | ||
| super().__init__(grpc_client) | ||
|
|
||
| async def get_channel(self, channel_id: str) -> Channel: | ||
| """ | ||
| Get a channel by channel_id. | ||
|
|
||
| Args: | ||
| channel_id: The channel ID to get. | ||
|
|
||
| Returns: | ||
| The Channel. | ||
|
|
||
| Raises: | ||
| ValueError: If channel_id is not provided. | ||
| """ | ||
|
|
||
| request = GetChannelRequest(channel_id=channel_id) | ||
| response = await self._grpc_client.get_stub(ChannelServiceStub).GetChannel(request) | ||
| grpc_channel = cast(GetChannelResponse, response).channel | ||
| channel = Channel._from_proto(grpc_channel) | ||
| return channel | ||
|
|
||
| async def list_channels( | ||
| self, | ||
| *, | ||
| page_size: int | None = None, | ||
| page_token: str | None = None, | ||
|
alexluck-sift marked this conversation as resolved.
|
||
| query_filter: str | None = None, | ||
| order_by: str | None = None, | ||
| ) -> tuple[list[Channel], str]: | ||
| """ | ||
| List channels with optional filtering and pagination. | ||
|
|
||
| Args: | ||
| page_size: The maximum number of channels to return. | ||
| page_token: A page token for pagination. | ||
| query_filter: A CEL filter string. | ||
| order_by: How to order the retrieved channels. | ||
|
|
||
| Returns: | ||
| A tuple of (channels, next_page_token). | ||
| """ | ||
|
|
||
| request_kwargs: dict[str, Any] = {} | ||
| if query_filter: | ||
| request_kwargs["filter"] = query_filter | ||
| if order_by: | ||
| request_kwargs["order_by"] = order_by | ||
| if page_size: | ||
| request_kwargs["page_size"] = page_size | ||
| if page_token: | ||
| request_kwargs["page_token"] = page_token | ||
|
|
||
| request = ListChannelsRequest(**request_kwargs) | ||
| response = await self._grpc_client.get_stub(ChannelServiceStub).ListChannels(request) | ||
| response = cast(ListChannelsResponse, response) | ||
|
|
||
| channels = [Channel._from_proto(channel) for channel in response.channels] | ||
| return channels, response.next_page_token | ||
|
|
||
| async def list_all_channels( | ||
| self, | ||
| *, | ||
| query_filter: str | None = None, | ||
| order_by: str | None = None, | ||
| max_results: int | None = None, | ||
|
alexluck-sift marked this conversation as resolved.
|
||
| ) -> list[Channel]: | ||
| """ | ||
| List all channels with optional filtering. | ||
|
|
||
| Args: | ||
| query_filter: A CEL filter string. | ||
| order_by: How to order the retrieved channels. | ||
| max_results: Maximum number of results to return. | ||
|
|
||
| Returns: | ||
| A list of all matching channels. | ||
| """ | ||
| # Channels default page size is 10,000 so lower it if we're passing max_results | ||
| page_size = None | ||
| if max_results is not None and max_results <= CHANNELS_DEFAULT_PAGE_SIZE: | ||
| page_size = max_results | ||
| return await self._handle_pagination( | ||
| self.list_channels, | ||
| kwargs={"query_filter": query_filter}, | ||
| page_size=page_size, | ||
| order_by=order_by, | ||
| max_results=max_results, | ||
| ) | ||
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.