Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions mapillary_tools/api_v4.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ class ClusterFileType(enum.Enum):
ZIP = "zip"
BLACKVUE = "mly_blackvue_video"
CAMM = "mly_camm_video"
MLY_BUNDLE_MANIFEST = "mly_bundle_manifest"


class HTTPSystemCertsAdapter(HTTPAdapter):
Expand Down
2 changes: 2 additions & 0 deletions mapillary_tools/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,3 +89,5 @@ def _yes_or_no(val: str) -> bool:
"upload_history",
),
)

# Maximum number of concurrent image-upload worker threads; override via the
# environment variable "<_ENV_PREFIX>MAX_IMAGE_UPLOAD_WORKERS" (default: 64).
MAX_IMAGE_UPLOAD_WORKERS = int(os.getenv(_ENV_PREFIX + "MAX_IMAGE_UPLOAD_WORKERS", 64))
64 changes: 27 additions & 37 deletions mapillary_tools/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,16 @@
import hashlib
import json
import os
import sys
import typing as T
import uuid
from pathlib import Path
from typing import Literal, TypedDict
from typing import TypedDict

if sys.version_info >= (3, 11):
from typing import Required
else:
from typing_extensions import Required

import jsonschema

Expand Down Expand Up @@ -144,73 +150,57 @@ class UserItem(TypedDict, total=False):
# Not in use. Keep here for back-compatibility
MAPSettingsUsername: str
MAPSettingsUserKey: str
user_upload_token: str
user_upload_token: Required[str]


class _CompassHeading(TypedDict, total=True):
TrueHeading: float
MagneticHeading: float


class _ImageRequired(TypedDict, total=True):
MAPLatitude: float
MAPLongitude: float
MAPCaptureTime: str
class _SharedDescription(TypedDict, total=False):
    """Keys shared by image and video upload descriptions."""

    # Path of the file being described (required).
    filename: Required[str]
    # File-type discriminator (required) — allowed values are defined by the
    # consumers of these descriptions; confirm against the upload pipeline.
    filetype: Required[str]

    # if None or absent, it will be calculated
    md5sum: str | None
    # if None or absent, it will be calculated
    filesize: int | None


class _Image(_ImageRequired, total=False):
class ImageDescription(_SharedDescription, total=False):
    """Upload description for a single image: geotag plus shared file keys."""

    # WGS84 coordinates (required).
    MAPLatitude: Required[float]
    MAPLongitude: Required[float]
    # Altitude — presumably meters above sea level; TODO confirm units.
    MAPAltitude: float
    # Capture timestamp as a string (required); format defined by the producer.
    MAPCaptureTime: Required[str]
    # Optional true/magnetic heading pair, in degrees.
    MAPCompassHeading: _CompassHeading


class _SequenceOnly(TypedDict, total=False):
MAPSequenceUUID: str


# Optional capture-device metadata attached to a description.
# Functional TypedDict form; total=False, so every key is optional.
MetaProperties = TypedDict(
    "MetaProperties",
    {
        "MAPDeviceMake": str,
        "MAPDeviceModel": str,
        "MAPGPSAccuracyMeters": float,
        "MAPCameraUUID": str,
        "MAPOrientation": int,
    },
    total=False,
)


class ImageDescription(_SequenceOnly, _Image, MetaProperties, total=True):
# filename is required
filename: str
# if None or absent, it will be calculated
md5sum: str | None
filetype: Literal["image"]
filesize: int | None


class _VideoDescriptionRequired(TypedDict, total=True):
filename: str
md5sum: str | None
filetype: str
MAPGPSTrack: list[T.Sequence[float | int | None]]
# For grouping images in a sequence
MAPSequenceUUID: str


class VideoDescription(_VideoDescriptionRequired, total=False):
class VideoDescription(_SharedDescription, total=False):
    """Upload description for a video: GPS track plus shared file keys."""

    # GPS samples along the video (required); per-sample element layout is not
    # specified here — confirm against the code that produces these tracks.
    MAPGPSTrack: Required[list[T.Sequence[float | int | None]]]
    MAPDeviceMake: str
    MAPDeviceModel: str
    # NOTE(review): filesize is already declared on _SharedDescription; this
    # redeclaration looks redundant — confirm it is intentional.
    filesize: int | None


