
Extend CacheRuntime phase 2.4: add dataset related labels to nodes and support app pod affinity#5836

Open
xliuqq wants to merge 2 commits into fluid-cloudnative:master from xliuqq:cache_affinity

Conversation

@xliuqq (Collaborator) commented May 7, 2026

mark thin runtime reference as not supporting cache runtime

Ⅰ. Describe what this PR does

Add dataset-related labels to nodes and support app pod affinity.

Ⅱ. Does this pull request fix one issue?

part of #5412

Ⅲ. List the added test cases (unit test/integration test) if any, please explain if no tests are needed.

Ⅳ. Describe how to verify it

Ⅴ. Special notes for reviews

mark thin runtime reference not support cache runtime

Signed-off-by: xliuqq <xlzq1992@gmail.com>
fluid-e2e-bot (bot) commented May 7, 2026

Skipping CI for Draft Pull Request.
If you want CI signal for your change, please convert it to an actual PR.
You can still manually trigger a test run with /test all

@gemini-code-assist (bot) left a comment


Code Review

This pull request refactors runtime status management by introducing the RuntimeStatusAccessor interface and unifies worker pod retrieval via the GetWorkerPods method. It also implements node affinity support for cache components and updates the cache engine's synchronization and shutdown processes. Feedback highlights a critical bug where an error from e.transform is ignored, resulting in potential panics. Additionally, the reviewer noted code duplication in the thin runtime package that should be resolved by exporting helper functions from the base package, and pointed out the need to restore test coverage for the refactored status retrieval logic.

I am having trouble creating individual review comments. My feedback is listed below.
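For orientation, the accessor abstraction referenced in this review could look roughly like the sketch below; only the type names RuntimeStatusAccessor, DDCRuntimeStatusAccessor, and CacheRuntimeStatusAccessor come from the review itself, while the method set shown is an assumption.

// Sketch only; the interface in the PR may carry more methods.
type RuntimeStatusAccessor interface {
	// GetRuntimeStatus returns the shared status object of the backing runtime.
	GetRuntimeStatus() (*datav1alpha1.RuntimeStatus, error)
}

// DDCRuntimeStatusAccessor and CacheRuntimeStatusAccessor would implement this
// interface for DDC-based runtimes and for CacheRuntime respectively.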

pkg/ddc/cache/engine/sync.go (54-56)

high

The error returned by e.transform is ignored because the err variable is immediately overwritten by the result of e.CheckAndUpdateRuntimeStatus. If transform fails, runtimeValue might be invalid, which could lead to panics or incorrect status updates in the subsequent call.

	runtimeValue, err := e.transform(dataset, runtime, runtimeClass)
	if err != nil {
		return err
	}
	// TODO: use different struct for input parameter to avoid fully transform
	_, err = e.CheckAndUpdateRuntimeStatus(runtimeValue)

pkg/ddc/base/runtime.go (646-647)

medium

Consider exporting this function as GetDDCRuntimeStatus. This would allow other packages, such as pkg/ddc/thin/referencedataset, to reuse this logic instead of duplicating the entire switch statement for DDC-based runtimes.

// GetDDCRuntimeStatus retrieves the runtime object based on runtime type for DDC-based runtimes
func GetDDCRuntimeStatus(client client.Client, runtimeType, name, namespace string) (*datav1alpha1.RuntimeStatus, error) {
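For context, the exported helper would presumably just wrap the existing per-runtime switch; the body below is an assumption about its shape, with utils.GetAlluxioRuntime standing in for the per-runtime getters.

func GetDDCRuntimeStatus(client client.Client, runtimeType, name, namespace string) (*datav1alpha1.RuntimeStatus, error) {
	switch runtimeType {
	case common.AlluxioRuntime:
		runtime, err := utils.GetAlluxioRuntime(client, name, namespace)
		if err != nil {
			return nil, err
		}
		return &runtime.Status, nil
	// ... the remaining DDC runtimes (Jindo, GooseFS, JuiceFS, Thin) follow the same pattern
	default:
		return nil, fmt.Errorf("unsupported DDC runtime type: %s", runtimeType)
	}
}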

pkg/ddc/base/runtime.go (630)

medium

Update the call to use the exported function name if the suggestion to export getDDCRuntime is accepted.

		status, err := GetDDCRuntimeStatus(client, runtimeType, name, namespace)

pkg/ddc/thin/referencedataset/runtime.go (49)

medium

Instead of using a local duplicated version of the runtime status retrieval logic, use the (suggested) exported function from the base package. This improves maintainability by centralizing the mapping of runtime types to their status objects.

	return base.GetDDCRuntimeStatus(e.Client, physicalRuntimeInfo.GetRuntimeType(),

pkg/ddc/thin/referencedataset/runtime.go (56-104)

medium

This function is a duplicate of the logic in pkg/ddc/base/runtime.go. It should be removed in favor of using an exported helper from the base package to avoid code duplication and maintenance overhead when new runtimes are added.

pkg/ddc/base/runtime_test.go (1163-1382)

medium

The tests for GetRuntimeStatus have been removed. While the function itself was refactored into GetRuntimeStatusAccessor, the test coverage should be maintained. Please update these tests to verify the new accessor and its implementations (DDCRuntimeStatusAccessor and CacheRuntimeStatusAccessor) instead of deleting them.

pkg/ddc/base/runtime.go (642)

medium

The error message mentions runtimeInfo, but the function GetRuntimeStatusAccessor is intended to return a status accessor. Updating the message to 'fail to get runtime status accessor' would be more accurate.

		return nil, fmt.Errorf("fail to get runtime status accessor for runtime type: %s", runtimeType)

codecov (bot) commented May 7, 2026

Codecov Report

❌ Patch coverage is 27.64228% with 89 lines in your changes missing coverage. Please review.
✅ Project coverage is 58.90%. Comparing base (fc9a6d9) to head (de28086).
⚠️ Report is 7 commits behind head on master.

Files with missing lines | Patch % | Lines
pkg/ddc/base/runtime.go | 48.78% | 20 Missing and 1 partial ⚠️
pkg/ddc/cache/engine/sync.go | 0.00% | 21 Missing ⚠️
pkg/ddc/cache/engine/runtime.go | 0.00% | 15 Missing ⚠️
pkg/ddc/cache/component/daemonset_manager.go | 0.00% | 7 Missing ⚠️
pkg/ddc/cache/component/statefulset_manager.go | 30.00% | 7 Missing ⚠️
pkg/ddc/cache/engine/shutdown.go | 0.00% | 6 Missing ⚠️
pkg/ddc/cache/engine/status.go | 0.00% | 6 Missing ⚠️
.../nodeaffinitywithcache/node_affinity_with_cache.go | 40.00% | 4 Missing and 2 partials ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##           master    #5836      +/-   ##
==========================================
- Coverage   59.10%   58.90%   -0.20%     
==========================================
  Files         480      480              
  Lines       32511    32599      +88     
==========================================
- Hits        19215    19202      -13     
- Misses      11746    11844      +98     
- Partials     1550     1553       +3     

☔ View full report in Codecov by Sentry.

Signed-off-by: xliuqq <xlzq1992@gmail.com>
@xliuqq force-pushed the cache_affinity branch from 7c9fa7e to de28086 on May 7, 2026 12:17
sonarqubecloud (bot) commented May 7, 2026

@xliuqq xliuqq marked this pull request as ready for review May 8, 2026 11:14
@xliuqq xliuqq requested a review from cheyang May 8, 2026 11:15
@cheyang (Collaborator) commented May 12, 2026

/gemini review

@gemini-code-assist (bot) left a comment

Code Review

This pull request introduces a unified interface for accessing runtime status and worker pods across various Fluid runtime types. Key changes include the addition of the RuntimeStatusAccessor interface, the implementation of GetWorkerPods for retrieving worker pod information, and the integration of node affinity tracking for CacheRuntime. The feedback suggests correcting the documentation for GetWorkerPods, handling "not found" errors gracefully when fetching Kubernetes resources to prevent reconciliation loops from failing prematurely, and ensuring RuntimeClassName is validated before use.

Comment thread pkg/ddc/base/runtime.go
type RuntimeInfoInterface interface {

// GetWorkerPods returns the worker object and selector for runtime workers.

medium

The comment for GetWorkerPods is inaccurate as it mentions returning a "worker object and selector", but the method signature returns a slice of pods.

Suggested change
// GetWorkerPods returns the worker object and selector for runtime workers.
// GetWorkerPods returns the pods for runtime workers.

Comment thread pkg/ddc/base/runtime.go
Comment on lines +382 to +385
workers, err := kubeclient.GetStatefulSet(client, info.GetWorkerStatefulsetName(), info.GetNamespace())
if err != nil {
return nil, err
}

medium

If the worker StatefulSet is not found, GetWorkerPods should return an empty list and no error to avoid failing the reconciliation loop during initial setup. Using utils.IgnoreNotFound is a clean way to handle this.

Suggested change
workers, err := kubeclient.GetStatefulSet(client, info.GetWorkerStatefulsetName(), info.GetNamespace())
if err != nil {
return nil, err
}
workers, err := kubeclient.GetStatefulSet(client, info.GetWorkerStatefulsetName(), info.GetNamespace())
if err != nil {
return nil, utils.IgnoreNotFound(err)
}
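For reference, such a helper is typically a thin wrapper around apierrors.IsNotFound (controller-runtime also ships an equivalent client.IgnoreNotFound). The sketch below assumes Fluid's utils package follows the same pattern; it is not necessarily its exact implementation.

import apierrors "k8s.io/apimachinery/pkg/api/errors"

// IgnoreNotFound returns nil when err is a Kubernetes NotFound error and err
// unchanged otherwise, so a missing object is treated as "nothing to do yet"
// rather than a reconcile failure.
func IgnoreNotFound(err error) error {
	if apierrors.IsNotFound(err) {
		return nil
	}
	return err
}

Note that with the suggested change the StatefulSet may be nil on return, so the caller still needs to short-circuit before reading its selector or replica counts.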

Comment on lines +53 to +56
ds, err := kubeclient.GetDaemonset(s.client, component.Name, component.Namespace)
if err != nil {
return nil, err
}

medium

Handle the case where the DaemonSet is not found gracefully by returning nil, nil instead of an error. This prevents the status update from failing when the component hasn't been created yet.

Suggested change
ds, err := kubeclient.GetDaemonset(s.client, component.Name, component.Namespace)
if err != nil {
return nil, err
}
ds, err := kubeclient.GetDaemonset(s.client, component.Name, component.Namespace)
if err != nil {
return nil, utils.IgnoreNotFound(err)
}

Comment on lines +53 to +56
sts, err := kubeclient.GetStatefulSet(s.client, component.Name, component.Namespace)
if err != nil {
return nil, err
}

medium

Handle the case where the StatefulSet is not found gracefully by returning nil, nil instead of an error to avoid unnecessary reconciliation failures.

Suggested change
sts, err := kubeclient.GetStatefulSet(s.client, component.Name, component.Namespace)
if err != nil {
return nil, err
}
sts, err := kubeclient.GetStatefulSet(s.client, component.Name, component.Namespace)
if err != nil {
return nil, utils.IgnoreNotFound(err)
}

Comment on lines +41 to +44
workers, err := kubeclient.GetStatefulSet(client, workerName, info.GetNamespace())
if err != nil {
return nil, err
}

medium

Handle the case where the worker StatefulSet is not found gracefully to avoid reconciliation errors during the initial setup phase.

Suggested change
workers, err := kubeclient.GetStatefulSet(client, workerName, info.GetNamespace())
if err != nil {
return nil, err
}
workers, err := kubeclient.GetStatefulSet(client, workerName, info.GetNamespace())
if err != nil {
return nil, utils.IgnoreNotFound(err)
}

@@ -18,13 +18,15 @@ package engine

import (
"context"

medium

Add fmt to imports for error formatting in the Sync method.

Suggested change
"context"
"context"
"fmt"

return err
}
dataset := ctx.Dataset
runtimeClass, err := e.getRuntimeClass(runtime.Spec.RuntimeClassName)

medium

It's safer to check if RuntimeClassName is set before attempting to fetch the CacheRuntimeClass to provide a clearer error message if it's missing.

Suggested change
runtimeClass, err := e.getRuntimeClass(runtime.Spec.RuntimeClassName)
if runtime.Spec.RuntimeClassName == "" {
return fmt.Errorf("runtimeClassName is not set in CacheRuntime %s/%s", runtime.Namespace, runtime.Name)
}
runtimeClass, err := e.getRuntimeClass(runtime.Spec.RuntimeClassName)

@cheyang (Collaborator) commented May 12, 2026

Review of PR #5836 — CacheRuntime phase 2.4: add dataset-related labels to nodes and support app pod affinity.

The PR makes several well-structured changes: (1) RuntimeStatusAccessor abstraction to unify DDC and Cache runtime status access for the affinity webhook, (2) GetWorkerPods interface to decouple node labeling from direct StatefulSet access, (3) CacheRuntime status sync + SyncScheduleInfoToCacheNodes integration, (4) proper shutdown cleanup of dataset labels, (5) ThinRuntime reference now explicitly excludes CacheRuntime.

One blocking concern: CacheRuntimeInfo.GetWorkerPods duplicates the naming logic that already exists in base.RuntimeInfo.GetWorkerPods, using GetComponentName vs GetWorkerStatefulsetName. These two paths may produce different worker names depending on runtime type suffix conventions, creating a maintenance risk when conventions evolve.

Non-blocking: ctx.Dataset nil-safety in Sync, GetNodeAffinity API call overhead, full-transform-as-input pattern, ThinRuntime exclusion lacks user-facing validation/docs, and Shutdown TearDownWorkers error handling.

corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/client"

CacheRuntimeInfo.GetWorkerPods computes the worker StatefulSet name as GetComponentName(info.GetName(), common.ComponentTypeWorker), while base.RuntimeInfo.GetWorkerPods uses runtimeInfo.GetWorkerStatefulsetName(). These may produce different names depending on the runtime type suffix convention.

For CacheRuntime, the worker StatefulSet name is <name>-cache-worker (derived from the cache engine's naming convention via GetComponentName), but GetWorkerStatefulsetName() on the embedded RuntimeInfo may return a different suffix depending on how BuildRuntimeInfo was configured.

Since CacheRuntimeInfo embeds the base RuntimeInfoInterface and delegates all other methods, it's unclear which naming convention is authoritative. If the embedded RuntimeInfo was built with common.CacheRuntime as runtimeType, GetWorkerStatefulsetName() would also compute the suffix — but the two code paths use completely different logic to derive the name, creating a maintenance risk.

Recommendation: remove the duplicate GetWorkerPods implementation from CacheRuntimeInfo and let it delegate to the embedded base.RuntimeInfo.GetWorkerPods(), or at minimum ensure both paths produce the same name.
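A sketch of the recommended delegation, assuming CacheRuntimeInfo embeds base.RuntimeInfoInterface as described above; the GetWorkerPods signature and field layout below are illustrative and not taken from the PR diff.

// If CacheRuntimeInfo embeds base.RuntimeInfoInterface, deleting its own
// GetWorkerPods lets the promoted base method be used directly; an explicit
// delegation (signature assumed) would look like this:
func (c *CacheRuntimeInfo) GetWorkerPods(client client.Client) ([]corev1.Pod, error) {
	// Reuse the base implementation so the worker StatefulSet name always comes
	// from GetWorkerStatefulsetName(), keeping a single naming convention.
	return c.RuntimeInfoInterface.GetWorkerPods(client)
}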

@@ -33,6 +35,11 @@ func (e *CacheEngine) Sync(ctx cruntime.ReconcileRequestContext) (err error) {
if err != nil {
return err

Sync now uses ctx.Dataset directly to call e.transform(dataset, runtime, runtimeClass). If the ReconcileRequestContext is constructed without a Dataset (e.g., before the dataset is bound), this will panic or produce incorrect behavior. Other DDC engines typically check for nil dataset or handle the pre-binding case explicitly.

Consider adding a nil check for ctx.Dataset before proceeding with the transform and status sync, similar to how other runtime engines handle the unbound case.
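A minimal sketch of the suggested guard as a fragment inside Sync; the error message and exact placement are assumptions:

	dataset := ctx.Dataset
	if dataset == nil {
		// Dataset is not bound yet; fail the reconcile explicitly instead of
		// letting e.transform dereference a nil dataset.
		return fmt.Errorf("dataset is not bound to CacheRuntime %s/%s yet", runtime.Namespace, runtime.Name)
	}
	runtimeValue, err := e.transform(dataset, runtime, runtimeClass)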

@xliuqq (Collaborator, author) replied:

If this is not blocking, it will be fixed together with the issue "use different struct for input parameter to avoid fully transform". #5836 (comment)

}

func (s *StatefulSetManager) GetNodeAffinity(component *common.CacheRuntimeComponentValue) (*corev1.NodeAffinity, error) {
sts, err := kubeclient.GetStatefulSet(s.client, component.Name, component.Namespace)

GetNodeAffinity calls kubeclient.GetStatefulSet for every status update cycle. Since the StatefulSet spec (nodeSelector + affinity) rarely changes, this adds unnecessary API server load. Consider caching the affinity or reading it from the runtime value instead of re-fetching on every reconcile.

@xliuqq (Collaborator, author) replied:

I will add an issue to track it. Does the cache runtime support changing the worker affinity? If so, the affinity should be generated from the runtime and dataset spec.
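If the worker affinity turns out to be immutable once created, the caching idea could be as small as the sketch below; the cachedAffinity field is hypothetical, and the approach would not be valid if the affinity can change at runtime.

func (s *StatefulSetManager) GetNodeAffinity(component *common.CacheRuntimeComponentValue) (*corev1.NodeAffinity, error) {
	// Hypothetical cache field; only safe while the worker affinity never changes.
	if s.cachedAffinity != nil {
		return s.cachedAffinity, nil
	}
	sts, err := kubeclient.GetStatefulSet(s.client, component.Name, component.Namespace)
	if err != nil {
		return nil, err
	}
	if sts.Spec.Template.Spec.Affinity != nil {
		s.cachedAffinity = sts.Spec.Template.Spec.Affinity.NodeAffinity
	}
	return s.cachedAffinity, nil
}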

_, err = e.CheckAndUpdateRuntimeStatus(runtimeValue)
if err != nil {
return err
}

The TODO comment "use different struct for input parameter to avoid fully transform" is correct — calling e.transform() + CheckAndUpdateRuntimeStatus() on every Sync cycle forces a full value computation even when only the status needs updating. This is wasteful. While not blocking, it should be tracked with an issue link rather than just a TODO.

@xliuqq (Collaborator, author) replied:

Will add an issue to track it.

)

// getPhysicalDatasetRuntimeStatus get the runtime status of the physical dataset
// Note: This function only supports DDC-based runtimes (Alluxio, Jindo, etc.)

The comment "CacheRuntime is not supported because its status structure is incompatible with ThinRuntime" makes it clear that CacheRuntime cannot be used as a physical dataset runtime for ThinRuntime reference. However, there is no user-facing documentation or validation to prevent users from attempting this configuration. Without a webhook validation or clear docs, users may configure a ThinRuntime pointing to a CacheRuntime-backed Dataset and get a cryptic runtime error.

Consider adding: (1) a validation in the ThinRuntime webhook or controller that rejects CacheRuntime as physicalRuntimeType, or (2) documentation that explicitly lists supported runtime types for ThinRuntime reference.
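As one illustration of option (1), an early check in the ThinRuntime reference path could reject the configuration with a clear message; the variables physicalRuntimeType, namespace, and name below are hypothetical:

	// Reject CacheRuntime-backed datasets explicitly so users get a clear error
	// instead of a cryptic failure later in getPhysicalDatasetRuntimeStatus.
	if physicalRuntimeType == common.CacheRuntime {
		return fmt.Errorf("ThinRuntime dataset reference does not support CacheRuntime-backed dataset %s/%s",
			namespace, name)
	}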

@xliuqq (Collaborator, author) replied:

Will add an issue to track it.

@cheyang (Collaborator) left a comment

/lgtm
/approve

The non-blocking naming concern noted in the review is acceptable to address in a separate follow-up.

fluid-e2e-bot (bot) commented May 12, 2026

[APPROVALNOTIFIER] This PR is APPROVED

This pull-request has been approved by: cheyang

The full list of commands accepted by this bot can be found here.

The pull request process is described here

Needs approval from an approver in each of these files:

Approvers can indicate their approval by writing /approve in a comment
Approvers can cancel approval by writing /approve cancel in a comment
