Merged
33 changes: 26 additions & 7 deletions java/tools/README-zh.md
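@@ -66,11 +66,16 @@ mvn install -P with-java -DskipTests

 - **time_column**: Each table has exactly one time column. After writing to TsFile, its name is fixed as `time` and its type is `TIMESTAMP`.
 - **tag_columns**: Device identifier columns (composite primary key); zero or more. Virtual columns that are not in the source file can be defined with the `DEFAULT` keyword.
+  - **The data type is fixed as `STRING`** and no other type can be specified. A type declared for a TAG column in `source_columns` is ignored; it is recommended to list TAG columns in `source_columns` by name only, without a type.
 - **source_columns**: Maps every column in the source file; CSV matches by position, Parquet / Arrow by column name. Use `SKIP` to skip columns you do not need.
 - **FIELD** (derived, not configured): the columns left in `source_columns` after removing `time_column`, `tag_columns`, and `SKIP`. These are the measurement columns, whose values change over time.

+> **Column name case**: In the TsFile table model, column and table names are case-insensitive and are stored in lowercase. Whether `import.schema` says `Time`, `TIME`, or `time`, the name on disk and on read-back is `time`.
+
 ### Schema Example

+> Duplicate timestamps within the same device are not supported: rows with identical tag column values and the same timestamp cannot be written.
+
 CSV file content:
 ```
 Region,FactoryNumber,DeviceNumber,Model,MaintenanceCycle,Time,Temperature,Emission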
@@ -96,9 +101,9 @@ DeviceNumber
 time_column=Time

 source_columns
-Region TEXT,
-FactoryNumber TEXT,
-DeviceNumber TEXT,
+Region,
+FactoryNumber,
+DeviceNumber,
 SKIP,
 SKIP,
 Time INT64,
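@@ -108,7 +113,7 @@ Emission DOUBLE,

 Notes:
 - `Group` is a virtual tag column (not in the CSV) with the default value `Datang`
-- `Region`, `FactoryNumber`, and `DeviceNumber` are tag columns read from the CSV
+- `Region`, `FactoryNumber`, and `DeviceNumber` are tag columns read from the CSV; their type is fixed as `STRING` and does not need to be declared
 - `Model` and `MaintenanceCycle` are skipped via `SKIP`
 - `Temperature` and `Emission` are automatically derived as FIELD columns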

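@@ -121,6 +126,12 @@ Temperature FLOAT,
 Emission DOUBLE,
 ```

+**Validation rules for Parquet / Arrow schema mode** (enforced; on failure an error is raised and the source file is moved to `--fail_dir`):
+
+- **The column count must match exactly**: the number of entries in `source_columns` must equal the number of columns in the Parquet / Arrow file. Use `SKIP` for columns you do not want to import.
+- **Every name must exist in the source file**: every non-SKIP column name and every named SKIP must be found among the file's columns.
+- **Anonymous `SKIP` is not allowed**: with name-based matching, a bare `SKIP` cannot identify a specific column and must be written as `columnName SKIP`.
+
 ## CLI Parameters

 | Parameter | Description | Required | Default |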
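@@ -161,11 +172,19 @@ arrow2tsfile.bat --source .\data\arrow --target .\output --fail_dir .\failed --s
 When `--schema` is not passed, column types are inferred and the time column is detected automatically.

 **Auto mode rules:**
-- Time column: must be named exactly `time` or `TIME` (case-sensitive)
+- **Time column**: must be named exactly `time` or `TIME` (case-sensitive).
+  - Parquet / Arrow: if the source file has several Timestamp-typed columns, **only the one named `time` / `TIME` is chosen as the time axis**; the other Timestamp columns are written as FIELD columns of type `INT64` (the raw value is kept, but the TIMESTAMP semantics are lost). To keep the TIMESTAMP type, use schema mode and declare the columns explicitly.
 - All other columns automatically become FIELD (tag columns are not inferred)
-- CSV type inference is based on a sample of the first 100 rows. Promotion rules: a mix of INT64 and DOUBLE is promoted to DOUBLE; any other mixed pair (including BOOLEAN with a numeric type) is promoted directly to STRING.
+- **CSV type inference** is based on a sample of the first 100 rows of each column. Each non-empty cell is first classified into a base type (BOOLEAN / INT64 / DOUBLE / STRING).
+  - If only one base type appears in a column's sample, the column uses that type.
+  - **When different base types appear in the same column**, promotion is triggered: INT64 + DOUBLE → DOUBLE; any other mixed combination (including BOOLEAN with any numeric type) → STRING.
 - Parquet / Arrow use the native schema types directly
-- Default table name: derived from the source file name (e.g. `sensor.csv` → table `sensor`)
+- **Default table name**: derived from the source file name (e.g. `sensor.csv` → table `sensor`). Sanitization rules are applied in order:
+  1. Strip the file extension (`.csv` / `.parquet` / `.arrow` / `.ipc` / `.feather`; if none matches, strip everything after the last `.`).
+  2. Keep only ASCII letters (`a–z`, `A–Z`), digits (`0–9`), underscore (`_`), and dot (`.`); replace every other character with `_`.
+  3. Collapse consecutive `_` into one; strip leading and trailing `_`.
+  4. If the result is empty, use the format-specific default name: `csv_data` / `parquet_data` / `arrow_data`.
+  5. If the result starts with a digit, prefix it with `t_` (TsFile table names cannot start with a digit).
 - Default null tokens (CSV only): empty cell and `\N`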
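
The CSV promotion rule in the list above can be sketched as a fold over the sampled cells of one column. This is only an illustration: the class, enum, and method names are hypothetical, the cells are assumed to be classified already, and `null` stands in for an empty cell. What the tool does when every sampled cell is empty is not stated here, so the sketch simply returns `null` in that case.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration of the promotion rule; not the tool's actual code.
class TypePromotionSketch {

  enum BaseType { BOOLEAN, INT64, DOUBLE, STRING }

  /** Combines two base types seen in the same column. */
  static BaseType promote(BaseType a, BaseType b) {
    if (a == b) {
      return a;
    }
    boolean intAndDouble =
        (a == BaseType.INT64 && b == BaseType.DOUBLE)
            || (a == BaseType.DOUBLE && b == BaseType.INT64);
    // INT64 + DOUBLE widens to DOUBLE; every other mix falls back to STRING.
    return intAndDouble ? BaseType.DOUBLE : BaseType.STRING;
  }

  /** Folds the sampled cells of one column; null entries model empty cells. */
  static BaseType inferColumn(List<BaseType> sampledCells) {
    BaseType result = null;
    for (BaseType cell : sampledCells) {
      if (cell == null) {
        continue; // empty cells do not influence the inferred type
      }
      result = (result == null) ? cell : promote(result, cell);
    }
    return result; // null when the whole sample was empty (assumption of this sketch)
  }

  public static void main(String[] args) {
    System.out.println(
        inferColumn(Arrays.asList(BaseType.INT64, BaseType.DOUBLE, BaseType.INT64)));
    System.out.println(inferColumn(Arrays.asList(BaseType.BOOLEAN, BaseType.INT64)));
  }
}
```

Because STRING absorbs every other type and DOUBLE absorbs INT64, the result does not depend on the order in which the sampled cells are visited.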

 **Auto mode example:**
33 changes: 26 additions & 7 deletions java/tools/README.md
@@ -65,11 +65,16 @@ mvn install -P with-java -DskipTests

 - **time_column**: Exactly one per table. Written as `time` column with type `TIMESTAMP` in TsFile.
 - **tag_columns**: Device identifiers (composite primary key), 0 or more. Supports virtual columns not present in the source file via `DEFAULT` keyword.
+  - **Data type is always `STRING`** and cannot be changed. Any type declared for a tag column in `source_columns` is ignored. We recommend writing tag columns in `source_columns` with the column name only (no type).
 - **source_columns**: Maps every column in the source file by position (CSV) or by name (Parquet / Arrow). Use `SKIP` to ignore a column.
 - **FIELD** (derived, not configured): All columns in `source_columns` that are not `time_column`, not in `tag_columns`, and not `SKIP`. These are the measurement columns whose values change over time.

+> **Column name case**: TsFile table-model column and table names are case-insensitive and stored as lowercase. Regardless of whether you write `Time` / `TIME` / `time` in `import.schema`, the on-disk and read-back name is `time`.
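
The FIELD derivation and the lowercase rule described above can be illustrated with a small sketch. It is not the tool's implementation: the class and method names are hypothetical, a `null` entry models a `SKIP`, and lowercasing every name before comparison is an assumption based on the case-insensitivity note.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Set;

// Hypothetical helper for illustration only; not part of the import tool.
class FieldDerivationSketch {

  /**
   * Returns the FIELD column names: every source column that is not the time
   * column, not a tag column, and not skipped. Names are compared and
   * returned in lowercase.
   */
  static List<String> deriveFields(
      List<String> sourceColumns, String timeColumn, List<String> tagColumns) {
    Set<String> excluded = new HashSet<>();
    excluded.add(timeColumn.toLowerCase(Locale.ROOT));
    for (String tag : tagColumns) {
      excluded.add(tag.toLowerCase(Locale.ROOT));
    }
    List<String> fields = new ArrayList<>();
    for (String column : sourceColumns) {
      if (column == null) {
        continue; // SKIP entry
      }
      String lower = column.toLowerCase(Locale.ROOT);
      if (!excluded.contains(lower)) {
        fields.add(lower);
      }
    }
    return fields;
  }

  public static void main(String[] args) {
    // Mirrors the schema example: two SKIP entries, one virtual tag (Group).
    List<String> source =
        Arrays.asList(
            "Region", "FactoryNumber", "DeviceNumber", null, null, "Time", "Temperature",
            "Emission");
    List<String> tags = Arrays.asList("Group", "Region", "FactoryNumber", "DeviceNumber");
    System.out.println(deriveFields(source, "Time", tags)); // [temperature, emission]
  }
}
```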

 ### Schema Example

+> Duplicate timestamps within the same device are not supported; rows sharing identical tag column values and the same timestamp will fail to write.
+
 CSV file content:
 ```
 Region,FactoryNumber,DeviceNumber,Model,MaintenanceCycle,Time,Temperature,Emission
@@ -95,9 +100,9 @@ DeviceNumber
 time_column=Time

 source_columns
-Region TEXT,
-FactoryNumber TEXT,
-DeviceNumber TEXT,
+Region,
+FactoryNumber,
+DeviceNumber,
 SKIP,
 SKIP,
 Time INT64,
@@ -107,7 +112,7 @@ Emission DOUBLE,

 In this example:
 - `Group` is a virtual tag column (not in CSV) with default value `Datang`
-- `Region`, `FactoryNumber`, `DeviceNumber` are tag columns read from CSV
+- `Region`, `FactoryNumber`, `DeviceNumber` are tag columns read from CSV; their type is fixed as `STRING` and need not be declared
 - `Model` and `MaintenanceCycle` are skipped via `SKIP`
 - `Temperature` and `Emission` are automatically derived as FIELD columns

@@ -120,6 +125,12 @@ Temperature FLOAT,
 Emission DOUBLE,
 ```

+**Validation rules for Parquet / Arrow schema mode** (enforced: mismatches raise an error and the source file is moved to `--fail_dir`):
+
+- **Column count must match exactly.** The number of entries in `source_columns` must equal the number of columns in the Parquet / Arrow file. Use `SKIP` for any file column you don't want to import.
+- **Every name must exist in the source file.** Each non-SKIP column name and every named SKIP must resolve to an actual column in the file.
+- **Unnamed `SKIP` is not allowed.** Because matching is by name, an unqualified `SKIP` cannot identify a column. Always use `columnName SKIP`.
+
 ## CLI Parameters

 | Parameter | Description | Required | Default |
@@ -160,11 +171,19 @@ arrow2tsfile.bat --source .\data\arrow --target .\output --fail_dir .\failed --s
 Omit `--schema` to automatically infer column types and detect the time column.

 **Auto mode rules:**
-- Time column: must be named exactly `time` or `TIME` (case-sensitive, strict match)
+- **Time column**: must be named exactly `time` or `TIME` (case-sensitive, strict match).
+  - Parquet / Arrow: if the source file contains multiple Timestamp-typed columns, only the one named `time` / `TIME` is selected as the time axis. The remaining Timestamp columns become FIELD columns and are stored as `INT64` (raw value preserved, TIMESTAMP semantic dropped). To keep them as `TIMESTAMP`, switch to schema mode and declare them explicitly.
 - All other columns become FIELD (no tag inference)
-- CSV type inference uses a 100-row sampling window. Promotion rules: INT64 and DOUBLE promote to DOUBLE; any other mixed pair (including BOOLEAN with numeric) promotes to STRING.
+- **CSV type inference** uses a 100-row sampling window per column. Each non-null cell is classified into a base type (BOOLEAN / INT64 / DOUBLE / STRING).
+  - If only one base type appears across the sampled rows for a column, that type is used.
+  - When **different base types appear in the same column**, the column is promoted: INT64 + DOUBLE → DOUBLE; any other mixed combination (including BOOLEAN with any numeric type) → STRING.
 - Parquet / Arrow use native schema types directly
-- Default table name: derived from source filename (e.g. `sensor.csv` → table `sensor`)
+- **Default table name**: derived from the source filename (e.g. `sensor.csv` → table `sensor`). Sanitization rules applied in order:
+  1. Strip the file extension (`.csv` / `.parquet` / `.arrow` / `.ipc` / `.feather`, or the last `.`-suffix as a fallback).
+  2. Keep only ASCII letters (`a–z`, `A–Z`), digits (`0–9`), underscore (`_`), and dot (`.`). Every other character is replaced with `_`.
+  3. Collapse consecutive `_` into a single `_`; strip leading and trailing `_`.
+  4. If the result is empty, use a format-specific default: `csv_data` / `parquet_data` / `arrow_data`.
+  5. If the result starts with a digit, prefix `t_` (TsFile table names cannot start with a digit).
 - Default null tokens (CSV only): empty cell and `\N`
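
The five table-name sanitization steps listed above can be sketched as follows. This is a minimal illustration rather than the tool's code: the class and method names are hypothetical, matching the known extensions case-insensitively is an assumption, and the format-specific default is passed in by the caller.

```java
import java.util.Locale;

// Hypothetical illustration of the documented sanitization steps.
class TableNameSketch {

  private static final String[] KNOWN_EXTENSIONS = {
    ".csv", ".parquet", ".arrow", ".ipc", ".feather"
  };

  static String sanitize(String fileName, String formatDefault) {
    String name = stripExtension(fileName); // step 1
    name = name.replaceAll("[^A-Za-z0-9_.]", "_"); // step 2
    name = name.replaceAll("_+", "_"); // step 3: collapse runs of '_'
    name = name.replaceAll("^_|_$", ""); // step 3: trim leading / trailing '_'
    if (name.isEmpty()) {
      return formatDefault; // step 4
    }
    char first = name.charAt(0);
    if (first >= '0' && first <= '9') {
      name = "t_" + name; // step 5
    }
    return name;
  }

  private static String stripExtension(String fileName) {
    String lower = fileName.toLowerCase(Locale.ROOT); // case-insensitive match is an assumption
    for (String ext : KNOWN_EXTENSIONS) {
      if (lower.endsWith(ext)) {
        return fileName.substring(0, fileName.length() - ext.length());
      }
    }
    int dot = fileName.lastIndexOf('.');
    return dot >= 0 ? fileName.substring(0, dot) : fileName; // fallback: drop the last '.'-suffix
  }

  public static void main(String[] args) {
    System.out.println(sanitize("sensor.csv", "csv_data")); // sensor
    System.out.println(sanitize("2024 data-set.parquet", "parquet_data")); // t_2024_data_set
    System.out.println(sanitize("@@@.csv", "csv_data")); // csv_data
  }
}
```

Dots survive step 2, so a name such as `plant.v2.feather` becomes `plant.v2` in this sketch.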

**Auto mode example:**
@@ -24,10 +24,13 @@
 import org.apache.arrow.memory.RootAllocator;
 import org.apache.arrow.vector.BigIntVector;
 import org.apache.arrow.vector.BitVector;
+import org.apache.arrow.vector.DateDayVector;
+import org.apache.arrow.vector.DateMilliVector;
 import org.apache.arrow.vector.FieldVector;
 import org.apache.arrow.vector.Float4Vector;
 import org.apache.arrow.vector.Float8Vector;
 import org.apache.arrow.vector.IntVector;
+import org.apache.arrow.vector.TimeStampVector;
 import org.apache.arrow.vector.VarBinaryVector;
 import org.apache.arrow.vector.VarCharVector;
 import org.apache.arrow.vector.VectorSchemaRoot;
@@ -43,10 +46,13 @@
 import java.io.FileInputStream;
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
+import java.time.LocalDate;
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;

 public class ArrowSourceReader implements SourceReader {

@@ -61,6 +67,7 @@ public class ArrowSourceReader implements SourceReader {
   private List<ArrowBlock> recordBatches;
   private int currentBatchIndex;
   private boolean exhausted;
+  private boolean schemaValidated;

   private String overrideTableName;
   private String overrideTimePrecision;
@@ -145,6 +152,7 @@ public SourceBatch readBatch() {

     try {
       ensureReaderOpen();
+      validateSchema();

       if (currentBatchIndex >= recordBatches.size()) {
         exhausted = true;
@@ -229,6 +237,49 @@ private void ensureReaderOpen() throws IOException {
     }
   }

+  private void validateSchema() {
+    if (schemaValidated) {
+      return;
+    }
+    schemaValidated = true;
+
+    List<String> fileColumnNames = new ArrayList<>();
+    for (Field field : arrowSchema.getFields()) {
+      fileColumnNames.add(field.getName());
+    }
+    Set<String> fileColumnSet = new HashSet<>(fileColumnNames);
+
+    List<ImportSchema.SourceColumn> srcCols = schema.getSourceColumns();
+    if (fileColumnNames.size() != srcCols.size()) {
+      throw new IllegalArgumentException(
+          "Column count mismatch: schema defines "
+              + srcCols.size()
+              + " columns but Arrow file has "
+              + fileColumnNames.size()
+              + " columns in "
+              + sourceFile.getAbsolutePath());
+    }
+
+    for (int i = 0; i < srcCols.size(); i++) {
+      ImportSchema.SourceColumn col = srcCols.get(i);
+      if (col.isSkip() && col.getName() == null) {
+        throw new IllegalArgumentException(
+            "Unnamed SKIP is not supported for Arrow (name-based matching). "
+                + "Use 'columnName SKIP' to skip a specific column at position "
+                + i
+                + " in "
+                + sourceFile.getAbsolutePath());
+      }
+      if (!fileColumnSet.contains(col.getName())) {
+        throw new IllegalArgumentException(
+            (col.isSkip() ? "SKIP column '" : "Source column '")
+                + col.getName()
+                + "' not found in Arrow file: "
+                + sourceFile.getAbsolutePath());
+      }
+    }
+  }
+
   private List<String> getSchemaColumnNames() {
     List<String> names = new ArrayList<>();
     List<ImportSchema.SourceColumn> srcCols = schema.getSourceColumns();
@@ -240,7 +291,22 @@ private List<String> getSchemaColumnNames() {
   }

   private Object extractValue(FieldVector vec, int row) {
-    if (vec instanceof BigIntVector) {
+    // Date / Timestamp checks must come BEFORE the BigIntVector/IntVector branches: although
+    // they hold int/long underneath, DateDayVector / TimeStampVector do NOT extend
+    // IntVector / BigIntVector, so without these branches Date columns fall through to the
+    // generic getObject().toString() path and produce strings that don't match TSDataType.DATE.
+    if (vec instanceof DateDayVector) {
+      // Days since 1970-01-01. ValueConverter.toLocalDate handles Integer → LocalDate.
+      return ((DateDayVector) vec).get(row);
+    } else if (vec instanceof DateMilliVector) {
+      // Millis since 1970-01-01; collapse to date.
+      long millis = ((DateMilliVector) vec).get(row);
+      return LocalDate.ofEpochDay(Math.floorDiv(millis, 86_400_000L));
+    } else if (vec instanceof TimeStampVector) {
+      // Long in the vector's native precision; matches the precision detected by
+      // detectTimestampPrecision() and stored on the schema.
+      return ((TimeStampVector) vec).get(row);
+    } else if (vec instanceof BigIntVector) {
       return ((BigIntVector) vec).get(row);
     } else if (vec instanceof IntVector) {
       return ((IntVector) vec).get(row);
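
The date handling in the `DateMilliVector` branch can be tried in isolation. The sketch below does not depend on Arrow: the class and the two helper names are hypothetical and simply take the raw values that a day-based or millisecond-based date vector would hand back. It shows why `Math.floorDiv` is used instead of `/`: truncating division would map instants shortly before the epoch to 1970-01-01.

```java
import java.time.LocalDate;

// Hypothetical standalone helpers; they mirror the arithmetic, not the Arrow API.
class EpochDateSketch {

  static final long MILLIS_PER_DAY = 86_400_000L;

  /** Days since 1970-01-01, as stored by a day-based date column. */
  static LocalDate fromEpochDays(int days) {
    return LocalDate.ofEpochDay(days);
  }

  /** Milliseconds since 1970-01-01T00:00Z, collapsed to a calendar date. */
  static LocalDate fromEpochMillis(long millis) {
    // floorDiv rounds toward negative infinity, so -1 ms lands on 1969-12-31.
    return LocalDate.ofEpochDay(Math.floorDiv(millis, MILLIS_PER_DAY));
  }

  public static void main(String[] args) {
    System.out.println(fromEpochMillis(-1L)); // 1969-12-31
    System.out.println(fromEpochMillis(0L)); // 1970-01-01
    System.out.println(fromEpochDays(19723)); // 2024-01-01
  }
}
```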
@@ -225,8 +225,57 @@ private void ensureReaderOpen() throws IOException {
     }
   }

-  private String[] splitLine(String line) {
-    return line.split(separator, -1);
+  /**
+   * RFC 4180-style tokenizer that handles quoted fields with embedded delimiters and escaped quotes
+   * ({@code ""}). Multi-line quoted records are not supported — quoted values must not contain line
+   * breaks, since the surrounding read loop is line-oriented.
+   */
+  String[] splitLine(String line) {
+    if (line.indexOf('"') < 0) {
+      return line.split(separator, -1);
+    }
+    List<String> tokens = new ArrayList<>();
+    StringBuilder cur = new StringBuilder();
+    boolean inQuotes = false;
+    boolean fieldStart = true;
+    int sepLen = separator.length();
+    int i = 0;
+    while (i < line.length()) {
+      char c = line.charAt(i);
+      if (inQuotes) {
+        if (c == '"') {
+          if (i + 1 < line.length() && line.charAt(i + 1) == '"') {
+            cur.append('"');
+            i += 2;
+            continue;
+          }
+          inQuotes = false;
+          i++;
+          continue;
+        }
+        cur.append(c);
+        i++;
+      } else {
+        if (fieldStart && c == '"') {
+          inQuotes = true;
+          fieldStart = false;
+          i++;
+          continue;
+        }
+        if (line.regionMatches(i, separator, 0, sepLen)) {
+          tokens.add(cur.toString());
+          cur.setLength(0);
+          fieldStart = true;
+          i += sepLen;
+          continue;
+        }
+        cur.append(c);
+        fieldStart = false;
+        i++;
+      }
+    }
+    tokens.add(cur.toString());
+    return tokens.toArray(new String[0]);
   }

   private Object[] parseLine(String line) {
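
To see how the quoted-field rules behave on concrete input, the tokenizer logic can be restated as a standalone method. This is a sketch for experimentation, not the reader class itself: the class name is hypothetical, the separator is passed as a parameter and treated as a literal string, the no-quote fast path is left out, and an empty separator is rejected up front (a guard that is this sketch's own addition).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical standalone restatement of the quoted-field tokenizer.
class CsvSplitSketch {

  static String[] splitLine(String line, String separator) {
    if (separator.isEmpty()) {
      throw new IllegalArgumentException("separator must not be empty");
    }
    List<String> tokens = new ArrayList<>();
    StringBuilder cur = new StringBuilder();
    boolean inQuotes = false;
    boolean fieldStart = true;
    int i = 0;
    while (i < line.length()) {
      char c = line.charAt(i);
      if (inQuotes) {
        if (c == '"' && i + 1 < line.length() && line.charAt(i + 1) == '"') {
          cur.append('"'); // "" inside a quoted field is an escaped quote
          i += 2;
        } else if (c == '"') {
          inQuotes = false; // closing quote
          i++;
        } else {
          cur.append(c); // separators inside quotes are ordinary characters
          i++;
        }
      } else if (fieldStart && c == '"') {
        inQuotes = true; // an opening quote only counts at the start of a field
        fieldStart = false;
        i++;
      } else if (line.startsWith(separator, i)) {
        tokens.add(cur.toString());
        cur.setLength(0);
        fieldStart = true;
        i += separator.length();
      } else {
        cur.append(c);
        fieldStart = false;
        i++;
      }
    }
    tokens.add(cur.toString());
    return tokens.toArray(new String[0]);
  }

  public static void main(String[] args) {
    // Input line: a,"b,c","d""e"
    System.out.println(Arrays.toString(splitLine("a,\"b,c\",\"d\"\"e\"", ",")));
  }
}
```

A quote that appears in the middle of an unquoted field is kept as a literal character, so `ab"c,d` splits into `ab"c` and `d`.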
@@ -20,6 +20,7 @@

 import java.time.DateTimeException;
 import java.time.Instant;
+import java.time.LocalDateTime;
 import java.time.ZoneId;
 import java.time.ZoneOffset;
 import java.time.ZonedDateTime;
@@ -448,12 +449,56 @@ public static long convertTimestampOrDatetimeStrToLongWithDefaultZone(
   }

   public static long convertDatetimeStrToLong(String str, ZoneId zoneId) {
-    return convertDatetimeStrToLong(str, toZoneOffset(zoneId), 0, "ms");
+    return convertDatetimeStrToLongWithZoneId(str, zoneId, 0, "ms");
   }

   public static long convertDatetimeStrToLong(
       String str, ZoneId zoneId, String timestampPrecision) {
-    return convertDatetimeStrToLong(str, toZoneOffset(zoneId), 0, timestampPrecision);
+    return convertDatetimeStrToLongWithZoneId(str, zoneId, 0, timestampPrecision);
   }

+  /**
+   * Resolve the offset based on the actual local datetime in the string (so DST is honored),
+   * instead of collapsing the {@link ZoneId} to a single offset using {@code Instant.now()}.
+   */
+  private static long convertDatetimeStrToLongWithZoneId(
+      String str, ZoneId zoneId, int depth, String timestampPrecision) {
+    if (depth >= 2) {
+      throw new DateTimeException(
+          String.format(
+              "Failed to convert %s to millisecond, zone id is %s, "
+                  + "please input like 2011-12-03T10:15:30 or 2011-12-03T10:15:30+01:00",
+              str, zoneId));
+    }
+    if (str.contains("Z")) {
+      return convertDatetimeStrToLongWithZoneId(
+          str.substring(0, str.indexOf('Z')) + "+00:00", zoneId, depth, timestampPrecision);
+    } else if (str.length() == 10) {
+      return convertDatetimeStrToLongWithZoneId(
+          str + "T00:00:00", zoneId, depth, timestampPrecision);
+    } else if (str.length() - str.lastIndexOf('+') != 6
+        && str.length() - str.lastIndexOf('-') != 6) {
+      ZoneOffset offset = resolveLocalOffset(str, zoneId);
+      return convertDatetimeStrToLongWithZoneId(
+          str + offset, zoneId, depth + 1, timestampPrecision);
+    } else if (str.contains("[") || str.contains("]")) {
+      throw new DateTimeException(
+          String.format(
+              "%s with [time-region] at end is not supported now, "
+                  + "please input like 2011-12-03T10:15:30 or 2011-12-03T10:15:30+01:00",
+              str));
+    }
+    return getInstantWithPrecision(str, timestampPrecision);
+  }
+
+  private static ZoneOffset resolveLocalOffset(String str, ZoneId zoneId) {
+    String normalized = str.replace('/', '-').replace('.', '-').replace(' ', 'T');
+    try {
+      LocalDateTime ldt = LocalDateTime.parse(normalized);
+      return zoneId.getRules().getOffset(ldt);
+    } catch (DateTimeParseException e) {
+      return zoneId.getRules().getOffset(Instant.now());
+    }
+  }
+
   public static long getInstantWithPrecision(String str, String timestampPrecision) {
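
The effect of resolving the offset from the local date-time, rather than from `Instant.now()`, is easiest to see with a zone that observes DST. The sketch below uses only `java.time`; the class and method names are hypothetical, and the inputs are assumed to be plain ISO local date-times that `LocalDateTime.parse` accepts.

```java
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.ZoneOffset;

// Hypothetical illustration of DST-aware offset resolution.
class DstOffsetSketch {

  /** Offset that the zone applies at the given local date-time. */
  static ZoneOffset offsetFor(String isoLocalDateTime, ZoneId zone) {
    LocalDateTime ldt = LocalDateTime.parse(isoLocalDateTime);
    return zone.getRules().getOffset(ldt);
  }

  /** Epoch milliseconds of the local date-time interpreted in the zone. */
  static long toEpochMillis(String isoLocalDateTime, ZoneId zone) {
    LocalDateTime ldt = LocalDateTime.parse(isoLocalDateTime);
    return ldt.toInstant(zone.getRules().getOffset(ldt)).toEpochMilli();
  }

  public static void main(String[] args) {
    ZoneId newYork = ZoneId.of("America/New_York");
    System.out.println(offsetFor("2024-01-15T10:00:00", newYork)); // -05:00 (standard time)
    System.out.println(offsetFor("2024-07-15T10:00:00", newYork)); // -04:00 (daylight time)
  }
}
```

With a single offset taken from the current instant, one of these two inputs would be shifted by an hour, depending on when the import runs. For local times that fall into a DST gap or overlap, `ZoneRules.getOffset(LocalDateTime)` returns a best-effort offset rather than a uniquely correct one.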