Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
162 changes: 162 additions & 0 deletions igf_airflow/utils/dag50_olink_reveal_nextflow_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
import os
import logging
from pathlib import Path
from datetime import timedelta
from airflow.models import Variable
from airflow.decorators import task
from igf_data.utils.fileutils import (
check_file_path
)
from igf_airflow.utils.generic_airflow_utils import (
get_analysis_id_and_project_igf_id_from_airflow_dagrun_conf,
send_airflow_failed_logs_to_channels,
parse_analysis_design_and_get_metadata,
_create_output_from_jinja_template
)

log = logging.getLogger(__name__)

## Airflow Variables read once when this module is imported.
## Each one falls back to None when the Variable is not defined.
MS_TEAMS_CONF = Variable.get('analysis_ms_teams_conf', default_var=None)
DATABASE_CONFIG_FILE = Variable.get('database_config_file', default_var=None)
OLINK_NEXTFLOW_CONF_TEMPLATE = Variable.get(
    'olink_nextflow_conf_template', default_var=None)
OLINK_NEXTFLOW_RUNNER_TEMPLATE = Variable.get(
    "olink_nextflow_runner_template", default_var=None)

@task(
    task_id="prepare_olink_nextflow_script",
    retry_delay=timedelta(minutes=5),
    retries=4,
    queue='hpc_4G')
