Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
128 changes: 128 additions & 0 deletions docs/source/contributor-guide/spark_expressions_support.md

Large diffs are not rendered by default.

163 changes: 88 additions & 75 deletions spark/src/main/scala/org/apache/comet/serde/datetime.scala
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import java.util.Locale

import org.apache.spark.sql.catalyst.expressions.{AddMonths, Attribute, ConvertTimezone, DateAdd, DateDiff, DateFormatClass, DateFromUnixDate, DateSub, DayOfMonth, DayOfWeek, DayOfYear, Days, FromUTCTimestamp, GetDateField, Hour, Hours, LastDay, Literal, MakeDate, MakeTimestamp, MicrosToTimestamp, MillisToTimestamp, Minute, Month, MonthsBetween, NextDay, Quarter, Second, SecondsToTimestamp, ToUnixTimestamp, ToUTCTimestamp, TruncDate, TruncTimestamp, UnixDate, UnixMicros, UnixMillis, UnixSeconds, UnixTimestamp, WeekDay, WeekOfYear, Year}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.{DateType, DoubleType, FloatType, IntegerType, LongType, StringType, TimestampNTZType, TimestampType}
import org.apache.spark.sql.types.{DataType, DateType, DoubleType, FloatType, IntegerType, LongType, StringType, TimestampNTZType, TimestampType}
import org.apache.spark.unsafe.types.UTF8String

import org.apache.comet.CometConf
Expand Down Expand Up @@ -179,23 +179,24 @@ object CometQuarter extends CometExpressionSerde[Quarter] with CometExprGetDateF
}
}

