Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
16 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions deploy/helm/opensearch-operator/crds/crds.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,10 @@ spec:
type: object
x-kubernetes-preserve-unknown-fields: true
type: object
discoveryServiceExposed:
description: Determines whether this role group is exposed in the discovery service.
nullable: true
type: boolean
gracefulShutdownTimeout:
description: |-
Time period Pods have to gracefully shut down, e.g. `30m`, `1h` or `2d`. Consult the
Expand Down Expand Up @@ -517,11 +521,19 @@ spec:
x-kubernetes-preserve-unknown-fields: true
roleConfig:
default:
discoveryServiceListenerClass: cluster-internal
podDisruptionBudget:
enabled: true
maxUnavailable: null
description: This is a product-agnostic RoleConfig, which is sufficient for most of the products.
properties:
discoveryServiceListenerClass:
default: cluster-internal
description: The [ListenerClass](https://docs.stackable.tech/home/nightly/listener-operator/listenerclass.html) that is used for the discovery service.
maxLength: 253
minLength: 1
pattern: ^[a-z0-9]([-a-z0-9]*[a-z0-9])?(\.[a-z0-9]([-a-z0-9]*[a-z0-9])?)*$
type: string
podDisruptionBudget:
default:
enabled: true
Expand Down Expand Up @@ -600,6 +612,10 @@ spec:
type: object
x-kubernetes-preserve-unknown-fields: true
type: object
discoveryServiceExposed:
description: Determines whether this role group is exposed in the discovery service.
nullable: true
type: boolean
gracefulShutdownTimeout:
description: |-
Time period Pods have to gracefully shut down, e.g. `30m`, `1h` or `2d`. Consult the
Expand Down
110 changes: 74 additions & 36 deletions rust/operator-binary/src/controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,15 @@ use std::{collections::BTreeMap, marker::PhantomData, str::FromStr, sync::Arc};

use apply::Applier;
use build::build;
use dereference::dereference;
use snafu::{ResultExt, Snafu};
use stackable_operator::{
cluster_resources::ClusterResourceApplyStrategy,
commons::{affinity::StackableAffinity, product_image_selection::ResolvedProductImage},
crd::listener::v1alpha1::Listener,
commons::{
affinity::StackableAffinity, networking::DomainName,
product_image_selection::ResolvedProductImage,
},
crd::listener,
k8s_openapi::api::{
apps::v1::StatefulSet,
core::v1::{ConfigMap, Service, ServiceAccount},
Expand All @@ -20,7 +24,6 @@ use stackable_operator::{
},
kube::{Resource, api::ObjectMeta, core::DeserializeGuard, runtime::controller::Action},
logging::controller::ReconcilerError,
role_utils::GenericRoleConfig,
shared::time::Duration,
};
use strum::{EnumDiscriminants, IntoStaticStr};
Expand All @@ -34,7 +37,8 @@ use crate::{
product_logging::framework::{ValidatedContainerLogConfigChoice, VectorContainerLogConfig},
role_utils::{GenericProductSpecificCommonConfig, RoleGroupConfig},
types::{
kubernetes::{ListenerClassName, NamespaceName, Uid},
common::Port,
kubernetes::{Hostname, ListenerClassName, NamespaceName, Uid},
operator::{
ClusterName, ControllerName, OperatorName, ProductName, ProductVersion,
RoleGroupName, RoleName,
Expand All @@ -45,6 +49,7 @@ use crate::{

mod apply;
mod build;
mod dereference;
mod update_status;
mod validate;

Expand All @@ -56,6 +61,7 @@ pub struct ContextNames {
pub product_name: ProductName,
pub operator_name: OperatorName,
pub controller_name: ControllerName,
pub cluster_domain_name: DomainName,
}

/// The controller context
Expand All @@ -66,19 +72,22 @@ pub struct Context {

impl Context {
pub fn new(client: stackable_operator::client::Client, operator_name: OperatorName) -> Self {
let cluster_domain_name = client.kubernetes_cluster_info.cluster_domain.clone();

Context {
client,
names: Self::context_names(operator_name),
names: Self::context_names(operator_name, cluster_domain_name),
}
}

fn context_names(operator_name: OperatorName) -> ContextNames {
fn context_names(operator_name: OperatorName, cluster_domain_name: DomainName) -> ContextNames {
ContextNames {
product_name: ProductName::from_str("opensearch")
.expect("should be a valid product name"),
operator_name,
controller_name: ControllerName::from_str("opensearchcluster")
.expect("should be a valid controller name"),
cluster_domain_name,
}
}

Expand All @@ -103,6 +112,9 @@ pub enum Error {
source: Box<stackable_operator::kube::core::error_boundary::InvalidObject>,
},

#[snafu(display("failed to dereference resources"))]
Dereference { source: dereference::Error },

#[snafu(display("failed to validate cluster"))]
ValidateCluster { source: validate::Error },

Expand All @@ -127,10 +139,16 @@ type OpenSearchRoleGroupConfig =
type OpenSearchNodeResources =
stackable_operator::commons::resources::Resources<v1alpha1::StorageConfig>;

/// Additional objects required for building the cluster
pub struct DereferencedObjects {
pub maybe_discovery_service_listener: Option<listener::v1alpha1::Listener>,
}

/// Validated [`v1alpha1::OpenSearchConfig`]
#[derive(Clone, Debug, PartialEq)]
pub struct ValidatedOpenSearchConfig {
pub affinity: StackableAffinity,
pub discovery_service_exposed: bool,
pub listener_class: ListenerClassName,
pub logging: ValidatedLogging,
pub node_roles: NodeRoles,
Expand All @@ -152,6 +170,12 @@ impl ValidatedLogging {
}
}

#[derive(Clone, Debug, PartialEq)]
pub struct ValidatedDiscoveryEndpoint {
pub hostname: Hostname,
pub port: Port,
}

/// The validated [`v1alpha1::OpenSearchCluster`]
///
/// Validated means that there should be no reason for Kubernetes to reject resources generated
Expand All @@ -168,10 +192,11 @@ pub struct ValidatedCluster {
pub name: ClusterName,
pub namespace: NamespaceName,
pub uid: Uid,
pub role_config: GenericRoleConfig,
pub role_config: v1alpha1::OpenSearchRoleConfig,
pub role_group_configs: BTreeMap<RoleGroupName, OpenSearchRoleGroupConfig>,
pub tls_config: v1alpha1::OpenSearchTls,
pub keystores: Vec<v1alpha1::OpenSearchKeystore>,
pub discovery_endpoint: Option<ValidatedDiscoveryEndpoint>,
}

impl ValidatedCluster {
Expand All @@ -182,13 +207,14 @@ impl ValidatedCluster {
name: ClusterName,
namespace: NamespaceName,
uid: impl Into<Uid>,
role_config: GenericRoleConfig,
role_config: v1alpha1::OpenSearchRoleConfig,
role_group_configs: BTreeMap<RoleGroupName, OpenSearchRoleGroupConfig>,
tls_config: v1alpha1::OpenSearchTls,
keystores: Vec<v1alpha1::OpenSearchKeystore>,
discovery_endpoint: Option<ValidatedDiscoveryEndpoint>,
) -> Self {
let uid = uid.into();
ValidatedCluster {
Self {
metadata: ObjectMeta {
name: Some(name.to_string()),
namespace: Some(namespace.to_string()),
Expand All @@ -204,6 +230,7 @@ impl ValidatedCluster {
role_group_configs,
tls_config,
keystores,
discovery_endpoint,
}
}

Expand Down Expand Up @@ -286,6 +313,27 @@ impl Resource for ValidatedCluster {
}
}

/// Marker for prepared Kubernetes resources which are not applied yet
struct Prepared;
/// Marker for applied Kubernetes resources
struct Applied;

/// List of all Kubernetes resources produced by this controller
///
/// `T` is a marker that indicates if these resources are only [`Prepared`] or already [`Applied`].
/// The marker is useful e.g. to ensure that the cluster status is updated based on the applied
/// resources.
struct KubernetesResources<T> {
stateful_sets: Vec<StatefulSet>,
services: Vec<Service>,
listeners: Vec<listener::v1alpha1::Listener>,
config_maps: Vec<ConfigMap>,
service_accounts: Vec<ServiceAccount>,
role_bindings: Vec<RoleBinding>,
pod_disruption_budgets: Vec<PodDisruptionBudget>,
status: PhantomData<T>,
}

pub fn error_policy(
_object: Arc<DeserializeGuard<v1alpha1::OpenSearchCluster>>,
error: &Error,
Expand Down Expand Up @@ -317,10 +365,14 @@ pub async fn reconcile(
.map_err(stackable_operator::kube::core::error_boundary::InvalidObject::clone)
.context(DeserializeClusterDefinitionSnafu)?;

// not necessary in this controller: dereference (client required)
// dereference (client required)
let dereferenced_objects = dereference(&context.client, cluster)
.await
.context(DereferenceSnafu)?;

// validate (no client required)
let validated_cluster = validate(&context.names, cluster).context(ValidateClusterSnafu)?;
let validated_cluster =
validate(&context.names, cluster, &dereferenced_objects).context(ValidateClusterSnafu)?;

// build (no client required; infallible)
let prepared_resources = build(&context.names, validated_cluster.clone());
Expand Down Expand Up @@ -350,27 +402,6 @@ pub async fn reconcile(
Ok(Action::await_change())
}

/// Marker for prepared Kubernetes resources which are not applied yet
struct Prepared;
/// Marker for applied Kubernetes resources
struct Applied;

/// List of all Kubernetes resources produced by this controller
///
/// `T` is a marker that indicates if these resources are only [`Prepared`] or already [`Applied`].
/// The marker is useful e.g. to ensure that the cluster status is updated based on the applied
/// resources.
struct KubernetesResources<T> {
stateful_sets: Vec<StatefulSet>,
services: Vec<Service>,
listeners: Vec<Listener>,
config_maps: Vec<ConfigMap>,
service_accounts: Vec<ServiceAccount>,
role_bindings: Vec<RoleBinding>,
pod_disruption_budgets: Vec<PodDisruptionBudget>,
status: PhantomData<T>,
}

#[cfg(test)]
mod tests {
use std::{
Expand All @@ -379,11 +410,13 @@ mod tests {
};

use stackable_operator::{
commons::{affinity::StackableAffinity, product_image_selection::ResolvedProductImage},
commons::{
affinity::StackableAffinity, networking::DomainName,
product_image_selection::ResolvedProductImage,
},
k8s_openapi::api::core::v1::PodTemplateSpec,
kvp::LabelValue,
product_logging::spec::AutomaticContainerLogConfig,
role_utils::GenericRoleConfig,
shared::time::Duration,
};
use uuid::uuid;
Expand All @@ -406,7 +439,10 @@ mod tests {
#[test]
fn test_context_names() {
// Test that the function does not panic
Context::context_names(OperatorName::from_str_unsafe("my-operator"));
Context::context_names(
OperatorName::from_str_unsafe("my-operator"),
DomainName::from_str("cluster.local").expect("should be a valid domain name"),
);
}

#[test]
Expand Down Expand Up @@ -476,7 +512,7 @@ mod tests {
ClusterName::from_str_unsafe("my-opensearch"),
NamespaceName::from_str_unsafe("default"),
uuid!("e6ac237d-a6d4-43a1-8135-f36506110912"),
GenericRoleConfig::default(),
v1alpha1::OpenSearchRoleConfig::default(),
[
(
RoleGroupName::from_str_unsafe("coordinating"),
Expand Down Expand Up @@ -512,6 +548,7 @@ mod tests {
.into(),
v1alpha1::OpenSearchTls::default(),
vec![],
None,
)
}

Expand All @@ -523,6 +560,7 @@ mod tests {
replicas,
config: ValidatedOpenSearchConfig {
affinity: StackableAffinity::default(),
discovery_service_exposed: true,
listener_class: ListenerClassName::from_str_unsafe("external-stable"),
logging: ValidatedLogging {
opensearch_container: ValidatedContainerLogConfigChoice::Automatic(
Expand Down
Loading