Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions config/rbac/role.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ rules:
- postgresql.cnpg.io
resources:
- backups
- clusters
verbs:
- get
- list
Expand Down
2 changes: 2 additions & 0 deletions internal/cmd/operator/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,8 @@ func NewCmd() *cobra.Command {
_ = viper.BindPFlag("server-address", cmd.Flags().Lookup("server-address"))

_ = viper.BindEnv("sidecar-image", "SIDECAR_IMAGE")
_ = viper.BindEnv("custom-cnpg-group", "CUSTOM_CNPG_GROUP")
_ = viper.BindEnv("custom-cnpg-version", "CUSTOM_CNPG_VERSION")

return cmd
}
2 changes: 2 additions & 0 deletions internal/cmd/restore/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ func NewCmd() *cobra.Command {
_ = viper.BindEnv("pod-name", "POD_NAME")
_ = viper.BindEnv("pgdata", "PGDATA")
_ = viper.BindEnv("spool-directory", "SPOOL_DIRECTORY")
_ = viper.BindEnv("custom-cnpg-group", "CUSTOM_CNPG_GROUP")
_ = viper.BindEnv("custom-cnpg-version", "CUSTOM_CNPG_VERSION")

return cmd
}
24 changes: 2 additions & 22 deletions internal/cnpgi/instance/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,16 +28,15 @@ import (
"github.com/spf13/viper"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/scheme"

barmancloudv1 "github.com/cloudnative-pg/plugin-barman-cloud/api/v1"
extendedclient "github.com/cloudnative-pg/plugin-barman-cloud/internal/cnpgi/instance/internal/client"
pluginscheme "github.com/cloudnative-pg/plugin-barman-cloud/internal/scheme"
)

