Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
bin
charts/**/charts
charts/koperator/requirements.lock

charts/kafka-operator/ingress
# Test binary, build with `go test -c`
*.test

Expand Down
13 changes: 10 additions & 3 deletions api/v1beta1/kafkacluster_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,9 +157,16 @@ type KafkaClusterSpec struct {
// This is default to be true; if set to false, the Kafka cluster is in ZooKeeper mode.
// +kubebuilder:default=false
// +optional
KRaftMode bool `json:"kRaft"`
HeadlessServiceEnabled bool `json:"headlessServiceEnabled"`
ListenersConfig ListenersConfig `json:"listenersConfig"`
KRaftMode bool `json:"kRaft"`
HeadlessServiceEnabled bool `json:"headlessServiceEnabled"`
// DebugEnabled is used to decide whether to create a separate loadbalancer services for the
// Kafka and Cruise Control Pods. These services will expose the internal listener ports of the Kafka
// cluster with LoadBalancer type, which can be used for running Koperator on a local machine against
// a kafkaCluster instance on a Kind Cluster.
// +kubebuilder:default=false
// +optional
DebugEnabled bool `json:"debugEnabled"`
ListenersConfig ListenersConfig `json:"listenersConfig"`
// Custom ports to expose in the container. Example use case: a custom kafka distribution, that includes an integrated metrics api endpoint
AdditionalPorts []corev1.ContainerPort `json:"additionalPorts,omitempty"`
// ZKAddresses specifies the ZooKeeper connection string
Expand Down
8 changes: 8 additions & 0 deletions charts/kafka-operator/crds/kafkaclusters.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -19231,6 +19231,14 @@ spec:
type: object
type: array
type: object
debugEnabled:
default: false
description: |-
DebugEnabled is used to decide whether to create a separate loadbalancer services for the
Kafka and Cruise Control Pods. These services will expose the internal listener ports of the Kafka
cluster with LoadBalancer type, which can be used for running Koperator on a local machine against
a kafkaCluster instance on a Kind Cluster.
type: boolean
disruptionBudget:
description: DisruptionBudget defines the configuration for PodDisruptionBudget
where the workload is managed by the kafka-operator
Expand Down
8 changes: 8 additions & 0 deletions config/base/crds/kafka.banzaicloud.io_kafkaclusters.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -19231,6 +19231,14 @@ spec:
type: object
type: array
type: object
debugEnabled:
default: false
description: |-
DebugEnabled is used to decide whether to create a separate loadbalancer services for the
Kafka and Cruise Control Pods. These services will expose the internal listener ports of the Kafka
cluster with LoadBalancer type, which can be used for running Koperator on a local machine against
a kafkaCluster instance on a Kind Cluster.
type: boolean
disruptionBudget:
description: DisruptionBudget defines the configuration for PodDisruptionBudget
where the workload is managed by the kafka-operator
Expand Down
9 changes: 9 additions & 0 deletions config/samples/simpleZookeeper.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
apiVersion: zookeeper.pravega.io/v1beta1
kind: ZookeeperCluster
metadata:
name: zookeeper-server
namespace: zookeeper
spec:
replicas: 3
persistence:
reclaimPolicy: Delete
3 changes: 2 additions & 1 deletion config/samples/simplekafkacluster.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,11 @@ metadata:
controller-tools.k8s.io: "1.0"
name: kafka
spec:
debugEnabled: true
kRaft: false
monitoringConfig:
jmxImage: "ghcr.io/adobe/koperator/jmx-javaagent:1.4.0"
headlessServiceEnabled: true
headlessServiceEnabled: false
zkAddresses:
- "zookeeper-server-client.zookeeper:2181"
propagateLabels: false
Expand Down
22 changes: 22 additions & 0 deletions config/scaleops/CustomOwnerGrouping.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@

kind: CustomOwnerGrouping
apiVersion: analysis.scaleops.sh/v1alpha1
metadata:
name: kafkabroker
namespace: scaleops-system
spec:
groupBy:
positiveRegexMatch: false
groupBys:
- labels:
- 'isBrokerNode: true'
positiveRegexMatch: false
topOwnerController:
apiVersion: kafka.banzaicloud.io/v1beta1
kind: KafkaCluster
displayOptions:
hideGeneratedSuffix: true
fields:
- ownerName
defaultPolicy: kafka-brokers
enabled: true
96 changes: 96 additions & 0 deletions config/scaleops/KafkaBrokersPolicy.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
kind: Policy
apiVersion: analysis.scaleops.sh/v1alpha1
metadata:
name: kafka-brokers
namespace: scaleops-system
spec:
type: Optimize
policyOptimize:
rightSizePolicy:
windowByResource:
cpu: 2h
ephemeral-storage: 2h
memory: 2h
requestsConfigs:
cpu:
headroomPercentage: 20
percentilePercentage: 97
minAllowed: 500m
maxAllowed: 0m
keepRequest: false
ephemeral-storage:
headroomPercentage: 5
percentilePercentage: 90
minAllowed: 0Gi
maxAllowed: 0Gi
memory:
headroomPercentage: 20
percentilePercentage: 97
minAllowed: 1Gi
maxAllowed: 0Gi
keepRequest: false
limitConfigs:
cpu:
noLimit: false
keepLimit: true
setLimit: null
setLimitRequestRatio: null
keepLimitRequestRatio: false
headroomPercentage: null
equalsToRequest: false
ephemeral-storage:
noLimit: false
keepLimit: true
setLimit: null
setLimitRequestRatio: null
keepLimitRequestRatio: false
headroomPercentage: null
equalsToRequest: false
memory:
noLimit: false
keepLimit: true
setLimit: null
setLimitRequestRatio: null
keepLimitRequestRatio: false
headroomPercentage: null
equalsToRequest: false
nodeCappingPolicy:
nodeCappingAuto: true
cpuInteger: false
histogramReplicaPercentilePerMinuteByResource:
memory: 100
jvmOptimizationEnabled: false
ephemeralStorageOptimizationEnabled: false
fastReaction:
enabled:
cpu: true
memory: true
updatePolicy:
updateByTypeMode:
deployment: OnCreate
statefulSet: OnCreate
daemonSet: OnCreate
job: OnCreate
family: OnCreate
argoRollout: OnCreate
minReplicas: 1
considerDeploymentStrategy: false
podMinReadySeconds: 5
requiredWindowCoveragePercentage: 3
allowRollingUpdate: true
inPlaceUpdateStrategy:
enableInPlaceForUponPodCreationStrategy: true
enableInPlaceForOngoingStrategy: true
binPackUnEvictablePods: true
skipRolloutUponAutomation: true
ignoreAutoscalerSafeToEvictAnnotations: false
activelyEnforceOptimizationByContext: true
initContainers:
enabled: true
autoDetectionRules: {}
hpa:
manageHPA: true
autoHealing:
enabledV2: true
enabledByResource:
ephemeral-storage: true
9 changes: 8 additions & 1 deletion pkg/resources/cruisecontrol/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,15 @@ import (
)

func (r *Reconciler) service() runtime.Object {
return &corev1.Service{
svc := &corev1.Service{
ObjectMeta: templates.ObjectMeta(
fmt.Sprintf(serviceNameTemplate, r.KafkaCluster.Name),
apiutil.MergeLabels(ccLabelSelector(r.KafkaCluster.Name), r.KafkaCluster.Labels),
r.KafkaCluster,
),
Spec: corev1.ServiceSpec{
Selector: ccLabelSelector(r.KafkaCluster.Name),
Type: corev1.ServiceTypeClusterIP,
Ports: []corev1.ServicePort{
{
Name: "cc",
Expand All @@ -50,4 +51,10 @@ func (r *Reconciler) service() runtime.Object {
},
},
}

if r.KafkaCluster.Spec.DebugEnabled {
svc.Spec.Type = corev1.ServiceTypeLoadBalancer
}

return svc
}
8 changes: 7 additions & 1 deletion pkg/resources/kafka/allBrokerService.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func (r *Reconciler) allBrokerService() runtime.Object {
usedPorts = append(usedPorts,
generateServicePortForAdditionalPorts(r.KafkaCluster.Spec.AdditionalPorts)...)

return &corev1.Service{
svc := &corev1.Service{
ObjectMeta: templates.ObjectMetaWithAnnotations(
fmt.Sprintf(kafkautils.AllBrokerServiceTemplate, r.KafkaCluster.GetName()),
apiutil.LabelsForKafka(r.KafkaCluster.GetName()),
Expand All @@ -52,4 +52,10 @@ func (r *Reconciler) allBrokerService() runtime.Object {
Ports: usedPorts,
},
}

if r.KafkaCluster.Spec.DebugEnabled {
svc.Spec.Type = corev1.ServiceTypeLoadBalancer
}

return svc
}
6 changes: 5 additions & 1 deletion pkg/resources/kafka/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ func (r *Reconciler) service(id int32, _ *v1beta1.BrokerConfig) runtime.Object {
Protocol: corev1.ProtocolTCP,
})

return &corev1.Service{
svc := &corev1.Service{
ObjectMeta: templates.ObjectMetaWithAnnotations(fmt.Sprintf("%s-%d", r.KafkaCluster.Name, id),
apiutil.MergeLabels(
apiutil.LabelsForKafka(r.KafkaCluster.Name),
Expand All @@ -61,4 +61,8 @@ func (r *Reconciler) service(id int32, _ *v1beta1.BrokerConfig) runtime.Object {
Ports: usedPorts,
},
}
if r.KafkaCluster.Spec.DebugEnabled {
svc.Spec.Type = corev1.ServiceTypeLoadBalancer
}
return svc
}
81 changes: 81 additions & 0 deletions run-local.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
#!/bin/bash
set -e

## Prerequisite checks
if [ -z "${SCALEOPS_TOKEN}" ]; then
echo "Error: SCALEOPS_TOKEN environment variable is not set"
exit 1
fi

## Create kind cluster
kind delete clusters kind-kafka
kind create cluster --config=./tests/e2e/platforms/kind/kind_config.yaml --name=kind-kafka

## Build/Load images (Kafka 3.7.0)
kind load docker-image docker-pipeline-upstream-mirror.dr-uw2.adobeitc.com/adobe/kafka:2.13-3.7.0 --name kind-kafka
### Skip if you want to run koperator locally
docker build . -t koperator_e2e_test
kind load docker-image koperator_e2e_test:latest --name kind-kafka

## Install Helm Charts and CRDs
### project contour
helm repo add contour https://projectcontour.github.io/helm-charts/ || true
helm upgrade --install contour contour/contour --namespace projectcontour --create-namespace

### cert-manager
helm repo add jetstack https://charts.jetstack.io --force-update || true
helm upgrade --install cert-manager jetstack/cert-manager --namespace cert-manager --create-namespace --version v1.16.2 --set crds.enabled=true

### zookeeper-operator
helm repo add pravega https://charts.pravega.io || true
helm upgrade --install zookeeper-operator pravega/zookeeper-operator --version 0.2.15 --namespace zookeeper --create-namespace --set crd.create=true

### prometheus
helm repo add prometheus https://prometheus-community.github.io/helm-charts || true
helm upgrade --install prometheus prometheus/kube-prometheus-stack --version 54.1.0 --namespace prometheus --create-namespace

### scaleops
helm upgrade --install --create-namespace -n scaleops-system --repo https://registry.scaleops.com/charts/ --username scaleops --password ${SCALEOPS_TOKEN} --set scaleopsToken=${SCALEOPS_TOKEN} --set clusterName=$(kubectl config current-context) scaleops scaleops
kubectl apply -f config/scaleops/CustomOwnerGrouping.yaml
kubectl apply -f config/scaleops/KafkaBrokersPolicy.yaml
#### Scaleops Dashboard Port Forward
# kubectl port-forward <scaleops-dashboard-pod-name> 8080 -n scaleops-system
# (find pod name with: kubectl get pods -n scaleops-system)

## Run Koperator Locally
### Start Cloud Provider Kind in the background to enable LoadBalancer services for local koperator
# sudo ~/go/bin/cloud-provider-kind
# (run this manually in a separate terminal before starting koperator)

### Start Local Koperator instance:
kubectl create namespace kafka || true
kubectl ens kafka
make install
# Run koperator locally in a separate terminal:
# go run ./main.go --metrics-addr=:8090 --disable-webhooks

## Initialize Zookeeper and Kafka Cluster
kubectl apply -f config/samples/simplezookeeper.yaml -n zookeeper
kubectl apply -f config/samples/simplekafkacluster.yaml -n kafka

# NOTES for running koperator locally:
#
# If you want to run koperator locally, make sure to set `debugEnabled: true`
# in your KafkaCluster spec. This will create LoadBalancer services for the
# Kafka and Cruise Control pods, allowing your local koperator to access
# services running on the Kind cluster.
#
# Cloud Provider KIND is required to enable LoadBalancer services on Kind.
# This is necessary for local koperator access. If you don't want to run it,
# you can port-forward the services instead.
#
# Finally, you'll need to update your /etc/hosts file to direct request from
# Koperator to the LoadBalancer IPs. You can find the LoadBalancer IPs by running:
# kubectl get svc -n kafka
#
# Your /etc/hosts entries should look something like this:
# 172.18.0.7 kafka-0.kafka.svc.cluster.local
# 172.18.0.9 kafka-1.kafka.svc.cluster.local
# 172.18.0.10 kafka-2.kafka.svc.cluster.local
# 172.18.0.11 kafka-all-broker.kafka.svc.cluster.local
# 172.18.0.8 kafka-cruisecontrol-svc.kafka.svc.cluster.local
14 changes: 8 additions & 6 deletions tests/e2e/platforms/kind/kind_config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
# topology.kubernetes.io/zone (e.g. config/samples/simplekafkacluster_affinity.yaml).
kind: Cluster
apiVersion: kind.x-k8s.io/v1alpha4
name: kind-kafka
nodes:
- role: control-plane
kubeadmConfigPatches:
Expand Down Expand Up @@ -32,9 +33,10 @@ nodes:
nodeRegistration:
kubeletExtraArgs:
node-labels: "topology.kubernetes.io/zone=zone-c"
containerdConfigPatches:
- |-
[plugins."io.containerd.grpc.v1.cri".containerd]
snapshotter = "overlayfs"
[plugins."io.containerd.grpc.v1.cri".registry.mirrors."localhost:5000"]
endpoint = ["http://localhost:5000"]
extraPortMappings:
- containerPort: 80
hostPort: 80
listenAddress: "0.0.0.0"
- containerPort: 443
hostPort: 443
listenAddress: "0.0.0.0"
Loading