feat(files): Support upload of files larger than 5GB in all CDF cloud environments (#2566)
Conversation
Code Review
This pull request implements multipart upload functionality for the Files API, allowing large files to be uploaded in parallel parts. The changes include logic for calculating part sizes, managing upload sessions, and updated unit tests. Feedback identifies several necessary improvements: using a semaphore to limit concurrent uploads and prevent memory exhaustion, moving blocking file I/O operations to a thread pool, and enforcing a 1TB limit on file sizes. Additionally, corrections are needed for a broken docstring link and the use of Any type hints in test fixtures to comply with the project's typing standards.
Codecov Report ❌ Patch coverage is
Additional details and impacted files:

```
@@           Coverage Diff           @@
##           master    #2566   +/-   ##
=======================================
  Coverage   93.47%   93.47%
=======================================
  Files         480      480
  Lines       48318    48383     +65
=======================================
+ Hits        45164    45227     +63
- Misses       3154     3156      +2
```
… improving file size detection for AsyncIterable
…g of concurrent uploads
/gemini review
Code Review
This pull request implements multipart upload support for the Files API, introducing concurrency control via semaphores and logic for calculating optimal part sizes. It also updates AsyncFileChunker to support offset-based reading and includes comprehensive unit tests for the new multipart flow. Feedback was provided regarding a type ignore marker that should be removed once related type hints are updated to maintain the repository's strong typing standards.
```python
    source_modified_time=source_modified_time,
    security_categories=security_categories,
)

from cognite.client import global_config
```
I'm seeing a few lazily evaluated imports; could we keep these at the top of the file, or is there something preventing that?
I also noticed that during development, as I was following a pattern from other code that used global_config.
If the same line is moved to the top of the file along with the other imports, it leads to a circular import error:

```
ImportError: cannot import name 'global_config' from partially initialized module 'cognite.client' (most likely due to a circular import)
```
I saw that `from cognite.client import global_config` is used in a lot of places within functions in the SDK, so I assumed that was the practice for avoiding circular imports.
But I see that using the more specific `from cognite.client.config import global_config` (adding `.config`) solves the circular import problem.
Moved the import to the top of the file in later commits.
haakonvt left a comment
Excellent changes, a great QOL improvement! Consider whether we can reuse `execute_async_tasks_with_fail_fast` or `execute_async_tasks` from `cognite/client/utils/_concurrency.py`. A few other suggestions/remarks:
```python
MIN_MULTIPART_SIZE = 5 * 1024 * 1024  # 5 MiB
MAX_MULTIPART_SIZE = 4000 * 1024 * 1024  # 4000 MiB
DEFAULT_MULTIPART_SIZE = 50 * 1024 * 1024  # 50 MiB
MAX_MULTIPART_PARTS = 250
```
Let's add "FILE" into the constant names to make them self-explanatory
Fixed in newer commits
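For illustration, the part-size logic these constants imply could be sketched roughly as follows. This is a hypothetical `calculate_part_size` helper, not the PR's actual implementation: it grows the part size from the default just enough to keep the part count within the limit, and rejects files above the hard ceiling.

```python
import math

MIN_MULTIPART_SIZE = 5 * 1024 * 1024  # 5 MiB
MAX_MULTIPART_SIZE = 4000 * 1024 * 1024  # 4000 MiB
DEFAULT_MULTIPART_SIZE = 50 * 1024 * 1024  # 50 MiB
MAX_MULTIPART_PARTS = 250


def calculate_part_size(file_size: int) -> tuple[int, int]:
    """Pick a part size within [MIN, MAX] so the file fits in at most MAX_MULTIPART_PARTS parts."""
    if file_size > MAX_MULTIPART_SIZE * MAX_MULTIPART_PARTS:  # ~977 GiB hard ceiling
        raise ValueError("File too large for multipart upload")
    part_size = DEFAULT_MULTIPART_SIZE
    if math.ceil(file_size / part_size) > MAX_MULTIPART_PARTS:
        # Grow the part size just enough to stay within the part-count limit.
        part_size = math.ceil(file_size / MAX_MULTIPART_PARTS)
    part_size = max(MIN_MULTIPART_SIZE, min(part_size, MAX_MULTIPART_SIZE))
    num_parts = max(1, math.ceil(file_size / part_size))
    return part_size, num_parts
```

With these constants the maximum uploadable size comes out to 250 × 4000 MiB, roughly 977 GiB, which lines up with the "about 1000 GiB" limit mentioned in the PR description.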
```diff
     instance_id: NodeId | None = None,
 ) -> FileMetadata:
-    """`Upload a file content <https://api-docs.cognite.com/20230101/tag/Files/operation/getUploadLink>`_
+    """`Upload file content <https://api-docs.cognite.com/20230101/tag/Files/operation/getMultiPartUploadLink>`_
```
Are we sure we want to point this away from the "old" get upload link? The multi-part is a bit more complex.
With the changes in this PR, the `upload()` function uses the `files/multiuploadlink` (and `files/completemultipartupload`) endpoints, so it seems more correct to reference those endpoints in the REST API documentation.
The API docs there contain further details about the file size restrictions, note that multipart upload provides a uniform API across all CDF cloud environments, and describe the steps needed to use the multipart upload endpoints.
However, all those details are not that relevant for users of `upload()`, as the function takes care of the part size calculation, uploading parts in parallel, and completing the operation with the `completemultipartupload` call.
```python
from cognite.client import global_config

upload_semaphore = asyncio.Semaphore(global_config.concurrency_settings.general.write)
```
The docs for multi-part say:

> The parts can optionally be uploaded in parallel, preferably on a subset of parts at a time, for example maximum 3 concurrent PUT operations.
Should we introduce a separate concurrency_settings for file upload/download concurrency?
That could be relevant, yes, but then probably in a separate PR?
Or should the httpx resource limits be used for that?
https://www.python-httpx.org/advanced/resource-limits/
I think it could also be relevant to support network bandwidth throttling for file uploads (and downloads, and possibly other data).
Throttling could perhaps be implemented with existing async or httpx extensions.
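As a rough illustration of the bandwidth-throttling idea discussed above (not part of this PR; the class name, API, and rate numbers are made up), a simple token-bucket limiter awaited before each chunk upload could look like this:

```python
import asyncio
import time


class ByteRateLimiter:
    """Token bucket: callers await acquire(n) before sending n bytes, capping throughput at `rate` B/s."""

    def __init__(self, rate: float) -> None:
        self.rate = rate
        self._allowance = rate  # start with a full bucket
        self._last = time.monotonic()
        self._lock = asyncio.Lock()

    async def acquire(self, n_bytes: int) -> None:
        async with self._lock:
            now = time.monotonic()
            # Refill the bucket for the time elapsed since the last call.
            self._allowance = min(self.rate, self._allowance + (now - self._last) * self.rate)
            self._last = now
            if n_bytes > self._allowance:
                # Not enough budget: sleep until the deficit is paid off.
                await asyncio.sleep((n_bytes - self._allowance) / self.rate)
                self._allowance = 0.0
            else:
                self._allowance -= n_bytes


async def demo() -> float:
    limiter = ByteRateLimiter(rate=10_000)  # 10 kB/s
    start = time.monotonic()
    await limiter.acquire(10_000)  # fits in the initial bucket, returns immediately
    await limiter.acquire(5_000)   # bucket is empty, sleeps about 0.5 s
    return time.monotonic() - start
```

Running `asyncio.run(demo())` should take roughly half a second, showing how per-chunk acquisition shapes the effective upload rate.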
```python
self._file_handle = file_handle
self._chunk_size = global_config.file_upload_chunk_size or self.CHUNK_SIZE
self._remaining = size
self.size = size  # exposed so prepare_content_for_upload can set Content-Length
```
Exposing `size` is a great addition.
```diff
         file_size = len(content)
     case AsyncIterable():
-        file_size = None
+        file_size = getattr(content, "size", None)
```
Suggestion:

```python
case AsyncFileChunker():
    file_size = content.size
case AsyncIterable():
    file_size = None
```

Fixed in later commits.
```
Creates files in files API with metadata and uploads file content.

If path is a directory, this method will upload all files in that directory. Use `recursive=True` for subdirectories as well.
```
Suggestion:

```diff
-If path is a directory, this method will upload all files in that directory. Use `recursive=True` for subdirectories as well.
+Tip:
+    If path is a directory, this method will upload all files in that directory. Use `recursive=True` for subdirectories as well.
```

or `Note:`
```python
async with session:

    async def upload_part(part_no: int) -> None:
        async with upload_semaphore:
```
Don't need this semaphore, already being auto-used.
Without the semaphore, the `_upload_file_part` calls get out of control, running more calls than `concurrency_limit`.
I added more test cases checking for this; see the new `test_upload_content_semaphore_limits_concurrent_parts` and `test_upload_semaphore_limits_concurrent_parts` in `test_files.py`.
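The behaviour those tests assert can be demonstrated with a self-contained sketch (illustrative only, not the SDK code): a counter tracks the peak number of simultaneously running `upload_part` coroutines, and the semaphore keeps it at or below the limit.

```python
import asyncio


async def run_upload(n_parts: int, concurrency_limit: int) -> int:
    """Run n_parts fake part-uploads under a semaphore; return the peak concurrency observed."""
    upload_semaphore = asyncio.Semaphore(concurrency_limit)
    running = 0
    peak = 0

    async def upload_part(part_no: int) -> None:
        nonlocal running, peak
        async with upload_semaphore:
            running += 1
            peak = max(peak, running)
            await asyncio.sleep(0.01)  # stand-in for the PUT of one part
            running -= 1

    await asyncio.gather(*(upload_part(i) for i in range(n_parts)))
    return peak


peak = asyncio.run(run_upload(n_parts=20, concurrency_limit=3))
assert peak <= 3  # without the semaphore, all 20 parts would run at once
```

Removing the `async with upload_semaphore:` line makes the peak jump to `n_parts`, which is exactly the failure mode the review thread describes.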
```python
read_size = min(part_size, file_size - offset)
with path.open("rb") as fh:
    return await self.upload_bytes(fh, overwrite=overwrite, **file_metadata.dump(camel_case=False))
    await session.upload_part_async(part_no, AsyncFileChunker(fh, offset=offset, size=read_size))
```
I guess the actual file read is blocking, but I think we should just accept that.
Yes, the synchronous `path.open` is still used.
mmap or asyncio could be used, but I doubt it would really improve things.
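The offset/size bookkeeping behind that snippet can be sketched as a small helper (hypothetical name, mirroring the `read_size = min(part_size, file_size - offset)` expression in the diff above):

```python
def iter_parts(file_size: int, part_size: int):
    """Yield (part_no, offset, read_size) triples covering the whole file without overlap."""
    part_no, offset = 0, 0
    while offset < file_size:
        read_size = min(part_size, file_size - offset)  # last part may be smaller
        yield part_no, offset, read_size
        part_no += 1
        offset += read_size


parts = list(iter_parts(file_size=105, part_size=50))
# Three parts: two full 50-byte parts and a final 5-byte remainder.
assert parts == [(0, 0, 50), (1, 50, 50), (2, 100, 5)]
```

Each part can then be read independently from its `offset`, which is what makes the parallel PUTs possible.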
Changed to import `global_config` from `cognite.client.config` instead of the public export in `cognite.client`, to break the circular import.
Proves that `upload_semaphore` is needed
Description
The SDK functions `upload()` (for single-file or directory upload) and `upload_content()` (for CogniteFile-based files) are currently limited to uploading files smaller than 5 GB in AWS and Azure CDF clusters.

With the changes in this PR, the `upload()` and `upload_content()` functions raise this limit to support upload of files up to (about) 1000 GiB in size, as documented in the Files API docs (https://api-docs.cognite.com/20230101/tag/Files/operation/initMultiPartUpload). Internally, the functions now use the multipart upload functions. Multipart upload enables upload of files larger than 5 GB in all CDF cloud environments (Azure, AWS, and Google). In addition, upload performance can be increased, especially for single large files.

With this change, client scripts or notebooks using `upload()` and `upload_content()` will benefit implicitly and do not have to be ported to the (more low-level) multipart upload functions in the SDK.

Note that the other, non-file-related upload functions are not updated, as they are meant for streaming data, where data is not readable in parallel from different offsets and the size is not generally known up front.
A semaphore was added to ensure that the async parallel uploads adhere to the concurrency setting in the configuration; without it, uploading directories with many files can fail with "too many open files".
To test the functionality, a custom Python test script was created to measure upload performance for a few large files as well as many small files. The main test case uploads a folder with 4 files, where 2 of the files are large (above 5 GB).
In total, 30703.83 MB of data is uploaded per test run.
The file upload script mainly calls `client.files.upload(DIRECTORY_NAME, recursive=True)`.
With the currently latest released 8.0.7 version, `upload()` of large files works in CDF Google clusters, but fails in AWS and Azure clusters.
With the fixes in this PR, the script succeeds in all 3 CDF cloud environments.
The upload script achieves a high upload rate on all the cloud environments with the 1000 Mbps Ethernet connection I am using.
Comparison of old (SDK 8.0.7) and new (with this PR) behaviour and performance for upload of a folder with a few large files (results table not included here).
Checklist:
If a new method has been added, it should be referenced in `cognite.rst` in order to generate docs based on its docstring.