Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 33 additions & 15 deletions imap_processing/ialirt/generate_coverage.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,13 @@
"DSS-74",
"DSS-75",
]
# Non-DSN stations must be listed in order of priority.
NON_DSN_STATIONS = {
"Kiel": STATIONS["Kiel"],
}


def generate_coverage(
def generate_coverage( # noqa: PLR0912
start_time: str,
outages: dict | None = None,
dsn: dict | None = None,
Expand All @@ -55,9 +59,6 @@ def generate_coverage(
duration_seconds = 24 * 60 * 60 # 86400 seconds in 24 hours
time_step = 5 * 60 # 5 min in seconds

stations = {
"Kiel": STATIONS["Kiel"],
}
coverage_dict = {}
outage_dict = {}

Expand All @@ -67,20 +68,33 @@ def generate_coverage(
time_range = np.arange(start_et_input, stop_et_input, time_step)
total_visible_mask = np.zeros(time_range.shape, dtype=bool)

# Precompute DSN outage mask for non-DSN stations
# Precompute DSN occupied mask for non-DSN stations
dsn_contact_mask = np.zeros(time_range.shape, dtype=bool)
dsn_outage_mask = np.zeros(time_range.shape, dtype=bool)
if dsn:
for dsn_contacts in dsn.values():
for start, end in dsn_contacts:
start_et = str_to_et(start)
end_et = str_to_et(end)
dsn_outage_mask |= (time_range >= start_et) & (time_range <= end_et)
for dsn_station, dsn_contacts in dsn.items():
for contact_start, contact_end in dsn_contacts:
contact_start_et = str_to_et(contact_start)
contact_end_et = str_to_et(contact_end)
dsn_contact_mask |= (time_range >= contact_start_et) & (
time_range <= contact_end_et
)

if outages and dsn_station in outages:
for outage_start, outage_end in outages[dsn_station]:
dsn_outage_mask |= (time_range >= str_to_et(outage_start)) & (
time_range <= str_to_et(outage_end)
)

dsn_occupied_mask = dsn_contact_mask & ~dsn_outage_mask

for station_name, (lon, lat, alt, min_elevation) in stations.items():
# Blocks later stations.
non_dsn_occupied_mask = np.zeros(time_range.shape, dtype=bool)
for station_name, (lon, lat, alt, min_elevation) in NON_DSN_STATIONS.items():
_azimuth, elevation = calculate_azimuth_and_elevation(
lon, lat, alt, time_range, obsref="IAU_EARTH"
)
visible = elevation > min_elevation
visible_unblocked = elevation > min_elevation

outage_mask = np.zeros(time_range.shape, dtype=bool)
if outages and station_name in outages:
Expand All @@ -89,9 +103,13 @@ def generate_coverage(
end_et = str_to_et(end)
outage_mask |= (time_range >= start_et) & (time_range <= end_et)

visible[outage_mask] = False
# DSN contacts block other stations
visible[dsn_outage_mask] = False
# Block this station if DSN is active OR already-occupied by earlier stations
unavailable_mask = outage_mask | dsn_occupied_mask | non_dsn_occupied_mask
visible = visible_unblocked & ~unavailable_mask

# This station now occupies these times and will block later stations
non_dsn_occupied_mask |= visible
Comment on lines +110 to +111
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this actually what you want? This means that the way this loop is defined will dictate the order and amount of time that the stations will cover. If there is 1-hour of overlap but the first station has 4 hours and the second station has 8 hours, that would set this to 4-hours for the first station every time and only 7-hours for the second station, but you might want that second station to take precedence because it has the longer block of time.

In general, I think this is OK right now but it should get some more thought to how we want to approach it. One other idea is to not block based on ground-stations but rather indicate how many stations could cover a specific area, so DSN can block, but the others just add to the coverage time.


total_visible_mask |= visible

coverage_dict[station_name] = et_to_utc(time_range[visible], format_str="ISOC")
Expand Down
107 changes: 106 additions & 1 deletion imap_processing/tests/ialirt/unit/test_generate_coverage.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
"""Test processEphemeris functions."""

from datetime import datetime
from unittest.mock import patch

import numpy as np
import pytest

from imap_processing.ialirt.constants import STATIONS
from imap_processing.ialirt.generate_coverage import (
format_coverage_summary,
generate_coverage,
Expand Down Expand Up @@ -108,4 +110,107 @@ def test_dsn(furnish_kernels):
)

assert "I-ALiRT Coverage Summary" in output["summary"]
assert 40.6 == output["total_coverage_percent"]
assert 42.0 == output["total_coverage_percent"]


@pytest.mark.external_kernel
def test_non_dsn_priority_blocking_with_kernels(furnish_kernels):
"Test that non-dsn station block other non-dsn stations."
kernels = ["naif0012.tls", "pck00011.tpc", "de440s.bsp", "imap_spk_demo.bsp"]
start_time = "2026-09-22T00:00:00Z"

with furnish_kernels(kernels):
# Kiel-only coverage
with patch(
"imap_processing.ialirt.generate_coverage.NON_DSN_STATIONS",
new={"Kiel": STATIONS["Kiel"]},
):
coverage_kiel, _ = generate_coverage(start_time)

kiel_times = coverage_kiel["Kiel"]

# Manaus-only coverage
with patch(
"imap_processing.ialirt.generate_coverage.NON_DSN_STATIONS",
new={"Manaus": STATIONS["Manaus"]},
):
coverage_manaus_only, _ = generate_coverage(start_time)

manaus_only_times = coverage_manaus_only["Manaus"]

overlap = np.intersect1d(kiel_times, manaus_only_times)
# Assert the times overlap.
assert overlap.size > 0

# Kiel first, then Manaus
with patch(
"imap_processing.ialirt.generate_coverage.NON_DSN_STATIONS",
new={
"Kiel": STATIONS["Kiel"],
"Manaus": STATIONS["Manaus"],
},
):
coverage, _ = generate_coverage(start_time)

manaus_coverage = coverage["Manaus"]

# Manaus should have no overlap with Kiel.
blocked_overlap = np.intersect1d(kiel_times, manaus_coverage)
assert blocked_overlap.size == 0
assert manaus_coverage[0] > kiel_times[-1]


@pytest.mark.external_kernel
def test_dsn_outage_allows_ground_station_coverage(furnish_kernels):
"""
DSN contacts block non-DSN stations, but DSN outages remove blocking.
"""
kernels = ["naif0012.tls", "pck00011.tpc", "de440s.bsp", "imap_spk_demo.bsp"]
start_time = "2026-09-22T00:00:00Z"

with furnish_kernels(kernels):
# Baseline Kiel-only coverage (no DSN)
with patch(
"imap_processing.ialirt.generate_coverage.NON_DSN_STATIONS",
new={"Kiel": STATIONS["Kiel"]},
):
coverage_base, _ = generate_coverage(start_time)

kiel_times = coverage_base["Kiel"]

contact_start = kiel_times[10] # inside Kiel coverage
contact_end = kiel_times[16] # 30 min later (inclusive logic in your code)

# Outage inside the DSN contact
outage_start = kiel_times[12]
outage_end = kiel_times[14]

dsn = {"DSS-75": [(contact_start, contact_end)]}

# DSN contact, no DSN outage
# Kiel should be blocked during the full contact window
with patch(
"imap_processing.ialirt.generate_coverage.NON_DSN_STATIONS",
new={"Kiel": STATIONS["Kiel"]},
):
coverage_blocked, _ = generate_coverage(start_time, dsn=dsn, outages=None)

kiel_blocked = coverage_blocked["Kiel"]

assert not np.any(np.isin(kiel_times[10:16], kiel_blocked))

# DSN contact + DSN outage
# Kiel allowed during outage sub-window
outages = {"DSS-75": [(outage_start, outage_end)]}

with patch(
"imap_processing.ialirt.generate_coverage.NON_DSN_STATIONS",
new={"Kiel": STATIONS["Kiel"]},
):
coverage_punched, _ = generate_coverage(
start_time, dsn=dsn, outages=outages
)

kiel_punched = coverage_punched["Kiel"]

assert np.all(np.isin(kiel_times[12:14], kiel_punched))