common/utils/src/main/resources/error/error-conditions.json (6 additions & 0 deletions)
@@ -1675,6 +1675,12 @@
],
"sqlState" : "42K09"
},
"EVENT_TIME_MUST_BE_TOP_LEVEL_COLUMN" : {
"message" : [
"The event time column <eventExpr> must be a top-level column in the schema."
],
"sqlState" : "42K09"
},
"EXCEED_LIMIT_LENGTH" : {
"message" : [
"Exceeds char/varchar type length limitation: <limit>."

@@ -450,6 +450,7 @@ class Analyzer(
DeduplicateRelations ::
ResolveCollationName ::
ResolveMergeIntoSchemaEvolution ::
ValidateEventTimeWatermarkColumn ::
new ResolveReferences(catalogManager) ::
// Please do not insert any other rules in between. See the TODO comments in rule
// ResolveLateralColumnAliasReference for more details.
@@ -4019,6 +4020,40 @@ object CleanupAliases extends Rule[LogicalPlan] with AliasHelper {
}
}

/**
* Validates that the event time column in EventTimeWatermark is a top-level column reference
* (e.g. a single name), not a nested field (e.g. "struct_col.field").
*
* Multi-part names are allowed when they resolve to a top-level attribute via a table alias
* (e.g. "alias.column"), but rejected when they resolve to a nested struct field extraction.
*/
object ValidateEventTimeWatermarkColumn extends Rule[LogicalPlan] {
override def apply(plan: LogicalPlan): LogicalPlan = {
if (!conf.getConf(SQLConf.STREAMING_VALIDATE_EVENT_TIME_WATERMARK_COLUMN)) {
return plan
}
plan.resolveOperatorsWithPruning(
_.containsPattern(EVENT_TIME_WATERMARK)) {
case etw: EventTimeWatermark =>
etw.eventTime match {
case u: UnresolvedAttribute if u.nameParts.length > 1 =>
// Try to resolve the multi-part name against the child output.
// An alias-qualified column (e.g. "a.eventTime") resolves to an Attribute,
// while a nested struct field (e.g. "struct_col.field") resolves to an
// Alias(ExtractValue(...)) which is not an Attribute.
etw.child.resolve(u.nameParts, conf.resolver) match {
case Some(_: Attribute) => etw
case _ =>
etw.failAnalysis(
errorClass = "EVENT_TIME_MUST_BE_TOP_LEVEL_COLUMN",
messageParameters = Map("eventExpr" -> u.sql))
}
case _ => etw
}
}
}
}
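
From the DataFrame API side, a minimal sketch of the behavior this rule enforces (illustrative only; assumes an active SparkSession `spark`, and the `payload`/`ts` column names are hypothetical, not part of this change):

// Alias-qualified references to top-level columns keep passing analysis, while
// nested struct fields are rejected by ValidateEventTimeWatermarkColumn.
import org.apache.spark.sql.functions.{struct, timestamp_seconds}
import spark.implicits._

val events = Seq(1, 2, 3).toDF("value")
  .withColumn("eventTime", timestamp_seconds($"value"))
  .withColumn("payload", struct($"eventTime".as("ts"), $"value"))

// OK: "a.eventTime" resolves to a top-level Attribute through the alias.
events.alias("a").withWatermark("a.eventTime", "10 seconds")

// Fails analysis with EVENT_TIME_MUST_BE_TOP_LEVEL_COLUMN: "payload.ts" resolves
// to a nested field extraction (Alias(ExtractValue(...))), not an Attribute.
events.withWatermark("payload.ts", "10 seconds")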

/**
* Ignore event time watermark in batch query, which is only supported in Structured Streaming.
*/

@@ -3460,6 +3460,13 @@ object SQLConf {
"Valid values are 'min' and 'max'")
.createWithDefault("min") // must be same as MultipleWatermarkPolicy.DEFAULT_POLICY_NAME

val STREAMING_VALIDATE_EVENT_TIME_WATERMARK_COLUMN =
buildConf("spark.sql.streaming.validateEventTimeWatermarkColumn")
.doc("When true, check that the eventTime column passed to withWatermark is a top-level column, not a nested struct field.")
.version("4.2.0")
.booleanConf
.createWithDefault(true)
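
The check is enabled by default; a session that needs the previous lenient behavior can opt out at runtime. A minimal sketch, assuming an active SparkSession `spark`:

// Disables ValidateEventTimeWatermarkColumn for this session.
spark.conf.set("spark.sql.streaming.validateEventTimeWatermarkColumn", "false")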

val OBJECT_AGG_SORT_BASED_FALLBACK_THRESHOLD =
buildConf("spark.sql.objectHashAggregate.sortBased.fallbackThreshold")
.internal()

@@ -846,6 +846,20 @@ class ClientStreamingQuerySuite extends QueryTest with RemoteSparkSession with L
}
}
}

test("withWatermark fails for nested column") {
val df = spark.sql(
"SELECT 1 as id, struct(to_timestamp('2024-01-01 10:00:00') as timestamp, 'val1' as value) " +
"as nested_struct")
val e = intercept[AnalysisException] {
df.withWatermark("nested_struct.timestamp", "0 seconds").schema
}
checkError(
e,
condition = "EVENT_TIME_MUST_BE_TOP_LEVEL_COLUMN",
parameters = Map("eventExpr" -> ".*nested_struct.*timestamp.*"),
matchPVals = true)
}
}

class TestForeachWriter[T] extends ForeachWriter[T] {

@@ -35,7 +35,7 @@ import org.apache.spark.sql.catalyst.util.DateTimeTestUtils.UTC
import org.apache.spark.sql.execution.streaming.operators.stateful.{EventTimeStats, StateStoreSaveExec}
import org.apache.spark.sql.execution.streaming.runtime._
import org.apache.spark.sql.execution.streaming.sources.MemorySink
import org.apache.spark.sql.functions.{count, expr, timestamp_seconds, window}
import org.apache.spark.sql.functions.{count, expr, struct, timestamp_seconds, to_timestamp, window}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.streaming.OutputMode._
import org.apache.spark.tags.SlowSQLTest
@@ -131,6 +131,50 @@ class EventTimeWatermarkSuite extends StreamTest with BeforeAndAfter with Matche
assert(e.getMessage contains "int")
}

test("error on nested column") {
// Cannot pass a nested column as `eventTime` to `withWatermark`.
val e = intercept[AnalysisException] {
Seq((1, ("2024-01-01 10:00:00", "val1")))
.toDF("id", "data")
.select(
$"id",
struct(
to_timestamp($"data._1").as("timestamp"),
$"data._2".as("value")
).as("nested_struct")
)
.withWatermark("nested_struct.timestamp", "0 seconds")
.schema
}
checkError(
e,
condition = "EVENT_TIME_MUST_BE_TOP_LEVEL_COLUMN",
parameters = Map("eventExpr" -> ".*nested_struct.*timestamp.*"),
matchPVals = true)
}

test("withWatermark should work with alias-qualified column name") {
// When a DataFrame has an alias, referencing the event time column via
// "alias.columnName" should be allowed because it still refers to a top-level column.
val inputData = MemoryStream[Int]
val df = inputData.toDF()
.withColumn("eventTime", timestamp_seconds($"value"))
.alias("a")
.withWatermark("a.eventTime", "10 seconds")
.groupBy(window($"eventTime", "5 seconds") as Symbol("window"))
.agg(count("*") as Symbol("count"))
.select($"window".getField("start").cast("long").as[Long], $"count".as[Long])

testStream(df)(
AddData(inputData, 15),
CheckAnswer(),
AddData(inputData, 10, 12, 14),
CheckAnswer(),
AddData(inputData, 25),
CheckAnswer((10, 3))
)
}

test("event time and watermark metrics") {
// No event time metrics when there is no watermarking
val inputData1 = MemoryStream[Int]