Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,13 @@
### Fixed

- Default `API_WORKERS` to 1 (instead of letting Airflow default to 4) to prevent crashloop and update/correct docs to reflect this ([#727]).
- Prevent unnecessary Pod restarts when initially creating an AirflowCluster.
This is achieved by applying the StatefulSet after all ConfigMaps and Secrets that it mounts ([#734]).

[#726]: https://github.com/stackabletech/airflow-operator/pull/726
[#727]: https://github.com/stackabletech/airflow-operator/pull/727
[#733]: https://github.com/stackabletech/airflow-operator/pull/733
[#734]: https://github.com/stackabletech/airflow-operator/pull/734

## [25.11.0] - 2025-11-07

Expand Down
98 changes: 50 additions & 48 deletions rust/operator-binary/src/airflow_controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -519,6 +519,36 @@ pub async fn reconcile_airflow(
role: role_name.to_string(),
})?;

if let Some(GenericRoleConfig {
pod_disruption_budget: pdb,
}) = airflow.role_config(&airflow_role)
{
add_pdbs(&pdb, airflow, &airflow_role, client, &mut cluster_resources)
.await
.context(FailedToCreatePdbSnafu)?;
}

if let Some(listener_class) = airflow_role.listener_class_name(airflow) {
if let Some(listener_group_name) = airflow.group_listener_name(&airflow_role) {
let rg_group_listener = build_group_listener(
airflow,
build_recommended_labels(
airflow,
AIRFLOW_CONTROLLER_NAME,
&resolved_product_image.app_version_label_value,
role_name,
"none",
),
listener_class.to_string(),
listener_group_name,
)?;
cluster_resources
.add(client, rg_group_listener)
.await
.context(ApplyGroupListenerSnafu)?;
}
}

for (rolegroup_name, rolegroup_config) in role_config.iter() {
let rolegroup = RoleGroupRef {
cluster: ObjectRef::from_obj(airflow),
Expand Down Expand Up @@ -587,6 +617,26 @@ pub async fn reconcile_airflow(
rolegroup: rolegroup.clone(),
})?;

let rg_configmap = build_rolegroup_config_map(
airflow,
&resolved_product_image,
&rolegroup,
rolegroup_config,
&authentication_config,
&authorization_config,
&merged_airflow_config.logging,
&Container::Airflow,
)?;
cluster_resources
.add(client, rg_configmap)
.await
.with_context(|_| ApplyRoleGroupConfigSnafu {
rolegroup: rolegroup.clone(),
})?;

// Note: The StatefulSet needs to be applied after all ConfigMaps and Secrets it mounts
// to prevent unnecessary Pod restarts.
// See https://github.com/stackabletech/commons-operator/issues/111 for details.
let rg_statefulset = build_server_rolegroup_statefulset(
airflow,
&resolved_product_image,
Expand All @@ -609,54 +659,6 @@ pub async fn reconcile_airflow(
rolegroup: rolegroup.clone(),
})?,
);

let rg_configmap = build_rolegroup_config_map(
airflow,
&resolved_product_image,
&rolegroup,
rolegroup_config,
&authentication_config,
&authorization_config,
&merged_airflow_config.logging,
&Container::Airflow,
)?;
cluster_resources
.add(client, rg_configmap)
.await
.with_context(|_| ApplyRoleGroupConfigSnafu {
rolegroup: rolegroup.clone(),
})?;
}

let role_config = airflow.role_config(&airflow_role);
if let Some(GenericRoleConfig {
pod_disruption_budget: pdb,
}) = role_config
{
add_pdbs(&pdb, airflow, &airflow_role, client, &mut cluster_resources)
.await
.context(FailedToCreatePdbSnafu)?;
}

if let Some(listener_class) = airflow_role.listener_class_name(airflow) {
if let Some(listener_group_name) = airflow.group_listener_name(&airflow_role) {
let rg_group_listener = build_group_listener(
airflow,
build_recommended_labels(
airflow,
AIRFLOW_CONTROLLER_NAME,
&resolved_product_image.app_version_label_value,
role_name,
"none",
),
listener_class.to_string(),
listener_group_name,
)?;
cluster_resources
.add(client, rg_group_listener)
.await
.context(ApplyGroupListenerSnafu)?;
}
}
}

Expand Down
5 changes: 5 additions & 0 deletions tests/templates/kuttl/smoke/40-assert.yaml.j2
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ apiVersion: apps/v1
kind: StatefulSet
metadata:
name: airflow-webserver-default
generation: 1 # There should be no unneeded Pod restarts
spec:
template:
spec:
Expand All @@ -30,6 +31,7 @@ apiVersion: apps/v1
kind: StatefulSet
metadata:
name: airflow-worker-default
generation: 1 # There should be no unneeded Pod restarts
spec:
template:
spec:
Expand All @@ -43,6 +45,7 @@ apiVersion: apps/v1
kind: StatefulSet
metadata:
name: airflow-scheduler-default
generation: 1 # There should be no unneeded Pod restarts
spec:
template:
spec:
Expand All @@ -55,6 +58,7 @@ apiVersion: apps/v1
kind: StatefulSet
metadata:
name: airflow-dagprocessor-default
generation: 1 # There should be no unneeded Pod restarts
spec:
template:
spec:
Expand All @@ -67,6 +71,7 @@ apiVersion: apps/v1
kind: StatefulSet
metadata:
name: airflow-triggerer-default
generation: 1 # There should be no unneeded Pod restarts
spec:
template:
spec:
Expand Down