2 changes: 1 addition & 1 deletion .github/workflows/python-checks.yml
@@ -55,7 +55,7 @@ jobs:
run: pip install .

- name: Run tests
run: pytest --import-mode=importlib -v
run: pytest --import-mode=importlib --tb=short --capture=no


lint:
1 change: 0 additions & 1 deletion e2e/continuous_log_collector.py
@@ -1,6 +1,5 @@
import os
from datetime import datetime
from pathlib import Path
from utils.ssh_utils import SshUtils, RunnerK8sLog
from logger_config import setup_logger

2 changes: 1 addition & 1 deletion e2e/e2e_tests/cluster_test_base.py
@@ -405,7 +405,7 @@ def collect_management_details(self, post_teardown=False):
self.ssh_obj.exec_command(self.mgmt_nodes[0], cmd)

node+=1
all_nodes = self.storage_nodes + self.mgmt_nodes + self.client_machines:
all_nodes = self.storage_nodes + self.mgmt_nodes + self.client_machines
for node in all_nodes:
base_path = os.path.join(self.docker_logs_path, node)
cmd = f"journalctl -k --no-tail >& {base_path}/jounalctl_{node}-final.txt"
4 changes: 2 additions & 2 deletions e2e/stress_test/continuous_failover_ha_multi_client.py
@@ -329,7 +329,7 @@ def perform_random_outage(self):
for node in self.sn_nodes_with_sec:
# self.ssh_obj.dump_lvstore(node_ip=self.mgmt_nodes[0],
# storage_node_id=node)
self.logger.info(f"Skipping lvstore dump!!")
self.logger.info("Skipping lvstore dump!!")
for node in self.sn_nodes_with_sec:
cur_node_details = self.sbcli_utils.get_storage_node_details(node)
cur_node_ip = cur_node_details[0]["mgmt_ip"]
@@ -663,7 +663,7 @@ def restart_nodes_after_failover(self, outage_type, restart=False):
for node in self.sn_nodes_with_sec:
# self.ssh_obj.dump_lvstore(node_ip=self.mgmt_nodes[0],
# storage_node_id=node)
self.logger.info(f"Skipping lvstore dump!!")
self.logger.info("Skipping lvstore dump!!")

def create_snapshots_and_clones(self):
"""Create snapshots and clones during an outage."""
(changes to an additional file; file name not captured)
@@ -306,7 +306,7 @@ def _seed_snapshots_and_clones(self):
if err:
nqn = self.sbcli_utils.get_lvol_details(lvol_id=self.clone_mount_details[clone_name]["ID"])[0]["nqn"]
self.ssh_obj.disconnect_nvme(node=client, nqn_grep=nqn)
self.logger.info(f"[LFNG] connect clone error → cleanup")
self.logger.info("[LFNG] connect clone error → cleanup")
self.sbcli_utils.delete_lvol(lvol_name=clone_name, max_attempt=20, skip_error=True)
sleep_n_sec(3)
del self.clone_mount_details[clone_name]
@@ -431,7 +431,6 @@ def _perform_outage(self):
return outage_type

def restart_nodes_after_failover(self, outage_type):
node_details = self.sbcli_utils.get_storage_node_details(self.current_outage_node)

self.logger.info(f"[LFNG] Recover outage={outage_type} node={self.current_outage_node}")

3 changes: 2 additions & 1 deletion e2e/utils/ssh_utils.py
@@ -2891,7 +2891,8 @@ def stop_log_monitor(self):
print("K8s log monitor thread stopped.")

def _rid(n=6):
import string, random
import string
import random
letters = string.ascii_uppercase
digits = string.digits
return random.choice(letters) + ''.join(random.choices(letters + digits, k=n-1))
1 change: 1 addition & 0 deletions requirements.txt
@@ -5,6 +5,7 @@ docker
psutil
py-cpuinfo
pytest
pytest-mock
mock
flask
kubernetes
10 changes: 1 addition & 9 deletions simplyblock_core/constants.py
@@ -49,6 +49,7 @@ def get_config_var(name, default=None):
SSD_VENDOR_WHITE_LIST = ["1d0f:cd01", "1d0f:cd00"]
CACHED_LVOL_STAT_COLLECTOR_INTERVAL_SEC = 5
DEV_DISCOVERY_INTERVAL_SEC = 60
LVOL_SCHEDULER_INTERVAL_SEC = 60*15

PMEM_DIR = '/tmp/pmem'

@@ -60,15 +61,6 @@ def get_config_var(name, default=None):

CLUSTER_NQN = "nqn.2023-02.io.simplyblock"

weights = {
"lvol": 100,
# "cpu": 10,
# "r_io": 10,
# "w_io": 10,
# "r_b": 10,
# "w_b": 10
}


HEALTH_CHECK_INTERVAL_SEC = 30

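
Note: the new LVOL_SCHEDULER_INTERVAL_SEC works out to 15 minutes. Its consumer is not part of this diff; the following is only a minimal sketch of how such an interval constant is usually consumed, assuming a hypothetical run_scheduler_pass() and mirroring the while True / time.sleep pattern of the capacity collector further down.

import time

LVOL_SCHEDULER_INTERVAL_SEC = 60 * 15  # 15 minutes, as added in constants.py


def run_scheduler_pass():
    # hypothetical placeholder for one lvol-scheduler iteration
    pass


while True:
    run_scheduler_pass()
    time.sleep(LVOL_SCHEDULER_INTERVAL_SEC)
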
134 changes: 78 additions & 56 deletions simplyblock_core/controllers/lvol_controller.py
@@ -134,49 +134,79 @@ def validate_add_lvol_func(name, size, host_id_or_name, pool_id_or_name,

def _get_next_3_nodes(cluster_id, lvol_size=0):
db_controller = DBController()
snodes = db_controller.get_storage_nodes_by_cluster_id(cluster_id)
online_nodes = []
node_stats = {}
for node in snodes:
node_stats: dict = {}
nodes_below_25 = []
nodes_between_25_75 = []
nodes_above_75 = []

for node in db_controller.get_storage_nodes_by_cluster_id(cluster_id):
if node.is_secondary_node: # pass
continue

if node.status == node.STATUS_ONLINE:

lvol_count = len(db_controller.get_lvols_by_node_id(node.get_id()))
if lvol_count >= node.max_lvol:
continue

# Validate Eligible nodes for adding lvol
# snode_api = SNodeClient(node.api_endpoint)
# result, _ = snode_api.info()
# memory_free = result["memory_details"]["free"]
# huge_free = result["memory_details"]["huge_free"]
# total_node_capacity = db_controller.get_snode_size(node.get_id())
# error = utils.validate_add_lvol_or_snap_on_node(memory_free, huge_free, node.max_lvol, lvol_size, total_node_capacity, len(node.lvols))
# if error:
# logger.warning(error)
# continue
#
online_nodes.append(node)
# node_stat_list = db_controller.get_node_stats(node, limit=1000)
# combined_record = utils.sum_records(node_stat_list)
node_st = {
"lvol": lvol_count+1,
# "cpu": 1 + (node.cpu * node.cpu_hz),
# "r_io": combined_record.read_io_ps,
# "w_io": combined_record.write_io_ps,
# "r_b": combined_record.read_bytes_ps,
# "w_b": combined_record.write_bytes_ps
}

node_stats[node.get_id()] = node_st

if len(online_nodes) <= 1:
return online_nodes
if node.node_size_util < 25:
nodes_below_25.append(node)
elif node.node_size_util >= 75:
nodes_above_75.append(node)
else:
nodes_between_25_75.append(node)

logger.info(f"nodes_below_25: {len(nodes_below_25)}")
logger.info(f"nodes_between_25_75: {len(nodes_between_25_75)}")
logger.info(f"nodes_above_75: {len(nodes_above_75)}")

if len(nodes_below_25+nodes_between_25_75+nodes_above_75) <= 1:
return nodes_below_25+nodes_between_25_75+nodes_above_75

if len(nodes_below_25) > len(nodes_between_25_75) and len(nodes_below_25) > len(nodes_above_75):
"""
if the summed size of lvols (+ snapshots, including namespace lvols) per node is < [0.25 * max-size] AND
the number of lvols (snapshots don't count extra and namespaces on the same subsystem count only once) is < [0.25 * max-lvol]

--> simply round-robin schedule lvols (no randomization, no weights)
BUT: if storage utilization is > 25% on one or more nodes, those nodes are excluded from the round robin

"""

# stable sort: nodes with the fewest lvols come first
return sorted(nodes_below_25, key=lambda n: n.lvol_count_util)

elif len(nodes_between_25_75) > len(nodes_above_75):
"""
Once all nodes have > 25% storage utilization, we weight
(relative number of lvols compared to the total number + relative utilization compared to the total utilization)
--> and based on the weight pick a node at random
"""
for node in nodes_between_25_75:
node_stats[node.get_id()] = {
"lvol_count_util": node.lvol_count_util,
"node_size_util": node.node_size_util}

elif len(nodes_below_25) < len(nodes_above_75) and len(nodes_between_25_75) < len(nodes_above_75):
"""
Once a node has > 75% storage utilization, it is excluded from receiving new lvols
(unless all nodes exceed this limit, then it is weighted again)
"""
for node in nodes_above_75:
node_stats[node.get_id()] = {
"lvol_count_util": node.lvol_count_util,
"node_size_util": node.node_size_util}


keys_weights = {
"lvol_count_util": 50,
"node_size_util": 50}
cluster_stats = utils.dict_agg([node_stats[k] for k in node_stats])

nodes_weight = utils.get_weights(node_stats, cluster_stats)
nodes_weight = utils.get_weights(node_stats, cluster_stats, keys_weights)

node_start_end = {}
n_start = 0
@@ -191,19 +221,14 @@ def _get_next_3_nodes(cluster_id, lvol_size=0):
for node_id in node_start_end:
node_start_end[node_id]['%'] = int(node_start_end[node_id]['weight'] * 100 / n_start)

############# log
print("Node stats")
utils.print_table_dict({**node_stats, "Cluster": cluster_stats})
print("Node weights")
utils.print_table_dict({**nodes_weight, "weights": {"lvol": n_start, "total": n_start}})
print("Node selection range")
utils.print_table_dict(node_start_end)
#############
logger.info(f"Node stats: \n {utils.print_table_dict({**node_stats, 'Cluster': cluster_stats})}")
logger.info(f"Node weights: \n {utils.print_table_dict({**nodes_weight})}")
logger.info(f"Node selection range: \n {utils.print_table_dict(node_start_end)}")

selected_node_ids: List[str] = []
while len(selected_node_ids) < min(len(node_stats), 3):
r_index = random.randint(0, n_start)
print(f"Random is {r_index}/{n_start}")
logger.info(f"Random is {r_index}/{n_start}")
for node_id in node_start_end:
if node_start_end[node_id]['start'] <= r_index <= node_start_end[node_id]['end']:
if node_id not in selected_node_ids:
@@ -224,14 +249,11 @@ def _get_next_3_nodes(cluster_id, lvol_size=0):
break

ret = []
if selected_node_ids:
for node_id in selected_node_ids:
node = db_controller.get_storage_node_by_id(node_id)
print(f"Selected node: {node_id}, {node.hostname}")
ret.append(node)
return ret
else:
return online_nodes
for node_id in selected_node_ids:
node = db_controller.get_storage_node_by_id(node_id)
logger.info(f"Selected node: {node_id}, {node.hostname}")
ret.append(node)
return ret

def is_hex(s: str) -> bool:
"""
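
Note: the selection policy described in the docstrings above can be summarized in a standalone Python sketch. This is an illustration, not the code in this PR: Node stands in for the StorageNode model, and because utils.dict_agg / utils.get_weights are not shown here, the sketch re-implements the 50/50 weighting inline and assumes that less-utilized nodes receive proportionally higher weight.

import random
from dataclasses import dataclass
from typing import List


@dataclass
class Node:
    node_id: str
    lvol_count_util: int  # % of max_lvol already consumed (unique subsystems)
    node_size_util: int   # % of max_prov already consumed


def pick_candidate_nodes(nodes: List[Node], count: int = 3) -> List[Node]:
    below_25 = [n for n in nodes if n.node_size_util < 25]
    above_75 = [n for n in nodes if n.node_size_util >= 75]
    between = [n for n in nodes if 25 <= n.node_size_util < 75]

    if len(nodes) <= 1:
        return nodes

    # Mostly-empty cluster: round-robin-like placement, fewest lvols first.
    if len(below_25) > len(between) and len(below_25) > len(above_75):
        return sorted(below_25, key=lambda n: n.lvol_count_util)

    # Otherwise draw up to `count` nodes at random, weighted 50/50 by lvol-count
    # and size utilization; nodes above 75% are only considered once every node
    # exceeds that limit. The inverse weighting (more headroom -> higher weight)
    # is an assumption, since utils.get_weights is not shown in this diff.
    candidates = between if len(between) > len(above_75) else above_75
    weights = {
        n.node_id: max(50 * (100 - n.lvol_count_util) + 50 * (100 - n.node_size_util), 1)
        for n in candidates
    }

    selected: List[Node] = []
    pool = {n.node_id: n for n in candidates}
    while pool and len(selected) < count:
        ids = list(pool)
        picked = random.choices(ids, weights=[weights[i] for i in ids], k=1)[0]
        selected.append(pool.pop(picked))
    return selected

The weighted random draw spreads new lvols across candidates instead of piling every placement onto the single emptiest node; the real implementation additionally logs the per-node stats, weights and selection ranges via utils.print_table_dict.
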
2 changes: 1 addition & 1 deletion simplyblock_core/env_var
@@ -1,6 +1,6 @@
SIMPLY_BLOCK_COMMAND_NAME=sbcli-dev
SIMPLY_BLOCK_VERSION=19.2.27

SIMPLY_BLOCK_DOCKER_IMAGE=public.ecr.aws/simply-block/simplyblock:main
SIMPLY_BLOCK_DOCKER_IMAGE=public.ecr.aws/simply-block/simplyblock:main-lvol-scheduler
SIMPLY_BLOCK_SPDK_ULTRA_IMAGE=public.ecr.aws/simply-block/ultra:main-latest

7 changes: 7 additions & 0 deletions simplyblock_core/models/storage_node.py
@@ -9,6 +9,7 @@
from simplyblock_core.models.iface import IFace
from simplyblock_core.models.nvme_device import NVMeDevice, JMDevice
from simplyblock_core.rpc_client import RPCClient, RPCException
from simplyblock_core.snode_client import SNodeClient

logger = utils.get_logger(__name__)

@@ -102,6 +103,8 @@ class StorageNode(BaseNodeObject):
hublvol: HubLVol = None # type: ignore[assignment]
active_tcp: bool = True
active_rdma: bool = False
lvol_count_util: int = 0
node_size_util: int = 0

def rpc_client(self, **kwargs):
"""Return rpc client to this node
@@ -110,6 +113,10 @@ def rpc_client(self, **kwargs):
self.mgmt_ip, self.rpc_port,
self.rpc_username, self.rpc_password, **kwargs)

def snode_api(self, **kwargs) -> SNodeClient:
"""Return storage node API client to this node"""
return SNodeClient(f"{self.mgmt_ip}:5000", timeout=10, retry=2)

def expose_bdev(self, nqn, bdev_name, model_number, uuid, nguid, port):
rpc_client = self.rpc_client()

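
Note: the two new fields are integer percentages maintained by the capacity/stats collector (next file), and snode_api() wraps SNodeClient against the hardcoded agent port 5000. A minimal usage sketch; the import path and the info() call are assumptions based on the commented-out validation code in lvol_controller.py, not APIs confirmed by this diff.

from simplyblock_core.db_controller import DBController


def show_node_utilization(cluster_id: str) -> None:
    db = DBController()
    for node in db.get_storage_nodes_by_cluster_id(cluster_id):
        # integer percentages written by add_lvol_scheduler_values()
        print(node.get_id(), node.lvol_count_util, node.node_size_util)

        client = node.snode_api()   # SNodeClient on <mgmt_ip>:5000
        result, _ = client.info()   # assumed API, as in the commented-out validation code
        print(result["memory_details"]["huge_free"])
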
39 changes: 35 additions & 4 deletions simplyblock_core/services/capacity_and_stats_collector.py
@@ -7,9 +7,9 @@
from simplyblock_core.rpc_client import RPCClient
from simplyblock_core.models.stats import DeviceStatObject, NodeStatObject, ClusterStatObject

logger = utils.get_logger(__name__)


logger = utils.get_logger(__name__)
db = db_controller.DBController()
last_object_record: dict[str, DeviceStatObject] = {}


@@ -164,9 +164,36 @@ def add_cluster_stats(cl, records):
return stat_obj


def add_lvol_scheduler_values(node: StorageNode):
node_used_size = 0
lvols_subsystems = []
for lvol in db.get_lvols_by_node_id(node.get_id()):
records = db.get_lvol_stats(lvol, 1)
if records:
node_used_size += records[0].size_used
if lvol.nqn not in lvols_subsystems:
lvols_subsystems.append(lvol.nqn)
for snap in db.get_snapshots_by_node_id(node.get_id()):
node_used_size += snap.used_size

lvol_count_util = int(len(lvols_subsystems) / node.max_lvol * 100)
node_size_util = int(node_used_size / node.max_prov * 100)

if lvol_count_util <= 0 or node_size_util <= 0:
return False
db_node = db.get_storage_node_by_id(node.get_id())
db_node.lvol_count_util = lvol_count_util
db_node.node_size_util = node_size_util
db_node.write_to_db()

# Once a node has > 90% storage utilization, the largest lvol will be live-migrated to another node
# (the one with the smallest storage utilization)
if db_node.node_size_util > 90:
pass # todo: migrate lvols to free space

return True


# get DB controller
db = db_controller.DBController()

logger.info("Starting capacity and stats collector...")
while True:
@@ -214,6 +241,10 @@ def add_cluster_stats(cl, records):
node_record = add_node_stats(node, devices_records)
node_records.append(node_record)

ret = add_lvol_scheduler_values(node)
if not ret:
logger.warning("Failed to add lvol scheduler values")

add_cluster_stats(cl, node_records)

time.sleep(constants.DEV_STAT_COLLECTOR_INTERVAL_SEC)
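
Note: the two percentages written back to the StorageNode record reduce to two ratios: unique lvol subsystems over max_lvol, and used bytes (lvol stats plus snapshot usage) over max_prov. A self-contained sketch of the arithmetic in add_lvol_scheduler_values(); the function and example numbers here are illustrative, not part of the PR.

def utilization_percentages(unique_subsystems: int, max_lvol: int,
                            used_bytes: int, max_prov_bytes: int) -> tuple[int, int]:
    """Mirror the arithmetic in add_lvol_scheduler_values()."""
    lvol_count_util = int(unique_subsystems / max_lvol * 100)
    node_size_util = int(used_bytes / max_prov_bytes * 100)
    return lvol_count_util, node_size_util


# e.g. 12 subsystems out of max_lvol=100, 1.5 TiB used of 4 TiB provisioned
print(utilization_percentages(12, 100, 1536 * 2**30, 4 * 2**40))  # -> (12, 37)
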