Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

import io.grpc.ExperimentalApi;
import org.apache.arrow.compression.CommonsCompressionFactory;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.vector.VectorLoader;
import org.apache.arrow.vector.VectorSchemaRoot;

Expand All @@ -12,7 +12,7 @@
*/
@ExperimentalApi("ApacheArrow support is experimental and API may change without notice")
public abstract class ApacheArrowCompressedPartsHandler extends ApacheArrowPartsHandler {
public ApacheArrowCompressedPartsHandler(RootAllocator allocator) {
public ApacheArrowCompressedPartsHandler(BufferAllocator allocator) {
super(allocator);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

import com.google.protobuf.ByteString;
import io.grpc.ExperimentalApi;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.vector.VectorLoader;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.ipc.ReadChannel;
Expand All @@ -24,9 +24,9 @@
*/
@ExperimentalApi("ApacheArrow support is experimental and API may change without notice")
public abstract class ApacheArrowPartsHandler implements QueryStream.PartsHandler {
private final RootAllocator allocator;
private final BufferAllocator allocator;

public ApacheArrowPartsHandler(RootAllocator allocator) {
public ApacheArrowPartsHandler(BufferAllocator allocator) {
this.allocator = allocator;
}

Expand Down
4 changes: 2 additions & 2 deletions query/src/test/java/tech/ydb/query/impl/ApacheArrowTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -110,8 +110,8 @@ public static void initTables() {
allocator = new RootAllocator();
client = QueryClient.newClient(YDB).build();

initTable(tablePath(ROW_TABLE_NAME), AllTypesRecord.createTableDescription(false), ROW_BATCH);
initTable(tablePath(COLUMN_TABLE_NAME), AllTypesRecord.createTableDescription(true), COLUMN_BATCH);
initTable(tablePath(ROW_TABLE_NAME), ROW_TABLE, ROW_BATCH);
initTable(tablePath(COLUMN_TABLE_NAME), COLUMN_TABLE, COLUMN_BATCH);
}

@AfterClass
Expand Down
53 changes: 51 additions & 2 deletions table/src/test/java/tech/ydb/table/integration/BulkUpsertTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import org.junit.After;
import org.junit.Assert;
import org.junit.ClassRule;
import org.junit.Ignore;
import org.junit.Test;

import tech.ydb.core.grpc.GrpcReadStream;
Expand All @@ -22,12 +23,14 @@
import tech.ydb.table.impl.SimpleTableClient;
import tech.ydb.table.query.BulkUpsertData;
import tech.ydb.table.query.Params;
import tech.ydb.table.query.arrow.ApacheArrowData;
import tech.ydb.table.query.arrow.ApacheArrowWriter;
import tech.ydb.table.result.ResultSetReader;
import tech.ydb.table.rpc.grpc.GrpcTableRpc;
import tech.ydb.table.settings.ExecuteScanQuerySettings;
import tech.ydb.table.settings.ExecuteSchemeQuerySettings;
import tech.ydb.table.values.ListValue;
import tech.ydb.table.values.PrimitiveType;
import tech.ydb.table.values.PrimitiveValue;
import tech.ydb.test.junit4.GrpcTransportRule;

Expand Down Expand Up @@ -160,11 +163,57 @@ public void writeProtobufToColumnShardTable() {
Assert.assertEquals(5000, rows2Count);
}

@Test
@Ignore("https://github.com/ydb-platform/ydb/issues/36331")
public void writeApacheArrowEmptyStringToDataShardTest() {
// Create table
TableDescription table = TableDescription.newBuilder()
.addNullableColumn("id", PrimitiveType.Int32)
.addNullableColumn("text", PrimitiveType.Text)
.setPrimaryKey("id")
.build();

createTable(table);

ApacheArrowWriter.Schema schema = ApacheArrowWriter.newSchema()
.addNullableColumn("id", PrimitiveType.Int32)
.addNullableColumn("text", PrimitiveType.Text);

try (BufferAllocator allocator = new RootAllocator()) {
try (ApacheArrowWriter writer = schema.createWriter(allocator)) {
// create batch with estimated size
ApacheArrowWriter.Batch batch = writer.createNewBatch(1);
ApacheArrowWriter.Row row = batch.writeNextRow();
row.writeInt32("id", 1);
row.writeText("text", "");
ApacheArrowData data = batch.buildBatch();
bulkUpsert(data);
} catch (IOException ex) {
throw new AssertionError("Cannot serialize apache arrow", ex);
}
}

retryCtx.supplyStatus(session -> {
GrpcReadStream<ResultSetReader> stream = session.executeScanQuery(
"SELECT * FROM `" + TEST_TABLE + "`;",
Params.empty(),
ExecuteScanQuerySettings.newBuilder().build()
);

return stream.start((rs) -> {
Assert.assertTrue(rs.next());
Assert.assertEquals(1, rs.getColumn("id").getInt32());
Assert.assertEquals("", rs.getColumn("text").getText());
Assert.assertFalse(rs.next());
});
}).join().expectSuccess("cannot read table");
}

@Test
public void writeApacheArrowToDataShardTest() {
// Create table
TableDescription table = AllTypesRecord.createTableDescription(false);
retryCtx.supplyStatus(s -> s.createTable(tablePath(), table)).join().expectSuccess("Cannot create table");
createTable(table);

Set<String> columnNames = table.getColumns().stream().map(TableColumn::getName).collect(Collectors.toSet());

Expand Down Expand Up @@ -210,7 +259,7 @@ public void writeApacheArrowToDataShardTest() {
public void writeApacheArrowToColumnShardTest() {
// Create table
TableDescription table = AllTypesRecord.createTableDescription(true);
retryCtx.supplyStatus(s -> s.createTable(tablePath(), table)).join().expectSuccess("Cannot create table");
createTable(table);

Set<String> columnNames = table.getColumns().stream().map(TableColumn::getName).collect(Collectors.toSet());

Expand Down
Loading