Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
94 changes: 94 additions & 0 deletions igf_airflow/utils/dag45_metadata_registration_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
import logging
from airflow.decorators import task
from airflow.models import Variable
from datetime import timedelta
from airflow.operators.python import get_current_context
from igf_airflow.utils.dag20_portal_metadata_utils import (
_parse_default_user_email_from_email_config)
from igf_airflow.utils.generic_airflow_utils import (
send_airflow_failed_logs_to_channels)
from igf_data.process.seqrun_processing.find_and_process_new_project_data_from_portal_db import (
Find_and_register_new_project_data_from_portal_db)

log = logging.getLogger(__name__)

# Airflow Variables read once when this module is imported. Every lookup
# passes default_var=None instead of supplying a concrete fallback value.
MS_TEAMS_CONF = Variable.get('ms_teams_conf', default_var=None)
IGF_PORTAL_CONF = Variable.get('igf_portal_conf', default_var=None)
EMAIL_TEMPLATE = Variable.get(
    'user_account_creation_template', default_var=None)
EMAIL_CONF = Variable.get("email_config", default_var=None)
DATABASE_CONFIG_FILE = Variable.get(
    'database_config_file', default_var=None)

## TASK - find raw metadata id in dag_run.conf
@task(
    task_id="find_raw_metadata_id",
    retry_delay=timedelta(minutes=5),
    retries=4,
    queue='hpc_4G',
    multiple_outputs=False)
def find_raw_metadata_id(
        raw_metadata_id_tag: str = "raw_metadata_id",
        dag_run_key: str = "dag_run") \
        -> int:
    """Return the raw metadata id passed in the current ``dag_run.conf``.

    :param raw_metadata_id_tag: Key looked up in ``dag_run.conf``.
    :param dag_run_key: Key of the dag run object in the task context.
    :returns: The value stored under ``raw_metadata_id_tag``, returned
        as-is (no type conversion is applied).
    :raises ValueError: If the dag run, its conf, or the id is missing, or
        if anything else fails. The failure is logged and passed to
        ``send_airflow_failed_logs_to_channels`` before being re-raised.
    """
    try:
        ### dag_run.conf should have raw_metadata_id
        context = get_current_context()
        dag_run = context.get(dag_run_key)
        raw_metadata_id = None
        if dag_run is not None and dag_run.conf is not None:
            # single lookup; a missing key simply yields None
            raw_metadata_id = dag_run.conf.get(raw_metadata_id_tag)
        if raw_metadata_id is None:
            raise ValueError(
                'raw_metadata_id not found in dag_run.conf')
        return raw_metadata_id
    except Exception as e:
        message = \
            f"Failed to get raw_metadata_id, error: {e}"
        log.error(message)
        send_airflow_failed_logs_to_channels(
            ms_teams_conf=MS_TEAMS_CONF,
            message_prefix=str(message))
        # chain explicitly so the original traceback is kept as __cause__
        raise ValueError(message) from e


## TASK - register portal metadata for a single raw_metadata_id
@task(
    task_id="register_metadata_from_portal",
    retries=1,
    queue='hpc_4G',
    multiple_outputs=False)
def register_metadata_from_portal(raw_metadata_id: int) -> None:
    """Run the portal metadata registration for one raw metadata entry.

    Builds a ``Find_and_register_new_project_data_from_portal_db`` instance
    restricted to ``raw_metadata_id`` and calls its
    ``process_project_data_and_account`` method. Slack logging, the HPC user
    check and iRODS setup are switched off; user notification is switched on.

    :param raw_metadata_id: Id forwarded to the registration class.
    :returns: None
    :raises ValueError: On any failure. The failure is logged and passed to
        ``send_airflow_failed_logs_to_channels`` before being re-raised.
    """
    try:
        default_project_user_email = \
            _parse_default_user_email_from_email_config(EMAIL_CONF)
        fa = \
            Find_and_register_new_project_data_from_portal_db(
                portal_db_conf_file=IGF_PORTAL_CONF,
                dbconfig=DATABASE_CONFIG_FILE,
                user_account_template=EMAIL_TEMPLATE,
                default_user_email=default_project_user_email,
                raw_metadata_id=raw_metadata_id,
                log_slack=False,
                check_hpc_user=False,
                hpc_user=None,
                hpc_address=None,
                ldap_server=None,
                setup_irods=False,
                notify_user=True,
                email_config_json=EMAIL_CONF)
        fa.process_project_data_and_account()
    except Exception as e:
        # The previous text ("Failed to fetch raw_analysis_metadata") did not
        # describe this task; report what actually failed.
        message = \
            f"Failed to register metadata from portal, error: {e}"
        log.error(message)
        send_airflow_failed_logs_to_channels(
            ms_teams_conf=MS_TEAMS_CONF,
            message_prefix=str(message))
        # chain explicitly so the original traceback is kept as __cause__
        raise ValueError(message) from e
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,11 @@
from io import StringIO
import smtplib

