Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions pkg/k8s/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,3 +22,7 @@ var ErrClusterEntryNotFound = errors.New("cluster entry not found in kubeconfig"

// ErrAPIServerTimeout is returned when the API server does not become ready within the timeout.
var ErrAPIServerTimeout = errors.New("API server not ready within timeout")

// ErrClusterNotReady is returned when the cluster does not become ready
// (API reachable and a basic authorized read succeeds) within the timeout.
var ErrClusterNotReady = errors.New("cluster not ready within timeout")
13 changes: 13 additions & 0 deletions pkg/k8s/export_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package k8s

import (
"context"

"k8s.io/client-go/kubernetes"
)

// WaitForAuthorizedReadForTest exposes waitForAuthorizedRead for unit testing
// with a fake clientset, avoiding the need for a real API server.
func WaitForAuthorizedReadForTest(ctx context.Context, clientset kubernetes.Interface) error {
return waitForAuthorizedRead(ctx, clientset)
}
79 changes: 79 additions & 0 deletions pkg/k8s/wait.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,32 @@ import (
"strings"
"time"

"github.com/devantler-tech/ksail/v7/pkg/fsutil"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
)

const (
// waitForAPIServerTimeout is the maximum time to wait for the API server to become ready.
waitForAPIServerTimeout = 60 * time.Second

// waitForAuthorizedReadTimeout is the maximum time to wait for a basic
// authorized read to succeed after the API server reports ready. This
// covers the authorizer warm-up window where the API server is reachable
// but RBAC checks transiently return 403, and the brief delay before
// built-in resources (e.g. the default namespace's ServiceAccount) are
// reconciled.
waitForAuthorizedReadTimeout = 60 * time.Second

// waitBackoffMultiplier is the exponential backoff multiplier for the wait interval.
waitBackoffMultiplier = 2

// initialPollInterval is the starting backoff interval between readiness
// polls. Kept small so readiness is detected promptly after the API server
// reports ready (and so unit tests don't pay a fixed startup delay); it
// grows by waitBackoffMultiplier up to maxWaitInterval.
initialPollInterval = 100 * time.Millisecond

// maxWaitInterval is the maximum backoff interval between API server readiness polls.
maxWaitInterval = 5 * time.Second
)
Expand Down Expand Up @@ -57,3 +73,66 @@ func WaitForAPIServer(ctx context.Context, kubeconfigPath, contextName string) e
}
Comment thread
devantler marked this conversation as resolved.
}
}

// WaitForClusterReady waits until the cluster is genuinely ready for use: the
// API server is reachable (/readyz returns "ok") AND a basic authorized read
// (listing namespaces) succeeds.
//
// The authorized-read step exists because the API server can report ready
// while the authorizer is still warming up — transiently returning 403
// ("... cannot list namespaces") — and before built-in resources such as the
// default namespace's ServiceAccount have been reconciled. Distributions whose
// Start() returns as soon as the node container is up (Kind, K3d) call this so
// callers get a usable cluster instead of racing the warm-up window.
func WaitForClusterReady(ctx context.Context, kubeconfigPath, contextName string) error {
resolvedPath, err := ResolveKubeconfigPath(kubeconfigPath)
if err != nil {
return fmt.Errorf("resolve kubeconfig path: %w", err)
}

Comment thread
devantler marked this conversation as resolved.
// Canonicalize (resolve symlinks) before loading, since the path may come
// from user/config input — aligns with the repo's path-safety practices.
resolvedPath, err = fsutil.EvalCanonicalPath(resolvedPath)
if err != nil {
return fmt.Errorf("canonicalize kubeconfig path: %w", err)
}

err = WaitForAPIServer(ctx, resolvedPath, contextName)
if err != nil {
return err
}

clientset, err := NewClientset(resolvedPath, contextName)
if err != nil {
return fmt.Errorf("create clientset: %w", err)
}

return waitForAuthorizedRead(ctx, clientset)
}

// waitForAuthorizedRead polls a minimal authorized read (listing namespaces)
// until it succeeds or the timeout elapses. Any error — including a transient
// 403 from the warming-up authorizer — is treated as "not ready yet" and
// retried, since the credentials come from the cluster's own kubeconfig and a
// real authorization failure is not expected here.
func waitForAuthorizedRead(ctx context.Context, clientset kubernetes.Interface) error {
waitCtx, cancel := context.WithTimeout(ctx, waitForAuthorizedReadTimeout)
defer cancel()

interval := initialPollInterval

for {
_, err := clientset.CoreV1().Namespaces().List(waitCtx, metav1.ListOptions{Limit: 1})
if err == nil {
return nil
}

select {
case <-waitCtx.Done():
return fmt.Errorf("%w: %w", ErrClusterNotReady, err)
case <-time.After(interval):
Comment thread
devantler marked this conversation as resolved.
// Exponential backoff capped at maxWaitInterval
interval = min(interval*waitBackoffMultiplier, maxWaitInterval)
}
}
}
105 changes: 105 additions & 0 deletions pkg/k8s/wait_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
package k8s_test

