[FLINK-39065][Formats (JSON, Avro, Parquet, ORC, SequenceFile)] Support additional CsvParser.Feature options for CSV format deserialization #27578
What is the purpose of the change
This pull request exposes 5 additional Jackson `CsvParser.Feature` options as Flink SQL CSV format configuration options, allowing users to fine-tune CSV deserialization behavior. Currently, the CSV format connector only exposes a limited set of parser options (like `csv.allow-comments` and `csv.ignore-parse-errors`), but several useful Jackson CSV parser features are not accessible. This change adds the following new options:
- `csv.trim-spaces`: Trims leading/trailing whitespace from unquoted field values (`CsvParser.Feature.TRIM_SPACES`)
- `csv.ignore-trailing-unmappable`: Ignores extra trailing columns that don't map to the schema (`CsvParser.Feature.IGNORE_TRAILING_UNMAPPABLE`)
- `csv.allow-trailing-comma`: Allows a trailing comma after the last field value (`CsvParser.Feature.ALLOW_TRAILING_COMMA`)
- `csv.fail-on-missing-columns`: Fails when a row has fewer columns than expected by the schema (`CsvParser.Feature.FAIL_ON_MISSING_COLUMNS`)
- `csv.empty-string-as-null`: Treats empty string values as null (`CsvParser.Feature.EMPTY_STRING_AS_NULL`)

These options only affect deserialization (source side).
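The correspondence between the new option keys and parser features can be sketched with plain Java. This is only an illustration under stated assumptions: `ParserFeature` is a hypothetical stand-in enum that mirrors the five `CsvParser.Feature` constants named above (it is not Jackson's class), `CsvOptionMapping` is a made-up helper, and an absent key is treated as disabled purely for the sketch, without claiming anything about the defaults chosen in this PR.

```java
import java.util.EnumSet;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical stand-in for Jackson's CsvParser.Feature, limited to the
// five constants listed in this PR. Each constant carries its option key.
enum ParserFeature {
    TRIM_SPACES("csv.trim-spaces"),
    IGNORE_TRAILING_UNMAPPABLE("csv.ignore-trailing-unmappable"),
    ALLOW_TRAILING_COMMA("csv.allow-trailing-comma"),
    FAIL_ON_MISSING_COLUMNS("csv.fail-on-missing-columns"),
    EMPTY_STRING_AS_NULL("csv.empty-string-as-null");

    final String optionKey;

    ParserFeature(String optionKey) {
        this.optionKey = optionKey;
    }
}

class CsvOptionMapping {
    // Collects the features whose option key is set to "true".
    // Assumption for this sketch only: a missing key counts as disabled.
    static Set<ParserFeature> enabledFeatures(Map<String, String> tableOptions) {
        Set<ParserFeature> enabled = EnumSet.noneOf(ParserFeature.class);
        for (ParserFeature feature : ParserFeature.values()) {
            // Boolean.parseBoolean(null) is false, so absent keys are skipped.
            if (Boolean.parseBoolean(tableOptions.get(feature.optionKey))) {
                enabled.add(feature);
            }
        }
        return enabled;
    }

    public static void main(String[] args) {
        Map<String, String> options = new LinkedHashMap<>();
        options.put("format", "csv");
        options.put("csv.trim-spaces", "true");
        options.put("csv.empty-string-as-null", "true");
        options.put("csv.allow-trailing-comma", "false");
        System.out.println(enabledFeatures(options));
    }
}
```

In a real table definition the same keys would appear in the table's `WITH` options next to `'format' = 'csv'`.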
Brief change log
- Added 5 new `ConfigOption<Boolean>` definitions in `CsvFormatOptions` with descriptions indicating they only affect deserialization
- Registered the new options in `CsvCommons` as both optional and forwarded options
- Extended `CsvRowDataDeserializationSchema.Builder` with setter methods for each new feature, and configured enabled/disabled features on the `CsvMapper` during `open()`
- Updated `CsvFormatFactory.configureDeserializationSchema()` to read and pass the new options to the schema builder
- Updated `CsvFileFormatFactory` to support the new features in the Bulk Format / File Source path via `createCsvMapperFactory()`
- Fixed a bug in `CsvFileFormatFactory` where `ignoreParseErrors` was determined by `isPresent()` instead of reading the actual config value

Verifying this change
This change added tests and can be verified as follows:
- Added `testTrimSpaces()` test in `CsvFormatFactoryTest` to verify the `csv.trim-spaces` option trims whitespace from unquoted field values
- Added `testIgnoreTrailingUnmappable()` test to verify extra trailing columns are silently ignored
- Added `testAllowTrailingComma()` test to verify a trailing comma after the last field value is accepted
- Added `testFailOnMissingColumns()` test to verify deserialization fails when a row has fewer columns than expected
- Added `testEmptyStringAsNull()` test to verify empty strings are treated as null values
- Added `testAllCsvParserFeaturesTogether()` test to verify all 5 new features work correctly when enabled simultaneously
- Extended `testSeDeSchema()` test to include `csv.trim-spaces` and `csv.empty-string-as-null` options, verifying the complete option-to-schema configuration chain
- Added `testBulkFormatWithParserFeatures()` test to verify the Bulk Format / File Source path correctly applies the new `CsvParser.Feature` options via `CsvBulkDecodingFormat`

Does this pull request potentially affect one of the following parts:
- `@Public(Evolving)`: yes (`CsvFormatOptions` is annotated with `@PublicEvolving`, 5 new `ConfigOption` fields are added)
- [...] `open()`, not per-record)

Documentation
- [...] `docs/content/docs/connectors/table/formats/csv.md` and `docs/content.zh/docs/connectors/table/formats/csv.md`)
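
As a side note on the `ignoreParseErrors` item in the change log: the difference between checking `isPresent()` and reading the configured value can be shown in isolation. The sketch below is not the code from `CsvFileFormatFactory`. It models the config lookup with `java.util.Optional` over a plain map, and the class and method names are hypothetical. The fallback of `false` for an unset key is an assumption of the sketch.

```java
import java.util.Collections;
import java.util.Map;
import java.util.Optional;

class IgnoreParseErrorsSketch {
    static final String KEY = "csv.ignore-parse-errors";

    // Hypothetical stand-in for a config lookup that yields an Optional.
    static Optional<Boolean> lookup(Map<String, Boolean> options, String key) {
        return Optional.ofNullable(options.get(key));
    }

    // Presence-based check: reports true whenever the key is set at all,
    // even when the user explicitly set it to false.
    static boolean fromPresence(Map<String, Boolean> options) {
        return lookup(options, KEY).isPresent();
    }

    // Value-based check: uses the configured value and falls back to
    // false (assumed default for this sketch) when the key is unset.
    static boolean fromValue(Map<String, Boolean> options) {
        return lookup(options, KEY).orElse(false);
    }

    public static void main(String[] args) {
        Map<String, Boolean> explicitlyOff = Collections.singletonMap(KEY, false);
        System.out.println("presence-based: " + fromPresence(explicitlyOff));
        System.out.println("value-based:    " + fromValue(explicitlyOff));
    }
}
```

With the option explicitly set to `false`, the presence-based check still enables error suppression, which is the kind of mismatch the change log entry describes.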