Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 29 additions & 0 deletions docs/source/contributor-guide/spark_expressions_support.md
Original file line number Diff line number Diff line change
Expand Up @@ -315,12 +315,41 @@
### hash_funcs

- [x] crc32
- Spark 3.4.3 (audited 2026-05-27): identical to 3.5.8.
- Spark 3.5.8 (audited 2026-05-27): baseline. `Crc32(child) extends UnaryExpression`; `inputTypes = Seq(BinaryType) -> LongType`. Wired as `CometScalarFunction("crc32")`.
- Spark 4.0.1 (audited 2026-05-27): semantics unchanged; `NullIntolerant` trait replaced by `nullIntolerant: Boolean` override.
- Spark 4.1.1 (audited 2026-05-27): identical to 4.0.1.
- [x] hash
- Spark 3.4.3 (audited 2026-05-27): identical to 3.5.8.
- Spark 3.5.8 (audited 2026-05-27): baseline. `Murmur3Hash(children, seed) extends HashExpression[Int]`; produces a Murmur3 hash with a configurable Int seed and `IntegerType` result. Comet routes via `CometMurmur3Hash` to the native `murmur3_hash` UDF.
- Spark 4.0.1 (audited 2026-05-27): semantics unchanged; some inner helper refactors only.
- Spark 4.1.1 (audited 2026-05-27): identical to 4.0.1.
- Known limitation: `DecimalType` children with precision > 18 fall back because Spark hashes them through Java `BigDecimal`; `TimeType` (Spark 4.0+) is also unsupported. The same limitations apply to `xxhash64`, `sha1`, `sha2` through the shared `HashUtils`.
- [x] md5
- Spark 3.4.3 (audited 2026-05-27): identical to 3.5.8.
- Spark 3.5.8 (audited 2026-05-27): baseline. `Md5(child) extends UnaryExpression`; `inputTypes = Seq(BinaryType) -> StringType`. Wired as `CometScalarFunction("md5")`.
- Spark 4.0.1 (audited 2026-05-27): semantics unchanged; trait set gains `DefaultStringProducingExpression` and the `nullIntolerant: Boolean` refactor.
- Spark 4.1.1 (audited 2026-05-27): identical to 4.0.1.
- [x] sha
- Spark 3.4.3 (audited 2026-05-27): registry alias of `Sha1`. Same support as `sha1`.
- Spark 3.5.8 (audited 2026-05-27): identical to 3.4.3.
- Spark 4.0.1 (audited 2026-05-27): identical to 3.4.3.
- Spark 4.1.1 (audited 2026-05-27): identical to 3.4.3.
- [x] sha1
- Spark 3.4.3 (audited 2026-05-27): identical to 3.5.8.
- Spark 3.5.8 (audited 2026-05-27): baseline. `Sha1(child) extends UnaryExpression with NullIntolerant`; `inputTypes = Seq(BinaryType) -> StringType`. Comet routes via `CometSha1` to the native `sha1` UDF.
- Spark 4.0.1 (audited 2026-05-27): trait set gains `DefaultStringProducingExpression` and `NullIntolerant` is replaced by `nullIntolerant: Boolean`. Runtime unchanged.
- Spark 4.1.1 (audited 2026-05-27): identical to 4.0.1.
- [x] sha2
- Spark 3.4.3 (audited 2026-05-27): identical to 3.5.8.
- Spark 3.5.8 (audited 2026-05-27): baseline. `Sha2(left, right) extends BinaryExpression`; `inputTypes = Seq(BinaryType, IntegerType) -> StringType`. The `numBits` argument selects SHA-224/256/384/512 (0 is treated as 256); other values return NULL. Comet routes via `CometSha2` to the native `sha2` UDF; non-foldable `numBits` falls back to Spark.
- Spark 4.0.1 (audited 2026-05-27): trait set gains `DefaultStringProducingExpression` and the `nullIntolerant: Boolean` refactor. Runtime unchanged.
- Spark 4.1.1 (audited 2026-05-27): identical to 4.0.1.
- [x] xxhash64
- Spark 3.4.3 (audited 2026-05-27): identical to 3.5.8.
- Spark 3.5.8 (audited 2026-05-27): baseline. `XxHash64(children, seed) extends HashExpression[Long]`; produces an xxHash64 hash with a configurable Long seed and `LongType` result. Comet routes via `CometXxHash64` to the native `xxhash64` UDF.
- Spark 4.0.1 (audited 2026-05-27): semantics unchanged.
- Spark 4.1.1 (audited 2026-05-27): identical to 4.0.1.

### json_funcs

Expand Down
109 changes: 59 additions & 50 deletions spark/src/main/scala/org/apache/comet/serde/hash.scala
Original file line number Diff line number Diff line change
Expand Up @@ -22,17 +22,19 @@ package org.apache.comet.serde
import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, Murmur3Hash, Sha1, Sha2, XxHash64}
import org.apache.spark.sql.types.{ArrayType, DataType, DecimalType, IntegerType, LongType, MapType, StringType, StructType}

import org.apache.comet.CometSparkSessionExtensions.withInfo
import org.apache.comet.serde.QueryPlanSerde.{exprToProtoInternal, isTimeType, scalarFunctionExprToProtoWithReturnType, serializeDataType, supportedDataType}