import (
"context"
"sync/atomic"
"testing"
"time"

"github.com/devantler-tech/ksail/v7/pkg/k8s"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
k8sfake "k8s.io/client-go/kubernetes/fake"
k8stesting "k8s.io/client-go/testing"
)

// TestWaitForAuthorizedRead_Succeeds verifies the authorized read returns nil
// as soon as listing namespaces succeeds.
func TestWaitForAuthorizedRead_Succeeds(t *testing.T) {
t.Parallel()

clientset := k8sfake.NewClientset()

err := k8s.WaitForAuthorizedReadForTest(context.Background(), clientset)

require.NoError(t, err)
}

// TestWaitForAuthorizedRead_RetriesTransientForbidden verifies a transient 403
// (authorizer warm-up) is retried rather than treated as fatal, and the wait
// succeeds once the read is authorized.
func TestWaitForAuthorizedRead_RetriesTransientForbidden(t *testing.T) {
t.Parallel()

clientset := k8sfake.NewClientset()

Comment thread
devantler marked this conversation as resolved.
var calls atomic.Int32

clientset.PrependReactor(
"list", "namespaces",
func(_ k8stesting.Action) (bool, runtime.Object, error) {
// Fail the first call with Forbidden, then allow the default
// tracker to serve the (empty) list.
if calls.Add(1) == 1 {
return true, nil, apierrors.NewForbidden(
schema.GroupResource{Resource: "namespaces"},
"",
assert.AnError,
)
}

return false, nil, nil
},
)

err := k8s.WaitForAuthorizedReadForTest(context.Background(), clientset)

require.NoError(t, err)
assert.GreaterOrEqual(t, calls.Load(), int32(2), "expected the forbidden read to be retried")
}

// TestWaitForAuthorizedRead_TimesOut verifies the wait surfaces
// ErrClusterNotReady (wrapping the last error) when the read never succeeds
// before the context deadline.
func TestWaitForAuthorizedRead_TimesOut(t *testing.T) {
t.Parallel()

clientset := k8sfake.NewClientset()
clientset.PrependReactor(
"list", "namespaces",
func(_ k8stesting.Action) (bool, runtime.Object, error) {
return true, nil, apierrors.NewForbidden(
schema.GroupResource{Resource: "namespaces"},
"",
assert.AnError,
)
},
)

ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
defer cancel()

err := k8s.WaitForAuthorizedReadForTest(ctx, clientset)

require.ErrorIs(t, err, k8s.ErrClusterNotReady)
}

// TestWaitForClusterReady_InvalidKubeconfig verifies the public entry point
// surfaces an error for a bogus context with no reachable API server (the
// API-server wait fails fast against the empty/short deadline).
func TestWaitForClusterReady_InvalidKubeconfig(t *testing.T) {
t.Parallel()

// An empty path resolves to the default kubeconfig; pair it with a
// context that will not exist so client construction or the API-server
// wait fails rather than hanging.
ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
defer cancel()

err := k8s.WaitForClusterReady(ctx, t.TempDir()+"/missing-kubeconfig", "does-not-exist")

require.Error(t, err)
}
2 changes: 1 addition & 1 deletion pkg/svc/provisioner/cluster/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -641,7 +641,7 @@ func (f DefaultFactory) createK3dProvisioner( //nolint:funlen // sequential setu
provisioner := k3dprovisioner.CreateProvisioner(
k3dConfig,
tempConfigPath,
)
).WithKubeconfig(cluster.Spec.Cluster.Connection.Kubeconfig)

