Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 60 additions & 0 deletions clients/python/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,66 @@ Arbitrary key-value pairs can be attached to objects and retrieved on download.
session.put(b"payload", metadata={"source": "upload-service"})
```

### Multipart Upload API

For large objects, use multipart uploads to upload parts independently and then
assemble them into a final object.

**Important:** unlike single-object uploads, multipart uploads do **not** auto-compress.
The caller must pre-compress each part according to the compression set as part of the metadata
when initiating the upload.

```python
from concurrent.futures import ThreadPoolExecutor

import zstandard

from objectstore_client.multipart import MultipartCompleteError

upload = session.initiate_multipart_upload(
key="my-large-object",
compression="zstd",
metadata={"source": "upload-service"},
)

compressor = zstandard.ZstdCompressor()
chunks = [b"part1", b"part2", b"part3", b"part4"]

def upload_part(part_number: int, data: bytes):
compressed = compressor.compress(data)
return upload.upload_part(
compressed, part_number=part_number, content_length=len(compressed)
)

with ThreadPoolExecutor(max_workers=4) as executor:
futures = [
executor.submit(upload_part, i + 1, chunk)
for i, chunk in enumerate(chunks)
]
parts = [f.result() for f in futures]

try:
key = upload.complete(parts)
except MultipartCompleteError:
upload.abort()
raise
```

To resume an in-progress multipart upload after a process restart, persist the
`key` and `upload_id`, then reconstruct the upload handle later:

```python
saved_key = upload.key
saved_upload_id = upload.upload_id

resumed = session.resume_multipart_upload(saved_key, saved_upload_id)
existing_parts = resumed.list_parts()

# Upload missing parts...

