Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 7 additions & 7 deletions .github/workflows/python_ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -62,13 +62,13 @@ jobs:
run: |
pytest -m "not integration"

#- name: Pytest Integration Tests
#env:
#SIFT_GRPC_URI: ${{ vars.SIFT_GRPC_URI }}
#SIFT_REST_URI: ${{ vars.SIFT_REST_URI }}
#SIFT_API_KEY: ${{ secrets.SIFT_API_KEY }}
#run: |
#pytest -m "integration"
- name: Pytest Integration Tests
env:
SIFT_GRPC_URI: ${{ vars.SIFT_GRPC_URI }}
SIFT_REST_URI: ${{ vars.SIFT_REST_URI }}
SIFT_API_KEY: ${{ secrets.SIFT_API_KEY }}
run: |
pytest -m "integration"

- name: Sync Stubs Mypy
working-directory: python/lib
Expand Down
5 changes: 3 additions & 2 deletions python/lib/sift_client/_internal/low_level_wrappers/assets.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
from sift.assets.v1.assets_pb2_grpc import AssetServiceStub

from sift_client._internal.low_level_wrappers.base import (
DEFAULT_PAGE_SIZE,
LowLevelClientBase,
)
from sift_client.sift_types.asset import Asset, AssetUpdate
Expand Down Expand Up @@ -46,7 +47,7 @@ async def list_all_assets(
query_filter: str | None = None,
order_by: str | None = None,
max_results: int | None = None,
page_size: int | None = None,
page_size: int | None = DEFAULT_PAGE_SIZE,
) -> list[Asset]:
"""List all results matching the given query.

Expand All @@ -70,7 +71,7 @@ async def list_all_assets(

async def list_assets(
self,
page_size: int | None = None,
page_size: int | None = DEFAULT_PAGE_SIZE,
page_token: str | None = None,
query_filter: str | None = None,
order_by: str | None = None,
Expand Down
8 changes: 8 additions & 0 deletions python/lib/sift_client/_internal/low_level_wrappers/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@
from abc import ABC
from typing import Any, Callable

DEFAULT_PAGE_SIZE = 1000
"""Default page size to use for pagination."""


class LowLevelClientBase(ABC):
@staticmethod
Expand Down Expand Up @@ -35,6 +38,11 @@ async def _handle_pagination(
return results
if page_token is None:
page_token = ""

# No point in querying more results than needed if limited by max_results.
if max_results is not None and page_size is not None and page_size > max_results:
page_size = max_results

while True:
if max_results is not None and len(results) >= max_results:
break
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
)
from sift.calculated_channels.v2.calculated_channels_pb2_grpc import CalculatedChannelServiceStub

from sift_client._internal.low_level_wrappers.base import LowLevelClientBase
from sift_client._internal.low_level_wrappers.base import DEFAULT_PAGE_SIZE, LowLevelClientBase
from sift_client.sift_types.calculated_channel import (
CalculatedChannel,
CalculatedChannelCreate,
Expand Down Expand Up @@ -91,7 +91,7 @@ async def list_all_calculated_channels(
query_filter: str | None = None,
order_by: str | None = None,
max_results: int | None = None,
page_size: int | None = None,
page_size: int | None = DEFAULT_PAGE_SIZE,
organization_id: str | None = None,
) -> list[CalculatedChannel]:
"""List all calculated channels matching the given query.
Expand All @@ -117,7 +117,7 @@ async def list_all_calculated_channels(
async def list_calculated_channels(
self,
*,
page_size: int | None = None,
page_size: int | None = DEFAULT_PAGE_SIZE,
page_token: str | None = None,
query_filter: str | None = None,
order_by: str | None = None,
Expand Down Expand Up @@ -198,7 +198,7 @@ async def list_calculated_channel_versions(
calculated_channel_id: str | None = None,
client_key: str | None = None,
organization_id: str | None = None,
page_size: int | None = None,
page_size: int | None = DEFAULT_PAGE_SIZE,
page_token: str | None = None,
query_filter: str | None = None,
order_by: str | None = None,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ async def get_channel(self, channel_id: str) -> Channel:
async def list_channels(
self,
*,
page_size: int | None = None,
page_size: int | None = CHANNELS_DEFAULT_PAGE_SIZE,
page_token: str | None = None,
query_filter: str | None = None,
order_by: str | None = None,
Expand Down Expand Up @@ -97,6 +97,7 @@ async def list_all_channels(
*,
query_filter: str | None = None,
order_by: str | None = None,
page_size: int | None = CHANNELS_DEFAULT_PAGE_SIZE,
max_results: int | None = None,
) -> list[Channel]:
"""List all channels with optional filtering.
Expand All @@ -109,10 +110,6 @@ async def list_all_channels(
Returns:
A list of all matching channels.
"""
# Channels default page size is 10,000 so lower it if we're passing max_results
page_size = None
if max_results is not None and max_results <= CHANNELS_DEFAULT_PAGE_SIZE:
page_size = max_results
return await self._handle_pagination(
self.list_channels,
kwargs={"query_filter": query_filter},
Expand Down
3 changes: 0 additions & 3 deletions python/lib/sift_client/_internal/low_level_wrappers/data.py
Original file line number Diff line number Diff line change
Expand Up @@ -249,9 +249,6 @@ async def get_channel_data(
tasks = []
# Queue up calls for non-cached channels in batches.
batch_size = REQUEST_BATCH_SIZE
page_size = None
if max_results is not None and max_results <= CHANNELS_DEFAULT_PAGE_SIZE:
page_size = max_results
for i in range(0, len(not_cached_channels), batch_size): # type: ignore
batch = not_cached_channels[i : i + batch_size] # type: ignore

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
from typing import TYPE_CHECKING, Iterable, cast

from sift.ingestion_configs.v2.ingestion_configs_pb2 import (
GetIngestionConfigRequest,
ListIngestionConfigFlowsRequest,
ListIngestionConfigFlowsResponse,
ListIngestionConfigsRequest,
ListIngestionConfigsResponse,
Expand Down Expand Up @@ -96,8 +96,8 @@ def __init__(self, grpc_client: GrpcClient):

async def get_ingestion_config_flows(self, ingestion_config_id: str) -> list[FlowConfig]:
"""Get the flows for an ingestion config."""
res = await self._grpc_client.get_stub(IngestionConfigServiceStub).GetIngestionConfig(
GetIngestionConfigRequest(ingestion_config_id=ingestion_config_id)
res = await self._grpc_client.get_stub(IngestionConfigServiceStub).ListIngestionConfigFlows(
ListIngestionConfigFlowsRequest(ingestion_config_id=ingestion_config_id)
)
res = cast("ListIngestionConfigFlowsResponse", res)
return [FlowConfig._from_proto(flow) for flow in res.flows]
Expand Down
6 changes: 4 additions & 2 deletions python/lib/sift_client/_internal/low_level_wrappers/jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
)
from sift.jobs.v1.jobs_pb2_grpc import JobServiceStub

from sift_client._internal.low_level_wrappers.base import LowLevelClientBase
from sift_client._internal.low_level_wrappers.base import DEFAULT_PAGE_SIZE, LowLevelClientBase
from sift_client.sift_types.job import Job
from sift_client.transport import GrpcClient, WithGrpcClient

Expand All @@ -41,7 +41,7 @@ def __init__(self, grpc_client: GrpcClient):
async def list_jobs(
self,
*,
page_size: int | None = None,
page_size: int | None = DEFAULT_PAGE_SIZE,
page_token: str | None = None,
query_filter: str | None = None,
organization_id: str | None = None,
Expand Down Expand Up @@ -83,6 +83,7 @@ async def list_all_jobs(
query_filter: str | None = None,
organization_id: str | None = None,
order_by: str | None = None,
page_size: int | None = DEFAULT_PAGE_SIZE,
max_results: int | None = None,
) -> list[Job]:
"""List all jobs, handling pagination automatically.
Expand All @@ -107,6 +108,7 @@ async def list_all_jobs(
kwargs=kwargs,
order_by=order_by,
max_results=max_results,
page_size=page_size,
)

return jobs
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
from sift.remote_files.v1.remote_files_pb2_grpc import RemoteFileServiceStub

from sift_client._internal.low_level_wrappers.base import (
DEFAULT_PAGE_SIZE,
LowLevelClientBase,
)
from sift_client.transport import GrpcClient, WithGrpcClient
Expand Down Expand Up @@ -64,7 +65,7 @@ async def list_all_remote_files(
self,
query_filter: str | None = None,
max_results: int | None = None,
page_size: int | None = None,
page_size: int | None = DEFAULT_PAGE_SIZE,
order_by: str | None = None,
sift_client: SiftClient | None = None,
) -> list[FileAttachment]:
Expand All @@ -91,7 +92,7 @@ async def list_all_remote_files(

async def list_remote_files(
self,
page_size: int | None = None,
page_size: int | None = DEFAULT_PAGE_SIZE,
page_token: str | None = None,
query_filter: str | None = None,
order_by: str | None = None,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ async def list_all_reports(
query_filter: str | None = None,
organization_id: str | None = None,
order_by: str | None = None,
page_size: int | None = None,
max_results: int | None = None,
) -> list[Report]:
"""List all reports with optional filtering.
Expand All @@ -126,6 +127,7 @@ async def list_all_reports(
},
order_by=order_by,
max_results=max_results,
page_size=page_size,
)

async def rerun_report(self, report_id: str) -> tuple[str, str]:
Expand Down
4 changes: 2 additions & 2 deletions python/lib/sift_client/_internal/low_level_wrappers/rules.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
)
from sift.rules.v1.rules_pb2_grpc import RuleServiceStub

from sift_client._internal.low_level_wrappers.base import LowLevelClientBase
from sift_client._internal.low_level_wrappers.base import DEFAULT_PAGE_SIZE, LowLevelClientBase
from sift_client._internal.low_level_wrappers.reports import ReportsLowLevelClient
from sift_client._internal.util.timestamp import to_pb_timestamp
from sift_client._internal.util.util import count_non_none
Expand Down Expand Up @@ -495,7 +495,7 @@ async def list_all_rules(
filter_query: str | None = None,
order_by: str | None = None,
max_results: int | None = None,
page_size: int | None = None,
page_size: int | None = DEFAULT_PAGE_SIZE,
) -> list[Rule]:
"""List all rules."""
return await self._handle_pagination(
Expand Down
6 changes: 4 additions & 2 deletions python/lib/sift_client/_internal/low_level_wrappers/runs.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
)
from sift.runs.v2.runs_pb2_grpc import RunServiceStub

from sift_client._internal.low_level_wrappers.base import LowLevelClientBase
from sift_client._internal.low_level_wrappers.base import DEFAULT_PAGE_SIZE, LowLevelClientBase
from sift_client._internal.util.timestamp import to_pb_timestamp
from sift_client.sift_types.run import Run, RunCreate, RunUpdate
from sift_client.transport import WithGrpcClient
Expand Down Expand Up @@ -66,7 +66,7 @@ async def get_run(self, run_id: str) -> Run:
async def list_runs(
self,
*,
page_size: int | None = None,
page_size: int | None = DEFAULT_PAGE_SIZE,
page_token: str | None = None,
query_filter: str | None = None,
order_by: str | None = None,
Expand Down Expand Up @@ -104,6 +104,7 @@ async def list_all_runs(
*,
query_filter: str | None = None,
order_by: str | None = None,
page_size: int | None = DEFAULT_PAGE_SIZE,
max_results: int | None = None,
) -> list[Run]:
"""List all runs with optional filtering.
Expand All @@ -121,6 +122,7 @@ async def list_all_runs(
kwargs={"query_filter": query_filter},
order_by=order_by,
max_results=max_results,
page_size=page_size,
)

async def create_run(self, *, create: RunCreate) -> Run:
Expand Down
6 changes: 4 additions & 2 deletions python/lib/sift_client/_internal/low_level_wrappers/tags.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
)
from sift.tags.v2.tags_pb2_grpc import TagServiceStub

from sift_client._internal.low_level_wrappers.base import LowLevelClientBase
from sift_client._internal.low_level_wrappers.base import DEFAULT_PAGE_SIZE, LowLevelClientBase
from sift_client.sift_types.tag import Tag
from sift_client.transport import WithGrpcClient

Expand Down Expand Up @@ -59,7 +59,7 @@ async def create_tag(self, name: str) -> Tag:
async def list_tags(
self,
*,
page_size: int | None = None,
page_size: int | None = DEFAULT_PAGE_SIZE,
page_token: str | None = None,
query_filter: str | None = None,
order_by: str | None = None,
Expand Down Expand Up @@ -97,6 +97,7 @@ async def list_all_tags(
*,
query_filter: str | None = None,
order_by: str | None = None,
page_size: int | None = DEFAULT_PAGE_SIZE,
max_results: int | None = None,
) -> list[Tag]:
"""List all tags with optional filtering.
Expand All @@ -114,4 +115,5 @@ async def list_all_tags(
kwargs={"query_filter": query_filter},
order_by=order_by,
max_results=max_results,
page_size=page_size,
)
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
)
from sift.test_reports.v1.test_reports_pb2_grpc import TestReportServiceStub

from sift_client._internal.low_level_wrappers.base import LowLevelClientBase
from sift_client._internal.low_level_wrappers.base import DEFAULT_PAGE_SIZE, LowLevelClientBase
from sift_client.sift_types.test_report import (
TestMeasurement,
TestMeasurementCreate,
Expand Down Expand Up @@ -129,7 +129,7 @@ async def get_test_report(self, test_report_id: str) -> TestReport:
async def list_test_reports(
self,
*,
page_size: int | None = None,
page_size: int | None = DEFAULT_PAGE_SIZE,
page_token: str | None = None,
query_filter: str | None = None,
order_by: str | None = None,
Expand Down Expand Up @@ -167,6 +167,7 @@ async def list_all_test_reports(
*,
query_filter: str | None = None,
order_by: str | None = None,
page_size: int | None = DEFAULT_PAGE_SIZE,
max_results: int | None = None,
) -> list[TestReport]:
"""List all test reports with optional filtering.
Expand All @@ -184,6 +185,7 @@ async def list_all_test_reports(
kwargs={"query_filter": query_filter},
order_by=order_by,
max_results=max_results,
page_size=page_size,
)

async def update_test_report(self, update: TestReportUpdate) -> TestReport:
Expand Down Expand Up @@ -274,6 +276,7 @@ async def list_all_test_steps(
query_filter: str | None = None,
order_by: str | None = None,
max_results: int | None = None,
page_size: int | None = DEFAULT_PAGE_SIZE,
) -> list[TestStep]:
"""List all test steps with optional filtering.

Expand All @@ -290,6 +293,7 @@ async def list_all_test_steps(
kwargs={"query_filter": query_filter},
order_by=order_by,
max_results=max_results,
page_size=page_size,
)

async def update_test_step(self, update: TestStepUpdate) -> TestStep:
Expand Down Expand Up @@ -367,7 +371,7 @@ async def create_test_measurements(
async def list_test_measurements(
self,
*,
page_size: int | None = None,
page_size: int | None = DEFAULT_PAGE_SIZE,
page_token: str | None = None,
query_filter: str | None = None,
order_by: str | None = None,
Expand Down Expand Up @@ -408,6 +412,7 @@ async def list_all_test_measurements(
query_filter: str | None = None,
order_by: str | None = None,
max_results: int | None = None,
page_size: int | None = DEFAULT_PAGE_SIZE,
) -> list[TestMeasurement]:
"""List all test measurements with optional filtering.

Expand All @@ -424,6 +429,7 @@ async def list_all_test_measurements(
kwargs={"query_filter": query_filter},
order_by=order_by,
max_results=max_results,
page_size=page_size,
)

async def update_test_measurement(self, update: TestMeasurementUpdate) -> TestMeasurement:
Expand Down
Loading
Loading