def prepare_olink_nextflow_script(
        design_dict: dict,
        work_dir: str,
        analysis_design_key: str = "analysis_design",
        seqrun_metadata_key: str = "seqrun_metadata",
        run_base_dir_key: str = "run_base_dir",
        reveal_fixed_lod_csv_key: str = "reveal_fixed_lod_csv",
        sample_type_key: str = "sample_type",
        dataAnalysisRefIds_key: str = "dataAnalysisRefIds",
        panelDataArchive_key: str = "panelDataArchive",
        plate_design_csv_key: str = "plate_design_csv",
        indexPlate_key: str = "indexPlate",
        plate_design_csv_file: str = "plate_design.csv"
) -> str:
    """Render the Olink Nextflow config and runner script inside `work_dir`.

    Reads the analysis design YAML referenced by `design_dict`, checks that
    all required analysis metadata entries are present, writes the plate
    design rows to a CSV file and renders the Nextflow config template and
    the runner script template into `work_dir`.

    :param design_dict: Dictionary holding the design YAML path under
        `analysis_design_key`
    :param work_dir: Directory in which all output files are written
    :param analysis_design_key: Key of the design file path in `design_dict`
    :param seqrun_metadata_key: Design key listing the sequencing runs
    :param run_base_dir_key: Metadata key for the run base directory
    :param reveal_fixed_lod_csv_key: Metadata key for the fixed LOD csv
    :param sample_type_key: Metadata key for the sample type
    :param dataAnalysisRefIds_key: Metadata key for the data analysis ref ids
    :param panelDataArchive_key: Metadata key for the panel data archive
    :param plate_design_csv_key: Metadata key for the plate design rows
    :param indexPlate_key: Metadata key for the index plate
    :param plate_design_csv_file: File name of the plate design csv that is
        created in `work_dir`
    :returns: Path of the rendered runner script, as a string
    :raises ValueError: On any failure. The original error is chained and
        its message is also sent to the failure notification channel.
    """
    try:
        ## fail early, with a readable message, if a template is not configured
        if (
            OLINK_NEXTFLOW_CONF_TEMPLATE is None or
            OLINK_NEXTFLOW_RUNNER_TEMPLATE is None
        ):
            raise ValueError(
                "Olink Nextflow conf and runner templates must be configured"
            )
        ## read design and get sample metadata
        design_file = design_dict.get(analysis_design_key)
        check_file_path(design_file)
        with open(design_file, 'r') as fp:
            input_design_yaml = fp.read()
        seqrun_list, metadata = parse_analysis_design_and_get_metadata(
            sample_metadata_key=seqrun_metadata_key,
            input_design_yaml=input_design_yaml
        )
        if not seqrun_list:
            raise KeyError(
                f"No entry for {seqrun_metadata_key} found in {design_file}."
            )
        if metadata is None:
            raise KeyError(
                f"No analysis metadata found in file {design_file}"
            )
        ## check required metadata
        ## NOTE: the fixed LOD csv and the panel data archive entries are
        ## validated as required here, but this function does not forward
        ## them to either template.
        required_keys = (
            run_base_dir_key,
            reveal_fixed_lod_csv_key,
            sample_type_key,
            dataAnalysisRefIds_key,
            panelDataArchive_key,
            plate_design_csv_key,
            indexPlate_key
        )
        required_metadata = {
            key: metadata.get(key) for key in required_keys
        }
        if any(value is None for value in required_metadata.values()):
            raise KeyError(
                f"Required metadata is not set correctly. Check {design_file}"
            )
        run_base_dir = required_metadata[run_base_dir_key]
        sample_type = required_metadata[sample_type_key]
        dataAnalysisRefIds = required_metadata[dataAnalysisRefIds_key]
        plate_design_csv = required_metadata[plate_design_csv_key]
        indexPlate = required_metadata[indexPlate_key]
        ## prepare plate_design_csv
        if not isinstance(plate_design_csv, list):
            ## format the type into the message; concatenating a str with a
            ## type object would itself raise TypeError and hide this error
            raise ValueError(
                "Plate design csv data should be in the list format, got "
                f"{type(plate_design_csv)}"
            )
        plate_design_file = Path(work_dir) / plate_design_csv_file
        with open(plate_design_file.as_posix(), "w") as fp:
            fp.write("\n".join(plate_design_csv))
        ## get project name
        _, project_name = \
            get_analysis_id_and_project_igf_id_from_airflow_dagrun_conf(
                DATABASE_CONFIG_FILE
            )
        ## render conf
        nf_conf_file = \
            Path(work_dir) / os.path.basename(OLINK_NEXTFLOW_CONF_TEMPLATE)
        _create_output_from_jinja_template(
            template_file=OLINK_NEXTFLOW_CONF_TEMPLATE,
            output_file=nf_conf_file.as_posix(),
            autoescape_list=['xml', 'html'],
            data=dict(
                RUN_ID=seqrun_list[0],
                RUN_BASE_DIR=run_base_dir,
                PLATE_DESIGN_CSV=plate_design_file.as_posix(),
                PROJECT_NAME=project_name,
                SAMPLE_TYPE=sample_type,
                DATA_ANALYSIS_REF_IDS=dataAnalysisRefIds,
                INDEX_PLATE=indexPlate,
                WORKDIR=work_dir
            )
        )
        ## render runner script
        nf_script_file = \
            Path(work_dir) / os.path.basename(OLINK_NEXTFLOW_RUNNER_TEMPLATE)
        _create_output_from_jinja_template(
            template_file=OLINK_NEXTFLOW_RUNNER_TEMPLATE,
            output_file=nf_script_file.as_posix(),
            autoescape_list=['xml', 'html'],
            data=dict(
                CONFIG_FILE=nf_conf_file.as_posix(),
                WORKDIR=work_dir
            )
        )
        ## return a plain string so the value matches the declared return type
        return nf_script_file.as_posix()
    except Exception as e:
        log.error(e)
        send_airflow_failed_logs_to_channels(
            ms_teams_conf=MS_TEAMS_CONF,
            message_prefix=str(e))
        raise ValueError(e) from e


@task.bash(
    task_id="run_olink_nextflow_script",
    retry_delay=timedelta(minutes=5),
    queue='hpc_16G',
    retries=4)
def run_olink_nextflow_script(run_script: str):
    """Build the bash command line that runs the rendered runner script.

    :param run_script: Path of the runner script to execute
    :returns: A bash command string that enables `set -eo pipefail` and then
        runs `run_script` with bash
    :raises ValueError: If the command cannot be built. The original error
        is chained and also sent to the failure notification channel.
    """
    ## local import so this function does not depend on the module import block
    import shlex
    try:
        ## quote the path so that spaces or shell metacharacters in it are
        ## passed to bash as a single argument; plain paths are unchanged
        quoted_script = shlex.quote(str(run_script))
        return f"set -eo pipefail;\nbash {quoted_script}"
    except Exception as e:
        log.error(e)
        send_airflow_failed_logs_to_channels(
            ms_teams_conf=MS_TEAMS_CONF,
            message_prefix=str(e))
        raise ValueError(e) from e
