Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
116 changes: 85 additions & 31 deletions e2e/stress_test/continuous_failover_ha_multi_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ def __init__(self, **kwargs):
self.sn_nodes = []
self.current_outage_node = None
self.snapshot_names = []
self.current_outage_nodes = []
self.disconnect_thread = None
self.outage_start_time = None
self.outage_end_time = None
Expand All @@ -60,8 +61,7 @@ def __init__(self, **kwargs):
# self.outage_types = ["graceful_shutdown", "container_stop", "interface_full_network_interrupt",
# "interface_partial_network_interrupt",
# "partial_nw"]
self.outage_types = ["graceful_shutdown", "container_stop", "interface_full_network_interrupt",
"interface_partial_network_interrupt"]
self.outage_types = ["graceful_shutdown", "container_stop", "interface_full_network_interrupt"]
# self.outage_types = ["partial_nw"]
self.blocked_ports = None
self.outage_log_file = os.path.join("logs", f"outage_log_{datetime.now().strftime('%Y%m%d_%H%M%S')}.log")
Expand Down Expand Up @@ -111,7 +111,26 @@ def create_lvols_with_fio(self, count):
lvol_name = f"{self.lvol_name}_{i}" if not is_crypto else f"c{self.lvol_name}_{i}"
self.logger.info(f"Creating lvol with Name: {lvol_name}, fs type: {fs_type}, crypto: {is_crypto}")
try:
if self.current_outage_node:
self.logger.info(f"Current Outage Node: {self.current_outage_nodes}")
if self.current_outage_nodes:
self.logger.info(f"Primary vs secondary: {self.sn_primary_secondary_map}")
skip_nodes = [node for node in self.sn_primary_secondary_map if self.sn_primary_secondary_map[node] in self.current_outage_nodes]
self.logger.info(f"Skip Nodes: {skip_nodes}")
for node in self.current_outage_nodes:
skip_nodes.append(node)
self.logger.info(f"Skip Nodes: {skip_nodes}")
self.logger.info(f"Storage Nodes with sec: {self.sn_nodes_with_sec}")
host_id = [node for node in self.sn_nodes_with_sec if node not in skip_nodes]
self.sbcli_utils.add_lvol(
lvol_name=lvol_name,
pool_name=self.pool_name,
size=self.lvol_size,
crypto=is_crypto,
key1=self.lvol_crypt_keys[0],
key2=self.lvol_crypt_keys[1],
host_id=host_id[0]
)
elif self.current_outage_node:
skip_nodes = [node for node in self.sn_primary_secondary_map if self.sn_primary_secondary_map[node] == self.current_outage_node]
skip_nodes.append(self.current_outage_node)
skip_nodes.append(self.sn_primary_secondary_map[self.current_outage_node])
Expand Down Expand Up @@ -276,7 +295,7 @@ def create_lvols_with_fio(self, count):
"iodepth": 1,
"numjobs": 5,
"time_based": True,
"runtime": 2000,
"runtime": 3000,
"log_avg_msec": 1000,
"iolog_file": self.lvol_mount_details[lvol_name]["iolog_base_path"],
},
Expand Down Expand Up @@ -308,9 +327,9 @@ def perform_random_outage(self):

sleep_n_sec(120)
for node in self.sn_nodes_with_sec:
self.ssh_obj.dump_lvstore(node_ip=self.mgmt_nodes[0],
storage_node_id=node)

# self.ssh_obj.dump_lvstore(node_ip=self.mgmt_nodes[0],
# storage_node_id=node)
self.logger.info(f"Skipping lvstore dump!!")
for node in self.sn_nodes_with_sec:
cur_node_details = self.sbcli_utils.get_storage_node_details(node)
cur_node_ip = cur_node_details[0]["mgmt_ip"]
Expand Down Expand Up @@ -417,7 +436,7 @@ def perform_random_outage(self):

self.disconnect_thread = threading.Thread(
target=self.ssh_obj.disconnect_all_active_interfaces,
args=(node_ip, active_interfaces, 600),
args=(node_ip, active_interfaces, 300),
)
self.disconnect_thread.start()
elif outage_type == "interface_partial_network_interrupt":
Expand All @@ -430,7 +449,7 @@ def perform_random_outage(self):

