Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
169 changes: 169 additions & 0 deletions igf_airflow/utils/dag53_register_external_run_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,169 @@
import os
import json
import logging
from datetime import timedelta
from airflow.models import Variable
from airflow.decorators import task
from airflow.operators.python import get_current_context
from igf_airflow.utils.generic_airflow_utils import (
send_airflow_failed_logs_to_channels,
)
from igf_airflow.utils.dag25_copy_seqruns_to_hpc_utils import (
_create_interop_report,
_load_interop_data_to_db,
_load_interop_overview_data_to_seqrun_attribute,
register_new_seqrun_to_db)
from igf_portal.api_utils import upload_files_to_portal
from igf_data.utils.fileutils import (
get_temp_dir
)
from igf_data.illumina.runinfo_xml import RunInfo_xml

log = logging.getLogger(__name__)

MS_TEAMS_CONF = Variable.get(
'analysis_ms_teams_conf',
default_var=None
)
DATABASE_CONFIG_FILE = Variable.get(
'database_config_file',
default_var=None
)
HPC_SEQRUN_PATH = Variable.get(
'hpc_seqrun_path',
default_var=None
)
IGF_PORTAL_CONF = Variable.get(
'igf_portal_conf', default_var=None
)
PORTAL_ADD_SEQRUN_URL = Variable.get(
'portal_add_seqrun_url',
default_var="/api/v1/raw_seqrun/add_new_seqrun"
)
PORTAL_ADD_INTEROP_REPORT_URL = Variable.get(
'portal_add_interop_report_url',
default_var="/api/v1/interop_data/add_report"
)
HPC_INTEROP_PATH = Variable.get(
'hpc_interop_path',
default_var=None
)
INTEROP_REPORT_TEMPLATE = Variable.get(
'interop_report_template',
default_var=None
)
INTEROP_REPORT_IMAGE = Variable.get(
'interop_report_image',
default_var=None
)
INTEROP_REPORT_BASE_PATH = Variable.get(
'interop_report_base_path',
default_var=None
)
INTEROP_PREDICTION_TEMPLATE = Variable.get(
'interop_prediction_template',
default_var=None
)

@task(
task_id="create_qc_and_load_external_run",
retry_delay=timedelta(minutes=5),
retries=4,
queue='hpc_16G8t')
def create_qc_and_load_external_run(
external_seqrun_id_key: str = "external_seqrun_id",
dag_run_key: str = "dag_run"
) -> None:
try:
context = get_current_context()
dag_run = context.get(dag_run_key)
external_seqrun_id = None
if (
dag_run is not None and
dag_run.conf is not None and
dag_run.conf.get(external_seqrun_id_key) is not None
):
external_seqrun_id = dag_run.conf.get(
external_seqrun_id_key
)
if external_seqrun_id is None:
raise ValueError(
'external_seqrun_id not found in dag_run.conf'
)
## step 1
(
output_notebook_path,
_,
overview_csv_output,
_,
work_dir
) = _create_interop_report(
run_id=external_seqrun_id,
run_dir_base_path=HPC_SEQRUN_PATH,
report_template=INTEROP_REPORT_TEMPLATE,
report_image=INTEROP_REPORT_IMAGE
)
## step 2
_load_interop_data_to_db(
run_id=external_seqrun_id,
interop_output_dir=work_dir,
interop_report_base_path=INTEROP_REPORT_BASE_PATH,
dbconfig_file=DATABASE_CONFIG_FILE
)
## step 3
_ = register_new_seqrun_to_db(
dbconfig_file=DATABASE_CONFIG_FILE,
seqrun_id=external_seqrun_id,
seqrun_base_path=HPC_SEQRUN_PATH
)
## step 4
runinfo_file_path = os.path.join(
HPC_SEQRUN_PATH,
external_seqrun_id,
'RunInfo.xml'
)
runinfo_data = RunInfo_xml(
xml_file=runinfo_file_path
)
formatted_read_stats = (
runinfo_data
.get_formatted_read_stats()
)
## step 5
temp_dir = get_temp_dir(
use_ephemeral_space=True
)
json_data = {
"seqrun_id_list": [external_seqrun_id,],
"run_config_list": [formatted_read_stats,]}
new_run_list_json = os.path.join(
temp_dir,
'new_run_list.json'
)
with open(new_run_list_json, 'w') as fp:
json.dump(json_data, fp)
_ = upload_files_to_portal(
url_suffix=PORTAL_ADD_SEQRUN_URL,
portal_config_file=IGF_PORTAL_CONF,
file_path=new_run_list_json,
verify=False,
jsonify=False
)
## step 6
_load_interop_overview_data_to_seqrun_attribute(
seqrun_igf_id=external_seqrun_id,
dbconfig_file=DATABASE_CONFIG_FILE,
interop_overview_file=overview_csv_output
)
_ = upload_files_to_portal(
portal_config_file=IGF_PORTAL_CONF,
file_path=output_notebook_path,
data={"run_name": external_seqrun_id, "tag": "InterOp"},
url_suffix=PORTAL_ADD_INTEROP_REPORT_URL
)
except Exception as e:
log.error(e)
send_airflow_failed_logs_to_channels(
ms_teams_conf=MS_TEAMS_CONF,
message_prefix=str(e))
raise ValueError(e)
6 changes: 5 additions & 1 deletion test/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,9 @@ def full_suite():
from .igf_airflow.test_dag50_olink_reveal_nextflow_utils import (
Test_dag50_olink_reveal_nextflow_utilsA
)
from .igf_airflow.dag53_register_external_run_utils_test import (
Test_dag53_register_external_run_utils
)


