Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
108 changes: 108 additions & 0 deletions lang/java/avro/src/main/java/org/apache/avro/Conversions.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,13 @@
import java.math.BigDecimal;
import java.math.BigInteger;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.time.temporal.ChronoUnit;
import java.util.Arrays;
import java.util.Collection;
import java.util.Map;
import java.util.UUID;
import java.time.Duration;

public class Conversions {

Expand Down Expand Up @@ -146,6 +149,111 @@ private static BigDecimal validate(final LogicalTypes.Decimal decimal, BigDecima
}
}

public static class DurationConversion extends Conversion<Duration> {

private static final int MONTH_DAYS = 30;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A month has 28, 29, 30 or 31 days. Hardcoding a single value (31 days/month occurs more often) is basically a bug. Note that this cannot be fixed if we choose to support all possible duration values (see the conversation comments for more info).

private static final int GRANULARITY_BYTES = 4;
private static final String DOC_URL = "http://avro.apache.org/docs/current/spec.html#Duration";
private static final String DOC_STR = "For more information on the duration logical type, please refer to " + DOC_URL;

private static final byte[] EMPTY_BYTE_ARRAY = new byte[12];

private static byte[] toBytes(int value){
return ByteBuffer.allocate(GRANULARITY_BYTES).order(ByteOrder.LITTLE_ENDIAN).putInt(value).array();
}

private static int toInt(byte[] bytes) {
return ByteBuffer.wrap(bytes).order(ByteOrder.LITTLE_ENDIAN).getInt();
}

private static byte[] buildByteArray(int months, int days, int milliSeconds) {
byte[] monthBytes = toBytes(months);
byte[] dayBytes = toBytes(days);
byte[] milliSecondBytes = toBytes(milliSeconds);

return ByteBuffer
.allocate(monthBytes.length + dayBytes.length + milliSecondBytes.length)
.put(monthBytes)
.put(dayBytes)
.put(milliSecondBytes)
.array();
}

@Override
public Class<Duration> getConvertedType() {
return Duration.class;
}

@Override
public String getLogicalTypeName() {
return "duration";
}

@Override
public Duration fromFixed(GenericFixed value, Schema schema, LogicalType type) {
byte[] payload = value.bytes();
int payloadLength = payload.length;

if (payloadLength != GRANULARITY_BYTES * 3) {
throw new IllegalArgumentException("Duration must be stored on a 12-byte long value, but " + payloadLength + " bytes given. " + DOC_STR);
}

if (Arrays.equals(payload, EMPTY_BYTE_ARRAY)) {
return Duration.ZERO;
}

byte[] monthBytes = new byte[]{ payload[0], payload[1], payload[2], payload[3] };
byte[] dayBytes = new byte[]{ payload[4], payload[5], payload[6], payload[7] };
byte[] milliSecondBytes = new byte[]{ payload[8], payload[9], payload[10], payload[11] };

int months = toInt(monthBytes);
int days = toInt(dayBytes);
int milliSeconds = toInt(milliSecondBytes);

return Duration.ofDays((long) months * MONTH_DAYS + days).plusMillis(milliSeconds);
}

@Override
public GenericFixed toFixed(Duration value, Schema schema, LogicalType type) {

if (value == null || value.isZero()) {
return new GenericData.Fixed(schema, EMPTY_BYTE_ARRAY);
}

long totalDays = value.toDays();

int months;
int days;
int milliSeconds;

try {
months = (int) (totalDays / MONTH_DAYS);
} catch (Throwable e) {
throw new IllegalArgumentException("The months part of a duration must fit a 4-byte int, longer duration given. " + DOC_STR, e);
}

try {
days = (int) (totalDays % MONTH_DAYS);
} catch (Throwable e) {
throw new IllegalArgumentException("The days part of a duration must fit a 4-byte int, longer duration given. " + DOC_STR, e);
}

try {
milliSeconds = (int) value.minus(totalDays, ChronoUnit.DAYS).toMillis();
} catch (Throwable e) {
throw new IllegalArgumentException("The milliseconds part of a duration must fit a 4-byte int, longer duration given. " + DOC_STR, e);
}

if (value.getNano() - milliSeconds * 1_000_000 > 0) {
milliSeconds += 1;
}

byte[] result = buildByteArray(months, days, milliSeconds);

return new GenericData.Fixed(schema, result);
}
}

