Skip to content
115 changes: 103 additions & 12 deletions controllers/clustersummary_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import (
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/events"
"k8s.io/client-go/util/workqueue"
clusterv1 "sigs.k8s.io/cluster-api/api/core/v1beta2"
"sigs.k8s.io/cluster-api/util/annotations"
ctrl "sigs.k8s.io/controller-runtime"
Expand Down Expand Up @@ -95,6 +96,9 @@ const (

const (
clusterPausedMessage = "Cluster is paused"

rateLimiterBaseDelay = 1 * time.Second
rateLimiterMaxDelay = 5 * time.Minute
)

// ClusterSummaryReconciler reconciles a ClusterSummary object
Expand All @@ -118,7 +122,8 @@ type ClusterSummaryReconciler struct {

eventRecorder events.EventRecorder

DeletedInstances map[types.NamespacedName]time.Time
DeletedInstances map[types.NamespacedName]time.Time
NextReconcileTimes map[types.NamespacedName]time.Time // in-memory cooldown, survives status-patch conflicts
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is minor: in reconcileDelete, we should also remove the ClusterSummary's entry from this map if it is present.

}

// If the drift-detection component is deployed in the management cluster, the addon-controller will deploy ResourceSummaries within the same cluster,
Expand Down Expand Up @@ -178,7 +183,7 @@ func (r *ClusterSummaryReconciler) Reconcile(ctx context.Context, req ctrl.Reque

if r.skipReconciliation(clusterSummaryScope, req) {
logger.V(logs.LogInfo).Info("ignore update")
return reconcile.Result{Requeue: true, RequeueAfter: normalRequeueAfter}, nil
return reconcile.Result{Requeue: true, RequeueAfter: r.remainingCooldown(clusterSummaryScope, req)}, nil
}

var isMatch bool
Expand All @@ -205,9 +210,16 @@ func (r *ClusterSummaryReconciler) Reconcile(ctx context.Context, req ctrl.Reque
}

// Always close the scope when exiting this function so we can persist any ClusterSummary
// changes.
// changes. Conflict errors are swallowed because the watch event from whatever caused the
// conflict will re-enqueue this resource, and the next reconciliation will recompute status.
// Propagating the conflict would cause controller-runtime to immediately requeue, bypassing
// the intended NextReconcileTime backoff.
defer func() {
if err = clusterSummaryScope.Close(ctx); err != nil {
if apierrors.IsConflict(err) {
logger.V(logs.LogDebug).Info("conflict patching ClusterSummary status, will reconcile on next event")
return
}
reterr = err
}
}()
Expand Down Expand Up @@ -420,35 +432,50 @@ func (r *ClusterSummaryReconciler) reconcileNormal(ctx context.Context,
clusterSummaryScope.ClusterSummary.Status.ReconciliationSuspended = false
clusterSummaryScope.ClusterSummary.Status.SuspensionReason = nil

err = r.startWatcherForTemplateResourceRefs(ctx, clusterSummaryScope.ClusterSummary)
if result := r.prepareForDeployment(ctx, clusterSummaryScope, logger); result.RequeueAfter > 0 {
return result, nil
}

return r.proceedDeployingClusterSummary(ctx, clusterSummaryScope, logger)
}

// prepareForDeployment runs the preliminary steps required before deploying a
// ClusterSummary: it starts watchers for resources referenced in
// TemplateResourceRefs, verifies that all declared dependencies are deployed,
// updates the chart map, and removes any stale ResourceSummary when continuous
// drift detection is not in use.
//
// On any failure it records the intended cooldown via setNextReconcileTime and
// returns a Result with a non-zero RequeueAfter; the caller treats
// RequeueAfter > 0 as "stop and requeue later". An empty Result means all
// preparation succeeded and deployment can proceed.
func (r *ClusterSummaryReconciler) prepareForDeployment(ctx context.Context,
	clusterSummaryScope *scope.ClusterSummaryScope, logger logr.Logger) reconcile.Result {

	err := r.startWatcherForTemplateResourceRefs(ctx, clusterSummaryScope.ClusterSummary)
	if err != nil {
		logger.V(logs.LogInfo).Error(err, "failed to start watcher on resources referenced in TemplateResourceRefs.")
		r.setNextReconcileTime(clusterSummaryScope, deleteRequeueAfter)
		return reconcile.Result{RequeueAfter: deleteRequeueAfter}
	}

	allDeployed, msg, err := r.areDependenciesDeployed(ctx, clusterSummaryScope, logger)
	if err != nil {
		r.setNextReconcileTime(clusterSummaryScope, normalRequeueAfter)
		return reconcile.Result{RequeueAfter: normalRequeueAfter}
	}
	clusterSummaryScope.SetDependenciesMessage(&msg)
	if !allDeployed {
		// Dependencies not yet satisfied; back off and retry.
		r.setNextReconcileTime(clusterSummaryScope, normalRequeueAfter)
		return reconcile.Result{RequeueAfter: normalRequeueAfter}
	}

	err = r.updateChartMap(ctx, clusterSummaryScope, logger)
	if err != nil {
		r.setNextReconcileTime(clusterSummaryScope, normalRequeueAfter)
		return reconcile.Result{RequeueAfter: normalRequeueAfter}
	}

	// A ResourceSummary is only needed for continuous drift detection; remove
	// any leftover instance otherwise.
	if !clusterSummaryScope.IsContinuousWithDriftDetection() {
		err = r.removeResourceSummary(ctx, clusterSummaryScope, logger)
		if err != nil {
			logger.V(logs.LogInfo).Error(err, "failed to remove ResourceSummary.")
			r.setNextReconcileTime(clusterSummaryScope, normalRequeueAfter)
			return reconcile.Result{RequeueAfter: normalRequeueAfter}
		}
	}

	return reconcile.Result{}
}

func (r *ClusterSummaryReconciler) proceedDeployingClusterSummary(ctx context.Context,
Expand All @@ -460,6 +487,7 @@ func (r *ClusterSummaryReconciler) proceedDeployingClusterSummary(ctx context.Co
ok := errors.As(err, &conflictErr)
if ok {
logger.V(logs.LogInfo).Error(err, "failed to deploy because of conflict")
r.setNextReconcileTime(clusterSummaryScope, r.ConflictRetryTime)
return reconcile.Result{Requeue: true, RequeueAfter: r.ConflictRetryTime}, nil
}

Expand All @@ -471,6 +499,7 @@ func (r *ClusterSummaryReconciler) proceedDeployingClusterSummary(ctx context.Co
"checkName", healthCheckError.CheckName,
"reason", healthCheckError.InternalErr.Error(),
"requeueAfter", r.HealthErrorRetryTime.String())
r.setNextReconcileTime(clusterSummaryScope, r.HealthErrorRetryTime)
return reconcile.Result{Requeue: true, RequeueAfter: r.HealthErrorRetryTime}, nil
}

Expand Down Expand Up @@ -513,6 +542,7 @@ func (r *ClusterSummaryReconciler) proceedDeployingClusterSummary(ctx context.Co
}

logger.V(logs.LogInfo).Error(err, "failed to deploy")
r.setNextReconcileTime(clusterSummaryScope, requeueAfter)
return reconcile.Result{Requeue: true, RequeueAfter: requeueAfter}, nil
}

Expand All @@ -521,12 +551,59 @@ func (r *ClusterSummaryReconciler) proceedDeployingClusterSummary(ctx context.Co
if clusterSummaryScope.IsDryRunSync() {
r.resetFeatureStatusToProvisioning(clusterSummaryScope)
// we need to keep retrying in DryRun ClusterSummaries
r.setNextReconcileTime(clusterSummaryScope, dryRunRequeueAfter)
return reconcile.Result{Requeue: true, RequeueAfter: dryRunRequeueAfter}, nil
}

return reconcile.Result{}, nil
}

// setNextReconcileTime records a cooldown deadline d from now in two places:
// the ClusterSummary's Status.NextReconcileTime (so skipReconciliation can
// honor the intended backoff even when a watch event re-enqueues the item
// before RequeueAfter fires) and the reconciler's in-memory map (so the guard
// still holds when the status patch is never persisted, e.g. on a conflict).
func (r *ClusterSummaryReconciler) setNextReconcileTime(
	clusterSummaryScope *scope.ClusterSummaryScope, d time.Duration) {

	deadline := time.Now().Add(d)
	cs := clusterSummaryScope.ClusterSummary
	cs.Status.NextReconcileTime = &metav1.Time{Time: deadline}

	// Keep the in-memory copy in sync; skipReconciliation consults it when the
	// persisted status field is missing or stale.
	instanceKey := types.NamespacedName{
		Namespace: cs.Namespace,
		Name:      cs.Name,
	}
	r.PolicyMux.Lock()
	defer r.PolicyMux.Unlock()
	r.NextReconcileTimes[instanceKey] = deadline
}

// remainingCooldown returns how long to wait before the next reconciliation
// should proceed. It considers both the persisted Status.NextReconcileTime and
// the reconciler's in-memory cooldown map (which survives status-patch
// conflicts), returning whichever deadline lies further in the future, and
// never less than normalRequeueAfter.
func (r *ClusterSummaryReconciler) remainingCooldown(
	clusterSummaryScope *scope.ClusterSummaryScope, req ctrl.Request) time.Duration {

	cooldown := normalRequeueAfter

	if statusDeadline := clusterSummaryScope.ClusterSummary.Status.NextReconcileTime; statusDeadline != nil {
		if left := time.Until(statusDeadline.Time); left > 0 {
			cooldown = left
		}
	}

	r.PolicyMux.Lock()
	defer r.PolicyMux.Unlock()
	if deadline, ok := r.NextReconcileTimes[req.NamespacedName]; ok {
		if left := time.Until(deadline); left > cooldown {
			cooldown = left
		}
	}

	return cooldown
}

// SetupWithManager sets up the controller with the Manager.
func (r *ClusterSummaryReconciler) SetupWithManager(ctx context.Context, mgr ctrl.Manager) error {
c, err := ctrl.NewControllerManagedBy(mgr).
Expand All @@ -536,6 +613,10 @@ func (r *ClusterSummaryReconciler) SetupWithManager(ctx context.Context, mgr ctr
).
WithOptions(controller.Options{
MaxConcurrentReconciles: r.ConcurrentReconciles,
RateLimiter: workqueue.NewTypedItemExponentialFailureRateLimiter[reconcile.Request](
rateLimiterBaseDelay,
rateLimiterMaxDelay,
),
}).
Watches(&libsveltosv1beta1.SveltosCluster{},
handler.EnqueueRequestsFromMapFunc(r.requeueClusterSummaryForSveltosCluster),
Expand Down Expand Up @@ -578,6 +659,7 @@ func (r *ClusterSummaryReconciler) SetupWithManager(ctx context.Context, mgr ctr
initializeManager(ctrl.Log.WithName("watchers"), mgr.GetConfig(), mgr.GetClient())

r.DeletedInstances = make(map[types.NamespacedName]time.Time)
r.NextReconcileTimes = make(map[types.NamespacedName]time.Time)
r.eventRecorder = mgr.GetEventRecorder("event-recorder")
r.ctrl = c

Expand Down Expand Up @@ -1642,10 +1724,19 @@ func (r *ClusterSummaryReconciler) skipReconciliation(clusterSummaryScope *scope
}
}

// Checking if reconciliation should happen
if cs.Status.NextReconcileTime != nil && time.Now().Before(cs.Status.NextReconcileTime.Time) {
// Checking if reconciliation should happen — check both the persisted status field
// and the in-memory map (which survives status-patch conflicts).
now := time.Now()
if cs.Status.NextReconcileTime != nil && now.Before(cs.Status.NextReconcileTime.Time) {
return true
}
if v, ok := r.NextReconcileTimes[req.NamespacedName]; ok {
if now.Before(v) {
return true
}
// Cooldown expired — remove from map
delete(r.NextReconcileTimes, req.NamespacedName)
}

cs.Status.NextReconcileTime = nil

Expand Down
10 changes: 10 additions & 0 deletions controllers/clustersummary_deployer.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,16 @@ func (r *ClusterSummaryReconciler) proceedDeployingFeature(ctx context.Context,
return r.proceedDeployingFeatureInPullMode(ctx, clusterSummaryScope, f, isConfigSame, currentHash, logger)
}

// Skip status update if already provisioned with the same hash — avoids
// unnecessary status patches that would trigger watch events and re-enqueue.
if existingFS := getFeatureSummaryForFeatureID(clusterSummary, f.id); existingFS != nil &&
existingFS.Status == libsveltosv1beta1.FeatureStatusProvisioned &&
reflect.DeepEqual(existingFS.Hash, currentHash) {

logger.V(logs.LogDebug).Info("feature already provisioned with same hash, skipping status update")
return nil
}

r.updateFeatureStatus(clusterSummaryScope, f.id, deployerStatus, currentHash, deployerError, logger)
message := fmt.Sprintf("Feature: %s deployed to cluster %s %s/%s", f.id,
clusterSummary.Spec.ClusterType, clusterSummary.Spec.ClusterNamespace, clusterSummary.Spec.ClusterName)
Expand Down
5 changes: 3 additions & 2 deletions controllers/controllers_suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,8 +176,9 @@ func getClusterSummaryReconciler(c client.Client, dep deployer.DeployerInterface
Deployer: dep,
ClusterMap: make(map[corev1.ObjectReference]*libsveltosset.Set),
ReferenceMap: make(map[corev1.ObjectReference]*libsveltosset.Set),
DeletedInstances: make(map[types.NamespacedName]time.Time),
PolicyMux: sync.Mutex{},
DeletedInstances: make(map[types.NamespacedName]time.Time),
NextReconcileTimes: make(map[types.NamespacedName]time.Time),
PolicyMux: sync.Mutex{},
}
}

Expand Down
2 changes: 2 additions & 0 deletions controllers/export_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,8 @@ var (
AddStageStatus = addStageStatus
UpdateStageStatus = updateStageStatus
GetMainDeploymentClusterProfileLabels = getMainDeploymentClusterProfileLabels

SetNextReconcileTime = (*ClusterSummaryReconciler).setNextReconcileTime
)

var (
Expand Down
Loading