Add mrd pool cache#846
Conversation
Codecov Report❌ Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## main #846 +/- ##
==========================================
+ Coverage 88.52% 88.83% +0.31%
==========================================
Files 15 15
Lines 2989 3126 +137
==========================================
+ Hits 2646 2777 +131
- Misses 343 349 +6 ☔ View full report in Codecov by Sentry. 🚀 New features to boost your workflow:
|
| callback = callback or NoOpCallback() | ||
|
|
||
| mrd = None | ||
| mrd_pool = await self._mrd_pool_cache.get(bucket, key, generation, pool_size=1) |
There was a problem hiding this comment.
With a pool size of 1, the pool cache initializes a new pool every time, resulting in no performance improvements and added overhead.
Do you have any measurements showing the improvements made?
There was a problem hiding this comment.
+1 to this
@googlyrahman @Yonghui-Lee do we have any macro benchmark results which confirm not just this change but overall MRD pooling is significantly improving performance ? it ll be good to add in description
| from enum import Enum | ||
| from glob import has_magic | ||
|
|
||
| import fsspec |
There was a problem hiding this comment.
can we make this targetted import ?
There was a problem hiding this comment.
It's for fsspec.FSTimeoutError. We use the same import in the core.py.
06a2250 to
43840a1
Compare
| raise RuntimeError("ExtendedGcsFileSystem has been garbage collected.") | ||
|
|
||
| if generation is None: | ||
| info = await fs._info(f"{bucket_name}/{object_name}") |
There was a problem hiding this comment.
Will this extra round-trip negatively impact performance?
|
|
||
| if generation is None: | ||
| info = await fs._info(f"{bucket_name}/{object_name}") | ||
| generation = info.get("generation") |
There was a problem hiding this comment.
Does calling fs._info() on an unfinalized object may return a previous finalized generation (causing the read stream to connect to stale data) or fail entirely if no finalized version exists?
| threads: [32] | ||
|
|
||
| - name: "read_repeatedly_open_same_file_fixed_duration" | ||
| pattern: "repeatedly_open_same_file" |
There was a problem hiding this comment.
Would reopen be cleaner?

Summary
This pull request introduces a filesystem-level cache of MRD pools,
MRDPoolCache.Previously, opening a
ZonalFileor invoking range operations (like_cat_rangesorread_into_memory_chunked) initialized a newMRDPool(and its constituent MRDs) from scratch. This added significant overhead due to connection setups, handshakes, and stream initialization.With this change,
MRDPoolCachemanagesMRDPoollifecycle at the filesystem level. It implements an LRU eviction cache for idle pools while using reference counting to protect active pools from eviction. Idle pools hold their underlying gRPC downloaders and streams in a shared queue, enabling reads of the same object generation to reuse existing gRPC connections instantly.Key Changes
1. Core Cache Logic
MRDPoolCacheClass:(bucket, object, generation)._incref,_decref) to track active pools in use.OrderedDictfor LRU eviction of idle pools once themax_idle_poolslimit is exceeded.MRDPoolEnhancements:cachereference._get_or_create_mrd()to query the cache for an idle MRD before falling back to spawning a new one on-demand.close(), instead of forcefully closing all MRDs, it releases them back to the cache queue (cache.release()) if a cache is attached.2. Filesystem-level Management
ExtendedGcsFileSysteminstantiates a single_mrd_pool_cacheon construction with customizable size viamrd_pool_cache_size._finalize_mrd_pool_cacheweakref finalizers to gracefully close all cached streams on GC, handling various event loop scenarios safely._cat_ranges,read_into_memory_chunked, and_get_fileto fetchMRDPoolinstances viaself._mrd_pool_cache.get(...)rather than constructing rawMRDPoolinstances.3. Zonal File Integration
ZonalFilewith the new cache, retrieving the pool synchronously via on initialization.mrd_pool.initializecall during init, since cache fetching automatically handles the initialization transparently.4. Test
test_zb_hns_utils.py:MRDPoolCachebehaviorsconftest.py:_close_gcs()/_close_gcs_async()to explicitly release gRPC and cache resources in sync/async fixtures, preventing cross-test resource leakage.