12 changes: 11 additions & 1 deletion mapillary_tools/api_v4.py
@@ -121,6 +121,7 @@ def _log_debug_request(
     json: dict | None = None,
     params: dict | None = None,
     headers: dict | None = None,
+    timeout: T.Any = None,
 ):
     if logging.getLogger().getEffectiveLevel() > logging.DEBUG:
         return
@@ -140,6 +141,9 @@ def _log_debug_request(
     if headers:
         msg += f" HEADERS={_sanitize(headers)}"

+    if timeout is not None:
+        msg += f" TIMEOUT={timeout}"
+
     msg = msg.replace("\n", "\\n")

     LOG.debug(msg)
@@ -202,6 +206,7 @@ def request_post(
             json=json,
             params=kwargs.get("params"),
             headers=kwargs.get("headers"),
+            timeout=kwargs.get("timeout"),
         )

     if USE_SYSTEM_CERTS:
@@ -235,7 +240,12 @@ def request_get(

     if not disable_debug:
         _log_debug_request(
-            "GET", url, params=kwargs.get("params"), headers=kwargs.get("headers")
+            "GET",
+            url,
+            params=kwargs.get("params"),
+            headers=kwargs.get("headers"),
+            # Do not log timeout here as it's always set to REQUESTS_TIMEOUT
+            timeout=None,
         )

     if USE_SYSTEM_CERTS:
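Note on the api_v4.py hunks above: the debug logger now records the request timeout when one is explicitly set. A minimal sketch of the pattern, using only the stdlib logging module (the function below is illustrative, not the exact mapillary_tools code):

```python
import logging

LOG = logging.getLogger(__name__)

def log_debug_request(method: str, url: str, timeout=None) -> None:
    # Build and emit the message only when debug logging is enabled
    if logging.getLogger().getEffectiveLevel() > logging.DEBUG:
        return
    msg = f"HTTP {method} {url}"
    if timeout is not None:
        # timeout may be a float or a (connect, read) tuple, so no format spec
        msg += f" TIMEOUT={timeout}"
    LOG.debug(msg.replace("\n", "\\n"))

logging.basicConfig(level=logging.DEBUG)
log_debug_request("POST", "https://example.com/upload", timeout=(20, 600))
```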
5 changes: 5 additions & 0 deletions mapillary_tools/constants.py
@@ -151,6 +151,11 @@ def _parse_scaled_integers(
     _ENV_PREFIX + "UPLOAD_CACHE_DIR",
     os.path.join(tempfile.gettempdir(), "mapillary_tools", "upload_cache"),
 )
+# The minimal upload speed is used to calculate the read timeout to avoid upload hanging:
+# timeout = upload_size / MIN_UPLOAD_SPEED
+MIN_UPLOAD_SPEED: int | None = _parse_filesize(
+    os.getenv(_ENV_PREFIX + "MIN_UPLOAD_SPEED", "50K")  # 50 KiB/s
+)
 MAX_IMAGE_UPLOAD_WORKERS: int = int(
     os.getenv(_ENV_PREFIX + "MAX_IMAGE_UPLOAD_WORKERS", 64)
 )
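The new comment states the formula directly; as a worked example under the 50 KiB/s default, a 100 MiB upload gets a read timeout of 104857600 / 51200 = 2048 seconds. A hedged sketch of a caller applying it (the helper name is hypothetical):

```python
from mapillary_tools import constants

def estimate_read_timeout(upload_size: int) -> float | None:
    """Hypothetical helper: timeout = upload_size / MIN_UPLOAD_SPEED."""
    if not constants.MIN_UPLOAD_SPEED:
        return None  # unset or zero: no read-timeout bound
    return upload_size / constants.MIN_UPLOAD_SPEED

# estimate_read_timeout(100 * 1024 * 1024) == 2048.0 with the 50 KiB/s default
```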
24 changes: 16 additions & 8 deletions mapillary_tools/upload.py
@@ -82,11 +82,13 @@ def upload(
     _setup_ipc(emitter)

     mly_uploader = uploader.Uploader(
-        user_items,
+        uploader.UploadOptions(
+            user_items,
+            dry_run=dry_run,
+            nofinish=nofinish,
+            noresume=noresume,
+        ),
         emitter=emitter,
-        dry_run=dry_run,
-        nofinish=nofinish,
-        noresume=noresume,
     )

     results = _gen_upload_everything(
@@ -324,13 +326,13 @@ def _setup_ipc(emitter: uploader.EventEmitter):
     @emitter.on("upload_start")
     def upload_start(payload: uploader.Progress):
         type: uploader.EventName = "upload_start"
-        LOG.debug("IPC %s: %s", type.upper(), payload)
+        LOG.debug(f"{type.upper()}: {json.dumps(payload)}")
         ipc.send(type, payload)

     @emitter.on("upload_fetch_offset")
     def upload_fetch_offset(payload: uploader.Progress) -> None:
         type: uploader.EventName = "upload_fetch_offset"
-        LOG.debug("IPC %s: %s", type.upper(), payload)
+        LOG.debug(f"{type.upper()}: {json.dumps(payload)}")
         ipc.send(type, payload)

     @emitter.on("upload_progress")
@@ -349,15 +351,21 @@ def upload_progress(payload: uploader.Progress):
             last_upload_progress_debug_at is None
             or last_upload_progress_debug_at + INTERVAL_SECONDS < now
         ):
-            LOG.debug("IPC %s: %s", type.upper(), payload)
+            LOG.debug(f"{type.upper()}: {json.dumps(payload)}")
             T.cast(T.Dict, payload)["_last_upload_progress_debug_at"] = now

         ipc.send(type, payload)

     @emitter.on("upload_end")
     def upload_end(payload: uploader.Progress) -> None:
         type: uploader.EventName = "upload_end"
-        LOG.debug("IPC %s: %s", type.upper(), payload)
+        LOG.debug(f"{type.upper()}: {json.dumps(payload)}")
         ipc.send(type, payload)

+    @emitter.on("upload_failed")
+    def upload_failed(payload: uploader.Progress) -> None:
+        type: uploader.EventName = "upload_failed"
+        LOG.debug(f"{type.upper()}: {json.dumps(payload)}")
+        ipc.send(type, payload)
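For readers unfamiliar with the handler registration used throughout _setup_ipc: a self-contained sketch of the decorator-based emitter pattern these hunks extend (the EventEmitter below is a simplified stand-in for uploader.EventEmitter, and the payload is reduced to a plain dict):

```python
import json
import logging

LOG = logging.getLogger(__name__)

class EventEmitter:
    """Simplified stand-in: on() registers handlers, emit() dispatches to them."""

    def __init__(self) -> None:
        self._handlers: dict[str, list] = {}

    def on(self, event: str):
        def decorator(fn):
            self._handlers.setdefault(event, []).append(fn)
            return fn
        return decorator

    def emit(self, event: str, payload: dict) -> None:
        for fn in self._handlers.get(event, []):
            fn(payload)

emitter = EventEmitter()

@emitter.on("upload_failed")
def upload_failed(payload: dict) -> None:
    # json.dumps keeps each event on one machine-parseable log line,
    # matching the f-string style this diff switches to
    LOG.debug(f"UPLOAD_FAILED: {json.dumps(payload)}")

emitter.emit("upload_failed", {"offset": 0, "entity_size": 1024})
```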
24 changes: 18 additions & 6 deletions mapillary_tools/upload_api_v4.py
@@ -125,23 +125,34 @@ def upload_byte_stream(
         stream: T.IO[bytes],
         offset: int | None = None,
         chunk_size: int = 2 * 1024 * 1024,  # 2MB
+        read_timeout: float | None = None,
     ) -> str:
         if offset is None:
             offset = self.fetch_offset()
-        return self.upload_chunks(self.chunkize_byte_stream(stream, chunk_size), offset)
+        return self.upload_chunks(
+            self.chunkize_byte_stream(stream, chunk_size),
+            offset,
+            read_timeout=read_timeout,
+        )

     def upload_chunks(
         self,
         chunks: T.Iterable[bytes],
         offset: int | None = None,
+        read_timeout: float | None = None,
     ) -> str:
         if offset is None:
             offset = self.fetch_offset()
         shifted_chunks = self.shift_chunks(chunks, offset)
-        return self.upload_shifted_chunks(shifted_chunks, offset)
+        return self.upload_shifted_chunks(
+            shifted_chunks, offset, read_timeout=read_timeout
+        )

     def upload_shifted_chunks(
-        self, shifted_chunks: T.Iterable[bytes], offset: int
+        self,
+        shifted_chunks: T.Iterable[bytes],
+        offset: int,
+        read_timeout: float | None = None,
     ) -> str:
         """
         Upload the chunks that must already be shifted by the offset (e.g. fp.seek(offset, io.SEEK_SET))
@@ -153,8 +164,6 @@ def upload_shifted_chunks(
             "X-Entity-Name": self.session_key,
         }
         url = f"{MAPILLARY_UPLOAD_ENDPOINT}/{self.session_key}"
-        # TODO: Estimate read timeout based on the data size
-        read_timeout = None
         resp = request_post(
             url,
             headers=headers,
@@ -198,7 +207,10 @@ def __init__(

     @override
     def upload_shifted_chunks(
-        self, shifted_chunks: T.Iterable[bytes], offset: int
+        self,
+        shifted_chunks: T.Iterable[bytes],
+        offset: int,
+        read_timeout: float | None = None,
     ) -> str:
         expected_offset = self.fetch_offset()
         if offset != expected_offset:
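End to end, the new read_timeout parameter flows from upload_byte_stream through upload_chunks into upload_shifted_chunks, which hands it to request_post. A hedged usage sketch combining this with the MIN_UPLOAD_SPEED formula from constants.py (the upload_service object and the file handling around it are assumptions for illustration; upload_byte_stream returns a str per the signature above):

```python
import os

from mapillary_tools import constants

def upload_file(upload_service, path: str) -> str:
    """Illustrative only: derive a read timeout from file size, then upload."""
    size = os.path.getsize(path)
    # timeout = upload_size / MIN_UPLOAD_SPEED; None disables the bound
    read_timeout = (
        size / constants.MIN_UPLOAD_SPEED if constants.MIN_UPLOAD_SPEED else None
    )
    with open(path, "rb") as fp:
        # Resumes from the server-side offset and returns the upload handle
        return upload_service.upload_byte_stream(fp, read_timeout=read_timeout)
```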