feature: support self-maintained workload advancedStatefulSet#5733
Syspretor wants to merge 1 commit into fluid-cloudnative:master
Signed-off-by: 玖宇 <guotongyu.gty@alibaba-inc.com>
Skipping CI for Draft Pull Request.
Pull request overview
This PR introduces a new “workload-controller” component and adds support for the AdvancedStatefulSet workload (including CRD + controller logic), largely by vendoring in Kubernetes client-go informer/lister code and porting Kruise-derived utilities.
Changes:
- Add AdvancedStatefulSet CRD + sample manifest, plus controller runtime entrypoint (cmd/workload) and Makefile target.
- Add a large set of workload-controller utilities (expectations, lifecycle, inplace update helpers, sorting, etc.) and supporting vendored k8s packages.
- Update dependencies (go.mod) and vendor tree to include required client-go informers/listers and k8s helper packages.
Reviewed changes
Copilot reviewed 47 out of 97 changed files in this pull request and generated 7 comments.
| File | Description |
|---|---|
| vendor/modules.txt | Vendor manifest updated for newly vendored client-go/k8s packages. |
| vendor/k8s.io/kubernetes/pkg/util/slice/slice.go | Vendored k8s slice utilities used by workload logic. |
| vendor/k8s.io/kubernetes/pkg/util/hash/hash.go | Vendored k8s hash helper used for stable object hashing. |
| vendor/k8s.io/kubernetes/pkg/fieldpath/fieldpath.go | Vendored fieldPath parsing/extraction helpers. |
| vendor/k8s.io/kubernetes/pkg/fieldpath/doc.go | Vendored package doc for fieldpath. |
| vendor/k8s.io/client-go/listers/core/v1/serviceaccount.go | Vendored client-go lister. |
| vendor/k8s.io/client-go/listers/core/v1/service.go | Vendored client-go lister. |
| vendor/k8s.io/client-go/listers/core/v1/secret.go | Vendored client-go lister. |
| vendor/k8s.io/client-go/listers/core/v1/resourcequota.go | Vendored client-go lister. |
| vendor/k8s.io/client-go/listers/core/v1/replicationcontroller_expansion.go | Vendored lister expansion methods. |
| vendor/k8s.io/client-go/listers/core/v1/replicationcontroller.go | Vendored client-go lister. |
| vendor/k8s.io/client-go/listers/core/v1/podtemplate.go | Vendored client-go lister. |
| vendor/k8s.io/client-go/listers/core/v1/pod.go | Vendored client-go lister. |
| vendor/k8s.io/client-go/listers/core/v1/persistentvolumeclaim.go | Vendored client-go lister. |
| vendor/k8s.io/client-go/listers/core/v1/persistentvolume.go | Vendored client-go lister. |
| vendor/k8s.io/client-go/listers/core/v1/node.go | Vendored client-go lister. |
| vendor/k8s.io/client-go/listers/core/v1/namespace.go | Vendored client-go lister. |
| vendor/k8s.io/client-go/listers/core/v1/limitrange.go | Vendored client-go lister. |
| vendor/k8s.io/client-go/listers/core/v1/expansion_generated.go | Vendored generated lister expansions. |
| vendor/k8s.io/client-go/listers/core/v1/event.go | Vendored client-go lister. |
| vendor/k8s.io/client-go/listers/core/v1/endpoints.go | Vendored client-go lister. |
| vendor/k8s.io/client-go/listers/core/v1/configmap.go | Vendored client-go lister. |
| vendor/k8s.io/client-go/listers/core/v1/componentstatus.go | Vendored client-go lister. |
| vendor/k8s.io/client-go/listers/apps/v1/statefulset_expansion.go | Vendored lister expansion methods. |
| vendor/k8s.io/client-go/listers/apps/v1/statefulset.go | Vendored client-go lister. |
| vendor/k8s.io/client-go/listers/apps/v1/replicaset_expansion.go | Vendored lister expansion methods. |
| vendor/k8s.io/client-go/listers/apps/v1/replicaset.go | Vendored client-go lister. |
| vendor/k8s.io/client-go/listers/apps/v1/expansion_generated.go | Vendored generated lister expansions. |
| vendor/k8s.io/client-go/listers/apps/v1/deployment.go | Vendored client-go lister. |
| vendor/k8s.io/client-go/listers/apps/v1/daemonset_expansion.go | Vendored lister expansion methods. |
| vendor/k8s.io/client-go/listers/apps/v1/daemonset.go | Vendored client-go lister. |
| vendor/k8s.io/client-go/listers/apps/v1/controllerrevision.go | Vendored client-go lister. |
| vendor/k8s.io/client-go/informers/internalinterfaces/factory_interfaces.go | Vendored informer factory interfaces. |
| vendor/k8s.io/client-go/informers/core/v1/serviceaccount.go | Vendored client-go informer. |
| vendor/k8s.io/client-go/informers/core/v1/service.go | Vendored client-go informer. |
| vendor/k8s.io/client-go/informers/core/v1/secret.go | Vendored client-go informer. |
| vendor/k8s.io/client-go/informers/core/v1/resourcequota.go | Vendored client-go informer. |
| vendor/k8s.io/client-go/informers/core/v1/replicationcontroller.go | Vendored client-go informer. |
| vendor/k8s.io/client-go/informers/core/v1/podtemplate.go | Vendored client-go informer. |
| vendor/k8s.io/client-go/informers/core/v1/pod.go | Vendored client-go informer. |
| vendor/k8s.io/client-go/informers/core/v1/persistentvolumeclaim.go | Vendored client-go informer. |
| vendor/k8s.io/client-go/informers/core/v1/persistentvolume.go | Vendored client-go informer. |
| vendor/k8s.io/client-go/informers/core/v1/node.go | Vendored client-go informer. |
| vendor/k8s.io/client-go/informers/core/v1/namespace.go | Vendored client-go informer. |
| vendor/k8s.io/client-go/informers/core/v1/limitrange.go | Vendored client-go informer. |
| vendor/k8s.io/client-go/informers/core/v1/interface.go | Vendored group-version informer interface. |
| vendor/k8s.io/client-go/informers/core/v1/event.go | Vendored client-go informer. |
| vendor/k8s.io/client-go/informers/core/v1/endpoints.go | Vendored client-go informer. |
| vendor/k8s.io/client-go/informers/core/v1/configmap.go | Vendored client-go informer. |
| vendor/k8s.io/client-go/informers/core/v1/componentstatus.go | Vendored client-go informer. |
| pkg/controllers/workload/v1alpha1/utils/util/json.go | JSON helpers used by workload utils. |
| pkg/controllers/workload/v1alpha1/utils/util/image.go | Image parsing/equality helpers. |
| pkg/controllers/workload/v1alpha1/utils/util/cache.go | Global cache + cache sync timeout utilities. |
| pkg/controllers/workload/v1alpha1/utils/updatesort/sort.go | Defines sorter interface for update ordering. |
| pkg/controllers/workload/v1alpha1/utils/updatesort/scatter_sort.go | Implements scatter-based update ordering. |
| pkg/controllers/workload/v1alpha1/utils/updatesort/priority_sort.go | Implements priority-based update ordering. |
| pkg/controllers/workload/v1alpha1/utils/specifieddelete/specified_delete.go | Utilities for “specified delete” labeling/patching. |
| pkg/controllers/workload/v1alpha1/utils/revisionadapter/revision_adapter.go | Revision hash adapter abstraction. |
| pkg/controllers/workload/v1alpha1/utils/revision/revision.go | Revision comparison helpers for Pods. |
| pkg/controllers/workload/v1alpha1/utils/requeueduration/duration.go | Stores/reduces requeue durations across reconcile steps. |
| pkg/controllers/workload/v1alpha1/utils/ratelimiter/rate_limiter.go | Configurable workqueue rate limiter. |
| pkg/controllers/workload/v1alpha1/utils/podreadiness/pod_readiness_utils.go | Readiness gate condition/message management. |
| pkg/controllers/workload/v1alpha1/utils/podreadiness/pod_readiness.go | Pod readiness control wrapper. |
| pkg/controllers/workload/v1alpha1/utils/podadapter/adapter.go | Pod adapter abstractions (runtime client / typed client / informer). |
| pkg/controllers/workload/v1alpha1/utils/lifecycle/lifecycle_utils.go | Pod lifecycle state + hook handling utilities. |
| pkg/controllers/workload/v1alpha1/utils/kubecontroller/pod_control.go | K8s-like PodControl utilities used by controller logic. |
| pkg/controllers/workload/v1alpha1/utils/expectations/update_expectations.go | Update expectations tracking for controller reconcile loops. |
| pkg/controllers/workload/v1alpha1/utils/expectations/scale_expectations.go | Scale expectations tracking for controller reconcile loops. |
| pkg/controllers/workload/v1alpha1/utils/expectations/resource_version_expectation.go | ResourceVersion-based expectation tracking. |
| pkg/controllers/workload/v1alpha1/utils/expectations/init.go | Registers expectation timeout CLI flag. |
| pkg/controllers/workload/v1alpha1/utils/discovery/discovery.go | Discovery-based GVK existence checks. |
| pkg/controllers/workload/v1alpha1/utils/containermeta/env_hash.go | Hashing of env vars derived from metadata. |
| pkg/controllers/workload/v1alpha1/utils/client/client.go | Helper for creating controller-runtime client from manager. |
| pkg/controllers/workload/v1alpha1/utils/api/asts.go | Parsing helpers for AdvancedStatefulSet API fields. |
| pkg/controllers/workload/v1alpha1/advancedstatefulset/stateful_update_utils.go | Pod selection/sorting for StatefulSet rolling updates. |
| pkg/controllers/workload/v1alpha1/advancedstatefulset/stateful_set_status_updater.go | Status updater with conflict retry. |
| pkg/controllers/workload/v1alpha1/advancedstatefulset/pvc_event_handler.go | PVC update handler to trigger reconcile. |
| go.mod | Adds/adjusts dependencies (e.g., jsonpatch) for workload controller. |
| config/samples/workload_v1alpha1_advancedstatefulset.yaml | Sample manifest for AdvancedStatefulSet. |
| config/crd/bases/workload.fluid.io_advancedstatefulsets.yaml | CRD definition for AdvancedStatefulSet. |
| cmd/workload/main.go | New workload-controller entrypoint. |
| cmd/workload/app/workload.go | Cobra command wiring + manager startup for workload-controller. |
| api/workload/v1alpha1/groupversion_info.go | API group version registration for workload APIs. |
| Makefile | Adds workload-controller build/run targets. |

    flag.DurationVar(&maxDelay, "rate-limiter-max-delay", time.Second*1000, "The max delay for rate limiter. Defaults 1000s")
    flag.IntVar(&qps, "rate-limiter-qps", 10, "The qps for rate limier. Defaults 10")
    flag.IntVar(&bucketSize, "rate-limiter-bucket-size", 100, "The bucket size for rate limier. Defaults 100")

Flag help text has typos ("rate limier"), which will show up in --help output. Please correct to "rate limiter" for clarity.

    // Duration helps calculate the shortest non-zore duration to requeue
    type Duration struct {
        sync.Mutex
        duration time.Duration

Typo in comment: "non-zore" should be "non-zero".

    func NewScatterSorterV1beta1(s workloadv1alpha1.UpdateScatterStrategy) Sorter {
        // Convert v1beta1 to v1alpha1
        v1alpha1Strategy := make(workloadv1alpha1.UpdateScatterStrategy, len(s))
        for i, term := range s {
            v1alpha1Strategy[i] = workloadv1alpha1.UpdateScatterTerm{
                Key:   term.Key,
                Value: term.Value,
            }
        }
        return &scatterSort{strategy: v1alpha1Strategy}
    }

NewScatterSorterV1beta1 is named and documented as converting from v1beta1, but it accepts/returns the v1alpha1 strategy type and performs a no-op copy. This is misleading for callers/maintainers; consider either removing it or changing the signature/name to reflect the actual API version being handled.

    func init() {
        _ = clientgoscheme.AddToScheme(scheme)
        _ = workloadv1alpha1.AddToScheme(scheme)

        workloadCmd.Flags().StringVarP(&metricsAddr, "metrics-addr", "", ":8084", "The address the metric endpoint binds to.")
        workloadCmd.Flags().BoolVarP(&enableLeaderElection, "enable-leader-election", "", false, "Enable leader election for controller manager.")
        workloadCmd.Flags().StringVarP(&leaderElectionNamespace, "leader-election-namespace", "", "fluid-system", "The namespace in which the leader election resource will be created.")
        workloadCmd.Flags().BoolVarP(&development, "development", "", true, "Enable development mode for workload controller.")
        workloadCmd.Flags().StringVarP(&pprofAddr, "pprof-addr", "", "", "The address for pprof to use while exporting profiling results")
    }

The workload controller command does not add the standard library flag.CommandLine flags to Cobra (e.g., via workloadCmd.Flags().AddGoFlagSet(flag.CommandLine)). Since the AdvancedStatefulSet controller and several utils register Go flags in init(), those flags will be unusable, and passing them will cause "unknown flag" errors. Please import flag and add the Go flag set (and any required flag.Set(...) calls, as other cmds do) so these options can be configured consistently.
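
The failure mode described here can be reproduced with the standard library alone. The sketch below is an illustration under stated assumptions: it uses two plain `flag.FlagSet` values as stand-ins for `flag.CommandLine` and the Cobra command's flag set, and `mergeFlags` and `newSet` are hypothetical helpers that play the role the review assigns to `AddGoFlagSet`.

```go
package main

import (
	"flag"
	"fmt"
	"io"
)

// mergeFlags registers every flag from src on dst. The flag.Value is shared,
// so parsing dst updates the variable that src originally bound.
func mergeFlags(dst, src *flag.FlagSet) {
	src.VisitAll(func(f *flag.Flag) {
		if dst.Lookup(f.Name) == nil {
			dst.Var(f.Value, f.Name, f.Usage)
		}
	})
}

// newSet builds a quiet flag set that returns errors instead of exiting.
func newSet(name string) *flag.FlagSet {
	fs := flag.NewFlagSet(name, flag.ContinueOnError)
	fs.SetOutput(io.Discard)
	return fs
}

func main() {
	// Stand-in for flag.CommandLine, populated by package init() functions.
	global := newSet("global")
	qps := global.Int("rate-limiter-qps", 10, "The qps for rate limiter.")

	// Stand-in for the command's own flag set, which knows nothing about it.
	cmd := newSet("workload")
	args := []string{"--rate-limiter-qps=50"}

	fmt.Println("before merge:", cmd.Parse(args))

	mergeFlags(cmd, global)
	fmt.Println("after merge:", cmd.Parse(args), "qps =", *qps)
}
```

Before the merge the parse returns an error for the undefined flag; afterwards the same arguments parse cleanly and the original variable is updated.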

    if err := r.KubeClient.CoreV1().Pods(namespace).Delete(ctx, podID, metav1.DeleteOptions{}); err != nil {
        if apierrors.IsNotFound(err) {
            logger.V(4).Info("Pod has already been deleted.", "pod", klog.KRef(namespace, podID))
            return err

DeletePod returns the NotFound error when the pod is already deleted. For controller delete operations this should be treated as success; otherwise reconciliation may retry/fail unnecessarily on an already-removed pod.
Suggested change:

    - return err
    + return nil


    delegatingClient, _ := client.New(cfg, client.Options{
        Scheme: mgr.GetScheme(),
        Cache: &client.CacheOptions{
            Reader:       mgr.GetCache(),
            Unstructured: true,
        },
    })

NewClientFromManager ignores the error returned by client.New and always returns the (possibly nil) client. This can lead to nil dereferences later and makes failures hard to diagnose. Consider returning (client.Client, error) or handling the error explicitly (e.g., logging + exit/panic) to ensure callers can't proceed with an invalid client.
Suggested change:

    - delegatingClient, _ := client.New(cfg, client.Options{
    -     Scheme: mgr.GetScheme(),
    -     Cache: &client.CacheOptions{
    -         Reader:       mgr.GetCache(),
    -         Unstructured: true,
    -     },
    - })
    + delegatingClient, err := client.New(cfg, client.Options{
    +     Scheme: mgr.GetScheme(),
    +     Cache: &client.CacheOptions{
    +         Reader:       mgr.GetCache(),
    +         Unstructured: true,
    +     },
    + })
    + if err != nil {
    +     panic(fmt.Sprintf("failed to create delegating client for %q: %v", name, err))
    + }


    klog.ErrorS(err, "invalid range reserveOrdinal found, an empty slice will be returned", "reserveOrdinal", elem.StrVal)
    return nil

GetReserveOrdinalIntSet returns nil on parse error, but callers treat the result as a non-nil set and call methods like Has(...), which will panic. Return an empty set (or change callers to handle nil) and update the log message accordingly (it currently says an empty slice will be returned).
Suggested change:

    - klog.ErrorS(err, "invalid range reserveOrdinal found, an empty slice will be returned", "reserveOrdinal", elem.StrVal)
    - return nil
    + klog.ErrorS(err, "invalid range reserveOrdinal found, an empty set will be returned", "reserveOrdinal", elem.StrVal)
    + return values

Code Review
This pull request introduces a new AdvancedStatefulSet API and controller, largely adapted from the Kruise project, to provide more advanced StatefulSet functionalities within Fluid. This includes new definitions for AdvancedStatefulSet spec and status, various update strategies (e.g., in-place updates, rolling updates), lifecycle hooks, and PVC retention policies. The changes also involve adding a new workload-controller binary, its build targets in the Makefile, and a comprehensive set of utility packages to support the controller's operations. Review comments suggest renaming constants like ContainerLaunchBarrierEnvName and KruisePodReadyConditionType to be more Fluid-specific for consistency. Additionally, there's a suggestion to improve error handling by wrapping errors with %w in stateful_pod_control.go.

    AnnotationSubsetPatchKey = "workload.fluid.io/subset-patch"

    // ContainerLaunchBarrierEnvName is the env name used to indicate container launch priority barrier.
    ContainerLaunchBarrierEnvName = "KRUISE_CONTAINER_LAUNCH_BARRIER"

The constant ContainerLaunchBarrierEnvName seems to be a leftover from the original Kruise implementation. To maintain consistency within the Fluid project, it would be better to rename it to something Fluid-specific, like FLUID_CONTAINER_LAUNCH_BARRIER.
Suggested change:

    - ContainerLaunchBarrierEnvName = "KRUISE_CONTAINER_LAUNCH_BARRIER"
    + ContainerLaunchBarrierEnvName = "FLUID_CONTAINER_LAUNCH_BARRIER"

Quoted code:

    // KruisePodReadyConditionType is the pod condition type for kruise readiness gate.
    KruisePodReadyConditionType v1.PodConditionType = "KruisePodReady"
    // InPlaceUpdateStrategyKruisePodReadyConditionType is an alias for KruisePodReadyConditionType.
    InPlaceUpdateStrategyKruisePodReadyConditionType = KruisePodReadyConditionType

The constants KruisePodReadyConditionType and InPlaceUpdateStrategyKruisePodReadyConditionType appear to be copied from Kruise. For better consistency and clarity within the Fluid codebase, please consider renaming them to something Fluid-specific, such as FluidPodReadyConditionType.
Suggested change:

    - // KruisePodReadyConditionType is the pod condition type for kruise readiness gate.
    - KruisePodReadyConditionType v1.PodConditionType = "KruisePodReady"
    - // InPlaceUpdateStrategyKruisePodReadyConditionType is an alias for KruisePodReadyConditionType.
    - InPlaceUpdateStrategyKruisePodReadyConditionType = KruisePodReadyConditionType
    + // FluidPodReadyConditionType is the pod condition type for fluid readiness gate.
    + FluidPodReadyConditionType v1.PodConditionType = "FluidPodReady"
    + // InPlaceUpdateStrategyFluidPodReadyConditionType is an alias for FluidPodReadyConditionType.
    + InPlaceUpdateStrategyFluidPodReadyConditionType = FluidPodReadyConditionType

Quoted code:

    case apierrors.IsNotFound(err):
        klog.V(4).InfoS("Expected claim missing, continuing to pick up in next iteration", "claimName", claimName)
    case err != nil:
        return false, fmt.Errorf("Could not retrieve claim %s for %s when checking PVC deletion policy", claimName, pod.Name)

For better error handling and debugging, it's a good practice to wrap the original error using %w. This provides more context when the error is propagated up the call stack. This pattern of unwrapped errors appears in a few places in this file (e.g., lines 350, 356).
Suggested change:

    - return false, fmt.Errorf("Could not retrieve claim %s for %s when checking PVC deletion policy", claimName, pod.Name)
    + return false, fmt.Errorf("Could not retrieve claim %s for %s when checking PVC deletion policy: %w", claimName, pod.Name, err)




Ⅰ. Describe what this PR does
Ⅱ. Does this pull request fix one issue?
fixes #XXXX
Ⅲ. List the added test cases (unit test/integration test) if any, please explain if no tests are needed.
Ⅳ. Describe how to verify it
Ⅴ. Special notes for reviews