3 changes: 2 additions & 1 deletion docs/external-stream.md
@@ -4,10 +4,11 @@ You can create **External Streams** in Timeplus to query data in the external sy

You can run streaming analytics with external streams in a similar way to other streams.

Timeplus supports 5 types of external streams:
Timeplus supports 6 types of external streams:
* [Kafka External Stream](/kafka-source)
* [Pulsar External Stream](/pulsar-source)
* [NATS JetStream External Stream](/nats-jetstream-source)
* [Python External Stream Source](/python-external-stream-source) and [Sink](/python-external-stream-sink), only available in Timeplus Enterprise
* [Timeplus External Stream](/timeplus-source), only available in Timeplus Enterprise
* [Log External Stream](/log-stream) (experimental)

10 changes: 10 additions & 0 deletions docs/python-external-stream-sink.mdx
@@ -0,0 +1,10 @@
---
id: python-external-stream-sink
title: Python Sink
---

import ExternalPythonBasics from './shared/python-external-stream.md';
import ExternalPythonWrite from './shared/python-external-stream-write.md';

<ExternalPythonBasics />
<ExternalPythonWrite />
10 changes: 10 additions & 0 deletions docs/python-external-stream-source.mdx
@@ -0,0 +1,10 @@
---
id: python-external-stream-source
title: Python Source
---

import ExternalPythonBasics from './shared/python-external-stream.md';
import ExternalPythonRead from './shared/python-external-stream-read.md';

<ExternalPythonBasics />
<ExternalPythonRead />
112 changes: 112 additions & 0 deletions docs/shared/python-external-stream-read.md
@@ -0,0 +1,112 @@
## Read Data from a Python External Stream

The read function is the entry point Timeplus calls to pull rows from your Python code. It is **synchronous** (no `async`/`await`) and receives **no implicit arguments** — any configuration must arrive through the init function. Each value it produces is a row whose columns match the stream's schema in declared order; for a single-column stream you may yield bare scalars.
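The two row shapes can be sketched in plain Python (outside Timeplus). The function names here are hypothetical; what matters is that a multi-column stream yields tuples in schema order, while a single-column stream may yield bare scalars:

```python
# Multi-column stream, e.g. schema (tick uint32, label string):
# each yielded value is a tuple in declared column order.
def multi_column():
    yield (0, "tick")
    yield (1, "tick")

# Single-column stream, e.g. schema (tick uint32):
# bare scalars are accepted in place of one-element tuples.
def single_column():
    yield 0
    yield 1

rows = list(multi_column())      # [(0, "tick"), (1, "tick")]
scalars = list(single_column())  # [0, 1]
```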

### Streaming source (generator)

Yield a row or a batch of rows at a time. The query stays alive as long as the generator does, which makes generators the right shape for clocks, polling loops, websocket feeds, message-bus consumers, and other long-lived sources.

```sql
CREATE EXTERNAL STREAM py_clock (tick uint32, label string)
AS $$
import time

def py_clock():
n = 0
while True:
yield (n, "tick")
n += 1
time.sleep(1)
$$
SETTINGS
type = 'python',
mode = 'streaming';
```

`read_function_name` is omitted, so it defaults to the stream name `py_clock`. Setting `mode = 'streaming'` makes the engine reject a non-generator return value, which catches mistakes like returning a list early.

### Batch source (list)

Return a list of rows once. Use this shape for one-shot pulls — REST snapshots, file scans, or any source where a single call yields the full result.

```sql
CREATE EXTERNAL STREAM py_users (id int32, name string)
AS $$
import json
import urllib.request

def py_users():
with urllib.request.urlopen("https://api.example.com/users") as r:
payload = json.load(r)
return [(u["id"], u["name"]) for u in payload]
$$
SETTINGS
type = 'python',
mode = 'batch';
```

### Long-lived setup with init / deinit

Open a client once, stash it on `builtins`, and tear it down at the end of the query. Init parameters arrive as a single string, so JSON is convenient when you have more than one value to pass.

```sql
CREATE EXTERNAL STREAM py_cookie_counter
(
previous_cleanup_count int32,
secret_flavor string
)
AS $$
import builtins, json

def open_bakery(config):
builtins._tp_cookie_secret_flavor = json.loads(config)["flavor"]

def close_bakery():
if hasattr(builtins, "_tp_cookie_secret_flavor"):
del builtins._tp_cookie_secret_flavor

def serve_cookie_report():
return [(0, getattr(builtins, "_tp_cookie_secret_flavor", ""))]
$$
SETTINGS
type = 'python',
read_function_name = 'serve_cookie_report',
init_function_name = 'open_bakery',
init_function_parameters = '{"flavor":"double-chocolate"}',
deinit_function_name = 'close_bakery';
```

Remember that init and deinit run **per query**, not once per stream creation — the `builtins` state above is set up and torn down each time a query reads from `py_cookie_counter`. Use stream-specific `builtins` attribute names and delete them in deinit so later Python sessions do not see stale state.

### Calling back to `timeplusd`

The injected `__timeplus_local_api_user` and `__timeplus_local_api_password` globals let the read function authenticate to the same server without hard-coded credentials. The example below queries an internal stream over the REST interface and turns the result into a row.

```sql
CREATE EXTERNAL STREAM py_user_count (total int64)
AS $$
import base64, urllib.request

def py_user_count():
creds = base64.b64encode(
f"{__timeplus_local_api_user}:{__timeplus_local_api_password}".encode()
).decode()
req = urllib.request.Request(
"http://localhost:8123/?query=SELECT+count()+FROM+table(users)",
headers={"Authorization": f"Basic {creds}"},
)
with urllib.request.urlopen(req) as r:
return [(int(r.read().strip()),)]
$$
SETTINGS
type = 'python',
mode = 'batch';
```

Treat `__timeplus_local_api_password` as a secret — do not log it, do not echo it back into output rows, and do not pass it into subprocesses.

### Cancellation and errors

When a query is cancelled (for example by `KILL QUERY` or by closing the client), the running Python code receives a `KeyboardInterrupt`. Streaming generators stop at the next yield point; long-blocking calls inside C extensions may delay the interrupt until they return.

If the read function raises, the query fails and the Python traceback is included in the error response — wrap recoverable errors inside the function and decide explicitly whether to re-raise or continue.
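A streaming source can use the `KeyboardInterrupt` delivery point to release resources before the query ends. This is a minimal sketch in plain Python; `cleaned_up` stands in for closing a real client:

```python
cleaned_up = []                    # stand-in for closing a real connection

def resilient_source():
    n = 0
    try:
        while True:
            yield (n,)
            n += 1
    except KeyboardInterrupt:
        cleaned_up.append(True)    # close connections, flush buffers, ...
        raise                      # re-raise so the query actually stops
```

Catching the interrupt without re-raising would keep the generator alive; re-raise unless you intend to continue producing rows.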
87 changes: 87 additions & 0 deletions docs/shared/python-external-stream-write.md
@@ -0,0 +1,87 @@
## Write Data to a Python External Stream

The write function is invoked once per chunk, not once per row. Its arguments are **column-oriented**: one Python list per output column, in declared order, all of equal length. Iterate with `zip` to recover row tuples.

Column values follow the same Python type mapping as [Python UDF](/py-udf#data-type-mapping). One detail worth highlighting for sinks: a `string` (or `fixed_string`) column arrives as Python `bytes`, not `str`. Decode with `.decode()` (UTF-8) before passing values into APIs that require text.
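The column-to-row conversion can be sketched in plain Python. The helper name is hypothetical; the shape of the arguments matches what the write function receives:

```python
# One list per column, equal lengths; string columns arrive as bytes.
def to_rows(host, value):
    return [(h.decode(), v) for h, v in zip(host, value)]

rows = to_rows([b"a", b"b"], [1.0, 2.0])  # [("a", 1.0), ("b", 2.0)]
```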

### Sink basics

```sql
CREATE EXTERNAL STREAM py_metric_sink (host string, value float32)
AS $$
def py_metric_sink(host, value):
for h, v in zip(host, value):
print(f"{h.decode()}={v}")
$$
SETTINGS type = 'python';
```

Insert a few rows:

```sql
INSERT INTO py_metric_sink (host, value) VALUES ('a', 1.0), ('b', 2.0);
```

Behind the scenes Timeplus calls `py_metric_sink([b'a', b'b'], [1.0, 2.0])` — one call carrying both rows, with the `string` column delivered as `bytes`. A larger INSERT or a downstream query that delivers many chunks results in one call per chunk.

If `write_function_name` is omitted Timeplus uses `read_function_name` (which itself defaults to the stream name), so the Python function above only needs to be named once.

### Materialized view → external stream

Routing a continuous query into a sink is the most common production pattern. Define the sink once, then point a materialized view at it:

```sql
CREATE EXTERNAL STREAM py_alert_sink (host string, value float32)
AS $$
def py_alert_sink(host, value):
for h, v in zip(host, value):
notify(h.decode(), v) # your notifier
$$
SETTINGS type = 'python';

CREATE MATERIALIZED VIEW high_value_alerts INTO py_alert_sink AS
SELECT host, value FROM metrics WHERE value > 100;
```

The materialized view feeds chunks into the sink as they are produced; each chunk becomes one call to `py_alert_sink`.

### Custom protocol example: webhook POST

Load the destination URL in init, reuse that configuration for every chunk, and clear it in deinit. Init parameters carry the URL so the Python body is reusable across environments. To pool an actual HTTP connection, swap `urllib` for a session-aware client (for example `requests.Session()`) and stash the session itself on `builtins`.

```sql
CREATE EXTERNAL STREAM py_webhook (event_id string, body string)
AS $$
import builtins, json, urllib.request

def open_client(config):
builtins._tp_webhook = json.loads(config)["url"]

def close_client():
if hasattr(builtins, "_tp_webhook"):
del builtins._tp_webhook

def post_event(event_id, body):
for eid, b in zip(event_id, body):
payload = {"id": eid.decode(), "body": b.decode()}
req = urllib.request.Request(
builtins._tp_webhook,
data=json.dumps(payload).encode(),
headers={"Content-Type": "application/json"},
method="POST",
)
urllib.request.urlopen(req).read()
$$
SETTINGS
type = 'python',
init_function_name = 'open_client',
init_function_parameters = '{"url":"https://hooks.example.com/notify"}',
deinit_function_name = 'close_client',
write_function_name = 'post_event';
```

Replace `urllib` with any HTTP, S3, queue, or proprietary client your environment ships with. Manage Python dependencies through the [Python UDF](/py-udf) library configuration — the same runtime backs both features.

### Failure behavior

If the write function raises, the INSERT fails and the Python traceback is included in the error response. Side effects already performed by your Python code (HTTP requests sent, files written, queue messages published) are **not** rolled back by Timeplus — design idempotent writes, or batch your side effect inside a single transactional call your downstream system controls.
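One way to make the sink idempotent is to key each side effect by a stable identifier, so a replayed chunk does not repeat work. A minimal sketch, assuming the downstream system can be deduplicated by id — `seen` and `delivered` stand in for durable state your real system would keep:

```python
seen = set()
delivered = []                   # stand-in for the real side effect

def post_event(event_id, body):
    for eid, b in zip(event_id, body):
        key = eid.decode()
        if key in seen:          # chunk replayed after a partial failure
            continue
        delivered.append((key, b.decode()))  # perform the side effect once
        seen.add(key)
```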
75 changes: 75 additions & 0 deletions docs/shared/python-external-stream.md
@@ -0,0 +1,75 @@
## Overview

Python External Stream lets you read from and write to arbitrary sources by embedding a Python body directly in the DDL. It is available in **Timeplus Enterprise 3.2.2+**.

Unlike the Kafka, Pulsar, and NATS JetStream external streams — which speak a specific wire protocol — a Python External Stream is a generic escape hatch: you bring the protocol, the client library, and the logic. Timeplus calls your functions inside the embedded CPython runtime. When reading, return values become row batches; when writing, the sink function receives column batches. The same DDL object can serve as both a source (via `read_function_name`) and a sink (via `write_function_name`).

## Create a Python External Stream

```sql
CREATE EXTERNAL STREAM [IF NOT EXISTS] stream_name (<col_name1> <col_type>)
AS $$
def read_fn():
...

def write_fn(col1, ...):
...

def init_fn(config): # optional
...

def deinit_fn(): # optional
...
$$
SETTINGS
type = 'python', -- required
read_function_name = '..', -- defaults to the stream name
write_function_name = '..', -- defaults to read_function_name
init_function_name = '..',
init_function_parameters = '..', -- requires init_function_name
deinit_function_name = '..',
mode = 'auto' -- 'auto' (default), 'streaming', or 'batch'
```

### Settings

* **type**: must be `'python'`. Required.
* **read_function_name**: name of the Python function used when the stream is read from. Defaults to the stream name.
* **write_function_name**: name of the Python function used when the stream is written to (sink). Defaults to `read_function_name`.
* **init_function_name**: name of a Python function called once before read/write processing begins. Use it to open connections, warm caches, or prepare state for the entry function to consume.
* **init_function_parameters**: a single string passed as the only argument to the init function. Any format works (JSON, `key=value`, or a plain string) — parsing is up to your Python code. Requires `init_function_name`; otherwise the stream fails to create with `Setting 'init_function_parameters' requires 'init_function_name' to be configured`.
* **deinit_function_name**: name of a Python function called once after read/write processing completes, for cleanup.
* **mode**: Python execution mode — `'auto'` (default), `'streaming'`, or `'batch'`. See [Modes](#modes).
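Since `init_function_parameters` is one opaque string, the init function decides the format. A hedged sketch of a parser that accepts either JSON or `key=value` pairs (the helper name is hypothetical):

```python
import json

def parse_init(config):
    config = config.strip()
    if config.startswith("{"):    # JSON object, best for several values
        return json.loads(config)
    # fall back to "k1=v1,k2=v2"
    return dict(pair.split("=", 1) for pair in config.split(","))
```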

## Modes

The `mode` setting controls how Timeplus interprets your read function's return value:

* **`auto`** (default) — Timeplus inspects the return value at runtime. A generator drives a streaming read; a list is consumed as a single batch.
* **`streaming`** — the read function must return a generator. Timeplus pulls rows as they are yielded and the query stays alive until the generator stops. Returning a list from a streaming-mode function fails the query.
* **`batch`** — the read function must return a list. Timeplus consumes the list once and the query finishes. Returning a generator from a batch-mode function fails the query.

Set `mode` explicitly when you want the engine to enforce the expected shape. Leave it as `auto` when you want flexibility.
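The shape check that `auto` mode implies can be sketched like this — a hypothetical helper, not the engine's actual code:

```python
import inspect

def classify(return_value):
    if inspect.isgenerator(return_value):
        return "streaming"       # pull rows as they are yielded
    if isinstance(return_value, list):
        return "batch"           # consume once, then finish
    raise TypeError("read function must return a generator or a list")
```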

## Lifecycle

Each query that reads from or writes to a Python External Stream creates its own Python module. The lifecycle for one query is:

1. The DDL body is compiled into a fresh module.
2. Local API credential globals are injected into the module (see [Local API credentials](#local-api-credentials)).
3. If `init_function_name` is set, the init function is called once. When `init_function_parameters` is non-empty, it is passed as the only argument; otherwise init receives no arguments.
4. The read or write entry function is called as data flows.
5. When the query ends — normally or via cancellation — `deinit_function_name`, if set, is called.

Each query gets its own module, so ordinary module globals created by the DDL body are not reused across queries. If you stash state on Python's `builtins` module, use a stream-specific attribute name and remove it in deinit; `builtins` is shared by the embedded interpreter, so leftover attributes can be visible to later Python sessions in the same server process. Treat clients or caches opened in init as per-query resources and close them in deinit.

If the init function raises, the query fails before any read or write happens, and `deinit_function_name` is **not** called. If init opens more than one external resource, clean up already-opened resources before re-raising.
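The clean-up-before-re-raise pattern can be sketched as follows. Everything here is hypothetical scaffolding — `connect`, `close`, and `live` stand in for real clients and their lifecycle:

```python
import builtins

live = []                                   # resources currently open

def connect(name, config):
    if config.get(name) == "fail":          # simulate a connection failure
        raise RuntimeError(name + " refused")
    client = {"name": name}
    live.append(client)
    return client

def close(client):
    live.remove(client)

def open_resources(config):
    first = connect("first", config)
    try:
        second = connect("second", config)
    except Exception:
        close(first)                        # don't leak the first resource
        raise                               # re-raise so the query fails
    builtins._tp_first, builtins._tp_second = first, second
```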

## Local API credentials {#local-api-credentials}

When the local API user is enabled on the server, Timeplus injects two module-level globals into every Python External Stream module so your code can authenticate back to the same `timeplusd` over the native TCP protocol or the REST HTTP interface without hard-coding credentials:

* `__timeplus_local_api_user` — the ephemeral local API username.
* `__timeplus_local_api_password` — the matching token. Treat this as a secret; do not log it.

Both globals are available as bare names inside the Python body — no `os.environ` lookup needed. They are regenerated on every server restart and never written to disk.
22 changes: 22 additions & 0 deletions docs/sql-create-external-stream.md
@@ -55,6 +55,28 @@ Please check the [Pulsar External Stream](/pulsar-source) for more details.

Please check the [NATS JetStream External Stream](/nats-jetstream-source) for more details.

## Python External Stream

```sql
CREATE EXTERNAL STREAM [IF NOT EXISTS] stream_name (<col_name1> <col_type>)
AS $$
def read_fn():
...
$$
SETTINGS
type = 'python', -- required
read_function_name = '..',
write_function_name = '..',
init_function_name = '..',
init_function_parameters = '..',
deinit_function_name = '..',
mode = 'auto' -- 'auto' (default), 'streaming', or 'batch'
```

Available in **Timeplus Enterprise 3.2.2+**.

Please check the [Python External Stream Source](/python-external-stream-source) for read-side settings, generator/batch sources, and lifecycle hooks, and the [Python External Stream Sink](/python-external-stream-sink) for write-side semantics, materialized-view sinks, and custom-protocol examples.

## Timeplus External Stream
```sql
CREATE EXTERNAL STREAM [IF NOT EXISTS] stream_name (<col_name1> <col_type>)
4 changes: 4 additions & 0 deletions docusaurus.config.js
@@ -113,6 +113,10 @@ const config = {
from: '/tiered-storage',
to: '/append-stream-tiered-storage',
},
{
from: '/python-external-stream',
to: '/python-external-stream-source',
},
],
},
],
10 changes: 10 additions & 0 deletions sidebars.js
@@ -165,6 +165,11 @@ const sidebars = {
label: "PostgreSQL",
id: "pg-external-table",
},
{
type: "doc",
label: "Python",
id: "python-external-stream-source",
},
{
type: "doc",
label: "S3",
@@ -545,6 +550,11 @@
label: "NATS JetStream",
id: "nats-jetstream-sink",
},
{
type: "doc",
label: "Python",
id: "python-external-stream-sink",
},
{
type: "doc",
label: "S3",