Skip to content

Commit 567cf79

Browse files
GH-834: Fix Avro nullable consumer to use readIndex for unions
1 parent 0f7665f commit 567cf79

6 files changed

Lines changed: 41 additions & 9 deletions

File tree

.gitignore

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,3 +32,6 @@ cmake_install.cmake
3232
dependency-reduced-pom.xml
3333
install_manifest.txt
3434
target/
35+
.claude/
36+
CLAUDE.md
37+
*.class

adapter/avro/src/main/java/org/apache/arrow/adapter/avro/AvroToArrowUtils.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -431,7 +431,7 @@ private static Consumer createSkipConsumer(Schema schema) {
431431
case UNION:
432432
List<Consumer> unionDelegates =
433433
schema.getTypes().stream().map(s -> createSkipConsumer(s)).collect(Collectors.toList());
434-
skipFunction = decoder -> unionDelegates.get(decoder.readInt()).consume(decoder);
434+
skipFunction = decoder -> unionDelegates.get(decoder.readIndex()).consume(decoder);
435435

436436
break;
437437
case ARRAY:

adapter/avro/src/main/java/org/apache/arrow/adapter/avro/consumers/AvroNullableConsumer.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ public AvroNullableConsumer(Consumer<T> delegate, int nullIndex) {
4141

4242
@Override
4343
public void consume(Decoder decoder) throws IOException {
44-
int typeIndex = decoder.readInt();
44+
int typeIndex = decoder.readIndex();
4545
if (typeIndex == nullIndex) {
4646
decoder.readNull();
4747
delegate.addNull();

adapter/avro/src/main/java/org/apache/arrow/adapter/avro/consumers/AvroStringConsumer.java

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -17,17 +17,17 @@
1717
package org.apache.arrow.adapter.avro.consumers;
1818

1919
import java.io.IOException;
20-
import java.nio.ByteBuffer;
2120
import org.apache.arrow.vector.VarCharVector;
2221
import org.apache.avro.io.Decoder;
22+
import org.apache.avro.util.Utf8;
2323

2424
/**
2525
* Consumer which consumes string type values from an Avro decoder. Writes the data to {@link
2626
* VarCharVector}.
2727
*/
2828
public class AvroStringConsumer extends BaseAvroConsumer<VarCharVector> {
2929

30-
private ByteBuffer cacheBuffer;
30+
private Utf8 cachedUtf8;
3131

3232
/** Instantiate an AvroStringConsumer. */
3333
public AvroStringConsumer(VarCharVector vector) {
@@ -36,9 +36,7 @@ public AvroStringConsumer(VarCharVector vector) {
3636

3737
@Override
3838
public void consume(Decoder decoder) throws IOException {
39-
// cacheBuffer is initialized null and create in the first consume,
40-
// if its capacity < size to read, decoder will create a new one with new capacity.
41-
cacheBuffer = decoder.readBytes(cacheBuffer);
42-
vector.setSafe(currentIndex++, cacheBuffer, 0, cacheBuffer.limit());
39+
cachedUtf8 = decoder.readString(cachedUtf8);
40+
vector.setSafe(currentIndex++, cachedUtf8.getBytes(), 0, cachedUtf8.getByteLength());
4341
}
4442
}

adapter/avro/src/main/java/org/apache/arrow/adapter/avro/consumers/AvroUnionsConsumer.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ public AvroUnionsConsumer(UnionVector vector, Consumer[] delegates, Types.MinorT
4242

4343
@Override
4444
public void consume(Decoder decoder) throws IOException {
45-
int fieldIndex = decoder.readInt();
45+
int fieldIndex = decoder.readIndex();
4646

4747
ensureInnerVectorCapacity(currentIndex + 1, fieldIndex);
4848
Consumer delegate = delegates[fieldIndex];

adapter/avro/src/test/java/org/apache/arrow/adapter/avro/AvroToArrowTest.java

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818

1919
import static org.junit.jupiter.api.Assertions.assertEquals;
2020

21+
import java.io.ByteArrayInputStream;
22+
import java.io.ByteArrayOutputStream;
2123
import java.nio.ByteBuffer;
2224
import java.nio.charset.StandardCharsets;
2325
import java.util.ArrayList;
@@ -33,7 +35,13 @@
3335
import org.apache.arrow.vector.complex.StructVector;
3436
import org.apache.avro.Schema;
3537
import org.apache.avro.generic.GenericData;
38+
import org.apache.avro.generic.GenericDatumWriter;
3639
import org.apache.avro.generic.GenericRecord;
40+
import org.apache.avro.io.BinaryEncoder;
41+
import org.apache.avro.io.DatumWriter;
42+
import org.apache.avro.io.Decoder;
43+
import org.apache.avro.io.DecoderFactory;
44+
import org.apache.avro.io.EncoderFactory;
3745
import org.junit.jupiter.api.Test;
3846

3947
public class AvroToArrowTest extends AvroTestBase {
@@ -474,4 +482,27 @@ public void testNullableUnionType() throws Exception {
474482

475483
checkPrimitiveResult(expected, vector);
476484
}
485+
486+
@Test
487+
public void testNullableStringTypeWithValidatingDecoder() throws Exception {
488+
Schema schema = getSchema("test_nullable_string.avsc");
489+
490+
GenericRecord record = new GenericData.Record(schema);
491+
record.put(0, "hello");
492+
493+
ByteArrayOutputStream out = new ByteArrayOutputStream();
494+
BinaryEncoder encoder = new EncoderFactory().directBinaryEncoder(out, null);
495+
DatumWriter<Object> writer = new GenericDatumWriter<>(schema);
496+
writer.write(record, encoder);
497+
encoder.flush();
498+
499+
ByteArrayInputStream in = new ByteArrayInputStream(out.toByteArray());
500+
Decoder binaryDecoder = DecoderFactory.get().directBinaryDecoder(in, null);
501+
Decoder validatingDecoder = DecoderFactory.get().validatingDecoder(schema, binaryDecoder);
502+
503+
VectorSchemaRoot root = AvroToArrow.avroToArrow(schema, validatingDecoder, config);
504+
assertEquals(1, root.getRowCount());
505+
FieldVector vector = root.getVector("f0");
506+
assertEquals("hello", vector.getObject(0).toString());
507+
}
477508
}

0 commit comments

Comments
 (0)