22 changes: 22 additions & 0 deletions template/nextflow_template/olink_nextflow_runner_v0.0.1.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
#!/bin/bash
## Runner script template for the Olink Reveal NPX and QC Nextflow pipeline.
## INPUT:
#   * WORKDIR
#   * CONFIG_FILE
## ENVS
# NOTE(review): env.sh presumably defines NEXTFLOW_EXE and NEXTFLOW_TOWER,
# which are used below - confirm.
source /rds/general/project/genomics-facility-archive-2019/live/tgu/resources/pipeline_resource/nextflow/env.sh

export TMPDIR=$EPHEMERAL
export NXF_OPTS='-Xms1g -Xmx4g'

# Stop if the work directory cannot be entered, so that Nextflow is never
# launched from an unexpected directory.
cd "{{ WORKDIR }}" || exit 1

# Templated paths are quoted so that a path containing spaces stays one argument.
$NEXTFLOW_EXE run /rds/general/project/genomics-facility-archive-2019/live/OLINK_DATA/olink_reveal_npx_and_qc_pipeline/main.nf \
  -with-tower "$NEXTFLOW_TOWER" \
  -resume \
  -profile singularity \
  --outdir "{{ WORKDIR }}/results" \
  -work-dir "{{ WORKDIR }}/work" \
  -with-report "{{ WORKDIR }}/results/report.html" \
  -with-dag "{{ WORKDIR }}/results/dag.html" \
  -with-timeline "{{ WORKDIR }}/results/timeline.html" \
  -config "{{ CONFIG_FILE }}"
98 changes: 98 additions & 0 deletions template/nextflow_template/olink_nextflow_v0.0.1.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
/* INPUT:
RUN_ID
RUN_BASE_DIR
PLATE_DESIGN_CSV
PROJECT_NAME
SAMPLE_TYPE
DATA_ANALYSIS_REF_IDS
INDEX_PLATE
WORKDIR
*/

// Pipeline parameters. Values written as Jinja placeholders are filled in
// when this template is rendered.
params {
run_id = "{{ RUN_ID }}"
run_dir = "{{ RUN_BASE_DIR }}/{{ RUN_ID }}"
plate_design_csv = "{{ PLATE_DESIGN_CSV }}"
// fixed path, not a template input
reveal_fixed_lod_csv = "/rds/general/project/genomics-facility-archive-2019/live/OLINK_DATA/fix_lod_csv/Reveal_Fixed_LOD.csv"
project_name = "{{ PROJECT_NAME }}"
sample_type = "{{ SAMPLE_TYPE }}"
dataAnalysisRefIds = "{{ DATA_ANALYSIS_REF_IDS }}"
indexPlate = "{{ INDEX_PLATE }}"
// fixed path, not a template input
panelDataArchive = "/rds/general/project/genomics-facility-archive-2019/live/OLINK_DATA/panel/NPXMap_PanelDataArchive_2.0.0.dat"
// resources
// NOTE(review): presumably upper limits read by the pipeline - confirm
max_memory = 96.GB
max_cpus = 16
max_time = 24.h
}


// Executor-specific settings, one nested scope per executor name.
executor {
// PBS Pro executor
$pbspro {
queueSize = 25
}
// local executor
$local {
cpus = 4
queueSize = 3
memory = '8 GB'
}
}


