Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions slice/config/dev/manager_config_patch.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,6 @@
- op: add
path: /spec/template/spec/containers/0/args/0
value: --retry-delay-on-slice-failure=0s
- op: add
path: /spec/template/spec/containers/0/args/0
value: --feature-gates=FailOnUntoleratedDegradedSlice=true
117 changes: 96 additions & 21 deletions slice/internal/controller/workload_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -806,30 +806,45 @@ func (r *WorkloadReconciler) syncAdmissionCheckStatus(ctx context.Context, wl *k
return nil
}

// calculateEffectiveSliceCounts returns the number of slices that count as
// active and the number that count as failed, in that order.
//
// Slices in core.SliceStateActive and core.SliceStateFailed are always counted
// as active and failed respectively. Slices in core.SliceStateActiveDegraded
// are all counted as active unless the FailOnUntoleratedDegradedSlice feature
// gate is enabled; with the gate on, each degraded slice is counted as failed
// when healthySliceRequired reports true for its owning pod set, and as active
// otherwise.
func calculateEffectiveSliceCounts(slicesByState map[core.SliceState][]v1beta1.Slice, wl *kueue.Workload, podSetRequiresHealthy map[string]bool) (int, int) {
	active := len(slicesByState[core.SliceStateActive])
	failed := len(slicesByState[core.SliceStateFailed])
	degraded := slicesByState[core.SliceStateActiveDegraded]

	// Without the feature gate every degraded slice is tolerated.
	if !features.Enabled(features.FailOnUntoleratedDegradedSlice) {
		return active + len(degraded), failed
	}

	for i := range degraded {
		owner := degraded[i].Annotations[core.OwnerPodSetNameAnnotation]
		if healthySliceRequired(owner, podSetRequiresHealthy, wl) {
			failed++
			continue
		}
		active++
	}
	return active, failed
}

