
feat: Support Spark expression hours#3804

Open
0lai0 wants to merge 6 commits into apache:main from 0lai0:support_spark_hours

Conversation

Contributor

@0lai0 0lai0 commented Mar 27, 2026

Which issue does this PR close?

Closes #3125

Rationale for this change

Comet previously did not support the Spark hours expression (a V2 partition transform).
Queries using the hours function for partitioning would fall back to Spark's JVM execution instead of running natively on DataFusion. By adding native support for this expression, we allow more Spark workloads (especially those partitioned by hourly intervals) to benefit from Comet's native acceleration.

What changes are included in this PR?

This change adds end-to-end native support for the hours partition transform. Since Hours is a PartitionTransformExpression (and not a TimeZoneAwareExpression), the timezone is injected from the session configuration during the planning phase.
The native implementation uses Arrow's unary and try_unary kernels for efficient vectorized computation. The semantics are: hours since the Unix epoch (1970-01-01 00:00:00 UTC), computed by floor-dividing the raw microsecond value by 3_600_000_000. Both TimestampType and TimestampNTZType use the same arithmetic; no session timezone offset is applied, since this transform is always UTC-based.
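That arithmetic can be sketched as a standalone helper (illustrative only; the names here are not the actual UDF code from this PR):

```rust
/// Microseconds in one hour.
const MICROS_PER_HOUR: i64 = 3_600_000_000;

/// Hours since the Unix epoch for a raw microsecond timestamp.
/// div_euclid with a positive divisor is floor division, so pre-epoch
/// (negative) timestamps fall into the correct hourly bucket.
fn hours_since_epoch(micros: i64) -> i32 {
    micros.div_euclid(MICROS_PER_HOUR) as i32
}

fn main() {
    assert_eq!(hours_since_epoch(0), 0);                   // the epoch itself
    assert_eq!(hours_since_epoch(MICROS_PER_HOUR), 1);     // 1970-01-01 01:00:00
    assert_eq!(hours_since_epoch(MICROS_PER_HOUR - 1), 0); // one microsecond earlier
    assert_eq!(hours_since_epoch(-1), -1);                 // just before the epoch
}
```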

  • expr.proto: Added HoursTransform message definition to pass the child expression and session timezone.
  • datetime.scala: Added CometHours serde handler to intercept the Spark Hours expression and read the timezone from SQLConf.
  • QueryPlanSerde.scala: Registered the CometHours handler in the temporal expressions map.
  • hours.rs: Added SparkHoursTransform UDF using efficient Arrow kernels.
  • temporal.rs & expression_registry.rs: Registered the native Builder for the new expression.

How are these changes tested?

Added test coverage in both Rust and Scala:

  1. Rust Unit Tests: Added unit tests in hours.rs covering:
    • Positive and negative (pre-epoch) timestamp handling
    • Epoch boundary (zero)
    • Timezone offset handling
    • Null propagation
    • Proper isolation of TimestampNTZType (ensuring it ignores timezone offsets)
    cargo test -p datafusion-comet-spark-expr -- datetime_funcs::hours
  2. Scala Integration Tests: Added end-to-end execution tests in CometTemporalExpressionSuite.
    ./mvnw test -pl spark -Dsuites='org.apache.comet.CometTemporalExpressionSuite'

Contributor

@parthchandra parthchandra left a comment


The PR text says there is an end-to-end test but there doesn't seem to be any. CometTemporalExpressionSuite is probably the right place to add such a test similar to the "days - timestamp(input)" test.

)?;
let offset_secs =
tz.offset_from_utc_datetime(&dt).fix().local_minus_utc() as i64;
let local_micros = micros + offset_secs * 1_000_000;
Contributor


In Spark's corresponding implementation in InMemoryBaseTable it looks like the session timezone is not being applied.
Can you add a unit test that reads from InMemoryBaseTable and compares with the results produced by Spark ?

Contributor Author

@0lai0 0lai0 Apr 7, 2026


Sure, thanks @parthchandra for review. I’ll correct it, add the missing test, and update it in the next commit.

Contributor Author


PR updated. Thanks!


match args {
[ColumnarValue::Array(array)] => {
let ts_array = as_primitive_array::<TimestampMicrosecondType>(&array);
Contributor


This should be after the match on array.data_type in the DataType::Timestamp(Microsecond, _) arm. This would panic for other types.

Contributor Author


Fixed. Moved the cast inside the DataType::Timestamp(Microsecond, _) arm to prevent panics on unsupported types.
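The pattern behind that fix is worth spelling out: dispatch on the logical type first, and only reinterpret the values inside the arm where that is known to be safe; anything else becomes an error rather than a panic. A type-simplified sketch (this enum is an illustrative stand-in, not Arrow's actual DataType):

```rust
/// Illustrative stand-in for Arrow's DataType; only the shape of the
/// dispatch matters here.
#[derive(Debug)]
enum DataType {
    TimestampMicrosecond,
    Utf8,
}

const MICROS_PER_HOUR: i64 = 3_600_000_000;

fn hours_for(dt: &DataType, raw: i64) -> Result<i32, String> {
    match dt {
        // Only this arm treats the raw value as microseconds; any
        // downcast would live here, after the type check.
        DataType::TimestampMicrosecond => Ok(raw.div_euclid(MICROS_PER_HOUR) as i32),
        // Unsupported types produce an error instead of a panic.
        other => Err(format!("hours: unsupported input type {other:?}")),
    }
}

fn main() {
    assert_eq!(hours_for(&DataType::TimestampMicrosecond, 7_200_000_000), Ok(2));
    assert!(hours_for(&DataType::Utf8, 0).is_err());
}
```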

let result: Int32Array = match array.data_type() {
DataType::Timestamp(Microsecond, _) => {
arrow::compute::kernels::arity::unary(ts_array, |micros| {
micros.div_euclid(MICROS_PER_HOUR) as i32
Contributor


Why div_euclid? Elsewhere the code is generally using div_floor

Contributor Author


Updated to use div_floor to match the rest of the codebase. Thanks for pointing this out!
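For context on why the rounding mode matters at all: Rust's integer / truncates toward zero, while div_euclid with a positive divisor floors toward negative infinity (the same result div_floor produces), so the two only disagree for pre-epoch (negative) timestamps. A quick standalone demonstration:

```rust
fn main() {
    let h = 3_600_000_000_i64; // microseconds per hour
    let t = -1_i64;            // one microsecond before the epoch

    // Truncating division rounds toward zero: wrong bucket (hour 0).
    assert_eq!(t / h, 0);
    // Floor division: the correct bucket (hour -1).
    assert_eq!(t.div_euclid(h), -1);

    // The two agree for non-negative timestamps.
    assert_eq!(7_199_999_999_i64 / h, 1);
    assert_eq!(7_199_999_999_i64.div_euclid(h), 1);
}
```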

binding: Boolean): Option[ExprOuterClass.Expr] = {
val childExpr = exprToProtoInternal(expr.child, inputs, binding)

if (childExpr.isDefined) {
Contributor


It might be better to explicitly check the child expr datatype and only allow valid types, fall back otherwise.
See CometDays below.

Contributor Author


Fixed. Added explicit type checking to only allow TimestampType and TimestampNTZType; other types now fall back to Spark, as CometDays does.

val ts = row.getAs[java.time.LocalDateTime]("ts")
val micros = if (ts != null) {
org.apache.spark.sql.catalyst.util.DateTimeUtils.localDateTimeToMicros(ts)
} else 0L // assuming safe non-null
Contributor


If the timestamp generated by the generator is null, then hours should return null. This will return 0.

Contributor Author


Fixed. It now properly handles null values and returns null instead of 0.
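The contract behind that fix, illustrated here with Rust's Option as a stand-in for a nullable column (the actual test code is Scala): a null input must produce a null output, never a sentinel such as 0, since hour 0 is itself a valid result (the first hour of 1970):

```rust
const MICROS_PER_HOUR: i64 = 3_600_000_000;

/// Null-preserving variant: None in, None out.
fn hours_opt(micros: Option<i64>) -> Option<i32> {
    micros.map(|m| m.div_euclid(MICROS_PER_HOUR) as i32)
}

fn main() {
    assert_eq!(hours_opt(Some(MICROS_PER_HOUR)), Some(1));
    assert_eq!(hours_opt(Some(0)), Some(0)); // hour 0 is a real value...
    assert_eq!(hours_opt(None), None);       // ...so null must stay null
}
```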

Contributor Author

0lai0 commented Apr 10, 2026

Thanks to @parthchandra for the review and feedback. PR has been updated.

Contributor

@parthchandra parthchandra left a comment


lgtm. @0lai0 can you resolve the merge conflicts so this can be merged?

Contributor Author

0lai0 commented Apr 11, 2026

Thanks @parthchandra . Conflicts resolved and PR updated.


Development

Successfully merging this pull request may close these issues.

[Feature] Support Spark expression: hours
