17 changes: 17 additions & 0 deletions installer/data-downloader/.env.example
@@ -0,0 +1,17 @@
INFLUX_HOST=http://3.98.181.12:9000
INFLUX_TOKEN=apiv3_wfr_admin_token_change_in_production
INFLUX_DATABASE=WFR25
INFLUX_SCHEMA=iox
INFLUX_TABLE=WFR25
DATA_DIR=/app/data
SCANNER_YEAR=2025
SCANNER_BIN=hour
SCANNER_INCLUDE_COUNTS=true
SCANNER_INITIAL_CHUNK_DAYS=31
SENSOR_WINDOW_DAYS=7
SENSOR_LOOKBACK_DAYS=30
SENSOR_FALLBACK_START=2025-06-19T00:00:00
SENSOR_FALLBACK_END=2025-07-10T00:00:00
SCAN_INTERVAL_SECONDS=3600
VITE_API_BASE_URL=http://localhost:8000
ALLOWED_ORIGINS=http://localhost:3000,http://localhost:5173
35 changes: 35 additions & 0 deletions installer/data-downloader/README.md
@@ -0,0 +1,35 @@
# Data Downloader Webapp

This project packages the DAQ data-downloader workflow into a small three-service stack:

- **React frontend** (`frontend/`) for browsing historic runs, triggering scans, and annotating runs.
- **FastAPI backend** (`backend/`) that reads/writes JSON state, exposes REST endpoints, and can launch scans on demand.
- **Scanner worker** (separate Docker service) that periodically runs the InfluxDB availability scan plus the unique sensor collector and exports the results to `data/runs.json` and `data/sensors.json`.

Both JSON files are shared through the `./data` directory so every service (frontend, API, scanner) sees the latest state. Notes added in the UI are stored in the same JSON payload next to the run entry.
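For illustration, a single entry in `data/runs.json` with a note attached might look roughly like this; only the `note` field is defined by the API (`POST /api/runs/{key}/note`), the other field names are hypothetical.

```python
# Hypothetical run entry -- the real schema is whatever the scanner exports to data/runs.json.
run_entry = {
    "key": "2025-07-03T14",           # hypothetical run identifier used in the note endpoint URL
    "start": "2025-07-03T14:00:00Z",  # hypothetical availability window bounds
    "end": "2025-07-03T15:00:00Z",
    "note": "Baseline endurance run, sensors recalibrated beforehand.",
}
```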

## Getting started

1. Duplicate the sample env file and fill in the InfluxDB credentials:
   ```bash
   cp .env.example .env
   ```
2. Build + launch everything:
   ```bash
   docker compose up --build
   ```
3. Open http://localhost:3000 for the web UI; the API remains reachable at http://localhost:8000 if you want to call it directly.

## Runtime behaviour

- `frontend` serves the compiled React bundle via nginx. The UI calls the API using the `VITE_API_BASE_URL` value baked into the build (defaults to http://localhost:8000). Make sure the origin the UI is served from (e.g. http://localhost:3000) is listed in `ALLOWED_ORIGINS` so CORS preflights succeed when the UI hits the API from another port.
- `api` runs `uvicorn backend.app:app`, exposing
  - `GET /api/runs`, `GET /api/sensors`, and `GET /api/scanner-status`
  - `POST /api/runs/{key}/note` to persist notes per run
  - `POST /api/scan` to fire an on-demand scan that refreshes both JSON files in the background
  - `POST /api/data/query` to request a timeseries slice for a given `signalName` between two timestamps; the response echoes the exact SQL (matching `sql.py`) so the frontend can display the query being executed (a request sketch follows this list)
- `scanner` reuses the same backend image but runs `python -m backend.periodic_worker` so the scan + unique sensor collection happens at the interval defined by `SCAN_INTERVAL_SECONDS`.
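The query endpoint can be exercised directly from Python with nothing but the standard library. This is a minimal sketch rather than part of the stack: the signal name is a placeholder, and the payload fields mirror `DataQueryPayload` in `backend/app.py`.

```python
import json
from urllib import request

# POST /api/data/query against the stack started by `docker compose up`.
payload = {
    "signal": "example_signal_name",  # placeholder -- pick a real name from GET /api/sensors
    "start": "2025-06-19T00:00:00",
    "end": "2025-06-20T00:00:00",
    "limit": 500,                     # clamped server-side (10..20000 in backend/influx_queries.py); no_limit=True drops it
}
req = request.Request(
    "http://localhost:8000/api/data/query",
    data=json.dumps(payload).encode(),
    headers={"Content-Type": "application/json"},
    method="POST",
)
with request.urlopen(req) as resp:
    body = json.load(resp)

print(body["sql"])                    # the exact SQL the backend executed
print(body["row_count"], "points returned")
```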

Set `INFLUX_SCHEMA`/`INFLUX_TABLE` to the same values used in the legacy scripts (e.g. `iox` + `WFR25`) so the SQL sent from `backend/server_scanner.py` and `backend/sql.py` matches the proven queries.
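With the defaults from `.env.example`, the per-signal series query built in `backend/influx_queries.py` flattens to roughly the text below; the exact identifier/literal quoting is delegated to `quote_table` and `quote_literal`, and the signal name here is only a placeholder.

```python
# Approximate shape of the SQL echoed back by /api/data/query; the schema and table come
# from INFLUX_SCHEMA / INFLUX_TABLE (iox + WFR25 by default).
example_sql = (
    'SELECT time, "sensorReading" '
    "FROM iox.WFR25 "
    'WHERE "signalName" = \'example_signal_name\' '
    "AND time >= TIMESTAMP '2025-06-19T00:00:00+00:00' "
    "AND time <= TIMESTAMP '2025-06-20T00:00:00+00:00' "
    "ORDER BY time LIMIT 2000"
)
```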

All services mount `./data` inside the container and the FastAPI layer manages file I/O with atomic writes to keep data consistent between the worker and UI actions. If the rolling lookback produces no sensors, the collector automatically falls back to the historic 2025-06-19 -> 2025-07-10 window (tune via `SENSOR_FALLBACK_START` / `SENSOR_FALLBACK_END`).
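The atomic-write behaviour mentioned above boils down to writing a temporary file next to the target and swapping it into place. A minimal sketch of that pattern follows; the real helper lives in `backend/services.py` (not part of this diff), so the function name here is illustrative only.

```python
import json
import os
import tempfile


def write_json_atomically(path: str, payload: dict) -> None:
    """Illustrative atomic JSON write: dump to a temp file, then os.replace() it into place."""
    directory = os.path.dirname(os.path.abspath(path))
    fd, tmp_path = tempfile.mkstemp(dir=directory, suffix=".tmp")
    try:
        with os.fdopen(fd, "w") as handle:
            json.dump(payload, handle, indent=2)
        os.replace(tmp_path, path)  # atomic on POSIX: readers see either the old or the new file
    except BaseException:
        os.unlink(tmp_path)
        raise
```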
15 changes: 15 additions & 0 deletions installer/data-downloader/backend/Dockerfile
@@ -0,0 +1,15 @@
FROM python:3.11-slim AS base

ENV PYTHONUNBUFFERED=1 \
    PYTHONDONTWRITEBYTECODE=1

WORKDIR /app

COPY backend/requirements.txt /tmp/requirements.txt
RUN pip install --no-cache-dir -r /tmp/requirements.txt

COPY backend /app/backend

EXPOSE 8000

CMD ["uvicorn", "backend.app:app", "--host", "0.0.0.0", "--port", "8000"]
1 change: 1 addition & 0 deletions installer/data-downloader/backend/__init__.py
@@ -0,0 +1 @@
# Makes backend a package.
75 changes: 75 additions & 0 deletions installer/data-downloader/backend/app.py
@@ -0,0 +1,75 @@
from __future__ import annotations

from datetime import datetime

from fastapi import BackgroundTasks, FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel

from backend.config import get_settings
from backend.services import DataDownloaderService


class NotePayload(BaseModel):
    note: str


class DataQueryPayload(BaseModel):
    signal: str
    start: datetime
    end: datetime
    limit: int | None = 2000
    no_limit: bool = False


settings = get_settings()
service = DataDownloaderService(settings)

app = FastAPI(title="DAQ Data Downloader API")
app.add_middleware(
    CORSMiddleware,
    allow_origins=settings.allowed_origins,
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)


@app.get("/api/health")
def healthcheck() -> dict:
return {"status": "ok"}


@app.get("/api/runs")
def list_runs() -> dict:
return service.get_runs()


@app.get("/api/sensors")
def list_sensors() -> dict:
return service.get_sensors()


@app.get("/api/scanner-status")
def scanner_status() -> dict:
return service.get_scanner_status()


@app.post("/api/runs/{key}/note")
def save_note(key: str, payload: NotePayload) -> dict:
run = service.update_note(key, payload.note.strip())
if not run:
raise HTTPException(status_code=404, detail=f"Run {key} not found")
return run


@app.post("/api/scan")
def trigger_scan(background_tasks: BackgroundTasks) -> dict:
background_tasks.add_task(service.run_full_scan, "manual")
return {"status": "scheduled"}


@app.post("/api/data/query")
def query_data(payload: DataQueryPayload) -> dict:
limit = None if payload.no_limit else (payload.limit or 2000)
return service.query_signal_series(payload.signal, payload.start, payload.end, limit)
44 changes: 44 additions & 0 deletions installer/data-downloader/backend/config.py
@@ -0,0 +1,44 @@
from __future__ import annotations

from functools import lru_cache
import os
from typing import List
from pydantic import BaseModel, Field


def _parse_origins(raw: str | None) -> List[str]:
    if not raw or raw.strip() == "*":
        return ["*"]
    return [origin.strip() for origin in raw.split(",") if origin.strip()]


class Settings(BaseModel):
    """Centralised configuration pulled from environment variables."""

    data_dir: str = Field(default_factory=lambda: os.getenv("DATA_DIR", "./data"))

    influx_host: str = Field(default_factory=lambda: os.getenv("INFLUX_HOST", "http://localhost:9000"))
    influx_token: str = Field(default_factory=lambda: os.getenv("INFLUX_TOKEN", ""))
    influx_database: str = Field(default_factory=lambda: os.getenv("INFLUX_DATABASE", "WFR25"))
    influx_schema: str = Field(default_factory=lambda: os.getenv("INFLUX_SCHEMA", "iox"))
    influx_table: str = Field(default_factory=lambda: os.getenv("INFLUX_TABLE", "WFR25"))

    scanner_year: int = Field(default_factory=lambda: int(os.getenv("SCANNER_YEAR", "2025")))
    scanner_bin: str = Field(default_factory=lambda: os.getenv("SCANNER_BIN", "hour"))  # hour or day
    scanner_include_counts: bool = Field(default_factory=lambda: os.getenv("SCANNER_INCLUDE_COUNTS", "true").lower() == "true")
    scanner_initial_chunk_days: int = Field(default_factory=lambda: int(os.getenv("SCANNER_INITIAL_CHUNK_DAYS", "31")))

    sensor_window_days: int = Field(default_factory=lambda: int(os.getenv("SENSOR_WINDOW_DAYS", "7")))
    sensor_lookback_days: int = Field(default_factory=lambda: int(os.getenv("SENSOR_LOOKBACK_DAYS", "30")))
    sensor_fallback_start: str | None = Field(default_factory=lambda: os.getenv("SENSOR_FALLBACK_START", "2025-06-19T00:00:00"))
    sensor_fallback_end: str | None = Field(default_factory=lambda: os.getenv("SENSOR_FALLBACK_END", "2025-07-10T00:00:00"))

    periodic_interval_seconds: int = Field(default_factory=lambda: int(os.getenv("SCAN_INTERVAL_SECONDS", "3600")))

    allowed_origins: List[str] = Field(default_factory=lambda: _parse_origins(os.getenv("ALLOWED_ORIGINS", "*")))


@lru_cache(maxsize=1)
def get_settings() -> Settings:
    """Cache settings so the same instance is reused across the app."""
    return Settings()
80 changes: 80 additions & 0 deletions installer/data-downloader/backend/influx_queries.py
@@ -0,0 +1,80 @@
from __future__ import annotations

from datetime import datetime, timezone

from influxdb_client_3 import InfluxDBClient3

from backend.config import Settings
from backend.table_utils import quote_literal, quote_table


def _normalize(dt: datetime) -> datetime:
    if dt.tzinfo is None:
        dt = dt.replace(tzinfo=timezone.utc)
    return dt.astimezone(timezone.utc)


def fetch_signal_series(settings: Settings, signal: str, start: datetime, end: datetime, limit: int | None) -> dict:
    start_dt = _normalize(start)
    end_dt = _normalize(end)
    if start_dt >= end_dt:
        raise ValueError("start must be before end")
    limit_clause = ""
    if limit is not None:
        # Clamp client-supplied limits to a sane range before interpolating them into SQL.
        limit = max(10, min(limit, 20000))
        limit_clause = f" LIMIT {limit}"

    table_ref = quote_table(f"{settings.influx_schema}.{settings.influx_table}")
    signal_literal = quote_literal(signal)

    sql = f"""
    SELECT time, "sensorReading"
    FROM {table_ref}
    WHERE "signalName" = {signal_literal}
      AND time >= TIMESTAMP '{start_dt.isoformat()}'
      AND time <= TIMESTAMP '{end_dt.isoformat()}'
    ORDER BY time{limit_clause}
    """

    with InfluxDBClient3(host=settings.influx_host, token=settings.influx_token, database=settings.influx_database) as client:
        tbl = client.query(sql)

    points = []
    for idx in range(tbl.num_rows):
        ts_scalar = tbl.column("time")[idx]
        value_scalar = tbl.column("sensorReading")[idx]
        ts = _timestamp_scalar_to_datetime(ts_scalar)
        value = value_scalar.as_py()
        points.append(
            {
                "time": ts.isoformat(),
                "value": float(value),
            }
        )

    return {
        "signal": signal,
        "start": start_dt.isoformat(),
        "end": end_dt.isoformat(),
        "limit": limit,
        "row_count": len(points),
        "points": points,
        # Collapse the multi-line query into one string so the UI can display it verbatim.
        "sql": " ".join(line.strip() for line in sql.strip().splitlines()),
    }


def _timestamp_scalar_to_datetime(scalar) -> datetime:
    """Convert PyArrow TimestampScalar to timezone-aware datetime."""
    try:
        ts = scalar.as_py()
        if ts.tzinfo is None:
            ts = ts.replace(tzinfo=timezone.utc)
        else:
            ts = ts.astimezone(timezone.utc)
        return ts
    except ValueError:
        # Fallback for nanosecond precision timestamps that can't fit in datetime micros
        ts_ns = getattr(scalar, "value", None)
        if ts_ns is None:
            raise
        ts = datetime.fromtimestamp(ts_ns / 1_000_000_000, tz=timezone.utc)
        return ts
28 changes: 28 additions & 0 deletions installer/data-downloader/backend/periodic_worker.py
@@ -0,0 +1,28 @@
from __future__ import annotations

import asyncio
import logging

from backend.config import get_settings
from backend.services import DataDownloaderService

logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")


async def run_worker():
    settings = get_settings()
    service = DataDownloaderService(settings)
    interval = max(30, settings.periodic_interval_seconds)
    logging.info("Starting periodic scanner loop (interval=%ss)", interval)
    while True:
        try:
            logging.info("Running scheduled scan...")
            service.run_full_scan(source="periodic")
            logging.info("Finished scheduled scan.")
        except Exception:
            logging.exception("Scheduled scan failed")
        await asyncio.sleep(interval)


if __name__ == "__main__":
    asyncio.run(run_worker())
4 changes: 4 additions & 0 deletions installer/data-downloader/backend/requirements.txt
@@ -0,0 +1,4 @@
fastapi==0.115.4
uvicorn[standard]==0.23.2
influxdb3-python==0.16.0
pydantic==2.9.2