
[FLINK-24544][formats] Fix Avro enum deserialization failure with Confluent Schema Registry #27591

Open

nateab wants to merge 1 commit into apache:master from nateab:fix/FLINK-24544-avro-enum-deserialization

Conversation

nateab (Contributor) commented Feb 12, 2026

What is the purpose of the change

This pull request fixes a deserialization failure when using the Table API with Kafka + Avro + Confluent Schema Registry for records containing enum types. Deserialization fails with the error "Found MyEnumType, expecting union".

The root cause is that RegistryAvroFormatFactory derives a reader schema from the Table DDL via AvroSchemaConverter.convertToSchema(rowType). This conversion is lossy: Avro enums become Flink STRING, which converts back to Avro ["null", "string"] instead of ["null", {"type": "enum", ...}]. When Avro's GenericDatumReader performs schema resolution between the writer schema (from the registry, with enum) and the reader schema (from DDL, with string), it fails because union resolution cannot match an enum against a string.
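The lossy round-trip is easy to reproduce outside the runtime. A minimal sketch, assuming flink-avro's AvroSchemaConverter is on the classpath; the record and field names are made up for illustration:

    import org.apache.avro.Schema;
    import org.apache.flink.formats.avro.typeutils.AvroSchemaConverter;
    import org.apache.flink.table.types.DataType;

    public class EnumRoundTripSketch {
        public static void main(String[] args) {
            // Writer schema as it would be registered in the Schema Registry:
            // a record with a nullable enum field.
            String writerSchemaJson =
                    "{\"type\":\"record\",\"name\":\"Rec\",\"fields\":["
                            + "{\"name\":\"color\",\"type\":[\"null\","
                            + "{\"type\":\"enum\",\"name\":\"Color\","
                            + "\"symbols\":[\"RED\",\"GREEN\"]}],\"default\":null}]}";

            // Avro -> Flink: the enum collapses to STRING.
            DataType flinkType = AvroSchemaConverter.convertToDataType(writerSchemaJson);
            System.out.println(flinkType); // ROW<`color` STRING>

            // Flink -> Avro: STRING converts back to ["null","string"], not the
            // enum, so union resolution against the enum writer schema fails.
            Schema readerSchema =
                    AvroSchemaConverter.convertToSchema(flinkType.getLogicalType());
            System.out.println(readerSchema.getField("color").schema());
        }
    }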

The fix stops using the DDL-derived schema as the Avro reader schema when no explicit schema is provided via the avro-confluent.schema format option. Instead, the writer schema from the registry is used directly for deserialization. AvroToRowDataConverters already handles enum-to-string conversion via .toString() at the Flink level, so Avro-level schema resolution is not needed for type coercion (sketched below). When the user provides an explicit schema via avro-confluent.schema, it continues to be used as the reader schema.
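For context, the Flink-level coercion mentioned above can be sketched as follows; this is a simplified assumption of the shape of the converter in AvroToRowDataConverters, not the exact code:

    import org.apache.flink.table.data.StringData;

    // Utf8, String, and GenericEnumSymbol all stringify the same way, which
    // is why no Avro-level schema resolution is needed to read an enum into
    // a STRING column.
    static StringData convertToString(Object avroValue) {
        return StringData.fromString(avroValue.toString());
    }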

Brief change log

  • RegistryAvroFormatFactory: changed deserialization path to pass null instead of the DDL-derived schema when no avro-confluent.schema option is set (serialization path unchanged)
  • AvroDeserializationSchema: checkAvroInitialized() now handles null schemaString gracefully; added Preconditions.checkNotNull guards in deserialize() and getProducedType() to fail fast with clear messages if null reader schema leaks into code paths that require it
  • RegistryAvroDeserializationSchema: falls back to the writer schema when the reader schema is null via datumReader.setExpected(readerSchema != null ? readerSchema : writerSchema) (see the standalone sketch after this list)
  • Added test testRowDataReadWithEnumFieldAndNullReaderSchema in RegistryAvroRowDataSeDeSchemaTest
  • Updated RegistryAvroFormatFactoryTest.testDeserializationSchema to match new null-schema behavior
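The fallback in the third bullet can be shown as a standalone sketch; the helper name and surrounding plumbing are assumptions, only the setExpected fallback mirrors the patch:

    import java.io.ByteArrayInputStream;

    import org.apache.avro.Schema;
    import org.apache.avro.generic.GenericDatumReader;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.avro.io.BinaryDecoder;
    import org.apache.avro.io.DecoderFactory;

    // Standalone sketch of the reader-schema fallback: when readerSchema is
    // null, expect the writer schema itself, so Avro performs no schema
    // resolution (and no enum-vs-string union mismatch can occur).
    static GenericRecord readWithFallback(
            byte[] avroBinary, Schema writerSchema, Schema readerSchema)
            throws Exception {
        GenericDatumReader<GenericRecord> datumReader = new GenericDatumReader<>();
        datumReader.setSchema(writerSchema);
        datumReader.setExpected(readerSchema != null ? readerSchema : writerSchema);
        BinaryDecoder decoder = DecoderFactory.get()
                .binaryDecoder(new ByteArrayInputStream(avroBinary), null);
        return datumReader.read(null, decoder);
    }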

Verifying this change

This change added tests and can be verified as follows:

  • Added testRowDataReadWithEnumFieldAndNullReaderSchema that creates an Avro schema with a nullable enum field, serializes a GenericRecord using the Confluent wire format (magic byte + schema ID + Avro binary), then deserializes with a null reader schema and verifies the enum value is correctly read as a string (the wire-format framing is sketched after this list)
  • Updated RegistryAvroFormatFactoryTest.testDeserializationSchema to expect null schema in the deserialization path
  • Verified all existing tests pass: flink-avro (337 tests, 0 failures), flink-avro-confluent-registry (28 tests, 0 failures)
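The framing used by the new test looks roughly like this; the helper name and schema ID are placeholders, while the layout itself (magic byte 0, then a 4-byte big-endian schema ID, then the Avro binary payload) is the standard Confluent wire format:

    import java.io.ByteArrayOutputStream;
    import java.io.DataOutputStream;

    import org.apache.avro.Schema;
    import org.apache.avro.generic.GenericDatumWriter;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.avro.io.BinaryEncoder;
    import org.apache.avro.io.EncoderFactory;

    // Hypothetical test helper: frame a GenericRecord in the Confluent
    // wire format.
    static byte[] toConfluentWireFormat(GenericRecord record, Schema schema,
            int schemaId) throws Exception {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        DataOutputStream dataOut = new DataOutputStream(out);
        dataOut.writeByte(0);        // magic byte
        dataOut.writeInt(schemaId);  // 4-byte big-endian schema ID
        BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
        new GenericDatumWriter<GenericRecord>(schema).write(record, encoder);
        encoder.flush();
        return out.toByteArray();
    }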

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): no
  • The public API, i.e., is any changed class annotated with @Public(Evolving): no
  • The serializers: no
  • The runtime per-record code paths (performance sensitive): yes — the deserialization path now skips Avro schema resolution when no explicit reader schema is set, which is a slight performance improvement
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
  • The S3 file system connector: no

Documentation

  • Does this pull request introduce a new feature? no
  • If yes, how is the feature documented? not applicable

nateab force-pushed the fix/FLINK-24544-avro-enum-deserialization branch from 08744e2 to cbae031 on February 12, 2026 04:43

[FLINK-24544][formats] Fix Avro enum deserialization failure with Confluent Schema Registry

When using the Table API with Kafka + Avro + Confluent Schema Registry,
deserialization fails for records containing enum types with the error
"Found MyEnum, expecting union". The root cause is that the
RegistryAvroFormatFactory derives a reader schema from the Table DDL via
AvroSchemaConverter, which is lossy: Avro enums become Flink STRING,
which converts back to Avro string. Avro's schema resolution then fails
because it cannot match an enum writer type against a string reader type
in a union.

The fix stops using the DDL-derived schema as the Avro reader schema
when no explicit schema is provided via the avro-confluent.schema option.
Instead, the writer schema from the registry is used directly for
deserialization. AvroToRowDataConverters already handles enum-to-string
conversion via .toString() at the Flink level, so Avro-level schema
resolution is not needed for type coercion.

When the user provides an explicit schema via avro-confluent.schema, it
continues to be used as the reader schema (schema evolution works because
user-provided schemas preserve enum types).
nateab force-pushed the fix/FLINK-24544-avro-enum-deserialization branch from cbae031 to ccdbdec on February 12, 2026 04:53
In RegistryAvroFormatFactory.java:

            int[][] projections) {
        producedDataType = Projection.of(projections).project(producedDataType);
        final RowType rowType = (RowType) producedDataType.getLogicalType();
        // When no explicit schema is provided, pass null so that the
        // writer schema from the registry is used as the reader schema.
A Contributor commented:
Would it be possible to put some of the tables mentioned in the Jira as test cases, to make sure they work and to make it explicit under what circumstances this fix is required? For example, does this affect sink cases as well as join cases?
