@@ -133,8 +133,7 @@ protected Set<String> updateStatusForAllJobs() throws PipelineJobException
133133 int stateIdx = -1 ;
134134 int hostnameIdx = -1 ;
135135 int reasonIdx = -1 ;
136- int elapsedIdx = -1 ;
137- int resourcesIdx = -1 ;
136+
138137 for (String line : ret )
139138 {
140139 line = StringUtils .trimToNull (line );
@@ -151,8 +150,6 @@ protected Set<String> updateStatusForAllJobs() throws PipelineJobException
151150 stateIdx = header .indexOf ("STATE" );
152151 hostnameIdx = header .indexOf ("NODELIST" );
153152 reasonIdx = header .indexOf ("REASON" );
154- elapsedIdx = header .indexOf ("ELAPSEDRAW" );
155- resourcesIdx = header .indexOf ("ALLOCTRES" );
156153
157154 if (stateIdx == -1 )
158155 {
@@ -210,30 +207,6 @@ protected Set<String> updateStatusForAllJobs() throws PipelineJobException
210207 }
211208 }
212209
213- if (resourcesIdx > -1 )
214- {
215- j .setCpuUsed (findIntValue (tokens [resourcesIdx ], "cpu" ));
216- if (j .getCpuUsed () != null )
217- {
218- propsToUpdate .put ("cpuUsed" , j .getCpuUsed ());
219- }
220-
221- j .setGpuUsed (findIntValue (tokens [resourcesIdx ], "gpu" ));
222- if (j .getGpuUsed () != null )
223- {
224- propsToUpdate .put ("gpuUsed" , j .getGpuUsed ());
225- }
226- }
227-
228- if (elapsedIdx > -1 )
229- {
230- j .setDuration (Integer .parseInt (tokens [elapsedIdx ]));
231- if (j .getDuration () != null )
232- {
233- propsToUpdate .put ("duration" , j .getDuration ());
234- }
235- }
236-
237210 if (!propsToUpdate .isEmpty ())
238211 {
239212 updateClusterSubmission (j , propsToUpdate );
@@ -293,6 +266,8 @@ private Integer findIntValue(String input, String key)
293266
294267 private void updateClusterSubmission (ClusterJob j , Map <String , Object > toUpdate )
295268 {
269+ _log .debug ("Updating job: " + j .getJobId () + ", " + toUpdate .keySet ().stream ().map (x -> x + "=" + toUpdate .get (x )).collect (Collectors .joining (", " )));
270+
296271 toUpdate .put ("rowid" , j .getRowId ());
297272 Table .update (null , ClusterSchema .getInstance ().getSchema ().getTable (ClusterSchema .CLUSTER_JOBS ), toUpdate , j .getRowId ());
298273 }
@@ -315,6 +290,7 @@ protected Pair<String, String> getStatusForJob(ClusterJob job, Container c)
315290 //verify success
316291 boolean headerFound = false ;
317292 boolean foundJobLine = false ;
293+ List <String > fieldWidths = new ArrayList <>();
318294 LinkedHashSet <String > statuses = new LinkedHashSet <>();
319295 List <String > header ;
320296 int jobIdx = -1 ;
@@ -360,24 +336,24 @@ protected Pair<String, String> getStatusForJob(ClusterJob job, Container c)
360336 }
361337 else if (foundJobLine && line .startsWith ("------------" ))
362338 {
339+ fieldWidths .addAll (Arrays .asList (line .split (" " )));
363340 headerFound = true ;
364341 }
365342 else if (headerFound )
366343 {
367344 try
368345 {
369- String [] tokens = line .split ("( )+" );
370- String id = StringUtils .trimToNull (tokens [jobIdx ]);
346+ String id = StringUtils .trimToNull (extractField (line , fieldWidths , jobIdx ));
371347 if (id .equals (job .getClusterId ()))
372348 {
373- statuses .add (StringUtils .trimToNull (tokens [ stateIdx ] ));
349+ statuses .add (StringUtils .trimToNull (extractField ( line , fieldWidths , stateIdx ) ));
374350 }
375351
376352 Map <String , Object > propsToUpdate = new HashMap <>();
377353
378354 if (hostnameIdx > -1 )
379355 {
380- String hostname = tokens . length > hostnameIdx ? StringUtils .trimToNull (tokens [ hostnameIdx ]) : null ;
356+ String hostname = StringUtils .trimToNull (extractField ( line , fieldWidths , hostnameIdx )) ;
381357 if (hostname != null )
382358 {
383359 if (job .getHostname () == null || !job .getHostname ().equals (hostname ))
@@ -388,9 +364,9 @@ else if (headerFound)
388364 }
389365 }
390366
391- if (reqMemIdx > -1 && reqMemIdx < tokens . length )
367+ if (reqMemIdx > -1 )
392368 {
393- String val = StringUtils .trimToNull (tokens [ reqMemIdx ] );
369+ String val = StringUtils .trimToNull (extractField ( line , fieldWidths , reqMemIdx ) );
394370 if (val != null )
395371 {
396372 reqMem = val ;
@@ -400,13 +376,13 @@ else if (headerFound)
400376
401377 if (resourcesIdx > -1 )
402378 {
403- job .setCpuUsed (findIntValue (tokens [ resourcesIdx ] , "cpu" ));
379+ job .setCpuUsed (findIntValue (extractField ( line , fieldWidths , resourcesIdx ) , "cpu" ));
404380 if (job .getCpuUsed () != null )
405381 {
406382 propsToUpdate .put ("cpuUsed" , job .getCpuUsed ());
407383 }
408384
409- job .setGpuUsed (findIntValue (tokens [ resourcesIdx ] , "gpu" ));
385+ job .setGpuUsed (findIntValue (extractField ( line , fieldWidths , resourcesIdx ) , "gpu" ));
410386 if (job .getGpuUsed () != null )
411387 {
412388 propsToUpdate .put ("gpuUsed" , job .getGpuUsed ());
@@ -415,7 +391,7 @@ else if (headerFound)
415391
416392 if (elapsedIdx > -1 )
417393 {
418- job .setDuration (Integer .parseInt (tokens [ elapsedIdx ] ));
394+ job .setDuration (Integer .parseInt (extractField ( line , fieldWidths , elapsedIdx ) ));
419395 if (job .getDuration () != null )
420396 {
421397 propsToUpdate .put ("duration" , job .getDuration ());
@@ -428,11 +404,11 @@ else if (headerFound)
428404 }
429405
430406 // NOTE: if the line has blank ending columns, trimmed lines might lack that value
431- if ((job .getClusterId () + ".0" ).equals (id ) && maxRssIdx > -1 && maxRssIdx < tokens . length )
407+ if ((job .getClusterId () + ".0" ).equals (id ) && maxRssIdx > -1 )
432408 {
433409 try
434410 {
435- String maxRSS = StringUtils .trimToNull (tokens [ maxRssIdx ] );
411+ String maxRSS = StringUtils .trimToNull (extractField ( line , fieldWidths , maxRssIdx ) );
436412 if (maxRSS != null )
437413 {
438414 double bytes = FileSizeFormatter .convertStringRepresentationToBytes (maxRSS );
@@ -471,7 +447,7 @@ else if (headerFound)
471447 }
472448 catch (Exception e )
473449 {
474- _log .error ("Error parsing line: " + line , e );
450+ _log .error ("Error parsing line: [ " + line + "]" , e );
475451 throw e ;
476452 }
477453 }
@@ -507,6 +483,19 @@ else if (headerFound)
507483 return null ;
508484 }
509485
486+ private String extractField (String line , List <String > fieldWidths , int idx )
487+ {
488+ int start = 0 ;
489+ for (int i = 0 ; i < idx ; i ++)
490+ {
491+ start += fieldWidths .get (i ).length () + 1 ;
492+ }
493+
494+ int end = start + fieldWidths .get (idx ).length ();
495+
496+ return line .substring (start , end );
497+ }
498+
510499 @ Override
511500 protected boolean removeJob (ClusterJob clusterJob )
512501 {
@@ -836,8 +825,6 @@ private Pair<String, String> getStatusFromQueue(ClusterJob job)
836825 int jobIdx = -1 ;
837826 int stateIdx = -1 ;
838827 int hostnameIdx = -1 ;
839- int elapsedIdx = -1 ;
840- int resourcesIdx = -1 ;
841828
842829 for (String line : ret )
843830 {
@@ -854,8 +841,6 @@ private Pair<String, String> getStatusFromQueue(ClusterJob job)
854841 jobIdx = header .indexOf ("JOBID" );
855842 stateIdx = header .indexOf ("STATE" );
856843 hostnameIdx = header .indexOf ("NODELIST" );
857- elapsedIdx = header .indexOf ("ELAPSEDRAW" );
858- resourcesIdx = header .indexOf ("ALLOCTRES" );
859844
860845 if (stateIdx == -1 )
861846 {
@@ -892,30 +877,6 @@ private Pair<String, String> getStatusFromQueue(ClusterJob job)
892877 }
893878 }
894879
895- if (resourcesIdx > -1 )
896- {
897- job .setCpuUsed (findIntValue (tokens [resourcesIdx ], "cpu" ));
898- if (job .getCpuUsed () != null )
899- {
900- propsToUpdate .put ("cpuUsed" , job .getCpuUsed ());
901- }
902-
903- job .setGpuUsed (findIntValue (tokens [resourcesIdx ], "gpu" ));
904- if (job .getGpuUsed () != null )
905- {
906- propsToUpdate .put ("gpuUsed" , job .getGpuUsed ());
907- }
908- }
909-
910- if (elapsedIdx > -1 )
911- {
912- job .setDuration (Integer .parseInt (tokens [elapsedIdx ]));
913- if (job .getDuration () != null )
914- {
915- propsToUpdate .put ("duration" , job .getDuration ());
916- }
917- }
918-
919880 if (!propsToUpdate .isEmpty ())
920881 {
921882 updateClusterSubmission (job , propsToUpdate );
0 commit comments