Skip to content

Add mrd pool cache#846

Open
Yonghui-Lee wants to merge 8 commits into
fsspec:mainfrom
Yonghui-Lee:shared-mrd-design
Open

Add mrd pool cache#846
Yonghui-Lee wants to merge 8 commits into
fsspec:mainfrom
Yonghui-Lee:shared-mrd-design

Conversation

@Yonghui-Lee
Copy link
Copy Markdown
Collaborator

Summary

This pull request introduces a filesystem-level cache of MRD pools, MRDPoolCache.

Previously, opening a ZonalFile or invoking range operations (like _cat_ranges or read_into_memory_chunked) initialized a new MRDPool (and its constituent MRDs) from scratch. This added significant overhead due to connection setups, handshakes, and stream initialization.

With this change, MRDPoolCache manages MRDPool lifecycle at the filesystem level. It implements an LRU eviction cache for idle pools while using reference counting to protect active pools from eviction. Idle pools hold their underlying gRPC downloaders and streams in a shared queue, enabling reads of the same object generation to reuse existing gRPC connections instantly.

Key Changes

1. Core Cache Logic

  • MRDPoolCache Class:
    • Caches MRD pools keyed by (bucket, object, generation).
    • Uses reference counting (_incref, _decref) to track active pools in use.
    • Leverages an OrderedDict for LRU eviction of idle pools once the max_idle_pools limit is exceeded.
    • Safely handles pool initialization failures by cleaning up partially initialized downloaders and purging the cache key unconditionally.
  • MRDPool Enhancements:
    • Now accepts an optional cache reference.
    • Introduces _get_or_create_mrd() to query the cache for an idle MRD before falling back to spawning a new one on-demand.
    • On close(), instead of forcefully closing all MRDs, it releases them back to the cache queue (cache.release()) if a cache is attached.

2. Filesystem-level Management

  • Lifecycle & Finalization:
    • ExtendedGcsFileSystem instantiates a single _mrd_pool_cache on construction with customizable size via mrd_pool_cache_size.
    • Registers _finalize_mrd_pool_cache weakref finalizers to gracefully close all cached streams on GC, handling various event loop scenarios safely.
  • Resource Sharing in Readers:
    • Updated _cat_ranges, read_into_memory_chunked, and _get_file to fetch MRDPool instances via self._mrd_pool_cache.get(...) rather than constructing raw MRDPool instances.

3. Zonal File Integration

  • Integrates ZonalFile with the new cache, retrieving the pool synchronously via on initialization.
  • Removes explicit mrd_pool.initialize call during init, since cache fetching automatically handles the initialization transparently.

4. Test

  • test_zb_hns_utils.py:
    • Comprehensive unit tests verifying MRDPoolCache behaviors
  • conftest.py:
    • Introduced _close_gcs() / _close_gcs_async() to explicitly release gRPC and cache resources in sync/async fixtures, preventing cross-test resource leakage.

@codecov
Copy link
Copy Markdown

codecov Bot commented May 14, 2026

Codecov Report

❌ Patch coverage is 99.43182% with 1 line in your changes missing coverage. Please review.
✅ Project coverage is 88.83%. Comparing base (991faba) to head (f02ea16).
⚠️ Report is 1 commits behind head on main.

Files with missing lines Patch % Lines
gcsfs/extended_gcsfs.py 98.41% 1 Missing ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##             main     #846      +/-   ##
==========================================
+ Coverage   88.52%   88.83%   +0.31%     
==========================================
  Files          15       15              
  Lines        2989     3126     +137     
==========================================
+ Hits         2646     2777     +131     
- Misses        343      349       +6     

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.

Comment thread gcsfs/zb_hns_utils.py Outdated
Comment thread gcsfs/extended_gcsfs.py
Comment thread gcsfs/extended_gcsfs.py Outdated
Comment thread gcsfs/zb_hns_utils.py Outdated
Comment thread gcsfs/extended_gcsfs.py
callback = callback or NoOpCallback()

mrd = None
mrd_pool = await self._mrd_pool_cache.get(bucket, key, generation, pool_size=1)
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

With a pool size of 1, the pool cache initializes a new pool every time, resulting in no performance improvements and added overhead.

Do you have any measurements showing the improvements made?

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1 to this
@googlyrahman @Yonghui-Lee do we have any macro benchmark results which confirm not just this change but overall MRD pooling is significantly improving performance ? it ll be good to add in description

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added a micro benchmark to show the improvement of read throughput. For the current tessellation macro benchmark, I think pool cache may not improve it because we don't open the same file multiple times in the same process.

image

Comment thread gcsfs/extended_gcsfs.py
from enum import Enum
from glob import has_magic

import fsspec
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we make this targetted import ?

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's for fsspec.FSTimeoutError. We use the same import in the core.py.

Comment thread gcsfs/zb_hns_utils.py
raise RuntimeError("ExtendedGcsFileSystem has been garbage collected.")

if generation is None:
info = await fs._info(f"{bucket_name}/{object_name}")
Copy link
Copy Markdown
Collaborator

@zhixiangli zhixiangli May 20, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will this extra round-trip negatively impact performance?

Comment thread gcsfs/zb_hns_utils.py

if generation is None:
info = await fs._info(f"{bucket_name}/{object_name}")
generation = info.get("generation")
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does calling fs._info() on an unfinalized object may return a previous finalized generation (causing the read stream to connect to stale data) or fail entirely if no finalized version exists?

threads: [32]

- name: "read_repeatedly_open_same_file_fixed_duration"
pattern: "repeatedly_open_same_file"
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would reopen be cleaner?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants