Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 43 additions & 25 deletions igf_airflow/utils/dag22_bclconvert_demult_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -4318,10 +4318,12 @@ def bclconvert_singularity_wrapper(
bcl_num_decompression_threads: int = 1,
bcl_num_parallel_tiles: int = 1,
lane_id : int = 0,
tile_id_list: tuple = (),
tiles: str|None = None,
exclude_tiles: str|None = None,
first_tile_only: bool = False,
dry_run: bool = False) \
-> str:
dry_run: bool = False,
additional_params: dict[str, str]| None = None
) -> str:
try:
check_file_path(image_path)
check_file_path(input_dir)
Expand All @@ -4331,39 +4333,55 @@ def bclconvert_singularity_wrapper(
temp_dir = get_temp_dir(use_ephemeral_space=True)
bclconvert_cmd = [
"bcl-convert",
"--bcl-input-directory", input_dir,
"--output-directory", output_dir,
"--sample-sheet", samplesheet_file,
"--bcl-num-conversion-threads", str(bcl_num_conversion_threads),
"--bcl-num-compression-threads", str(bcl_num_compression_threads),
"--bcl-num-decompression-threads", str(bcl_num_decompression_threads),
"--bcl-num-parallel-tiles", str(bcl_num_parallel_tiles),
"--bcl-sampleproject-subdirectories", "true",
"--strict-mode", "true"]
f"--bcl-input-directory {str(input_dir)}",
f"--output-directory {str(output_dir)}",
f"--sample-sheet {str(samplesheet_file)}",
f"--bcl-num-conversion-threads {str(bcl_num_conversion_threads)}",
f"--bcl-num-compression-threads {str(bcl_num_compression_threads)}",
f"--bcl-num-decompression-threads {str(bcl_num_decompression_threads)}",
f"--bcl-num-parallel-tiles {str(bcl_num_parallel_tiles)}",
"--bcl-sampleproject-subdirectories true",
"--strict-mode true"
]
if first_tile_only:
bclconvert_cmd.\
extend(["--first-tile-only", "true"])
bclconvert_cmd.append(
"--first-tile-only true"
)
if lane_id > 0:
bclconvert_cmd.\
extend(["--bcl-only-lane", str(lane_id)])
if len(tile_id_list) > 0:
bclconvert_cmd.\
extend(["--tiles", ",".join(tile_id_list)])
bclconvert_cmd = \
' '.join(bclconvert_cmd)
bclconvert_cmd.append(
f"--bcl-only-lane {str(lane_id)}",
)
if tiles is not None:
bclconvert_cmd.append(
f"--tiles {str(tiles)}"
)
if exclude_tiles is not None:
bclconvert_cmd.append(
f"--exclude-tiles {str(exclude_tiles)}"
)
if additional_params is not None:
for key, val in additional_params.items():
bclconvert_cmd.append(
f"{str(key)} {str(val)}"
)
bclconvert_cmd = ' '.join(bclconvert_cmd)
bind_paths = [
f'{temp_dir}:/var/log',
os.path.dirname(samplesheet_file),
input_dir,
os.path.dirname(output_dir)]
os.path.dirname(output_dir)
]
cmd = execute_singuarity_cmd(
image_path=image_path,
command_string=bclconvert_cmd,
bind_dir_list=bind_paths,
dry_run=dry_run)
dry_run=dry_run
)
return cmd
except:
raise
except Exception as e:
raise ValueError(
f"Failed with error: {e}"
)