// Process defaults followed by per-process resources and container images.
process {
executor = 'pbspro'
// NOTE(review): the Nextflow documentation states that maxRetries only
// applies with the 'retry' error strategy; no errorStrategy is set in this
// file - confirm that the pipeline itself sets one.
maxRetries = 4
// NOTE(review): '-V' is handed to the cluster submit command; for PBS qsub
// this presumably exports the submitting environment to the job - confirm.
clusterOptions = '-V'

withName: BCLCONVERT {
cpus = 4
memory = "8 GB"
container = "file:///rds/general/project/genomics-facility-archive-2019/live/OLINK_DATA/singularity/nf-core-bclconvert-4.4.6.img"
}


withName: OLINK_NGS2COUNTS {
cpus = 1
memory = "4 GB"
container = "file:///rds/general/project/genomics-facility-archive-2019/live/OLINK_DATA/singularity/igf_olink_ngs2counts_v6.2.sif"
}

// the project create and project export steps use the same container image
withName: OLINK_REVEAL_NPX_MAP_PROJECT_CREATE {
cpus = 1
memory = "8 GB"
container = "file:///rds/general/project/genomics-facility-archive-2019/live/OLINK_DATA/singularity/npx-map-cli.sif"
}

withName: OLINK_REVEAL_NPX_MAP_PROJECT_EXPORT {
cpus = 1
memory = "8 GB"
container = "file:///rds/general/project/genomics-facility-archive-2019/live/OLINK_DATA/singularity/npx-map-cli.sif"
}

withName: OLINK_REVEAL_R_QC {
cpus = 1
memory = "8 GB"
container = "file:///rds/general/project/genomics-facility-archive-2019/live/OLINK_DATA/singularity/igf_olink_r_qc_v0.2.sif"
}



}

// Allow the execution report, DAG and timeline files to be overwritten
// when they already exist.
report {
overwrite = true
}

dag {
overwrite = true
}

timeline {
overwrite = true
}

// Singularity container settings.
singularity {
enabled = true
autoMounts = true
cacheDir = "$NEXTFLOW_SINGULARITY_CACHE_DIR"
// -B bind-mounts the run directory, the work directory and the shared
// OLINK_DATA tree, and maps EPHEMERAL onto /tmp and /var/tmp.
runOptions = "-B {{ RUN_BASE_DIR }}/{{ RUN_ID }},{{ WORKDIR }},/rds/general/project/genomics-facility-archive-2019/live/OLINK_DATA/,$EPHEMERAL:/tmp,$EPHEMERAL:/var/tmp"
}
6 changes: 5 additions & 1 deletion test/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,9 @@ def full_suite():
from .igf_airflow.test_dag49_cosmx_metadata_registration_utils import (
Test_dag49_cosmx_metadata_registration_utilsA
)
from .igf_airflow.test_dag50_olink_reveal_nextflow_utils import (
Test_dag50_olink_reveal_nextflow_utilsA
)


return unittest.TestSuite([
Expand Down Expand Up @@ -376,6 +379,7 @@ def full_suite():
Test_dag46_scRNA_10X_flex_utilsA,
Test_dag45_metadata_registration_utilsA,
Test_dag44_analysis_registration_utilsA,
Test_dag49_cosmx_metadata_registration_utilsA
Test_dag49_cosmx_metadata_registration_utilsA,
Test_dag50_olink_reveal_nextflow_utilsA
]
])
90 changes: 90 additions & 0 deletions test/igf_airflow/test_dag50_olink_reveal_nextflow_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
import os
import unittest
from unittest.mock import patch
from pathlib import Path
from igf_data.utils.fileutils import (
get_temp_dir,
remove_dir)
from igf_airflow.utils.dag50_olink_reveal_nextflow_utils import (
prepare_olink_nextflow_script,
run_olink_nextflow_script
)

