Skip to content

Commit 47487eb

Browse files
committed
Fixed wrong type in ApacheArrowPartsHandler
1 parent 28d9fed commit 47487eb

4 files changed

Lines changed: 62 additions & 10 deletions

File tree

query/src/main/java/tech/ydb/query/result/arrow/ApacheArrowCompressedPartsHandler.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22

33
import io.grpc.ExperimentalApi;
44
import org.apache.arrow.compression.CommonsCompressionFactory;
5-
import org.apache.arrow.memory.RootAllocator;
5+
import org.apache.arrow.memory.BufferAllocator;
66
import org.apache.arrow.vector.VectorLoader;
77
import org.apache.arrow.vector.VectorSchemaRoot;
88

@@ -12,7 +12,7 @@
1212
*/
1313
@ExperimentalApi("ApacheArrow support is experimental and API may change without notice")
1414
public abstract class ApacheArrowCompressedPartsHandler extends ApacheArrowPartsHandler {
15-
public ApacheArrowCompressedPartsHandler(RootAllocator allocator) {
15+
public ApacheArrowCompressedPartsHandler(BufferAllocator allocator) {
1616
super(allocator);
1717
}
1818

query/src/main/java/tech/ydb/query/result/arrow/ApacheArrowPartsHandler.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66

77
import com.google.protobuf.ByteString;
88
import io.grpc.ExperimentalApi;
9-
import org.apache.arrow.memory.RootAllocator;
9+
import org.apache.arrow.memory.BufferAllocator;
1010
import org.apache.arrow.vector.VectorLoader;
1111
import org.apache.arrow.vector.VectorSchemaRoot;
1212
import org.apache.arrow.vector.ipc.ReadChannel;
@@ -24,9 +24,9 @@
2424
*/
2525
@ExperimentalApi("ApacheArrow support is experimental and API may change without notice")
2626
public abstract class ApacheArrowPartsHandler implements QueryStream.PartsHandler {
27-
private final RootAllocator allocator;
27+
private final BufferAllocator allocator;
2828

29-
public ApacheArrowPartsHandler(RootAllocator allocator) {
29+
public ApacheArrowPartsHandler(BufferAllocator allocator) {
3030
this.allocator = allocator;
3131
}
3232

query/src/test/java/tech/ydb/query/impl/ApacheArrowTest.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ public class ApacheArrowTest {
5454
private static final String COLUMN_TABLE_NAME = "arrow/cs_table";
5555

5656
private static final TableDescription ROW_TABLE = AllTypesRecord.createTableDescription(false);
57-
private static final TableDescription COLUMN_TABLE = AllTypesRecord.createTableDescription(true);
57+
private static final TableDescription COLUMN_TABLE = AllTypesRecord.createTableDescription(false);
5858

5959
private static final List<AllTypesRecord> ROW_BATCH = AllTypesRecord.randomBatch(1, 0, 2000);
6060
private static final List<AllTypesRecord> COLUMN_BATCH = AllTypesRecord.randomBatch(1, 0, 5000);
@@ -110,8 +110,8 @@ public static void initTables() {
110110
allocator = new RootAllocator();
111111
client = QueryClient.newClient(YDB).build();
112112

113-
initTable(tablePath(ROW_TABLE_NAME), AllTypesRecord.createTableDescription(false), ROW_BATCH);
114-
initTable(tablePath(COLUMN_TABLE_NAME), AllTypesRecord.createTableDescription(true), COLUMN_BATCH);
113+
initTable(tablePath(ROW_TABLE_NAME), ROW_TABLE, ROW_BATCH);
114+
initTable(tablePath(COLUMN_TABLE_NAME), COLUMN_TABLE, COLUMN_BATCH);
115115
}
116116

117117
@AfterClass

table/src/test/java/tech/ydb/table/integration/BulkUpsertTest.java

Lines changed: 54 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
import org.junit.After;
1414
import org.junit.Assert;
1515
import org.junit.ClassRule;
16+
import org.junit.Ignore;
1617
import org.junit.Test;
1718

1819
import tech.ydb.core.grpc.GrpcReadStream;
@@ -22,12 +23,15 @@
2223
import tech.ydb.table.impl.SimpleTableClient;
2324
import tech.ydb.table.query.BulkUpsertData;
2425
import tech.ydb.table.query.Params;
26+
import tech.ydb.table.query.arrow.ApacheArrowData;
2527
import tech.ydb.table.query.arrow.ApacheArrowWriter;
2628
import tech.ydb.table.result.ResultSetReader;
2729
import tech.ydb.table.rpc.grpc.GrpcTableRpc;
2830
import tech.ydb.table.settings.ExecuteScanQuerySettings;
2931
import tech.ydb.table.settings.ExecuteSchemeQuerySettings;
32+
import tech.ydb.table.utils.Hex;
3033
import tech.ydb.table.values.ListValue;
34+
import tech.ydb.table.values.PrimitiveType;
3135
import tech.ydb.table.values.PrimitiveValue;
3236
import tech.ydb.test.junit4.GrpcTransportRule;
3337

@@ -160,11 +164,59 @@ public void writeProtobufToColumnShardTable() {
160164
Assert.assertEquals(5000, rows2Count);
161165
}
162166

167+
@Test
168+
@Ignore("https://github.com/ydb-platform/ydb/issues/36331")
169+
public void writeApacheArrowEmptyStringToDataShardTest() {
170+
// Create table
171+
TableDescription table = TableDescription.newBuilder()
172+
.addNullableColumn("id", PrimitiveType.Int32)
173+
.addNullableColumn("text", PrimitiveType.Text)
174+
.setPrimaryKey("id")
175+
.build();
176+
177+
createTable(table);
178+
179+
ApacheArrowWriter.Schema schema = ApacheArrowWriter.newSchema()
180+
.addNullableColumn("id", PrimitiveType.Int32)
181+
.addNullableColumn("text", PrimitiveType.Text);
182+
183+
try (BufferAllocator allocator = new RootAllocator()) {
184+
try (ApacheArrowWriter writer = schema.createWriter(allocator)) {
185+
// create batch with estimated size
186+
ApacheArrowWriter.Batch batch = writer.createNewBatch(1);
187+
ApacheArrowWriter.Row row = batch.writeNextRow();
188+
row.writeInt32("id", 1);
189+
row.writeNull("text");
190+
ApacheArrowData data = batch.buildBatch();
191+
System.out.println("SCHEMA = \n" + Hex.toHex(data.getSchema()));
192+
System.out.println("DATA = \n" + Hex.toHex(data.getData()));
193+
bulkUpsert(data);
194+
} catch (IOException ex) {
195+
throw new AssertionError("Cannot serialize apache arrow", ex);
196+
}
197+
}
198+
199+
retryCtx.supplyStatus(session -> {
200+
GrpcReadStream<ResultSetReader> stream = session.executeScanQuery(
201+
"SELECT * FROM `" + TEST_TABLE + "`;",
202+
Params.empty(),
203+
ExecuteScanQuerySettings.newBuilder().build()
204+
);
205+
206+
return stream.start((rs) -> {
207+
Assert.assertTrue(rs.next());
208+
Assert.assertEquals(1, rs.getColumn("id").getInt32());
209+
Assert.assertEquals("", rs.getColumn("text").getText());
210+
Assert.assertFalse(rs.next());
211+
});
212+
}).join().expectSuccess("cannot read table");
213+
}
214+
163215
@Test
164216
public void writeApacheArrowToDataShardTest() {
165217
// Create table
166218
TableDescription table = AllTypesRecord.createTableDescription(false);
167-
retryCtx.supplyStatus(s -> s.createTable(tablePath(), table)).join().expectSuccess("Cannot create table");
219+
createTable(table);
168220

169221
Set<String> columnNames = table.getColumns().stream().map(TableColumn::getName).collect(Collectors.toSet());
170222

@@ -210,7 +262,7 @@ public void writeApacheArrowToDataShardTest() {
210262
public void writeApacheArrowToColumnShardTest() {
211263
// Create table
212264
TableDescription table = AllTypesRecord.createTableDescription(true);
213-
retryCtx.supplyStatus(s -> s.createTable(tablePath(), table)).join().expectSuccess("Cannot create table");
265+
createTable(table);
214266

215267
Set<String> columnNames = table.getColumns().stream().map(TableColumn::getName).collect(Collectors.toSet());
216268

0 commit comments

Comments
 (0)