object CometXxHash64 extends CometExpressionSerde[XxHash64] {

override def getUnsupportedReasons(): Seq[String] = HashUtils.unsupportedReasons

override def getSupportLevel(expr: XxHash64): SupportLevel =
HashUtils.supportLevelForChildren(expr)

override def convert(
expr: XxHash64,
inputs: Seq[Attribute],
binding: Boolean): Option[ExprOuterClass.Expr] = {
if (!HashUtils.isSupportedType(expr)) {
return None
}
val exprs = expr.children.map(exprToProtoInternal(_, inputs, binding))
val seedBuilder = LiteralOuterClass.Literal
.newBuilder()
Expand All @@ -45,13 +47,16 @@ object CometXxHash64 extends CometExpressionSerde[XxHash64] {
}

object CometMurmur3Hash extends CometExpressionSerde[Murmur3Hash] {

override def getUnsupportedReasons(): Seq[String] = HashUtils.unsupportedReasons

override def getSupportLevel(expr: Murmur3Hash): SupportLevel =
HashUtils.supportLevelForChildren(expr)

override def convert(
expr: Murmur3Hash,
inputs: Seq[Attribute],
binding: Boolean): Option[ExprOuterClass.Expr] = {
if (!HashUtils.isSupportedType(expr)) {
return None
}
val exprs = expr.children.map(exprToProtoInternal(_, inputs, binding))
val seedBuilder = LiteralOuterClass.Literal
.newBuilder()
Expand All @@ -68,72 +73,76 @@ object CometMurmur3Hash extends CometExpressionSerde[Murmur3Hash] {
}

object CometSha2 extends CometExpressionSerde[Sha2] {
override def convert(
expr: Sha2,
inputs: Seq[Attribute],
binding: Boolean): Option[ExprOuterClass.Expr] = {
if (!HashUtils.isSupportedType(expr)) {
return None
}

// It's possible for spark to dynamically compute the number of bits from input
// expression, however DataFusion does not support that yet.
private val nonFoldableNumBitsReason =
"The `numBits` argument must be a foldable literal value"

override def getUnsupportedReasons(): Seq[String] =
HashUtils.unsupportedReasons :+ nonFoldableNumBitsReason

override def getSupportLevel(expr: Sha2): SupportLevel = {
if (!expr.right.foldable) {
withInfo(expr, "For Sha2, non literal numBits is not supported")
return None
Unsupported(Some(nonFoldableNumBitsReason))
} else {
HashUtils.supportLevelForChildren(expr)
}
}

override def convert(
expr: Sha2,
inputs: Seq[Attribute],
binding: Boolean): Option[ExprOuterClass.Expr] = {
val leftExpr = exprToProtoInternal(expr.left, inputs, binding)
val numBitsExpr = exprToProtoInternal(expr.right, inputs, binding)
scalarFunctionExprToProtoWithReturnType("sha2", StringType, false, leftExpr, numBitsExpr)
}
}

object CometSha1 extends CometExpressionSerde[Sha1] {

override def getUnsupportedReasons(): Seq[String] = HashUtils.unsupportedReasons

override def getSupportLevel(expr: Sha1): SupportLevel =
HashUtils.supportLevelForChildren(expr)

override def convert(
expr: Sha1,
inputs: Seq[Attribute],
binding: Boolean): Option[ExprOuterClass.Expr] = {
if (!HashUtils.isSupportedType(expr)) {
withInfo(expr, s"HashUtils doesn't support dataType: ${expr.child.dataType}")
return None
}
val childExpr = exprToProtoInternal(expr.child, inputs, binding)
scalarFunctionExprToProtoWithReturnType("sha1", StringType, false, childExpr)
}
}

private object HashUtils {
def isSupportedType(expr: Expression): Boolean = {
for (child <- expr.children) {
if (!isSupportedDataType(expr, child.dataType)) {
return false
}

private val unsupportedDecimalReason =
"`DecimalType` with precision > 18 is not supported (Spark hashes via Java `BigDecimal`)"
private val unsupportedTimeTypeReason = "`TimeType` is not supported"

val unsupportedReasons: Seq[String] =
Seq(unsupportedDecimalReason, unsupportedTimeTypeReason, "Unsupported child data type")

def supportLevelForChildren(expr: Expression): SupportLevel = {
expr.children.iterator
.flatMap(c => unsupportedReasonFor(c.dataType).iterator)
.toSeq
.headOption match {
case Some(reason) => Unsupported(Some(reason))
case None => Compatible()
}
true
}

private def isSupportedDataType(expr: Expression, dt: DataType): Boolean = {
dt match {
case d: DecimalType if d.precision > 18 =>
// Spark converts decimals with precision > 18 into
// Java BigDecimal before hashing
withInfo(expr, s"Unsupported datatype: $dt (precision > 18)")
false
case s: StructType =>
s.fields.forall(f => isSupportedDataType(expr, f.dataType))
case a: ArrayType =>
isSupportedDataType(expr, a.elementType)
case m: MapType =>
isSupportedDataType(expr, m.keyType) && isSupportedDataType(expr, m.valueType)
case dt if isTimeType(dt) =>
withInfo(expr, s"Unsupported datatype $dt")
false
case _ if !supportedDataType(dt, allowComplex = true) =>
withInfo(expr, s"Unsupported datatype $dt")
false
case _ =>
true
}
private def unsupportedReasonFor(dt: DataType): Option[String] = dt match {
case d: DecimalType if d.precision > 18 => Some(unsupportedDecimalReason)
case s: StructType =>
s.fields.iterator.flatMap(f => unsupportedReasonFor(f.dataType).iterator).toSeq.headOption
case a: ArrayType => unsupportedReasonFor(a.elementType)
case m: MapType =>
unsupportedReasonFor(m.keyType).orElse(unsupportedReasonFor(m.valueType))
case dt if isTimeType(dt) => Some(unsupportedTimeTypeReason)
case _ if !supportedDataType(dt, allowComplex = true) =>
Some(s"Unsupported child data type: $dt")
case _ => None
}
}
Loading