Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions deploy/helm/kafka-operator/crds/crds.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -920,6 +920,10 @@ spec:
type: object
x-kubernetes-preserve-unknown-fields: true
type: object
bootstrapListenerClass:
description: The ListenerClass used for bootstrapping new clients.
nullable: true
type: string
gracefulShutdownTimeout:
description: Time period Pods have to gracefully shut down, e.g. `30m`, `1h` or `2d`. Consult the operator documentation for details.
nullable: true
Expand Down Expand Up @@ -1271,6 +1275,10 @@ spec:
type: object
x-kubernetes-preserve-unknown-fields: true
type: object
bootstrapListenerClass:
description: The ListenerClass used for bootstrapping new clients.
nullable: true
type: string
gracefulShutdownTimeout:
description: Time period Pods have to gracefully shut down, e.g. `30m`, `1h` or `2d`. Consult the operator documentation for details.
nullable: true
Expand Down
164 changes: 135 additions & 29 deletions rust/operator-binary/src/crd/listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,10 @@ use stackable_operator::{
};
use strum::{EnumDiscriminants, EnumString};

use crate::crd::{STACKABLE_LISTENER_BROKER_DIR, security::KafkaTlsSecurity, v1alpha1};
use crate::crd::{
STACKABLE_LISTENER_BOOTSTRAP_DIR, STACKABLE_LISTENER_BROKER_DIR, security::KafkaTlsSecurity,
v1alpha1,
};

const LISTENER_LOCAL_ADDRESS: &str = "0.0.0.0";

Expand Down Expand Up @@ -186,13 +189,22 @@ impl Display for KafkaListener {
}
}

