Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions cmd/operator-controller/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -623,7 +623,7 @@ func (c *boxcutterReconcilerConfigurator) Configure(ceReconciler *controllers.Cl
controllers.HandleFinalizers(c.finalizers),
controllers.MigrateStorage(storageMigrator),
controllers.RetrieveRevisionStates(revisionStatesGetter),
controllers.ResolveBundle(c.resolver),
controllers.ResolveBundle(c.resolver, c.mgr.GetClient()),
controllers.UnpackBundle(c.imagePuller, c.imageCache),
controllers.ApplyBundleWithBoxcutter(appl.Apply),
}
Expand Down Expand Up @@ -742,7 +742,7 @@ func (c *helmReconcilerConfigurator) Configure(ceReconciler *controllers.Cluster
ceReconciler.ReconcileSteps = []controllers.ReconcileStepFunc{
controllers.HandleFinalizers(c.finalizers),
controllers.RetrieveRevisionStates(revisionStatesGetter),
controllers.ResolveBundle(c.resolver),
controllers.ResolveBundle(c.resolver, c.mgr.GetClient()),
controllers.UnpackBundle(c.imagePuller, c.imageCache),
controllers.ApplyBundle(appl),
}
Expand Down
46 changes: 31 additions & 15 deletions internal/operator-controller/applier/boxcutter.go
Original file line number Diff line number Diff line change
Expand Up @@ -303,21 +303,37 @@ func (bc *Boxcutter) createOrUpdate(ctx context.Context, obj client.Object) erro
return bc.Client.Patch(ctx, obj, client.Apply, client.FieldOwner(bc.FieldOwner), client.ForceOwnership)
}

func (bc *Boxcutter) Apply(ctx context.Context, contentFS fs.FS, ext *ocv1.ClusterExtension, objectLabels, revisionAnnotations map[string]string) error {
// Generate desired revision
desiredRevision, err := bc.RevisionGenerator.GenerateRevision(ctx, contentFS, ext, objectLabels, revisionAnnotations)
func (bc *Boxcutter) Apply(ctx context.Context, contentFS fs.FS, ext *ocv1.ClusterExtension, objectLabels, revisionAnnotations map[string]string) (bool, string, error) {
// Note: We list revisions first (before checking contentFS) because we need the revision list
// to determine if we can fall back when contentFS is nil. If the API call fails here,
// it indicates a serious cluster connectivity issue, and we should not proceed even in fallback mode
// since the ClusterExtensionRevision controller also requires API access to maintain resources.
existingRevisions, err := bc.getExistingRevisions(ctx, ext.GetName())
if err != nil {
return err
return false, "", err
}

if err := controllerutil.SetControllerReference(ext, desiredRevision, bc.Scheme); err != nil {
return fmt.Errorf("set ownerref: %w", err)
// If contentFS is nil, we're maintaining the current state without catalog access.
// In this case, we should use the existing installed revision without generating a new one.
if contentFS == nil {
if len(existingRevisions) == 0 {
return false, "", fmt.Errorf("cannot maintain workload: no catalog content available and no previously installed revision found")
}
// Returning true here signals that the rollout has succeeded using the current revision. The
// ClusterExtensionRevision controller will continue to reconcile, apply, and maintain the
// resources defined in that revision via Server-Side Apply, ensuring the workload keeps running
// even when catalog access (and thus new revision content) is unavailable.
return true, "", nil
}

// List all existing revisions
existingRevisions, err := bc.getExistingRevisions(ctx, ext.GetName())
// Generate desired revision
desiredRevision, err := bc.RevisionGenerator.GenerateRevision(ctx, contentFS, ext, objectLabels, revisionAnnotations)
if err != nil {
return err
return false, "", err
}

if err := controllerutil.SetControllerReference(ext, desiredRevision, bc.Scheme); err != nil {
return false, "", fmt.Errorf("set ownerref: %w", err)
}

currentRevision := &ocv1.ClusterExtensionRevision{}
Expand All @@ -339,7 +355,7 @@ func (bc *Boxcutter) Apply(ctx context.Context, contentFS fs.FS, ext *ocv1.Clust
// inplace patch was successful, no changes in phases
state = StateUnchanged
default:
return fmt.Errorf("patching %s Revision: %w", desiredRevision.Name, err)
return false, "", fmt.Errorf("patching %s Revision: %w", desiredRevision.Name, err)
}
}

Expand All @@ -353,7 +369,7 @@ func (bc *Boxcutter) Apply(ctx context.Context, contentFS fs.FS, ext *ocv1.Clust
case StateNeedsInstall:
err := preflight.Install(ctx, plainObjs)
if err != nil {
return err
return false, "", err
}
// TODO: jlanford's IDE says that "StateNeedsUpgrade" condition is always true, but
// it isn't immediately obvious why that is. Perhaps len(existingRevisions) is
Expand All @@ -362,7 +378,7 @@ func (bc *Boxcutter) Apply(ctx context.Context, contentFS fs.FS, ext *ocv1.Clust
case StateNeedsUpgrade:
err := preflight.Upgrade(ctx, plainObjs)
if err != nil {
return err
return false, "", err
}
}
}
Expand All @@ -376,15 +392,15 @@ func (bc *Boxcutter) Apply(ctx context.Context, contentFS fs.FS, ext *ocv1.Clust
desiredRevision.Spec.Revision = revisionNumber

if err = bc.garbageCollectOldRevisions(ctx, prevRevisions); err != nil {
return fmt.Errorf("garbage collecting old revisions: %w", err)
return false, "", fmt.Errorf("garbage collecting old revisions: %w", err)
}

if err := bc.createOrUpdate(ctx, desiredRevision); err != nil {
return fmt.Errorf("creating new Revision: %w", err)
return false, "", fmt.Errorf("creating new Revision: %w", err)
}
}

return nil
return true, "", nil
}

// garbageCollectOldRevisions deletes archived revisions beyond ClusterExtensionRevisionRetentionLimit.
Expand Down
6 changes: 5 additions & 1 deletion internal/operator-controller/applier/boxcutter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -926,14 +926,18 @@ func TestBoxcutter_Apply(t *testing.T) {
labels.PackageNameKey: "test-package",
}
}
err := boxcutter.Apply(t.Context(), testFS, ext, nil, revisionAnnotations)
completed, status, err := boxcutter.Apply(t.Context(), testFS, ext, nil, revisionAnnotations)

// Assert
if tc.expectedErr != "" {
require.Error(t, err)
assert.Contains(t, err.Error(), tc.expectedErr)
assert.False(t, completed)
assert.Empty(t, status)
} else {
require.NoError(t, err)
assert.True(t, completed)
assert.Empty(t, status)
}

if tc.validate != nil {
Expand Down
66 changes: 66 additions & 0 deletions internal/operator-controller/applier/helm.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,16 @@ func (h *Helm) runPreAuthorizationChecks(ctx context.Context, ext *ocv1.ClusterE
}

func (h *Helm) Apply(ctx context.Context, contentFS fs.FS, ext *ocv1.ClusterExtension, objectLabels map[string]string, storageLabels map[string]string) (bool, string, error) {
// If contentFS is nil, we're maintaining the current state without catalog access.
// In this case, reconcile the existing Helm release if it exists.
if contentFS == nil {
ac, err := h.ActionClientGetter.ActionClientFor(ctx, ext)
if err != nil {
return false, "", err
}
return h.reconcileExistingRelease(ctx, ac, ext)
}

chrt, err := h.buildHelmChart(contentFS, ext)
if err != nil {
return false, "", err
Expand Down Expand Up @@ -197,6 +207,62 @@ func (h *Helm) Apply(ctx context.Context, contentFS fs.FS, ext *ocv1.ClusterExte
return true, "", nil
}

// reconcileExistingRelease reconciles an existing Helm release without catalog access.
// This is used when the catalog is unavailable but we need to maintain the current installation.
// It reconciles the release to actively maintain resources, and sets up watchers for monitoring/observability.
func (h *Helm) reconcileExistingRelease(ctx context.Context, ac helmclient.ActionInterface, ext *ocv1.ClusterExtension) (bool, string, error) {
rel, err := ac.Get(ext.GetName())
if errors.Is(err, driver.ErrReleaseNotFound) {
return false, "", fmt.Errorf("cannot maintain workload: no catalog content available and no previously installed Helm release found")
}
if err != nil {
return false, "", fmt.Errorf("getting current release: %w", err)
}

// Reconcile the existing release to ensure resources are maintained
if err := ac.Reconcile(rel); err != nil {
// Reconcile failed - resources NOT maintained
// Return false (rollout failed) with error
return false, "", err
}

// At this point: Reconcile succeeded - resources ARE maintained
// The operations below are for setting up monitoring (watches).
// If they fail, the resources are still successfully reconciled and maintained,
// so we return true (rollout succeeded) and log the watch error instead of returning it.
logger := klog.FromContext(ctx)

relObjects, err := util.ManifestObjects(strings.NewReader(rel.Manifest), fmt.Sprintf("%s-release-manifest", rel.Name))
if err != nil {
logger.Error(err, "failed to parse manifest objects for watching, resources are maintained but not being watched")
return true, "", nil
}

logger.Info("watching managed objects")

// Defensive nil checks to prevent panics if Manager or Watcher not properly initialized
if h.Manager == nil {
logger.Error(nil, "manager is nil, cannot set up watches (resources are maintained but not being watched)")
return true, "", nil
}
cache, err := h.Manager.Get(ctx, ext)
if err != nil {
logger.Error(err, "failed to get managed content cache, resources are maintained but not being watched")
return true, "", nil
}

if h.Watcher == nil {
logger.Error(nil, "watcher is nil, cannot set up watches (resources are maintained but not being watched)")
return true, "", nil
}
if err := cache.Watch(ctx, h.Watcher, relObjects...); err != nil {
logger.Error(err, "failed to set up watches on managed objects, resources are maintained but not being watched")
return true, "", nil
}

return true, "", nil
}

func (h *Helm) buildHelmChart(bundleFS fs.FS, ext *ocv1.ClusterExtension) (*chart.Chart, error) {
if h.HelmChartProvider == nil {
return nil, errors.New("HelmChartProvider is nil")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ func MigrateStorage(m StorageMigrator) ReconcileStepFunc {
}
}

func ApplyBundleWithBoxcutter(apply func(ctx context.Context, contentFS fs.FS, ext *ocv1.ClusterExtension, objectLabels, revisionAnnotations map[string]string) error) ReconcileStepFunc {
func ApplyBundleWithBoxcutter(apply func(ctx context.Context, contentFS fs.FS, ext *ocv1.ClusterExtension, objectLabels, revisionAnnotations map[string]string) (bool, string, error)) ReconcileStepFunc {
return func(ctx context.Context, state *reconcileState, ext *ocv1.ClusterExtension) (*ctrl.Result, error) {
l := log.FromContext(ctx)
revisionAnnotations := map[string]string{
Expand All @@ -109,7 +109,8 @@ func ApplyBundleWithBoxcutter(apply func(ctx context.Context, contentFS fs.FS, e
}

l.Info("applying bundle contents")
if err := apply(ctx, state.imageFS, ext, objLbls, revisionAnnotations); err != nil {
_, _, err := apply(ctx, state.imageFS, ext, objLbls, revisionAnnotations)
if err != nil {
// If there was an error applying the resolved bundle,
// report the error via the Progressing condition.
setStatusProgressing(ext, wrapErrorWithResolutionInfo(state.resolvedRevisionMetadata.BundleMetadata, err))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,8 +133,8 @@ func TestApplyBundleWithBoxcutter(t *testing.T) {
imageFS: fstest.MapFS{},
}

stepFunc := ApplyBundleWithBoxcutter(func(_ context.Context, _ fs.FS, _ *ocv1.ClusterExtension, _, _ map[string]string) error {
return nil
stepFunc := ApplyBundleWithBoxcutter(func(_ context.Context, _ fs.FS, _ *ocv1.ClusterExtension, _, _ map[string]string) (bool, string, error) {
return true, "", nil
})
result, err := stepFunc(ctx, state, ext)
require.NoError(t, err)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,9 @@ import (
)

func TestClusterExtensionSourceConfig(t *testing.T) {
sourceTypeEmptyError := "Invalid value: null"
// NOTE: Kubernetes validation error format for JSON null values varies across K8s versions.
// We check for the common part "Invalid value:" which appears in all versions.
sourceTypeEmptyError := "Invalid value:"
sourceTypeMismatchError := "spec.source.sourceType: Unsupported value"
sourceConfigInvalidError := "spec.source: Invalid value"
// unionField represents the required Catalog or (future) Bundle field required by SourceConfig
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,8 @@ func (r *ClusterExtensionReconciler) Reconcile(ctx context.Context, req ctrl.Req

// ensureAllConditionsWithReason checks that all defined condition types exist in the given ClusterExtension,
// and assigns a specified reason and custom message to any missing condition.
//
//nolint:unparam // reason parameter is designed to be flexible, even if current callers use the same value
func ensureAllConditionsWithReason(ext *ocv1.ClusterExtension, reason v1alpha1.ConditionReason, message string) {
for _, condType := range conditionsets.ConditionTypes {
cond := apimeta.FindStatusCondition(ext.Status.Conditions, condType)
Expand Down
Loading
Loading