Skip to content

Commit b26d718

Browse files
committed
O2DPG-MC: Adding more utilities for pipeline metric aggregation/processing
* possibility to extract aggregated metrics from existing MC productions on the GRID * little server prototype that could aggregate metrics via a REST API in real time * small jq templates to have a top-N consumer summary
1 parent 83b2c5a commit b26d718

File tree

4 files changed

+350
-0
lines changed

4 files changed

+350
-0
lines changed

MC/utils/metrics/README.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
Set of scripts and utilities for the creation and processing
2+
of workflow metrics (CPU, memory, walltime) for tasks executed
3+
in o2dpg_workflow_runner.py and derived from its pipeline_metric*.log monitoring.
Lines changed: 224 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,224 @@
1+
#!/usr/bin/env python3
2+
3+
# A python tool that calculates mean O2DPG workflow metrics by
# harvesting data from pipeline_metric files on the GRID (for a given lpm production tag).
5+
6+
import json
7+
import subprocess
8+
from collections import defaultdict
9+
import re
10+
import os
11+
import argparse
12+
import random
13+
from pathlib import Path
14+
import sys
15+
import time
16+
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor, as_completed
17+
18+
# add the parent directory of the current file to sys.path to find the o2dpg_sim_metric
19+
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))
20+
from o2dpg_sim_metrics import json_stat_impl
21+
22+
def alien_find(path, pattern="*", logging=False):
    """
    Run `alien.py find` on the GRID and return the matching paths.

    Parameters
    ----------
    path : str
        AliEn directory to search under.
    pattern : str
        Filename pattern passed to `alien.py find`.
    logging : bool
        If True, print the command before executing it.

    Returns
    -------
    list[str]
        Non-empty, stripped output lines (one path per line).

    Raises
    ------
    subprocess.CalledProcessError
        If `alien.py` exits with a non-zero status (check=True).
    """
    cmd = ["alien.py", "find", path, pattern]
    if logging:
        print(f"Performing {cmd}")
    completed = subprocess.run(cmd, capture_output=True, text=True, check=True)
    stripped = (line.strip() for line in completed.stdout.splitlines())
    return [line for line in stripped if line]
28+
29+
30+
def alien_cp(alien_path, local_path, parent=None, fatal=False, logging=False):
    """
    Copy a file (or wildcard pattern) from the AliEn GRID to a local path.

    Parameters
    ----------
    alien_path : str
        Source path on the GRID (may contain wildcards).
    local_path : str
        Local destination path.
    parent : int, optional
        Passed as `-parent` to `alien.py cp` to keep trailing path components.
    fatal : bool
        If True, re-raise copy failures; otherwise failures are ignored
        (best-effort copy).  Previously this flag was accepted but ignored.
    logging : bool
        If True, print the command before executing it.
    """
    cmd = ["alien.py", "cp"]
    if parent is not None:
        cmd += ["-parent", f"{parent}"]
    cmd += [f"alien://{alien_path}", f"file://{local_path}"]
    if logging:
        print(f"Performing {cmd}")
    try:
        subprocess.run(cmd, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, check=True)
    except subprocess.CalledProcessError as e:
        # honor the fatal flag; by default keep the original best-effort behavior
        if fatal:
            raise
        if logging:
            print(f"Copy failed (ignored): {e}")
41+
42+
def alien_cp_inputfile(inputfile, logging=False):
    """
    Perform a batched GRID copy using an `alien.py cp -input` list file.

    Parameters
    ----------
    inputfile : str
        Path to a local file listing source/destination pairs for alien.py.
    logging : bool
        If True, print the command before executing it.

    Raises
    ------
    subprocess.CalledProcessError
        If `alien.py` exits with a non-zero status (check=True).
    """
    cmd = ["alien.py", "cp", "-input", f"{inputfile}"]
    if logging:
        print(f"Performing {cmd}")
    subprocess.run(cmd, check=True,
                   stdout=subprocess.DEVNULL,
                   stderr=subprocess.DEVNULL)
