Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 34 additions & 0 deletions pkg/resources/kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -828,6 +828,7 @@ func (r *Reconciler) reconcileKafkaPod(log logr.Logger, desiredPod *corev1.Pod,
return errorfactory.New(errorfactory.APIFailure{}, err, "getting resource failed", "kind", desiredType)
}
switch {
//initial run - Create Pod
case len(podList.Items) == 0:
if err := patch.DefaultAnnotator.SetLastAppliedAnnotation(desiredPod); err != nil {
return errors.WrapIf(err, "could not apply last state to annotation")
Expand Down Expand Up @@ -935,6 +936,37 @@ func (r *Reconciler) updateStatusWithDockerImageAndVersion(brokerId int32, broke
return nil
}

// syncResourceRequests overwrites CPU and memory requests in desiredPod's containers
// with the values from currentPod so that request-only changes do not trigger a pod restart.
func syncResourceRequests(desiredPod, currentPod *corev1.Pod) {
syncContainerResourceRequests(desiredPod.Spec.Containers, currentPod.Spec.Containers)
syncContainerResourceRequests(desiredPod.Spec.InitContainers, currentPod.Spec.InitContainers)
}

func syncContainerResourceRequests(desired, current []corev1.Container) {
index := make(map[string]corev1.ResourceList, len(current))
for _, c := range current {
index[c.Name] = c.Resources.Requests
}
for i := range desired {
c := &desired[i]
reqs, ok := index[c.Name]
if !ok {
continue
}
if c.Resources.Requests == nil {
c.Resources.Requests = make(corev1.ResourceList)
}
for _, res := range []corev1.ResourceName{corev1.ResourceCPU, corev1.ResourceMemory} {
if val, exists := reqs[res]; exists {
c.Resources.Requests[res] = val
} else {
delete(c.Resources.Requests, res)
}
}
}
}

//gocyclo:ignore
func (r *Reconciler) handleRollingUpgrade(log logr.Logger, desiredPod, currentPod *corev1.Pod, desiredType reflect.Type) error {
// Since toleration does not support patchStrategy:"merge,retainKeys",
Expand All @@ -951,6 +983,8 @@ func (r *Reconciler) handleRollingUpgrade(log logr.Logger, desiredPod, currentPo
}
desiredPod.Spec.Tolerations = uniqueTolerations
}
// Ignore CPU/memory request diffs — changing requests does not require a pod restart.
syncResourceRequests(desiredPod, currentPod)
// Check if the resource actually updated or if labels match TaintedBrokersSelector
patchResult, err := patch.DefaultPatchMaker.Calculate(currentPod, desiredPod)
switch {
Expand Down
Loading