@@ -100,10 +100,14 @@ public DeserializationSchema<RowData> createRuntimeDecoder(
int[][] projections) {
producedDataType = Projection.of(projections).project(producedDataType);
final RowType rowType = (RowType) producedDataType.getLogicalType();
// When no explicit schema is provided, pass null so that the
// writer schema from the registry is used for deserialization.
// This avoids schema resolution failures when Avro types (e.g.,
// enums) are converted lossily through Flink's type system.
final Schema schema =
schemaString
.map(s -> getAvroSchema(s, rowType))
- .orElse(AvroSchemaConverter.convertToSchema(rowType));
+ .orElse(null);
final TypeInformation<RowData> rowDataTypeInfo =
context.createTypeInformation(producedDataType);
return new AvroRowDataDeserializationSchema(

Contributor review comment: Would it be possible to add some of the tables mentioned in Jira as test cases, to make sure they work and to make it explicit under what circumstances this fix is required? For example, does this affect sink cases as well as join cases?
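
For context, a minimal standalone sketch (not part of this patch; the class name and single-field schema are made up) of why a reader schema regenerated from Flink's type system fails for enums: Flink has no enum type, so an Avro enum converts to STRING, and converting back yields a string field that no longer resolves against the registry's writer schema.

import org.apache.avro.Schema;
import org.apache.flink.formats.avro.typeutils.AvroSchemaConverter;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.RowType;

public class EnumRoundTripSketch {
    public static void main(String[] args) {
        // Writer schema with an enum field, as it would be registered.
        Schema writer =
                new Schema.Parser()
                        .parse(
                                "{\"type\":\"record\",\"name\":\"E\",\"fields\":["
                                        + "{\"name\":\"color\",\"type\":{\"type\":\"enum\","
                                        + "\"name\":\"Colors\",\"symbols\":[\"RED\",\"GREEN\"]}}]}");

        // Avro enum -> Flink STRING: the enum symbol set is lost here.
        DataType dataType = AvroSchemaConverter.convertToDataType(writer.toString());

        // Converting back produces a string field, not the original enum, so using
        // this regenerated schema as the reader schema breaks Avro schema resolution.
        Schema regenerated =
                AvroSchemaConverter.convertToSchema((RowType) dataType.getLogicalType());
        System.out.println(regenerated.getField("color").schema()); // no longer an enum
    }
}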
@@ -116,7 +116,7 @@ void testDeserializationSchema() {
final AvroRowDataDeserializationSchema expectedDeser =
new AvroRowDataDeserializationSchema(
ConfluentRegistryAvroDeserializationSchema.forGeneric(
- AvroSchemaConverter.convertToSchema(ROW_TYPE), REGISTRY_URL),
+ null, REGISTRY_URL),
AvroToRowDataConverters.createRowConverter(ROW_TYPE),
InternalTypeInfo.of(ROW_TYPE));

@@ -38,13 +38,18 @@
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.EncoderFactory;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Random;

import static org.apache.flink.formats.avro.utils.AvroTestUtils.writeRecord;
@@ -56,6 +61,22 @@
* schema registry avro.
*/
class RegistryAvroRowDataSeDeSchemaTest {
private static final String ENUM_SUBJECT = "enum-record-value";

private static final Schema ENUM_RECORD_SCHEMA =
new Schema.Parser()
.parse(
"{\"namespace\": \"org.apache.flink.formats.avro.generated\",\n"
+ " \"type\": \"record\",\n"
+ " \"name\": \"EnumRecord\",\n"
+ " \"fields\": [\n"
+ " {\"name\": \"name\", \"type\": \"string\"},\n"
+ " {\"name\": \"color\", \"type\": [\"null\","
+ " {\"type\": \"enum\", \"name\": \"Colors\","
+ " \"symbols\": [\"RED\", \"GREEN\", \"BLUE\"]}]}\n"
+ " ]\n"
+ "}");

private static final Schema ADDRESS_SCHEMA = Address.getClassSchema();

private static final Schema ADDRESS_SCHEMA_COMPATIBLE =
@@ -90,6 +111,7 @@ void before() {
@AfterEach
void after() throws IOException, RestClientException {
client.deleteSubject(SUBJECT);
client.deleteSubject(ENUM_SUBJECT);
}

@Test
@@ -129,6 +151,30 @@ void testRowDataReadWithNonRegistryAvro() throws Exception {
.hasCause(new IOException("Unknown data format. Magic number does not match"));
}

@Test
void testRowDataReadWithEnumFieldAndNullReaderSchema() throws Exception {
DataType dataType = AvroSchemaConverter.convertToDataType(ENUM_RECORD_SCHEMA.toString());
RowType rowType = (RowType) dataType.getLogicalType();

int schemaId = client.register(ENUM_SUBJECT, ENUM_RECORD_SCHEMA);
GenericRecord record = new GenericData.Record(ENUM_RECORD_SCHEMA);
record.put("name", "Alice");
record.put(
"color",
new GenericData.EnumSymbol(
ENUM_RECORD_SCHEMA.getField("color").schema().getTypes().get(1), "RED"));
byte[] serialized = serializeWithRegistryFormat(record, ENUM_RECORD_SCHEMA, schemaId);

AvroRowDataDeserializationSchema deserializer =
getDeserializationSchemaForSubject(rowType, null, ENUM_SUBJECT);
deserializer.open(null);

RowData result = deserializer.deserialize(serialized);
assertThat(result.getArity()).isEqualTo(2);
assertThat(result.getString(0).toString()).isEqualTo("Alice");
assertThat(result.getString(1).toString()).isEqualTo("RED");
}

private void testRowDataWriteReadWithSchema(Schema schema) throws Exception {
DataType dataType = AvroSchemaConverter.convertToDataType(schema.toString());
RowType rowType = (RowType) dataType.getLogicalType();
@@ -162,8 +208,13 @@ private void testRowDataWriteReadWithSchema(Schema schema) throws Exception {

private static AvroRowDataSerializationSchema getSerializationSchema(
RowType rowType, Schema avroSchema) {
return getSerializationSchemaForSubject(rowType, avroSchema, SUBJECT);
}

private static AvroRowDataSerializationSchema getSerializationSchemaForSubject(
RowType rowType, Schema avroSchema, String subject) {
ConfluentSchemaRegistryCoder registryCoder =
- new ConfluentSchemaRegistryCoder(SUBJECT, client);
+ new ConfluentSchemaRegistryCoder(subject, client);
return new AvroRowDataSerializationSchema(
rowType,
new RegistryAvroSerializationSchema<GenericRecord>(
@@ -173,15 +224,31 @@ private static AvroRowDataSerializationSchema getSerializationSchema(

private static AvroRowDataDeserializationSchema getDeserializationSchema(
RowType rowType, Schema avroSchema) {
return getDeserializationSchemaForSubject(rowType, avroSchema, SUBJECT);
}

private static AvroRowDataDeserializationSchema getDeserializationSchemaForSubject(
RowType rowType, Schema avroSchema, String subject) {
ConfluentSchemaRegistryCoder registryCoder =
- new ConfluentSchemaRegistryCoder(SUBJECT, client);
+ new ConfluentSchemaRegistryCoder(subject, client);
return new AvroRowDataDeserializationSchema(
new RegistryAvroDeserializationSchema<GenericRecord>(
GenericRecord.class, avroSchema, () -> registryCoder),
AvroToRowDataConverters.createRowConverter(rowType),
InternalTypeInfo.of(rowType));
}

private static byte[] serializeWithRegistryFormat(
GenericRecord record, Schema schema, int schemaId) throws IOException {
ByteArrayOutputStream out = new ByteArrayOutputStream();
// Confluent wire format: magic byte 0x0, then the 4-byte big-endian
// schema id, then the Avro binary payload.
out.write(0);
out.write(ByteBuffer.allocate(4).putInt(schemaId).array());
org.apache.avro.io.Encoder encoder = EncoderFactory.get().binaryEncoder(out, null);
new GenericDatumWriter<>(schema).write(record, encoder);
encoder.flush();
return out.toByteArray();
}

private static RowData address2RowData(Address address) {
GenericRowData rowData = new GenericRowData(5);
rowData.setField(0, address.getNum());
@@ -172,7 +172,11 @@ public T deserialize(@Nullable byte[] message) throws IOException {
// read record
checkAvroInitialized();
inputStream.setBuffer(message);
- Schema readerSchema = getReaderSchema();
+ Schema readerSchema =
+         Preconditions.checkNotNull(
+                 getReaderSchema(),
+                 "Reader schema is required for non-registry deserialization. "
+                         + "Use RegistryAvroDeserializationSchema for registry-based deserialization.");
GenericDatumReader<T> datumReader = getDatumReader();

datumReader.setSchema(readerSchema);
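
A sketch of the two entry points this message distinguishes (REGISTRY_URL stands in for a real Schema Registry address; both factory methods exist in Flink today):

// Non-registry Avro: the bytes carry no schema id, so a reader schema is mandatory.
DeserializationSchema<GenericRecord> plain =
        AvroDeserializationSchema.forGeneric(readerSchema);

// Registry Avro: the reader schema may now be null; the writer schema is
// fetched from the registry per record and used for resolution instead.
DeserializationSchema<GenericRecord> registry =
        ConfluentRegistryAvroDeserializationSchema.forGeneric(null, REGISTRY_URL);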
@@ -198,16 +202,18 @@ void checkAvroInitialized() throws IOException {
this.datumReader = new SpecificDatumReader<>(specificData);
this.reader = AvroFactory.extractAvroSpecificSchema(recordClazz, specificData);
} else {
- this.reader = new Schema.Parser().parse(schemaString);
+ if (schemaString != null) {
+     this.reader = new Schema.Parser().parse(schemaString);
+ }
GenericData genericData = new GenericData(cl);
this.datumReader = new GenericDatumReader<>(null, this.reader, genericData);
}

this.inputStream = new MutableByteArrayInputStream();

- if (encoding == AvroEncoding.JSON) {
+ if (encoding == AvroEncoding.JSON && getReaderSchema() != null) {
this.decoder = DecoderFactory.get().jsonDecoder(getReaderSchema(), inputStream);
- } else {
+ } else if (encoding != AvroEncoding.JSON) {
this.decoder = DecoderFactory.get().binaryDecoder(inputStream, null);
}
}
@@ -223,7 +229,12 @@ public TypeInformation<T> getProducedType() {
if (SpecificRecord.class.isAssignableFrom(recordClazz)) {
return new AvroTypeInfo(recordClazz);
} else {
- return (TypeInformation<T>) new GenericRecordAvroTypeInfo(this.reader);
+ return (TypeInformation<T>)
+         new GenericRecordAvroTypeInfo(
+                 Preconditions.checkNotNull(
+                         this.reader,
+                         "Reader schema is required to derive TypeInformation. "
+                                 + "When using RegistryAvroDeserializationSchema, provide TypeInformation explicitly."));
}
}

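Since getProducedType() can no longer derive TypeInformation when the reader schema is null, callers have to supply it explicitly. A minimal sketch mirroring how the table factory in this PR wires things up (rowType and REGISTRY_URL are assumed to be in scope):

// Type information is derived from the table's row type, not from an Avro
// reader schema, so a null reader schema is fine here.
TypeInformation<RowData> rowDataTypeInfo = InternalTypeInfo.of(rowType);
AvroRowDataDeserializationSchema deser =
        new AvroRowDataDeserializationSchema(
                ConfluentRegistryAvroDeserializationSchema.forGeneric(
                        null, REGISTRY_URL), // null: resolve with the writer schema
                AvroToRowDataConverters.createRowConverter(rowType),
                rowDataTypeInfo);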
@@ -100,7 +100,7 @@ public T deserialize(@Nullable byte[] message) throws IOException {
GenericDatumReader<T> datumReader = getDatumReader();

datumReader.setSchema(writerSchema);
- datumReader.setExpected(readerSchema);
+ datumReader.setExpected(readerSchema != null ? readerSchema : writerSchema);

if (getEncoding() == AvroEncoding.JSON) {
((JsonDecoder) getDecoder()).configure(getInputStream());
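
In plain Avro terms, this fallback makes the reader schema default to the writer schema, which is always a valid (trivial) resolution. A standalone sketch of the equivalent behavior (the helper name is made up):

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;

class ReaderSchemaFallbackSketch {
    static GenericDatumReader<GenericRecord> readerFor(Schema writerSchema, Schema readerSchema) {
        GenericDatumReader<GenericRecord> datumReader = new GenericDatumReader<>();
        datumReader.setSchema(writerSchema); // schema the bytes were written with
        // With no configured reader schema, resolve against the writer schema
        // itself; decoding then cannot fail on types (such as enums) that
        // Flink's type system cannot represent.
        datumReader.setExpected(readerSchema != null ? readerSchema : writerSchema);
        return datumReader;
    }
}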