Merged
119 changes: 82 additions & 37 deletions mapillary_tools/upload_api_v4.py
@@ -13,6 +13,8 @@
else:
from typing_extensions import override

import tempfile

import requests

from .api_v4 import request_get, request_post, REQUESTS_TIMEOUT
@@ -30,14 +32,14 @@


class UploadService:
"""
Upload byte streams to the Upload Service.
"""

user_access_token: str
session_key: str

def __init__(
self,
user_access_token: str,
session_key: str,
):
def __init__(self, user_access_token: str, session_key: str):
self.user_access_token = user_access_token
self.session_key = session_key

@@ -46,11 +48,7 @@ def fetch_offset(self) -> int:
"Authorization": f"OAuth {self.user_access_token}",
}
url = f"{MAPILLARY_UPLOAD_ENDPOINT}/{self.session_key}"
resp = request_get(
url,
headers=headers,
timeout=REQUESTS_TIMEOUT,
)
resp = request_get(url, headers=headers, timeout=REQUESTS_TIMEOUT)
resp.raise_for_status()
data = resp.json()
return data["offset"]
@@ -59,18 +57,53 @@ def fetch_offset(self) -> int:
def chunkize_byte_stream(
cls, stream: T.IO[bytes], chunk_size: int
) -> T.Generator[bytes, None, None]:
"""
Chunkize a byte stream into chunks of the specified size.

>>> list(UploadService.chunkize_byte_stream(io.BytesIO(b"foo"), 1))
[b'f', b'o', b'o']

>>> list(UploadService.chunkize_byte_stream(io.BytesIO(b"foo"), 10))
[b'foo']
"""

if chunk_size <= 0:
raise ValueError("Expect positive chunk size")

while True:
data = stream.read(chunk_size)
if not data:
break
yield data

@classmethod
def shift_chunks(
self, chunks: T.Iterable[bytes], offset: int
cls, chunks: T.Iterable[bytes], offset: int
) -> T.Generator[bytes, None, None]:
assert offset >= 0, f"Expect non-negative offset but got {offset}"
"""
Shift the chunks by the offset.

>>> list(UploadService.shift_chunks([b"foo", b"bar"], 0))
[b'foo', b'bar']

>>> list(UploadService.shift_chunks([b"foo", b"bar"], 1))
[b'oo', b'bar']

>>> list(UploadService.shift_chunks([b"foo", b"bar"], 3))
[b'bar']

>>> list(UploadService.shift_chunks([b"foo", b"bar"], 6))
[]

>>> list(UploadService.shift_chunks([b"foo", b"bar"], 7))
[]

>>> list(UploadService.shift_chunks([], 0))
[]
"""

if offset < 0:
raise ValueError(f"Expect non-negative offset but got {offset}")

for chunk in chunks:
if offset:
@@ -103,12 +136,10 @@ def upload_chunks(
return self.upload_shifted_chunks(shifted_chunks, offset)

def upload_shifted_chunks(
self,
shifted_chunks: T.Iterable[bytes],
offset: int,
self, shifted_chunks: T.Iterable[bytes], offset: int
) -> str:
"""
Upload the chunks that must already be shifted by the offset (e.g. fp.seek(begin_offset, io.SEEK_SET))
Upload the chunks that must already be shifted by the offset (e.g. fp.seek(offset, io.SEEK_SET))
"""

headers = {
@@ -118,10 +149,7 @@ def upload_shifted_chunks(
}
url = f"{MAPILLARY_UPLOAD_ENDPOINT}/{self.session_key}"
resp = request_post(
url,
headers=headers,
data=shifted_chunks,
timeout=UPLOAD_REQUESTS_TIMEOUT,
url, headers=headers, data=shifted_chunks, timeout=UPLOAD_REQUESTS_TIMEOUT
)

resp.raise_for_status()
@@ -137,18 +165,35 @@ def upload_shifted_chunks(

# A mock class for testing only
class FakeUploadService(UploadService):
def __init__(self, *args, **kwargs):
"""
A mock upload service that simulates the upload process for testing purposes.
It writes the uploaded data to a file in a temporary directory and generates a fake file handle.
"""

FILE_HANDLE_DIR: str = "file_handles"

def __init__(
self,
upload_path: Path | None = None,
transient_error_ratio: float = 0.0,
*args,
**kwargs,
):
super().__init__(*args, **kwargs)
self._upload_path = Path(
os.getenv("MAPILLARY_UPLOAD_PATH", "mapillary_public_uploads")
)
self._error_ratio = 0.02
if upload_path is None:
upload_path = Path(tempfile.gettempdir()).joinpath(
"mapillary_public_uploads"
)
self._upload_path = upload_path
self._transient_error_ratio = transient_error_ratio

@property
def upload_path(self) -> Path:
return self._upload_path

@override
def upload_shifted_chunks(
self,
shifted_chunks: T.Iterable[bytes],
offset: int,
self, shifted_chunks: T.Iterable[bytes], offset: int
) -> str:
expected_offset = self.fetch_offset()
if offset != expected_offset:
@@ -160,17 +205,17 @@ def upload_shifted_chunks(
filename = self._upload_path.joinpath(self.session_key)
with filename.open("ab") as fp:
for chunk in shifted_chunks:
if random.random() <= self._error_ratio:
if random.random() <= self._transient_error_ratio:
raise requests.ConnectionError(
f"TEST ONLY: Failed to upload with error ratio {self._error_ratio}"
f"TEST ONLY: Failed to upload with error ratio {self._transient_error_ratio}"
)
fp.write(chunk)
if random.random() <= self._error_ratio:
if random.random() <= self._transient_error_ratio:
raise requests.ConnectionError(
f"TEST ONLY: Partially uploaded with error ratio {self._error_ratio}"
f"TEST ONLY: Partially uploaded with error ratio {self._transient_error_ratio}"
)

file_handle_dir = self._upload_path.joinpath("file_handles")
file_handle_dir = self._upload_path.joinpath(self.FILE_HANDLE_DIR)
file_handle_path = file_handle_dir.joinpath(self.session_key)
if not file_handle_path.exists():
os.makedirs(file_handle_dir, exist_ok=True)
@@ -181,12 +226,12 @@

@override
def fetch_offset(self) -> int:
if random.random() <= self._error_ratio:
if random.random() <= self._transient_error_ratio:
raise requests.ConnectionError(
f"TEST ONLY: Partially uploaded with error ratio {self._error_ratio}"
f"TEST ONLY: Partially uploaded with error ratio {self._transient_error_ratio}"
)
filename = os.path.join(self._upload_path, self.session_key)
if not os.path.exists(filename):
filename = self._upload_path.joinpath(self.session_key)
if not filename.exists():
return 0
with open(filename, "rb") as fp:
fp.seek(0, io.SEEK_END)
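A minimal sketch (not part of the diff) of how the pieces above compose into a resumable upload: fetch the server-side offset, skip the bytes the server already holds, and send only the remainder. It mirrors what upload_chunks does internally via shift_chunks; the session key, chunk size, and bare retry loop are illustrative assumptions.

import io

import requests

from mapillary_tools.upload_api_v4 import FakeUploadService

service = FakeUploadService(
    user_access_token="TEST",
    session_key="sketch_session.txt",
    transient_error_ratio=0.0,  # deterministic for this sketch
)
payload = io.BytesIO(b"hello resumable upload")

while True:
    try:
        # How many bytes does the service already have for this session?
        offset = service.fetch_offset()
        # Skip them locally (equivalent to shift_chunks over fresh chunks) ...
        payload.seek(offset, io.SEEK_SET)
        chunks = FakeUploadService.chunkize_byte_stream(payload, chunk_size=1024)
        # ... then upload the remainder, declaring the offset we resumed from.
        file_handle = service.upload_shifted_chunks(chunks, offset)
        break
    except requests.ConnectionError:
        continue  # transient failure: re-fetch the offset and resume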
6 changes: 6 additions & 0 deletions mapillary_tools/uploader.py
@@ -528,9 +528,15 @@ def _create_upload_service(self, session_key: str) -> upload_api_v4.UploadService:
upload_service: upload_api_v4.UploadService

if self.dry_run:
upload_path = os.getenv("MAPILLARY_UPLOAD_ENDPOINT")
upload_service = upload_api_v4.FakeUploadService(
user_access_token=self.user_items["user_upload_token"],
session_key=session_key,
upload_path=Path(upload_path) if upload_path is not None else None,
)
LOG.info(
"Dry run mode enabled. Data will be uploaded to %s",
upload_service.upload_path.joinpath(session_key),
)
else:
upload_service = upload_api_v4.UploadService(
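The hunk above makes dry-run uploads land under MAPILLARY_UPLOAD_ENDPOINT when it is set, falling back to a temporary directory otherwise. A hedged sketch of driving that path (the directory name is an assumption):

import os
import tempfile
from pathlib import Path

# Set before the uploader builds its FakeUploadService; session files and the
# file_handles/ directory are then written beneath this path.
os.environ["MAPILLARY_UPLOAD_ENDPOINT"] = str(
    Path(tempfile.gettempdir()) / "my_dry_run_uploads"
)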
16 changes: 8 additions & 8 deletions tests/integration/fixtures.py
@@ -14,7 +14,7 @@
import py.path
import pytest

from mapillary_tools import utils
from mapillary_tools import upload_api_v4, utils

EXECUTABLE = os.getenv(
"MAPILLARY_TOOLS__TESTS_EXECUTABLE", "python3 -m mapillary_tools.commands"
@@ -58,7 +58,7 @@ def setup_data(tmpdir: py.path.local):
@pytest.fixture
def setup_upload(tmpdir: py.path.local):
upload_dir = tmpdir.mkdir("mapillary_public_uploads")
os.environ["MAPILLARY_UPLOAD_PATH"] = str(upload_dir)
os.environ["MAPILLARY_UPLOAD_ENDPOINT"] = str(upload_dir)
os.environ["MAPILLARY_TOOLS__AUTH_VERIFICATION_DISABLED"] = "YES"
os.environ["MAPILLARY_TOOLS_PROMPT_DISABLED"] = "YES"
os.environ["MAPILLARY__ENABLE_UPLOAD_HISTORY_FOR_DRY_RUN"] = "YES"
@@ -67,7 +67,7 @@ def setup_upload(tmpdir: py.path.local):
yield upload_dir
if tmpdir.check():
tmpdir.remove(ignore_errors=True)
os.environ.pop("MAPILLARY_UPLOAD_PATH", None)
os.environ.pop("MAPILLARY_UPLOAD_ENDPOINT", None)
os.environ.pop("MAPILLARY_UPLOAD_HISTORY_PATH", None)
os.environ.pop("MAPILLARY_TOOLS__AUTH_VERIFICATION_DISABLED", None)
os.environ.pop("MAPILLARY_TOOLS_PROMPT_DISABLED", None)
@@ -239,11 +239,11 @@ def load_descs(descs) -> list:


def extract_all_uploaded_descs(upload_folder: Path) -> list[list[dict]]:
FILE_HANDLE_DIRNAME = "file_handles"

session_by_file_handle: dict[str, str] = {}
if upload_folder.joinpath(FILE_HANDLE_DIRNAME).exists():
for session_path in upload_folder.joinpath(FILE_HANDLE_DIRNAME).iterdir():
if upload_folder.joinpath(upload_api_v4.FakeUploadService.FILE_HANDLE_DIR).exists():
for session_path in upload_folder.joinpath(
upload_api_v4.FakeUploadService.FILE_HANDLE_DIR
).iterdir():
file_handle = session_path.read_text()
session_by_file_handle[file_handle] = session_path.name

@@ -267,7 +267,7 @@ def extract_all_uploaded_descs(upload_folder: Path) -> list[list[dict]]:
sequences.append(validate_and_extract_zip(file))
elif file.suffix == ".mp4":
sequences.append(validate_and_extract_camm(file))
elif file.name == FILE_HANDLE_DIRNAME:
elif file.name == upload_api_v4.FakeUploadService.FILE_HANDLE_DIR:
# Already processed above
pass

33 changes: 19 additions & 14 deletions tests/unit/test_upload_api_v4.py
@@ -1,50 +1,55 @@
import io
from pathlib import Path

import py

from mapillary_tools import upload_api_v4

from ..integration.fixtures import setup_upload


def test_upload(setup_upload: py.path.local):
def test_upload(tmpdir: py.path.local):
upload_service = upload_api_v4.FakeUploadService(
user_access_token="TEST",
session_key="FOOBAR.txt",
upload_path=Path(tmpdir),
transient_error_ratio=0.02,
)
upload_service._error_ratio = 0
upload_service._transient_error_ratio = 0
content = b"double_foobar"
cluster_id = upload_service.upload_byte_stream(io.BytesIO(content), chunk_size=1)
assert isinstance(cluster_id, str), cluster_id
assert (setup_upload.join("FOOBAR.txt").read_binary()) == content
assert (tmpdir.join("FOOBAR.txt").read_binary()) == content

# reupload should not affect the file
upload_service.upload_byte_stream(io.BytesIO(content), chunk_size=1)
assert (setup_upload.join("FOOBAR.txt").read_binary()) == content
assert (tmpdir.join("FOOBAR.txt").read_binary()) == content


def test_upload_big_chunksize(setup_upload: py.path.local):
def test_upload_big_chunksize(tmpdir: py.path.local):
upload_service = upload_api_v4.FakeUploadService(
user_access_token="TEST",
session_key="FOOBAR.txt",
upload_path=Path(tmpdir),
transient_error_ratio=0.02,
)
upload_service._error_ratio = 0
upload_service._transient_error_ratio = 0
content = b"double_foobar"
cluster_id = upload_service.upload_byte_stream(io.BytesIO(content), chunk_size=1000)
assert isinstance(cluster_id, str), cluster_id
assert (setup_upload.join("FOOBAR.txt").read_binary()) == content
assert (tmpdir.join("FOOBAR.txt").read_binary()) == content

# reupload should not affect the file
upload_service.upload_byte_stream(io.BytesIO(content), chunk_size=1000)
assert (setup_upload.join("FOOBAR.txt").read_binary()) == content
assert (tmpdir.join("FOOBAR.txt").read_binary()) == content


def test_upload_chunks(setup_upload: py.path.local):
def test_upload_chunks(tmpdir: py.path.local):
upload_service = upload_api_v4.FakeUploadService(
user_access_token="TEST",
session_key="FOOBAR2.txt",
upload_path=Path(tmpdir),
transient_error_ratio=0.02,
)
upload_service._error_ratio = 0
upload_service._transient_error_ratio = 0

def _gen_chunks():
yield b"foo"
@@ -55,8 +60,8 @@ def _gen_chunks():
cluster_id = upload_service.upload_chunks(_gen_chunks())

assert isinstance(cluster_id, str), cluster_id
assert (setup_upload.join("FOOBAR2.txt").read_binary()) == b"foobar"
assert (tmpdir.join("FOOBAR2.txt").read_binary()) == b"foobar"

# reupload should not affect the file
upload_service.upload_chunks(_gen_chunks())
assert (setup_upload.join("FOOBAR2.txt").read_binary()) == b"foobar"
assert (tmpdir.join("FOOBAR2.txt").read_binary()) == b"foobar"
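
The tests above pin _transient_error_ratio to 0 for determinism. A sketch of the complementary flaky-transport test is below; it is an assumption, not part of this diff, and relies on the fake service keeping partially written data so each retry resumes from the saved offset (tmp_path is pytest's built-in Path fixture).

import io
from pathlib import Path

import requests

from mapillary_tools import upload_api_v4


def test_upload_retries_until_complete(tmp_path: Path):
    service = upload_api_v4.FakeUploadService(
        user_access_token="TEST",
        session_key="FLAKY.txt",
        upload_path=tmp_path,
        transient_error_ratio=0.2,  # fail roughly one request in five
    )
    content = b"double_foobar"
    for _ in range(1000):  # generous bound; every attempt makes progress
        try:
            service.upload_byte_stream(io.BytesIO(content), chunk_size=1)
            break
        except requests.ConnectionError:
            continue  # partial data is kept; the next attempt resumes
    else:
        raise AssertionError("upload never completed")
    assert tmp_path.joinpath("FLAKY.txt").read_bytes() == content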