
Commit 70fb972

Merge remote-tracking branch 'origin/develop' into fb_api_reports_report_r_refactor
2 parents: ef22aa3 + 341ad5d

File tree: 53 files changed, +949 / -194 lines

SequenceAnalysis/api-src/org/labkey/api/sequenceanalysis/SequenceAnalysisService.java

Lines changed: 2 additions & 0 deletions
@@ -94,6 +94,8 @@ static public void setInstance(SequenceAnalysisService instance)
 
     abstract public File combineVcfs(List<File> files, File outputGz, ReferenceGenome genome, Logger log, boolean multiThreaded, @Nullable Integer compressionLevel) throws PipelineJobException;
 
+    abstract public File combineVcfs(List<File> files, File outputGz, ReferenceGenome genome, Logger log, boolean multiThreaded, @Nullable Integer compressionLevel, boolean sortAfterMerge) throws PipelineJobException;
+
     abstract public String getScriptPath(String moduleName, String path) throws PipelineJobException;
 
     abstract public void sortGxf(Logger log, File input, @Nullable File output) throws PipelineJobException;
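
Note: the snippet below is an illustrative sketch, not code from this commit. It shows how a caller might opt into the new sortAfterMerge behavior; the class and helper method names are hypothetical, and the import paths are assumed to follow the usual LabKey/htsjdk layout.

import java.io.File;
import java.util.List;

import org.apache.logging.log4j.Logger;
import org.labkey.api.pipeline.PipelineJobException;
import org.labkey.api.sequenceanalysis.SequenceAnalysisService;
import org.labkey.api.sequenceanalysis.pipeline.ReferenceGenome;

public class CombineVcfsExample
{
    // Hypothetical helper: merge a list of VCFs into a single bgzipped output and
    // request a post-merge sort via the new boolean parameter.
    public static File mergeAndSort(List<File> vcfs, File outputGz, ReferenceGenome genome, Logger log) throws PipelineJobException
    {
        // The trailing 'true' is the sortAfterMerge flag introduced by this commit;
        // passing 'false' preserves the previous behavior.
        return SequenceAnalysisService.get().combineVcfs(vcfs, outputGz, genome, log, true, null, true);
    }
}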

SequenceAnalysis/api-src/org/labkey/api/sequenceanalysis/pipeline/VariantProcessingStep.java

Lines changed: 10 additions & 0 deletions
@@ -62,6 +62,16 @@ default void validateScatter(ScatterGatherMethod method, PipelineJob job) throws
         {
 
         }
+
+        default void performAdditionalMergeTasks(SequenceOutputHandler.JobContext ctx, PipelineJob job, TaskFileManager manager, ReferenceGenome genome, List<File> orderedScatterOutputs) throws PipelineJobException
+        {
+
+        }
+
+        default boolean doSortAfterMerge()
+        {
+            return false;
+        }
     }
 
     public static interface MayRequirePrepareTask
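
As a hypothetical illustration (not part of this commit), a scatter-aware step could override the two new default methods roughly as below. Import paths are assumed to follow the existing package layout, and the class is declared abstract so the sketch can skip the rest of the step contract.

import java.io.File;
import java.util.List;

import org.labkey.api.pipeline.PipelineJob;
import org.labkey.api.pipeline.PipelineJobException;
import org.labkey.api.sequenceanalysis.pipeline.ReferenceGenome;
import org.labkey.api.sequenceanalysis.pipeline.SequenceOutputHandler;
import org.labkey.api.sequenceanalysis.pipeline.TaskFileManager;
import org.labkey.api.sequenceanalysis.pipeline.VariantProcessingStep;

public abstract class ExampleScatterAwareStep implements VariantProcessingStep.SupportsScatterGather
{
    @Override
    public boolean doSortAfterMerge()
    {
        // Ask the gather task to coordinate-sort the merged VCF.
        return true;
    }

    @Override
    public void performAdditionalMergeTasks(SequenceOutputHandler.JobContext ctx, PipelineJob job, TaskFileManager manager, ReferenceGenome genome, List<File> orderedScatterOutputs) throws PipelineJobException
    {
        // Example: log the per-scatter VCFs in the order they will be merged.
        for (File f : orderedScatterOutputs)
        {
            ctx.getLogger().debug("scatter output: " + f.getPath());
        }
    }
}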

SequenceAnalysis/build.gradle

Lines changed: 2 additions & 0 deletions
@@ -157,8 +157,10 @@ dependencies {
     BuildUtils.addLabKeyDependency(project: project, config: "modules", depProjectPath: ":server:modules:LabDevKitModules:LDK", depProjectConfig: "published", depExtension: "module")
 }
 
+
 if (project.findProject(BuildUtils.getTestProjectPath(project.gradle)) != null && project.hasProperty("teamcity"))
 {
+    project.evaluationDependsOn(BuildUtils.getTestProjectPath(project.gradle))
     def testProject = project.findProject(BuildUtils.getTestProjectPath(project.gradle))
     def createPipelineConfigTask = project.tasks.register("createPipelineConfig", Copy) {
         Copy task ->

SequenceAnalysis/src/org/labkey/sequenceanalysis/SequenceAnalysisModule.java

Lines changed: 1 addition & 0 deletions
@@ -303,6 +303,7 @@ public static void registerPipelineSteps()
         SequencePipelineService.get().registerPipelineStep(new VariantsToTableStep.Provider());
         SequencePipelineService.get().registerPipelineStep(new VariantQCStep.Provider());
         SequencePipelineService.get().registerPipelineStep(new PlinkPcaStep.Provider());
+        SequencePipelineService.get().registerPipelineStep(new KingInferenceStep.Provider());
         SequencePipelineService.get().registerPipelineStep(new MendelianViolationReportStep.Provider());
         SequencePipelineService.get().registerPipelineStep(new SummarizeGenotypeQualityStep.Provider());

SequenceAnalysis/src/org/labkey/sequenceanalysis/SequenceAnalysisServiceImpl.java

Lines changed: 20 additions & 17 deletions
@@ -1,15 +1,13 @@
 package org.labkey.sequenceanalysis;
 
 import htsjdk.samtools.util.FileExtensions;
-import htsjdk.tribble.Tribble;
 import htsjdk.tribble.index.Index;
 import htsjdk.tribble.index.IndexFactory;
 import htsjdk.variant.vcf.VCFCodec;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.SystemUtils;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
-import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
 import org.labkey.api.data.CompareType;
 import org.labkey.api.data.Container;
@@ -46,7 +44,6 @@
 import org.labkey.sequenceanalysis.pipeline.ReferenceGenomeImpl;
 import org.labkey.sequenceanalysis.pipeline.ReferenceLibraryPipelineJob;
 import org.labkey.sequenceanalysis.pipeline.SequenceTaskHelper;
-import org.labkey.sequenceanalysis.run.util.BgzipRunner;
 import org.labkey.sequenceanalysis.run.util.FastaIndexer;
 import org.labkey.sequenceanalysis.run.util.GxfSorter;
 import org.labkey.sequenceanalysis.run.util.IndexFeatureFileWrapper;
@@ -60,7 +57,6 @@
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
@@ -69,7 +65,6 @@
 import java.util.Map;
 import java.util.Set;
 import java.util.function.Function;
-import java.util.function.Predicate;
 
 /**
  * User: bimber
@@ -258,16 +253,10 @@ public File ensureVcfIndex(File vcf, Logger log, boolean forceRecreate) throws I
         try
         {
             FileType gz = new FileType(".gz");
-            File expected = new File(vcf.getPath() + FileExtensions.TRIBBLE_INDEX);
-            File tbi = new File(vcf.getPath() + ".tbi");
-
-            if (!forceRecreate && expected.exists())
-            {
-                return expected;
-            }
-            else if (!forceRecreate && tbi.exists())
+            File expectedIdx = gz.isType(vcf) ? new File(vcf.getPath() + ".tbi") : new File(vcf.getPath() + FileExtensions.TRIBBLE_INDEX);
+            if (!forceRecreate && expectedIdx.exists())
             {
-                return tbi;
+                return expectedIdx;
             }
             else
             {
@@ -277,15 +266,23 @@ else if (!forceRecreate && tbi.exists())
                 {
                     TabixRunner r = new TabixRunner(log);
                     r.execute(vcf);
+                    if (!expectedIdx.exists())
+                    {
+                        throw new PipelineJobException("Expected index was not created: " + expectedIdx.getPath());
+                    }
 
-                    return tbi;
+                    return expectedIdx;
                 }
                 else
                 {
                     Index idx = IndexFactory.createDynamicIndex(vcf, new VCFCodec());
                     idx.writeBasedOnFeatureFile(vcf);
+                    if (!expectedIdx.exists())
+                    {
+                        throw new PipelineJobException("Expected index was not created: " + expectedIdx.getPath());
+                    }
 
-                    return expected;
+                    return expectedIdx;
                 }
             }
         }
@@ -476,7 +473,13 @@ public String createReferenceLibrary(List<Integer> sequenceIds, Container c, Use
     @Override
     public File combineVcfs(List<File> files, File outputGz, ReferenceGenome genome, Logger log, boolean multiThreaded, @Nullable Integer compressionLevel) throws PipelineJobException
    {
-        return SequenceUtil.combineVcfs(files, genome, outputGz, log, multiThreaded, compressionLevel);
+        return combineVcfs(files, outputGz, genome, log, multiThreaded, compressionLevel, false);
+    }
+
+    @Override
+    public File combineVcfs(List<File> files, File outputGz, ReferenceGenome genome, Logger log, boolean multiThreaded, @Nullable Integer compressionLevel, boolean sortAfterMerge) throws PipelineJobException
+    {
+        return SequenceUtil.combineVcfs(files, genome, outputGz, log, multiThreaded, compressionLevel, sortAfterMerge);
     }
 
     @Override
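
The net effect of the ensureVcfIndex changes above is that the expected index name is now derived from the VCF's file type (a tabix .tbi for bgzipped inputs, a Tribble .idx otherwise), and the method fails fast if that index is missing after creation. Below is a minimal sketch of just the naming rule, using only htsjdk; the class name is hypothetical and a simple ".gz" suffix check stands in for the FileType test used in the commit.

import java.io.File;
import htsjdk.samtools.util.FileExtensions;

public class VcfIndexNaming
{
    // Minimal sketch: bgzipped VCFs are indexed with tabix (.tbi),
    // plain-text VCFs with a Tribble index (FileExtensions.TRIBBLE_INDEX, ".idx").
    public static File expectedIndex(File vcf)
    {
        return vcf.getName().toLowerCase().endsWith(".gz")
                ? new File(vcf.getPath() + ".tbi")
                : new File(vcf.getPath() + FileExtensions.TRIBBLE_INDEX);
    }
}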

SequenceAnalysis/src/org/labkey/sequenceanalysis/analysis/GenotypeGVCFHandler.java

Lines changed: 1 addition & 1 deletion
@@ -266,7 +266,7 @@ else if (genomeIds.isEmpty())
             }
 
             //run post processing, if needed
-            File processed = ProcessVariantsHandler.processVCF(outputVcf, genomeId, ctx, resumer);
+            File processed = ProcessVariantsHandler.processVCF(outputVcf, genomeId, ctx, resumer, false);
             if (processed == null)
             {
                 ctx.getLogger().debug("adding GenotypeGVCFs output because no processing was selected");

SequenceAnalysis/src/org/labkey/sequenceanalysis/pipeline/ProcessVariantsHandler.java

Lines changed: 46 additions & 3 deletions
@@ -35,6 +35,7 @@
 import org.labkey.api.sequenceanalysis.pipeline.SequenceAnalysisJobSupport;
 import org.labkey.api.sequenceanalysis.pipeline.SequenceOutputHandler;
 import org.labkey.api.sequenceanalysis.pipeline.SequencePipelineService;
+import org.labkey.api.sequenceanalysis.pipeline.TaskFileManager;
 import org.labkey.api.sequenceanalysis.pipeline.ToolParameterDescriptor;
 import org.labkey.api.sequenceanalysis.pipeline.VariantProcessingStep;
 import org.labkey.api.sequenceanalysis.run.SimpleScriptWrapper;
@@ -46,6 +47,7 @@
 import org.labkey.sequenceanalysis.SequenceAnalysisModule;
 import org.labkey.sequenceanalysis.run.util.AbstractGenomicsDBImportHandler;
 import org.labkey.sequenceanalysis.run.util.MergeVcfsAndGenotypesWrapper;
+import org.labkey.sequenceanalysis.run.variant.OutputVariantsStartingInIntervalsStep;
 import org.labkey.sequenceanalysis.util.SequenceUtil;
 
 import java.io.File;
@@ -359,7 +361,7 @@ public static List<Interval> getIntervals(JobContext ctx)
         return null;
     }
 
-    public static File processVCF(File input, Integer libraryId, JobContext ctx, Resumer resumer) throws PipelineJobException
+    public static File processVCF(File input, Integer libraryId, JobContext ctx, Resumer resumer, boolean subsetToIntervals) throws PipelineJobException
    {
         try
         {
@@ -382,10 +384,37 @@ public static File processVCF(File input, Integer libraryId, JobContext ctx, Res
                 return null;
             }
 
+            boolean useScatterGather = getVariantPipelineJob(ctx.getJob()) != null && getVariantPipelineJob(ctx.getJob()).isScatterJob();
+            if (useScatterGather && subsetToIntervals)
+            {
+                if (getIntervals(ctx) == null)
+                {
+                    throw new PipelineJobException("Did not expect intervals to be null on a scatter/gather job");
+                }
+
+                ctx.getLogger().info("Subsetting input VCF to job intervals");
+                ctx.getJob().setStatus(PipelineJob.TaskStatus.running, "Subsetting input VCF to job intervals");
+
+                File outputFile = new File(ctx.getOutputDir(), SequenceAnalysisService.get().getUnzippedBaseName(currentVCF.getName()) + ".subset.vcf.gz");
+                File outputFileIdx = new File(outputFile.getPath() + ".tbi");
+                if (outputFileIdx.exists())
+                {
+                    ctx.getLogger().debug("Index exists, will not re-subset VCF");
+                }
+                else
+                {
+                    OutputVariantsStartingInIntervalsStep.Wrapper wrapper = new OutputVariantsStartingInIntervalsStep.Wrapper(ctx.getLogger());
+                    wrapper.execute(input, outputFile, getIntervals(ctx));
+                }
+
+                currentVCF = outputFile;
+                resumer.getFileManager().addIntermediateFile(currentVCF);
+                resumer.getFileManager().addIntermediateFile(outputFileIdx);
+            }
+
             for (PipelineStepCtx<VariantProcessingStep> stepCtx : providers)
             {
                 ctx.getLogger().info("Starting to run: " + stepCtx.getProvider().getLabel());
-
                 ctx.getJob().setStatus(PipelineJob.TaskStatus.running, "Running: " + stepCtx.getProvider().getLabel());
                 stepIdx++;
 
@@ -622,7 +651,7 @@ public void processFilesRemote(List<SequenceOutputFile> inputFiles, JobContext c
 
     private void processFile(File input, Integer libraryId, Integer readsetId, JobContext ctx) throws PipelineJobException
     {
-        File processed = processVCF(input, libraryId, ctx, _resumer);
+        File processed = processVCF(input, libraryId, ctx, _resumer, true);
         if (processed != null && processed.exists())
         {
             ctx.getLogger().debug("adding sequence output: " + processed.getPath());
@@ -823,4 +852,18 @@ else if (AbstractGenomicsDBImportHandler.TILE_DB_FILETYPE.isType(input))
             throw new PipelineJobException("Unknown file type: " + input.getPath());
         }
     }
+
+    @Override
+    public void performAdditionalMergeTasks(JobContext ctx, PipelineJob job, TaskFileManager manager, ReferenceGenome genome, List<File> orderedScatterOutputs) throws PipelineJobException
+    {
+        List<PipelineStepCtx<VariantProcessingStep>> providers = SequencePipelineService.get().getSteps(job, VariantProcessingStep.class);
+        for (PipelineStepCtx<VariantProcessingStep> stepCtx : providers)
+        {
+            VariantProcessingStep step = stepCtx.getProvider().create(ctx);
+            if (step instanceof VariantProcessingStep.SupportsScatterGather)
+            {
+                ((VariantProcessingStep.SupportsScatterGather)step).performAdditionalMergeTasks(ctx, job, manager, genome, orderedScatterOutputs);
+            }
+        }
+    }
 }
