Skip to content

Commit ad4bb02

Browse files
committed
o2dpg_sim_metrics: Introduce JSON statistics and merging of statistics
1 parent e0fc230 commit ad4bb02

File tree

1 file changed

+239
-4
lines changed

1 file changed

+239
-4
lines changed

MC/utils/o2dpg_sim_metrics.py

Lines changed: 239 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
import matplotlib
1313
import json
1414
import numpy as np
15+
import math
1516
import pandas as pd
1617

1718
############################################################################
@@ -744,6 +745,108 @@ def extract_resources(pipelines):
744745
# Collect all metrics we got, here we want to have the median from all the iterations
745746
return [Resources(p) for p in pipelines]
746747

748+
def merge_stats(elementary, running):
    """
    Merge an incoming elementary JSON stats dict into a running stats structure.

    The running std is maintained with the parallel/pairwise variant of
    Welford's method (Chan et al.), so stats can be merged incrementally and
    merged results can themselves be merged again.

    Each metric stores:
    mean, std, M2, min, max, count

    :param elementary: stats dict as produced by produce_json_stat, or a
                       previously merged stats dict; may be None/empty.
    :param running: dict accumulating the merged statistics; mutated in place.
    :return: the updated running dict.
    """
    if not elementary:
        return running

    n_new_total = int(elementary.get("count", 1))
    running["count"] = running.get("count", 0) + n_new_total

    for name, metrics in elementary.items():
        if name == "count":
            continue

        if name not in running:
            running[name] = {"count": 0}

        # existing count for this name
        n_old_name = running[name].get("count", 0)

        for metric, vals in metrics.items():
            if not isinstance(vals, dict):
                continue

            if metric not in running[name]:
                # First time we see this metric: adopt the incoming stats,
                # including any M2/std/count it may already carry (the input
                # may itself be the result of a previous merge).
                running[name][metric] = {
                    "min": vals.get("min"),
                    "max": vals.get("max"),
                    "mean": vals.get("mean"),
                    "std": vals.get("std", 0.0),
                    "M2": vals.get("M2", 0.0),
                    "count": vals.get("count", n_new_total)
                }
                continue

            rmetric = running[name][metric]
            n_old = rmetric.get("count", 0)
            # per-metric count if present (merged input), else the sample size
            n_new = vals.get("count", n_new_total)

            # update min / max
            e_min = vals.get("min")
            e_max = vals.get("max")
            if e_min is not None:
                rmetric["min"] = e_min if rmetric["min"] is None else min(rmetric["min"], e_min)
            if e_max is not None:
                rmetric["max"] = e_max if rmetric["max"] is None else max(rmetric["max"], e_max)

            # combine means & M2
            mean_a = rmetric.get("mean")
            mean_b = vals.get("mean")

            # If either mean is missing, use the one that exists
            if mean_a is None and mean_b is None:
                # Nothing to do
                continue
            elif mean_a is None:
                rmetric["mean"] = mean_b
                rmetric["M2"] = vals.get("M2", 0.0)
                rmetric["count"] = n_new
            elif mean_b is None:
                # keep existing stats
                rmetric["mean"] = mean_a
                rmetric["M2"] = rmetric.get("M2", 0.0)
                rmetric["count"] = n_old
            else:
                # both defined -> weighted (parallel Welford) merge;
                # the incoming M2 must be added as well, otherwise merging
                # two already-merged stats silently loses spread information
                delta = mean_b - mean_a
                new_count = n_old + n_new
                new_mean = mean_a + delta * (n_new / new_count)
                new_M2 = rmetric.get("M2", 0.0) + vals.get("M2", 0.0) + (delta**2) * (n_old * n_new / new_count)

                rmetric["mean"] = new_mean
                rmetric["M2"] = new_M2
                rmetric["count"] = new_count

            # update (population) std from M2
            c = rmetric["count"]
            rmetric["std"] = math.sqrt(rmetric["M2"] / c) if c > 1 else 0.0

        running[name]["count"] = n_old_name + n_new_total

    # round mean and std for readability
    for name, metrics in running.items():
        if name == "count":
            continue
        for metric, vals in metrics.items():
            if not isinstance(vals, dict):
                continue
            if "mean" in vals:
                vals["mean"] = r3(vals["mean"])
            if "std" in vals:
                vals["std"] = r3(vals["std"])
            if "min" in vals:
                vals["min"] = r3(vals["min"])
            if "max" in vals:
                vals["max"] = r3(vals["max"])

    return running
