Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions dir.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -379,6 +379,9 @@
- title_en: HTTP
title_cn: HTTP
path: streaming/http
- title_en: Blackhole
title_cn: Blackhole
path: streaming/blackhole

- title_en: Key Value Data Model
title_cn: Key-Value 存储
Expand Down
69 changes: 69 additions & 0 deletions en_US/streaming/blackhole.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
---
title: "Blackhole Connector"
description: "Blackhole 是一个 sink 专用 connector,接收所有输入数据并直接丢弃,适用于测试、开发调试和基准压测。"
---

# Blackhole Connector

## 概述

Blackhole connector 是一个 sink 专用 connector。它接收 pipeline 输出的所有数据并直接丢弃,不写入任何外部存储或内部表。Blackhole sink 的最典型用途包括:

- **功能测试**:验证 pipeline 定义是否正确、source 数据是否能被正常读取和执行
- **基准压测**:评估数据从 source 到 pipeline 的端到端吞吐量,排除 sink 写入延迟的影响
- **开发调试**:快速调试 pipeline 的 AS SELECT 逻辑,无需事先创建 sink table

## 配置选项

Blackhole connector 不需要任何额外配置选项。只需在 `CREATE SINK` 中指定 `connector='blackhole'` 即可。同时 blackhole connector 不支持配置 format。

## 使用示例

### 创建 Blackhole Sink

```sql
CREATE SINK bh WITH (connector='blackhole');
```

### 在 Pipeline 中使用 Blackhole Sink

```sql
CREATE SOURCE s1 (
ts TIMESTAMP(9),
sid STRING,
value FLOAT64
) WITH (
connector='http',
endpoint='http://127.0.0.1:8080/data',
poll='once',
format='json'
);

CREATE SINK bh WITH (connector='blackhole');

CREATE PIPELINE p1
SINK TO bh
AS
SELECT ts, sid, value
FROM s1
WHERE value >= 2.0;
```

### 管理 Sink

```sql
-- 查看所有 sink
SHOW SINKS;

-- 查看 sink 的定义 SQL
SHOW CREATE SINK bh;

-- 删除 sink
DROP SINK bh;
```

## 注意事项

- Blackhole 是 sink 专用 connector,不能作为 source 使用
- 所有写入 blackhole 的数据都会被丢弃,无法恢复
- 删除被 pipeline 引用的 sink 前,需先删除对应的 pipeline
23 changes: 23 additions & 0 deletions zh_CN/sql-reference/statements/create.md
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,29 @@ WHERE value >= 2.0;
- pipeline 输出列需要与 sink table 列按名称和类型兼容,必要时系统会尝试插入 cast
- 当前用户需要具备 source 的 `SELECT` 权限以及 sink table 的 `INSERT` 权限

### 创建 Sink

`CREATE SINK` 用于注册一个外部的流式输出对象,对接到指定的 connector。与内部的 sink table 类似,外部的 sink 可以直接被 create pipeline 引用,作为输出目标。

```sql
CREATE SINK [IF NOT EXISTS] [database.]sink_name
WITH (
connector='<connector_type>',
...
)
```

示例:

```sql
CREATE SINK bh WITH (connector='blackhole');
```

说明:

- `connector` 选项是必填的,决定数据写入的目标类型
- 当前仅支持 `blackhole` connector,用于丢弃所有写入数据(通常用于性能压测)

### 建表时声明索引(INVERTED / VECTOR)

除了使用 `CREATE INDEX` 在建表后创建索引,Datalayers 也支持在 `CREATE TABLE` 的表约束中直接声明索引。
Expand Down
13 changes: 13 additions & 0 deletions zh_CN/sql-reference/statements/drop.md
Original file line number Diff line number Diff line change
Expand Up @@ -66,9 +66,22 @@ DROP PIPELINE [IF EXISTS] [db.]pipeline_name

说明

- `IF EXISTS` 可用于忽略不存在对象的报错。
- 只有处于 `Stopped` 或 `Failed` 状态的 pipeline 才允许删除。
- 如果 pipeline 仍在运行,请先执行 `ALTER PIPELINE ... STOP`,再执行 `DROP PIPELINE`。

### DROP SINK

删除指定 sink。

```SQL
DROP SINK [IF EXISTS] [db.]sink_name
```

说明

- `IF EXISTS` 可用于忽略不存在对象的报错。
- 如果 sink 仍被某个 pipeline 引用,删除会失败。

### DROP NODE

