Skip to content

Commit b7e551f

Browse files
committed
feat: Go server dual-publish support for all 8 remaining publishers
- Add OSH_BASE_URL override to NWS, NDBC, CO-OPS, AviationWx, USGS Water, USGS NIMS, NDBC BuoyCAM publishers - Add _is_go_server flag with NaN→0.0 and timestamp→string coercion - Add REST-only mode to ISS publisher (bypasses OSHConnect SDK when OSH_BASE_URL is set) - Add uid to all datastream schemas in bootstrap scripts (required by Go server for datastream creation) - Create ISS bootstrap using bootstrap_helpers.py
1 parent f34287d commit b7e551f

15 files changed

Lines changed: 506 additions & 7 deletions

publishers/aviation_wx/aviation_wx_publisher.py

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -226,7 +226,11 @@ def __init__(self, station_filter: list[str] | None = None):
226226

227227
# REST config
228228
import base64
229-
self._base_url = f"https://{self.osh_address}/{self.osh_root}/api"
229+
self._base_url = os.environ.get(
230+
"OSH_BASE_URL",
231+
f"https://{self.osh_address}/{self.osh_root}/api",
232+
)
233+
self._is_go_server = "csapi-go" in self._base_url
230234
self._auth = "Basic " + base64.b64encode(
231235
f"{self.osh_user}:{self.osh_pass}".encode()).decode()
232236

@@ -278,6 +282,16 @@ def _post_observation(self, ds_id: str, obs: dict):
278282
"""POST an observation to the server using direct REST (allows NaN serialization)."""
279283
import ssl
280284

285+
# Go server workarounds
286+
if self._is_go_server:
287+
r = obs.get("result", {})
288+
for key, val in list(r.items()):
289+
if val == "NaN":
290+
r[key] = 0.0
291+
# Coerce numeric timestamp to string
292+
if "timestamp" in r and not isinstance(r["timestamp"], str):
293+
r["timestamp"] = str(r["timestamp"])
294+
281295
url = f"{self._base_url}/datastreams/{ds_id}/observations"
282296
body = json.dumps(obs).encode()
283297