# Portal API URL suffixes for raw metadata.
# The *_ALL_* suffixes are used as-is when no raw_metadata_id is set; the
# *_ENTRY_* suffixes get "/<raw_metadata_id>" appended by the caller.
IGFPORTAL_RAW_METADATA_DOWNLOAD_ALL_URL = "/api/v1/raw_metadata/download_ready_metadata"
IGFPORTAL_RAW_METADATA_MARK_ALL_SYNCED_URL = "/api/v1/raw_metadata/mark_ready_metadata_as_synced"
IGFPORTAL_RAW_METADATA_DOWNLOAD_ENTRY_URL = "/api/v1/raw_metadata/get_raw_metadata"
# NOTE(review): same path as IGFPORTAL_RAW_METADATA_MARK_ALL_SYNCED_URL, only
# the appended id differs - confirm the portal exposes this per-entry route.
IGFPORTAL_RAW_METADATA_MARK_ENTRY_SYNCED_URL = "/api/v1/raw_metadata/mark_ready_metadata_as_synced"


def send_email_via_smtp(
sender: str,
Expand Down Expand Up @@ -81,6 +86,7 @@ def __init__(
user_account_template: str,
portal_db_conf_file: str,
log_slack: bool = True,
raw_metadata_id: Union[int, None] = None,
slack_config: Union[str, None] = None,
check_hpc_user: bool = False,
hpc_user: Union[str, None] = None,
Expand All @@ -104,6 +110,7 @@ def __init__(
self.user_lookup_column = user_lookup_column
self.sample_lookup_column = sample_lookup_column
self.data_authority_column = data_authority_column
self.raw_metadata_id = raw_metadata_id
self.log_slack = log_slack
dbparams = read_dbconf_json(dbconfig)
base = BaseAdaptor(**dbparams)
Expand All @@ -121,13 +128,15 @@ def __init__(
if log_slack and slack_config is None:
raise ValueError('Missing slack config file')
elif log_slack and slack_config:
self.igf_slack = IGF_slack(slack_config=slack_config)
self.igf_slack = \
IGF_slack(slack_config=slack_config)
if check_hpc_user and \
(hpc_user is None or \
hpc_address is None or \
ldap_server is None):
raise ValueError(
f'Hpc user {hpc_user} address {hpc_address}, and ldap server {ldap_server} needed for hpc check')
f"Hpc user {hpc_user} address {hpc_address}," + \
f"and ldap server {ldap_server} needed for hpc check")
except Exception as e:
raise ValueError(
f'Init failed, error: {e}')
Expand Down Expand Up @@ -171,10 +180,16 @@ def _compare_and_mark_portal_data_as_synced(
def _get_new_project_data_from_portal(self) \
-> Any:
try:
res = \
get_data_from_portal(
url_suffix="/api/v1/raw_metadata/download_ready_metadata",
portal_config_file=self.portal_db_conf_file)
if self.raw_metadata_id is None:
res = \
get_data_from_portal(
url_suffix=IGFPORTAL_RAW_METADATA_DOWNLOAD_ALL_URL,
portal_config_file=self.portal_db_conf_file)
else:
res = \
get_data_from_portal(
url_suffix=f"{IGFPORTAL_RAW_METADATA_DOWNLOAD_ENTRY_URL}/{self.raw_metadata_id}",
portal_config_file=self.portal_db_conf_file)
return res
except Exception as e:
raise ValueError(
Expand All @@ -184,10 +199,16 @@ def _get_new_project_data_from_portal(self) \
def _mark_portal_db_as_synced(self) \
-> Any:
try:
res = \
get_data_from_portal(
url_suffix="/api/v1/raw_metadata/mark_ready_metadata_as_synced",
portal_config_file=self.portal_db_conf_file)
if self.raw_metadata_id is None:
res = \
get_data_from_portal(
url_suffix=IGFPORTAL_RAW_METADATA_MARK_ALL_SYNCED_URL,
portal_config_file=self.portal_db_conf_file)
else:
res = \
get_data_from_portal(
url_suffix=f"{IGFPORTAL_RAW_METADATA_MARK_ENTRY_SYNCED_URL}/{self.raw_metadata_id}",
portal_config_file=self.portal_db_conf_file)
return res
except Exception as e:
raise ValueError(e)
Expand Down Expand Up @@ -245,11 +266,16 @@ def _read_project_info_and_get_new_entries(
required_project_user_columns)
project_user_data = \
project_info_data.loc[:, project_user_data_columns] # get data for project user table
required_sample_columns = \
list(set(project_info_data.columns).\
difference(set(list(project_data)+\
list(user_data)+\
list(project_user_data)))) # all remaining column goes to sample tables
required_sample_columns = list(
set(project_info_data.columns)
.difference(
set(
list(project_data) + \
list(user_data) + \
list(project_user_data)
)
)
) # all remaining column goes to sample tables
required_sample_columns.\
append('project_igf_id')
sample_data = \
Expand Down
8 changes: 7 additions & 1 deletion test/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -219,8 +219,12 @@ def full_suite():
from .process.unified_metadata_registration_test import (
TestUnifiedMetadataRegistrationA,
TestUnifiedMetadataRegistrationB)
from .igf_airflow.test_dag45_metadata_registration_utils import (
Test_dag45_metadata_registration_utilsA)
from .igf_airflow.test_dag46_scRNA_10X_flex_utils import (
Test_dag46_scRNA_10X_flex_utilsA)
from .igf_airflow.test_dag44_analysis_registration_utils import (
Test_dag44_analysis_registration_utilsA)


return unittest.TestSuite([
Expand Down Expand Up @@ -366,6 +370,8 @@ def full_suite():
Test_dag1_calculate_hpc_worker_utils,
TestUnifiedMetadataRegistrationA,
TestUnifiedMetadataRegistrationB,
Test_dag46_scRNA_10X_flex_utilsA
Test_dag46_scRNA_10X_flex_utilsA,
Test_dag45_metadata_registration_utilsA,
Test_dag44_analysis_registration_utilsA
]
])
38 changes: 38 additions & 0 deletions test/igf_airflow/test_dag45_metadata_registration_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
import unittest
from unittest.mock import (
patch,
MagicMock)
from igf_airflow.utils.dag45_metadata_registration_utils import (
find_raw_metadata_id,
register_metadata_from_portal)

class Test_dag45_metadata_registration_utilsA(unittest.TestCase):
    """Unit tests for the dag45 metadata registration task callables.

    The undecorated callables are invoked through ``.function`` and their
    collaborators are patched in the module under test.
    """

    @patch("igf_airflow.utils.dag45_metadata_registration_utils.get_current_context")
    def test_find_raw_metadata_id(self, mock_get_context):
        # Real dicts for the context and dag_run.conf, so the default lookup
        # keys ("dag_run", "raw_metadata_id") are actually exercised.
        dag_run = MagicMock()
        dag_run.conf = {"raw_metadata_id": 1}
        mock_get_context.return_value = {"dag_run": dag_run}
        raw_metadata_id = find_raw_metadata_id.function()
        self.assertEqual(raw_metadata_id, 1)

    @patch("igf_airflow.utils.dag45_metadata_registration_utils.send_airflow_failed_logs_to_channels")
    @patch("igf_airflow.utils.dag45_metadata_registration_utils.get_current_context")
    def test_find_raw_metadata_id_missing(self, mock_get_context, mock_send):
        # An empty conf must surface as ValueError after the failure is reported.
        dag_run = MagicMock()
        dag_run.conf = {}
        mock_get_context.return_value = {"dag_run": dag_run}
        with self.assertRaises(ValueError):
            find_raw_metadata_id.function()
        mock_send.assert_called_once()

    @patch('igf_airflow.utils.dag45_metadata_registration_utils.Find_and_register_new_project_data_from_portal_db')
    @patch('igf_airflow.utils.dag45_metadata_registration_utils._parse_default_user_email_from_email_config')
    def test_register_metadata_from_portal(self, mock_parse, mock_class):
        # Instance returned by the patched class constructor
        mock_instance = mock_class.return_value
        mock_instance.process_project_data_and_account.return_value = None
        register_metadata_from_portal.function(raw_metadata_id=1)
        mock_parse.assert_called_once()
        mock_class.assert_called_once()
        # The id must be forwarded to the registration class as a keyword.
        _, init_kwargs = mock_class.call_args
        self.assertEqual(init_kwargs["raw_metadata_id"], 1)
        mock_instance.process_project_data_and_account.assert_called_once()

if __name__=='__main__':
unittest.main()
Original file line number Diff line number Diff line change
@@ -1,12 +1,23 @@
import pandas as pd
import unittest,json,os,shutil
from igf_data.igfdb.igfTables import Base, Project, User, Sample, ProjectUser, Project_attribute
import unittest
import json
import os
from unittest.mock import patch
from igf_data.igfdb.igfTables import (
Base,
Project,
User,
Sample,
ProjectUser)
from igf_data.igfdb.baseadaptor import BaseAdaptor
from igf_data.igfdb.projectadaptor import ProjectAdaptor
from igf_data.igfdb.useradaptor import UserAdaptor
from igf_data.igfdb.sampleadaptor import SampleAdaptor
from igf_data.utils.fileutils import get_temp_dir, remove_dir
from igf_data.process.seqrun_processing.find_and_process_new_project_data_from_portal_db import Find_and_register_new_project_data_from_portal_db
from igf_data.utils.fileutils import (
get_temp_dir,
remove_dir)
from igf_data.process.seqrun_processing.find_and_process_new_project_data_from_portal_db import (
Find_and_register_new_project_data_from_portal_db)

class Find_and_register_new_project_data_from_portal_db_test1(unittest.TestCase):
def setUp(self):
Expand Down Expand Up @@ -113,6 +124,62 @@ def test_read_project_info_and_get_new_entries(self):
self.assertTrue('email_id' in user_data.columns)
self.assertEqual(user_data['email_id'].values[0], 'user2@ic.ac.uk')

@patch("igf_data.process.seqrun_processing.find_and_process_new_project_data_from_portal_db.get_data_from_portal")
def test_get_new_project_data_from_portal(self, mock_get_data):
    """Fetching without a raw_metadata_id calls the portal helper once."""
    # No fixture rows are needed: the portal helper is patched out.
    fa1 = \
        Find_and_register_new_project_data_from_portal_db(
            portal_db_conf_file=self.portal_conf_file,
            dbconfig=self.dbconfig,
            user_account_template='template/email_notification/send_new_account_info.txt',
            log_slack=False,
            default_user_email='admin@email.com',
            setup_irods=False,
            notify_user=False)
    fa1._get_new_project_data_from_portal()
    mock_get_data.assert_called_once()

@patch("igf_data.process.seqrun_processing.find_and_process_new_project_data_from_portal_db.get_data_from_portal")
def test_mark_portal_db_as_synced(self, mock_get_data):
    """Marking as synced without a raw_metadata_id calls the portal helper once."""
    # No fixture rows are needed: the portal helper is patched out.
    fa2 = \
        Find_and_register_new_project_data_from_portal_db(
            portal_db_conf_file=self.portal_conf_file,
            dbconfig=self.dbconfig,
            user_account_template='template/email_notification/send_new_account_info.txt',
            log_slack=False,
            default_user_email='admin@email.com',
            setup_irods=False,
            notify_user=False)
    fa2._mark_portal_db_as_synced()
    mock_get_data.assert_called_once()

def test_check_and_register_data(self):
project_data = [
{'project_igf_id':'IGFP0002_test_23-5-2017_rna',
Expand Down