Skip to content

Commit 1ce9653

Browse files
committed
Centralize scatter/gather code
1 parent 3cf9b63 commit 1ce9653

File tree

7 files changed

+88
-67
lines changed

7 files changed

+88
-67
lines changed
Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
package org.labkey.sequenceanalysis;
2+
3+
import org.apache.commons.io.FileUtils;
4+
import org.labkey.api.pipeline.PipelineJobException;
5+
import org.labkey.api.sequenceanalysis.SequenceOutputFile;
6+
import org.labkey.api.sequenceanalysis.pipeline.SequenceOutputHandler;
7+
import org.labkey.sequenceanalysis.pipeline.VariantProcessingJob;
8+
import org.labkey.sequenceanalysis.run.util.GenotypeGVCFsWrapper;
9+
10+
import java.io.File;
11+
import java.io.IOException;
12+
import java.util.ArrayList;
13+
import java.util.Arrays;
14+
import java.util.List;
15+
16+
public class ScatterGatherUtils
17+
{
18+
public static void doCopyGvcfLocally(List<SequenceOutputFile> inputFiles, SequenceOutputHandler.JobContext ctx) throws PipelineJobException
19+
{
20+
List<File> inputVCFs = new ArrayList<>();
21+
inputFiles.forEach(f -> inputVCFs.add(f.getFile()));
22+
23+
ctx.getLogger().info("making local copies of gVCFs/GenomicsDB");
24+
GenotypeGVCFsWrapper.copyVcfsLocally(ctx, inputVCFs, new ArrayList<>(), false);
25+
}
26+
27+
public static File getLocalCopyDir(SequenceOutputHandler.JobContext ctx, boolean createIfDoesntExist)
28+
{
29+
if (ctx.getJob() instanceof VariantProcessingJob)
30+
{
31+
return ((VariantProcessingJob)ctx.getJob()).getLocationForCachedInputs(ctx.getWorkDir(), createIfDoesntExist);
32+
}
33+
34+
return ctx.getOutputDir();
35+
}
36+
37+
public static void possiblyCacheSupportFiles(SequenceOutputHandler.JobContext ctx) throws PipelineJobException
38+
{
39+
for (String param : Arrays.asList("exclude_intervals", "forceSitesFile"))
40+
{
41+
if (ctx.getParams().get("variantCalling.GenotypeGVCFs." + param) != null)
42+
{
43+
File inputFile = ctx.getSequenceSupport().getCachedData(ctx.getParams().getInt("variantCalling.GenotypeGVCFs." + param));
44+
if (!inputFile.exists())
45+
{
46+
throw new PipelineJobException("Unable to find file: " + inputFile.getPath());
47+
}
48+
49+
ctx.getLogger().debug("Making local copy of file: " + inputFile.getName());
50+
File localCopy = new File(ScatterGatherUtils.getLocalCopyDir(ctx, true), inputFile.getName());
51+
File doneFile = new File(localCopy.getPath() + ".copyDone");
52+
if (!doneFile.exists())
53+
{
54+
try
55+
{
56+
FileUtils.copyFile(inputFile, localCopy);
57+
FileUtils.touch(doneFile);
58+
}
59+
catch (IOException e)
60+
{
61+
throw new PipelineJobException(e);
62+
}
63+
}
64+
}
65+
}
66+
}
67+
}

SequenceAnalysis/src/org/labkey/sequenceanalysis/analysis/GenotypeGVCFHandler.java

