5 changes: 5 additions & 0 deletions mapillary_tools/history.py
@@ -162,6 +162,11 @@ def clear_expired(self) -> list[str]:

return expired_keys

def keys(self):
with self._lock:
with dbm.open(self._file, flag="c") as db:
return db.keys()

def _is_expired(self, payload: JSONDict) -> bool:
expires_at = payload.get("expires_at")
if isinstance(expires_at, (int, float)):
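
For reference, a minimal usage sketch (not part of this diff) of the new keys() accessor; the cache path is hypothetical, and dbm returns keys as bytes:

from mapillary_tools import history

cache = history.PersistentCache("/tmp/mly_cache/cached_file_handles")  # hypothetical path
cache.clear_expired()  # drop expired entries first, mirroring how the uploader uses the cache
for key in cache.keys():
    print(key.decode("utf-8", errors="replace"))
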
141 changes: 81 additions & 60 deletions mapillary_tools/uploader.py
@@ -2,6 +2,7 @@

import concurrent.futures
import dataclasses
import hashlib
import io
import json
import logging
@@ -56,6 +57,9 @@ class UploadOptions:
user_items: config.UserItem
chunk_size: int = int(constants.UPLOAD_CHUNK_SIZE_MB * 1024 * 1024)
num_upload_workers: int = constants.MAX_IMAGE_UPLOAD_WORKERS
# When set, the upload cache will be read from and written to this path
# This option is exposed for testing purposes. In production, the path is derived from the envvar and user_items
upload_cache_path: Path | None = None
dry_run: bool = False
nofinish: bool = False
noresume: bool = False
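
For reference, a minimal sketch (not part of this diff) of how a test might pin the cache location via the new field; the token and path are made up, and user_items is shown with only the upload token for brevity:

options = UploadOptions(
    user_items={"user_upload_token": "TEST_TOKEN"},
    upload_cache_path=Path("/tmp/test_cache/cached_file_handles"),
)
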
@@ -471,7 +475,7 @@ def _zip_sequence_fp(
# Arcname should be unique, the name does not matter
arcname = f"{idx}.jpg"
zipinfo = zipfile.ZipInfo(arcname, date_time=(1980, 1, 1, 0, 0, 0))
zipf.writestr(zipinfo, SingleImageUploader.dump_image_bytes(metadata))
zipf.writestr(zipinfo, CachedImageUploader.dump_image_bytes(metadata))
assert len(sequence) == len(set(zipf.namelist()))
zipf.comment = json.dumps(
{"sequence_md5sum": sequence_md5sum},
@@ -537,6 +541,13 @@ class ImageSequenceUploader:
def __init__(self, upload_options: UploadOptions, emitter: EventEmitter):
self.upload_options = upload_options
self.emitter = emitter
# Create a single shared CachedImageUploader instance that is reused across all uploads
cache = _maybe_create_persistent_cache_instance(self.upload_options)
if cache:
cache.clear_expired()
self.cached_image_uploader = CachedImageUploader(
self.upload_options, cache=cache
)

def upload_images(
self, image_metadatas: T.Sequence[types.ImageMetadata]
@@ -688,10 +699,6 @@ def _upload_images_from_queue(
with api_v4.create_user_session(
self.upload_options.user_items["user_upload_token"]
) as user_session:
single_image_uploader = SingleImageUploader(
self.upload_options, user_session=user_session
)

while True:
# Assert that all images are already pushed into the queue
try:
@@ -710,8 +717,8 @@
}

# image_progress will be updated during uploading
file_handle = single_image_uploader.upload(
image_metadata, image_progress
file_handle = self.cached_image_uploader.upload(
user_session, image_metadata, image_progress
)

# Update chunk_size (it was constant if set)
@@ -731,24 +738,27 @@
return indexed_file_handles


class SingleImageUploader:
class CachedImageUploader:
def __init__(
self,
upload_options: UploadOptions,
user_session: requests.Session | None = None,
cache: history.PersistentCache | None = None,
):
self.upload_options = upload_options
self.user_session = user_session
self.cache = self._maybe_create_persistent_cache_instance(
self.upload_options.user_items, upload_options
)
self.cache = cache
if self.cache:
self.cache.clear_expired()

# Thread-safe
def upload(
self, image_metadata: types.ImageMetadata, image_progress: dict[str, T.Any]
self,
user_session: requests.Session,
image_metadata: types.ImageMetadata,
image_progress: dict[str, T.Any],
) -> str:
image_bytes = self.dump_image_bytes(image_metadata)

uploader = Uploader(self.upload_options, user_session=self.user_session)
uploader = Uploader(self.upload_options, user_session=user_session)

session_key = uploader._gen_session_key(io.BytesIO(image_bytes), image_progress)
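
Taken together, a rough usage sketch (simplified, not part of the diff) of the new calling convention: one CachedImageUploader is created up front with an optional cache, and the per-worker session is passed into each upload call:

cache = _maybe_create_persistent_cache_instance(upload_options)
uploader = CachedImageUploader(upload_options, cache=cache)
with api_v4.create_user_session(upload_options.user_items["user_upload_token"]) as session:
    file_handle = uploader.upload(session, image_metadata, image_progress={})
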

@@ -786,51 +796,7 @@ def dump_image_bytes(cls, metadata: types.ImageMetadata) -> bytes:
f"Failed to dump EXIF bytes: {ex}", metadata.filename
) from ex

@classmethod
def _maybe_create_persistent_cache_instance(
cls, user_items: config.UserItem, upload_options: UploadOptions
) -> history.PersistentCache | None:
if not constants.UPLOAD_CACHE_DIR:
LOG.debug(
"Upload cache directory is set empty, skipping caching upload file handles"
)
return None

if upload_options.dry_run:
LOG.debug("Dry-run mode enabled, skipping caching upload file handles")
return None

# Different python/CLI versions use different cache (dbm) formats.
# Separate them to avoid conflicts
py_version_parts = [str(part) for part in sys.version_info[:3]]
version = f"py_{'_'.join(py_version_parts)}_{VERSION}"

cache_path_dir = (
Path(constants.UPLOAD_CACHE_DIR)
.joinpath(version)
.joinpath(api_v4.MAPILLARY_CLIENT_TOKEN.replace("|", "_"))
.joinpath(
user_items.get("MAPSettingsUserKey", user_items["user_upload_token"])
)
)
cache_path_dir.mkdir(parents=True, exist_ok=True)
cache_path = cache_path_dir.joinpath("cached_file_handles")

# Sanitize sensitive segments for logging
sanitized_cache_path = (
Path(constants.UPLOAD_CACHE_DIR)
.joinpath(version)
.joinpath("***")
.joinpath("***")
.joinpath("cached_file_handles")
)
LOG.debug(f"File handle cache path: {sanitized_cache_path}")

cache = history.PersistentCache(str(cache_path.resolve()))
cache.clear_expired()

return cache

# Thread-safe
def _get_cached_file_handle(self, key: str) -> str | None:
if self.cache is None:
return None
@@ -840,6 +806,7 @@ def _get_cached_file_handle(self, key: str) -> str | None:

return self.cache.get(key)

# Thread-safe
def _set_file_handle_cache(self, key: str, value: str) -> None:
if self.cache is None:
return
@@ -1168,3 +1135,57 @@ def _prefixed_uuid4():

def _is_uuid(key: str) -> bool:
return key.startswith("uuid_") or key.startswith("mly_tools_uuid_")


def _build_upload_cache_path(upload_options: UploadOptions) -> Path:
# Different python/CLI versions use different cache (dbm) formats.
# Separate them to avoid conflicts
py_version_parts = [str(part) for part in sys.version_info[:3]]
version = f"py_{'_'.join(py_version_parts)}_{VERSION}"
# File handles are not sharable between different users
user_id = str(
upload_options.user_items.get(
"MAPSettingsUserKey", upload_options.user_items["user_upload_token"]
)
)
# Use a hash to avoid logging sensitive data
user_fingerprint = utils.md5sum_fp(
io.BytesIO((api_v4.MAPILLARY_CLIENT_TOKEN + user_id).encode("utf-8")),
md5=hashlib.sha256(),
).hexdigest()[:24]

cache_path = (
Path(constants.UPLOAD_CACHE_DIR)
.joinpath(version)
.joinpath(user_fingerprint)
.joinpath("cached_file_handles")
)

return cache_path
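
For illustration, assuming UPLOAD_CACHE_DIR is ~/.cache/mapillary_tools (a hypothetical value) and Python 3.11.4, the resulting path would look roughly like:

~/.cache/mapillary_tools/py_3_11_4_<VERSION>/<24-hex-char user fingerprint>/cached_file_handles
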


def _maybe_create_persistent_cache_instance(
upload_options: UploadOptions,
) -> history.PersistentCache | None:
"""Create a persistent cache instance if caching is enabled."""

if upload_options.dry_run:
LOG.debug("Dry-run mode enabled, skipping caching upload file handles")
return None

if upload_options.upload_cache_path is None:
if not constants.UPLOAD_CACHE_DIR:
LOG.debug(
"Upload cache directory is set empty, skipping caching upload file handles"
)
return None

cache_path = _build_upload_cache_path(upload_options)
else:
cache_path = upload_options.upload_cache_path

LOG.debug(f"File handle cache path: {cache_path}")

cache_path.parent.mkdir(parents=True, exist_ok=True)

return history.PersistentCache(str(cache_path.resolve()))