object CometHour extends CometExpressionSerde[Hour] {
private object TimeFieldSerde {
val timestampNtzIncompatReason: String =
"Incorrectly applies timezone conversion to TimestampNTZ inputs" +
" (https://github.com/apache/datafusion-comet/issues/3180)"

val incompatReason: String = "Incorrectly applies timezone conversion to TimestampNTZ inputs" +
" (https://github.com/apache/datafusion-comet/issues/3180)"
def supportLevelForChild(childType: DataType): SupportLevel = childType match {
case TimestampNTZType => Incompatible(Some(timestampNtzIncompatReason))
case _ => Compatible()
}
}

override def getIncompatibleReasons(): Seq[String] = Seq(incompatReason)
object CometHour extends CometExpressionSerde[Hour] {

override def getSupportLevel(expr: Hour): SupportLevel = {
if (expr.child.dataType == TimestampNTZType) {
Incompatible(
Some(
"Incorrectly applies timezone conversion to TimestampNTZ inputs" +
" (https://github.com/apache/datafusion-comet/issues/3180)"))
} else {
Compatible()
}
}
override def getIncompatibleReasons(): Seq[String] =
Seq(TimeFieldSerde.timestampNtzIncompatReason)

override def getSupportLevel(expr: Hour): SupportLevel =
TimeFieldSerde.supportLevelForChild(expr.child.dataType)

override def convert(
expr: Hour,
Expand Down Expand Up @@ -224,20 +225,11 @@ object CometHour extends CometExpressionSerde[Hour] {

object CometMinute extends CometExpressionSerde[Minute] {

override def getIncompatibleReasons(): Seq[String] = Seq(
"Incorrectly applies timezone conversion to TimestampNTZ inputs" +
" (https://github.com/apache/datafusion-comet/issues/3180)")

override def getSupportLevel(expr: Minute): SupportLevel = {
if (expr.child.dataType == TimestampNTZType) {
Incompatible(
Some(
"Incorrectly applies timezone conversion to TimestampNTZ inputs" +
" (https://github.com/apache/datafusion-comet/issues/3180)"))
} else {
Compatible()
}
}
override def getIncompatibleReasons(): Seq[String] =
Seq(TimeFieldSerde.timestampNtzIncompatReason)

override def getSupportLevel(expr: Minute): SupportLevel =
TimeFieldSerde.supportLevelForChild(expr.child.dataType)

override def convert(
expr: Minute,
Expand Down Expand Up @@ -266,20 +258,11 @@ object CometMinute extends CometExpressionSerde[Minute] {

object CometSecond extends CometExpressionSerde[Second] {

override def getIncompatibleReasons(): Seq[String] = Seq(
"Incorrectly applies timezone conversion to TimestampNTZ inputs" +
" (https://github.com/apache/datafusion-comet/issues/3180)")

override def getSupportLevel(expr: Second): SupportLevel = {
if (expr.child.dataType == TimestampNTZType) {
Incompatible(
Some(
"Incorrectly applies timezone conversion to TimestampNTZ inputs" +
" (https://github.com/apache/datafusion-comet/issues/3180)"))
} else {
Compatible()
}
}
override def getIncompatibleReasons(): Seq[String] =
Seq(TimeFieldSerde.timestampNtzIncompatReason)

override def getSupportLevel(expr: Second): SupportLevel =
TimeFieldSerde.supportLevelForChild(expr.child.dataType)

override def convert(
expr: Second,
Expand Down Expand Up @@ -437,6 +420,11 @@ object CometMakeDate extends CometScalarFunction[MakeDate]("make_date")

object CometSecondsToTimestamp
extends CometScalarFunction[SecondsToTimestamp]("seconds_to_timestamp") {

override def getUnsupportedReasons(): Seq[String] = Seq(
"Only `IntegerType`, `LongType`, `FloatType`, and `DoubleType` inputs are supported." +
" `DecimalType`, `ByteType`, and `ShortType` fall back to Spark.")

override def getSupportLevel(expr: SecondsToTimestamp): SupportLevel =
expr.child.dataType match {
case IntegerType | LongType | FloatType | DoubleType => Compatible()
Expand Down Expand Up @@ -482,8 +470,14 @@ object CometTruncDate extends CometExpressionSerde[TruncDate] {
val supportedFormats: Seq[String] =
Seq("year", "yyyy", "yy", "quarter", "mon", "month", "mm", "week")

override def getIncompatibleReasons(): Seq[String] = Seq(
"Non-literal format strings will throw an exception instead of returning NULL")
private val nonLiteralFormatIncompatReason: String =
"Non-literal format strings will throw an exception instead of returning NULL"

private def unsupportedFormatReason(fmt: Any): String =
s"Format $fmt is not supported. Only the following formats are supported: " +
supportedFormats.mkString(", ")

override def getIncompatibleReasons(): Seq[String] = Seq(nonLiteralFormatIncompatReason)

override def getUnsupportedReasons(): Seq[String] = Seq(
"Only the following formats are supported: " + supportedFormats.mkString(", "))
Expand All @@ -494,11 +488,10 @@ object CometTruncDate extends CometExpressionSerde[TruncDate] {
if (supportedFormats.contains(fmt.toString.toLowerCase(Locale.ROOT))) {
Compatible()
} else {
Unsupported(Some(s"Format $fmt is not supported"))
Unsupported(Some(unsupportedFormatReason(fmt)))
}
case _ =>
Incompatible(
Some("Invalid format strings will throw an exception instead of returning NULL"))
Incompatible(Some(nonLiteralFormatIncompatReason))
}
}

Expand All @@ -521,10 +514,6 @@ object CometTruncDate extends CometExpressionSerde[TruncDate] {

object CometTruncTimestamp extends CometExpressionSerde[TruncTimestamp] {

override def getIncompatibleReasons(): Seq[String] = Seq(
"Produces incorrect results when used with non-UTC timezones. Compatible when timezone is" +
" UTC. (https://github.com/apache/datafusion-comet/issues/2649)")

val supportedFormats: Seq[String] =
Seq(
"year",
Expand All @@ -543,6 +532,23 @@ object CometTruncTimestamp extends CometExpressionSerde[TruncTimestamp] {
"millisecond",
"microsecond")

private val nonUtcIncompatReason: String =
"Produces incorrect results when used with non-UTC timezones. Compatible when timezone is" +
" UTC. (https://github.com/apache/datafusion-comet/issues/2649)"

private val nonLiteralFormatIncompatReason: String =
"Non-literal format strings will throw an exception instead of returning NULL"

private def unsupportedFormatReason(fmt: Any): String =
s"Format $fmt is not supported. Only the following formats are supported: " +
supportedFormats.mkString(", ")

override def getIncompatibleReasons(): Seq[String] =
Seq(nonUtcIncompatReason, nonLiteralFormatIncompatReason)

override def getUnsupportedReasons(): Seq[String] = Seq(
"Only the following formats are supported: " + supportedFormats.mkString(", "))

override def getSupportLevel(expr: TruncTimestamp): SupportLevel = {
val timezone = expr.timeZoneId.getOrElse("UTC")
val isUtc = timezone == "UTC" || timezone == "Etc/UTC"
Expand All @@ -552,17 +558,13 @@ object CometTruncTimestamp extends CometExpressionSerde[TruncTimestamp] {
if (isUtc) {
Compatible()
} else {
Incompatible(
Some(
s"Incorrect results in non-UTC timezone '$timezone'" +
" (https://github.com/apache/datafusion-comet/issues/2649)"))
Incompatible(Some(nonUtcIncompatReason))
}
} else {
Unsupported(Some(s"Format $fmt is not supported"))
Unsupported(Some(unsupportedFormatReason(fmt)))
}
case _ =>
Incompatible(
Some("Invalid format strings will throw an exception instead of returning NULL"))
Incompatible(Some(nonLiteralFormatIncompatReason))
}
}

Expand Down Expand Up @@ -700,24 +702,27 @@ object CometDateFormat extends CometExpressionSerde[DateFormatClass] {
* without applying any session timezone offset.
*/
object CometHours extends CometExpressionSerde[Hours] {

override def getUnsupportedReasons(): Seq[String] = Seq(
"Only `TimestampType` and `TimestampNTZType` inputs are supported.")

override def getSupportLevel(expr: Hours): SupportLevel = expr.child.dataType match {
case TimestampType | TimestampNTZType => Compatible()
case other => Unsupported(Some(s"Hours does not support input type: $other"))
}

override def convert(
expr: Hours,
inputs: Seq[Attribute],
binding: Boolean): Option[ExprOuterClass.Expr] = {
val optExpr = expr.child.dataType match {
case TimestampType | TimestampNTZType =>
exprToProtoInternal(expr.child, inputs, binding).map { childExpr =>
val builder = ExprOuterClass.HoursTransform.newBuilder()
builder.setChild(childExpr)
val optExpr = exprToProtoInternal(expr.child, inputs, binding).map { childExpr =>
val builder = ExprOuterClass.HoursTransform.newBuilder()
builder.setChild(childExpr)

ExprOuterClass.Expr
.newBuilder()
.setHoursTransform(builder)
.build()
}
case other =>
withInfo(expr, s"Hours does not support input type: $other")
None
ExprOuterClass.Expr
.newBuilder()
.setHoursTransform(builder)
.build()
}
optExprWithInfo(optExpr, expr, expr.child)
}
Expand All @@ -734,6 +739,16 @@ object CometHours extends CometExpressionSerde[Hours] {
* The first cast respects the session timezone to correctly determine the date boundary.
*/
object CometDays extends CometExpressionSerde[Days] {

override def getUnsupportedReasons(): Seq[String] = Seq(
"Only `DateType` and `TimestampType` inputs are supported." +
" `TimestampNTZType` is not supported.")

override def getSupportLevel(expr: Days): SupportLevel = expr.child.dataType match {
case DateType | TimestampType => Compatible()
case other => Unsupported(Some(s"Days does not support input type: $other"))
}

override def convert(
expr: Days,
inputs: Seq[Attribute],
Expand All @@ -748,9 +763,7 @@ object CometDays extends CometExpressionSerde[Days] {
childExpr.flatMap { child =>
CometCast.castToProto(expr, Some(timezone), DateType, child, CometEvalMode.LEGACY)
}
case other =>
withInfo(expr, s"Days does not support input type: $other")
None
case _ => None
}

// Convert DateType to IntegerType (days since epoch)
Expand Down
29 changes: 20 additions & 9 deletions spark/src/main/scala/org/apache/comet/serde/unixtime.scala
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,26 @@ import org.apache.comet.serde.QueryPlanSerde.{exprToProtoInternal, optExprWithIn
// https://github.com/apache/datafusion/issues/16594
object CometFromUnixTime extends CometExpressionSerde[FromUnixTime] {

override def getIncompatibleReasons(): Seq[String] = Seq(
"Only supports the default datetime format pattern `yyyy-MM-dd HH:mm:ss`." +
" DataFusion's valid timestamp range differs from Spark" +
" (https://github.com/apache/datafusion/issues/16594)")
private val incompatReason: String =
"DataFusion's valid timestamp range differs from Spark" +
" (https://github.com/apache/datafusion/issues/16594)"

override def getSupportLevel(expr: FromUnixTime): SupportLevel = Incompatible(None)
private val unsupportedFormatReason: String =
"Only the default datetime format pattern `yyyy-MM-dd HH:mm:ss` is supported;" +
" other patterns fall back to Spark" +
" (https://github.com/apache/datafusion/issues/16577)"

override def getIncompatibleReasons(): Seq[String] = Seq(incompatReason)

override def getUnsupportedReasons(): Seq[String] = Seq(unsupportedFormatReason)

override def getSupportLevel(expr: FromUnixTime): SupportLevel = {
if (expr.format != Literal(TimestampFormatter.defaultPattern)) {
Unsupported(Some(unsupportedFormatReason))
} else {
Incompatible(Some(incompatReason))
}
}

override def convert(
expr: FromUnixTime,
Expand All @@ -48,10 +62,7 @@ object CometFromUnixTime extends CometExpressionSerde[FromUnixTime] {
val formatExpr = exprToProtoInternal(Literal("%Y-%m-%d %H:%M:%S"), inputs, binding)
val timeZone = exprToProtoInternal(Literal(expr.timeZoneId.orNull), inputs, binding)

if (expr.format != Literal(TimestampFormatter.defaultPattern)) {
withInfo(expr, "Datetime pattern format is unsupported")
None
} else if (secExpr.isDefined && formatExpr.isDefined) {
if (secExpr.isDefined && formatExpr.isDefined) {
val timestampExpr =
scalarFunctionExprToProto("from_unixtime", Seq(secExpr, timeZone): _*)
val optExpr = scalarFunctionExprToProto("to_char", Seq(timestampExpr, formatExpr): _*)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,12 @@ INSERT INTO test_from_unix_time VALUES (0), (1718451045), (-1), (NULL), (2147483
query expect_fallback(not fully compatible with Spark)
SELECT from_unixtime(t) FROM test_from_unix_time

query expect_fallback(not fully compatible with Spark)
query expect_fallback(Only the default datetime format pattern)
SELECT from_unixtime(t, 'yyyy-MM-dd') FROM test_from_unix_time

-- literal arguments
query expect_fallback(not fully compatible with Spark)
SELECT from_unixtime(0)

query expect_fallback(not fully compatible with Spark)
query expect_fallback(Only the default datetime format pattern)
SELECT from_unixtime(1718451045, 'yyyy-MM-dd')
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
-- Licensed to the Apache Software Foundation (ASF) under one
-- or more contributor license agreements. See the NOTICE file
-- distributed with this work for additional information
-- regarding copyright ownership. The ASF licenses this file
-- to you under the Apache License, Version 2.0 (the
-- "License"); you may not use this file except in compliance
-- with the License. You may obtain a copy of the License at
--
-- http://www.apache.org/licenses/LICENSE-2.0
--
-- Unless required by applicable law or agreed to in writing,
-- software distributed under the License is distributed on an
-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-- KIND, either express or implied. See the License for the
-- specific language governing permissions and limitations
-- under the License.

-- ANSI mode: Spark's MakeDate wraps the java.time.DateTimeException from LocalDate.of in
-- ansiDateTimeArgumentOutOfRange (4.0) / ansiDateTimeError (3.4/3.5) when
-- spark.sql.ansi.enabled=true. Comet's native SparkMakeDate always returns NULL on
-- invalid input and never raises, so it does not throw under ANSI. The ignored queries
-- below capture the divergence; remove ignore(...) when
-- https://github.com/apache/datafusion-comet/issues/4451 is fixed.
-- Config: spark.sql.ansi.enabled=true

-- February 30 is not a valid date.
query ignore(https://github.com/apache/datafusion-comet/issues/4451)
SELECT make_date(2024, 2, 30)

-- Month 13 is out of range.
query ignore(https://github.com/apache/datafusion-comet/issues/4451)
SELECT make_date(2024, 13, 1)

-- Day 0 is out of range.
query ignore(https://github.com/apache/datafusion-comet/issues/4451)
SELECT make_date(2024, 6, 0)
Original file line number Diff line number Diff line change
Expand Up @@ -72,3 +72,8 @@ SELECT next_day(date('2023-01-01'), 'Monday'), next_day(date('2023-01-01'), 'Sun
-- null handling
query
SELECT next_day(NULL, 'Monday'), next_day(date('2023-01-01'), NULL)

-- Comet's native impl trims whitespace before matching the day name; Spark does not, so
-- ' MO ' is invalid in Spark (NULL) but matches Monday in Comet.
query ignore(https://github.com/apache/datafusion-comet/issues/4450)
SELECT next_day(date('2024-01-01'), ' MO '), next_day(date('2024-01-01'), 'MO ')
Loading
Loading