|
84 | 84 | spec.loader.exec_module(o2dpg_workflow_utils) |
85 | 85 | from o2dpg_workflow_utils import createTask, dump_workflow |
86 | 86 |
|
| 87 | +####################### |
| 88 | +# ANALYSIS definition # |
| 89 | +####################### |
| 90 | + |
87 | 91 | # some commong definitions |
88 | 92 | ANALYSIS_LABEL = "Analysis" |
89 | 93 | ANALYSIS_LABEL_MERGED = f"{ANALYSIS_LABEL}Merged" |
|
153 | 157 | # "valid_for": [ANALYSIS_VALID_MC], |
154 | 158 | # "cmd": "o2-analysis-mm-vertexing-fwd {CONFIG} {AOD}"} |
155 | 159 | # ANALYSES.append(analysis_PWGMMFwdVertexing) |
| 160 | +analysis_PWGMMMDnDeta = {"name": "PWGMMMDnDeta", |
| 161 | + "expected_output": ["AnalysisResults.root"], |
| 162 | + "valid_for": [ANALYSIS_VALID_MC], |
| 163 | + "cmd": "o2-analysis-timestamp {CONFIG} | o2-analysis-track-propagation {CONFIG} | o2-analysis-event-selection {CONFIG} | o2-analysis-mm-particles-to-tracks {CONFIG} | o2-analysis-mm-dndeta {CONFIG} {AOD}"} |
| 164 | +ANALYSES.append(analysis_PWGMMMDnDeta) |
156 | 165 |
|
157 | 166 | def make_merged_analysis(*analysis_names, accept_data_or_mc=ANALYSIS_VALID_MC): |
158 | 167 | """merge CMD / DPL piping to one large pipe |
@@ -249,6 +258,38 @@ def create_ana_task(name, cmd, output_dir, *, needs=None, shmsegmentsize="--shm- |
249 | 258 | task['cmd'] = f"{cmd} {shmsegmentsize} {aodmemoryratelimit} {readers} {extraarguments}" |
250 | 259 | return task |
251 | 260 |
|
| 261 | +def add_analysis_post_processing_tasks(workflow): |
| 262 | + """add post-processing step to analysis tasks if possible |
| 263 | +
|
| 264 | + Args: |
| 265 | + workflow: list |
| 266 | + current list of tasks |
| 267 | + """ |
| 268 | + analyses_to_add_for = {} |
| 269 | + # collect analyses in current workflow |
| 270 | + for task in workflow: |
| 271 | + if ANALYSIS_LABEL in task["labels"] or ANALYSIS_LABEL_MERGED in task["labels"]: |
| 272 | + analyses_to_add_for[task["name"]] = task |
| 273 | + |
| 274 | + for ana in ANALYSES: |
| 275 | + if not ana["expected_output"]: |
| 276 | + continue |
| 277 | + ana_name_raw = ana["name"] |
| 278 | + post_processing_macro = join(O2DPG_ROOT, "MC", "analysis_testing", "post_processing", f"{ana_name_raw}.C") |
| 279 | + if not exists(post_processing_macro): |
| 280 | + continue |
| 281 | + ana_name = full_ana_name(ana_name_raw) |
| 282 | + if ana_name not in analyses_to_add_for: |
| 283 | + continue |
| 284 | + pot_ana = analyses_to_add_for[ana_name] |
| 285 | + cwd = pot_ana["cwd"] |
| 286 | + needs = [ana_name] |
| 287 | + task = createTask(name=f"{ANALYSIS_LABEL}_post_processing_{ana_name_raw}", cwd=join(cwd, "post_processing"), lab=[ANALYSIS_LABEL, f"{ANALYSIS_LABEL}PostProcessing", ana_name_raw], cpu=1, mem='2000', needs=needs) |
| 288 | + input_files = ",".join([f"../{eo}" for eo in ana["expected_output"]]) |
| 289 | + cmd = f"\\(\\\"{input_files}\\\",\\\"./\\\"\\)" |
| 290 | + task["cmd"] = f"root -l -b -q {post_processing_macro}{cmd}" |
| 291 | + workflow.append(task) |
| 292 | + |
252 | 293 | def add_analysis_tasks(workflow, input_aod="./AO2D.root", output_dir="./Analysis", *, analyses_only=None, is_mc=True, add_merged_task=False, config=None, needs=None): |
253 | 294 | """Add default analyses to user workflow |
254 | 295 |
|
@@ -286,6 +327,8 @@ def add_analysis_tasks(workflow, input_aod="./AO2D.root", output_dir="./Analysis |
286 | 327 | task = create_ana_task(ana["name"], ana["cmd"].format(CONFIG=f"--configuration {configuration}", AOD=f"--aod-file {input_aod}"), output_dir, needs=needs, is_mc=is_mc) |
287 | 328 | task["labels"].append(ANALYSIS_LABEL_MERGED) |
288 | 329 | workflow.append(task) |
| 330 | + # append potential post-processing |
| 331 | + add_analysis_post_processing_tasks(workflow) |
289 | 332 |
|
290 | 333 | def add_analysis_qc_upload_tasks(workflow, period_name, run_number, pass_name): |
291 | 334 | """add o2-qc-upload-root-objects to specified analysis tasks |
|
0 commit comments