feat(asts): init advancedStatefulSet#1
Conversation
There was a problem hiding this comment.
Pull request overview
This PR introduces the initial AdvancedStatefulSet (ASTS) controller implementation and ports a set of supporting controller utilities (in-place update, expectations, controller history, controller-ref management, etc.) into this repo, along with CRD/RBAC manifests and an example.
Changes:
- Added AdvancedStatefulSet controller scaffolding (reconcile loop, pod/PVC watches, status updater, pod control).
- Added a large set of workload/controller utility packages (in-place update, expectations, controller history, discovery, update sorting, pod readiness/lifecycle, etc.).
- Added module/manifests/docs/examples to build and run the controller (root + submodule go.mod, CRD, RBAC, README, Makefile, example).
Reviewed changes
Copilot reviewed 49 out of 52 changed files in this pull request and generated 12 comments.
Show a summary per file
| File | Description |
|---|---|
| pkg/workload/utils/util/selector.go | Label selector overlap helpers and a fast-path selector conversion utility. |
| pkg/workload/utils/util/pods.go | Pod/container/condition helper utilities used across controllers. |
| pkg/workload/utils/util/json.go | JSON dump/equality helpers. |
| pkg/workload/utils/util/image.go | Image digest/tag comparison helpers. |
| pkg/workload/utils/util/cache.go | Shared cache store and controller cache sync timeout wiring. |
| pkg/workload/utils/updatesort/sort.go | Sorter interface for pod update ordering. |
| pkg/workload/utils/updatesort/scatter_sort.go | Scatter-based update ordering implementation. |
| pkg/workload/utils/updatesort/priority_sort.go | Priority-based update ordering implementation. |
| pkg/workload/utils/specifieddelete/specified_delete.go | Label-based “specified delete” helpers for pods. |
| pkg/workload/utils/revisionadapter/revision_adapter.go | Adapter interface for reading/writing revision hash labels. |
| pkg/workload/utils/revision/revision.go | Revision hash comparison helpers (incl. short-hash comparison). |
| pkg/workload/utils/requeueduration/duration.go | Utilities for accumulating/merging reconcile requeue durations. |
| pkg/workload/utils/ratelimiter/rate_limiter.go | Default controller-runtime rate limiter wiring with flags. |
| pkg/workload/utils/podreadiness/pod_readiness_utils.go | Readiness-gate condition message management for pods. |
| pkg/workload/utils/podreadiness/pod_readiness.go | Pod readiness control interface + message serialization. |
| pkg/workload/utils/podadapter/adapter.go | Adapter layer for runtime client / typed client / informer access to Pods. |
| pkg/workload/utils/lifecycle/lifecycle_utils.go | Pod lifecycle state transitions + readiness integration. |
| pkg/workload/utils/kubecontroller/pod_control.go | Pod create/patch/delete utilities adapted from upstream controller patterns. |
| pkg/workload/utils/kubecontroller/controller_ref_manager.go | ControllerRef claiming/adoption/release helpers for Pods. |
| pkg/workload/utils/inplaceupdate/inplace_update_vertical.go | Native vertical in-place resize implementation + patch generation. |
| pkg/workload/utils/inplaceupdate/inplace_update_defaults.go | Default in-place update spec calculation and completion checks. |
| pkg/workload/utils/inplaceupdate/inplace_update.go | In-place update orchestration (grace period, batching, readiness gate). |
| pkg/workload/utils/expectations/update_expectations.go | Update expectations tracking keyed by controller + revision. |
| pkg/workload/utils/expectations/scale_expectations.go | Scale expectations tracking (create/delete). |
| pkg/workload/utils/expectations/resource_version_expectation.go | ResourceVersion-based expectation tracking. |
| pkg/workload/utils/expectations/init.go | Flag for expectation timeout configuration. |
| pkg/workload/utils/discovery/discovery.go | Discovery-based GVK presence checks for conditional controller startup. |
| pkg/workload/utils/controllerhistory/controller_history.go | ControllerRevision history management utilities. |
| pkg/workload/utils/containermeta/env_hash.go | Hashing utilities for envs derived from metadata. |
| pkg/workload/utils/client/client.go | Helper for constructing controller-runtime client from manager config. |
| pkg/workload/utils/api/asts.go | Reserve ordinal parsing helpers for ASTS features. |
| pkg/workload/go.mod | Workload submodule dependency definition (k8s/controller-runtime/etc.). |
| pkg/workload/advancedstatefulset/statefulset_controller.go | ASTS controller wiring: manager add, watches, reconciler setup, core reconcile flow. |
| pkg/workload/advancedstatefulset/stateful_update_utils.go | Pod selection/sorting for rolling updates. |
| pkg/workload/advancedstatefulset/stateful_set_status_updater.go | Status update helper with conflict retries. |
| pkg/workload/advancedstatefulset/stateful_pod_control.go | StatefulSet pod/PVC creation/update/delete control logic. |
| pkg/workload/advancedstatefulset/pvc_event_handler.go | PVC event handler to requeue owning ASTS on PVC updates. |
| go.mod | Root module definition tying api and workload submodules together via replace. |
| example/advancedstatefulset/main.go | Runnable example manager wiring that registers the ASTS controller and can create an example ASTS. |
| config/rbac/role.yaml | ClusterRole for the ASTS controller. |
| config/crd/workload.fluid.io_advancedstatefulsets.yaml | CRD manifest for AdvancedStatefulSet. |
| api/workload/v1alpha1/groupversion_info.go | API group registration and scheme/codecs setup. |
| api/go.sum | API module dependency checksums. |
| api/go.mod | API module dependency definition. |
| README.md | Project documentation and usage examples. |
| Makefile | Build + manifest generation targets (controller-gen). |
| .gitignore | Updated ignores (vendor, IDE configs). |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| for _, c := range pod.Status.Conditions { | ||
| if c.Type == cType { | ||
| return &c | ||
| } |
There was a problem hiding this comment.
GetCondition returns &c where c is the range loop variable, so the returned pointer does not reference the element in pod.Status.Conditions (it points to a copy that changes each iteration). Iterate by index and return &pod.Status.Conditions[i] instead.
| accessor, err := meta.Accessor(object) | ||
| if err != nil { | ||
| logger.Error(err, "parentObject does not have ObjectMeta") | ||
| return nil | ||
| } |
There was a problem hiding this comment.
If meta.Accessor(object) fails, createPods logs the error but returns nil, which reports a successful create even though the controller object couldn't be inspected. This should return the error (or wrap it) so the reconcile can fail/retry appropriately.
| if err := r.KubeClient.CoreV1().Pods(namespace).Delete(ctx, podID, metav1.DeleteOptions{}); err != nil { | ||
| if apierrors.IsNotFound(err) { | ||
| logger.V(4).Info("Pod has already been deleted.", "pod", klog.KRef(namespace, podID)) | ||
| return err | ||
| } |
There was a problem hiding this comment.
When the Pod is already deleted, DeletePod logs that fact but returns the NotFound error. This makes deletion idempotency harder for callers and can cause unnecessary reconcile errors; consider returning nil on apierrors.IsNotFound(err).
| for _, e := range c.Env { | ||
| if e.Name == workloadv1alpha1.ContainerLaunchBarrierEnvName { | ||
| p, _ := strconv.Atoi(e.ValueFrom.ConfigMapKeyRef.Key[priorityStartIndex:]) | ||
| return &p |
There was a problem hiding this comment.
getContainerPriority assumes e.ValueFrom and e.ValueFrom.ConfigMapKeyRef are non-nil and that Key is long enough, which can panic if the env var is misconfigured. Add nil/length checks before slicing/parsing and ignore/return nil when the env source doesn’t match the expected shape.
| if len(finalizers) > 0 { | ||
| var finalizersToDelete []map[string]interface{} | ||
| for _, f := range finalizers { | ||
| finalizersToDelete = append(finalizersToDelete, map[string]interface{}{ | ||
| "$patch": "delete", | ||
| "value": f, | ||
| }) | ||
| } | ||
| patch["metadata"].(map[string]interface{})["$deleteFromPrimitiveList/finalizers"] = finalizers | ||
| } |
There was a problem hiding this comment.
GenerateDeleteOwnerRefStrategicMergeBytes declares and populates finalizersToDelete but never uses it, which will fail compilation due to an unused variable. Either remove this variable entirely or use it as part of the patch structure (note that $deleteFromPrimitiveList/finalizers already uses finalizers).
| klog.ErrorS(err, "invalid range reserveOrdinal found, an empty slice will be returned", "reserveOrdinal", elem.StrVal) | ||
| return nil |
There was a problem hiding this comment.
On invalid range, the log says an empty result will be returned, but the function returns nil. Callers like reserveOrdinals.Has(...) will panic on a nil set; return sets.New[int]() (empty set) instead of nil to match the message and keep callers safe.
| klog.ErrorS(err, "invalid range reserveOrdinal found, an empty slice will be returned", "reserveOrdinal", elem.StrVal) | |
| return nil | |
| klog.ErrorS(err, "invalid range reserveOrdinal found, an empty set will be returned", "reserveOrdinal", elem.StrVal) | |
| return sets.New[int]() |
| delegatingClient, _ := client.New(cfg, client.Options{ | ||
| Scheme: mgr.GetScheme(), | ||
| Cache: &client.CacheOptions{ | ||
| Reader: mgr.GetCache(), | ||
| Unstructured: true, | ||
| }, | ||
| }) |
There was a problem hiding this comment.
NewClientFromManager ignores the error from client.New. If that call fails, this function can return a nil client and cause later panics that are hard to diagnose. Consider returning (client.Client, error) or panicking/logging fatally if the client cannot be constructed.
| delegatingClient, _ := client.New(cfg, client.Options{ | |
| Scheme: mgr.GetScheme(), | |
| Cache: &client.CacheOptions{ | |
| Reader: mgr.GetCache(), | |
| Unstructured: true, | |
| }, | |
| }) | |
| delegatingClient, err := client.New(cfg, client.Options{ | |
| Scheme: mgr.GetScheme(), | |
| Cache: &client.CacheOptions{ | |
| Reader: mgr.GetCache(), | |
| Unstructured: true, | |
| }, | |
| }) | |
| if err != nil { | |
| panic(fmt.Sprintf("failed to create client for %q: %v", name, err)) | |
| } |
| for _, p := range ps.strategy.WeightPriority { | ||
| selector, err := util.ValidatedLabelSelectorAsSelector(&p.MatchSelector) | ||
| if err != nil { | ||
| continue | ||
| } |
There was a problem hiding this comment.
ValidatedLabelSelectorAsSelector bypasses validation (per its docstring), but here it is called directly on p.MatchSelector and errors are silently ignored. This can make priority sorting behave incorrectly for invalid selectors; consider validating via metav1.LabelSelectorAsSelector (or returning/logging the error) rather than continuing silently.
| return requeueDuration.Get() | ||
| } | ||
|
|
||
| // Duration helps calculate the shortest non-zore duration to requeue |
There was a problem hiding this comment.
Typo in comment: “non-zore” should be “non-zero”.
| if cI != cJ { | ||
| return cI > cJ | ||
| } | ||
| return i <= j | ||
| }) |
There was a problem hiding this comment.
The sort.SliceStable less function must implement a strict weak ordering; return i <= j returns true when i == j, violating the comparator contract and can cause panics due to inconsistent comparison. Use return i < j (or just return false when counts are equal) to preserve stability without breaking ordering rules.
1c12214 to
75800e4
Compare
| } | ||
| for _, c := range pod.Status.Conditions { | ||
| if c.Type == cType { | ||
| return &c |
There was a problem hiding this comment.
GetCondition is returning the address of the range variable here, so the caller gets a pointer to a copy rather than the element in pod.Status.Conditions. That makes the returned pointer semantically wrong for any caller expecting to inspect or mutate the real condition entry. Please iterate by index and return &pod.Status.Conditions[i] instead.
| func GetPodContainerByName(cName string, pod *v1.Pod) *v1.Container { | ||
| for _, container := range pod.Spec.Containers { | ||
| if cName == container.Name { | ||
| return &container |
There was a problem hiding this comment.
Same issue here: this returns the address of the range variable, not the matching element in pod.Spec.Containers. Callers will get a pointer to a copy, so any later use that expects the real container object will be misleading. Please iterate by index and return &pod.Spec.Containers[i].
| }, | ||
| } | ||
| if len(finalizers) > 0 { | ||
| var finalizersToDelete []map[string]interface{} |
There was a problem hiding this comment.
finalizersToDelete is built up here but never used. In Go that is a compile error, so this needs to be removed or wired into the patch construction before the PR can be merged. Since $deleteFromPrimitiveList/finalizers is already populated below, it looks like the extra slice is just leftover code.
| } else { | ||
| start, end, err := ParseRange(elem.StrVal) | ||
| if err != nil { | ||
| klog.ErrorS(err, "invalid range reserveOrdinal found, an empty slice will be returned", "reserveOrdinal", elem.StrVal) |
There was a problem hiding this comment.
The behavior here does not match the contract in the log message: on an invalid range this returns nil, not an empty set. That makes the helper harder to use safely, because callers now need to defensively handle a nil set instead of just working with an empty result. Returning sets.New[int]() would keep the API consistent with the message and avoid nil-handling surprises.
| if cI != cJ { | ||
| return cI > cJ | ||
| } | ||
| return i <= j |
There was a problem hiding this comment.
The tie-breaker here is not a valid sort.SliceStable comparator: return i <= j is true when i == j, which breaks the strict ordering contract expected by the sort implementation. If you just want to preserve the existing order on equal counts, returning false here (or using i < j) would be safer.
|
@cheyang @Syspretor any update ? |
| for _, c := range pod.Status.Conditions { | ||
| if c.Type == cType { | ||
| return &c | ||
| } | ||
| } |
| var finalizersToDelete []map[string]interface{} | ||
| for _, f := range finalizers { | ||
| finalizersToDelete = append(finalizersToDelete, map[string]interface{}{ | ||
| "$patch": "delete", | ||
| "value": f, |
| body := fmt.Sprintf( | ||
| `{"metadata":{"labels":{"%s":"%s"}}}`, | ||
| workloadv1alpha1.SpecifiedDeleteKey, | ||
| value, | ||
| ) |
| func (rd *Duration) Merge(rd2 *Duration) { | ||
| rd2.Lock() | ||
| defer rd2.Unlock() | ||
| rd.UpdateWithMsg(rd2.duration, rd2.message) | ||
| } |
| delegatingClient, _ := client.New(cfg, client.Options{ | ||
| Scheme: mgr.GetScheme(), | ||
| Cache: &client.CacheOptions{ | ||
| Reader: mgr.GetCache(), | ||
| Unstructured: true, | ||
| }, | ||
| }) |
| finalizersHandler = fmt.Sprintf(`[%s]`, strings.TrimLeft(finalizersHandler, ",")) | ||
|
|
||
| body := fmt.Sprintf( | ||
| `{"metadata":{"labels":{"%s":"%s"%s},"annotations":{"%s":"%s"},"finalizers":%s}}`, | ||
| workloadv1alpha1.LifecycleStateKey, |
| keyField := val.FieldByName("key") | ||
| keyFieldPtr := (*string)(unsafe.Pointer(keyField.UnsafeAddr())) | ||
| *keyFieldPtr = key | ||
|
|
||
| opField := val.FieldByName("operator") |
| c := status.Conditions[i] | ||
| if c.Type == condType { | ||
| return &c |
No description provided.