-
Notifications
You must be signed in to change notification settings - Fork 2
Update waveform conversion code to account for new timing fields #333
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
3f79f5a
a65e5f9
e6e7084
cd35f0c
2a7d797
e2f903e
fc41c89
ef9d137
58482be
0c09a04
b70813e
714184e
01dd36e
c455ce4
1837d87
9645eaa
5d15277
aea5175
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -18,3 +18,6 @@ dist/ | |
|
|
||
| # mypy | ||
| .mypy_cache/ | ||
|
|
||
| # vscode settings | ||
| .vscode/ | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -2,15 +2,15 @@ | |
|
|
||
| from __future__ import annotations | ||
|
|
||
| import datetime as dt | ||
| from collections.abc import Mapping | ||
| from collections.abc import Iterable, Mapping | ||
| from typing import Any | ||
|
|
||
| import hightime as ht | ||
| import nitypes.bintime as bt | ||
| import numpy as np | ||
| from nitypes.complex import ComplexInt32Base, ComplexInt32DType | ||
| from nitypes.time import convert_datetime | ||
| from nitypes.time.typing import AnyDateTime, AnyTimeDelta | ||
| from nitypes.waveform import ( | ||
| AnalogWaveform, | ||
| ComplexWaveform, | ||
|
|
@@ -43,17 +43,24 @@ def float64_analog_waveform_to_protobuf( | |
| value: AnalogWaveform[np.float64], / | ||
| ) -> DoubleAnalogWaveform: | ||
| """Convert the Python AnalogWaveform to a protobuf DoubleAnalogWaveform.""" | ||
| _validate_timing(value) | ||
| t0 = _t0_from_waveform(value) | ||
| time_interval = _time_interval_from_waveform(value) | ||
| attributes = _extended_properties_to_attributes(value.extended_properties) | ||
|
|
||
| return DoubleAnalogWaveform( | ||
| t0=t0, | ||
| dt=time_interval, | ||
| y_data=value.scaled_data, | ||
| attributes=attributes, | ||
| ) | ||
| if value.timing.sample_interval_mode in [SampleIntervalMode.REGULAR, SampleIntervalMode.NONE]: | ||
| return DoubleAnalogWaveform( | ||
| t0=_t0_from_waveform(value), | ||
| dt=_time_interval_from_waveform(value), | ||
| y_data=value.scaled_data, | ||
| attributes=attributes, | ||
| timestamp=_timestamp_from_waveform(value), | ||
| time_offset=_time_offset_from_waveform(value), | ||
| ) | ||
| elif value.timing.sample_interval_mode == SampleIntervalMode.IRREGULAR: | ||
| return DoubleAnalogWaveform( | ||
| y_data=value.scaled_data, | ||
| attributes=attributes, | ||
| timestamps=_timestamps_from_waveform(value), | ||
| ) | ||
| else: | ||
| raise ValueError(f"Invalid sample interval mode: {value.timing.sample_interval_mode}") | ||
|
|
||
|
|
||
| def float64_analog_waveform_from_protobuf( | ||
|
|
@@ -76,19 +83,25 @@ def float64_complex_waveform_to_protobuf( | |
| value: ComplexWaveform[np.complex128], / | ||
| ) -> DoubleComplexWaveform: | ||
| """Convert the Python ComplexWaveform to a protobuf DoubleComplexWaveform.""" | ||
| _validate_timing(value) | ||
| t0 = _t0_from_waveform(value) | ||
| time_interval = _time_interval_from_waveform(value) | ||
| attributes = _extended_properties_to_attributes(value.extended_properties) | ||
|
|
||
| interleaved_array = value.scaled_data.view(np.float64) | ||
|
|
||
| return DoubleComplexWaveform( | ||
| t0=t0, | ||
| dt=time_interval, | ||
| y_data=interleaved_array, | ||
| attributes=attributes, | ||
| ) | ||
| if value.timing.sample_interval_mode in [SampleIntervalMode.REGULAR, SampleIntervalMode.NONE]: | ||
| return DoubleComplexWaveform( | ||
| t0=_t0_from_waveform(value), | ||
| dt=_time_interval_from_waveform(value), | ||
| y_data=interleaved_array, | ||
| attributes=attributes, | ||
| timestamp=_timestamp_from_waveform(value), | ||
| time_offset=_time_offset_from_waveform(value), | ||
| ) | ||
| elif value.timing.sample_interval_mode == SampleIntervalMode.IRREGULAR: | ||
| return DoubleComplexWaveform( | ||
| y_data=interleaved_array, | ||
| attributes=attributes, | ||
| timestamps=_timestamps_from_waveform(value), | ||
| ) | ||
| else: | ||
| raise ValueError(f"Invalid sample interval mode: {value.timing.sample_interval_mode}") | ||
|
|
||
|
|
||
| def float64_complex_waveform_from_protobuf( | ||
|
|
@@ -114,21 +127,28 @@ def int16_complex_waveform_to_protobuf( | |
| value: ComplexWaveform[ComplexInt32Base], / | ||
| ) -> I16ComplexWaveform: | ||
| """Convert the Python ComplexWaveform to a protobuf DoubleComplexWaveform.""" | ||
| _validate_timing(value) | ||
| t0 = _t0_from_waveform(value) | ||
| time_interval = _time_interval_from_waveform(value) | ||
| attributes = _extended_properties_to_attributes(value.extended_properties) | ||
| scale = _scale_from_waveform(value) | ||
|
|
||
| interleaved_array = value.raw_data.view(np.int16) | ||
|
|
||
| return I16ComplexWaveform( | ||
| t0=t0, | ||
| dt=time_interval, | ||
| y_data=interleaved_array, | ||
| attributes=attributes, | ||
| scale=scale, | ||
| ) | ||
| if value.timing.sample_interval_mode in [SampleIntervalMode.REGULAR, SampleIntervalMode.NONE]: | ||
| return I16ComplexWaveform( | ||
| t0=_t0_from_waveform(value), | ||
| dt=_time_interval_from_waveform(value), | ||
| y_data=interleaved_array, | ||
| attributes=attributes, | ||
| scale=scale, | ||
| timestamp=_timestamp_from_waveform(value), | ||
| time_offset=_time_offset_from_waveform(value), | ||
| ) | ||
| elif value.timing.sample_interval_mode == SampleIntervalMode.IRREGULAR: | ||
| return I16ComplexWaveform( | ||
| y_data=interleaved_array, | ||
| attributes=attributes, | ||
| scale=scale, | ||
| timestamps=_timestamps_from_waveform(value), | ||
| ) | ||
| else: | ||
| raise ValueError(f"Invalid sample interval mode: {value.timing.sample_interval_mode}") | ||
|
|
||
|
|
||
| def int16_complex_waveform_from_protobuf( | ||
|
|
@@ -176,19 +196,27 @@ def float64_spectrum_from_protobuf(message: DoubleSpectrum, /) -> Spectrum[np.fl | |
|
|
||
| def int16_analog_waveform_to_protobuf(value: AnalogWaveform[np.int16], /) -> I16AnalogWaveform: | ||
| """Convert the Python AnalogWaveform to a protobuf I16AnalogWaveform.""" | ||
| _validate_timing(value) | ||
| t0 = _t0_from_waveform(value) | ||
| time_interval = _time_interval_from_waveform(value) | ||
| attributes = _extended_properties_to_attributes(value.extended_properties) | ||
| scale = _scale_from_waveform(value) | ||
|
|
||
| return I16AnalogWaveform( | ||
| t0=t0, | ||
| dt=time_interval, | ||
| y_data=value.raw_data, | ||
| attributes=attributes, | ||
| scale=scale, | ||
| ) | ||
| attributes = _extended_properties_to_attributes(value.extended_properties) | ||
| if value.timing.sample_interval_mode in [SampleIntervalMode.REGULAR, SampleIntervalMode.NONE]: | ||
| return I16AnalogWaveform( | ||
| t0=_t0_from_waveform(value), | ||
| dt=_time_interval_from_waveform(value), | ||
| y_data=value.raw_data, | ||
| attributes=attributes, | ||
| scale=scale, | ||
| timestamp=_timestamp_from_waveform(value), | ||
| time_offset=_time_offset_from_waveform(value), | ||
| ) | ||
| elif value.timing.sample_interval_mode == SampleIntervalMode.IRREGULAR: | ||
| return I16AnalogWaveform( | ||
| y_data=value.raw_data, | ||
| attributes=attributes, | ||
| scale=scale, | ||
| timestamps=_timestamps_from_waveform(value), | ||
| ) | ||
| else: | ||
| raise ValueError(f"Invalid sample interval mode: {value.timing.sample_interval_mode}") | ||
|
|
||
|
|
||
| def int16_analog_waveform_from_protobuf(message: I16AnalogWaveform, /) -> AnalogWaveform[np.int16]: | ||
|
|
@@ -208,18 +236,26 @@ def int16_analog_waveform_from_protobuf(message: I16AnalogWaveform, /) -> Analog | |
|
|
||
| def digital_waveform_to_protobuf(value: DigitalWaveform[Any], /) -> DigitalWaveformProto: | ||
| """Convert the Python DigitalWaveform to a protobuf DigitalWaveform.""" | ||
| _validate_timing(value) | ||
| t0 = _t0_from_waveform(value) | ||
| time_interval = _time_interval_from_waveform(value) | ||
| attributes = _extended_properties_to_attributes(value.extended_properties) | ||
|
|
||
| return DigitalWaveformProto( | ||
| t0=t0, | ||
| dt=time_interval, | ||
| signal_count=value.signal_count, | ||
| y_data=value.data.tobytes(), | ||
| attributes=attributes, | ||
| ) | ||
| if value.timing.sample_interval_mode in [SampleIntervalMode.REGULAR, SampleIntervalMode.NONE]: | ||
| return DigitalWaveformProto( | ||
| t0=_t0_from_waveform(value), | ||
| dt=_time_interval_from_waveform(value), | ||
| signal_count=value.signal_count, | ||
| y_data=value.data.tobytes(), | ||
| attributes=attributes, | ||
| timestamp=_timestamp_from_waveform(value), | ||
| time_offset=_time_offset_from_waveform(value), | ||
| ) | ||
| elif value.timing.sample_interval_mode == SampleIntervalMode.IRREGULAR: | ||
| return DigitalWaveformProto( | ||
| signal_count=value.signal_count, | ||
| y_data=value.data.tobytes(), | ||
| attributes=attributes, | ||
| timestamps=_timestamps_from_waveform(value), | ||
| ) | ||
| else: | ||
| raise AttributeError(f"Invalid sample interval mode{value.timing.sample_interval_mode}") | ||
|
mjohanse-emr marked this conversation as resolved.
|
||
|
|
||
|
|
||
| def digital_waveform_from_protobuf(message: DigitalWaveformProto, /) -> DigitalWaveform[np.uint8]: | ||
|
|
@@ -280,13 +316,6 @@ def _value_to_attribute(value: ExtendedPropertyValue) -> WaveformAttributeValue: | |
| return attr_value | ||
|
|
||
|
|
||
| def _validate_timing( | ||
| waveform: AnalogWaveform[Any] | ComplexWaveform[Any] | DigitalWaveform[Any], | ||
| ) -> None: | ||
| if waveform.timing.sample_interval_mode == SampleIntervalMode.IRREGULAR: | ||
| raise ValueError("Cannot convert irregular sample interval to protobuf.") | ||
|
|
||
|
|
||
| def _t0_from_waveform( | ||
| waveform: AnalogWaveform[Any] | ComplexWaveform[Any] | DigitalWaveform[Any], | ||
| ) -> PrecisionTimestamp | None: | ||
|
|
@@ -297,6 +326,37 @@ def _t0_from_waveform( | |
| return None | ||
|
|
||
|
|
||
| def _timestamp_from_waveform( | ||
| waveform: AnalogWaveform[Any] | ComplexWaveform[Any] | DigitalWaveform[Any], | ||
| ) -> PrecisionTimestamp | None: | ||
| if waveform.timing.has_timestamp: | ||
| bin_datetime = convert_datetime(bt.DateTime, waveform.timing.timestamp) | ||
| return ptc.bintime_datetime_to_protobuf(bin_datetime) | ||
| else: | ||
| return None | ||
|
|
||
|
|
||
| def _time_offset_from_waveform( | ||
| waveform: AnalogWaveform[Any] | ComplexWaveform[Any] | DigitalWaveform[Any], | ||
| ) -> float: | ||
| if waveform.timing.has_time_offset: | ||
| return waveform.timing.time_offset.total_seconds() | ||
| else: | ||
| return 0 | ||
|
|
||
|
|
||
| def _timestamps_from_waveform( | ||
| waveform: AnalogWaveform[Any] | ComplexWaveform[Any] | DigitalWaveform[Any], | ||
| ) -> Iterable[PrecisionTimestamp] | None: | ||
| if waveform.timing.sample_interval_mode == SampleIntervalMode.IRREGULAR: | ||
| timestamps = waveform.timing.get_timestamps(0, waveform.sample_count) | ||
| return [ | ||
| ptc.bintime_datetime_to_protobuf(convert_datetime(bt.DateTime, ts)) for ts in timestamps | ||
| ] | ||
| else: | ||
| return None | ||
|
|
||
|
|
||
| def _time_interval_from_waveform( | ||
| waveform: AnalogWaveform[Any] | ComplexWaveform[Any] | DigitalWaveform[Any], | ||
| ) -> float: | ||
|
|
@@ -314,29 +374,104 @@ def _timing_from_waveform_message( | |
| | I16ComplexWaveform | ||
| | DigitalWaveformProto | ||
| ), | ||
| ) -> Timing[bt.DateTime | dt.datetime]: | ||
| # Declare timing to accept both bintime and dt.datetime to satisfy mypy. | ||
| timing: Timing[bt.DateTime | dt.datetime] | ||
| if not message.dt and not message.HasField("t0"): | ||
| ) -> Timing[AnyDateTime, AnyTimeDelta, AnyTimeDelta]: | ||
| timing: Timing[AnyDateTime, AnyTimeDelta, AnyTimeDelta] | ||
| _check_regular_vs_irregular_fields(message) | ||
| if message.timestamps: | ||
| timestamps_list = [ptc.bintime_datetime_from_protobuf(ts) for ts in message.timestamps] | ||
| timing = Timing.create_with_irregular_interval(timestamps_list) | ||
| elif not message.dt and not message.HasField("t0"): | ||
|
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This doesn't exactly have an analog in the C# implementation, but it was already here, so I'm leaving it. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Perhaps this should also be updated to also check that the offset and timestamp fields are also zero or not set? It is valid to set an offset without any timestamp data. Whether setting an offset without also setting dt is perhaps questionable, but I don't know if we should reject it either. |
||
| # If both dt and t0 are unset, use Timing.empty. | ||
| timing = Timing.empty | ||
|
Comment on lines
+383
to
385
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This discards time_offset. Look at CreateNoIntervalPrecisionTiming in the C# version. |
||
| else: | ||
| # Timestamp | ||
| bin_datetime = ptc.bintime_datetime_from_protobuf(message.t0) | ||
| # Timestamp/T0 | ||
| raw_timestamp = _calculate_raw_timestamp(message) | ||
| bin_datetime: bt.DateTime | None | ||
| if raw_timestamp: | ||
| bin_datetime = ptc.bintime_datetime_from_protobuf(raw_timestamp) | ||
| else: | ||
| bin_datetime = None | ||
| if message.time_offset: | ||
| time_offset = ht.timedelta(seconds=message.time_offset) | ||
| else: | ||
| time_offset = None | ||
|
|
||
| # Sample Interval | ||
| if not message.dt: | ||
| timing = Timing.create_with_no_interval(timestamp=bin_datetime) | ||
| timing = Timing.create_with_no_interval(timestamp=bin_datetime, time_offset=time_offset) | ||
|
mjohanse-emr marked this conversation as resolved.
|
||
| else: | ||
| sample_interval = ht.timedelta(seconds=message.dt) | ||
| timing = Timing.create_with_regular_interval( | ||
| sample_interval=sample_interval, | ||
| timestamp=bin_datetime, | ||
| time_offset=time_offset, | ||
| ) | ||
|
|
||
| return timing | ||
|
|
||
|
|
||
| def _check_regular_vs_irregular_fields( | ||
| message: ( | ||
| DoubleAnalogWaveform | ||
| | DoubleComplexWaveform | ||
| | I16AnalogWaveform | ||
| | I16ComplexWaveform | ||
| | DigitalWaveformProto | ||
| ), | ||
| ) -> None: | ||
| has_any_regular_timing_fields = ( | ||
| message.dt or message.time_offset or message.HasField("t0") or message.HasField("timestamp") | ||
| ) | ||
| if message.timestamps and has_any_regular_timing_fields: | ||
| raise ValueError( | ||
| "Waveform message has mutually exclusive timing fields set: " | ||
| "`timestamps` cannot be used together with `t0`, `timestamp`, " | ||
| "`time_offset`, or `dt`." | ||
| ) | ||
|
|
||
|
|
||
| def _calculate_raw_timestamp( | ||
| message: ( | ||
| DoubleAnalogWaveform | ||
| | DoubleComplexWaveform | ||
| | I16AnalogWaveform | ||
| | I16ComplexWaveform | ||
| | DigitalWaveformProto | ||
| ), | ||
| ) -> PrecisionTimestamp | None: | ||
| _verify_t0_timestamp_offset_relationship(message) | ||
| raw_timestamp = None | ||
|
|
||
| # Agreed precedence of timestamp over t0 | ||
| if message.HasField("timestamp"): | ||
| raw_timestamp = message.timestamp | ||
| elif message.HasField("t0"): | ||
| raw_timestamp = message.t0 | ||
|
|
||
| return raw_timestamp | ||
|
|
||
|
|
||
| def _verify_t0_timestamp_offset_relationship( | ||
| message: ( | ||
| DoubleAnalogWaveform | ||
| | DoubleComplexWaveform | ||
| | I16AnalogWaveform | ||
| | I16ComplexWaveform | ||
| | DigitalWaveformProto | ||
| ), | ||
| ) -> None: | ||
| # TODO: Is the conversion to bintime necessary? Seems expensive. | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Clean up this todo. I don't think a conversion to a specific time format is strictly needed. We just need to make sure all three values are in the same time format. I'm not sure if python generics enforce that or if we need to explicitly check for that. We probably also want to apply some fuzziness to the math below.
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The problem is that I can't do math on these objects in their current types I need to convert them to something that allows me to check that t0 = timestamp + time_offset. I may be mistaken, but converting
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Or possibly we can just leave this check out. It was suggested by copilot, but that doesn't mean that it's strictly necessary. However, I think that if both
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What you can do to make this less expensive is to convert all 3 fields into bintime or None up front and pass them into this function so that you don't do any conversions twice.
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Also, you could convert time_offset to datetime if it's faster, but this component doesn't have any benchmarks for making a decision like this. |
||
| if message.HasField("timestamp") and message.HasField("t0"): | ||
| bt_timestamp = ptc.bintime_datetime_from_protobuf(message.timestamp) | ||
| bt_t0 = ptc.bintime_datetime_from_protobuf(message.t0) | ||
| bt_time_offset = bt.TimeDelta(message.time_offset) | ||
| if bt_t0 != bt_timestamp + bt_time_offset: | ||
| raise ValueError("t0 must equal timestamp + time_offset.") | ||
|
|
||
| if not message.HasField("timestamp") and message.HasField("t0") and message.time_offset: | ||
| raise ValueError("Timestamp must be set when supplying a TimeOffset and T0.") | ||
|
|
||
|
|
||
| def _scale_from_waveform(waveform: AnalogWaveform[Any] | ComplexWaveform[Any]) -> Scale | None: | ||
| if isinstance(waveform.scale_mode, LinearScaleMode): | ||
| linear_scale = LinearScale(gain=waveform.scale_mode.gain, offset=waveform.scale_mode.offset) | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.