Skip to content

Commit 445312f

Browse files
committed
Allow smarter resume of PBSV call if job dies during the cleanup phase
1 parent 69a5a57 commit 445312f

File tree

1 file changed

+41
-11
lines changed

1 file changed

+41
-11
lines changed

SequenceAnalysis/src/org/labkey/sequenceanalysis/run/analysis/PbsvJointCallingHandler.java

Lines changed: 41 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -139,18 +139,26 @@ public void processFilesRemote(List<SequenceOutputFile> inputFiles, JobContext c
139139
outputBaseName = outputBaseName.replaceAll(".vcf$", "");
140140
}
141141

142-
List<File> outputs = new ArrayList<>();
143142
if (getVariantPipelineJob(ctx.getJob()).isScatterJob())
144143
{
145144
outputBaseName = outputBaseName + "." + getVariantPipelineJob(ctx.getJob()).getIntervalSetName();
145+
}
146+
147+
File expectedFinalOutput = new File(ctx.getOutputDir(), outputBaseName + ".vcf.gz");
148+
File expectedFinalOutputIdx = new File(expectedFinalOutput.getPath() + ".tbi");
149+
boolean jobCompleted = expectedFinalOutputIdx.exists(); // this would occur if the job died during the cleanup phase
150+
151+
List<File> outputs = new ArrayList<>();
152+
if (getVariantPipelineJob(ctx.getJob()).isScatterJob())
153+
{
146154
for (Interval i : getVariantPipelineJob(ctx.getJob()).getIntervalsForTask())
147155
{
148156
if (i.getStart() != 1)
149157
{
150158
throw new PipelineJobException("Expected all intervals to start on the first base: " + i.toString());
151159
}
152160

153-
File o = runPbsvCall(ctx, filesToProcess, genome, outputBaseName + (getVariantPipelineJob(ctx.getJob()).getIntervalsForTask().size() == 1 ? "" : "." + i.getContig()), i.getContig());
161+
File o = runPbsvCall(ctx, filesToProcess, genome, outputBaseName + (getVariantPipelineJob(ctx.getJob()).getIntervalsForTask().size() == 1 ? "" : "." + i.getContig()), i.getContig(), jobCompleted);
154162
if (o != null)
155163
{
156164
outputs.add(o);
@@ -159,19 +167,27 @@ public void processFilesRemote(List<SequenceOutputFile> inputFiles, JobContext c
159167
}
160168
else
161169
{
162-
outputs.add(runPbsvCall(ctx, filesToProcess, genome, outputBaseName, null));
170+
outputs.add(runPbsvCall(ctx, filesToProcess, genome, outputBaseName, null, jobCompleted));
163171
}
164172

165173
try
166174
{
167175
File vcfOutGz;
168176
if (outputs.size() == 1)
169177
{
170-
File unzipVcfOut = outputs.get(0);
171-
vcfOutGz = SequenceAnalysisService.get().bgzipFile(unzipVcfOut, ctx.getLogger());
172-
if (unzipVcfOut.exists())
178+
if (jobCompleted)
173179
{
174-
throw new PipelineJobException("Unzipped VCF should not exist: " + vcfOutGz.getPath());
180+
ctx.getLogger().debug("The final output VCF and index are found, so this is likely a resumed job. skipping merge");
181+
vcfOutGz = expectedFinalOutput;
182+
}
183+
else
184+
{
185+
File unzipVcfOut = outputs.get(0);
186+
vcfOutGz = SequenceAnalysisService.get().bgzipFile(unzipVcfOut, ctx.getLogger());
187+
if (unzipVcfOut.exists())
188+
{
189+
throw new PipelineJobException("Unzipped VCF should not exist: " + vcfOutGz.getPath());
190+
}
175191
}
176192
}
177193
else
@@ -181,10 +197,19 @@ public void processFilesRemote(List<SequenceOutputFile> inputFiles, JobContext c
181197
ctx.getFileManager().addIntermediateFile(f);
182198
ctx.getFileManager().addIntermediateFile(SequenceAnalysisService.get().ensureVcfIndex(f, ctx.getLogger(), false));
183199
}
184-
vcfOutGz = SequenceUtil.combineVcfs(outputs, genome, new File(ctx.getOutputDir(), outputBaseName + ".vcf.gz"), ctx.getLogger(), true, null, false, true);
185200

186-
// NOTE: the resulting file can be out of order due to translocations
187-
SequenceUtil.sortROD(vcfOutGz, ctx.getLogger(), 2);
201+
if (jobCompleted)
202+
{
203+
ctx.getLogger().debug("The final output VCF and index are found, so this is likely a resumed job. skipping merge");
204+
vcfOutGz = expectedFinalOutput;
205+
}
206+
else
207+
{
208+
vcfOutGz = SequenceUtil.combineVcfs(outputs, genome, expectedFinalOutput, ctx.getLogger(), true, null, false, true);
209+
210+
// NOTE: the resulting file can be out of order due to translocations
211+
SequenceUtil.sortROD(vcfOutGz, ctx.getLogger(), 2);
212+
}
188213
}
189214

190215
SequenceAnalysisService.get().ensureVcfIndex(vcfOutGz, ctx.getLogger(), true);
@@ -203,7 +228,7 @@ public void processFilesRemote(List<SequenceOutputFile> inputFiles, JobContext c
203228
}
204229
}
205230

206-
private File runPbsvCall(JobContext ctx, List<File> inputs, ReferenceGenome genome, String outputBaseName, @Nullable String contig) throws PipelineJobException
231+
private File runPbsvCall(JobContext ctx, List<File> inputs, ReferenceGenome genome, String outputBaseName, @Nullable String contig, boolean jobCompleted) throws PipelineJobException
207232
{
208233
if (inputs.isEmpty())
209234
{
@@ -219,6 +244,11 @@ private File runPbsvCall(JobContext ctx, List<File> inputs, ReferenceGenome geno
219244
verifyAndAddMissingSamples(ctx, vcfOut, inputs, genome);
220245
return vcfOut;
221246
}
247+
else if (jobCompleted)
248+
{
249+
ctx.getLogger().debug("The overall job has completed and this is a job resume. Skipping pbsv call");
250+
return vcfOut;
251+
}
222252

223253
List<String> args = new ArrayList<>();
224254
args.add(getExe().getPath());

0 commit comments

Comments
 (0)