class _ErrorDescription(TypedDict, total=False):
    """Structured error recorded for a file that failed processing/upload."""

    # type is required; message is optional
    type: Required[str]
    message: str
    # vars is optional
    vars: dict


class _ImageDescriptionErrorRequired(TypedDict, total=True):
filename: str
error: _ErrorDescription


class ImageDescriptionError(_ImageDescriptionErrorRequired, total=False):
class ImageDescriptionError(TypedDict, total=False):
    """Per-file error result: which file failed and the structured error."""

    filename: Required[str]
    error: Required[_ErrorDescription]
    # Optional file-type discriminator of the failed file.
    filetype: str


Expand Down
39 changes: 22 additions & 17 deletions mapillary_tools/upload.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
ipc,
telemetry,
types,
upload_api_v4,
uploader,
utils,
VERSION,
Expand Down Expand Up @@ -192,8 +191,9 @@ def write_history(payload: uploader.Progress):
def _setup_tdqm(emitter: uploader.EventEmitter) -> None:
upload_pbar: tqdm | None = None

@emitter.on("upload_start")
@emitter.on("upload_fetch_offset")
def upload_fetch_offset(payload: uploader.Progress) -> None:
def upload_start(payload: uploader.Progress) -> None:
nonlocal upload_pbar

if upload_pbar is not None:
Expand All @@ -204,18 +204,18 @@ def upload_fetch_offset(payload: uploader.Progress) -> None:
import_path: str | None = payload.get("import_path")
filetype = payload.get("file_type", "unknown").upper()
if import_path is None:
_desc = f"Uploading {filetype} ({nth}/{total})"
desc = f"Uploading {filetype} ({nth}/{total})"
else:
_desc = (
desc = (
f"Uploading {filetype} {os.path.basename(import_path)} ({nth}/{total})"
)
upload_pbar = tqdm(
total=payload["entity_size"],
desc=_desc,
desc=desc,
unit="B",
unit_scale=True,
unit_divisor=1024,
initial=payload["offset"],
initial=payload.get("offset", 0),
disable=LOG.getEffectiveLevel() <= logging.DEBUG,
)

Expand Down Expand Up @@ -295,8 +295,13 @@ def _setup_api_stats(emitter: uploader.EventEmitter):

@emitter.on("upload_start")
def collect_start_time(payload: _APIStats) -> None:
    """Record the upload start time and initialize timing stats on the payload.

    Fixes: drops the stray duplicated assignment
    `payload["upload_start_time"] = time.time()` (leftover from the old
    version of this handler) and corrects the comment wording.
    """
    now = time.time()
    payload["upload_start_time"] = now
    payload["upload_total_time"] = 0
    # These fields would normally be initialized by upload events such as
    # "upload_fetch_offset", but those events are disabled for image uploads,
    # so initialize them here.
    payload["upload_last_restart_time"] = now
    payload["upload_first_offset"] = 0

@emitter.on("upload_fetch_offset")
def collect_restart_time(payload: _APIStats) -> None:
Expand Down Expand Up @@ -466,7 +471,7 @@ def _gen_upload_everything(
(m for m in metadatas if isinstance(m, types.ImageMetadata)),
utils.find_images(import_paths, skip_subfolders=skip_subfolders),
)
for image_result in uploader.ZipImageSequence.zip_images_and_upload(
for image_result in uploader.ZipImageSequence.upload_images(
mly_uploader,
image_metadatas,
):
Expand Down Expand Up @@ -510,13 +515,8 @@ def _gen_upload_videos(
"file_type": video_metadata.filetype.value,
"import_path": str(video_metadata.filename),
"sequence_md5sum": video_metadata.md5sum,
"upload_md5sum": video_metadata.md5sum,
}

session_key = uploader._session_key(
video_metadata.md5sum, upload_api_v4.ClusterFileType.CAMM
)

try:
with video_metadata.filename.open("rb") as src_fp:
# Build the mp4 stream with the CAMM samples
Expand All @@ -525,12 +525,15 @@ def _gen_upload_videos(
)

# Upload the mp4 stream
cluster_id = mly_uploader.upload_stream(
file_handle = mly_uploader.upload_stream(
T.cast(T.IO[bytes], camm_fp),
upload_api_v4.ClusterFileType.CAMM,
session_key,
progress=T.cast(T.Dict[str, T.Any], progress),
)
cluster_id = mly_uploader.finish_upload(
file_handle,
api_v4.ClusterFileType.CAMM,
progress=T.cast(T.Dict[str, T.Any], progress),
)
except Exception as ex:
yield video_metadata, uploader.UploadResult(error=ex)
else:
Expand Down Expand Up @@ -706,7 +709,9 @@ def upload(

finally:
# We collected stats after every upload is finished
assert upload_successes == len(stats)
assert upload_successes == len(stats), (
f"Expect {upload_successes} success but got {stats}"
)
_show_upload_summary(stats, upload_errors)


Expand Down
32 changes: 15 additions & 17 deletions mapillary_tools/upload_api_v4.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import sys
import typing as T
import uuid
from pathlib import Path

if sys.version_info >= (3, 12):
from typing import override
Expand All @@ -14,7 +15,7 @@

import requests

from .api_v4 import ClusterFileType, request_get, request_post, REQUESTS_TIMEOUT
from .api_v4 import request_get, request_post, REQUESTS_TIMEOUT

MAPILLARY_UPLOAD_ENDPOINT = os.getenv(
"MAPILLARY_UPLOAD_ENDPOINT", "https://rupload.facebook.com/mapillary_public_uploads"
Expand All @@ -31,24 +32,14 @@
class UploadService:
user_access_token: str
session_key: str
cluster_filetype: ClusterFileType

MIME_BY_CLUSTER_TYPE: dict[ClusterFileType, str] = {
ClusterFileType.ZIP: "application/zip",
ClusterFileType.BLACKVUE: "video/mp4",
ClusterFileType.CAMM: "video/mp4",
}

def __init__(
self,
user_access_token: str,
session_key: str,
cluster_filetype: ClusterFileType,
):
self.user_access_token = user_access_token
self.session_key = session_key
# Validate the input
self.cluster_filetype = cluster_filetype

def fetch_offset(self) -> int:
headers = {
Expand Down Expand Up @@ -124,7 +115,6 @@ def upload_shifted_chunks(
"Authorization": f"OAuth {self.user_access_token}",
"Offset": f"{offset}",
"X-Entity-Name": self.session_key,
"X-Entity-Type": self.MIME_BY_CLUSTER_TYPE[self.cluster_filetype],
}
url = f"{MAPILLARY_UPLOAD_ENDPOINT}/{self.session_key}"
resp = request_post(
Expand All @@ -149,8 +139,8 @@ def upload_shifted_chunks(
class FakeUploadService(UploadService):
def __init__(self, *args, **kwargs):
    """Test-only upload service that writes chunks to a local directory.

    Fixes: removes the stray duplicated `self._upload_path = os.getenv(...)`
    assignment (leftover from the pre-Path version), keeping only the
    `Path(...)`-wrapped form.
    """
    super().__init__(*args, **kwargs)
    # Destination directory for fake uploads; override via MAPILLARY_UPLOAD_PATH.
    self._upload_path = Path(
        os.getenv("MAPILLARY_UPLOAD_PATH", "mapillary_public_uploads")
    )
    # Probability of injecting a fake connection error per chunk, to exercise
    # the caller's retry logic.
    self._error_ratio = 0.02

Expand All @@ -167,8 +157,8 @@ def upload_shifted_chunks(
)

os.makedirs(self._upload_path, exist_ok=True)
filename = os.path.join(self._upload_path, self.session_key)
with open(filename, "ab") as fp:
filename = self._upload_path.joinpath(self.session_key)
with filename.open("ab") as fp:
for chunk in shifted_chunks:
if random.random() <= self._error_ratio:
raise requests.ConnectionError(
Expand All @@ -179,7 +169,15 @@ def upload_shifted_chunks(
raise requests.ConnectionError(
f"TEST ONLY: Partially uploaded with error ratio {self._error_ratio}"
)
return uuid.uuid4().hex

file_handle_dir = self._upload_path.joinpath("file_handles")
file_handle_path = file_handle_dir.joinpath(self.session_key)
if not file_handle_path.exists():
os.makedirs(file_handle_dir, exist_ok=True)
random_file_handle = uuid.uuid4().hex
file_handle_path.write_text(random_file_handle)

return file_handle_path.read_text()

@override
def fetch_offset(self) -> int:
Expand Down
Loading
Loading