import matplotlib
import json
import numpy as np
+import math
import pandas as pd

############################################################################
@@ -744,6 +745,108 @@ def extract_resources(pipelines):
    # Collect all metrics we got, here we want to have the median from all the iterations
    return [Resources(p) for p in pipelines]

+def merge_stats(elementary, running):
+    """
+    Merge an incoming elementary JSON into a running stats structure.
+    Also maintains a running std using Welford's method (batched variant).
+
+    Each metric stores:
+      mean, std, M2, min, max, count
+    """
+    if not elementary:
+        return running
+
+    n_new_total = int(elementary.get("count", 1))
+    running["count"] = running.get("count", 0) + n_new_total
+
+    for name, metrics in elementary.items():
+        if name == "count":
+            continue
+
+        if name not in running:
+            running[name] = {"count": 0}
+
+        # existing count for this name
+        n_old_name = running[name].get("count", 0)
+
+        for metric, vals in metrics.items():
+            if not isinstance(vals, dict):
+                continue
+
+            if metric not in running[name]:
+                running[name][metric] = {
+                    "min": vals.get("min"),
+                    "max": vals.get("max"),
+                    "mean": vals.get("mean"),
+                    "std": 0.0,
+                    "M2": 0.0,
+                    "count": n_new_total
+                }
+                continue
+
+            rmetric = running[name][metric]
+            n_old = rmetric.get("count", 0)
+            n_new = n_new_total
+
+            # update min / max
+            e_min = vals.get("min")
+            e_max = vals.get("max")
+            if e_min is not None:
+                rmetric["min"] = e_min if rmetric["min"] is None else min(rmetric["min"], e_min)
+            if e_max is not None:
+                rmetric["max"] = e_max if rmetric["max"] is None else max(rmetric["max"], e_max)
+
+            # combine means & M2
+            mean_a = rmetric.get("mean")
+            mean_b = vals.get("mean")
+
+            # If either mean is missing, use the one that exists
+            if mean_a is None and mean_b is None:
+                # nothing to do
+                continue
+            elif mean_a is None:
+                rmetric["mean"] = mean_b
+                rmetric["M2"] = 0.0
+                rmetric["count"] = n_new
+            elif mean_b is None:
+                # keep existing stats
+                rmetric["mean"] = mean_a
+                rmetric["M2"] = rmetric.get("M2", 0.0)
+                rmetric["count"] = n_old
+            else:
+                # both defined -> weighted merge (Chan's parallel update);
+                # the incoming summary carries no M2 of its own, so only the
+                # between-means term is added
+                delta = mean_b - mean_a
+                new_count = n_old + n_new
+                new_mean = mean_a + delta * (n_new / new_count)
+                new_M2 = rmetric.get("M2", 0.0) + (delta ** 2) * (n_old * n_new / new_count)
+
+                rmetric["mean"] = new_mean
+                rmetric["M2"] = new_M2
+                rmetric["count"] = new_count
+
+            # update std from M2
+            c = rmetric["count"]
+            rmetric["std"] = math.sqrt(rmetric["M2"] / c) if c > 1 else 0.0
+
+        running[name]["count"] = n_old_name + n_new_total
+
+    # round mean and std for readability
+    for name, metrics in running.items():
+        if name == "count":
+            continue
+        for metric, vals in metrics.items():
+            if not isinstance(vals, dict):
+                continue
+            if "mean" in vals:
+                vals["mean"] = r3(vals["mean"])
+            if "std" in vals:
+                vals["std"] = r3(vals["std"])
+            if "min" in vals:
+                vals["min"] = r3(vals["min"])
+            if "max" in vals:
+                vals["max"] = r3(vals["max"])
+
+    return running
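# Illustrative sketch (not part of the patch): merging two single-run summaries of the
# shape produced by produce_json_stat below. The task name "taskA" and the numbers are
# invented for the example.
#
#   run1 = {"count": 1, "taskA": {"cpu": {"min": 1.0, "max": 3.0, "mean": 2.0}}}
#   run2 = {"count": 1, "taskA": {"cpu": {"min": 2.0, "max": 6.0, "mean": 4.0}}}
#   agg = merge_stats(run2, merge_stats(run1, {}))
#   # agg["count"] == 2; agg["taskA"]["cpu"] -> mean 3.0, min 1.0, max 6.0, std 1.0
#   # (Chan-style merge of the means; each run's internal M2 is taken as 0)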

def print_statistics(resource_object):
    """
@@ -787,9 +890,105 @@ def print_statistics(resource_object):
        print(f"{comp:<20s} {mem:10.2f} MB")

    # (d) max disc consumption
-    print("\nMax-DISC usage (MB): ", dframe['disc'].max())
-    print("Mean-DISC usage (MB): ", dframe['disc'].mean())
-    print("---> ")
+    if 'disc' in dframe:
+        print("\nMax-DISC usage (MB): ", dframe['disc'].max())
+        print("Mean-DISC usage (MB): ", dframe['disc'].mean())
+        print("---> ")
+
+def r3(x):
+    """Round to 3 decimals, return None for None/NaN."""
+    if x is None:
+        return None
+    try:
+        xf = float(x)
+    except Exception:
+        return None
+    if math.isnan(xf):
+        return None
+    return round(xf, 3)
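# For reference (illustrative, not part of the patch):
#   r3(1.23456) -> 1.235, r3(float("nan")) -> None, r3(None) -> None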
+
+def produce_json_stat(resource_object):
+    print("<--- Producing resource json from file ", resource_object.pipeline_file)
+    dframe = resource_object.df
+    meta = resource_object.meta
+
+    # also write a json summary; this is a file that can be used
+    # to adjust the resource estimates in o2dpg_workflow_runner.py
+    #
+    resource_json = {}
+    # group by 'name' and compute all needed stats for each metric
+    stats = (
+        dframe
+        .groupby('name')
+        .agg({
+            'pss': ['min', 'max', 'mean'],
+            'uss': ['min', 'max', 'mean'],
+            'cpu': ['min', 'max', 'mean']
+        })
+    )
+
+    # turn the multi-level columns into flat names
+    stats.columns = [f"{col[0]}_{col[1]}" for col in stats.columns]
+    stats = stats.reset_index()
+
+    # ----- compute lifetime ~ walltime per (timeframe, name) -----
+    # --------------------------------------------------------------
+    # Filter out samples taken at nice == 19, since these do not reflect the realistic runtime
+    df_nice_filtered = dframe[dframe['nice'] != 19].copy()
+
+    # the mean runtime should be averaged over timeframes
+    lifetime_per_tf = (
+        df_nice_filtered
+        .groupby(['timeframe', 'name'])['iter']
+        .agg(lambda x: x.max() - x.min() + 1)  # +1 to include both ends
+        .reset_index(name='lifetime')
+    )
+
+    # now average over timeframes for each name
+    mean_lifetime = (
+        lifetime_per_tf
+        .groupby('name')['lifetime']
+        .mean()
+    )
+    max_lifetime = (
+        lifetime_per_tf
+        .groupby('name')['lifetime']
+        .max()
+    )
+    min_lifetime = (
+        lifetime_per_tf
+        .groupby('name')['lifetime']
+        .min()
+    )
+
+    resource_json["count"] = 1  # basic sample size
+
+    # convert to nested dictionary
+    for _, row in stats.iterrows():
+        name = row['name']
+        resource_json[name] = {
+            'pss': {
+                'min': r3(row['pss_min']),
+                'max': r3(row['pss_max']),
+                'mean': r3(row['pss_mean'])
+            },
+            'uss': {
+                'min': r3(row['uss_min']),
+                'max': r3(row['uss_max']),
+                'mean': r3(row['uss_mean'])
+            },
+            'cpu': {
+                'min': r3(row['cpu_min']),
+                'max': r3(row['cpu_max']),
+                'mean': r3(row['cpu_mean'])
+            },
+            'lifetime': {
+                'min': r3(float(min_lifetime.get(name, np.nan))),
+                'max': r3(float(max_lifetime.get(name, np.nan))),
+                'mean': r3(float(mean_lifetime.get(name, np.nan)))
+            }
+        }
+    return resource_json
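# Illustrative sketch (not part of the patch): calling produce_json_stat with a minimal
# stand-in for the Resources object. The column names (name, pss, uss, cpu, nice,
# timeframe, iter) follow the usage above; the values and file name are invented.
#
#   from types import SimpleNamespace
#   toy_df = pd.DataFrame({
#       'name': ['digi', 'digi'], 'pss': [100.0, 120.0], 'uss': [90.0, 95.0],
#       'cpu': [0.8, 1.2], 'nice': [0, 0], 'timeframe': [1, 1], 'iter': [1, 4]
#   })
#   toy = SimpleNamespace(df=toy_df, meta=None, pipeline_file="toy.log")
#   produce_json_stat(toy)
#   # -> {'count': 1, 'digi': {'pss': {...}, 'uss': {...}, 'cpu': {...},
#   #     'lifetime': {'min': 4.0, 'max': 4.0, 'mean': 4.0}}}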

def stat(args):
    """
@@ -801,6 +1000,32 @@ def stat(args):
        print_statistics(res)


+def merge_stats_into(list_of_json_stats, outputfile):
+    running = {}
+    # read all the inputs
+    for inp_json in list_of_json_stats:
+        running = merge_stats(inp_json, running)
+
+    # now write out the result into the output file
+    if running:
+        with open(outputfile, 'w') as f:
+            json.dump(running, f)
+
+
+def json_stat(args):
+    resources = extract_resources(args.pipelines)
+    all_stats = [produce_json_stat(res) for res in resources]
+    merge_stats_into(all_stats, args.output)
+
+
+def merge_json_stats(args):
+    all_stats = []
+    for inp in args.inputs:
+        # load each json stat file as a dict
+        with open(inp, 'r') as f:
+            all_stats.append(json.load(f))
+    # merge everything and write the aggregated stats
+    merge_stats_into(all_stats, args.output)
+
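# Illustrative usage of the two new sub-commands (the script and file names here are
# hypothetical placeholders):
#
#   python3 analyse_pipeline_metric.py json-stat -p pipeline_metric_1.log pipeline_metric_2.log -o stats.json
#   python3 analyse_pipeline_metric.py merge-json-stats -i stats_a.json stats_b.json -o merged.json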
def history(args):
    """
    Entrypoint for history
@@ -1048,7 +1273,17 @@ def main():
    stat_parser.set_defaults(func=stat)
    stat_parser.add_argument("-p", "--pipelines", nargs="*", help="pipeline_metric files from o2_dpg_workflow_runner", required=True)

-    plot_parser = sub_parsers.add_parser("history", help="Plot (multiple) metrcis from extracted metrics JSON file(s)")
+    json_stat_parser = sub_parsers.add_parser("json-stat", help="Produce a basic JSON stat (compatible with o2dpg_workflow_runner injection)")
+    json_stat_parser.set_defaults(func=json_stat)
+    json_stat_parser.add_argument("-p", "--pipelines", nargs="*", help="pipeline_metric files from o2_dpg_workflow_runner; their information is merged", required=True)
+    json_stat_parser.add_argument("-o", "--output", type=str, help="Output json filename", required=True)
+
+    merge_stat_parser = sub_parsers.add_parser("merge-json-stats", help="Merge information from json-stat files into an aggregated stat")
+    merge_stat_parser.set_defaults(func=merge_json_stats)
+    merge_stat_parser.add_argument("-i", "--inputs", nargs="*", help="List of incoming/input json stat files", required=True)
+    merge_stat_parser.add_argument("-o", "--output", type=str, help="Output json filename", required=True)
+
+    plot_parser = sub_parsers.add_parser("history", help="Plot (multiple) metrics from extracted metrics JSON file(s)")
    plot_parser.set_defaults(func=history)
    plot_parser.add_argument("-p", "--pipelines", nargs="*", help="pipeline_metric files from o2_dpg_workflow_runner", required=True)
    plot_parser.add_argument("--output", help="output directory", default="resource_history")