def run_bclconvert_func(**context):
try:
Expand Down
172 changes: 99 additions & 73 deletions igf_airflow/utils/dag23_test_bclconvert_demult_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -564,103 +564,127 @@ def bcl_convert_run_func(**context):
try:
ti = context.get('ti')
dag_run = context.get('dag_run')
demult_dir_key = \
context['params'].\
get(
demult_dir_key = (
context['params']
.get(
'demult_dir_key',
'demult_dir')
demult_info_key = \
context['params'].\
get(
)
demult_info_key = (
context['params']
.get(
'demult_info_key',
'demult_info')
formatted_samplesheet_xcom_key = \
context['params'].\
get(
)
formatted_samplesheet_xcom_key = (
context['params']
.get(
'formatted_samplesheet_xcom_key',
'formatted_samplesheet_data')
formatted_samplesheet_xcom_task = \
context['params'].\
get(
)
formatted_samplesheet_xcom_task = (
context['params']
.get(
'formatted_samplesheet_xcom_task',
'get_formatted_samplesheets')
samplesheet_index = \
context['params'].\
get('samplesheet_index')
index_column = \
context['params'].\
get('index_column', 'index')
lane_column = \
context['params'].\
get('lane_column', 'lane')
filtered_df = \
_fetch_formatted_samplesheet_info_from_task_instance(
ti=ti,
samplesheet_index=samplesheet_index,
index_column=index_column,
samplesheet_key=formatted_samplesheet_xcom_key,
samplesheet_task=formatted_samplesheet_xcom_task)
)
samplesheet_index = (
context['params']
.get('samplesheet_index')
)
index_column = (
context['params']
.get('index_column', 'index')
)
lane_column = (
context['params']
.get('lane_column', 'lane')
)
filtered_df = _fetch_formatted_samplesheet_info_from_task_instance(
ti=ti,
samplesheet_index=samplesheet_index,
index_column=index_column,
samplesheet_key=formatted_samplesheet_xcom_key,
samplesheet_task=formatted_samplesheet_xcom_task
)
lane_id = filtered_df[lane_column].values[0]
if str(lane_id) != 'all':
lane_id = int(lane_id)
else:
lane_id = 0
mod_samplesheet_xcom_key = \
context['params'].\
get(
mod_samplesheet_xcom_key = (
context['params']
.get(
'mod_samplesheet_xcom_key',
'mod_samplesheet')
mod_samplesheet_xcom_task = \
context['params'].\
get('mod_samplesheet_xcom_task')
samplesheet_file = \
ti.xcom_pull(
task_ids=mod_samplesheet_xcom_task,
key=mod_samplesheet_xcom_key)
check_file_path(samplesheet_file)
)
mod_samplesheet_xcom_task = (
context['params']
.get('mod_samplesheet_xcom_task')
)
samplesheet_file = ti.xcom_pull(
task_ids=mod_samplesheet_xcom_task,
key=mod_samplesheet_xcom_key
)
check_file_path(
samplesheet_file
)
seqrun_id = None
if dag_run is not None and \
dag_run.conf is not None and \
dag_run.conf.get('seqrun_id') is not None:
seqrun_id = \
dag_run.conf.get('seqrun_id')
if (
dag_run is not None and
dag_run.conf is not None and
dag_run.conf.get('seqrun_id') is not None
):
seqrun_id = dag_run.conf.get(
'seqrun_id'
)
if seqrun_id is None:
raise ValueError('seqrun_id is not found in dag_run.conf')
raise ValueError(
'seqrun_id is not found in dag_run.conf'
)
# seqrun path
seqrun_path = \
os.path.join(HPC_SEQRUN_BASE_PATH, seqrun_id)
temp_dir = \
get_temp_dir(use_ephemeral_space=True)
demult_dir = \
os.path.join(
temp_dir,
'demult')
cmd = \
bclconvert_singularity_wrapper(
image_path=BCLCONVERT_IMAGE,
input_dir=seqrun_path,
output_dir=demult_dir,
lane_id=lane_id,
samplesheet_file=samplesheet_file,
bcl_num_conversion_threads=1,
bcl_num_compression_threads=1,
bcl_num_decompression_threads=1,
bcl_num_parallel_tiles=1,
first_tile_only=True)
seqrun_path = os.path.join(
HPC_SEQRUN_BASE_PATH,
seqrun_id
)
temp_dir = get_temp_dir(
use_ephemeral_space=True
)
demult_dir = os.path.join(
temp_dir,
'demult'
)
_ = bclconvert_singularity_wrapper(
image_path=BCLCONVERT_IMAGE,
input_dir=seqrun_path,
output_dir=demult_dir,
lane_id=lane_id,
samplesheet_file=samplesheet_file,
bcl_num_conversion_threads=1,
bcl_num_compression_threads=1,
bcl_num_decompression_threads=1,
bcl_num_parallel_tiles=1,
tiles='110[1-3]'
)
check_file_path(
os.path.join(
demult_dir,
'Reports',
'Demultiplex_Stats.csv'))
'Demultiplex_Stats.csv'
)
)
ti.xcom_push(
key=demult_dir_key,
value=demult_dir)
value=demult_dir
)
demult_info = {
index_column: samplesheet_index,
'demult_dir': demult_dir}
'demult_dir': demult_dir
}
ti.xcom_push(
key=demult_info_key,
value=demult_info)
value=demult_info
)
except Exception as e:
log.error(e)
ti = context.get('ti')
Expand All @@ -669,16 +693,18 @@ def bcl_convert_run_func(**context):
f"dag_id={ti.dag_id}",
f"run_id={ti.run_id}",
f"task_id={ti.task_id}",
f"attempt={ti.try_number}.log"]
message = \
f"Error: {e}, Log: {os.path.join(*log_file_path)}"
f"attempt={ti.try_number}.log"
]
message = f"Error: {e}," + \
f"Log: {os.path.join(*log_file_path)}"
send_log_to_channels(
slack_conf=SLACK_CONF,
ms_teams_conf=MS_TEAMS_CONF,
task_id=context['task'].task_id,
dag_id=context['task'].dag_id,
comment=message,
reaction='fail')
reaction='fail'
)
raise

def _fetch_formatted_samplesheet_info_from_task_instance(
Expand Down
2 changes: 1 addition & 1 deletion test/igf_airflow/dag22_bclconvert_demult_utils_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -330,7 +330,7 @@ def test_bclconvert_singularity_wrapper(self):
output_dir=self.output_dir,
samplesheet_file=self.samplesheet_file,
lane_id=1,
tile_id_list=('s_1_1102', 's_1_1103'),
tiles='s_1_1102,s_1_1103',
dry_run=True)
self.assertTrue('--tiles s_1_1102,s_1_1103' in cmd)

Expand Down
Loading