Skip to content

Commit eda2c87

Browse files
xxlaykxx authored and lriggs committed
apacheGH-836: Added support of ExtensionType for ComplexCopier (apache#837)
Updated ComplexCopier to support ExtensionType. It now contains two `copy` methods:

```java
// kept so that existing callers are not broken
public static void copy(FieldReader input, FieldWriter output)

public static void copy(FieldReader input, FieldWriter output, ExtensionTypeWriterFactory extensionTypeWriterFactory)
```

Also updated the ComplexCopier tests. Closes apache#836.
1 parent e5b70a7 commit eda2c87

20 files changed

+588
-9
lines changed

vector/src/main/codegen/includes/vv_imports.ftl

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ import org.apache.arrow.vector.complex.*;
3434
import org.apache.arrow.vector.complex.reader.*;
3535
import org.apache.arrow.vector.complex.impl.*;
3636
import org.apache.arrow.vector.complex.writer.*;
37+
import org.apache.arrow.vector.complex.writer.BaseWriter.ExtensionWriter;
3738
import org.apache.arrow.vector.complex.writer.BaseWriter.StructWriter;
3839
import org.apache.arrow.vector.complex.writer.BaseWriter.ListWriter;
3940
import org.apache.arrow.vector.complex.writer.BaseWriter.MapWriter;

vector/src/main/codegen/templates/AbstractFieldReader.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,10 @@ public void copyAsField(String name, ${name}Writer writer) {
109109

110110
</#list></#list>
111111

112+
public void copyAsValue(StructWriter writer, ExtensionTypeWriterFactory writerFactory) {
113+
fail("CopyAsValue StructWriter");
114+
}
115+
112116
public void read(ExtensionHolder holder) {
113117
fail("Extension");
114118
}

vector/src/main/codegen/templates/BaseReader.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ public interface RepeatedStructReader extends StructReader{
4949
boolean next();
5050
int size();
5151
void copyAsValue(StructWriter writer);
52+
void copyAsValue(StructWriter writer, ExtensionTypeWriterFactory writerFactory);
5253
}
5354

5455
public interface ListReader extends BaseReader{
@@ -59,6 +60,7 @@ public interface RepeatedListReader extends ListReader{
5960
boolean next();
6061
int size();
6162
void copyAsValue(ListWriter writer);
63+
void copyAsValue(ListWriter writer, ExtensionTypeWriterFactory writerFactory);
6264
}
6365

6466
public interface MapReader extends BaseReader{
@@ -69,6 +71,7 @@ public interface RepeatedMapReader extends MapReader{
6971
boolean next();
7072
int size();
7173
void copyAsValue(MapWriter writer);
74+
void copyAsValue(MapWriter writer, ExtensionTypeWriterFactory writerFactory);
7275
}
7376

7477
public interface ScalarReader extends

vector/src/main/codegen/templates/ComplexCopier.java

Lines changed: 33 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -42,10 +42,14 @@ public class ComplexCopier {
4242
* @param output field to write to
4343
*/
4444
public static void copy(FieldReader input, FieldWriter output) {
45-
writeValue(input, output);
45+
writeValue(input, output, null);
4646
}
4747

48-
private static void writeValue(FieldReader reader, FieldWriter writer) {
48+
public static void copy(FieldReader input, FieldWriter output, ExtensionTypeWriterFactory extensionTypeWriterFactory) {
49+
writeValue(input, output, extensionTypeWriterFactory);
50+
}
51+
52+
private static void writeValue(FieldReader reader, FieldWriter writer, ExtensionTypeWriterFactory extensionTypeWriterFactory) {
4953
final MinorType mt = reader.getMinorType();
5054

5155
switch (mt) {
@@ -61,7 +65,7 @@ private static void writeValue(FieldReader reader, FieldWriter writer) {
6165
FieldReader childReader = reader.reader();
6266
FieldWriter childWriter = getListWriterForReader(childReader, writer);
6367
if (childReader.isSet()) {
64-
writeValue(childReader, childWriter);
68+
writeValue(childReader, childWriter, extensionTypeWriterFactory);
6569
} else {
6670
childWriter.writeNull();
6771
}
@@ -79,8 +83,8 @@ private static void writeValue(FieldReader reader, FieldWriter writer) {
7983
FieldReader structReader = reader.reader();
8084
if (structReader.isSet()) {
8185
writer.startEntry();
82-
writeValue(mapReader.key(), getMapWriterForReader(mapReader.key(), writer.key()));
83-
writeValue(mapReader.value(), getMapWriterForReader(mapReader.value(), writer.value()));
86+
writeValue(mapReader.key(), getMapWriterForReader(mapReader.key(), writer.key()), extensionTypeWriterFactory);
87+
writeValue(mapReader.value(), getMapWriterForReader(mapReader.value(), writer.value()), extensionTypeWriterFactory);
8488
writer.endEntry();
8589
} else {
8690
writer.writeNull();
@@ -99,7 +103,7 @@ private static void writeValue(FieldReader reader, FieldWriter writer) {
99103
if (childReader.getMinorType() != Types.MinorType.NULL) {
100104
FieldWriter childWriter = getStructWriterForReader(childReader, writer, name);
101105
if (childReader.isSet()) {
102-
writeValue(childReader, childWriter);
106+
writeValue(childReader, childWriter, extensionTypeWriterFactory);
103107
} else {
104108
childWriter.writeNull();
105109
}
@@ -110,6 +114,20 @@ private static void writeValue(FieldReader reader, FieldWriter writer) {
110114
writer.writeNull();
111115
}
112116
break;
117+
case EXTENSIONTYPE:
118+
if (extensionTypeWriterFactory == null) {
119+
throw new IllegalArgumentException("Must provide ExtensionTypeWriterFactory");
120+
}
121+
if (reader.isSet()) {
122+
Object value = reader.readObject();
123+
if (value != null) {
124+
writer.addExtensionTypeWriterFactory(extensionTypeWriterFactory);
125+
writer.writeExtension(value);
126+
}
127+
} else {
128+
writer.writeNull();
129+
}
130+
break;
113131
<#list vv.types as type><#list type.minor as minor><#assign name = minor.class?cap_first />
114132
<#assign fields = minor.fields!type.fields />
115133
<#assign uncappedName = name?uncap_first/>
@@ -162,6 +180,9 @@ private static FieldWriter getStructWriterForReader(FieldReader reader, StructWr
162180
return (FieldWriter) writer.map(name);
163181
case LISTVIEW:
164182
return (FieldWriter) writer.listView(name);
183+
case EXTENSIONTYPE:
184+
ExtensionWriter extensionWriter = writer.extension(name, reader.getField().getType());
185+
return (FieldWriter) extensionWriter;
165186
default:
166187
throw new UnsupportedOperationException(reader.getMinorType().toString());
167188
}
@@ -186,6 +207,9 @@ private static FieldWriter getListWriterForReader(FieldReader reader, ListWriter
186207
return (FieldWriter) writer.list();
187208
case LISTVIEW:
188209
return (FieldWriter) writer.listView();
210+
case EXTENSIONTYPE:
211+
ExtensionWriter extensionWriter = writer.extension(reader.getField().getType());
212+
return (FieldWriter) extensionWriter;
189213
default:
190214
throw new UnsupportedOperationException(reader.getMinorType().toString());
191215
}
@@ -211,6 +235,9 @@ private static FieldWriter getMapWriterForReader(FieldReader reader, MapWriter w
211235
return (FieldWriter) writer.listView();
212236
case MAP:
213237
return (FieldWriter) writer.map(false);
238+
case EXTENSIONTYPE:
239+
ExtensionWriter extensionWriter = writer.extension(reader.getField().getType());
240+
return (FieldWriter) extensionWriter;
214241
default:
215242
throw new UnsupportedOperationException(reader.getMinorType().toString());
216243
}

vector/src/main/codegen/templates/NullReader.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,7 @@ public void read(int arrayIndex, Nullable${name}Holder holder){
8686
}
8787
</#list></#list>
8888

89+
public void copyAsValue(StructWriter writer, ExtensionTypeWriterFactory writerFactory){}
8990
public void read(ExtensionHolder holder) {
9091
holder.isSet = 0;
9192
}

vector/src/main/java/org/apache/arrow/vector/BaseValueVector.java

Lines changed: 125 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import org.apache.arrow.memory.BufferAllocator;
2323
import org.apache.arrow.memory.ReferenceManager;
2424
import org.apache.arrow.util.Preconditions;
25+
import org.apache.arrow.vector.complex.impl.ExtensionTypeWriterFactory;
2526
import org.apache.arrow.vector.complex.reader.FieldReader;
2627
import org.apache.arrow.vector.util.DataSizeRoundingUtil;
2728
import org.apache.arrow.vector.util.TransferPair;
@@ -248,4 +249,128 @@ public void copyFrom(int fromIndex, int thisIndex, ValueVector from) {
248249
public void copyFromSafe(int fromIndex, int thisIndex, ValueVector from) {
249250
throw new UnsupportedOperationException();
250251
}
252+
253+
@Override
254+
public void copyFrom(
255+
int fromIndex, int thisIndex, ValueVector from, ExtensionTypeWriterFactory writerFactory) {
256+
throw new UnsupportedOperationException();
257+
}
258+
259+
@Override
260+
public void copyFromSafe(
261+
int fromIndex, int thisIndex, ValueVector from, ExtensionTypeWriterFactory writerFactory) {
262+
throw new UnsupportedOperationException();
263+
}
264+
265+
/**
266+
* Transfer the validity buffer from `validityBuffer` to the target vector's `validityBuffer`.
267+
* Start at `startIndex` and copy `length` number of elements. If the starting index is 8 byte
268+
* aligned, then the buffer is sliced from that index and ownership is transferred. If not,
269+
* individual bytes are copied.
270+
*
271+
* @param startIndex starting index
272+
* @param length number of elements to be copied
273+
* @param target target vector
274+
*/
275+
protected void splitAndTransferValidityBuffer(
276+
int startIndex, int length, BaseValueVector target) {
277+
int offset = startIndex % 8;
278+
279+
if (length <= 0) {
280+
return;
281+
}
282+
if (offset == 0) {
283+
sliceAndTransferValidityBuffer(startIndex, length, target);
284+
} else {
285+
copyValidityBuffer(startIndex, length, target);
286+
}
287+
}
288+
289+
/**
290+
* If the start index is 8 byte aligned, slice `validityBuffer` and transfer ownership to
291+
* `target`'s `validityBuffer`.
292+
*
293+
* @param startIndex starting index
294+
* @param length number of elements to be copied
295+
* @param target target vector
296+
*/
297+
protected void sliceAndTransferValidityBuffer(
298+
int startIndex, int length, BaseValueVector target) {
299+
final int firstByteSource = BitVectorHelper.byteIndex(startIndex);
300+
final int byteSizeTarget = getValidityBufferSizeFromCount(length);
301+
302+
if (target.validityBuffer != null) {
303+
target.validityBuffer.getReferenceManager().release();
304+
}
305+
target.validityBuffer = validityBuffer.slice(firstByteSource, byteSizeTarget);
306+
target.validityBuffer.getReferenceManager().retain(1);
307+
}
308+
309+
/**
310+
* Allocate new validity buffer for `target` and copy bytes from `validityBuffer`. Precise details
311+
* in the comments below.
312+
*
313+
* @param startIndex starting index
314+
* @param length number of elements to be copied
315+
* @param target target vector
316+
*/
317+
protected void copyValidityBuffer(int startIndex, int length, BaseValueVector target) {
318+
final int firstByteSource = BitVectorHelper.byteIndex(startIndex);
319+
final int lastByteSource = BitVectorHelper.byteIndex(valueCount - 1);
320+
final int byteSizeTarget = getValidityBufferSizeFromCount(length);
321+
final int offset = startIndex % 8;
322+
323+
/* Copy data
324+
* When the first bit starts from the middle of a byte (offset != 0),
325+
* copy data from src BitVector.
326+
* Each byte in the target is composed by a part in i-th byte,
327+
* another part in (i+1)-th byte.
328+
*/
329+
target.allocateValidityBuffer(byteSizeTarget);
330+
331+
for (int i = 0; i < byteSizeTarget - 1; i++) {
332+
byte b1 =
333+
BitVectorHelper.getBitsFromCurrentByte(this.validityBuffer, firstByteSource + i, offset);
334+
byte b2 =
335+
BitVectorHelper.getBitsFromNextByte(this.validityBuffer, firstByteSource + i + 1, offset);
336+
337+
target.validityBuffer.setByte(i, (b1 + b2));
338+
}
339+
340+
/* Copying the last piece is done in the following manner:
341+
* if the source vector has 1 or more bytes remaining, we copy
342+
* the last piece as a byte formed by shifting data
343+
* from the current byte and the next byte.
344+
*
345+
* if the source vector has no more bytes remaining
346+
* (we are at the last byte), we copy the last piece as a byte
347+
* by shifting data from the current byte.
348+
*/
349+
if ((firstByteSource + byteSizeTarget - 1) < lastByteSource) {
350+
byte b1 =
351+
BitVectorHelper.getBitsFromCurrentByte(
352+
this.validityBuffer, firstByteSource + byteSizeTarget - 1, offset);
353+
byte b2 =
354+
BitVectorHelper.getBitsFromNextByte(
355+
this.validityBuffer, firstByteSource + byteSizeTarget, offset);
356+
357+
target.validityBuffer.setByte(byteSizeTarget - 1, b1 + b2);
358+
} else {
359+
byte b1 =
360+
BitVectorHelper.getBitsFromCurrentByte(
361+
this.validityBuffer, firstByteSource + byteSizeTarget - 1, offset);
362+
target.validityBuffer.setByte(byteSizeTarget - 1, b1);
363+
}
364+
}
365+
366+
/**
367+
* Allocate new validity buffer for when the bytes need to be copied over.
368+
*
369+
* @param byteSizeTarget desired size of the buffer
370+
*/
371+
protected void allocateValidityBuffer(long byteSizeTarget) {
372+
validityBuffer = allocator.buffer(byteSizeTarget);
373+
validityBuffer.readerIndex(0);
374+
validityBuffer.setZero(0, validityBuffer.capacity());
375+
}
251376
}

vector/src/main/java/org/apache/arrow/vector/NullVector.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import org.apache.arrow.memory.util.hash.ArrowBufHasher;
2828
import org.apache.arrow.util.Preconditions;
2929
import org.apache.arrow.vector.compare.VectorVisitor;
30+
import org.apache.arrow.vector.complex.impl.ExtensionTypeWriterFactory;
3031
import org.apache.arrow.vector.complex.impl.NullReader;
3132
import org.apache.arrow.vector.complex.reader.FieldReader;
3233
import org.apache.arrow.vector.ipc.message.ArrowFieldNode;
@@ -329,6 +330,18 @@ public void copyFromSafe(int fromIndex, int thisIndex, ValueVector from) {
329330
throw new UnsupportedOperationException();
330331
}
331332

333+
@Override
334+
public void copyFrom(
335+
int fromIndex, int thisIndex, ValueVector from, ExtensionTypeWriterFactory writerFactory) {
336+
throw new UnsupportedOperationException();
337+
}
338+
339+
@Override
340+
public void copyFromSafe(
341+
int fromIndex, int thisIndex, ValueVector from, ExtensionTypeWriterFactory writerFactory) {
342+
throw new UnsupportedOperationException();
343+
}
344+
332345
@Override
333346
public String getName() {
334347
return this.getField().getName();

vector/src/main/java/org/apache/arrow/vector/ValueVector.java

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import org.apache.arrow.memory.OutOfMemoryException;
2323
import org.apache.arrow.memory.util.hash.ArrowBufHasher;
2424
import org.apache.arrow.vector.compare.VectorVisitor;
25+
import org.apache.arrow.vector.complex.impl.ExtensionTypeWriterFactory;
2526
import org.apache.arrow.vector.complex.reader.FieldReader;
2627
import org.apache.arrow.vector.types.Types.MinorType;
2728
import org.apache.arrow.vector.types.pojo.Field;
@@ -309,6 +310,30 @@ public interface ValueVector extends Closeable, Iterable<ValueVector> {
309310
*/
310311
void copyFromSafe(int fromIndex, int thisIndex, ValueVector from);
311312

313+
/**
314+
* Copy a cell value from a particular index in source vector to a particular position in this
315+
* vector.
316+
*
317+
* @param fromIndex position to copy from in source vector
318+
* @param thisIndex position to copy to in this vector
319+
* @param from source vector
320+
* @param writerFactory the extension type writer factory to use for copying extension type values
321+
*/
322+
void copyFrom(
323+
int fromIndex, int thisIndex, ValueVector from, ExtensionTypeWriterFactory writerFactory);
324+
325+
/**
326+
* Same as {@link #copyFrom(int, int, ValueVector, ExtensionTypeWriterFactory)} except that it handles the case when the
327+
* capacity of the vector needs to be expanded before copy.
328+
*
329+
* @param fromIndex position to copy from in source vector
330+
* @param thisIndex position to copy to in this vector
331+
* @param from source vector
332+
* @param writerFactory the extension type writer factory to use for copying extension type values
333+
*/
334+
void copyFromSafe(
335+
int fromIndex, int thisIndex, ValueVector from, ExtensionTypeWriterFactory writerFactory);
336+
312337
/**
313338
* Accept a generic {@link VectorVisitor} and return the result.
314339
*

vector/src/main/java/org/apache/arrow/vector/complex/AbstractContainerVector.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import org.apache.arrow.vector.DensityAwareVector;
2222
import org.apache.arrow.vector.FieldVector;
2323
import org.apache.arrow.vector.ValueVector;
24+
import org.apache.arrow.vector.complex.impl.ExtensionTypeWriterFactory;
2425
import org.apache.arrow.vector.types.Types.MinorType;
2526
import org.apache.arrow.vector.types.pojo.ArrowType;
2627
import org.apache.arrow.vector.types.pojo.ArrowType.FixedSizeList;
@@ -151,6 +152,18 @@ public void copyFromSafe(int fromIndex, int thisIndex, ValueVector from) {
151152
throw new UnsupportedOperationException();
152153
}
153154

155+
@Override
156+
public void copyFrom(
157+
int fromIndex, int thisIndex, ValueVector from, ExtensionTypeWriterFactory writerFactory) {
158+
throw new UnsupportedOperationException();
159+
}
160+
161+
@Override
162+
public void copyFromSafe(
163+
int fromIndex, int thisIndex, ValueVector from, ExtensionTypeWriterFactory writerFactory) {
164+
throw new UnsupportedOperationException();
165+
}
166+
154167
@Override
155168
public String getName() {
156169
return name;

0 commit comments

Comments
 (0)