Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
* file. Use of {@link org.apache.avro.io.ValidatingEncoder} is recommended.
*/
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.FIELD)
@Target({ ElementType.FIELD, ElementType.TYPE })
public @interface AvroEncode {
Class<? extends CustomEncoding<?>> using();
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.avro.Schema;
import org.apache.avro.io.Decoder;
import org.apache.avro.io.Encoder;
import org.apache.avro.io.ResolvingDecoder;

/**
* Expert: a custom encoder and decoder that writes an object directly to avro.
Expand All @@ -38,8 +39,16 @@ public abstract class CustomEncoding<T> {

protected abstract void write(Object datum, Encoder out) throws IOException;

protected void write(Object datum, Encoder out, ReflectDatumWriter writer) throws IOException {
this.write(datum, out);
}

protected abstract T read(Object reuse, Decoder in) throws IOException;

protected T read(Object reuse, ResolvingDecoder in, ReflectDatumReader reader) throws IOException {
return this.read(reuse, in);
}

T read(Decoder in) throws IOException {
return this.read(null, in);
}
Expand All @@ -48,4 +57,8 @@ protected Schema getSchema() {
return schema;
}

public CustomEncoding<T> setSchema(Schema schema) {
return this;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -682,6 +682,11 @@ protected Schema createSchema(Type type, Map<String, Schema> names) {
AvroSchema explicit = c.getAnnotation(AvroSchema.class);
if (explicit != null) // explicit schema
return new Schema.Parser().parse(explicit.value());
CustomEncoding<?> custom = getCustomEncoding(c);
// if custom encoding does not specify the schema use the default schema
if (custom != null && custom.getSchema() != null) {
return custom.getSchema();
}
if (CharSequence.class.isAssignableFrom(c)) // String
return Schema.create(Schema.Type.STRING);
if (ByteBuffer.class.isAssignableFrom(c)) // bytes
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,22 @@ public ReflectDatumReader(ReflectData data) {
super(data);
}

/** Called to read data. */
@Override
protected Object read(Object old, Schema expected, ResolvingDecoder in) throws IOException {
return super.read(old, expected, in);
}

@Override
protected Object readRecord(Object old, Schema expected, ResolvingDecoder in) throws IOException {
SpecificData data = getSpecificData();
CustomEncoding encoder = data.getCustomEncoding(expected);
if (encoder != null) {
return encoder.read(old, in, this);
}
return super.readRecord(old, expected, in);
}

@Override
protected Object newArray(Object old, int size, Schema schema) {
Class<?> collectionClass = ReflectData.getClassProp(schema, SpecificData.CLASS_PROP);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.avro.Schema;
import org.apache.avro.Schema.Field;
import org.apache.avro.io.Encoder;
import org.apache.avro.specific.SpecificData;
import org.apache.avro.specific.SpecificDatumWriter;
import org.apache.avro.util.MapEntry;

Expand Down Expand Up @@ -61,6 +62,17 @@ protected ReflectDatumWriter(ReflectData reflectData) {
super(reflectData);
}

@Override
protected void writeRecord(Schema schema, Object datum, Encoder out) throws IOException {
SpecificData data = getSpecificData();
CustomEncoding encoder = data.getCustomEncoding(schema);
if (encoder != null) {
encoder.write(datum, out, this);
} else {
super.writeRecord(schema, datum, out);
}
}

/**
* Called to write a array. May be overridden for alternate array
* representations.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,271 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.avro.reflect;

import java.io.IOException;
import java.lang.reflect.Array;
import java.lang.reflect.Constructor;
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import org.apache.avro.AvroMissingFieldException;
import org.apache.avro.AvroRuntimeException;
import org.apache.avro.Schema;
import org.apache.avro.io.Decoder;
import org.apache.avro.io.Encoder;
import org.apache.avro.io.ResolvingDecoder;

public class ReflectRecordEncoding extends CustomEncoding<Object> {

private final Class<?> type;
private final List<FieldWriter> writer;
private final Constructor<?> constructor;
private List<FieldReader> reader;

public ReflectRecordEncoding(Class<?> type) {
this.type = type;
this.writer = null;
this.constructor = null;

}

public ReflectRecordEncoding(Class<?> type, Schema schema) {
this.type = type;
this.schema = schema;
this.writer = schema.getFields().stream().map(field -> {
try {
Field classField = type.getDeclaredField(field.name());
classField.setAccessible(true);
AvroEncode enc = classField.getAnnotation(AvroEncode.class);
if (enc != null)
return new CustomEncodedFieldWriter(classField, enc.using().getDeclaredConstructor().newInstance());
return new ReflectFieldWriter(classField, field.schema());
} catch (ReflectiveOperationException e) {
throw new AvroMissingFieldException("Field does not exist", field);
}
}).collect(Collectors.toList());

// order of this matches default constructor find order mapping

Field[] fields = type.getDeclaredFields();

List<Class<?>> parameterTypes = new ArrayList<>(fields.length);

Map<String, Integer> offsets = new HashMap<>();

// need to know offset for mapping
for (Field field : fields) {
if (Modifier.isStatic(field.getModifiers())) {
continue;
}
offsets.put(field.getName(), parameterTypes.size());
parameterTypes.add(field.getType());
}

try {
this.constructor = type.getDeclaredConstructor(parameterTypes.toArray(new Class[0]));
} catch (NoSuchMethodException e) {
throw new RuntimeException(e);
}

var reader = schema.getFields().stream().map(field -> {
int offset = offsets.remove(field.name());

try {
Field classField = type.getDeclaredField(field.name());
AvroEncode enc = classField.getAnnotation(AvroEncode.class);
if (enc != null)
return new CustomEncodedFieldReader(offset, enc.using().getDeclaredConstructor().newInstance(),
field.schema());
return new ReflectFieldReader(offset, field.schema());
} catch (ReflectiveOperationException e) {
throw new AvroRuntimeException("Could not instantiate custom Encoding");
}
}).collect(Collectors.toList());

// got a new field, set to default value
if (!offsets.isEmpty()) {
var defaultReader = new ArrayList<>(reader);

offsets.forEach((field, offset) -> {
try {

var fieldType = type.getDeclaredField(field).getType();
if (fieldType.isPrimitive()) {
var defaultValue = Array.get(Array.newInstance(fieldType, 1), 0);
defaultReader.add(new DefaultReader(offset, defaultValue));

} else {
defaultReader.add(new DefaultReader(offset, null));
}
} catch (ReflectiveOperationException e) {
throw new AvroRuntimeException(e);
}
});

reader = defaultReader;

}
this.reader = reader;

}

@Override
public CustomEncoding<Object> setSchema(Schema schema) {
return new ReflectRecordEncoding(type, schema);
}

@Override
protected void write(Object datum, Encoder out) throws IOException {
throw new UnsupportedOperationException("No writer specified");
}

@Override
protected void write(Object datum, Encoder out, ReflectDatumWriter writer) throws IOException {
for (FieldWriter field : this.writer) {
field.write(datum, out, writer);
}
}

@Override
protected Object read(Object reuse, Decoder in) throws IOException {
throw new UnsupportedOperationException("No reader specified");
}

@Override
protected Object read(Object reuse, ResolvingDecoder in, ReflectDatumReader reader) throws IOException {

Object[] args = new Object[this.reader.size()];

for (FieldReader field : this.reader) {
field.read(in, reader, args);
}

try {
return this.constructor.newInstance(args);
} catch (ReflectiveOperationException e) {
throw new RuntimeException();
}
}

private interface FieldWriter {
void write(Object datum, Encoder out, ReflectDatumWriter writer) throws IOException;
}

private static class ReflectFieldWriter implements FieldWriter {

private final Field field;
private final Schema schema;

public ReflectFieldWriter(Field field, Schema schema) {
this.field = field;
this.schema = schema;
}

@Override
public void write(Object datum, Encoder out, ReflectDatumWriter writer) throws IOException {
try {
Object obj = field.get(datum);
writer.write(schema, obj, out);
} catch (ReflectiveOperationException e) {
throw new AvroRuntimeException("Could not invoke", e);
}
}
}

private static class CustomEncodedFieldWriter implements FieldWriter {

private final Field field;
private final CustomEncoding<?> encoding;

public CustomEncodedFieldWriter(Field field, CustomEncoding<?> encoding) {
this.field = field;
this.encoding = encoding;
}

@Override
public void write(Object datum, Encoder out, ReflectDatumWriter writer) throws IOException {
try {
Object obj = field.get(datum);
encoding.write(obj, out);
} catch (ReflectiveOperationException e) {
throw new AvroRuntimeException("Could not invoke", e);
}
}
}

private interface FieldReader {
public void read(ResolvingDecoder in, ReflectDatumReader reader, Object[] constructorArgs) throws IOException;
}

private static class ReflectFieldReader implements FieldReader {

private final int constructorOffset;
private final Schema schema;

public ReflectFieldReader(int constructorOffset, Schema schema) {
this.constructorOffset = constructorOffset;
this.schema = schema;
}

@Override
public void read(ResolvingDecoder in, ReflectDatumReader reader, Object[] constructorArgs) throws IOException {
Object obj = reader.read(null, schema, in);
constructorArgs[constructorOffset] = obj;
}
}

private static class DefaultReader implements FieldReader {

private final int constructorOffset;
private final Object defaultValue;

public DefaultReader(int constructorOffset, Object defaultValue) {
this.constructorOffset = constructorOffset;
this.defaultValue = defaultValue;
}

@Override
public void read(ResolvingDecoder in, ReflectDatumReader reader, Object[] constructorArgs) throws IOException {
constructorArgs[constructorOffset] = defaultValue;
}
}

private static class CustomEncodedFieldReader implements FieldReader {

private final int constructorOffset;
private final CustomEncoding<?> encoding;

public CustomEncodedFieldReader(int constructorOffset, CustomEncoding<?> encoding, Schema schema) {
this.constructorOffset = constructorOffset;
this.encoding = encoding;
encoding.setSchema(schema);
}

@Override
public void read(ResolvingDecoder in, ReflectDatumReader reader, Object[] constructorArgs) throws IOException {
Object obj = encoding.read(in);
constructorArgs[constructorOffset] = obj;
}
}
}
Loading