Skip to content

feat(py-client): Add many API#419

Draft
jan-auer wants to merge 15 commits into
mainfrom
feat/python-many-streaming-api
Draft

feat(py-client): Add many API#419
jan-auer wants to merge 15 commits into
mainfrom
feat/python-many-streaming-api

Conversation

@jan-auer
Copy link
Copy Markdown
Member

@jan-auer jan-auer commented Apr 2, 2026

Adds session.many() to the Python client for executing multiple get/put/delete operations in batches, with a fully streaming request/response pipeline.

The batch protocol uses multipart/form-data with per-part custom headers (x-sn-batch-operation-kind, x-sn-batch-operation-key, etc.). No existing Python library supports both streaming encoding with arbitrary per-part headers and streaming decoding, so both are implemented in multipart.py.

Streaming pipeline (no buffering):

  • Request: _build_batch_parts (generator) → encode_multipart (yields bytes lazily) → urllib3 chunked transfer encoding
  • Response: response.stream(65536)iter_multipart_response (incremental boundary detection across arbitrary chunks) → _parse_batch_response (generator) → caller receives Iterator[ManyResponse]

Operation classification:

  • bytes Put bodies: compressed eagerly (required for size-based batching), then classified — ≤1 MB goes in a batch request, >1 MB goes individually via the standard PUT endpoint
  • IO[bytes] Put bodies: always sent individually to avoid eager reading
  • Get and Delete: always batched (zero body size)

Concurrency: ThreadPoolExecutor dispatches batch chunks and individual ops concurrently (default concurrency=3). Results arrive in completion order, matching the Rust client's buffer_unordered semantics. Sequential execution (concurrency=1) preserves input order.

Close FS-330

Introduces `session.many()` for executing multiple get/put/delete
operations with automatic batching and streaming end-to-end — no
buffering of request or response bodies.

- New `many.py`: `Put`, `Get`, `Delete` operation types, `ManyResponse`
  result, and `execute_many()` orchestrator. Small puts are batched via
  multipart; puts over 1 MB or with `IO[bytes]` bodies go individually.
  Concurrent execution via `ThreadPoolExecutor` with configurable
  concurrency (default 3, matching Rust client semantics).
- New `multipart.py`: custom multipart encoder (lazy `Iterator[bytes]`,
  supports per-part custom headers and `IO[bytes]` bodies) and streaming
  decoder (`iter_multipart_response`) that parses parts incrementally
  across arbitrary-sized chunks without buffering the full response.
- `client.py`: exposes `Session.many()` and re-exports operation types
  and `ManyResponse` from the top-level package.
@github-actions

This comment was marked as spam.

