Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions imap_processing/ialirt/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,13 @@ class IalirtSwapiConstants:
speed_ew = 0.5 * fwhm_width # speed width of energy passband
e_charge = 1.602176634e-19 # electronic charge, [C]
speed_coeff = np.sqrt(2 * e_charge / prot_mass) / 1e3

# temporary correction factor based on WIND data available
# overlapping with the first ~month of SWAPI data.
# to be replaced once SWAPI's L3 processing pipeline is finalized
# this will increase the model count by a factor of e^1,
# changing the output density by a factor of e^-1.
temporary_density_factor = np.exp(1)


class StationProperties(NamedTuple):
Expand Down
55 changes: 44 additions & 11 deletions imap_processing/ialirt/l0/process_swapi.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import logging
from decimal import Decimal
from math import isfinite

import numpy as np
import pandas as pd
Expand All @@ -22,6 +23,7 @@
logger = logging.getLogger(__name__)

NUM_IALIRT_ENERGY_STEPS = 63
FILLVAL_FLOAT32 = -1.0e31


def count_rate(
Expand Down Expand Up @@ -57,6 +59,9 @@ def count_rate(
speed = speed * 1000 # convert km/s to m/s
density = density * 1e6 # convert 1/cm**3 to 1/m**3

# see comment on Consts.temporary_density_factor
density = density * Consts.temporary_density_factor

return (
(density * Consts.eff_area * (beta / np.pi) ** (3 / 2))
* (np.exp(-beta * (center_speed**2 + speed**2 - 2 * center_speed * speed)))
Expand Down Expand Up @@ -104,15 +109,42 @@ def optimize_pseudo_parameters(
60000 * (initial_speed_guess / 400) ** 2,
]
)
sol = curve_fit(
f=count_rate,
xdata=energy_passbands.take(range(max_index - 3, max_index + 3), mode="clip"),
ydata=count_rates.take(range(max_index - 3, max_index + 3), mode="clip"),
sigma=count_rate_error.take(range(max_index - 3, max_index + 3), mode="clip"),
p0=initial_param_guess,
)

return sol[0]

sol = None

try:
five_point_range = range(max_index - 2, max_index + 2 + 1)
xdata = energy_passbands.take(five_point_range, mode="clip")
ydata = count_rates.take(five_point_range, mode="clip")
sigma = count_rate_error.take(five_point_range, mode="clip")
curve_fit_output = curve_fit(
f=count_rate,
xdata=xdata,
ydata=ydata,
sigma=sigma,
p0=initial_param_guess,
)

# if covariance matrix is not finite, scipy failed to converge to a solution and could just be reporting the initial guess
covariance_matrix_is_finite = np.all(np.isfinite(curve_fit_output[1]))

# fit has failed if R^2 < 0.7
yfit = count_rate(xdata, *curve_fit_output[0])
R2 = 1 - np.sum((ydata - yfit) ** 2) / np.sum((ydata - ydata.mean()) ** 2)
R2_is_acceptable = R2 >= 0.7

if covariance_matrix_is_finite and R2_is_acceptable:
sol = curve_fit_output[0]
except RuntimeError as runtime_error:
logger.error(f"curve_fit failed", runtime_error)
sol = None

# report speed only if fit fails
if sol is None:
sol = initial_param_guess.copy()
sol[1:] = FILLVAL_FLOAT32

return sol


def geometric_mean(
Expand Down Expand Up @@ -237,8 +269,9 @@ def process_swapi_ialirt(
grouped_subset = grouped_dataset.sel(epoch=grouped_dataset.group == group)

raw_coin_count = process_sweep_data(grouped_subset, "swapi_coin_cnt")
# I-ALiRT packets are 16 times less than the regular science packets.
raw_coin_count = raw_coin_count * 16
# I-ALiRT packets have counts compressed by a factor of 16.
# Add 8 to avoid having counts truncated to 0 and to avoid counts being systematically too low
raw_coin_count = raw_coin_count * 16 + 8
# Subset to only the relevant I-ALiRT energy steps
raw_coin_count = raw_coin_count[:, :NUM_IALIRT_ENERGY_STEPS]
raw_coin_rate = raw_coin_count / SWAPI_LIVETIME
Expand Down
103 changes: 96 additions & 7 deletions imap_processing/tests/ialirt/unit/test_process_swapi.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@

from imap_processing import imap_module_directory
from imap_processing.ialirt.l0.process_swapi import (
FILLVAL_FLOAT32,
Consts,
count_rate,
geometric_mean,
optimize_pseudo_parameters,
Expand Down Expand Up @@ -184,8 +186,8 @@ def test_count_rate():
"""Use random realistic values to test for expected output of count_rate()."""

actual_result = count_rate(1370, *[550, 5.27, 1e5])
expected_result = 3073.023325893161
assert actual_result == expected_result, (
expected_result = 3073.023325893161 * Consts.temporary_density_factor
assert np.isclose(actual_result, expected_result), (
f"The actual result of count_rate()"
f" {actual_result} does not "
f"match the expected result "
Expand All @@ -202,24 +204,24 @@ def test_optimize_parameters():
"file_name": "ialirt_test_data_u_sw_550_n_sw_5_T_sw_100000_v2.csv",
"expected_values": { # expected output and acceptable tolerance
"pseudo_speed": (550, 0.01),
"pseudo_density": (5, 0.14),
"pseudo_temperature": (1e5, 0.2),
"pseudo_density": (5 / Consts.temporary_density_factor, 0.14),
"pseudo_temperature": (1e5, 0.25),
},
},
"test_set_2": {
"file_name": "ialirt_test_data_u_sw_650_n_sw_3.0_T_sw_120000_v2.csv",
"expected_values": { # expected output and acceptable tolerance
"pseudo_speed": (650, 0.01),
"pseudo_density": (3, 0.3),
"pseudo_density": (3 / Consts.temporary_density_factor, 0.3),
"pseudo_temperature": (1.2e5, 0.28),
},
},
"test_set_3": {
"file_name": "ialirt_test_data_u_sw_400_n_sw_6.0_T_sw_80000_v2.csv",
"expected_values": { # expected output and acceptable tolerance
"pseudo_speed": (400, 0.01),
"pseudo_density": (6, 0.39),
"pseudo_temperature": (8e4, 0.15),
"pseudo_density": (6 / Consts.temporary_density_factor, 0.39),
"pseudo_temperature": (8e4, 0.2),
},
},
}
Expand Down Expand Up @@ -259,6 +261,93 @@ def test_optimize_parameters():
)


def test_optimize_parameters_exception_handling():
"""Test that the optimize_pseudo_parameters() function reports speed only when given data that causes curve_fit to fail."""

expected_speed = 557.279273 # peak passband speed
file_name = "ialirt_test_data_u_sw_550_n_sw_5_T_sw_100000_v2.csv"

calibration_test_file = pd.read_csv(
f"{imap_module_directory}/tests/ialirt/data/l0/swapi_ialirt_energy_steps.csv"
)
energy_passbands = calibration_test_file["Energy"][0:63].to_numpy().astype(float)

energy_data = pd.read_csv(f"{imap_module_directory}/tests/ialirt/data/l0/{file_name}")
count_rates = energy_data["Count Rates [Hz]"].to_numpy()
count_rates[0] = 0.0
count_rates = np.tile(count_rates, (2, 1))
count_rates_errors = energy_data["Count Rates Error [Hz]"].to_numpy()

"""
code to select the random seed:
for i in range(100):
np.random.seed(i)
result = optimize_pseudo_parameters(count_rates * np.abs(np.random.standard_normal(size=count_rates.shape)), count_rates_errors, energy_passbands)
if np.isclose(result['pseudo_speed'][0], expected_speed, rtol=1e-6) and np.isnan(result['pseudo_density'][0]):
print(i)
"""
np.random.seed(14)
speed, density, temperature = optimize_pseudo_parameters(count_rates * np.abs(np.random.standard_normal(size=count_rates.shape)), count_rates_errors, energy_passbands)

np.testing.assert_allclose(speed, expected_speed, rtol=1e-6)
np.testing.assert_allclose(density, FILLVAL_FLOAT32)
np.testing.assert_allclose(temperature, FILLVAL_FLOAT32)


def test_optimize_parameters_bad_fit_handling():
"""Test that the optimize_pseudo_parameters() function reports speed only when the fit is too poor."""

file_name = "ialirt_test_data_u_sw_550_n_sw_5_T_sw_100000_v2.csv"

calibration_test_file = pd.read_csv(
f"{imap_module_directory}/tests/ialirt/data/l0/swapi_ialirt_energy_steps.csv"
)
energy_passbands = calibration_test_file["Energy"][0:63].to_numpy().astype(float)

energy_data = pd.read_csv(f"{imap_module_directory}/tests/ialirt/data/l0/{file_name}")
count_rates = energy_data["Count Rates [Hz]"].to_numpy()
count_rates[0] = 0.0
count_rates_errors = energy_data["Count Rates Error [Hz]"].to_numpy()

# add high-amplitude randomness to the count rates to make the fit poor
np.random.seed(0)
count_rates = count_rates + np.abs(np.random.standard_normal(size=count_rates.shape) * count_rates.max())

speed, density, temperature = optimize_pseudo_parameters(count_rates, count_rates_errors, energy_passbands)

expected_speed = np.sqrt(energy_passbands[count_rates.argmax(axis=-1)]) * Consts.speed_coeff

np.testing.assert_allclose(speed, expected_speed, rtol=1e-6)
np.testing.assert_allclose(density, FILLVAL_FLOAT32)
np.testing.assert_allclose(temperature, FILLVAL_FLOAT32)


def test_optimize_parameters_bad_covariance_handling():
"""Test that the optimize_pseudo_parameters() function reports speed only when output covariance is nonsensical."""

file_name = "ialirt_test_data_u_sw_550_n_sw_5_T_sw_100000_v2.csv"

calibration_test_file = pd.read_csv(
f"{imap_module_directory}/tests/ialirt/data/l0/swapi_ialirt_energy_steps.csv"
)
energy_passbands = calibration_test_file["Energy"][0:63].to_numpy().astype(float)

energy_data = pd.read_csv(f"{imap_module_directory}/tests/ialirt/data/l0/{file_name}")
count_rates = energy_data["Count Rates [Hz]"].to_numpy()
count_rates[0] = 0.0
count_rates_errors = energy_data["Count Rates Error [Hz]"].to_numpy()

# setting errors to 0 results in infinite covariance
count_rates_errors *= 0

speed, density, temperature = optimize_pseudo_parameters(count_rates, count_rates_errors, energy_passbands)

expected_speed = np.sqrt(energy_passbands[count_rates.argmax(axis=-1)]) * Consts.speed_coeff

np.testing.assert_allclose(speed, expected_speed, rtol=1e-6)
np.testing.assert_allclose(density, FILLVAL_FLOAT32)
np.testing.assert_allclose(temperature, FILLVAL_FLOAT32)

def test_geometric_mean():
"""Test geometric_mean function."""

Expand Down