Skip to content

Commit f50f01d

Browse files
implement fixes to controlstream discovery and insertion as well as add networked tests to verify that commands can be sent based off source schemas and maintain coherence across the wire.
1 parent 4e25daf commit f50f01d

9 files changed

Lines changed: 745 additions & 62 deletions

docs/source/architecture/construction.md

Lines changed: 33 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -173,30 +173,46 @@ warning so it doesn't poison the rest of the discovery; that
173173
datastream's `record_schema` stays `None`.
174174

175175
For datastreams built locally (no discovery), or when you need the
176-
OM+JSON or logical variant, `Datastream` has three dedicated fetch
177-
methods — one per `obsFormat` the server supports. Each returns a
178-
typed schema model:
176+
OM+JSON or logical variant, hit the schema endpoint directly through
177+
the parent `Node`'s `APIHelper` and parse with the matching schema
178+
model:
179179

180180
```python
181-
ds = Datastream(parent_node=node, datastream_resource=DatastreamResource.from_csapi_dict(server_response))
181+
from oshconnect.csapi4py.constants import APIResourceTypes
182+
from oshconnect.schema_datamodels import (
183+
SWEDatastreamRecordSchema,
184+
OMJSONDatastreamRecordSchema,
185+
LogicalDatastreamRecordSchema,
186+
)
187+
188+
api = node.get_api_helper()
189+
ds_id = ds._underlying_resource.ds_id
182190

183-
# Wire-format schemas (CS API spec)
184-
sw = ds.fetch_swejson_schema() # -> SWEDatastreamRecordSchema (application/swe+json)
185-
om = ds.fetch_omjson_schema() # -> OMJSONDatastreamRecordSchema (application/om+json)
191+
# SWE+JSON (CS API spec)
192+
sw_resp = api.get_resource(APIResourceTypes.DATASTREAM, ds_id,
193+
APIResourceTypes.SCHEMA,
194+
params={'obsFormat': 'application/swe+json'})
195+
sw = SWEDatastreamRecordSchema.from_swejson_dict(sw_resp.json())
196+
197+
# OM+JSON (CS API spec)
198+
om_resp = api.get_resource(APIResourceTypes.DATASTREAM, ds_id,
199+
APIResourceTypes.SCHEMA,
200+
params={'obsFormat': 'application/om+json'})
201+
om = OMJSONDatastreamRecordSchema.from_omjson_dict(om_resp.json())
186202

187203
# OSH-specific JSON Schema flavor
188-
lg = ds.fetch_logical_schema() # -> LogicalDatastreamRecordSchema (obsFormat=logical)
204+
lg_resp = api.get_resource(APIResourceTypes.DATASTREAM, ds_id,
205+
APIResourceTypes.SCHEMA,
206+
params={'obsFormat': 'logical'})
207+
lg = LogicalDatastreamRecordSchema.from_logical_dict(lg_resp.json())
189208
```
190209

191-
Each method:
192-
193-
1. Hits ``GET /datastreams/{id}/schema?obsFormat={format}`` using the
194-
parent `Node`'s `APIHelper` for base URL + auth.
195-
2. Parses the response into the corresponding pydantic model.
196-
3. Returns the parsed model — does *not* mutate the datastream's
197-
`_underlying_resource.record_schema`. (Discovery is the one place
198-
that opts into caching the SWE+JSON variant; if you want to cache
199-
an OM+JSON or logical fetch, assign it yourself.)
210+
`api.get_resource(...)` returns a `requests.Response`; the
211+
`from_*_dict` classmethods on each schema model parse it into the
212+
typed pydantic class. None of these calls mutate the datastream's
213+
`_underlying_resource.record_schema` — only `discover_datastreams`
214+
populates that, and only with the SWE+JSON variant. If you want to
215+
cache an OM+JSON or logical fetch, assign it yourself.
200216

201217
The **logical schema** is OSH-specific (not in the OGC CS API spec):
202218
a JSON Schema document with OGC extension keywords

docs/source/architecture/serialization.md

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -124,9 +124,10 @@ A third schema model, `LogicalDatastreamRecordSchema`, covers OSH's
124124
extension keywords (`x-ogc-definition`, `x-ogc-refFrame`, `x-ogc-unit`,
125125
`x-ogc-axis`) carrying SWE Common metadata. Distinct from the SWE+JSON
126126
and OM+JSON envelopes (no `obsFormat` field, no `recordSchema`
127-
wrapper). See [Construction → "I want the schema for an existing
128-
datastream from the server"](construction.md) for the
129-
`Datastream.fetch_logical_schema()` method that retrieves it.
127+
wrapper). To retrieve it, use the per-`Node` `APIHelper`:
128+
`api.get_resource(APIResourceTypes.DATASTREAM, ds_id, APIResourceTypes.SCHEMA, params={'obsFormat': 'logical'})`,
129+
then parse the response with
130+
`LogicalDatastreamRecordSchema.from_logical_dict(...)`.
130131

131132
## Deprecated factories
132133

docs/source/tutorial.rst

Lines changed: 161 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -139,6 +139,27 @@ Discover all datastreams across all discovered systems:
139139
140140
app.discover_datastreams()
141141
142+
Each discovered ``Datastream`` arrives with its SWE+JSON record schema
143+
already cached on ``ds._underlying_resource.record_schema`` — discovery
144+
makes a follow-up ``GET /datastreams/{id}/schema`` per stream so callers
145+
that build observations don't need a second round trip.
146+
147+
Discover control streams the same way, per system:
148+
149+
.. code-block:: python
150+
151+
for system in node.get_systems():
152+
control_streams = system.discover_controlstreams()
153+
for cs in control_streams:
154+
print(cs.get_id(), cs._underlying_resource.input_name)
155+
156+
Discovered control streams arrive with their command schema cached on
157+
``cs._underlying_resource.command_schema`` (a ``JSONCommandSchema`` —
158+
OSH normalizes responses to the JSON envelope). Reach the inner SWE
159+
Common component via ``cs._underlying_resource.command_schema.params_schema``;
160+
its ``items`` (for ``DataChoice``) or ``fields`` (for ``DataRecord``)
161+
list the parameters the stream accepts.
162+
142163

143164
Streaming Observations (MQTT)
144165
------------------------------
@@ -239,6 +260,146 @@ Build a schema using SWE Common component classes, then attach it to a system:
239260
A ``TimeSchema`` must be the first field in the ``DataRecordSchema`` when targeting OpenSensorHub.
240261

241262

263+
Inserting a New Control Stream
264+
------------------------------
265+
A control stream is the input counterpart to a datastream — it accepts
266+
commands and emits status reports. Build a ``DataRecordSchema``
267+
describing the command structure, then attach it to a system via
268+
``System.add_and_insert_control_stream(...)``:
269+
270+
.. code-block:: python
271+
272+
from oshconnect import DataRecordSchema, BooleanSchema, CountSchema
273+
274+
command_record = DataRecordSchema(
275+
name='counterControl',
276+
label='Counter Control',
277+
description='Commands to control the counter behavior',
278+
fields=[
279+
BooleanSchema(name='setCountDown', label='Set Count Down',
280+
definition='http://sensorml.com/ont/swe/property/SetCountDown'),
281+
CountSchema(name='setStep', label='Set Step',
282+
definition='http://sensorml.com/ont/swe/property/SetStep'),
283+
],
284+
)
285+
286+
control_stream = new_system.add_and_insert_control_stream(command_record)
287+
288+
By default the wire form is ``application/swe+json`` (spec-compliant CS API
289+
Part 2 — ``commandFormat: "application/swe+json"`` plus ``recordSchema`` plus
290+
a ``JSONEncoding`` block). To target the JSON envelope instead (which is
291+
what OSH echoes back from ``/controlstreams/{id}/schema``), pass
292+
``command_format='application/json'``:
293+
294+
.. code-block:: python
295+
296+
control_stream = new_system.add_and_insert_control_stream(
297+
command_record,
298+
command_format='application/json',
299+
)
300+
301+
The JSON form emits ``commandFormat: "application/json"`` with a
302+
``parametersSchema`` block (no ``encoding``).
303+
304+
For full control over the resource body — for example, when copying a
305+
control stream from one node to another and you already have a
306+
``ControlStreamResource`` in hand — use ``add_insert_controlstream(...)``
307+
instead. It takes a fully-built resource and POSTs it as-is:
308+
309+
.. code-block:: python
310+
311+
from oshconnect.resource_datamodels import ControlStreamResource
312+
from oshconnect.schema_datamodels import JSONCommandSchema
313+
314+
resource = ControlStreamResource(
315+
name='Counter Control',
316+
input_name='counterControl',
317+
command_schema=JSONCommandSchema(
318+
command_format='application/json',
319+
params_schema=command_record,
320+
),
321+
)
322+
control_stream = new_system.add_insert_controlstream(resource)
323+
324+
After insert, the returned ``ControlStream`` carries the server-assigned
325+
ID (``control_stream.get_id()``) and is appended to ``new_system.control_channels``.
326+
327+
328+
Sending Commands
329+
----------------
330+
A control stream is the input side of a system. Once you have one — either
331+
freshly inserted or reconstructed from ``System.discover_controlstreams()`` —
332+
there are two ways to deliver a command:
333+
334+
**Over MQTT (preferred for real-time control).** Initialize the stream's
335+
MQTT client, then publish to the command topic:
336+
337+
.. code-block:: python
338+
339+
from oshconnect import StreamableModes
340+
341+
control_stream.set_connection_mode(StreamableModes.BIDIRECTIONAL)
342+
control_stream.initialize()
343+
control_stream.start()
344+
345+
control_stream.publish_command({
346+
'params': {'setStep': 5},
347+
})
348+
349+
``publish_command(payload)`` is sugar for ``publish(payload, topic='command')``;
350+
it routes to the CS API Part 3 ``:commands`` topic for this stream
351+
(``…/controlstreams/{id}/commands``). The payload shape is whatever the
352+
control stream's command schema accepts — a dict matching the field names
353+
under ``params``, or a SWE+JSON envelope if the stream uses the SWE form.
354+
355+
**Over HTTP (stateless, one-shot).** POST a command directly to the
356+
``/controlstreams/{id}/commands`` endpoint via the node's
357+
``APIHelper``:
358+
359+
.. code-block:: python
360+
361+
from oshconnect.csapi4py.constants import APIResourceTypes
362+
from oshconnect.schema_datamodels import CommandJSON
363+
364+
command = CommandJSON(params={'setStep': 5})
365+
api = node.get_api_helper()
366+
resp = api.create_resource(
367+
APIResourceTypes.COMMAND,
368+
command.to_csapi_dict(),
369+
parent_res_id=control_stream.get_id(),
370+
req_headers={'Content-Type': 'application/json'},
371+
)
372+
resp.raise_for_status()
373+
command_id = resp.headers['Location'].rsplit('/', 1)[-1]
374+
375+
The server responds with ``201 Created`` and a ``Location`` header pointing
376+
at the newly-created command resource (``/commands/{id}``); poll its
377+
``/status`` sub-resource (or subscribe to the MQTT status topic — next
378+
section) to see whether the system accepted and executed it.
379+
380+
Subscribing to Command Status
381+
-----------------------------
382+
Control streams emit two MQTT topics: ``:commands`` (input) and ``:status``
383+
(output, where the system reports execution results). Subscribe to status
384+
updates:
385+
386+
.. code-block:: python
387+
388+
def on_status(client, userdata, msg):
389+
print(f"Status on {msg.topic}: {msg.payload}")
390+
391+
control_stream.subscribe(topic='status', callback=on_status)
392+
393+
Inbound status reports are also pushed onto an internal deque — drain it
394+
exactly like a datastream's inbound queue:
395+
396+
.. code-block:: python
397+
398+
while control_stream.get_status_deque_inbound():
399+
status = control_stream.get_status_deque_inbound().popleft()
400+
print(status)
401+
402+
242403
Inserting an Observation
243404
------------------------
244405
Once a datastream is registered, send observation data using ``insert_observation_dict()``:

src/oshconnect/schema_datamodels.py

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,15 @@
1616
from .encoding import Encoding
1717
from .geometry import Geometry
1818
from .swe_components import AnyComponent, check_named
19+
from .timemanagement import TimeInstant
20+
21+
22+
def _now_iso8601_z() -> str:
23+
"""Per-call default for ``CommandJSON.issue_time``: a UTC timestamp with
24+
trailing ``Z`` (CS API Part 2 / SWE Common 3 expect a valid ISO8601
25+
with zone info — OSH 400s on the bare ``datetime.now().isoformat()``
26+
form because it has no zone designator)."""
27+
return TimeInstant.now_as_time_instant().get_iso_time()
1928

2029

2130
def _dump_csapi(model: BaseModel) -> dict:
@@ -35,9 +44,12 @@ class CommandJSON(BaseModel):
3544
"""
3645
model_config = ConfigDict(populate_by_name=True)
3746
control_id: str = Field(None, serialization_alias="control@id")
38-
issue_time: Union[str, float] = Field(datetime.now().isoformat(), serialization_alias="issueTime")
47+
issue_time: Union[str, float] = Field(default_factory=_now_iso8601_z,
48+
serialization_alias="issueTime")
3949
sender: str = Field(None)
40-
params: Union[dict, list, int, float, str] = Field(None)
50+
# CS API Part 2 — and OSH — call this field ``parameters`` on the wire.
51+
# ``populate_by_name=True`` keeps the Python attribute readable as ``params``.
52+
params: Union[dict, list, int, float, str] = Field(None, alias="parameters")
4153

4254
def to_csapi_dict(self) -> dict:
4355
"""Render as the CS API `application/json` command body."""

0 commit comments

Comments
 (0)