Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
import org.apache.avro.io.FastReaderBuilder;
import org.apache.avro.util.Utf8;
import org.apache.avro.util.internal.Accessor;
import org.apache.avro.generic.PrimitivesArrays.PrimitiveArray;

import com.fasterxml.jackson.databind.JsonNode;
import org.apache.avro.util.springframework.ConcurrentReferenceHashMap;
Expand Down Expand Up @@ -1515,38 +1516,73 @@ else if (value instanceof Utf8) {

}

/*
/**
* Called to create new array instances. Subclasses may override to use a
* different array implementation. By default, this returns a {@link
* GenericData.Array}.
* different array implementation. By default, this returns a
* {@link GenericData.Array}.
*
* @param old the old array instance to reuse, if possible. If the old array
* is an appropriate type, it may be cleared and returned.
* @param size the size of the array to create.
* @param schema the schema of the array elements.
*/
public Object newArray(Object old, int size, Schema schema) {
if (old instanceof GenericArray) {
((GenericArray<?>) old).reset();
return old;
} else if (old instanceof Collection) {
((Collection<?>) old).clear();
return old;
} else {
if (schema.getElementType().getType() == Type.INT) {
return new PrimitivesArrays.IntArray(size, schema);
}
if (schema.getElementType().getType() == Type.BOOLEAN) {
return new PrimitivesArrays.BooleanArray(size, schema);
}
if (schema.getElementType().getType() == Type.LONG) {
return new PrimitivesArrays.LongArray(size, schema);
}
if (schema.getElementType().getType() == Type.FLOAT) {
return new PrimitivesArrays.FloatArray(size, schema);
}
if (schema.getElementType().getType() == Type.DOUBLE) {
return new PrimitivesArrays.DoubleArray(size, schema);
final var logicalType = schema.getElementType().getLogicalType();
final var conversion = getConversionFor(logicalType);
final var optimalValueType = optimalValueType(schema, logicalType,
conversion == null ? null : conversion.getConvertedType());

if (old != null) {
if (old instanceof GenericData.Array<?>) {
((GenericData.Array<?>) old).reset();
return old;
} else if (old instanceof PrimitiveArray) {
var primitiveOld = (PrimitiveArray<?>) old;
if (primitiveOld.valueType() == optimalValueType) {
primitiveOld.reset();
return old;
}
} else if (old instanceof Collection) {
((Collection<?>) old).clear();
return old;
}
return new GenericData.Array<Object>(size, schema);
}
// we can't reuse the old array, so we create a new one
return PrimitivesArrays.createOptimizedArray(size, schema, optimalValueType);
}

/**
* Determine the optimal value type for an array. The value type is determined
* form the convertedElementType if supplied, otherwise the underlying type from
* the schema
*
* @param schema the schema of the array
* @param convertedElementType the converted elements value type. This may not
* be the same and the schema if for instance there
* is a logical type, and a convertor is use
* @return an indicator for the type of the array, useful for
* {@link PrimitivesArrays#createOptimizedArray(int, Schema, Schema.Type)}.
* May be null if the type is not optimised
*/
public static Schema.Type optimalValueType(Schema schema, LogicalType logicalType, Class<?> convertedElementType) {
if (logicalType == null)
// if there are no logical types- use the schema type
return schema.getElementType().getType();
else if (convertedElementType == null)
// if there is no convertor
return null;
else
// use the converted type
return PRIMITIVE_TYPES_WITH_SPECIALISED_ARRAYS.get(convertedElementType);
}

private final static Map<Class<?>, Schema.Type> PRIMITIVE_TYPES_WITH_SPECIALISED_ARRAYS = Map.of(//
Long.TYPE, Schema.Type.LONG, //
Integer.TYPE, Schema.Type.INT, //
Float.TYPE, Schema.Type.FLOAT, //
Double.TYPE, Schema.Type.DOUBLE, //
Boolean.TYPE, Schema.Type.BOOLEAN);

/**
* Called to create new array instances. Subclasses may override to use a
* different map implementation. By default, this returns a {@link HashMap}.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,23 +17,62 @@
*/
package org.apache.avro.generic;

import org.apache.avro.AvroRuntimeException;
import org.apache.avro.Schema;

import java.util.Arrays;
import java.util.Collection;

public class PrimitivesArrays {

public static class IntArray extends GenericData.AbstractArray<Integer> {
/**
* Create a primitive array if the value type is has an associated optimised
* implementation, otherwise a generic array is returned. The value type is
* determined form the convertedElementType if supplied, otherwise the
* underlying type from the schema
*
* @param size the size of the array to create
* @param schema the schema of the array
* @param valueType the converted elements value type. This may not be the same
* and the schema if for instance there is a logical type, and
* a convertor is use
* @return an instance of a primitive array or a Generic array if the value type
* is does not have an associated optimised implementation.
*/
public static GenericData.AbstractArray<?> createOptimizedArray(int size, Schema schema, Schema.Type valueType) {

if (valueType != null)
switch (valueType) {
case INT:
return new PrimitivesArrays.IntArray(size, schema);
case BOOLEAN:
return new PrimitivesArrays.BooleanArray(size, schema);
case LONG:
return new PrimitivesArrays.LongArray(size, schema);
case FLOAT:
return new PrimitivesArrays.FloatArray(size, schema);
case DOUBLE:
return new PrimitivesArrays.DoubleArray(size, schema);
default:
break;
}
return new GenericData.Array<>(size, schema);
}

public abstract static class PrimitiveArray<T> extends GenericData.AbstractArray<T> {
PrimitiveArray(Schema schema) {
super(schema);
}

public abstract Schema.Type valueType();
}

public static class IntArray extends PrimitiveArray<Integer> {
private static final int[] EMPTY = new int[0];

private int[] elements = EMPTY;

public IntArray(int capacity, Schema schema) {
super(schema);
if (!Schema.Type.INT.equals(schema.getElementType().getType()))
throw new AvroRuntimeException("Not a int array schema: " + schema);
if (capacity != 0)
elements = new int[capacity];
}
Expand Down Expand Up @@ -127,17 +166,20 @@ protected void swap(final int index1, final int index2) {
elements[index1] = elements[index2];
elements[index2] = tmp;
}

@Override
public Schema.Type valueType() {
return Schema.Type.INT;
}
}

public static class LongArray extends GenericData.AbstractArray<Long> {
public static class LongArray extends PrimitiveArray<Long> {
private static final long[] EMPTY = new long[0];

private long[] elements = EMPTY;

public LongArray(int capacity, Schema schema) {
super(schema);
if (!Schema.Type.LONG.equals(schema.getElementType().getType()))
throw new AvroRuntimeException("Not a long array schema: " + schema);
if (capacity != 0)
elements = new long[capacity];
}
Expand Down Expand Up @@ -231,17 +273,20 @@ protected void swap(final int index1, final int index2) {
elements[index1] = elements[index2];
elements[index2] = tmp;
}

@Override
public Schema.Type valueType() {
return Schema.Type.LONG;
}
}

public static class BooleanArray extends GenericData.AbstractArray<Boolean> {
public static class BooleanArray extends PrimitiveArray<Boolean> {
private static final byte[] EMPTY = new byte[0];

private byte[] elements = EMPTY;

public BooleanArray(int capacity, Schema schema) {
super(schema);
if (!Schema.Type.BOOLEAN.equals(schema.getElementType().getType()))
throw new AvroRuntimeException("Not a boolean array schema: " + schema);
if (capacity != 0)
elements = new byte[1 + (capacity / Byte.SIZE)];
}
Expand Down Expand Up @@ -396,17 +441,20 @@ protected void swap(final int index1, final int index2) {
this.set(index1, this.get(index2));
this.set(index2, tmp);
}

@Override
public Schema.Type valueType() {
return Schema.Type.BOOLEAN;
}
}

public static class FloatArray extends GenericData.AbstractArray<Float> {
public static class FloatArray extends PrimitiveArray<Float> {
private static final float[] EMPTY = new float[0];

private float[] elements = EMPTY;

public FloatArray(int capacity, Schema schema) {
super(schema);
if (!Schema.Type.FLOAT.equals(schema.getElementType().getType()))
throw new AvroRuntimeException("Not a float array schema: " + schema);
if (capacity != 0)
elements = new float[capacity];
}
Expand Down Expand Up @@ -500,17 +548,20 @@ protected void swap(final int index1, final int index2) {
this.set(index1, this.get(index2));
this.set(index2, tmp);
}

@Override
public Schema.Type valueType() {
return Schema.Type.FLOAT;
}
}

public static class DoubleArray extends GenericData.AbstractArray<Double> {
public static class DoubleArray extends PrimitiveArray<Double> {
private static final double[] EMPTY = new double[0];

private double[] elements = EMPTY;

public DoubleArray(int capacity, Schema schema) {
super(schema);
if (!Schema.Type.DOUBLE.equals(schema.getElementType().getType()))
throw new AvroRuntimeException("Not a double array schema: " + schema);
if (capacity != 0)
elements = new double[capacity];
}
Expand Down Expand Up @@ -604,6 +655,11 @@ protected void swap(final int index1, final int index2) {
this.set(index1, this.get(index2));
this.set(index2, tmp);
}

@Override
public Schema.Type valueType() {
return Schema.Type.DOUBLE;
}
}

}
Loading