Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 23 additions & 11 deletions pkg/ddc/alluxio/ufs_internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,13 @@ package alluxio
import (
"context"
"fmt"
v1 "k8s.io/api/core/v1"
"path/filepath"
"reflect"
"strings"
"time"

v1 "k8s.io/api/core/v1"

datav1alpha1 "github.com/fluid-cloudnative/fluid/api/v1alpha1"
"github.com/fluid-cloudnative/fluid/pkg/common"
"github.com/fluid-cloudnative/fluid/pkg/ddc/alluxio/operations"
Expand Down Expand Up @@ -166,18 +167,11 @@ func (e *AlluxioEngine) processUpdatingUFS(ufsToUpdate *utils.UFSToUpdate) (upda
}

if updateReady {
// need to reset ufsTotal to Calculating so that SyncMetadata will work
datasetToUpdate := dataset.DeepCopy()
datasetToUpdate.Status.UfsTotal = metadataSyncNotDoneMsg
if !reflect.DeepEqual(dataset.Status, datasetToUpdate.Status) {
err = e.Client.Status().Update(context.TODO(), datasetToUpdate)
if err != nil {
e.Log.Error(err, "fail to update ufsTotal of dataset to Calculating")
}
if err = e.resetUfsTotalForSync(); err != nil {
return true, err
}

err = e.SyncMetadata()
if err != nil {
if err = e.SyncMetadata(); err != nil {
// just report this error and ignore it because SyncMetadata isn't on the critical path of Setup
e.Log.Error(err, "SyncMetadata", "dataset", e.name)
return true, nil
Expand All @@ -192,6 +186,24 @@ func (e *AlluxioEngine) processUpdatingUFS(ufsToUpdate *utils.UFSToUpdate) (upda
return
}

// resetUfsTotalForSync resets the dataset's UfsTotal to the "Calculating" sentinel value
// so that SyncMetadata knows it must resync. The update is wrapped with RetryOnConflict
// to handle concurrent status updates gracefully.
func (e *AlluxioEngine) resetUfsTotalForSync() error {
return retry.RetryOnConflict(retry.DefaultBackoff, func() error {
dataset, err := utils.GetDataset(e.Client, e.name, e.namespace)
if err != nil {
return err
}
datasetToUpdate := dataset.DeepCopy()
datasetToUpdate.Status.UfsTotal = metadataSyncNotDoneMsg
if !reflect.DeepEqual(dataset.Status, datasetToUpdate.Status) {
return e.Client.Status().Update(context.Background(), datasetToUpdate)
}
return nil
})
}

// updatingUFSWithMountCommand updates the Alluxio UFS mount points based on the differences identified in ufsToUpdate.
// It performs mount operations for new UFS paths specified in ufsToUpdate.ToAdd() and unmount operations for paths
// listed in ufsToUpdate.ToRemove(). The function skips mount points using Fluid native schemes as they are not editable.
Expand Down
31 changes: 22 additions & 9 deletions pkg/ddc/goosefs/ufs_internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/fluid-cloudnative/fluid/pkg/utils/kubeclient"
securityutil "github.com/fluid-cloudnative/fluid/pkg/utils/security"
"github.com/pkg/errors"
"k8s.io/client-go/util/retry"
)

func (e *GooseFSEngine) usedStorageBytesInternal() (value int64, err error) {
Expand Down Expand Up @@ -235,17 +236,11 @@ func (e *GooseFSEngine) processUpdatingUFS(ufsToUpdate *utils.UFSToUpdate) (err
}
}
// need to reset ufsTotal to Calculating so that SyncMetadata will work
datasetToUpdate := dataset.DeepCopy()
datasetToUpdate.Status.UfsTotal = MetadataSyncNotDoneMsg
if !reflect.DeepEqual(dataset.Status, datasetToUpdate.Status) {
err = e.Client.Status().Update(context.TODO(), datasetToUpdate)
if err != nil {
e.Log.Error(err, "fail to update ufsTotal of dataset to Calculating")
}
if err = e.resetUfsTotalForSync(); err != nil {
return err
}

err = e.SyncMetadata()
if err != nil {
if err = e.SyncMetadata(); err != nil {
// just report this error and ignore it because SyncMetadata isn't on the critical path of Setup
e.Log.Error(err, "SyncMetadata", "dataset", e.name)
return nil
Expand All @@ -254,6 +249,24 @@ func (e *GooseFSEngine) processUpdatingUFS(ufsToUpdate *utils.UFSToUpdate) (err
return nil
}

// resetUfsTotalForSync resets the dataset's UfsTotal to the "Calculating" sentinel value
// so that SyncMetadata knows it must resync. The update is wrapped with RetryOnConflict
// to handle concurrent status updates gracefully.
func (e *GooseFSEngine) resetUfsTotalForSync() error {
return retry.RetryOnConflict(retry.DefaultBackoff, func() error {
dataset, err := utils.GetDataset(e.Client, e.name, e.namespace)
if err != nil {
return err
}
datasetToUpdate := dataset.DeepCopy()
datasetToUpdate.Status.UfsTotal = MetadataSyncNotDoneMsg
if !reflect.DeepEqual(dataset.Status, datasetToUpdate.Status) {
return e.Client.Status().Update(context.Background(), datasetToUpdate)
}
return nil
})
}

// mountUFS() mount all UFSs to GooseFS according to mount points in `dataset.Spec`. If a mount point is Fluid-native, mountUFS() will skip it.
func (e *GooseFSEngine) mountUFS() (err error) {
dataset, err := utils.GetDataset(e.Client, e.name, e.namespace)
Expand Down
Loading