key = resumed.complete(new_parts + existing_parts)
```

### Authentication

If your Objectstore instance enforces authorization, you must configure authentication
Expand Down
5 changes: 5 additions & 0 deletions clients/python/docs/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,3 +25,8 @@
"undoc-members": True,
"show-inheritance": True,
}

# Re-exported symbols in __init__.py create duplicate Sphinx targets
# (e.g. objectstore_client.Session vs objectstore_client.client.Session).
# This is the most specific suppression Sphinx supports for that warning.
suppress_warnings = ["ref.python"]
16 changes: 16 additions & 0 deletions clients/python/docs/objectstore_client.rst
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,14 @@ objectstore\_client.client module
:show-inheritance:
:undoc-members:

objectstore\_client.errors module
---------------------------------

.. automodule:: objectstore_client.errors
:members:
:show-inheritance:
:undoc-members:

objectstore\_client.metadata module
-----------------------------------

Expand All @@ -41,6 +49,14 @@ objectstore\_client.metrics module
:show-inheritance:
:undoc-members:

objectstore\_client.multipart module
------------------------------------

.. automodule:: objectstore_client.multipart
:members:
:show-inheritance:
:undoc-members:

objectstore\_client.scope module
--------------------------------

Expand Down
2 changes: 1 addition & 1 deletion clients/python/src/objectstore_client/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@
from objectstore_client.client import (
Client,
GetResponse,
RequestError,
Session,
Usecase,
)
from objectstore_client.errors import RequestError
from objectstore_client.metadata import (
Compression,
ExpirationPolicy,
Expand Down
108 changes: 91 additions & 17 deletions clients/python/src/objectstore_client/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

from objectstore_client import utils
from objectstore_client.auth import Permission, TokenGenerator, TokenProvider
from objectstore_client.errors import raise_for_status
from objectstore_client.metadata import (
HEADER_EXPIRATION,
HEADER_META_PREFIX,
Expand All @@ -27,6 +28,7 @@
NoOpMetricsBackend,
measure_storage_operation,
)
from objectstore_client.multipart import MultipartUpload
from objectstore_client.scope import Scope


Expand All @@ -35,15 +37,6 @@ class GetResponse(NamedTuple):
payload: IO[bytes]


class RequestError(Exception):
"""Exception raised if an API call to Objectstore fails."""

def __init__(self, message: str, status: int, response: str):
super().__init__(message)
self.status = status
self.response = response


class Usecase:
"""
An identifier for a workload in Objectstore, along with defaults to use for all
Expand Down Expand Up @@ -281,6 +274,25 @@ def _make_url(self, key: str | None, full: bool = False) -> str:
return f"http://{self._pool.host}:{self._pool.port}{path}"
return path

def _make_multipart_url(
self,
action: str | None,
key: str | None,
query: str | None = None,
) -> str:
if action == "parts":
resource = "objects:multipart:parts"
elif action == "complete":
resource = "objects:multipart:complete"
else:
resource = "objects:multipart"

relative_path = f"/v1/{resource}/{self._usecase.name}/{self._scope}/{key or ''}"
path = self._base_path.rstrip("/") + relative_path
if query:
return f"{path}?{query}"
return path

def put(
self,
contents: bytes | IO[bytes],
Expand Down Expand Up @@ -445,12 +457,74 @@ def delete(self, key: str) -> None:
)
raise_for_status(response)

def initiate_multipart_upload(
self,
*,
key: str | None = None,
compression: Compression | Literal["none"] | None = None,
content_type: str | None = None,
metadata: dict[str, str] | None = None,
expiration_policy: ExpirationPolicy | None = None,
origin: str | None = None,
) -> MultipartUpload:
"""
Initiates a multipart upload.

def raise_for_status(response: urllib3.BaseHTTPResponse) -> None:
if response.status >= 400:
res = (response.data or response.read() or b"").decode("utf-8", "replace")
raise RequestError(
f"Objectstore request failed with status {response.status}",
response.status,
res,
)
Returns a :class:`~objectstore_client.multipart.MultipartUpload` handle
that can be used to upload parts, list parts, complete, or abort.

**Important:** unlike :meth:`put`, the ``compression`` parameter only
records the compression algorithm in the object's metadata.
The caller is responsible for compressing each part in accordance with the
chosen algorithm before passing it to
:meth:`~objectstore_client.multipart.MultipartUpload.upload_part`.
"""
if compression and compression not in ("none", "zstd"):
raise ValueError(f"Invalid compression: {compression}")

headers = self._make_headers()

compression = compression or self._usecase._compression
if compression and compression != "none":
headers["Content-Encoding"] = compression

if content_type:
Comment thread
lcian marked this conversation as resolved.
headers["Content-Type"] = content_type

expiration_policy = expiration_policy or self._usecase._expiration_policy
if expiration_policy:
headers[HEADER_EXPIRATION] = format_expiration(expiration_policy)

if origin:
headers[HEADER_ORIGIN] = origin

if metadata:
for k, v in metadata.items():
headers[f"{HEADER_META_PREFIX}{k}"] = v

if key == "":
key = None

with measure_storage_operation(
self._metrics_backend, "multipart.initiate", self._usecase.name
):
response = self._pool.request(
"POST" if not key else "PUT",
self._make_multipart_url(None, key),
headers=headers,
preload_content=True,
decode_content=True,
)
raise_for_status(response)
res = response.json()
return MultipartUpload(self, res["key"], res["upload_id"])

def resume_multipart_upload(self, key: str, upload_id: str) -> MultipartUpload:
"""
Reconstructs a multipart upload handle.

This does not make any network calls.
Use it to resume an upload after a process restart or to
continue an upload started elsewhere.
"""
return MultipartUpload(self, key, upload_id)
22 changes: 22 additions & 0 deletions clients/python/src/objectstore_client/errors.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
from __future__ import annotations

import urllib3


class RequestError(Exception):
"""Exception raised if an API call to Objectstore fails."""

def __init__(self, message: str, status: int, response: str):
super().__init__(message)
self.status = status
self.response = response


def raise_for_status(response: urllib3.BaseHTTPResponse) -> None:
if response.status >= 400:
res = (response.data or response.read() or b"").decode("utf-8", "replace")
raise RequestError(
f"Objectstore request failed with status {response.status}",
response.status,
res,
)
16 changes: 14 additions & 2 deletions clients/python/src/objectstore_client/metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ def __init__(self, backend: MetricsBackend, operation: str, usecase: str):
# These may be set during or after the enclosed operation
self.start: int | None = None
self.elapsed: float | None = None
self.size: int | None = None
self.uncompressed_size: int | None = None
self.compressed_size: int | None = None
self.compression: str = "unknown"
Expand All @@ -101,6 +102,13 @@ def record_uncompressed_size(self, value: int) -> None:
)
self.uncompressed_size = value

def record_size(self, value: int) -> None:
tags = {"usecase": self.usecase}
self.backend.distribution(
f"storage.{self.operation}.size", value, tags=tags, unit="byte"
)
self.size = value

def record_compressed_size(self, value: int, compression: str = "unknown") -> None:
tags = {"usecase": self.usecase, "compression": compression}
self.backend.distribution(
Expand All @@ -124,14 +132,18 @@ def maybe_record_throughputs(self) -> None:
if not self.elapsed or self.elapsed <= 0:
return None

sizes = []
sizes: list[tuple[int, str | None]] = []
if self.size:
sizes.append((self.size, None))
if self.uncompressed_size:
sizes.append((self.uncompressed_size, "none"))
if self.compressed_size:
sizes.append((self.compressed_size, self.compression))

for size, compression in sizes:
tags = {"usecase": self.usecase, "compression": compression}
tags: dict[str, str] = {"usecase": self.usecase}
if compression is not None:
tags["compression"] = compression
self.backend.distribution(
f"storage.{self.operation}.throughput", size / self.elapsed, tags=tags
)
Expand Down
Loading
Loading