Skip to content

Commit e4c9278

Browse files
committed
Workflow parser, parallelize subtopology generation
1 parent 211a12d commit e4c9278

File tree

2 files changed

+24
-4
lines changed

2 files changed

+24
-4
lines changed

DATA/production/qc-workflow.sh

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,11 @@ if [[ -z "$WORKFLOW" ]] || [[ -z "$MYDIR" ]]; then
55
exit 1
66
fi
77

8+
if [[ ! -z $GEN_TOPO_QC_JSON_FILE ]]; then
9+
exec 101>$GEN_TOPO_QC_JSON_FILE.lock || exit 1
10+
flock 101 || exit 1
11+
fi
12+
813
if [[ -z $QC_JSON_FROM_OUTSIDE && ! -z $GEN_TOPO_QC_JSON_FILE && -f $GEN_TOPO_QC_JSON_FILE ]]; then
914
QC_JSON_FROM_OUTSIDE=$GEN_TOPO_QC_JSON_FILE
1015
elif [[ -z $QC_JSON_FROM_OUTSIDE ]]; then
@@ -197,4 +202,8 @@ if [[ ! -z "$QC_JSON_FROM_OUTSIDE" ]]; then
197202
add_W o2-qc "--config json://$QC_JSON_FROM_OUTSIDE ${QC_CONFIG_PARAM:---local --host ${QC_HOST:-localhost}} ${QC_CONFIG}"
198203
fi
199204

205+
if [[ ! -z $GEN_TOPO_QC_JSON_FILE ]]; then
206+
flock -u 101 || exit 1
207+
fi
208+
200209
true # everything OK up to this point, so the script should return 0 (it is !=0 if the last check failed)

DATA/tools/parse

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
#!/usr/bin/env python3
22
import os
3+
import subprocess
4+
import threading
35
import sys
46
import shlex
57
import tempfile
@@ -103,7 +105,8 @@ for line in f:
103105
if NO_PROCESSING_MODE and len(args) > 2:
104106
print('Cannot use DPL workflow together with DD mode', os.environ['DDMODE'])
105107
raise
106-
for i in range(2, len(args)):
108+
109+
def processSubtopology(i, args, tmpdir, reconodes, reconodesmin, recoworkflows, calibworkflows, calibworkflowsdds):
107110
filename = tmpdir + '/wf' + str(i) + '.dds'
108111
calibcores = '1'
109112
if args[i].startswith('reco'):
@@ -131,8 +134,8 @@ for line in f:
131134
if os.environ['WORKFLOWMODE'] == 'print':
132135
command += ' > ' + filename
133136
print('Running DPL command', command)
134-
retVal = os.system(command)
135-
if retVal != 0:
137+
retVal = subprocess.run(command, shell=True)
138+
if retVal.returncode != 0:
136139
print('Error (' + str(retVal) + ') running command', command)
137140
ftmp = open(filename, 'r')
138141
rg = re.compile('^<topology')
@@ -153,9 +156,17 @@ for line in f:
153156
isempty = 0
154157
break
155158
if isempty:
156-
continue
159+
return
157160
calibworkflows.append(filename)
158161
calibworkflowsdds.append(filename + ':' + calibcores)
162+
163+
subtopologyThreads = []
164+
for i in range(2, len(args)):
165+
t = threading.Thread(target = processSubtopology, args = (i, args, tmpdir, reconodes, reconodesmin, recoworkflows, calibworkflows, calibworkflowsdds))
166+
t.start()
167+
subtopologyThreads.append(t)
168+
for t in subtopologyThreads:
169+
t.join()
159170
if reco_num_nodes_override > 0:
160171
reconodes = reco_num_nodes_override
161172
reconodesmin = min(reconodes, reconodesmin)

0 commit comments

Comments
 (0)