Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
75 changes: 74 additions & 1 deletion database/init.sql
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,14 @@ CREATE TABLE cml_stats (
max_rsl REAL,
mean_rsl REAL,
stddev_rsl REAL,
completeness_percent_6h REAL,
total_records_6h BIGINT,
valid_records_6h BIGINT,
mean_rsl_6h REAL,
stddev_rsl_6h REAL,
completeness_percent_1h REAL,
mean_rsl_1h REAL,
stddev_rsl_1h REAL,
last_rsl REAL,
last_update TIMESTAMPTZ DEFAULT NOW(),
PRIMARY KEY (cml_id, user_id)
Expand Down Expand Up @@ -100,6 +108,69 @@ BEGIN
END;
$$ LANGUAGE plpgsql;

-- update_cml_stats_windowed(target_cml_id, target_user_id)
--
-- Computes 6-hour and 1-hour windowed statistics in a single table scan using
-- FILTER clauses. TimescaleDB chunk exclusion prunes all chunks older than 6 hours
-- at the storage level, so the query only touches the current uncompressed chunk
-- regardless of total dataset size.
CREATE OR REPLACE FUNCTION update_cml_stats_windowed(
target_cml_id TEXT,
target_user_id TEXT DEFAULT 'demo_openmrg'
) RETURNS VOID AS $$
DECLARE
now_ts TIMESTAMPTZ := NOW();
BEGIN
INSERT INTO cml_stats (
cml_id, user_id,
completeness_percent_6h, total_records_6h, valid_records_6h,
mean_rsl_6h, stddev_rsl_6h,
completeness_percent_1h, mean_rsl_1h, stddev_rsl_1h,
last_rsl, last_update
)
SELECT
target_cml_id, target_user_id,
-- 6h window
ROUND(
100.0 * COUNT(rsl) FILTER (WHERE time >= now_ts - INTERVAL '6 hours')
/ NULLIF(COUNT(*) FILTER (WHERE time >= now_ts - INTERVAL '6 hours'), 0),
2),
COUNT(*) FILTER (WHERE time >= now_ts - INTERVAL '6 hours'),
COUNT(rsl) FILTER (WHERE time >= now_ts - INTERVAL '6 hours'),
ROUND(AVG(rsl) FILTER (WHERE time >= now_ts - INTERVAL '6 hours')::numeric, 2),
ROUND(STDDEV(rsl) FILTER (WHERE time >= now_ts - INTERVAL '6 hours')::numeric, 2),
-- 1h window
ROUND(
100.0 * COUNT(rsl) FILTER (WHERE time >= now_ts - INTERVAL '1 hour')
/ NULLIF(COUNT(*) FILTER (WHERE time >= now_ts - INTERVAL '1 hour'), 0),
2),
ROUND(AVG(rsl) FILTER (WHERE time >= now_ts - INTERVAL '1 hour')::numeric, 2),
ROUND(STDDEV(rsl) FILTER (WHERE time >= now_ts - INTERVAL '1 hour')::numeric, 2),
-- last_rsl: unconstrained so we get the true last RSL even if the CML
-- has been quiet for more than 6 hours
(SELECT rsl FROM cml_data
WHERE cml_id = target_cml_id
AND user_id = target_user_id
ORDER BY time DESC LIMIT 1),
now_ts
FROM cml_data
WHERE cml_id = target_cml_id
AND user_id = target_user_id
AND time >= now_ts - INTERVAL '6 hours'
ON CONFLICT (cml_id, user_id) DO UPDATE SET
completeness_percent_6h = EXCLUDED.completeness_percent_6h,
total_records_6h = EXCLUDED.total_records_6h,
valid_records_6h = EXCLUDED.valid_records_6h,
mean_rsl_6h = EXCLUDED.mean_rsl_6h,
stddev_rsl_6h = EXCLUDED.stddev_rsl_6h,
completeness_percent_1h = EXCLUDED.completeness_percent_1h,
mean_rsl_1h = EXCLUDED.mean_rsl_1h,
stddev_rsl_1h = EXCLUDED.stddev_rsl_1h,
last_rsl = EXCLUDED.last_rsl,
last_update = EXCLUDED.last_update;
END;
$$ LANGUAGE plpgsql;

SELECT create_hypertable('cml_data', 'time');

-- Per-user lookup indexes.
Expand Down Expand Up @@ -211,8 +282,10 @@ GRANT SELECT ON cml_data TO webserver_role;
GRANT SELECT ON cml_metadata TO webserver_role;
GRANT SELECT ON cml_stats TO webserver_role;

-- Parser calls update_cml_stats() to upsert per-CML statistics.
-- Parser calls update_cml_stats() to upsert per-CML lifetime statistics.
GRANT EXECUTE ON FUNCTION update_cml_stats(TEXT, TEXT) TO demo_openmrg, demo_orange_cameroun;
-- Parser calls update_cml_stats_windowed() from the background stats timer.
GRANT EXECUTE ON FUNCTION update_cml_stats_windowed(TEXT, TEXT) TO demo_openmrg, demo_orange_cameroun;

-- Row-Level Security on cml_metadata and cml_stats.
-- cml_data is excluded: TimescaleDB does not allow RLS on compressed
Expand Down
95 changes: 95 additions & 0 deletions database/migrations/009_add_windowed_stats.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
-- Migration 009: add windowed cml_stats columns and update_cml_stats_windowed function
--
-- Replaces the expensive full-history refresh (update_cml_stats, called for every CML
-- every 60 seconds) with a cheap windowed refresh that only touches the current
-- uncompressed TimescaleDB chunk (~6 hours of data), reducing stats refresh time from
-- 20-30 s to < 1 s and Postgres CPU from ~100% to < 5%.
--
-- New columns on cml_stats: 6-hour and 1-hour windowed completeness, record counts,
-- mean RSL, and stddev RSL.
-- New function: update_cml_stats_windowed(target_cml_id, target_user_id)
--
-- The parser must be updated alongside this migration:
-- - parser/db_writer.py: replace refresh_stats() call with refresh_windowed_stats()
-- - parser/db_writer.py: wire _update_stats_for_cmls() into write_rawdata() so
-- lifetime columns (total_records, min_rsl, max_rsl, ...) stay current.
--
-- Apply with:
-- docker compose exec -T database psql -U myuser -d mydatabase \
-- < database/migrations/009_add_windowed_stats.sql

ALTER TABLE cml_stats
ADD COLUMN IF NOT EXISTS completeness_percent_6h REAL,
ADD COLUMN IF NOT EXISTS total_records_6h BIGINT,
ADD COLUMN IF NOT EXISTS valid_records_6h BIGINT,
ADD COLUMN IF NOT EXISTS mean_rsl_6h REAL,
ADD COLUMN IF NOT EXISTS stddev_rsl_6h REAL,
ADD COLUMN IF NOT EXISTS completeness_percent_1h REAL,
ADD COLUMN IF NOT EXISTS mean_rsl_1h REAL,
ADD COLUMN IF NOT EXISTS stddev_rsl_1h REAL;

-- update_cml_stats_windowed(target_cml_id, target_user_id)
--
-- Computes 6-hour and 1-hour windowed statistics in a single table scan using
-- FILTER clauses. TimescaleDB chunk exclusion prunes all chunks older than 6 hours
-- at the storage level, so the query only touches the current uncompressed chunk
-- regardless of total dataset size.
CREATE OR REPLACE FUNCTION update_cml_stats_windowed(
target_cml_id TEXT,
target_user_id TEXT DEFAULT 'demo_openmrg'
) RETURNS VOID AS $$
DECLARE
now_ts TIMESTAMPTZ := NOW();
BEGIN
INSERT INTO cml_stats (
cml_id, user_id,
completeness_percent_6h, total_records_6h, valid_records_6h,
mean_rsl_6h, stddev_rsl_6h,
completeness_percent_1h, mean_rsl_1h, stddev_rsl_1h,
last_rsl, last_update
)
SELECT
target_cml_id, target_user_id,
-- 6h window
ROUND(
100.0 * COUNT(rsl) FILTER (WHERE time >= now_ts - INTERVAL '6 hours')
/ NULLIF(COUNT(*) FILTER (WHERE time >= now_ts - INTERVAL '6 hours'), 0),
2),
COUNT(*) FILTER (WHERE time >= now_ts - INTERVAL '6 hours'),
COUNT(rsl) FILTER (WHERE time >= now_ts - INTERVAL '6 hours'),
ROUND(AVG(rsl) FILTER (WHERE time >= now_ts - INTERVAL '6 hours')::numeric, 2),
ROUND(STDDEV(rsl) FILTER (WHERE time >= now_ts - INTERVAL '6 hours')::numeric, 2),
-- 1h window
ROUND(
100.0 * COUNT(rsl) FILTER (WHERE time >= now_ts - INTERVAL '1 hour')
/ NULLIF(COUNT(*) FILTER (WHERE time >= now_ts - INTERVAL '1 hour'), 0),
2),
ROUND(AVG(rsl) FILTER (WHERE time >= now_ts - INTERVAL '1 hour')::numeric, 2),
ROUND(STDDEV(rsl) FILTER (WHERE time >= now_ts - INTERVAL '1 hour')::numeric, 2),
-- last_rsl: unconstrained so we get the true last RSL even if the CML
-- has been quiet for more than 6 hours
(SELECT rsl FROM cml_data
WHERE cml_id = target_cml_id
AND user_id = target_user_id
ORDER BY time DESC LIMIT 1),
now_ts
FROM cml_data
WHERE cml_id = target_cml_id
AND user_id = target_user_id
AND time >= now_ts - INTERVAL '6 hours'
ON CONFLICT (cml_id, user_id) DO UPDATE SET
completeness_percent_6h = EXCLUDED.completeness_percent_6h,
total_records_6h = EXCLUDED.total_records_6h,
valid_records_6h = EXCLUDED.valid_records_6h,
mean_rsl_6h = EXCLUDED.mean_rsl_6h,
stddev_rsl_6h = EXCLUDED.stddev_rsl_6h,
completeness_percent_1h = EXCLUDED.completeness_percent_1h,
mean_rsl_1h = EXCLUDED.mean_rsl_1h,
stddev_rsl_1h = EXCLUDED.stddev_rsl_1h,
last_rsl = EXCLUDED.last_rsl,
last_update = EXCLUDED.last_update;
END;
$$ LANGUAGE plpgsql;

GRANT EXECUTE ON FUNCTION update_cml_stats_windowed(TEXT, TEXT)
TO demo_openmrg, demo_orange_cameroun;
30 changes: 28 additions & 2 deletions parser/db_writer.py
Original file line number Diff line number Diff line change
Expand Up @@ -304,12 +304,16 @@ def write_rawdata(self, df) -> int:
)
)

