Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 7 additions & 7 deletions src/nomadic/map/mappers.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,19 +53,19 @@ def map_from_fastqs(self, fastq_dir=None, fastq_paths=None):
raise ValueError("Must set either `fastq_dir` or `fastq_paths`.")

@abstractmethod
def _define_mapping_command(self, output_bam, flags):
def _define_mapping_command(self, output_bam, *, threads: int, flags):
"""
Define the command for the mapping algorithm
"""
pass

def run(self, output_bam, verbose=False):
def run(self, output_bam: str, threads: int, verbose=False):
"""
Run the mapping algorithm on the inputs

"""
# Define the mapping command
self._define_mapping_command(output_bam)
self._define_mapping_command(output_bam, threads=threads)
if self.map_cmd is None:
raise ValueError("Must define mapping command before running algorithm.")

Expand All @@ -91,12 +91,12 @@ class Minimap2(MappingAlgorithm):

"""

def _define_mapping_command(self, output_bam, flags="--eqx --MD"):
def _define_mapping_command(self, output_bam, *, threads: int, flags="--eqx --MD"):
"""
Run minimap2, compress result to .bam file, and sort

"""
self.map_cmd = "minimap2"
self.map_cmd = f"minimap2 -t {threads}"
self.map_cmd += f" -ax map-ont {flags} {shlex.quote(self.reference.fasta_path)} {encode_input_files(self.input_fastqs)} |"
self.map_cmd += " samtools view -S -b - |"
self.map_cmd += f" samtools sort -o {shlex.quote(output_bam)}"
Expand All @@ -119,12 +119,12 @@ def create_reference_index(self):
index_cmd = f"bwa index {shlex.quote(self.reference.fasta_path)}"
subprocess.run(index_cmd, shell=True, check=True)

def _define_mapping_command(self, output_bam, flags=""):
def _define_mapping_command(self, output_bam, *, threads: int, flags=""):
"""
Run bwa, compress result to .bam file, and sort