Lines changed: 4 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
import org.labkey.api.util.FileType;
3535
import org.labkey.api.view.ActionURL;
3636
import org.labkey.api.writer.PrintWriters;
37+
import org.labkey.sequenceanalysis.ScatterGatherUtils;
3738
import org.labkey.sequenceanalysis.SequenceAnalysisModule;
3839
import org.labkey.sequenceanalysis.pipeline.JobContextImpl;
3940
import org.labkey.sequenceanalysis.pipeline.ProcessVariantsHandler;
@@ -783,63 +784,13 @@ public boolean isRequired(PipelineJob job)
783784
@Override
784785
public void doWork(List<SequenceOutputFile> inputFiles, JobContext ctx) throws PipelineJobException
785786
{
786-
doCopyGvcfLocally(inputFiles, ctx);
787-
possiblyCacheSupportFiles(ctx);
788-
}
789-
790-
public static void possiblyCacheSupportFiles(JobContext ctx) throws PipelineJobException
791-
{
792-
for (String param : Arrays.asList("exclude_intervals", "forceSitesFile"))
793-
{
794-
if (ctx.getParams().get("variantCalling.GenotypeGVCFs." + param) != null)
795-
{
796-
File inputFile = ctx.getSequenceSupport().getCachedData(ctx.getParams().getInt("variantCalling.GenotypeGVCFs." + param));
797-
if (!inputFile.exists())
798-
{
799-
throw new PipelineJobException("Unable to find file: " + inputFile.getPath());
800-
}
801-
802-
ctx.getLogger().debug("Making local copy of file: " + inputFile.getName());
803-
File localCopy = new File(getLocalCopyDir(ctx, true), inputFile.getName());
804-
File doneFile = new File(localCopy.getPath() + ".copyDone");
805-
if (!doneFile.exists())
806-
{
807-
try
808-
{
809-
FileUtils.copyFile(inputFile, localCopy);
810-
FileUtils.touch(doneFile);
811-
}
812-
catch (IOException e)
813-
{
814-
throw new PipelineJobException(e);
815-
}
816-
}
817-
}
818-
}
819-
}
820-
821-
public static void doCopyGvcfLocally(List<SequenceOutputFile> inputFiles, JobContext ctx) throws PipelineJobException
822-
{
823-
List<File> inputVCFs = new ArrayList<>();
824-
inputFiles.forEach(f -> inputVCFs.add(f.getFile()));
825-
826-
ctx.getLogger().info("making local copies of gVCFs/GenomicsDB");
827-
GenotypeGVCFsWrapper.copyVcfsLocally(ctx, inputVCFs, new ArrayList<>(), false);
828-
}
829-
830-
public static File getLocalCopyDir(JobContext ctx, boolean createIfDoesntExist)
831-
{
832-
if (ctx.getJob() instanceof VariantProcessingJob)
833-
{
834-
return ((VariantProcessingJob)ctx.getJob()).getLocationForCachedInputs(ctx.getWorkDir(), createIfDoesntExist);
835-
}
836-
837-
return ctx.getOutputDir();
787+
ScatterGatherUtils.doCopyGvcfLocally(inputFiles, ctx);
788+
ScatterGatherUtils.possiblyCacheSupportFiles(ctx);
838789
}
839790

840791
protected File getPossiblyLocalFile(JobContext ctx, File sourceFile)
841792
{
842-
File cacheDir = getLocalCopyDir(ctx, false);
793+
File cacheDir = ScatterGatherUtils.getLocalCopyDir(ctx, false);
843794
if (!cacheDir.exists())
844795
{
845796
return sourceFile;

SequenceAnalysis/src/org/labkey/sequenceanalysis/run/analysis/PbsvJointCallingHandler.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,8 @@
2020
import org.labkey.api.sequenceanalysis.pipeline.VariantProcessingStep;
2121
import org.labkey.api.sequenceanalysis.run.SimpleScriptWrapper;
2222
import org.labkey.api.util.FileType;
23+
import org.labkey.sequenceanalysis.ScatterGatherUtils;
2324
import org.labkey.sequenceanalysis.SequenceAnalysisModule;
24-
import org.labkey.sequenceanalysis.analysis.GenotypeGVCFHandler;
2525
import org.labkey.sequenceanalysis.pipeline.ProcessVariantsHandler;
2626
import org.labkey.sequenceanalysis.pipeline.VariantProcessingJob;
2727
import org.labkey.sequenceanalysis.run.util.AbstractGenomicsDBImportHandler;
@@ -282,6 +282,6 @@ private boolean doCopyLocal(JSONObject params)
282282
@Override
283283
public void doWork(List<SequenceOutputFile> inputFiles, JobContext ctx) throws PipelineJobException
284284
{
285-
GenotypeGVCFHandler.doCopyGvcfLocally(inputFiles, ctx);
285+
ScatterGatherUtils.doCopyGvcfLocally(inputFiles, ctx);
286286
}
287287
}

SequenceAnalysis/src/org/labkey/sequenceanalysis/run/util/AbstractGenomicsDBImportHandler.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@
3030
import org.labkey.api.sequenceanalysis.run.SimpleScriptWrapper;
3131
import org.labkey.api.util.FileType;
3232
import org.labkey.api.util.FileUtil;
33-
import org.labkey.sequenceanalysis.analysis.GenotypeGVCFHandler;
33+
import org.labkey.sequenceanalysis.ScatterGatherUtils;
3434
import org.labkey.sequenceanalysis.pipeline.ProcessVariantsHandler;
3535
import org.labkey.sequenceanalysis.pipeline.VariantProcessingJob;
3636
import org.labkey.sequenceanalysis.util.SequenceUtil;
@@ -46,7 +46,6 @@
4646
import java.util.Collection;
4747
import java.util.Collections;
4848
import java.util.Date;
49-
import java.util.EnumSet;
5049
import java.util.HashSet;
5150
import java.util.LinkedHashSet;
5251
import java.util.List;
@@ -956,6 +955,6 @@ public boolean isRequired(PipelineJob job)
956955
@Override
957956
public void doWork(List<SequenceOutputFile> inputFiles, JobContext ctx) throws PipelineJobException
958957
{
959-
GenotypeGVCFHandler.doCopyGvcfLocally(inputFiles, ctx);
958+
ScatterGatherUtils.doCopyGvcfLocally(inputFiles, ctx);
960959
}
961960
}