Expand Down
33 changes: 33 additions & 0 deletions zh_CN/sql-reference/statements/show.md
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,39 @@ SHOW CREATE SOURCE source_name
SHOW CREATE PIPELINE pipeline_name
```

## SHOW SINKS

查看当前数据库下的所有 sink。

```SQL
SHOW SINKS
```

返回以下列:

| 列名 | 含义 |
| --- | --- |
| `sink` | sink 名称 |
| `version` | sink 元信息的 version |
| `created_time` | sink 创建时间,ISO 8601 字符串 |
| `updated_time` | sink 最近一次更新时间,ISO 8601 字符串 |

说明:

- 当前用户需要具备当前数据库的 `SELECT` 权限

## SHOW CREATE SINK

展示指定 sink 的 create sink SQL。

```sql
SHOW CREATE SINK sink_name
```

说明:

- 输出的是规范化后的 `CREATE SINK` 语句,而不是用户的原始输入

## SHOW LICENSE

获取系统的 License 信息
Expand Down
69 changes: 69 additions & 0 deletions zh_CN/streaming/blackhole.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
---
title: "Blackhole Connector"
description: "Blackhole 是一个 sink 专用 connector,接收所有输入数据并直接丢弃,适用于测试、开发调试和基准压测。"
---

# Blackhole Connector

## 概述

Blackhole connector 是一个 sink 专用 connector。它接收 pipeline 输出的所有数据并直接丢弃,不写入任何外部存储或内部表。Blackhole sink 的最典型用途包括:

- **功能测试**:验证 pipeline 定义是否正确、source 数据是否能被正常读取和执行
- **基准压测**:评估数据从 source 到 pipeline 的端到端吞吐量,排除 sink 写入延迟的影响
- **开发调试**:快速调试 pipeline 的 AS SELECT 逻辑,无需事先创建 sink table

## 配置选项

Blackhole connector 不需要任何额外配置选项。只需在 `CREATE SINK` 中指定 `connector='blackhole'` 即可。同时 blackhole connector 不支持配置 format。

## 使用示例

### 创建 Blackhole Sink

```sql
CREATE SINK bh WITH (connector='blackhole');
```

### 在 Pipeline 中使用 Blackhole Sink

```sql
CREATE SOURCE s1 (
ts TIMESTAMP(9),
sid STRING,
value FLOAT64
) WITH (
connector='http',
endpoint='http://127.0.0.1:8080/data',
poll='once',
format='json'
);

CREATE SINK bh WITH (connector='blackhole');

CREATE PIPELINE p1
SINK TO bh
AS
SELECT ts, sid, value
FROM s1
WHERE value >= 2.0;
```

### 管理 Sink

```sql
-- 查看所有 sink
SHOW SINKS;

-- 查看 sink 的定义 SQL
SHOW CREATE SINK bh;

-- 删除 sink
DROP SINK bh;
```

## 注意事项

- Blackhole 是 sink 专用 connector,不能作为 source 使用
- 所有写入 blackhole 的数据都会被丢弃,无法恢复
- 删除被 pipeline 引用的 sink 前,需先删除对应的 pipeline
16 changes: 12 additions & 4 deletions zh_CN/streaming/connectors.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,13 @@ description: "介绍 Datalayers 流计算支持的 Connector 类型、适用场

## 什么是 Connector

Connector 决定 source 如何从外部系统读取数据,负责建立连接、拉取或订阅消息,并将原始消息交给 format 解码。
Connector 决定 source 如何从外部系统读取数据,或 sink 如何将数据写入外部系统,负责建立连接、拉取/推送消息,并将原始消息交给 format 解码。

当前版本中:

- connector 只用于 source 侧
- 目前只支持 Datalayers 内部时序表作为 sink,暂不支持外部系统作为 sink
- 当前没有独立的 sink connector,也不支持 `CREATE SINK`
- source 和 sink 都支持 connector
- source connector 负责从外部系统读取数据
- sink connector 负责将数据写入外部目标(例如 blackhole)

## 支持的 Connector 类型

Expand All @@ -22,6 +22,7 @@ Connector 决定 source 如何从外部系统读取数据,负责建立连接
| Kafka | Yes | No | 消息队列事件流接入 |
| MQTT | Yes | No | IoT / 边缘设备消息接入 |
| HTTP | Yes | No | 单次或周期轮询 HTTP 接口 |
| Blackhole | No | Yes | 丢弃所有写入数据,用于测试或基准压测 |

### Kafka

Expand All @@ -46,3 +47,10 @@ HTTP connector 适合按固定周期轮询第三方 API 或内部 HTTP 服务。
- 文档入口:[HTTP Connector](./http.md)
- format:`json`、`csv`、`parquet`
- metadata:通用 metadata `source_name`

### Blackhole

Blackhole connector 是一个 sink 专用 connector,它会接收所有输入数据并直接丢弃,不写入任何存储。常用于测试、开发调试以及流计算框架的基准压测场景。

- 文档入口:[Blackhole Connector](./blackhole.md)
- format:不支持配置 format,因为它不需要对输入数据进行解码
4 changes: 2 additions & 2 deletions zh_CN/streaming/kafka.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ Kafka connector 用于将 Kafka topic 中的消息持续读入 Datalayers source
| `brokers` | STRING | 无 | Yes | Kafka broker 列表,逗号分隔,格式为 `host:port` |
| `topic` | STRING | 无 | Yes | 要消费的 topic |
| `offset` | STRING | `latest` | No | 起始消费位置,支持 `earliest`、`latest` |
| `group.id` | STRING | `datalayers-<job_id>-group` | No | Consumer group ID,用于提交和恢复消费进度 |
| `client.id` | STRING | `datalayers-<job_id>-consumer` | No | Kafka client ID |
| `group.id` | STRING | `datalayers-<pipeline_id>-group` | No | Consumer group ID,用于提交和恢复消费进度 |
| `client.id` | STRING | `datalayers-<pipeline_id>-<task_index>-consumer` | No | Kafka client ID |
| `security.protocol` | STRING | 无 | No | 安全协议,支持 `PLAINTEXT`、`SSL`、`SASL_PLAINTEXT`、`SASL_SSL` |
| `sasl.mechanism` | STRING | 无 | No | SASL 机制,支持 `PLAIN`、`SCRAM-SHA-256`、`SCRAM-SHA-512` |
| `sasl.username` | STRING | 无 | No | SASL 用户名 |
Expand Down
29 changes: 22 additions & 7 deletions zh_CN/streaming/model.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,10 @@ description: "介绍 Datalayers 流计算的核心对象及其关系。"

## 总体模型

Datalayers 的流计算采用 Dataflow 风格的处理模型。数据从外部系统持续进入 source,经由 pipeline 处理后写入内部 sink table。
Datalayers 的流计算采用 Dataflow 风格的处理模型。数据从外部系统持续进入 source,经由 pipeline 处理后写入 sink table 或外部 sink

```text
external system -> source -> pipeline -> sink table
external system -> source -> pipeline -> sink table / external sink
```

这一模型强调两点:
Expand Down Expand Up @@ -56,7 +56,7 @@ CREATE SOURCE src_mqtt (

## Pipeline

`PIPELINE` 是持续运行的实时任务定义,绑定一个 source、一个 sink table 和一条 `AS SELECT ...` 查询。
`PIPELINE` 是持续运行的实时任务定义,绑定一个 source、一个 sink 和一条 `AS SELECT ...` 查询。

```sql
CREATE PIPELINE p_mqtt
Expand All @@ -71,7 +71,7 @@ pipeline 的职责是:

- 从 source 中持续读取事件
- 执行轻量级实时处理
- 将结果写入 sink table
- 将结果写入指定的 sink

当前版本对于 pipeline 有如下限制:

Expand All @@ -80,16 +80,31 @@ pipeline 的职责是:
- 只支持投影和过滤
- 不支持 join、聚合、窗口、排序、limit、union、子查询等复杂算子

## Sink Table
## Sink

sink 不是独立对象,而是 Datalayers 中已存在的一张内部表。当前版本要求:
`SINK` 描述流计算的输出目标,可以为一个内部表(sink table)或一个外部 connector(external sink)。内部表通过 `CREATE TABLE` 创建,外部 connector 则通过 `CREATE SINK` 定义。它们均可用于 pipeline 的输出。

### Sink Table

Pipeline 可以将结果写入一张 Datalayers 内部表。当前版本要求:

- sink table 必须事先创建
- sink table 必须使用 `TimeSeries` 引擎
- pipeline 输出列名和类型必须与 sink table 兼容;当类型可转换时,系统会自动补充 cast
- sink table 中非空且没有默认值的列,必须出现在 pipeline 输出里

这意味着在设计 sink table 时,应先确定 pipeline 输出 schema,再创建表结构。
### External Sink

```sql
CREATE SINK bh WITH (connector='blackhole');
```

示例中创建了一个 `blackhole` sink,它会丢弃所有输入数据。当前支持的 sink connector 类型请参考 [流计算 Connectors](./connectors.md)。

需要注意:

- connector 是必填项
- 当前仅支持 `blackhole` connector

## Pipeline 生命周期与状态

Expand Down
2 changes: 1 addition & 1 deletion zh_CN/streaming/mqtt.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ MQTT connector 用于订阅 MQTT broker 中的主题消息,并持续读入 Dat
| `broker` | STRING | 无 | Yes | MQTT broker 地址,格式为 `[scheme://]host:port`;未写 scheme 时默认 `tcp` |
| `topic` | STRING | 无 | Yes | 要订阅的 topic |
| `qos` | STRING | `0` | No | 服务质量等级,支持 `0`、`1`、`2` |
| `client_id` | STRING | `datalayers-<job_id>-consumer` | No | MQTT client ID |
| `client_id` | STRING | `datalayers-<pipeline_id>-<task_index>-consumer` | No | MQTT client ID |
| `keep_alive` | STRING | `60s` | No | keep alive 时间,采用 duration 格式,例如 `60s` |
| `connect_timeout` | STRING | `10s` | No | 建立连接的超时时间 |
| `version` | STRING | 无 | No | MQTT 协议版本,支持 `3.1.1`、`5.0` |
Expand Down
Loading