self.disconnect_thread = threading.Thread(
target=self.ssh_obj.disconnect_all_active_interfaces,
args=(node_ip, active_interfaces, 600),
args=(node_ip, active_interfaces, 300),
)
self.disconnect_thread.start()
elif outage_type == "partial_nw":
Expand Down Expand Up @@ -483,7 +502,7 @@ def perform_random_outage(self):
return outage_type


def restart_nodes_after_failover(self, outage_type):
def restart_nodes_after_failover(self, outage_type, restart=False):
"""Perform steps for node restart."""
node_details = self.sbcli_utils.get_storage_node_details(self.current_outage_node)
node_ip = node_details[0]["mgmt_ip"]
Expand Down Expand Up @@ -543,14 +562,48 @@ def restart_nodes_after_failover(self, outage_type):
self.ssh_obj.exec_command(node=self.lvol_mount_details[lvol]["Client"], command=connect)

elif outage_type == "container_stop":
self.sbcli_utils.wait_for_storage_node_status(self.current_outage_node, "online", timeout=1000)
# Log the restart event
self.log_outage_event(self.current_outage_node, outage_type, "Node restarted", outage_time=1)
if restart:
max_retries = 10
retry_delay = 10 # seconds

# Retry mechanism for restarting the node
for attempt in range(max_retries):
try:
force=False
if attempt == max_retries - 1:
force=True
self.logger.info("[CHECK] Restarting Node via CLI with Force flag as via API Fails.")
else:
self.logger.info("[CHECK] Restarting Node via CLI as via API Fails.")
self.ssh_obj.restart_node(node=self.mgmt_nodes[0],
node_id=self.current_outage_node,
force=force)
# else:
# self.sbcli_utils.restart_node(node_uuid=self.current_outage_node, expected_error_code=[503])
self.sbcli_utils.wait_for_storage_node_status(self.current_outage_node, "online", timeout=1000)
break # Exit loop if successful
except Exception as _:
if attempt < max_retries - 2:
self.logger.info(f"Attempt {attempt + 1} failed to restart node. Retrying in {retry_delay} seconds...")
sleep_n_sec(retry_delay)
elif attempt < max_retries - 1:
self.logger.info(f"Attempt {attempt + 1} failed to restart node via API. Retrying in {retry_delay} seconds via CMD...")
sleep_n_sec(retry_delay)
else:
self.logger.info("Max retries reached. Failed to restart node.")
raise # Rethrow the last exception
self.sbcli_utils.wait_for_storage_node_status(self.current_outage_node, "online", timeout=1000)
# Log the restart event
self.log_outage_event(self.current_outage_node, outage_type, "Node restarted", outage_time=0)
else:
self.sbcli_utils.wait_for_storage_node_status(self.current_outage_node, "online", timeout=1000)
# Log the restart event
self.log_outage_event(self.current_outage_node, outage_type, "Node restarted", outage_time=2)

elif "network_interrupt" in outage_type:
self.sbcli_utils.wait_for_storage_node_status(self.current_outage_node, "online", timeout=1000)
# Log the restart event
self.log_outage_event(self.current_outage_node, outage_type, "Node restarted", outage_time=11)
self.log_outage_event(self.current_outage_node, outage_type, "Node restarted", outage_time=6)

if not self.k8s_test:
for node in self.storage_nodes:
Expand Down Expand Up @@ -608,9 +661,9 @@ def restart_nodes_after_failover(self, outage_type):
# sleep_n_sec(30)

for node in self.sn_nodes_with_sec:
self.ssh_obj.dump_lvstore(node_ip=self.mgmt_nodes[0],
storage_node_id=node)

# self.ssh_obj.dump_lvstore(node_ip=self.mgmt_nodes[0],
# storage_node_id=node)
self.logger.info(f"Skipping lvstore dump!!")

