Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions crates/stackable-operator/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,13 @@ All notable changes to this project will be documented in this file.
- Add support for specifying a `clientAuthenticationMethod` for OIDC ([#1178]).
This was originally done in [#1158] and had been reverted in [#1170].

### Changed

- Check the operator's own RBAC permissions before attempting to list resources of a type when garbage collecting possible orphans ([#1179]).
This avoids an error message that was previously logged when an operator tried to list resources of a type it had no permission to list.

[#1178]: https://github.com/stackabletech/operator-rs/pull/1178
[#1179]: https://github.com/stackabletech/operator-rs/pull/1179

## [0.108.0] - 2026-03-10

Expand Down
115 changes: 114 additions & 1 deletion crates/stackable-operator/src/client.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,20 @@
use std::{
any::TypeId,
collections::HashMap,
convert::TryFrom,
fmt::{Debug, Display},
sync::{Arc, RwLock},
time::{Duration, Instant},
};

use either::Either;
use futures::StreamExt;
use k8s_openapi::{
ClusterResourceScope, NamespaceResourceScope, apimachinery::pkg::apis::meta::v1::LabelSelector,
ClusterResourceScope, NamespaceResourceScope,
api::authorization::v1::{
ResourceAttributes, SelfSubjectAccessReview, SelfSubjectAccessReviewSpec,
},
apimachinery::pkg::apis::meta::v1::LabelSelector,
};
use kube::{
Api, Config,
Expand All @@ -28,6 +36,15 @@ pub type Result<T, E = Error> = std::result::Result<T, E>;

#[derive(Debug, Snafu)]
pub enum Error {
// The following two variants cannot have a `source` field.
// The underlying `PoisonError` wraps the lock guard, which borrows from `self`, so storing it
// would let that borrow escape the `can_list()` function, which does not compile.
#[snafu(display("unable to write cache of object list permissions"))]
ListCachePermissionsWrite,

#[snafu(display("unable to read cache of object list permissions"))]
ListCachePermissionsRead,

#[snafu(display("unable to get resource {resource_name:?}"))]
GetResource {
source: kube::Error,
Expand Down Expand Up @@ -91,6 +108,10 @@ pub enum Error {
},
}

/// Storage for cached list-permission checks.
///
/// Maps a `(resource type, namespace)` key to a `(can list, cached at)` value: whether listing
/// was allowed, and the [`Instant`] at which that answer was recorded.
// A dedicated alias is needed to silence clippy warnings about the type complexity of the
// `list_permissions` field in `Client`.
type ListableResourceMap = HashMap<(TypeId, String), (bool, Instant)>;

/// This `Client` can be used to access Kubernetes.
/// It wraps an underlying [kube::client::Client] and provides some common functionality.
#[derive(Clone)]
Expand All @@ -103,8 +124,17 @@ pub struct Client {
pub default_namespace: String,

pub kubernetes_cluster_info: KubernetesClusterInfo,

/// Cache of `SelfSubjectAccessReview` results keyed by (resource type, namespace).
list_permissions: Arc<RwLock<ListableResourceMap>>,
}

/// How long a cached `SelfSubjectAccessReview` result is considered valid: 300 seconds
/// (5 minutes).
///
/// A TTL is used rather than caching indefinitely because RBAC rules can change at runtime
/// (e.g. an admin updates a `ClusterRole`), and we want to pick up such changes eventually
/// without requiring an operator restart. Cached entries that have reached this age are ignored
/// by `can_list`, which then performs a fresh review.
const LIST_PERMISSION_TTL: Duration = Duration::from_secs(300);

impl Client {
pub fn new(
client: KubeClient,
Expand All @@ -125,6 +155,7 @@ impl Client {
delete_params: DeleteParams::default(),
default_namespace,
kubernetes_cluster_info,
list_permissions: Arc::default(),
}
}

Expand Down Expand Up @@ -520,6 +551,88 @@ impl Client {
Api::all(self.client.clone())
}

/// Returns whether the current service account is allowed to `list` resources of type `T`
/// in the given `namespace`, by performing a [`SelfSubjectAccessReview`].
///
/// Results are cached per (resource type, namespace) pair to avoid a SAR API call on every
/// reconciliation. The cache has a TTL of [`LIST_PERMISSION_TTL`] so that RBAC changes made
/// at runtime are eventually picked up without requiring an operator restart.
///
/// If the review request itself fails (e.g. due to a network error), this returns `true` so
/// that callers fall back to attempting the operation and handling any resulting error.
/// Failures are intentionally not cached: a transient error should not suppress deletion
/// for the full TTL duration.
pub async fn can_list<T>(&self, namespace: &str) -> Result<bool, Error>
where
T: Resource<DynamicType = ()> + 'static,
{
let key = (TypeId::of::<T>(), namespace.to_string());

// This nested block is necessary to ensure the cache lock is dropped before the write() call below to avoid deadlocks.
// Alternatively a drop(cache) call could be used but this is more idiomatic.
{
let cache = self
.list_permissions
.read()
.map_err(|_| Error::ListCachePermissionsRead)?;
if let Some(&(allowed, cached_at)) = cache.get(&key)
&& cached_at.elapsed() < LIST_PERMISSION_TTL
{
tracing::debug!(
allowed = allowed,
namespace = namespace,
type_name = std::any::type_name::<T>(),
"object list permission from cache",
);

return Ok(allowed);
}
}

let sar = SelfSubjectAccessReview {
spec: SelfSubjectAccessReviewSpec {
resource_attributes: Some(ResourceAttributes {
namespace: Some(namespace.to_string()),
verb: Some("list".to_string()),
group: Some(T::group(&()).to_string()),
resource: Some(T::plural(&()).to_string()),
..Default::default()
}),
..Default::default()
},
..Default::default()
};

let api: Api<SelfSubjectAccessReview> = Api::all(self.client.clone());
let allowed = match api.create(&PostParams::default(), &sar).await {
Ok(response) => {
let allowed = response.status.map(|s| s.allowed).unwrap_or(false);
self.list_permissions
.write()
.map_err(|_| Error::ListCachePermissionsWrite)?
.insert(key, (allowed, Instant::now()));
allowed
}
Err(err) => {
tracing::error!(
namespace = namespace,
type_name = std::any::type_name::<T>(),
error = ?err,
"failed to perform SelfSubjectAccessReview, assuming list is allowed",
);
true
}
};

tracing::debug!(
allowed = allowed,
namespace = namespace,
type_name = std::any::type_name::<T>(),
"object list permissions",
);
Ok(allowed)
}

#[deprecated(note = "Use Api::get_api instead", since = "0.26.0")]
pub fn get_namespaced_api<T>(&self, namespace: &str) -> Api<T>
where
Expand Down
85 changes: 44 additions & 41 deletions crates/stackable-operator/src/cluster_resources.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use k8s_openapi::{
use kube::{Resource, ResourceExt};
use serde::{Serialize, de::DeserializeOwned};
use snafu::{OptionExt, ResultExt, Snafu};
use tracing::{debug, info, warn};
use tracing::{debug, warn};

use crate::{
client::{Client, GetApi},
Expand Down Expand Up @@ -105,6 +105,7 @@ pub trait ClusterResource:
+ Resource<DynamicType = (), Scope = NamespaceResourceScope>
+ GetApi<Namespace = str>
+ Serialize
+ 'static
{
/// This must be implemented for any [`ClusterResources`] that should be adapted before
/// applying depending on the chosen [`ClusterResourceApplyStrategy`].
Expand Down Expand Up @@ -713,54 +714,56 @@ impl<'a> ClusterResources<'a> {
client: &Client,
) -> Result<()> {
if !self.apply_strategy.delete_orphans() {
debug!(
"Skip deleting orphaned resources because of [{}] strategy.",
self.apply_strategy
tracing::debug!(
apply_strategy = ?self.apply_strategy,
"skip deleting orphaned resources because of strategy.",
);
return Ok(());
}

match self.list_deployed_cluster_resources::<T>(client).await {
Ok(deployed_cluster_resources) => {
let mut orphaned_resources = Vec::new();

for resource in deployed_cluster_resources {
let resource_id = resource.uid().context(MissingObjectKeySnafu {
key: "metadata/uid",
})?;
if !self.resource_ids.contains(&resource_id) {
orphaned_resources.push(resource);
}
}

if !orphaned_resources.is_empty() {
info!(
"Deleting orphaned {}: {}",
T::plural(&()),
ClusterResources::print_resources(&orphaned_resources),
);
for resource in orphaned_resources.iter() {
client
.delete(resource)
.await
.context(DeleteOrphanedResourceSnafu)?;
}
}
if !client
.can_list::<T>(&self.namespace)
.await
.context(ListClusterResourcesSnafu)?
{
tracing::debug!(
type_name = std::any::type_name::<T>(),
"skipping deletion of orphans of this type because the operator is not allowed to list them",
);
return Ok(());
}

Ok(())
let deployed_cluster_resources = self
.list_deployed_cluster_resources::<T>(client)
.await
.context(ListClusterResourcesSnafu)?;

let mut orphaned_resources = Vec::new();

for resource in deployed_cluster_resources {
let resource_id = resource.uid().context(MissingObjectKeySnafu {
key: "metadata/uid",
})?;
if !self.resource_ids.contains(&resource_id) {
orphaned_resources.push(resource);
}
Err(crate::client::Error::ListResources {
source: kube::Error::Api(s),
}) if s.is_forbidden() => {
debug!(
"Skipping deletion of orphaned {} because the operator is not allowed to list \
them and is therefore probably not in charge of them.",
T::plural(&())
);
Ok(())
}

if !orphaned_resources.is_empty() {
tracing::info!(
type_name = std::any::type_name::<T>(),
orphans = ClusterResources::print_resources(&orphaned_resources),
"deleting orphans",
);
for resource in orphaned_resources.iter() {
client
.delete(resource)
.await
.context(DeleteOrphanedResourceSnafu)?;
}
Err(error) => Err(error).context(ListClusterResourcesSnafu),
}

Ok(())
}

/// Creates a string containing the names and if present namespaces of the given resources
Expand Down
Loading