Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions dandi/dandiapi.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
from datetime import datetime
from enum import Enum
from fnmatch import fnmatchcase
from functools import cached_property
import json
import os.path
from pathlib import Path, PurePosixPath
Expand Down Expand Up @@ -565,6 +566,27 @@ def _get_keyring_ids(self) -> tuple[str, str]:
def _instance_id(self) -> str:
return self.dandi_instance.name.upper()

@cached_property
def supports_zarr_multipart_upload(self) -> bool:
    """
    Report whether the server offers the zarr multipart upload endpoints
    added in dandi-archive#2784 (``/uploads/zarr/initialize/`` and
    friends).

    The answer comes from a probe: an empty JSON object is POSTed to
    ``/uploads/zarr/initialize/``.  A 404 is taken to mean the route is
    missing; any other HTTP error status (400 for the bad payload,
    401/403 for auth, etc.) or a successful response is taken to mean the
    route exists.  Being a ``cached_property``, the computed value is
    stored on the client and reused on later accesses.

    :returns:
        `False` if the probe raised `HTTP404Error`, `True` otherwise
    :raises:
        anything raised by ``self.post()`` that is neither `HTTP404Error`
        nor `requests.HTTPError` is propagated unchanged
    """
    try:
        self.post("/uploads/zarr/initialize/", json={})
    except HTTP404Error:
        # Presumably a ``requests.HTTPError`` subclass (TODO confirm), so
        # this clause has to stay ahead of the generic one below.
        route_exists = False
    except requests.HTTPError:
        # Some other error status: the route answered, so it is there.
        route_exists = True
    else:
        route_exists = True
    return route_exists

def get_dandiset(
self, dandiset_id: str, version_id: str | None = None, lazy: bool = True
) -> RemoteDandiset:
Expand Down
69 changes: 48 additions & 21 deletions dandi/files/zarr.py
Original file line number Diff line number Diff line change
Expand Up @@ -743,22 +743,44 @@ def mkzarr() -> str:
# Items to upload in this batch (may be retried e.g. due to
# 403 errors because of timed-out upload URLs)
all_items = list(items)
large_items = [
multipart_items = [
it for it in all_items if it.size > ZARR_LARGE_CHUNK_THRESHOLD
]
items_to_upload = [
singlepart_items = [
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Without the name changes, the code would be hard(er) to read going forward. IMHO, cleaner names at least make more sense for our future selves.

it for it in all_items if it.size <= ZARR_LARGE_CHUNK_THRESHOLD
]
# TODO: remove once all servers are > 0.23.0 (i.e. ship
# dandi-archive#2784) and the multipart zarr upload
# endpoints are universally available; the capability
# check would then be unnecessary.
if multipart_items and not client.supports_zarr_multipart_upload:
largest = max(multipart_items, key=lambda it: it.size)
total_large_size = sum(it.size for it in multipart_items)
raise UploadError(
f"{asset_path}:"
f" {pluralize(len(multipart_items), 'Zarr chunk')}"
f" totaling {total_large_size / 1024**3:.2f} GiB"
f" exceed the S3 single-part upload limit of"
f" {ZARR_LARGE_CHUNK_THRESHOLD / 1024**3:.0f} GiB"
f" (largest: {largest.entry_path},"
f" {largest.size / 1024**3:.2f} GiB);"
f" the server does not support multipart zarr"
f" uploads (dandi-archive#2784)."
)
max_retries = 5
retry_count = 0
# Add all items to checksum tree (only done once)
for it in all_items:
zcc.add_leaf(Path(it.entry_path), it.size, it.digest)

# Upload chunks above 5GB individually via multipart upload
for it in large_items:
# Yield uploading status
yield from _multipart_upload(
# Upload chunks above 5GB individually via multipart upload.
# ``_multipart_upload`` reports ``current`` as bytes within
# the single chunk being uploaded; translate it to bytes
# uploaded across the whole zarr so progress reporting
# stays monotonic for downstream consumers.
for it in multipart_items:
cumulative_before = bytes_uploaded
for status in _multipart_upload(
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Potentially we could change the record yielded by that helper to make things easier here, e.g. pass "bytes_uploaded" into it so that it is taken into account for progress reporting or the like -- I will leave it to you to choose.

client=client,
filepath=it.filepath,
asset_path=it.entry_path,
Expand All @@ -768,21 +790,26 @@ def mkzarr() -> str:
"chunk_key": it.entry_path,
},
jobs=jobs,
)

# Part is finished uploading, yield final progress
):
if (
status.get("status") == "uploading"
and "current" in status
):
cumulative = cumulative_before + status["current"]
yield {
"status": "uploading",
"progress": 100 * cumulative / to_upload.total_size,
"current": cumulative,
}
else:
yield status
changed = True
bytes_uploaded += it.size
yield {
"status": "uploading",
"progress": 100 * bytes_uploaded / to_upload.total_size,
"current": bytes_uploaded,
}

# Upload the remaining files using single part upload
while items_to_upload and retry_count <= max_retries:
while singlepart_items and retry_count <= max_retries:
# Prepare upload requests for current items
uploading = [it.upload_request() for it in items_to_upload]
uploading = [it.upload_request() for it in singlepart_items]

if retry_count == 0:
lgr.debug(
Expand Down Expand Up @@ -814,7 +841,7 @@ def mkzarr() -> str:
upload_url=signed_url,
item=it,
)
for (signed_url, it) in zip(r, items_to_upload)
for (signed_url, it) in zip(r, singlepart_items)
]

