Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
221 changes: 116 additions & 105 deletions native/Cargo.lock

Large diffs are not rendered by default.

8 changes: 4 additions & 4 deletions native/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,10 @@ arrow = { version = "58.1.0", features = ["prettyprint", "ffi", "chrono-tz"] }
async-trait = { version = "0.1" }
bytes = { version = "1.11.1" }
parquet = { version = "58.1.0", default-features = false, features = ["experimental"] }
datafusion = { version = "53.0.0", default-features = false, features = ["unicode_expressions", "crypto_expressions", "nested_expressions", "parquet"] }
datafusion-datasource = { version = "53.0.0" }
datafusion-physical-expr-adapter = { version = "53.0.0" }
datafusion-spark = { version = "53.0.0", features = ["core"] }
datafusion = { git = "https://github.com/mbutrovich/datafusion.git", branch = "dyncomparator", default-features = false, features = ["unicode_expressions", "crypto_expressions", "nested_expressions", "parquet"] }
datafusion-datasource = { git = "https://github.com/mbutrovich/datafusion.git", branch = "dyncomparator" }
datafusion-physical-expr-adapter = { git = "https://github.com/mbutrovich/datafusion.git", branch = "dyncomparator" }
datafusion-spark = { git = "https://github.com/mbutrovich/datafusion.git", branch = "dyncomparator", features = ["core"] }
datafusion-comet-spark-expr = { path = "spark-expr" }
datafusion-comet-common = { path = "common" }
datafusion-comet-jni-bridge = { path = "jni-bridge" }
Expand Down
2 changes: 1 addition & 1 deletion native/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ jni = { version = "0.22.4", features = ["invocation"] }
lazy_static = "1.4"
assertables = "9"
hex = "0.4.3"
datafusion-functions-nested = { version = "53.0.0" }
datafusion-functions-nested = { git = "https://github.com/mbutrovich/datafusion.git", branch = "dyncomparator" }

[features]
backtrace = ["datafusion/backtrace"]
Expand Down
15 changes: 12 additions & 3 deletions native/core/src/execution/operators/expand.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

