Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 13 additions & 4 deletions igf_airflow/utils/dag1_calculate_hpc_worker_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@
REDIS_CONF_FILE = \
Variable.get(
'redis_conn_file', default_var=None)
REDIS_ALLOWED_QUEUE_NAME_PREFIX_LIST= \
Variable.get(
'redis_allowed_queue_name_prefix_list', default_var=["hpc", "generic"])

def get_celery_flower_workers(
celery_flower_config_file: str,
Expand Down Expand Up @@ -77,11 +80,13 @@ def get_celery_flower_workers(


def fetch_queue_list_from_redis_server(
redis_conf_file: str) -> List[dict]:
redis_conf_file: str,
redis_allowed_queue_name_prefix_list: List[str]) -> List[dict]:
"""
A function for fetching pending job counts from redis db

:param redis_conf_file: A json file containing redis_db as key and redis db connection URL as value
:param redis_allowed_queue_name_prefix_list: A list of allowed redis queue names
:returns: A list of dictionaries with queue name as key and pending job counts as the value
"""
try:
Expand All @@ -97,8 +102,11 @@ def fetch_queue_list_from_redis_server(
queue = i.decode()
else:
queue = i
if not queue.startswith('_') and \
not queue.startswith('unacked'):
allowed_queue = False
for entry in redis_allowed_queue_name_prefix_list:
if queue.startswith(entry):
allowed_queue = True
if allowed_queue:
q_len = r.llen(queue)
queue_list.append({queue: q_len})
return queue_list
Expand Down Expand Up @@ -625,7 +633,8 @@ def redis_queue_workers():
try:
queue_list = \
fetch_queue_list_from_redis_server(
redis_conf_file=REDIS_CONF_FILE)
redis_conf_file=REDIS_CONF_FILE,
redis_allowed_queue_name_prefix_list=REDIS_ALLOWED_QUEUE_NAME_PREFIX_LIST)
return queue_list
except Exception as e:
log.error(e)
Expand Down
15 changes: 9 additions & 6 deletions test/igf_airflow/test_dag1_calculate_hpc_worker_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,16 +74,19 @@ def test_get_celery_flower_workers(self, *args):
@patch('igf_airflow.utils.dag1_calculate_hpc_worker_utils.redis')
def test_fetch_queue_list_from_redis_server(self,redis_mock):
r = redis_mock.from_url.return_value
r.keys.return_value = {'A': 'a', 'B': 'b', 'unacked1': 'c', '_unacked2': 'd'}
r.keys.return_value = {'A1': 'a', 'B1': 'b', 'unacked1': 'c', '_unacked2': 'd'}
r.llen.side_effect = [1,2]
temp_dir = get_temp_dir()
redis_config_file = os.path.join(temp_dir, 'redis_config.json')
with open(redis_config_file, 'w') as json_data:
json.dump({'redis_db': 'A'}, json_data)
queue_list = fetch_queue_list_from_redis_server(redis_config_file)
queue_list = \
fetch_queue_list_from_redis_server(
redis_config_file,
redis_allowed_queue_name_prefix_list=["A", "B"])
self.assertEqual(len(queue_list), 2)
self.assertEqual(queue_list[0], {'A':1})
self.assertEqual(queue_list[1], {'B':2})
self.assertEqual(queue_list[0], {'A1':1})
self.assertEqual(queue_list[1], {'B1':2})

def test_calculate_scale_out_scale_in_ops(self):
input_data = [ {'queue_name':'hpc_4G','hpc_r':9,'hpc_q':0,'task_r':9,'task_i':0,'queued':7},
Expand Down Expand Up @@ -390,15 +393,15 @@ def test_celery_flower_workers(
self.assertEqual(worker_list[0]['worker_id'], 'worker1')

@patch('igf_airflow.utils.dag1_calculate_hpc_worker_utils.fetch_queue_list_from_redis_server',
return_value=[{'A':1}])
return_value=[{'hpc_4G': 1}])
def test_redis_queue_workers(self, *args):
redis_db_conf = os.path.join(self.temp_dir, 'redis_db_conf.json')
with open(redis_db_conf, 'w') as json_data:
json.dump({'A': 'B'}, json_data)
with patch("igf_airflow.utils.dag1_calculate_hpc_worker_utils.REDIS_CONF_FILE", redis_db_conf):
queue_list = redis_queue_workers.function()
self.assertEqual(len(queue_list), 1)
self.assertEqual(queue_list[0], {'A':1})
self.assertEqual(queue_list[0], {'hpc_4G':1})

@patch('igf_airflow.utils.dag1_calculate_hpc_worker_utils.combine_celery_and_hpc_worker_info',
return_value=['A', 'B'])
Expand Down