Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -327,8 +327,7 @@ class VeloxIcebergSuite extends IcebergSuite {
val lastExecId = statusStore.executionsList().last.executionId
val executionMetrics = statusStore.executionMetrics(lastExecId)

// TODO: fix https://github.com/apache/gluten/issues/11510
assert(executionMetrics(metrics("numWrittenFiles").id).toLong == 0)
assert(executionMetrics(metrics("numWrittenFiles").id).toLong == 1)
}
}

Expand Down
6 changes: 2 additions & 4 deletions cpp/velox/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -228,10 +228,8 @@ if(ENABLE_GPU)
memory/GpuBufferColumnarBatch.cc)
endif()

if(ENABLE_ENHANCED_FEATURES)
list(APPEND VELOX_SRCS compute/iceberg/IcebergFormat.cc
compute/iceberg/IcebergWriter.cc)
endif()
list(APPEND VELOX_SRCS compute/iceberg/IcebergFormat.cc
compute/iceberg/IcebergWriter.cc)

if(BUILD_TESTS OR BUILD_BENCHMARKS)
set(BUILD_TEST_UTILS ON)
Expand Down
8 changes: 8 additions & 0 deletions cpp/velox/compute/VeloxBackend.cc
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
#include "velox/connectors/hive/BufferedInputBuilder.h"
#include "velox/connectors/hive/HiveConnector.h"
#include "velox/connectors/hive/HiveDataSource.h"
#include "velox/connectors/hive/iceberg/IcebergConnector.h"
#include "velox/connectors/hive/storage_adapters/abfs/RegisterAbfsFileSystem.h" // @manual
#include "velox/connectors/hive/storage_adapters/gcs/RegisterGcsFileSystem.h" // @manual
#include "velox/connectors/hive/storage_adapters/hdfs/HdfsFileSystem.h"
Expand Down Expand Up @@ -330,6 +331,13 @@ std::shared_ptr<facebook::velox::connector::Connector> VeloxBackend::createDelta
return std::make_shared<delta::DeltaConnector>(connectorId, hiveConnectorConfig_, ioExecutor);
}

std::shared_ptr<facebook::velox::connector::Connector> VeloxBackend::createIcebergConnector(
const std::string& connectorId,
folly::Executor* ioExecutor) const {
return std::make_shared<velox::connector::hive::iceberg::IcebergConnector>(
connectorId, hiveConnectorConfig_, ioExecutor);
}