class Test_dag50_olink_reveal_nextflow_utilsA(unittest.TestCase):
    """Tests for the dag50 Olink Reveal Nextflow utility tasks."""

    def setUp(self):
        self.temp_dir = get_temp_dir()

    def tearDown(self):
        remove_dir(self.temp_dir)

    @patch(
        "igf_airflow.utils.dag50_olink_reveal_nextflow_utils.DATABASE_CONFIG_FILE",
        "TEST"
    )
    @patch(
        "igf_airflow.utils.dag50_olink_reveal_nextflow_utils.OLINK_NEXTFLOW_CONF_TEMPLATE",
        "template/nextflow_template/olink_nextflow_v0.0.1.conf"
    )
    @patch(
        "igf_airflow.utils.dag50_olink_reveal_nextflow_utils.OLINK_NEXTFLOW_RUNNER_TEMPLATE",
        "template/nextflow_template/olink_nextflow_runner_v0.0.1.sh"
    )
    @patch(
        "igf_airflow.utils.dag50_olink_reveal_nextflow_utils.get_analysis_id_and_project_igf_id_from_airflow_dagrun_conf",
        return_value=[1, "Test_project"]
    )
    def test_prepare_olink_nextflow_script(self, *args):
        """The task writes the plate design csv, the conf and the runner."""
        ## design file; nesting is expressed with two-space YAML indentation
        design_yaml = "\n".join([
            "seqrun_metadata:",
            "  - SEQRUN_ID_1",
            "analysis_metadata:",
            "  run_base_dir: /run_dir",
            "  reveal_fixed_lod_csv: /OLINK/Reveal_Fixed_LOD.csv",
            "  sample_type: Generic",
            "  dataAnalysisRefIds: R10003",
            "  panelDataArchive: /OLINK/NPXMap_PanelDataArchive_2.0.0.dat",
            "  indexPlate: A",
            "  plate_design_csv:",
            "    - well_id;sample_id;sample_type",
            "    - A1;A200;SAMPLE",
            "    - A12;Olink_external_control3;CONTROL",
            "    - D12;Olink_external_control6;PLATE_CONTROL",
            "    - G11;Olink_external_control1;NEGATIVE_CONTROL",
            ""
        ])
        design_yaml_file = Path(self.temp_dir) / "analysis_design.yaml"
        with open(design_yaml_file, "w") as fp:
            fp.write(design_yaml)
        design_dict = {"analysis_design": design_yaml_file}
        analysis_script = prepare_olink_nextflow_script.function(
            design_dict=design_dict,
            work_dir=self.temp_dir
        )
        ## runner script
        self.assertTrue(os.path.exists(analysis_script))
        with open(analysis_script, "r") as fp:
            script_data = fp.read()  ## small file
        self.assertIn("olink_nextflow_v0.0.1.conf", script_data)
        conf_path = Path(self.temp_dir) / "olink_nextflow_v0.0.1.conf"
        self.assertIn(conf_path.as_posix(), script_data)
        ## rendered conf
        with open(conf_path.as_posix(), "r") as fp:
            conf_data = fp.read()  ## small file
        plate_design_csv = Path(self.temp_dir) / "plate_design.csv"
        expected_conf_lines = [
            "run_id = \"SEQRUN_ID_1\"",
            "run_dir = \"/run_dir/SEQRUN_ID_1\"",
            f"plate_design_csv = \"{plate_design_csv.as_posix()}\"",
            "project_name = \"Test_project\"",
            "sample_type = \"Generic\"",
            "dataAnalysisRefIds = \"R10003\"",
            "indexPlate = \"A\""
        ]
        for expected_line in expected_conf_lines:
            self.assertIn(expected_line, conf_data)
        ## plate design csv
        with open(plate_design_csv.as_posix(), "r") as fp:
            csv_data = fp.read()
        self.assertEqual(len(csv_data.split("\n")), 5)
        self.assertIn(
            "well_id;sample_id;sample_type\nA1;A200;SAMPLE",
            csv_data
        )
        self.assertIn(
            "G11;Olink_external_control1;NEGATIVE_CONTROL",
            csv_data
        )

    def test_run_olink_nextflow_script(self):
        """The bash task returns the expected command line for a script."""
        script_file = "/tmp/t.sh"
        analysis_cmd = run_olink_nextflow_script.function(
            script_file
        )
        self.assertEqual(
            analysis_cmd,
            f"set -eo pipefail;\nbash {script_file}"
        )

if __name__ == "__main__":
    ## run this module's tests when it is executed directly as a script
    unittest.main()