/**
* Convert a underlying representation of a logical type (such as a ByteBuffer)
* to a higher level object (such as a BigDecimal).
Expand Down
26 changes: 26 additions & 0 deletions lang/java/avro/src/main/java/org/apache/avro/LogicalTypes.java
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,9 @@ private static LogicalType fromSchemaImpl(Schema schema, boolean throwErrors) {
case LOCAL_TIMESTAMP_MILLIS:
logicalType = LOCAL_TIMESTAMP_MILLIS_TYPE;
break;
case DURATION:
logicalType = DURATION_TYPE;
break;
default:
final LogicalTypeFactory typeFactory = REGISTERED_TYPES.get(typeName);
logicalType = (typeFactory == null) ? null : typeFactory.fromSchema(schema);
Expand Down Expand Up @@ -134,6 +137,7 @@ private static LogicalType fromSchemaImpl(Schema schema, boolean throwErrors) {
private static final String TIMESTAMP_MICROS = "timestamp-micros";
private static final String LOCAL_TIMESTAMP_MILLIS = "local-timestamp-millis";
private static final String LOCAL_TIMESTAMP_MICROS = "local-timestamp-micros";
private static final String DURATION = "duration";

/** Create a Decimal LogicalType with the given precision and scale 0 */
public static Decimal decimal(int precision) {
Expand Down Expand Up @@ -193,6 +197,13 @@ public static LocalTimestampMicros localTimestampMicros() {
return LOCAL_TIMESTAMP_MICROS_TYPE;
}

private static final Duration DURATION_TYPE = new Duration();

public static Duration duration() {
return DURATION_TYPE;
}


/** Decimal represents arbitrary-precision fixed-scale decimal numbers */
public static class Decimal extends LogicalType {
private static final String PRECISION_PROP = "precision";
Expand Down Expand Up @@ -410,4 +421,19 @@ public void validate(Schema schema) {
}
}

/** Duration represents an amount of time defined by a number of months, days and milliseconds */
public static class Duration extends LogicalType {
private Duration() {
super(DURATION);
}

@Override
public void validate(Schema schema) {
super.validate(schema);
if (schema.getType() != Schema.Type.FIXED || schema.getFixedSize() != 12) {
throw new IllegalArgumentException("Duration can only be used with an underlying fixed type of size 12");
}
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,188 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.avro;

import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericFixed;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;

import java.math.BigDecimal;
import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.Arrays;

import static java.math.RoundingMode.HALF_EVEN;
import static org.junit.Assert.*;

/*
* TODO test for negative duration
* TODO test for empty byte array (0 duration)
*/
public class TestDurationConversion {

private static final Conversion<Duration> CONVERSION = new Conversions.DurationConversion();

@Rule
public ExpectedException expectedException = ExpectedException.none();

private Schema schema;
private LogicalType logicalType;

@Before
public void setup() {
schema = Schema.createFixed("aFixed", null, null, 12);
schema.addProp("logicalType", "duration");
logicalType = LogicalTypes.fromSchema(schema);
}

@Test
public void testToFromFixed() {
final Duration value = Duration.ofDays(10).plusMillis(100);
final GenericFixed fixed = CONVERSION.toFixed(value, schema, logicalType);
final Duration result = CONVERSION.fromFixed(fixed, schema, logicalType);
assertEquals(value, result);
}

@Test
public void testConvertingMillisecondsFromBytes() {
LogicalTypes.Duration duration = (LogicalTypes.Duration) logicalType;

final Duration durationValue = Duration.ofMillis(200);

final byte[] bytes = new byte[]{
0x0, 0x0, 0x0, 0x0,
0x0, 0x0, 0x0, 0x0,
-0x38, 0x0, 0x0, 0x0
};

final GenericFixed dd = new GenericData.Fixed(schema, bytes);

final Duration fromBytes = CONVERSION.fromFixed(dd, schema, duration);

assertEquals(durationValue, fromBytes);
}

@Test
public void testConvertingDaysFromBytes() {
LogicalTypes.Duration duration = (LogicalTypes.Duration) logicalType;

final Duration durationValue = Duration.ofDays(29);

final byte[] bytes = new byte[]{
0x0, 0x0, 0x0, 0x0,
0x1D, 0x0, 0x0, 0x0,
0x0, 0x0, 0x0, 0x0
};

final GenericFixed dd = new GenericData.Fixed(schema, bytes);

final Duration fromBytes = CONVERSION.fromFixed(dd, schema, duration);

assertEquals(durationValue, fromBytes);
}

@Test
public void testConvertingMonthsFromBytes() {
LogicalTypes.Duration duration = (LogicalTypes.Duration) logicalType;

final Duration durationValue = Duration.ofDays(60);

final byte[] bytes = new byte[]{
0x02, 0x0, 0x0, 0x0,
0x0, 0x0, 0x0, 0x0,
0x0, 0x0, 0x0, 0x0
};

final GenericFixed dd = new GenericData.Fixed(schema, bytes);

final Duration fromBytes = CONVERSION.fromFixed(dd, schema, duration);

assertEquals(durationValue, fromBytes);
}

@Test
public void testConvertingMillisecondsToBytes() {
LogicalTypes.Duration duration = (LogicalTypes.Duration) logicalType;

final Duration durationValue = Duration.ofMillis(200);

final byte[] bytes = new byte[]{
0x0, 0x0, 0x0, 0x0,
0x0, 0x0, 0x0, 0x0,
-0x38, 0x0, 0x0, 0x0
};

final GenericFixed fixed = CONVERSION.toFixed(durationValue, schema, duration);

assertArrayEquals(bytes, fixed.bytes());
}

@Test
public void testConvertingMillisecondsWithNanosecondAdjustmentToBytes() {
LogicalTypes.Duration duration = (LogicalTypes.Duration) logicalType;

final Duration durationValue = Duration.ofMillis(200).plusNanos(10);

final byte[] bytes = new byte[]{
0x0, 0x0, 0x0, 0x0,
0x0, 0x0, 0x0, 0x0,
-0x37, 0x0, 0x0, 0x0
};

final GenericFixed fixed = CONVERSION.toFixed(durationValue, schema, duration);

assertArrayEquals(bytes, fixed.bytes());
}

@Test
public void testConvertingDaysToBytes() {
LogicalTypes.Duration duration = (LogicalTypes.Duration) logicalType;

final Duration durationValue = Duration.ofDays(29);

final byte[] bytes = new byte[]{
0x0, 0x0, 0x0, 0x0,
0x1D, 0x0, 0x0, 0x0,
0x0, 0x0, 0x0, 0x0
};

final GenericFixed fixed = CONVERSION.toFixed(durationValue, schema, duration);

assertArrayEquals(bytes, fixed.bytes());
}

@Test
public void testConvertingWeeksToBytes() {
LogicalTypes.Duration duration = (LogicalTypes.Duration) logicalType;

final Duration durationValue = Duration.ofDays(60);

final byte[] bytes = new byte[]{
0x02, 0x0, 0x0, 0x0,
0x0, 0x0, 0x0, 0x0,
0x0, 0x0, 0x0, 0x0
};

final GenericFixed fixed = CONVERSION.toFixed(durationValue, schema, duration);

assertArrayEquals(bytes, fixed.bytes());
}
}
Loading