publishers/aviation_wx/bootstrap_aviation_wx.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -337,6 +337,7 @@ def _datastream_schema() -> dict:
337337
raw_metar - Raw METAR text
338338
"""
339339
return {
340+
"uid": "urn:os4csapi:datastream:awx-station:metarObs:v1",
340341
"outputName": DS_OUTPUT_NAME,
341342
"name": "METAR Observation",
342343
"description": (

publishers/coops/bootstrap_coops.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -533,6 +533,7 @@ def _datastream_schema() -> dict:
533533
pressure_hpa - Barometric pressure (hPa / mb)
534534
"""
535535
return {
536+
"uid": "urn:os4csapi:datastream:coops-station:coopsCoastalObs:v1",
536537
"outputName": DS_OUTPUT_NAME,
537538
"name": "Coastal Observation",
538539
"description": (

publishers/coops/coops_publisher.py

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -247,7 +247,11 @@ def __init__(self, station_filter: list[str] | None = None):
247247

248248
# REST config
249249
import base64
250-
self._base_url = f"https://{self.osh_address}/{self.osh_root}/api"
250+
self._base_url = os.environ.get(
251+
"OSH_BASE_URL",
252+
f"https://{self.osh_address}/{self.osh_root}/api",
253+
)
254+
self._is_go_server = "csapi-go" in self._base_url
251255
self._auth = "Basic " + base64.b64encode(
252256
f"{self.osh_user}:{self.osh_pass}".encode()).decode()
253257

@@ -300,6 +304,16 @@ def _post_observation(self, ds_id: str, obs: dict):
300304
import ssl
301305
from urllib.request import Request as _Req, urlopen as _urlopen
302306

307+
# Go server workarounds
308+
if self._is_go_server:
309+
r = obs.get("result", {})
310+
for key, val in list(r.items()):
311+
if val == "NaN":
312+
r[key] = 0.0
313+
# Coerce numeric timestamp to string
314+
if "timestamp" in r and not isinstance(r["timestamp"], str):
315+
r["timestamp"] = str(r["timestamp"])
316+
303317
url = f"{self._base_url}/datastreams/{ds_id}/observations"
304318
body = json.dumps(obs).encode()
305319

publishers/iss/bootstrap_iss.py

Lines changed: 346 additions & 0 deletions
Large diffs are not rendered by default.

publishers/iss/iss_publisher.py

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -195,6 +195,24 @@ class ISSPublisher(PublisherBase):
195195
"urn:os4csapi:system:iss-position-publisher:v1")
196196
ds_name = os.environ.get("POS_DS_NAME", "ISS Position (SGP4)")
197197

198+
def __init__(self):
199+
# REST-only mode: bypass OSHConnect SDK when OSH_BASE_URL is set
200+
self._rest_mode = bool(os.environ.get("OSH_BASE_URL"))
201+
if self._rest_mode:
202+
import base64
203+
self.osh_address = os.environ.get("OSH_ADDRESS", "")
204+
self.osh_user = os.environ.get("OSH_USER", "")
205+
self.osh_pass = os.environ.get("OSH_PASS", "")
206+
self._base_url = os.environ["OSH_BASE_URL"]
207+
self._is_go_server = "csapi-go" in self._base_url
208+
self._auth = "Basic " + base64.b64encode(
209+
f"{self.osh_user}:{self.osh_pass}".encode()).decode()
210+
self._ds_id: str | None = None
211+
self.stats = {"published": 0, "errors": 0, "reconnects": 0}
212+
else:
213+
super().__init__()
214+
self._is_go_server = False
215+
198216
def configure_cli(self, parser: argparse.ArgumentParser):
199217
parser.add_argument("--tle-refresh", type=float, default=3600.0,
200218
help="Seconds between TLE refreshes (default: 3600)")
@@ -212,6 +230,58 @@ def on_startup(self, args):
212230
print(f" FATAL: Could not fetch TLE: {e}")
213231
sys.exit(1)
214232

233+
def connect(self):
234+
"""Connect to server. Uses REST mode when OSH_BASE_URL is set, SDK otherwise."""
235+
if self._rest_mode:
236+
from publishers.bootstrap_helpers import api_get, find_by_uid
237+
sys_id = find_by_uid(self._base_url, self._auth, "systems", self.system_uid)
238+
if not sys_id:
239+
raise RuntimeError(f"System '{self.system_uid}' not found on server")
240+
ds_list = api_get(self._base_url, f"systems/{sys_id}/datastreams", self._auth)
241+
if ds_list:
242+
for item in ds_list.get("items", []):
243+
if item.get("outputName") == "issPosition":
244+
self._ds_id = item.get("id")
245+
break
246+
if not self._ds_id:
247+
raise RuntimeError(f"Datastream 'issPosition' not found under system {sys_id}")
248+
print(f" Connected (REST): sys={sys_id} ds={self._ds_id}")
249+
else:
250+
return super().connect()
251+
252+
def publish_obs(self, obs: dict) -> bool:
253+
"""POST observation. Uses REST when in REST mode, SDK otherwise."""
254+
if self._rest_mode:
255+
import ssl
256+
url = f"{self._base_url}/datastreams/{self._ds_id}/observations"
257+
258+
# Go server: coerce numeric timestamp to string
259+
if self._is_go_server:
260+
r = obs.get("result", {})
261+
if "timestamp" in r and not isinstance(r["timestamp"], str):
262+
r["timestamp"] = str(r["timestamp"])
263+
264+
body = json.dumps(obs).encode()
265+
ctx = ssl.create_default_context()
266+
ctx.check_hostname = False
267+
ctx.verify_mode = ssl.CERT_NONE
268+
req = Request(url, data=body, method="POST", headers={
269+
"Content-Type": "application/json",
270+
"Accept": "application/json",
271+
"Authorization": self._auth,
272+
})
273+
try:
274+
with urlopen(req, timeout=30, context=ctx) as resp:
275+
if resp.status not in (200, 201, 204):
276+
raise RuntimeError(f"HTTP {resp.status}")
277+
self.stats["published"] += 1
278+
return True
279+
except Exception as e:
280+
self.stats["errors"] += 1
281+
raise
282+
else:
283+
return super().publish_obs(obs)
284+
215285
def fetch(self) -> Any:
216286
sat = get_satrec()
217287
now = datetime.now(timezone.utc)

publishers/ndbc/bootstrap_ndbc.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -436,6 +436,7 @@ def _datastream_schema() -> dict:
436436
TIDE - Water level (ft)
437437
"""
438438
return {
439+
"uid": "urn:os4csapi:datastream:ndbc-buoy:ndbcBuoyObs:v1",
439440
"outputName": DS_OUTPUT_NAME,
440441
"name": "Buoy Observation",
441442
"description": (
@@ -551,6 +552,7 @@ def _datastream_schema() -> dict:
551552
def _buoycam_datastream_schema() -> dict:
552553
"""SWE DataRecord schema for BuoyCAM image-reference datastream."""
553554
return {
555+
"uid": "urn:os4csapi:datastream:ndbc-buoy:ndbcBuoyCamImage:v1",
554556
"outputName": BUOYCAM_DS_OUTPUT_NAME,
555557
"name": "BuoyCAM Image",
556558
"description": (

publishers/ndbc/ndbc_buoycam_publisher.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -149,7 +149,11 @@ def __init__(self, station_filter: list[str] | None = None):
149149
" Copy publishers/.env.example → .env and set your server details."
150150
)
151151

152-
self._base_url = f"https://{self.osh_address}/{self.osh_root}/api"
152+
self._base_url = os.environ.get(
153+
"OSH_BASE_URL",
154+
f"https://{self.osh_address}/{self.osh_root}/api",
155+
)
156+
self._is_go_server = "csapi-go" in self._base_url
153157
self._auth = "Basic " + base64.b64encode(
154158
f"{self.osh_user}:{self.osh_pass}".encode()).decode()
155159

publishers/ndbc/ndbc_publisher.py

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -192,7 +192,11 @@ def __init__(self, station_filter: list[str] | None = None):
192192

193193
# REST config
194194
import base64
195-
self._base_url = f"https://{self.osh_address}/{self.osh_root}/api"
195+
self._base_url = os.environ.get(
196+
"OSH_BASE_URL",
197+
f"https://{self.osh_address}/{self.osh_root}/api",
198+
)
199+
self._is_go_server = "csapi-go" in self._base_url
196200
self._auth = "Basic " + base64.b64encode(
197201
f"{self.osh_user}:{self.osh_pass}".encode()).decode()
198202

@@ -245,6 +249,16 @@ def _post_observation(self, ds_id: str, obs: dict):
245249
import ssl
246250
from urllib.request import Request as _Req, urlopen as _urlopen
247251

252+
# Go server workarounds
253+
if self._is_go_server:
254+
r = obs.get("result", {})
255+
for key, val in list(r.items()):
256+
if val == "NaN":
257+
r[key] = 0.0
258+
# Coerce numeric timestamp to string
259+
if "timestamp" in r and not isinstance(r["timestamp"], str):
260+
r["timestamp"] = str(r["timestamp"])
261+
248262
url = f"{self._base_url}/datastreams/{ds_id}/observations"
249263
body = json.dumps(obs).encode()
250264

publishers/nws/bootstrap_nws.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -341,6 +341,7 @@ def _datastream_schema() -> dict:
341341
{ name, outputName, schema: { obsFormat, resultSchema: { ... } } }
342342
"""
343343
return {
344+
"uid": "urn:os4csapi:datastream:nws-station:nwsSurfaceObs:v1",
344345
"outputName": DS_OUTPUT_NAME,
345346
"name": "Surface Observation",
346347
"description": (

0 commit comments

Comments
 (0)