Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
242 changes: 242 additions & 0 deletions python/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,248 @@ All notable changes to this project will be documented in this file.

This project adheres to [Semantic Versioning](http://semver.org/).

## [v0.15.0] - May 7, 2026

### What's New

v0.15.0 updates the streaming ingestion client to match the new `sift-stream-bindings` 0.3.0
API. The `RecoveryStrategyConfig` class and `recovery_strategy` parameter have been replaced
with an explicit `StreamingMode` enum and discrete per-mode configuration kwargs. The
non-blocking send method has been renamed for consistency with the Rust library, and a new
`try_send` method is available for single-flow non-blocking sends. This release contains
**breaking changes** to the ingestion API — see below for details and a migration prompt.

#### Breaking Changes

##### 1. `RecoveryStrategyConfig` Removed — Replaced by `StreamingMode` + Per-Mode Kwargs

`RecoveryStrategyConfig` and the `recovery_strategy` parameter on `IngestionConfigStreamingClient`
and `IngestionAPIAsync` have been removed. Transport mode is now selected via a `StreamingMode`
enum, with separate `retry_policy` and `disk_backup_policy` kwargs for per-mode configuration.

The default mode is `StreamingMode.LIVE_WITH_BACKUPS`, which matches the previous default
behavior of `RecoveryStrategyConfig.retry_with_backups()`.

**Removed from the public API:**
- `RecoveryStrategyConfig` class (and its `retry_only()` / `retry_with_backups()` factory methods)
- `recovery_strategy` parameter on `IngestionConfigStreamingClient` and `IngestionAPIAsync`

**Added:**
- `StreamingMode` enum — `LIVE_ONLY`, `LIVE_WITH_BACKUPS`, `FILE_BACKUP`
- `streaming_mode` parameter (default: `StreamingMode.LIVE_WITH_BACKUPS`)
- `retry_policy` parameter — applies to `LIVE_WITH_BACKUPS` mode
- `disk_backup_policy` parameter — applies to `LIVE_WITH_BACKUPS` and `FILE_BACKUP` modes
- `checkpoint_interval_seconds` parameter — applies to `LIVE_WITH_BACKUPS` mode

**Before:**
```python
from sift_client.resources.ingestion import (
IngestionConfigStreamingClient,
RecoveryStrategyConfig,
)

# Default: live streaming with backups
client = await IngestionConfigStreamingClient.create(
ingestion_config=my_config,
recovery_strategy=RecoveryStrategyConfig.retry_with_backups(),
)

# Retry only (no disk backups)
client = await IngestionConfigStreamingClient.create(
ingestion_config=my_config,
recovery_strategy=RecoveryStrategyConfig.retry_only(),
)
```

**After:**
```python
from sift_client.resources.ingestion import (
IngestionConfigStreamingClient,
StreamingMode,
)

# Default: live streaming with backups (no change needed if you were using the default)
client = await IngestionConfigStreamingClient.create(
ingestion_config=my_config,
streaming_mode=StreamingMode.LIVE_WITH_BACKUPS,
)

# Live only — no disk backups, lowest overhead
client = await IngestionConfigStreamingClient.create(
ingestion_config=my_config,
streaming_mode=StreamingMode.LIVE_ONLY,
)

# File backup only
client = await IngestionConfigStreamingClient.create(
ingestion_config=my_config,
streaming_mode=StreamingMode.FILE_BACKUP,
)
```

To pass a custom retry or disk backup policy:
```python
from sift_stream_bindings import DiskBackupPolicyPy, RetryPolicyPy

client = await IngestionConfigStreamingClient.create(
ingestion_config=my_config,
streaming_mode=StreamingMode.LIVE_WITH_BACKUPS,
retry_policy=RetryPolicyPy.default(),
disk_backup_policy=DiskBackupPolicyPy.default(),
checkpoint_interval_seconds=30,
)
```

##### 2. `send_requests_nonblocking` Renamed to `try_send_requests`

`IngestionConfigStreamingClient.send_requests_nonblocking` has been renamed to `try_send_requests`
to align with the Rust `sift-stream` naming convention where `try_` methods return immediately
without awaiting channel capacity.

**Before:**
```python
client.send_requests_nonblocking(requests)
```

**After:**
```python
client.try_send_requests(requests)
```

#### New Features

##### `try_send` — Non-Blocking Single-Flow Send

A new `try_send(flow)` method is available on `IngestionConfigStreamingClient` for non-blocking
single-flow sends. It accepts either a `Flow` or a raw `FlowPy` object.

```python
client.try_send(flow)
```

Use `try_send` in real-time loops where blocking on channel capacity is unacceptable. For
most use cases, the async `send(flow)` method (which applies backpressure) is preferred.

##### `sift-stream-bindings` 0.3.0

The `sift-stream-bindings` dependency has been bumped to 0.3.0, which reflects the
`sift-stream` 0.9.0 breaking API changes (stepped builder, send rename, removed types).

#### AI-Assisted Migration Prompt (v0.14.x → v0.15.0)

Copy and paste the following prompt to an AI coding agent to automate the upgrade:

```
You are upgrading a Python project from sift_client v0.14.x to v0.15.0. The streaming ingestion
API has breaking changes. Apply ALL of the following changes precisely. Do not make any other
modifications.

---

## 1. Update `pyproject.toml`

Find every occurrence of `sift-stream-bindings==0.2.2` and replace it with
`sift-stream-bindings==0.3.0`. This may appear under multiple dependency groups (e.g. `all`,
`dev-all`, `sift-stream`, `sift-stream-bindings`).

---

## 2. Remove all imports of `RecoveryStrategyConfig`

Delete any line that imports `RecoveryStrategyConfig`, for example:

from sift_client.resources.ingestion import RecoveryStrategyConfig
from sift_client.resources.ingestion import IngestionConfigStreamingClient, RecoveryStrategyConfig

Remove only `RecoveryStrategyConfig` from those imports; keep any other names on the same line.

---

## 3. Add `StreamingMode` to imports where needed

Wherever `IngestionConfigStreamingClient` or `IngestionAPIAsync` is imported and a streaming
mode needs to be specified, add `StreamingMode` to the import:

from sift_client.resources.ingestion import IngestionConfigStreamingClient, StreamingMode

---

## 4. Replace all `recovery_strategy` call sites

Search for every call to `IngestionConfigStreamingClient.create(...)` and
`IngestionAPIAsync.create(...)` that contains a `recovery_strategy` keyword argument.

### Case A — `RecoveryStrategyConfig.retry_with_backups()` (or no recovery_strategy at all)

This was (and remains) the default. Replace the kwarg:

# BEFORE
recovery_strategy=RecoveryStrategyConfig.retry_with_backups()

# AFTER
streaming_mode=StreamingMode.LIVE_WITH_BACKUPS

If the call had no `recovery_strategy` argument, no change is needed — `LIVE_WITH_BACKUPS`
is the default.

If the old call passed explicit `retry_policy` or `disk_backup_policy` arguments inside
`RecoveryStrategyConfig.retry_with_backups(...)`, move them to top-level kwargs:

# BEFORE
recovery_strategy=RecoveryStrategyConfig.retry_with_backups(
retry_policy=my_retry_policy,
disk_backup_policy=my_disk_policy,
)

# AFTER
streaming_mode=StreamingMode.LIVE_WITH_BACKUPS,
retry_policy=my_retry_policy,
disk_backup_policy=my_disk_policy,

### Case B — `RecoveryStrategyConfig.retry_only()`

Replace with `streaming_mode=StreamingMode.LIVE_ONLY`. If a `retry_policy` was passed,
keep it as a top-level kwarg (it is ignored for `LIVE_ONLY` in this version, but preserving
it avoids a TypeError):

# BEFORE
recovery_strategy=RecoveryStrategyConfig.retry_only(retry_policy=my_policy)

# AFTER
streaming_mode=StreamingMode.LIVE_ONLY

### Case C — Raw `RecoveryStrategyPy` object passed directly

If any call passes a raw `RecoveryStrategyPy` instance as `recovery_strategy`, determine
which mode it was configured for and replace accordingly:
- `RecoveryStrategyPy.retry_only(...)` → `streaming_mode=StreamingMode.LIVE_ONLY`
- `RecoveryStrategyPy.retry_with_backups(...)` → `streaming_mode=StreamingMode.LIVE_WITH_BACKUPS`
with `retry_policy` and `disk_backup_policy` promoted to top-level kwargs.

---

## 5. Rename `send_requests_nonblocking` → `try_send_requests`

Find every call to `.send_requests_nonblocking(...)` on any ingestion client instance and
rename it to `.try_send_requests(...)`. The signature is unchanged.

# BEFORE
client.send_requests_nonblocking(requests)

# AFTER
client.try_send_requests(requests)

---

## 6. Verify

After applying the above changes:
1. Run `grep -r "RecoveryStrategyConfig" .` — expect zero results.
2. Run `grep -r "send_requests_nonblocking" .` — expect zero results.
3. Run `grep -r "recovery_strategy" .` — expect zero results.
4. Run your test suite to confirm no remaining references.
```

## [v0.14.1] - April 30, 2026

### Bugfixes
Expand Down
75 changes: 49 additions & 26 deletions python/lib/sift_client/_internal/low_level_wrappers/ingestion.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,15 +39,15 @@
from datetime import datetime

from sift_stream_bindings import (
DurationPy,
DiskBackupPolicyPy,
FlowConfigPy,
FlowDescriptorPy,
FlowPy,
IngestionConfigFormPy,
IngestWithConfigDataStreamRequestPy,
IngestWithConfigDataStreamRequestWrapperPy,
MetadataPy,
RecoveryStrategyPy,
RetryPolicyPy,
RunFormPy,
RunSelectorPy,
SiftStreamBuilderPy,
Expand All @@ -56,7 +56,7 @@
TimeValuePy,
)

from sift_client.resources.ingestion import TracingConfig
from sift_client.resources.ingestion import StreamingMode, TracingConfig


def _to_rust_py_timestamp(time: datetime) -> TimeValuePy:
Expand Down Expand Up @@ -250,26 +250,32 @@ async def create_sift_stream_instance(
cls,
api_key: str,
grpc_uri: str,
ingestion_config: IngestionConfigFormPy,
ingestion_config_form: IngestionConfigFormPy,
run_form: RunFormPy | None = None,
run_id: str | None = None,
asset_tags: list[str] | None = None,
asset_metadata: list[MetadataPy] | None = None,
recovery_strategy: RecoveryStrategyPy | None = None,
checkpoint_interval: DurationPy | None = None,
streaming_mode: StreamingMode = ..., # type: ignore[assignment]
retry_policy: RetryPolicyPy | None = None,
disk_backup_policy: DiskBackupPolicyPy | None = None,
checkpoint_interval_seconds: int | None = None,
enable_tls: bool = True,
tracing_config: TracingConfig | None = None,
) -> IngestionConfigStreamingLowLevelClient:
# Importing here to allow sift_stream_bindings to be an optional dependancy for non-ingestion users
# TODO(nathan): Fix bindings to fix mypy issues with tracing functions
from sift_stream_bindings import ( # type: ignore[attr-defined]
DurationPy,
SiftStreamBuilderPy,
init_tracing, # type: ignore[attr-defined]
init_tracing_with_file, # type: ignore[attr-defined]
is_tracing_initialized, # type: ignore[attr-defined]
) # type: ignore[attr-defined]
)

