11package org .labkey .cluster .pipeline ;
22
33import org .apache .commons .lang3 .StringUtils ;
4+ import org .apache .commons .lang3 .math .NumberUtils ;
45import org .apache .logging .log4j .LogManager ;
56import org .apache .logging .log4j .Logger ;
67import org .jetbrains .annotations .NotNull ;
1112import org .labkey .api .collections .CaseInsensitiveHashSet ;
1213import org .labkey .api .data .Container ;
1314import org .labkey .api .data .ContainerManager ;
15+ import org .labkey .api .data .Table ;
1416import org .labkey .api .pipeline .PipelineJob ;
1517import org .labkey .api .pipeline .PipelineJobException ;
1618import org .labkey .api .pipeline .PipelineService ;
1921import org .labkey .api .util .Pair ;
2022import org .labkey .api .writer .PrintWriters ;
2123import org .labkey .cluster .ClusterManager ;
24+ import org .labkey .cluster .ClusterSchema ;
2225import org .labkey .cluster .ClusterServiceImpl ;
2326import org .quartz .JobExecutionException ;
2427
@@ -94,6 +97,7 @@ protected List<String> submitJobToCluster(ClusterJob j, PipelineJob job) throws
9497 line = line .replaceFirst ("^Submitted batch job" , "" );
9598 line = line .trim ();
9699 j .setClusterId (line );
100+ j .setClusterUser (ClusterServiceImpl .get ().getClusterUser (job .getContainer ()));
97101
98102 break ;
99103 }
@@ -129,6 +133,8 @@ protected Set<String> updateStatusForAllJobs() throws PipelineJobException
129133 int stateIdx = -1 ;
130134 int hostnameIdx = -1 ;
131135 int reasonIdx = -1 ;
136+ int elapsedIdx = -1 ;
137+ int resourcesIdx = -1 ;
132138 for (String line : ret )
133139 {
134140 line = StringUtils .trimToNull (line );
@@ -145,6 +151,8 @@ protected Set<String> updateStatusForAllJobs() throws PipelineJobException
145151 stateIdx = header .indexOf ("STATE" );
146152 hostnameIdx = header .indexOf ("NODELIST" );
147153 reasonIdx = header .indexOf ("REASON" );
154+ elapsedIdx = header .indexOf ("ELAPSEDRAW" );
155+ resourcesIdx = header .indexOf ("ALLOCTRES" );
148156
149157 if (stateIdx == -1 )
150158 {
@@ -177,10 +185,13 @@ protected Set<String> updateStatusForAllJobs() throws PipelineJobException
177185 }
178186 else
179187 {
188+ Map <String , Object > propsToUpdate = new HashMap <>();
189+
180190 String hostname = hostnameIdx != -1 && tokens .length > hostnameIdx ? StringUtils .trimToNull (tokens [hostnameIdx ]) : null ;
181191 if (hostname != null )
182192 {
183193 j .setHostname (hostname );
194+ propsToUpdate .put ("hostname" , hostname );
184195 }
185196
186197 Pair <String , String > status = translateSlurmStatusToTaskStatus (StringUtils .trimToNull (tokens [stateIdx ]));
@@ -199,6 +210,35 @@ protected Set<String> updateStatusForAllJobs() throws PipelineJobException
199210 }
200211 }
201212
213+ if (resourcesIdx > -1 )
214+ {
215+ j .setCpuUsed (findIntValue (tokens [resourcesIdx ], "cpu" ));
216+ if (j .getCpuUsed () != null )
217+ {
218+ propsToUpdate .put ("cpuUsed" , j .getCpuUsed ());
219+ }
220+
221+ j .setGpuUsed (findIntValue (tokens [resourcesIdx ], "gpu" ));
222+ if (j .getGpuUsed () != null )
223+ {
224+ propsToUpdate .put ("gpuUsed" , j .getGpuUsed ());
225+ }
226+ }
227+
228+ if (elapsedIdx > -1 )
229+ {
230+ j .setDuration (Integer .parseInt (tokens [elapsedIdx ]));
231+ if (j .getDuration () != null )
232+ {
233+ propsToUpdate .put ("duration" , j .getDuration ());
234+ }
235+ }
236+
237+ if (!propsToUpdate .isEmpty ())
238+ {
239+ updateClusterSubmission (j , propsToUpdate );
240+ }
241+
202242 updateJobStatus (status == null ? null : status .first , j , status == null ? null : status .second );
203243 jobsUpdated .add (j .getClusterId ());
204244 }
@@ -223,6 +263,40 @@ protected Set<String> updateStatusForAllJobs() throws PipelineJobException
223263 return jobsUpdated ;
224264 }
225265
266+ // parses AllocTRES, such as: cpu=4,gres/disk=1028,mem=20000M,node=1
267+ private Integer findIntValue (String input , String key )
268+ {
269+ input = StringUtils .trimToNull (input );
270+ if (input == null )
271+ {
272+ return null ;
273+ }
274+
275+ String [] tokens = input .split ("," );
276+ for (String token : tokens )
277+ {
278+ if (token .startsWith (key + "=" ))
279+ {
280+ String val = token .split ("=" )[1 ];
281+ if (!NumberUtils .isCreatable (val ))
282+ {
283+ _log .error ("Non-numeric value for: " + key + ", input: " + input );
284+ return null ;
285+ }
286+
287+ return Integer .parseInt (val );
288+ }
289+ }
290+
291+ return null ;
292+ }
293+
294+ private void updateClusterSubmission (ClusterJob j , Map <String , Object > toUpdate )
295+ {
296+ toUpdate .put ("rowid" , j .getRowId ());
297+ Table .update (null , ClusterSchema .getInstance ().getSchema ().getTable (ClusterSchema .CLUSTER_JOBS ), toUpdate , j .getRowId ());
298+ }
299+
226300 @ Override
227301 protected Pair <String , String > getStatusForJob (ClusterJob job , Container c )
228302 {
@@ -248,6 +322,9 @@ protected Pair<String, String> getStatusForJob(ClusterJob job, Container c)
248322 int hostnameIdx = -1 ;
249323 int maxRssIdx = -1 ;
250324 int reqMemIdx = -1 ;
325+ int elapsedIdx = -1 ;
326+ int resourcesIdx = -1 ;
327+
251328 String reqMem = null ;
252329 for (String line : ret )
253330 {
@@ -266,6 +343,8 @@ protected Pair<String, String> getStatusForJob(ClusterJob job, Container c)
266343 hostnameIdx = header .indexOf ("NODELIST" );
267344 maxRssIdx = header .indexOf ("MAXRSS" );
268345 reqMemIdx = header .indexOf ("REQMEM" );
346+ elapsedIdx = header .indexOf ("ELAPSEDRAW" );
347+ resourcesIdx = header .indexOf ("ALLOCTRES" );
269348
270349 if (stateIdx == -1 )
271350 {
@@ -294,6 +373,8 @@ else if (headerFound)
294373 statuses .add (StringUtils .trimToNull (tokens [stateIdx ]));
295374 }
296375
376+ Map <String , Object > propsToUpdate = new HashMap <>();
377+
297378 if (hostnameIdx > -1 )
298379 {
299380 String hostname = tokens .length > hostnameIdx ? StringUtils .trimToNull (tokens [hostnameIdx ]) : null ;
@@ -302,6 +383,7 @@ else if (headerFound)
302383 if (job .getHostname () == null || !job .getHostname ().equals (hostname ))
303384 {
304385 job .setHostname (hostname );
386+ propsToUpdate .put ("hostname" , hostname );
305387 }
306388 }
307389 }
@@ -316,6 +398,35 @@ else if (headerFound)
316398
317399 }
318400
401+ if (resourcesIdx > -1 )
402+ {
403+ job .setCpuUsed (findIntValue (tokens [resourcesIdx ], "cpu" ));
404+ if (job .getCpuUsed () != null )
405+ {
406+ propsToUpdate .put ("cpuUsed" , job .getCpuUsed ());
407+ }
408+
409+ job .setGpuUsed (findIntValue (tokens [resourcesIdx ], "gpu" ));
410+ if (job .getGpuUsed () != null )
411+ {
412+ propsToUpdate .put ("gpuUsed" , job .getGpuUsed ());
413+ }
414+ }
415+
416+ if (elapsedIdx > -1 )
417+ {
418+ job .setDuration (Integer .parseInt (tokens [elapsedIdx ]));
419+ if (job .getDuration () != null )
420+ {
421+ propsToUpdate .put ("duration" , job .getDuration ());
422+ }
423+ }
424+
425+ if (!propsToUpdate .isEmpty ())
426+ {
427+ updateClusterSubmission (job , propsToUpdate );
428+ }
429+
319430 // NOTE: if the line has blank ending columns, trimmed lines might lack that value
320431 if ((job .getClusterId () + ".0" ).equals (id ) && maxRssIdx > -1 && maxRssIdx < tokens .length )
321432 {
@@ -725,6 +836,8 @@ private Pair<String, String> getStatusFromQueue(ClusterJob job)
725836 int jobIdx = -1 ;
726837 int stateIdx = -1 ;
727838 int hostnameIdx = -1 ;
839+ int elapsedIdx = -1 ;
840+ int resourcesIdx = -1 ;
728841
729842 for (String line : ret )
730843 {
@@ -741,6 +854,8 @@ private Pair<String, String> getStatusFromQueue(ClusterJob job)
741854 jobIdx = header .indexOf ("JOBID" );
742855 stateIdx = header .indexOf ("STATE" );
743856 hostnameIdx = header .indexOf ("NODELIST" );
857+ elapsedIdx = header .indexOf ("ELAPSEDRAW" );
858+ resourcesIdx = header .indexOf ("ALLOCTRES" );
744859
745860 if (stateIdx == -1 )
746861 {
@@ -765,15 +880,47 @@ private Pair<String, String> getStatusFromQueue(ClusterJob job)
765880 String id = StringUtils .trimToNull (tokens [jobIdx ]);
766881 if (job .getClusterId ().equals (id ))
767882 {
883+ Map <String , Object > propsToUpdate = new HashMap <>();
884+
768885 if (hostnameIdx > -1 )
769886 {
770887 String hostname = tokens .length > hostnameIdx ? StringUtils .trimToNull (tokens [hostnameIdx ]) : null ;
771888 if (hostname != null )
772889 {
773890 job .setHostname (hostname );
891+ propsToUpdate .put ("hostname" , hostname );
892+ }
893+ }
894+
895+ if (resourcesIdx > -1 )
896+ {
897+ job .setCpuUsed (findIntValue (tokens [resourcesIdx ], "cpu" ));
898+ if (job .getCpuUsed () != null )
899+ {
900+ propsToUpdate .put ("cpuUsed" , job .getCpuUsed ());
901+ }
902+
903+ job .setGpuUsed (findIntValue (tokens [resourcesIdx ], "gpu" ));
904+ if (job .getGpuUsed () != null )
905+ {
906+ propsToUpdate .put ("gpuUsed" , job .getGpuUsed ());
774907 }
775908 }
776909
910+ if (elapsedIdx > -1 )
911+ {
912+ job .setDuration (Integer .parseInt (tokens [elapsedIdx ]));
913+ if (job .getDuration () != null )
914+ {
915+ propsToUpdate .put ("duration" , job .getDuration ());
916+ }
917+ }
918+
919+ if (!propsToUpdate .isEmpty ())
920+ {
921+ updateClusterSubmission (job , propsToUpdate );
922+ }
923+
777924 return translateSlurmStatusToTaskStatus (StringUtils .trimToNull (tokens [stateIdx ]));
778925 }
779926 }
0 commit comments