Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions common/src/main/scala/org/apache/comet/CometConf.scala
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,17 @@ object CometConf extends ShimCometConf {
.booleanConf
.createWithDefault(false)

// Config entry controlling how many Iceberg data files a single native scan task reads
// concurrently. Serialized to the native side via IcebergScanCommon.data_file_concurrency_limit
// and passed to iceberg-rust's ArrowReaderBuilder::with_data_file_concurrency_limit.
// Default of 1 preserves the previous sequential per-task read behavior; values <= 0 are
// rejected by checkValue at set time.
val COMET_ICEBERG_DATA_FILE_CONCURRENCY_LIMIT: ConfigEntry[Int] =
conf("spark.comet.scan.icebergNative.dataFileConcurrencyLimit")
.category(CATEGORY_SCAN)
.doc(
"The number of Iceberg data files to read concurrently within a single task. " +
"Higher values improve throughput for tables with many small files by overlapping " +
"I/O latency, but increase memory usage. Values between 2 and 8 are suggested.")
.intConf
// Guard against nonsensical values; 0 or negative would disable reading entirely.
.checkValue(v => v > 0, "Data file concurrency limit must be positive")
.createWithDefault(1)

val COMET_CSV_V2_NATIVE_ENABLED: ConfigEntry[Boolean] =
conf("spark.comet.scan.csv.v2.enabled")
.category(CATEGORY_TESTING)
Expand Down
6 changes: 5 additions & 1 deletion native/core/src/execution/operators/iceberg_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ pub struct IcebergScanExec {
catalog_properties: HashMap<String, String>,
/// Pre-planned file scan tasks
tasks: Vec<FileScanTask>,
/// Number of data files to read concurrently
data_file_concurrency_limit: usize,
/// Metrics
metrics: ExecutionPlanMetricsSet,
}
Expand All @@ -71,6 +73,7 @@ impl IcebergScanExec {
schema: SchemaRef,
catalog_properties: HashMap<String, String>,
tasks: Vec<FileScanTask>,
data_file_concurrency_limit: usize,
) -> Result<Self, ExecutionError> {
let output_schema = schema;
let plan_properties = Self::compute_properties(Arc::clone(&output_schema), 1);
Expand All @@ -83,6 +86,7 @@ impl IcebergScanExec {
plan_properties,
catalog_properties,
tasks,
data_file_concurrency_limit,
metrics,
})
}
Expand Down Expand Up @@ -158,7 +162,7 @@ impl IcebergScanExec {

let reader = iceberg::arrow::ArrowReaderBuilder::new(file_io)
.with_batch_size(batch_size)
.with_data_file_concurrency_limit(context.session_config().target_partitions())
.with_data_file_concurrency_limit(self.data_file_concurrency_limit)
.with_row_selection_enabled(true)
.build();

Expand Down
2 changes: 2 additions & 0 deletions native/core/src/execution/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1176,12 +1176,14 @@ impl PhysicalPlanner {
.collect();
let metadata_location = common.metadata_location.clone();
let tasks = parse_file_scan_tasks_from_common(common, &scan.file_scan_tasks)?;
let data_file_concurrency_limit = common.data_file_concurrency_limit as usize;

let iceberg_scan = IcebergScanExec::new(
metadata_location,
required_schema,
catalog_properties,
tasks,
data_file_concurrency_limit,
)?;

Ok((
Expand Down
3 changes: 3 additions & 0 deletions native/proto/src/proto/operator.proto
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,9 @@ message IcebergScanCommon {
repeated PartitionData partition_data_pool = 9;
repeated DeleteFileList delete_files_pool = 10;
repeated spark.spark_expression.Expr residual_pool = 11;

// Number of data files to read concurrently within a single task
uint32 data_file_concurrency_limit = 12;
}

message IcebergScan {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import org.apache.spark.sql.comet.{CometBatchScanExec, CometNativeExec}
import org.apache.spark.sql.execution.datasources.v2.{BatchScanExec, DataSourceRDD, DataSourceRDDPartition}
import org.apache.spark.sql.types._

import org.apache.comet.ConfigEntry
import org.apache.comet.{CometConf, ConfigEntry}
import org.apache.comet.iceberg.{CometIcebergNativeScanMetadata, IcebergReflection}
import org.apache.comet.serde.{CometOperatorSerde, OperatorOuterClass}
import org.apache.comet.serde.ExprOuterClass.Expr
Expand Down Expand Up @@ -757,6 +757,8 @@ object CometIcebergNativeScan extends CometOperatorSerde[CometBatchScanExec] wit
var totalTasks = 0

commonBuilder.setMetadataLocation(metadata.metadataLocation)
commonBuilder.setDataFileConcurrencyLimit(
CometConf.COMET_ICEBERG_DATA_FILE_CONCURRENCY_LIMIT.get())
metadata.catalogProperties.foreach { case (key, value) =>
commonBuilder.putCatalogProperties(key, value)
}
Expand Down
Loading