Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
75 changes: 74 additions & 1 deletion imap_processing/hi/hi_goodtimes.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ class CullCode(IntEnum):
STAT_FILTER_0 = 1 << 4 # 16
STAT_FILTER_1 = 1 << 5 # 32
STAT_FILTER_2 = 1 << 6 # 64
BAD_ESA_VOLTAGE = 1 << 7 # 128


def hi_goodtimes(
Expand All @@ -63,6 +64,7 @@ def hi_goodtimes(
This is the top-level function that orchestrates all goodtimes culling
operations for a single pointing. It applies the following filters in order:

0. mark_bad_esa_voltage - Remove times with invalid ESA voltage configuration
1. mark_incomplete_spin_sets - Remove incomplete 8-spin histogram periods
2. mark_drf_times - Remove times during spacecraft drift restabilization
3. mark_bad_tdc_cal - Remove times with failed TDC calibration
Expand Down Expand Up @@ -217,7 +219,7 @@ def _apply_goodtimes_filters(
"""
Apply all goodtimes culling filters to the dataset.

Modifies goodtimes_ds in place by applying filters 1-7.
Modifies goodtimes_ds in place by applying filters 0-7.

Parameters
----------
Expand Down Expand Up @@ -269,6 +271,10 @@ def _apply_goodtimes_filters(

# === Apply culling filters ===

# 0. Mark bad ESA voltage times
logger.info("Applying filter: mark_bad_esa_voltage")
mark_bad_esa_voltage(goodtimes_ds, current_l1b_de)
Comment thread
tmplummer marked this conversation as resolved.

# 1. Mark incomplete spin sets
logger.info("Applying filter: mark_incomplete_spin_sets")
mark_incomplete_spin_sets(goodtimes_ds, current_l1b_de)
Expand Down Expand Up @@ -942,6 +948,73 @@ def finalize_dataset(self) -> xr.Dataset:
# ==============================================================================


def mark_bad_esa_voltage(
goodtimes_ds: xr.Dataset,
l1b_de: xr.Dataset,
cull_code: int = CullCode.BAD_ESA_VOLTAGE,
) -> None:
"""
Mark times when ESA voltages don't match expected values.

Filters out 8-spin periods where the ESA energy step is invalid, indicating
either calibration mode (esa_energy_step=0) or an ESA voltage mismatch
(esa_energy_step=FILLVAL). The voltage validation is performed during L1B
processing by matching measured inner/outer ESA voltages against the ESA
energies lookup table.

Algorithm Document Reference:
Section 2.3.2: Good times selection requiring valid ESA configuration

Parameters
----------
goodtimes_ds : xarray.Dataset
Goodtimes dataset to update with cull flags.
l1b_de : xarray.Dataset
L1B Direct Event data containing esa_energy_step field.
cull_code : int, optional
Cull code to use for marking bad times (default: CullCode.BAD_ESA_VOLTAGE).

Notes
-----
This function modifies goodtimes_ds in place by calling mark_bad_times()
for MET timestamps with invalid ESA energy steps.

Invalid ESA energy steps:
- esa_energy_step = 0: Calibration mode (ESA stepping but not science data)
- esa_energy_step = FILLVAL (255): ESA voltage mismatch - measured voltages
didn't match any known energy step in the lookup table
"""
logger.info("Running mark_bad_esa_voltage culling")

# Get FILLVAL from attributes (should be 255 for uint8)
fillval = l1b_de["esa_energy_step"].attrs.get("FILLVAL", 255)

esa_step_met = l1b_de["esa_step_met"].values
esa_energy_step = l1b_de["esa_energy_step"].values

# Find packets with invalid ESA energy steps
invalid_mask = (esa_energy_step == 0) | (esa_energy_step == fillval)

if not np.any(invalid_mask):
logger.info("No invalid ESA energy steps found")
return

# Get unique METs of invalid packets
invalid_mets = np.unique(esa_step_met[invalid_mask])

# Mark all identified times as bad (all spin bins)
goodtimes_ds.goodtimes.mark_bad_times(met=invalid_mets, cull=cull_code)

# Log statistics
n_invalid_0: int = np.sum(esa_energy_step == 0)
n_invalid_fillval: int = np.sum(esa_energy_step == fillval)
logger.info(
f"Found {n_invalid_0} packets with esa_energy_step=0 (calibration), "
f"{n_invalid_fillval} with esa_energy_step=FILLVAL (voltage mismatch). "
f"Marked {len(invalid_mets)} 8-spin period(s) as bad."
)


def mark_incomplete_spin_sets(
goodtimes_ds: xr.Dataset,
l1b_de: xr.Dataset,
Expand Down
108 changes: 107 additions & 1 deletion imap_processing/tests/hi/test_hi_goodtimes.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
_identify_cull_pattern,
create_goodtimes_dataset,
hi_goodtimes,
mark_bad_esa_voltage,
mark_bad_tdc_cal,
Comment thread
tmplummer marked this conversation as resolved.
mark_drf_times,
mark_incomplete_spin_sets,
Expand Down Expand Up @@ -86,6 +87,7 @@ def test_cull_code_values(self):
assert CullCode.STAT_FILTER_0 == 16
assert CullCode.STAT_FILTER_1 == 32
assert CullCode.STAT_FILTER_2 == 64
assert CullCode.BAD_ESA_VOLTAGE == 128

def test_cull_code_is_int(self):
"""Test that CullCode values are integers."""
Expand Down Expand Up @@ -3788,6 +3790,7 @@ def test_loads_cal_config(self, tmp_path):
patch(
"imap_processing.hi.utils.CalibrationProductConfig.from_csv"
) as mock_cal_load,
patch("imap_processing.hi.hi_goodtimes.mark_bad_esa_voltage"),
patch("imap_processing.hi.hi_goodtimes.mark_incomplete_spin_sets"),
patch("imap_processing.hi.hi_goodtimes.mark_drf_times"),
patch("imap_processing.hi.hi_goodtimes.mark_overflow_packets"),
Expand All @@ -3810,7 +3813,7 @@ def test_loads_cal_config(self, tmp_path):
mock_cal_load.assert_called_once_with(cal_path)

def test_calls_all_filters(self, tmp_path):
"""Test that all 7 filters are called."""
"""Test that all 8 filters are called."""
mock_goodtimes = MagicMock()
mock_goodtimes.goodtimes.get_cull_statistics.return_value = {
"good_bins": 100,
Expand All @@ -3825,6 +3828,7 @@ def test_calls_all_filters(self, tmp_path):
"imap_processing.hi.utils.CalibrationProductConfig.from_csv",
return_value=mock_cal,
),
patch("imap_processing.hi.hi_goodtimes.mark_bad_esa_voltage") as mock_f0,
patch(
"imap_processing.hi.hi_goodtimes.mark_incomplete_spin_sets"
) as mock_f1,
Expand All @@ -3850,6 +3854,7 @@ def test_calls_all_filters(self, tmp_path):
cal_product_config_path=tmp_path / "cal.csv",
)

mock_f0.assert_called_once()
mock_f1.assert_called_once()
mock_f2.assert_called_once()
mock_f3.assert_called_once()
Expand All @@ -3874,6 +3879,7 @@ def test_raises_statistical_filter_0_errors(self, tmp_path):
"imap_processing.hi.utils.CalibrationProductConfig.from_csv",
return_value=mock_cal,
),
patch("imap_processing.hi.hi_goodtimes.mark_bad_esa_voltage"),
patch("imap_processing.hi.hi_goodtimes.mark_incomplete_spin_sets"),
patch("imap_processing.hi.hi_goodtimes.mark_drf_times"),
patch("imap_processing.hi.hi_goodtimes.mark_bad_tdc_cal"),
Expand Down Expand Up @@ -3909,6 +3915,7 @@ def test_raises_statistical_filter_1_errors(self, tmp_path):
"imap_processing.hi.utils.CalibrationProductConfig.from_csv",
return_value=mock_cal,
),
patch("imap_processing.hi.hi_goodtimes.mark_bad_esa_voltage"),
patch("imap_processing.hi.hi_goodtimes.mark_incomplete_spin_sets"),
patch("imap_processing.hi.hi_goodtimes.mark_drf_times"),
patch("imap_processing.hi.hi_goodtimes.mark_bad_tdc_cal"),
Expand Down Expand Up @@ -4132,3 +4139,102 @@ def test_returns_datasets(self, tmp_path):

# Should return finalized dataset, not original
assert result == [mock_finalized]


class TestMarkBadEsaVoltage:
"""Tests for mark_bad_esa_voltage culling function."""

@pytest.fixture
def goodtimes_for_esa(self):
"""Create a goodtimes dataset for ESA voltage testing."""
# METs at 50-second intervals
met_values = np.array([1000.0, 1050.0, 1100.0, 1150.0, 1200.0])
return xr.Dataset(
{
"cull_flags": xr.DataArray(
np.zeros((5, 90), dtype=np.uint8),
dims=["met", "spin_bin"],
),
"esa_step": xr.DataArray(np.array([1, 2, 3, 4, 5]), dims=["met"]),
},
coords={
"met": met_values,
"spin_bin": np.arange(90),
},
)

@pytest.fixture
def l1b_de_all_valid(self):
"""L1B DE with all valid esa_energy_step values."""
return xr.Dataset(
{
"esa_step_met": (["epoch"], np.array([1000, 1050, 1100, 1150, 1200])),
"esa_energy_step": (["epoch"], np.array([1, 2, 3, 4, 5])), # All valid
}
)

@pytest.fixture
def l1b_de_with_zero(self):
"""L1B DE with esa_energy_step=0 (calibration)."""
ds = xr.Dataset(
{
"esa_step_met": (["epoch"], np.array([1000, 1050, 1100, 1150, 1200])),
"esa_energy_step": (["epoch"], np.array([1, 0, 3, 4, 5])), # 0 at idx 1
}
)
ds["esa_energy_step"].attrs["FILLVAL"] = 255
return ds

@pytest.fixture
def l1b_de_with_fillval(self):
"""L1B DE with esa_energy_step=FILLVAL (voltage mismatch)."""
ds = xr.Dataset(
{
"esa_step_met": (["epoch"], np.array([1000, 1050, 1100, 1150, 1200])),
"esa_energy_step": (
["epoch"],
np.array([1, 2, 255, 4, 5]),
), # FILLVAL at idx 2
}
)
ds["esa_energy_step"].attrs["FILLVAL"] = 255
return ds

def test_mark_bad_esa_voltage_all_valid(self, goodtimes_for_esa, l1b_de_all_valid):
"""Test that no times are marked when all ESA energy steps are valid."""
mark_bad_esa_voltage(goodtimes_for_esa, l1b_de_all_valid)
assert np.all(goodtimes_for_esa["cull_flags"].values == CullCode.GOOD)

def test_mark_bad_esa_voltage_with_zero(self, goodtimes_for_esa, l1b_de_with_zero):
"""Test that times are marked when esa_energy_step=0 (calibration)."""
mark_bad_esa_voltage(goodtimes_for_esa, l1b_de_with_zero)

# MET 1050 (index 1) should be culled
assert np.all(
goodtimes_for_esa["cull_flags"].values[1, :] == CullCode.BAD_ESA_VOLTAGE
)
# Other times should remain good
assert np.all(goodtimes_for_esa["cull_flags"].values[0, :] == CullCode.GOOD)
assert np.all(goodtimes_for_esa["cull_flags"].values[2, :] == CullCode.GOOD)

def test_mark_bad_esa_voltage_with_fillval(
self, goodtimes_for_esa, l1b_de_with_fillval
):
"""Test that times are marked when esa_energy_step=FILLVAL."""
mark_bad_esa_voltage(goodtimes_for_esa, l1b_de_with_fillval)

# MET 1100 (index 2) should be culled
assert np.all(
goodtimes_for_esa["cull_flags"].values[2, :] == CullCode.BAD_ESA_VOLTAGE
)
# Other times should remain good
assert np.all(goodtimes_for_esa["cull_flags"].values[0, :] == CullCode.GOOD)
assert np.all(goodtimes_for_esa["cull_flags"].values[1, :] == CullCode.GOOD)

def test_mark_bad_esa_voltage_custom_cull_code(
self, goodtimes_for_esa, l1b_de_with_zero
):
"""Test using a custom cull code."""
custom_code = 200
mark_bad_esa_voltage(goodtimes_for_esa, l1b_de_with_zero, cull_code=custom_code)
assert np.all(goodtimes_for_esa["cull_flags"].values[1, :] == custom_code)
Loading