243 changes: 239 additions & 4 deletions MC/utils/o2dpg_sim_metrics.py
@@ -12,6 +12,7 @@
import matplotlib
import json
import numpy as np
import math
import pandas as pd

############################################################################
@@ -744,6 +745,108 @@ def extract_resources(pipelines):
    # Collect all metrics we got; here we want the median over all iterations
    return [Resources(p) for p in pipelines]

def merge_stats(elementary, running):
    """
    Merge an incoming elementary JSON into a running stats structure.
    Also maintains a running std using Welford's method (in the parallel
    variant of Chan et al. for combining partitions).

    Each metric stores:
    mean, std, M2, min, max, count
    """
    if not elementary:
        return running

    n_new_total = int(elementary.get("count", 1))
    running["count"] = running.get("count", 0) + n_new_total

    for name, metrics in elementary.items():
        if name == "count":
            continue

        if name not in running:
            running[name] = {"count": 0}

        # existing count for this name
        n_old_name = running[name].get("count", 0)

        for metric, vals in metrics.items():
            if not isinstance(vals, dict):
                continue

            if metric not in running[name]:
                # first occurrence of this metric: adopt the incoming stats;
                # elementary inputs carry no std/M2/count, so fall back to defaults
                running[name][metric] = {
                    "min": vals.get("min"),
                    "max": vals.get("max"),
                    "mean": vals.get("mean"),
                    "std": vals.get("std", 0.0),
                    "M2": vals.get("M2", 0.0),
                    "count": vals.get("count", n_new_total)
                }
                continue

            rmetric = running[name][metric]
            n_old = rmetric.get("count", 0)
            # already-merged inputs carry a per-metric count, elementary ones do not
            n_new = vals.get("count", n_new_total)

            # update min / max
            e_min = vals.get("min")
            e_max = vals.get("max")
            if e_min is not None:
                rmetric["min"] = e_min if rmetric["min"] is None else min(rmetric["min"], e_min)
            if e_max is not None:
                rmetric["max"] = e_max if rmetric["max"] is None else max(rmetric["max"], e_max)

            # combine means & M2
            mean_a = rmetric.get("mean")
            mean_b = vals.get("mean")

            # if either mean is missing, use the one that exists
            if mean_a is None and mean_b is None:
                # nothing to do
                continue
            elif mean_a is None:
                rmetric["mean"] = mean_b
                rmetric["M2"] = vals.get("M2", 0.0)
                rmetric["count"] = n_new
            elif mean_b is None:
                # keep existing stats
                rmetric["mean"] = mean_a
                rmetric["M2"] = rmetric.get("M2", 0.0)
                rmetric["count"] = n_old
            else:
                # both defined -> weighted merge of means and M2 (Chan et al.);
                # include the incoming M2 so that merging merged stats stays correct
                delta = mean_b - mean_a
                new_count = n_old + n_new
                new_mean = mean_a + delta * (n_new / new_count)
                new_M2 = rmetric.get("M2", 0.0) + vals.get("M2", 0.0) + (delta**2) * (n_old * n_new / new_count)

                rmetric["mean"] = new_mean
                rmetric["M2"] = new_M2
                rmetric["count"] = new_count

            # update std from M2
            c = rmetric["count"]
            rmetric["std"] = math.sqrt(rmetric["M2"] / c) if c > 1 else 0.0

        running[name]["count"] = n_old_name + n_new_total

    # round the summary values for readability
    for name, metrics in running.items():
        if name == "count":
            continue
        for metric, vals in metrics.items():
            if not isinstance(vals, dict):
                continue
            if "mean" in vals:
                vals["mean"] = r3(vals["mean"])
            if "std" in vals:
                vals["std"] = r3(vals["std"])
            if "min" in vals:
                vals["min"] = r3(vals["min"])
            if "max" in vals:
                vals["max"] = r3(vals["max"])

    return running
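The weighted branch above is the standard parallel combination of partitioned statistics (Chan et al.). As a minimal sketch of the behavior, here is how two pre-aggregated inputs would merge; the task name "mytask" and all numbers are invented for illustration:

# a summarizes the hypothetical samples [1, 3]: mean 2, M2 2
# b summarizes the hypothetical samples [2, 6]: mean 4, M2 8
a = {"count": 2, "mytask": {"cpu": {"min": 1.0, "max": 3.0, "mean": 2.0, "std": 1.0, "M2": 2.0, "count": 2}}}
b = {"count": 2, "mytask": {"cpu": {"min": 2.0, "max": 6.0, "mean": 4.0, "std": 2.0, "M2": 8.0, "count": 2}}}
merged = merge_stats(b, merge_stats(a, {}))
# merged["mytask"]["cpu"]:
#   count = 4
#   mean  = 2 + (4 - 2) * 2/4 = 3.0
#   M2    = 2 + 8 + (4 - 2)**2 * (2*2/4) = 14.0
#   std   = sqrt(14/4) = 1.871, the same as for the pooled samples [1, 3, 2, 6]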

def print_statistics(resource_object):
    """
@@ -787,9 +890,105 @@ def print_statistics(resource_object):
print(f" {comp:<20s} {mem:10.2f} MB")

    #(d) max disc consumption
    if 'disc' in dframe:
        print("\nMax-DISC usage (MB): ", dframe['disc'].max())
        print("Mean-DISC usage (MB): ", dframe['disc'].mean())
        print("---> ")

def r3(x):
    """Round to 3 decimals, return None for None/NaN."""
    if x is None:
        return None
    try:
        xf = float(x)
    except Exception:
        return None
    if math.isnan(xf):
        return None
    return round(xf, 3)
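A few illustrative checks of the helper's edge cases (these assertions are examples, not part of the module):

assert r3(None) is None
assert r3(float("nan")) is None
assert r3(1.23456) == 1.235
assert r3("2.71828") == 2.718   # numeric strings are accepted via float()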

def produce_json_stat(resource_object):
    print("<--- Producing resource json from file ", resource_object.pipeline_file)
    dframe = resource_object.df
    meta = resource_object.meta

    # also write json summary; this is a file that can be used
    # to adjust the resource estimates in o2dpg_workflow_runner.py
    #
    resource_json = {}
    # group by 'name' and compute all needed stats for each metric
    stats = (
        dframe
        .groupby('name')
        .agg({
            'pss': ['min', 'max', 'mean'],
            'uss': ['min', 'max', 'mean'],
            'cpu': ['min', 'max', 'mean']
        })
    )

    # turn the multi-level columns into flat names
    stats.columns = [f"{col[0]}_{col[1]}" for col in stats.columns]
    stats = stats.reset_index()

    # ----- compute lifetime ~ walltime per (timeframe, name) -----
    # filter out samples taken at nice == 19, since these de-prioritized
    # processes do not reflect realistic runtime
    df_nice_filtered = dframe[dframe['nice'] != 19].copy()

    # mean runtime is determined per timeframe first, then averaged over timeframes
    lifetime_per_tf = (
        df_nice_filtered
        .groupby(['timeframe', 'name'])['iter']
        .agg(lambda x: x.max() - x.min() + 1)  # +1 to include both ends
        .reset_index(name='lifetime')
    )

    # now aggregate over timeframes for each name
    mean_lifetime = (
        lifetime_per_tf
        .groupby('name')['lifetime']
        .mean()
    )
    max_lifetime = (
        lifetime_per_tf
        .groupby('name')['lifetime']
        .max()
    )
    min_lifetime = (
        lifetime_per_tf
        .groupby('name')['lifetime']
        .min()
    )

    resource_json["count"] = 1  # basic sample size

    # convert to nested dictionary
    for _, row in stats.iterrows():
        name = row['name']
        resource_json[name] = {
            'pss': {
                'min': r3(row['pss_min']),
                'max': r3(row['pss_max']),
                'mean': r3(row['pss_mean'])
            },
            'uss': {
                'min': r3(row['uss_min']),
                'max': r3(row['uss_max']),
                'mean': r3(row['uss_mean'])
            },
            'cpu': {
                'min': r3(row['cpu_min']),
                'max': r3(row['cpu_max']),
                'mean': r3(row['cpu_mean'])
            },
            'lifetime': {
                'min': r3(float(min_lifetime.get(name, np.nan))),
                'max': r3(float(max_lifetime.get(name, np.nan))),
                'mean': r3(float(mean_lifetime.get(name, np.nan)))
            }
        }
    return resource_json
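Serialized to JSON, the returned structure then looks roughly as follows; the task name and all numbers are invented for illustration:

{
  "count": 1,
  "mytask": {
    "pss": {"min": 512.031, "max": 2048.5, "mean": 1200.25},
    "uss": {"min": 480.0, "max": 1900.0, "mean": 1100.0},
    "cpu": {"min": 0.1, "max": 3.2, "mean": 1.45},
    "lifetime": {"min": 42.0, "max": 58.0, "mean": 50.0}
  }
}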

def stat(args):
    """
@@ -801,6 +1000,32 @@ def stat(args):
        print_statistics(res)


def merge_stats_into(list_of_json_stats, outputfile):
    running = {}
    # read all the inputs
    for inp_json in list_of_json_stats:
        running = merge_stats(inp_json, running)

    # now write out the result into the output file
    if running:
        with open(outputfile, 'w') as f:
            json.dump(running, f)


def json_stat(args):
    resources = extract_resources(args.pipelines)
    all_stats = [produce_json_stat(res) for res in resources]
    merge_stats_into(all_stats, args.output)


def merge_json_stats(args):
    all_stats = []
    for inp in args.inputs:
        # load the json as a dict
        with open(inp, 'r') as f:
            all_stats.append(json.load(f))
    merge_stats_into(all_stats, args.output)

def history(args):
    """
    Entrypoint for history
@@ -1048,7 +1273,17 @@ def main():
    stat_parser.set_defaults(func=stat)
    stat_parser.add_argument("-p", "--pipelines", nargs="*", help="pipeline_metric files from o2_dpg_workflow_runner", required=True)

    json_stat_parser = sub_parsers.add_parser("json-stat", help="Produce basic json stat (compatible with o2dpg_workflow_runner injection)")
    json_stat_parser.set_defaults(func=json_stat)
    json_stat_parser.add_argument("-p", "--pipelines", nargs="*", help="pipeline_metric files from o2_dpg_workflow_runner; information is merged", required=True)
    json_stat_parser.add_argument("-o", "--output", type=str, help="Output json filename", required=True)

    merge_stat_parser = sub_parsers.add_parser("merge-json-stats", help="Merge information from json-stats into an aggregated stat")
    merge_stat_parser.set_defaults(func=merge_json_stats)
    merge_stat_parser.add_argument("-i", "--inputs", nargs="*", help="List of incoming/input json stat files", required=True)
    merge_stat_parser.add_argument("-o", "--output", type=str, help="Output json filename", required=True)

    plot_parser = sub_parsers.add_parser("history", help="Plot (multiple) metrics from extracted metrics JSON file(s)")
    plot_parser.set_defaults(func=history)
    plot_parser.add_argument("-p", "--pipelines", nargs="*", help="pipeline_metric files from o2_dpg_workflow_runner", required=True)
    plot_parser.add_argument("--output", help="output directory", default="resource_history")
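Assuming the script is invoked directly, the new subcommands could be exercised like this (file names are hypothetical):

python3 o2dpg_sim_metrics.py json-stat -p pipeline_metric_1.json pipeline_metric_2.json -o stats.json
python3 o2dpg_sim_metrics.py merge-json-stats -i stats.json stats_other.json -o stats_merged.json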