feat(waterdata): Add async parallel chunker over httpx.AsyncClient #285
Draft
thodson-usgs wants to merge 2 commits into
Swap the HTTP client across the package: `requests.Session` → `httpx.Client`, `requests.Response` → `httpx.Response`. Same call patterns; same response shape; only the underlying transport changes.

Headers, timeout, and User-Agent defaults are centralized in `dataretrieval.utils.HTTPX_DEFAULTS`. Chunker, paginated-loop helpers, and OGC waterdata fetchers all route through one `httpx.Client` that the chunker publishes on the `_chunked_session` ContextVar, so paginated sub-requests reuse the connection pool across a single chunked call.

Tests migrate from `requests_mock` to native `pytest-httpx`. The old `_RequestsMockShim` in `tests/conftest.py` is gone; the new conftest is ~30 lines configuring `pytest-httpx` strict-mode relaxations.

Test count: 411 passing (mocked), 2 skipped, ruff clean.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
Adds a parallel fan-out path to `multi_value_chunked`. When
`API_USGS_CONCURRENT` resolves to >1 (default: 16), the decorator
runs the sub-requests of an over-budget plan concurrently under
one shared `httpx.AsyncClient`, instead of issuing them serially.
Falls back to the serial sync path (with a one-time UserWarning)
when no async fetch sibling is wired or when an asyncio event
loop is already running (Jupyter, IPython, async apps —
`asyncio.run` would otherwise raise).
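A minimal sketch of the fallback described above, assuming hypothetical names (`execute`, `_loop_is_running`) and integer chunks in place of real sub-requests. It only illustrates the decision: use `asyncio.run` when it is safe, otherwise warn and run serially.

```python
import asyncio
import warnings
from typing import Awaitable, Callable, List, Optional


def _loop_is_running() -> bool:
    """True when called from inside a running asyncio event loop."""
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        return False
    return True


def execute(
    chunks: List[int],
    fetch_sync: Callable[[int], int],
    fetch_async: Optional[Callable[[int], Awaitable[int]]] = None,
) -> List[int]:
    """Run chunks in parallel when possible, else serially with a warning."""
    if fetch_async is None or _loop_is_running():
        # asyncio.run() would raise RuntimeError inside a running loop,
        # and without an async sibling there is nothing to parallelize.
        warnings.warn(
            "parallel path unavailable; running serially",
            UserWarning,
            stacklevel=2,
        )
        return [fetch_sync(c) for c in chunks]

    async def _run() -> List[int]:
        # gather preserves submission order in its result list.
        return list(await asyncio.gather(*(fetch_async(c) for c in chunks)))

    return asyncio.run(_run())
```

The sketch leaves deduplication of the warning to Python's warning filters; the PR's own one-time behavior may be implemented differently.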
Architecture (`dataretrieval/waterdata/chunking.py`):
* `_fan_out_async(plan, fetch_once, fetch_async, *, max_concurrent)`
is the orchestrator. Probe-first: issues sub-request 0 alone via
`_probe_first` so its `x-ratelimit-remaining` header gates the
rest of the plan; if the window can't fit, raises
`RequestExceedsQuota` before the burst goes out. Then
`_fan_out_rest` dispatches indices 1..N-1 concurrently via
`asyncio.gather(return_exceptions=True)`. Completed pairs survive
a sibling's transient failure, so partial state stays recoverable
through `ChunkedCall.resume()` on the sync path.
* Failure precedence in `_fan_out_rest`:
1. Cancellation/interrupt signals (CancelledError,
KeyboardInterrupt, SystemExit) propagate unmodified — never
wrapped as transients. Cancellation is asyncio's abort
signal; rewriting it as ChunkInterrupted would silently
consume the user's stop request.
2. Recognized transients (RateLimited, ServiceUnavailable, bare
httpx.HTTPError) wrap as ChunkInterrupted so the user gets
a resumable handle even when a non-transient bug landed
earlier in submission order.
3. Otherwise raise the first failure in submission order,
preserving its type.
* `_execute_in_parallel` owns the sync→async bridge:
`asyncio.run` dispatch with the `fetch_async is None` and
running-event-loop fallbacks (one-time UserWarning + serial).
* `_publish_async_session` / `get_active_async_session` /
`_chunked_async_session` ContextVar let async paginated-loop
helpers (`_walk_pages_async`, `_paginate_async`) reuse one
`AsyncClient` connection pool across every concurrent
sub-request.
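The probe-then-fan-out flow and the failure precedence listed above can be sketched as below. Everything here is illustrative: `QuotaExceeded`, `Transient`, and `Interrupted` stand in for `RequestExceedsQuota`, the recognized transients, and `ChunkInterrupted`; the fetch signature and the "one quota unit per sub-request" rule are assumptions, and the plan is assumed to be non-empty.

```python
import asyncio
from typing import Awaitable, Callable, Dict, List, Tuple

# Hypothetical fetch signature: index -> (payload, quota remaining after the call).
Fetch = Callable[[int], Awaitable[Tuple[str, int]]]


class QuotaExceeded(Exception):
    """Stand-in for the PR's RequestExceedsQuota."""


class Transient(Exception):
    """Stand-in for RateLimited / ServiceUnavailable / httpx.HTTPError."""


class Interrupted(Exception):
    """Stand-in for ChunkInterrupted; keeps the results that did complete."""

    def __init__(self, completed: Dict[int, str], cause: BaseException) -> None:
        super().__init__(f"interrupted by {cause!r}")
        self.completed = completed


async def fan_out(plan: List[int], fetch: Fetch, max_concurrent: int) -> Dict[int, str]:
    # Probe: run the first sub-request alone and read the remaining quota.
    payload, remaining = await fetch(plan[0])
    rest = plan[1:]
    if remaining < len(rest):  # assumed rule: one quota unit per sub-request
        raise QuotaExceeded(f"{len(rest)} requests pending, {remaining} left")
    completed = {plan[0]: payload}

    sem = asyncio.Semaphore(max_concurrent)  # created inside the running loop

    async def guarded(index: int) -> str:
        async with sem:  # at most max_concurrent fetches in flight
            result, _ = await fetch(index)
            return result

    # return_exceptions=True lets finished siblings survive a failure.
    outcomes = await asyncio.gather(*(guarded(i) for i in rest), return_exceptions=True)

    failures: List[BaseException] = []
    for index, outcome in zip(rest, outcomes):
        if isinstance(outcome, BaseException):
            failures.append(outcome)
        else:
            completed[index] = outcome

    # 1. Abort signals propagate unmodified.
    for exc in failures:
        if isinstance(exc, (asyncio.CancelledError, KeyboardInterrupt, SystemExit)):
            raise exc
    # 2. A transient anywhere yields a resumable error carrying partial results.
    for exc in failures:
        if isinstance(exc, Transient):
            raise Interrupted(completed, exc) from exc
    # 3. Otherwise the first failure in submission order, type preserved.
    if failures:
        raise failures[0]
    return completed
```

Collecting every outcome before deciding what to raise is what allows the precedence rules to be applied to the whole batch rather than to whichever task happened to fail first.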
Wiring (`dataretrieval/waterdata/utils.py`):
* `_walk_pages_async`, `_paginate_async`, `_async_session`,
`_fetch_once_async` — async siblings of the sync paginate path.
* The `@chunking.multi_value_chunked(fetch_async=_fetch_once_async)`
decorator on `_fetch_once` wires the async sibling so the
parallel path is available to every Water Data OGC getter.
* `ChunkedCall._chunks` (already a dict in the prior commit)
now actually sees sparse keys when the parallel fan-out leaves
scattered completions; comments updated to document that.
Concurrency cap (`API_USGS_CONCURRENT`):
* Integer N >= 1: bounded fan-out (semaphore-gated, N=1 forces
serial sync). Default 16 — the server-friendly sweet spot.
* `unbounded`: no per-call cap (`Semaphore(sys.maxsize)`).
* Unset: default 16.
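The rules above could be resolved along these lines. This is a sketch under assumptions: `resolve_concurrency` and `DEFAULT_CONCURRENCY` are hypothetical names, and trimming whitespace and ignoring case are choices made here, not behavior confirmed by the PR.

```python
import os
import sys
from typing import Mapping, Optional

DEFAULT_CONCURRENCY = 16  # mirrors the default stated above


def resolve_concurrency(env: Optional[Mapping[str, str]] = None) -> int:
    """Turn API_USGS_CONCURRENT into a semaphore size."""
    env = os.environ if env is None else env
    raw = env.get("API_USGS_CONCURRENT")
    if raw is None:
        return DEFAULT_CONCURRENCY  # unset: use the default

    text = raw.strip().lower()  # assumption: tolerate case and whitespace
    if text == "unbounded":
        return sys.maxsize  # effectively no per-call cap

    try:
        value = int(text)
    except ValueError:
        raise ValueError(
            f"API_USGS_CONCURRENT must be an integer >= 1 or 'unbounded', got {raw!r}"
        ) from None
    if value < 1:
        raise ValueError(f"API_USGS_CONCURRENT must be >= 1, got {value}")
    return value
```

Returning `sys.maxsize` for `unbounded` keeps a single code path: the caller always builds a semaphore and never needs a separate "no limit" branch.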
Test scaffolding: `tests/conftest.py` adds the `_serial_chunker`
autouse fixture that pins `API_USGS_CONCURRENT=1` so the historical
mocked suite stays on the deterministic serial path; the new async
tests opt back in by re-setting the env var inside their body.
Tests: 6 async-path tests in `tests/waterdata_chunking_test.py`
(one-call-per-sub-request, probe quota check, mid-fan-out
transient yields resumable ChunkInterrupted, fallback-to-serial
parametrized over running-loop and missing-fetch_async,
cancellation-wins-over-transient-sibling regression).
`tests/waterdata_progress_test.py` adds a progress-reporter
integration test for `_fan_out_async` and a `_paginate_async`
test. `tests/waterdata_utils_test.py` adds a `_walk_pages_async`
initial-parse-error test.
Test count: 420 passing (mocked), 2 skipped, ruff clean.
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
Summary
Important
Stacked on top of #289 (httpx upgrade). Merge #289 first; this branch then rebases onto `main` and the visible diff collapses to just the async commit. Until that happens, this PR's diff includes #289's contents; review the single async commit (head of branch) to see only what's new.

Adds a parallel fan-out path to `multi_value_chunked`. When `API_USGS_CONCURRENT` resolves to >1 (default 16), the decorator runs the sub-requests of an over-budget plan concurrently under one shared `httpx.AsyncClient` instead of issuing them serially. Falls back to the serial sync path (with a one-time `UserWarning`) when no async fetch sibling is wired or when an asyncio event loop is already running (Jupyter / IPython kernels, async apps, where `asyncio.run` would otherwise raise).

Benchmarked at 5.84× speedup vs `main` on a 19,602-site / 6-state `get_daily` call (parallel @ `API_USGS_CONCURRENT=16`: 4.91s; sync: 28.65s; distinct date windows per side so the USGS cache can't bias either run).

| `API_USGS_CONCURRENT` | Behavior |
|---|---|
| unset | Default of 16 (`_CONCURRENCY_DEFAULT`) |
| ≥ 2 | Bounded, semaphore-gated fan-out |
| `1` | Serial sync path (the `ChunkedCall.resume()` path) |
| `unbounded` | No per-call cap |
| `0`, negative, malformed | `ValueError` at call time |

Connection-pool sharing across all sub-requests of a single chunked call via the `_chunked_session` (sync) / `_chunked_async_session` (async) ContextVars: `_walk_pages` / `_walk_pages_async` read them as fallbacks before opening a fresh client.

Architecture
`_fan_out_async(plan, fetch_once, fetch_async, *, max_concurrent)` is the orchestrator. Probe-first: issues sub-request 0 alone via `_probe_first` so its `x-ratelimit-remaining` header gates the rest of the plan; if the window can't fit, raises `RequestExceedsQuota` before the burst goes out. Then `_fan_out_rest` dispatches indices 1..N-1 concurrently via `asyncio.gather(return_exceptions=True)`. Completed pairs survive a sibling's transient failure, so partial state stays recoverable through `ChunkedCall.resume()` on the sync path.

Failure precedence in `_fan_out_rest`

1. Cancellation/interrupt signals (`CancelledError`, `KeyboardInterrupt`, `SystemExit`) propagate unmodified and are never wrapped as transients. Cancellation is asyncio's abort signal; rewriting it as `ChunkInterrupted` would silently consume the user's stop request.
2. Recognized transients (`RateLimited`, `ServiceUnavailable`, bare `httpx.HTTPError`) wrap as `ChunkInterrupted` so the user gets a resumable handle even when a non-transient bug landed earlier in submission order.
3. Otherwise raise the first failure in submission order, preserving its type.

Sync ↔ async bridge
`_execute_in_parallel` owns the `asyncio.run` dispatch with two recoverable-misconfig fallbacks (each emits a one-time `UserWarning`, then runs serial):

* Running loop. `asyncio.run()` raises inside an already-running loop. The bridge calls `asyncio.get_running_loop()` first and, when one is active, falls back to the serial path so Jupyter / IPython users don't see a confusing `RuntimeError`.
* Missing `fetch_async` warning. If `API_USGS_CONCURRENT` requests parallel but the decorator wasn't wired with `fetch_async=`, the wrapper warns and runs serial rather than silently no-op'ing the env var.

Test plan
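* Async-path tests in `tests/waterdata_chunking_test.py`:
  * one call per sub-request
  * probe quota check raises `RequestExceedsQuota`
  * mid-fan-out transient yields a `ChunkInterrupted` whose `.call.resume()` re-issues only the unfinished indices via the sync path
  * fallback-to-serial, parametrized over `running-loop` and `missing-fetch_async`
  * cancellation wins over a transient sibling (regression)
* Progress-reporter tests in `tests/waterdata_progress_test.py` (reporter calls from `_paginate_async` and `_fan_out_async`).
* `_walk_pages_async` initial-parse-error test in `tests/waterdata_utils_test.py`.
* `ruff check` and `ruff format --check` pass.

Out of scope (follow-ups)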
* `_fan_out_async` materializes all `(df, response)` pairs before combining. Consider streaming-combine via `asyncio.as_completed` if users push concurrency very high.
* `asyncio.TaskGroup` (3.11+): would replace `gather` once the 3.9 floor is dropped, though the partial-completion contract fights `TaskGroup`'s cancel-on-first-failure default; the gather form may stay the right shape regardless.
* `NEWS.md` entry: left for the merger to draft.

🤖 Generated with Claude Code