Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
165 changes: 117 additions & 48 deletions controllers/clusterpolicy_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,10 +83,8 @@ type ClusterPolicyReconciler struct {

// Reconcile is part of the main kubernetes reconciliation loop which aims to
// move the current state of the cluster closer to the desired state.
// TODO(user): Modify the Reconcile function to compare the state specified by
// the ClusterPolicy object against the actual cluster state, and then
// perform operations to make the cluster state reflect the state specified by
// the user.
// It compares the state specified by the ClusterPolicy object against the
// actual cluster state and performs operations to reconcile them.
//
// For more details, check Reconcile and its Result here:
// - https://pkg.go.dev/sigs.k8s.io/controller-runtime@v0.7.0/pkg/reconcile
Expand All @@ -112,8 +110,20 @@ func (r *ClusterPolicyReconciler) Reconcile(ctx context.Context, req ctrl.Reques
return reconcile.Result{}, err
}

// TODO: Handle deletion of the main ClusterPolicy and cycle to the next one.
// We already have a main ClusterPolicy
// Handle deletion of the main ClusterPolicy and cycle to the next one.
// If the current singleton is being deleted, reset the controller so that
// the next available ClusterPolicy can be elected as the new singleton.
if clusterPolicyCtrl.singleton != nil && clusterPolicyCtrl.singleton.Name == instance.Name &&
instance.DeletionTimestamp != nil {
r.Log.Info("Main ClusterPolicy is being deleted, resetting controller state",
"name", instance.Name)
clusterPolicyCtrl = ClusterPolicyController{}
// Requeue after a short interval so that the next available ClusterPolicy
// (if any) can be picked up once the current one has been fully removed.
return ctrl.Result{RequeueAfter: time.Second * 5}, nil
}

// We already have a main ClusterPolicy and this is a duplicate, mark it as ignored.
if clusterPolicyCtrl.singleton != nil && clusterPolicyCtrl.singleton.Name != instance.Name {
instance.SetStatus(gpuv1.Ignored, clusterPolicyCtrl.operatorNamespace)
// do not change `clusterPolicyCtrl.operatorMetrics.reconciliationStatus` here,
Expand Down Expand Up @@ -243,38 +253,34 @@ func updateCRState(ctx context.Context, r *ClusterPolicyReconciler, namespacedNa
}
}

func addWatchNewGPUNode(r *ClusterPolicyReconciler, c controller.Controller, mgr ctrl.Manager) error {
// Define a mapping from the Node object in the event to one or more
// ClusterPolicy objects to Reconcile
mapFn := func(ctx context.Context, n *corev1.Node) []reconcile.Request {
// find all the ClusterPolicy to trigger their reconciliation
opts := []client.ListOption{} // Namespace = "" to list across all namespaces.
// buildNodeMapFn returns a mapping function that, for any Node event, enqueues
// a reconcile request for every ClusterPolicy found in the cluster.
func buildNodeMapFn(r *ClusterPolicyReconciler) handler.TypedMapFunc[*corev1.Node, reconcile.Request] {
return func(ctx context.Context, n *corev1.Node) []reconcile.Request {
opts := []client.ListOption{}
list := &gpuv1.ClusterPolicyList{}

err := r.List(ctx, list, opts...)
if err != nil {
if err := r.List(ctx, list, opts...); err != nil {
r.Log.Error(err, "Unable to list ClusterPolicies")
return []reconcile.Request{}
}

cpToRec := []reconcile.Request{}

for _, cp := range list.Items {
cpToRec = append(cpToRec, reconcile.Request{NamespacedName: types.NamespacedName{
Name: cp.GetName(),
Namespace: cp.GetNamespace(),
}})
}
r.Log.Info("Reconciliate ClusterPolicies after node label update", "nb", len(cpToRec))

return cpToRec
}
}

p := predicate.TypedFuncs[*corev1.Node]{
// buildNodePredicate returns a predicate that filters Node events to only those
// that are relevant to the GPU Operator (new GPU nodes, label changes, RHCOS deletions).
func buildNodePredicate() predicate.TypedFuncs[*corev1.Node] {
return predicate.TypedFuncs[*corev1.Node]{
CreateFunc: func(e event.TypedCreateEvent[*corev1.Node]) bool {
labels := e.Object.GetLabels()

return hasGPULabels(labels)
return hasGPULabels(e.Object.GetLabels())
},
UpdateFunc: func(e event.TypedUpdateEvent[*corev1.Node]) bool {
newLabels := e.ObjectNew.GetLabels()
Expand All @@ -290,9 +296,7 @@ func addWatchNewGPUNode(r *ClusterPolicyReconciler, c controller.Controller, mgr
newGPUWorkloadConfig, _ := getWorkloadConfig(newLabels, true)
gpuWorkloadConfigLabelChanged := oldGPUWorkloadConfig != newGPUWorkloadConfig

oldOSTreeLabel := oldLabels[nfdOSTreeVersionLabelKey]
newOSTreeLabel := newLabels[nfdOSTreeVersionLabelKey]
osTreeLabelChanged := oldOSTreeLabel != newOSTreeLabel
osTreeLabelChanged := oldLabels[nfdOSTreeVersionLabelKey] != newLabels[nfdOSTreeVersionLabelKey]

needsUpdate := gpuCommonLabelMissing ||
gpuCommonLabelOutdated ||
Expand All @@ -302,43 +306,61 @@ func addWatchNewGPUNode(r *ClusterPolicyReconciler, c controller.Controller, mgr
osTreeLabelChanged

if needsUpdate {
r.Log.Info("Node needs an update",
"name", nodeName,
"gpuCommonLabelMissing", gpuCommonLabelMissing,
"gpuCommonLabelOutdated", gpuCommonLabelOutdated,
"migManagerLabelMissing", migManagerLabelMissing,
"commonOperandsLabelChanged", commonOperandsLabelChanged,
"gpuWorkloadConfigLabelChanged", gpuWorkloadConfigLabelChanged,
"osTreeLabelChanged", osTreeLabelChanged,
)
// No logger is in scope inside this predicate; callers that need the log wrap UpdateFunc.
_ = nodeName
}
return needsUpdate
},
DeleteFunc: func(e event.TypedDeleteEvent[*corev1.Node]) bool {
// if an RHCOS GPU node is deleted, trigger a
// reconciliation to ensure that there is no dangling
// OpenShift Driver-Toolkit (RHCOS version-specific)
// DaemonSet.
// NB: we cannot know here if the DriverToolkit is
// enabled.

labels := e.Object.GetLabels()

_, hasOSTreeLabel := labels[nfdOSTreeVersionLabelKey]

return hasGPULabels(labels) && hasOSTreeLabel
},
}
}

func addWatchNewGPUNode(r *ClusterPolicyReconciler, c controller.Controller, mgr ctrl.Manager) error {
mapFn := buildNodeMapFn(r)
p := buildNodePredicate()

// Wrap UpdateFunc to emit the "Node needs an update" log when relevant.
innerUpdate := p.UpdateFunc
p.UpdateFunc = func(e event.TypedUpdateEvent[*corev1.Node]) bool {
needsUpdate := innerUpdate(e)
if needsUpdate {
newLabels := e.ObjectNew.GetLabels()
oldLabels := e.ObjectOld.GetLabels()
nodeName := e.ObjectNew.GetName()

gpuCommonLabelMissing := hasGPULabels(newLabels) && !hasCommonGPULabel(newLabels)
gpuCommonLabelOutdated := !hasGPULabels(newLabels) && hasCommonGPULabel(newLabels)
migManagerLabelMissing := hasMIGCapableGPU(newLabels) && !hasMIGManagerLabel(newLabels)
commonOperandsLabelChanged := hasOperandsDisabled(oldLabels) != hasOperandsDisabled(newLabels)
oldGPUWorkloadConfig, _ := getWorkloadConfig(oldLabels, true)
newGPUWorkloadConfig, _ := getWorkloadConfig(newLabels, true)
gpuWorkloadConfigLabelChanged := oldGPUWorkloadConfig != newGPUWorkloadConfig
osTreeLabelChanged := oldLabels[nfdOSTreeVersionLabelKey] != newLabels[nfdOSTreeVersionLabelKey]

r.Log.Info("Node needs an update",
"name", nodeName,
"gpuCommonLabelMissing", gpuCommonLabelMissing,
"gpuCommonLabelOutdated", gpuCommonLabelOutdated,
"migManagerLabelMissing", migManagerLabelMissing,
"commonOperandsLabelChanged", commonOperandsLabelChanged,
"gpuWorkloadConfigLabelChanged", gpuWorkloadConfigLabelChanged,
"osTreeLabelChanged", osTreeLabelChanged,
)
}
return needsUpdate
}

err := c.Watch(
return c.Watch(
source.Kind(mgr.GetCache(),
&corev1.Node{},
handler.TypedEnqueueRequestsFromMapFunc[*corev1.Node](mapFn),
p,
),
)

return err
}

// SetupWithManager sets up the controller with the Manager.
Expand Down Expand Up @@ -371,8 +393,7 @@ func (r *ClusterPolicyReconciler) SetupWithManager(ctx context.Context, mgr ctrl
return err
}

// TODO(user): Modify this to be the types you create that are owned by the primary resource
// Watch for changes to secondary resource Daemonsets and requeue the owner ClusterPolicy
// Watch for changes to secondary resource DaemonSets owned by ClusterPolicy and requeue the owner.
err = c.Watch(
source.Kind(mgr.GetCache(),
&appsv1.DaemonSet{},
Expand All @@ -384,6 +405,54 @@ func (r *ClusterPolicyReconciler) SetupWithManager(ctx context.Context, mgr ctrl
return err
}

// Watch for changes to secondary resource Deployments owned by ClusterPolicy and requeue the owner.
err = c.Watch(
source.Kind(mgr.GetCache(),
&appsv1.Deployment{},
handler.TypedEnqueueRequestForOwner[*appsv1.Deployment](mgr.GetScheme(), mgr.GetRESTMapper(), &gpuv1.ClusterPolicy{},
handler.OnlyControllerOwner()),
),
)
if err != nil {
return err
}

// Watch for changes to secondary resource ConfigMaps owned by ClusterPolicy and requeue the owner.
err = c.Watch(
source.Kind(mgr.GetCache(),
&corev1.ConfigMap{},
handler.TypedEnqueueRequestForOwner[*corev1.ConfigMap](mgr.GetScheme(), mgr.GetRESTMapper(), &gpuv1.ClusterPolicy{},
handler.OnlyControllerOwner()),
),
)
if err != nil {
return err
}

// Watch for changes to secondary resource Services owned by ClusterPolicy and requeue the owner.
err = c.Watch(
source.Kind(mgr.GetCache(),
&corev1.Service{},
handler.TypedEnqueueRequestForOwner[*corev1.Service](mgr.GetScheme(), mgr.GetRESTMapper(), &gpuv1.ClusterPolicy{},
handler.OnlyControllerOwner()),
),
)
if err != nil {
return err
}

// Watch for changes to secondary resource ServiceAccounts owned by ClusterPolicy and requeue the owner.
err = c.Watch(
source.Kind(mgr.GetCache(),
&corev1.ServiceAccount{},
handler.TypedEnqueueRequestForOwner[*corev1.ServiceAccount](mgr.GetScheme(), mgr.GetRESTMapper(), &gpuv1.ClusterPolicy{},
handler.OnlyControllerOwner()),
),
)
if err != nil {
return err
}

// Add an index key which allows our reconciler to quickly look up DaemonSets owned by it.
//
// (cdesiniotis) Ideally we could duplicate this index for all the k8s objects
Expand Down