# Commit immediately after insert; stats are updated separately
# Update lifetime stats for CMLs in this batch (same transaction as the insert)
cml_ids = df_subset["cml_id"].unique().tolist()
self._update_stats_for_cmls(cml_ids)

# Single commit covers both the data insert and the stats update
try:
if self.conn:
self.conn.commit()
except Exception:
logger.exception("Failed to commit raw data")
logger.exception("Failed to commit raw data and stats")
raise

return rows_written
Expand Down Expand Up @@ -383,3 +387,25 @@ def refresh_stats(self) -> None:
finally:
if cur and not cur.closed:
cur.close()

def refresh_windowed_stats(self) -> None:
"""Recalculate windowed (6h/1h) cml_stats for all known CMLs.
Cheap: only touches the most-recent uncompressed TimescaleDB chunk."""
cur = self.conn.cursor()
try:
cur.execute(
"SELECT update_cml_stats_windowed(cml_id::text, %s) "
"FROM (SELECT DISTINCT cml_id FROM cml_metadata WHERE user_id = %s) t",
(self.user_id, self.user_id),
)
self.conn.commit()
logger.info("Refreshed windowed cml_stats for all CMLs")
except Exception:
try:
self.conn.rollback()
except Exception:
pass
logger.exception("Failed to refresh windowed cml_stats")
finally:
if cur and not cur.closed:
cur.close()
8 changes: 4 additions & 4 deletions parser/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,14 +125,14 @@ def stats_loop():
# Run immediately on startup so Grafana has fresh stats without
# waiting a full interval after the backlog is processed.
try:
stats_db.refresh_stats()
stats_db.refresh_windowed_stats()
except Exception:
logger.exception("Stats thread: initial refresh_stats failed")
logger.exception("Stats thread: initial refresh_windowed_stats failed")
while not stop_event.wait(Config.STATS_REFRESH_INTERVAL):
try:
stats_db.refresh_stats()
stats_db.refresh_windowed_stats()
except Exception:
logger.exception("Stats thread: refresh_stats failed")
logger.exception("Stats thread: refresh_windowed_stats failed")
stats_db.close()

