Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions zh_CN/streaming/connectors.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,19 +29,20 @@ Kafka connector 适合持续消费 topic 中的结构化事件流。

- 文档入口:[Kafka Connector](./kafka.md)
- format:`json`、`csv`
- metadata:`topic`、`partition`、`offset`
- metadata:`topic`、`partition`、`offset`,以及所有 source 都支持的通用 metadata `source_name`。其中 `source_name` 是 source 的名称,可以在 create source 时通过 `METADATA FROM 'source_name'` 引用

### MQTT

MQTT connector 适合订阅设备、网关或边缘服务上报的主题消息。

- 文档入口:[MQTT Connector](./mqtt.md)
- format:`json`、`csv`
- metadata:`topic`
- metadata:`topic`,以及通用 metadata `source_name`

### HTTP

HTTP connector 适合按固定周期轮询第三方 API 或内部 HTTP 服务。

- 文档入口:[HTTP Connector](./http.md)
- format:`json`、`csv`、`parquet`
- metadata:通用 metadata `source_name`
10 changes: 2 additions & 8 deletions zh_CN/streaming/format.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
---
title: "Datalayers 流计算 Formats"
description: "介绍 Datalayers 流计算支持的消息格式、配置项、示例以及 JSONCSV 的选型建议。"
description: "介绍 Datalayers 流计算支持的消息格式、配置项、示例以及 JSONCSV、Parquet 的选型建议。"
---

# 流计算 Formats
Expand Down Expand Up @@ -37,6 +37,7 @@ Format 用于将 connector 读取的消息解析为 source 的列结构。
- 适合结构化或半结构化事件
- 常用于 Kafka、MQTT 和 HTTP 返回的结构化消息
- 字段名通常可直接映射到 source 列名
- 默认使用非 strict 模式解码,schema 之外的额外字段会被忽略

### JSON 示例数据

Expand All @@ -45,12 +46,6 @@ Format 用于将 connector 读取的消息解析为 source 的列结构。
{"ts":"2025-01-01T00:00:02Z","sid":"sid-2","value":2.0}
```

### JSON 配置项

| 配置项 | 类型 | 默认值 | 必选 | 说明 |
| --- | --- | --- | --- | --- |
| `unstructured` | BOOL | `false` | No | 是否允许更宽松的 JSON 解码,默认按 schema 严格解析 |

### JSON 配置示例

```sql
Expand All @@ -63,7 +58,6 @@ CREATE SOURCE src_json (
brokers='127.0.0.1:9092',
topic='topic_json_demo',
format='json',
unstructured='false',
bad_data='fail'
);
```
Expand Down
18 changes: 14 additions & 4 deletions zh_CN/streaming/http.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,10 @@ HTTP connector 通过单次或持续轮询 HTTP endpoint,将返回内容作为
| `endpoint` | STRING | 无 | Yes | 轮询地址,支持 `${...}` 时间变量 |
| `method` | STRING | `GET` | No | HTTP 方法,支持 `GET` 和 `POST` |
| `poll` | STRING | `once` | No | 轮询模式,支持 `once` 或 `interval(<duration>)` |
| `connect_timeout` | STRING | `10s` | No | 建立连接的超时时间 |
| `timeout` | STRING | `3s` | No | 请求超时时间 |
| `connect_timeout` | STRING | `3s` | No | 建立连接的超时时间 |
| `read_timeout` | STRING | `10s` | No | 读取响应 body 的超时时间 |
| `timeout` | STRING | `60s` | No | 单次请求的整体超时时间 |
| `max_body_size` | STRING | `512MiB` | No | 单次 HTTP 响应 body 的最大大小,采用 size 格式,例如 `512MB`、`512MiB` |
| `headers` | STRING | 无 | No | 请求头,格式为 `k1:v1;k2:v2` |
| `jwt_token` | STRING | 无 | No | Bearer token,会以 `Authorization: Bearer <token>` 方式发送 |
| `ca` | STRING | 无 | No | TLS CA 证书 |
Expand Down Expand Up @@ -80,10 +82,16 @@ endpoint='http://127.0.0.1:18080/export_${now_compact}.csv'
## 配置约束

- `headers` 中每一项都必须是 `key:value` 形式
- `connect_timeout` 和 `timeout` 必须大于 `0`
- `connect_timeout`、`read_timeout`、`timeout` 必须大于 `0`
- `max_body_size` 必须大于 `0`
- 当 `endpoint` 使用 `https://` 时,才能配置 `ca`、`cert`、`key`
- `poll='once'` 时,source 成功拉取一次后结束;`poll='interval(...)'` 时会持续轮询

## 运行时行为

- `poll='once'` 模式下,HTTP 请求失败会直接导致 source 执行失败
- `poll='interval(...)'` 模式下,单次 HTTP 请求失败只会记录一条 `warn` 日志,source 不会因为这一轮请求失败而退出,下一轮会继续轮询

## 示例

### 1. 启动一个简单的本地 HTTP 服务
Expand Down Expand Up @@ -209,8 +217,10 @@ CREATE SOURCE src_http_poll (
endpoint='http://127.0.0.1:18080/poll?ts=${now_ts}',
method='GET',
poll='interval(200ms)',
connect_timeout='10s',
connect_timeout='3s',
read_timeout='10s',
timeout='5s',
max_body_size='16MiB',
format='csv'
);

Expand Down
1 change: 1 addition & 0 deletions zh_CN/streaming/overview.md
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ Kafka / MQTT / HTTP
CREATE SOURCE src_kafka (
ts TIMESTAMP(9),
sid STRING,
source_name STRING METADATA FROM 'source_name',
source_topic STRING METADATA FROM 'topic',
topic_tag STRING AS source_topic,
value FLOAT64
Expand Down