47+
48+
def parse_workflow_path(path, prod_tag):
    """
    Extract (cycle, run_number, split) from a GRID workflow directory path.

    After the production tag, the path is expected to contain either
    `<cycle>/<run_number>/<split>` (cycle is a single digit) or
    `<run_number>/<split>`.

    Parameters
    ----------
    path : str
        A workflow directory path containing `prod_tag` as a component.
    prod_tag : str
        The production tag to anchor on.

    Returns
    -------
    tuple | None
        (cycle, run_number, split) with cycle == None when the path has no
        cycle component, or None for paths that do not match the expected
        layout (previously such paths raised ValueError/IndexError).
    """
    parts = path.strip("/").split("/")
    try:
        idx = parts.index(prod_tag)
    except ValueError:
        # production tag not present in the path
        return None

    after = parts[idx + 1:]
    if len(after) < 2:
        return None

    if after[0].isdigit() and len(after[0]) == 1:
        # layout: <cycle>/<run_number>/<split>
        if len(after) < 3 or not after[1].isdigit():
            # malformed: missing split component or non-numeric run number
            return None
        cycle = int(after[0])
        run_number = int(after[1])
        split = after[2]
    else:
        # layout: <run_number>/<split>
        if not after[0].isdigit():
            return None
        cycle = None
        run_number = int(after[0])
        split = after[1]

    return cycle, run_number, split
69+
70+
71+
def calculate_statistics(selecteddirs, prod_tag, run_number, batchcopy=False):
    """
    Download the pipeline metric files for the given GRID directories and
    compute aggregated statistics with json_stat_impl.

    Parameters
    ----------
    selecteddirs : list[str]
        AliEn workflow directories to harvest pipeline_metr* files from.
    prod_tag : str
        Production tag; used to build the local target directory and meta info.
    run_number : int
        Run number; used to build the local target directory and meta info.
    batchcopy : bool
        If True, perform one batched copy via an alien.py input list;
        otherwise copy each directory individually.

    Side effects
    ------------
    Writes downloaded files and `merged_metrics.json` under
    /tmp/o2dpg_metrics_harvester/<prod_tag>/<run_number>.
    """
    # determine target dir based on tag and run_number
    targetdir = f"/tmp/o2dpg_metrics_harvester/{prod_tag}/{run_number}"
    os.makedirs(targetdir, exist_ok=True)

    start = time.time()
    if batchcopy:
        # make an input file for a single batched alien.py cp call
        inputfile = f"{targetdir}/cp_input.list"
        with open(inputfile, 'w') as f:
            for directory in selecteddirs:
                # keep the last path component --> mimics -parent which does not work with inputlists
                last_component = Path(directory).name
                f.write(f"{directory}/pipeline_metr* file:{targetdir}/{last_component}\n")

        # copy with the input-file
        alien_cp_inputfile(inputfile, logging=True)
    else:
        for directory in selecteddirs:
            # per-directory copy; -parent keeps the top-level dir component
            alien_cp(f"{directory}/pipeline_metr*", targetdir, parent=1)

    end = time.time()
    print(f"Copy took {end-start:.4f} seconds")

    # construct the list of all downloaded input files
    input_files = [str(p) for p in Path(targetdir).rglob('pipeline_metr*')]
    print(input_files)

    # calculate the stats with all the files in targetdir
    outputfilename = f"{targetdir}/merged_metrics.json"
    meta_info = {"prod-tag" : prod_tag, "run-number" : run_number}
    json_stat_impl(input_files, outputfilename, meta_info)
111+
112+
import os
113+
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
114+
115+
def treat_parallel(func, data, use_threads=False, max_workers=None):
    """
    Apply `func` to every element of `data` in parallel, preserving order.

    Parameters
    ----------
    func : callable
        The function applied to each element.
    data : iterable
        The elements to process.
    use_threads : bool, default=False
        True selects a thread pool (I/O-bound work); False selects a
        process pool (CPU-bound work).
    max_workers : int, optional
        Pool size; defaults to the CPU count for processes, or
        min(32, 5 * CPU count) for threads.

    Returns
    -------
    list
        Results in the same order as `data`.
    """
    if max_workers is None:
        # threads are cheap, so allow oversubscription (capped at 32)
        max_workers = min(32, os.cpu_count() * 5) if use_threads else os.cpu_count()

    pool_cls = ThreadPoolExecutor if use_threads else ProcessPoolExecutor

    # executor.map keeps the input ordering of the results
    with pool_cls(max_workers=max_workers) as pool:
        return list(pool.map(func, data))
146+
147+
def treat_one_run(data_element):
    """
    Final worker function executed once per run number.

    Takes its parameters packed in a single sequence so it composes with
    Thread/ProcessPoolExecutor via treat_parallel above.

    data_element layout:
      index 0 --> the run_number
      index 1 --> a list of tuples (cycle, split, directory)
      index 2 --> production tag
      index 3 --> sample size
    """
    run_number = data_element[0]
    candidates = data_element[1]
    prod_tag = data_element[2]
    sample_size = data_element[3]

    # the directory is the third entry of each candidate tuple
    directories = [candidate[2] for candidate in candidates]
    selected_dirs = random.sample(directories, min(len(directories), sample_size))
    print(f"For {run_number} selected {selected_dirs}")

    calculate_statistics(selected_dirs, prod_tag, run_number, batchcopy=True)