stats_thread = threading.Thread(target=stats_loop, daemon=True, name="stats-refresh")
Expand Down
27 changes: 27 additions & 0 deletions parser/tests/test_db_writer.py
Original file line number Diff line number Diff line change
Expand Up @@ -319,6 +319,33 @@ def test__update_stats_for_cmls_rollback_on_error(mock_connection):
mock_connection.rollback.assert_called()


def test_refresh_windowed_stats_commits_on_success(mock_connection):
"""refresh_windowed_stats calls update_cml_stats_windowed and commits."""
writer = DBWriter("postgresql://test", user_id="demo_openmrg")
writer.conn = mock_connection

writer.refresh_windowed_stats()

cur = mock_connection.cursor.return_value
cur.execute.assert_called_once()
sql = cur.execute.call_args.args[0]
assert "update_cml_stats_windowed" in sql
mock_connection.commit.assert_called_once()


def test_refresh_windowed_stats_rollback_on_error(mock_connection):
"""refresh_windowed_stats rolls back and swallows the exception on DB error."""
writer = DBWriter("postgresql://test", user_id="demo_openmrg")
writer.conn = mock_connection

mock_connection.cursor.return_value.execute.side_effect = Exception("DB error")

writer.refresh_windowed_stats() # must not raise

mock_connection.rollback.assert_called_once()
mock_connection.commit.assert_not_called()