747850

748851
def print_statistics(resource_object):
749852
"""
@@ -787,9 +890,105 @@ def print_statistics(resource_object):
787890
print(f" {comp:<20s} {mem:10.2f} MB")
788891

789892
#(d) max disc consumption
790-
print ("\nMax-DISC usage (MB): ", dframe['disc'].max())
791-
print ("Mean-DISC usage (MB): ", dframe['disc'].mean())
792-
print ("---> ")
893+
if 'disc' in dframe:
894+
print ("\nMax-DISC usage (MB): ", dframe['disc'].max())
895+
print ("Mean-DISC usage (MB): ", dframe['disc'].mean())
896+
print ("---> ")
897+
898+
def r3(x):
    """Round *x* to 3 decimal places; return None for None, NaN or non-numeric input."""
    if x is None:
        return None
    try:
        value = float(x)
    except Exception:
        # anything not convertible to float is treated as missing
        return None
    return None if math.isnan(value) else round(value, 3)
909+
910+
def produce_json_stat(resource_object):
    """
    Produce a nested dict with per-task resource statistics (pss, uss, cpu,
    lifetime) from a single pipeline metric file. The result can be dumped as
    JSON and used to adjust resource estimates in o2dpg_workflow_runner.py.

    :param resource_object: Resources object exposing df (metrics DataFrame
                            with at least 'name', 'pss', 'uss', 'cpu', 'nice',
                            'timeframe', 'iter' columns), meta and
                            pipeline_file.
    :return: dict mapping task name -> {metric -> {min, max, mean}}, plus a
             top-level "count" with the sample size (1 for one pipeline).
    """
    print ("<--- Producing resource json from file ", resource_object.pipeline_file)
    dframe = resource_object.df
    meta = resource_object.meta

    # also write json summary; This is a file that can be used
    # to adjust the resource estimates in o2dpg_workflow_runner.py
    #
    resource_json = {}
    # group by 'name' and compute all needed stats for each metric
    stats = (
        dframe
        .groupby('name')
        .agg({
            'pss': ['min', 'max', 'mean'],
            'uss': ['min', 'max', 'mean'],
            'cpu': ['min', 'max', 'mean']
        })
    )

    # turn the multi-level columns into flat names
    stats.columns = [f"{col[0]}_{col[1]}" for col in stats.columns]
    stats = stats.reset_index()

    # ----- compute lifetime ~ walltime per (timeframe, name) -----
    # ------------------------------------------------
    # Filter out unrealistic timeframes (nice == 19) because it's not the realistic runtime
    df_nice_filtered = dframe[dframe['nice'] != 19].copy()

    # the calculation of mean runtime should be averaged over timeframes
    lifetime_per_tf = (
        df_nice_filtered
        .groupby(['timeframe', 'name'])['iter']
        .agg(lambda x: x.max() - x.min() + 1)  # +1 to include both ends
        .reset_index(name='lifetime')
    )

    # now aggregate over timeframes for each name
    lifetime_by_name = lifetime_per_tf.groupby('name')['lifetime']
    mean_lifetime = lifetime_by_name.mean()
    max_lifetime = lifetime_by_name.max()
    # bug fix: this previously used max() as well, so the reported lifetime
    # minimum was identical to the maximum
    min_lifetime = lifetime_by_name.min()

    resource_json["count"] = 1  # basic sample size

    # convert to nested dictionary
    for _, row in stats.iterrows():
        name = row['name']
        resource_json[name] = {
            'pss': {
                'min': r3(row['pss_min']),
                'max': r3(row['pss_max']),
                'mean': r3(row['pss_mean'])
            },
            'uss': {
                'min': r3(row['uss_min']),
                'max': r3(row['uss_max']),
                'mean': r3(row['uss_mean'])
            },
            'cpu': {
                'min': r3(row['cpu_min']),
                'max': r3(row['cpu_max']),
                'mean': r3(row['cpu_mean'])
            },
            'lifetime': {
                # names filtered out entirely (e.g. only nice==19 samples)
                # fall back to NaN, which r3 maps to None
                'min' : r3(float(min_lifetime.get(name, np.nan))),
                'max' : r3(float(max_lifetime.get(name, np.nan))),
                'mean' : r3(float(mean_lifetime.get(name, np.nan)))
            }
        }
    return resource_json
793992

794993
def stat(args):
795994
"""
@@ -801,6 +1000,32 @@ def stat(args):
8011000
print_statistics(res)
8021001

