Skip to content

feature: support self-maintained workload advancedStatefulSet#5733

Draft
Syspretor wants to merge 1 commit intofluid-cloudnative:masterfrom
Syspretor:feature/init-advanced-sts-controller
Draft

feature: support self-maintained workload advancedStatefulSet#5733
Syspretor wants to merge 1 commit intofluid-cloudnative:masterfrom
Syspretor:feature/init-advanced-sts-controller

Conversation

@Syspretor
Copy link
Copy Markdown
Collaborator

Ⅰ. Describe what this PR does

Ⅱ. Does this pull request fix one issue?

fixes #XXXX

Ⅲ. List the added test cases (unit test/integration test) if any, please explain if no tests are needed.

Ⅳ. Describe how to verify it

Ⅴ. Special notes for reviews

Signed-off-by: 玖宇 <guotongyu.gty@alibaba-inc.com>
@fluid-e2e-bot
Copy link
Copy Markdown

fluid-e2e-bot bot commented Mar 26, 2026

[APPROVALNOTIFIER] This PR is NOT APPROVED

This pull-request has been approved by:
Once this PR has been reviewed and has the lgtm label, please assign zwwhdls for approval by writing /assign @zwwhdls in a comment. For more information see:The Kubernetes Code Review Process.

The full list of commands accepted by this bot can be found here.

Details Needs approval from an approver in each of these files:

Approvers can indicate their approval by writing /approve in a comment
Approvers can cancel approval by writing /approve cancel in a comment

@fluid-e2e-bot
Copy link
Copy Markdown

fluid-e2e-bot bot commented Mar 26, 2026

Skipping CI for Draft Pull Request.
If you want CI signal for your change, please convert it to an actual PR.
You can still manually trigger a test run with /test all

@sonarqubecloud
Copy link
Copy Markdown

@gemini-code-assist
Copy link
Copy Markdown
Contributor

Summary of Changes

Hello, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request significantly enhances workload management by introducing the AdvancedStatefulSet custom resource. This new resource provides more granular control over stateful applications, offering flexible update strategies like in-place updates and sophisticated PVC retention policies. The changes lay the groundwork for more robust and efficient management of stateful workloads within the Fluid project.

Highlights

  • AdvancedStatefulSet API Introduction: Introduced a new custom resource definition (CRD) for AdvancedStatefulSet under the workload.fluid.io/v1alpha1 API group, extending Kubernetes' native StatefulSet capabilities.
  • Enhanced Update Strategies: Implemented advanced rolling update strategies, including in-place updates, unordered updates, and priority-based updates for pods within an AdvancedStatefulSet.
  • Flexible PVC Retention Policies: Added support for configurable PersistentVolumeClaim (PVC) retention policies, allowing control over PVC lifecycle during StatefulSet deletion or scaling events.
  • Workload Controller Integration: Integrated the new AdvancedStatefulSet controller into the existing workload-controller binary, enabling its reconciliation logic within the Fluid ecosystem.
  • Comprehensive Utility Libraries: Incorporated a suite of utility packages for managing various aspects of workload control, such as controller history, pod lifecycle, in-place update mechanisms, and resource expectations.
Using Gemini Code Assist

The full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips.

Invoking Gemini

You can request assistance from Gemini at any point by creating a comment using either /gemini <command> or @gemini-code-assist <command>. Below is a summary of the supported commands on the current page.

Feature Command Description
Code Review /gemini review Performs a code review for the current pull request in its current state.
Pull Request Summary /gemini summary Provides a summary of the current pull request in its current state.
Comment @gemini-code-assist Responds in comments when explicitly tagged, both in pull request comments and review comments.
Help /gemini help Displays a list of available commands.

Customization

To customize Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a .gemini/ folder in the base of the repository. Detailed instructions can be found here.

Limitations & Feedback

Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counter productive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for GitHub and other Google products, sign up here.