165+
166+
167+
def process_prod_tag(prod_tag, year="2025", ccdb_url=None, username=None, overwrite=False, samplesize=20):
    """
    Harvest pipeline metric files for one production tag from the GRID,
    group them by run number and compute aggregated statistics per run.

    Parameters
    ----------
    prod_tag : str
        LPM production tag (e.g. LHC25a).
    year : str
        Production year used to build the AliEn base path.
    ccdb_url, username, overwrite :
        Reserved for a future CCDB publishing step; currently unused here.
    samplesize : int
        Number of workflow directories sampled per run number.
    """
    base_path = f"/alice/sim/{year}/{prod_tag}"

    pipelinemetric_files = alien_find(base_path, "pipeline_metric*")

    # exclude some unnecessary paths
    excluded_markers = ("/AOD/", "/QC/", "/TimeseriesTPCmerging/", "/Stage")
    pipelinemetric_files = [
        zf for zf in pipelinemetric_files
        if not any(marker in zf for marker in excluded_markers)
    ]
    print(f"Found {len(pipelinemetric_files)} pipeline metric files")

    # directories containing pipeline metric files
    workflow_dirs = {os.path.dirname(wf) for wf in pipelinemetric_files}
    print(f"Found {len(workflow_dirs)} workflow dirs")

    # group by run_number
    runs = defaultdict(list)
    for workflow_dir in workflow_dirs:
        parsed = parse_workflow_path(workflow_dir, prod_tag)
        if parsed is None:
            continue
        cycle, run_number, split = parsed
        runs[run_number].append((cycle, split, workflow_dir))
    print(f"Found {len(runs)} run numbers")

    # for each run_number, sample `samplesize` directories and compute
    # merged statistics (treat_one_run does the sampling + aggregation)
    data = [(run_number, candidates, prod_tag, samplesize)
            for run_number, candidates in sorted(runs.items())]
    do_parallel = True
    if do_parallel:
        treat_parallel(treat_one_run, data, use_threads=False, max_workers=8)
    else:
        # sequential fallback, useful for debugging
        for data_element in data:
            treat_one_run(data_element)
209+
210+
def main():
    """CLI entry point: parse arguments and run the metrics harvester."""
    parser = argparse.ArgumentParser(
        description="Harvest MC metrics from AlienGRID; aggregate; and publish to CCDB"
    )
    parser.add_argument("--prod_tag", required=True, help="Production tag (e.g. prod2025a)")
    parser.add_argument("--ccdb", required=False, default="https://alice-ccdb.cern.ch", help="CCDB server URL")
    parser.add_argument("--username", required=False, help="GRID username (needs appropriate AliEn token initialized)")
    parser.add_argument("--year", default="2025", help="Production year (default: 2025)")
    parser.add_argument("--overwrite", action="store_true", help="Overwrite existing entries")
    # expose the sample size; previously process_prod_tag's samplesize could
    # not be set from the command line
    parser.add_argument("--samplesize", type=int, default=20,
                        help="Number of workflow dirs sampled per run number (default: 20)")
    args = parser.parse_args()

    process_prod_tag(args.prod_tag, year=args.year, ccdb_url=args.ccdb,
                     username=args.username, overwrite=args.overwrite,
                     samplesize=args.samplesize)

if __name__ == "__main__":
    main()

MC/utils/metrics/server.py

Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
2+
# A simple HTTPS server with an endpoint on which
3+
# callers can inject O2DPG_workflow_runner stat json metrics.
4+
# The service is supposed to run as aggregator of these individual metrics
5+
# and to provide high-quality merged statistics on resource estimates. These
6+
# estimates can then be used to improve the scheduling of o2dpg_workflow_runner workflows.
7+
8+
from fastapi import FastAPI
9+
from pydantic import BaseModel
10+
from asyncio import Queue, create_task
11+
import asyncio, json, time
12+
import aiofiles
13+
from fastapi import Request
14+
15+
import sys, os
16+
17+
# add the parent directory of the current file to sys.path to find the o2dpg_sim_metric
18+
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))
19+
from o2dpg_sim_metrics import merge_stats_into
20+
21+
# Application object and global aggregation state
app = FastAPI()
# incoming metric payloads; filled by the /metric endpoint, drained by worker()
queue = Queue()
# NOTE(review): only read by flusher(); nothing in this file writes to it —
# presumably a legacy of an earlier design, confirm before relying on it
agg = {} # {(metric_name): {"sum": 0.0, "count": 0}}
flush_interval = 5 # seconds between flusher() snapshots
outfile = "metrics.json"  # snapshot file written by flusher()

