[SPARK-55416][SS][PYTHON] Streaming Python Data Source memory leak when end-offset is not updated #54237
Open · vinodkc wants to merge 7 commits into apache:master from vinodkc:br_SPARK-55416 (base: master)
+92 −2
Conversation
HeartSaVioR (Contributor) reviewed on Feb 10, 2026 and left a comment:
Looks good overall, only minor comments and nits. Nice finding!
Force-pushed from 81b2fd3 to 3c0b205.
What changes were proposed in this pull request?
In _SimpleStreamReaderWrapper.latestOffset(), validate that a custom data source built on SimpleDataSourceStreamReader does not return a non-empty batch from read() with end == start. If it does, raise a PySparkException with error class SIMPLE_STREAM_READER_OFFSET_DID_NOT_ADVANCE before the batch is appended to the prefetch cache. Empty batches with end == start remain allowed.
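A minimal sketch of the proposed guard, using a simplified stand-in for the wrapper (field names and error plumbing are assumptions; the actual pyspark code may differ):

```python
# Simplified sketch of the validation in latestOffset(); not the exact pyspark code.
from pyspark.errors import PySparkException
from pyspark.sql.datasource import SimpleDataSourceStreamReader


class _SimpleStreamReaderWrapperSketch:
    def __init__(self, simple_reader: SimpleDataSourceStreamReader):
        self.simple_reader = simple_reader
        self.current_offset = None
        self.cache = []  # prefetched (start, end, records) entries

    def latestOffset(self) -> dict:
        if self.current_offset is None:
            self.current_offset = self.simple_reader.initialOffset()
        it, end = self.simple_reader.read(self.current_offset)
        records = list(it)
        if end == self.current_offset and records:
            # A non-empty batch that does not advance the offset would be
            # prefetched again on every trigger and never trimmed by commit(),
            # so fail fast instead of caching it.
            raise PySparkException(
                "SIMPLE_STREAM_READER_OFFSET_DID_NOT_ADVANCE: "
                "read() returned records but the end offset equals the start offset."
            )
        self.cache.append((self.current_offset, end, records))
        self.current_offset = end
        return end
```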
Why are the changes needed?
If a user implements read(start) incorrectly and returns end == start together with data (e.g. return (it, {"offset": start_idx})), the wrapper keeps appending to its prefetch cache on every trigger while commit(end) never trims any entries, because the first matching index is always 0. The cache grows without bound and driver-side (non-JVM) memory increases until the process runs out of memory. Validating and raising the error before appending stops the growth and fails fast with a clear message.
Empty batches with end == start remain allowed; this lets a Python data source signal that there is currently no data to read.
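For illustration, a hypothetical reader exhibiting the buggy pattern described above (class and offset key names are made up):

```python
from pyspark.sql.datasource import SimpleDataSourceStreamReader


class StuckOffsetReader(SimpleDataSourceStreamReader):
    def initialOffset(self) -> dict:
        return {"offset": 0}

    def read(self, start: dict):
        start_idx = start["offset"]
        rows = iter([(start_idx,), (start_idx + 1,)])
        # Bug: rows are returned but the end offset equals the start offset,
        # so the stream never advances. Previously this silently grew the
        # prefetch cache on every trigger; with this PR it raises
        # SIMPLE_STREAM_READER_OFFSET_DID_NOT_ADVANCE.
        return (rows, {"offset": start_idx})
```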
Does this PR introduce any user-facing change?
Yes. Implementations that return end == start with a non-empty iterator now fail with a PySparkException instead of causing unbounded memory growth. Empty batches with end == start behave as before.
How was this patch tested?
Added a unit test in test_python_streaming_datasource.py.
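A hedged sketch of what such a regression test might look like; the wrapper import path, constructor, and assertion are assumptions, and the actual test in test_python_streaming_datasource.py may differ:

```python
import unittest

from pyspark.errors import PySparkException
from pyspark.sql.datasource import SimpleDataSourceStreamReader


class StuckOffsetReader(SimpleDataSourceStreamReader):
    def initialOffset(self) -> dict:
        return {"offset": 0}

    def read(self, start: dict):
        # Returns a record but reports end == start.
        return (iter([(start["offset"],)]), {"offset": start["offset"]})


class SimpleStreamReaderOffsetValidationTest(unittest.TestCase):
    def test_non_empty_batch_must_advance_offset(self):
        # Assumption: the internal wrapper lives in pyspark.sql.datasource_internal
        # and wraps a SimpleDataSourceStreamReader directly.
        from pyspark.sql.datasource_internal import _SimpleStreamReaderWrapper

        wrapper = _SimpleStreamReaderWrapper(StuckOffsetReader())
        with self.assertRaises(PySparkException):
            wrapper.latestOffset()


if __name__ == "__main__":
    unittest.main()
```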
Was this patch authored or co-authored using generative AI tooling?
No.