func (r *WorkloadReconciler) prepareAdmissionCheckStatus(ctx context.Context, wl *kueue.Workload, ac *kueue.AdmissionCheckState, slices []v1beta1.Slice, desiredSlicesCount int) {
log := ctrl.LoggerFrom(ctx).V(2)
// wait for Kueue to reset check to Pending after eviction
if ac.State == kueue.CheckStateRetry {
return
}
slicesByState := core.GroupSlicesByState(slices, r.activationTimeout)
podSetRequiresHealthy := make(map[string]bool)
if features.Enabled(features.FailOnUntoleratedDegradedSlice) {
for _, ps := range wl.Spec.PodSets {
podSetRequiresHealthy[string(ps.Name)] = podSetRequestedOnlyHealthySlices(ps)
}
}
effectiveActiveCount, effectiveFailedCount := calculateEffectiveSliceCounts(slicesByState, wl, podSetRequiresHealthy)

switch {
case desiredSlicesCount == len(slicesByState[core.SliceStateActive])+len(slicesByState[core.SliceStateActiveDegraded]):
case desiredSlicesCount == effectiveActiveCount:
ac.State = kueue.CheckStateReady
var podSetUpdates []kueue.PodSetUpdate
for _, ps := range wl.Spec.PodSets {
if topology := core.GetTPUTopology(ps.Template); topology != "" {
podSetUpdates = append(podSetUpdates, kueue.PodSetUpdate{
Name: ps.Name,
NodeSelector: map[string]string{
core.TPUTopologyAnnotation: topology,
},
})
}
}
ac.PodSetUpdates = podSetUpdates
case len(slicesByState[core.SliceStateFailed]) > 0:
ac.PodSetUpdates = buildPodSetUpdates(wl)
case effectiveFailedCount > 0:
ac.State = kueue.CheckStateRetry
ac.RequeueAfterSeconds = ptr.To(int32(r.retryDelayOnSliceFailure.Round(time.Second).Seconds()))
case (features.Enabled(features.UseRetryMechanismForSliceCreation) && len(slicesByState[core.SliceStateStale]) > 0):
Expand All @@ -844,29 +859,89 @@ func (r *WorkloadReconciler) prepareAdmissionCheckStatus(ctx context.Context, wl
default:
ac.State = kueue.CheckStatePending
}
ac.Message = buildAdmissionCheckMessage(slicesByState, effectiveFailedCount, wl, podSetRequiresHealthy)
}

// buildPodSetUpdates returns one kueue.PodSetUpdate for every pod set of wl for
// which core.GetTPUTopology yields a non-empty topology. Each update carries
// the pod set name and a node selector mapping core.TPUTopologyAnnotation to
// that topology. Pod sets with an empty topology are skipped; the result is
// nil when no pod set qualifies.
func buildPodSetUpdates(wl *kueue.Workload) []kueue.PodSetUpdate {
	var updates []kueue.PodSetUpdate
	for _, podSet := range wl.Spec.PodSets {
		topology := core.GetTPUTopology(podSet.Template)
		if topology == "" {
			continue
		}
		selector := map[string]string{core.TPUTopologyAnnotation: topology}
		updates = append(updates, kueue.PodSetUpdate{
			Name:         podSet.Name,
			NodeSelector: selector,
		})
	}
	return updates
}

func buildAdmissionCheckMessage(slicesByState map[core.SliceState][]v1beta1.Slice, effectiveFailedCount int, wl *kueue.Workload, podSetRequiresHealthy map[string]bool) string {
var stateMessages []string
for _, state := range core.SliceStates {
if count := len(slicesByState[state]); count > 0 {
stateMessages = append(stateMessages, fmt.Sprintf("%d %s", count, state))
}
}

if len(stateMessages) > 0 {
ac.Message = fmt.Sprintf("Slices are in states: %s", strings.Join(stateMessages, ", "))
var message string
if len(stateMessages) == 0 {
message = "Waiting for Slices to be created"
} else {
ac.Message = "Waiting for Slices to be created"
message = fmt.Sprintf("Slices are in states: %s", strings.Join(stateMessages, ", "))
}

if len(slicesByState[core.SliceStateFailed]) > 0 {
if effectiveFailedCount > 0 {
var errMessages []string
for _, slice := range slicesByState[core.SliceStateFailed] {
cond := meta.FindStatusCondition(slice.Status.Conditions, v1beta1.SliceStateConditionType)
errMessages = append(errMessages, cond.Message)
if cond != nil {
errMessages = append(errMessages, cond.Message)
}
}
ac.Message += ". Errors: " + strings.Join(errMessages, "; ")
if features.Enabled(features.FailOnUntoleratedDegradedSlice) {
for _, slice := range slicesByState[core.SliceStateActiveDegraded] {
psName := slice.Annotations[core.OwnerPodSetNameAnnotation]
if !healthySliceRequired(psName, podSetRequiresHealthy, wl) {
continue
}
if cond := meta.FindStatusCondition(slice.Status.Conditions, v1beta1.SliceStateConditionType); cond != nil {
errMessages = append(errMessages, fmt.Sprintf("%s (degraded)", cond.Message))
}
}
}
message += ". Errors: " + strings.Join(errMessages, "; ")
}
ac.Message = api.TruncateConditionMessage(ac.Message)
return api.TruncateConditionMessage(message)
}

// healthySliceRequired reports whether the pod set named psName requires a
// healthy slice, using the precomputed podSetRequiresHealthy lookup.
//
// An empty psName is handled for backward compatibility with slices created
// before the OwnerPodSetNameAnnotation was introduced: in that case the result
// is true if any pod set of wl requested only healthy slices.
func healthySliceRequired(psName string, podSetRequiresHealthy map[string]bool, wl *kueue.Workload) bool {
	if psName == "" {
		return anyPodSetRequestedOnlyHealthySlices(wl)
	}
	return podSetRequiresHealthy[psName]
}

// anyPodSetRequestedOnlyHealthySlices reports whether at least one pod set of
// wl requested only healthy slices, as decided by
// podSetRequestedOnlyHealthySlices.
func anyPodSetRequestedOnlyHealthySlices(wl *kueue.Workload) bool {
	podSets := wl.Spec.PodSets
	for i := range podSets {
		// A single matching pod set is enough.
		if podSetRequestedOnlyHealthySlices(podSets[i]) {
			return true
		}
	}
	return false
}

// podSetRequestedOnlyHealthySlices reports whether the pod set asks for healthy
// slices only.
//
// If the pod template's node selector contains
// core.TPUSliceHealthNodeSelectorKey, the result is whether its value equals
// core.TPUSliceHealthNodeSelectorHealthy. Otherwise the result is true unless
// core.NodeAffinityAllowsValue reports that the pod's affinity allows the
// core.TPUSliceHealthNodeSelectorDegraded value for that key.
func podSetRequestedOnlyHealthySlices(ps kueue.PodSet) bool {
	podSpec := &ps.Template.Spec

	health, found := podSpec.NodeSelector[core.TPUSliceHealthNodeSelectorKey]
	if found {
		return health == core.TPUSliceHealthNodeSelectorHealthy
	}

	degradedAllowed := core.NodeAffinityAllowsValue(podSpec.Affinity, core.TPUSliceHealthNodeSelectorKey, core.TPUSliceHealthNodeSelectorDegraded)
	return !degradedAllowed
}

// SetupWithManager sets up the controller with the Manager.
Expand Down
Loading
Loading