-
Notifications
You must be signed in to change notification settings - Fork 37
feat(files): Support upload of files larger than 5GB in all CDF cloud environments #2566
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
e29b789
ad5261c
5cb6375
ac23041
02f0385
620b29e
3aee59b
01b1aef
66f3ec6
dfac491
a9b17e4
b74cc63
d2f83fa
27c7025
5e16d73
aad03eb
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,6 +1,8 @@ | ||
| from __future__ import annotations | ||
|
|
||
| import asyncio | ||
| import copy | ||
| import math | ||
| import warnings | ||
| from collections import defaultdict | ||
| from collections.abc import AsyncIterator, Sequence | ||
|
|
@@ -9,7 +11,14 @@ | |
| from urllib.parse import urlparse | ||
|
|
||
| from cognite.client._api_client import APIClient | ||
| from cognite.client._constants import DEFAULT_LIMIT_READ | ||
| from cognite.client._constants import ( | ||
| DEFAULT_LIMIT_READ, | ||
| FILE_DEFAULT_MULTIPART_SIZE, | ||
| FILE_MAX_MULTIPART_COUNT, | ||
| FILE_MAX_MULTIPART_SIZE, | ||
| FILE_MIN_MULTIPART_SIZE, | ||
| ) | ||
| from cognite.client.config import global_config | ||
| from cognite.client.data_classes import ( | ||
| FileMetadata, | ||
| FileMetadataFilter, | ||
|
|
@@ -28,7 +37,7 @@ | |
| from cognite.client.utils._auxiliary import append_url_path, find_duplicates, unpack_items | ||
| from cognite.client.utils._concurrency import AsyncSDKTask, execute_async_tasks | ||
| from cognite.client.utils._identifier import Identifier, IdentifierSequence | ||
| from cognite.client.utils._uploading import prepare_content_for_upload | ||
| from cognite.client.utils._uploading import AsyncFileChunker, prepare_content_for_upload | ||
| from cognite.client.utils._validation import process_asset_subtree_ids, process_data_set_ids | ||
| from cognite.client.utils.useful_types import SequenceNotStr | ||
|
|
||
|
|
@@ -450,22 +459,43 @@ async def upload_content( | |
| external_id: str | None = None, | ||
| instance_id: NodeId | None = None, | ||
| ) -> FileMetadata: | ||
| """`Upload a file content <https://api-docs.cognite.com/20230101/tag/Files/operation/getUploadLink>`_. | ||
| """`Upload file content <https://api-docs.cognite.com/20230101/tag/Files/operation/getMultiPartUploadLink>`_ | ||
|
|
||
| Upload file content from a local file path to a file previously created (initiated) with only metadata. | ||
| For files created with FilesAPI.create(), use `external_id`. | ||
| For files created with data modeling API using CogniteFileApply, use `instance_id`. | ||
| Supports upload of large files (>5 GB), using multipart upload. | ||
|
|
||
| Args: | ||
| path (Path | str): Path to the file you wish to upload. | ||
| path (Path | str): Local file path. | ||
| external_id (str | None): The external ID provided by the client. Must be unique within the project. | ||
| instance_id (NodeId | None): Instance ID of the file. | ||
| instance_id (NodeId | None): Instance ID of the file (CogniteFile). | ||
| Returns: | ||
| FileMetadata: No description. | ||
| """ | ||
| path = Path(path) | ||
| if path.is_file(): | ||
| with path.open("rb") as fh: | ||
| return await self.upload_content_bytes(fh, external_id=external_id, instance_id=instance_id) | ||
| elif path.is_dir(): | ||
| if path.is_dir(): | ||
| raise IsADirectoryError(path) | ||
| raise FileNotFoundError(path) | ||
| if not path.is_file(): | ||
| raise FileNotFoundError(path) | ||
|
|
||
| upload_semaphore = asyncio.Semaphore(global_config.concurrency_settings.general.write) | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The docs for multi-part say:
Should we introduce a separate
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. That could be relevant, yes, but then probably in a separate PR? I think it could potentially also be relevant to support network bandwidth throttling. |
||
|
|
||
| file_size = path.stat().st_size | ||
| part_size, num_parts = self.calculate_part_size_and_count(file_size) | ||
| session = await self.multipart_upload_content_session( | ||
| parts=num_parts, external_id=external_id, instance_id=instance_id | ||
| ) | ||
|
|
||
| async with session: | ||
|
|
||
| async def upload_part(part_no: int) -> None: | ||
| async with upload_semaphore: | ||
| await self._upload_file_part(session, path, part_no, part_size, file_size) | ||
|
|
||
| await asyncio.gather(*(upload_part(i) for i in range(num_parts))) | ||
|
|
||
| return session.file_metadata | ||
|
|
||
| async def upload( | ||
| self, | ||
|
|
@@ -486,7 +516,14 @@ async def upload( | |
| recursive: bool = False, | ||
| overwrite: bool = False, | ||
| ) -> FileMetadata | FileMetadataList: | ||
| """`Upload a file <https://api-docs.cognite.com/20230101/tag/Files/operation/initFileUpload>`_. | ||
| """`Upload a file or directory <https://api-docs.cognite.com/20230101/tag/Files/operation/initMultiPartUpload>`_. | ||
|
|
||
| Creates files in files API with metadata and uploads file content. | ||
|
|
||
| Note: | ||
| If path is a directory, this method will upload all files in that directory. Use `recursive=True` for subdirectories as well. | ||
|
|
||
| Supports upload of large files (>5 GB), using multipart upload. | ||
|
|
||
| Args: | ||
| path (Path | str): Path to the file you wish to upload. If path is a directory, this method will upload all files in that directory. | ||
|
|
@@ -566,32 +603,86 @@ async def upload( | |
| source_modified_time=source_modified_time, | ||
| security_categories=security_categories, | ||
| ) | ||
|
|
||
| upload_semaphore = asyncio.Semaphore(global_config.concurrency_settings.general.write) | ||
| path = Path(path) | ||
| if path.is_file(): | ||
| if not name: | ||
| file_metadata.name = path.name | ||
| return await self._upload_file_from_path(file_metadata, path, overwrite) | ||
| return await self._upload_file_from_path(file_metadata, path, overwrite, upload_semaphore) | ||
|
|
||
| elif not path.is_dir(): | ||
| raise FileNotFoundError(path) | ||
|
|
||
| tasks: list[AsyncSDKTask] = [] | ||
| file_iter = path.rglob("*") if recursive else path.iterdir() | ||
| for file in file_iter: | ||
| if file.is_file(): | ||
| file_metadata = copy.copy(file_metadata) | ||
| file_metadata.name = file.name | ||
| tasks.append(AsyncSDKTask(self._upload_file_from_path, file_metadata, file, overwrite)) | ||
| tasks.append( | ||
| AsyncSDKTask(self._upload_file_from_path, file_metadata, file, overwrite, upload_semaphore) | ||
| ) | ||
|
|
||
| tasks_summary = await execute_async_tasks(tasks) | ||
| tasks_summary.raise_compound_exception_if_failed_tasks(task_unwrap_fn=lambda task: task[0].name) | ||
| return FileMetadataList(tasks_summary.results) | ||
|
|
||
| async def _upload_file_from_path( | ||
| self, file_metadata: FileMetadataWrite, path: Path, overwrite: bool | ||
| self, file_metadata: FileMetadataWrite, path: Path, overwrite: bool, upload_semaphore: asyncio.Semaphore | ||
| ) -> FileMetadata: | ||
| file_size = path.stat().st_size | ||
| part_size, num_parts = self.calculate_part_size_and_count(file_size) | ||
| session = await self.multipart_upload_session( | ||
| parts=num_parts, | ||
| overwrite=overwrite, | ||
| **file_metadata.dump(camel_case=False), | ||
| ) | ||
|
|
||
| async with session: | ||
|
|
||
| async def upload_part(part_no: int) -> None: | ||
| async with upload_semaphore: | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Don't need this semaphore, already being auto-used.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Without the semaphore, the _upload_file_part calls get out of control, running more calls than concurrency_limit. I added more test cases checking for this, see the new
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. That's interesting 🤔 Every API call in the SDK will at some point hit async with concurrency_control:
if headers is not None:
self.refresh_auth_header(headers)
response = await coro_factory()From how I read your current implementation, each successful upload / PUT is actually consuming two permits from the semaphore. |
||
| await self._upload_file_part(session, path, part_no, part_size, file_size) | ||
|
|
||
| await asyncio.gather(*(upload_part(i) for i in range(num_parts))) | ||
|
|
||
| return session.file_metadata | ||
|
|
||
| def calculate_part_size_and_count(self, file_size: int) -> tuple[int, int]: | ||
| """Calculate part size and count for a multipart upload, for a given file size. | ||
|
|
||
| See <https://api-docs.cognite.com/20230101/tag/Files/operation/initMultiPartUpload> | ||
| for more details on multipart upload and the constraints on part size and count. | ||
|
|
||
| Args: | ||
| file_size (int): The total file size in bytes. | ||
|
|
||
| Returns: | ||
| tuple[int, int]: A tuple of (part_size, num_parts). | ||
| """ | ||
| if file_size > FILE_MAX_MULTIPART_COUNT * FILE_MAX_MULTIPART_SIZE: | ||
| raise ValueError( | ||
| f"File size {file_size} exceeds the maximum supported size of {FILE_MAX_MULTIPART_COUNT * FILE_MAX_MULTIPART_SIZE} bytes for multipart upload." | ||
| ) | ||
| if file_size < FILE_MIN_MULTIPART_SIZE: | ||
| return FILE_MIN_MULTIPART_SIZE, 1 | ||
| uncapped_part_size = max(FILE_DEFAULT_MULTIPART_SIZE, math.ceil(file_size / FILE_MAX_MULTIPART_COUNT)) | ||
| part_size = min(uncapped_part_size, FILE_MAX_MULTIPART_SIZE) | ||
| num_parts = math.ceil(file_size / part_size) | ||
| return part_size, num_parts | ||
|
|
||
| async def _upload_file_part( | ||
| self, | ||
| session: FileMultipartUploadSession, | ||
| path: Path, | ||
| part_no: int, | ||
| part_size: int, | ||
| file_size: int, | ||
| ) -> None: | ||
| offset = part_no * part_size | ||
| read_size = min(part_size, file_size - offset) | ||
| with path.open("rb") as fh: | ||
| return await self.upload_bytes(fh, overwrite=overwrite, **file_metadata.dump(camel_case=False)) | ||
| await session.upload_part_async(part_no, AsyncFileChunker(fh, offset=offset, size=read_size)) | ||
|
haakonvt marked this conversation as resolved.
|
||
|
|
||
| async def upload_content_bytes( | ||
| self, | ||
|
|
@@ -940,15 +1031,17 @@ async def multipart_upload_content_session( | |
| FileMetadata._load(returned_file_metadata), upload_urls, upload_id, self._cognite_client | ||
| ) | ||
|
|
||
| async def _upload_multipart_part(self, upload_url: str, content: str | bytes | BinaryIO) -> None: | ||
| async def _upload_multipart_part( | ||
| self, upload_url: str, content: str | bytes | BinaryIO | AsyncIterator[bytes] | ||
| ) -> None: | ||
| """Upload part of a file to an upload URL returned from `multipart_upload_session`. | ||
|
|
||
| Note: | ||
| If `content` does not somehow expose its length, this method may not work on Azure or AWS. | ||
|
|
||
| Args: | ||
| upload_url (str): URL to upload file chunk to. | ||
| content (str | bytes | BinaryIO): The content to upload. | ||
| content (str | bytes | BinaryIO | AsyncIterator[bytes]): The content to upload. | ||
| """ | ||
| headers = {"accept": "*/*"} | ||
| file_size, file_content = prepare_content_for_upload(content) | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -20,6 +20,8 @@ def prepare_content_for_upload( | |
| case str(): | ||
| content = content.encode("utf-8") | ||
| file_size = len(content) | ||
| case AsyncFileChunker(): | ||
| file_size = content.size | ||
| case AsyncIterable(): | ||
| file_size = None | ||
| case StringIO(): | ||
|
|
@@ -49,28 +51,36 @@ class AsyncFileChunker(AsyncIterator[bytes]): | |
|
|
||
| Args: | ||
| file_handle (BinaryIO): An open file handle. | ||
| offset (int): Byte offset to seek to before reading. Defaults to 0 (beginning of file). | ||
| size (int | None): Maximum number of bytes to yield in total. If None, reads until EOF. | ||
| """ | ||
|
|
||
| CHUNK_SIZE = 64 * 1024 # 64 KiB chunks by default, copying httpx default | ||
|
|
||
| def __init__(self, file_handle: BinaryIO) -> None: | ||
| def __init__(self, file_handle: BinaryIO, *, offset: int = 0, size: int | None = None) -> None: | ||
| from cognite.client import global_config | ||
|
|
||
| self._file_handle = file_handle | ||
| self._chunk_size = global_config.file_upload_chunk_size or self.CHUNK_SIZE | ||
| self._remaining = size | ||
| self.size = size # exposed so prepare_content_for_upload can set Content-Length | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Exposing |
||
|
|
||
| # Read from beginning of file if possible: | ||
| if hasattr(self._file_handle, "seek"): | ||
| try: | ||
| self._file_handle.seek(0) | ||
| self._file_handle.seek(offset) | ||
| except UnsupportedOperation: | ||
| pass | ||
|
|
||
| def __aiter__(self) -> AsyncIterator[bytes]: | ||
| return self | ||
|
|
||
| async def __anext__(self) -> bytes: | ||
| if chunk := self._file_handle.read(self._chunk_size): | ||
| to_read = self._chunk_size if self._remaining is None else min(self._chunk_size, self._remaining) | ||
| if to_read == 0: | ||
| raise StopAsyncIteration | ||
| if chunk := self._file_handle.read(to_read): | ||
| if self._remaining is not None: | ||
| self._remaining -= len(chunk) | ||
| return chunk | ||
| raise StopAsyncIteration | ||
|
|
||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Are we sure we want to point this away from the "old" get upload link? The multi-part is a bit more complex.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The upload() function will with the changes in this PR use the files/multiuploadlink (and files/completemultipartupload) endpoint.
So it seems more correct to reference that endpoint in the REST API documentation.
The API docs there contain further details about the file size restrictions, explain that it provides a uniform API across all CDF cloud environments, and describe the steps needed to use the multipart upload API endpoints.
However, all those details are really not that relevant for users of upload(), as the function here takes care of the part size calculation, uploading the parts in parallel, etc., and completing the operation with the completemultipartupload call.