Footnotes

  1. Review the Privacy Notices, Generative AI Prohibited Use Policy, Terms of Service, and learn how to configure Gemini Code Assist in GitHub here. Gemini can make mistakes, so double check it and use code with caution.

Copy link
Copy Markdown
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This PR introduces a new “workload-controller” component and adds support for the AdvancedStatefulSet workload (including CRD + controller logic), largely by vendoring in Kubernetes client-go informer/lister code and porting Kruise-derived utilities.

Changes:

  • Add AdvancedStatefulSet CRD + sample manifest, plus controller runtime entrypoint (cmd/workload) and Makefile target.
  • Add a large set of workload-controller utilities (expectations, lifecycle, inplace update helpers, sorting, etc.) and supporting vendored k8s packages.
  • Update dependencies (go.mod) and vendor tree to include required client-go informers/listers and k8s helper packages.

Reviewed changes

Copilot reviewed 47 out of 97 changed files in this pull request and generated 7 comments.

Show a summary per file
File Description
vendor/modules.txt Vendor manifest updated for newly vendored client-go/k8s packages.
vendor/k8s.io/kubernetes/pkg/util/slice/slice.go Vendored k8s slice utilities used by workload logic.
vendor/k8s.io/kubernetes/pkg/util/hash/hash.go Vendored k8s hash helper used for stable object hashing.
vendor/k8s.io/kubernetes/pkg/fieldpath/fieldpath.go Vendored fieldPath parsing/extraction helpers.
vendor/k8s.io/kubernetes/pkg/fieldpath/doc.go Vendored package doc for fieldpath.
vendor/k8s.io/client-go/listers/core/v1/serviceaccount.go Vendored client-go lister.
vendor/k8s.io/client-go/listers/core/v1/service.go Vendored client-go lister.
vendor/k8s.io/client-go/listers/core/v1/secret.go Vendored client-go lister.
vendor/k8s.io/client-go/listers/core/v1/resourcequota.go Vendored client-go lister.
vendor/k8s.io/client-go/listers/core/v1/replicationcontroller_expansion.go Vendored lister expansion methods.
vendor/k8s.io/client-go/listers/core/v1/replicationcontroller.go Vendored client-go lister.
vendor/k8s.io/client-go/listers/core/v1/podtemplate.go Vendored client-go lister.
vendor/k8s.io/client-go/listers/core/v1/pod.go Vendored client-go lister.
vendor/k8s.io/client-go/listers/core/v1/persistentvolumeclaim.go Vendored client-go lister.
vendor/k8s.io/client-go/listers/core/v1/persistentvolume.go Vendored client-go lister.
vendor/k8s.io/client-go/listers/core/v1/node.go Vendored client-go lister.
vendor/k8s.io/client-go/listers/core/v1/namespace.go Vendored client-go lister.
vendor/k8s.io/client-go/listers/core/v1/limitrange.go Vendored client-go lister.
vendor/k8s.io/client-go/listers/core/v1/expansion_generated.go Vendored generated lister expansions.
vendor/k8s.io/client-go/listers/core/v1/event.go Vendored client-go lister.
vendor/k8s.io/client-go/listers/core/v1/endpoints.go Vendored client-go lister.
vendor/k8s.io/client-go/listers/core/v1/configmap.go Vendored client-go lister.
vendor/k8s.io/client-go/listers/core/v1/componentstatus.go Vendored client-go lister.
vendor/k8s.io/client-go/listers/apps/v1/statefulset_expansion.go Vendored lister expansion methods.
vendor/k8s.io/client-go/listers/apps/v1/statefulset.go Vendored client-go lister.
vendor/k8s.io/client-go/listers/apps/v1/replicaset_expansion.go Vendored lister expansion methods.
vendor/k8s.io/client-go/listers/apps/v1/replicaset.go Vendored client-go lister.
vendor/k8s.io/client-go/listers/apps/v1/expansion_generated.go Vendored generated lister expansions.
vendor/k8s.io/client-go/listers/apps/v1/deployment.go Vendored client-go lister.
vendor/k8s.io/client-go/listers/apps/v1/daemonset_expansion.go Vendored lister expansion methods.
vendor/k8s.io/client-go/listers/apps/v1/daemonset.go Vendored client-go lister.
vendor/k8s.io/client-go/listers/apps/v1/controllerrevision.go Vendored client-go lister.
vendor/k8s.io/client-go/informers/internalinterfaces/factory_interfaces.go Vendored informer factory interfaces.
vendor/k8s.io/client-go/informers/core/v1/serviceaccount.go Vendored client-go informer.
vendor/k8s.io/client-go/informers/core/v1/service.go Vendored client-go informer.
vendor/k8s.io/client-go/informers/core/v1/secret.go Vendored client-go informer.
vendor/k8s.io/client-go/informers/core/v1/resourcequota.go Vendored client-go informer.
vendor/k8s.io/client-go/informers/core/v1/replicationcontroller.go Vendored client-go informer.
vendor/k8s.io/client-go/informers/core/v1/podtemplate.go Vendored client-go informer.
vendor/k8s.io/client-go/informers/core/v1/pod.go Vendored client-go informer.
vendor/k8s.io/client-go/informers/core/v1/persistentvolumeclaim.go Vendored client-go informer.
vendor/k8s.io/client-go/informers/core/v1/persistentvolume.go Vendored client-go informer.
vendor/k8s.io/client-go/informers/core/v1/node.go Vendored client-go informer.
vendor/k8s.io/client-go/informers/core/v1/namespace.go Vendored client-go informer.
vendor/k8s.io/client-go/informers/core/v1/limitrange.go Vendored client-go informer.
vendor/k8s.io/client-go/informers/core/v1/interface.go Vendored group-version informer interface.
vendor/k8s.io/client-go/informers/core/v1/event.go Vendored client-go informer.
vendor/k8s.io/client-go/informers/core/v1/endpoints.go Vendored client-go informer.
vendor/k8s.io/client-go/informers/core/v1/configmap.go Vendored client-go informer.
vendor/k8s.io/client-go/informers/core/v1/componentstatus.go Vendored client-go informer.
pkg/controllers/workload/v1alpha1/utils/util/json.go JSON helpers used by workload utils.
pkg/controllers/workload/v1alpha1/utils/util/image.go Image parsing/equality helpers.
pkg/controllers/workload/v1alpha1/utils/util/cache.go Global cache + cache sync timeout utilities.
pkg/controllers/workload/v1alpha1/utils/updatesort/sort.go Defines sorter interface for update ordering.
pkg/controllers/workload/v1alpha1/utils/updatesort/scatter_sort.go Implements scatter-based update ordering.
pkg/controllers/workload/v1alpha1/utils/updatesort/priority_sort.go Implements priority-based update ordering.
pkg/controllers/workload/v1alpha1/utils/specifieddelete/specified_delete.go Utilities for “specified delete” labeling/patching.
pkg/controllers/workload/v1alpha1/utils/revisionadapter/revision_adapter.go Revision hash adapter abstraction.
pkg/controllers/workload/v1alpha1/utils/revision/revision.go Revision comparison helpers for Pods.
pkg/controllers/workload/v1alpha1/utils/requeueduration/duration.go Stores/reduces requeue durations across reconcile steps.
pkg/controllers/workload/v1alpha1/utils/ratelimiter/rate_limiter.go Configurable workqueue rate limiter.
pkg/controllers/workload/v1alpha1/utils/podreadiness/pod_readiness_utils.go Readiness gate condition/message management.
pkg/controllers/workload/v1alpha1/utils/podreadiness/pod_readiness.go Pod readiness control wrapper.
pkg/controllers/workload/v1alpha1/utils/podadapter/adapter.go Pod adapter abstractions (runtime client / typed client / informer).
pkg/controllers/workload/v1alpha1/utils/lifecycle/lifecycle_utils.go Pod lifecycle state + hook handling utilities.
pkg/controllers/workload/v1alpha1/utils/kubecontroller/pod_control.go K8s-like PodControl utilities used by controller logic.
pkg/controllers/workload/v1alpha1/utils/expectations/update_expectations.go Update expectations tracking for controller reconcile loops.
pkg/controllers/workload/v1alpha1/utils/expectations/scale_expectations.go Scale expectations tracking for controller reconcile loops.
pkg/controllers/workload/v1alpha1/utils/expectations/resource_version_expectation.go ResourceVersion-based expectation tracking.
pkg/controllers/workload/v1alpha1/utils/expectations/init.go Registers expectation timeout CLI flag.
pkg/controllers/workload/v1alpha1/utils/discovery/discovery.go Discovery-based GVK existence checks.
pkg/controllers/workload/v1alpha1/utils/containermeta/env_hash.go Hashing of env vars derived from metadata.
pkg/controllers/workload/v1alpha1/utils/client/client.go Helper for creating controller-runtime client from manager.
pkg/controllers/workload/v1alpha1/utils/api/asts.go Parsing helpers for AdvancedStatefulSet API fields.
pkg/controllers/workload/v1alpha1/advancedstatefulset/stateful_update_utils.go Pod selection/sorting for StatefulSet rolling updates.
pkg/controllers/workload/v1alpha1/advancedstatefulset/stateful_set_status_updater.go Status updater with conflict retry.
pkg/controllers/workload/v1alpha1/advancedstatefulset/pvc_event_handler.go PVC update handler to trigger reconcile.
go.mod Adds/adjusts dependencies (e.g., jsonpatch) for workload controller.
config/samples/workload_v1alpha1_advancedstatefulset.yaml Sample manifest for AdvancedStatefulSet.
config/crd/bases/workload.fluid.io_advancedstatefulsets.yaml CRD definition for AdvancedStatefulSet.
cmd/workload/main.go New workload-controller entrypoint.
cmd/workload/app/workload.go Cobra command wiring + manager startup for workload-controller.
api/workload/v1alpha1/groupversion_info.go API group version registration for workload APIs.
Makefile Adds workload-controller build/run targets.

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment on lines +14 to +16
flag.DurationVar(&maxDelay, "rate-limiter-max-delay", time.Second*1000, "The max delay for rate limiter. Defaults 1000s")
flag.IntVar(&qps, "rate-limiter-qps", 10, "The qps for rate limier. Defaults 10")
flag.IntVar(&bucketSize, "rate-limiter-bucket-size", 100, "The bucket size for rate limier. Defaults 100")
Copy link

