Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
129 changes: 111 additions & 18 deletions cognite/client/_api/files.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
from __future__ import annotations

import asyncio
import copy
import math
import warnings
from collections import defaultdict
from collections.abc import AsyncIterator, Sequence
Expand All @@ -9,7 +11,14 @@
from urllib.parse import urlparse

from cognite.client._api_client import APIClient
from cognite.client._constants import DEFAULT_LIMIT_READ
from cognite.client._constants import (
DEFAULT_LIMIT_READ,
FILE_DEFAULT_MULTIPART_SIZE,
FILE_MAX_MULTIPART_COUNT,
FILE_MAX_MULTIPART_SIZE,
FILE_MIN_MULTIPART_SIZE,
)
from cognite.client.config import global_config
from cognite.client.data_classes import (
FileMetadata,
FileMetadataFilter,
Expand All @@ -28,7 +37,7 @@
from cognite.client.utils._auxiliary import append_url_path, find_duplicates, unpack_items
from cognite.client.utils._concurrency import AsyncSDKTask, execute_async_tasks
from cognite.client.utils._identifier import Identifier, IdentifierSequence
from cognite.client.utils._uploading import prepare_content_for_upload
from cognite.client.utils._uploading import AsyncFileChunker, prepare_content_for_upload
from cognite.client.utils._validation import process_asset_subtree_ids, process_data_set_ids
from cognite.client.utils.useful_types import SequenceNotStr

Expand Down Expand Up @@ -450,22 +459,43 @@ async def upload_content(
external_id: str | None = None,
instance_id: NodeId | None = None,
) -> FileMetadata:
"""`Upload a file content <https://api-docs.cognite.com/20230101/tag/Files/operation/getUploadLink>`_.
"""`Upload file content <https://api-docs.cognite.com/20230101/tag/Files/operation/getMultiPartUploadLink>`_
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are we sure we want to point this away from the "old" get upload link? The multi-part is a bit more complex.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The upload() function will with the changes in this PR use the files/multiuploadlink (and files/completemultipartupload) endpoint.
So it seems more correct to reference that endpoint in the REST API documentation.
The API docs there contain further details about the file size restrictions, note that the endpoint provides a uniform API across all cloud environments for CDF, and describe the steps needed to use the multipart upload API endpoints.

However, all those details are really not that relevant for users of upload(), as the function here takes care of the part size calculation, upload parts in parallel, etc, and completing the operation with the completemultipartupload call.


Upload file content from a local file path to a file previously created (initiated) with only metadata.
For files created with FilesAPI.create(), use `external_id`.
For files created with data modeling API using CogniteFileApply, use `instance_id`.
Supports upload of large files (>5 GB), using multipart upload.

Args:
path (Path | str): Path to the file you wish to upload.
path (Path | str): Local file path.
external_id (str | None): The external ID provided by the client. Must be unique within the project.
instance_id (NodeId | None): Instance ID of the file.
instance_id (NodeId | None): Instance ID of the file (CogniteFile).
Returns:
FileMetadata: No description.
"""
path = Path(path)
if path.is_file():
with path.open("rb") as fh:
return await self.upload_content_bytes(fh, external_id=external_id, instance_id=instance_id)
elif path.is_dir():
if path.is_dir():
raise IsADirectoryError(path)
raise FileNotFoundError(path)
if not path.is_file():
raise FileNotFoundError(path)

upload_semaphore = asyncio.Semaphore(global_config.concurrency_settings.general.write)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The docs for multi-part say:

The parts can optionally be uploaded in parallel, preferably on a subset of parts at a time, for example maximum 3 concurrent PUT operations.

Should we introduce a separate concurrency_settings for file upload/download concurrency?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That could be relevant yes, but then probably in a separate PR?
Or should the httpx resource limits be used for that?
https://www.python-httpx.org/advanced/resource-limits/

I think it could also be relevant to support network bandwidth throttling,
e.g. for throttling the upload of files (and likewise for download, and for other data).
Throttling can perhaps be implemented with existing async or httpx extensions.


file_size = path.stat().st_size
part_size, num_parts = self.calculate_part_size_and_count(file_size)
session = await self.multipart_upload_content_session(
parts=num_parts, external_id=external_id, instance_id=instance_id
)

async with session:

async def upload_part(part_no: int) -> None:
async with upload_semaphore:
await self._upload_file_part(session, path, part_no, part_size, file_size)

await asyncio.gather(*(upload_part(i) for i in range(num_parts)))

return session.file_metadata

async def upload(
self,
Expand All @@ -486,7 +516,14 @@ async def upload(
recursive: bool = False,
overwrite: bool = False,
) -> FileMetadata | FileMetadataList:
"""`Upload a file <https://api-docs.cognite.com/20230101/tag/Files/operation/initFileUpload>`_.
"""`Upload a file or directory <https://api-docs.cognite.com/20230101/tag/Files/operation/initMultiPartUpload>`_.

Creates files in files API with metadata and uploads file content.

Note:
If path is a directory, this method will upload all files in that directory. Use `recursive=True` for subdirectories as well.

Supports upload of large files (>5 GB), using multipart upload.

Args:
path (Path | str): Path to the file you wish to upload. If path is a directory, this method will upload all files in that directory.
Expand Down Expand Up @@ -566,32 +603,86 @@ async def upload(
source_modified_time=source_modified_time,
security_categories=security_categories,
)

upload_semaphore = asyncio.Semaphore(global_config.concurrency_settings.general.write)
path = Path(path)
if path.is_file():
if not name:
file_metadata.name = path.name
return await self._upload_file_from_path(file_metadata, path, overwrite)
return await self._upload_file_from_path(file_metadata, path, overwrite, upload_semaphore)

elif not path.is_dir():
raise FileNotFoundError(path)

tasks: list[AsyncSDKTask] = []
file_iter = path.rglob("*") if recursive else path.iterdir()
for file in file_iter:
if file.is_file():
file_metadata = copy.copy(file_metadata)
file_metadata.name = file.name
tasks.append(AsyncSDKTask(self._upload_file_from_path, file_metadata, file, overwrite))
tasks.append(
AsyncSDKTask(self._upload_file_from_path, file_metadata, file, overwrite, upload_semaphore)
)

tasks_summary = await execute_async_tasks(tasks)
tasks_summary.raise_compound_exception_if_failed_tasks(task_unwrap_fn=lambda task: task[0].name)
return FileMetadataList(tasks_summary.results)

async def _upload_file_from_path(
    self, file_metadata: FileMetadataWrite, path: Path, overwrite: bool, upload_semaphore: asyncio.Semaphore
) -> FileMetadata:
    """Create the file from metadata and upload its content using multipart upload.

    The file is split into parts (see ``calculate_part_size_and_count``) which are
    uploaded concurrently; the semaphore caps how many part uploads are in flight.

    Args:
        file_metadata (FileMetadataWrite): Metadata for the file to create.
        path (Path): Local path of the file whose content is uploaded.
        overwrite (bool): Whether to overwrite an existing file with the same identifier.
        upload_semaphore (asyncio.Semaphore): Bounds the number of concurrent part uploads.

    Returns:
        FileMetadata: Metadata of the created and uploaded file.
    """
    file_size = path.stat().st_size
    part_size, num_parts = self.calculate_part_size_and_count(file_size)
    session = await self.multipart_upload_session(
        parts=num_parts,
        overwrite=overwrite,
        **file_metadata.dump(camel_case=False),
    )

    # The session context manager finalizes (completes) the multipart upload on exit:
    async with session:

        async def upload_part(part_no: int) -> None:
            # NOTE(review): each part also passes the SDK-wide retry concurrency gate,
            # so the effective in-flight limit may be lower than the semaphore's value.
            async with upload_semaphore:
                await self._upload_file_part(session, path, part_no, part_size, file_size)

        await asyncio.gather(*(upload_part(i) for i in range(num_parts)))

    return session.file_metadata

def calculate_part_size_and_count(self, file_size: int) -> tuple[int, int]:
    """Determine the multipart-upload part size and part count for a given file size.

    See <https://api-docs.cognite.com/20230101/tag/Files/operation/initMultiPartUpload>
    for more details on multipart upload and the constraints on part size and count.

    Args:
        file_size (int): The total file size in bytes.

    Returns:
        tuple[int, int]: A tuple of (part_size, num_parts).
    """
    max_supported = FILE_MAX_MULTIPART_COUNT * FILE_MAX_MULTIPART_SIZE
    if file_size > max_supported:
        raise ValueError(
            f"File size {file_size} exceeds the maximum supported size of {max_supported} bytes for multipart upload."
        )
    # Files below the minimum part size always go as a single part:
    if file_size < FILE_MIN_MULTIPART_SIZE:
        return FILE_MIN_MULTIPART_SIZE, 1
    # Grow the part size above the default only when needed to respect the part-count cap,
    # and never beyond the per-part maximum:
    required_per_part = math.ceil(file_size / FILE_MAX_MULTIPART_COUNT)
    part_size = min(max(FILE_DEFAULT_MULTIPART_SIZE, required_per_part), FILE_MAX_MULTIPART_SIZE)
    return part_size, math.ceil(file_size / part_size)

async def _upload_file_part(
    self,
    session: FileMultipartUploadSession,
    path: Path,
    part_no: int,
    part_size: int,
    file_size: int,
) -> None:
    """Upload a single part of a multipart upload by streaming a slice of the file.

    Args:
        session (FileMultipartUploadSession): Active multipart upload session.
        path (Path): Local path of the file being uploaded.
        part_no (int): Zero-based part index.
        part_size (int): Size in bytes of each (non-final) part.
        file_size (int): Total file size in bytes; bounds the final part's read size.
    """
    offset = part_no * part_size
    # The final part may be shorter than part_size:
    read_size = min(part_size, file_size - offset)
    # Each part gets its own file handle so concurrent parts never share a seek position:
    with path.open("rb") as fh:
        await session.upload_part_async(part_no, AsyncFileChunker(fh, offset=offset, size=read_size))
Comment thread
haakonvt marked this conversation as resolved.

async def upload_content_bytes(
self,
Expand Down Expand Up @@ -940,15 +1031,17 @@ async def multipart_upload_content_session(
FileMetadata._load(returned_file_metadata), upload_urls, upload_id, self._cognite_client
)

async def _upload_multipart_part(self, upload_url: str, content: str | bytes | BinaryIO) -> None:
async def _upload_multipart_part(
self, upload_url: str, content: str | bytes | BinaryIO | AsyncIterator[bytes]
) -> None:
"""Upload part of a file to an upload URL returned from `multipart_upload_session`.

Note:
If `content` does not somehow expose its length, this method may not work on Azure or AWS.

Args:
upload_url (str): URL to upload file chunk to.
content (str | bytes | BinaryIO): The content to upload.
content (str | bytes | BinaryIO | AsyncIterator[bytes]): The content to upload.
"""
headers = {"accept": "*/*"}
file_size, file_content = prepare_content_for_upload(content)
Expand Down
6 changes: 6 additions & 0 deletions cognite/client/_constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,3 +33,9 @@ def __bool__(self) -> Literal[False]:
DATA_MODELING_DEFAULT_LIMIT_READ = 10
DEFAULT_DATAPOINTS_CHUNK_SIZE = 100_000
_RUNNING_IN_BROWSER = IN_BROWSER

# Files API constants: multipart-upload limits used by FilesAPI.calculate_part_size_and_count.
# See the initMultiPartUpload endpoint docs for the underlying API constraints.
FILE_MIN_MULTIPART_SIZE = 5 * 1024 * 1024  # 5 MiB (smallest allowed part; files below this go as one part)
FILE_MAX_MULTIPART_SIZE = 4000 * 1024 * 1024  # 4000 MiB (largest allowed part)
FILE_DEFAULT_MULTIPART_SIZE = 50 * 1024 * 1024  # 50 MiB (preferred part size when the count cap permits)
FILE_MAX_MULTIPART_COUNT = 250  # max parts per upload; total size is capped at COUNT * MAX_SIZE
26 changes: 20 additions & 6 deletions cognite/client/_sync_api/files.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
"""
===============================================================================
9775a0ca6d02913bc835831ac35662df
69bea850b9412d5e874eac1bc0dbc0f0
This file is auto-generated from the Async API modules, - do not edit manually!
===============================================================================
"""
Expand All @@ -12,7 +12,9 @@
from typing import Any, BinaryIO, Literal, overload

from cognite.client import AsyncCogniteClient
from cognite.client._constants import DEFAULT_LIMIT_READ
from cognite.client._constants import (
DEFAULT_LIMIT_READ,
)
from cognite.client._sync_api_client import SyncAPIClient
from cognite.client.data_classes import (
FileMetadata,
Expand Down Expand Up @@ -425,12 +427,17 @@ def upload_content(
self, path: Path | str, external_id: str | None = None, instance_id: NodeId | None = None
) -> FileMetadata:
"""
`Upload a file content <https://api-docs.cognite.com/20230101/tag/Files/operation/getUploadLink>`_.
`Upload file content <https://api-docs.cognite.com/20230101/tag/Files/operation/getMultiPartUploadLink>`_

Upload file content from a local file path to a file previously created (initiated) with only metadata.
For files created with FilesAPI.create(), use `external_id`.
For files created with data modeling API using CogniteFileApply, use `instance_id`.
Supports upload of large files (>5 GB), using multipart upload.

Args:
path (Path | str): Path to the file you wish to upload.
path (Path | str): Local file path.
external_id (str | None): The external ID provided by the client. Must be unique within the project.
instance_id (NodeId | None): Instance ID of the file.
instance_id (NodeId | None): Instance ID of the file (CogniteFile).
Returns:
FileMetadata: No description.
"""
Expand Down Expand Up @@ -458,7 +465,14 @@ def upload(
overwrite: bool = False,
) -> FileMetadata | FileMetadataList:
"""
`Upload a file <https://api-docs.cognite.com/20230101/tag/Files/operation/initFileUpload>`_.
`Upload a file or directory <https://api-docs.cognite.com/20230101/tag/Files/operation/initMultiPartUpload>`_.

Creates files in files API with metadata and uploads file content.

Note:
If path is a directory, this method will upload all files in that directory. Use `recursive=True` for subdirectories as well.

Supports upload of large files (>5 GB), using multipart upload.

Args:
path (Path | str): Path to the file you wish to upload. If path is a directory, this method will upload all files in that directory.
Expand Down
8 changes: 4 additions & 4 deletions cognite/client/data_classes/files.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

import warnings
from abc import ABC
from collections.abc import Sequence
from collections.abc import AsyncIterator, Sequence
from contextlib import AbstractAsyncContextManager, AbstractContextManager
from types import TracebackType
from typing import TYPE_CHECKING, Any, BinaryIO, Literal, TypeVar
Expand Down Expand Up @@ -586,15 +586,15 @@ def __init__(
self._cognite_client = cognite_client
self._upload_is_finalized = False

async def upload_part_async(self, part_no: int, content: str | bytes | BinaryIO) -> None:
async def upload_part_async(self, part_no: int, content: str | bytes | BinaryIO | AsyncIterator[bytes]) -> None:
"""Upload part of a file.

Note:
If `content` does not somehow expose its length, this method may not work on Azure or AWS.

Args:
part_no (int): Which part number this is, must be between 0 and `parts` given to `multipart_upload_session`
content (str | bytes | BinaryIO): The content to upload.
content (str | bytes | BinaryIO | AsyncIterator[bytes]): The content to upload.
"""
if part_no < 0 or part_no > len(self._uploaded_urls):
raise IndexError(f"Index out of range: {part_no}, must be between 0 and {len(self._uploaded_urls)}")
Expand All @@ -605,7 +605,7 @@ async def upload_part_async(self, part_no: int, content: str | bytes | BinaryIO)
self._uploaded_urls[part_no] = True

@copy_doc_from_async(upload_part_async)
def upload_part(self, part_no: int, content: str | bytes | BinaryIO | AsyncIterator[bytes]) -> None:
    # Synchronous wrapper: drives the async implementation to completion via run_sync.
    return run_sync(self.upload_part_async(part_no, content))

def _check_errors_before_completing(self, exc_type: type[BaseException] | None) -> None:
Expand Down
18 changes: 14 additions & 4 deletions cognite/client/utils/_uploading.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ def prepare_content_for_upload(
case str():
content = content.encode("utf-8")
file_size = len(content)
case AsyncFileChunker():
file_size = content.size
case AsyncIterable():
file_size = None
case StringIO():
Expand Down Expand Up @@ -49,28 +51,36 @@ class AsyncFileChunker(AsyncIterator[bytes]):

Args:
file_handle (BinaryIO): An open file handle.
offset (int): Byte offset to seek to before reading. Defaults to 0 (beginning of file).
size (int | None): Maximum number of bytes to yield in total. If None, reads until EOF.
"""

CHUNK_SIZE = 64 * 1024 # 64 KiB chunks by default, copying httpx default

def __init__(self, file_handle: BinaryIO) -> None:
def __init__(self, file_handle: BinaryIO, *, offset: int = 0, size: int | None = None) -> None:
    """Wrap an open binary file handle as an async iterator of byte chunks.

    Args:
        file_handle (BinaryIO): An open file handle.
        offset (int): Byte offset to seek to before reading. Defaults to 0 (beginning of file).
        size (int | None): Maximum number of bytes to yield in total. If None, reads until EOF.
    """
    # Local import to avoid a circular import at module load time:
    from cognite.client import global_config

    self._file_handle = file_handle
    self._chunk_size = global_config.file_upload_chunk_size or self.CHUNK_SIZE
    self._remaining = size
    self.size = size  # exposed so prepare_content_for_upload can set Content-Length
    # Position the handle at the requested offset (start of file by default), if seekable:
    if hasattr(self._file_handle, "seek"):
        try:
            self._file_handle.seek(offset)
        except UnsupportedOperation:
            pass

def __aiter__(self) -> AsyncIterator[bytes]:
    # The chunker is its own iterator; iteration state lives on the instance.
    return self

async def __anext__(self) -> bytes:
if chunk := self._file_handle.read(self._chunk_size):
to_read = self._chunk_size if self._remaining is None else min(self._chunk_size, self._remaining)
if to_read == 0:
raise StopAsyncIteration
if chunk := self._file_handle.read(to_read):
if self._remaining is not None:
self._remaining -= len(chunk)
return chunk
raise StopAsyncIteration

Expand Down
Loading
Loading