DHIS2/fix_visualizations_order/download_and_fix_wrong_fix.py (new file, 161 additions)
@@ -0,0 +1,161 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

"""
DHIS2 visualization fetch + period-order fixer (compact)

- Reads UIDs from a CSV (first column).
- Fetches each visualization via /api/visualizations/{uid}.json?fields=*,rows[*].
- Fixes 'pe' (period) rows by sorting 4-digit years ascending while keeping non-year tokens at the end.
- Writes two pretty JSON files:
* original_output{suffix}.json (as downloaded)
* fixed_output{suffix}.json (after normalization)
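
Illustrative example (assumed data, not from the original source): a 'pe' row
whose item IDs are ["2016", "LAST_5_YEARS", "2014"] is rewritten to
["2014", "2016", "LAST_5_YEARS"]: years sorted ascending, non-year tokens
appended in their original order.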
"""

import csv
import json
import re
import sys
from typing import Any, Dict, List, Optional
import requests
from pathlib import Path
from copy import deepcopy

# =========================
# CONFIG
# =========================

suffix = "_dev"
UIDS_CSV_PATH = f"errors{suffix}.csv"

OUTPUT_JSON_MOD = f"fixed_output{suffix}.json" # fixed
OUTPUT_JSON_ORIG = f"original_output{suffix}.json" # as-is

BASE_URL = "https://server/dhis2"

# Hardcoded JSESSIONID (paste a valid session cookie value here before running)
JSESSIONID_VALUE = ""

REQUEST_TIMEOUT = 30
NUMERIC_YEAR_RE = re.compile(r"^\d{4}$")


def build_session() -> requests.Session:
s = requests.Session()
s.headers.update({
"Accept": "application/json",
"User-Agent": "simple-dhis2-fixer/1.0",
})
s.cookies.set("JSESSIONID", JSESSIONID_VALUE)
return s


def load_uids(csv_path: str) -> List[str]:
uids: List[str] = []
p = Path(csv_path)
if not p.is_file():
print(f"[ERROR] UIDs CSV not found: {csv_path}", file=sys.stderr)
sys.exit(1)

with p.open("r", encoding="utf-8") as f:
reader = csv.reader(f)
for i, row in enumerate(reader):
if not row:
continue
uid = row[0].strip()
if not uid:
continue
if i == 0 and uid.lower() in {"uid", "uids", "id"}:
continue
uids.append(uid)
return uids
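
# Expected CSV shape (a sketch; the UID below is hypothetical):
#   uid
#   aB3cD4eF5gH
# One UID per row, read from the first column; a first-row header named
# "uid", "uids", or "id" is skipped.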


def fetch_visualization(session: requests.Session, base_url: str, uid: str, timeout: int) -> Optional[Dict[str, Any]]:
url = f"{base_url.rstrip('/')}/api/visualizations/{uid}.json?fields=*,rows[*]"
print(f"📡 GET: {url}")
try:
r = session.get(url, timeout=timeout)
except requests.RequestException as e:
print(f"[ERROR] {uid}: network error -> {e}", file=sys.stderr)
return None

if r.status_code != 200:
print(f"[ERROR] {uid}: HTTP {r.status_code} -> {r.text[:200]}", file=sys.stderr)
return None

try:
return r.json()
except ValueError:
print(f"[ERROR] {uid}: response is not JSON. Snippet: {r.text[:200]}", file=sys.stderr)
return None


def reorder_years(ids: List[str]) -> List[str]:
numeric = [x for x in ids if NUMERIC_YEAR_RE.match(x)]
non_numeric = [x for x in ids if not NUMERIC_YEAR_RE.match(x)]
return sorted(numeric, key=lambda x: int(x)) + non_numeric
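
# Illustrative behavior (assumed inputs):
#   reorder_years(["2016", "LAST_5_YEARS", "2014"]) -> ["2014", "2016", "LAST_5_YEARS"]
#   reorder_years(["2020", "2019"]) -> ["2019", "2020"]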


def fix_pe_rows(viz: Dict[str, Any]) -> bool:
rows = viz.get("rows")
if not isinstance(rows, list):
return False

changed = False
for row in rows:
if isinstance(row, dict) and row.get("dimension") == "pe":
items = row.get("items", [])
if not isinstance(items, list) or not items:
continue
ids = [str(it.get("id")) for it in items if isinstance(it, dict) and "id" in it]
if not ids:
continue
expected = reorder_years(ids)
if ids != expected:
                original_map = {str(it.get("id")): it for it in items if isinstance(it, dict) and "id" in it}
row["items"] = [original_map.get(i, {"id": i}) for i in expected]
changed = True
return changed
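
# Sketch of the 'pe' row shape this function rewrites (the payload structure is
# assumed from the fields used above):
#   {"dimension": "pe", "items": [{"id": "2016"}, {"id": "2014"}]}
# becomes
#   {"dimension": "pe", "items": [{"id": "2014"}, {"id": "2016"}]}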


def main():
session = build_session()
uids = load_uids(UIDS_CSV_PATH)
print(f"Processing {len(uids)} UIDs...\n")

originals: List[Dict[str, Any]] = []
modifieds: List[Dict[str, Any]] = []
changed_count = 0
missing_count = 0

for uid in uids:
viz = fetch_visualization(session, BASE_URL, uid, REQUEST_TIMEOUT)
if viz is None:
missing_count += 1
continue

originals.append(deepcopy(viz))

fixed_viz = deepcopy(viz)
if fix_pe_rows(fixed_viz):
changed_count += 1
modifieds.append(fixed_viz)

with open(OUTPUT_JSON_ORIG, "w", encoding="utf-8") as f:
json.dump({"count": len(originals), "visualizations": originals}, f, ensure_ascii=False, indent=2)

with open(OUTPUT_JSON_MOD, "w", encoding="utf-8") as f:
json.dump({"count": len(modifieds), "visualizations": modifieds}, f, ensure_ascii=False, indent=2)

print("\nSummary")
print(f" Total UIDs : {len(uids)}")
print(f" Visualizations OK : {len(modifieds) - changed_count}")
print(f" Reordered : {changed_count}")
print(f" Not downloaded (err) : {missing_count}")
print(f"\nOriginal output : {OUTPUT_JSON_ORIG}")
print(f"Fixed output : {OUTPUT_JSON_MOD}")


if __name__ == "__main__":
main()
DHIS2/fix_visualizations_order/identify_wrong_vis.py (new file, 125 additions)
@@ -0,0 +1,125 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

"""
DHIS2 period-order checker (minimal version)

What it does
------------
- Loads visualizations from: visualizations_check{SUFFIX}.json (in the same folder as this script)
- For each visualization, finds the 'pe' dimension under 'rows' and reads its item IDs
- Considers these valid:
* Strict ascending years (e.g., 2013 2014 2015)
* Strict descending years (e.g., 2018 2017 2016)
Non-year tokens (e.g., LAST_5_YEARS) are kept at the end in their original order
- Flags anything else as wrong_order
- Writes a CSV summary to: output{SUFFIX}.csv

Notes
-----
- Input is produced manually (e.g. via):
https://server.com/dhis2/api/visualizations?fields=rows[*],lastUpdated,created,lastUpdatedBy,id,name&paging=false
- Keep one file per instance by changing SUFFIX (e.g., "_dev", "_prod") and naming your input:
visualizations_check_dev.json, visualizations_check_prod.json, etc.
"""

import csv
import json
import os
import re
import sys

SUFFIX = "_prod"
YEAR = re.compile(r"^\d{4}$")

BASE_DIR = os.path.dirname(os.path.abspath(__file__))
INPUT_FILE = os.path.join(BASE_DIR, f"visualizations_check{SUFFIX}.json")
OUTPUT_FILE = os.path.join(BASE_DIR, f"output{SUFFIX}.csv")

def load_visualizations(path):
with open(path, "r", encoding="utf-8") as f:
data = json.load(f)
if isinstance(data, dict) and isinstance(data.get("visualizations"), list):
return data["visualizations"]
if isinstance(data, list):
return data
if isinstance(data, dict):
return [data]
raise ValueError("Unsupported JSON structure")

def get_pe_ids(v):
    rows = v.get("rows", [])
    if not isinstance(rows, list):
        return None
    for row in rows:
        if isinstance(row, dict) and row.get("dimension") == "pe":
            items = row.get("items", [])
            return [str(it["id"]) for it in items if isinstance(it, dict) and "id" in it]
    return None

def sort_years(ids, reverse=False):
nums = [x for x in ids if YEAR.match(x)]
rest = [x for x in ids if not YEAR.match(x)]
nums_sorted = sorted(nums, key=lambda x: int(x), reverse=reverse)
return nums_sorted + rest
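
# Illustrative behavior (assumed inputs):
#   sort_years(["2016", "LAST_5_YEARS", "2014"]) -> ["2014", "2016", "LAST_5_YEARS"]
#   sort_years(["2016", "LAST_5_YEARS", "2014"], reverse=True) -> ["2016", "2014", "LAST_5_YEARS"]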

def analyze(v):
ids = get_pe_ids(v)
if ids is None:
return ("no_pe_dimension", [], [])
asc = sort_years(ids, reverse=False)
desc = sort_years(ids, reverse=True)
if ids == asc:
return ("ok", ids, asc)
if ids == desc:
return ("ok_desc", ids, asc) # keep asc as the reference/expected
return ("wrong_order", ids, asc)

def main():
try:
print(f"Loading: {INPUT_FILE}")
visualizations = load_visualizations(INPUT_FILE)
except Exception as e:
print(f"Error loading JSON: {e}", file=sys.stderr)
sys.exit(1)

results = []
total = len(visualizations)
wrong = 0
no_pe = 0

for v in visualizations:
vid = v.get("id", "")
name = v.get("name", "")
created = v.get("created", "")
last_updated = v.get("lastUpdated", "")

status, current, expected_asc = analyze(v)
if status == "wrong_order":
wrong += 1
print(f"[WRONG ORDER] {name} ({vid}) | current={current} | expected(asc)={expected_asc}")
elif status == "no_pe_dimension":
no_pe += 1

results.append({
"id": vid,
"created": created,
"lastUpdated": last_updated,
"name": name,
"status": status,
"current_order": " ".join(current),
"expected_order": " ".join(expected_asc),
})

print("\nSummary")
print(f" Total : {total}")
print(f" Without 'pe' : {no_pe}")
print(f" Wrong order : {wrong}")
print(f" Valid (with 'pe') : {total - wrong - no_pe}")

with open(OUTPUT_FILE, "w", encoding="utf-8", newline="") as f:
writer = csv.DictWriter(
f,
fieldnames=["id", "created", "lastUpdated", "name", "status", "current_order", "expected_order"]
)
writer.writeheader()
writer.writerows(results)

print(f"\nCSV written: {OUTPUT_FILE}")

if __name__ == "__main__":
main()