Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions dandi/consts.py
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,11 @@ def urls(self) -> Iterator[str]:
#: MIME type assigned to & used to identify Zarr assets
ZARR_MIME_TYPE = "application/x-zarr"

#: Maximum file size for a single S3 PUT upload (5 GiB).
#: S3 rejects single-part PUTs larger than this; such files would need
#: multipart upload which is not yet supported for zarr chunks.
S3_MAX_SINGLE_PART_UPLOAD = 5 * 1024**3

#: Maximum number of Zarr directory entries to upload at once
ZARR_UPLOAD_BATCH_SIZE = 255

Expand Down
66 changes: 56 additions & 10 deletions dandi/files/zarr.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,22 @@
from __future__ import annotations

from base64 import b64encode
from collections import Counter
from collections.abc import Generator, Iterator
from concurrent.futures import Future, ThreadPoolExecutor, as_completed
from contextlib import closing
from dataclasses import dataclass, field, replace
from datetime import datetime
from enum import Enum
import json
import math
import os
import os.path
import urllib.parse
from pathlib import Path
import random
from time import sleep
from typing import Any, Optional
import urllib.parse

from dandischema.models import BareAsset, DigestType
from pydantic import BaseModel, ConfigDict, ValidationError
Expand All @@ -24,6 +27,7 @@
from dandi import get_logger
from dandi.consts import (
MAX_ZARR_DEPTH,
S3_MAX_SINGLE_PART_UPLOAD,
ZARR_DELETE_BATCH_SIZE,
ZARR_MIME_TYPE,
ZARR_UPLOAD_BATCH_SIZE,
Expand Down Expand Up @@ -745,6 +749,7 @@ def mkzarr() -> str:
items_to_upload = list(items)
max_retries = 5
retry_count = 0
current_jobs = jobs or 5
# Add all items to checksum tree (only done once)
for it in items_to_upload:
zcc.add_leaf(Path(it.entry_path), it.size, it.digest)
Expand Down Expand Up @@ -774,7 +779,7 @@ def mkzarr() -> str:
r = client.post(f"/zarr/{zarr_id}/files/", json=uploading)

# Upload files in parallel
with ThreadPoolExecutor(max_workers=jobs or 5) as executor:
with ThreadPoolExecutor(max_workers=current_jobs) as executor:
futures = [
executor.submit(
_upload_zarr_file,
Expand Down Expand Up @@ -817,14 +822,22 @@ def mkzarr() -> str:
# Prepare for next iteration with retry items
if items_to_upload := retry_items:
retry_count += 1
current_jobs = max(1, math.ceil(current_jobs / 2))
if retry_count <= max_retries:
lgr.info(
"%s: %s got 403 errors, requesting new URLs",
"%s: %s got 403 errors, requesting new URLs"
" (attempt %d/%d, workers: %d)",
asset_path,
pluralize(len(items_to_upload), "file"),
retry_count,
max_retries,
current_jobs,
)
# Exponential backoff with jitter before retry
sleep(
min(2**retry_count * 5, 120)
+ random.uniform(0, 5)
)
# Small delay before retry
sleep(1 * retry_count)

# Check if we exhausted retries
if items_to_upload:
Expand Down Expand Up @@ -898,7 +911,19 @@ def _handle_failed_items_and_raise(

# Log all failures
for item, error in failed_items:
lgr.error("Failed to upload %s: %s", item.filepath, error)
lgr.error("Failed to upload %s (%d bytes): %s", item.filepath, item.size, error)

# Summary diagnostics
exc_counts = Counter(type(error).__name__ for _, error in failed_items)
exc_summary = ", ".join(f"{k}: {v}" for k, v in exc_counts.most_common())
lgr.error(
"Upload failure summary: %d/%d files failed; exception types: {%s}%s",
len(failed_items),
len(futures),
exc_summary,
" (systematic — all same exception type)" if len(exc_counts) == 1 else "",
)

# Raise the first error
raise failed_items[0][1]

Expand Down Expand Up @@ -945,23 +970,36 @@ def _upload_zarr_file(
json_resp=False,
retry_if=_retry_zarr_file,
headers=headers,
timeout=(60, 7200),
)
except requests.HTTPError as e:
post_upload_size_check(item.filepath, item.size, True)
# Check if this is a 403 error that we should retry with a new URL
if e.response is not None and e.response.status_code == 403:
lgr.debug(
"Got 403 error uploading %s, will retry with new URL: %s",
"Got 403 error uploading %s (%d bytes), will retry with new URL: %s",
item.filepath,
item.size,
str(e),
)
return UploadResult(item=item, status=UploadStatus.RETRY_NEEDED, error=e)
else:
# Other HTTP error - don't retry
lgr.warning(
"HTTP error uploading %s (%d bytes): %s",
item.filepath,
item.size,
e,
)
return UploadResult(item=item, status=UploadStatus.FAILED, error=e)
except Exception as e:
post_upload_size_check(item.filepath, item.size, True)
# Non-HTTP error - don't retry
lgr.warning(
"Error uploading %s (%d bytes): %s: %s",
item.filepath,
item.size,
type(e).__name__,
e,
)
return UploadResult(item=item, status=UploadStatus.FAILED, error=e)
else:
post_upload_size_check(item.filepath, item.size, False)
Expand Down Expand Up @@ -1056,11 +1094,19 @@ def from_entry(cls, e: LocalZarrEntry, digest: str) -> UploadItem:
content_type = "application/json"
else:
content_type = None
size = pre_upload_size_check(e.filepath)
if size > S3_MAX_SINGLE_PART_UPLOAD:
raise ValueError(
f"Zarr chunk {e.filepath} is {size / 1024**3:.2f} GiB,"
f" exceeding the S3 single-part upload limit of"
f" {S3_MAX_SINGLE_PART_UPLOAD / 1024**3:.0f} GiB."
f" Multipart upload for zarr chunks is not yet supported."
)
return cls(
entry_path=str(e),
filepath=e.filepath,
digest=digest,
size=pre_upload_size_check(e.filepath),
size=size,
content_type=content_type,
)

Expand Down
99 changes: 99 additions & 0 deletions dandi/tests/test_upload.py
Original file line number Diff line number Diff line change
Expand Up @@ -626,6 +626,105 @@ def mock_request(self, method, path, **kwargs):
), f"URL {url} should not have been retried but had {request_attempts[url]} attempts"


@pytest.mark.ai_generated
def test_zarr_upload_403_batch_retry_reduces_parallelism(
    new_dandiset: SampleDandiset,
    monkeypatch: pytest.MonkeyPatch,
    caplog: pytest.LogCaptureFixture,
) -> None:
    """Exercise iter_upload's batch retry loop for 403 responses.

    A first-attempt 403 on selected chunk uploads should cause a batch-level
    retry with a reduced worker count (and backoff), after which the upload
    still completes successfully.
    """
    zarr_dir = new_dandiset.dspath / "test.zarr"
    zarr.save(zarr_dir, np.arange(100), np.arange(100, 0, -1))

    # Patch at the request() layer so _upload_zarr_file observes the 403
    # response, returns RETRY_NEEDED, and drives the batch retry loop.
    attempts_by_path: defaultdict[str, int] = defaultdict(int)
    real_request = RESTFullAPIClient.request

    def fake_request(self, method, path, **kwargs):
        key = urlparse(path).path if path.startswith("http") else path
        attempts_by_path[key] += 1

        # Fail only the first PUT for files whose path contains "arr_1".
        if method == "PUT" and "arr_1" in path and attempts_by_path[key] == 1:
            response = Mock(spec=requests.Response)
            response.status_code = 403
            response.ok = False
            response.text = "Forbidden"
            response.headers = {}
            err = requests.HTTPError("403 Forbidden", response=response)
            err.response = response
            raise err

        return real_request(self, method, path, **kwargs)

    monkeypatch.setattr(RESTFullAPIClient, "request", fake_request)
    # Skip the real exponential-backoff delay to keep the test fast.
    monkeypatch.setattr("dandi.files.zarr.sleep", lambda _: None)

    with caplog.at_level("INFO", logger="dandi"):
        new_dandiset.upload()

    # The upload must still complete and yield exactly one zarr asset.
    (asset,) = new_dandiset.dandiset.get_assets()
    assert isinstance(asset, RemoteZarrAsset)
    assert asset.path == "test.zarr"

    # The batch-retry log line must appear and report the worker count.
    messages = [
        rec.message for rec in caplog.records if "requesting new URLs" in rec.message
    ]
    assert len(messages) > 0, "Expected batch retry log message"
    assert "workers:" in messages[0], "Expected reduced worker count in retry log"


@pytest.mark.ai_generated
def test_zarr_upload_connection_error_diagnostics(
    new_dandiset: SampleDandiset,
    monkeypatch: pytest.MonkeyPatch,
    caplog: pytest.LogCaptureFixture,
) -> None:
    """Test that ConnectionError failures produce diagnostic summary logging."""
    zarr_dir = new_dandiset.dspath / "test.zarr"
    zarr.save(zarr_dir, np.arange(100), np.arange(100, 0, -1))

    # Make every S3 PUT raise a ConnectionError.  Raising here bypasses the
    # slow tenacity retries and directly exercises _upload_zarr_file's
    # except-Exception path plus the _handle_failed_items_and_raise summary.
    real_put = RESTFullAPIClient.put

    def failing_put(self, url, **kwargs):
        if "dandi-dandisets" not in url:
            return real_put(self, url, **kwargs)
        raise requests.ConnectionError(
            "('Connection aborted.', ConnectionAbortedError(10053, "
            "'An established connection was aborted by the software "
            "in your host machine'))"
        )

    monkeypatch.setattr(RESTFullAPIClient, "put", failing_put)

    with caplog.at_level("ERROR", logger="dandi"), pytest.raises(
        requests.ConnectionError
    ):
        new_dandiset.upload()

    # Exactly one diagnostic summary line should have been emitted.
    summaries = [
        rec.message for rec in caplog.records if "Upload failure summary" in rec.message
    ]
    assert len(summaries) == 1, f"Expected 1 summary message, got {len(summaries)}"
    text = summaries[0]
    assert "ConnectionError" in text
    assert "systematic" in text, "All-same-type failures should be flagged as systematic"


@pytest.mark.ai_generated
def test_upload_rejects_dandidownload_paths(
new_dandiset: SampleDandiset, tmp_path: Path
Expand Down
Loading