Merged
59 changes: 55 additions & 4 deletions docs/content.zh/docs/connectors/table/formats/csv.md
@@ -67,23 +67,26 @@ Format Options
<thead>
<tr>
<th class="text-left" style="width: 25%">Option</th>
<th class="text-center" style="width: 8%">Required</th>
<th class="text-center" style="width: 8%">Forwarded</th>
<th class="text-center" style="width: 7%">Default</th>
<th class="text-center" style="width: 10%">Type</th>
<th class="text-center" style="width: 42%">Description</th>
</tr>
</thead>
<tbody>
<tr>
<td><h5>format</h5></td>
<td>required</td>
<td>no</td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>Specify what format to use, here should be <code>'csv'</code>.</td>
</tr>
<tr>
<td><h5>csv.field-delimiter</h5></td>
<td>optional</td>
<td>yes</td>
<td style="word-wrap: break-word;"><code>,</code></td>
<td>String</td>
<td>Field delimiter character (<code>','</code> by default), must be a single character. You can use backslash to specify special characters, e.g. <code>'\t'</code> represents the tab character.
@@ -93,20 +96,23 @@ Format Options
<tr>
<td><h5>csv.disable-quote-character</h5></td>
<td>optional</td>
<td>yes</td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td>Disable the quote character for enclosing field values (false by default). If true, option <code>'csv.quote-character'</code> can not be set.</td>
</tr>
<tr>
<td><h5>csv.quote-character</h5></td>
<td>optional</td>
<td>yes</td>
<td style="word-wrap: break-word;"><code>"</code></td>
<td>String</td>
<td>Quote character for enclosing field values (<code>"</code> by default).</td>
</tr>
<tr>
<td><h5>csv.allow-comments</h5></td>
<td>optional</td>
<td>yes</td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td>Ignore comment lines that start with <code>'#'</code> (disabled by default).
@@ -116,38 +122,83 @@ Format Options
<tr>
<td><h5>csv.ignore-parse-errors</h5></td>
<td>optional</td>
<td>no</td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td>Skip fields and rows with parse errors instead of failing (false by default, i.e. fail on errors). Fields are set to <code>null</code> in case of errors.</td>
</tr>
<tr>
<td><h5>csv.array-element-delimiter</h5></td>
<td>optional</td>
<td>yes</td>
<td style="word-wrap: break-word;"><code>;</code></td>
<td>String</td>
<td>Delimiter string for separating array and row elements (<code>';'</code> by default).</td>
</tr>
<tr>
<td><h5>csv.escape-character</h5></td>
<td>optional</td>
<td>yes</td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>Escape character for escaping values (disabled by default).</td>
</tr>
<tr>
<td><h5>csv.null-literal</h5></td>
<td>optional</td>
<td>yes</td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>Null literal string that is interpreted as a null value (disabled by default). On the input side this string is converted to null; on the output side null is written as this string.</td>
</tr>
<tr>
<td><h5>csv.write-bigdecimal-in-scientific-notation</h5></td>
<td>optional</td>
<td>yes</td>
<td style="word-wrap: break-word;">true</td>
<td>Boolean</td>
<td>Enables representation of BigDecimal data in scientific notation (true by default). For example, a BigDecimal value of 100000 is encoded as '1E+5' when set to true, and written as 100000 when set to false. Note: only values that are not 0 and are a multiple of 10 are converted to scientific notation.</td>
</tr>
<tr>
<td><h5>csv.trim-spaces</h5></td>
<td>optional</td>
<td>yes</td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td>Optional flag to trim leading/trailing spaces from unquoted field values (disabled by default). Only affects deserialization.</td>
</tr>
<tr>
<td><h5>csv.ignore-trailing-unmappable</h5></td>
<td>optional</td>
<td>yes</td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td>Optional flag to ignore extra trailing fields that cannot be mapped to the schema (disabled by default). Only affects deserialization.</td>
</tr>
<tr>
<td><h5>csv.allow-trailing-comma</h5></td>
<td>optional</td>
<td>yes</td>
<td style="word-wrap: break-word;">true</td>
<td>Boolean</td>
<td>Optional flag to allow a trailing comma after the last field value (enabled by default). Only affects deserialization.</td>
</tr>
<tr>
<td><h5>csv.fail-on-missing-columns</h5></td>
<td>optional</td>
<td>yes</td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td>Optional flag to fail when a row has fewer columns than the schema expects (disabled by default). Only affects deserialization.</td>
</tr>
<tr>
<td><h5>csv.empty-string-as-null</h5></td>
<td>optional</td>
<td>yes</td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td>Optional flag to treat empty string values as null (disabled by default). Only affects deserialization.</td>
</tr>
</tbody>
</table>

40 changes: 40 additions & 0 deletions docs/content/docs/connectors/table/formats/csv.md
@@ -160,6 +160,46 @@ Format Options
<td>Boolean</td>
<td>Enables representation of BigDecimal data type in scientific notation (default is true). For example, 100000 is encoded as 1E+5 by default, and will be written as 100000 if this option is set to false. Note: only values that are not 0 and are a multiple of 10 are converted to scientific notation.</td>
</tr>
<tr>
<td><h5>csv.trim-spaces</h5></td>
<td>optional</td>
<td>yes</td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td>Optional flag to trim leading/trailing spaces from unquoted field values (disabled by default). Only affects deserialization.</td>
</tr>
<tr>
<td><h5>csv.ignore-trailing-unmappable</h5></td>
<td>optional</td>
<td>yes</td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td>Optional flag to ignore extra trailing fields that cannot be mapped to the schema (disabled by default). Only affects deserialization.</td>
</tr>
<tr>
<td><h5>csv.allow-trailing-comma</h5></td>
<td>optional</td>
<td>yes</td>
<td style="word-wrap: break-word;">true</td>
<td>Boolean</td>
<td>Optional flag to allow a trailing comma after the last field value (enabled by default). Only affects deserialization.</td>
</tr>
<tr>
<td><h5>csv.fail-on-missing-columns</h5></td>
<td>optional</td>
<td>yes</td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td>Optional flag to fail when a row has fewer columns than the schema expects (disabled by default). Only affects deserialization.</td>
</tr>
<tr>
<td><h5>csv.empty-string-as-null</h5></td>
<td>optional</td>
<td>yes</td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td>Optional flag to treat empty string values as null (disabled by default). Only affects deserialization.</td>
</tr>
</tbody>
</table>
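For reference, the deserialization-only flags documented above can be combined in a table DDL like the following sketch. The table name, schema, connector, and path here are hypothetical; only the `format` and `csv.*` option keys come from the table above:

```sql
CREATE TABLE user_behavior (
  user_id BIGINT,
  item_id BIGINT,
  behavior STRING
) WITH (
  'connector' = 'filesystem',            -- any connector that supports formats
  'path' = '/tmp/user_behavior.csv',     -- hypothetical path
  'format' = 'csv',
  'csv.trim-spaces' = 'true',            -- trim unquoted field values
  'csv.empty-string-as-null' = 'true',   -- read '' as NULL
  'csv.allow-trailing-comma' = 'true',   -- tolerate "a,b,c,"
  'csv.ignore-trailing-unmappable' = 'true',
  'csv.fail-on-missing-columns' = 'false'
);
```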

@@ -28,13 +28,18 @@
import java.util.Set;

import static org.apache.flink.formats.csv.CsvFormatOptions.ALLOW_COMMENTS;
import static org.apache.flink.formats.csv.CsvFormatOptions.ALLOW_TRAILING_COMMA;
import static org.apache.flink.formats.csv.CsvFormatOptions.ARRAY_ELEMENT_DELIMITER;
import static org.apache.flink.formats.csv.CsvFormatOptions.DISABLE_QUOTE_CHARACTER;
import static org.apache.flink.formats.csv.CsvFormatOptions.EMPTY_STRING_AS_NULL;
import static org.apache.flink.formats.csv.CsvFormatOptions.ESCAPE_CHARACTER;
import static org.apache.flink.formats.csv.CsvFormatOptions.FAIL_ON_MISSING_COLUMNS;
import static org.apache.flink.formats.csv.CsvFormatOptions.FIELD_DELIMITER;
import static org.apache.flink.formats.csv.CsvFormatOptions.IGNORE_PARSE_ERRORS;
import static org.apache.flink.formats.csv.CsvFormatOptions.IGNORE_TRAILING_UNMAPPABLE;
import static org.apache.flink.formats.csv.CsvFormatOptions.NULL_LITERAL;
import static org.apache.flink.formats.csv.CsvFormatOptions.QUOTE_CHARACTER;
import static org.apache.flink.formats.csv.CsvFormatOptions.TRIM_SPACES;
import static org.apache.flink.formats.csv.CsvFormatOptions.WRITE_BIGDECIMAL_IN_SCIENTIFIC_NOTATION;

/** A class with common CSV format constants and utility methods. */
@@ -102,6 +107,11 @@ public static Set<ConfigOption<?>> optionalOptions() {
options.add(ESCAPE_CHARACTER);
options.add(NULL_LITERAL);
options.add(WRITE_BIGDECIMAL_IN_SCIENTIFIC_NOTATION);
options.add(TRIM_SPACES);
options.add(IGNORE_TRAILING_UNMAPPABLE);
options.add(ALLOW_TRAILING_COMMA);
options.add(FAIL_ON_MISSING_COLUMNS);
options.add(EMPTY_STRING_AS_NULL);
return options;
}

@@ -115,6 +125,11 @@ public static Set<ConfigOption<?>> forwardOptions() {
options.add(ESCAPE_CHARACTER);
options.add(NULL_LITERAL);
options.add(WRITE_BIGDECIMAL_IN_SCIENTIFIC_NOTATION);
options.add(TRIM_SPACES);
options.add(IGNORE_TRAILING_UNMAPPABLE);
options.add(ALLOW_TRAILING_COMMA);
options.add(FAIL_ON_MISSING_COLUMNS);
options.add(EMPTY_STRING_AS_NULL);
return options;
}
}
@@ -46,11 +46,13 @@
import org.apache.flink.table.plan.stats.TableStats;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.util.function.SerializableSupplier;
import org.apache.flink.util.jackson.JacksonMapperFactory;

import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.CsvMapper;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.CsvParser;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.CsvSchema;

import org.apache.commons.text.StringEscapeUtils;
@@ -60,13 +62,18 @@
import java.util.Set;

import static org.apache.flink.formats.csv.CsvFormatOptions.ALLOW_COMMENTS;
import static org.apache.flink.formats.csv.CsvFormatOptions.ALLOW_TRAILING_COMMA;
import static org.apache.flink.formats.csv.CsvFormatOptions.ARRAY_ELEMENT_DELIMITER;
import static org.apache.flink.formats.csv.CsvFormatOptions.DISABLE_QUOTE_CHARACTER;
import static org.apache.flink.formats.csv.CsvFormatOptions.EMPTY_STRING_AS_NULL;
import static org.apache.flink.formats.csv.CsvFormatOptions.ESCAPE_CHARACTER;
import static org.apache.flink.formats.csv.CsvFormatOptions.FAIL_ON_MISSING_COLUMNS;
import static org.apache.flink.formats.csv.CsvFormatOptions.FIELD_DELIMITER;
import static org.apache.flink.formats.csv.CsvFormatOptions.IGNORE_PARSE_ERRORS;
import static org.apache.flink.formats.csv.CsvFormatOptions.IGNORE_TRAILING_UNMAPPABLE;
import static org.apache.flink.formats.csv.CsvFormatOptions.NULL_LITERAL;
import static org.apache.flink.formats.csv.CsvFormatOptions.QUOTE_CHARACTER;
import static org.apache.flink.formats.csv.CsvFormatOptions.TRIM_SPACES;
import static org.apache.flink.util.Preconditions.checkNotNull;

/** CSV format factory for file system. */
@@ -125,15 +132,14 @@ public BulkFormat<RowData, FileSourceSplit> createRuntimeDecoder(
final RowType physicalRowType = (RowType) physicalDataType.getLogicalType();
final CsvSchema schema = buildCsvSchema(physicalRowType, formatOptions);

final boolean ignoreParseErrors = formatOptions.get(IGNORE_PARSE_ERRORS);
final Converter<JsonNode, RowData, Void> converter =
(Converter)
new CsvToRowDataConverters(ignoreParseErrors)
.createRowConverter(projectedRowType, true);
CsvReaderFormat<RowData> csvReaderFormat =
new CsvReaderFormat<>(
createCsvMapperFactory(formatOptions),
ignored -> schema,
JsonNode.class,
converter,
@@ -217,4 +223,38 @@ private static CsvSchema buildCsvSchema(RowType rowType, ReadableConfig options)

return csvBuilder.build();
}

/**
* Creates a {@link SerializableSupplier} for {@link CsvMapper} that applies the configured
* {@link CsvParser.Feature} options.
*/
private static SerializableSupplier<CsvMapper> createCsvMapperFactory(
ReadableConfig formatOptions) {
final boolean trimSpaces = formatOptions.get(TRIM_SPACES);
final boolean ignoreTrailingUnmappable = formatOptions.get(IGNORE_TRAILING_UNMAPPABLE);
final boolean allowTrailingComma = formatOptions.get(ALLOW_TRAILING_COMMA);
final boolean failOnMissingColumns = formatOptions.get(FAIL_ON_MISSING_COLUMNS);
final boolean emptyStringAsNull = formatOptions.get(EMPTY_STRING_AS_NULL);

return () -> {
CsvMapper mapper = JacksonMapperFactory.createCsvMapper();
configureFeature(mapper, CsvParser.Feature.TRIM_SPACES, trimSpaces);
configureFeature(
mapper, CsvParser.Feature.IGNORE_TRAILING_UNMAPPABLE, ignoreTrailingUnmappable);
configureFeature(mapper, CsvParser.Feature.ALLOW_TRAILING_COMMA, allowTrailingComma);
configureFeature(
mapper, CsvParser.Feature.FAIL_ON_MISSING_COLUMNS, failOnMissingColumns);
configureFeature(mapper, CsvParser.Feature.EMPTY_STRING_AS_NULL, emptyStringAsNull);
return mapper;
};
}
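Reading each option into a local boolean before building the lambda is a deliberate design choice: the returned `SerializableSupplier` then captures only primitives, not the (possibly non-serializable) `ReadableConfig`. A minimal stdlib sketch of the same pattern, with a hypothetical string-map config standing in for `ReadableConfig`:

```java
import java.io.Serializable;
import java.util.function.Supplier;

class CapturePattern {
    // A Supplier that is also Serializable, mirroring Flink's SerializableSupplier.
    interface SerializableSupplier<T> extends Supplier<T>, Serializable {}

    static SerializableSupplier<String> factory(java.util.Map<String, String> config) {
        // Copy the needed value out of the config object first...
        final boolean flag = Boolean.parseBoolean(config.getOrDefault("flag", "false"));
        // ...so the lambda below captures only a primitive, keeping it serializable.
        return () -> flag ? "enabled" : "disabled";
    }
}
```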

private static void configureFeature(
CsvMapper mapper, CsvParser.Feature feature, boolean enabled) {
if (enabled) {
mapper.enable(feature);
} else {
mapper.disable(feature);
}
}
}
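The parser features wired up above have simple field-level semantics. As an illustrative stdlib-only sketch (not Jackson's actual implementation), `TRIM_SPACES` and `EMPTY_STRING_AS_NULL` behave roughly like:

```java
// Illustrative sketch of two CsvParser.Feature semantics on a single
// unquoted field value; Jackson applies these inside its CSV decoder.
class CsvFieldSemantics {

    // Mimics CsvParser.Feature.TRIM_SPACES: strip leading/trailing spaces.
    static String applyTrimSpaces(String field, boolean trimSpaces) {
        return trimSpaces ? field.trim() : field;
    }

    // Mimics CsvParser.Feature.EMPTY_STRING_AS_NULL: an empty field becomes null.
    static String applyEmptyStringAsNull(String field, boolean emptyStringAsNull) {
        return (emptyStringAsNull && field != null && field.isEmpty()) ? null : field;
    }

    public static void main(String[] args) {
        System.out.println(applyTrimSpaces("  42 ", true));   // prints 42
        System.out.println(applyEmptyStringAsNull("", true)); // prints null
    }
}
```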
@@ -45,13 +45,18 @@
import java.util.Set;

import static org.apache.flink.formats.csv.CsvFormatOptions.ALLOW_COMMENTS;
import static org.apache.flink.formats.csv.CsvFormatOptions.ALLOW_TRAILING_COMMA;
import static org.apache.flink.formats.csv.CsvFormatOptions.ARRAY_ELEMENT_DELIMITER;
import static org.apache.flink.formats.csv.CsvFormatOptions.DISABLE_QUOTE_CHARACTER;
import static org.apache.flink.formats.csv.CsvFormatOptions.EMPTY_STRING_AS_NULL;
import static org.apache.flink.formats.csv.CsvFormatOptions.ESCAPE_CHARACTER;
import static org.apache.flink.formats.csv.CsvFormatOptions.FAIL_ON_MISSING_COLUMNS;
import static org.apache.flink.formats.csv.CsvFormatOptions.FIELD_DELIMITER;
import static org.apache.flink.formats.csv.CsvFormatOptions.IGNORE_PARSE_ERRORS;
import static org.apache.flink.formats.csv.CsvFormatOptions.IGNORE_TRAILING_UNMAPPABLE;
import static org.apache.flink.formats.csv.CsvFormatOptions.NULL_LITERAL;
import static org.apache.flink.formats.csv.CsvFormatOptions.QUOTE_CHARACTER;
import static org.apache.flink.formats.csv.CsvFormatOptions.TRIM_SPACES;
import static org.apache.flink.formats.csv.CsvFormatOptions.WRITE_BIGDECIMAL_IN_SCIENTIFIC_NOTATION;

/**
@@ -179,6 +184,24 @@ private static void configureDeserializationSchema(
.ifPresent(schemaBuilder::setEscapeCharacter);

formatOptions.getOptional(NULL_LITERAL).ifPresent(schemaBuilder::setNullLiteral);

formatOptions.getOptional(TRIM_SPACES).ifPresent(schemaBuilder::setTrimSpaces);

formatOptions
.getOptional(IGNORE_TRAILING_UNMAPPABLE)
.ifPresent(schemaBuilder::setIgnoreTrailingUnmappable);

formatOptions
.getOptional(ALLOW_TRAILING_COMMA)
.ifPresent(schemaBuilder::setAllowTrailingComma);

formatOptions
.getOptional(FAIL_ON_MISSING_COLUMNS)
.ifPresent(schemaBuilder::setFailOnMissingColumns);

formatOptions
.getOptional(EMPTY_STRING_AS_NULL)
.ifPresent(schemaBuilder::setEmptyStringAsNull);
}
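A note on the `getOptional(...).ifPresent(...)` wiring used in this method: an option only touches the schema builder when the user explicitly set it, so the builder's own defaults survive otherwise. A minimal stdlib sketch with a hypothetical builder (not Flink's actual `CsvSchema.Builder`):

```java
import java.util.Optional;

class OptionWiring {
    // Hypothetical builder standing in for CsvSchema.Builder.
    static final class SchemaBuilder {
        boolean trimSpaces = false; // builder's own default

        SchemaBuilder setTrimSpaces(boolean v) {
            this.trimSpaces = v;
            return this;
        }
    }

    // Apply the option only when it was explicitly configured.
    static SchemaBuilder configure(Optional<Boolean> trimSpacesOption) {
        SchemaBuilder builder = new SchemaBuilder();
        trimSpacesOption.ifPresent(builder::setTrimSpaces);
        return builder;
    }
}
```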

private static void configureSerializationSchema(