Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 23 additions & 1 deletion api/v1beta1/kafkacluster_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package v1beta1

import (
"fmt"
"sort"
"strconv"
"strings"

Expand Down Expand Up @@ -46,6 +47,9 @@ const (
// ProcessRolesKey is used to identify which process roles the Kafka pod has
ProcessRolesKey = "processRoles"

// PvcRolesKey is used to identify which process roles a PVC serves (broker, controller, or broker_controller)
PvcRolesKey = "pvcRoles"

// IsBrokerNodeKey is used to identify if the kafka pod is either a broker or a broker_controller
IsBrokerNodeKey = "isBrokerNode"

Expand Down Expand Up @@ -1102,12 +1106,23 @@ func (bConfig *BrokerConfig) GetBrokerAnnotations() map[string]string {
return util.CloneMap(bConfig.BrokerAnnotations)
}

// processRolesValue returns the joined role string used for both the processRoles pod label
// and the pvcRoles PVC label. In KRaft mode the roles are joined with "_"; in ZK mode always "broker".
func (bConfig *BrokerConfig) processRolesValue(kRaftMode bool) string {
if kRaftMode {
roles := append([]string(nil), bConfig.Roles...)
sort.Strings(roles)
return strings.Join(roles, "_")
}
return BrokerNodeProcessRole
}

// GetBrokerLabels returns the labels that are applied to broker pods
func (bConfig *BrokerConfig) GetBrokerLabels(kafkaClusterName string, brokerId int32, kRaftMode bool) map[string]string {
var kraftLabels map[string]string
if kRaftMode {
kraftLabels = map[string]string{
ProcessRolesKey: strings.Join(bConfig.Roles, "_"),
ProcessRolesKey: bConfig.processRolesValue(kRaftMode),
IsControllerNodeKey: fmt.Sprintf("%t", bConfig.IsControllerNode()),
IsBrokerNodeKey: fmt.Sprintf("%t", bConfig.IsBrokerNode()),
}
Expand Down Expand Up @@ -1214,6 +1229,13 @@ func (bConfig *BrokerConfig) IsCombinedNode() bool {
return bConfig.IsBrokerNode() && bConfig.IsControllerNode()
}

// GetPvcRolesLabelValue returns the value for the pvcRoles label on PVCs.
// In KRaft mode the value mirrors processRoles (e.g. "broker", "controller", "broker_controller").
// In ZooKeeper mode all nodes are brokers, so the value is always "broker".
func (bConfig *BrokerConfig) GetPvcRolesLabelValue(kRaftMode bool) string {
return bConfig.processRolesValue(kRaftMode)
}

// GetResources returns the broker specific Kubernetes resource
func (bConfig *BrokerConfig) GetResources() *corev1.ResourceRequirements {
if bConfig.Resources != nil {
Expand Down
2 changes: 1 addition & 1 deletion api/v1beta1/kafkacluster_types_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -515,7 +515,7 @@ func TestGetBrokerLabels(t *testing.T) {
BrokerIdLabelKey: strconv.Itoa(expectedBrokerId),
KafkaCRLabelKey: expectedKafkaCRName,
"test_label_key": "test_label_value",
ProcessRolesKey: "controller_broker",
ProcessRolesKey: "broker_controller",
IsBrokerNodeKey: "true",
IsControllerNodeKey: "true",
},
Expand Down
2 changes: 1 addition & 1 deletion pkg/resources/kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -312,7 +312,7 @@ func (r *Reconciler) Reconcile(log logr.Logger) error {
if storage.PvcSpec == nil {
continue
}
o, err := r.pvc(broker.Id, index, storage)
o, err := r.pvc(broker.Id, index, storage, brokerConfig, r.KafkaCluster.Spec.KRaftMode)
if err != nil {
return errors.WrapIfWithDetails(err, "failed to generate resource", "resources", "PersistentVolumeClaim")
}
Expand Down
7 changes: 5 additions & 2 deletions pkg/resources/kafka/pvc.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import (
"github.com/Masterminds/sprig/v3"
)

func (r *Reconciler) pvc(brokerId int32, storageIndex int, storage v1beta1.StorageConfig) (*corev1.PersistentVolumeClaim, error) {
func (r *Reconciler) pvc(brokerId int32, storageIndex int, storage v1beta1.StorageConfig, brokerConfig *v1beta1.BrokerConfig, kRaftMode bool) (*corev1.PersistentVolumeClaim, error) {
errCtx := []interface{}{v1beta1.BrokerIdLabelKey, brokerId, "mountPath", storage.MountPath}

pvcSpecYaml, err := yaml.Marshal(storage.PvcSpec)
Expand Down Expand Up @@ -67,7 +67,10 @@ func (r *Reconciler) pvc(brokerId int32, storageIndex int, storage v1beta1.Stora
fmt.Sprintf(brokerStorageTemplate, r.KafkaCluster.Name, brokerId, storageIndex),
apiutil.MergeLabels(
apiutil.LabelsForKafka(r.KafkaCluster.Name),
map[string]string{v1beta1.BrokerIdLabelKey: fmt.Sprintf("%d", brokerId)},
map[string]string{
v1beta1.BrokerIdLabelKey: fmt.Sprintf("%d", brokerId),
v1beta1.PvcRolesKey: brokerConfig.GetPvcRolesLabelValue(kRaftMode),
},
),
map[string]string{"mountPath": storage.MountPath}, r.KafkaCluster),
Spec: pvcSpec,
Expand Down
104 changes: 99 additions & 5 deletions pkg/resources/kafka/pvc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,13 +47,19 @@ func TestReconciler_pvc(t *testing.T) {
},
}

brokerConfig := &v1beta1.BrokerConfig{}

testCases := []struct {
testName string
brokerConfig *v1beta1.BrokerConfig
kRaftMode bool
storageConfig v1beta1.StorageConfig
expectedPersistentVolumeClaim *corev1.PersistentVolumeClaim
}{
{
testName: "storage config with no template",
testName: "storage config with no template",
brokerConfig: brokerConfig,
kRaftMode: false,
storageConfig: v1beta1.StorageConfig{
MountPath: "/kafka-logs-1",
PvcSpec: &corev1.PersistentVolumeClaimSpec{
Expand All @@ -71,6 +77,7 @@ func TestReconciler_pvc(t *testing.T) {
v1beta1.AppLabelKey: "kafka",
v1beta1.KafkaCRLabelKey: kafkaCluster.GetName(),
v1beta1.BrokerIdLabelKey: "2",
v1beta1.PvcRolesKey: "broker",
},
Annotations: map[string]string{
"mountPath": "/kafka-logs-1",
Expand All @@ -84,7 +91,9 @@ func TestReconciler_pvc(t *testing.T) {
},
},
{
testName: "storage config with template",
testName: "storage config with template",
brokerConfig: brokerConfig,
kRaftMode: false,
storageConfig: v1beta1.StorageConfig{
MountPath: "/kafka-logs-1",
PvcSpec: &corev1.PersistentVolumeClaimSpec{
Expand All @@ -108,6 +117,7 @@ func TestReconciler_pvc(t *testing.T) {
v1beta1.AppLabelKey: "kafka",
v1beta1.KafkaCRLabelKey: kafkaCluster.GetName(),
v1beta1.BrokerIdLabelKey: "2",
v1beta1.PvcRolesKey: "broker",
},
Annotations: map[string]string{
"mountPath": "/kafka-logs-1",
Expand All @@ -125,7 +135,9 @@ func TestReconciler_pvc(t *testing.T) {
},
},
{
testName: "storage config with template and very long mount path",
testName: "storage config with template and very long mount path",
brokerConfig: brokerConfig,
kRaftMode: false,
storageConfig: v1beta1.StorageConfig{
MountPath: "/mountpath/that/exceeds63characters/kafka-logs-123456789123456789",
PvcSpec: &corev1.PersistentVolumeClaimSpec{
Expand All @@ -147,6 +159,7 @@ func TestReconciler_pvc(t *testing.T) {
v1beta1.AppLabelKey: "kafka",
v1beta1.KafkaCRLabelKey: kafkaCluster.GetName(),
v1beta1.BrokerIdLabelKey: "2",
v1beta1.PvcRolesKey: "broker",
},
Annotations: map[string]string{
"mountPath": "/mountpath/that/exceeds63characters/kafka-logs-123456789123456789",
Expand All @@ -164,7 +177,9 @@ func TestReconciler_pvc(t *testing.T) {
},
},
{
testName: "storage config with volume name template",
testName: "storage config with volume name template",
brokerConfig: brokerConfig,
kRaftMode: false,
storageConfig: v1beta1.StorageConfig{
MountPath: "/kafka-logs-1",
PvcSpec: &corev1.PersistentVolumeClaimSpec{
Expand All @@ -180,6 +195,7 @@ func TestReconciler_pvc(t *testing.T) {
v1beta1.AppLabelKey: "kafka",
v1beta1.KafkaCRLabelKey: kafkaCluster.GetName(),
v1beta1.BrokerIdLabelKey: "2",
v1beta1.PvcRolesKey: "broker",
},
Annotations: map[string]string{
"mountPath": "/kafka-logs-1",
Expand All @@ -190,6 +206,84 @@ func TestReconciler_pvc(t *testing.T) {
},
},
},
{
testName: "kraft controller-only node",
brokerConfig: &v1beta1.BrokerConfig{Roles: []string{v1beta1.ControllerNodeProcessRole}},
kRaftMode: true,
storageConfig: v1beta1.StorageConfig{
MountPath: "/kafka-logs-1",
PvcSpec: &corev1.PersistentVolumeClaimSpec{},
},
expectedPersistentVolumeClaim: &corev1.PersistentVolumeClaim{
ObjectMeta: metav1.ObjectMeta{
Namespace: kafkaCluster.GetNamespace(),
Name: "",
GenerateName: fmt.Sprintf("%s-2-storage-1-", kafkaCluster.GetName()),
Labels: map[string]string{
v1beta1.AppLabelKey: "kafka",
v1beta1.KafkaCRLabelKey: kafkaCluster.GetName(),
v1beta1.BrokerIdLabelKey: "2",
v1beta1.PvcRolesKey: "controller",
},
Annotations: map[string]string{
"mountPath": "/kafka-logs-1",
},
},
Spec: corev1.PersistentVolumeClaimSpec{},
},
},
{
testName: "kraft broker-only node",
brokerConfig: &v1beta1.BrokerConfig{Roles: []string{v1beta1.BrokerNodeProcessRole}},
kRaftMode: true,
storageConfig: v1beta1.StorageConfig{
MountPath: "/kafka-logs-1",
PvcSpec: &corev1.PersistentVolumeClaimSpec{},
},
expectedPersistentVolumeClaim: &corev1.PersistentVolumeClaim{
ObjectMeta: metav1.ObjectMeta{
Namespace: kafkaCluster.GetNamespace(),
Name: "",
GenerateName: fmt.Sprintf("%s-2-storage-1-", kafkaCluster.GetName()),
Labels: map[string]string{
v1beta1.AppLabelKey: "kafka",
v1beta1.KafkaCRLabelKey: kafkaCluster.GetName(),
v1beta1.BrokerIdLabelKey: "2",
v1beta1.PvcRolesKey: "broker",
},
Annotations: map[string]string{
"mountPath": "/kafka-logs-1",
},
},
Spec: corev1.PersistentVolumeClaimSpec{},
},
},
{
testName: "kraft combined broker+controller node",
brokerConfig: &v1beta1.BrokerConfig{Roles: []string{v1beta1.BrokerNodeProcessRole, v1beta1.ControllerNodeProcessRole}},
kRaftMode: true,
storageConfig: v1beta1.StorageConfig{
MountPath: "/kafka-logs-1",
PvcSpec: &corev1.PersistentVolumeClaimSpec{},
},
expectedPersistentVolumeClaim: &corev1.PersistentVolumeClaim{
ObjectMeta: metav1.ObjectMeta{
Namespace: kafkaCluster.GetNamespace(),
Name: "",
GenerateName: fmt.Sprintf("%s-2-storage-1-", kafkaCluster.GetName()),
Labels: map[string]string{
v1beta1.AppLabelKey: "kafka",
v1beta1.KafkaCRLabelKey: kafkaCluster.GetName(),
v1beta1.BrokerIdLabelKey: "2",
v1beta1.PvcRolesKey: "broker_controller",
},
Annotations: map[string]string{
"mountPath": "/kafka-logs-1",
},
},
Spec: corev1.PersistentVolumeClaimSpec{},
},
},
}

t.Parallel()
Expand All @@ -198,7 +292,7 @@ func TestReconciler_pvc(t *testing.T) {
test := test

t.Run(test.testName, func(t *testing.T) {
pvc, err := r.pvc(2, 1, test.storageConfig)
pvc, err := r.pvc(2, 1, test.storageConfig, test.brokerConfig, test.kRaftMode)

assert.NilError(t, err, "PVC creation should succeed")

Expand Down
Loading