Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
95 changes: 76 additions & 19 deletions src/common/io.py
Original file line number Diff line number Diff line change
Expand Up @@ -3121,24 +3121,23 @@ def get_result(self) -> ResultDymolaBinary:
return ResultDymolaBinary(self._get_file_name())

class _DelayedVarReader4Diags(DelayedVarReader4):
def _create_section_data(self, section_name, hdr):
return {
"section": section_name,
"file_position": self.mat_stream.tell(),
"sizeof_type": hdr.dtype.itemsize,
"nbr_points": hdr.dims[1],
"nbr_variables": hdr.dims[0]
}

def read_sub_array(self, hdr, copy=True):
match hdr.name:
case b"data_2":
return {
"section": "data_2",
"file_position": self.mat_stream.tell(),
"sizeof_type": hdr.dtype.itemsize,
"nbr_points": hdr.dims[1],
"nbr_variables": hdr.dims[0]
}
return self._create_section_data("data_2", hdr)
case b"data_3":
return {
"section": "data_3",
"file_position": self.mat_stream.tell(),
"sizeof_type": hdr.dtype.itemsize,
"nbr_points": hdr.dims[1],
"nbr_variables": hdr.dims[0]
}
return self._create_section_data("data_3", hdr)
case b"data_4":
return self._create_section_data("data_4", hdr)
case b"name":
return {
"section": "name",
Expand Down Expand Up @@ -3177,18 +3176,20 @@ def __init__(self, fname):
elif hasattr(fname, "name") and os.path.isfile(fname.name):
self._fname = fname.name

data_sections = ["name", "dataInfo", "data_2", "data_3"]
with open(self._fname, "rb") as f:
delayed = _DelayedVariableLoadDiags(f, chars_as_strings=False)
self.raw: dict = delayed.get_variables(variable_names = data_sections)

self.raw = self._load_raw_data_info()
self._name_info: dict = self.raw["name"]
self._dataInfo: dict = self.raw["dataInfo"]
self._data_2_info: dict = self.raw["data_2"]
self._contains_diagnostic_data: bool = ("data_3" in self.raw)
if self._contains_diagnostic_data:
self._data_3_info: dict = self.raw["data_3"]

def _load_raw_data_info(self) -> dict:
data_sections = ["name", "dataInfo", "data_2", "data_3"]
with open(self._fname, "rb") as f:
delayed = _DelayedVariableLoadDiags(f, chars_as_strings=False)
return delayed.get_variables(variable_names = data_sections)

@cache
def _get_variable_name_to_index_dict(self) -> dict[str, int]:
name_dict: dict = fmi_util.read_name_list(
Expand Down Expand Up @@ -3283,6 +3284,62 @@ def _get_interpolated_trajectory(self, data_index: int) -> np.ndarray:
f = scipy.interpolate.interp1d(time_vector, data, fill_value="extrapolate")
return f(diag_time_vector)


class ResultReaderBinaryMat(ResultReader):
def __init__(self, fname, allow_file_updates=False):
"""
Load a .mat result file.

Parameters::

fname --
Name of file or a file object supported by scipy.io.loadmat,
which the result is written to.

allow_file_updates --
If this is True, file updates (in terms of more
data points being added to the result file) is allowed.
The number of variables stored in the file needs to be
exactly the same and only the number of data points for
the continuous variables are allowed to change.
Default: False
"""
self._delegate = self._get_delegate(fname, allow_file_updates)


def _get_delegate(self, fname, allow_file_updates: bool):
"""Determines what delegate to use based on input result data"""
try:
with open(fname, "rb") as f:
delayed = _DelayedVariableLoadDiags(f, chars_as_strings=False)
data_sections = ["data_3", "data_4"]
raw_data_info = delayed.get_variables(variable_names = data_sections)
except FileNotFoundError as e:
raise NoResultError(str(e)) from e

if raw_data_info.get("data_3") is not None and raw_data_info.get("data_4") is None:
# The result is 'consolidated' if 'data_3' exists but not 'data_4', meaning
# the dynamic diagnostic variable data exists in 'data_3'. This reader also
# handle results only containing 'data_2' but for now 'ResultDymolaBinary' is used.
# NOTE: Argument 'allow_file_updates' is ignored here. Consolidated results cannot
# be updated so it should not matter what value is given.
return _ResultReaderBinaryMatConsolidated(fname)
else:
return ResultDymolaBinary(
fname,
allow_file_updates=allow_file_updates,
)

def get_variable_names(self) -> list[str]:
return self._delegate.get_variable_names()

def get_trajectory(self, name: str) -> Trajectory:
return self._delegate.get_trajectory(name)

def get_trajectories(self, names: list[str]) -> dict[str, Trajectory]:
return self._delegate.get_trajectories(names)


def verify_result_size(file_name, first_point, current_size, previous_size, max_size, ncp, time):
free_space = get_available_disk_space(file_name)

Expand Down
90 changes: 63 additions & 27 deletions tests/test_io.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import re
from io import StringIO, BytesIO
from collections import OrderedDict
from typing import Protocol

from pyfmi import load_fmu
from scipy.io import savemat
Expand All @@ -45,7 +46,7 @@
ResultCSVTextual,
ResultHandlerBinaryFile,
ResultHandlerFile,
_ResultReaderBinaryMatConsolidated,
ResultReaderBinaryMat,
VariableNotFoundError,
ResultHandlerMemory,
Trajectory,
Expand Down Expand Up @@ -576,6 +577,31 @@ def coupled_clutches_cs_2_0():
def coupled_clutches_me_1_0():
return Dummy_FMUModelME1([], os.path.join(file_path, "files", "FMUs", "XML", "ME1.0", "CoupledClutches.fmu"), _connect_dll=False)

class ResultReaderFactory(Protocol):
def __call__(self, fname, delayed_trajectory_loading=True, allow_file_updates=False) -> ResultDymolaBinary | ResultReaderBinaryMat:
...

def create_result_dymola_binary_reader(fname, delayed_trajectory_loading=True, allow_file_updates=False):
return ResultDymolaBinary(
fname,
delayed_trajectory_loading=delayed_trajectory_loading,
allow_file_updates=allow_file_updates,
)

def create_result_reader_binary_mat(fname, delayed_trajectory_loading=True, allow_file_updates=False):
return ResultReaderBinaryMat(
fname,
allow_file_updates=allow_file_updates,
)

@pytest.fixture(params=[create_result_dymola_binary_reader, create_result_reader_binary_mat])
def result_reader_cls(request) -> ResultReaderFactory:
return request.param

@pytest.fixture
def double_pendulum_mat_file():
return os.path.join(file_path, "files", "Results", "DoublePendulum.mat")


@pytest.mark.assimulo
class TestResultFileBinary:
Expand Down Expand Up @@ -622,7 +648,7 @@ def test_get_description(self, coupled_clutches_me_1_0):

assert res.description[res.get_variable_index("J1.phi")] == "Absolute rotation angle of component"

def test_modified_result_file_data_diagnostics(self, coupled_clutches_me_2_0):
def test_modified_result_file_data_diagnostics(self, result_reader_cls: ResultReaderFactory, coupled_clutches_me_2_0):
"""Verify that computed diagnostics can be retrieved from an updated result file"""
model = coupled_clutches_me_2_0
model.setup_experiment()
Expand Down Expand Up @@ -661,7 +687,7 @@ def test_modified_result_file_data_diagnostics(self, coupled_clutches_me_2_0):
result_writer.integration_point()
result_writer.diagnostics_point(diag_data)

res = ResultDymolaBinary('CoupledClutches_result.mat', allow_file_updates=True)
res = result_reader_cls('CoupledClutches_result.mat', allow_file_updates=True)

assert len(res.get_trajectory("@Diagnostics.state_errors.clutch1.w_rel").x) == 2, res.get_trajectory("@Diagnostics.state_errors.clutch1.w_rel").x

Expand Down Expand Up @@ -812,7 +838,7 @@ def test_modified_result_file_data_1_delayed(self, coupled_clutches_me_2_0):
#Assert that no exception is raised
res.get_trajectory("J2.J")

def test_modified_result_file_time(self, coupled_clutches_me_2_0):
def test_modified_result_file_time(self, result_reader_cls: ResultReaderFactory, coupled_clutches_me_2_0):
"""Verify that 'time' can be retrieved from an updated result file"""
model = coupled_clutches_me_2_0
model.setup_experiment()
Expand All @@ -824,7 +850,7 @@ def test_modified_result_file_time(self, coupled_clutches_me_2_0):
result_writer.initialize_complete()
result_writer.integration_point()

res = ResultDymolaBinary('CoupledClutches_result.mat', allow_file_updates=True)
res = result_reader_cls('CoupledClutches_result.mat', allow_file_updates=True)

res.get_trajectory("time")

Expand Down Expand Up @@ -880,16 +906,16 @@ def test_overwriting_results(self, coupled_clutches_me_1_0):
with pytest.raises(JIOError):
res.get_trajectory("J1.phi")

def test_read_all_variables(self):
res = ResultDymolaBinary(os.path.join(file_path, "files", "Results", "DoublePendulum.mat"))
def test_read_all_variables(self, double_pendulum_mat_file: str, result_reader_cls: ResultReaderFactory):
res = result_reader_cls(double_pendulum_mat_file)

assert len(res.get_variable_names()) == 1097, "Incorrect number of variables found, should be 1097"

for var in res.get_variable_names():
res.get_trajectory(var)

def test_data_matrix_delayed_loading(self):
res = ResultDymolaBinary(os.path.join(file_path, "files", "Results", "DoublePendulum.mat"), delayed_trajectory_loading=True)
def test_data_matrix_delayed_loading(self, double_pendulum_mat_file: str):
res = ResultDymolaBinary(double_pendulum_mat_file, delayed_trajectory_loading=True)

data_matrix = res.get_data_matrix()

Expand All @@ -898,8 +924,8 @@ def test_data_matrix_delayed_loading(self):
assert nbr_continuous_variables == 68, "Number of variables is incorrect, should be 68"
assert nbr_points == 502, "Number of points is incorrect, should be 502"

def test_data_matrix_loading(self):
res = ResultDymolaBinary(os.path.join(file_path, "files", "Results", "DoublePendulum.mat"), delayed_trajectory_loading=False)
def test_data_matrix_loading(self, double_pendulum_mat_file: str):
res = ResultDymolaBinary(double_pendulum_mat_file, delayed_trajectory_loading=False)

data_matrix = res.get_data_matrix()

Expand All @@ -908,22 +934,22 @@ def test_data_matrix_loading(self):
assert nbr_continuous_variables == 68, "Number of variables is incorrect, should be 68"
assert nbr_points == 502, "Number of points is incorrect, should be 502"

def test_read_all_variables_from_stream(self):
def test_read_all_variables_from_stream(self, double_pendulum_mat_file: str):

with open(os.path.join(file_path, "files", "Results", "DoublePendulum.mat"), "rb") as f:
with open(double_pendulum_mat_file, "rb") as f:
res = ResultDymolaBinary(f)

assert len(res.get_variable_names()) == 1097, "Incorrect number of variables found, should be 1097"

for var in res.get_variable_names():
res.get_trajectory(var)

def test_compare_all_variables_from_stream(self):
res_file = ResultDymolaBinary(os.path.join(file_path, "files", "Results", "DoublePendulum.mat"))
def test_compare_all_variables_from_stream(self, double_pendulum_mat_file: str):
res_file = ResultDymolaBinary(double_pendulum_mat_file)

assert len(res_file.get_variable_names()) == 1097, "Incorrect number of variables found, should be 1097"

with open(os.path.join(file_path, "files", "Results", "DoublePendulum.mat"), "rb") as f:
with open(double_pendulum_mat_file, "rb") as f:
res_stream = ResultDymolaBinary(f)
assert len(res_stream.get_variable_names()) == 1097, "Incorrect number of variables found, should be 1097"

Expand All @@ -933,9 +959,9 @@ def test_compare_all_variables_from_stream(self):

np.testing.assert_array_equal(x_file.x, x_stream.x, err_msg="Mismatch in array values for var=%s"%var)

def test_on_demand_loading_32_bits(self):
res_demand = ResultDymolaBinary(os.path.join(file_path, "files", "Results", "DoublePendulum.mat"))
res_all = ResultDymolaBinary(os.path.join(file_path, "files", "Results", "DoublePendulum.mat"))
def test_on_demand_loading_32_bits(self, double_pendulum_mat_file: str, result_reader_cls: ResultReaderFactory):
res_demand = result_reader_cls(double_pendulum_mat_file, delayed_trajectory_loading=True)
res_all = result_reader_cls(double_pendulum_mat_file, delayed_trajectory_loading=False)
t_demand = res_demand.get_trajectory('time').x
t_all = res_all.get_trajectory('time').x
np.testing.assert_array_equal(t_demand, t_all, "On demand loaded result and all loaded does not contain equal result.")
Expand Down Expand Up @@ -2406,13 +2432,13 @@ def mat_file_singular_data(tmp_path):

class TestResultReaderForBinaryMatConsolidated:
def test_get_all_variable_names(self, mat_file):
result = _ResultReaderBinaryMatConsolidated(mat_file)
result = ResultReaderBinaryMat(mat_file)
variables = result.get_variable_names()
expected = {"spring.phi_nominal", "spring.k_constant", "time", "torque.flange.phi", "@Diagnostics.step_time", "@Diagnostics.error_code"}
assert set(variables) == expected

def test_get_values_assert_valid(self, mat_file):
result = _ResultReaderBinaryMatConsolidated(mat_file)
result = ResultReaderBinaryMat(mat_file)

# Test spring.phi_nominal (data_1, constant value)
traj_phi = result.get_trajectory("spring.phi_nominal")
Expand Down Expand Up @@ -2443,7 +2469,7 @@ def test_get_values_assert_valid(self, mat_file):
assert np.allclose(traj_error.x, [0.0, 1.0, 0.0])

def test_get_data_only_one_len(self, mat_file_singular_data):
result = _ResultReaderBinaryMatConsolidated(mat_file_singular_data)
result = ResultReaderBinaryMat(mat_file_singular_data)

# Test time variable
traj_time = result.get_trajectory("time")
Expand All @@ -2458,25 +2484,35 @@ def test_get_data_only_one_len(self, mat_file_singular_data):
assert np.allclose(traj_phi.x, [1.0])

def test_get_all_non_existing_variable_throws(self, mat_file):
result = _ResultReaderBinaryMatConsolidated(mat_file)
result = ResultReaderBinaryMat(mat_file)
with pytest.raises(VariableNotFoundError):
result.get_trajectory("does.not.exist")

def test_get_trajectories_from_all_matrices(self, mat_file):
result = _ResultReaderBinaryMatConsolidated(mat_file)
result = ResultReaderBinaryMat(mat_file)
for var in ["spring.phi_nominal", "torque.flange.phi", "@Diagnostics.step_time"]:
assert result.get_trajectory(var) is not None

def test_with_diagnostic_variable(self, mat_file):
result = _ResultReaderBinaryMatConsolidated(mat_file)
result = ResultReaderBinaryMat(mat_file)
assert result.get_trajectory("@Diagnostics.step_time") is not None

def test_without_diagnostic_variable(self, mat_file_no_diag):
result_no_diag = _ResultReaderBinaryMatConsolidated(mat_file_no_diag)
result_no_diag = ResultReaderBinaryMat(mat_file_no_diag)
assert "@Diagnostics.step_time" not in result_no_diag.get_variable_names()

def test_without_diagnostic_variable_delegates(self, mat_file_no_diag):
result = ResultReaderBinaryMat(mat_file_no_diag)
assert result.get_trajectory("time") is not None
assert result.get_trajectories(["time"])["time"] is not None

def test_result_does_not_exist_raises_error(self):
with pytest.raises(NoResultError):
ResultReaderBinaryMat("does-not-exists")


def test_interpolation_between_points(mat_file_interpolation):
result = _ResultReaderBinaryMatConsolidated(mat_file_interpolation)
result = ResultReaderBinaryMat(mat_file_interpolation)
traj = result.get_trajectory("spring.phi_nominal")

assert np.allclose(traj.t, [0.0, 0.5, 1.0, 1.5, 2.0])
Expand Down