Skip to content

Commit afe8d08

Browse files
authored
Merge pull request #181 from LabKey/fb_merge_22.7_to_develop
Merge discvr-22.7 to develop
2 parents dee6b87 + bbc38e9 commit afe8d08

32 files changed

+592
-134
lines changed

SequenceAnalysis/api-src/org/labkey/api/sequenceanalysis/SequenceAnalysisService.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,8 @@ static public void setInstance(SequenceAnalysisService instance)
9494

9595
abstract public File combineVcfs(List<File> files, File outputGz, ReferenceGenome genome, Logger log, boolean multiThreaded, @Nullable Integer compressionLevel) throws PipelineJobException;
9696

97+
abstract public File combineVcfs(List<File> files, File outputGz, ReferenceGenome genome, Logger log, boolean multiThreaded, @Nullable Integer compressionLevel, boolean sortAfterMerge) throws PipelineJobException;
98+
9799
abstract public String getScriptPath(String moduleName, String path) throws PipelineJobException;
98100

99101
abstract public void sortGxf(Logger log, File input, @Nullable File output) throws PipelineJobException;

SequenceAnalysis/api-src/org/labkey/api/sequenceanalysis/pipeline/VariantProcessingStep.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,16 @@ default void validateScatter(ScatterGatherMethod method, PipelineJob job) throws
6262
{
6363

6464
}
65+
66+
default void performAdditionalMergeTasks(SequenceOutputHandler.JobContext ctx, PipelineJob job, TaskFileManager manager, ReferenceGenome genome, List<File> orderedScatterOutputs) throws PipelineJobException
67+
{
68+
69+
}
70+
71+
default boolean doSortAfterMerge()
72+
{
73+
return false;
74+
}
6575
}
6676

6777
public static interface MayRequirePrepareTask

SequenceAnalysis/src/org/labkey/sequenceanalysis/SequenceAnalysisServiceImpl.java

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,13 @@
11
package org.labkey.sequenceanalysis;
22

33
import htsjdk.samtools.util.FileExtensions;
4-
import htsjdk.tribble.Tribble;
54
import htsjdk.tribble.index.Index;
65
import htsjdk.tribble.index.IndexFactory;
76
import htsjdk.variant.vcf.VCFCodec;
87
import org.apache.commons.lang3.StringUtils;
98
import org.apache.commons.lang3.SystemUtils;
109
import org.apache.logging.log4j.LogManager;
1110
import org.apache.logging.log4j.Logger;
12-
import org.jetbrains.annotations.NotNull;
1311
import org.jetbrains.annotations.Nullable;
1412
import org.labkey.api.data.CompareType;
1513
import org.labkey.api.data.Container;
@@ -46,7 +44,6 @@
4644
import org.labkey.sequenceanalysis.pipeline.ReferenceGenomeImpl;
4745
import org.labkey.sequenceanalysis.pipeline.ReferenceLibraryPipelineJob;
4846
import org.labkey.sequenceanalysis.pipeline.SequenceTaskHelper;
49-
import org.labkey.sequenceanalysis.run.util.BgzipRunner;
5047
import org.labkey.sequenceanalysis.run.util.FastaIndexer;
5148
import org.labkey.sequenceanalysis.run.util.GxfSorter;
5249
import org.labkey.sequenceanalysis.run.util.IndexFeatureFileWrapper;
@@ -60,7 +57,6 @@
6057
import java.sql.ResultSet;
6158
import java.sql.SQLException;
6259
import java.util.ArrayList;
63-
import java.util.Arrays;
6460
import java.util.Collection;
6561
import java.util.Collections;
6662
import java.util.HashMap;
@@ -69,7 +65,6 @@
6965
import java.util.Map;
7066
import java.util.Set;
7167
import java.util.function.Function;
72-
import java.util.function.Predicate;
7368

