Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 11 additions & 1 deletion pkg/resources/kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -450,7 +450,9 @@ func (r *Reconciler) Reconcile(log logr.Logger) error {
if err != nil {
return err
}
brokerStatus[broker.Id] = brokerConfig
if r.brokerNeedsVersionUpdate(broker.Id, brokerConfig) {
brokerStatus[broker.Id] = brokerConfig
}

// If dynamic configs can not be set then let the loop continue to the next broker,
// after the loop we return error. This solves that case when other brokers could get healthy,
Expand Down Expand Up @@ -921,6 +923,14 @@ func (r *Reconciler) reconcileKafkaPod(log logr.Logger, desiredPod *corev1.Pod,
return nil
}

// brokerNeedsVersionUpdate returns true when the broker's image/version status is absent,
// incomplete, or stale relative to the desired image — i.e. a JMX fetch is warranted.
func (r *Reconciler) brokerNeedsVersionUpdate(brokerID int32, brokerConfig *banzaiv1beta1.BrokerConfig) bool {
desiredImage := util.GetBrokerImage(brokerConfig, r.KafkaCluster.Spec.GetClusterImage())
state, ok := r.KafkaCluster.Status.BrokersState[strconv.Itoa(int(brokerID))]
return !ok || state.Version == "" || state.Image != desiredImage
}

type brokerVersionResult struct {
brokerID int32
kafkaVersion *banzaiv1beta1.KafkaVersion
Expand Down
104 changes: 104 additions & 0 deletions pkg/resources/kafka/kafka_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1986,3 +1986,107 @@ func TestGetBrokerAzMap(t *testing.T) {
})
}
}

func TestBrokerNeedsVersionUpdate(t *testing.T) {
t.Parallel()
const clusterImage = "apache/kafka:3.4.0"
const updatedImage = "apache/kafka:3.5.0"

testCases := []struct {
testName string
brokerID int32
brokerConfig v1beta1.BrokerConfig
clusterImage string
brokersState map[string]v1beta1.BrokerState
expected bool
}{
{
testName: "no existing status entry triggers update",
brokerID: 0,
brokerConfig: v1beta1.BrokerConfig{},
clusterImage: clusterImage,
brokersState: map[string]v1beta1.BrokerState{},
expected: true,
},
{
testName: "status entry with empty version triggers update",
brokerID: 0,
brokerConfig: v1beta1.BrokerConfig{},
clusterImage: clusterImage,
brokersState: map[string]v1beta1.BrokerState{
"0": {Image: clusterImage, Version: ""},
},
expected: true,
},
{
testName: "status entry with different image triggers update",
brokerID: 0,
brokerConfig: v1beta1.BrokerConfig{},
clusterImage: updatedImage,
brokersState: map[string]v1beta1.BrokerState{
"0": {Image: clusterImage, Version: "3.4.0"},
},
expected: true,
},
{
testName: "status up to date with cluster image skips update",
brokerID: 0,
brokerConfig: v1beta1.BrokerConfig{},
clusterImage: clusterImage,
brokersState: map[string]v1beta1.BrokerState{
"0": {Image: clusterImage, Version: "3.4.0"},
},
expected: false,
},
{
testName: "broker-level image override used instead of cluster image",
brokerID: 0,
brokerConfig: v1beta1.BrokerConfig{Image: "apache/kafka:3.4.1"},
clusterImage: clusterImage,
brokersState: map[string]v1beta1.BrokerState{
"0": {Image: "apache/kafka:3.4.1", Version: "3.4.1"},
},
expected: false,
},
{
testName: "broker-level image override differs from recorded image triggers update",
brokerID: 0,
brokerConfig: v1beta1.BrokerConfig{Image: "apache/kafka:3.5.0"},
clusterImage: clusterImage,
brokersState: map[string]v1beta1.BrokerState{
"0": {Image: "apache/kafka:3.4.1", Version: "3.4.1"},
},
expected: true,
},
{
testName: "correct state for one broker does not suppress update for another",
brokerID: 1,
brokerConfig: v1beta1.BrokerConfig{},
clusterImage: clusterImage,
brokersState: map[string]v1beta1.BrokerState{
"0": {Image: clusterImage, Version: "3.4.0"},
},
expected: true,
},
}

for _, test := range testCases {
test := test
t.Run(test.testName, func(t *testing.T) {
t.Parallel()
r := Reconciler{
Reconciler: resources.Reconciler{
KafkaCluster: &v1beta1.KafkaCluster{
Spec: v1beta1.KafkaClusterSpec{
ClusterImage: test.clusterImage,
},
Status: v1beta1.KafkaClusterStatus{
BrokersState: test.brokersState,
},
},
},
}
assert.Equal(t, test.expected, r.brokerNeedsVersionUpdate(test.brokerID, &test.brokerConfig))
})
}
}
Loading