8031002

1003+
def merge_stats_into(list_of_json_stats, outputfile):
    """
    Merge a list of json stat dicts into a single aggregated stat and write it
    to outputfile. Nothing is written when no stats were merged.
    """
    merged = {}
    # fold all inputs into one running stats structure
    for stat_dict in list_of_json_stats:
        merged = merge_stats(stat_dict, merged)

    if not merged:
        return
    # write the aggregated result into the output file
    with open(outputfile, 'w') as f:
        json.dump(merged, f)
1013+
1014+
1015+
def json_stat(args):
    """
    Entrypoint for the json-stat sub-command: extract resources from the given
    pipeline metric files, derive a json stat per pipeline and merge all of
    them into args.output.
    """
    per_pipeline_stats = [produce_json_stat(res) for res in extract_resources(args.pipelines)]
    merge_stats_into(per_pipeline_stats, args.output)
1019+
1020+
1021+
def merge_json_stats(args):
    """
    Entrypoint for the merge-json-stats sub-command: load each input json stat
    file and merge them into args.output.
    """
    loaded_stats = []
    for path in args.inputs:
        # load the json stat as a dict
        with open(path, 'r') as f:
            loaded_stats.append(json.load(f))
    merge_stats_into(loaded_stats, args.output)
1028+
8041029
def history(args):
8051030
"""
8061031
Entrypoint for history
@@ -1048,7 +1273,17 @@ def main():
10481273
stat_parser.set_defaults(func=stat)
10491274
stat_parser.add_argument("-p", "--pipelines", nargs="*", help="pipeline_metric files from o2_dpg_workflow_runner", required=True)
10501275

1051-
plot_parser = sub_parsers.add_parser("history", help="Plot (multiple) metrcis from extracted metrics JSON file(s)")
1276+
json_stat_parser = sub_parsers.add_parser("json-stat", help="Produce basic json stat (compatible with o2dpg_workflow_runner injection)")
1277+
json_stat_parser.set_defaults(func=json_stat)
1278+
json_stat_parser.add_argument("-p", "--pipelines", nargs="*", help="Pipeline_metric files from o2_dpg_workflow_runner; Merges information", required=True)
1279+
json_stat_parser.add_argument("-o", "--output", type=str, help="Output json filename", required=True)
1280+
1281+
merge_stat_parser = sub_parsers.add_parser("merge-json-stats", help="Merge information from json-stats into an aggregated stat")
1282+
merge_stat_parser.set_defaults(func=merge_json_stats)
1283+
merge_stat_parser.add_argument("-i", "--inputs", nargs="*", help="List of incoming/input json stat files", required=True)
1284+
merge_stat_parser.add_argument("-o", "--output", type=str, help="Output json filename", required=True)
1285+
1286+
plot_parser = sub_parsers.add_parser("history", help="Plot (multiple) metrics from extracted metrics JSON file(s)")
10521287
plot_parser.set_defaults(func=history)
10531288
plot_parser.add_argument("-p", "--pipelines", nargs="*", help="pipeline_metric files from o2_dpg_workflow_runner", required=True)
10541289
plot_parser.add_argument("--output", help="output directory", default="resource_history")

0 commit comments

Comments
 (0)