Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions cmd/atecontroller/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,14 @@ func main() {
os.Exit(1)
}

if err = (&controllers.AteletNodeReconciler{
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
}).SetupWithManager(mgr); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "AteletNode")
os.Exit(1)
}

//+kubebuilder:scaffold:builder

if err := mgr.AddHealthzCheck("healthz", healthz.Ping); err != nil {
Expand Down
202 changes: 202 additions & 0 deletions internal/controllers/ateletnode_controller.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,202 @@
// Copyright 2026 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package controllers

import (
"context"
"fmt"

corev1 "k8s.io/api/core/v1"
k8errors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
corev1ac "k8s.io/client-go/applyconfigurations/core/v1"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/predicate"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
)

const (
// AteletNodeLabel is the substrate-owned label that gates atelet DS
// scheduling. Present on a Node iff at least one ateom pod is
// currently scheduled to that Node.
AteletNodeLabel = "ate.dev/atelet"
AteletNodeLabelValue = "true"

// AteletNodeClaimAnnoPrefix is the prefix for per-pool claim
// annotations on Nodes. Full key: ate.dev/claim.<workerpool-uid>.
AteletNodeClaimAnnoPrefix = "ate.dev/claim."

// AteletNodeFieldOwner is the SSA field owner for substrate-managed
// Node fields (the label and claim annotations).
AteletNodeFieldOwner = "atelet-node-controller"

// PodNodeNameIndex is the field-indexer key for Pod.Spec.NodeName.
// Required for List(client.MatchingFields{PodNodeNameIndex: ...}).
PodNodeNameIndex = "spec.nodeName"
)

// claimAnnotationKey returns the Node annotation key that records a claim by
// the WorkerPool with the given UID. It is the single source of truth for the
// claim-annotation format, shared by AteletNodeReconciler (which writes claims)
// and the WorkerPool finalizer (which waits on them).
func claimAnnotationKey(workerPoolUID string) string {
return AteletNodeClaimAnnoPrefix + workerPoolUID
}

// AteletNodeReconciler reconciles ateom Pod events into Node labels and
// claim annotations so the atelet DaemonSet schedules only on Nodes
// currently hosting ateom workloads.
type AteletNodeReconciler struct {
client.Client
Scheme *runtime.Scheme
}

//+kubebuilder:rbac:groups="",resources=nodes,verbs=get;list;watch;patch
//+kubebuilder:rbac:groups="",resources=pods,verbs=get;list;watch

// Reconcile is keyed by Node name (we map Pod events to Node names in
// SetupWithManager). It converges the Node's substrate-owned label and
// claim annotations to match the set of WorkerPool UIDs currently
// scheduled to it.
func (r *AteletNodeReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
return r.reconcileNode(ctx, req.Name)
}

func (r *AteletNodeReconciler) reconcileNode(ctx context.Context, nodeName string) (ctrl.Result, error) {
logger := log.FromContext(ctx).WithValues("node", nodeName)

// Existence check only (the value is unused): if the node is gone, do
// nothing. The SSA Apply below would otherwise resurrect a deleted Node,
// since server-side apply upserts.
if err := r.Get(ctx, types.NamespacedName{Name: nodeName}, &corev1.Node{}); err != nil {
if k8errors.IsNotFound(err) {
return ctrl.Result{}, nil
}
return ctrl.Result{}, fmt.Errorf("get node %q: %w", nodeName, err)
}

// A pod that is Terminating still has spec.nodeName set and appears
// here until its object is actually deleted. That is intentional: the
// claim (and thus atelet) must outlive a draining ateom pod, so the
// claim is released only once the pod object is gone.
podList := &corev1.PodList{}
if err := r.List(ctx, podList, client.MatchingFields{PodNodeNameIndex: nodeName}); err != nil {
return ctrl.Result{}, fmt.Errorf("list pods on node %q: %w", nodeName, err)
}

poolUIDs := map[string]struct{}{}
for i := range podList.Items {
pod := &podList.Items[i]
// Defensive: the spec.nodeName field index already filters to this
// node, so this should always be true.
if pod.Spec.NodeName != nodeName {
continue
}
// Skip pods without a worker-pool UID label (e.g. atelet's own DS
// pods). The field index is not label-filtered, so non-ateom pods
// on this node can appear here.
uid, ok := pod.Labels[WorkerPoolUIDLabelKey]
if !ok || uid == "" {
continue
}
poolUIDs[uid] = struct{}{}
}

logger.V(1).Info("reconciling node claims", "pool_count", len(poolUIDs))
return ctrl.Result{}, r.applyNodeClaims(ctx, nodeName, poolUIDs)
}

// applyNodeClaims SSA-patches the Node so substrate-owned fields (the
// ate.dev/atelet label and ate.dev/claim.<uid> annotations) match the
// given set of WorkerPool UIDs. Previously-owned fields not in the set
// are removed via SSA field-ownership semantics.
func (r *AteletNodeReconciler) applyNodeClaims(ctx context.Context, nodeName string, poolUIDs map[string]struct{}) error {
nodeAC := corev1ac.Node(nodeName)

labels := map[string]string{}
if len(poolUIDs) > 0 {
labels[AteletNodeLabel] = AteletNodeLabelValue
}
nodeAC.Labels = labels

annotations := map[string]string{}
for uid := range poolUIDs {
annotations[claimAnnotationKey(uid)] = ""
}
nodeAC.Annotations = annotations

if err := r.Apply(ctx, nodeAC, client.FieldOwner(AteletNodeFieldOwner), client.ForceOwnership); err != nil {
return fmt.Errorf("apply node %q: %w", nodeName, err)
}
return nil
}

// podToNode maps a Pod event to a reconcile request for the Pod's
// assigned Node. Returns no requests for unscheduled Pods.
func (r *AteletNodeReconciler) podToNode(_ context.Context, obj client.Object) []reconcile.Request {
pod, ok := obj.(*corev1.Pod)
if !ok {
return nil
}
if pod.Spec.NodeName == "" {
return nil
}
return []reconcile.Request{{NamespacedName: types.NamespacedName{Name: pod.Spec.NodeName}}}
}

// ateomPodPredicate returns true for pods that look like ateom workloads
// (those carrying the WorkerPoolUIDLabelKey). Atelet's own DS pods and other
// system pods are filtered out. Gating on the UID label — the key
// reconcileNode actually consumes for claim refcounting — keeps the watch
// filter aligned with the work the reconciler does, so a pod that can't be
// refcounted never triggers a no-op reconcile.
func ateomPodPredicate() predicate.Predicate {
return predicate.NewPredicateFuncs(func(obj client.Object) bool {
labels := obj.GetLabels()
if labels == nil {
return false
}
_, ok := labels[WorkerPoolUIDLabelKey]
return ok
})
}

// SetupWithManager registers the reconciler with the manager and adds
// a field indexer on Pod.Spec.NodeName so reconcileNode can List pods
// efficiently.
func (r *AteletNodeReconciler) SetupWithManager(mgr ctrl.Manager) error {
if err := mgr.GetFieldIndexer().IndexField(context.Background(), &corev1.Pod{}, PodNodeNameIndex, func(o client.Object) []string {
p := o.(*corev1.Pod)
if p.Spec.NodeName == "" {
return nil
}
return []string{p.Spec.NodeName}
}); err != nil {
return err
}
return ctrl.NewControllerManagedBy(mgr).
Named("atelet-node").
Watches(
&corev1.Pod{},
handler.EnqueueRequestsFromMapFunc(r.podToNode),
builder.WithPredicates(ateomPodPredicate()),
).
Complete(r)
}
Loading
Loading