Skip to content

Commit b4f169e

Browse files
fix a flake8 linting error, add overlooked test_csapi_serialization.py file to git
1 parent db99d5d commit b4f169e

2 files changed

Lines changed: 338 additions & 2 deletions

File tree

scripts/publish-local.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -145,9 +145,9 @@ def main() -> int:
145145
print(f" Browse: {PYPI_URL}/simple/")
146146
print(f" Install: pip install --index-url {PYPI_URL}/simple/ oshconnect")
147147
print(f" uv: uv pip install --index-url {PYPI_URL}/simple/ oshconnect")
148-
print(f" uv sync: uv sync (if pyproject.toml has [[tool.uv.index]] configured)")
148+
print(" uv sync: uv sync (if pyproject.toml has [[tool.uv.index]] configured)")
149149
return 0
150150

151151

152152
if __name__ == "__main__":
153-
sys.exit(main())
153+
sys.exit(main())

tests/test_csapi_serialization.py

Lines changed: 336 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,336 @@
1+
"""OGC standard-format (de)serialization for OSHConnect resources.
2+
3+
Three layers per wrapper class:
4+
5+
- Resource representation (System: SML+JSON / GeoJSON;
6+
Datastream and ControlStream: application/json).
7+
- Schema document (Datastream: SWE+JSON / OM+JSON;
8+
ControlStream: SWE+JSON / JSON).
9+
- Single record (one observation or one command).
10+
11+
Tests are organized in those sections plus a generic "no behavior drift"
12+
guard that confirms the new convenience methods produce the same output
13+
as a raw `model_dump(by_alias=True, exclude_none=True, mode='json')`.
14+
"""
15+
from __future__ import annotations
16+
17+
import json
18+
from pathlib import Path
19+
20+
import pytest
21+
from pydantic import ValidationError
22+
23+
from oshconnect import Node
24+
from oshconnect.resource_datamodels import (
25+
ControlStreamResource,
26+
DatastreamResource,
27+
ObservationResource,
28+
SystemResource,
29+
)
30+
from oshconnect.schema_datamodels import (
31+
CommandJSON,
32+
JSONCommandSchema,
33+
JSONDatastreamRecordSchema,
34+
ObservationOMJSONInline,
35+
SWEDatastreamRecordSchema,
36+
SWEJSONCommandSchema,
37+
)
38+
from oshconnect.streamableresource import ControlStream, Datastream, System
39+
from oshconnect.timemanagement import TimeInstant, TimePeriod
40+
41+
FIXTURES_DIR = Path(__file__).parent / "fixtures"
42+
43+
44+
@pytest.fixture
45+
def node() -> Node:
46+
return Node(protocol="http", address="localhost", port=8282)
47+
48+
49+
# ===========================================================================
50+
# System: SML+JSON, GeoJSON
51+
# ===========================================================================
52+
53+
def test_system_resource_to_smljson_round_trips():
54+
src = SystemResource(uid="urn:test:s1", label="S1", feature_type="PhysicalSystem")
55+
dumped = src.to_smljson_dict()
56+
assert dumped["type"] == "PhysicalSystem"
57+
assert dumped["uniqueId"] == "urn:test:s1"
58+
rebuilt = SystemResource.from_smljson_dict(dumped)
59+
assert rebuilt.uid == "urn:test:s1"
60+
61+
62+
def test_system_resource_to_geojson_round_trips():
63+
src = SystemResource(
64+
uid="urn:test:s1", label="S1", feature_type="Feature",
65+
properties={"name": "S1", "uid": "urn:test:s1"},
66+
)
67+
dumped = src.to_geojson_dict()
68+
assert dumped["type"] == "Feature"
69+
rebuilt = SystemResource.from_geojson_dict(dumped)
70+
assert rebuilt.uid == "urn:test:s1"
71+
72+
73+
def test_system_resource_from_csapi_autodetects_smljson():
74+
payload = {"type": "PhysicalSystem", "uniqueId": "urn:test:auto",
75+
"label": "Auto"}
76+
res = SystemResource.from_csapi_dict(payload)
77+
assert res.feature_type == "PhysicalSystem"
78+
assert res.uid == "urn:test:auto"
79+
80+
81+
def test_system_resource_from_csapi_autodetects_geojson():
82+
payload = {"type": "Feature", "properties": {"name": "Auto",
83+
"uid": "urn:test:auto"}}
84+
res = SystemResource.from_csapi_dict(payload)
85+
assert res.feature_type == "Feature"
86+
assert res.properties["uid"] == "urn:test:auto"
87+
88+
89+
def test_system_smljson_fixture_round_trips():
90+
raw = json.loads((FIXTURES_DIR / "fake_weather_system_smljson.json").read_text())
91+
res = SystemResource.from_smljson_dict(raw)
92+
assert res.feature_type == "PhysicalSystem"
93+
assert res.uid == "urn:osh:sensor:fakeweather:001"
94+
re_dumped = res.to_smljson_dict()
95+
# Required SML fields preserved
96+
for key in ("type", "uniqueId", "label", "definition"):
97+
assert key in re_dumped
98+
99+
100+
def test_system_wrapper_from_smljson_dict_builds_attached_to_node(node):
101+
raw = json.loads((FIXTURES_DIR / "fake_weather_system_smljson.json").read_text())
102+
sys = System.from_smljson_dict(raw, node)
103+
assert isinstance(sys, System)
104+
assert sys.urn == "urn:osh:sensor:fakeweather:001"
105+
assert sys.get_parent_node() is node
106+
107+
108+
def test_system_wrapper_from_csapi_dict_dispatches_on_type(node):
109+
raw_sml = json.loads((FIXTURES_DIR / "fake_weather_system_smljson.json").read_text())
110+
raw_geo = {"type": "Feature", "id": "geo-1",
111+
"properties": {"name": "GeoSys", "uid": "urn:test:geo"}}
112+
sys_sml = System.from_csapi_dict(raw_sml, node)
113+
sys_geo = System.from_csapi_dict(raw_geo, node)
114+
assert sys_sml.urn == "urn:osh:sensor:fakeweather:001"
115+
assert sys_geo.urn == "urn:test:geo"
116+
117+
118+
# ===========================================================================
119+
# Datastream: resource representation, schema document, observations
120+
# ===========================================================================
121+
122+
def _datastream_resource_from_swejson_fixture() -> DatastreamResource:
123+
raw = json.loads((FIXTURES_DIR / "fake_weather_schema_swejson.json").read_text())
124+
schema = SWEDatastreamRecordSchema.from_swejson_dict(raw)
125+
return DatastreamResource(
126+
ds_id="ds-001", name="weather",
127+
valid_time=TimePeriod(start="2025-01-01T00:00:00Z",
128+
end="2099-12-31T00:00:00Z"),
129+
record_schema=schema,
130+
)
131+
132+
133+
def test_datastream_resource_round_trips():
134+
src = _datastream_resource_from_swejson_fixture()
135+
dumped = src.to_csapi_dict()
136+
assert dumped["id"] == "ds-001"
137+
assert dumped["schema"]["obsFormat"] == "application/swe+json"
138+
rebuilt = DatastreamResource.from_csapi_dict(dumped)
139+
assert rebuilt.ds_id == "ds-001"
140+
141+
142+
def test_datastream_schema_to_swejson_dict_matches_fixture(node):
143+
raw = json.loads((FIXTURES_DIR / "fake_weather_schema_swejson.json").read_text())
144+
schema = SWEDatastreamRecordSchema.from_swejson_dict(raw)
145+
ds_resource = DatastreamResource(
146+
ds_id="ds-1", name="w",
147+
valid_time=TimePeriod(start="2025-01-01T00:00:00Z",
148+
end="2099-12-31T00:00:00Z"),
149+
record_schema=schema,
150+
)
151+
ds = Datastream(parent_node=node, datastream_resource=ds_resource)
152+
out = ds.schema_to_swejson_dict()
153+
assert out["obsFormat"] == "application/swe+json"
154+
assert out["recordSchema"]["name"] == "weather"
155+
156+
157+
def test_datastream_schema_to_omjson_dict_matches_fixture(node):
158+
raw = json.loads((FIXTURES_DIR / "fake_weather_schema_omjson.json").read_text())
159+
schema = JSONDatastreamRecordSchema.from_omjson_dict(raw)
160+
ds_resource = DatastreamResource(
161+
ds_id="ds-1", name="w",
162+
valid_time=TimePeriod(start="2025-01-01T00:00:00Z",
163+
end="2099-12-31T00:00:00Z"),
164+
record_schema=schema,
165+
)
166+
ds = Datastream(parent_node=node, datastream_resource=ds_resource)
167+
out = ds.schema_to_omjson_dict()
168+
assert out["obsFormat"] == "application/om+json"
169+
assert out["resultSchema"]["name"] == "weather"
170+
171+
172+
def test_datastream_schema_methods_reject_wrong_variant(node):
173+
raw = json.loads((FIXTURES_DIR / "fake_weather_schema_swejson.json").read_text())
174+
schema = SWEDatastreamRecordSchema.from_swejson_dict(raw)
175+
ds = Datastream(parent_node=node, datastream_resource=DatastreamResource(
176+
ds_id="ds-1", name="w",
177+
valid_time=TimePeriod(start="2025-01-01T00:00:00Z",
178+
end="2099-12-31T00:00:00Z"),
179+
record_schema=schema,
180+
))
181+
with pytest.raises(TypeError, match="OM\\+JSON"):
182+
ds.schema_to_omjson_dict()
183+
184+
185+
def test_observation_to_omjson_round_trips():
186+
src_time = TimeInstant.from_string("2025-06-01T12:00:00Z")
187+
obs = ObservationResource(
188+
result={"temperature": 22.5},
189+
result_time=src_time,
190+
)
191+
dumped = obs.to_omjson_dict(datastream_id="ds-1")
192+
assert dumped["datastream@id"] == "ds-1"
193+
assert dumped["result"] == {"temperature": 22.5}
194+
# resultTime is rendered via TimeUtils.time_to_iso (microsecond ISO 8601 with Z).
195+
assert dumped["resultTime"].startswith("2025-06-01T12:00:00")
196+
assert dumped["resultTime"].endswith("Z")
197+
rebuilt = ObservationResource.from_omjson_dict(dumped)
198+
assert rebuilt.result == {"temperature": 22.5}
199+
assert rebuilt.result_time.epoch_time == src_time.epoch_time
200+
201+
202+
def test_observation_to_swejson_round_trips():
203+
obs = ObservationResource(
204+
result={"time": "2025-06-01T12:00:00Z", "temperature": 22.5},
205+
result_time=TimeInstant.from_string("2025-06-01T12:00:00Z"),
206+
)
207+
payload = obs.to_swejson_dict()
208+
assert payload == {"time": "2025-06-01T12:00:00Z", "temperature": 22.5}
209+
rebuilt = ObservationResource.from_swejson_dict(
210+
payload, result_time="2025-06-01T12:00:00Z"
211+
)
212+
assert rebuilt.result == payload
213+
214+
215+
def test_datastream_observation_methods_attach_datastream_id(node):
216+
ds_resource = DatastreamResource(
217+
ds_id="ds-99", name="w",
218+
valid_time=TimePeriod(start="2025-01-01T00:00:00Z",
219+
end="2099-12-31T00:00:00Z"),
220+
)
221+
ds = Datastream(parent_node=node, datastream_resource=ds_resource)
222+
payload = ds.observation_to_omjson_dict({"temperature": 22.5})
223+
assert payload["datastream@id"] == "ds-99"
224+
225+
226+
# ===========================================================================
227+
# ControlStream: resource representation, schema, commands
228+
# ===========================================================================
229+
230+
def _controlstream_resource_with_json_schema() -> ControlStreamResource:
231+
schema = JSONCommandSchema.from_json_dict({
232+
"commandFormat": "application/json",
233+
"parametersSchema": {
234+
"type": "DataRecord", "name": "params",
235+
"fields": [{
236+
"type": "Quantity", "name": "speed", "label": "Speed",
237+
"definition": "http://example.org/speed", "uom": {"code": "m/s"},
238+
}],
239+
},
240+
})
241+
return ControlStreamResource(
242+
cs_id="cs-001", name="motor", input_name="motor",
243+
valid_time=TimePeriod(start="2025-01-01T00:00:00Z",
244+
end="2099-12-31T00:00:00Z"),
245+
command_schema=schema,
246+
)
247+
248+
249+
def test_controlstream_resource_round_trips():
250+
src = _controlstream_resource_with_json_schema()
251+
dumped = src.to_csapi_dict()
252+
assert dumped["id"] == "cs-001"
253+
assert dumped["schema"]["commandFormat"] == "application/json"
254+
rebuilt = ControlStreamResource.from_csapi_dict(dumped)
255+
assert rebuilt.cs_id == "cs-001"
256+
257+
258+
def test_controlstream_schema_to_json_dict(node):
259+
cs_resource = _controlstream_resource_with_json_schema()
260+
cs = ControlStream(node=node, controlstream_resource=cs_resource)
261+
out = cs.schema_to_json_dict()
262+
assert out["commandFormat"] == "application/json"
263+
assert out["parametersSchema"]["name"] == "params"
264+
265+
266+
def test_controlstream_schema_methods_reject_wrong_variant(node):
267+
cs_resource = _controlstream_resource_with_json_schema()
268+
cs = ControlStream(node=node, controlstream_resource=cs_resource)
269+
with pytest.raises(TypeError, match="SWE\\+JSON"):
270+
cs.schema_to_swejson_dict()
271+
272+
273+
def test_controlstream_command_to_json_dict(node):
274+
cs_resource = _controlstream_resource_with_json_schema()
275+
cs = ControlStream(node=node, controlstream_resource=cs_resource)
276+
out = cs.command_to_json_dict({"speed": 1.5}, sender="tester")
277+
assert out["control@id"] == "cs-001"
278+
assert out["sender"] == "tester"
279+
assert out["params"] == {"speed": 1.5}
280+
281+
282+
def test_controlstream_command_to_swejson_round_trips(node):
283+
cs_resource = _controlstream_resource_with_json_schema()
284+
cs = ControlStream(node=node, controlstream_resource=cs_resource)
285+
payload = cs.command_to_swejson_dict({"speed": 1.5})
286+
assert payload == {"speed": 1.5}
287+
rebuilt = ControlStream.command_from_swejson_dict(payload)
288+
assert rebuilt == payload
289+
290+
291+
def test_command_json_round_trips():
292+
src = CommandJSON(control_id="cs-1", sender="me", params={"x": 1})
293+
dumped = src.to_csapi_dict()
294+
assert dumped["control@id"] == "cs-1"
295+
rebuilt = CommandJSON.from_csapi_dict(dumped)
296+
assert rebuilt.params == {"x": 1}
297+
298+
299+
# ===========================================================================
300+
# Generic: no behavior drift from raw model_dump
301+
# ===========================================================================
302+
303+
@pytest.mark.parametrize("build,method", [
304+
(lambda: SystemResource(uid="urn:test:1", label="X", feature_type="PhysicalSystem"),
305+
"to_smljson_dict"),
306+
(lambda: _datastream_resource_from_swejson_fixture(), "to_csapi_dict"),
307+
(lambda: _controlstream_resource_with_json_schema(), "to_csapi_dict"),
308+
])
309+
def test_resource_to_csapi_matches_raw_model_dump(build, method):
310+
instance = build()
311+
new_way = getattr(instance, method)()
312+
raw_way = instance.model_dump(by_alias=True, exclude_none=True, mode='json')
313+
assert new_way == raw_way
314+
315+
316+
# ===========================================================================
317+
# Deprecation warnings on the old factories
318+
# ===========================================================================
319+
320+
def test_system_from_system_resource_emits_deprecation_warning(node):
321+
raw = json.loads((FIXTURES_DIR / "fake_weather_system_smljson.json").read_text())
322+
res = SystemResource.from_smljson_dict(raw)
323+
with pytest.warns(DeprecationWarning, match="from_csapi_dict"):
324+
sys = System.from_system_resource(res, node)
325+
assert sys.urn == "urn:osh:sensor:fakeweather:001"
326+
327+
328+
def test_datastream_from_resource_emits_deprecation_warning(node):
329+
ds_resource = DatastreamResource(
330+
ds_id="ds-1", name="w",
331+
valid_time=TimePeriod(start="2025-01-01T00:00:00Z",
332+
end="2099-12-31T00:00:00Z"),
333+
)
334+
with pytest.warns(DeprecationWarning, match="from_csapi_dict"):
335+
ds = Datastream.from_resource(ds_resource, node)
336+
assert ds.get_id() == "ds-1"

0 commit comments

Comments
 (0)