25 changes: 18 additions & 7 deletions imap_processing/cli.py
@@ -13,6 +13,7 @@
from __future__ import annotations

import argparse
import json
import logging
import re
import sys
@@ -672,6 +673,21 @@ def do_processing(
f"{science_files}."
)
input_dataset = load_cdf(science_files[0])

# Load conversion table (needed for both hist and DE)
conversion_table_file = dependencies.get_processing_inputs(
descriptor="conversion-table-for-anc-data"
)[0]

with open(conversion_table_file.imap_file_paths[0].construct_path()) as f:
conversion_table_dict = json.load(f)

# Use end date buffer for ancillary data
current_day = np.datetime64(
f"{self.start_date[:4]}-{self.start_date[4:6]}-{self.start_date[6:]}"
)
day_buffer = current_day + np.timedelta64(3, "D")

if "hist" in self.descriptor:
# Create file lists for each ancillary type
excluded_regions_files = dependencies.get_processing_inputs(
@@ -690,12 +706,6 @@
descriptor="pipeline-settings"
)[0]

# Use end date buffer for ancillary data
current_day = np.datetime64(
f"{self.start_date[:4]}-{self.start_date[4:6]}-{self.start_date[6:]}"
)
day_buffer = current_day + np.timedelta64(3, "D")

# Create combiners for each ancillary dataset
excluded_regions_combiner = GlowsAncillaryCombiner(
excluded_regions_files, day_buffer
@@ -721,11 +731,12 @@
suspected_transients_combiner.combined_dataset,
exclusions_by_instr_team_combiner.combined_dataset,
pipeline_settings_combiner.combined_dataset,
conversion_table_dict,
)
]
else:
# Direct events
datasets = [glows_l1b_de(input_dataset)]
datasets = [glows_l1b_de(input_dataset, conversion_table_dict)]

if self.data_level == "l2":
science_files = dependencies.get_file_paths(source="glows")
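Note on the cli.py change: the conversion-table load and the three-day ancillary buffer now sit above the `if "hist"` branch, so both the histogram and direct-events paths reuse them. A minimal sketch of that flow, with a local JSON path and a hard-coded start date standing in for the `dependencies.get_processing_inputs(...)` lookup shown above:

```python
import json

import numpy as np

# Stand-in for conversion_table_file.imap_file_paths[0].construct_path();
# in the CLI the path comes from the dependencies collection.
with open("l1b_conversion_table_v001.json") as f:
    conversion_table_dict = json.load(f)

# Parse the YYYYMMDD start date into a datetime64 day, then pad by three
# days so ancillary validity windows cover the end of the processed day.
start_date = "20250101"  # hypothetical; the CLI uses self.start_date
current_day = np.datetime64(f"{start_date[:4]}-{start_date[4:6]}-{start_date[6:]}")
day_buffer = current_day + np.timedelta64(3, "D")
print(day_buffer)  # 2025-01-04
```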
60 changes: 47 additions & 13 deletions imap_processing/glows/l1b/glows_l1b.py
@@ -1,8 +1,6 @@
"""Methods for processing GLOWS L1B data."""

import dataclasses
import json
from pathlib import Path

import numpy as np
import xarray as xr
@@ -26,6 +24,7 @@ def glows_l1b(
suspected_transients: xr.Dataset,
exclusions_by_instr_team: xr.Dataset,
pipeline_settings_dataset: xr.Dataset,
conversion_table_dict: dict,
) -> xr.Dataset:
"""
Will process the histogram GLOWS L1B data and format the output datasets.
@@ -47,8 +46,10 @@
Dataset containing manual exclusions by instrument team with time-based masks.
This is the output from GlowsAncillaryCombiner.
pipeline_settings_dataset : xr.Dataset
Dataset containing pipeline settings, including the L1B conversion table and
other ancillary parameters.
Dataset containing pipeline settings and other ancillary parameters.
conversion_table_dict : dict
Dict containing the L1B conversion table for decoding ancillary parameters.
This is read directly from the JSON file.

Returns
-------
@@ -72,10 +73,7 @@
pipeline_settings_dataset.sel(epoch=day, method="nearest"),
)

with open(
Path(__file__).parents[1] / "ancillary" / "l1b_conversion_table_v001.json"
) as f:
ancillary_parameters = AncillaryParameters(json.loads(f.read()))
ancillary_parameters = AncillaryParameters(conversion_table_dict)

output_dataarrays = process_histogram(
input_dataset, ancillary_exclusions, ancillary_parameters, pipeline_settings
@@ -89,6 +87,7 @@

def glows_l1b_de(
input_dataset: xr.Dataset,
conversion_table_dict: dict,
) -> xr.Dataset:
"""
Process GLOWS L1B direct events data.
@@ -97,6 +96,8 @@
----------
input_dataset : xr.Dataset
The input dataset to process.
conversion_table_dict : dict
Dict containing the L1B conversion table for decoding ancillary parameters.

Returns
-------
@@ -107,12 +108,18 @@
cdf_attrs.add_instrument_global_attrs("glows")
cdf_attrs.add_instrument_variable_attrs("glows", "l1b")

output_dataset = create_l1b_de_output(input_dataset, cdf_attrs)
ancillary_parameters = AncillaryParameters(conversion_table_dict)

output_dataset = create_l1b_de_output(
input_dataset, cdf_attrs, ancillary_parameters
)

return output_dataset


def process_de(l1a: xr.Dataset) -> tuple[xr.DataArray]:
def process_de(
l1a: xr.Dataset, ancillary_parameters: AncillaryParameters
) -> tuple[xr.DataArray]:
"""
Will process the direct event data from the L1A dataset and return the L1B dataset.

@@ -126,6 +133,8 @@
----------
l1a : xr.Dataset
The L1A dataset to process.
ancillary_parameters : AncillaryParameters
The ancillary parameters for decoding DE data.

Returns
-------
@@ -164,8 +173,29 @@
# (input) variable.
input_dims[0] = ["within_the_second", "direct_event_components"]

# Create a closure that captures the ancillary parameters
def create_direct_event_l1b(*args) -> tuple: # type: ignore[no-untyped-def]
"""
Create DirectEventL1B object with captured ancillary parameters.

Parameters
----------
*args
Variable arguments passed from xr.apply_ufunc containing L1A data.

Returns
-------
tuple
Tuple of values from DirectEventL1B dataclass.
"""
return tuple(
dataclasses.asdict(
DirectEventL1B(*args, ancillary_parameters) # type: ignore[call-arg]
).values()
)

l1b_fields: tuple = xr.apply_ufunc(
lambda *args: tuple(dataclasses.asdict(DirectEventL1B(*args)).values()),
create_direct_event_l1b,
*dataarrays,
input_core_dims=input_dims,
output_core_dims=output_dims,
@@ -387,7 +417,9 @@ def create_l1b_hist_output(


def create_l1b_de_output(
input_dataset: xr.Dataset, cdf_attrs: ImapCdfAttributes
input_dataset: xr.Dataset,
cdf_attrs: ImapCdfAttributes,
ancillary_parameters: AncillaryParameters,
) -> xr.Dataset:
"""
Create the output dataset for the L1B direct event data.
@@ -398,6 +430,8 @@
The input dataset to process.
cdf_attrs : ImapCdfAttributes
The CDF attributes to use for the output dataset.
ancillary_parameters : AncillaryParameters
The ancillary parameters for decoding DE data.

Returns
-------
@@ -407,7 +441,7 @@
data_epoch = input_dataset["epoch"]
data_epoch.attrs = cdf_attrs.get_variable_attributes("epoch", check_schema=False)

output_dataarrays = process_de(input_dataset)
output_dataarrays = process_de(input_dataset, ancillary_parameters)
within_the_second_data = xr.DataArray(
input_dataset["within_the_second"],
name="within_the_second",
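The key change in `process_de` is replacing the bare lambda with a named closure: `xr.apply_ufunc` only broadcasts the arguments it is given core dims for, so the `AncillaryParameters` object is captured from the enclosing scope rather than threaded through as another input. A self-contained sketch of the same pattern with a toy dataclass (names and values are illustrative, not the GLOWS fields):

```python
import dataclasses

import numpy as np
import xarray as xr


@dataclasses.dataclass
class Decoded:
    """Toy stand-in for DirectEventL1B: InitVars in, computed fields out."""

    raw: dataclasses.InitVar[np.ndarray]
    factor: dataclasses.InitVar[float]
    total: np.double = dataclasses.field(init=False, default=np.double(0))
    mean: np.double = dataclasses.field(init=False, default=np.double(0))

    def __post_init__(self, raw: np.ndarray, factor: float) -> None:
        self.total = np.double(raw.sum() * factor)
        self.mean = np.double(raw.mean() * factor)


scale = 2.5  # stands in for the ancillary_parameters object


def create_decoded(*args):
    # Closure over `scale`, mirroring create_direct_event_l1b's capture of
    # ancillary_parameters: apply_ufunc only ever sees the L1A arguments.
    return tuple(dataclasses.asdict(Decoded(*args, scale)).values())


ds = xr.Dataset({"counts": (("epoch", "bins"), np.arange(6.0).reshape(2, 3))})
total, mean = xr.apply_ufunc(
    create_decoded,
    ds["counts"],
    input_core_dims=[["bins"]],
    output_core_dims=[[], []],
    vectorize=True,
)
print(total.values, mean.values)  # [ 7.5 30. ] [ 2.5 10. ]
```

Because the closure returns a plain tuple of the dataclass's computed fields, `apply_ufunc` unpacks one output array per entry in `output_core_dims`, which is how `DirectEventL1B`'s fields become L1B DataArrays in the diff above.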
54 changes: 33 additions & 21 deletions imap_processing/glows/l1b/glows_l1b_data.py
@@ -1,9 +1,7 @@
"""Module for GLOWS L1B data products."""

import dataclasses
import json
from dataclasses import InitVar, dataclass, field
from pathlib import Path

import numpy as np
import xarray as xr
@@ -231,13 +229,15 @@ class AncillaryParameters:
"""
GLOWS L1B Ancillary Parameters for decoding ancillary histogram data points.

This class reads from a JSON file input which defines ancillary parameters.
It validates to ensure the input file has all the required parameters.
This class reads from either a dict (JSON input) or an xarray Dataset (from
GlowsAncillaryCombiner) that defines the ancillary parameters. It validates
the input to ensure all required parameters are present.

Parameters
----------
input_table : dict
Dictionary generated from input JSON file.
Dictionary generated from the input JSON file, or an xarray Dataset from
GlowsAncillaryCombiner containing the conversion-table data.

Attributes
----------
@@ -258,14 +258,28 @@
"p01", "p02", "p03", "p04"]
"""

def __init__(self, input_table: dict):
def __init__(self, input_table: dict) -> None:
"""
Generate ancillary parameters from the given input.

Validates parameters and raises a KeyError if the input data is incorrect.

Parameters
----------
input_table : dict
Dictionary containing conversion parameters.
"""
full_keys = ["min", "max", "n_bits", "p01", "p02", "p03", "p04"]
spin_keys = ["min", "max", "n_bits"]
full_keys = [
"min",
"max",
"n_bits",
"p01",
"p02",
"p03",
"p04",
"physical_unit",
]
spin_keys = ["min", "max", "n_bits", "physical_unit"]

try:
self.version = input_table["version"]
@@ -425,6 +439,8 @@ class DirectEventL1B:
Flag for pulse test in progress, ends up in flags array
memory_error_detected: InitVar[np.double]
Flag for memory error detected, ends up in flags array
ancillary_parameters: InitVar[AncillaryParameters]
The ancillary parameters for decoding DE data
flags: ndarray
array of flags for extra information, per histogram. This is assembled from
L1A variables.
@@ -460,6 +476,7 @@
hv_test_in_progress: InitVar[np.double]
pulse_test_in_progress: InitVar[np.double]
memory_error_detected: InitVar[np.double]
ancillary_parameters: InitVar[AncillaryParameters]
# The following variables are created from the InitVar data
de_flags: np.ndarray | None = field(init=False, default=None)
# TODO: First two values of DE are sec/subsec
@@ -483,6 +500,7 @@ def __post_init__(
hv_test_in_progress: np.double,
pulse_test_in_progress: np.double,
memory_error_detected: np.double,
ancillary_parameters: AncillaryParameters,
) -> None:
"""
Generate the L1B data for direct events using the inputs from InitVar.
@@ -515,6 +533,8 @@
Flag indicating if a pulse test is in progress.
memory_error_detected : np.double
Flag indicating if a memory error is detected.
ancillary_parameters : AncillaryParameters
The ancillary parameters for decoding DE data.
"""
self.direct_event_glows_times, self.direct_event_pulse_lengths = (
self.process_direct_events(direct_events)
@@ -530,22 +550,14 @@
int(self.glows_time_last_pps), glows_ssclk_last_pps
).to_seconds()

with open(
Path(__file__).parents[1] / "ancillary" / "l1b_conversion_table_v001.json"
) as f:
self.ancillary_parameters = AncillaryParameters(json.loads(f.read()))

self.filter_temperature = self.ancillary_parameters.decode(
# Use passed-in ancillary parameters instead of loading from file
self.filter_temperature = ancillary_parameters.decode(
"filter_temperature", self.filter_temperature
)
self.hv_voltage = self.ancillary_parameters.decode(
"hv_voltage", self.hv_voltage
)
self.spin_period = self.ancillary_parameters.decode(
"spin_period", self.spin_period
)
self.hv_voltage = ancillary_parameters.decode("hv_voltage", self.hv_voltage)
self.spin_period = ancillary_parameters.decode("spin_period", self.spin_period)

self.spin_phase_at_next_pps = self.ancillary_parameters.decode(
self.spin_phase_at_next_pps = ancillary_parameters.decode(
"spin_phase", self.spin_phase_at_next_pps
)

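For reference, the validation that the widened key lists feed looks roughly like the sketch below. The `version` lookup and the per-parameter `KeyError` mirror the diff; the dispatch between `full_keys` and `spin_keys` and the linear decode are hypothetical stand-ins (the real `AncillaryParameters.decode` has the table's `p01`–`p04` coefficients available; the linear mapping here is only illustrative):

```python
class ConversionTable:
    """Toy stand-in for AncillaryParameters' key validation."""

    FULL_KEYS = ("min", "max", "n_bits", "p01", "p02", "p03", "p04", "physical_unit")
    SPIN_KEYS = ("min", "max", "n_bits", "physical_unit")

    def __init__(self, input_table: dict) -> None:
        try:
            self.version = input_table["version"]
            self.params: dict[str, dict] = {}
            for name, table in input_table.items():
                if name == "version":
                    continue
                # Hypothetical dispatch: spin entries need fewer keys.
                required = self.SPIN_KEYS if "spin" in name else self.FULL_KEYS
                missing = [key for key in required if key not in table]
                if missing:
                    raise KeyError(f"{name} is missing required keys: {missing}")
                self.params[name] = table
        except KeyError as err:
            raise KeyError("Conversion table input is malformed") from err

    def decode(self, name: str, raw: float) -> float:
        # Illustrative linear raw-to-physical mapping over the n_bits range.
        table = self.params[name]
        span = table["max"] - table["min"]
        return table["min"] + raw * span / (2 ** table["n_bits"] - 1)


table = {
    "version": "001",
    "spin_period": {"min": 0.0, "max": 30.0, "n_bits": 16, "physical_unit": "s"},
}
print(ConversionTable(table).decode("spin_period", 32768))  # ~15.0002
```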