@vinodkc vinodkc commented Feb 9, 2026

What changes were proposed in this pull request?

In _SimpleStreamReaderWrapper.latestOffset(), validate that a custom data source's implementation of SimpleDataSourceStreamReader.read() does not return a non-empty batch with end == start. If it does, raise a PySparkException with error class SIMPLE_STREAM_READER_OFFSET_DID_NOT_ADVANCE before appending to the cache. Empty batches with end == start remain allowed.
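The check described above can be sketched in isolation. This is a minimal illustration, not the code in the patch: `OffsetDidNotAdvance` is a hypothetical stand-in for PySparkException with error class SIMPLE_STREAM_READER_OFFSET_DID_NOT_ADVANCE, and `check_offset_advanced` is an assumed helper name.

```python
from typing import Any, Dict, List, Tuple


class OffsetDidNotAdvance(Exception):
    """Hypothetical stand-in for PySparkException with error class
    SIMPLE_STREAM_READER_OFFSET_DID_NOT_ADVANCE."""


def check_offset_advanced(
    start: Dict[str, Any], end: Dict[str, Any], rows: List[Tuple]
) -> None:
    """Reject a non-empty batch whose end offset equals its start offset.

    An empty batch with end == start is allowed: it means "no new data".
    """
    if rows and end == start:
        raise OffsetDidNotAdvance(
            f"read() returned {len(rows)} row(s) but the offset stayed at {start}"
        )


# Allowed: empty batch, offset unchanged.
check_offset_advanced({"offset": 0}, {"offset": 0}, [])
# Allowed: non-empty batch, offset advanced.
check_offset_advanced({"offset": 0}, {"offset": 2}, [("a",), ("b",)])
```

Running the check before the batch is appended to the cache means a faulty reader fails on its first bad batch, before any memory is retained.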

Why are the changes needed?

When a user implements read(start) incorrectly, it can return:

  • The same offset for start and end: end = start (e.g. both {"offset": 0}).
  • A non-empty iterator: e.g. 2 rows.

If a reader returns end == start with data (e.g. return (it, {"offset": start_idx})), the wrapper keeps appending to its prefetch cache on every trigger, while commit(end) never trims entries because the first cache entry matching the committed offset is always at index 0. The cache grows without bound, and driver (non-JVM) memory increases until OOM. Validating and raising an error before appending stops this and fails fast with a clear message.
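The growth can be reproduced with a toy model of the wrapper. The sketch below is a simplification written from the description above, not the actual _SimpleStreamReaderWrapper code: `ToyWrapper`, `buggy_read`, `correct_read`, and `cache_size_after` are hypothetical names, and the trimming rule (drop entries only when the first matching index is greater than 0) is an assumption inferred from "first matching index is 0".

```python
from typing import Callable, Dict, Iterator, List, Tuple

Offset = Dict[str, int]
ReadFn = Callable[[Offset], Tuple[Iterator[Tuple], Offset]]


class ToyWrapper:
    """Simplified model of a prefetching wrapper (assumed behavior)."""

    def __init__(self, read: ReadFn) -> None:
        self.read = read
        self.current_offset: Offset = {"offset": 0}
        self.cache: List[Tuple[Offset, Offset, List[Tuple]]] = []

    def latest_offset(self) -> Offset:
        # Prefetch the next batch and remember it until it is committed.
        it, end = self.read(self.current_offset)
        self.cache.append((self.current_offset, end, list(it)))
        self.current_offset = end
        return end

    def commit(self, end: Offset) -> None:
        # Find the first cached entry ending at the committed offset.
        idx = next((i for i, e in enumerate(self.cache) if e[1] == end), -1)
        if idx > 0:  # index 0 (or no match) trims nothing
            self.cache = self.cache[idx:]


def buggy_read(start: Offset) -> Tuple[Iterator[Tuple], Offset]:
    # Bug: returns data but never advances the offset.
    return iter([("a",), ("b",)]), {"offset": start["offset"]}


def correct_read(start: Offset) -> Tuple[Iterator[Tuple], Offset]:
    return iter([("a",), ("b",)]), {"offset": start["offset"] + 2}


def cache_size_after(read: ReadFn, triggers: int) -> int:
    w = ToyWrapper(read)
    for _ in range(triggers):
        w.commit(w.latest_offset())
    return len(w.cache)


print(cache_size_after(buggy_read, 5))    # one entry retained per trigger
print(cache_size_after(correct_read, 5))  # stays bounded
```

In this model the buggy reader leaves one cache entry per trigger, because every entry ends at the same offset and the first match is always index 0, while the correct reader keeps the cache at a single entry.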

Empty batches with end == start remain allowed, so a Python data source can still signal that there is no data to read.
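For contrast, here is a rough sketch of a reader whose read(start) satisfies both rules: the end offset advances by the number of rows returned, and an empty batch leaves it unchanged. `CounterReader` is a hypothetical plain class used only for illustration; a real implementation would subclass SimpleDataSourceStreamReader, which this sketch deliberately does not import.

```python
from typing import Dict, Iterator, List, Tuple


class CounterReader:
    """Hypothetical reader following the read(start) -> (iterator, end) shape."""

    def __init__(self, available: List[int]) -> None:
        # Values currently available at the (pretend) source.
        self.available = available

    def initialOffset(self) -> Dict[str, int]:
        return {"offset": 0}

    def read(self, start: Dict[str, int]) -> Tuple[Iterator[Tuple[int]], Dict[str, int]]:
        start_idx = start["offset"]
        rows = [(v,) for v in self.available[start_idx:]]
        # Advance by exactly the number of rows returned, so that
        # end == start happens only when the batch is empty.
        end = {"offset": start_idx + len(rows)}
        return iter(rows), end


reader = CounterReader([10, 20])
it, end = reader.read(reader.initialOffset())
print(list(it), end)    # two rows, offset advanced to 2
it, end2 = reader.read(end)
print(list(it), end2)   # no rows, offset unchanged
```

Deriving the end offset from the row count makes it structurally impossible to return data without advancing.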

Does this PR introduce any user-facing change?

Yes. Implementations that return end == start with a non-empty iterator now get a PySparkException instead of unbounded memory growth. The behavior of empty batches with end == start is unchanged.

How was this patch tested?

Added a unit test in test_python_streaming_datasource.py.

Was this patch authored or co-authored using generative AI tooling?

No.

@vinodkc vinodkc changed the title from "[SPARK-55416][PYTHON][SS]Streaming Python Data Source memory leak when end-offset is not updated" to "[SPARK-55416][PYTHON][SS] Streaming Python Data Source memory leak when end-offset is not updated" Feb 9, 2026

@HeartSaVioR HeartSaVioR left a comment

Looks good overall, only minor comments and nits. Nice finding!

@vinodkc vinodkc changed the title from "[SPARK-55416][PYTHON][SS] Streaming Python Data Source memory leak when end-offset is not updated" to "[SPARK-55416][SS][PYTHON] Streaming Python Data Source memory leak when end-offset is not updated" Feb 12, 2026