if f.ComponentDetector != nil {
provisioner.WithComponentDetector(f.ComponentDetector)
Expand Down
15 changes: 15 additions & 0 deletions pkg/svc/provisioner/cluster/k3d/export_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,21 @@ func (k *Provisioner) WithListClustersRawForTest(
return k
}

// WithWaitForReadyForTest injects a stub readiness waiter so Start can be
// exercised without a live cluster.
func (k *Provisioner) WithWaitForReadyForTest(
f func(ctx context.Context, kubeconfigPath, contextName string) error,
) *Provisioner {
k.waitForReady = f

return k
}

// KubeconfigForTest returns the kubeconfig field for testing purposes.
func (k *Provisioner) KubeconfigForTest() string {
return k.kubeconfig
}

// ParseClusterNamesForTest exposes parseClusterNames for unit testing.
func ParseClusterNamesForTest(output string) ([]string, error) {
return parseClusterNames(output)
Expand Down
57 changes: 52 additions & 5 deletions pkg/svc/provisioner/cluster/k3d/provisioner.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"strings"
"sync"

"github.com/devantler-tech/ksail/v7/pkg/k8s"
runner "github.com/devantler-tech/ksail/v7/pkg/runner"
"github.com/devantler-tech/ksail/v7/pkg/svc/detector"
"github.com/devantler-tech/ksail/v7/pkg/svc/provisioner/cluster/clustererr"
Expand All @@ -29,6 +30,10 @@ var (
logrusConfigOnce sync.Once //nolint:gochecknoglobals // Required for one-time logrus initialization
)

// defaultKubeconfigPath is where k3d writes (and updates the current context in)
// the kubeconfig when no explicit path is configured.
const defaultKubeconfigPath = "~/.kube/config"

// Provisioner executes k3d lifecycle commands via Cobra.
type Provisioner struct {
simpleCfg *v1alpha5.SimpleConfig
Expand All @@ -40,6 +45,14 @@ type Provisioner struct {
// defaultListClustersRaw; tests override it via export_test.go.
listClustersRaw func(ctx context.Context) (string, error)
componentDetector *detector.ComponentDetector
// kubeconfig is the path k3d writes the cluster's kubeconfig to. Used to
// build a client for the post-start readiness wait. Defaults to
// defaultKubeconfigPath; override via WithKubeconfig.
kubeconfig string
// waitForReady blocks until the cluster is genuinely ready (API reachable
// and a basic authorized read succeeds). It is a seam so tests can run
// Start() without a live cluster. Defaults to k8s.WaitForClusterReady.
waitForReady func(ctx context.Context, kubeconfigPath, contextName string) error
}

// NewProvisioner constructs a new command-backed provisioner.
Expand All @@ -62,15 +75,29 @@ func NewProvisioner(
})

prov := &Provisioner{
simpleCfg: simpleCfg,
configPath: configPath,
runner: runner.NewCobraCommandRunner(nil, nil),
simpleCfg: simpleCfg,
configPath: configPath,
runner: runner.NewCobraCommandRunner(nil, nil),
kubeconfig: defaultKubeconfigPath,
waitForReady: k8s.WaitForClusterReady,
}
prov.listClustersRaw = prov.defaultListClustersRaw

return prov
}

// WithKubeconfig sets the kubeconfig path used to build a client for the
// post-start readiness wait. An empty path leaves the default unchanged.
// Returns the provisioner for chaining.
func (k *Provisioner) WithKubeconfig(path string) *Provisioner {
trimmed := strings.TrimSpace(path)
if trimmed != "" {
k.kubeconfig = trimmed
}

return k
}
Comment thread
devantler marked this conversation as resolved.

// Create provisions a k3d cluster using the native Cobra command.
func (k *Provisioner) Create(ctx context.Context, name string) error {
args := k.appendConfigFlag(nil)
Expand Down Expand Up @@ -117,16 +144,36 @@ func (k *Provisioner) Delete(ctx context.Context, name string) error {
)
}

// Start resumes a stopped k3d cluster via Cobra.
// Start resumes a stopped k3d cluster via Cobra, then waits for the cluster to
// be genuinely ready (API reachable + a basic authorized read succeeds) so
// callers get a usable cluster rather than one that races the API server's
// authorizer warm-up.
func (k *Provisioner) Start(ctx context.Context, name string) error {
return k.runLifecycleCommand(
err := k.runLifecycleCommand(
ctx,
clustercommand.NewCmdClusterStart,
nil,
name,
"cluster start",
nil,
)
if err != nil {
return err
}

target := k.resolveName(name)
if target == "" {
// Without a resolved name we cannot derive the kubeconfig context, so
// fall back to the prior behavior of returning once k3d reports started.
return nil
}

err = k.waitForReady(ctx, k.kubeconfig, "k3d-"+target)
if err != nil {
return fmt.Errorf("wait for k3d cluster ready: %w", err)
}

return nil
}

// Stop halts a running k3d cluster via Cobra.
Expand Down
Loading
Loading