|
| 1 | +# Architecture |
| 2 | + |
| 3 | +OSHConnect is structured around a small number of long-lived objects that mirror |
| 4 | +the resource hierarchy of the OGC API – Connected Systems specification. |
| 5 | + |
| 6 | +## Object hierarchy |
| 7 | + |
| 8 | +```mermaid |
| 9 | +graph TD |
| 10 | + OSHConnect[OSHConnect<br/><i>application entry point</i>] |
| 11 | + Node[Node<br/><i>connection to one OSH server</i>] |
| 12 | + APIHelper[APIHelper<br/><i>CS API HTTP requests</i>] |
| 13 | + Session[SessionManager<br/><i>OSHClientSession instances</i>] |
| 14 | + MQTT[MQTTCommClient<br/><i>paho-mqtt wrapper</i>] |
| 15 | + System[System<br/><i>sensor system</i>] |
| 16 | + Datastream[Datastream<br/><i>output channel — observations</i>] |
| 17 | + ControlStream[ControlStream<br/><i>input channel — commands & status</i>] |
| 18 | +
|
| 19 | + OSHConnect --> Node |
| 20 | + Node --> APIHelper |
| 21 | + Node --> Session |
| 22 | + Node --> MQTT |
| 23 | + Node --> System |
| 24 | + System --> Datastream |
| 25 | + System --> ControlStream |
| 26 | +``` |
| 27 | + |
| 28 | +## Key abstractions |
| 29 | + |
| 30 | +- **`OSHConnect`** (`oshconnectapi.py`) — top-level class. Owns nodes and |
| 31 | + provides `discover_systems()`, `discover_datastreams()`, |
| 32 | + `save_config()` / `load_config()`, and `create_and_insert_system()`. |
| 33 | +- **`Node`** (`streamableresource.py`) — wraps a server connection. Drives |
| 34 | + discovery via `APIHelper` and owns the `MQTTCommClient`. All HTTP resource |
| 35 | + creation goes through here. |
| 36 | +- **`StreamableResource`** (`streamableresource.py`) — abstract base for |
| 37 | + `System`, `Datastream`, and `ControlStream`. Manages MQTT |
| 38 | + subscriptions/publications, WebSocket connections, and the inbound / |
| 39 | + outbound message deques. Connection modes: `PUSH`, `PULL`, `BIDIRECTIONAL`. |
| 40 | +- **`Datastream` / `ControlStream`** (`streamableresource.py`) — concrete |
| 41 | + streamable resources. Datastreams publish observations; ControlStreams |
| 42 | + publish commands and receive status updates. Both follow CS API Part 3 |
| 43 | + topic conventions (`:data`, `:status`, `:commands`). |
| 44 | +- **`resource_datamodels.py`** — Pydantic models for the CS API resource types |
| 45 | + (`SystemResource`, `DatastreamResource`, `ControlStreamResource`, |
| 46 | + `ObservationResource`). These map directly to API request and response |
| 47 | + bodies. |
| 48 | +- **`swe_components.py`** — Pydantic models for SWE Common schema components |
| 49 | + (`DataRecordSchema`, `QuantitySchema`, `VectorSchema`, etc.). Used to define |
| 50 | + observation and command schemas when creating new datastreams. |
| 51 | +- **`csapi4py/`** — sub-package that handles the CS API specifics: URL |
| 52 | + construction (`endpoints.py`), request building (`con_sys_api.py`), enums |
| 53 | + (`constants.py`), and MQTT topic conventions (`mqtt.py`). |
| 54 | +- **`EventHandler`** (`eventbus.py`) — singleton pub/sub bus. Listeners |
| 55 | + subscribe to event types (e.g. `NEW_OBSERVATION`) and topic strings; events |
| 56 | + are dispatched asynchronously through an internal queue. |
| 57 | +- **`timemanagement.py`** — `TimeInstant` (epoch / ISO-8601), `TimePeriod`, |
| 58 | + `TemporalModes` (`REAL_TIME`, `ARCHIVE`, `BATCH`), and `TimeUtils` |
| 59 | + conversions. |
| 60 | + |
| 61 | +## Typical data flow |
| 62 | + |
| 63 | +```mermaid |
| 64 | +sequenceDiagram |
| 65 | + autonumber |
| 66 | + participant App as OSHConnect |
| 67 | + participant N as Node |
| 68 | + participant H as APIHelper |
| 69 | + participant S as Server |
| 70 | + participant DS as Datastream |
| 71 | +
|
| 72 | + App->>N: add_node() |
| 73 | + App->>N: discover_systems() |
| 74 | + N->>H: retrieve_resource(SYSTEM) |
| 75 | + H->>S: HTTP GET /systems |
| 76 | + S-->>H: JSON |
| 77 | + H-->>N: System objects |
| 78 | + App->>DS: discover_datastreams() |
| 79 | + DS->>DS: initialize() — open MQTT/WebSocket |
| 80 | + DS->>DS: start() — begin streaming |
| 81 | + S-->>DS: observations → _inbound_deque |
| 82 | + Note over App,DS: To insert: resource.insert_self() →<br/>APIHelper.create_resource() → POST →<br/>server returns Location header with new ID |
| 83 | +``` |
| 84 | + |
| 85 | +## Dependencies |
| 86 | + |
| 87 | +- **pydantic** — all resource and schema models. Bumping the minimum requires |
| 88 | + confirming pre-built wheels exist for all supported Python versions |
| 89 | + (3.12 – 3.14). |
| 90 | +- **shapely** — geometry handling for spatial resources. |
| 91 | +- **paho-mqtt** — MQTT streaming for CS API Part 3. |
| 92 | +- **websockets** / **aiohttp** — WebSocket and async HTTP streaming. |
| 93 | +- **requests** — synchronous HTTP for discovery and resource creation. |
0 commit comments