return unittest.TestSuite([
Expand Down Expand Up @@ -380,6 +383,7 @@ def full_suite():
Test_dag45_metadata_registration_utilsA,
Test_dag44_analysis_registration_utilsA,
Test_dag49_cosmx_metadata_registration_utilsA,
Test_dag50_olink_reveal_nextflow_utilsA
Test_dag50_olink_reveal_nextflow_utilsA,
Test_dag53_register_external_run_utils
]
])
169 changes: 169 additions & 0 deletions test/igf_airflow/dag53_register_external_run_utils_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,169 @@
import os
import unittest
from unittest.mock import patch, MagicMock

MODULE = "igf_airflow.utils.dag53_register_external_run_utils"


class Test_dag53_register_external_run_utils(unittest.TestCase):
def setUp(self):
self.external_seqrun_id = "EXT_RUN_001"
self.mock_dag_run = MagicMock()
self.mock_dag_run.conf = {
"external_seqrun_id": self.external_seqrun_id
}
self.mock_context = {
"dag_run": self.mock_dag_run
}

@patch(f"{MODULE}.send_airflow_failed_logs_to_channels")
@patch(f"{MODULE}.upload_files_to_portal")
@patch(f"{MODULE}._load_interop_overview_data_to_seqrun_attribute")
@patch(f"{MODULE}.get_temp_dir", return_value="/test_temp")
@patch(f"{MODULE}.RunInfo_xml")
@patch(f"{MODULE}.register_new_seqrun_to_db")
@patch(f"{MODULE}._load_interop_data_to_db")
@patch(f"{MODULE}._create_interop_report",
return_value=("/path/notebook.html", None, "/path/overview.csv", None, "/path/work_dir"))
@patch(f"{MODULE}.get_current_context")
@patch(f"{MODULE}.Variable")
def test_create_qc_and_load_external_run_success(
self,
mock_variable,
mock_get_context,
mock_create_interop_report,
mock_load_interop_data,
mock_register_seqrun,
mock_runinfo_xml,
mock_get_temp_dir,
mock_load_interop_overview,
mock_upload_files,
mock_send_logs):
mock_get_context.return_value = self.mock_context
mock_runinfo_instance = MagicMock()
mock_runinfo_instance.get_formatted_read_stats.return_value = "1x50"
mock_runinfo_xml.return_value = mock_runinfo_instance
mock_open_func = unittest.mock.mock_open()
with patch(f"{MODULE}.HPC_SEQRUN_PATH", "/seqrun"), \
patch(f"{MODULE}.INTEROP_REPORT_TEMPLATE", "/template"), \
patch(f"{MODULE}.INTEROP_REPORT_IMAGE", "/image"), \
patch(f"{MODULE}.INTEROP_REPORT_BASE_PATH", "/interop_base"), \
patch(f"{MODULE}.DATABASE_CONFIG_FILE", "/db_config"), \
patch(f"{MODULE}.IGF_PORTAL_CONF", "/portal_conf"), \
patch(f"{MODULE}.PORTAL_ADD_SEQRUN_URL", "/api/v1/raw_seqrun/add_new_seqrun"), \
patch(f"{MODULE}.PORTAL_ADD_INTEROP_REPORT_URL", "/api/v1/interop_data/add_report"), \
patch(f"{MODULE}.MS_TEAMS_CONF", "/ms_teams_conf"), \
patch("builtins.open", mock_open_func):
from igf_airflow.utils.dag53_register_external_run_utils import (
create_qc_and_load_external_run
)
create_qc_and_load_external_run.function()
mock_create_interop_report.assert_called_once_with(
run_id=self.external_seqrun_id,
run_dir_base_path="/seqrun",
report_template="/template",
report_image="/image"
)
mock_load_interop_data.assert_called_once_with(
run_id=self.external_seqrun_id,
interop_output_dir="/path/work_dir",
interop_report_base_path="/interop_base",
dbconfig_file="/db_config"
)
mock_register_seqrun.assert_called_once_with(
dbconfig_file="/db_config",
seqrun_id=self.external_seqrun_id,
seqrun_base_path="/seqrun"
)
mock_runinfo_xml.assert_called_once_with(
xml_file=os.path.join("/seqrun", self.external_seqrun_id, "RunInfo.xml")
)
mock_get_temp_dir.assert_called_once_with(use_ephemeral_space=True)
self.assertEqual(mock_upload_files.call_count, 2)
first_upload_call = mock_upload_files.call_args_list[0]
self.assertEqual(
first_upload_call.kwargs["url_suffix"],
"/api/v1/raw_seqrun/add_new_seqrun"
)
second_upload_call = mock_upload_files.call_args_list[1]
self.assertEqual(
second_upload_call.kwargs["url_suffix"],
"/api/v1/interop_data/add_report"
)
self.assertEqual(
second_upload_call.kwargs["data"],
{"run_name": self.external_seqrun_id, "tag": "InterOp"}
)
mock_load_interop_overview.assert_called_once_with(
seqrun_igf_id=self.external_seqrun_id,
dbconfig_file="/db_config",
interop_overview_file="/path/overview.csv"
)
mock_send_logs.assert_not_called()

