Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 41 additions & 7 deletions pkg/application/inject/fuse/mount_point_script.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,12 @@

"github.com/fluid-cloudnative/fluid/pkg/application/inject/fuse/mutator"
"github.com/fluid-cloudnative/fluid/pkg/application/inject/fuse/poststart"
"github.com/fluid-cloudnative/fluid/pkg/common"
"github.com/fluid-cloudnative/fluid/pkg/ddc/base"
"github.com/fluid-cloudnative/fluid/pkg/utils"
"github.com/fluid-cloudnative/fluid/pkg/utils/kubeclient"
corev1 "k8s.io/api/core/v1"
"k8s.io/client-go/util/retry"
)

func (s *Injector) injectCheckMountReadyScript(podSpecs *mutator.MutatingPodSpecs, runtimeInfos map[string]base.RuntimeInfoInterface) error {
Expand Down Expand Up @@ -127,28 +129,60 @@
return nil
}

func (s *Injector) ensureScriptConfigMapExists(namespace string) (*poststart.ScriptGeneratorForApp, error) {

Check failure on line 132 in pkg/application/inject/fuse/mount_point_script.go

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Refactor this method to reduce its Cognitive Complexity from 25 to the 15 allowed.

See more on https://sonarcloud.io/project/issues?id=fluid-cloudnative_fluid&issues=AZ0el1Lzf5Uwloyfk-r4&open=AZ0el1Lzf5Uwloyfk-r4&pullRequest=5697
appScriptGen := poststart.NewScriptGeneratorForApp(namespace)

cm := appScriptGen.BuildConfigmap()
cmFound, err := kubeclient.IsConfigMapExist(s.client, cm.Name, cm.Namespace)
cmKey := fmt.Sprintf("%s/%s", cm.Namespace, cm.Name)

existingCM, err := kubeclient.GetConfigmapByName(s.client, cm.Name, cm.Namespace)
if err != nil {
s.log.Error(err, "error when checking configMap's existence", "cm.Name", cm.Name, "cm.Namespace", cm.Namespace)
s.log.Error(err, "error when getting configMap", "cm.Name", cm.Name, "cm.Namespace", cm.Namespace)

Check failure on line 140 in pkg/application/inject/fuse/mount_point_script.go

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Define a constant instead of duplicating this literal "cm.Name" 3 times.

See more on https://sonarcloud.io/project/issues?id=fluid-cloudnative_fluid&issues=AZz_EJhotAxTmA_RXEqv&open=AZz_EJhotAxTmA_RXEqv&pullRequest=5697

Check failure on line 140 in pkg/application/inject/fuse/mount_point_script.go

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Define a constant instead of duplicating this literal "cm.Namespace" 3 times.

See more on https://sonarcloud.io/project/issues?id=fluid-cloudnative_fluid&issues=AZz_EJhotAxTmA_RXEqu&open=AZz_EJhotAxTmA_RXEqu&pullRequest=5697
return nil, err
}

cmKey := fmt.Sprintf("%s/%s", cm.Namespace, cm.Name)
s.log.V(1).Info("after check configMap existence", "configMap", cmKey, "existence", cmFound)
if !cmFound {
if existingCM == nil {
// ConfigMap does not exist, create it
s.log.V(1).Info("configMap not found, creating", "configMap", cmKey)
err = s.client.Create(context.TODO(), cm)
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will spam logs at scale. Use V(1) like the line above?

if err != nil {
if otherErr := utils.IgnoreAlreadyExists(err); otherErr != nil {
s.log.Error(err, "error when creating new configMap", "cm.Name", cm.Name, "cm.Namespace", cm.Namespace)
return nil, err
} else {
s.log.V(1).Info("configmap already exists, skip", "configMap", cmKey)
}
s.log.V(1).Info("configmap already exists (concurrent creation), skip", "configMap", cmKey)
}
return appScriptGen, nil
}

// ConfigMap exists, check if the script SHA256 annotation matches; update with retry on conflict.
currentSHA256 := appScriptGen.GetScriptSHA256()
if err = retry.RetryOnConflict(retry.DefaultBackoff, func() error {
latest, getErr := kubeclient.GetConfigmapByName(s.client, cm.Name, cm.Namespace)
if getErr != nil {
return getErr
}
if latest == nil {
// Deleted between Get calls; recreate
return s.client.Create(context.TODO(), cm)
}
if latest.Annotations != nil {
if annotationSHA256, ok := latest.Annotations[common.AnnotationCheckMountScriptSHA256]; ok && annotationSHA256 == currentSHA256 {
s.log.V(1).Info("configmap script is up-to-date, skip update", "configMap", cmKey)
return nil
}
}
// SHA256 mismatch or annotation missing: update the ConfigMap with latest script and SHA256
s.log.Info("configmap script SHA256 mismatch or annotation missing, updating", "configMap", cmKey, "expectedSHA256", currentSHA256)
latest.Data = cm.Data
if latest.Annotations == nil {
latest.Annotations = map[string]string{}
}
latest.Annotations[common.AnnotationCheckMountScriptSHA256] = currentSHA256
return s.client.Update(context.TODO(), latest)
}); err != nil {
s.log.Error(err, "error when ensuring configMap is up-to-date", "cm.Name", cm.Name, "cm.Namespace", cm.Namespace)
return nil, err
}

return appScriptGen, nil
Expand Down
56 changes: 52 additions & 4 deletions pkg/application/inject/fuse/mount_point_script_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (

"github.com/fluid-cloudnative/fluid/pkg/application/inject/fuse/mutator"
"github.com/fluid-cloudnative/fluid/pkg/application/inject/fuse/poststart"
"github.com/fluid-cloudnative/fluid/pkg/common"
"github.com/fluid-cloudnative/fluid/pkg/ddc/base"
"github.com/go-logr/logr"
. "github.com/onsi/ginkgo/v2"
Expand Down Expand Up @@ -266,17 +267,64 @@ var _ = Describe("Fuse Injector", func() {
})
})

Context("when configmap already exists", func() {
It("should not return an error", func() {
// Create configmap first
Context("when configmap already exists with matching SHA256 annotation", func() {
It("should skip update and return without error", func() {
appScriptGen := poststart.NewScriptGeneratorForApp(namespace)
cm := appScriptGen.BuildConfigmap()
err := fakeClient.Create(context.TODO(), cm)
Expect(err).To(BeNil())

// Try to ensure it exists again
_, err = injector.ensureScriptConfigMapExists(namespace)
Expect(err).To(BeNil())

// Verify configmap is unchanged (no extra update)
retrievedCM := &corev1.ConfigMap{}
err = fakeClient.Get(context.TODO(), client.ObjectKey{Name: cm.Name, Namespace: cm.Namespace}, retrievedCM)
Expect(err).To(BeNil())
Expect(retrievedCM.Annotations).To(HaveKey(common.AnnotationCheckMountScriptSHA256))
Expect(retrievedCM.Annotations[common.AnnotationCheckMountScriptSHA256]).To(Equal(appScriptGen.GetScriptSHA256()))
})
})

Context("when configmap already exists with stale SHA256 annotation", func() {
It("should update the configmap with the latest script and annotation", func() {
appScriptGen := poststart.NewScriptGeneratorForApp(namespace)
cm := appScriptGen.BuildConfigmap()
// Tamper the annotation to simulate an outdated configmap
cm.Annotations[common.AnnotationCheckMountScriptSHA256] = "stale-sha256"
cm.Data["check-fluid-mount-ready.sh"] = "old script content"
err := fakeClient.Create(context.TODO(), cm)
Expect(err).To(BeNil())

_, err = injector.ensureScriptConfigMapExists(namespace)
Expect(err).To(BeNil())

// Verify configmap was updated
retrievedCM := &corev1.ConfigMap{}
err = fakeClient.Get(context.TODO(), client.ObjectKey{Name: cm.Name, Namespace: cm.Namespace}, retrievedCM)
Expect(err).To(BeNil())
Expect(retrievedCM.Annotations[common.AnnotationCheckMountScriptSHA256]).To(Equal(appScriptGen.GetScriptSHA256()))
Expect(retrievedCM.Data["check-fluid-mount-ready.sh"]).NotTo(Equal("old script content"))
})
})

Context("when configmap already exists without SHA256 annotation", func() {
It("should update the configmap to add the annotation", func() {
appScriptGen := poststart.NewScriptGeneratorForApp(namespace)
cm := appScriptGen.BuildConfigmap()
// Remove the annotation to simulate a legacy configmap
delete(cm.Annotations, common.AnnotationCheckMountScriptSHA256)
err := fakeClient.Create(context.TODO(), cm)
Expect(err).To(BeNil())

_, err = injector.ensureScriptConfigMapExists(namespace)
Expect(err).To(BeNil())

retrievedCM := &corev1.ConfigMap{}
err = fakeClient.Get(context.TODO(), client.ObjectKey{Name: cm.Name, Namespace: cm.Namespace}, retrievedCM)
Expect(err).To(BeNil())
Expect(retrievedCM.Annotations).To(HaveKey(common.AnnotationCheckMountScriptSHA256))
Expect(retrievedCM.Annotations[common.AnnotationCheckMountScriptSHA256]).To(Equal(appScriptGen.GetScriptSHA256()))
})
})
})
Expand Down
76 changes: 63 additions & 13 deletions pkg/application/inject/fuse/mutator/mutator_default.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,20 +22,24 @@ import (
"encoding/hex"
"fmt"
"path/filepath"
"reflect"
"strings"
"time"

"github.com/go-logr/logr"
"github.com/pkg/errors"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/util/retry"
"sigs.k8s.io/controller-runtime/pkg/client"

"github.com/fluid-cloudnative/fluid/pkg/application/inject/fuse/poststart"
"github.com/fluid-cloudnative/fluid/pkg/common"
"github.com/fluid-cloudnative/fluid/pkg/ddc/base"
"github.com/fluid-cloudnative/fluid/pkg/utils"
"github.com/fluid-cloudnative/fluid/pkg/utils/kubeclient"

datav1alpha1 "github.com/fluid-cloudnative/fluid/api/v1alpha1"
)

var (
Expand Down Expand Up @@ -321,20 +325,9 @@ func prepareFuseContainerPostStartScript(helper *helperData) error {
// Fluid assumes pvc name is the same with runtime's name
gen := poststart.NewDefaultPostStartScriptGenerator()
cmKey := gen.GetNamespacedConfigMapKey(types.NamespacedName{Namespace: datasetNamespace, Name: datasetName}, template.FuseMountInfo.FsType)
found, err := kubeclient.IsConfigMapExist(helper.client, cmKey.Name, cmKey.Namespace)
if err != nil {
return err
}

if !found {
cm := gen.BuildConfigMap(dataset, cmKey)
err = helper.client.Create(context.TODO(), cm)
if err != nil {
// If ConfigMap creation succeeds concurrently, continue to mutate
if otherErr := utils.IgnoreAlreadyExists(err); otherErr != nil {
return err
}
}
if err = ensurePostStartConfigMap(helper.client, gen, dataset, cmKey); err != nil {
return err
}

template.FuseContainer.VolumeMounts = append(template.FuseContainer.VolumeMounts, gen.GetVolumeMount())
Expand All @@ -347,6 +340,63 @@ func prepareFuseContainerPostStartScript(helper *helperData) error {
return nil
}

// ensurePostStartConfigMap creates the ConfigMap if it does not exist, or updates it when the
// script content has changed (detected via SHA256 annotation).
func ensurePostStartConfigMap(c client.Client, gen poststart.ScriptGenerator, dataset *datav1alpha1.Dataset, cmKey types.NamespacedName) error {
existingCM, err := kubeclient.GetConfigmapByName(c, cmKey.Name, cmKey.Namespace)
if err != nil {
return err
}

if existingCM == nil {
cm := gen.BuildConfigMap(dataset, cmKey)
if createErr := c.Create(context.TODO(), cm); createErr != nil {
// If ConfigMap creation succeeds concurrently, continue to mutate
return utils.IgnoreAlreadyExists(createErr)
}
return nil
}

// ConfigMap exists; update with retry on conflict to handle concurrent webhook mutations.
return retry.RetryOnConflict(retry.DefaultBackoff, func() error {
return updateConfigMapIfStale(c, gen, dataset, cmKey)
})
}

// updateConfigMapIfStale fetches the latest ConfigMap and updates it when the SHA256 annotation
// does not match the current script. It is designed to be called inside a RetryOnConflict loop.
func updateConfigMapIfStale(c client.Client, gen poststart.ScriptGenerator, dataset *datav1alpha1.Dataset, cmKey types.NamespacedName) error {
latest, err := kubeclient.GetConfigmapByName(c, cmKey.Name, cmKey.Namespace)
if err != nil {
return err
}
if latest == nil {
// Deleted between Get calls; recreate
return c.Create(context.TODO(), gen.BuildConfigMap(dataset, cmKey))
}

currentSHA256 := gen.GetScriptSHA256()
if isConfigMapUpToDate(latest.Annotations, currentSHA256) {
return nil
}

updated := gen.RefreshConfigMapContents(dataset, cmKey, latest.DeepCopy())
if reflect.DeepEqual(latest, updated) {
return nil
}
return c.Update(context.TODO(), updated)
}

// isConfigMapUpToDate returns true when the annotations already carry the expected SHA256,
// meaning no update is needed.
func isConfigMapUpToDate(annotations map[string]string, expectedSHA256 string) bool {
if annotations == nil {
return false
}
sha, ok := annotations[common.AnnotationCheckMountScriptSHA256]
return ok && sha == expectedSHA256
}

func transformTemplateWithCacheDirDisabled(helper *helperData) {
template := helper.template
template.FuseContainer.VolumeMounts = utils.TrimVolumeMounts(template.FuseContainer.VolumeMounts, cacheDirNames)
Expand Down
Loading
Loading