[SPARK-56968][SS] Force offset log VERSION_2 when streaming source evolution is enabled #56015
Open
ericm-db wants to merge 7 commits into
…olution is enabled
…or offset log version selection
…bump existing V1 offset log on restart
…isting V1 checkpoint
…nnect client test helper
The streaming source evolution flag now forces offset log VERSION_2 automatically, so tests that enabled source evolution alongside an explicit offsetLog.formatVersion=2 no longer need to set both.
What changes were proposed in this pull request?
When the streaming source evolution flag (`spark.sql.streaming.queryEvolution.enableSourceEvolution`) is set to `true`, force the offset log format to `VERSION_2` for new streaming queries.

In `MicroBatchExecution.initializeExecution`, the offset log format version selection now takes `max(STREAMING_OFFSET_LOG_FORMAT_VERSION, minRequiredVersion)`, where `minRequiredVersion` is `VERSION_2` when source evolution is enabled and `VERSION_1` otherwise. Existing queries continue to use whatever version is already written in their offset log (read from `latestStartedBatch`), so this only affects new queries.

The `testWithSourceEvolution` helper in `StreamingSourceEvolutionSuite` was updated to no longer set the offset log version explicitly, since it is now selected automatically.

Why are the changes needed?
Streaming source evolution relies on the `OffsetMap` (sourceId -> offset) format, which is only available in offset log `VERSION_2`. Previously, users had to remember to set `spark.sql.streaming.offsetLog.formatVersion=2` alongside enabling source evolution; otherwise the format would default to `VERSION_1` (sequence-based) and the named-source tracking required by source evolution would not function properly. Coupling the two configs eliminates a footgun.

Does this PR introduce any user-facing change?
No.
The change only affects new streaming queries that explicitly enable the internal `spark.sql.streaming.queryEvolution.enableSourceEvolution` flag. For such queries, the offset log will now use `VERSION_2` automatically. Users who manually set the offset log version remain in control: the final version is `max(configuredVersion, minRequiredVersion)`, so a user-configured `VERSION_2` keeps working unchanged.

How was this patch tested?
- `offset log uses VERSION_2 when source evolution is enabled` test in `StreamingSourceEvolutionSuite`.
- `StreamingSourceEvolutionSuite` tests pass after dropping the explicit offset log version from `testWithSourceEvolution` (19/19).
- `OffsetSeqLogSuite` continues to pass (19/19).

Was this patch authored or co-authored using generative AI tooling?
Generated-by: Claude Code (claude-opus-4-7)
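
Illustrative sketch (not part of the patch): the version selection described under "What changes were proposed in this pull request?" can be modeled in isolation roughly as follows. This is a simplified, hypothetical model rather than the actual `MicroBatchExecution` code. The object name `OffsetLogVersionSketch`, the method names, and the use of plain integers for `VERSION_1` and `VERSION_2` are assumptions made for illustration, and `existingVersion` stands in for the version already recorded in an existing offset log.

```scala
// Hypothetical, simplified model of the selection rule described in this PR.
// None of these names come from the Spark code base.
object OffsetLogVersionSketch {
  val Version1: Int = 1 // stands in for VERSION_1 (sequence-based)
  val Version2: Int = 2 // stands in for VERSION_2 (OffsetMap-based)

  /** Minimum offset log version implied by the source evolution flag. */
  def minRequiredVersion(sourceEvolutionEnabled: Boolean): Int =
    if (sourceEvolutionEnabled) Version2 else Version1

  /**
   * Picks the offset log version for a query.
   *
   * @param existingVersion version already written in the offset log, if any
   *                        (assumed to be None for a new query)
   * @param configuredVersion the user-configured format version
   * @param sourceEvolutionEnabled whether the source evolution flag is set
   */
  def selectVersion(
      existingVersion: Option[Int],
      configuredVersion: Int,
      sourceEvolutionEnabled: Boolean): Int =
    // Existing queries keep their recorded version; new queries take the max
    // of the configured version and the minimum required version.
    existingVersion.getOrElse(
      math.max(configuredVersion, minRequiredVersion(sourceEvolutionEnabled)))
}
```

Under this model, `OffsetLogVersionSketch.selectVersion(None, 1, sourceEvolutionEnabled = true)` evaluates to 2 for a new query, while `OffsetLogVersionSketch.selectVersion(Some(1), 2, sourceEvolutionEnabled = true)` evaluates to 1, since an existing offset log keeps its recorded version per the description above.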