1 change: 1 addition & 0 deletions .gitignore
@@ -53,3 +53,4 @@ node_modules/
.cursor
.envrc
mise.toml
/PLAN.md
2 changes: 1 addition & 1 deletion charts/pulsar-resources-operator/README.md
@@ -58,7 +58,7 @@ kubectl apply -f https://raw.githubusercontent.com/streamnative/pulsar-resources
|-----|------|---------|-------------|
| affinity | object | `{}` | Add affinity for pod |
| annotations | object | `{}` | Add annotations for the deployment |
| features.alwaysUpdatePulsarResource | bool | `false` | |
| features.alwaysUpdatePulsarResource | bool | `false` | Re-apply observed managed Pulsar resources even when their Kubernetes resources are already Ready. Prefer temporary use for upgrade remediation because it increases Pulsar admin API load on reconciliations and resyncs. |
| fullnameOverride | string | `""` | It will override the name of deployment |
| image.manager.registry | string | `"docker.io"` | Specifies the image registry, useful when users want to pull from a different image hub |
| image.manager.repository | string | `"streamnative/pulsar-resources-operator"` | The full repo name for image. |
2 changes: 1 addition & 1 deletion charts/pulsar-resources-operator/values.yaml
@@ -41,7 +41,7 @@ imagePullSecrets: []
# - name: test

features:
# Enable to force always sync k8s resource status to pulsar.
# -- Re-apply observed managed Pulsar resources even when their Kubernetes resources are already Ready. Prefer temporary use for upgrade remediation because it increases Pulsar admin API load on reconciliations and resyncs.
alwaysUpdatePulsarResource: false
# resyncPeriod determines the minimum frequency at which watched resources are reconciled. The unit is hours; the default value is 10.
resyncPeriod: 10
2 changes: 2 additions & 0 deletions docs/pulsar_namespace.md
@@ -540,6 +540,8 @@ test-pulsar-namespace test-tenant/testns 1 1

You can update the namespace policies by editing the `namespace.yaml` file and then applying it again using `kubectl apply -f namespace.yaml`. This allows you to modify various settings of the Pulsar namespace.

If a namespace was already `Ready=True` before an operator upgrade introduced a new spec field, that new field may not be applied until the resource is reconciled again. See [Pulsar resource lifecycle reconciliation skip behavior](pulsar_resource_lifecycle.md#reconciliation-skip-behavior) for the skip contract and recovery options, including temporary use of `ALWAYS_UPDATE_PULSAR_RESOURCE`.

Please note the following important points:

1. The fields `name` and `bundles` cannot be updated after the namespace is created. These are immutable properties of the namespace.
12 changes: 12 additions & 0 deletions docs/pulsar_resource_lifecycle.md
@@ -107,6 +107,18 @@ When you need to delete the actual Pulsar resource (tenant, namespace, or topic)

Always ensure you have the necessary permissions and have considered the implications of deleting resources before proceeding with any deletion operation.

## Reconciliation Skip Behavior

For normal steady-state operation, the operator skips applying Pulsar API changes for a managed child resource when its Kubernetes status already has `Ready=True` and `status.observedGeneration` matches `metadata.generation`. This avoids unnecessary Pulsar admin requests during resyncs.

After upgrading the operator, a new spec field may be introduced while existing custom resources remain `Ready=True` at the same generation. In that case, the new field is not applied to Pulsar until the resource is reconciled again. Recovery options are:

1. Update the custom resource spec so Kubernetes increments `metadata.generation`, then wait for `Ready=True` again. Metadata-only changes such as labels or annotations do not increment the generation.
2. Temporarily enable `ALWAYS_UPDATE_PULSAR_RESOURCE=true` (Helm: `features.alwaysUpdatePulsarResource=true`) so the operator re-applies observed managed child resources even when they are already Ready.
3. Disable `ALWAYS_UPDATE_PULSAR_RESOURCE` after remediation unless continuous re-application is intentionally required.

Use the always-update option carefully. It can apply all observed managed resources on every reconciliation or resync and may increase Pulsar broker/admin API load. The `PulsarConnection` deletion guard is still preserved: a deleting connection is kept until its remaining managed child resources are removed.
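
The skip contract described above can be sketched as a single predicate. This is a minimal illustration, not the operator's actual code: `resourceState` and `shouldApply` are hypothetical names, and the `alwaysUpdate` argument stands in for the `ALWAYS_UPDATE_PULSAR_RESOURCE` feature flag.

```go
package main

import "fmt"

// resourceState is a hypothetical, simplified view of a managed custom resource.
type resourceState struct {
	Ready              bool  // true when the Ready condition is "True"
	Generation         int64 // metadata.generation
	ObservedGeneration int64 // status.observedGeneration
}

// shouldApply reports whether a reconcile would send changes to the Pulsar admin API.
// alwaysUpdate is an assumed stand-in for the ALWAYS_UPDATE_PULSAR_RESOURCE flag.
func shouldApply(s resourceState, alwaysUpdate bool) bool {
	if alwaysUpdate {
		return true // re-apply even when the resource is already Ready
	}
	// Skip only when the resource is Ready at the generation already observed.
	return !s.Ready || s.ObservedGeneration != s.Generation
}

func main() {
	// Ready at the same generation, e.g. after an operator upgrade added a spec field.
	unchanged := resourceState{Ready: true, Generation: 3, ObservedGeneration: 3}
	// The spec was edited, so the generation moved ahead of the observed one.
	edited := resourceState{Ready: true, Generation: 4, ObservedGeneration: 3}

	fmt.Println(shouldApply(unchanged, false)) // false: skipped
	fmt.Println(shouldApply(edited, false))    // true: generation changed
	fmt.Println(shouldApply(unchanged, true))  // true: flag forces re-apply
}
```

The first case is the one the upgrade scenario runs into: nothing about the resource changed from the point of view of Kubernetes, so the new field is never sent to Pulsar unless the generation changes or the flag is enabled.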

## Changing the Policy

You can change the policy of a Pulsar resource by updating the `lifecyclePolicy` field in the corresponding Kubernetes custom resource. However, there are important considerations to keep in mind when changing the policy:
243 changes: 243 additions & 0 deletions go.sum

Large diffs are not rendered by default.

54 changes: 44 additions & 10 deletions pkg/connection/reconciler.go
@@ -22,8 +22,10 @@ import (
"github.com/apache/pulsar-client-go/pulsaradmin/pkg/admin/config"

"github.com/go-logr/logr"
"github.com/streamnative/pulsar-resources-operator/pkg/feature"
"github.com/streamnative/pulsar-resources-operator/pkg/utils"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/equality"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
@@ -130,27 +132,34 @@ func (r *PulsarConnectionReconciler) Reconcile(ctx context.Context) error {
len(r.namespaces), "topics", len(r.topics), "geo", len(r.geoReplications))
msg := fmt.Sprintf("remaining resources: tenants [%d], namespaces [%d], topics [%d], geoReplications [%d]",
len(r.tenants), len(r.namespaces), len(r.topics), len(r.geoReplications))
originalStatus := r.connection.Status.DeepCopy()
meta.SetStatusCondition(&r.connection.Status.Conditions, *NewErrorCondition(r.connection.Generation, msg))
if err := r.client.Status().Update(ctx, r.connection); err != nil {
return err
}
return nil
return r.updateConnectionStatusIfChanged(ctx, originalStatus)
}
return nil
}
log.Info("Doesn't have associated unready resource, reconcile completed")
return nil
if !r.shouldReconcileReadyResources() {
log.Info("Doesn't have associated unready resource, reconcile completed")
return nil
}
log.Info("AlwaysUpdatePulsarResource is enabled; reconciling ready pulsar resources")
}
log.Info("Reconciling pulsar resources", "resources", r.unreadyResources)

connectionChanged := false
if r.connection.Spec.AdminServiceURL == "" && r.connection.Spec.AdminServiceSecureURL != "" {
r.connection.Spec.AdminServiceURL = r.connection.Spec.AdminServiceSecureURL
connectionChanged = true
}

// TODO use otelcontroller until kube-instrumentation upgrade controller-runtime version to newer
controllerutil.AddFinalizer(r.connection, resourcev1alpha1.FinalizerName)
if err := r.client.Update(ctx, r.connection); err != nil {
return err
if controllerutil.AddFinalizer(r.connection, resourcev1alpha1.FinalizerName) {
connectionChanged = true
}
if connectionChanged {
if err := r.client.Update(ctx, r.connection); err != nil {
return err
}
}

pulsarConfig, err := r.MakePulsarAdminConfig(ctx)
@@ -204,6 +213,7 @@ func (r *PulsarConnectionReconciler) Reconcile(ctx context.Context) error {
return fmt.Errorf("PulsarConnectionReconciler Reconcile errors: %v", errs)
}

originalStatus := r.connection.Status.DeepCopy()
auth := r.connection.Spec.Authentication
if auth != nil && auth.Token != nil && auth.Token.SecretRef != nil {
// calculate secret key hash
@@ -237,17 +247,41 @@ func (r *PulsarConnectionReconciler) Reconcile(ctx context.Context) error {
}
r.connection.Status.ObservedGeneration = r.connection.Generation
meta.SetStatusCondition(&r.connection.Status.Conditions, *NewReadyCondition(r.connection.Generation))
if err := r.client.Status().Update(ctx, r.connection); err != nil {
if err := r.updateConnectionStatusIfChanged(ctx, originalStatus); err != nil {
return err
}

return nil
}

func (r *PulsarConnectionReconciler) updateConnectionStatusIfChanged(ctx context.Context, originalStatus *resourcev1alpha1.PulsarConnectionStatus) error {
if equality.Semantic.DeepEqual(*originalStatus, r.connection.Status) {
return nil
}
return r.client.Status().Update(ctx, r.connection)
}

func (r *PulsarConnectionReconciler) hasUnreadyResource() bool {
return len(r.unreadyResources) > 0
}

func (r *PulsarConnectionReconciler) shouldReconcileReadyResources() bool {
return feature.DefaultFeatureGate.Enabled(feature.AlwaysUpdatePulsarResource) && r.hasObservedResource()
}

func (r *PulsarConnectionReconciler) hasObservedResource() bool {
return len(r.tenants) > 0 ||
len(r.namespaces) > 0 ||
len(r.topics) > 0 ||
len(r.permissions) > 0 ||
len(r.geoReplications) > 0 ||
len(r.packages) > 0 ||
len(r.sinks) > 0 ||
len(r.sources) > 0 ||
len(r.functions) > 0 ||
len(r.nsIsolationPolicies) > 0
}

func (r *PulsarConnectionReconciler) addUnreadyResource(obj reconciler.Object) {
if len(r.unreadyResources) == 30 {
// avoid add too many unready resources
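
The snapshot-and-compare pattern behind `updateConnectionStatusIfChanged` can be tried in isolation. The sketch below is an assumption-laden illustration rather than the operator's code: `connStatus` and `fakeStatusWriter` are hypothetical stand-ins for `PulsarConnectionStatus` and the controller-runtime status client, and `reflect.DeepEqual` is substituted for `equality.Semantic.DeepEqual` so the example needs only the standard library.

```go
package main

import (
	"fmt"
	"reflect"
)

// connStatus is a hypothetical stand-in for PulsarConnectionStatus.
type connStatus struct {
	ObservedGeneration int64
	Conditions         []string
}

// deepCopy returns an independent copy so later mutations do not alias the snapshot.
func (s connStatus) deepCopy() connStatus {
	out := s
	out.Conditions = append([]string(nil), s.Conditions...)
	return out
}

// fakeStatusWriter counts writes instead of calling the Kubernetes API.
type fakeStatusWriter struct{ updates int }

func (w *fakeStatusWriter) Update(connStatus) error {
	w.updates++
	return nil
}

// updateStatusIfChanged writes the status only when it differs from the snapshot.
func updateStatusIfChanged(w *fakeStatusWriter, original, current connStatus) error {
	if reflect.DeepEqual(original, current) {
		return nil // nothing changed, so no API write is issued
	}
	return w.Update(current)
}

func main() {
	w := &fakeStatusWriter{}
	status := connStatus{ObservedGeneration: 2, Conditions: []string{"Ready"}}

	// Take the snapshot before the reconcile mutates the status.
	original := status.deepCopy()

	_ = updateStatusIfChanged(w, original, status)
	fmt.Println(w.updates) // 0: status unchanged

	status.ObservedGeneration = 3
	_ = updateStatusIfChanged(w, original, status)
	fmt.Println(w.updates) // 1: status changed, one write
}
```

Taking the snapshot before any mutation matters here: with the always-update flag enabled, every resync reaches the status write path, and comparing against the snapshot keeps an unchanged status from producing a write on each pass.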