Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
82 changes: 69 additions & 13 deletions agents/cluster/mechanisms/fluidity.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,20 +14,20 @@
# #
# #

import time
import asyncio
import json
import traceback
from asyncio import CancelledError
from dataclasses import dataclass, field
from typing import Dict
from pydantic import ValidationError

import mlstelemetry
import fluidity.controller as fluidity_controller

from fluidity.plan_payload import FluidityPlanPayload
from fluidity.internal_payload import FluidityEvent

# from mlsysops.logger_util import logger
from mlsysops import MessageEvents
import mlsysops
from mlsysops.logger_util import logger
Expand All @@ -43,8 +43,10 @@ class FluidityMechanism:
internal_queue_inbound = None
internal_queue_outbound = None
state = None
relocation_state = {}
fluidity_proxy_plans = {}

mls_client = mlstelemetry.MLSTelemetry("cluster_agent", "fluidity_mechanism")

def __init__(self, mlsysops_inbound_queue=None, mlsysops_outbound_queue=None, agent_state=None):
self.inbound_queue = mlsysops_inbound_queue
self.outbound_queue = mlsysops_outbound_queue
Expand Down Expand Up @@ -108,12 +110,6 @@ async def internal_message_listener(self):
# Listen to fluidity messages
message = await self.internal_queue_inbound.get()

# Log or save message for debugging
with open("fluidity_dump.json", "w") as file:
file.write(json.dumps(message, skipkeys=True, indent=4, default=str, ensure_ascii=False, sort_keys=True,
separators=(',', ': ')))
file.write(",\n") # Ensure a newline is added after the content

event = message.get("event")
if not event:
continue
Expand Down Expand Up @@ -258,7 +254,11 @@ async def internal_message_listener(self):
# Handle application removal: drop the app from the tracked state
payload = message.get("payload", {})
app_name = payload.get("name")
del self.state["applications"][app_name]

if app_name in self.state["applications"]:
del self.state["applications"][app_name]
else:
logger.warning(f"App {app_name} not in state dictionary. Ignoring.")

elif event == MessageEvents.POD_MODIFIED.value:
# Handle pod modified event
Expand All @@ -273,8 +273,10 @@ async def internal_message_listener(self):
app_name = pod_labels.get("mlsysops.eu/app")
component_name = pod_labels.get("mlsysops.eu/component")
component_uid = pod_labels.get("mlsysops.eu/componentUID")
plan_uid = pod_labels.get("mlsysops.eu/planUID", None)

if app_name and app_name in self.state["applications"] and component_name:
logger.debug(f"Pod modified event caught for comp {component_name} of app {app_name}")
# Update the application component state
app = self.state["applications"][app_name]
components = app.get("components", {})
Expand All @@ -285,9 +287,35 @@ async def internal_message_listener(self):
"node_placed": node_name
})

# Test log
logger.test(
f"|3| Fluidity mechanism planuid:{pod_labels.get('mlsysops.eu/planUID','-')} pod modification status:Success")
if plan_uid and plan_uid in self.relocation_state and component_name in self.relocation_state[plan_uid]:
logger.debug(f"Pod name {pod_metadata['name']}")
logger.debug(f"Pod state {pod_state}")
logger.debug(f"Host name {node_name}")

start_timestamp = self.relocation_state[plan_uid][component_name]['start']
# Get timestamp of modification event of the new pod
curr_timestamp = time.perf_counter()
diff = curr_timestamp - start_timestamp

# If the new Pod is deployed successfully on the new host, record the delay
if pod_state == 'Running' and node_name == self.relocation_state[plan_uid][component_name]['dst']:
logger.info(f"Relocation delay is {diff}")
self.mls_client.pushMetric("relocation_delay", "gauge", diff, {"comp_name":component_name})
logger.debug(f"Removing {component_name} from relocation state of plan {plan_uid}")

# Remove entry from dictionary
del self.relocation_state[plan_uid][component_name]
if self.relocation_state[plan_uid] == {}:
logger.debug(f"relocation_state for plan {plan_uid} is empty.")
del self.relocation_state[plan_uid]
# If the Pod is in Pending state, we measure the delay until the call to kubernetes is done
elif pod_state == 'Pending' and node_name == self.relocation_state[plan_uid][component_name]['dst']:
logger.debug(f"New Pod start delay is {diff}")
self.mls_client.pushMetric("deployment_delay", "gauge", diff, {"comp_name":component_name})

# Test log
logger.test(
f"|3| Fluidity mechanism planuid:{pod_labels.get('mlsysops.eu/planUID','-')} pod modification status:Success")

# Update the node's state
if node_name:
Expand Down Expand Up @@ -361,6 +389,34 @@ async def apply(plan):
try:
await fluidity_mechanism_instance.internal_queue_outbound.put(msg)
logger.test(f"|1| Fluidity mechanism forwarded planuid:{plan['plan_uid']} to Fluidity status:True")

logger.debug(f"In apply, plan_uid {plan['plan_uid']}")
logger.debug(f"Plan {plan}")

for comp in plan['deployment_plan']:
if comp == 'initial_plan':
continue
# If the plan contains a 'move' action for a given component,
# store the relocation info (plan_uid, start timestamp of the move, component name, src/dst nodes)
for action_entry in plan['deployment_plan'][comp]:
if action_entry['action'] == 'move':
if plan['plan_uid'] not in fluidity_mechanism_instance.relocation_state:
logger.debug(f"Creating entry for plan_uid {plan['plan_uid']}")
fluidity_mechanism_instance.relocation_state[plan['plan_uid']] = {}

curr_timestamp = time.perf_counter()

src = action_entry['src_host']
dst = action_entry['target_host']

logger.debug(f"Found move action for component {comp} from {src} to {dst}")
fluidity_mechanism_instance.relocation_state[plan['plan_uid']][comp] = {
'start': curr_timestamp,
'src': src,
'dst': dst
}

logger.debug(f"Current timestamp {curr_timestamp}")

except Exception as e:
logger.debug("Error in sending message to fluidity")
Expand Down