feat(py-client): Add many API #419
Introduces `session.many()` for executing multiple get/put/delete operations with automatic batching and end-to-end streaming, with no buffering of request or response bodies.

- New `many.py`: `Put`, `Get`, `Delete` operation types, `ManyResponse` result, and `execute_many()` orchestrator. Small puts are batched via multipart; puts over 1 MB or with `IO[bytes]` bodies go individually. Concurrent execution via `ThreadPoolExecutor` with configurable concurrency (default 3, matching Rust client semantics).
- New `multipart.py`: custom multipart encoder (lazy `Iterator[bytes]`, supports per-part custom headers and `IO[bytes]` bodies) and streaming decoder (`iter_multipart_response`) that parses parts incrementally across arbitrary-sized chunks without buffering the full response.
- `client.py`: exposes `Session.many()` and re-exports operation types and `ManyResponse` from the top-level package.
| yield result | ||
| else: | ||
| with ThreadPoolExecutor(max_workers=concurrency) as executor: | ||
| futures = [ |
Bug: The _send_batch generator is iterated on the main thread instead of the worker thread, causing concurrent batch operations to execute sequentially.
Severity: HIGH
Suggested Fix
To ensure the generator's logic runs in the worker thread, wrap the call in a function that consumes the generator. For example, create a helper function `def _run_send_batch(*args): return list(_send_batch(*args))` and submit this new function to the executor: `executor.submit(_run_send_batch, session, chunk)`. This forces the I/O operations to complete within the worker thread before the result is returned.
Location: clients/python/src/objectstore_client/many.py#L546
Potential issue: The `_send_batch` function is a generator. When submitted to a
`ThreadPoolExecutor` via `executor.submit`, the worker thread immediately returns a
generator object without executing the function's body. The main thread later retrieves
this generator via `future.result()` and iterates over it. This causes all HTTP I/O for
the batch operation to execute sequentially on the main thread, defeating the purpose of
the thread pool. As a result, setting `concurrency > 1` for batch operations has no
effect on performance, as requests are not parallelized.
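The behavior described above can be reproduced without any HTTP traffic. The following is a minimal sketch, not the PR's code: `send_batch` and `run_batch` are hypothetical stand-ins that record which thread runs the generator body instead of performing I/O.

```python
import threading
from concurrent.futures import ThreadPoolExecutor
from typing import Iterator


def send_batch(chunk: list[str]) -> Iterator[tuple[str, str]]:
    """Hypothetical stand-in for a generator that does its work lazily."""
    for op in chunk:
        # Record the thread that actually executes the generator body.
        yield op, threading.current_thread().name


def run_batch(chunk: list[str]) -> list[tuple[str, str]]:
    """Consume the generator inside the worker thread."""
    return list(send_batch(chunk))


with ThreadPoolExecutor(max_workers=2, thread_name_prefix="worker") as executor:
    # Submitting the generator function only creates a generator object.
    lazy = executor.submit(send_batch, ["a", "b"]).result()
    # Submitting the wrapper runs the whole body in the pool.
    eager = executor.submit(run_batch, ["a", "b"]).result()

# Iterating `lazy` here executes the body on the calling thread.
lazy_threads = {name for _, name in lazy}
eager_threads = {name for _, name in eager}
```

In this sketch every name in `eager_threads` carries the `worker` prefix, while none in `lazy_threads` does, which is the sequential-on-the-caller behavior the review comment describes.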
…xecutor _send_batch is a generator function. Submitting it directly to executor.submit returns the generator object without executing the HTTP I/O. Wrap in run_batch to materialize results in the worker thread so concurrency > 1 actually parallelizes batch requests.
…ontent-Encoding When a large Put was routed to the individual endpoint, the pre-compressed body was sent with compression="none", which prevented session.put() from setting the Content-Encoding: zstd header. The server would store compressed bytes without knowing they were compressed, causing data corruption on read. Fix by always delegating to session.put() with the original uncompressed body, letting it handle compression end-to-end.
Add __all__ to many.py to prevent stdlib imports from leaking into autodoc, and add the many module section to the docs rst file.
With preload_content=False, urllib3 keeps the connection open for reuse. If the multipart parser stops at the closing boundary without consuming all socket data, the next request on that connection could read stale bytes. Ensure drain_conn() + release_conn() always run.
Types re-exported from __init__.py (GetResponse, RequestError, Session) create duplicate targets when submodules are also documented. Suppress ref.python warnings since both targets resolve to the same class. Also regenerate the rst file via sphinx-apidoc to include multipart module.
…tion Sort batch response parts by original index before yielding when concurrency=1, since the server may return multipart response parts in a different order than the request.
Decode zstd batch GET payloads through a streaming reader so frames without content size can be read like Session.get(). Add coverage for streamed zstd frames produced without a content size header. Co-Authored-By: OpenAI Codex <noreply@openai.com>
Keep individual operations in their original position when concurrency is 1 instead of dispatching all batchable work first. This preserves input order for mixed batch and individual workloads. Co-Authored-By: OpenAI Codex <noreply@openai.com>
Strip CR/LF characters from header names and values in encode_multipart to prevent header injection via user-supplied metadata, content_type, or origin fields.
Add the Session.get decompression options to batched Get operations and preserve compressed bytes when callers request them. This keeps batched GET behavior aligned with single-object GET behavior for compressed payload consumers. Co-Authored-By: OpenAI Codex <noreply@openai.com>
Reject control characters and invalid multipart header names before emitting part headers. This avoids silently rewriting user-controlled metadata and prevents malformed batch puts from injecting extra multipart headers. Co-Authored-By: OpenAI Codex <noreply@openai.com>
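One way to reject rather than rewrite unsafe header input is sketched below. The function name `validate_part_header` and the exact character rules are assumptions for illustration; the PR's actual validation may differ.

```python
import re

# Assumption: header names are limited to the HTTP "token" alphabet.
_HEADER_NAME_RE = re.compile(r"[!#$%&'*+\-.^_`|~0-9A-Za-z]+")


def validate_part_header(name: str, value: str) -> None:
    """Raise ValueError instead of silently stripping unsafe characters."""
    # fullmatch avoids the pitfall where `$` accepts a trailing newline.
    if not _HEADER_NAME_RE.fullmatch(name):
        raise ValueError(f"invalid multipart header name: {name!r}")
    for ch in value:
        # Reject every ASCII control character, including CR, LF, and DEL.
        if ord(ch) < 0x20 or ord(ch) == 0x7F:
            raise ValueError(f"control character in header value: {value!r}")
```

Raising keeps the caller's metadata intact: a value such as `"a\r\nx-extra: 1"` fails loudly instead of being stored in altered form.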
Treat an empty Put key as no explicit key when preparing batch puts. This matches Session.put behavior and lets the server generate a key for batched auto-key uploads. Co-Authored-By: OpenAI Codex <noreply@openai.com>
The server processes batch operations concurrently, so a GET for a key being PUT in the same batch may not see the data. Split into separate batch calls matching the Rust client's test pattern. Also strengthen CRLF header validation to reject all control characters.
Avoid placing same-key reads and mutations in the same batch when concurrency is 1. The server can process parts within one batch concurrently, so conflicting operations need separate sequential requests to preserve the documented ordering behavior. Co-Authored-By: OpenAI Codex <noreply@openai.com>
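The splitting rule can be sketched as a single pass over the operations. This is an illustration under assumptions, not the PR's `flush_pending_batchable()`: `Op` and `split_on_key_conflicts` are hypothetical names, and the sketch also separates two mutations of the same key.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Op:
    kind: str  # "get", "put", or "delete"
    key: str


def split_on_key_conflicts(ops: list[Op]) -> list[list[Op]]:
    """Start a new batch whenever an op conflicts with the current one."""
    batches: list[list[Op]] = []
    current: list[Op] = []
    read_keys: set[str] = set()
    mutated_keys: set[str] = set()
    for op in ops:
        is_mutation = op.kind != "get"
        # Any op on an already-mutated key conflicts; so does a mutation
        # of a key that was only read so far. Repeated reads are fine.
        conflict = op.key in mutated_keys or (is_mutation and op.key in read_keys)
        if conflict and current:
            batches.append(current)
            current, read_keys, mutated_keys = [], set(), set()
        current.append(op)
        (mutated_keys if is_mutation else read_keys).add(op.key)
    if current:
        batches.append(current)
    return batches
```

For the input `put a, get b, get a, get a, delete a`, this yields three batches of sizes 2, 2, and 1, each of which can be sent as its own sequential request.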
| raise | ||
| except Exception as exc: | ||
| error = RequestError(f"Batch request failed: {exc}", 0, str(exc)) | ||
| yield from _batch_level_error(ops, error) |
Duplicate results yielded on mid-parse exception in batch
Medium Severity
If `_parse_batch_response` (or `iter_multipart_response`) partially yields successful results and then raises an exception (e.g., `NotImplementedError` for non-zstd compression, `ValueError` from malformed server data, or a decompression error), those results have already been passed to the caller via `yield from`. The outer `except Exception` handler then calls `yield from _batch_level_error(ops, error)`, which yields error results for ALL operations, including those already successfully yielded. Callers (`list(_send_batch(...))` or `sorted(_send_batch(...))`) collect all values, producing duplicate `ManyResponse` entries for the same operations.
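One possible remedy, sketched here with hypothetical stand-ins rather than the PR's functions, is to track which operation indices have already been yielded and emit error results only for the remainder.

```python
from typing import Iterator


def parse_response(n: int, fail_at: int) -> Iterator[tuple[int, str]]:
    """Hypothetical parser that fails partway through the response."""
    for index in range(n):
        if index == fail_at:
            raise ValueError("malformed part")
        yield index, "ok"


def send_batch(n: int, fail_at: int) -> Iterator[tuple[int, str]]:
    """Yield one result per operation, even if parsing fails midway."""
    seen: set[int] = set()
    try:
        for index, status in parse_response(n, fail_at):
            seen.add(index)
            yield index, status
    except Exception as exc:
        # Only operations without a result yet receive the batch error.
        for index in range(n):
            if index not in seen:
                yield index, f"error: {exc}"
```

With four operations and a failure at index 2, the caller receives two successes followed by two errors, four results in total instead of six.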
Reviewed by Cursor Bugbot for commit 837b455.
bugbot run
Cursor Bugbot has reviewed your changes and found 1 potential issue.
There are 2 total unresolved issues (including 1 from previous review).
| else: | ||
| with ThreadPoolExecutor(max_workers=concurrency) as executor: | ||
| futures = [executor.submit(run_batch, chunk) for chunk in batch_chunks] | ||
| futures += [executor.submit(run_individual, entry) for entry in individual] |
Concurrent path ignores key-conflict batch splitting
High Severity
The `concurrency > 1` path re-batches all operations from the `batchable` list (line 625), which unconditionally accumulates every batchable op (line 592) regardless of key conflicts. The careful key-conflict splitting done by `flush_pending_batchable()` only affects `sequential_work`, used exclusively by the `concurrency == 1` path. With `concurrency > 1`, same-key Put and Get operations end up in the same batch, causing guaranteed server-side race conditions since the server processes intra-batch operations concurrently.
Adds `session.many()` to the Python client for executing multiple get/put/delete operations in batches, with a fully streaming request/response pipeline.

The batch protocol uses multipart/form-data with per-part custom headers (`x-sn-batch-operation-kind`, `x-sn-batch-operation-key`, etc.). No existing Python library supports both streaming encoding with arbitrary per-part headers and streaming decoding, so both are implemented in `multipart.py`.

Streaming pipeline (no buffering):

- `_build_batch_parts` (generator) → `encode_multipart` (yields `bytes` lazily) → urllib3 chunked transfer encoding
- `response.stream(65536)` → `iter_multipart_response` (incremental boundary detection across arbitrary chunks) → `_parse_batch_response` (generator) → caller receives `Iterator[ManyResponse]`

Operation classification:

- `bytes` Put bodies: compressed eagerly (required for size-based batching), then classified: ≤1 MB goes in a batch request, >1 MB goes individually via the standard PUT endpoint
- `IO[bytes]` Put bodies: always sent individually to avoid eager reading

Concurrency: `ThreadPoolExecutor` dispatches batch chunks and individual ops concurrently (default `concurrency=3`). Results arrive in completion order, matching the Rust client's `buffer_unordered` semantics. Sequential execution (`concurrency=1`) preserves input order.

Close FS-330
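To illustrate the lazy-encoding idea in the request half of the pipeline, here is a minimal sketch of a generator-based multipart encoder. It is not the PR's `encode_multipart`: the name `encode_parts`, its signature, and the header value `put` are assumptions, and it omits details such as `Content-Disposition` and header validation.

```python
import io
from typing import IO, Iterable, Iterator, Union

Body = Union[bytes, IO[bytes]]


def encode_parts(
    parts: Iterable[tuple[dict[str, str], Body]],
    boundary: str,
    chunk_size: int = 65536,
) -> Iterator[bytes]:
    """Yield the multipart body piece by piece, never joining it in memory."""
    delimiter = f"--{boundary}\r\n".encode("ascii")
    for headers, body in parts:
        yield delimiter
        for name, value in headers.items():
            yield f"{name}: {value}\r\n".encode("utf-8")
        yield b"\r\n"  # blank line separates part headers from the part body
        if isinstance(body, bytes):
            yield body
        else:
            # File-like bodies are read in bounded chunks.
            while chunk := body.read(chunk_size):
                yield chunk
        yield b"\r\n"
    yield f"--{boundary}--\r\n".encode("ascii")  # closing boundary
```

Because `parts` can itself be a generator, a part is only built when the consumer asks for the next chunk, which is what allows an HTTP client to send the iterator with chunked transfer encoding.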