Skip to content
Merged
18 changes: 9 additions & 9 deletions mapillary_tools/authenticate.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@

import requests

from . import api_v4, config, constants, exceptions, types
from . import api_v4, config, constants, exceptions


LOG = logging.getLogger(__name__)
Expand Down Expand Up @@ -64,7 +64,7 @@ def authenticate(
LOG.info('Creating new profile: "%s"', profile_name)

if jwt:
user_items: types.UserItem = {"user_upload_token": jwt}
user_items: config.UserItem = {"user_upload_token": jwt}
user_items = _verify_user_auth(_validate_profile(user_items))
else:
user_items = _prompt_login(
Expand All @@ -89,7 +89,7 @@ def authenticate(
def fetch_user_items(
user_name: str | None = None,
organization_key: str | None = None,
) -> types.UserItem:
) -> config.UserItem:
"""
Read user information from the config file,
or prompt the user to authenticate if the specified profile does not exist
Expand Down Expand Up @@ -155,17 +155,17 @@ def _prompt(message: str) -> str:
return input()


def _validate_profile(user_items: types.UserItem) -> types.UserItem:
def _validate_profile(user_items: config.UserItem) -> config.UserItem:
try:
jsonschema.validate(user_items, types.UserItemSchema)
jsonschema.validate(user_items, config.UserItemSchema)
except jsonschema.ValidationError as ex:
raise exceptions.MapillaryBadParameterError(
f"Invalid profile format: {ex.message}"
)
return user_items


def _verify_user_auth(user_items: types.UserItem) -> types.UserItem:
def _verify_user_auth(user_items: config.UserItem) -> config.UserItem:
"""
Verify that the user access token is valid
"""
Expand Down Expand Up @@ -205,7 +205,7 @@ def _validate_profile_name(profile_name: str):
)


def _list_all_profiles(profiles: dict[str, types.UserItem]) -> None:
def _list_all_profiles(profiles: dict[str, config.UserItem]) -> None:
_echo("Existing Mapillary profiles:")

# Header
Expand Down Expand Up @@ -256,7 +256,7 @@ def _is_login_retryable(ex: requests.HTTPError) -> bool:
def _prompt_login(
user_email: str | None = None,
user_password: str | None = None,
) -> types.UserItem:
) -> config.UserItem:
_enabled = _prompt_enabled()

if user_email is None:
Expand Down Expand Up @@ -288,7 +288,7 @@ def _prompt_login(

data = resp.json()

user_items: types.UserItem = {
user_items: config.UserItem = {
"user_upload_token": str(data["access_token"]),
"MAPSettingsUserKey": str(data["user_id"]),
}
Expand Down
55 changes: 38 additions & 17 deletions mapillary_tools/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,31 +2,54 @@

import configparser
import os
import sys
import typing as T
from typing import TypedDict

from . import api_v4, types
if sys.version_info >= (3, 11):
from typing import Required
else:
from typing_extensions import Required

from . import api_v4

_CLIENT_ID = api_v4.MAPILLARY_CLIENT_TOKEN
# Windows is not happy with | so we convert MLY|ID|TOKEN to MLY_ID_TOKEN
_CLIENT_ID = _CLIENT_ID.replace("|", "_", 2)

DEFAULT_MAPILLARY_FOLDER = os.path.join(
os.path.expanduser("~"),
".config",
"mapillary",
)

DEFAULT_MAPILLARY_FOLDER = os.path.join(os.path.expanduser("~"), ".config", "mapillary")
MAPILLARY_CONFIG_PATH = os.getenv(
"MAPILLARY_CONFIG_PATH",
os.path.join(
DEFAULT_MAPILLARY_FOLDER,
"configs",
_CLIENT_ID,
# Windows is not happy with | so we convert MLY|ID|TOKEN to MLY_ID_TOKEN
api_v4.MAPILLARY_CLIENT_TOKEN.replace("|", "_"),
),
)


class UserItem(TypedDict, total=False):
MAPOrganizationKey: int | str
# Username
MAPSettingsUsername: str
# User ID
MAPSettingsUserKey: str
# User access token
user_upload_token: Required[str]


UserItemSchema = {
"type": "object",
"properties": {
"MAPOrganizationKey": {"type": ["integer", "string"]},
# Not in use. Keep here for back-compatibility
"MAPSettingsUsername": {"type": "string"},
"MAPSettingsUserKey": {"type": "string"},
"user_upload_token": {"type": "string"},
},
"required": ["user_upload_token"],
"additionalProperties": True,
}


def _load_config(config_path: str) -> configparser.ConfigParser:
config = configparser.ConfigParser()
# Override to not change option names (by default it will lower them)
Expand All @@ -36,19 +59,17 @@ def _load_config(config_path: str) -> configparser.ConfigParser:
return config


def load_user(
profile_name: str, config_path: str | None = None
) -> types.UserItem | None:
def load_user(profile_name: str, config_path: str | None = None) -> UserItem | None:
if config_path is None:
config_path = MAPILLARY_CONFIG_PATH
config = _load_config(config_path)
if not config.has_section(profile_name):
return None
user_items = dict(config.items(profile_name))
return T.cast(types.UserItem, user_items)
return T.cast(UserItem, user_items)


def list_all_users(config_path: str | None = None) -> dict[str, types.UserItem]:
def list_all_users(config_path: str | None = None) -> dict[str, UserItem]:
if config_path is None:
config_path = MAPILLARY_CONFIG_PATH
cp = _load_config(config_path)
Expand All @@ -60,7 +81,7 @@ def list_all_users(config_path: str | None = None) -> dict[str, types.UserItem]:


def update_config(
profile_name: str, user_items: types.UserItem, config_path: str | None = None
profile_name: str, user_items: UserItem, config_path: str | None = None
) -> None:
if config_path is None:
config_path = MAPILLARY_CONFIG_PATH
Expand Down
13 changes: 7 additions & 6 deletions mapillary_tools/geotag/geotag_images_from_gpx.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
from typing_extensions import override

from .. import exceptions, geo, types
from ..serializer.description import build_capture_time
from .base import GeotagImagesFromGeneric
from .geotag_images_from_exif import ImageEXIFExtractor

Expand Down Expand Up @@ -43,26 +44,26 @@ def _interpolate_image_metadata_along(

if image_metadata.time < sorted_points[0].time:
delta = sorted_points[0].time - image_metadata.time
gpx_start_time = types.datetime_to_map_capture_time(sorted_points[0].time)
gpx_end_time = types.datetime_to_map_capture_time(sorted_points[-1].time)
gpx_start_time = build_capture_time(sorted_points[0].time)
gpx_end_time = build_capture_time(sorted_points[-1].time)
# with the tolerance of 1ms
if 0.001 < delta:
raise exceptions.MapillaryOutsideGPXTrackError(
f"The image date time is {round(delta, 3)} seconds behind the GPX start point",
image_time=types.datetime_to_map_capture_time(image_metadata.time),
image_time=build_capture_time(image_metadata.time),
gpx_start_time=gpx_start_time,
gpx_end_time=gpx_end_time,
)

if sorted_points[-1].time < image_metadata.time:
delta = image_metadata.time - sorted_points[-1].time
gpx_start_time = types.datetime_to_map_capture_time(sorted_points[0].time)
gpx_end_time = types.datetime_to_map_capture_time(sorted_points[-1].time)
gpx_start_time = build_capture_time(sorted_points[0].time)
gpx_end_time = build_capture_time(sorted_points[-1].time)
# with the tolerance of 1ms
if 0.001 < delta:
raise exceptions.MapillaryOutsideGPXTrackError(
f"The image time is {round(delta, 3)} seconds beyond the GPX end point",
image_time=types.datetime_to_map_capture_time(image_metadata.time),
image_time=build_capture_time(image_metadata.time),
gpx_start_time=gpx_start_time,
gpx_end_time=gpx_end_time,
)
Expand Down
5 changes: 4 additions & 1 deletion mapillary_tools/history.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from pathlib import Path

from . import constants, types
from .serializer.description import DescriptionJSONSerializer

JSONDict = T.Dict[str, T.Union[str, int, float, None]]

Expand Down Expand Up @@ -57,6 +58,8 @@ def write_history(
"summary": summary,
}
if metadatas is not None:
history["descs"] = [types.as_desc(metadata) for metadata in metadatas]
history["descs"] = [
DescriptionJSONSerializer.as_desc(metadata) for metadata in metadatas
]
with open(path, "w") as fp:
fp.write(json.dumps(history))
22 changes: 15 additions & 7 deletions mapillary_tools/process_geotag_properties.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

import collections
import datetime
import json
import logging
import typing as T
from pathlib import Path
Expand All @@ -17,6 +16,11 @@
SourcePathOption,
SourceType,
)
from .serializer.description import (
DescriptionJSONSerializer,
validate_and_fail_metadata,
)
from .serializer.gpx import GPXSerializer

LOG = logging.getLogger(__name__)
DEFAULT_GEOTAG_SOURCE_OPTIONS = [
Expand Down Expand Up @@ -200,12 +204,16 @@ def _write_metadatas(
desc_path: str,
) -> None:
if desc_path == "-":
descs = [types.as_desc(metadata) for metadata in metadatas]
print(json.dumps(descs, indent=2))
descs = DescriptionJSONSerializer.serialize(metadatas)
print(descs.decode("utf-8"))
else:
descs = [types.as_desc(metadata) for metadata in metadatas]
with open(desc_path, "w") as fp:
json.dump(descs, fp)
normalized_suffix = Path(desc_path).suffix.strip().lower()
if normalized_suffix in [".gpx"]:
descs = GPXSerializer.serialize(metadatas)
else:
descs = DescriptionJSONSerializer.serialize(metadatas)
with open(desc_path, "wb") as fp:
fp.write(descs)
LOG.info("Check the description file for details: %s", desc_path)


Expand Down Expand Up @@ -293,7 +301,7 @@ def _validate_metadatas(
# See https://stackoverflow.com/a/61432070
good_metadatas, error_metadatas = types.separate_errors(metadatas)
map_results = utils.mp_map_maybe(
types.validate_and_fail_metadata,
validate_and_fail_metadata,
T.cast(T.Iterable[types.Metadata], good_metadatas),
num_processes=num_processes,
)
Expand Down
3 changes: 2 additions & 1 deletion mapillary_tools/process_sequence_properties.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import typing as T

from . import constants, exceptions, geo, types, utils
from .serializer.description import DescriptionJSONSerializer

LOG = logging.getLogger(__name__)

Expand Down Expand Up @@ -106,7 +107,7 @@ def duplication_check(
dup = types.describe_error_metadata(
exceptions.MapillaryDuplicationError(
msg,
types.as_desc(cur),
DescriptionJSONSerializer.as_desc(cur),
distance=distance,
angle_diff=angle_diff,
),
Expand Down
3 changes: 2 additions & 1 deletion mapillary_tools/sample_video.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
from .exif_write import ExifEdit
from .geotag import geotag_videos_from_video
from .mp4 import mp4_sample_parser
from .serializer.description import parse_capture_time

LOG = logging.getLogger(__name__)

Expand Down Expand Up @@ -65,7 +66,7 @@ def sample_video(
video_start_time_dt: datetime.datetime | None = None
if video_start_time is not None:
try:
video_start_time_dt = types.map_capture_time_to_datetime(video_start_time)
video_start_time_dt = parse_capture_time(video_start_time)
except ValueError as ex:
raise exceptions.MapillaryBadParameterError(str(ex))

Expand Down
Loading
Loading