"""
self.map_cmd = "bwa mem"
self.map_cmd = f"bwa mem -t {threads}"
self.map_cmd += " -R '@RG\\tID:misc\\tSM:pool'" # ID and SM tags needed for gatk HaplotypeCaller
self.map_cmd += f" {flags} {shlex.quote(self.reference.fasta_path)} {encode_input_files(self.input_fastqs)} |"
self.map_cmd += " samtools view -S -b - |"
Expand Down
10 changes: 10 additions & 0 deletions src/nomadic/process/commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,14 @@
default=False,
help="Resume processing a previous experiment if the output directory already exists. This is necessary to pick up processing of an experiment that was aborted.",
)
@click.option(
"-t",
"--threads",
type=int,
default=4,
show_default=True,
help="Number of threads to use for analysis. Note that using more threads can increase the computational load and might lead to slower performance if the computer is not powerful enough.",
)
@click.option(
"-v",
"--verbose",
Expand All @@ -108,6 +116,7 @@ def process(
caller: str,
overwrite: bool,
resume: bool,
threads: int,
verbose: bool,
):
"""
Expand Down Expand Up @@ -166,6 +175,7 @@ def process(
region_bed,
reference_name,
caller,
threads,
verbose,
with_dashboard=False,
host="",
Expand Down
10 changes: 10 additions & 0 deletions src/nomadic/realtime/commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,14 @@
default=True,
help="Whether to start the web dashboard to monitor the run.",
)
@click.option(
"-t",
"--threads",
type=int,
default=4,
show_default=True,
help="Number of threads to use for analysis. Note that using more threads can increase the computational load and might lead to slower performance if the computer is not powerful enough.",
)
@click.option(
"-v",
"--verbose",
Expand Down Expand Up @@ -136,6 +144,7 @@ def realtime(
resume: bool,
dashboard: bool,
verbose: bool,
threads: int,
host: str,
port: Optional[int],
):
Expand Down Expand Up @@ -207,6 +216,7 @@ def realtime(
region_bed,
reference_name,
caller,
threads,
verbose,
with_dashboard=dashboard,
realtime=True,
Expand Down
3 changes: 3 additions & 0 deletions src/nomadic/realtime/factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ def __init__(
expt_dirs: ExperimentDirectories,
fastq_dir: str,
caller: str,
threads: int,
ref_name: str = "Pf3D7",
):
"""
Expand All @@ -59,6 +60,7 @@ def __init__(
self.reference = REFERENCE_COLLECTION[ref_name]

self.caller = caller
self.threads = threads

def _get_barcode_pipeline(self, barcode_name: str) -> BarcodePipelineRT:
"""
Expand All @@ -70,6 +72,7 @@ def _get_barcode_pipeline(self, barcode_name: str) -> BarcodePipelineRT:
"expt_dirs": self.expt_dirs,
"bed_path": self.regions.path,
"ref_name": self.ref_name,
"threads": self.threads,
}

if self.caller:
Expand Down
10 changes: 9 additions & 1 deletion src/nomadic/realtime/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ def main(
region_bed: str,
reference_name: str,
caller: str,
threads: int,
verbose: bool,
host: str,
with_dashboard: bool = True,
Expand Down Expand Up @@ -96,7 +97,14 @@ def main(

# INITIALISE WATCHERS
factory = PipelineFactory(
expt_name, metadata, regions, expt_dirs, fastq_dir, caller, reference_name
expt_name,
metadata,
regions,
expt_dirs,
fastq_dir,
caller,
ref_name=reference_name,
threads=threads,
)

watchers = factory.get_watchers()
Expand Down
12 changes: 9 additions & 3 deletions src/nomadic/realtime/pipelines/barcode.py
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,7 @@ def __init__(
expt_dirs: ExperimentDirectories,
bed_path: str,
caller: str,
threads: int,
ref_name: str = "Pf3D7",
):
"""
Expand All @@ -166,7 +167,7 @@ def __init__(
# Initialise analysis steps
common = {"barcode_name": barcode_name, "expt_dirs": expt_dirs}
self.fastq_step = FASTQProcessedRT(**common)
self.map_step = MappingRT(**common, ref_name=ref_name)
self.map_step = MappingRT(**common, ref_name=ref_name, threads=threads)
self.flagstat_step = FlagstatsRT(**common, ref_name=ref_name)
self.bedcov_step = RegionCoverage(
**common, bed_path=bed_path, ref_name=ref_name
Expand All @@ -176,11 +177,16 @@ def __init__(
)
if caller == "delve":
self.call_step = CallVariantsRTDelve(
**common, bed_path=bed_path, ref_name=ref_name
**common,
bed_path=bed_path,
ref_name=ref_name,
threads=threads,
)
elif caller == "bcftools":
self.call_step = CallVariantsRTBcftools(
**common, bed_path=bed_path, ref_name=ref_name
**common,
bed_path=bed_path,
ref_name=ref_name,
)
else:
raise RuntimeError(f"Unknown caller: {caller}")
Expand Down
7 changes: 6 additions & 1 deletion src/nomadic/realtime/steps.py
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,7 @@ def __init__(
self,
barcode_name: str,
expt_dirs: ExperimentDirectories,
threads: int,
ref_name: str = "Pf3D7",
):
"""
Expand All @@ -153,6 +154,7 @@ def __init__(

self.reference = REFERENCE_COLLECTION[ref_name]
self.mapper = Minimap2(self.reference)
self.threads = threads

self.output_bam = (
f"{self.step_dir}/{self.barcode_name}.{self.reference.name}.final.bam"
Expand Down Expand Up @@ -181,7 +183,7 @@ def run(self, new_fastqs: List[str], incr_id: str):

incr_bam = self._get_incremental_bam_path(incr_id)
self.mapper.map_from_fastqs(fastq_paths=new_fastqs)
self.mapper.run(output_bam=incr_bam)
self.mapper.run(output_bam=incr_bam, threads=self.threads)
samtools_index(incr_bam)

return incr_bam
Expand Down Expand Up @@ -462,6 +464,7 @@ def __init__(
barcode_name: str,
expt_dirs: ExperimentDirectories,
bed_path: str,
threads: int,
ref_name: str = "Pf3D7",
):
"""Initialise output directory and define file names"""
Expand All @@ -470,6 +473,7 @@ def __init__(

self.bed_path = bed_path
self.reference = REFERENCE_COLLECTION[ref_name]
self.threads = threads

self.output_dir = produce_dir(self.barcode_dir, self.step_name)
self.output_vcf = (
Expand Down Expand Up @@ -531,6 +535,7 @@ def run(self, input_bam: str) -> str:
f" -f {shlex.quote(self.reference.fasta_path)}"
" --set-failed-GTs ."
f" {shlex.quote(filtered_bam_path)}"
f" --threads {self.threads}"
)

cmd_lowcomplexity_filter = self._get_lowcomplexity_filter_command()
Expand Down
Loading