Copilot AI Mar 26, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Flag help text has typos ("rate limier"), which will show up in --help output. Please correct to "rate limiter" for clarity.

Copilot uses AI. Check for mistakes.
Comment on lines +53 to +56
// Duration helps calculate the shortest non-zore duration to requeue
type Duration struct {
sync.Mutex
duration time.Duration
Copy link

Copilot AI Mar 26, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Typo in comment: "non-zore" should be "non-zero".

Copilot uses AI. Check for mistakes.
Comment on lines +36 to +46
func NewScatterSorterV1beta1(s workloadv1alpha1.UpdateScatterStrategy) Sorter {
// Convert v1beta1 to v1alpha1
v1alpha1Strategy := make(workloadv1alpha1.UpdateScatterStrategy, len(s))
for i, term := range s {
v1alpha1Strategy[i] = workloadv1alpha1.UpdateScatterTerm{
Key: term.Key,
Value: term.Value,
}
}
return &scatterSort{strategy: v1alpha1Strategy}
}
Copy link

Copilot AI Mar 26, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

NewScatterSorterV1beta1 is named and documented as converting from v1beta1, but it accepts/returns the v1alpha1 strategy type and performs a no-op copy. This is misleading for callers/maintainers; consider either removing it or changing the signature/name to reflect the actual API version being handled.

Copilot uses AI. Check for mistakes.
Comment on lines +55 to +64
func init() {
_ = clientgoscheme.AddToScheme(scheme)
_ = workloadv1alpha1.AddToScheme(scheme)

workloadCmd.Flags().StringVarP(&metricsAddr, "metrics-addr", "", ":8084", "The address the metric endpoint binds to.")
workloadCmd.Flags().BoolVarP(&enableLeaderElection, "enable-leader-election", "", false, "Enable leader election for controller manager.")
workloadCmd.Flags().StringVarP(&leaderElectionNamespace, "leader-election-namespace", "", "fluid-system", "The namespace in which the leader election resource will be created.")
workloadCmd.Flags().BoolVarP(&development, "development", "", true, "Enable development mode for workload controller.")
workloadCmd.Flags().StringVarP(&pprofAddr, "pprof-addr", "", "", "The address for pprof to use while exporting profiling results")
}
Copy link

Copilot AI Mar 26, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

workload controller command does not add the standard library flag.CommandLine flags to Cobra (e.g., via workloadCmd.Flags().AddGoFlagSet(flag.CommandLine)). Since the AdvancedStatefulSet controller and several utils register Go flags in init(), those flags will be unusable and passing them will cause "unknown flag" errors. Please import flag and add the Go flag set (and any required flag.Set(...) like other cmds do) so these options can be configured consistently.

Copilot uses AI. Check for mistakes.
if err := r.KubeClient.CoreV1().Pods(namespace).Delete(ctx, podID, metav1.DeleteOptions{}); err != nil {
if apierrors.IsNotFound(err) {
logger.V(4).Info("Pod has already been deleted.", "pod", klog.KRef(namespace, podID))
return err
Copy link

Copilot AI Mar 26, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

DeletePod returns the NotFound error when the pod is already deleted. For controller delete operations this should be treated as success; otherwise reconciliation may retry/fail unnecessarily on an already-removed pod.

Suggested change
return err
return nil

Copilot uses AI. Check for mistakes.
Comment on lines +31 to +37
delegatingClient, _ := client.New(cfg, client.Options{
Scheme: mgr.GetScheme(),
Cache: &client.CacheOptions{
Reader: mgr.GetCache(),
Unstructured: true,
},
})
Copy link

Copilot AI Mar 26, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

NewClientFromManager ignores the error returned by client.New and always returns the (possibly nil) client. This can lead to nil dereferences later and makes failures hard to diagnose. Consider returning (client.Client, error) or handling the error explicitly (e.g., logging + exit/panic) to ensure callers can't proceed with an invalid client.

Suggested change
delegatingClient, _ := client.New(cfg, client.Options{
Scheme: mgr.GetScheme(),
Cache: &client.CacheOptions{
Reader: mgr.GetCache(),
Unstructured: true,
},
})
delegatingClient, err := client.New(cfg, client.Options{
Scheme: mgr.GetScheme(),
Cache: &client.CacheOptions{
Reader: mgr.GetCache(),
Unstructured: true,
},
})
if err != nil {
panic(fmt.Sprintf("failed to create delegating client for %q: %v", name, err))
}

Copilot uses AI. Check for mistakes.
Comment on lines +58 to +59
klog.ErrorS(err, "invalid range reserveOrdinal found, an empty slice will be returned", "reserveOrdinal", elem.StrVal)
return nil
Copy link

Copilot AI Mar 26, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

GetReserveOrdinalIntSet returns nil on parse error, but callers treat the result as a non-nil set and call methods like Has(...), which will panic. Return an empty set (or change callers to handle nil) and update the log message accordingly (it currently says an empty slice will be returned).

Suggested change
klog.ErrorS(err, "invalid range reserveOrdinal found, an empty slice will be returned", "reserveOrdinal", elem.StrVal)
return nil
klog.ErrorS(err, "invalid range reserveOrdinal found, an empty set will be returned", "reserveOrdinal", elem.StrVal)
return values

Copilot uses AI. Check for mistakes.
Copy link
Copy Markdown
Contributor

@gemini-code-assist gemini-code-assist bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request introduces a new AdvancedStatefulSet API and controller, largely adapted from the Kruise project, to provide more advanced StatefulSet functionalities within Fluid. This includes new definitions for AdvancedStatefulSet spec and status, various update strategies (e.g., in-place updates, rolling updates), lifecycle hooks, and PVC retention policies. The changes also involve adding a new workload-controller binary, its build targets in the Makefile, and a comprehensive set of utility packages to support the controller's operations. Review comments suggest renaming constants like ContainerLaunchBarrierEnvName and KruisePodReadyConditionType to be more Fluid-specific for consistency. Additionally, there's a suggestion to improve error handling by wrapping errors with %w in stateful_pod_control.go.

AnnotationSubsetPatchKey = "workload.fluid.io/subset-patch"

// ContainerLaunchBarrierEnvName is the env name used to indicate container launch priority barrier.
ContainerLaunchBarrierEnvName = "KRUISE_CONTAINER_LAUNCH_BARRIER"
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The constant ContainerLaunchBarrierEnvName seems to be a leftover from the original Kruise implementation. To maintain consistency within the Fluid project, it would be better to rename it to something Fluid-specific, like FLUID_CONTAINER_LAUNCH_BARRIER.

Suggested change
ContainerLaunchBarrierEnvName = "KRUISE_CONTAINER_LAUNCH_BARRIER"
ContainerLaunchBarrierEnvName = "FLUID_CONTAINER_LAUNCH_BARRIER"

Comment on lines +203 to +206
// KruisePodReadyConditionType is the pod condition type for kruise readiness gate.
KruisePodReadyConditionType v1.PodConditionType = "KruisePodReady"
// InPlaceUpdateStrategyKruisePodReadyConditionType is an alias for KruisePodReadyConditionType.
InPlaceUpdateStrategyKruisePodReadyConditionType = KruisePodReadyConditionType
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The constants KruisePodReadyConditionType and InPlaceUpdateStrategyKruisePodReadyConditionType appear to be copied from Kruise. For better consistency and clarity within the Fluid codebase, please consider renaming them to something Fluid-specific, such as FluidPodReadyConditionType.

Suggested change
// KruisePodReadyConditionType is the pod condition type for kruise readiness gate.
KruisePodReadyConditionType v1.PodConditionType = "KruisePodReady"
// InPlaceUpdateStrategyKruisePodReadyConditionType is an alias for KruisePodReadyConditionType.
InPlaceUpdateStrategyKruisePodReadyConditionType = KruisePodReadyConditionType
// FluidPodReadyConditionType is the pod condition type for fluid readiness gate.
FluidPodReadyConditionType v1.PodConditionType = "FluidPodReady"
// InPlaceUpdateStrategyFluidPodReadyConditionType is an alias for FluidPodReadyConditionType.
InPlaceUpdateStrategyFluidPodReadyConditionType = FluidPodReadyConditionType

case apierrors.IsNotFound(err):
klog.V(4).InfoS("Expected claim missing, continuing to pick up in next iteration", "claimName", claimName)
case err != nil:
return false, fmt.Errorf("Could not retrieve claim %s for %s when checking PVC deletion policy", claimName, pod.Name)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

For better error handling and debugging, it's a good practice to wrap the original error using %w. This provides more context when the error is propagated up the call stack. This pattern of unwrapped errors appears in a few places in this file (e.g., lines 350, 356).

Suggested change
return false, fmt.Errorf("Could not retrieve claim %s for %s when checking PVC deletion policy", claimName, pod.Name)
return false, fmt.Errorf("Could not retrieve claim %s for %s when checking PVC deletion policy: %w", claimName, pod.Name, err)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants