-
Notifications
You must be signed in to change notification settings - Fork 33
Zarr multipart upload tune ups: condition on server capability, fix progress reporting, harmonize variable names #1846
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
a5bfdd7
4b81340
39cc742
684da5c
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -743,22 +743,44 @@ def mkzarr() -> str: | |
| # Items to upload in this batch (may be retried e.g. due to | ||
| # 403 errors because of timed-out upload URLs) | ||
| all_items = list(items) | ||
| large_items = [ | ||
| multipart_items = [ | ||
| it for it in all_items if it.size > ZARR_LARGE_CHUNK_THRESHOLD | ||
| ] | ||
| items_to_upload = [ | ||
| singlepart_items = [ | ||
| it for it in all_items if it.size <= ZARR_LARGE_CHUNK_THRESHOLD | ||
| ] | ||
| # TODO: remove once all servers are > 0.23.0 (i.e. ship | ||
| # dandi-archive#2784) and the multipart zarr upload | ||
| # endpoints are universally available; the capability | ||
| # check would then be unnecessary. | ||
| if multipart_items and not client.supports_zarr_multipart_upload: | ||
| largest = max(multipart_items, key=lambda it: it.size) | ||
| total_large_size = sum(it.size for it in multipart_items) | ||
| raise UploadError( | ||
| f"{asset_path}:" | ||
| f" {pluralize(len(multipart_items), 'Zarr chunk')}" | ||
| f" totaling {total_large_size / 1024**3:.2f} GiB" | ||
| f" exceed the S3 single-part upload limit of" | ||
| f" {ZARR_LARGE_CHUNK_THRESHOLD / 1024**3:.0f} GiB" | ||
| f" (largest: {largest.entry_path}," | ||
| f" {largest.size / 1024**3:.2f} GiB);" | ||
| f" the server does not support multipart zarr" | ||
| f" uploads (dandi-archive#2784)." | ||
| ) | ||
| max_retries = 5 | ||
| retry_count = 0 | ||
| # Add all items to checksum tree (only done once) | ||
| for it in all_items: | ||
| zcc.add_leaf(Path(it.entry_path), it.size, it.digest) | ||
|
|
||
| # Upload chunks above 5GB individually via multipart upload | ||
| for it in large_items: | ||
| # Yield uploading status | ||
| yield from _multipart_upload( | ||
| # Upload chunks above 5GB individually via multipart upload. | ||
| # ``_multipart_upload`` reports ``current`` as bytes within | ||
| # the single chunk being uploaded; translate it to bytes | ||
| # uploaded across the whole zarr so progress reporting | ||
| # stays monotonic for downstream consumers. | ||
| for it in multipart_items: | ||
| cumulative_before = bytes_uploaded | ||
| for status in _multipart_upload( | ||
|
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Potentially we could change the record that helper yields to make this easier, e.g. we could pass "bytes_uploaded" into the helper so that it is taken into account for progress reporting, or the like -- I will leave it to you to choose. |
||
| client=client, | ||
| filepath=it.filepath, | ||
| asset_path=it.entry_path, | ||
|
|
@@ -768,21 +790,26 @@ def mkzarr() -> str: | |
| "chunk_key": it.entry_path, | ||
| }, | ||
| jobs=jobs, | ||
| ) | ||
|
|
||
| # Part is finished uploading, yield final progress | ||
| ): | ||
| if ( | ||
| status.get("status") == "uploading" | ||
| and "current" in status | ||
| ): | ||
| cumulative = cumulative_before + status["current"] | ||
| yield { | ||
| "status": "uploading", | ||
| "progress": 100 * cumulative / to_upload.total_size, | ||
| "current": cumulative, | ||
| } | ||
| else: | ||
| yield status | ||
| changed = True | ||
| bytes_uploaded += it.size | ||
| yield { | ||
| "status": "uploading", | ||
| "progress": 100 * bytes_uploaded / to_upload.total_size, | ||
| "current": bytes_uploaded, | ||
| } | ||
|
|
||
| # Upload the remaining files using single part upload | ||
| while items_to_upload and retry_count <= max_retries: | ||
| while singlepart_items and retry_count <= max_retries: | ||
| # Prepare upload requests for current items | ||
| uploading = [it.upload_request() for it in items_to_upload] | ||
| uploading = [it.upload_request() for it in singlepart_items] | ||
|
|
||
| if retry_count == 0: | ||
| lgr.debug( | ||
|
|
@@ -814,7 +841,7 @@ def mkzarr() -> str: | |
| upload_url=signed_url, | ||
| item=it, | ||
| ) | ||
| for (signed_url, it) in zip(r, items_to_upload) | ||
| for (signed_url, it) in zip(r, singlepart_items) | ||
| ] | ||
|
|
||
| changed = True | ||
|
|
@@ -846,20 +873,20 @@ def mkzarr() -> str: | |
| ) | ||
|
|
||
| # Prepare for next iteration with retry items | ||
| if items_to_upload := retry_items: | ||
| if singlepart_items := retry_items: | ||
| retry_count += 1 | ||
| if retry_count <= max_retries: | ||
| lgr.info( | ||
| "%s: %s got 403 errors, requesting new URLs", | ||
| asset_path, | ||
| pluralize(len(items_to_upload), "file"), | ||
| pluralize(len(singlepart_items), "file"), | ||
| ) | ||
| # Small delay before retry | ||
| sleep(1 * retry_count) | ||
|
|
||
| # Check if we exhausted retries | ||
| if items_to_upload: | ||
| nfiles_str = pluralize(len(items_to_upload), "file") | ||
| if singlepart_items: | ||
| nfiles_str = pluralize(len(singlepart_items), "file") | ||
| raise UploadError( | ||
| f"{asset_path}: failed to upload {nfiles_str} " | ||
| f"after {max_retries} retries due to repeated 403 errors" | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -16,7 +16,6 @@ | |
| from ..consts import ZARR_MIME_TYPE, dandiset_metadata_file | ||
| from ..dandiapi import AssetType, RemoteZarrAsset | ||
| from ..exceptions import UnknownAssetError | ||
| from ..files.bases import _multipart_upload as real_multipart_upload | ||
| from ..files import ( | ||
| BIDSDatasetDescriptionAsset, | ||
| DandisetMetadataFile, | ||
|
|
@@ -30,6 +29,7 @@ | |
| dandi_file, | ||
| find_dandi_files, | ||
| ) | ||
| from ..files.bases import _multipart_upload as real_multipart_upload | ||
|
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Did you have pre-commit installed for your local clone of dandi-cli, @jjnesbitt? |
||
|
|
||
| lgr = get_logger() | ||
|
|
||
|
|
@@ -540,6 +540,11 @@ def test_upload_zarr_entry_content_type(new_dandiset, tmp_path): | |
| @pytest.mark.ai_generated | ||
| def test_upload_zarr_large_chunks(new_dandiset, tmp_path): | ||
| """Chunks above ZARR_LARGE_CHUNK_THRESHOLD are uploaded via multipart upload.""" | ||
| if not new_dandiset.client.supports_zarr_multipart_upload: | ||
| pytest.skip( | ||
| "Server does not expose the zarr multipart upload endpoints" | ||
| " (dandi-archive#2784)" | ||
| ) | ||
| filepath = tmp_path / "example.zarr" | ||
| zarr.save(filepath, np.arange(1000), np.arange(1000, 0, -1)) | ||
| zf = dandi_file(filepath) | ||
|
|
@@ -571,6 +576,11 @@ def spy_multipart_upload(**kwargs): | |
| @pytest.mark.ai_generated | ||
| def test_upload_zarr_mixed_chunks(new_dandiset, tmp_path): | ||
| """Chunks above ZARR_LARGE_CHUNK_THRESHOLD go multipart; smaller ones use single-part upload.""" | ||
| if not new_dandiset.client.supports_zarr_multipart_upload: | ||
| pytest.skip( | ||
| "Server does not expose the zarr multipart upload endpoints" | ||
| " (dandi-archive#2784)" | ||
| ) | ||
| filepath = tmp_path / "mixed.zarr" | ||
| store = zarr.open_group(str(filepath), mode="w") | ||
| # small array: 10 int64 elements, produces a ~96-byte chunk (compressed) | ||
|
|
@@ -601,9 +611,7 @@ def spy_multipart_upload(**kwargs): | |
| remote_entries = {str(e) for e in asset.iterfiles()} | ||
| # Only chunk files whose on-disk size exceeds the threshold should be multipart-uploaded | ||
| large_chunks = { | ||
| p | ||
| for p in remote_entries | ||
| if (filepath / p).stat().st_size > mixed_threshold | ||
| p for p in remote_entries if (filepath / p).stat().st_size > mixed_threshold | ||
| } | ||
| assert set(multipart_paths) == large_chunks | ||
| # At least one chunk must have gone each path so the test is meaningful | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Without the name changes, the code would be hard(er) to read going forward. Cleaner names at least make more sense, IMHO, for our future selves.