Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions pkg/common/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,3 +225,8 @@ const (
SkipPrecheckAnnotationKey = "sidecar.fluid.io/skip-precheck"
HostMountPathModeOnDefaultPlatformKey = "default.fuse-sidecar.fluid.io/host-mount-path-mode"
)

const (
// DatasetPolicyAutoCreate indicates that a Dataset should be auto-created for the Runtime.
DatasetPolicyAutoCreate = "auto-create"
)
4 changes: 4 additions & 0 deletions pkg/common/label.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,10 @@ const (
// i.e. fluid.io/check-mount-script-sha256
AnnotationCheckMountScriptSHA256 = LabelAnnotationPrefix + "check-mount-script-sha256"

// AnnotationDatasetPolicy is a runtime annotation that controls how Dataset is handled.
// i.e. fluid.io/dataset-policy
AnnotationDatasetPolicy = LabelAnnotationPrefix + "dataset-policy"

// AnnotationDisableRuntimeHelmValueConfig is a runtime label indicates the configmap contains helm value will not be created in setup.
AnnotationDisableRuntimeHelmValueConfig = "runtime." + LabelAnnotationPrefix + "disable-helm-value-config"

Expand Down
76 changes: 69 additions & 7 deletions pkg/controllers/runtime_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"strings"
"time"

apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/validation"
"k8s.io/apimachinery/pkg/util/validation/field"
Expand All @@ -34,8 +35,7 @@ import (
"k8s.io/client-go/util/retry"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"

// "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"

"github.com/fluid-cloudnative/fluid/pkg/dump"
fluiderrs "github.com/fluid-cloudnative/fluid/pkg/errors"
Expand Down Expand Up @@ -110,14 +110,27 @@ func (r *RuntimeReconciler) ReconcileInternal(ctx cruntime.ReconcileRequestConte
return utils.RequeueIfError(err)
}

var datasetPolicy string
if annotations := objectMeta.GetAnnotations(); annotations != nil {
datasetPolicy = annotations[common.AnnotationDatasetPolicy]
}

// 5.Get the dataset
dataset, err := r.GetDataset(ctx)
if err != nil {
// r.Recorder.Eventf(ctx.Dataset, corev1.EventTypeWarning, common.ErrorProcessRuntimeReason, "Process Runtime error %v", err)
if utils.IgnoreNotFound(err) == nil {
ctx.Log.Info("The dataset is not found", "dataset", ctx.NamespacedName)
dataset = nil
// return ctrl.Result{}, nil
if datasetPolicy == common.DatasetPolicyAutoCreate {
ctx.Log.Info("The dataset is not found, auto-creating according to policy", "dataset", ctx.NamespacedName)
dataset, err = r.ensureDatasetForRuntime(ctx, objectMeta)
if err != nil {
ctx.Log.Error(err, "Failed to auto-create the dataset")
r.Recorder.Eventf(runtime, corev1.EventTypeWarning, common.ErrorCreateDataset, "Failed to auto-create dataset: %v", err)
return utils.RequeueAfterInterval(5 * time.Second)
}
} else {
ctx.Log.Info("The dataset is not found", "dataset", ctx.NamespacedName)
dataset = nil
}
} else {
ctx.Log.Error(err, "Failed to get the ddc dataset")
return utils.RequeueIfError(errors.Wrap(err, "Unable to get dataset"))
Expand Down Expand Up @@ -172,7 +185,11 @@ func (r *RuntimeReconciler) ReconcileInternal(ctx cruntime.ReconcileRequestConte
}
} else {
// If dataset is nil, need to wait because the user may have not created dataset
ctx.Log.Info("No dataset can be bound to the runtime, waiting.")
if datasetPolicy == common.DatasetPolicyAutoCreate {
ctx.Log.Info("No dataset is available for the runtime after auto-create, waiting.", "dataset", ctx.NamespacedName)
} else {
ctx.Log.Info("No dataset can be bound to the runtime, waiting.")
}
r.Recorder.Event(runtime, corev1.EventTypeWarning, common.ErrorProcessRuntimeReason, "No dataset can be bound to the runtime, waiting.")
return utils.RequeueAfterInterval(time.Duration(5 * time.Second))
}
Expand Down Expand Up @@ -385,6 +402,51 @@ func (r *RuntimeReconciler) GetDataset(ctx cruntime.ReconcileRequestContext) (*d
return &dataset, nil
}

func (r *RuntimeReconciler) ensureDatasetForRuntime(ctx cruntime.ReconcileRequestContext, objectMeta metav1.Object) (*datav1alpha1.Dataset, error) {
runtime := ctx.Runtime
if runtime == nil {
return nil, fmt.Errorf("runtime is nil")
}

annotations := make(map[string]string)
for k, v := range objectMeta.GetAnnotations() {
annotations[k] = v
}
annotations[common.AnnotationDatasetPolicy] = common.DatasetPolicyAutoCreate

dataset := &datav1alpha1.Dataset{
TypeMeta: metav1.TypeMeta{
Kind: datav1alpha1.Datasetkind,
APIVersion: datav1alpha1.GroupVersion.Group + "/" + datav1alpha1.GroupVersion.Version,
},
ObjectMeta: metav1.ObjectMeta{
Name: objectMeta.GetName(),
Namespace: objectMeta.GetNamespace(),
Annotations: annotations,
},
}
Comment thread
jakharmonika364 marked this conversation as resolved.

// SetControllerReference looks up the GVK from the scheme, avoiding the
// empty-TypeMeta problem that arises when runtime.GetObjectKind() is used
// on objects retrieved via controller-runtime's client.Get().
if err := controllerutil.SetControllerReference(runtime, dataset, r.Client.Scheme()); err != nil {
return nil, fmt.Errorf("failed to set controller reference on auto-created dataset: %w", err)
}

err := r.Create(ctx, dataset)
if err != nil && !apierrors.IsAlreadyExists(err) {
return nil, err
}

if err := r.Get(ctx, ctx.NamespacedName, dataset); err != nil {
return nil, err
}

r.Recorder.Eventf(runtime, corev1.EventTypeNormal, common.Succeed, "Auto-created Dataset %s for Runtime", dataset.Name)

return dataset, nil
}

func (r *RuntimeReconciler) CheckIfReferenceDatasetIsSupported(ctx cruntime.ReconcileRequestContext) (bool, string) {
mounted := base.GetPhysicalDatasetFromMounts(ctx.Dataset.Spec.Mounts)

Expand Down
1 change: 1 addition & 0 deletions pkg/ddc/alluxio/ufs_internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,7 @@ func (e *AlluxioEngine) processUpdatingUFS(ufsToUpdate *utils.UFSToUpdate) (upda

return
}

// updatingUFSWithMountCommand updates the Alluxio UFS mount points based on the differences identified in ufsToUpdate.
// It performs mount operations for new UFS paths specified in ufsToUpdate.ToAdd() and unmount operations for paths
// listed in ufsToUpdate.ToRemove(). The function skips mount points using Fluid native schemes as they are not editable.
Expand Down
Loading