Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
39 commits
Select commit Hold shift + click to select a range
35a52ee
wip commit
alexluck-sift Sep 26, 2025
768b69d
change extras isntall testing to only be all or none
alexluck-sift Sep 26, 2025
d763b05
add support for .env files and integration test marker for pytest
alexluck-sift Sep 26, 2025
5d46aa7
add integration tests
alexluck-sift Sep 26, 2025
c33bdc6
linting and reorganized pre-push hook
alexluck-sift Sep 29, 2025
0cc9a4b
break out unit and integration tests
alexluck-sift Sep 29, 2025
4fb2523
linting
alexluck-sift Sep 29, 2025
fcc4bba
update CI
alexluck-sift Sep 29, 2025
2d59cd4
CI update to run all python checks in parallel
alexluck-sift Sep 29, 2025
d4abaff
update ci to use pip cahce instead of a full venv
alexluck-sift Sep 29, 2025
449e73b
revert to steps only
alexluck-sift Sep 29, 2025
ec54a23
remove error test from test_runs
alexluck-sift Sep 29, 2025
bc82512
lint
alexluck-sift Sep 29, 2025
ddeafd3
update env
alexluck-sift Sep 29, 2025
fa0960c
update env
alexluck-sift Sep 29, 2025
1b3e02b
update env
alexluck-sift Sep 29, 2025
d1937b9
update runs integration test
alexluck-sift Sep 29, 2025
f3fb8a6
update runs integration test
alexluck-sift Sep 30, 2025
c29ec36
fmt
alexluck-sift Sep 30, 2025
b7d3dca
fix tests
alexluck-sift Sep 30, 2025
df3d868
fix dev script args
alexluck-sift Oct 7, 2025
a3ae253
move sift client fixture to a shared file
alexluck-sift Oct 8, 2025
3709185
add channels test
alexluck-sift Oct 9, 2025
4223aa7
add test_calculated_channels.py
alexluck-sift Oct 9, 2025
dc1e389
add test_rules.py
alexluck-sift Oct 9, 2025
289604c
add test_ingestion.py
alexluck-sift Oct 9, 2025
69739da
linting
alexluck-sift Oct 9, 2025
e589b2a
add base test
alexluck-sift Oct 9, 2025
56f2aec
linting
alexluck-sift Oct 9, 2025
6a5aca3
docs fix
alexluck-sift Oct 9, 2025
4be9e09
- fix tests
alexluck-sift Oct 10, 2025
540cf1b
fix tests
alexluck-sift Oct 10, 2025
2324754
remove environment
alexluck-sift Oct 10, 2025
b1cecdd
remove invalid test
alexluck-sift Oct 10, 2025
ded6112
improve integration test coverage
alexluck-sift Oct 10, 2025
e3704f1
add unit tests for sift_types
alexluck-sift Oct 10, 2025
519e61f
fmt and linting
alexluck-sift Oct 10, 2025
3bc0c2d
update pre-push
alexluck-sift Oct 10, 2025
feacab2
fix regression
alexluck-sift Oct 10, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion .githooks/pre-push
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,10 @@ GITHOOKS_DIR="$REPO_ROOT/.githooks"
python_changed_files=($(git diff --name-only --diff-filter=ACM | grep '^python/lib/sift_client/' || true))

if [[ -n "$python_changed_files" ]]; then
echo "Python files changed, running Python stub checks..."
echo "Python files changed, running Python formatting and linting..."
bash "$GITHOOKS_DIR/pre-push-python/fmt-lint.sh"

echo "Running Python stub checks..."
bash "$GITHOOKS_DIR/pre-push-python/stubs.sh"
fi

Expand Down
35 changes: 35 additions & 0 deletions .githooks/pre-push-python/fmt-lint.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
#!/usr/bin/env bash

set -e

# Store the root directory of the repository
REPO_ROOT="$(git rev-parse --show-toplevel)"
PYTHON_DIR="$REPO_ROOT/python"

echo "Running Python formatting and linting with --fix..."

# Change to Python directory
cd "$PYTHON_DIR"

# Run ruff format (formatter)
echo "Running ruff format..."
bash ./scripts/dev fmt

# Run ruff check with --fix (linter)
echo "Running ruff check --fix..."
bash ./scripts/dev lint-fix

# Check if any files were modified by formatting/linting
cd "$REPO_ROOT"
changed_files=$(git status --porcelain python/lib/sift_client/ | grep -E '\.py$' || true)

if [ -n "$changed_files" ]; then
echo ""
echo "ERROR: Formatting/linting made changes to the following files:"
echo "$changed_files"
echo ""
echo "Please commit these changes before pushing."
exit 1
fi

echo "Python formatting and linting completed successfully."
14 changes: 12 additions & 2 deletions .github/workflows/python_ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,11 @@ jobs:
python-version: "3.8"

- name: Pip install
id: install
run: |
python -m pip install --upgrade pip
pip install '.[development,openssl,tdms,rosbags,hdf5,sift-stream]'

