Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
231 changes: 202 additions & 29 deletions mapillary_tools/uploader.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

import concurrent.futures
import dataclasses
import datetime
import email.utils
import hashlib
import io
import json
Expand Down Expand Up @@ -946,27 +948,33 @@ def _handle_upload_exception(
begin_offset = progress.get("begin_offset")
offset = progress.get("offset")

if retries <= constants.MAX_UPLOAD_RETRIES and _is_retriable_exception(ex):
self.emitter.emit("upload_retrying", progress)
LOG.warning(
f"Error uploading {self._upload_name(progress)} at {offset=} since {begin_offset=}: {ex.__class__.__name__}: {ex}"
)

LOG.warning(
f"Error uploading {self._upload_name(progress)} at {offset=} since {begin_offset=}: {ex.__class__.__name__}: {ex}"
)
if retries <= constants.MAX_UPLOAD_RETRIES:
retriable, retry_after_sec = _is_retriable_exception(ex)
if retriable:
self.emitter.emit("upload_retrying", progress)

# Keep things immutable here. Will increment retries in the caller
retries += 1
if _is_immediate_retriable_exception(ex):
sleep_for = 0
else:
sleep_for = min(2**retries, 16)
LOG.info(
f"Retrying in {sleep_for} seconds ({retries}/{constants.MAX_UPLOAD_RETRIES})"
)
if sleep_for:
time.sleep(sleep_for)
else:
self.emitter.emit("upload_failed", progress)
raise ex
# Keep things immutable here. Will increment retries in the caller
retries += 1
if _is_immediate_retriable_exception(ex):
sleep_for = 0
else:
sleep_for = min(2**retries, 16)
sleep_for += retry_after_sec

LOG.info(
f"Retrying in {sleep_for} seconds ({retries}/{constants.MAX_UPLOAD_RETRIES})"
)
if sleep_for:
time.sleep(sleep_for)

return

self.emitter.emit("upload_failed", progress)
raise ex

@classmethod
def _upload_name(cls, progress: UploaderProgress):
Expand Down Expand Up @@ -1083,23 +1091,188 @@ def _is_immediate_retriable_exception(ex: BaseException) -> bool:
return False


def _is_retriable_exception(ex: BaseException) -> tuple[bool, int]:
    """
    Determine if an exception should be retried and how long to wait.

    Decision rules, in order:
    - Connection errors and timeouts are retriable with no extra delay.
    - HTTP 429 is always retriable; the delay is the JSON "backoff" field
      (milliseconds) if usable, else the Retry-After header, else a default.
    - Other HTTP 4xx are retriable only if the JSON body says so
      (debug_info.retriable), except RequestRateLimitedError which is always
      retried with the same delay rules as 429.
    - HTTP 5xx are retriable, honoring the Retry-After header if present.
    - Anything else is not retriable.

    A JSON body that is not an object, or a "debug_info" that is not an
    object, is treated as if it carried no information instead of raising.

    Args:
        ex: Exception to check for retryability

    Returns:
        Tuple of (retriable, retry_after_sec) where:
        - retriable: True if the exception should be retried
        - retry_after_sec: Seconds to wait before retry (>= 0)

    Examples:
        >>> resp = requests.Response()
        >>> resp._content = b"foo"
        >>> resp.status_code = 400
        >>> ex = requests.HTTPError("error", response=resp)
        >>> _is_retriable_exception(ex)
        (False, 0)
        >>> resp._content = b'{"backoff": 13000, "debug_info": {"retriable": false, "type": "RequestRateLimitedError", "message": "Request rate limit has been exceeded"}}'
        >>> resp.status_code = 400
        >>> ex = requests.HTTPError("error", response=resp)
        >>> _is_retriable_exception(ex)
        (True, 13)
        >>> resp._content = b'{"backoff": "foo", "debug_info": {"retriable": false, "type": "RequestRateLimitedError", "message": "Request rate limit has been exceeded"}}'
        >>> resp.status_code = 400
        >>> ex = requests.HTTPError("error", response=resp)
        >>> _is_retriable_exception(ex)
        (True, 10)
        >>> resp._content = b'{"debug_info": {"retriable": false, "type": "RequestRateLimitedError", "message": "Request rate limit has been exceeded"}}'
        >>> resp.status_code = 400
        >>> ex = requests.HTTPError("error", response=resp)
        >>> _is_retriable_exception(ex)
        (True, 10)
        >>> resp._content = b"foo"
        >>> resp.status_code = 429
        >>> ex = requests.HTTPError("error", response=resp)
        >>> _is_retriable_exception(ex)
        (True, 10)
        >>> resp._content = b'{"backoff": 12000, "debug_info": {"retriable": false, "type": "RequestRateLimitedError", "message": "Request rate limit has been exceeded"}}'
        >>> resp.status_code = 429
        >>> ex = requests.HTTPError("error", response=resp)
        >>> _is_retriable_exception(ex)
        (True, 12)
        >>> resp._content = b'[1, 2]'
        >>> resp.status_code = 400
        >>> ex = requests.HTTPError("error", response=resp)
        >>> _is_retriable_exception(ex)
        (False, 0)
        >>> resp._content = b'{"backoff": 12000, "debug_info": {"retriable": false, "type": "RequestRateLimitedError", "message": "Request rate limit has been exceeded"}}'
        >>> resp.headers = {"Retry-After": "1"}
        >>> resp.status_code = 503
        >>> ex = requests.HTTPError("error", response=resp)
        >>> _is_retriable_exception(ex)
        (True, 1)
    """

    DEFAULT_RETRY_AFTER_RATE_LIMIT_SEC = 10

    if isinstance(ex, (requests.ConnectionError, requests.Timeout)):
        return True, 0

    if not (
        isinstance(ex, requests.HTTPError)
        and isinstance(ex.response, requests.Response)
    ):
        return False, 0

    status_code = ex.response.status_code

    # Always retry with some delay
    if status_code == 429:
        retry_after_sec = (
            _parse_retry_after_from_header(ex.response)
            or DEFAULT_RETRY_AFTER_RATE_LIMIT_SEC
        )

        try:
            data = ex.response.json()
        except requests.JSONDecodeError:
            return True, retry_after_sec

        # The body may be valid JSON without being an object (e.g. a list)
        backoff_ms = (
            _parse_backoff(data.get("backoff")) if isinstance(data, dict) else None
        )
        if backoff_ms is None:
            return True, retry_after_sec
        # Integer division avoids a float round trip; negatives clamp to 0
        return True, max(0, backoff_ms // 1000)

    if 400 <= status_code < 500:
        try:
            data = ex.response.json()
        except requests.JSONDecodeError:
            return False, (_parse_retry_after_from_header(ex.response) or 0)

        # Normalize unexpected payload shapes so the lookups below cannot raise
        if not isinstance(data, dict):
            data = {}
        debug_info = data.get("debug_info", {})
        if not isinstance(debug_info, dict):
            debug_info = {}

        # The server may respond 429 RequestRateLimitedError but with retryable=False
        # We should retry for this case regardless
        # e.g. HTTP 429 {"backoff": 10000, "debug_info": {"retriable": false, "type": "RequestRateLimitedError", "message": "Request rate limit has been exceeded"}}
        if debug_info.get("type") == "RequestRateLimitedError":
            backoff_ms = _parse_backoff(data.get("backoff"))
            if backoff_ms is None:
                return True, (
                    _parse_retry_after_from_header(ex.response)
                    or DEFAULT_RETRY_AFTER_RATE_LIMIT_SEC
                )
            return True, max(0, backoff_ms // 1000)

        # Coerce so the first tuple element is always a real bool
        return bool(debug_info.get("retriable", False)), 0

    if 500 <= status_code < 600:
        return True, (_parse_retry_after_from_header(ex.response) or 0)

    return False, 0


def _parse_backoff(backoff: T.Any) -> int | None:
    """
    Convert a server-provided "backoff" value to an integer.

    Args:
        backoff: Raw value taken from the decoded JSON payload; may be of any
            type, or None when the field is absent.

    Returns:
        The value as an int, or None when it is missing or cannot be
        converted (non-numeric string, unsupported type, NaN, or infinity).

    Examples:
        >>> _parse_backoff(13000)
        13000
        >>> _parse_backoff("12000")
        12000
        >>> _parse_backoff("foo") is None
        True
        >>> _parse_backoff(None) is None
        True
        >>> _parse_backoff(float("inf")) is None
        True
    """
    if backoff is None:
        return None

    try:
        return int(backoff)
    except (ValueError, TypeError, OverflowError):
        # OverflowError: int(float("inf")); Python's json module accepts the
        # non-standard "Infinity" literal, so this can come from a payload.
        return None


def _parse_retry_after_from_header(resp: requests.Response) -> int | None:
    """
    Read the Retry-After header of a response and convert it to seconds.

    The header may carry either a number of seconds or an HTTP date.
    See https://developer.mozilla.org/en-US/docs/Web/HTTP/Reference/Headers/Retry-After

    Args:
        resp: HTTP response object with headers

    Returns:
        Number of seconds to wait (>= 0), or None if the header is missing,
        cannot be parsed, or is a date without timezone information.

    Examples:
        >>> resp = requests.Response()
        >>> resp.headers = {"Retry-After": "1"}
        >>> _parse_retry_after_from_header(resp)
        1
        >>> resp.headers = {"Retry-After": "-1"}
        >>> _parse_retry_after_from_header(resp)
        0
        >>> resp.headers = {"Retry-After": "Wed, 21 Oct 2015 07:28:00 GMT"}
        >>> _parse_retry_after_from_header(resp)
        0
        >>> resp.headers = {"Retry-After": "Wed, 21 Oct 2315 07:28:00"}
        >>> _parse_retry_after_from_header(resp)
    """

    value = resp.headers.get("Retry-After")
    if value is None:
        return None

    # First form: a plain number of seconds; negatives are clamped to 0
    try:
        seconds = int(value)
    except (ValueError, TypeError):
        seconds = None
    if seconds is not None:
        return max(0, seconds)

    # Second form: an HTTP date, e.g. "Wed, 21 Oct 2015 07:28:00 GMT"
    try:
        retry_at = email.utils.parsedate_to_datetime(value)
    except (ValueError, TypeError):
        LOG.warning(f"Error parsing Retry-After: {value}")
        return None

    now = datetime.datetime.now(datetime.timezone.utc)
    try:
        remaining = retry_at - now
    except (TypeError, ValueError):
        # e.g. TypeError: can't subtract offset-naive and offset-aware datetimes
        return None

    # Dates in the past yield a zero wait
    return max(0, int(remaining.total_seconds()))


_SUFFIX_MAP: dict[api_v4.ClusterFileType | types.FileType, str] = {
Expand Down
Loading