Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,19 +20,43 @@ package org.apache.spark.sql.catalyst.plans.logical
import java.util.UUID
import java.util.concurrent.TimeUnit

import org.apache.spark.internal.Logging
import org.apache.spark.internal.LogKeys.CONFIG
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.EventTimeWatermark.updateEventTimeColumn
import org.apache.spark.sql.catalyst.trees.TreePattern.{EVENT_TIME_WATERMARK, TreePattern, UPDATE_EVENT_TIME_WATERMARK_COLUMN}
import org.apache.spark.sql.catalyst.util.IntervalUtils
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.MetadataBuilder
import org.apache.spark.unsafe.types.CalendarInterval

object EventTimeWatermark {
object EventTimeWatermark extends Logging {
/** The [[org.apache.spark.sql.types.Metadata]] key used to hold the eventTime watermark delay. */
val delayKey = "spark.watermarkDelayMs"

/**
 * Returns the effective delay in milliseconds for `withWatermark`.
 *
 * The configured interval is converted to milliseconds. When that value is 0 and
 * [[SQLConf.STREAMING_WATERMARK_BUMP_ZERO_DELAY_TO_ONE_MS]] is enabled (the default), the
 * result is bumped to 1 ms and a warning is logged; in every other case the converted value is
 * returned unchanged.
 *
 * Spark's late-event filter and state-eviction predicates compare event times to the watermark
 * with `event_time_us <= watermark_ms * 1000`, so with a delay of 0 every record whose event
 * time lands on the same millisecond as the current watermark is treated as late. Bumping the
 * effective delay to 1 ms makes the comparison strict in practice and avoids that footgun.
 *
 * @param delay the watermark delay interval supplied by the user
 * @return the delay in milliseconds, or 1 when a zero delay is bumped
 */
def getDelayMs(delay: CalendarInterval): Long = {
  // Convert exactly once; the value is used both for the zero check and as the result.
  val rawDelayMs = IntervalUtils.getDuration(delay, TimeUnit.MILLISECONDS)
  if (rawDelayMs == 0 &&
      SQLConf.get.getConf(SQLConf.STREAMING_WATERMARK_BUMP_ZERO_DELAY_TO_ONE_MS)) {
    // Tell the user the delay they asked for is not the one in effect, and how to opt out.
    logWarning(log"withWatermark was called with a delay of 0; bumping the internal delay to " +
      log"1 ms so that records whose event time equals the current watermark are not dropped " +
      log"as late. Set " +
      log"${MDC(CONFIG, SQLConf.STREAMING_WATERMARK_BUMP_ZERO_DELAY_TO_ONE_MS.key)}=false " +
      log"to disable.")
    1L
  } else {
    rawDelayMs
  }
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3689,6 +3689,18 @@ object SQLConf {
.booleanConf
.createWithDefault(true)

// Boolean switch (default: true) controlling whether a zero `withWatermark` delay is
// replaced by 1 ms internally. The doc text below is split one sentence per chunk.
val STREAMING_WATERMARK_BUMP_ZERO_DELAY_TO_ONE_MS =
  buildConf("spark.sql.streaming.eventTimeWatermark.bumpZeroDelayToOneMs")
    .doc(
      "When true, a zero watermark delay configured via withWatermark is internally bumped " +
      "to 1 millisecond, and a warning is logged. " +
      "Without this bump, the late-event predicate `event_time_us <= watermark_ms * 1000` " +
      "drops every record whose event time lands on the same millisecond as the current " +
      "watermark; with a delay of 0 seconds this means all but the first record per " +
      "millisecond is dropped. " +
      "Set to false to restore the pre-Spark 5.0 behavior. " +
      "Non-zero delays are not affected.")
    .version("5.0.0")
    .booleanConf
    .createWithDefault(true)

val OBJECT_AGG_SORT_BASED_FALLBACK_THRESHOLD =
buildConf("spark.sql.objectHashAggregate.sortBased.fallbackThreshold")
.internal()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,47 @@ class EventTimeWatermarkSuite extends StreamTest with BeforeAndAfter with Matche
matchPVals = true)
}

test("zero-delay watermark keeps records at max event time across batches") {
  // With a configured delay of 0 seconds, the late-event predicate
  // `timestamp_us <= watermark_ms * 1000` treats a record sitting exactly on the watermark as
  // late. A record that arrives in a later batch carrying the previous batch's max event time
  // is therefore dropped unless the internal delay is bumped to 1 ms.

  // Counts rows per event time over a stream watermarked with a zero delay.
  def countsByEventTime(source: MemoryStream[Int]): Dataset[(Long, Long)] = {
    val watermarked = source.toDF()
      .withColumn("eventTime", timestamp_seconds($"value"))
      .withWatermark("eventTime", "0 seconds")
    val counted = watermarked
      .groupBy($"eventTime")
      .agg(count("*").as("cnt"))
    counted.select($"eventTime".cast("long").as[Long], $"cnt".as[Long])
  }

  // Default configuration: the auto-bump is active.
  val bumpedInput = MemoryStream[Int]
  testStream(countsByEventTime(bumpedInput), outputMode = Append)(
    AddData(bumpedInput, 10),
    // Watermark is 10 s minus 1 ms after this batch, so the eventTime=10 group stays in state
    // and no row is emitted.
    CheckAnswer(),
    AddData(bumpedInput, 10),
    // Equal to the previous batch's max event time: this record must be kept, not dropped.
    CheckAnswer(),
    AddData(bumpedInput, 11),
    // The watermark is now strictly past 10 s, so the group is emitted with both records.
    CheckAnswer((10L, 2L))
  )

  // Legacy configuration: the auto-bump is switched off.
  withSQLConf(SQLConf.STREAMING_WATERMARK_BUMP_ZERO_DELAY_TO_ONE_MS.key -> "false") {
    val legacyInput = MemoryStream[Int]
    testStream(countsByEventTime(legacyInput), outputMode = Append)(
      AddData(legacyInput, 10),
      // Watermark lands exactly on 10 s, so the group is evicted and emitted right away.
      CheckAnswer((10L, 1L)),
      AddData(legacyInput, 10),
      // Under legacy semantics the late-event filter discards this second record at 10 s.
      CheckAnswer((10L, 1L)),
      AddData(legacyInput, 11),
      CheckAnswer((10L, 1L), (11L, 1L))
    )
  }
}

test("withWatermark should work with alias-qualified column name") {
// When a DataFrame has an alias, referencing the event time column via
// "alias.columnName" should be allowed because it still refers to a top-level column.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,10 @@ class MultiStatefulOperatorsSuite
}

test("agg -> agg -> agg, append mode") {
// The expected watermark progression and window emission boundaries below are calibrated
// against the legacy 0-delay semantics; disable the auto-bump so the test continues to
// exercise that boundary math.
withSQLConf(SQLConf.STREAMING_WATERMARK_BUMP_ZERO_DELAY_TO_ONE_MS.key -> "false") {
val inputData = MemoryStream[Int]

val stream = inputData.toDF()
Expand Down Expand Up @@ -224,6 +228,7 @@ class MultiStatefulOperatorsSuite
assertNumStateRows(Seq(0, 0, 1)),
assertNumRowsDroppedByWatermark(Seq(0, 0, 1))
)
}
}

test("stream deduplication -> aggregation, append mode") {
Expand Down Expand Up @@ -269,6 +274,9 @@ class MultiStatefulOperatorsSuite
}

test("join -> window agg, append mode") {
// The expected watermark progression below assumes the legacy 0-delay boundary; disable the
// auto-bump so the test continues to exercise that boundary math.
withSQLConf(SQLConf.STREAMING_WATERMARK_BUMP_ZERO_DELAY_TO_ONE_MS.key -> "false") {
val input1 = MemoryStream[Int]
val inputDF1 = input1.toDF()
.withColumnRenamed("value", "value1")
Expand Down Expand Up @@ -334,9 +342,13 @@ class MultiStatefulOperatorsSuite
assertNumStateRows(Seq(1, 0)),
assertNumRowsDroppedByWatermark(Seq(0, 0))
)
}
}

test("aggregation -> stream deduplication, append mode") {
// The expected watermark progression below assumes the legacy 0-delay boundary; disable the
// auto-bump so the test continues to exercise that boundary math.
withSQLConf(SQLConf.STREAMING_WATERMARK_BUMP_ZERO_DELAY_TO_ONE_MS.key -> "false") {
val inputData = MemoryStream[Int]

val aggStream = inputData.toDF()
Expand Down Expand Up @@ -415,37 +427,42 @@ class MultiStatefulOperatorsSuite
assertNumStateRows(Seq(0, 1)),
assertNumRowsDroppedByWatermark(Seq(0, 0))
)
}
}

test("join with range join on non-time intervals -> window agg, append mode, shouldn't fail") {
  // The expected watermark progression below assumes the legacy 0-delay boundary; disable the
  // auto-bump so the test continues to exercise that boundary math. The whole scenario lives
  // inside this block so the stream is built and run exactly once, under the legacy setting.
  withSQLConf(SQLConf.STREAMING_WATERMARK_BUMP_ZERO_DELAY_TO_ONE_MS.key -> "false") {
    // Left side: `eventTime1` carries the watermark; `v1` is a second timestamp column built
    // from the same value and used only in the range condition.
    val input1 = MemoryStream[Int]
    val inputDF1 = input1.toDF()
      .withColumnRenamed("value", "value1")
      .withColumn("eventTime1", timestamp_seconds($"value1"))
      .withColumn("v1", timestamp_seconds($"value1"))
      .withWatermark("eventTime1", "0 seconds")

    // Right side: (start, end) pairs; `eventTime2Start` carries the watermark while
    // `start2`/`end2` form the interval bounds used in the join condition.
    val input2 = MemoryStream[(Int, Int)]
    val inputDF2 = input2.toDS().toDF("start", "end")
      .withColumn("eventTime2Start", timestamp_seconds($"start"))
      .withColumn("start2", timestamp_seconds($"start"))
      .withColumn("end2", timestamp_seconds($"end"))
      .withWatermark("eventTime2Start", "0 seconds")

    // Inner join on a range over the non-watermarked columns plus an equality on
    // `eventTime1`, followed by a 5-second tumbling-window count.
    val stream = inputDF1.join(inputDF2,
      expr("v1 >= start2 AND v1 < end2 " +
        "AND eventTime1 = start2"), "inner")
      .groupBy(window($"eventTime1", "5 seconds") as Symbol("window"))
      .agg(count("*") as Symbol("count"))
      .select($"window".getField("start").cast("long").as[Long], $"count".as[Long])

    testStream(stream)(
      AddData(input1, 1, 2, 3, 4),
      AddData(input2, (1, 2), (2, 3), (3, 4), (4, 5)),
      // No new output rows are expected after these inputs.
      CheckNewAnswer(),
      assertNumStateRows(Seq(1, 0)),
      assertNumRowsDroppedByWatermark(Seq(0, 0))
    )
  }
}

test("stream-stream time interval left outer join -> aggregation, append mode") {
Expand Down Expand Up @@ -473,8 +490,11 @@ class MultiStatefulOperatorsSuite
.selectExpr("CAST(window.start AS STRING) AS window_start",
"CAST(window.end AS STRING) AS window_end", "cnt")

// for ease of verification, we change the session timezone to UTC
withSQLConf(SQLConf.SESSION_LOCAL_TIMEZONE.key -> "UTC") {
// for ease of verification, we change the session timezone to UTC. The expected eviction
// watermark below also assumes the legacy 0-delay boundary, so disable the auto-bump.
withSQLConf(
SQLConf.SESSION_LOCAL_TIMEZONE.key -> "UTC",
SQLConf.STREAMING_WATERMARK_BUMP_ZERO_DELAY_TO_ONE_MS.key -> "false") {
testStream(agg)(
MultiAddData(
(input1, Seq(
Expand Down Expand Up @@ -733,8 +753,11 @@ class MultiStatefulOperatorsSuite
.selectExpr("CAST(window.start AS STRING) AS window_start",
"CAST(window.end AS STRING) AS window_end", "cnt")

// for ease of verification, we change the session timezone to UTC
withSQLConf(SQLConf.SESSION_LOCAL_TIMEZONE.key -> "UTC") {
// for ease of verification, we change the session timezone to UTC. The expected eviction
// watermark below also assumes the legacy 0-delay boundary, so disable the auto-bump.
withSQLConf(
SQLConf.SESSION_LOCAL_TIMEZONE.key -> "UTC",
SQLConf.STREAMING_WATERMARK_BUMP_ZERO_DELAY_TO_ONE_MS.key -> "false") {
testStream(agg)(
MultiAddData(
(input2, Seq(
Expand Down