- name: Lint
run: |
ruff check
Expand All @@ -44,9 +46,17 @@ jobs:
run: |
pyright lib

- name: Pytest
- name: Pytest Unit Tests
run: |
pytest -m "not integration"

- name: Pytest Integration Tests
env:
SIFT_GRPC_URI: ${{ vars.SIFT_GRPC_URI }}
SIFT_REST_URI: ${{ vars.SIFT_REST_URI }}
SIFT_API_KEY: ${{ secrets.SIFT_API_KEY }}
run: |
pytest
pytest -m "integration"

- name: Sync Stubs Mypy
working-directory: python/lib
Expand Down
6 changes: 5 additions & 1 deletion python/lib/sift_client/_internal/low_level_wrappers/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ async def _handle_pagination(
page_size: The number of results to return per page.
page_token: The token to use for the next page.
order_by: How to order the retrieved results.
max_results: Maximum number of results to return. NOTE: Will be in increments of page_size or default page size defined by the call if no page_size is provided.
max_results: Maximum number of results to return.

Returns:
A list of all matching results.
Expand All @@ -31,6 +31,8 @@ async def _handle_pagination(
kwargs = {}

results: list[Any] = []
if max_results == 0:
return results
if page_token is None:
page_token = ""
while True:
Expand All @@ -45,4 +47,6 @@ async def _handle_pagination(
results.extend(response)
if page_token == "":
break
if max_results and len(results) > max_results:
results = results[:max_results]
return results
40 changes: 26 additions & 14 deletions python/lib/sift_client/_internal/low_level_wrappers/ingestion.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,14 @@
from queue import Queue
from typing import TYPE_CHECKING, Any, cast

import sift_stream_bindings
from sift.ingestion_configs.v2.ingestion_configs_pb2 import (
GetIngestionConfigRequest,
ListIngestionConfigFlowsResponse,
ListIngestionConfigsRequest,
ListIngestionConfigsResponse,
)
from sift.ingestion_configs.v2.ingestion_configs_pb2_grpc import IngestionConfigServiceStub
from sift_stream_bindings import (
IngestionConfigFormPy,
IngestWithConfigDataStreamRequestPy,
from sift.ingestion_configs.v2.ingestion_configs_pb2_grpc import (
IngestionConfigServiceStub,
)

from sift_client._internal.low_level_wrappers.base import (
Expand All @@ -44,6 +41,12 @@
if TYPE_CHECKING:
from datetime import datetime

from sift_stream_bindings import (
IngestionConfigFormPy,
IngestWithConfigDataStreamRequestPy,
SiftStreamBuilderPy,
)


class IngestionThread(threading.Thread):
"""Manages ingestion for a single ingestion config."""
Expand All @@ -54,7 +57,7 @@ class IngestionThread(threading.Thread):

def __init__(
self,
sift_stream_builder: sift_stream_bindings.SiftStreamBuilderPy,
sift_stream_builder: SiftStreamBuilderPy,
data_queue: Queue,
ingestion_config: IngestionConfigFormPy,
no_data_timeout: int = 1,
Expand Down Expand Up @@ -154,7 +157,7 @@ class IngestionLowLevelClient(LowLevelClientBase, WithGrpcClient):

CacheEntry = namedtuple("CacheEntry", ["data_queue", "ingestion_config", "thread"])

sift_stream_builder: sift_stream_bindings.SiftStreamBuilderPy
sift_stream_builder: SiftStreamBuilderPy
stream_cache: dict[str, CacheEntry]

def __init__(self, grpc_client: GrpcClient):
Expand All @@ -163,21 +166,25 @@ def __init__(self, grpc_client: GrpcClient):
Args:
grpc_client: The gRPC client to use for making API calls.
"""
from sift_stream_bindings import (
RecoveryStrategyPy,
RetryPolicyPy,
SiftStreamBuilderPy,
)

super().__init__(grpc_client=grpc_client)
# Rust GRPC client expects URI to have http(s):// prefix.
uri = grpc_client._config.uri
if not uri.startswith("http"):
uri = f"https://{uri}" if grpc_client._config.use_ssl else f"http://{uri}"
self.sift_stream_builder = sift_stream_bindings.SiftStreamBuilderPy(
self.sift_stream_builder = SiftStreamBuilderPy(
uri=uri,
apikey=grpc_client._config.api_key,
)
self.sift_stream_builder.enable_tls = grpc_client._config.use_ssl
# FD-177: Expose configuration for recovery strategy.
self.sift_stream_builder.recovery_strategy = (
sift_stream_bindings.RecoveryStrategyPy.retry_only(
sift_stream_bindings.RetryPolicyPy.default()
)
self.sift_stream_builder.recovery_strategy = RecoveryStrategyPy.retry_only(
RetryPolicyPy.default()
)
self.stream_cache = {}

Expand Down Expand Up @@ -229,7 +236,9 @@ async def get_ingestion_config_id_from_client_key(self, client_key: str) -> str
return ingestion_configs[0].id_

def _new_ingestion_thread(
self, ingestion_config_id: str, ingestion_config: IngestionConfigFormPy | None = None
self,
ingestion_config_id: str,
ingestion_config: IngestionConfigFormPy | None = None,
):
"""Start a new ingestion thread.
This allows ingestion to happen in the background regardless of if the user is using the sync or async client
Expand Down Expand Up @@ -290,7 +299,6 @@ async def create_ingestion_config(
asset_name: str,
flows: list[Flow],
client_key: str | None = None,
organization_id: str | None = None,
) -> str:
"""Create an ingestion config.

Expand All @@ -303,6 +311,8 @@ async def create_ingestion_config(
Returns:
The id of the new or found ingestion config.
"""
from sift_stream_bindings import IngestionConfigFormPy

ingestion_config_id = None
if client_key:
logger.debug(f"Getting ingestion config id for client key {client_key}")
Expand Down Expand Up @@ -381,6 +391,8 @@ def ingest_flow(
channel_values: The channel values to ingest.
organization_id: The organization id to use for ingestion. Only relevant if the user is part of several organizations.
"""
from sift_stream_bindings import IngestWithConfigDataStreamRequestPy

if not flow.ingestion_config_id:
raise ValueError(
"Flow has no ingestion config id -- have you created an ingestion config for this flow?"
Expand Down
55 changes: 34 additions & 21 deletions python/lib/sift_client/_internal/low_level_wrappers/rules.py
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,8 @@ async def create_rule(
)
conditions_request = [
UpdateConditionRequest(
expression=expression_proto, actions=[create.action._to_update_request()]
expression=expression_proto,
actions=[create.action._to_update_request()],
)
]
update_request = UpdateRuleRequest(
Expand Down Expand Up @@ -183,9 +184,7 @@ def _update_rule_request_from_update(
"asset_tag_ids",
]
# Need to manually copy fields that will be reset even if not provided in update dict.
copy_unset_fields = [
"description",
]
copy_unset_fields = ["description", "name"]

# Populate the trivial fields first.
update_dict.update(
Expand Down Expand Up @@ -214,15 +213,17 @@ def _update_rule_request_from_update(
"Expression and channel_references must both be provided or both be None"
)
expression_proto = RuleConditionExpression(
calculated_channel=CalculatedChannelConfig(
expression=expression,
channel_references={
c.channel_reference: ChannelReferenceProto(name=c.channel_identifier)
for c in channel_references
},
calculated_channel=(
CalculatedChannelConfig(
expression=expression,
channel_references={
c.channel_reference: ChannelReferenceProto(name=c.channel_identifier)
for c in channel_references
},
)
if expression
else None
)
if expression
else None
)
conditions_request = [
UpdateConditionRequest(
Expand All @@ -238,10 +239,10 @@ def _update_rule_request_from_update(

# This always needs to be set, so handle the defaults.
update_dict["asset_configuration"] = RuleAssetConfiguration( # type: ignore
asset_ids=update.asset_ids if "asset_ids" in model_dump else rule.asset_ids or [],
tag_ids=update.asset_tag_ids
if "asset_tag_ids" in model_dump
else rule.asset_tag_ids or [],
asset_ids=(update.asset_ids if "asset_ids" in model_dump else rule.asset_ids or []),
tag_ids=(
update.asset_tag_ids if "asset_tag_ids" in model_dump else rule.asset_tag_ids or []
),
)

update_request = UpdateRuleRequest(
Expand All @@ -254,7 +255,7 @@ def _update_rule_request_from_update(
async def update_rule(
self, rule: Rule, update: RuleUpdate, version_notes: str | None = None
) -> Rule:
"""Update a rule.
"""Update a rule. Also handles archive/unarchive to behave similar to other low-level clients.

Args:
rule: The rule to update.
Expand All @@ -264,14 +265,26 @@ async def update_rule(
Returns:
The updated Rule.
"""

should_update_archive = "is_archived" in update.model_fields_set

update.resource_id = rule.id_
if not should_update_archive or (
should_update_archive and len(update.model_fields_set) > 1
):
update_request = self._update_rule_request_from_update(rule, update, version_notes)

response = await self._grpc_client.get_stub(RuleServiceStub).UpdateRule(update_request)
_ = cast("UpdateRuleResponse", response)

update_request = self._update_rule_request_from_update(rule, update, version_notes)
if should_update_archive:
if update.is_archived:
await self.archive_rule(rule_id=rule.id_)
else:
await self.unarchive_rule(rule_id=rule.id_)

response = await self._grpc_client.get_stub(RuleServiceStub).UpdateRule(update_request)
updated_grpc_rule = cast("UpdateRuleResponse", response)
# Get the updated rule
return await self.get_rule(rule_id=updated_grpc_rule.rule_id)
return await self.get_rule(rule_id=rule.id_)

async def batch_update_rules(self, rules: list[RuleUpdate]) -> BatchUpdateRulesResponse:
"""Batch update rules.
Expand Down
Loading
Loading