use arrow::array::{RecordBatch, RecordBatchOptions};
use arrow::datatypes::SchemaRef;
use datafusion::common::tree_node::TreeNodeRecursion;
use datafusion::common::DataFusionError;
use datafusion::physical_expr::{EquivalenceProperties, PhysicalExpr};
use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType};
Expand All @@ -29,7 +30,6 @@ use datafusion::{
};
use futures::{Stream, StreamExt};
use std::{
any::Any,
pin::Pin,
sync::Arc,
task::{Context, Poll},
Expand Down Expand Up @@ -91,8 +91,17 @@ impl DisplayAs for ExpandExec {
}

impl ExecutionPlan for ExpandExec {
fn as_any(&self) -> &dyn Any {
self
/// Applies `f` to every expansion-projection expression held by this operator.
///
/// Iterates each projection list in `self.projections` and visits every
/// `PhysicalExpr` in order, threading the `TreeNodeRecursion` state between
/// visits. Errors from `f` propagate immediately via `?`.
fn apply_expressions(
&self,
f: &mut dyn FnMut(&dyn PhysicalExpr) -> datafusion::common::Result<TreeNodeRecursion>,
) -> datafusion::common::Result<TreeNodeRecursion> {
let mut tnr = TreeNodeRecursion::Continue;
for projection in &self.projections {
for expr in projection {
// `visit_sibling` combines the previous recursion state with the
// next visit per DataFusion's TreeNodeRecursion sibling semantics
// (e.g. a Stop from `f` ends further visiting) — see DataFusion
// tree_node docs for the exact Jump/Stop behavior.
tnr = tnr.visit_sibling(|| f(expr.as_ref()))?;
}
}
Ok(tnr)
}

fn schema(&self) -> SchemaRef {
Expand Down
9 changes: 6 additions & 3 deletions native/core/src/execution/operators/iceberg_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

//! Native Iceberg table scan operator using iceberg-rust

use std::any::Any;
use std::collections::HashMap;
use std::fmt;
use std::pin::Pin;
Expand All @@ -26,6 +25,7 @@ use std::task::{Context, Poll};

use arrow::array::{ArrayRef, RecordBatch, RecordBatchOptions};
use arrow::datatypes::SchemaRef;
use datafusion::common::tree_node::TreeNodeRecursion;
use datafusion::common::{DataFusionError, Result as DFResult};
use datafusion::execution::{RecordBatchStream, SendableRecordBatchStream, TaskContext};
use datafusion::physical_expr::expressions::Column;
Expand Down Expand Up @@ -108,8 +108,11 @@ impl ExecutionPlan for IcebergScanExec {
"IcebergScanExec"
}

fn as_any(&self) -> &dyn Any {
self
/// No expressions are exposed to the visitor here: the callback is ignored
/// and `Continue` is returned so plan-tree traversal proceeds normally.
fn apply_expressions(
&self,
_f: &mut dyn FnMut(&dyn PhysicalExpr) -> DFResult<TreeNodeRecursion>,
) -> DFResult<TreeNodeRecursion> {
Ok(TreeNodeRecursion::Continue)
}

fn schema(&self) -> SchemaRef {
Expand Down
17 changes: 10 additions & 7 deletions native/core/src/execution/operators/parquet_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
//! Parquet writer operator for writing RecordBatches to Parquet files

use std::{
any::Any,
collections::HashMap,
fmt,
fmt::{Debug, Formatter},
Expand All @@ -38,6 +37,7 @@ use crate::parquet::parquet_support::{create_hdfs_operator, prepare_object_store
use arrow::datatypes::{Schema, SchemaRef};
use arrow::record_batch::RecordBatch;
use async_trait::async_trait;
use datafusion::common::tree_node::TreeNodeRecursion;
use datafusion::{
error::{DataFusionError, Result},
execution::context::TaskContext,
Expand All @@ -46,8 +46,8 @@ use datafusion::{
execution_plan::{Boundedness, EmissionType},
metrics::{ExecutionPlanMetricsSet, MetricsSet},
stream::RecordBatchStreamAdapter,
DisplayAs, DisplayFormatType, ExecutionPlan, ExecutionPlanProperties, PlanProperties,
SendableRecordBatchStream,
DisplayAs, DisplayFormatType, ExecutionPlan, ExecutionPlanProperties, PhysicalExpr,
PlanProperties, SendableRecordBatchStream,
},
};
use futures::TryStreamExt;
Expand Down Expand Up @@ -404,14 +404,17 @@ impl DisplayAs for ParquetWriterExec {

#[async_trait]
impl ExecutionPlan for ParquetWriterExec {
fn as_any(&self) -> &dyn Any {
self
}

/// Returns the static name identifying this operator.
fn name(&self) -> &str {
"ParquetWriterExec"
}

/// No expressions are exposed to the visitor here: the callback is ignored
/// and `Continue` is returned so plan-tree traversal proceeds normally.
fn apply_expressions(
&self,
_f: &mut dyn FnMut(&dyn PhysicalExpr) -> Result<TreeNodeRecursion>,
) -> Result<TreeNodeRecursion> {
Ok(TreeNodeRecursion::Continue)
}

fn metrics(&self) -> Option<MetricsSet> {
Some(self.metrics.clone_inner())
}
Expand Down
9 changes: 6 additions & 3 deletions native/core/src/execution/operators/scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ use arrow::compute::{cast_with_options, take, CastOptions};
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use arrow::ffi::FFI_ArrowArray;
use arrow::ffi::FFI_ArrowSchema;
use datafusion::common::tree_node::TreeNodeRecursion;
use datafusion::common::{arrow_datafusion_err, DataFusionError, Result as DataFusionResult};
use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType};
use datafusion::physical_plan::metrics::{
Expand All @@ -43,7 +44,6 @@ use itertools::Itertools;
use jni::objects::{Global, JObject, JValue};
use std::rc::Rc;
use std::{
any::Any,
pin::Pin,
sync::{Arc, Mutex},
task::{Context, Poll},
Expand Down Expand Up @@ -383,8 +383,11 @@ fn schema_from_data_types(data_types: &[DataType]) -> SchemaRef {
}

impl ExecutionPlan for ScanExec {
fn as_any(&self) -> &dyn Any {
self
/// No expressions are exposed to the visitor here: the callback is ignored
/// and `Continue` is returned so plan-tree traversal proceeds normally.
fn apply_expressions(
&self,
_f: &mut dyn FnMut(&dyn PhysicalExpr) -> DataFusionResult<TreeNodeRecursion>,
) -> DataFusionResult<TreeNodeRecursion> {
Ok(TreeNodeRecursion::Continue)
}

fn schema(&self) -> SchemaRef {
Expand Down
9 changes: 6 additions & 3 deletions native/core/src/execution/operators/shuffle_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use crate::{
};
use arrow::array::ArrayRef;
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use datafusion::common::tree_node::TreeNodeRecursion;
use datafusion::common::{arrow_datafusion_err, Result as DataFusionResult};
use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType};
use datafusion::physical_plan::metrics::{
Expand All @@ -37,7 +38,6 @@ use datafusion::{
use futures::Stream;
use jni::objects::{Global, JByteBuffer, JObject};
use std::{
any::Any,
pin::Pin,
sync::{Arc, Mutex},
task::{Context, Poll},
Expand Down Expand Up @@ -221,8 +221,11 @@ fn schema_from_data_types(data_types: &[DataType]) -> SchemaRef {
}

impl ExecutionPlan for ShuffleScanExec {
fn as_any(&self) -> &dyn Any {
self
/// No expressions are exposed to the visitor here: the callback is ignored
/// and `Continue` is returned so plan-tree traversal proceeds normally.
fn apply_expressions(
&self,
_f: &mut dyn FnMut(&dyn PhysicalExpr) -> DataFusionResult<TreeNodeRecursion>,
) -> DataFusionResult<TreeNodeRecursion> {
Ok(TreeNodeRecursion::Continue)
}

fn schema(&self) -> SchemaRef {
Expand Down
2 changes: 1 addition & 1 deletion native/core/src/execution/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1704,7 +1704,7 @@ impl PhysicalPlanner {
hash_join.as_ref().swap_inputs(PartitionMode::Partitioned)?;

let mut additional_native_plans = vec![];
if swapped_hash_join.as_any().is::<ProjectionExec>() {
if swapped_hash_join.is::<ProjectionExec>() {
// a projection was added to the hash join
additional_native_plans.push(Arc::clone(swapped_hash_join.children()[0]));
}
Expand Down
16 changes: 9 additions & 7 deletions native/shuffle/src/shuffle_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@ use crate::partitioners::{
use crate::{CometPartitioning, CompressionCodec};
use async_trait::async_trait;
use datafusion::common::exec_datafusion_err;
use datafusion::physical_expr::{EquivalenceProperties, Partitioning};
use datafusion::common::tree_node::TreeNodeRecursion;
use datafusion::physical_expr::{EquivalenceProperties, Partitioning, PhysicalExpr};
use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType};
use datafusion::physical_plan::EmptyRecordBatchStream;
use datafusion::{
Expand All @@ -41,7 +42,6 @@ use datafusion::{
use datafusion_comet_common::tracing::with_trace_async;
use futures::{StreamExt, TryFutureExt, TryStreamExt};
use std::{
any::Any,
fmt,
fmt::{Debug, Formatter},
sync::Arc,
Expand Down Expand Up @@ -120,15 +120,17 @@ impl DisplayAs for ShuffleWriterExec {

#[async_trait]
impl ExecutionPlan for ShuffleWriterExec {
/// Return a reference to Any that can be used for downcasting
fn as_any(&self) -> &dyn Any {
self
}

/// Returns the static name identifying this operator.
fn name(&self) -> &str {
"ShuffleWriterExec"
}

/// No expressions are exposed to the visitor here: the callback is ignored
/// and `Continue` is returned so plan-tree traversal proceeds normally.
/// NOTE(review): the partitioning in `CometPartitioning` may contain hash
/// expressions — confirm whether they should be visited here.
fn apply_expressions(
&self,
_f: &mut dyn FnMut(&dyn PhysicalExpr) -> Result<TreeNodeRecursion>,
) -> Result<TreeNodeRecursion> {
Ok(TreeNodeRecursion::Continue)
}

fn metrics(&self) -> Option<MetricsSet> {
Some(self.metrics.clone_inner())
}
Expand Down
9 changes: 2 additions & 7 deletions native/spark-expr/src/agg_funcs/avg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use datafusion::logical_expr::{
Accumulator, AggregateUDFImpl, EmitTo, GroupsAccumulator, ReversedUDAF, Signature,
};
use datafusion::physical_expr::expressions::format_state_name;
use std::{any::Any, sync::Arc};
use std::sync::Arc;

use arrow::array::ArrowNativeTypeOp;
use datafusion::logical_expr::function::{AccumulatorArgs, StateFieldsArgs};
Expand Down Expand Up @@ -67,11 +67,6 @@ impl Avg {
}

impl AggregateUDFImpl for Avg {
/// Return a reference to Any that can be used for downcasting
fn as_any(&self) -> &dyn Any {
self
}

fn accumulator(&self, _acc_args: AccumulatorArgs) -> Result<Box<dyn Accumulator>> {
// All numeric types use Float64 accumulation after casting
match (&self.input_data_type, &self.result_data_type) {
Expand Down Expand Up @@ -239,7 +234,7 @@ where
impl<T, F> GroupsAccumulator for AvgGroupsAccumulator<T, F>
where
T: ArrowNumericType + Send,
F: Fn(T::Native, i64) -> Result<T::Native> + Send,
F: Fn(T::Native, i64) -> Result<T::Native> + Send + 'static,
{
fn update_batch(
&mut self,
Expand Down
7 changes: 1 addition & 6 deletions native/spark-expr/src/agg_funcs/avg_decimal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use datafusion::logical_expr::{
Accumulator, AggregateUDFImpl, EmitTo, GroupsAccumulator, ReversedUDAF, Signature,
};
use datafusion::physical_expr::expressions::format_state_name;
use std::{any::Any, sync::Arc};
use std::sync::Arc;

use crate::utils::{build_bool_state, is_valid_decimal_precision, unlikely};
use crate::{decimal_sum_overflow_error, EvalMode, SparkErrorWithContext};
Expand Down Expand Up @@ -108,11 +108,6 @@ impl AvgDecimal {
}

impl AggregateUDFImpl for AvgDecimal {
/// Return a reference to Any that can be used for downcasting
fn as_any(&self) -> &dyn Any {
self
}

fn accumulator(&self, _acc_args: AccumulatorArgs) -> Result<Box<dyn Accumulator>> {
match (&self.sum_data_type, &self.result_data_type) {
(Decimal128(sum_precision, sum_scale), Decimal128(target_precision, target_scale)) => {
Expand Down
7 changes: 1 addition & 6 deletions native/spark-expr/src/agg_funcs/correlation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

use arrow::compute::{and, filter, is_not_null};

use std::{any::Any, sync::Arc};
use std::sync::Arc;

use crate::agg_funcs::covariance::CovarianceAccumulator;
use crate::agg_funcs::stddev::StddevAccumulator;
Expand Down Expand Up @@ -58,11 +58,6 @@ impl Correlation {
}

impl AggregateUDFImpl for Correlation {
/// Return a reference to Any that can be used for downcasting
fn as_any(&self) -> &dyn Any {
self
}

fn name(&self) -> &str {
&self.name
}
Expand Down
6 changes: 0 additions & 6 deletions native/spark-expr/src/agg_funcs/covariance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ use datafusion::logical_expr::type_coercion::aggregates::NUMERICS;
use datafusion::logical_expr::{Accumulator, AggregateUDFImpl, Signature, Volatility};
use datafusion::physical_expr::expressions::format_state_name;
use datafusion::physical_expr::expressions::StatsType;
use std::any::Any;
use std::sync::Arc;

/// COVAR_SAMP and COVAR_POP aggregate expression
Expand Down Expand Up @@ -73,11 +72,6 @@ impl Covariance {
}

impl AggregateUDFImpl for Covariance {
/// Return a reference to Any that can be used for downcasting
fn as_any(&self) -> &dyn Any {
self
}

fn name(&self) -> &str {
&self.name
}
Expand Down
7 changes: 1 addition & 6 deletions native/spark-expr/src/agg_funcs/stddev.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.

use std::{any::Any, sync::Arc};
use std::sync::Arc;

use crate::agg_funcs::variance::VarianceAccumulator;
use arrow::datatypes::FieldRef;
Expand Down Expand Up @@ -78,11 +78,6 @@ impl Stddev {
}

impl AggregateUDFImpl for Stddev {
/// Return a reference to Any that can be used for downcasting
fn as_any(&self) -> &dyn Any {
self
}

fn name(&self) -> &str {
&self.name
}
Expand Down
6 changes: 1 addition & 5 deletions native/spark-expr/src/agg_funcs/sum_decimal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use datafusion::logical_expr::Volatility::Immutable;
use datafusion::logical_expr::{
Accumulator, AggregateUDFImpl, EmitTo, GroupsAccumulator, ReversedUDAF, Signature,
};
use std::{any::Any, sync::Arc};
use std::sync::Arc;

#[derive(Debug)]
pub struct SumDecimal {
Expand Down Expand Up @@ -99,10 +99,6 @@ impl SumDecimal {
}

impl AggregateUDFImpl for SumDecimal {
fn as_any(&self) -> &dyn Any {
self
}

fn accumulator(&self, _args: AccumulatorArgs) -> DFResult<Box<dyn Accumulator>> {
Ok(Box::new(SumDecimalAccumulator::new(
self.precision,
Expand Down
Loading
Loading