Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions imap_processing/idex/idex_constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ class IdexConstants:
SECONDS_IN_DAY = 86400
# Nanoseconds in day
NANOSECONDS_IN_DAY = SECONDS_IN_DAY * int(1e9)
# Picocoulombs to coulombs conversion factor
PICOCOULOMB_TO_COULOMB = 1e-12
# fg to kg conversion factor
FG_TO_KG = 1e-15

Expand Down
54 changes: 32 additions & 22 deletions imap_processing/idex/idex_l2a.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,10 +70,10 @@ def load_calibration_files(ancillary_files: dict) -> tuple[NDArray, NDArray]:
"""
# Load calibration coefficients from ancillary files
t_rise_params = pd.read_csv(
ancillary_files["l2a-calibration-curve-yield-params"], skiprows=1, header=None
ancillary_files["l2a-calibration-curve-t-rise"], skiprows=1, header=None
).values.flatten()[:8]
yield_params = pd.read_csv(
ancillary_files["l2a-calibration-curve-t-rise"], skiprows=1, header=None
ancillary_files["l2a-calibration-curve-yield-params"], skiprows=1, header=None
).values.flatten()[:8]
Comment thread
lacoak21 marked this conversation as resolved.
return t_rise_params, yield_params

Expand Down Expand Up @@ -319,11 +319,11 @@ def calculate_velocity_and_mass(
Parameters
----------
sig_amp : float
Signal amplitude.
Signal amplitude (pC).
t_rise : float
T_rise fit parameter from the target fit.
T_rise fit parameter from the target fit (us).
t_rise_params : np.ndarray
Calibration parameters for rise time.
Calibration parameters for rise time (us).
yield_params : np.ndarray
Calibration parameters for yield.

Expand All @@ -334,7 +334,7 @@ def calculate_velocity_and_mass(
mass_est : float
Estimated mass.
"""
log_a_t: float = np.log10(t_rise_params[0])
log_a_t: float = float(t_rise_params[0])
try:
root = root_scalar(
lambda lv: (
Expand All @@ -351,41 +351,51 @@ def calculate_velocity_and_mass(
)
return np.nan, np.nan

log_a_y: float = np.log10(yield_params[0])
log_a_y: float = float(yield_params[0])
yield_val = 10 ** log_smooth_powerlaw(np.log10(v_est), log_a_y, yield_params[1:])
mass_est = sig_amp / yield_val
sig_amp_coulombs = sig_amp * idex_constants.PICOCOULOMB_TO_COULOMB
mass_est = sig_amp_coulombs / yield_val
Comment on lines 355 to +357

return v_est, mass_est


def log_smooth_powerlaw(log_v: float, log_a: float, params: np.ndarray) -> float:
"""
Define a smoothly transitioning power law to fit the calibration curve to.
Define a smoothly transitioning power law used by the IDEX calibration curves.

This helper is used in two ways:
- rise-time calibration: log10(rise_time [us]) to log10(velocity [km/s])
- yield calibration: log10(velocity [km/s]) to log10(charge_yield [C/kg])

Parameters
----------
log_v : float
Velocity.
The log10 input to the calibration curve.
This is either log10(rise_time [us]) for the rise-time case or
Log10(velocity [km/s]) for the yield case.
Comment thread
lacoak21 marked this conversation as resolved.
log_a : float
Scale factor.
Log10 of the calibration scale factor A.
params : np.ndarray
Calibration parameters for the power law.
Calibration parameters for the power law
[a1, a2, a3, vb, vc, k, m].

Returns
-------
float
The value of the power law at the given velocity.
The calibrated log10 output.
This is either log10(velocity [km/s]) for the rise-time case or
log10(charge_yield [C/kg]) for the yield case.
"""
# Unpack the rest of the calibration parameters
# a1, a2, and a3 are the power law exponents for the low, medium, and high-velocity
# segments.
# vb and vc are the characteristic speeds where the slope transition happens, and k
# setting the sharpness of the transitions.
a1, a2, a3, vb, vc, _k, m = params
a1, a2, a3, vb, vc, k, _m = params
v = 10**log_v
base = log_a + a1 * log_v
transition1 = (1 + (v / vb) ** m) ** ((a2 - a1) / m)
transition2 = (1 + (v / vc) ** m) ** ((a3 - a2) / m)
transition1 = (1 + (v / vb) ** k) ** ((a2 - a1) / k)
transition2 = (1 + (v / vc) ** k) ** ((a3 - a2) / k)
return base + np.log10(transition1 * transition2)


Expand Down Expand Up @@ -759,7 +769,7 @@ def estimate_dust_mass(
Parameters
----------
low_sampling_time : xarray.DataArray
The low sampling time array.
The low sampling time array in microseconds.
target_signal : xarray.DataArray
Target signal data.
remove_noise : bool
Expand Down Expand Up @@ -824,8 +834,8 @@ def estimate_dust_mass(
if channel_name != "Ion_Grid" and amplitude <= 0.0:
amplitude = float(np.max(signal))

rise_time_0 = 0.371 # How fast the signal rises (s)
discharge_time_0 = 37.1 # How fast signal decays (s)
rise_time_0 = 0.371 # How fast the signal rises (us)
discharge_time_0 = 37.1 # How fast signal decays (us)

p0 = [time_of_impact, constant_offset, amplitude, rise_time_0, discharge_time_0]
positive_min = float(np.finfo(float).eps)
Expand Down Expand Up @@ -893,13 +903,13 @@ def fit_impact(
Parameters
----------
time : np.ndarray
Time values for the signal.
Time values for the signal (us).
time_of_impact : float
Time of dust impact.
Time of dust impact (us).
constant_offset : float
Initial baseline noise.
amplitude : float
Signal height.
Signal height (pC).
rise_time : float
How fast the signal rises (s).
discharge_time : float
Expand Down
96 changes: 93 additions & 3 deletions imap_processing/tests/idex/test_idex_l2a.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
estimate_dust_mass,
fit_impact,
idex_l2a,
load_calibration_files,
log_smooth_powerlaw,
remove_signal_noise,
sine_fit,
time_to_mass,
Expand Down Expand Up @@ -66,6 +68,13 @@ def mock_microphonics_noise(time: np.ndarray) -> np.ndarray:
return combined_sig


def _write_calibration_csv(path, values):
"""Write a one-row calibration CSV with the ancillary-file structure."""
header = "A,a1,a2,a3,v_b,v_c,k,sigma,delta\n"
row = ",".join(str(value) for value in values) + "\n"
path.write_text(header + row)


@pytest.mark.external_test_data
def test_l2a_logical_source_and_cdf(l2a_dataset: xr.Dataset):
"""Tests that the ``idex_l2a`` function generates datasets
Expand Down Expand Up @@ -275,19 +284,100 @@ def test_analyze_peaks_warning(caplog):
np.testing.assert_array_equal(area_under_curve, np.zeros(area_under_curve.shape))


def test_load_calibration_files_returns_expected_t_rise_params(tmp_path):
"""Tests that t-rise ancillary values are loaded into t_rise_params."""
expected_t_rise_params = np.array([3.6, -0.2, -2.0, 0.38, 5.1, 13.7, 13.3, 0.28])
yield_values = np.array([0.06, 2.8, 5.9, 4.1, 13.0, 22.7, 8.2, 0.40, 1.47])

t_rise_path = tmp_path / "t_rise.csv"
yield_path = tmp_path / "yield.csv"
_write_calibration_csv(t_rise_path, expected_t_rise_params)
_write_calibration_csv(yield_path, yield_values)

t_rise_params, _yield_params = load_calibration_files(
{
"l2a-calibration-curve-t-rise": t_rise_path,
"l2a-calibration-curve-yield-params": yield_path,
}
)

np.testing.assert_allclose(t_rise_params, expected_t_rise_params)


def test_load_calibration_files_returns_expected_yield_params(tmp_path):
"""Tests that yield ancillary values are loaded into yield_params."""
t_rise_values = np.array([3.6, -0.2, -2.0, 0.38, 5.1, 13.7, 13.3, 0.28, 1.33])
expected_yield_params = np.array([0.06, 2.8, 5.9, 4.1, 13.0, 22.7, 8.2, 0.40])

t_rise_path = tmp_path / "t_rise.csv"
yield_path = tmp_path / "yield.csv"
_write_calibration_csv(t_rise_path, t_rise_values)
_write_calibration_csv(yield_path, expected_yield_params)

_t_rise_params, yield_params = load_calibration_files(
{
"l2a-calibration-curve-t-rise": t_rise_path,
"l2a-calibration-curve-yield-params": yield_path,
}
)

np.testing.assert_allclose(yield_params, expected_yield_params)


def test_log_smooth_powerlaw_yield_curve_at_10_km_s():
"""Tests that the yield calibration returns the expected value at 10 km/s."""
yield_params = np.array([0.06, 2.8, 5.9, 4.1, 13.0, 22.7, 8.2, 0.40])

log_yield = log_smooth_powerlaw(np.log10(10.0), yield_params[0], yield_params[1:])
yield_value = 10**log_yield

assert yield_value == pytest.approx(755.0, rel=1e-3)


def test_calculate_velocity_and_mass_at_10_km_s():
"""Tests mass estimation using a mocked 10 km/s velocity solution."""
t_rise_params = np.array([3.6, -0.2, -2.0, 0.38, 5.1, 13.7, 13.3, 0.28])
yield_params = np.array([0.06, 2.8, 5.9, 4.1, 13.0, 22.7, 8.2, 0.40])
sig_amp_pc = 10.0

# This test intentionally bypasses the t_rise -> velocity inversion.
# The t_rise calibration path is currently under review and will be
# covered by a dedicated follow-up test once that behavior is finalized.
mocked_root = mock.Mock()
mocked_root.root = 1.0 # 10**1.0 == 10 km/s

with mock.patch(
"imap_processing.idex.idex_l2a.root_scalar", return_value=mocked_root
):
velocity_estimate, mass_estimate = calculate_velocity_and_mass(
sig_amp_pc, 2.0, t_rise_params, yield_params
)

expected_yield = 755.0090524738858
expected_mass_kg = sig_amp_pc * 1e-12 / expected_yield

assert velocity_estimate == pytest.approx(10.0, rel=1e-12)
assert mass_estimate == pytest.approx(expected_mass_kg, rel=1e-12)


@pytest.mark.external_test_data
def test_velocity_and_mass_estimate(ancillary_files):
"""Tests that the velocity and mass estimate function."""
# Load calibration coefficients from ancillary files
t_rise_params = pd.read_csv(
ancillary_files["l2a-calibration-curve-yield-params"], skiprows=1, header=None
ancillary_files["l2a-calibration-curve-t-rise"], skiprows=1, header=None
).values.flatten()[:8]
yield_params = pd.read_csv(
ancillary_files["l2a-calibration-curve-t-rise"], skiprows=1, header=None
ancillary_files["l2a-calibration-curve-yield-params"], skiprows=1, header=None
).values.flatten()[:8]
estimates = calculate_velocity_and_mass(10, 2, t_rise_params, yield_params)
expected_velocity = 5.0
t_rise = 10 ** log_smooth_powerlaw(
np.log10(expected_velocity), float(t_rise_params[0]), t_rise_params[1:]
)
estimates = calculate_velocity_and_mass(10, t_rise, t_rise_params, yield_params)
assert len(estimates) == 2
assert not np.any(np.isnan(estimates))
assert estimates[0] == pytest.approx(expected_velocity, rel=1e-12)


def test_analyze_peaks_perfect_fits():
Expand Down
Loading