# ---------------------------------------------------------------------------
# log_file_event
# ---------------------------------------------------------------------------
Expand Down
85 changes: 85 additions & 0 deletions parser/tests/test_main.py
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,91 @@ def test_metadata_exception_is_swallowed(
mock_batch.assert_called_once()


def _run_stats_loop(tmp_path, mock_event, *, configure_db=None):
"""Run main(), then invoke the captured stats_loop closure under the same patches.

Calling stats_loop() inside the with-block is critical: if called after
the block exits, DBWriter and Config are unpatched, connect() fails against
a real DB, and the retry loop hangs forever.

Returns the MagicMock instance used as every DBWriter in the test.
"""
captured = {}

class CapturingThread(threading.Thread):
def __init__(self, *args, target=None, name=None, **kwargs):
super().__init__(*args, target=target, name=name, **kwargs)
if name == "stats-refresh":
captured["stats_loop"] = target

def start(self):
pass # don't spawn real threads in unit tests

mock_db = MagicMock()

with patch("parser.main.threading.Thread", CapturingThread), \
patch("parser.main.threading.Event", return_value=mock_event), \
patch("parser.main.FileManager"), \
patch("parser.main.FileWatcher"), \
patch("parser.main.DBWriter", return_value=mock_db), \
patch("parser.main.Config.PARSER_ENABLED", True), \
patch("parser.main.Config.PROCESS_EXISTING_ON_STARTUP", False), \
patch("parser.main.Config.DATABASE_URL", "postgresql://test"), \
patch("parser.main.Config.USER_ID", "test_user"), \
patch("parser.main.Config.STATS_REFRESH_INTERVAL", 60), \
patch("parser.main.Config.INCOMING_DIR", tmp_path), \
patch("parser.main.Config.ARCHIVED_DIR", tmp_path), \
patch("parser.main.Config.QUARANTINE_DIR", tmp_path), \
patch("parser.main.time.sleep", side_effect=KeyboardInterrupt):
try:
main()
except (KeyboardInterrupt, SystemExit):
pass
if configure_db is not None:
configure_db(mock_db)
if "stats_loop" in captured:
captured["stats_loop"]()

return mock_db


def test_stats_loop_calls_refresh_windowed_stats_on_startup(tmp_path):
"""stats_loop calls refresh_windowed_stats once before entering the timer loop."""
mock_event = MagicMock()
mock_event.is_set.return_value = False # connect loop: enter → connect succeeds → break
mock_event.wait.return_value = True # timer wait → loop exits immediately

mock_db = _run_stats_loop(tmp_path, mock_event)

mock_db.refresh_windowed_stats.assert_called_once()
mock_db.close.assert_called() # called by stats_loop (possibly also by main cleanup)


def test_stats_loop_initial_refresh_error_is_swallowed(tmp_path):
"""stats_loop swallows exceptions raised by the startup refresh_windowed_stats call."""
mock_event = MagicMock()
mock_event.is_set.return_value = False
mock_event.wait.return_value = True

def configure(db):
db.refresh_windowed_stats.side_effect = Exception("stats error")

_run_stats_loop(tmp_path, mock_event, configure_db=configure) # must not raise


def test_stats_loop_calls_refresh_windowed_stats_in_timer_loop(tmp_path):
"""stats_loop calls refresh_windowed_stats again on each timer tick."""
mock_event = MagicMock()
mock_event.is_set.return_value = False
# First timer wait → False (enter loop body), second → True (exit)
mock_event.wait.side_effect = [False, True]

mock_db = _run_stats_loop(tmp_path, mock_event)

# startup call + 1 timer-loop call = 2 total
assert mock_db.refresh_windowed_stats.call_count == 2


def test_stats_loop_creates_dbwriter_with_config_user_id(tmp_path):
"""stats_loop must pass user_id=Config.USER_ID to DBWriter.

Expand Down
1 change: 1 addition & 0 deletions scripts/generate_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -280,6 +280,7 @@ def generate_users_json(users: list[dict], existing_json: dict) -> dict:
GRANT SELECT ON cml_data_secure TO {user_id};
GRANT SELECT ON cml_data_1h_secure TO {user_id};
GRANT EXECUTE ON FUNCTION update_cml_stats(TEXT, TEXT) TO {user_id};
GRANT EXECUTE ON FUNCTION update_cml_stats_windowed(TEXT, TEXT) TO {user_id};

-- file_processing_log: parser INSERTs a row for every processed file;
-- webserver_role only needs SELECT.
Expand Down
Loading
Loading