Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions src/infuse_iot/generated/tdf_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,8 @@ def field_information(cls):
sf_name = _public_name(subfield)
subfields.append({"name": sf_name, "type": subfield[1]})
info.append({"name": f_name, "type": field[1], "subfields": subfields})
elif isinstance(field[1], ctypes.Array):
info.append({"name": f_name, "type": list})
else:
info.append({"name": f_name, "type": field[1]})
return info
Expand All @@ -103,6 +105,10 @@ def from_buffer_consume(cls, source: bytes, offset: int = 0) -> Self:
raise RuntimeError
source_var_num = source_var_len // var_type_size

# No trailing VLA
if source_var_num == 0:
return cls.from_buffer_copy(source, offset)

# Dynamically create subclass with correct length
class TdfVLA(ctypes.LittleEndianStructure):
name = cls.name
Expand All @@ -113,4 +119,11 @@ class TdfVLA(ctypes.LittleEndianStructure):
iter_fields = cls.iter_fields
field_information = cls.field_information

# Copy convertor functions for fields
for f in cls._fields_:
if f[0][0] == "_":
f_name = f[0][1:]
setattr(TdfVLA, f_name, getattr(cls, f_name))

# Create the object instance
return cast(Self, TdfVLA.from_buffer_copy(source, offset))
66 changes: 66 additions & 0 deletions src/infuse_iot/generated/tdf_definitions.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,40 @@ class tdf_struct_lte_cell_id_global(TdfStructBase):
"tac": "{}",
}

class tdf_struct_lte_cell_neighbour(TdfStructBase):
"""LTE cell ID (Global)"""

_fields_ = [
("earfcn", ctypes.c_uint32),
("pci", ctypes.c_uint16),
("_time_diff", ctypes.c_uint16),
("_rsrp", ctypes.c_uint8),
("rsrq", ctypes.c_int8),
]
_pack_ = 1
_postfix_ = {
"earfcn": "",
"pci": "",
"time_diff": "s",
"rsrp": "dBm",
"rsrq": "dB",
}
_display_fmt_ = {
"earfcn": "{}",
"pci": "{}",
"time_diff": "{}",
"rsrp": "{}",
"rsrq": "{}",
}

@property
def time_diff(self):
return self._time_diff * 0.001

@property
def rsrp(self):
return self._rsrp * -1

class tdf_struct_bt_addr_le(TdfStructBase):
"""Bluetooth address type (bt_addr_le_t)"""

Expand Down Expand Up @@ -903,6 +937,37 @@ class algorithm_class_time_series(TdfReadingBase):
"values": "{}",
}

class lte_tac_cells(TdfReadingBase):
"""Information on cells in a tracking area"""

name = "LTE_TAC_CELLS"
_fields_ = [
("cell", structs.tdf_struct_lte_cell_id_global),
("earfcn", ctypes.c_uint32),
("_rsrp", ctypes.c_uint8),
("rsrq", ctypes.c_int8),
("neighbours", 0 * structs.tdf_struct_lte_cell_neighbour),
]
_pack_ = 1
_postfix_ = {
"cell": "",
"earfcn": "",
"rsrp": "dBm",
"rsrq": "dB",
"neighbours": "",
}
_display_fmt_ = {
"cell": "{}",
"earfcn": "{}",
"rsrp": "{}",
"rsrq": "{}",
"neighbours": "{}",
}

@property
def rsrp(self):
return self._rsrp * -1

class array_type(TdfReadingBase):
"""Example array type"""

Expand Down Expand Up @@ -950,5 +1015,6 @@ class array_type(TdfReadingBase):
31: readings.bluetooth_data_throughput,
32: readings.algorithm_class_histogram,
33: readings.algorithm_class_time_series,
34: readings.lte_tac_cells,
100: readings.array_type,
}
46 changes: 35 additions & 11 deletions src/infuse_iot/tools/tdf_list.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
)
from infuse_iot.tdf import TDF
from infuse_iot.time import InfuseTime
from infuse_iot.generated.tdf_base import TdfStructBase


class SubCommand(InfuseCommand):
Expand Down Expand Up @@ -45,24 +46,47 @@ def run(self) -> None:
for tdf in self._decoder.decode(msg.epacket.payload):
t = tdf.data[-1]
num = len(tdf.data)
tdf_name: None | str = None
time_str: None | str = None
if num > 1:
tdf_name = f"{t.name}[{num - 1}]"
else:
tdf_name = t.name
if tdf.time is not None:
if tdf.period is None:
time_str = InfuseTime.utc_time_string(tdf.time)
else:
offset = (len(tdf.data) - 1) * tdf.period
time_str = InfuseTime.utc_time_string(tdf.time + offset)
else:
time_str = InfuseTime.utc_time_string(time.time())

for idx, field in enumerate(t.iter_fields()):
if idx == 0:
if tdf.time is not None:
if tdf.period is None:
time_str = InfuseTime.utc_time_string(tdf.time)
else:
offset = (len(tdf.data) - 1) * tdf.period
time_str = InfuseTime.utc_time_string(tdf.time + offset)
for field in t.iter_fields():
if isinstance(field.val, list):
# Trailing VLA handling
if len(field.val) > 0 and isinstance(field.val[0], TdfStructBase):
for idx, val in enumerate(field.val):
for subfield in val.iter_fields(f"{field.name}[{idx}]"):
table.append(
(
time_str,
tdf_name,
subfield.name,
subfield.val_fmt(),
subfield.postfix,
)
)
tdf_name = None
time_str = None
else:
time_str = InfuseTime.utc_time_string(time.time())
table.append((time_str, tdf_name, field.name, field.val_fmt(), field.postfix))
table.append((time_str, tdf_name, f"{field.name}", field.val_fmt(), field.postfix))
tdf_name = None
time_str = None
else:
table.append((None, None, field.name, field.val_fmt(), field.postfix))
# Standard structs and sub-structs
table.append((time_str, tdf_name, field.name, field.val_fmt(), field.postfix))
tdf_name = None
time_str = None

print(f"Infuse ID: {source.infuse_id:016x}")
print(f"Interface: {source.interface.name}")
Expand Down
18 changes: 18 additions & 0 deletions src/infuse_iot/util/ctypes.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,23 @@ def bytes_to_uint8(b: bytes):


class VLACompatLittleEndianStruct(ctypes.LittleEndianStructure):
"""
Class to simplify working with variable length arrays.
The standard `ctypes.LittleEndianStructure` field definitions can
be extended with an optional `vla_field` class parameter, which
will be populated when constructed with `vla_from_buffer_copy`.
"""

vla_field: tuple[str, type[Any]] | None = None

@classmethod
def vla_from_buffer_copy(cls, source, offset=0) -> Self:
"""
Equivalent to `from_buffer_copy`, except if the `vla_field`
class property is not `None`, it will consume the remainder of
`source` to populate the trailing variable length array.
"""

base = cls.from_buffer_copy(source, offset)
if cls.vla_field is None:
return base
Expand Down Expand Up @@ -55,6 +68,11 @@ def vla_from_buffer_copy(cls, source, offset=0) -> Self:
return base

def iter_fields(self, prefix: str = "") -> Generator[tuple[str, Any], None, None]:
"""
Yield all fields of the class as `("name", value)`. Fields that are themselves
instances of `VLACompatLittleEndianStruct` will have their fields yielded
individually as `("name.subfield", value)`.
"""
for field_name, _field_type in self._fields_: # type: ignore
val = getattr(self, field_name)
if isinstance(val, VLACompatLittleEndianStruct):
Expand Down
Loading