Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
134 changes: 71 additions & 63 deletions mapillary_tools/blackvue_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,50 @@ def _parse_nmea_lines(
yield epoch_ms, message


def _detect_timezone_offset(
    parsed_lines: list[tuple[float, pynmea2.NMEASentence]],
) -> float:
    """
    Detect timezone offset between camera clock and GPS time.

    RMC sentences are preferred because they carry both date and time;
    GGA/GLL sentences (time only, no date) serve as a less reliable
    fallback. Returns 0.0 when no offset can be determined.
    """
    # Remember the earliest valid GGA/GLL in case no usable RMC shows up.
    fallback: tuple[float, pynmea2.NMEASentence] | None = None

    for epoch_sec, message in parsed_lines:
        sentence_type = message.sentence_type
        if sentence_type == "RMC" and getattr(message, "is_valid", False):
            offset = _compute_timezone_offset_from_rmc(epoch_sec, message)
            if offset is not None:
                LOG.debug(
                    "Computed timezone offset %.1fs from RMC (%s %s)",
                    offset,
                    message.datestamp,
                    message.timestamp,
                )
                return offset
        if (
            fallback is None
            and sentence_type in ("GGA", "GLL")
            and getattr(message, "is_valid", False)
        ):
            fallback = (epoch_sec, message)

    # Fallback: no usable RMC found, so try GGA/GLL (no date info).
    if fallback is not None:
        fallback_epoch, fallback_message = fallback
        offset = _compute_timezone_offset_from_time_only(
            fallback_epoch, fallback_message
        )
        if offset is not None:
            LOG.debug(
                "Computed timezone offset %.1fs from %s (fallback, no date info)",
                offset,
                fallback_message.sentence_type,
            )
            return offset

    # Nothing usable: assume the camera clock is already correct.
    return 0.0


def _parse_gps_box(gps_data: bytes) -> list[telemetry.GPSPoint]:
"""
>>> list(_parse_gps_box(b"[1623057074211]$GPGGA,202530.00,5109.0262,N,11401.8407,W,5,40,0.5,1097.36,M,-17.00,M,18,TSTR*61"))
Expand All @@ -210,83 +254,47 @@ def _parse_gps_box(gps_data: bytes) -> list[telemetry.GPSPoint]:
>>> list(_parse_gps_box(b"[1623057074211]$GPVTG,,T,,M,0.078,N,0.144,K,D*28[1623057075215]"))
[]
"""
timezone_offset: float | None = None
parsed_lines: list[tuple[float, pynmea2.NMEASentence]] = []
first_valid_gga_gll: tuple[float, pynmea2.NMEASentence] | None = None

# First pass: collect parsed_lines and compute timezone offset from the first valid RMC message
# First pass: collect parsed_lines
for epoch_ms, message in _parse_nmea_lines(gps_data):
# Rounding needed to avoid floating point precision issues
epoch_sec = round(epoch_ms / 1000, 3)
parsed_lines.append((epoch_sec, message))
if timezone_offset is None and message.sentence_type == "RMC":
if hasattr(message, "is_valid") and message.is_valid:
timezone_offset = _compute_timezone_offset_from_rmc(epoch_sec, message)
if timezone_offset is not None:
LOG.debug(
"Computed timezone offset %.1fs from RMC (%s %s)",
timezone_offset,
message.datestamp,
message.timestamp,
)
# Track first valid GGA/GLL for fallback
if first_valid_gga_gll is None and message.sentence_type in ["GGA", "GLL"]:
if hasattr(message, "is_valid") and message.is_valid:
first_valid_gga_gll = (epoch_sec, message)

# Fallback: if no RMC found, try GGA/GLL (less reliable - no date info)
if timezone_offset is None and first_valid_gga_gll is not None:
epoch_sec, message = first_valid_gga_gll
timezone_offset = _compute_timezone_offset_from_time_only(epoch_sec, message)
if timezone_offset is not None:
LOG.debug(
"Computed timezone offset %.1fs from %s (fallback, no date info)",
timezone_offset,
message.sentence_type,
)

# If no offset could be determined, use 0 (camera clock assumed correct)
if timezone_offset is None:
timezone_offset = 0.0
timezone_offset = _detect_timezone_offset(parsed_lines)

points_by_sentence_type: dict[str, list[telemetry.GPSPoint]] = {}

# Second pass: apply offset to all GPS points
for epoch_sec, message in parsed_lines:
if message.sentence_type not in ("GGA", "RMC", "GLL"):
continue
if not message.is_valid:
continue

corrected_epoch = round(epoch_sec + timezone_offset, 3)

# https://tavotech.com/gps-nmea-sentence-structure/
if message.sentence_type in ["GGA"]:
if not message.is_valid:
continue
point = telemetry.GPSPoint(
time=corrected_epoch,
lat=message.latitude,
lon=message.longitude,
alt=message.altitude,
angle=None,
epoch_time=corrected_epoch,
fix=telemetry.GPSFix.FIX_3D if message.gps_qual >= 1 else None,
precision=None,
ground_speed=None,
)
points_by_sentence_type.setdefault(message.sentence_type, []).append(point)

elif message.sentence_type in ["RMC", "GLL"]:
if not message.is_valid:
continue
point = telemetry.GPSPoint(
time=corrected_epoch,
lat=message.latitude,
lon=message.longitude,
alt=None,
angle=None,
epoch_time=corrected_epoch,
fix=None,
precision=None,
ground_speed=None,
)
points_by_sentence_type.setdefault(message.sentence_type, []).append(point)
# GGA has altitude and fix; RMC and GLL do not
if message.sentence_type == "GGA":
alt = message.altitude
fix = telemetry.GPSFix.FIX_3D if message.gps_qual >= 1 else None
else:
alt = None
fix = None

point = telemetry.GPSPoint(
time=corrected_epoch,
lat=message.latitude,
lon=message.longitude,
alt=alt,
angle=None,
epoch_time=corrected_epoch,
fix=fix,
precision=None,
ground_speed=None,
)
points_by_sentence_type.setdefault(message.sentence_type, []).append(point)

# This is the extraction order in exiftool
if "RMC" in points_by_sentence_type:
Expand Down
150 changes: 93 additions & 57 deletions mapillary_tools/exiftool_read_video.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,86 @@ def _deduplicate_gps_points(
return deduplicated_track


def _aggregate_float_values_same_length(
    texts_by_tag: dict[str, list[str]],
    tag: str | None,
    expected_length: int,
) -> list[float | None]:
    """Parse float values for *tag*, padded with None up to *expected_length*.

    When *tag* is None no values are extracted and the result is all None.
    """
    if tag is None:
        values: list[float | None] = []
    else:
        raw_texts = _extract_alternative_fields(texts_by_tag, [tag], list) or []
        values = [_maybe_float(text) for text in raw_texts]
    # Pad (never truncate) so the result lines up with the coordinate list.
    missing = expected_length - len(values)
    if missing > 0:
        values.extend([None] * missing)
    return values


def _aggregate_epoch_times(
    texts_by_tag: dict[str, list[str]],
    gps_time_tag: str | None,
    time_tag: str | None,
    timestamps: list[float | None],
    expected_length: int,
) -> list[float | None]:
    """Aggregate GPS epoch times from tags, with fallback to per-point timestamps."""
    if gps_time_tag is None:
        if time_tag is not None:
            # Reuse the per-point GPS timestamps as epoch times (copy defensively).
            return list(timestamps)
        # Neither tag available: no epoch information at all.
        return [None] * expected_length

    raw_texts = _extract_alternative_fields(texts_by_tag, [gps_time_tag], list) or []
    parsed = (exif_read.parse_gps_datetime(text) for text in raw_texts)
    epoch_times: list[float | None] = [
        None if dt is None else geo.as_unix_time(dt) for dt in parsed
    ]
    if len(epoch_times) != expected_length:
        # Length mismatch: discard rather than misalign times with coordinates.
        LOG.warning(
            "Found different number of GPS epoch times %d and coordinates %d",
            len(epoch_times),
            expected_length,
        )
        epoch_times = [None] * expected_length
    return epoch_times


def _aggregate_timestamps(
    texts_by_tag: dict[str, list[str]],
    time_tag: str | None,
    expected_length: int,
) -> list[float | None] | None:
    """Aggregate timestamps from the time tag.

    Returns the timestamp list, or None if the lengths don't match
    (caller should return [] in that case).
    """
    if time_tag is None:
        # No time tag: synthesize zero timestamps, one per coordinate.
        return [0.0] * expected_length

    raw_texts = _extract_alternative_fields(texts_by_tag, [time_tag], list) or []
    parsed = [exif_read.parse_gps_datetime(text) for text in raw_texts]
    timestamps: list[float | None] = [
        None if dt is None else geo.as_unix_time(dt) for dt in parsed
    ]
    if len(timestamps) != expected_length:
        # Mismatched counts are unrecoverable; signal the caller to bail out.
        LOG.warning(
            "Found different number of timestamps %d and coordinates %d",
            len(timestamps),
            expected_length,
        )
        return None
    return timestamps


def _aggregate_gps_track(
texts_by_tag: dict[str, list[str]],
time_tag: str | None,
Expand Down Expand Up @@ -159,73 +239,29 @@ def _aggregate_gps_track(
expected_length = len(lats)

# aggregate timestamps (optional)
if time_tag is not None:
dts = [
exif_read.parse_gps_datetime(text)
for text in _extract_alternative_fields(texts_by_tag, [time_tag], list)
or []
]
timestamps = [geo.as_unix_time(dt) if dt is not None else None for dt in dts]
if expected_length != len(timestamps):
# no idea what to do if we have different number of timestamps and coordinates
LOG.warning(
"Found different number of timestamps %d and coordinates %d",
len(timestamps),
expected_length,
)
return []
else:
timestamps = [0.0] * expected_length
timestamps = _aggregate_timestamps(texts_by_tag, time_tag, expected_length)
if timestamps is None:
return []

assert len(timestamps) == expected_length

def _aggregate_float_values_same_length(
tag: str | None,
) -> list[float | None]:
if tag is not None:
vals = [
_maybe_float(val)
for val in _extract_alternative_fields(texts_by_tag, [tag], list) or []
]
else:
vals = []
while len(vals) < expected_length:
vals.append(None)
return vals

# aggregate altitudes (optional)
alts = _aggregate_float_values_same_length(alt_tag)
alts = _aggregate_float_values_same_length(texts_by_tag, alt_tag, expected_length)

# aggregate directions (optional)
directions = _aggregate_float_values_same_length(direction_tag)
directions = _aggregate_float_values_same_length(
texts_by_tag, direction_tag, expected_length
)

# aggregate speeds (optional)
ground_speeds = _aggregate_float_values_same_length(ground_speed_tag)
ground_speeds = _aggregate_float_values_same_length(
texts_by_tag, ground_speed_tag, expected_length
)

# GPS epoch times (optional)
if gps_time_tag is not None:
gps_epoch_times: list[float | None] = [
geo.as_unix_time(dt) if dt is not None else None
for dt in (
exif_read.parse_gps_datetime(text)
for text in _extract_alternative_fields(
texts_by_tag, [gps_time_tag], list
)
or []
)
]
if len(gps_epoch_times) != expected_length:
LOG.warning(
"Found different number of GPS epoch times %d and coordinates %d",
len(gps_epoch_times),
expected_length,
)
gps_epoch_times = [None] * expected_length
elif time_tag is not None:
# Use per-point GPS timestamps as epoch times
gps_epoch_times = [t for t in timestamps]
else:
gps_epoch_times = [None] * expected_length
gps_epoch_times = _aggregate_epoch_times(
texts_by_tag, gps_time_tag, time_tag, timestamps, expected_length
)

# build track
track: list[GPSPoint] = []
Expand Down
Loading