std::shared_ptr<facebook::velox::connector::Connector> VeloxBackend::createValueStreamConnector(
const std::string& connectorId,
bool dynamicFilterEnabled) const {
Expand Down
4 changes: 4 additions & 0 deletions cpp/velox/compute/VeloxBackend.h
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,10 @@ class VeloxBackend {
const std::string& connectorId,
folly::Executor* ioExecutor) const;

std::shared_ptr<facebook::velox::connector::Connector> createIcebergConnector(
const std::string& connectorId,
folly::Executor* ioExecutor) const;

std::shared_ptr<facebook::velox::connector::Connector> createDeltaConnector(
const std::string& connectorId,
folly::Executor* ioExecutor) const;
Expand Down
2 changes: 2 additions & 0 deletions cpp/velox/compute/VeloxConnectorIds.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,12 @@ namespace gluten {

struct VeloxConnectorIds {
std::string hive;
std::string iceberg;
std::string delta;
std::string iterator;
std::string cudfHive;
bool hiveRegistered{false};
bool icebergRegistered{false};
bool deltaRegistered{false};
bool iteratorRegistered{false};
bool cudfHiveRegistered{false};
Expand Down
15 changes: 13 additions & 2 deletions cpp/velox/compute/VeloxRuntime.cc
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,7 @@ std::string makeScopedConnectorId(const std::string& base, uint64_t runtimeId) {
VeloxConnectorIds makeScopedConnectorIds(uint64_t runtimeId) {
return VeloxConnectorIds{
.hive = makeScopedConnectorId(kHiveConnectorId, runtimeId),
.iceberg = makeScopedConnectorId(kIcebergConnectorId, runtimeId),
.delta = makeScopedConnectorId(delta::DeltaConnectorFactory::kDeltaConnectorName, runtimeId),
.iterator = makeScopedConnectorId(kIteratorConnectorId, runtimeId),
.cudfHive = makeScopedConnectorId(kCudfHiveConnectorId, runtimeId)};
Expand Down Expand Up @@ -273,6 +274,14 @@ void VeloxRuntime::registerConnectors() {
velox::connector::hasConnector(connectorIds_.hive),
"Scoped hive connector not found after registration: " + connectorIds_.hive);

connectorIds_.icebergRegistered =
velox::connector::registerConnector(backend->createIcebergConnector(connectorIds_.iceberg, ioExecutor_.get()));
GLUTEN_CHECK(
connectorIds_.icebergRegistered, "Failed to register scoped Iceberg connector: " + connectorIds_.iceberg);
GLUTEN_CHECK(
velox::connector::hasConnector(connectorIds_.iceberg),
"Scoped Iceberg connector not found after registration: " + connectorIds_.iceberg);

connectorIds_.deltaRegistered =
velox::connector::registerConnector(backend->createDeltaConnector(connectorIds_.delta, ioExecutor_.get()));
GLUTEN_CHECK(connectorIds_.deltaRegistered, "Failed to register scoped delta connector: " + connectorIds_.delta);
Expand Down Expand Up @@ -323,6 +332,10 @@ void VeloxRuntime::unregisterConnectors() {
velox::connector::unregisterConnector(connectorIds_.hive);
connectorIds_.hiveRegistered = false;
}
if (connectorIds_.icebergRegistered) {
velox::connector::unregisterConnector(connectorIds_.iceberg);
connectorIds_.icebergRegistered = false;
}
}

void VeloxRuntime::parsePlan(const uint8_t* data, int32_t size) {
Expand Down Expand Up @@ -501,7 +514,6 @@ std::shared_ptr<RowToColumnarConverter> VeloxRuntime::createRow2ColumnarConverte
return std::make_shared<VeloxRowToColumnarConverter>(cSchema, veloxPool);
}

#ifdef GLUTEN_ENABLE_ENHANCED_FEATURES
std::shared_ptr<IcebergWriter> VeloxRuntime::createIcebergWriter(
RowTypePtr rowType,
int32_t format,
Expand Down Expand Up @@ -529,7 +541,6 @@ std::shared_ptr<IcebergWriter> VeloxRuntime::createIcebergWriter(
veloxPool,
connectorPool);
}
#endif

std::shared_ptr<ShuffleWriter> VeloxRuntime::createShuffleWriter(
int32_t numPartitions,
Expand Down
7 changes: 0 additions & 7 deletions cpp/velox/compute/VeloxRuntime.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,20 +20,15 @@
#include "WholeStageResultIterator.h"
#include "compute/Runtime.h"
#include "compute/VeloxConnectorIds.h"
#ifdef GLUTEN_ENABLE_ENHANCED_FEATURES
#include "iceberg/IcebergWriter.h"
#endif
#include <folly/Executor.h>
#include "memory/VeloxMemoryManager.h"
#include "operators/serializer/VeloxColumnarBatchSerializer.h"
#include "operators/serializer/VeloxColumnarToRowConverter.h"
#include "operators/writer/VeloxParquetDataSource.h"
#include "shuffle/ShuffleReader.h"
#include "shuffle/ShuffleWriter.h"

#ifdef GLUTEN_ENABLE_ENHANCED_FEATURES
#include "IcebergNestedField.pb.h"
#endif

namespace gluten {

Expand Down Expand Up @@ -76,7 +71,6 @@ class VeloxRuntime final : public Runtime {

std::shared_ptr<RowToColumnarConverter> createRow2ColumnarConverter(struct ArrowSchema* cSchema) override;

#ifdef GLUTEN_ENABLE_ENHANCED_FEATURES
std::shared_ptr<IcebergWriter> createIcebergWriter(
RowTypePtr rowType,
int32_t format,
Expand All @@ -88,7 +82,6 @@ class VeloxRuntime final : public Runtime {
std::shared_ptr<const facebook::velox::connector::hive::iceberg::IcebergPartitionSpec> spec,
const gluten::IcebergNestedField& protoField,
const std::unordered_map<std::string, std::string>& sparkConfs);
#endif

std::shared_ptr<ShuffleWriter> createShuffleWriter(
int numPartitions,
Expand Down
3 changes: 2 additions & 1 deletion cpp/velox/compute/WholeStageResultIterator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ WholeStageResultIterator::WholeStageResultIterator(
std::unordered_map<std::string, std::string> customSplitInfo{{"table_format", "hive-iceberg"}};
auto deleteFiles = icebergSplitInfo->deleteFilesVec[idx];
split = std::make_shared<velox::connector::hive::iceberg::HiveIcebergSplit>(
connectorIds_.hive,
connectorIds_.iceberg,
paths[idx],
format,
starts[idx],
Expand Down Expand Up @@ -219,6 +219,7 @@ std::shared_ptr<velox::core::QueryCtx> WholeStageResultIterator::createNewVeloxQ
std::unordered_map<std::string, std::shared_ptr<velox::config::ConfigBase>> connectorConfigs;
auto hiveSessionConfig = createHiveConnectorSessionConfig(veloxCfg_);
connectorConfigs[connectorIds_.hive] = hiveSessionConfig;
connectorConfigs[connectorIds_.iceberg] = hiveSessionConfig;
connectorConfigs[connectorIds_.delta] = hiveSessionConfig;
connectorConfigs[connectorIds_.iterator] = hiveSessionConfig;
#ifdef GLUTEN_ENABLE_GPU
Expand Down
54 changes: 33 additions & 21 deletions cpp/velox/compute/iceberg/IcebergWriter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,10 @@

#include "IcebergWriter.h"

#include "IcebergNestedField.pb.h"
#include "IcebergPartitionSpec.pb.h"
#include "compute/ProtobufUtils.h"
#include "compute/VeloxBackend.h"
#include "compute/iceberg/IcebergFormat.h"
#include "config/VeloxConfig.h"
#include "utils/ConfigExtractor.h"
Expand Down Expand Up @@ -99,9 +101,9 @@ class GlutenIcebergFileNameGenerator : public connector::hive::FileNameGenerator
mutable int32_t fileCount_;
};

iceberg::IcebergNestedField convertToIcebergNestedField(const gluten::IcebergNestedField& protoField) {
IcebergNestedField result;
result.id = protoField.id();
parquet::ParquetFieldId convertToIcebergNestedField(const gluten::IcebergNestedField& protoField) {
parquet::ParquetFieldId result;
result.fieldId = protoField.id();

// Recursively convert children
result.children.reserve(protoField.children_size());
Expand All @@ -121,7 +123,7 @@ std::shared_ptr<IcebergInsertTableHandle> createIcebergInsertTableHandle(
int64_t taskId,
const std::string& operationId,
std::shared_ptr<const IcebergPartitionSpec> spec,
const iceberg::IcebergNestedField& nestedField,
const parquet::ParquetFieldId& nestedField,
facebook::velox::memory::MemoryPool* pool) {
std::vector<std::shared_ptr<const iceberg::IcebergColumnHandle>> columnHandles;

Expand All @@ -139,14 +141,12 @@ std::shared_ptr<IcebergInsertTableHandle> createIcebergInsertTableHandle(
columnNames.at(i),
connector::hive::HiveColumnHandle::ColumnType::kPartitionKey,
columnTypes.at(i),
columnTypes.at(i),
nestedField.children[i]));
} else {
columnHandles.push_back(std::make_shared<iceberg::IcebergColumnHandle>(
columnNames.at(i),
connector::hive::HiveColumnHandle::ColumnType::kRegular,
columnTypes.at(i),
columnTypes.at(i),
nestedField.children[i]));
}
}
Expand All @@ -157,18 +157,9 @@ std::shared_ptr<IcebergInsertTableHandle> createIcebergInsertTableHandle(
std::shared_ptr<const connector::hive::LocationHandle> locationHandle =
std::make_shared<connector::hive::LocationHandle>(
outputDirectoryPath, outputDirectoryPath, connector::hive::LocationHandle::TableType::kExisting);
const std::vector<IcebergSortingColumn> sortedBy;
const std::unordered_map<std::string, std::string> serdeParameters;
return std::make_shared<connector::hive::iceberg::IcebergInsertTableHandle>(
columnHandles,
locationHandle,
spec,
pool,
fileFormat,
sortedBy,
compressionKind,
serdeParameters,
fileNameGenerator);
columnHandles, locationHandle, fileFormat, spec, compressionKind, serdeParameters, fileNameGenerator);
}

} // namespace
Expand Down Expand Up @@ -200,20 +191,36 @@ IcebergWriter::IcebergWriter(
connectorSessionProperties_ = createHiveConnectorSessionConfig(veloxCfg);
connectorConfig_ =
std::make_shared<facebook::velox::connector::hive::HiveConfig>(createHiveConnectorConfig(veloxCfg));
std::unordered_map<std::string, std::shared_ptr<facebook::velox::config::ConfigBase>> connectorConfigs;
connectorConfigs[kHiveConnectorId] = connectorSessionProperties_;
auto queryConfigBase =
std::make_shared<facebook::velox::config::ConfigBase>(std::unordered_map<std::string, std::string>(sparkConfs));
queryCtx_ = facebook::velox::core::QueryCtx::create(
nullptr,
facebook::velox::core::QueryConfig{facebook::velox::core::QueryConfig::ConfigTag{}, queryConfigBase},
connectorConfigs,
nullptr, // cache
pool_,
nullptr, // spillExecutor
"IcebergWriter");

auto expressionEvaluator =
std::make_unique<facebook::velox::exec::SimpleExpressionEvaluator>(queryCtx_.get(), pool_.get());

connectorQueryCtx_ = std::make_unique<connector::ConnectorQueryCtx>(
pool_.get(),
connectorPool_.get(),
connectorSessionProperties_.get(),
nullptr,
common::PrefixSortConfig(),
nullptr,
std::move(expressionEvaluator),
nullptr,
"query.IcebergDataSink",
"task.IcebergDataSink",
"planNodeId.IcebergDataSink",
0,
"");

auto icebergConfig = std::make_shared<facebook::velox::connector::hive::iceberg::IcebergConfig>(veloxCfg);
dataSink_ = std::make_unique<IcebergDataSink>(
rowType_,
createIcebergInsertTableHandle(
Expand All @@ -229,7 +236,9 @@ IcebergWriter::IcebergWriter(
pool_.get()),
connectorQueryCtx_.get(),
facebook::velox::connector::CommitStrategy::kNoCommit,
connectorConfig_);
connectorConfig_,
icebergConfig);
dataSink_.get();
}

void IcebergWriter::write(const VeloxColumnarBatch& batch) {
Expand All @@ -238,10 +247,13 @@ void IcebergWriter::write(const VeloxColumnarBatch& batch) {

if (inputRowType->size() != rowType_->size()) {
const auto& children = inputRowVector->children();

VELOX_CHECK_GE(children.size(), 1 + rowType_->size());

std::vector<VectorPtr> dataColumns(children.begin() + 1, children.begin() + 1 + rowType_->size());

auto filteredRowVector = std::make_shared<RowVector>(
pool_.get(), rowType_, inputRowVector->nulls(), inputRowVector->size(), std::move(dataColumns));
auto filteredRowVector =
std::make_shared<RowVector>(pool_.get(), rowType_, nullptr, inputRowVector->size(), std::move(dataColumns));

dataSink_->appendData(filteredRowVector);
} else {
Expand Down
5 changes: 3 additions & 2 deletions cpp/velox/compute/iceberg/IcebergWriter.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ class IcebergWriter {
int64_t taskId,
const std::string& operationId,
std::shared_ptr<const facebook::velox::connector::hive::iceberg::IcebergPartitionSpec> spec,
const gluten::IcebergNestedField& field,
const IcebergNestedField& field,
const std::unordered_map<std::string, std::string>& sparkConfs,
std::shared_ptr<facebook::velox::memory::MemoryPool> memoryPool,
std::shared_ptr<facebook::velox::memory::MemoryPool> connectorPool);
Expand All @@ -60,7 +60,7 @@ class IcebergWriter {

private:
facebook::velox::RowTypePtr rowType_;
const facebook::velox::connector::hive::iceberg::IcebergNestedField field_;
const facebook::velox::parquet::ParquetFieldId field_;
int32_t partitionId_;
int64_t taskId_;
std::string operationId_;
Expand All @@ -69,6 +69,7 @@ class IcebergWriter {
std::shared_ptr<facebook::velox::connector::hive::HiveConfig> connectorConfig_;
std::shared_ptr<facebook::velox::config::ConfigBase> connectorSessionProperties_;

std::shared_ptr<facebook::velox::core::QueryCtx> queryCtx_;
std::unique_ptr<facebook::velox::connector::ConnectorQueryCtx> connectorQueryCtx_;

std::unique_ptr<facebook::velox::connector::hive::iceberg::IcebergDataSink> dataSink_;
Expand Down
2 changes: 2 additions & 0 deletions cpp/velox/config/VeloxConfig.h
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,8 @@ const std::string kVeloxMemReclaimMaxWaitMs = "spark.gluten.sql.columnar.backend
const uint64_t kVeloxMemReclaimMaxWaitMsDefault = 3600000; // 60min

const std::string kHiveConnectorId = "test-hive";
const std::string kIcebergConnectorId = "test-iceberg";

const std::string kVeloxCacheEnabled = "spark.gluten.sql.columnar.backend.velox.cacheEnabled";

const std::string kExprMaxCompiledRegexes = "spark.gluten.sql.columnar.backend.velox.maxCompiledRegexes";
Expand Down
10 changes: 0 additions & 10 deletions cpp/velox/jni/VeloxJniWrapper.cc
Original file line number Diff line number Diff line change
Expand Up @@ -52,10 +52,6 @@
#include "utils/GpuBufferBatchResizer.h"
#endif

#ifdef GLUTEN_ENABLE_ENHANCED_FEATURES
#include "IcebergNestedField.pb.h"
#endif

using namespace gluten;
using namespace facebook;

Expand Down Expand Up @@ -844,11 +840,7 @@ Java_org_apache_gluten_vectorized_UnifflePartitionWriterJniWrapper_createPartiti
JNIEXPORT jboolean JNICALL Java_org_apache_gluten_config_ConfigJniWrapper_isEnhancedFeaturesEnabled( // NOLINT
JNIEnv* env,
jclass) {
#ifdef GLUTEN_ENABLE_ENHANCED_FEATURES
return true;
#else
return false;
#endif
}

#ifdef GLUTEN_ENABLE_GPU
Expand All @@ -867,7 +859,6 @@ JNIEXPORT jboolean JNICALL Java_org_apache_gluten_cudf_VeloxCudfPlanValidatorJni
}
#endif

#ifdef GLUTEN_ENABLE_ENHANCED_FEATURES
JNIEXPORT jlong JNICALL Java_org_apache_gluten_execution_IcebergWriteJniWrapper_init( // NOLINT
JNIEnv* env,
jobject wrapper,
Expand Down Expand Up @@ -955,7 +946,6 @@ JNIEXPORT jobject JNICALL Java_org_apache_gluten_execution_IcebergWriteJniWrappe

JNI_METHOD_END(nullptr)
}
#endif

JNIEXPORT jlong JNICALL Java_org_apache_gluten_vectorized_HashJoinBuilder_nativeBuild( // NOLINT
JNIEnv* env,
Expand Down
4 changes: 4 additions & 0 deletions cpp/velox/substrait/SubstraitToVeloxPlan.cc
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

#include "TypeUtils.h"
#include "VariantToVectorConverter.h"
#include "compute/iceberg/IcebergPlanConverter.h"
#include "jni/JniHashTable.h"
#include "operators/hashjoin/HashTableBuilder.h"
#include "operators/plannodes/RowVectorStream.h"
Expand Down Expand Up @@ -1574,6 +1575,9 @@ core::PlanNodePtr SubstraitToVeloxPlanConverter::toVeloxPlan(const ::substrait::
connector::ConnectorTableHandlePtr tableHandle;
auto remainingFilter = readRel.has_filter() ? exprConverter_->toVeloxExpr(readRel.filter(), baseSchema) : nullptr;
auto connectorId = connectorIds_.hive;
if (std::dynamic_pointer_cast<IcebergSplitInfo>(splitInfo)) {
connectorId = connectorIds_.iceberg;
}
if (useCudfTableHandle(splitInfos_) && veloxCfg_->get<bool>(kCudfEnableTableScan, kCudfEnableTableScanDefault) &&
veloxCfg_->get<bool>(kCudfEnabled, kCudfEnabledDefault)) {
#ifdef GLUTEN_ENABLE_GPU
Expand Down
Loading
Loading