// Builds a list of listeners for the given Kafka cluster and rolegroup.
//
// TODO: Not every listener is necessarily used by every role while some listeners are used by both roles.
// Yeah, this is confusing and needs refactoring.
//
// For example, the INTERNAL and CLIENT listener are only configured on brokers.
// On the other hand, the BOOTSTRAP listener is configured on both roles.
// Note that actual the bootstrap services are different between brokers and controllers but from the
// Kafka perspective they are both called BOOTSTRAP.
pub fn get_kafka_listener_config(
kafka: &v1alpha1::KafkaCluster,
kafka_security: &KafkaTlsSecurity,
rolegroup_ref: &RoleGroupRef<v1alpha1::KafkaCluster>,
cluster_info: &KubernetesClusterInfo,
) -> Result<KafkaListenerConfig, KafkaListenerError> {
let pod_fqdn = pod_fqdn(
let headless_pod_fqdn = pod_fqdn(
kafka,
&rolegroup_ref.rolegroup_headless_service_name(),
cluster_info,
Expand Down Expand Up @@ -269,7 +281,7 @@ pub fn get_kafka_listener_config(
});
advertised_listeners.push(KafkaListener {
name: KafkaListenerName::Internal,
host: pod_fqdn.to_string(),
host: headless_pod_fqdn.to_string(),
port: KafkaTlsSecurity::SECURE_INTERNAL_PORT.to_string(),
});
listener_security_protocol_map
Expand All @@ -285,7 +297,7 @@ pub fn get_kafka_listener_config(
});
advertised_listeners.push(KafkaListener {
name: KafkaListenerName::Internal,
host: pod_fqdn.to_string(),
host: headless_pod_fqdn.to_string(),
port: kafka_security.internal_port().to_string(),
});
listener_security_protocol_map.insert(
Expand All @@ -298,23 +310,39 @@ pub fn get_kafka_listener_config(
);
}

let bootstrap_service_fqdn = service_fqdn(
kafka,
&rolegroup_service_name(rolegroup_ref, KafkaListenerName::Bootstrap),
cluster_info,
)?;

// BOOTSTRAP
listeners.push(KafkaListener {
name: KafkaListenerName::Bootstrap,
host: LISTENER_LOCAL_ADDRESS.to_string(),
port: kafka_security.bootstrap_port().to_string(),
});
advertised_listeners.push(KafkaListener {
name: KafkaListenerName::Bootstrap,
host: bootstrap_service_fqdn.to_string(),
port: node_port_cmd(
STACKABLE_LISTENER_BOOTSTRAP_DIR,
kafka_security.bootstrap_port_name(),
),
});
if kafka_security.has_kerberos_enabled() {
listeners.push(KafkaListener {
name: KafkaListenerName::Bootstrap,
host: LISTENER_LOCAL_ADDRESS.to_string(),
port: kafka_security.bootstrap_port().to_string(),
});
advertised_listeners.push(KafkaListener {
name: KafkaListenerName::Bootstrap,
host: node_address_cmd(STACKABLE_LISTENER_BROKER_DIR),
port: node_port_cmd(
STACKABLE_LISTENER_BROKER_DIR,
kafka_security.client_port_name(),
),
});
listener_security_protocol_map
.insert(KafkaListenerName::Bootstrap, KafkaListenerProtocol::SaslSsl);
} else if kafka_security.tls_client_authentication_class().is_some()
|| kafka_security.tls_server_secret_class().is_some()
{
listener_security_protocol_map
.insert(KafkaListenerName::Bootstrap, KafkaListenerProtocol::Ssl);
} else {
listener_security_protocol_map.insert(
KafkaListenerName::Bootstrap,
KafkaListenerProtocol::Plaintext,
);
}

Ok(KafkaListenerConfig {
Expand Down Expand Up @@ -352,6 +380,31 @@ pub fn pod_fqdn(
))
}

// TODO: This is the more general version to `RoleGroupRef::rolegroup_headless_service_name()`
// because we need it for the bootstrap service as well, which doesn't exist in op-rs.
fn rolegroup_service_name(
rolegroup_ref: &RoleGroupRef<v1alpha1::KafkaCluster>,
listener: KafkaListenerName,
) -> String {
format!(
"{name}-{service}",
name = rolegroup_ref.object_name(),
service = listener.to_string().to_lowercase()
)
}

pub fn service_fqdn(
kafka: &v1alpha1::KafkaCluster,
sts_service_name: &str,
cluster_info: &KubernetesClusterInfo,
) -> Result<String, KafkaListenerError> {
Ok(format!(
"{sts_service_name}.{namespace}.svc.{cluster_domain}",
namespace = kafka.namespace().context(ObjectHasNoNamespaceSnafu)?,
cluster_domain = cluster_info.cluster_domain
))
}

#[cfg(test)]
mod tests {
use stackable_operator::{
Expand Down Expand Up @@ -414,20 +467,23 @@ mod tests {
assert_eq!(
config.listeners(),
format!(
"{name}://{host}:{port},{internal_name}://{internal_host}:{internal_port}",
"{name}://{host}:{port},{internal_name}://{internal_host}:{internal_port},{bootstrap_name}://{bootstrap_host}:{bootstrap_port}",
name = KafkaListenerName::Client,
host = LISTENER_LOCAL_ADDRESS,
port = kafka_security.client_port(),
internal_name = KafkaListenerName::Internal,
internal_host = LISTENER_LOCAL_ADDRESS,
internal_port = kafka_security.internal_port(),
bootstrap_name = KafkaListenerName::Bootstrap,
bootstrap_host = LISTENER_LOCAL_ADDRESS,
bootstrap_port = kafka_security.bootstrap_port(),
)
);

assert_eq!(
config.advertised_listeners(),
format!(
"{name}://{host}:{port},{internal_name}://{internal_host}:{internal_port}",
"{name}://{host}:{port},{internal_name}://{internal_host}:{internal_port},{bootstrap_name}://{bootstrap_host}:{bootstrap_port}",
name = KafkaListenerName::Client,
host = node_address_cmd(STACKABLE_LISTENER_BROKER_DIR),
port = node_port_cmd(
Expand All @@ -442,17 +498,30 @@ mod tests {
)
.unwrap(),
internal_port = kafka_security.internal_port(),
bootstrap_name = KafkaListenerName::Bootstrap,
bootstrap_host = service_fqdn(
&kafka,
&rolegroup_service_name(&rolegroup_ref, KafkaListenerName::Bootstrap),
&cluster_info
)
.unwrap(),
bootstrap_port = node_port_cmd(
STACKABLE_LISTENER_BOOTSTRAP_DIR,
kafka_security.bootstrap_port_name()
),
)
);

assert_eq!(
config.listener_security_protocol_map(),
format!(
"{name}:{protocol},{internal_name}:{internal_protocol},{controller_name}:{controller_protocol}",
"{name}:{protocol},{internal_name}:{internal_protocol},{bootstrap_name}:{bootstrap_protocol},{controller_name}:{controller_protocol}",
name = KafkaListenerName::Client,
protocol = KafkaListenerProtocol::Ssl,
internal_name = KafkaListenerName::Internal,
internal_protocol = KafkaListenerProtocol::Ssl,
bootstrap_name = KafkaListenerName::Bootstrap,
bootstrap_protocol = KafkaListenerProtocol::Ssl,
controller_name = KafkaListenerName::Controller,
controller_protocol = KafkaListenerProtocol::Ssl,
)
Expand All @@ -470,20 +539,23 @@ mod tests {
assert_eq!(
config.listeners(),
format!(
"{name}://{host}:{port},{internal_name}://{internal_host}:{internal_port}",
"{name}://{host}:{port},{internal_name}://{internal_host}:{internal_port},{bootstrap_name}://{bootstrap_host}:{bootstrap_port}",
name = KafkaListenerName::Client,
host = LISTENER_LOCAL_ADDRESS,
port = kafka_security.client_port(),
internal_name = KafkaListenerName::Internal,
internal_host = LISTENER_LOCAL_ADDRESS,
internal_port = kafka_security.internal_port(),
bootstrap_name = KafkaListenerName::Bootstrap,
bootstrap_host = LISTENER_LOCAL_ADDRESS,
bootstrap_port = kafka_security.bootstrap_port(),
)
);

assert_eq!(
config.advertised_listeners(),
format!(
"{name}://{host}:{port},{internal_name}://{internal_host}:{internal_port}",
"{name}://{host}:{port},{internal_name}://{internal_host}:{internal_port},{bootstrap_name}://{bootstrap_host}:{bootstrap_port}",
name = KafkaListenerName::Client,
host = node_address_cmd(STACKABLE_LISTENER_BROKER_DIR),
port = node_port_cmd(
Expand All @@ -498,19 +570,32 @@ mod tests {
)
.unwrap(),
internal_port = kafka_security.internal_port(),
bootstrap_name = KafkaListenerName::Bootstrap,
bootstrap_host = service_fqdn(
&kafka,
&rolegroup_service_name(&rolegroup_ref, KafkaListenerName::Bootstrap),
&cluster_info
)
.unwrap(),
bootstrap_port = node_port_cmd(
STACKABLE_LISTENER_BOOTSTRAP_DIR,
kafka_security.bootstrap_port_name()
),
)
);

assert_eq!(
config.listener_security_protocol_map(),
format!(
"{name}:{protocol},{internal_name}:{internal_protocol},{controller_name}:{controller_protocol}",
"{name}:{protocol},{internal_name}:{internal_protocol},{bootstrap_name}:{bootstrap_protocol},{controller_name}:{controller_protocol}",
name = KafkaListenerName::Client,
protocol = KafkaListenerProtocol::Ssl,
internal_name = KafkaListenerName::Internal,
internal_protocol = KafkaListenerProtocol::Ssl,
controller_name = KafkaListenerName::Controller,
controller_protocol = KafkaListenerProtocol::Ssl,
bootstrap_name = KafkaListenerName::Bootstrap,
bootstrap_protocol = KafkaListenerProtocol::Ssl,
)
);

Expand All @@ -527,20 +612,23 @@ mod tests {
assert_eq!(
config.listeners(),
format!(
"{name}://{host}:{port},{internal_name}://{internal_host}:{internal_port}",
"{name}://{host}:{port},{internal_name}://{internal_host}:{internal_port},{bootstrap_name}://{bootstrap_host}:{bootstrap_port}",
name = KafkaListenerName::Client,
host = LISTENER_LOCAL_ADDRESS,
port = kafka_security.client_port(),
internal_name = KafkaListenerName::Internal,
internal_host = LISTENER_LOCAL_ADDRESS,
internal_port = kafka_security.internal_port(),
bootstrap_name = KafkaListenerName::Bootstrap,
bootstrap_host = LISTENER_LOCAL_ADDRESS,
bootstrap_port = kafka_security.bootstrap_port(),
)
);

assert_eq!(
config.advertised_listeners(),
format!(
"{name}://{host}:{port},{internal_name}://{internal_host}:{internal_port}",
"{name}://{host}:{port},{internal_name}://{internal_host}:{internal_port},{bootstrap_name}://{bootstrap_host}:{bootstrap_port}",
name = KafkaListenerName::Client,
host = node_address_cmd(STACKABLE_LISTENER_BROKER_DIR),
port = node_port_cmd(
Expand All @@ -555,19 +643,32 @@ mod tests {
)
.unwrap(),
internal_port = kafka_security.internal_port(),
bootstrap_name = KafkaListenerName::Bootstrap,
bootstrap_host = service_fqdn(
&kafka,
&rolegroup_service_name(&rolegroup_ref, KafkaListenerName::Bootstrap),
&cluster_info
)
.unwrap(),
bootstrap_port = node_port_cmd(
STACKABLE_LISTENER_BOOTSTRAP_DIR,
kafka_security.bootstrap_port_name()
),
)
);

assert_eq!(
config.listener_security_protocol_map(),
format!(
"{name}:{protocol},{internal_name}:{internal_protocol},{controller_name}:{controller_protocol}",
"{name}:{protocol},{internal_name}:{internal_protocol},{bootstrap_name}:{bootstrap_protocol},{controller_name}:{controller_protocol}",
name = KafkaListenerName::Client,
protocol = KafkaListenerProtocol::Plaintext,
internal_name = KafkaListenerName::Internal,
internal_protocol = KafkaListenerProtocol::Plaintext,
controller_name = KafkaListenerName::Controller,
controller_protocol = KafkaListenerProtocol::Plaintext,
bootstrap_name = KafkaListenerName::Bootstrap,
bootstrap_protocol = KafkaListenerProtocol::Plaintext,
)
);
}
Expand Down Expand Up @@ -648,10 +749,15 @@ mod tests {
.unwrap(),
internal_port = kafka_security.internal_port(),
bootstrap_name = KafkaListenerName::Bootstrap,
bootstrap_host = node_address_cmd(STACKABLE_LISTENER_BROKER_DIR),
bootstrap_host = service_fqdn(
&kafka,
&rolegroup_service_name(&rolegroup_ref, KafkaListenerName::Bootstrap),
&cluster_info
)
.unwrap(),
bootstrap_port = node_port_cmd(
STACKABLE_LISTENER_BROKER_DIR,
kafka_security.client_port_name()
STACKABLE_LISTENER_BOOTSTRAP_DIR,
kafka_security.bootstrap_port_name()
),
)
);
Expand Down
4 changes: 4 additions & 0 deletions rust/operator-binary/src/crd/role/controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,9 @@ pub struct ControllerConfig {

#[fragment_attrs(serde(default))]
pub resources: Resources<Storage, NoRuntimeLimits>,

/// The ListenerClass used for bootstrapping new clients.
pub bootstrap_listener_class: String,
}

impl ControllerConfig {
Expand All @@ -88,6 +91,7 @@ impl ControllerConfig {
},
},
},
bootstrap_listener_class: Some("cluster-internal".to_string()),
}
}
}
Expand Down
7 changes: 7 additions & 0 deletions rust/operator-binary/src/crd/role/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -439,6 +439,13 @@ impl AnyConfig {
}
}

pub fn bootstrap_listener_class(&self) -> &String {
match self {
AnyConfig::Broker(broker_config) => &broker_config.bootstrap_listener_class,
AnyConfig::Controller(controller_config) => &controller_config.bootstrap_listener_class,
}
}

pub fn config_file_name(&self) -> &str {
match self {
AnyConfig::Broker(_) => BROKER_PROPERTIES_FILE,
Expand Down
Loading