changed = True
Expand Down Expand Up @@ -846,20 +873,20 @@ def mkzarr() -> str:
)

# Prepare for next iteration with retry items
if items_to_upload := retry_items:
if singlepart_items := retry_items:
retry_count += 1
if retry_count <= max_retries:
lgr.info(
"%s: %s got 403 errors, requesting new URLs",
asset_path,
pluralize(len(items_to_upload), "file"),
pluralize(len(singlepart_items), "file"),
)
# Small delay before retry
sleep(1 * retry_count)

# Check if we exhausted retries
if items_to_upload:
nfiles_str = pluralize(len(items_to_upload), "file")
if singlepart_items:
nfiles_str = pluralize(len(singlepart_items), "file")
raise UploadError(
f"{asset_path}: failed to upload {nfiles_str} "
f"after {max_retries} retries due to repeated 403 errors"
Expand Down
16 changes: 12 additions & 4 deletions dandi/tests/test_files.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
from ..consts import ZARR_MIME_TYPE, dandiset_metadata_file
from ..dandiapi import AssetType, RemoteZarrAsset
from ..exceptions import UnknownAssetError
from ..files.bases import _multipart_upload as real_multipart_upload
from ..files import (
BIDSDatasetDescriptionAsset,
DandisetMetadataFile,
Expand All @@ -30,6 +29,7 @@
dandi_file,
find_dandi_files,
)
from ..files.bases import _multipart_upload as real_multipart_upload
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Did you have pre-commit installed in your local clone of dandi-cli, @jjnesbitt?


lgr = get_logger()

Expand Down Expand Up @@ -540,6 +540,11 @@ def test_upload_zarr_entry_content_type(new_dandiset, tmp_path):
@pytest.mark.ai_generated
def test_upload_zarr_large_chunks(new_dandiset, tmp_path):
"""Chunks above ZARR_LARGE_CHUNK_THRESHOLD are uploaded via multipart upload."""
if not new_dandiset.client.supports_zarr_multipart_upload:
pytest.skip(
"Server does not expose the zarr multipart upload endpoints"
" (dandi-archive#2784)"
)
filepath = tmp_path / "example.zarr"
zarr.save(filepath, np.arange(1000), np.arange(1000, 0, -1))
zf = dandi_file(filepath)
Expand Down Expand Up @@ -571,6 +576,11 @@ def spy_multipart_upload(**kwargs):
@pytest.mark.ai_generated
def test_upload_zarr_mixed_chunks(new_dandiset, tmp_path):
"""Chunks above ZARR_LARGE_CHUNK_THRESHOLD go multipart; smaller ones use single-part upload."""
if not new_dandiset.client.supports_zarr_multipart_upload:
pytest.skip(
"Server does not expose the zarr multipart upload endpoints"
" (dandi-archive#2784)"
)
filepath = tmp_path / "mixed.zarr"
store = zarr.open_group(str(filepath), mode="w")
# small array: 10 int64 elements, produces a ~96-byte chunk (compressed)
Expand Down Expand Up @@ -601,9 +611,7 @@ def spy_multipart_upload(**kwargs):
remote_entries = {str(e) for e in asset.iterfiles()}
# Only chunk files whose on-disk size exceeds the threshold should be multipart-uploaded
large_chunks = {
p
for p in remote_entries
if (filepath / p).stat().st_size > mixed_threshold
p for p in remote_entries if (filepath / p).stat().st_size > mixed_threshold
}
assert set(multipart_paths) == large_chunks
# At least one chunk must have gone each path so the test is meaningful
Expand Down
Loading