from sift_client.resources.ingestion import StreamingMode, TracingConfig

from sift_client.resources.ingestion import TracingConfig
if streaming_mode is ...: # type: ignore[comparison-overlap]
streaming_mode = StreamingMode.LIVE_WITH_BACKUPS

if not is_tracing_initialized():
if tracing_config is None:
Expand All @@ -287,21 +293,35 @@ async def create_sift_stream_instance(
# Use stdout/stderr only
init_tracing(tracing_config.level)

builder = SiftStreamBuilderPy(
uri=grpc_uri,
apikey=api_key,
)

builder.enable_tls = enable_tls
builder.ingestion_config = ingestion_config
builder.recovery_strategy = recovery_strategy
builder.checkpoint_interval = checkpoint_interval
builder.asset_tags = asset_tags
builder.metadata = asset_metadata
builder.run = run_form
builder.run_id = run_id

sift_stream_instance = await builder.build()
sift_builder = SiftStreamBuilderPy(uri=grpc_uri, apikey=api_key)
sift_builder.enable_tls = enable_tls

config_builder = sift_builder.ingestion_config(ingestion_config_form)
config_builder.run = run_form
config_builder.run_id = run_id
config_builder.asset_tags = asset_tags
config_builder.metadata = asset_metadata

if streaming_mode == StreamingMode.LIVE_ONLY:
sift_stream_instance = await config_builder.live_only().build()

elif streaming_mode == StreamingMode.FILE_BACKUP:
fb_builder = config_builder.file_backup()
if disk_backup_policy is not None:
fb_builder.disk_backup_policy = disk_backup_policy
sift_stream_instance = await fb_builder.build()

else: # LIVE_WITH_BACKUPS (default)
lwb_builder = config_builder.live_with_backups()
if retry_policy is not None:
lwb_builder.retry_policy = retry_policy
if disk_backup_policy is not None:
lwb_builder.disk_backup_policy = disk_backup_policy
if checkpoint_interval_seconds is not None:
lwb_builder.checkpoint_interval = DurationPy(
secs=checkpoint_interval_seconds, nanos=0
)
sift_stream_instance = await lwb_builder.build()

return cls(sift_stream_instance)

Expand All @@ -314,10 +334,13 @@ async def batch_send(self, flows: Iterable[FlowPy]):
async def send_requests(self, requests: list[IngestWithConfigDataStreamRequestPy]):
await self._sift_stream_instance.send_requests(requests)

def send_requests_nonblocking(
def try_send_requests(
self, requests: Iterable[IngestWithConfigDataStreamRequestWrapperPy]
):
self._sift_stream_instance.send_requests_nonblocking(requests)
) -> None:
self._sift_stream_instance.try_send_requests(requests)

def try_send(self, flow: FlowPy) -> None:
self._sift_stream_instance.try_send(flow)

def get_flow_descriptor(self, flow_name: str) -> FlowDescriptorPy:
return self._sift_stream_instance.get_flow_descriptor(flow_name)
Expand Down
3 changes: 2 additions & 1 deletion python/lib/sift_client/resources/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ async def main():
from sift_client.resources.calculated_channels import CalculatedChannelsAPIAsync
from sift_client.resources.channels import ChannelsAPIAsync
from sift_client.resources.file_attachments import FileAttachmentsAPIAsync
from sift_client.resources.ingestion import IngestionAPIAsync, TracingConfig
from sift_client.resources.ingestion import IngestionAPIAsync, StreamingMode, TracingConfig
from sift_client.resources.jobs import JobsAPIAsync
from sift_client.resources.ping import PingAPIAsync
from sift_client.resources.reports import ReportsAPIAsync
Expand Down Expand Up @@ -200,6 +200,7 @@ async def main():
"FileAttachmentsAPI",
"FileAttachmentsAPIAsync",
"IngestionAPIAsync",
"StreamingMode",
"JobsAPI",
"JobsAPIAsync",
"PingAPI",
Expand Down
Loading
Loading