@lcian lcian marked this pull request as ready for review April 21, 2026 14:57
@lcian lcian requested a review from a team as a code owner April 21, 2026 14:57
@lcian lcian marked this pull request as draft April 21, 2026 14:59
yield result
else:
with ThreadPoolExecutor(max_workers=concurrency) as executor:
futures = [
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bug: The _send_batch generator is iterated on the main thread instead of the worker thread, causing concurrent batch operations to execute sequentially.
Severity: HIGH

Suggested Fix

To ensure the generator's logic runs in the worker thread, wrap the call in a function that consumes the generator. For example, create a helper function def _run_send_batch(*args): return list(_send_batch(*args)) and submit this new function to the executor: executor.submit(_run_send_batch, session, chunk). This forces the I/O operations to complete within the worker thread before the result is returned.

Prompt for AI Agent
Review the code at the location below. A potential bug has been identified by an AI
agent. Verify if this is a real issue. If it is, propose a fix; if not, explain why it's
not valid.

Location: clients/python/src/objectstore_client/many.py#L546

Potential issue: The `_send_batch` function is a generator. When submitted to a
`ThreadPoolExecutor` via `executor.submit`, the worker thread immediately returns a
generator object without executing the function's body. The main thread later retrieves
this generator via `future.result()` and iterates over it. This causes all HTTP I/O for
the batch operation to execute sequentially on the main thread, defeating the purpose of
the thread pool. As a result, setting `concurrency > 1` for batch operations has no
effect on performance, as requests are not parallelized.

Did we get this right? 👍 / 👎 to inform future reviews.

Comment thread clients/python/src/objectstore_client/many.py Outdated
Comment thread clients/python/src/objectstore_client/many.py Outdated
@linear-code
Copy link
Copy Markdown

linear-code Bot commented Apr 27, 2026

lcian added 4 commits May 27, 2026 12:41
…xecutor

_send_batch is a generator function. Submitting it directly to
executor.submit returns the generator object without executing the
HTTP I/O. Wrap in run_batch to materialize results in the worker
thread so concurrency > 1 actually parallelizes batch requests.
…ontent-Encoding

When a large Put was routed to the individual endpoint, the pre-compressed
body was sent with compression="none", which prevented session.put() from
setting the Content-Encoding: zstd header. The server would store compressed
bytes without knowing they were compressed, causing data corruption on read.

Fix by always delegating to session.put() with the original uncompressed
body, letting it handle compression end-to-end.
Add __all__ to many.py to prevent stdlib imports from leaking into
autodoc, and add the many module section to the docs rst file.
With preload_content=False, urllib3 keeps the connection open for
reuse. If the multipart parser stops at the closing boundary without
consuming all socket data, the next request on that connection could
read stale bytes. Ensure drain_conn() + release_conn() always run.
Comment thread clients/python/src/objectstore_client/many.py
lcian and others added 10 commits May 27, 2026 12:58
Types re-exported from __init__.py (GetResponse, RequestError, Session)
create duplicate targets when submodules are also documented. Suppress
ref.python warnings since both targets resolve to the same class.

Also regenerate the rst file via sphinx-apidoc to include multipart module.
…tion

Sort batch response parts by original index before yielding when
concurrency=1, since the server may return multipart response parts
in a different order than the request.
Decode zstd batch GET payloads through a streaming reader so frames without content size can be read like Session.get().

Add coverage for streamed zstd frames produced without a content size header.

Co-Authored-By: OpenAI Codex <noreply@openai.com>
Keep individual operations in their original position when concurrency is 1 instead of dispatching all batchable work first.

This preserves input order for mixed batch and individual workloads.

Co-Authored-By: OpenAI Codex <noreply@openai.com>
Strip CR/LF characters from header names and values in
encode_multipart to prevent header injection via user-supplied
metadata, content_type, or origin fields.
Add the Session.get decompression options to batched Get operations and preserve compressed bytes when callers request them.

This keeps batched GET behavior aligned with single-object GET behavior for compressed payload consumers.

Co-Authored-By: OpenAI Codex <noreply@openai.com>
Reject control characters and invalid multipart header names before emitting part headers.

This avoids silently rewriting user-controlled metadata and prevents malformed batch puts from injecting extra multipart headers.

Co-Authored-By: OpenAI Codex <noreply@openai.com>
Treat an empty Put key as no explicit key when preparing batch puts.

This matches Session.put behavior and lets the server generate a key for batched auto-key uploads.

Co-Authored-By: OpenAI Codex <noreply@openai.com>
The server processes batch operations concurrently, so a GET for a key
being PUT in the same batch may not see the data. Split into separate
batch calls matching the Rust client's test pattern.

Also strengthen CRLF header validation to reject all control characters.
Avoid placing same-key reads and mutations in the same batch when concurrency is 1.

The server can process parts within one batch concurrently, so conflicting operations need separate sequential requests to preserve the documented ordering behavior.

Co-Authored-By: OpenAI Codex <noreply@openai.com>
@lcian

This comment was marked as spam.

@lcian

This comment was marked as spam.

raise
except Exception as exc:
error = RequestError(f"Batch request failed: {exc}", 0, str(exc))
yield from _batch_level_error(ops, error)
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Duplicate results yielded on mid-parse exception in batch

Medium Severity

If _parse_batch_response (or iter_multipart_response) partially yields successful results and then raises an exception (e.g., NotImplementedError for non-zstd compression, ValueError from malformed server data, or a decompression error), those results have already been passed to the caller via yield from. The outer except Exception handler then calls yield from _batch_level_error(ops, error), which yields error results for ALL operations — including those already successfully yielded. Callers (list(_send_batch(...)) or sorted(_send_batch(...))) collect all values, producing duplicate ManyResponse entries for the same operations.

Additional Locations (1)
Fix in Cursor Fix in Web

Reviewed by Cursor Bugbot for commit 837b455. Configure here.

@lcian lcian changed the title feat(client): Add streaming batch (many) API to Python client feat(py-client): Add many API May 27, 2026
@lcian
Copy link
Copy Markdown
Member

lcian commented May 27, 2026

bugbot run

Copy link
Copy Markdown

@cursor cursor Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cursor Bugbot has reviewed your changes and found 1 potential issue.

There are 2 total unresolved issues (including 1 from previous review).

Fix All in Cursor

❌ Bugbot Autofix is OFF. To automatically fix reported issues with cloud agents, enable autofix in the Cursor dashboard.

Reviewed by Cursor Bugbot for commit 837b455. Configure here.

else:
with ThreadPoolExecutor(max_workers=concurrency) as executor:
futures = [executor.submit(run_batch, chunk) for chunk in batch_chunks]
futures += [executor.submit(run_individual, entry) for entry in individual]
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Concurrent path ignores key-conflict batch splitting

High Severity

The concurrency > 1 path re-batches all operations from the batchable list (line 625), which unconditionally accumulates every batchable op (line 592) regardless of key conflicts. The careful key-conflict splitting done by flush_pending_batchable() only affects sequential_work, used exclusively by the concurrency == 1 path. With concurrency > 1, same-key Put and Get operations end up in the same batch, causing guaranteed server-side race conditions since the server processes intra-batch operations concurrently.

Additional Locations (1)
Fix in Cursor Fix in Web

Reviewed by Cursor Bugbot for commit 837b455. Configure here.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants