Skip to content
5 changes: 5 additions & 0 deletions common/utils/src/main/resources/error/error-conditions.json
Original file line number Diff line number Diff line change
Expand Up @@ -6935,6 +6935,11 @@
"Streaming query evolution error:"
],
"subClass" : {
"CANNOT_ENABLE_ON_EXISTING_CHECKPOINT" : {
"message" : [
"Cannot enable streaming source evolution on a checkpoint that was created without it. The existing checkpoint uses offset log format version <existingVersion>, which does not support the named source tracking required by streaming source evolution. To use source evolution, start the query with a fresh checkpoint."
]
},
"DUPLICATE_SOURCE_NAMES" : {
"message" : [
"Duplicate streaming source names detected: <names>. Each streaming source must have a unique name."
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2536,6 +2536,12 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase with Compilat
messageParameters = Map("names" -> duplicateNames.mkString(", ")))
}

/**
 * Builds the error reported when streaming source evolution is requested for a query whose
 * existing checkpoint was written with an older offset log format.
 *
 * @param existingVersion offset log format version found in the existing checkpoint; passed to
 *                        the error condition as the `existingVersion` message parameter
 * @return an `AnalysisException` carrying the
 *         `STREAMING_QUERY_EVOLUTION_ERROR.CANNOT_ENABLE_ON_EXISTING_CHECKPOINT` condition
 */
def cannotEnableSourceEvolutionOnExistingCheckpointError(existingVersion: Int): Throwable = {
  val params = Map("existingVersion" -> existingVersion.toString)
  new AnalysisException(
    errorClass = "STREAMING_QUERY_EVOLUTION_ERROR.CANNOT_ENABLE_ON_EXISTING_CHECKPOINT",
    messageParameters = params)
}

def columnNotFoundInExistingColumnsError(
columnType: String, columnName: String, validColumnNames: Seq[String]): Throwable = {
new AnalysisException(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,13 +54,13 @@ class ClientStreamingQuerySuite extends QueryTest with RemoteSparkSession with L
"streaming")

/**
* Helper method to run tests with source evolution configs enabled.
* Helper method to run tests with source evolution enabled. Enabling source evolution
* automatically forces offset log format V2 (OffsetMap) for new queries, since named sources
* require it.
*/
private def testWithSourceEvolution(testName: String)(testFun: => Unit): Unit = {
test(testName) {
withSQLConf(
"spark.sql.streaming.queryEvolution.enableSourceEvolution" -> "true",
"spark.sql.streaming.offsetLog.formatVersion" -> "2") {
withSQLConf("spark.sql.streaming.queryEvolution.enableSourceEvolution" -> "true") {
testFun
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.execution.streaming.checkpointing

import org.apache.spark.internal.Logging
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.errors.QueryCompilationErrors
import org.apache.spark.sql.internal.SQLConf

/**
 * Internal version descriptor for a streaming checkpoint. A version is recorded per checkpoint
 * component; at present only the offset log is tracked here, while other components are
 * governed elsewhere through their own dedicated configs.
 *
 * @param offsetLogVersion format version of the offset log
 */
case class StreamingCheckpointVersion(offsetLogVersion: Int) {
  // Labelled rendering, used instead of the positional one a case class generates by default.
  override def toString: String =
    s"StreamingCheckpointVersion(offsetLogVersion: $offsetLogVersion)"
}

/**
 * Identifies which checkpoint log a format version refers to. Sealed, so matches over it are
 * checked for exhaustiveness by the compiler.
 */
sealed trait CheckpointLogType
/** Selects the offset log. */
case object OffsetLogType extends CheckpointLogType

/**
 * Central place for choosing and recording the format versions of the system-managed logs in a
 * streaming checkpoint. When a query starts, it decides which version of each log format to use;
 * when a query restarts, it checks that the features requested on the session are compatible
 * with what the existing checkpoint already contains.
 *
 * Writer versions are typically consulted only while starting a new streaming query and are not
 * meant to be exposed directly to users. Once chosen, a version is not intended to change for
 * the lifetime of the query.
 */
object CheckpointVersionManager extends Logging {

  // Checkpoint writer version 1: the base version, supporting DataFrame-based streaming queries
  // across the standard trigger types.
  private val CHECKPOINT_VERSION_V1 = StreamingCheckpointVersion(OffsetSeqLog.VERSION_1)

  // The checkpoint version currently in effect. Bumping it means defining a new
  // `StreamingCheckpointVersion` with the new per-component version numbers and pointing this
  // constant at it.
  private val CURRENT_VERSION = CHECKPOINT_VERSION_V1

  /** Returns the checkpoint version currently in effect. */
  def getCurrentVersion(): StreamingCheckpointVersion = CURRENT_VERSION

  /**
   * Lowest offset log format version that the features enabled on this session can work with.
   * Streaming source evolution depends on the OffsetMap (sourceId -> offset) layout, which only
   * exists from [[OffsetSeqLog.VERSION_2]] onwards.
   */
  private def getMinRequiredOffsetLogVersion(sparkSessionForStream: SparkSession): Int = {
    val sourceEvolutionEnabled =
      sparkSessionForStream.sessionState.conf.enableStreamingSourceEvolution
    if (sourceEvolutionEnabled) OffsetSeqLog.VERSION_2 else OffsetSeqLog.VERSION_1
  }

  /**
   * Picks the offset log format version for a new streaming query as the largest of:
   *  - the current default version,
   *  - the minimum version required by the enabled features (for example, streaming source
   *    evolution needs [[OffsetSeqLog.VERSION_2]] for OffsetMap-based named source tracking),
   *  - the version configured via [[SQLConf.STREAMING_OFFSET_LOG_FORMAT_VERSION]].
   *
   * @param sparkSessionForStream the cloned `SparkSession` for the streaming query
   */
  private def getOffsetLogVersion(sparkSessionForStream: SparkSession): Int = {
    val defaultVersion = getCurrentVersion().offsetLogVersion
    val featureMinimum = getMinRequiredOffsetLogVersion(sparkSessionForStream)
    val fromSessionConf =
      sparkSessionForStream.sessionState.conf.streamingOffsetLogFormatVersion
    val result = Seq(defaultVersion, featureMinimum, fromSessionConf).max
    logInfo(s"Retrieved offset log writer version=$result")
    result
  }

  /**
   * Writes the offset log format version into the configuration of the given session.
   */
  private def setSparkSessionConfigsForOffsetLog(
      sparkSessionForStream: SparkSession,
      offsetLogFormatVersion: Int): Unit = {
    val confKey = SQLConf.STREAMING_OFFSET_LOG_FORMAT_VERSION.key
    sparkSessionForStream.conf.set(confKey, offsetLogFormatVersion)
  }

  /**
   * Returns the format version to use for the given log type. Feature-driven minimums are read
   * from the `sparkSessionForStream` config, so that session must be initialized before this is
   * called.
   */
  def getFormatVersionFromSession(
      sparkSessionForStream: SparkSession,
      logType: CheckpointLogType): Int = logType match {
    case OffsetLogType => getOffsetLogVersion(sparkSessionForStream)
  }

  /**
   * Rejects enabling streaming source evolution on top of an offset log older than
   * [[OffsetSeqLog.VERSION_2]]; does nothing otherwise.
   */
  private def requireSourceEvolutionSupported(
      sparkSessionForStream: SparkSession,
      existingVersion: Int): Unit = {
    val predatesOffsetMap = existingVersion < OffsetSeqLog.VERSION_2
    if (predatesOffsetMap &&
        sparkSessionForStream.sessionState.conf.enableStreamingSourceEvolution) {
      throw QueryCompilationErrors.cannotEnableSourceEvolutionOnExistingCheckpointError(
        existingVersion)
    }
  }

  /**
   * Works out the offset log format version for this run of the query. An existing query keeps
   * the version of its most recently written offset log entry. A new query takes the version
   * derived from the session config, including any feature-driven minimums.
   *
   * The session config is also validated against the existing checkpoint. Currently the only
   * check is that streaming source evolution cannot be enabled on a checkpoint whose offset log
   * is below VERSION_2, because the OffsetMap-based named source tracking it needs is not
   * available in earlier versions.
   *
   * @param sparkSessionForStream the session the streaming query runs with
   * @param latestStartedBatch the latest offset log entry of the query, if one exists
   */
  def resolveOffsetLogVersion(
      sparkSessionForStream: SparkSession,
      latestStartedBatch: Option[(Long, OffsetSeqBase)]): Int = {
    // Only evaluated when the checkpoint has no offset log entry yet.
    def versionForNewQuery: Int =
      getFormatVersionFromSession(sparkSessionForStream, OffsetLogType)

    latestStartedBatch.fold(versionForNewQuery) { case (_, offsetSeq) =>
      val existingVersion = offsetSeq.version
      requireSourceEvolutionSupported(sparkSessionForStream, existingVersion)
      existingVersion
    }
  }

  /**
   * Records `version` as the format version of the given log type on the session.
   */
  def setFormatVersion(
      sparkSessionForStream: SparkSession,
      logType: CheckpointLogType,
      version: Int): Unit = logType match {
    case OffsetLogType => setSparkSessionConfigsForOffsetLog(sparkSessionForStream, version)
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ import org.apache.spark.sql.execution.{SparkPlan, SQLExecution}
import org.apache.spark.sql.execution.datasources.LogicalRelation
import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, RealTimeStreamScanExec, StreamingDataSourceV2Relation, StreamingDataSourceV2ScanRelation, StreamWriterCommitProgress, WriteToDataSourceV2Exec}
import org.apache.spark.sql.execution.streaming.{AvailableNowTrigger, Offset, OneTimeTrigger, ProcessingTimeTrigger, RealTimeModeAllowlist, RealTimeTrigger, Sink, Source, StreamingQueryPlanTraverseHelper}
import org.apache.spark.sql.execution.streaming.checkpointing.{CheckpointFileManager, CommitMetadata, OffsetSeqBase, OffsetSeqLog, OffsetSeqMetadata, OffsetSeqMetadataV2}
import org.apache.spark.sql.execution.streaming.checkpointing.{CheckpointFileManager, CheckpointVersionManager, CommitMetadata, OffsetLogType, OffsetSeqBase, OffsetSeqLog, OffsetSeqMetadata, OffsetSeqMetadataV2}
import org.apache.spark.sql.execution.streaming.operators.stateful.{StatefulOperatorStateInfo, StatefulOpStateStoreCheckpointInfo, StateStoreWriter}
import org.apache.spark.sql.execution.streaming.runtime.StreamingCheckpointConstants.{DIR_NAME_COMMITS, DIR_NAME_OFFSETS, DIR_NAME_STATE}
import org.apache.spark.sql.execution.streaming.sources.{ForeachBatchSink, WriteToMicroBatchDataSource, WriteToMicroBatchDataSourceV1}
Expand Down Expand Up @@ -470,20 +470,21 @@ class MicroBatchExecution(
sourceIdMap
)

// Read the offset log format version from the last written offset log entry. If no entries
// are found, use the set/default value from the config.
val offsetLogFormatVersion = if (latestStartedBatch.isDefined) {
latestStartedBatch.get._2.version
} else {
// For existing queries, the offset log format version is read from the last written offset log
// entry; for new queries, it is resolved from the session config (honoring any feature-driven
// minimums). Restarting with an incompatible feature set on an existing checkpoint is rejected
// inside the manager.
if (latestStartedBatch.isEmpty) {
// If no offset log entries are found, assert that the query does not have any committed
// batches to be extra safe.
assert(lastCommittedBatchId == -1L)
sparkSessionForStream.conf.get(SQLConf.STREAMING_OFFSET_LOG_FORMAT_VERSION)
}
val offsetLogFormatVersion = CheckpointVersionManager.resolveOffsetLogVersion(
sparkSessionForStream, latestStartedBatch)

// Set the offset log format version in the sparkSessionForStream conf
sparkSessionForStream.conf.set(
SQLConf.STREAMING_OFFSET_LOG_FORMAT_VERSION.key, offsetLogFormatVersion)
// Persist the resolved offset log format version on the streaming session.
CheckpointVersionManager.setFormatVersion(
sparkSessionForStream, OffsetLogType, offsetLogFormatVersion)

val execCtx = new MicroBatchExecutionContext(id, runId, name, triggerClock, sources, sink,
progressReporter, -1, sparkSession,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,12 @@ import java.io.File

import org.scalatest.Tag

import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.util.stringToFile
import org.apache.spark.sql.execution.streaming.checkpointing.{OffsetMap, OffsetSeq, OffsetSeqBase, OffsetSeqLog, OffsetSeqMetadata}
import org.apache.spark.sql.execution.streaming.runtime.{LongOffset, MemoryStream, SerializedOffset}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.streaming.StreamingQueryException
import org.apache.spark.sql.test.SharedSparkSession
import org.apache.spark.util.Utils

Expand Down Expand Up @@ -368,6 +370,51 @@ class OffsetSeqLogSuite extends SharedSparkSession {
}
}

// Verifies that a checkpoint whose offset log was written as version 1 cannot be restarted with
// streaming source evolution enabled: the restart must surface
// STREAMING_QUERY_EVOLUTION_ERROR.CANNOT_ENABLE_ON_EXISTING_CHECKPOINT.
test("enabling source evolution on an existing V1 checkpoint is rejected") {
  withTempDir { checkpointDir =>
    withTempDir { outputDir =>
      val inputData = MemoryStream[Int]

      // Start query without source evolution, writing V1 offset log entries.
      val query1 = inputData.toDF()
        .writeStream
        .format("parquet")
        .option("path", outputDir.getAbsolutePath)
        .option("checkpointLocation", checkpointDir.getAbsolutePath)
        .start()
      try {
        inputData.addData(1, 2)
        query1.processAllAvailable()
      } finally {
        // Stop the query even if processing fails so it does not outlive this test.
        query1.stop()
      }

      // Confirm the precondition: the checkpoint really holds a version 1 OffsetSeq entry.
      val offsetLog = new OffsetSeqLog(spark, s"${checkpointDir.getAbsolutePath}/offsets")
      val initialBatch = offsetLog.getLatest()
      assert(initialBatch.isDefined)
      assert(initialBatch.get._2.version === 1)
      assert(initialBatch.get._2.isInstanceOf[OffsetSeq])

      // Restart with the source evolution session flag enabled. The existing V1 checkpoint does
      // not support OffsetMap-based named source tracking, so the query must fail loudly rather
      // than silently downgrading the user's session config.
      withSQLConf(SQLConf.ENABLE_STREAMING_SOURCE_EVOLUTION.key -> "true") {
        val query2 = inputData.toDF()
          .writeStream
          .format("parquet")
          .option("path", outputDir.getAbsolutePath)
          .option("checkpointLocation", checkpointDir.getAbsolutePath)
          .start()
        try {
          val ex = intercept[StreamingQueryException] {
            inputData.addData(3, 4)
            query2.processAllAvailable()
          }
          checkError(
            exception = ex.cause.asInstanceOf[AnalysisException],
            condition = "STREAMING_QUERY_EVOLUTION_ERROR.CANNOT_ENABLE_ON_EXISTING_CHECKPOINT",
            parameters = Map("existingVersion" -> "1"))
        } finally {
          // Always release the query, including when the expected failure does not occur and
          // the query would otherwise keep running after the assertion error.
          query2.stop()
        }
      }
    }
  }
}

test("SPARK-55131: offset log records defaults to merge operator version 2") {
val offsetSeqMetadata = OffsetSeqMetadata.apply(batchWatermarkMs = 0, batchTimestampMs = 0,
spark.conf)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import org.mockito.Mockito._
import org.scalatest.Tag

import org.apache.spark.sql._
import org.apache.spark.sql.execution.streaming.checkpointing.{OffsetMap, OffsetSeqLog}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.streaming.StreamTest
import org.apache.spark.sql.streaming.Trigger._
Expand Down Expand Up @@ -211,6 +212,33 @@ class StreamingSourceEvolutionSuite extends StreamTest {
// Metadata Path Tests
// =======================

// Verifies that a new query started with source evolution enabled writes its offset log as an
// OffsetMap with version 2, without the offset log format version being set explicitly here.
testWithSourceEvolution("offset log uses VERSION_2 when source evolution is enabled") {
  LastOptions.clear()

  val checkpointLocation = new Path(newMetadataDir)

  val df1 = spark.readStream
    .format("org.apache.spark.sql.streaming.test")
    .name("source1")
    .load()

  val q = df1.writeStream
    .format("org.apache.spark.sql.streaming.test")
    .option("checkpointLocation", checkpointLocation.toString)
    .trigger(ProcessingTime(10.seconds))
    .start()
  try {
    q.processAllAvailable()
  } finally {
    // Stop the query even if processing fails so it does not outlive this test.
    q.stop()
  }

  // Inspect the latest entry the query wrote to its offset log.
  val offsetLog = new OffsetSeqLog(spark, s"$checkpointLocation/offsets")
  val latestBatch = offsetLog.getLatest()
  assert(latestBatch.isDefined, "Offset log should have at least one entry")
  val (_, offsetSeq) = latestBatch.get
  assert(offsetSeq.isInstanceOf[OffsetMap],
    s"Expected OffsetMap but got ${offsetSeq.getClass.getSimpleName}")
  assert(offsetSeq.version === 2, s"Expected version 2 but got ${offsetSeq.version}")
}

testWithSourceEvolution("named sources - metadata path uses source name") {
LastOptions.clear()

Expand Down Expand Up @@ -506,13 +534,12 @@ class StreamingSourceEvolutionSuite extends StreamTest {

/**
* Helper method to run tests with source evolution enabled.
* Sets offset log format to V2 (OffsetMap) since named sources require it.
* Enabling source evolution automatically forces offset log format V2 (OffsetMap) for new
* queries, since named sources require it.
*/
def testWithSourceEvolution(testName: String, testTags: Tag*)(testBody: => Any): Unit = {
test(testName, testTags: _*) {
withSQLConf(
SQLConf.ENABLE_STREAMING_SOURCE_EVOLUTION.key -> "true",
SQLConf.STREAMING_OFFSET_LOG_FORMAT_VERSION.key -> "2") {
withSQLConf(SQLConf.ENABLE_STREAMING_SOURCE_EVOLUTION.key -> "true") {
Comment thread
ericm-db marked this conversation as resolved.
testBody
}
}
Expand Down