def create_snapshots_and_clones(self):
"""Create snapshots and clones during an outage."""
Expand Down Expand Up @@ -777,7 +830,7 @@ def create_snapshots_and_clones(self):
"iodepth": 1,
"numjobs": 5,
"time_based": True,
"runtime": 2000,
"runtime": 3000,
"log_avg_msec": 1000,
"iolog_file": self.clone_mount_details[clone_name]["iolog_base_path"],
},
Expand All @@ -786,22 +839,23 @@ def create_snapshots_and_clones(self):
self.fio_threads.append(fio_thread)
self.logger.info(f"Created snapshot {snapshot_name} and clone {clone_name}.")

self.sbcli_utils.resize_lvol(lvol_id=self.lvol_mount_details[lvol]["ID"],
new_size=f"{self.int_lvol_size}G")
if self.lvol_mount_details[lvol]["ID"]:
self.sbcli_utils.resize_lvol(lvol_id=self.lvol_mount_details[lvol]["ID"],
new_size=f"{self.int_lvol_size}G")
sleep_n_sec(10)
self.sbcli_utils.resize_lvol(lvol_id=self.clone_mount_details[clone_name]["ID"],
new_size=f"{self.int_lvol_size}G")

if self.clone_mount_details[clone_name]["ID"]:
self.sbcli_utils.resize_lvol(lvol_id=self.clone_mount_details[clone_name]["ID"],
new_size=f"{self.int_lvol_size}G")


def delete_random_lvols(self, count):
"""Delete random lvols during an outage."""
skip_nodes = [node for node in self.sn_primary_secondary_map if self.sn_primary_secondary_map[node] == self.current_outage_node]
skip_nodes.append(self.current_outage_node)
skip_nodes.append(self.sn_primary_secondary_map[self.current_outage_node])
skip_nodes_lvol = []
self.logger.info(f"Skipping Nodes: {skip_nodes_lvol}")
self.logger.info(f"Skipping Nodes: {skip_nodes}")
available_lvols = [
lvol for node, lvols in self.node_vs_lvol.items() if node not in skip_nodes_lvol for lvol in lvols
lvol for node, lvols in self.node_vs_lvol.items() if node not in skip_nodes for lvol in lvols
]
self.logger.info(f"Available Lvols: {available_lvols}")
if len(available_lvols) < count:
Expand Down Expand Up @@ -922,7 +976,7 @@ def perform_failover_during_outage(self):
storage_node_id=node,
logs_path=self.docker_logs_path
)
self.create_lvols_with_fio(3)
self.create_lvols_with_fio(5)
if not self.k8s_test:
for node in self.storage_nodes:
self.ssh_obj.restart_docker_logging(
Expand Down Expand Up @@ -1041,7 +1095,7 @@ def restart_fio(self, iteration):
"iodepth": 1,
"numjobs": 5,
"time_based": True,
"runtime": 2000,
"runtime": 3000,
"log_avg_msec": 1000,
"iolog_file": self.lvol_mount_details[lvol]["iolog_base_path"],
},
Expand Down Expand Up @@ -1150,7 +1204,7 @@ def run(self):
storage_node_id=node,
logs_path=self.docker_logs_path
)
self.create_lvols_with_fio(5)
self.create_lvols_with_fio(3)
if not self.k8s_test:
for node in self.storage_nodes:
self.ssh_obj.restart_docker_logging(
Expand All @@ -1175,7 +1229,7 @@ def run(self):
else:
self.logger.info(f"Current outage node: {self.current_outage_node} is secondary node. Skipping delete and create")
if outage_type != "partial_nw" or outage_type != "partial_nw_single_port":
sleep_n_sec(280)
sleep_n_sec(100)
for node in self.sn_nodes_with_sec:
cur_node_details = self.sbcli_utils.get_storage_node_details(node)
cur_node_ip = cur_node_details[0]["mgmt_ip"]
Expand Down Expand Up @@ -1229,7 +1283,7 @@ def run(self):
# Perform failover and manage resources during outage
outage_type = self.perform_failover_during_outage()
if outage_type != "partial_nw" or outage_type != "partial_nw_single_port":
sleep_n_sec(100)
sleep_n_sec(280)
time_duration = self.common_utils.calculate_time_duration(
start_timestamp=self.outage_start_time,
end_timestamp=self.outage_end_time
Expand Down
Loading
Loading