Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 47 additions & 24 deletions internal/controller/kustomization_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ import (
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
kerrors "k8s.io/apimachinery/pkg/util/errors"
kuberecorder "k8s.io/client-go/tools/record"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
Expand All @@ -47,7 +46,7 @@ import (
"github.com/fluxcd/cli-utils/pkg/kstatus/polling/engine"
"github.com/fluxcd/cli-utils/pkg/object"
apiacl "github.com/fluxcd/pkg/apis/acl"
eventv1 "github.com/fluxcd/pkg/apis/event/v1beta1"
eventv1 "github.com/fluxcd/pkg/apis/event/v1"
"github.com/fluxcd/pkg/apis/meta"
"github.com/fluxcd/pkg/auth"
authutils "github.com/fluxcd/pkg/auth/utils"
Expand All @@ -59,6 +58,7 @@ import (
runtimeClient "github.com/fluxcd/pkg/runtime/client"
"github.com/fluxcd/pkg/runtime/conditions"
runtimeCtrl "github.com/fluxcd/pkg/runtime/controller"
"github.com/fluxcd/pkg/runtime/events"
"github.com/fluxcd/pkg/runtime/jitter"
"github.com/fluxcd/pkg/runtime/patch"
"github.com/fluxcd/pkg/runtime/statusreaders"
Expand All @@ -85,7 +85,7 @@ import (
// KustomizationReconciler reconciles a Kustomization object
type KustomizationReconciler struct {
client.Client
kuberecorder.EventRecorder
EventRecorder *events.Recorder
runtimeCtrl.Metrics

// Kubernetes options
Expand Down Expand Up @@ -135,6 +135,18 @@ func (r *KustomizationReconciler) Reconcile(ctx context.Context, req ctrl.Reques
// Initialize the runtime patcher with the current version of the object.
patcher := patch.NewSerialPatcher(obj, r.Client)

var src sourcev1.Source
// src := &metav1.PartialObjectMetadata{
// TypeMeta: metav1.TypeMeta{
// Kind: obj.Spec.SourceRef.Kind,
// APIVersion: obj.Spec.SourceRef.APIVersion,
// },
// ObjectMeta: metav1.ObjectMeta{
// Name: obj.Spec.SourceRef.Name,
// Namespace: obj.Spec.SourceRef.Namespace,
// },
// }

// Finalise the reconciliation and report the results.
defer func() {
// Patch finalizers, status and conditions.
Expand All @@ -156,7 +168,8 @@ func (r *KustomizationReconciler) Reconcile(ctx context.Context, req ctrl.Reques
time.Since(reconcileStart).String(),
obj.Spec.Interval.Duration.String())
log.Info(msg, "revision", obj.Status.LastAttemptedRevision)
r.event(obj, obj.Status.LastAppliedRevision, obj.Status.LastAppliedOriginRevision, eventv1.EventSeverityInfo, msg,
r.event(obj, &src, obj.Status.LastAppliedRevision, obj.Status.LastAppliedOriginRevision, eventv1.EventSeverityInfo,
eventv1.ActionReconciled, msg,
map[string]string{
kustomizev1.GroupVersion.Group + "/" + eventv1.MetaCommitStatusKey: eventv1.MetaCommitStatusUpdateValue,
})
Expand All @@ -165,7 +178,7 @@ func (r *KustomizationReconciler) Reconcile(ctx context.Context, req ctrl.Reques

// Prune managed resources if the object is under deletion.
if !obj.ObjectMeta.DeletionTimestamp.IsZero() {
return r.finalize(ctx, obj)
return r.finalize(ctx, obj, &src)
}

// Add finalizer first if it doesn't exist to avoid the race condition
Expand All @@ -190,7 +203,7 @@ func (r *KustomizationReconciler) Reconcile(ctx context.Context, req ctrl.Reques
conditions.MarkFalse(obj, meta.ReadyCondition, meta.InvalidCELExpressionReason, "%s", errMsg)
conditions.MarkStalled(obj, meta.InvalidCELExpressionReason, "%s", errMsg)
obj.Status.ObservedGeneration = obj.Generation
r.event(obj, "", "", eventv1.EventSeverityError, errMsg, nil)
r.event(obj, &src, "", "", eventv1.EventSeverityError, eventv1.ActionFailed, errMsg, nil)
return ctrl.Result{}, reconcile.TerminalError(err)
}

Expand All @@ -202,7 +215,7 @@ func (r *KustomizationReconciler) Reconcile(ctx context.Context, req ctrl.Reques
conditions.MarkFalse(obj, meta.ReadyCondition, meta.FeatureGateDisabledReason, msgFmt, gate)
conditions.MarkStalled(obj, meta.FeatureGateDisabledReason, msgFmt, gate)
log.Error(auth.ErrObjectLevelWorkloadIdentityNotEnabled, msg)
r.event(obj, "", "", eventv1.EventSeverityError, msg, nil)
r.event(obj, &src, "", "", eventv1.EventSeverityError, eventv1.ActionFailed, msg, nil)
return ctrl.Result{}, nil
}

Expand All @@ -220,7 +233,7 @@ func (r *KustomizationReconciler) Reconcile(ctx context.Context, req ctrl.Reques
if acl.IsAccessDenied(err) {
conditions.MarkFalse(obj, meta.ReadyCondition, apiacl.AccessDeniedReason, "%s", err)
conditions.MarkStalled(obj, apiacl.AccessDeniedReason, "%s", err)
r.event(obj, "", "", eventv1.EventSeverityError, err.Error(), nil)
r.event(obj, &src, "", "", eventv1.EventSeverityError, eventv1.ActionFailed, err.Error(), nil)
return ctrl.Result{}, reconcile.TerminalError(err)
}

Expand All @@ -247,15 +260,15 @@ func (r *KustomizationReconciler) Reconcile(ctx context.Context, req ctrl.Reques
conditions.MarkFalse(obj, meta.ReadyCondition, meta.InvalidCELExpressionReason, "%s", errMsg)
conditions.MarkStalled(obj, meta.InvalidCELExpressionReason, "%s", errMsg)
obj.Status.ObservedGeneration = obj.Generation
r.event(obj, revision, originRevision, eventv1.EventSeverityError, errMsg, nil)
r.event(obj, &artifactSource, revision, originRevision, eventv1.EventSeverityError, eventv1.ActionFailed, errMsg, nil)
return ctrl.Result{}, err
}

// Retry on transient errors.
conditions.MarkFalse(obj, meta.ReadyCondition, meta.DependencyNotReadyReason, "%s", err)
msg := fmt.Sprintf("Dependencies do not meet ready condition, retrying in %s", r.DependencyRequeueInterval.String())
log.Info(msg)
r.event(obj, revision, originRevision, eventv1.EventSeverityInfo, msg, nil)
r.event(obj, &artifactSource, revision, originRevision, eventv1.EventSeverityInfo, eventv1.ActionWaiting, msg, nil)
return ctrl.Result{RequeueAfter: r.DependencyRequeueInterval}, nil
}
log.Info("All dependencies are ready, proceeding with reconciliation")
Expand All @@ -279,7 +292,7 @@ func (r *KustomizationReconciler) Reconcile(ctx context.Context, req ctrl.Reques
meta.HealthCheckCanceledReason,
"New reconciliation triggered by %s/%s/%s", qes.Kind, qes.Namespace, qes.Name)
ctrl.LoggerFrom(ctx).Info("New reconciliation triggered, canceling health checks", "trigger", qes)
r.event(obj, revision, originRevision, eventv1.EventSeverityInfo,
r.event(obj, &src, revision, originRevision, eventv1.EventSeverityInfo, eventv1.ActionProgressing,
fmt.Sprintf("Health checks canceled due to new reconciliation triggered by %s/%s/%s",
qes.Kind, qes.Namespace, qes.Name), nil)
return ctrl.Result{}, nil
Expand All @@ -292,7 +305,7 @@ func (r *KustomizationReconciler) Reconcile(ctx context.Context, req ctrl.Reques
obj.GetRetryInterval().String()),
"revision",
revision)
r.event(obj, revision, originRevision, eventv1.EventSeverityError,
r.event(obj, &src, revision, originRevision, eventv1.EventSeverityError, eventv1.ActionFailed,
reconcileErr.Error(), nil)
return ctrl.Result{RequeueAfter: obj.GetRetryInterval()}, nil
}
Expand Down Expand Up @@ -462,7 +475,7 @@ func (r *KustomizationReconciler) reconcile(
}

// Validate and apply resources in stages.
drifted, changeSetWithSkipped, err := r.apply(ctx, resourceManager, obj, revision, originRevision, objects)
drifted, changeSetWithSkipped, err := r.apply(ctx, resourceManager, obj, &src, revision, originRevision, objects)
if err != nil {
obj.Status.History.Upsert(checksum, time.Now(), time.Since(reconcileStart), meta.ReconciliationFailedReason, historyMeta)
conditions.MarkFalse(obj, meta.ReadyCondition, meta.ReconciliationFailedReason, "%s", err)
Expand Down Expand Up @@ -501,7 +514,7 @@ func (r *KustomizationReconciler) reconcile(
}

// Run garbage collection for stale resources that do not have pruning disabled.
if _, err := r.prune(ctx, resourceManager, obj, revision, originRevision, staleObjects); err != nil {
if _, err := r.prune(ctx, resourceManager, obj, &src, revision, originRevision, staleObjects); err != nil {
obj.Status.History.Upsert(checksum, time.Now(), time.Since(reconcileStart), meta.PruneFailedReason, historyMeta)
conditions.MarkFalse(obj, meta.ReadyCondition, meta.PruneFailedReason, "%s", err)
return err
Expand All @@ -513,6 +526,7 @@ func (r *KustomizationReconciler) reconcile(
resourceManager,
patcher,
obj,
&src,
revision,
originRevision,
isNewRevision,
Expand Down Expand Up @@ -836,6 +850,7 @@ func (r *KustomizationReconciler) build(ctx context.Context,
func (r *KustomizationReconciler) apply(ctx context.Context,
manager *ssa.ResourceManager,
obj *kustomizev1.Kustomization,
src *sourcev1.Source,
revision string,
originRevision string,
objects []*unstructured.Unstructured) (bool, *ssa.ChangeSet, error) {
Expand Down Expand Up @@ -960,7 +975,7 @@ func (r *KustomizationReconciler) apply(ctx context.Context,
// emit event only if the server-side apply resulted in changes
applyLog := strings.TrimSuffix(changeSetLog.String(), "\n")
if applyLog != "" {
r.event(obj, revision, originRevision, eventv1.EventSeverityInfo, applyLog, nil)
r.event(obj, src, revision, originRevision, eventv1.EventSeverityInfo, eventv1.ActionApplied, applyLog, nil)
}

return applyLog != "", resultSet, nil
Expand All @@ -970,6 +985,7 @@ func (r *KustomizationReconciler) checkHealth(ctx context.Context,
manager *ssa.ResourceManager,
patcher *patch.SerialPatcher,
obj *kustomizev1.Kustomization,
src *sourcev1.Source,
revision string,
originRevision string,
isNewRevision bool,
Expand Down Expand Up @@ -1036,7 +1052,7 @@ func (r *KustomizationReconciler) checkHealth(ctx context.Context,
// Emit recovery event if the previous health check failed.
msg := fmt.Sprintf("Health check passed in %s", time.Since(checkStart).String())
if !wasHealthy || (isNewRevision && drifted) {
r.event(obj, revision, originRevision, eventv1.EventSeverityInfo, msg, nil)
r.event(obj, src, revision, originRevision, eventv1.EventSeverityInfo, eventv1.ActionReconciled, msg, nil)
}

conditions.MarkTrue(obj, meta.HealthyCondition, meta.SucceededReason, "%s", msg)
Expand All @@ -1050,6 +1066,7 @@ func (r *KustomizationReconciler) checkHealth(ctx context.Context,
func (r *KustomizationReconciler) prune(ctx context.Context,
manager *ssa.ResourceManager,
obj *kustomizev1.Kustomization,
src *sourcev1.Source,
revision string,
originRevision string,
objects []*unstructured.Unstructured) (bool, error) {
Expand All @@ -1076,7 +1093,7 @@ func (r *KustomizationReconciler) prune(ctx context.Context,
// emit event only if the prune operation resulted in changes
if changeSet != nil && len(changeSet.Entries) > 0 {
log.Info(fmt.Sprintf("garbage collection completed: %s", changeSet.String()))
r.event(obj, revision, originRevision, eventv1.EventSeverityInfo, changeSet.String(), nil)
r.event(obj, src, revision, originRevision, eventv1.EventSeverityInfo, eventv1.ActionDeleted, changeSet.String(), nil)
return true, nil
}

Expand Down Expand Up @@ -1114,7 +1131,7 @@ func finalizerShouldDeleteResources(obj *kustomizev1.Kustomization) bool {
// If the service account used for impersonation is no longer available or if a timeout occurs
// while waiting for resources to be terminated, an error is logged and the finalizer is removed.
func (r *KustomizationReconciler) finalize(ctx context.Context,
obj *kustomizev1.Kustomization) (ctrl.Result, error) {
obj *kustomizev1.Kustomization, src *sourcev1.Source) (ctrl.Result, error) {
log := ctrl.LoggerFrom(ctx)
if finalizerShouldDeleteResources(obj) {
objects, _ := inventory.List(obj.Status.Inventory)
Expand Down Expand Up @@ -1164,14 +1181,16 @@ func (r *KustomizationReconciler) finalize(ctx context.Context,

changeSet, err := resourceManager.DeleteAll(ctx, objects, opts)
if err != nil {
r.event(obj, obj.Status.LastAppliedRevision, obj.Status.LastAppliedOriginRevision, eventv1.EventSeverityError, "pruning for deleted resource failed", nil)
r.event(obj, src, obj.Status.LastAppliedRevision, obj.Status.LastAppliedOriginRevision,
eventv1.EventSeverityError, eventv1.ActionFailed, "pruning for deleted resource failed", nil)
// Return the error so we retry the failed garbage collection
return ctrl.Result{}, err
}

if changeSet != nil && len(changeSet.Entries) > 0 {
// Emit event with the resources marked for deletion.
r.event(obj, obj.Status.LastAppliedRevision, obj.Status.LastAppliedOriginRevision, eventv1.EventSeverityInfo, changeSet.String(), nil)
r.event(obj, src, obj.Status.LastAppliedRevision, obj.Status.LastAppliedOriginRevision,
eventv1.EventSeverityInfo, eventv1.ActionDeleted, changeSet.String(), nil)

// Wait for the resources marked for deletion to be terminated.
if obj.GetDeletionPolicy() == kustomizev1.DeletionPolicyWaitForTermination {
Expand All @@ -1182,15 +1201,17 @@ func (r *KustomizationReconciler) finalize(ctx context.Context,
// Emit an event and log the error if a timeout occurs.
msg := "failed to wait for resources termination"
log.Error(err, msg)
r.event(obj, obj.Status.LastAppliedRevision, obj.Status.LastAppliedOriginRevision, eventv1.EventSeverityError, msg, nil)
r.event(obj, src, obj.Status.LastAppliedRevision, obj.Status.LastAppliedOriginRevision,
eventv1.EventSeverityError, eventv1.ActionFailed, msg, nil)
}
}
}
} else {
// when the account to impersonate is gone, log the stale objects and continue with the finalization
msg := fmt.Sprintf("unable to prune objects: \n%s", ssautil.FmtUnstructuredList(objects))
log.Error(fmt.Errorf("skiping pruning, failed to find account to impersonate"), msg)
r.event(obj, obj.Status.LastAppliedRevision, obj.Status.LastAppliedOriginRevision, eventv1.EventSeverityError, msg, nil)
r.event(obj, src, obj.Status.LastAppliedRevision, obj.Status.LastAppliedOriginRevision,
eventv1.EventSeverityError, eventv1.ActionFailed, msg, nil)
}
}

Expand All @@ -1207,7 +1228,9 @@ func (r *KustomizationReconciler) finalize(ctx context.Context,
}

func (r *KustomizationReconciler) event(obj *kustomizev1.Kustomization,
revision, originRevision, severity, msg string,
src *sourcev1.Source,
revision, originRevision, severity, action string,
msg string,
metadata map[string]string) {
if metadata == nil {
metadata = map[string]string{}
Expand All @@ -1229,7 +1252,7 @@ func (r *KustomizationReconciler) event(obj *kustomizev1.Kustomization,
eventType = corev1.EventTypeWarning
}

r.EventRecorder.AnnotatedEventf(obj, metadata, eventType, reason, msg)
r.EventRecorder.AnnotatedEventf(obj, *src, metadata, eventType, reason, action, "%s", msg)
}

func (r *KustomizationReconciler) finalizeStatus(ctx context.Context,
Expand Down
4 changes: 2 additions & 2 deletions internal/controller/kustomization_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,13 @@ import (
"time"

"github.com/fluxcd/pkg/apis/meta"
"github.com/fluxcd/pkg/runtime/events"
sourcev1 "github.com/fluxcd/source-controller/api/v1"
. "github.com/onsi/gomega"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/tools/record"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"

Expand Down Expand Up @@ -130,7 +130,7 @@ func TestKustomizationReconciler_deleteBeforeFinalizer(t *testing.T) {

r := &KustomizationReconciler{
Client: k8sClient,
EventRecorder: record.NewFakeRecorder(32),
EventRecorder: &events.Recorder{EventRecorder: testEnv.GetEventRecorder("kustomization-controller")},
}
// NOTE: Only a real API server responds with an error in this scenario.
_, err := r.Reconcile(ctx, ctrl.Request{NamespacedName: client.ObjectKeyFromObject(kustomization)})
Expand Down
3 changes: 2 additions & 1 deletion internal/controller/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ import (
"github.com/fluxcd/pkg/runtime/conditions"
kcheck "github.com/fluxcd/pkg/runtime/conditions/check"
"github.com/fluxcd/pkg/runtime/controller"
"github.com/fluxcd/pkg/runtime/events"
"github.com/fluxcd/pkg/runtime/metrics"
"github.com/fluxcd/pkg/runtime/testenv"
"github.com/fluxcd/pkg/testserver"
Expand Down Expand Up @@ -179,7 +180,7 @@ func TestMain(m *testing.M) {
Client: testEnv,
Mapper: testEnv.GetRESTMapper(),
APIReader: testEnv,
EventRecorder: testEnv.GetEventRecorderFor(controllerName),
EventRecorder: &events.Recorder{EventRecorder: testEnv.GetEventRecorder(controllerName)},
Metrics: testMetricsH,
DependencyRequeueInterval: 2 * time.Second,
ConcurrentSSA: 4,
Expand Down