120 changes: 67 additions & 53 deletions native/core/src/execution/operators/parquet_writer.rs
@@ -19,16 +19,20 @@

use std::{
any::Any,
collections::HashMap,
fmt,
fmt::{Debug, Formatter},
fs::File,
io::Cursor,
sync::Arc,
};

use opendal::{services::Hdfs, Operator};
use url::Url;
use opendal::Operator;

use crate::execution::shuffle::CompressionCodec;
use crate::parquet::parquet_support::{
create_hdfs_operator, is_hdfs_scheme, prepare_object_store_with_configs,
};
use arrow::datatypes::{Schema, SchemaRef};
use arrow::record_batch::RecordBatch;
use async_trait::async_trait;
@@ -50,8 +54,7 @@ use parquet::{
basic::{Compression, ZstdLevel},
file::properties::WriterProperties,
};

use crate::execution::shuffle::CompressionCodec;
use url::Url;

/// Enum representing different types of Arrow writers based on storage backend
enum ParquetWriter {
@@ -200,6 +203,8 @@ pub struct ParquetWriterExec {
partition_id: i32,
/// Column names to use in the output Parquet file
column_names: Vec<String>,
/// Object store configuration options
object_store_options: HashMap<String, String>,
/// Metrics
metrics: ExecutionPlanMetricsSet,
/// Cache for plan properties
@@ -218,6 +223,7 @@ impl ParquetWriterExec {
compression: CompressionCodec,
partition_id: i32,
column_names: Vec<String>,
object_store_options: HashMap<String, String>,
) -> Result<Self> {
// Preserve the input's partitioning so each partition writes its own file
let input_partitioning = input.output_partitioning().clone();
@@ -238,6 +244,7 @@
compression,
partition_id,
column_names,
object_store_options,
metrics: ExecutionPlanMetricsSet::new(),
cache,
})
@@ -255,10 +262,11 @@ impl ParquetWriterExec {
/// Create an Arrow writer based on the storage scheme
///
/// # Arguments
/// * `storage_scheme` - The storage backend ("hdfs", "s3", or "local")
/// * `output_file_path` - The full path to the output file
/// * `schema` - The Arrow schema for the Parquet file
/// * `props` - Writer properties including compression
/// * `runtime_env` - Runtime environment for object store registration
/// * `object_store_options` - Configuration options for object store
///
/// # Returns
/// * `Ok(ParquetWriter)` - A writer appropriate for the storage scheme
@@ -267,71 +275,61 @@
output_file_path: &str,
schema: SchemaRef,
props: WriterProperties,
runtime_env: Arc<datafusion::execution::runtime_env::RuntimeEnv>,
object_store_options: &HashMap<String, String>,
) -> Result<ParquetWriter> {
// Determine storage scheme from output_file_path
let storage_scheme = if output_file_path.starts_with("hdfs://") {
"hdfs"
} else if output_file_path.starts_with("s3://") || output_file_path.starts_with("s3a://") {
"s3"
} else {
"local"
};
// Parse URL and match on storage scheme directly
let url = Url::parse(output_file_path).map_err(|e| {
DataFusionError::Execution(format!("Failed to parse URL '{}': {}", output_file_path, e))
})?;

match storage_scheme {
"hdfs" => {
// Parse the output_file_path to extract namenode and path
// Expected format: hdfs://namenode:port/path/to/file
let url = Url::parse(output_file_path).map_err(|e| {
if is_hdfs_scheme(&url, object_store_options) {
// HDFS storage
{
// Use prepare_object_store_with_configs to create and register the object store
let (_object_store_url, object_store_path) = prepare_object_store_with_configs(
runtime_env,
output_file_path.to_string(),
object_store_options,
)
.map_err(|e| {
DataFusionError::Execution(format!(
"Failed to parse HDFS URL '{}': {}",
"Failed to prepare object store for '{}': {}",
output_file_path, e
))
})?;

// Extract namenode (scheme + host + port)
let namenode = format!(
"{}://{}{}",
url.scheme(),
url.host_str().unwrap_or("localhost"),
url.port()
.map(|p| format!(":{}", p))
.unwrap_or_else(|| ":9000".to_string())
);

// Extract the path (without the scheme and host)
let hdfs_path = url.path().to_string();

// For remote storage (HDFS, S3), write to an in-memory buffer
let buffer = Vec::new();
let cursor = Cursor::new(buffer);
let arrow_parquet_buffer_writer = ArrowWriter::try_new(cursor, schema, Some(props))
.map_err(|e| {
DataFusionError::Execution(format!(
"Failed to create {} writer: {}",
storage_scheme, e
))
DataFusionError::Execution(format!("Failed to create HDFS writer: {}", e))
})?;

let builder = Hdfs::default().name_node(&namenode);
let op = Operator::new(builder)
.map_err(|e| {
DataFusionError::Execution(format!(
"Failed to create HDFS operator for '{}' (namenode: {}): {}",
output_file_path, namenode, e
))
})?
.finish();
// Create HDFS operator with configuration options using the helper function
let op = create_hdfs_operator(&url).map_err(|e| {
DataFusionError::Execution(format!(
"Failed to create HDFS operator for '{}': {}",
output_file_path, e
))
})?;

// HDFS writer will be created lazily on first write
// Use only the path part for the HDFS writer
// Use the path from prepare_object_store_with_configs
Ok(ParquetWriter::Remote(
arrow_parquet_buffer_writer,
None,
op,
hdfs_path,
object_store_path.to_string(),
))
}
"local" => {
} else if output_file_path.starts_with("file://")
|| output_file_path.starts_with("file:")
|| !output_file_path.contains("://")
{
// Local file system
{
// For a local file system, write directly to file
// Strip file:// or file: prefix if present
let local_path = output_file_path
@@ -368,10 +366,12 @@ impl ParquetWriterExec {
})?;
Ok(ParquetWriter::LocalFile(writer))
}
_ => Err(DataFusionError::Execution(format!(
"Unsupported storage scheme: {}",
storage_scheme
))),
} else {
// Unsupported storage scheme
Err(DataFusionError::Execution(format!(
"Unsupported storage scheme in path: {}",
output_file_path
)))
}
}
}
@@ -435,6 +435,7 @@ impl ExecutionPlan for ParquetWriterExec {
self.compression.clone(),
self.partition_id,
self.column_names.clone(),
self.object_store_options.clone(),
)?)),
_ => Err(DataFusionError::Internal(
"ParquetWriterExec requires exactly one child".to_string(),
@@ -454,6 +455,7 @@ impl ExecutionPlan for ParquetWriterExec {
let bytes_written = MetricBuilder::new(&self.metrics).counter("bytes_written", partition);
let rows_written = MetricBuilder::new(&self.metrics).counter("rows_written", partition);

let runtime_env = context.runtime_env();
let input = self.input.execute(partition, context)?;
let input_schema = self.input.schema();
let work_dir = self.work_dir.clone();
@@ -488,7 +490,14 @@
.set_compression(compression)
.build();

let mut writer = Self::create_arrow_writer(&part_file, Arc::clone(&output_schema), props)?;
let object_store_options = self.object_store_options.clone();
let mut writer = Self::create_arrow_writer(
&part_file,
Arc::clone(&output_schema),
props,
runtime_env,
&object_store_options,
)?;

// Clone schema for use in async closure
let schema_for_write = Arc::clone(&output_schema);
@@ -732,10 +741,14 @@ mod tests {
// Create ParquetWriter using the create_arrow_writer method
// Use full HDFS URL format
let full_output_path = format!("hdfs://namenode:9000{}", output_path);
let session_ctx = datafusion::prelude::SessionContext::new();
let runtime_env = session_ctx.runtime_env();
let mut writer = ParquetWriterExec::create_arrow_writer(
&full_output_path,
create_test_record_batch(1)?.schema(),
props,
runtime_env,
&HashMap::new(),
)?;

// Write 5 batches in a loop
@@ -802,6 +815,7 @@ mod tests {
CompressionCodec::None,
0, // partition_id
column_names,
HashMap::new(), // object_store_options
)?;

// Create a session context and execute the plan
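To make the new routing in `create_arrow_writer` easier to follow: the output path is parsed as a URL, HDFS-like schemes get an in-memory buffered writer that uploads through OpenDAL, plain local paths go straight to a `File`, and anything else is rejected. The sketch below is a simplified stand-in that only needs the `url` crate; the enum, function name, and the hard-coded `hdfs` match are illustrative — the real code consults `is_hdfs_scheme` (which honors the `fs.comet.libhdfs.schemes` option) and also registers the object store with the runtime environment.

```rust
use url::Url;

/// Simplified stand-in for the two writer variants in parquet_writer.rs.
#[derive(Debug, PartialEq)]
enum WriterKind {
    /// Buffered in memory and uploaded through an OpenDAL operator on close.
    RemoteHdfs { path: String },
    /// Written directly via std::fs::File.
    LocalFile { path: String },
}

fn classify_output_path(output_file_path: &str) -> Result<WriterKind, String> {
    // Bare paths and file:/file:// URIs go straight to the local-file writer.
    if output_file_path.starts_with("file://")
        || output_file_path.starts_with("file:")
        || !output_file_path.contains("://")
    {
        let local = output_file_path
            .trim_start_matches("file://")
            .trim_start_matches("file:")
            .to_string();
        return Ok(WriterKind::LocalFile { path: local });
    }

    // Everything else must parse as a URL; the real code asks is_hdfs_scheme
    // instead of matching the "hdfs" scheme literally.
    let url = Url::parse(output_file_path).map_err(|e| e.to_string())?;
    match url.scheme() {
        "hdfs" => Ok(WriterKind::RemoteHdfs {
            path: url.path().to_string(),
        }),
        other => Err(format!("Unsupported storage scheme: {}", other)),
    }
}

fn main() {
    assert_eq!(
        classify_output_path("hdfs://namenode:9000/warehouse/t/part-0.parquet"),
        Ok(WriterKind::RemoteHdfs {
            path: "/warehouse/t/part-0.parquet".to_string()
        })
    );
    assert!(matches!(
        classify_output_path("/tmp/part-0.parquet"),
        Ok(WriterKind::LocalFile { .. })
    ));
    assert!(classify_output_path("ftp://host/x.parquet").is_err());
}
```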
7 changes: 7 additions & 0 deletions native/core/src/execution/planner.rs
@@ -1248,6 +1248,12 @@ impl PhysicalPlanner {
))),
}?;

let object_store_options: HashMap<String, String> = writer
.object_store_options
.iter()
.map(|(k, v)| (k.clone(), v.clone()))
.collect();

let parquet_writer = Arc::new(ParquetWriterExec::try_new(
Arc::clone(&child.native_plan),
writer.output_path.clone(),
@@ -1261,6 +1267,7 @@
codec,
self.partition,
writer.column_names.clone(),
object_store_options,
)?);

Ok((
22 changes: 14 additions & 8 deletions native/core/src/parquet/parquet_support.rs
@@ -358,7 +358,7 @@ fn value_field(entries_field: &FieldRef) -> Option<FieldRef> {
}
}

fn is_hdfs_scheme(url: &Url, object_store_configs: &HashMap<String, String>) -> bool {
pub fn is_hdfs_scheme(url: &Url, object_store_configs: &HashMap<String, String>) -> bool {
const COMET_LIBHDFS_SCHEMES_KEY: &str = "fs.comet.libhdfs.schemes";
let scheme = url.scheme();
if let Some(libhdfs_schemes) = object_store_configs.get(COMET_LIBHDFS_SCHEMES_KEY) {
@@ -387,20 +387,26 @@ fn create_hdfs_object_store(
}
}

// Creates an HDFS object store from a URL using OpenDAL
// Creates an OpenDAL HDFS Operator from a URL with optional configuration
#[cfg(feature = "hdfs-opendal")]
fn create_hdfs_object_store(
url: &Url,
) -> Result<(Box<dyn ObjectStore>, Path), object_store::Error> {
pub(crate) fn create_hdfs_operator(url: &Url) -> Result<opendal::Operator, object_store::Error> {
let name_node = get_name_node_uri(url)?;
let builder = opendal::services::Hdfs::default().name_node(&name_node);

let op = opendal::Operator::new(builder)
opendal::Operator::new(builder)
.map_err(|error| object_store::Error::Generic {
store: "hdfs-opendal",
source: error.into(),
})?
.finish();
})
.map(|op| op.finish())
}

// Creates an HDFS object store from a URL using OpenDAL
#[cfg(feature = "hdfs-opendal")]
pub(crate) fn create_hdfs_object_store(
url: &Url,
) -> Result<(Box<dyn ObjectStore>, Path), object_store::Error> {
let op = create_hdfs_operator(url)?;
let store = object_store_opendal::OpendalStore::new(op);
let path = Path::parse(url.path())?;
Ok((Box::new(store), path))
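The refactor above splits OpenDAL `Operator` construction (`create_hdfs_operator`) out of `create_hdfs_object_store`, so the Parquet writer can reuse the bare operator for uploads while the read path keeps the `ObjectStore` wrapper. A rough usage sketch, assuming opendal with its HDFS service enabled and libhdfs available at runtime; `upload_buffer` is a made-up helper, and the exact `write` signature varies slightly across opendal versions.

```rust
use opendal::{services::Hdfs, Operator};

/// Illustrative only: build an HDFS operator (roughly what create_hdfs_operator
/// does) and push an in-memory buffer to a path, mirroring how the Remote
/// writer in parquet_writer.rs flushes its Cursor-backed Parquet bytes.
async fn upload_buffer(namenode: &str, path: &str, buffer: Vec<u8>) -> opendal::Result<()> {
    let builder = Hdfs::default().name_node(namenode);
    let op = Operator::new(builder)?.finish();
    op.write(path, buffer).await?;
    Ok(())
}
```

Keeping operator creation in one place also means any future HDFS configuration options only need to be wired through `create_hdfs_operator`.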
7 changes: 7 additions & 0 deletions native/proto/src/proto/operator.proto
@@ -245,6 +245,13 @@ message ParquetWriter {
optional string job_id = 6;
// Task attempt ID for this specific task
optional int32 task_attempt_id = 7;
// Options for configuring object stores such as AWS S3, GCS, etc. The key-value pairs are taken
// from Hadoop configuration for compatibility with Hadoop FileSystem implementations of object
// stores.
// The configuration values have hadoop. or spark.hadoop. prefix trimmed. For instance, the
// configuration value "spark.hadoop.fs.s3a.access.key" will be stored as "fs.s3a.access.key" in
// the map.
map<string, string> object_store_options = 8;
}

enum AggregateMode {
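The key-normalization convention described in the `object_store_options` comment above boils down to stripping a `hadoop.` or `spark.hadoop.` prefix before an entry is stored in the map. A minimal illustration; the real trimming happens on the JVM side in `NativeConfig`, and `normalize_conf_key` here is only a hypothetical stand-in.

```rust
/// Hypothetical helper showing the key-naming convention only.
fn normalize_conf_key(key: &str) -> &str {
    key.strip_prefix("spark.hadoop.")
        .or_else(|| key.strip_prefix("hadoop."))
        .unwrap_or(key)
}

fn main() {
    assert_eq!(
        normalize_conf_key("spark.hadoop.fs.s3a.access.key"),
        "fs.s3a.access.key"
    );
    assert_eq!(normalize_conf_key("hadoop.fs.s3a.endpoint"), "fs.s3a.endpoint");
    assert_eq!(normalize_conf_key("fs.s3a.secret.key"), "fs.s3a.secret.key");
}
```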
@@ -19,6 +19,7 @@

package org.apache.comet.serde.operator

import java.net.URI
import java.util.Locale

import scala.jdk.CollectionConverters._
@@ -32,6 +33,7 @@ import org.apache.spark.sql.internal.SQLConf

import org.apache.comet.{CometConf, ConfigEntry, DataTypeSupport}
import org.apache.comet.CometSparkSessionExtensions.withInfo
import org.apache.comet.objectstore.NativeConfig
import org.apache.comet.serde.{CometOperatorSerde, Incompatible, OperatorOuterClass, SupportLevel, Unsupported}
import org.apache.comet.serde.OperatorOuterClass.Operator
import org.apache.comet.serde.QueryPlanSerde.serializeDataType
@@ -126,14 +128,24 @@ object CometDataWritingCommand extends CometOperatorSerde[DataWritingCommandExec
return None
}

val writerOp = OperatorOuterClass.ParquetWriter
val writerOpBuilder = OperatorOuterClass.ParquetWriter
.newBuilder()
.setOutputPath(outputPath)
.setCompression(codec)
.addAllColumnNames(cmd.query.output.map(_.name).asJava)
// Note: work_dir, job_id, and task_attempt_id will be set at execution time
// in CometNativeWriteExec, as they depend on the Spark task context
.build()
// Note: work_dir, job_id, and task_attempt_id will be set at execution time
// in CometNativeWriteExec, as they depend on the Spark task context

// Collect S3/cloud storage configurations
val session = op.session
val hadoopConf = session.sessionState.newHadoopConfWithOptions(cmd.options)
val objectStoreOptions =
NativeConfig.extractObjectStoreOptions(hadoopConf, URI.create(outputPath))
objectStoreOptions.foreach { case (key, value) =>
writerOpBuilder.putObjectStoreOptions(key, value)
}

val writerOp = writerOpBuilder.build()

val writerOperator = Operator
.newBuilder()
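Putting the pieces together: the Scala side above extracts the relevant Hadoop options for the output URI via `NativeConfig.extractObjectStoreOptions` and ships them in the `ParquetWriter` message, and they arrive on the native side as the `object_store_options` map passed into `create_arrow_writer`. A sketch of what such a map might contain — the keys are standard Hadoop/Comet option names, but the specific entries and values are placeholders:

```rust
use std::collections::HashMap;

/// Example contents only; real values come from the Spark/Hadoop configuration.
fn example_object_store_options() -> HashMap<String, String> {
    HashMap::from([
        // S3A credentials and endpoint for an s3a:// output path.
        ("fs.s3a.access.key".to_string(), "<access-key>".to_string()),
        ("fs.s3a.secret.key".to_string(), "<secret-key>".to_string()),
        ("fs.s3a.endpoint".to_string(), "http://localhost:9000".to_string()),
        // Schemes that the native side should treat as HDFS-backed
        // (consulted by is_hdfs_scheme).
        ("fs.comet.libhdfs.schemes".to_string(), "hdfs,viewfs".to_string()),
    ])
}
```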