Skip to content

Commit c5758d0

Browse files
hmithmitnflx
authored andcommitted
AVRO-3989: [java] add conversion classes to default instance of SpecificData (#3354)
* AVRO-3989: [java] add conversion classes to default instance of SpecificData * add test case, exclude decimal conversion * review comment: use schema from the generated class for `fooBar.avsc` * add logical type conversion classes to ReflectData as well * refactor to single method --------- Co-authored-by: Harshit Mittal <hmittal@netflix.com> (cherry picked from commit f131a5c)
1 parent 44228ea commit c5758d0

File tree

6 files changed

+227
-21
lines changed

6 files changed

+227
-21
lines changed

lang/java/avro/src/main/java/org/apache/avro/reflect/ReflectData.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,10 @@ protected Schema createFieldSchema(Field field, Map<String, Schema> names) {
106106

107107
private static final ReflectData INSTANCE = new ReflectData();
108108

109+
static {
110+
addLogicalTypeConversions(INSTANCE);
111+
}
112+
109113
/** For subclasses. Applications normally use {@link ReflectData#get()}. */
110114
public ReflectData() {
111115
}

lang/java/avro/src/main/java/org/apache/avro/specific/SpecificData.java

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,11 @@
1919

2020
import org.apache.avro.AvroRuntimeException;
2121
import org.apache.avro.AvroTypeException;
22+
import org.apache.avro.Conversions;
2223
import org.apache.avro.Protocol;
2324
import org.apache.avro.Schema;
2425
import org.apache.avro.Schema.Type;
26+
import org.apache.avro.data.TimeConversions;
2527
import org.apache.avro.generic.GenericData;
2628
import org.apache.avro.io.BinaryDecoder;
2729
import org.apache.avro.io.BinaryEncoder;
@@ -59,6 +61,28 @@ public class SpecificData extends GenericData {
5961

6062
private static final SpecificData INSTANCE = new SpecificData();
6163

64+
static {
65+
addLogicalTypeConversions(INSTANCE);
66+
}
67+
68+
protected static void addLogicalTypeConversions(SpecificData instance) {
69+
instance.addLogicalTypeConversion(new Conversions.UUIDConversion());
70+
// Disable DecimalConversion since it's gated behind
71+
// `compiler.setEnableDecimalLogicalType`
72+
// INSTANCE.addLogicalTypeConversion(new Conversions.DecimalConversion());
73+
instance.addLogicalTypeConversion(new Conversions.BigDecimalConversion());
74+
instance.addLogicalTypeConversion(new Conversions.DurationConversion());
75+
instance.addLogicalTypeConversion(new TimeConversions.DateConversion());
76+
instance.addLogicalTypeConversion(new TimeConversions.LocalTimestampMicrosConversion());
77+
instance.addLogicalTypeConversion(new TimeConversions.LocalTimestampMillisConversion());
78+
instance.addLogicalTypeConversion(new TimeConversions.LocalTimestampNanosConversion());
79+
instance.addLogicalTypeConversion(new TimeConversions.TimeMicrosConversion());
80+
instance.addLogicalTypeConversion(new TimeConversions.TimeMillisConversion());
81+
instance.addLogicalTypeConversion(new TimeConversions.TimestampMicrosConversion());
82+
instance.addLogicalTypeConversion(new TimeConversions.TimestampMillisConversion());
83+
instance.addLogicalTypeConversion(new TimeConversions.TimestampNanosConversion());
84+
}
85+
6286
private static final Class<?>[] NO_ARG = new Class[] {};
6387
private static final Class<?>[] SCHEMA_ARG = new Class[] { Schema.class };
6488

lang/java/avro/src/test/java/org/apache/avro/reflect/TestReflectLogicalTypes.java

Lines changed: 22 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -423,36 +423,37 @@ void readUUIDMissingLogicalTypeReflect() throws IOException {
423423
r1.uuid = u1.toString();
424424

425425
File test = write(ReflectData.get().getSchema(RecordWithStringUUID.class), r1);
426-
assertThrows(IllegalArgumentException.class,
427-
() -> read(ReflectData.get().createDatumReader(uuidSchema), test).get(0));
426+
RecordWithUUID result = (RecordWithUUID) read(ReflectData.get().createDatumReader(uuidSchema), test).get(0);
427+
assertEquals(u1, result.uuid);
428428
}
429429

430430
@Test
431431
void writeUUIDMissingLogicalType() throws IOException {
432-
assertThrows(DataFileWriter.AppendWriteException.class, () -> {
433-
Schema uuidSchema = SchemaBuilder.record(RecordWithUUID.class.getName()).fields().requiredString("uuid")
434-
.endRecord();
435-
LogicalTypes.uuid().addToSchema(uuidSchema.getField("uuid").schema());
432+
Schema uuidSchema = SchemaBuilder.record(RecordWithUUID.class.getName()).fields().requiredString("uuid")
433+
.endRecord();
434+
LogicalTypes.uuid().addToSchema(uuidSchema.getField("uuid").schema());
436435

437-
UUID u1 = UUID.randomUUID();
438-
UUID u2 = UUID.randomUUID();
436+
UUID u1 = UUID.randomUUID();
437+
UUID u2 = UUID.randomUUID();
439438

440-
RecordWithUUID r1 = new RecordWithUUID();
441-
r1.uuid = u1;
442-
RecordWithUUID r2 = new RecordWithUUID();
443-
r2.uuid = u2;
439+
RecordWithUUID r1 = new RecordWithUUID();
440+
r1.uuid = u1;
441+
RecordWithUUID r2 = new RecordWithUUID();
442+
r2.uuid = u2;
444443

445-
// write without using REFLECT, which has the logical type
446-
File test = write(uuidSchema, r1, r2);
444+
// write without using REFLECT, which has the logical type
445+
File test = write(uuidSchema, r1, r2);
447446

448-
// verify that the field's type overrides the logical type
449-
Schema uuidStringSchema = SchemaBuilder.record(RecordWithStringUUID.class.getName()).fields()
450-
.requiredString("uuid").endRecord();
447+
// verify that the field's type overrides the logical type
448+
Schema uuidStringSchema = SchemaBuilder.record(RecordWithStringUUID.class.getName()).fields().requiredString("uuid")
449+
.endRecord();
451450

452-
// this fails with an AppendWriteException wrapping ClassCastException
453-
// because the UUID isn't converted to a CharSequence expected internally
454-
read(ReflectData.get().createDatumReader(uuidStringSchema), test);
455-
});
451+
// this fails with an AppendWriteException wrapping ClassCastException
452+
// because the UUID isn't converted to a CharSequence expected internally
453+
List<RecordWithStringUUID> items = (List<RecordWithStringUUID>) read(
454+
ReflectData.get().createDatumReader(uuidStringSchema), test);
455+
assertEquals(r1.uuid.toString(), items.get(0).uuid);
456+
assertEquals(r2.uuid.toString(), items.get(1).uuid);
456457
}
457458

458459
@Test
Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,97 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* https://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.avro;
19+
20+
import static org.junit.jupiter.api.Assertions.assertEquals;
21+
22+
import java.io.File;
23+
import java.io.IOException;
24+
import java.time.Instant;
25+
26+
import example.avro.Bar;
27+
import org.apache.avro.file.DataFileReader;
28+
import org.apache.avro.file.DataFileWriter;
29+
import org.apache.avro.generic.GenericData;
30+
import org.apache.avro.generic.GenericDatumWriter;
31+
import org.apache.avro.reflect.ReflectDatumReader;
32+
import org.apache.avro.reflect.ReflectDatumWriter;
33+
import org.apache.avro.specific.SpecificDatumReader;
34+
35+
import org.junit.jupiter.api.Test;
36+
import org.junit.jupiter.api.io.TempDir;
37+
38+
public class TestDataFileReflect {
39+
40+
@TempDir
41+
public File DIR;
42+
43+
@Test
44+
public void reflectDatumReaderUnionWithLogicalType() throws IOException {
45+
File file = new File(DIR.getPath(), "testReflectDatumReaderUnionWithLogicalType");
46+
Schema schema = Bar.SCHEMA$;
47+
// Create test data
48+
Instant value = Instant.now();
49+
try (DataFileWriter<GenericData.Record> writer = new DataFileWriter<>(
50+
new GenericDatumWriter<GenericData.Record>(schema)).create(schema, file)) {
51+
for (int i = 0; i < 10; i++) {
52+
GenericData.Record r = new GenericData.Record(schema);
53+
r.put("title", "title" + i);
54+
r.put("created_at", value.toEpochMilli() + i * 1000);
55+
writer.append(r);
56+
}
57+
}
58+
59+
// read using a 'new ReflectDatumReader<T>()' to force inference of
60+
// reader's schema from runtime
61+
try (DataFileReader<Bar> reader = new DataFileReader<>(file, new ReflectDatumReader<>())) {
62+
int i = 0;
63+
for (Bar instance : reader) {
64+
assertEquals("title" + i, instance.getTitle());
65+
assertEquals(Instant.ofEpochMilli(value.plusSeconds(i).toEpochMilli()), instance.getCreatedAt());
66+
i++;
67+
}
68+
assertEquals(10, i);
69+
}
70+
}
71+
72+
@Test
73+
public void reflectDatumWriterUnionWithLogicalType() throws IOException {
74+
File file = new File(DIR.getPath(), "testReflectDatumWriterUnionWithLogicalType");
75+
76+
// Create test data
77+
Instant value = Instant.now();
78+
try (DataFileWriter<Bar> writer = new DataFileWriter<>(new ReflectDatumWriter<Bar>()).create(Bar.SCHEMA$, file)) {
79+
for (int i = 0; i < 10; i++) {
80+
Bar r = Bar.newBuilder().setTitle("title" + i).setCreatedAt(value.plusSeconds(i)).build();
81+
writer.append(r);
82+
}
83+
}
84+
85+
// read using a 'new SpecificDatumReader<T>()' to force inference of
86+
// reader's schema from runtime
87+
try (DataFileReader<Bar> reader = new DataFileReader<>(file, new SpecificDatumReader<>())) {
88+
int i = 0;
89+
for (Bar instance : reader) {
90+
assertEquals("title" + i, instance.getTitle());
91+
assertEquals(Instant.ofEpochMilli(value.plusSeconds(i).toEpochMilli()), instance.getCreatedAt());
92+
i++;
93+
}
94+
assertEquals(10, i);
95+
}
96+
}
97+
}

lang/java/ipc/src/test/java/org/apache/avro/TestDataFileSpecific.java

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,13 +21,16 @@
2121

2222
import java.io.File;
2323
import java.io.IOException;
24+
import java.time.Instant;
2425

26+
import example.avro.Bar;
2527
import org.apache.avro.file.DataFileReader;
2628
import org.apache.avro.file.DataFileWriter;
2729
import org.apache.avro.generic.GenericData.Record;
2830
import org.apache.avro.generic.GenericDatumWriter;
2931
import org.apache.avro.specific.SpecificDatumReader;
3032

33+
import org.apache.avro.specific.SpecificDatumWriter;
3134
import org.junit.jupiter.api.Test;
3235
import org.junit.jupiter.api.io.TempDir;
3336

@@ -70,4 +73,60 @@ void specificDatumReaderDefaultCtor() throws IOException {
7073
}
7174
}
7275

76+
@Test
77+
public void specificDatumReaderUnionWithLogicalType() throws IOException {
78+
File file = new File(DIR.getPath(), "testSpecificDatumReaderUnionWithLogicalType");
79+
Schema schema = Bar.SCHEMA$;
80+
81+
// Create test data
82+
Instant value = Instant.now();
83+
try (DataFileWriter<Record> writer = new DataFileWriter<>(new GenericDatumWriter<Record>(schema)).create(schema,
84+
file)) {
85+
for (int i = 0; i < 10; i++) {
86+
Record r = new Record(schema);
87+
r.put("title", "title" + i);
88+
r.put("created_at", value.toEpochMilli() + i * 1000);
89+
writer.append(r);
90+
}
91+
}
92+
93+
// read using a 'new SpecificDatumReader<T>()' to force inference of
94+
// reader's schema from runtime
95+
try (DataFileReader<Bar> reader = new DataFileReader<>(file, new SpecificDatumReader<>())) {
96+
int i = 0;
97+
for (Bar instance : reader) {
98+
assertEquals("title" + i, instance.getTitle());
99+
assertEquals(Instant.ofEpochMilli(value.plusSeconds(i).toEpochMilli()), instance.getCreatedAt());
100+
i++;
101+
}
102+
assertEquals(10, i);
103+
}
104+
}
105+
106+
@Test
107+
public void specificDatumWriterUnionWithLogicalType() throws IOException {
108+
File file = new File(DIR.getPath(), "testSpecificDatumWriterUnionWithLogicalType");
109+
Schema schema = Bar.SCHEMA$;
110+
111+
// Create test data
112+
Instant value = Instant.now();
113+
try (DataFileWriter<Bar> writer = new DataFileWriter<>(new SpecificDatumWriter<Bar>()).create(schema, file)) {
114+
for (int i = 0; i < 10; i++) {
115+
Bar r = Bar.newBuilder().setTitle("title" + i).setCreatedAt(value.plusSeconds(i)).build();
116+
writer.append(r);
117+
}
118+
}
119+
120+
// read using a 'new SpecificDatumReader<T>()' to force inference of
121+
// reader's schema from runtime
122+
try (DataFileReader<Bar> reader = new DataFileReader<>(file, new SpecificDatumReader<>())) {
123+
int i = 0;
124+
for (Bar instance : reader) {
125+
assertEquals("title" + i, instance.getTitle());
126+
assertEquals(Instant.ofEpochMilli(value.plusSeconds(i).toEpochMilli()), instance.getCreatedAt());
127+
i++;
128+
}
129+
assertEquals(10, i);
130+
}
131+
}
73132
}

share/test/schemas/fooBar.avsc

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
{
2+
"fields" : [
3+
{
4+
"name" : "title",
5+
"type" : "string"
6+
},
7+
{
8+
"name" : "created_at",
9+
"type" : [
10+
"null",
11+
{
12+
"logicalType" : "timestamp-millis",
13+
"type" : "long"
14+
}
15+
]
16+
}
17+
],
18+
"name" : "Bar",
19+
"namespace" : "example.avro",
20+
"type" : "record"
21+
}

0 commit comments

Comments
 (0)