Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions python/examples/ingestion_with_python_config/simulator.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

from sift_py.ingestion.channel import (
bit_field_value,
bytes_value,
double_value,
enum_value,
int32_value,
Expand Down Expand Up @@ -100,6 +101,10 @@ def run(self):
random.choice(self.sample_bit_field_values)
),
},
{
"channel_name": "raw_bin",
"value": bytes_value(str(timestamp).encode("utf-8")),
},
],
}
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,11 @@ def nostromos_lv_426() -> TelemetryConfig:
ChannelBitFieldElement(name="heater", index=7, bit_count=1),
],
)
raw_binary_channel = ChannelConfig(
name="raw_bin",
data_type=ChannelDataType.BYTES,
description="Example of binary encoded data (binary string encoding of time in seconds)",
)

return TelemetryConfig(
asset_name="NostromoLV426",
Expand All @@ -66,6 +71,7 @@ def nostromos_lv_426() -> TelemetryConfig:
voltage_channel,
vehicle_state_channel,
gpio_channel,
raw_binary_channel,
],
),
FlowConfig(
Expand Down
7 changes: 6 additions & 1 deletion python/examples/ingestion_with_yaml_config/simulator.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

from sift_py.ingestion.channel import (
bit_field_value,
bytes_value,
double_value,
enum_value,
int32_value,
Expand Down Expand Up @@ -39,7 +40,7 @@ def __init__(self, ingestion_service: IngestionService):
sample_bit_field_values = ["00001001", "00100011", "00001101", "11000001"]
self.sample_bit_field_values = [bytes([int(byte, 2)]) for byte in sample_bit_field_values]

sample_logs = Path().joinpath("sample_data").joinpath("sample_logs.txt")
sample_logs = Path(__file__).parent.joinpath("sample_data").joinpath("sample_logs.txt")

with open(sample_logs, "r") as file:
self.sample_logs = file.readlines()
Expand Down Expand Up @@ -100,6 +101,10 @@ def run(self):
random.choice(self.sample_bit_field_values)
),
},
{
"channel_name": "raw_bin",
"value": bytes_value(str(timestamp).encode("utf-8")),
},
],
}
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,11 @@

from sift_py.ingestion.service import TelemetryConfig

TELEMETRY_CONFIGS_DIR = Path().joinpath("telemetry_configs")
TELEMETRY_CONFIGS_DIR = Path(__file__).parent.joinpath("telemetry_configs")


def nostromos_lv_426() -> TelemetryConfig:
telemetry_config_name = os.getenv("TELEMETRY_CONFIG")
telemetry_config_name = os.getenv("TELEMETRY_CONFIG", "nostromo_lv_426.yml")

if telemetry_config_name is None:
raise Exception("Missing 'TELEMETRY_CONFIG' environment variable.")
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
asset_name: NostromoLV426
ingestion_client_key: nostromo_lv_426

channels:
log_channel: &log_channel
Expand Down Expand Up @@ -51,13 +50,19 @@ channels:
index: 7
bit_count: 1

raw_binary_channel: &raw_binary_channel
name: raw_bin
data_type: bytes
description: Example of binary encoded data (binary string encoding of time in seconds)

flows:
- name: readings
channels:
- <<: *velocity_channel
- <<: *voltage_channel
- <<: *vehicle_state_channel
- <<: *gpio_channel
- <<: *raw_binary_channel

- name: voltage
channels:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,7 @@ async def get_channel_data(
end_time: datetime | None = None,
limit: int | None = None,
ignore_cache: bool = False,
):
) -> dict[str, pd.DataFrame]:
"""
Get the data for a channel during a run.
"""
Expand Down
4 changes: 2 additions & 2 deletions python/lib/sift_client/resources/channels.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from datetime import datetime
from typing import TYPE_CHECKING, Dict, List

import numpy as np
import pandas as pd
import pyarrow as pa

from sift_client._internal.low_level_wrappers.channels import ChannelsLowLevelClient
Expand Down Expand Up @@ -176,7 +176,7 @@ async def get_data(
start_time: datetime | None = None,
end_time: datetime | None = None,
limit: int | None = None,
) -> Dict[str, np.ndarray]:
) -> Dict[str, pd.DataFrame]:
"""
Get data for one or more channels.

Expand Down
4 changes: 2 additions & 2 deletions python/lib/sift_client/resources/sync_stubs/__init__.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import re
from datetime import datetime
from typing import Any, Dict, List

import numpy as np
import pandas as pd
import pyarrow as pa

from sift_client.client import SiftClient
Expand Down Expand Up @@ -428,7 +428,7 @@ class ChannelsAPI:
start_time: datetime | None = None,
end_time: datetime | None = None,
limit: int | None = None,
) -> Dict[str, np.ndarray]:
) -> Dict[str, pd.DataFrame]:
"""
Get data for one or more channels.

Expand Down
10 changes: 8 additions & 2 deletions python/lib/sift_client/types/channel.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
from sift.data.v2.data_pb2 import (
BitFieldValues,
BoolValues,
BytesValues,
DoubleValues,
EnumValues,
FloatValues,
Expand Down Expand Up @@ -45,6 +46,7 @@ class ChannelDataType(Enum):
INT_64 = channel_pb.CHANNEL_DATA_TYPE_INT_64
UINT_32 = channel_pb.CHANNEL_DATA_TYPE_UINT_32
UINT_64 = channel_pb.CHANNEL_DATA_TYPE_UINT_64
BYTES = channel_pb.CHANNEL_DATA_TYPE_BYTES

def __str__(self) -> str:
ret = self.name.lower()
Expand Down Expand Up @@ -78,7 +80,7 @@ def from_str(raw: str) -> Optional["ChannelDataType"]:
if item.__str__() == val:
return item
raise Exception(
"Unreachable. ChannelTypeUrls and ChannelDataType enum names are out of sync."
f"{raw} type not found. ChannelTypeUrls and ChannelDataType enum names are out of sync."
)
else:
try:
Expand Down Expand Up @@ -111,6 +113,8 @@ def proto_data_class(data_type: ChannelDataType) -> Any:
return Uint32Values
elif data_type == ChannelDataType.UINT_64:
return Uint64Values
elif data_type == ChannelDataType.BYTES:
return BytesValues
else:
raise ValueError(f"Unknown data type: {data_type}")

Expand Down Expand Up @@ -138,6 +142,8 @@ def hash_str(self, api_format: bool = False) -> str:
return "CHANNEL_DATA_TYPE_UINT_32" if api_format else ChannelDataType.UINT_32.__str__()
elif self == ChannelDataType.UINT_64:
return "CHANNEL_DATA_TYPE_UINT_64" if api_format else ChannelDataType.UINT_64.__str__()
elif self == ChannelDataType.BYTES:
return "CHANNEL_DATA_TYPE_BYTES" if api_format else ChannelDataType.BYTES.__str__()
else:
raise Exception("Unreachable.")

Expand Down Expand Up @@ -249,7 +255,7 @@ def data(
limit: The maximum number of data points to return.

Returns:
A ChannelTimeSeries object.
A dict of channel name to pandas DataFrame or Arrow Table object.
"""
if as_arrow:
data = self.client.channels.get_data_as_arrow(
Expand Down
23 changes: 20 additions & 3 deletions python/lib/sift_py/ingestion/channel.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ def __init__(
self.identifier = self.fqn()

def value_from(
self, value: Optional[Union[int, float, bool, str]]
self, value: Optional[Union[int, float, bool, str, bytes]]
) -> Optional[IngestWithConfigDataChannelValue]:
"""
Like `try_value_from` except will return `None` there is a failure to produce a channel value due to a type mismatch.
Expand All @@ -82,7 +82,7 @@ def value_from(
return None

def try_value_from(
self, value: Optional[Union[int, float, bool, str]]
self, value: Optional[Union[int, float, bool, str, bytes]]
) -> IngestWithConfigDataChannelValue:
"""
Generate a channel value for this particular channel configuration. This will raise an exception
Expand Down Expand Up @@ -112,7 +112,8 @@ def try_value_from(
return enum_value(int(value))
elif isinstance(value, str) and self.data_type == ChannelDataType.STRING:
return string_value(value)

elif isinstance(value, bytes) and self.data_type == ChannelDataType.BYTES:
return bytes_value(value)
raise ValueError(f"Failed to cast value of type {type(value)} to {self.data_type}")

def as_pb(self, klass: Type[ChannelConfigPb]) -> ChannelConfigPb:
Expand Down Expand Up @@ -209,6 +210,7 @@ class ChannelDataTypeStrRep(Enum):
INT_64 = "int64"
UINT_32 = "uint32"
UINT_64 = "uint64"
BYTES = "bytes"

@staticmethod
def from_api_format(val: str) -> Optional["ChannelDataTypeStrRep"]:
Expand All @@ -224,6 +226,7 @@ def from_api_format(val: str) -> Optional["ChannelDataTypeStrRep"]:
"CHANNEL_DATA_TYPE_INT_64": ChannelDataTypeStrRep.INT_64,
"CHANNEL_DATA_TYPE_UINT_32": ChannelDataTypeStrRep.UINT_32,
"CHANNEL_DATA_TYPE_UINT_64": ChannelDataTypeStrRep.UINT_64,
"CHANNEL_DATA_TYPE_BYTES": ChannelDataTypeStrRep.BYTES,
}[val]
except KeyError:
return None
Expand All @@ -244,6 +247,7 @@ class ChannelDataType(Enum):
INT_64 = channel_pb.CHANNEL_DATA_TYPE_INT_64
UINT_32 = channel_pb.CHANNEL_DATA_TYPE_UINT_32
UINT_64 = channel_pb.CHANNEL_DATA_TYPE_UINT_64
BYTES = channel_pb.CHANNEL_DATA_TYPE_BYTES

@classmethod
def from_pb(cls, val: channel_pb.ChannelDataType.ValueType) -> "ChannelDataType":
Expand All @@ -267,6 +271,8 @@ def from_pb(cls, val: channel_pb.ChannelDataType.ValueType) -> "ChannelDataType"
return cls.UINT_32
elif val == cls.UINT_64.value:
return cls.UINT_64
elif val == cls.BYTES.value:
return cls.BYTES
else:
raise ValueError(f"Unknown channel data type '{val}'.")

Expand Down Expand Up @@ -302,6 +308,8 @@ def from_str(cls, raw: str) -> Optional["ChannelDataType"]:
return cls.UINT_32
elif val == ChannelDataTypeStrRep.UINT_64:
return cls.UINT_64
elif val == ChannelDataTypeStrRep.BYTES:
return cls.BYTES
else:
raise Exception("Unreachable")

Expand Down Expand Up @@ -334,6 +342,8 @@ def as_human_str(self, api_format: bool = False) -> str:
return (
"CHANNEL_DATA_TYPE_UINT_64" if api_format else ChannelDataTypeStrRep.UINT_64.value
)
elif self == ChannelDataType.BYTES:
return "CHANNEL_DATA_TYPE_BYTES" if api_format else ChannelDataTypeStrRep.BYTES.value
else:
raise Exception("Unreachable.")

Expand Down Expand Up @@ -421,6 +431,10 @@ def empty_value() -> IngestWithConfigDataChannelValue:
return IngestWithConfigDataChannelValue(empty=Empty())


def bytes_value(val: bytes) -> IngestWithConfigDataChannelValue:
return IngestWithConfigDataChannelValue(bytes=val)


def is_data_type(val: IngestWithConfigDataChannelValue, target_type: ChannelDataType) -> bool:
if target_type == ChannelDataType.DOUBLE:
return val.HasField("double")
Expand All @@ -442,3 +456,6 @@ def is_data_type(val: IngestWithConfigDataChannelValue, target_type: ChannelData
return val.HasField("uint32")
elif target_type == ChannelDataType.UINT_64:
return val.HasField("uint64")
elif target_type == ChannelDataType.BYTES:
return val.HasField("bytes")
raise ValueError(f"Unknown channel data type '{target_type}'.")
2 changes: 1 addition & 1 deletion python/pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[project]
name = "sift_stack_py"
version = "0.8.4"
version = "0.8.5"
description = "Python client library for the Sift API"
requires-python = ">=3.8"
readme = { file = "README.md", content-type = "text/markdown" }
Expand Down
Loading