# Global state
# per-tag cache of merged statistics, updated by worker()
agg_by_tag = {} # { production_tag: cached_result }
29+
30+
31+
@app.post("/metric")
32+
async def receive_metric(request : Request):
33+
# just enqueue, return quickly
34+
payload = await request.json()
35+
await queue.put(payload)
36+
return {"status": "ok"}
37+
38+
def init_cache():
    """
    Initializes the cache of results from files

    NOTE(review): currently a stub — agg_by_tag is not rehydrated from the
    aggr_metrics_tag_*.json files written by flush_to_disc, so any cached
    aggregates are lost across restarts.
    """
    pass
43+
44+
def flush_to_disc(tag):
    """
    Persist the cached aggregate for the given production tag to a JSON
    file named aggr_metrics_tag_<tag>.json in the working directory.
    """
    current = agg_by_tag.get(tag, {})
    outname = f"aggr_metrics_tag_{tag}.json"
    with open(outname, 'w') as handle:
        json.dump(current, handle)
52+
53+
async def worker():
    """
    Background task performing the metrics aggregation: consume payloads
    from the queue and merge each into the per-tag cached aggregate,
    persisting the result after every update.
    """
    while True:
        payload = await queue.get()

        # production tag comes from the payload's metadata ("default" if absent)
        meta = payload.get("meta-data", {})
        tag = meta.get("production-tag", "default")
        print(f"Worker is treating payload for tag {tag}")

        # merge the new payload with the cached aggregate and store it back
        cached = agg_by_tag.get(tag, {})
        agg_by_tag[tag] = merge_stats_into([payload, cached], None, meta)

        flush_to_disc(tag)

        queue.task_done()
73+
74+
75+
async def flusher():
    """
    Periodically write a per-metric mean snapshot of `agg` to `outfile`.

    NOTE(review): this reads the module-level `agg` dict, but the worker
    tasks populate `agg_by_tag` instead — as written nothing fills `agg`,
    so the snapshot is always empty. The task is also not started (its
    create_task call in startup is commented out). Confirm the intended
    data source before enabling.
    """
    while True:
        # wait one flush interval between snapshots
        await asyncio.sleep(flush_interval)
        # per-metric mean = sum/count, guarding against a zero count
        snapshot = {
            k: (v["sum"] / v["count"]) if v["count"] else 0
            for k, v in agg.items()
        }
        # write asynchronously so the event loop is not blocked by disk I/O
        async with aiofiles.open(outfile, "w") as f:
            await f.write(json.dumps(snapshot, indent=2))
        print(f"Flushed {len(snapshot)} metrics at {time.ctime()}")
85+
86+
87+
@app.on_event("startup")
88+
async def startup():
89+
# start multiple workers for parallelism
90+
for _ in range(8): # one per CPU core
91+
create_task(worker())
92+
# create_task(flusher())

MC/utils/metrics/top_metrics.sh

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
#!/usr/bin/env bash

# a simple top-N metrics query for a couple of examples using jq
# assumes JSON produced by o2dpg_sim_metrics.py
#
# usage: top_metrics.sh [metrics.json]
# The input file may be given as the first argument (default: merged_metrics.json).
# Previously the second and third queries had no input file at all and
# would block reading stdin.
INPUT="${1:-merged_metrics.json}"

# top CPU-time consumers (cpu.mean * lifetime.mean as a proxy for total CPU time)
jq 'to_entries
    | map(select(.value.cpu?.mean != null and .value.lifetime?.mean != null))
    | sort_by(-(.value.cpu.mean * .value.lifetime.mean))
    | .[:5]
    | map({name: .key,
           cpu_mean: .value.cpu.mean,
           lifetime_mean: .value.lifetime.mean,
           product: (.value.cpu.mean * .value.lifetime.mean)})' "${INPUT}"

# top mem consumers
# TODO: not implemented yet

# top cpu consumers
jq 'to_entries
    | map(select(.value.cpu?.mean != null))
    | sort_by(-.value.cpu.mean)
    | .[:5]
    | map({name: .key, cpu_mean: .value.cpu.mean})' "${INPUT}"

# top walltime consumers
jq 'to_entries
    | map(select(.value.lifetime?.mean != null))
    | sort_by(-.value.lifetime.mean)
    | .[:5]
    | map({name: .key, lifetime_mean: .value.lifetime.mean})' "${INPUT}"

0 commit comments

Comments
 (0)