@patch(f"{MODULE}.send_airflow_failed_logs_to_channels")
@patch(f"{MODULE}.get_current_context")
@patch(f"{MODULE}.Variable")
def test_create_qc_and_load_external_run_no_seqrun_id(
self,
mock_variable,
mock_get_context,
mock_send_logs):
mock_dag_run = MagicMock()
mock_dag_run.conf = {}
mock_get_context.return_value = {"dag_run": mock_dag_run}
with patch(f"{MODULE}.MS_TEAMS_CONF", "/ms_teams_conf"):
from igf_airflow.utils.dag53_register_external_run_utils import (
create_qc_and_load_external_run
)
with self.assertRaises(ValueError):
create_qc_and_load_external_run.function()
mock_send_logs.assert_called_once()

@patch(f"{MODULE}.send_airflow_failed_logs_to_channels")
@patch(f"{MODULE}.get_current_context")
@patch(f"{MODULE}.Variable")
def test_create_qc_and_load_external_run_no_dag_run(
self,
mock_variable,
mock_get_context,
mock_send_logs):
mock_get_context.return_value = {"dag_run": None}
with patch(f"{MODULE}.MS_TEAMS_CONF", "/ms_teams_conf"):
from igf_airflow.utils.dag53_register_external_run_utils import (
create_qc_and_load_external_run
)
with self.assertRaises(ValueError):
create_qc_and_load_external_run.function()
mock_send_logs.assert_called_once()

@patch(f"{MODULE}.send_airflow_failed_logs_to_channels")
@patch(f"{MODULE}._create_interop_report",
side_effect=Exception("interop report failed"))
@patch(f"{MODULE}.get_current_context")
@patch(f"{MODULE}.Variable")
def test_create_qc_and_load_external_run_interop_report_failure(
self,
mock_variable,
mock_get_context,
mock_create_interop_report,
mock_send_logs):
mock_get_context.return_value = self.mock_context
with patch(f"{MODULE}.HPC_SEQRUN_PATH", "/seqrun"), \
patch(f"{MODULE}.INTEROP_REPORT_TEMPLATE", "/template"), \
patch(f"{MODULE}.INTEROP_REPORT_IMAGE", "/image"), \
patch(f"{MODULE}.MS_TEAMS_CONF", "/ms_teams_conf"):
from igf_airflow.utils.dag53_register_external_run_utils import (
create_qc_and_load_external_run
)
with self.assertRaises(ValueError):
create_qc_and_load_external_run.function()
mock_send_logs.assert_called_once()
self.assertIn(
"interop report failed",
mock_send_logs.call_args.kwargs["message_prefix"]
)


if __name__ == '__main__':
unittest.main()
Loading