// Start starts the sidecar informers and CNPG-i server
Expand Down Expand Up @@ -127,26 +126,7 @@ func generateScheme(ctx context.Context) *runtime.Scheme {

utilruntime.Must(barmancloudv1.AddToScheme(result))
utilruntime.Must(clientgoscheme.AddToScheme(result))

cnpgGroup := viper.GetString("custom-cnpg-group")
cnpgVersion := viper.GetString("custom-cnpg-version")
if len(cnpgGroup) == 0 {
cnpgGroup = cnpgv1.SchemeGroupVersion.Group
}
if len(cnpgVersion) == 0 {
cnpgVersion = cnpgv1.SchemeGroupVersion.Version
}

// Proceed with custom registration of the CNPG scheme
schemeGroupVersion := schema.GroupVersion{Group: cnpgGroup, Version: cnpgVersion}
schemeBuilder := &scheme.Builder{GroupVersion: schemeGroupVersion}
schemeBuilder.Register(&cnpgv1.Cluster{}, &cnpgv1.ClusterList{})
schemeBuilder.Register(&cnpgv1.Backup{}, &cnpgv1.BackupList{})
schemeBuilder.Register(&cnpgv1.ScheduledBackup{}, &cnpgv1.ScheduledBackupList{})
utilruntime.Must(schemeBuilder.AddToScheme(result))

schemeLog := log.FromContext(ctx)
schemeLog.Info("CNPG types registration", "schemeGroupVersion", schemeGroupVersion)
pluginscheme.AddCNPGToScheme(ctx, result)

return result
}
19 changes: 13 additions & 6 deletions internal/cnpgi/operator/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (
"crypto/tls"

// +kubebuilder:scaffold:imports
cnpgv1 "github.com/cloudnative-pg/cloudnative-pg/api/v1"
"github.com/cloudnative-pg/machinery/pkg/log"
"github.com/spf13/viper"
"k8s.io/apimachinery/pkg/runtime"
Expand All @@ -38,25 +37,33 @@ import (

barmancloudv1 "github.com/cloudnative-pg/plugin-barman-cloud/api/v1"
"github.com/cloudnative-pg/plugin-barman-cloud/internal/controller"
pluginscheme "github.com/cloudnative-pg/plugin-barman-cloud/internal/scheme"

// Import all Kubernetes client auth plugins (e.g. Azure, GCP, OIDC, etc.)
// to ensure that exec-entrypoint and run can make use of them.
_ "k8s.io/client-go/plugin/pkg/client/auth"
)

var scheme = runtime.NewScheme()
// generateScheme creates a runtime.Scheme with all type definitions
// needed by the operator. CNPG types are registered under a
// configurable API group to support custom CNPG-based operators.
func generateScheme(ctx context.Context) *runtime.Scheme {
result := runtime.NewScheme()

func init() {
utilruntime.Must(clientgoscheme.AddToScheme(scheme))
utilruntime.Must(barmancloudv1.AddToScheme(scheme))
utilruntime.Must(cnpgv1.AddToScheme(scheme))
utilruntime.Must(clientgoscheme.AddToScheme(result))
utilruntime.Must(barmancloudv1.AddToScheme(result))
pluginscheme.AddCNPGToScheme(ctx, result)
// +kubebuilder:scaffold:scheme

return result
}

// Start starts the manager
func Start(ctx context.Context) error {
setupLog := log.FromContext(ctx)

scheme := generateScheme(ctx)

var tlsOpts []func(*tls.Config)

// if the enable-http2 flag is false (the default), http/2 should be disabled
Expand Down
58 changes: 0 additions & 58 deletions internal/cnpgi/operator/ownership.go

This file was deleted.

158 changes: 158 additions & 0 deletions internal/cnpgi/operator/rbac/ensure.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
/*
Copyright © contributors to CloudNativePG, established as
CloudNativePG a Series of LF Projects, LLC.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.

SPDX-License-Identifier: Apache-2.0
*/

// Package rbac contains utilities to reconcile RBAC resources
// for the barman-cloud plugin.
package rbac

import (
"context"

cnpgv1 "github.com/cloudnative-pg/cloudnative-pg/api/v1"
"github.com/cloudnative-pg/machinery/pkg/log"
rbacv1 "k8s.io/api/rbac/v1"
"k8s.io/apimachinery/pkg/api/equality"
apierrs "k8s.io/apimachinery/pkg/api/errors"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"

barmancloudv1 "github.com/cloudnative-pg/plugin-barman-cloud/api/v1"
"github.com/cloudnative-pg/plugin-barman-cloud/internal/cnpgi/operator/specs"
)

// EnsureRole ensures the RBAC Role for the given Cluster matches
// the desired state derived from the given ObjectStores. On creation,
// the Cluster is set as the owner of the Role for garbage collection.
//
// This function is called from both the Pre hook (gRPC) and the
// ObjectStore controller. To handle concurrent modifications
// gracefully, AlreadyExists on Create and Conflict on Patch are
// retried once rather than returned as errors.
func EnsureRole(
ctx context.Context,
c client.Client,
cluster *cnpgv1.Cluster,
barmanObjects []barmancloudv1.ObjectStore,
) error {
newRole := specs.BuildRole(cluster, barmanObjects)

roleKey := client.ObjectKey{
Namespace: newRole.Namespace,
Name: newRole.Name,
}

var role rbacv1.Role
err := c.Get(ctx, roleKey, &role)

switch {
case apierrs.IsNotFound(err):
role, err := createRole(ctx, c, cluster, newRole)
if err != nil {
return err
}
if role == nil {
// Created successfully, nothing else to do.
return nil
}
// AlreadyExists: fall through to patch with the re-read role.
return patchRoleRules(ctx, c, newRole.Rules, role)

case err != nil:
return err

default:
return patchRoleRules(ctx, c, newRole.Rules, &role)
}
}

// createRole attempts to create the Role. If another writer created
// it concurrently (AlreadyExists), it re-reads and returns the
// existing Role for the caller to patch. On success it returns nil.
func createRole(
ctx context.Context,
c client.Client,
cluster *cnpgv1.Cluster,
newRole *rbacv1.Role,
) (*rbacv1.Role, error) {
contextLogger := log.FromContext(ctx)

if err := controllerutil.SetControllerReference(cluster, newRole, c.Scheme()); err != nil {
return nil, err
}

contextLogger.Info("Creating role",
"name", newRole.Name, "namespace", newRole.Namespace)

createErr := c.Create(ctx, newRole)
if createErr == nil {
return nil, nil
}
if !apierrs.IsAlreadyExists(createErr) {
return nil, createErr
}

contextLogger.Info("Role was created concurrently, checking rules")

var role rbacv1.Role
if err := c.Get(ctx, client.ObjectKeyFromObject(newRole), &role); err != nil {
return nil, err
}

return &role, nil
}

// patchRoleRules patches the Role's rules if they differ from the
// desired state. On Conflict (concurrent modification), it retries
// once with a fresh read.
func patchRoleRules(
ctx context.Context,
c client.Client,
desiredRules []rbacv1.PolicyRule,
role *rbacv1.Role,
) error {
if equality.Semantic.DeepEqual(desiredRules, role.Rules) {
return nil
}

contextLogger := log.FromContext(ctx)
contextLogger.Info("Patching role",
"name", role.Name, "namespace", role.Namespace, "rules", desiredRules)

oldRole := role.DeepCopy()
role.Rules = desiredRules

patchErr := c.Patch(ctx, role, client.MergeFrom(oldRole))
if patchErr == nil || !apierrs.IsConflict(patchErr) {
return patchErr
}

// Conflict: re-read and retry once.
contextLogger.Info("Role was modified concurrently, retrying patch")
if err := c.Get(ctx, client.ObjectKeyFromObject(role), role); err != nil {
return err
}
if equality.Semantic.DeepEqual(desiredRules, role.Rules) {
return nil
}

oldRole = role.DeepCopy()
role.Rules = desiredRules

return c.Patch(ctx, role, client.MergeFrom(oldRole))
}
Loading
Loading