SequenceAnalysis/src/org/labkey/sequenceanalysis/run/util/CombineGVCFsHandler.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,8 @@
1616
import org.labkey.api.sequenceanalysis.pipeline.ToolParameterDescriptor;
1717
import org.labkey.api.sequenceanalysis.pipeline.VariantProcessingStep;
1818
import org.labkey.api.util.FileType;
19+
import org.labkey.sequenceanalysis.ScatterGatherUtils;
1920
import org.labkey.sequenceanalysis.SequenceAnalysisModule;
20-
import org.labkey.sequenceanalysis.analysis.GenotypeGVCFHandler;
2121
import org.labkey.sequenceanalysis.pipeline.ProcessVariantsHandler;
2222
import org.labkey.sequenceanalysis.pipeline.VariantProcessingJob;
2323

@@ -263,6 +263,6 @@ public boolean isRequired(PipelineJob job)
263263
@Override
264264
public void doWork(List<SequenceOutputFile> inputFiles, JobContext ctx) throws PipelineJobException
265265
{
266-
GenotypeGVCFHandler.doCopyGvcfLocally(inputFiles, ctx);
266+
ScatterGatherUtils.doCopyGvcfLocally(inputFiles, ctx);
267267
}
268268
}

SequenceAnalysis/src/org/labkey/sequenceanalysis/run/util/GenotypeGVCFsWrapper.java

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,8 @@
22

33
import org.apache.commons.io.FileUtils;
44
import org.apache.commons.lang3.StringUtils;
5-
import org.apache.logging.log4j.Logger;
6-
import org.apache.logging.log4j.LogManager;
75
import org.apache.commons.lang3.SystemUtils;
6+
import org.apache.logging.log4j.Logger;
87
import org.jetbrains.annotations.Nullable;
98
import org.labkey.api.collections.CaseInsensitiveHashSet;
109
import org.labkey.api.pipeline.PipelineJobException;
@@ -14,8 +13,7 @@
1413
import org.labkey.api.sequenceanalysis.run.AbstractGatk4Wrapper;
1514
import org.labkey.api.sequenceanalysis.run.SimpleScriptWrapper;
1615
import org.labkey.api.util.FileType;
17-
import org.labkey.sequenceanalysis.analysis.GenotypeGVCFHandler;
18-
import org.springframework.util.FileSystemUtils;
16+
import org.labkey.sequenceanalysis.ScatterGatherUtils;
1917

2018
import java.io.File;
2119
import java.io.IOException;
@@ -126,7 +124,7 @@ public static List<File> copyVcfsLocally(SequenceOutputHandler.JobContext ctx, C
126124
inputToDest.put(x, fn);
127125
});
128126

129-
File localWorkDir = GenotypeGVCFHandler.getLocalCopyDir(ctx, true);
127+
File localWorkDir = ScatterGatherUtils.getLocalCopyDir(ctx, true);
130128

131129
// If the cache directory is under the current working dir, mark to delete when done.
132130
// If localWorkDir is null, this indicates we're using /tmp, so also delete.

SequenceAnalysis/src/org/labkey/sequenceanalysis/run/variant/PlinkPcaStep.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -177,7 +177,7 @@ private void runBatch(File inputVCF, File outputDirectory, VariantProcessingStep
177177
File outPrefix;
178178
if (setName != null)
179179
{
180-
outPrefix = new File(outputDirectory, "plink." + FileUtil.makeLegalName(setName));
180+
outPrefix = new File(outputDirectory, "plink." + FileUtil.makeLegalName(setName).replaceAll(" ", "_"));
181181
}
182182
else
183183
{
@@ -187,6 +187,12 @@ private void runBatch(File inputVCF, File outputDirectory, VariantProcessingStep
187187
args.add("--out");
188188
args.add(outPrefix.getPath());
189189

190+
if (SequencePipelineService.get().getMaxThreads(getPipelineCtx().getLogger()) != null)
191+
{
192+
args.add("--threads");
193+
args.add(SequencePipelineService.get().getMaxThreads(getPipelineCtx().getLogger()).toString());
194+
}
195+
190196
args.addAll(getClientCommandArgs());
191197

192198
getWrapper().execute(args);
@@ -297,7 +303,7 @@ public PlinkWrapper(@Nullable Logger logger)
297303

298304
public File getExe()
299305
{
300-
return SequencePipelineService.get().getExeForPackage("PLINK2PATH", "plink");
306+
return SequencePipelineService.get().getExeForPackage("PLINK2PATH", "plink2");
301307
}
302308
}
303309
}

0 commit comments

Comments
 (0)