Merged
21 commits
ab6e07e
migrate to pipeline v3
PabloPardoGarcia Mar 26, 2026
a828ab1
:chore: code format
PabloPardoGarcia Mar 26, 2026
48d6f30
Merge a828ab1c7e4ec38c6c3b195d91eef3b611d5dc76 into de1521e7675d2475b…
PabloPardoGarcia Mar 26, 2026
216d343
docs: update coverage badge
github-actions[bot] Mar 26, 2026
f1a8f58
Merge 216d343e73dfe135afe5134fe11580dbf076649a into de1521e7675d2475b…
PabloPardoGarcia Mar 26, 2026
f90c8cd
chore: bump version to 4.0.0
github-actions[bot] Mar 26, 2026
0a3dbf9
fix validation error on pipeline version
PabloPardoGarcia Mar 26, 2026
d5b5195
Merge branch 'add-support-for-pipeline-v3' of github.com:glassflow/gl…
PabloPardoGarcia Mar 26, 2026
c56b89b
feat: update pipeline config to V3 schema
PabloPardoGarcia Apr 10, 2026
ce84a93
Merge branch 'main' into add-support-for-pipeline-v3
PabloPardoGarcia Apr 10, 2026
3bd3597
docs: update coverage badge
github-actions[bot] Apr 10, 2026
4973986
align transformation file names
PabloPardoGarcia Apr 14, 2026
08657a4
Merge remote-tracking branch 'origin/add-support-for-pipeline-v3' int…
PabloPardoGarcia Apr 14, 2026
8ad0341
docs: update coverage badge
github-actions[bot] Apr 14, 2026
12c0b4a
refactor: replace local v2->v3 migration helper with backend endpoint
PabloPardoGarcia Apr 22, 2026
a6fb7f8
style: ruff format fix for test_client.py
PabloPardoGarcia Apr 22, 2026
182d48e
docs: update coverage badge
github-actions[bot] Apr 22, 2026
46a78c6
docs: update coverage badge
github-actions[bot] Apr 22, 2026
d41df55
docs: update coverage badge
github-actions[bot] Apr 24, 2026
82e1d85
Update readme for migration example
ashish-bagri Apr 27, 2026
b83dad9
docs: update coverage badge
github-actions[bot] Apr 27, 2026
236 changes: 105 additions & 131 deletions README.md
@@ -15,7 +15,7 @@
<img src="https://github.com/glassflow/glassflow-python-sdk/workflows/Test/badge.svg?labelColor=&color=e69e3a">
</a>
<!-- Pytest Coverage Comment:Begin -->
<img src="https://img.shields.io/badge/coverage-90%25-brightgreen">
<img src="https://img.shields.io/badge/coverage-94%25-brightgreen">
<!-- Pytest Coverage Comment:End -->
</p>

@@ -24,9 +24,13 @@ A Python SDK for creating and managing data pipelines between Kafka and ClickHouse
## Features

- Create and manage data pipelines between Kafka and ClickHouse
- Deduplication of events during a time window based on a key
- Temporal joins between topics based on a common key with a given time window
- Ingest from Kafka sources or OTLP signals (logs, metrics, traces)
- Unified transforms pipeline: dedup, filter, and stateless transformations
- Temporal joins between sources based on a common key with a given time window
- Per-source Schema Registry integration
- Pipeline configuration via YAML or JSON
- Schema validation and configuration management
- Fine-grained resource control per pipeline component

## Installation

@@ -41,191 +45,161 @@ pip install glassflow
```python
from glassflow.etl import Client

# Initialize GlassFlow client
client = Client(host="your-glassflow-etl-url")
```

### Create a pipeline

The example below uses pipeline version `v3`. See [Migrating from V2 to V3](#migrating-from-v2-to-v3) if you have existing `v2` configurations.

```python
pipeline_config = {
"version": "v2",
"version": "v3",
"pipeline_id": "my-pipeline-id",
"source": {
"type": "kafka",
"connection_params": {
"brokers": [
"http://my.kafka.broker:9093"
],
"protocol": "PLAINTEXT",
"mechanism": "NO_AUTH"
},
"topics": [
"sources": [
{
"consumer_group_initial_offset": "latest",
"name": "users",
"deduplication": {
"enabled": True,
"id_field": "event_id",
"id_field_type": "string",
"time_window": "1h"
}
"type": "kafka",
"source_id": "users",
"connection_params": {
"brokers": ["my.kafka.broker:9093"],
"protocol": "PLAINTEXT",
},
"topic": "users",
"consumer_group_initial_offset": "latest",
"schema_fields": [
{"name": "event_id", "type": "string"},
{"name": "user_id", "type": "string"},
{"name": "created_at", "type": "string"},
{"name": "name", "type": "string"},
{"name": "email", "type": "string"},
],
}
]
},
"join": {
"enabled": False
},
"sink": {
"type": "clickhouse",
"host": "http://my.clickhouse.server",
"port": "9000",
"database": "default",
"username": "default",
"password": "c2VjcmV0",
"secure": False,
"max_batch_size": 1000,
"max_delay_time": "30s",
"table": "users_dedup"
},
"schema": {
"fields": [
{
"source_id": "users",
"name": "event_id",
"type": "string",
"column_name": "event_id",
"column_type": "UUID"
},
{
"source_id": "users",
"field_name": "user_id",
"column_name": "user_id",
"column_type": "UUID"
},
{
"source_id": "users",
"name": "created_at",
"type": "string",
"column_name": "created_at",
"column_type": "DateTime"
},
],
"transforms": [
{
"source_id": "users",
"name": "name",
"type": "string",
"column_name": "name",
"column_type": "String"
},
{
"source_id": "users",
"name": "email",
"type": "string",
"column_name": "email",
"column_type": "String"
"type": "dedup",
"source_id": "users",
"config": {
"key": "event_id",
"time_window": "1h",
},
}
]
}
],
"sink": {
"type": "clickhouse",
"connection_params": {
"host": "my.clickhouse.server",
"port": "9000",
"database": "default",
"username": "default",
"password": "mysecret",
"secure": False,
},
"table": "users",
"mapping": [
{"name": "event_id", "column_name": "event_id", "column_type": "UUID"},
{"name": "user_id", "column_name": "user_id", "column_type": "UUID"},
{"name": "created_at", "column_name": "created_at", "column_type": "DateTime"},
{"name": "name", "column_name": "name", "column_type": "String"},
{"name": "email", "column_name": "email", "column_type": "String"},
],
},
}

# Create a pipeline
pipeline = client.create_pipeline(pipeline_config)
```


## Get pipeline
You can also load configurations from YAML or JSON files:

```python
# Get a pipeline by ID
pipeline = client.get_pipeline("my-pipeline-id")
pipeline = client.create_pipeline(
pipeline_config_yaml_path="pipeline.yaml"
)
# or
pipeline = client.create_pipeline(
pipeline_config_json_path="pipeline.json"
)
```
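
As a rough sketch, assuming the YAML keys map one-to-one onto the dict keys used above, a `pipeline.yaml` could look like this (abridged; broker, credential, and table values are placeholders):

```yaml
version: v3
pipeline_id: my-pipeline-id
sources:
  - type: kafka
    source_id: users
    connection_params:
      brokers:
        - "my.kafka.broker:9093"
      protocol: PLAINTEXT
    topic: users
    consumer_group_initial_offset: latest
    schema_fields:
      - name: event_id
        type: string
      - name: user_id
        type: string
transforms:
  - type: dedup
    source_id: users
    config:
      key: event_id
      time_window: 1h
sink:
  type: clickhouse
  connection_params:
    host: my.clickhouse.server
    port: "9000"
    database: default
    username: default
    password: mysecret
    secure: false
  table: users
  mapping:
    - name: event_id
      column_name: event_id
      column_type: UUID
    - name: user_id
      column_name: user_id
      column_type: UUID
```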

### List pipelines
For full configuration reference — including Schema Registry, joins, OTLP sources, and resource controls — see the [GlassFlow docs](https://docs.glassflow.dev/configuration/pipeline-json-reference).

```python
pipelines = client.list_pipelines()
for pipeline in pipelines:
print(f"Pipeline ID: {pipeline['pipeline_id']}")
print(f"Name: {pipeline['name']}")
print(f"Transformation Type: {pipeline['transformation_type']}")
print(f"Created At: {pipeline['created_at']}")
print(f"State: {pipeline['state']}")
```

### Stop / Terminate / Resume Pipeline
### Get pipeline

```python
pipeline = client.get_pipeline("my-pipeline-id")
pipeline.stop()
print(pipeline.status)
```

```
STOPPING
```
### List pipelines

```python
# Stop a pipeline ungracefully (terminate)
client.stop_pipeline("my-pipeline-id", terminate=True)
print(pipeline.status)
pipelines = client.list_pipelines()
for pipeline in pipelines:
print(f"Pipeline ID: {pipeline['pipeline_id']}, State: {pipeline['state']}")
```

```
TERMINATING
```
### Stop / Terminate / Resume pipeline

```python
pipeline = client.get_pipeline("my-pipeline-id")
pipeline.resume()
print(pipeline.status)
```

```
RESUMING
pipeline.stop() # graceful stop → STOPPING
client.stop_pipeline("my-pipeline-id", terminate=True) # ungraceful → TERMINATING
pipeline.resume() # restart → RESUMING
```
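
If you need to block until a stop completes, a simple polling loop over `pipeline.status` works. The sketch below assumes that re-fetching the pipeline is how you refresh its status and that `STOPPING` is the transient state shown above; adjust the terminal-state check to your deployment:

```python
import time

pipeline = client.get_pipeline("my-pipeline-id")
pipeline.stop()

# Assumption: re-fetching the pipeline refreshes its status.
while pipeline.status == "STOPPING":
    time.sleep(5)
    pipeline = client.get_pipeline("my-pipeline-id")

print(pipeline.status)  # terminal state name (e.g. STOPPED) may vary by deployment
```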

### Delete pipeline

Only stopped or terminated pipelines can be deleted.

```python
# Delete a pipeline
client.delete_pipeline("my-pipeline-id")

# Or delete via pipeline instance
# or
pipeline.delete()
```

## Pipeline Configuration
## Migrating from V2 to V3

Pipeline version `v2` has been removed. Use `Client.migrate_pipeline_v2_to_v3()` to convert an existing configuration automatically:

For detailed information about the pipeline configuration, see [GlassFlow docs](https://docs.glassflow.dev/configuration/pipeline-json-reference).
```python
from glassflow.etl import Client

client = Client(host="your-glassflow-etl-url")
v2_config = ... # your existing v2 pipeline config dict
v3_config = client.migrate_pipeline_v2_to_v3(v2_config)
pipeline = client.create_pipeline(v3_config)
```

If you prefer to migrate manually, the key changes are:

| Area | V2 | V3 |
|------|----|----|
| `version` | `"v2"` | `"v3"` |
| Sources | `source: {type, connection_params, topics: [...]}` | `sources: [{type, source_id, connection_params, topic, ...}]` flat list |
| Schema | top-level `schema.fields` block | `sources[].schema_fields` per source |
| Deduplication | per-topic `deduplication: {enabled, id_field, ...}` | `transforms: [{type: "dedup", source_id, config: {key, time_window}}]` |
| Filter | top-level `filter: {enabled, expression}` | `transforms: [{type: "filter", source_id, config: {expression}}]` |
| Transformation | top-level `stateless_transformation` | `transforms: [{type: "stateless", source_id, config: {transforms: [...]}}]` |
| Join | `join.sources: [{source_id, key, orientation}]` | `join: {left_source: {...}, right_source: {...}, output_fields: [...]}` |
| Sink connection | flat fields (`host`, `port`, ...) at top level | nested `sink.connection_params` object |
| Sink field mapping | top-level `schema.fields` with `source_id` | `sink.mapping` list of `{name, column_name, column_type}` |
| Resources | `pipeline_resources: {ingestor, transform, ...}` | `resources: {sources: [...], transform: [...], ...}` |
| Sink password | base64-encoded | plain text |
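
As an illustration of the `transforms` entries named in the table, the snippet below sketches a filter and a stateless transformation alongside the dedup example shown earlier. The structure follows the table; the `expression` value and the empty stateless config are placeholders, not the official schema:

```python
# Illustrative only: keys are taken from the migration table above;
# inner config values are placeholders.
pipeline_config["transforms"] = [
    {
        "type": "dedup",
        "source_id": "users",
        "config": {"key": "event_id", "time_window": "1h"},
    },
    {
        "type": "filter",
        "source_id": "users",
        "config": {"expression": "email != ''"},  # hypothetical filter expression
    },
    {
        "type": "stateless",
        "source_id": "users",
        "config": {"transforms": []},  # stateless transformation steps go here
    },
]
```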

## Tracking

The SDK includes anonymous usage tracking to help improve the product. Tracking is enabled by default but can be disabled in two ways:
The SDK includes anonymous usage stats collection to help improve the product. It collects non-identifying information such as SDK version, Python version, and feature flags (e.g., whether joins or deduplication are enabled). No personally identifiable information is collected.

Usage stats collection is enabled by default. To disable it:

1. Using an environment variable:
```bash
export GF_TRACKING_ENABLED=false
export GF_USAGESTATS_ENABLED=false
```

2. Programmatically using the `disable_usagestats` method:
```python
from glassflow.etl import Client

client = Client(host="my-glassflow-host")
client.disable_tracking()
client.disable_usagestats()
```

The tracking collects anonymous information about:
- SDK version
- Platform (operating system)
- Python version
- Pipeline ID
- Whether joins or deduplication are enabled
- Kafka security protocol, auth mechanism used and whether authentication is disabled
- Errors during pipeline creation and deletion

## Development

### Setup
2 changes: 1 addition & 1 deletion VERSION
@@ -1 +1 @@
3.8.0
4.0.0