Skip to content

Commit 8ea872e

Browse files
committed
Attempt to make PBSV call into scatter/gather job
1 parent d718cab commit 8ea872e

File tree

3 files changed

+122
-37
lines changed

3 files changed

+122
-37
lines changed

SequenceAnalysis/src/org/labkey/sequenceanalysis/SequenceAnalysisController.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -389,6 +389,7 @@ public BasePipelineStepAction(String htmlFile)
389389
_htmlFile = htmlFile;
390390
}
391391

392+
@Override
392393
public ModelAndView getView(Object form, BindException errors)
393394
{
394395
LinkedHashSet<ClientDependency> cds = new LinkedHashSet<>();

SequenceAnalysis/src/org/labkey/sequenceanalysis/run/analysis/PbsvJointCallingHandler.java

Lines changed: 111 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
package org.labkey.sequenceanalysis.run.analysis;
22

3+
import htsjdk.samtools.util.Interval;
34
import org.apache.commons.io.FileUtils;
5+
import org.jetbrains.annotations.Nullable;
46
import org.json.JSONObject;
57
import org.labkey.api.module.ModuleLoader;
68
import org.labkey.api.pipeline.PipelineJob;
@@ -15,24 +17,31 @@
1517
import org.labkey.api.sequenceanalysis.pipeline.SequenceOutputHandler;
1618
import org.labkey.api.sequenceanalysis.pipeline.SequencePipelineService;
1719
import org.labkey.api.sequenceanalysis.pipeline.ToolParameterDescriptor;
20+
import org.labkey.api.sequenceanalysis.pipeline.VariantProcessingStep;
1821
import org.labkey.api.sequenceanalysis.run.SimpleScriptWrapper;
1922
import org.labkey.api.util.FileType;
2023
import org.labkey.sequenceanalysis.SequenceAnalysisModule;
24+
import org.labkey.sequenceanalysis.pipeline.ProcessVariantsHandler;
25+
import org.labkey.sequenceanalysis.pipeline.VariantProcessingJob;
26+
import org.labkey.sequenceanalysis.run.util.AbstractGenomicsDBImportHandler;
27+
import org.labkey.sequenceanalysis.util.SequenceUtil;
2128

2229
import java.io.File;
2330
import java.io.IOException;
2431
import java.util.ArrayList;
2532
import java.util.Arrays;
33+
import java.util.LinkedHashSet;
2634
import java.util.List;
2735
import java.util.stream.Collectors;
2836

29-
public class PbsvJointCallingHandler extends AbstractParameterizedOutputHandler<SequenceOutputHandler.SequenceOutputProcessor>
37+
public class PbsvJointCallingHandler extends AbstractParameterizedOutputHandler<SequenceOutputHandler.SequenceOutputProcessor> implements SequenceOutputHandler.TracksVCF, VariantProcessingStep.SupportsScatterGather
3038
{
3139
private static final FileType FILE_TYPE = new FileType(".svsig.gz");
40+
private static final String OUTPUT_CATEGORY = "PBSV VCF";
3241

3342
public PbsvJointCallingHandler()
3443
{
35-
super(ModuleLoader.getInstance().getModule(SequenceAnalysisModule.NAME), "Pbsv Call", "Runs pbsv call, which jointly calls genotypes from PacBio data", null, Arrays.asList(
44+
super(ModuleLoader.getInstance().getModule(SequenceAnalysisModule.NAME), "Pbsv Call", "Runs pbsv call, which jointly calls genotypes from PacBio data", new LinkedHashSet<>(Arrays.asList("sequenceanalysis/panel/VariantScatterGatherPanel.js")), Arrays.asList(
3645
ToolParameterDescriptor.create("fileName", "VCF Filename", "The name of the resulting file.", "textfield", new JSONObject(){{
3746
put("allowBlank", false);
3847
put("doNotIncludeInTemplates", true);
@@ -41,9 +50,10 @@ public PbsvJointCallingHandler()
4150
put("storeValues", "DEBUG;INFO;WARN");
4251
put("multiSelect", false);
4352
}}, "INFO"),
44-
ToolParameterDescriptor.create("doCopyLocal", "Copy Inputs Locally", "If checked, the input file(s) willbe copied to the job working directory.", "checkbox", new JSONObject(){{
53+
ToolParameterDescriptor.create("doCopyLocal", "Copy Inputs Locally", "If checked, the input file(s) will be copied to the job working directory.", "checkbox", new JSONObject(){{
4554
put("checked", true);
46-
}}, true)
55+
}}, true),
56+
ToolParameterDescriptor.create("scatterGather", "Scatter/Gather Options", "If selected, this job will be divided to run job per chromosome. The final step will take the VCF from each intermediate step and combined to make a final VCF file.", "sequenceanalysis-variantscattergatherpanel", null, null)
4757
));
4858
}
4959

@@ -116,6 +126,72 @@ public void processFilesRemote(List<SequenceOutputFile> inputFiles, JobContext c
116126
}
117127
}
118128

129+
ReferenceGenome genome = ctx.getSequenceSupport().getCachedGenomes().iterator().next();
130+
String outputBaseName = ctx.getParams().getString("fileName");
131+
if (!outputBaseName.toLowerCase().endsWith(".gz"))
132+
{
133+
outputBaseName = outputBaseName.replaceAll(".gz$", "");
134+
}
135+
136+
if (!outputBaseName.toLowerCase().endsWith(".vcf"))
137+
{
138+
outputBaseName = outputBaseName.replaceAll(".vcf$", "");
139+
}
140+
141+
List<File> outputs = new ArrayList<>();
142+
if (getVariantPipelineJob(ctx.getJob()).isScatterJob())
143+
{
144+
outputBaseName = outputBaseName + "." + getVariantPipelineJob(ctx.getJob()).getIntervalSetName();
145+
for (Interval i : getVariantPipelineJob(ctx.getJob()).getIntervalsForTask())
146+
{
147+
if (i.getStart() != 1)
148+
{
149+
throw new PipelineJobException("Expected all intervals to start on the first base: " + i.toString());
150+
}
151+
152+
outputs.add(runPbsvCall(ctx, inputs, genome, outputBaseName + (getVariantPipelineJob(ctx.getJob()).getIntervalsForTask().size() == 1 ? "" : "." + i.getContig()), i.getContig()));
153+
}
154+
}
155+
else
156+
{
157+
outputs.add(runPbsvCall(ctx, inputs, genome, outputBaseName, null));
158+
}
159+
160+
File vcfOutGz;
161+
if (outputs.size() == 1)
162+
{
163+
File unzipVcfOut = outputs.get(0);
164+
vcfOutGz = SequenceAnalysisService.get().bgzipFile(unzipVcfOut, ctx.getLogger());
165+
}
166+
else
167+
{
168+
vcfOutGz = SequenceUtil.combineVcfs(outputs, genome, new File(ctx.getOutputDir(), outputBaseName), ctx.getLogger(), true, null, false);
169+
if (vcfOutGz.exists())
170+
{
171+
throw new PipelineJobException("Unzipped VCF should not exist: " + vcfOutGz.getPath());
172+
}
173+
}
174+
175+
try
176+
{
177+
SequenceAnalysisService.get().ensureVcfIndex(vcfOutGz, ctx.getLogger(), true);
178+
}
179+
catch (IOException e)
180+
{
181+
throw new PipelineJobException(e);
182+
}
183+
184+
SequenceOutputFile so = new SequenceOutputFile();
185+
so.setName("pbsv call: " + outputBaseName);
186+
so.setFile(vcfOutGz);
187+
so.setCategory(OUTPUT_CATEGORY);
188+
so.setLibrary_id(genome.getGenomeId());
189+
190+
ctx.addSequenceOutput(so);
191+
}
192+
193+
private File runPbsvCall(JobContext ctx, List<File> inputs, ReferenceGenome genome, String outputBaseName, @Nullable String contig) throws PipelineJobException
194+
{
119195
List<String> args = new ArrayList<>();
120196
args.add(getExe().getPath());
121197
args.add("call");
@@ -127,22 +203,21 @@ public void processFilesRemote(List<SequenceOutputFile> inputFiles, JobContext c
127203
args.add(String.valueOf(maxThreads));
128204
}
129205

206+
if (contig != null)
207+
{
208+
args.add("-r");
209+
args.add(contig);
210+
}
211+
130212
args.addAll(getClientCommandArgs(ctx.getParams()));
131213

132-
ReferenceGenome genome = ctx.getSequenceSupport().getCachedGenomes().iterator().next();
133214
args.add(genome.getWorkingFastaFile().getPath());
134215

135216
inputs.forEach(f -> {
136217
args.add(f.getPath());
137218
});
138219

139-
String fileName = ctx.getParams().getString("fileName");
140-
if (!fileName.toLowerCase().endsWith("vcf"))
141-
{
142-
fileName = fileName + ".vcf";
143-
}
144-
145-
File vcfOut = new File(ctx.getOutputDir(), fileName);
220+
File vcfOut = new File(ctx.getOutputDir(), outputBaseName + ".vcf");
146221
args.add(vcfOut.getPath());
147222

148223
new SimpleScriptWrapper(ctx.getLogger()).execute(args);
@@ -152,34 +227,35 @@ public void processFilesRemote(List<SequenceOutputFile> inputFiles, JobContext c
152227
throw new PipelineJobException("Unable to find file: " + vcfOut.getPath());
153228
}
154229

155-
// Ensure output bgzipped:
156-
File bgVcf = SequenceAnalysisService.get().bgzipFile(vcfOut, ctx.getLogger());
157-
try
158-
{
159-
SequenceAnalysisService.get().ensureVcfIndex(bgVcf, ctx.getLogger(), true);
160-
}
161-
catch (IOException e)
162-
{
163-
throw new PipelineJobException(e);
164-
}
165-
166-
if (vcfOut.exists())
167-
{
168-
throw new PipelineJobException("Unzipped VCF should not exist: " + vcfOut.getPath());
169-
}
170-
171-
SequenceOutputFile so = new SequenceOutputFile();
172-
so.setName("pbsv call: " + fileName);
173-
so.setFile(bgVcf);
174-
so.setCategory("PBSV VCF");
175-
so.setLibrary_id(genome.getGenomeId());
176-
177-
ctx.addSequenceOutput(so);
230+
return vcfOut;
178231
}
179232

180233
private File getExe()
181234
{
182235
return SequencePipelineService.get().getExeForPackage("PBSVPATH", "pbsv");
183236
}
184237
}
238+
239+
@Override
240+
public File getScatterJobOutput(JobContext ctx) throws PipelineJobException
241+
{
242+
return ProcessVariantsHandler.getScatterOutputByCategory(ctx, OUTPUT_CATEGORY);
243+
}
244+
245+
@Override
246+
public void validateScatter(VariantProcessingStep.ScatterGatherMethod method, PipelineJob job) throws IllegalArgumentException
247+
{
248+
AbstractGenomicsDBImportHandler.validateNoSplitContigScatter(method, job);
249+
}
250+
251+
@Override
252+
public SequenceOutputFile createFinalSequenceOutput(PipelineJob job, File processed, List<SequenceOutputFile> inputFiles)
253+
{
254+
return ProcessVariantsHandler.createSequenceOutput(job, processed, inputFiles, OUTPUT_CATEGORY);
255+
}
256+
257+
private static VariantProcessingJob getVariantPipelineJob(PipelineJob job)
258+
{
259+
return job instanceof VariantProcessingJob ? (VariantProcessingJob)job : null;
260+
}
185261
}