7469
/**
7570
* User: bimber
@@ -476,7 +471,13 @@ public String createReferenceLibrary(List<Integer> sequenceIds, Container c, Use
476471
@Override
477472
public File combineVcfs(List<File> files, File outputGz, ReferenceGenome genome, Logger log, boolean multiThreaded, @Nullable Integer compressionLevel) throws PipelineJobException
478473
{
479-
return SequenceUtil.combineVcfs(files, genome, outputGz, log, multiThreaded, compressionLevel);
474+
return combineVcfs(files, outputGz, genome, log, multiThreaded, compressionLevel, false);
475+
}
476+
477+
@Override
478+
public File combineVcfs(List<File> files, File outputGz, ReferenceGenome genome, Logger log, boolean multiThreaded, @Nullable Integer compressionLevel, boolean sortAfterMerge) throws PipelineJobException
479+
{
480+
return SequenceUtil.combineVcfs(files, genome, outputGz, log, multiThreaded, compressionLevel, sortAfterMerge);
480481
}
481482

482483
@Override

SequenceAnalysis/src/org/labkey/sequenceanalysis/analysis/GenotypeGVCFHandler.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -266,7 +266,7 @@ else if (genomeIds.isEmpty())
266266
}
267267

268268
//run post processing, if needed
269-
File processed = ProcessVariantsHandler.processVCF(outputVcf, genomeId, ctx, resumer);
269+
File processed = ProcessVariantsHandler.processVCF(outputVcf, genomeId, ctx, resumer, false);
270270
if (processed == null)
271271
{
272272
ctx.getLogger().debug("adding GenotypeGVCFs output because no processing was selected");

SequenceAnalysis/src/org/labkey/sequenceanalysis/pipeline/ProcessVariantsHandler.java

Lines changed: 46 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
import org.labkey.api.sequenceanalysis.pipeline.SequenceAnalysisJobSupport;
3636
import org.labkey.api.sequenceanalysis.pipeline.SequenceOutputHandler;
3737
import org.labkey.api.sequenceanalysis.pipeline.SequencePipelineService;
38+
import org.labkey.api.sequenceanalysis.pipeline.TaskFileManager;
3839
import org.labkey.api.sequenceanalysis.pipeline.ToolParameterDescriptor;
3940
import org.labkey.api.sequenceanalysis.pipeline.VariantProcessingStep;
4041
import org.labkey.api.sequenceanalysis.run.SimpleScriptWrapper;
@@ -46,6 +47,7 @@
4647
import org.labkey.sequenceanalysis.SequenceAnalysisModule;
4748
import org.labkey.sequenceanalysis.run.util.AbstractGenomicsDBImportHandler;
4849
import org.labkey.sequenceanalysis.run.util.MergeVcfsAndGenotypesWrapper;
50+
import org.labkey.sequenceanalysis.run.variant.OutputVariantsStartingInIntervalsStep;
4951
import org.labkey.sequenceanalysis.util.SequenceUtil;
5052

5153
import java.io.File;
@@ -359,7 +361,7 @@ public static List<Interval> getIntervals(JobContext ctx)
359361
return null;
360362
}
361363

362-
public static File processVCF(File input, Integer libraryId, JobContext ctx, Resumer resumer) throws PipelineJobException
364+
public static File processVCF(File input, Integer libraryId, JobContext ctx, Resumer resumer, boolean subsetToIntervals) throws PipelineJobException
363365
{
364366
try
365367
{
@@ -382,10 +384,37 @@ public static File processVCF(File input, Integer libraryId, JobContext ctx, Res
382384
return null;
383385
}
384386

387+
boolean useScatterGather = getVariantPipelineJob(ctx.getJob()) != null && getVariantPipelineJob(ctx.getJob()).isScatterJob();
388+
if (useScatterGather && subsetToIntervals)
389+
{
390+
if (getIntervals(ctx) == null)
391+
{
392+
throw new PipelineJobException("Did not expect intervals to be null on a scatter/gather job");
393+
}
394+
395+
ctx.getLogger().info("Subsetting input VCF to job intervals");
396+
ctx.getJob().setStatus(PipelineJob.TaskStatus.running, "Subsetting input VCF to job intervals");
397+
398+
File outputFile = new File(ctx.getOutputDir(), SequenceAnalysisService.get().getUnzippedBaseName(currentVCF.getName()) + ".subset.vcf.gz");
399+
File outputFileIdx = new File(outputFile.getPath() + ".tbi");
400+
if (outputFileIdx.exists())
401+
{
402+
ctx.getLogger().debug("Index exists, will not re-subset VCF");
403+
}
404+
else
405+
{
406+
OutputVariantsStartingInIntervalsStep.Wrapper wrapper = new OutputVariantsStartingInIntervalsStep.Wrapper(ctx.getLogger());
407+
wrapper.execute(input, outputFile, getIntervals(ctx));
408+
}
409+
410+
currentVCF = outputFile;
411+
resumer.getFileManager().addIntermediateFile(currentVCF);
412+
resumer.getFileManager().addIntermediateFile(outputFileIdx);
413+
}
414+
385415
for (PipelineStepCtx<VariantProcessingStep> stepCtx : providers)
386416
{
387417
ctx.getLogger().info("Starting to run: " + stepCtx.getProvider().getLabel());
388-
389418
ctx.getJob().setStatus(PipelineJob.TaskStatus.running, "Running: " + stepCtx.getProvider().getLabel());
390419
stepIdx++;
391420

@@ -622,7 +651,7 @@ public void processFilesRemote(List<SequenceOutputFile> inputFiles, JobContext c
622651

623652
private void processFile(File input, Integer libraryId, Integer readsetId, JobContext ctx) throws PipelineJobException
624653
{
625-
File processed = processVCF(input, libraryId, ctx, _resumer);
654+
File processed = processVCF(input, libraryId, ctx, _resumer, true);
626655
if (processed != null && processed.exists())
627656
{
628657
ctx.getLogger().debug("adding sequence output: " + processed.getPath());
@@ -823,4 +852,18 @@ else if (AbstractGenomicsDBImportHandler.TILE_DB_FILETYPE.isType(input))
823852
throw new PipelineJobException("Unknown file type: " + input.getPath());
824853
}
825854
}
855+
856+
@Override
857+
public void performAdditionalMergeTasks(JobContext ctx, PipelineJob job, TaskFileManager manager, ReferenceGenome genome, List<File> orderedScatterOutputs) throws PipelineJobException
858+
{
859+
List<PipelineStepCtx<VariantProcessingStep>> providers = SequencePipelineService.get().getSteps(job, VariantProcessingStep.class);
860+
for (PipelineStepCtx<VariantProcessingStep> stepCtx : providers)
861+
{
862+
VariantProcessingStep step = stepCtx.getProvider().create(ctx);
863+
if (step instanceof VariantProcessingStep.SupportsScatterGather)
864+
{
865+
((VariantProcessingStep.SupportsScatterGather)step).performAdditionalMergeTasks(ctx, job, manager, genome, orderedScatterOutputs);
866+
}
867+
}
868+
}
826869
}

SequenceAnalysis/src/org/labkey/sequenceanalysis/pipeline/VariantProcessingRemoteMergeTask.java

Lines changed: 21 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@
22

33
import htsjdk.samtools.util.Interval;
44
import org.apache.commons.lang3.StringUtils;
5-
import org.apache.logging.log4j.Logger;
65
import org.jetbrains.annotations.NotNull;
76
import org.labkey.api.pipeline.AbstractTaskFactory;
87
import org.labkey.api.pipeline.AbstractTaskFactorySettings;
@@ -15,13 +14,12 @@
1514
import org.labkey.api.sequenceanalysis.SequenceOutputFile;
1615
import org.labkey.api.sequenceanalysis.pipeline.ReferenceGenome;
1716
import org.labkey.api.sequenceanalysis.pipeline.SequenceOutputHandler;
18-
import org.labkey.api.sequenceanalysis.run.AbstractDiscvrSeqWrapper;
17+
import org.labkey.api.sequenceanalysis.pipeline.VariantProcessingStep;
1918
import org.labkey.api.util.FileType;
20-
import org.labkey.api.writer.PrintWriters;
19+
import org.labkey.sequenceanalysis.run.variant.OutputVariantsStartingInIntervalsStep;
2120

2221
import java.io.File;
2322
import java.io.IOException;
24-
import java.io.PrintWriter;
2523
import java.util.ArrayList;
2624
import java.util.Collections;
2725
import java.util.HashSet;
@@ -112,6 +110,7 @@ private VariantProcessingJob getPipelineJob()
112110
SequenceTaskHelper.logModuleVersions(getJob().getLogger());
113111
RecordedAction action = new RecordedAction(ACTION_NAME);
114112
TaskFileManagerImpl manager = new TaskFileManagerImpl(getPipelineJob(), _wd.getDir(), _wd);
113+
JobContextImpl ctx = new JobContextImpl(getPipelineJob(), getPipelineJob().getSequenceSupport(), getPipelineJob().getParameterJson(), _wd.getDir(), new TaskFileManagerImpl(getPipelineJob(), _wd.getDir(), _wd), _wd);
115114

116115
File finalOut;
117116
SequenceOutputHandler<SequenceOutputHandler.SequenceOutputProcessor> handler = getPipelineJob().getHandler();
@@ -121,7 +120,7 @@ private VariantProcessingJob getPipelineJob()
121120
}
122121
else
123122
{
124-
finalOut = runDefaultVariantMerge(manager, action, handler);
123+
finalOut = runDefaultVariantMerge(ctx, manager, action, handler);
125124
}
126125

127126
Map<String, File> scatterOutputs = getPipelineJob().getScatterJobOutputs();
@@ -153,7 +152,7 @@ private VariantProcessingJob getPipelineJob()
153152
return new RecordedActionSet(action);
154153
}
155154

156-
private File runDefaultVariantMerge(TaskFileManagerImpl manager, RecordedAction action, SequenceOutputHandler<SequenceOutputHandler.SequenceOutputProcessor> handler) throws PipelineJobException
155+
private File runDefaultVariantMerge(JobContextImpl ctx, TaskFileManagerImpl manager, RecordedAction action, SequenceOutputHandler<SequenceOutputHandler.SequenceOutputProcessor> handler) throws PipelineJobException
157156
{
158157
Map<String, List<Interval>> jobToIntervalMap = getPipelineJob().getJobToIntervalMap();
159158
getJob().setStatus(PipelineJob.TaskStatus.running, "Combining Per-Contig VCFs: " + jobToIntervalMap.size());
@@ -180,12 +179,9 @@ private File runDefaultVariantMerge(TaskFileManagerImpl manager, RecordedAction
180179
if (ensureOutputsWithinIntervals)
181180
{
182181
getJob().getLogger().debug("Ensuring scatter outputs respect intervals");
183-
List<Interval> expectedIntervals = jobToIntervalMap.get(name);
184182

185-
File intervalFile = new File(vcf.getParentFile(), "scatterIntervals.list");
186183
File subsetVcf = new File(vcf.getParentFile(), SequenceAnalysisService.get().getUnzippedBaseName(vcf.getName()) + ".subset.vcf.gz");
187184
File subsetVcfIdx = new File(subsetVcf.getPath() + ".tbi");
188-
manager.addIntermediateFile(intervalFile);
189185
manager.addIntermediateFile(subsetVcf);
190186
manager.addIntermediateFile(subsetVcfIdx);
191187

@@ -195,19 +191,8 @@ private File runDefaultVariantMerge(TaskFileManagerImpl manager, RecordedAction
195191
}
196192
else
197193
{
198-
try (PrintWriter writer = PrintWriters.getPrintWriter(intervalFile))
199-
{
200-
expectedIntervals.forEach(interval -> {
201-
writer.println(interval.getContig() + ":" + interval.getStart() + "-" + interval.getEnd());
202-
});
203-
}
204-
catch (IOException e)
205-
{
206-
throw new PipelineJobException(e);
207-
}
208-
209-
Wrapper wrapper = new Wrapper(getJob().getLogger());
210-
wrapper.execute(vcf, subsetVcf, intervalFile);
194+
OutputVariantsStartingInIntervalsStep.Wrapper wrapper = new OutputVariantsStartingInIntervalsStep.Wrapper(getJob().getLogger());
195+
wrapper.execute(vcf, subsetVcf, getPipelineJob().getIntervalsForTask());
211196
}
212197

213198
toConcat.add(subsetVcf);
@@ -222,6 +207,15 @@ private File runDefaultVariantMerge(TaskFileManagerImpl manager, RecordedAction
222207
manager.addIntermediateFile(new File(vcf.getPath() + ".tbi"));
223208
}
224209

210+
Set<Integer> genomeIds = new HashSet<>();
211+
getPipelineJob().getFiles().forEach(x -> genomeIds.add(x.getLibrary_id()));
212+
if (genomeIds.size() != 1)
213+
{
214+
throw new PipelineJobException("Expected a single genome, found: " + StringUtils.join(genomeIds, ", "));
215+
}
216+
217+
ReferenceGenome genome = getPipelineJob().getSequenceSupport().getCachedGenome(genomeIds.iterator().next());
218+
225219
String basename = SequenceAnalysisService.get().getUnzippedBaseName(toConcat.get(0).getName());
226220
File combined = new File(getPipelineJob().getAnalysisDirectory(), basename + ".vcf.gz");
227221
File combinedIdx = new File(combined.getPath() + ".tbi");
@@ -236,47 +230,16 @@ private File runDefaultVariantMerge(TaskFileManagerImpl manager, RecordedAction
236230
throw new PipelineJobException("Missing one or more VCFs: " + missing.stream().map(File::getPath).collect(Collectors.joining(",")));
237231
}
238232

239-
Set<Integer> genomeIds = new HashSet<>();
240-
getPipelineJob().getFiles().forEach(x -> genomeIds.add(x.getLibrary_id()));
241-
if (genomeIds.size() != 1)
242-
{
243-
throw new PipelineJobException("Expected a single genome, found: " + StringUtils.join(genomeIds, ", "));
244-
}
245-
246-
ReferenceGenome genome = getPipelineJob().getSequenceSupport().getCachedGenome(genomeIds.iterator().next());
247-
combined = SequenceAnalysisService.get().combineVcfs(toConcat, combined, genome, getJob().getLogger(), true, null);
233+
boolean sortAfterMerge = handler instanceof VariantProcessingStep.SupportsScatterGather && ((VariantProcessingStep.SupportsScatterGather)handler).doSortAfterMerge();
234+
combined = SequenceAnalysisService.get().combineVcfs(toConcat, combined, genome, getJob().getLogger(), true, null, sortAfterMerge);
248235
}
249236
manager.addOutput(action, "Merged VCF", combined);
250237

251-
return combined;
252-
}
253-
254-
public static class Wrapper extends AbstractDiscvrSeqWrapper
255-
{
256-
public Wrapper(Logger log)
238+
if (handler instanceof VariantProcessingStep.SupportsScatterGather)
257239
{
258-
super(log);
240+
((VariantProcessingStep.SupportsScatterGather) handler).performAdditionalMergeTasks(ctx, getPipelineJob(), manager, genome, toConcat);
259241
}
260242

261-
public void execute(File inputVcf, File outputVcf, File intervalFile) throws PipelineJobException
262-
{
263-
List<String> args = new ArrayList<>(getBaseArgs());
264-
args.add("OutputVariantsStartingInIntervals");
265-
266-
args.add("-V");
267-
args.add(inputVcf.getPath());
268-
269-
args.add("-O");
270-
args.add(outputVcf.getPath());
271-
272-
args.add("-L");
273-
args.add(intervalFile.getPath());
274-
275-
execute(args);
276-
if (!outputVcf.exists())
277-
{
278-
throw new PipelineJobException("Missing file: " + outputVcf.getPath());
279-
}
280-
}
243+
return combined;
281244
}
282245
}

0 commit comments

Comments
 (0)