Fix OOM on large spooled result sets #590
gopinathnelluri wants to merge 1 commit into trinodb:master
Conversation
Thank you for your pull request and welcome to the Trino community. We require contributors to sign our Contributor License Agreement, and we don't seem to have you on file. Continue to work with us on the review and improvements in this PR, and submit the signed CLA to cla@trino.io. Photos, scans, or digitally-signed PDF files are all suitable. Processing may take a few days. The CLA needs to be on file before we merge your changes. For more information, see https://github.com/trinodb/cla
@gopinathnelluri please let us know when you have submitted the CLA form.
Previously, the eagerness of TrinoQuery.fetch() caused all segments to load into memory at once when using fault-tolerant execution. This led to OOM errors on large datasets.

Changes:
- Enable lazy loading by returning SegmentIterator directly in fetch().
- Update execute() to handle result rows as iterators instead of requiring lists.
- Add unit test to verify lazy fetching implementation.
Force-pushed from f178dc3 to 049421b
damian3031 left a comment:
Great fix! I've added a few comments regarding style.
@hashhar should tests be rewritten to avoid using mocking library? (DEVELOPMENT.md discourages that). I can help with that if needed.
```python
if isinstance(self._result.rows, list) and len(self._result.rows) == 0:
    new_rows = self.fetch()
    if isinstance(new_rows, list):
        self._result.rows += new_rows
    else:
        # It's an iterator (spooled segments), replace rows with it
        self._result.rows = new_rows
        # We have an iterator now, so we can return result to user
        break
else:
    # We have data (list with items or an iterator), so return
    break
```
Suggested change:

```python
# Stop if we have a non-empty list or an iterator
if not isinstance(self._result.rows, list) or self._result.rows:
    break
new_rows = self.fetch()
if isinstance(new_rows, list):
    self._result.rows.extend(new_rows)
elif isinstance(new_rows, SegmentIterator):
    self._result.rows = new_rows
    break
else:
    raise TypeError(
        f"fetch() returned {type(new_rows).__name__}, expected list or SegmentIterator"
    )
```
This part could be made a bit more readable:
- The outer `else` can be avoided.
- Explicitly check the type of rows; raise an error if it is neither a list nor a SegmentIterator.
- `extend` is more idiomatic than `+=` for lists.
- Comments can be simplified a bit.
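For illustration, the suggested control flow can be sketched as a self-contained script. The `SegmentIterator` class and `block_until_rows` helper below are minimal hypothetical stand-ins, not the client's actual implementation:

```python
from types import SimpleNamespace

class SegmentIterator:
    """Minimal stand-in: lazily yields rows from spooled segments."""
    def __init__(self, segments):
        # Generator expression keeps segments unmaterialized until iterated
        self._rows = (row for segment in segments for row in segment)

    def __iter__(self):
        return self

    def __next__(self):
        return next(self._rows)

def block_until_rows(result, fetch):
    """Block until rows is a non-empty list or an iterator, per the suggestion."""
    while True:
        # Stop if we have a non-empty list or an iterator
        if not isinstance(result.rows, list) or result.rows:
            break
        new_rows = fetch()
        if isinstance(new_rows, list):
            result.rows.extend(new_rows)
        elif isinstance(new_rows, SegmentIterator):
            result.rows = new_rows
            break
        else:
            raise TypeError(
                f"fetch() returned {type(new_rows).__name__}, expected list or SegmentIterator"
            )
    return result.rows

# Spooled path: fetch() returns an iterator, rows stream lazily
spooled = SimpleNamespace(rows=[])
rows = block_until_rows(spooled, lambda: SegmentIterator([[1, 2], [3]]))
print(list(rows))  # [1, 2, 3]

# Direct path: keep fetching until a non-empty list arrives
responses = iter([[], [4, 5]])
direct = SimpleNamespace(rows=[])
print(block_until_rows(direct, lambda: next(responses)))  # [4, 5]
```

Note that the loop never materializes the iterator itself; it only replaces the empty list and breaks, leaving consumption to the caller.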
| """ | ||
| Execute should block until at least one row is received or query is finished or cancelled | ||
|
|
||
| For Standard Execution, rows is a list, we can check len. the first response usually contains no rows (just stats), | ||
| so we need to continue fetching until we get some rows or query is finished or cancelled. | ||
|
|
||
| For Spooled Execution, rows start as empty list and eventually fetch returns the rows as iterator, | ||
| we can't check len of an iterator easily without peeking. | ||
|
|
||
| So, if we get rows as non empty list or iterator, we stop blocking and return it to the caller to consume it. | ||
| """ |
This docstring is too verbose; it should be a short comment as it was previously, for example:

```python
# Execute should block until the query is finished or cancelled,
# or until at least one row is received (direct protocol),
# or an iterator is received (spooling protocol).
```
@findepi Submitted the signed CLA
Previously, the eagerness of TrinoQuery.fetch() caused all segments to load into memory. This led to OOM errors on large datasets.

Changes:
- Enable lazy loading by returning SegmentIterator directly in fetch().
- Update execute() to handle result rows as iterators instead of requiring lists.
- Add unit test to verify lazy fetching implementation.
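The iterator handling in execute() can be illustrated with itertools.chain: rows already buffered as a list are served first, then the remaining rows stream lazily. The `spooled_rows` helper and the segment data below are made up for illustration:

```python
from itertools import chain

def spooled_rows(segments):
    # Lazily yield rows segment by segment instead of materializing a list
    for segment in segments:
        yield from segment

# Hypothetical state: two rows buffered eagerly, two more still spooled
buffered = [(1, "a"), (2, "b")]
lazy = spooled_rows([[(3, "c")], [(4, "d")]])

# chain serves the buffered rows first, then streams the rest on demand
rows = chain(buffered, lazy)
print(list(rows))  # [(1, 'a'), (2, 'b'), (3, 'c'), (4, 'd')]
```

Because chain consumes its second argument only as the caller iterates, at most one segment's rows need to be resident in memory at a time.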
Description

This PR addresses a critical memory issue (OOM) encountered when fetching large result sets with fault-tolerant execution (spooling) enabled.

Previously, TrinoQuery.fetch() would materialize all spooled segments into a list immediately upon retrieval, even if user code was iterating row-by-row. For large datasets (e.g., 100M+ rows), this caused the client to consume excessive memory and crash.

Changes:
- Update TrinoQuery.fetch() to return a SegmentIterator directly instead of materializing it into a list when the fetch mode is standard.
- Update TrinoQuery.execute() to handle self._result.rows as an iterator (using itertools.chain) instead of assuming it is always a list.

Non-technical explanation
Fixed an issue where downloading very large datasets (100 million+ rows) when using fault-tolerant execution would cause the client to run out of memory and crash. The client now streams data efficiently instead of trying to load everything at once.
Release notes
(x) Release notes are required, with the following suggested text: