Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .github/workflows/pr_build_linux.yml
Original file line number Diff line number Diff line change
Expand Up @@ -340,7 +340,10 @@ jobs:
org.apache.comet.exec.CometNativeReaderSuite
org.apache.comet.CometIcebergNativeSuite
org.apache.comet.CometIcebergRewriteActionSuite
org.apache.comet.CometIcebergWriteActionSuite
org.apache.comet.CometIcebergWriteDetectionSuite
org.apache.comet.iceberg.IcebergReflectionSuite
org.apache.comet.serde.operator.IcebergWriteProtoTranslationSuite
org.apache.comet.csv.CometCsvNativeReadSuite
org.apache.comet.CometFuzzTestSuite
org.apache.comet.CometFuzzIcebergSuite
Expand Down
3 changes: 3 additions & 0 deletions .github/workflows/pr_build_macos.yml
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,10 @@ jobs:
org.apache.comet.exec.CometNativeReaderSuite
org.apache.comet.CometIcebergNativeSuite
org.apache.comet.CometIcebergRewriteActionSuite
org.apache.comet.CometIcebergWriteActionSuite
org.apache.comet.CometIcebergWriteDetectionSuite
org.apache.comet.iceberg.IcebergReflectionSuite
org.apache.comet.serde.operator.IcebergWriteProtoTranslationSuite
org.apache.comet.csv.CometCsvNativeReadSuite
org.apache.comet.CometFuzzTestSuite
org.apache.comet.CometFuzzIcebergSuite
Expand Down
300 changes: 300 additions & 0 deletions docs/source/user-guide/latest/iceberg-writes.md

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions docs/source/user-guide/latest/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ to read more.
:hidden:

Iceberg Guide <iceberg>
Iceberg Writes <iceberg-writes>
Kubernetes Guide <kubernetes>

.. toctree::
Expand Down
1 change: 1 addition & 0 deletions native/core/src/execution/jni_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,7 @@ fn op_name(op: &OpStruct) -> &'static str {
OpStruct::Window(_) => "Window",
OpStruct::NativeScan(_) => "NativeScan",
OpStruct::IcebergScan(_) => "IcebergScan",
OpStruct::IcebergWrite(_) => "IcebergWrite",
OpStruct::ParquetWriter(_) => "ParquetWriter",
OpStruct::Explode(_) => "Explode",
OpStruct::CsvScan(_) => "CsvScan",
Expand Down
63 changes: 63 additions & 0 deletions native/core/src/execution/operators/iceberg_common.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! Helpers shared between the Iceberg scan and Iceberg write operators.

use std::collections::HashMap;
use std::sync::Arc;

use datafusion::common::DataFusionError;
use iceberg::io::{FileIO, FileIOBuilder, StorageFactory};
use iceberg_storage_opendal::OpenDalStorageFactory;

/// Pick an OpenDAL storage backend from a URI's scheme. `file` (or no scheme) falls through to
/// the local file system. `memory` is used by the write path to assemble manifest bytes that
/// stay entirely in-process.
pub(crate) fn storage_factory_for(path: &str) -> Result<Arc<dyn StorageFactory>, DataFusionError> {
let scheme = if path.contains("://") {
path.split("://").next().unwrap_or("file")
} else {
"file"
};
match scheme {
"file" => Ok(Arc::new(OpenDalStorageFactory::Fs)),
"memory" => Ok(Arc::new(OpenDalStorageFactory::Memory)),
"s3" | "s3a" => Ok(Arc::new(OpenDalStorageFactory::S3 {
customized_credential_load: None,
})),
"gs" => Ok(Arc::new(OpenDalStorageFactory::Gcs)),
"oss" => Ok(Arc::new(OpenDalStorageFactory::Oss)),
_ => Err(DataFusionError::Execution(format!(
"Unsupported storage scheme: {scheme}"
))),
}
}

/// Build a `FileIO` whose storage scheme is inferred from `reference_path` and whose properties
/// come from the catalog. The reference path is the metadata location for reads or the data
/// location for writes — anything that carries the right URI scheme.
pub(crate) fn load_file_io(
catalog_properties: &HashMap<String, String>,
reference_path: &str,
) -> Result<FileIO, DataFusionError> {
let factory = storage_factory_for(reference_path)?;
let mut file_io_builder = FileIOBuilder::new(factory);
for (key, value) in catalog_properties {
file_io_builder = file_io_builder.with_prop(key, value);
}
Ok(file_io_builder.build())
}
38 changes: 2 additions & 36 deletions native/core/src/execution/operators/iceberg_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,8 @@ use datafusion::physical_plan::{
};
use futures::{Stream, StreamExt, TryStreamExt};
use iceberg::arrow::ScanMetrics;
use iceberg::io::{FileIO, FileIOBuilder, StorageFactory};
use iceberg_storage_opendal::OpenDalStorageFactory;

use crate::execution::operators::iceberg_common::load_file_io;
use crate::execution::operators::ExecutionError;
use crate::parquet::parquet_support::SparkParquetOptions;
use crate::parquet::schema_adapter::SparkPhysicalExprAdapterFactory;
Expand Down Expand Up @@ -154,7 +153,7 @@ impl IcebergScanExec {
context: Arc<TaskContext>,
) -> DFResult<SendableRecordBatchStream> {
let output_schema = Arc::clone(&self.output_schema);
let file_io = Self::load_file_io(&self.catalog_properties, &self.metadata_location)?;
let file_io = load_file_io(&self.catalog_properties, &self.metadata_location)?;
let batch_size = context.session_config().batch_size();

let metrics = IcebergScanMetrics::new(&self.metrics);
Expand Down Expand Up @@ -198,39 +197,6 @@ impl IcebergScanExec {

Ok(Box::pin(wrapped_stream))
}

fn storage_factory_for(path: &str) -> Result<Arc<dyn StorageFactory>, DataFusionError> {
let scheme = if path.contains("://") {
path.split("://").next().unwrap_or("file")
} else {
"file"
};
match scheme {
"file" => Ok(Arc::new(OpenDalStorageFactory::Fs)),
"s3" | "s3a" => Ok(Arc::new(OpenDalStorageFactory::S3 {
customized_credential_load: None,
})),
"gs" => Ok(Arc::new(OpenDalStorageFactory::Gcs)),
"oss" => Ok(Arc::new(OpenDalStorageFactory::Oss)),
_ => Err(DataFusionError::Execution(format!(
"Unsupported storage scheme: {scheme}"
))),
}
}

fn load_file_io(
catalog_properties: &HashMap<String, String>,
metadata_location: &str,
) -> Result<FileIO, DataFusionError> {
let factory = Self::storage_factory_for(metadata_location)?;
let mut file_io_builder = FileIOBuilder::new(factory);

for (key, value) in catalog_properties {
file_io_builder = file_io_builder.with_prop(key, value);
}

Ok(file_io_builder.build())
}
}

/// Metrics for IcebergScanExec
Expand Down
Loading