SequenceAnalysis/src/org/labkey/sequenceanalysis/util/SequenceUtil.java

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -437,6 +437,11 @@ public static void sortROD(File input, Logger log, Integer startColumnIdx) throw
437437
}
438438

439439
public static File combineVcfs(List<File> files, ReferenceGenome genome, File outputGzip, Logger log, boolean multiThreaded, @Nullable Integer compressionLevel) throws PipelineJobException
440+
{
441+
return combineVcfs(files, genome, outputGzip, log, multiThreaded, compressionLevel, true);
442+
}
443+
444+
public static File combineVcfs(List<File> files, ReferenceGenome genome, File outputGzip, Logger log, boolean multiThreaded, @Nullable Integer compressionLevel, boolean showTotals) throws PipelineJobException
440445
{
441446
log.info("combining VCFs: ");
442447

@@ -505,8 +510,11 @@ else if (!samples.equals(header.getGenotypeSamples()))
505510

506511
bashTmp.delete();
507512

508-
log.info("total variants: " + SequenceAnalysisService.get().getVCFLineCount(outputGzip, log, false));
509-
log.info("passing variants: " + SequenceAnalysisService.get().getVCFLineCount(outputGzip, log, true));
513+
if (showTotals)
514+
{
515+
log.info("total variants: " + SequenceAnalysisService.get().getVCFLineCount(outputGzip, log, false));
516+
log.info("passing variants: " + SequenceAnalysisService.get().getVCFLineCount(outputGzip, log, true));
517+
}
510518

511519
headerFile.delete();
512520
File headerIdx = new File(headerFile.getPath() + ".idx");

0 commit comments

Comments
 (0)