17 changes: 16 additions & 1 deletion cmd/milo/controller-manager/controllermanager.go
@@ -98,6 +98,7 @@ import (
quotav1alpha1 "go.miloapis.com/milo/pkg/apis/quota/v1alpha1"
resourcemanagerv1alpha1 "go.miloapis.com/milo/pkg/apis/resourcemanager/v1alpha1"
miloprovider "go.miloapis.com/milo/pkg/multicluster-runtime/milo"
"go.miloapis.com/milo/internal/controllers/projectprovider"
milowebhook "go.miloapis.com/milo/pkg/webhook"
apiregistrationv1 "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1"
)
@@ -304,6 +305,11 @@ func NewCommand() *cobra.Command {

fs.IntVar(&s.ControllerRuntimeWebhookPort, "controller-runtime-webhook-port", 9443, "The port to use for the controller-runtime webhook server.")

fs.IntVar(&s.ProjectProvider.Workers, "project-provider-workers", s.ProjectProvider.Workers, "Number of concurrent workers that process project additions.")
fs.IntVar(&s.ProjectProvider.MaxRetries, "project-provider-max-retries", s.ProjectProvider.MaxRetries, "Maximum retry attempts for a failed project addition before giving up.")
fs.Float64Var(&s.ProjectProvider.RateLimit, "project-provider-rate-limit", s.ProjectProvider.RateLimit, "Sustained per-second rate at which projects are added.")
fs.IntVar(&s.ProjectProvider.RateBurst, "project-provider-rate-burst", s.ProjectProvider.RateBurst, "Burst allowance for the project addition rate limiter.")

fs.StringVar(&AssignableRolesNamespace, "assignable-roles-namespace", "datum-cloud", "An additional namespace that is allowed to be used for assignable roles.")

s.InfraCluster.AddFlags(namedFlagSets.FlagSet("Infrastructure Cluster"))
@@ -341,6 +347,10 @@ type Options struct {

// The port to use for the controller-runtime webhook server.
ControllerRuntimeWebhookPort int

// ProjectProvider holds tunable parameters for the project provider's
// rate-limiting and retry behaviour during bulk project onboarding.
ProjectProvider projectprovider.Config
}

// NewOptions creates a new Options object with default values.
@@ -355,7 +365,8 @@ func NewOptions() (*Options, error) {
InfraCluster: &infracluster.Options{
KubeconfigFile: baseOpts.Generic.ClientConnection.Kubeconfig,
},
ControlPlane: &controlplane.Options{},
ControlPlane: &controlplane.Options{},
ProjectProvider: projectprovider.DefaultConfig(),
}

return opts, nil
@@ -415,6 +426,7 @@ func Run(ctx context.Context, c *config.CompletedConfig, opts *Options) error {
logger.Error(err, "Error building controller context")
klog.FlushAndExit(klog.ExitFlushTimeout, 1)
}
controllerContext.ProjectProviderConfig = opts.ProjectProvider

// Create a controller manager for the core control plane.
//
@@ -953,6 +965,9 @@ type ControllerContext struct {

// GraphBuilder gives access to the dependencyGraphBuilder, which keeps track of resources in the cluster
GraphBuilder *garbagecollector.GraphBuilder

// ProjectProviderConfig carries tunable parameters for the project provider.
ProjectProviderConfig projectprovider.Config
}

// IsControllerEnabled checks whether the context's controllers are enabled or not
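The projectprovider.Config type referenced above is not part of this diff. Below is a minimal sketch of what the flag wiring implies, assuming the field names used in the fs.*Var calls (Workers, MaxRetries, RateLimit, RateBurst); the defaults are illustrative assumptions, not the actual values of DefaultConfig.

// Sketch only — field names taken from the flag wiring above; defaults
// are assumptions for illustration.
package projectprovider

// Config holds tunable parameters for the project provider's
// rate-limiting and retry behaviour during bulk project onboarding.
type Config struct {
	// Workers is the number of concurrent workers processing project additions.
	Workers int
	// MaxRetries caps retry attempts for a failed project addition.
	MaxRetries int
	// RateLimit is the sustained per-second rate at which projects are added.
	RateLimit float64
	// RateBurst is the burst allowance for the rate limiter.
	RateBurst int
}

// DefaultConfig returns conservative defaults (values assumed here).
func DefaultConfig() Config {
	return Config{Workers: 4, MaxRetries: 5, RateLimit: 10, RateBurst: 20}
}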
4 changes: 2 additions & 2 deletions cmd/milo/controller-manager/core.go
@@ -64,7 +64,7 @@ func startModifiedNamespaceController(ctx context.Context, controllerContext Con
Finalizer: v1.FinalizerKubernetes,
}

prov, err := projectprovider.New(nsKubeconfig, sink)
prov, err := projectprovider.New(nsKubeconfig, sink, controllerContext.ProjectProviderConfig)
if err != nil {
return nil, true, err
}
@@ -134,7 +134,7 @@ func startGarbageCollectorController(ctx context.Context, controllerContext Cont
InformersStarted: controllerContext.InformersStarted,
InitialSyncPeriod: 30 * time.Second,
}
prov, err := projectprovider.New(cfg, gcSink)
prov, err := projectprovider.New(cfg, gcSink, controllerContext.ProjectProviderConfig)
if err != nil {
return nil, true, fmt.Errorf("failed to start project provider for GC: %w", err)
}
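The provider's internals are likewise outside this diff. A sketch of how the Config passed to the three-argument projectprovider.New might be applied — a shared rate limiter feeding a fixed worker pool with bounded retries. The addProject callback and runAdders function are hypothetical; only the golang.org/x/time/rate API is real.

// Sketch, not the actual implementation: one shared limiter paces all
// workers, each failed addition is retried up to MaxRetries times.
package main

import (
	"context"
	"fmt"

	"golang.org/x/time/rate"
)

type Config struct {
	Workers, MaxRetries, RateBurst int
	RateLimit                      float64
}

func runAdders(ctx context.Context, cfg Config, projects <-chan string,
	addProject func(context.Context, string) error) {
	limiter := rate.NewLimiter(rate.Limit(cfg.RateLimit), cfg.RateBurst)
	for i := 0; i < cfg.Workers; i++ {
		go func() {
			for p := range projects {
				// Block until the shared limiter grants a token,
				// bounding the aggregate addition rate.
				if err := limiter.Wait(ctx); err != nil {
					return // context cancelled
				}
				var err error
				for attempt := 0; attempt <= cfg.MaxRetries; attempt++ {
					if err = addProject(ctx, p); err == nil {
						break
					}
				}
				if err != nil {
					fmt.Printf("giving up on project %s: %v\n", p, err)
				}
			}
		}()
	}
}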
158 changes: 107 additions & 51 deletions internal/controllers/garbagecollector/garbagecollector.go
@@ -77,6 +77,12 @@ type GarbageCollector struct {

dependencyGraphBuilders []*GraphBuilder
cancels map[string]context.CancelFunc

// resyncNeeded is set when a new per-project builder is added via
// AddProject. It forces the next Sync tick to resync all builders even
// if root discovery reports the same resource set, covering the case
// where the new builder was seeded with incomplete monitors.
resyncNeeded bool
}

var _ controller.Interface = (*GarbageCollector)(nil)
@@ -93,17 +99,22 @@ func (gc *GarbageCollector) AddProject(
discover discovery.ServerResourcesInterface,
initialSyncTimeout time.Duration,
) error {
// ensure maps
gc.mu.Lock()
if gc.cancels == nil {
gc.cancels = make(map[string]context.CancelFunc)
}
// Skip projects that are already registered (idempotent for retries).
if _, exists := gc.cancels[project]; exists {
gc.mu.Unlock()
return nil
}
gc.mu.Unlock()

logger := klog.FromContext(parent)

// Reuse shared queues/cache from GC (created from the root GB).
atd, ato, absent := gc.attemptToDelete, gc.attemptToOrphan, gc.absentOwnerCache

// Build per-partition GraphBuilder using shared plumbing + shared broadcaster.
gb := NewDependencyGraphBuilderWithShared(
parent,
md,
@@ -114,29 +125,38 @@
atd,
ato,
absent,
gc.eventBroadcaster, // share events across partitions
gc.eventBroadcaster,
)
gb.SetProject(project)

// Track and start
// Seed monitors BEFORE registering the builder. If discovery or monitor
// creation fails (e.g. apiserver throttling during startup burst), the
// builder is never registered and the caller can retry cleanly.
newResources, err := GetDeletableResources(logger, discover)
if err != nil {
logger.V(2).Info("GC: partial discovery for project", "project", project, "error", err)
}
if len(newResources) == 0 {
return fmt.Errorf("gc(%s): discovery returned no resources", project)
}
if err := gb.syncMonitors(logger, newResources); err != nil {
return fmt.Errorf("gc(%s): syncMonitors: %w", project, err)
}

// Monitors created successfully — register the builder and start it.
ctx, cancel := context.WithCancel(parent)

gc.mu.Lock()
gc.dependencyGraphBuilders = append(gc.dependencyGraphBuilders, gb)
gc.cancels[project] = cancel
gc.resyncNeeded = true
gc.mu.Unlock()

go gb.Run(ctx)
partitionCount.Inc()
partitionMonitorCount.WithLabelValues(project).Set(float64(monitorCountForBuilder(gb)))

// Seed monitors for this partition immediately
logger := klog.FromContext(parent)
newResources, _ := GetDeletableResources(logger, discover)
if err := gb.syncMonitors(logger, newResources); err != nil {
cancel()
return fmt.Errorf("gc(%s): syncMonitors: %w", project, err)
}
gb.startMonitors(logger)
go gb.Run(ctx)

// Optional: wait briefly for first sync to reduce initial GC latency
ok := cache.WaitForNamedCacheSync(
"gc-"+project,
waitForStopOrTimeout(ctx.Done(), initialSyncTimeout),
@@ -146,6 +166,8 @@
logger.Info("GC: partition monitors not fully synced; continuing", "project", project)
}

partitionSynced.WithLabelValues(project).Set(boolToFloat(ok))

return nil
}

@@ -155,7 +177,6 @@ func (gc *GarbageCollector) RemoveProject(project string) {
cancel()
delete(gc.cancels, project)
}
// drop from slice
dst := gc.dependencyGraphBuilders[:0]
for _, gb := range gc.dependencyGraphBuilders {
if gb.project != project {
Expand All @@ -164,6 +185,10 @@ func (gc *GarbageCollector) RemoveProject(project string) {
}
gc.dependencyGraphBuilders = dst
gc.mu.Unlock()

partitionCount.Dec()
partitionMonitorCount.Delete(map[string]string{"project": project})
partitionSynced.Delete(map[string]string{"project": project})
}

// NewGarbageCollector creates a new GarbageCollector.
@@ -217,6 +242,7 @@ func NewComposedGarbageCollectorMulti(
}

metrics.Register()
registerPartitionMetricsOnce()
return gc, nil
}

Expand All @@ -234,6 +260,9 @@ func (gc *GarbageCollector) resyncMonitors(
return err
}
gb.startMonitors(logger)
if gb.project != "" {
partitionMonitorCount.WithLabelValues(gb.project).Set(float64(monitorCountForBuilder(gb)))
}
}
return nil
}
@@ -321,48 +350,75 @@ func (gc *GarbageCollector) Sync(ctx context.Context, discoveryClient discovery.
oldResources := make(map[schema.GroupVersionResource]struct{})

wait.UntilWithContext(ctx, func(ctx context.Context) {
logger := klog.FromContext(ctx)

// 1) Discover deletable resources
newResources, err := GetDeletableResources(logger, discoveryClient)
if len(newResources) == 0 {
logger.V(2).Info("no resources reported by discovery, skipping garbage collector sync")
metrics.GarbageCollectorResourcesSyncError.Inc()
return
newOld, ok := gc.syncOnce(ctx, discoveryClient, oldResources, period)
if ok {
oldResources = newOld
}
}, period)
}

// 2) Handle partial discovery: keep already-synced monitors for failed groups
if groupLookupFailures, isLookupFailure := discovery.GroupDiscoveryFailedErrorGroups(err); isLookupFailure {
for k, v := range oldResources {
if _, failed := groupLookupFailures[k.GroupVersion()]; failed && gc.anyBuilderResourceSynced(k) {
newResources[k] = v
}
// syncOnce performs a single sync cycle. It returns the resource set to
// remember and true when a resync was performed successfully. If the tick
// was skipped or the resync failed, it returns nil/false so the caller
// retains the previous oldResources.
func (gc *GarbageCollector) syncOnce(
ctx context.Context,
discoveryClient discovery.ServerResourcesInterface,
oldResources map[schema.GroupVersionResource]struct{},
waitPeriod time.Duration,
) (map[schema.GroupVersionResource]struct{}, bool) {
logger := klog.FromContext(ctx)

// 1) Discover deletable resources
newResources, err := GetDeletableResources(logger, discoveryClient)
if len(newResources) == 0 {
logger.V(2).Info("no resources reported by discovery, skipping garbage collector sync")
metrics.GarbageCollectorResourcesSyncError.Inc()
return nil, false
}

// 2) Handle partial discovery: keep already-synced monitors for failed groups
if groupLookupFailures, isLookupFailure := discovery.GroupDiscoveryFailedErrorGroups(err); isLookupFailure {
for k, v := range oldResources {
if _, failed := groupLookupFailures[k.GroupVersion()]; failed && gc.anyBuilderResourceSynced(k) {
newResources[k] = v
}
}
}

// 3) Short-circuit if nothing changed
if reflect.DeepEqual(oldResources, newResources) {
logger.V(5).Info("no resource updates from discovery, skipping garbage collector sync")
return
}
// 3) Short-circuit if nothing changed AND no new builders need resyncing
gc.mu.RLock()
forceResync := gc.resyncNeeded
gc.mu.RUnlock()

logger.V(2).Info("syncing garbage collector with updated resources from discovery",
"diff", printDiff(oldResources, newResources))
if !forceResync && reflect.DeepEqual(oldResources, newResources) {
logger.V(5).Info("no resource updates from discovery, skipping garbage collector sync")
return nil, false
}

// 4) Reset REST mapper (invalidates its underlying discovery cache)
gc.restMapper.Reset()
logger.V(4).Info("reset restmapper")
logger.V(2).Info("syncing garbage collector with updated resources from discovery",
"diff", printDiff(oldResources, newResources),
"forceResync", forceResync)

// 5) Resync monitors across ALL builders
if err := gc.resyncMonitors(logger, newResources); err != nil {
utilruntime.HandleError(fmt.Errorf("failed to sync resource monitors: %w", err))
metrics.GarbageCollectorResourcesSyncError.Inc()
return
}
logger.V(4).Info("resynced monitors")
// 4) Reset REST mapper (invalidates its underlying discovery cache)
gc.restMapper.Reset()
logger.V(4).Info("reset restmapper")

// 5) Resync monitors across ALL builders
if err := gc.resyncMonitors(logger, newResources); err != nil {
utilruntime.HandleError(fmt.Errorf("failed to sync resource monitors: %w", err))
metrics.GarbageCollectorResourcesSyncError.Inc()
return nil, false
}
logger.V(4).Info("resynced monitors")

// 6) Periodically check that ALL builders report cache synced (for logs/metrics)
cacheSynced := cache.WaitForNamedCacheSync("garbage collector", waitForStopOrTimeout(ctx.Done(), period), func() bool {
gc.mu.Lock()
gc.resyncNeeded = false
gc.mu.Unlock()

// 6) Periodically check that ALL builders report cache synced (for logs/metrics)
if waitPeriod > 0 {
cacheSynced := cache.WaitForNamedCacheSync("garbage collector", waitForStopOrTimeout(ctx.Done(), waitPeriod), func() bool {
for _, gb := range gc.dependencyGraphBuilders {
if !gb.IsSynced(logger) {
return false
Expand All @@ -376,10 +432,10 @@ func (gc *GarbageCollector) Sync(ctx context.Context, discoveryClient discovery.
utilruntime.HandleError(fmt.Errorf("timed out waiting for dependency graph builder sync during GC sync"))
metrics.GarbageCollectorResourcesSyncError.Inc()
}
}

// 7) Remember current resource set
oldResources = newResources
}, period)
// 7) Remember current resource set
return newResources, true
}

// printDiff returns a human-readable summary of what resources were added and removed
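The partition metrics used throughout this file (partitionCount, partitionMonitorCount, partitionSynced, registerPartitionMetricsOnce, boolToFloat) are referenced but not defined in the diff. A sketch using the Prometheus client, with metric names assumed from the call sites; monitorCountForBuilder is omitted since it depends on GraphBuilder internals. Note that GaugeVec.Delete takes a prometheus.Labels map, matching the Delete(map[string]string{"project": project}) calls in RemoveProject.

// Sketch of the partition metrics — definitions assumed from call sites;
// metric names and help strings are illustrative.
package garbagecollector

import (
	"sync"

	"github.com/prometheus/client_golang/prometheus"
)

var (
	partitionCount = prometheus.NewGauge(prometheus.GaugeOpts{
		Name: "gc_partition_count",
		Help: "Number of per-project GC graph builders currently registered.",
	})
	partitionMonitorCount = prometheus.NewGaugeVec(prometheus.GaugeOpts{
		Name: "gc_partition_monitor_count",
		Help: "Monitors held by each per-project graph builder.",
	}, []string{"project"})
	partitionSynced = prometheus.NewGaugeVec(prometheus.GaugeOpts{
		Name: "gc_partition_synced",
		Help: "1 if the partition's initial cache sync completed, else 0.",
	}, []string{"project"})

	registerOnce sync.Once
)

// registerPartitionMetricsOnce registers the gauges exactly once, so
// repeated constructor calls don't panic on duplicate registration.
func registerPartitionMetricsOnce() {
	registerOnce.Do(func() {
		prometheus.MustRegister(partitionCount, partitionMonitorCount, partitionSynced)
	})
}

func boolToFloat(b bool) float64 {
	if b {
		return 1
	}
	return 0
}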