Skip to content

feat(waterdata): Add async parallel chunker over httpx.AsyncClient#285

Draft
thodson-usgs wants to merge 2 commits into
DOI-USGS:mainfrom
thodson-usgs:httpx-migration
Draft

feat(waterdata): Add async parallel chunker over httpx.AsyncClient#285
thodson-usgs wants to merge 2 commits into
DOI-USGS:mainfrom
thodson-usgs:httpx-migration

Conversation

@thodson-usgs
Copy link
Copy Markdown
Collaborator

@thodson-usgs thodson-usgs commented May 21, 2026

Summary

Important

Stacked on top of #289 (httpx upgrade). Merge #289 first; this branch then rebases onto main and the visible diff collapses to just the async commit. Until that happens, this PR's diff includes #289's contents — review the single async commit (head of branch) to see only what's new.

Adds a parallel fan-out path to multi_value_chunked. When API_USGS_CONCURRENT resolves to >1 (default 16), the decorator runs the sub-requests of an over-budget plan concurrently under one shared httpx.AsyncClient instead of issuing them serially. Falls back to the serial sync path (with a one-time UserWarning) when no async fetch sibling is wired or when an asyncio event loop is already running (Jupyter / IPython kernels, async apps — asyncio.run would otherwise raise).

Benchmarked at 5.84× speedup vs main on a 19,602-site / 6-state get_daily call (parallel @ API_USGS_CONCURRENT=16: 4.91s; sync: 28.65s; distinct date windows per side so the USGS cache can't bias either run).

API_USGS_CONCURRENT

Value Behavior
unset / blank parallel, cap = 16 (_CONCURRENCY_DEFAULT)
≥ 2 parallel, semaphore-capped at that value
1 serial (sync ChunkedCall.resume() path)
unbounded parallel, no per-call cap — caller owns the burst risk
0, negative, malformed ValueError at call time

Connection-pool sharing across all sub-requests of a single chunked call via the _chunked_session (sync) / _chunked_async_session (async) ContextVars — _walk_pages / _walk_pages_async read them as fallbacks before opening a fresh client.

Architecture

_fan_out_async(plan, fetch_once, fetch_async, *, max_concurrent) is the orchestrator. Probe-first: issues sub-request 0 alone via _probe_first so its x-ratelimit-remaining header gates the rest of the plan; if the window can't fit, raises RequestExceedsQuota before the burst goes out. Then _fan_out_rest dispatches indices 1..N-1 concurrently via asyncio.gather(return_exceptions=True). Completed pairs survive a sibling's transient failure, so partial state stays recoverable through ChunkedCall.resume() on the sync path.

Failure precedence in _fan_out_rest

  1. Cancellation / interrupt signals (CancelledError, KeyboardInterrupt, SystemExit) propagate unmodified — never wrapped as transients. Cancellation is asyncio's abort signal; rewriting it as ChunkInterrupted would silently consume the user's stop request.
  2. Recognized transients (RateLimited, ServiceUnavailable, bare httpx.HTTPError) wrap as ChunkInterrupted so the user gets a resumable handle even when a non-transient bug landed earlier in submission order.
  3. Otherwise raise the first failure in submission order, preserving its type.

Sync ↔ async bridge

_execute_in_parallel owns the asyncio.run dispatch with two recoverable-misconfig fallbacks (each emits a one-time UserWarning, then runs serial):

  • Event-loop detection. asyncio.run() raises inside an already-running loop. The bridge calls asyncio.get_running_loop() first and, when one is active, falls back to the serial path so Jupyter / IPython users don't see a confusing RuntimeError.
  • Missing-fetch_async warning. If API_USGS_CONCURRENT requests parallel but the decorator wasn't wired with fetch_async=, the wrapper warns + runs serial rather than silently no-op'ing the env var.

Test plan

  • 420 mocked tests pass (411 in the base httpx PR + 9 new async-path tests here).
  • Async-path coverage in tests/waterdata_chunking_test.py:
    • one-call-per-sub-request happy path
    • probe-first RequestExceedsQuota
    • mid-fan-out transient yields resumable ChunkInterrupted whose .call.resume() re-issues only the unfinished indices via the sync path
    • fallback-to-serial parametrized over running-loop and missing-fetch_async
    • CancelledError-wins-over-transient-sibling regression test (asyncio's cancellation contract takes precedence over the chunker's retry semantics)
  • Async-progress integration tests in tests/waterdata_progress_test.py (reporter calls from _paginate_async and _fan_out_async).
  • _walk_pages_async initial-parse-error test in tests/waterdata_utils_test.py.
  • ruff check and ruff format --check pass.
  • Live-API CI sweep.

Out of scope (follow-ups)

  • High-concurrency memory: _fan_out_async materializes all (df, response) pairs before combining. Consider streaming-combine via asyncio.as_completed if users push concurrency very high.
  • asyncio.TaskGroup (3.11+): would replace gather once the 3.9 floor is dropped, though the partial-completion contract fights TaskGroup's cancel-on-first-failure default — the gather form may stay the right shape regardless.
  • NEWS.md entry — left for the merger to draft.

🤖 Generated with Claude Code

thodson-usgs and others added 2 commits May 25, 2026 10:37
Swap the HTTP client across the package: `requests.Session` →
`httpx.Client`, `requests.Response` → `httpx.Response`. Same call
patterns; same response shape; only the underlying transport
changes. Headers, timeout, and User-Agent defaults are centralized
in `dataretrieval.utils.HTTPX_DEFAULTS`.

Chunker, paginated-loop helpers, and OGC waterdata fetchers all
route through one `httpx.Client` that the chunker publishes on the
`_chunked_session` ContextVar, so paginated sub-requests reuse the
connection pool across a single chunked call.

Tests migrate from `requests_mock` to native `pytest-httpx`. The
old `_RequestsMockShim` in `tests/conftest.py` is gone; the new
conftest is ~30 lines configuring `pytest-httpx` strict-mode
relaxations.

Test count: 411 passing (mocked), 2 skipped, ruff clean.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
Adds a parallel fan-out path to `multi_value_chunked`. When
`API_USGS_CONCURRENT` resolves to >1 (default: 16), the decorator
runs the sub-requests of an over-budget plan concurrently under
one shared `httpx.AsyncClient`, instead of issuing them serially.
Falls back to the serial sync path (with a one-time UserWarning)
when no async fetch sibling is wired or when an asyncio event
loop is already running (Jupyter, IPython, async apps —
`asyncio.run` would otherwise raise).

Architecture (`dataretrieval/waterdata/chunking.py`):

* `_fan_out_async(plan, fetch_once, fetch_async, *, max_concurrent)`
  is the orchestrator. Probe-first: issues sub-request 0 alone via
  `_probe_first` so its `x-ratelimit-remaining` header gates the
  rest of the plan; if the window can't fit, raises
  `RequestExceedsQuota` before the burst goes out. Then
  `_fan_out_rest` dispatches indices 1..N-1 concurrently via
  `asyncio.gather(return_exceptions=True)`. Completed pairs survive
  a sibling's transient failure, so partial state stays recoverable
  through `ChunkedCall.resume()` on the sync path.
* Failure precedence in `_fan_out_rest`:
    1. Cancellation/interrupt signals (CancelledError,
       KeyboardInterrupt, SystemExit) propagate unmodified — never
       wrapped as transients. Cancellation is asyncio's abort
       signal; rewriting it as ChunkInterrupted would silently
       consume the user's stop request.
    2. Recognized transients (RateLimited, ServiceUnavailable, bare
       httpx.HTTPError) wrap as ChunkInterrupted so the user gets
       a resumable handle even when a non-transient bug landed
       earlier in submission order.
    3. Otherwise raise the first failure in submission order,
       preserving its type.
* `_execute_in_parallel` owns the sync→async bridge:
  `asyncio.run` dispatch with the `fetch_async is None` and
  running-event-loop fallbacks (one-time UserWarning + serial).
* `_publish_async_session` / `get_active_async_session` /
  `_chunked_async_session` ContextVar let async paginated-loop
  helpers (`_walk_pages_async`, `_paginate_async`) reuse one
  `AsyncClient` connection pool across every concurrent
  sub-request.

Wiring (`dataretrieval/waterdata/utils.py`):

* `_walk_pages_async`, `_paginate_async`, `_async_session`,
  `_fetch_once_async` — async siblings of the sync paginate path.
* The `@chunking.multi_value_chunked(fetch_async=_fetch_once_async)`
  decorator on `_fetch_once` wires the async sibling so the
  parallel path is available to every Water Data OGC getter.
* `ChunkedCall._chunks` (already a dict in the prior commit)
  now actually sees sparse keys when the parallel fan-out leaves
  scattered completions; comments updated to document that.

Concurrency cap (`API_USGS_CONCURRENT`):

* Integer N >= 1: bounded fan-out (semaphore-gated, N=1 forces
  serial sync). Default 16 — the server-friendly sweet spot.
* `unbounded`: no per-call cap (`Semaphore(sys.maxsize)`).
* Unset: default 16.

Test scaffolding: `tests/conftest.py` adds the `_serial_chunker`
autouse fixture that pins `API_USGS_CONCURRENT=1` so the historical
mocked suite stays on the deterministic serial path; the new async
tests opt back in by re-setting the env var inside their body.

Tests: 6 async-path tests in `tests/waterdata_chunking_test.py`
(one-call-per-sub-request, probe quota check, mid-fan-out
transient yields resumable ChunkInterrupted, fallback-to-serial
parametrized over running-loop and missing-fetch_async,
cancellation-wins-over-transient-sibling regression).
`tests/waterdata_progress_test.py` adds a progress-reporter
integration test for `_fan_out_async` and a `_paginate_async`
test. `tests/waterdata_utils_test.py` adds a `_walk_pages_async`
initial-parse-error test.

Test count: 420 passing (mocked), 2 skipped, ruff clean.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
@thodson-usgs thodson-usgs changed the title feat(waterdata): Migrate to httpx and add async parallel chunker feat(waterdata): Add async parallel chunker over httpx.AsyncClient May 25, 2026
@thodson-usgs thodson-usgs added the enhancement New feature or request label May 25, 2026
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

enhancement New feature or request

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant