Skip to content

Commit b114f9a

Browse files
improve control stream discovery to have it get the command schema in the same process.
1 parent 774b3c4 commit b114f9a

7 files changed

Lines changed: 87 additions & 57 deletions

File tree

docs/source/tutorial.rst

Lines changed: 15 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -285,26 +285,31 @@ describing the command structure, then attach it to a system via
285285
286286
control_stream = new_system.add_and_insert_control_stream(command_record)
287287
288-
By default the wire form is ``application/swe+json`` (spec-compliant CS API
289-
Part 2 — ``commandFormat: "application/swe+json"`` plus ``recordSchema`` plus
290-
a ``JSONEncoding`` block). To target the JSON envelope instead (which is
291-
what OSH echoes back from ``/controlstreams/{id}/schema``), pass
292-
``command_format='application/json'``:
288+
The default wire form is ``application/json`` —
289+
``commandFormat: "application/json"`` with a ``parametersSchema`` block
290+
(no ``encoding``). It matches what OSH echoes back from
291+
``GET /controlstreams/{id}/schema?f=json``, which is the form
292+
``discover_controlstreams`` parses, so cross-node sync round-trips
293+
without any format conversion. It also sidesteps the SWE+JSON
294+
``encoding``-omission deviation documented in
295+
``docs/osh_spec_deviations.md`` §1.
296+
297+
For the spec-canonical SWE+JSON form (``recordSchema`` plus a
298+
``JSONEncoding`` block), pass ``command_format='application/swe+json'``:
293299

294300
.. code-block:: python
295301
296302
control_stream = new_system.add_and_insert_control_stream(
297303
command_record,
298-
command_format='application/json',
304+
command_format='application/swe+json',
299305
)
300306
301-
The JSON form emits ``commandFormat: "application/json"`` with a
302-
``parametersSchema`` block (no ``encoding``).
303-
304307
For full control over the resource body — for example, when copying a
305308
control stream from one node to another and you already have a
306309
``ControlStreamResource`` in hand — use ``add_insert_controlstream(...)``
307-
instead. It takes a fully-built resource and POSTs it as-is:
310+
instead. It takes a fully-built resource and POSTs it as-is. Build the
311+
embedded ``command_schema`` as a ``JSONCommandSchema`` for the
312+
recommended JSON form:
308313

309314
.. code-block:: python
310315

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[project]
22
name = "oshconnect"
3-
version = "0.5.1a2"
3+
version = "0.5.1a3"
44
description = "Library for interfacing with OSH, helping guide visualization efforts, and providing a place to store configurations. Implements OGC CS API Part 3 (Pub/Sub) MQTT topic conventions including :data topics and resource event topics."
55
readme = "README.md"
66
authors = [

src/oshconnect/schema_datamodels.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
from __future__ import annotations
88

99
from datetime import datetime
10-
from typing import Union, List
10+
from typing import Union, List, Literal
1111

1212
from pydantic import BaseModel, Field, SerializeAsAny, field_validator, model_validator, HttpUrl, ConfigDict
1313

@@ -101,7 +101,7 @@ class JSONCommandSchema(CommandSchema):
101101
"""
102102
model_config = ConfigDict(populate_by_name=True)
103103

104-
command_format: str = Field("application/json", alias='commandFormat')
104+
command_format: Literal["application/json"] = Field("application/json", alias='commandFormat')
105105
params_schema: AnyComponent = Field(..., alias='parametersSchema')
106106
result_schema: AnyComponent = Field(None, alias='resultSchema')
107107
feasibility_schema: AnyComponent = Field(None, alias='feasibilityResultSchema')

src/oshconnect/streamableresource.py

Lines changed: 41 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -993,15 +993,18 @@ def discover_controlstreams(self) -> list[ControlStream]:
993993
``self.control_channels`` and also returned.
994994
995995
For each discovered control stream we additionally fetch the
996-
command schema (``GET /controlstreams/{id}/schema``, which OSH
997-
returns as ``application/json`` with a ``parametersSchema``
998-
SWE Common component) and cache it on
999-
``_underlying_resource.command_schema``. The CS API listing
1000-
endpoint omits the inner schema, so without this step every
1001-
discovered control stream would be missing the schema callers
1002-
need for command construction or cross-node sync. A failure on
1003-
a single control stream's schema fetch is downgraded to a
1004-
warning so it doesn't poison the whole call.
996+
command schema (``GET /controlstreams/{id}/schema?f=json``,
997+
which OSH returns as ``application/json`` with a
998+
``parametersSchema`` SWE Common component) and cache it on
999+
``_underlying_resource.command_schema`` as a `JSONCommandSchema`.
1000+
``f=json`` is the OGC API standard format-selector and pins the
1001+
response shape to the JSON variant — without it the server
1002+
default could change. The CS API listing endpoint omits the
1003+
inner schema, so without this step every discovered control
1004+
stream would be missing the schema callers need for command
1005+
construction or cross-node sync. A failure on a single control
1006+
stream's schema fetch is downgraded to a warning so it doesn't
1007+
poison the whole call.
10051008
"""
10061009
api = self._parent_node.get_api_helper()
10071010
res = api.get_resource(APIResourceTypes.SYSTEM, self._resource_id,
@@ -1016,6 +1019,7 @@ def discover_controlstreams(self) -> list[ControlStream]:
10161019
schema_resp = api.get_resource(
10171020
APIResourceTypes.CONTROL_CHANNEL, controlstream_objs.cs_id,
10181021
APIResourceTypes.SCHEMA,
1022+
params={'f': 'json'},
10191023
)
10201024
schema_resp.raise_for_status()
10211025
new_cs._underlying_resource.command_schema = (
@@ -1165,12 +1169,21 @@ def add_insert_controlstream(self, controlstream_resource: ControlStreamResource
11651169
the system's parent node via HTTP POST.
11661170
11671171
Mirrors `add_insert_datastream`: caller assembles the full
1168-
`ControlStreamResource` (including the embedded `command_schema`
1169-
— a `JSONCommandSchema` for ``application/json`` or a
1170-
`SWEJSONCommandSchema` for ``application/swe+json``) and this
1171-
method posts it to ``/systems/{id}/controlstreams``, captures
1172-
the new resource ID from the ``Location`` header, and returns a
1173-
wrapped `ControlStream`.
1172+
`ControlStreamResource` (including the embedded `command_schema`)
1173+
and this method posts it to ``/systems/{id}/controlstreams``,
1174+
captures the new resource ID from the ``Location`` header, and
1175+
returns a wrapped `ControlStream`.
1176+
1177+
For the embedded `command_schema`, prefer
1178+
`JSONCommandSchema` (`commandFormat: application/json` with a
1179+
``parametersSchema``). It matches what OSH returns from
1180+
``GET /controlstreams/{id}/schema?f=json`` (the form
1181+
``discover_controlstreams`` parses), keeps round-trip sync
1182+
symmetric, and avoids the SWE+JSON ``encoding``-omission
1183+
deviation documented in ``docs/osh_spec_deviations.md`` §1.
1184+
`SWEJSONCommandSchema` (``application/swe+json`` with
1185+
``recordSchema`` plus ``encoding``) is also accepted for
1186+
spec-strict scenarios.
11741187
11751188
:param controlstream_resource: A fully-built
11761189
`ControlStreamResource` carrying ``name``, ``input_name``,
@@ -1201,18 +1214,24 @@ def add_insert_controlstream(self, controlstream_resource: ControlStreamResource
12011214

12021215
def add_and_insert_control_stream(self, control_stream_record_schema: DataRecordSchema, input_name: str = None,
12031216
valid_time: TimePeriod = None,
1204-
command_format: str = "application/swe+json") -> ControlStream:
1217+
command_format: str = "application/json") -> ControlStream:
12051218
"""Accepts a DataRecordSchema and creates a ControlStreamResource
12061219
with the matching command-schema variant, then POSTs it to the
12071220
parent node.
12081221
12091222
Per CS API Part 2 §16.x, command schemas come in two wire forms:
12101223
1211-
- ``application/swe+json`` → `SWEJSONCommandSchema` carrying
1212-
`recordSchema` (the SWE Common component) and `encoding`
1213-
(`JSONEncoding`). This is the spec-compliant default.
12141224
- ``application/json`` → `JSONCommandSchema` carrying
12151225
`parametersSchema` (the SWE Common component); no `encoding`.
1226+
**This is the default.** It matches what OSH returns from
1227+
``GET /controlstreams/{id}/schema?f=json`` (the form
1228+
``discover_controlstreams`` parses), keeps round-trip sync
1229+
symmetric, and avoids the SWE+JSON ``encoding``-omission
1230+
deviation documented in ``docs/osh_spec_deviations.md`` §1.
1231+
- ``application/swe+json`` → `SWEJSONCommandSchema` carrying
1232+
`recordSchema` (the SWE Common component) and `encoding`
1233+
(`JSONEncoding`). Spec-canonical; pass
1234+
``command_format='application/swe+json'`` to opt in.
12161235
12171236
:param control_stream_record_schema: DataRecordSchema to wrap.
12181237
Must carry a ``name`` matching NameToken
@@ -1222,8 +1241,9 @@ def add_and_insert_control_stream(self, control_stream_record_schema: DataRecord
12221241
is lowercased and whitespace-stripped.
12231242
:param valid_time: Optional `TimePeriod`; defaults to
12241243
``[now, now + 1 year]``.
1225-
:param command_format: ``"application/swe+json"`` (default) or
1226-
``"application/json"``. Anything else raises ``ValueError``.
1244+
:param command_format: ``"application/json"`` (default) or
1245+
``"application/swe+json"``. Anything else raises
1246+
``ValueError``.
12271247
:return: ControlStream object added to the system.
12281248
"""
12291249
input_name_checked = input_name if input_name is not None else control_stream_record_schema.label.lower().replace(

tests/test_controlstream_insert_schema.py

Lines changed: 21 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -84,10 +84,10 @@ def _captured_body_json(captured: dict) -> dict:
8484
return json.loads(body)
8585

8686

87-
def test_swejson_default_emits_recordschema_and_encoding(system, monkeypatch):
88-
"""Default `command_format='application/swe+json'` must produce the
89-
spec-compliant wire form: ``commandFormat: application/swe+json`` plus
90-
``recordSchema`` plus ``encoding`` (JSONEncoding). NOT ``parametersSchema``."""
87+
def test_json_default_emits_parametersschema_no_encoding(system, monkeypatch):
88+
"""Default ``command_format='application/json'`` must produce the JSON
89+
wire form: ``commandFormat: application/json`` plus ``parametersSchema``.
90+
NOT ``recordSchema`` and NOT ``encoding``."""
9191
captured: dict = {}
9292
monkeypatch.setattr(
9393
"oshconnect.csapi4py.request_wrappers.requests.post", _capture_post(captured),
@@ -97,37 +97,37 @@ def test_swejson_default_emits_recordschema_and_encoding(system, monkeypatch):
9797

9898
body = _captured_body_json(captured)
9999
schema = body["schema"]
100-
assert schema["commandFormat"] == "application/swe+json"
101-
assert "recordSchema" in schema, "SWE+JSON form must carry recordSchema"
102-
assert "parametersSchema" not in schema, (
103-
"SWE+JSON form must NOT carry parametersSchema (that's the JSON form)"
100+
assert schema["commandFormat"] == "application/json"
101+
assert "parametersSchema" in schema, "JSON form must carry parametersSchema"
102+
assert "recordSchema" not in schema, (
103+
"JSON form must NOT carry recordSchema (that's the SWE+JSON form)"
104+
)
105+
assert "encoding" not in schema, (
106+
"JSON form has no encoding block — that's SWE+JSON only"
104107
)
105-
assert schema["encoding"]["type"] == "JSONEncoding"
106108

107109

108-
def test_json_emits_parametersschema_no_encoding(system, monkeypatch):
109-
"""`command_format='application/json'` must produce the JSON wire form:
110-
``commandFormat: application/json`` plus ``parametersSchema``. NOT
111-
``recordSchema`` and NOT ``encoding``."""
110+
def test_swejson_emits_recordschema_and_encoding(system, monkeypatch):
111+
"""`command_format='application/swe+json'` must produce the
112+
spec-canonical wire form: ``commandFormat: application/swe+json`` plus
113+
``recordSchema`` plus ``encoding`` (JSONEncoding). NOT ``parametersSchema``."""
112114
captured: dict = {}
113115
monkeypatch.setattr(
114116
"oshconnect.csapi4py.request_wrappers.requests.post", _capture_post(captured),
115117
)
116118

117119
system.add_and_insert_control_stream(
118-
_record_schema(), command_format="application/json",
120+
_record_schema(), command_format="application/swe+json",
119121
)
120122

121123
body = _captured_body_json(captured)
122124
schema = body["schema"]
123-
assert schema["commandFormat"] == "application/json"
124-
assert "parametersSchema" in schema, "JSON form must carry parametersSchema"
125-
assert "recordSchema" not in schema, (
126-
"JSON form must NOT carry recordSchema (that's the SWE+JSON form)"
127-
)
128-
assert "encoding" not in schema, (
129-
"JSON form has no encoding block — that's SWE+JSON only"
125+
assert schema["commandFormat"] == "application/swe+json"
126+
assert "recordSchema" in schema, "SWE+JSON form must carry recordSchema"
127+
assert "parametersSchema" not in schema, (
128+
"SWE+JSON form must NOT carry parametersSchema (that's the JSON form)"
130129
)
130+
assert schema["encoding"]["type"] == "JSONEncoding"
131131

132132

133133
def test_unsupported_command_format_raises(system):

tests/test_node_to_node_sync.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,13 @@
2222

2323
from oshconnect import Node, System
2424
from oshconnect.csapi4py.constants import APIResourceTypes
25+
from oshconnect.encoding import JSONEncoding
2526
from oshconnect.resource_datamodels import ControlStreamResource, DatastreamResource
26-
from oshconnect.schema_datamodels import CommandJSON, JSONCommandSchema, SWEDatastreamRecordSchema
27+
from oshconnect.schema_datamodels import (
28+
CommandJSON,
29+
SWEDatastreamRecordSchema,
30+
SWEJSONCommandSchema,
31+
)
2732
from oshconnect.timemanagement import TimeInstant, TimePeriod, TimeUtils
2833

2934
SRC_PORT = int(os.environ.get("OSHC_SRC_PORT", "8282"))

uv.lock

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)