Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 20 additions & 25 deletions igf_airflow/utils/dag44_analysis_registration_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,11 @@
task_id="find_raw_metadata_id",
retry_delay=timedelta(minutes=5),
retries=4,
queue='hpc_4G',
multiple_outputs=False)
queue='hpc_4G')
def find_raw_metadata_id(
raw_analysis_id_tag: str = "raw_analysis_id",
dag_run_key: str = "dag_run") \
-> Dict[str, int]:
-> int:
try:
### dag_run.conf should have raw_analysis_id
context = get_current_context()
Expand All @@ -61,7 +60,7 @@ def find_raw_metadata_id(
if raw_analysis_id is None:
raise ValueError(
'raw_analysis_id not found in dag_run.conf')
return {raw_analysis_id_tag: raw_analysis_id}
return int(raw_analysis_id)
except Exception as e:
message = \
f"Failed to get raw_analysis_id, error: {e}"
Expand All @@ -74,12 +73,10 @@ def find_raw_metadata_id(
## TASK - fetch raw analysis metadata from portal
@task(
task_id="fetch_raw_metadata_from_portal",
retries=0,
queue='hpc_4G',
multiple_outputs=False)
retries=1,
queue='hpc_4G')
def fetch_raw_metadata_from_portal(
raw_analysis_id: int,
raw_metadata_file_tag: str = "raw_metadata_file") -> Dict[str, str]:
raw_analysis_id: int) -> str:
try:
raw_analysis_data = \
get_data_from_portal(
Expand All @@ -101,7 +98,7 @@ def fetch_raw_metadata_from_portal(
os.path.join(temp_dir, "raw_metadata.json")
with open(raw_metadata_json_file, "w") as fp:
json.dump(raw_analysis_data, fp)
return {raw_metadata_file_tag: raw_metadata_json_file}
return raw_metadata_json_file
except Exception as e:
message = \
f"Failed to fetch raw_analysis_metadata, error: {e}"
Expand Down Expand Up @@ -141,12 +138,10 @@ def check_registered_analysis_in_db(
task_id="check_raw_metadata_in_db",
retry_delay=timedelta(minutes=5),
retries=4,
queue='hpc_4G',
multiple_outputs=False)
queue='hpc_4G')
def check_raw_metadata_in_db(
raw_metadata_file: str,
valid_raw_metadata_file_tag: str = "valid_raw_metadata_file") \
-> Dict[str, str]:
raw_metadata_file: str) \
-> str:
try:
check_file_path(raw_metadata_file)
with open(raw_metadata_file, "r") as fp:
Expand All @@ -166,10 +161,9 @@ def check_raw_metadata_in_db(
project_id=project_id,
analysis_name=analysis_name,
dbconf_json=DATABASE_CONFIG_FILE)
if analysis_reg:
return {valid_raw_metadata_file_tag: raw_metadata_file}
else:
return {valid_raw_metadata_file_tag: ""}
if not analysis_reg:
raw_metadata_file = ""
return raw_metadata_file
except Exception as e:
message = \
f"Failed to check existing raw_analysis_metadata, error: {e}"
Expand Down Expand Up @@ -256,10 +250,10 @@ def register_analysis_in_db(
retries=4,
queue='hpc_4G',
multiple_outputs=False)
def register_raw_analysis_metadata_in_db(valid_raw_metadata_file):
def register_raw_analysis_metadata_in_db(valid_raw_metadata_file: str) -> bool:
try:
if valid_raw_metadata_file == "":
return {"status": False}
return False
else:
check_file_path(valid_raw_metadata_file)
with open(valid_raw_metadata_file, "r") as fp:
Expand All @@ -281,7 +275,7 @@ def register_raw_analysis_metadata_in_db(valid_raw_metadata_file):
analysis_name=analysis_name,
analysis_yaml=analysis_yaml,
dbconf_json=DATABASE_CONFIG_FILE)
return {"status": status}
return status
except Exception as e:
message = \
f"Failed to register raw_analysis_metadata, error: {e}"
Expand All @@ -296,9 +290,10 @@ def register_raw_analysis_metadata_in_db(valid_raw_metadata_file):
task_id="mark_metadata_synced_on_portal",
retry_delay=timedelta(minutes=5),
retries=4,
queue='hpc_4G',
multiple_outputs=False)
def mark_metadata_synced_on_portal(raw_analysis_id, registration_status):
queue='hpc_4G')
def mark_metadata_synced_on_portal(
raw_analysis_id: int,
registration_status: bool) -> None:
try:
if registration_status:
_ = \
Expand Down
29 changes: 12 additions & 17 deletions test/igf_airflow/test_dag44_analysis_registration_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,16 +81,15 @@ def test_find_raw_metadata_id(self, mock_get_context):
mock_context.get.return_value = mock_context.dag_run
mock_context.dag_run.conf.get.return_value = 1
mock_get_context.return_value = mock_context
raw_analysis_info = find_raw_metadata_id.function()
assert "raw_analysis_id" in raw_analysis_info
assert raw_analysis_info.get("raw_analysis_id") == 1
raw_analysis_id = find_raw_metadata_id.function()
assert raw_analysis_id == 1

@patch("igf_airflow.utils.dag44_analysis_registration_utils.get_data_from_portal",
return_value={'project_id': 1, 'pipeline_id': 2, 'analysis_name': 'a', 'analysis_yaml': 'b:'})
def test_fetch_raw_metadata_from_portal(self, *args):
raw_metadata_info = \
raw_metadata_file = \
fetch_raw_metadata_from_portal.function(raw_analysis_id=1)
assert "raw_metadata_file" in raw_metadata_info
assert os.path.exists(raw_metadata_file)


def test_check_registered_analysis_in_db(self):
Expand Down Expand Up @@ -125,16 +124,14 @@ def test_check_raw_metadata_in_db(self, *args):
'analysis_name': 'analysis_2',
'pipeline_id': 1,
'analysis_yaml': 'b:'}, fp)
valid_raw_metadata_info = \
valid_raw_metadata_file = \
check_raw_metadata_in_db.function(
raw_metadata_file=json_file_1)
assert "valid_raw_metadata_file" in valid_raw_metadata_info
assert valid_raw_metadata_info.get("valid_raw_metadata_file") == ""
valid_raw_metadata_info = \
assert valid_raw_metadata_file == ""
valid_raw_metadata_file = \
check_raw_metadata_in_db.function(
raw_metadata_file=json_file_2)
assert "valid_raw_metadata_file" in valid_raw_metadata_info
assert valid_raw_metadata_info.get("valid_raw_metadata_file") == json_file_2
assert valid_raw_metadata_file == json_file_2

def test_register_analysis_in_db(self):
status = \
Expand Down Expand Up @@ -190,16 +187,14 @@ def test_register_raw_analysis_metadata_in_db(self, *args):
'analysis_name': 'analysis_2',
'pipeline_id': 1,
'analysis_yaml': 'b:'}, fp)
status_info = \
status = \
register_raw_analysis_metadata_in_db.function(
valid_raw_metadata_file=json_file_1)
assert "status" in status_info
assert status_info.get("status") is False
status_info = \
assert status is False
status = \
register_raw_analysis_metadata_in_db.function(
valid_raw_metadata_file=json_file_2)
assert "status" in status_info
assert status_info.get("status") is True
assert status is True
aa = \
AnalysisAdaptor(**{'session_class': self.session_class})
aa.start_session()
Expand Down