Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 42 additions & 0 deletions cmd/cloudflared/tunnel/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package tunnel
import (
"bufio"
"context"
"encoding/json"
"fmt"
"net/url"
"os"
Expand Down Expand Up @@ -32,6 +33,7 @@ import (
"github.com/cloudflare/cloudflared/diagnostic"
"github.com/cloudflare/cloudflared/edgediscovery"
"github.com/cloudflare/cloudflared/ingress"
"github.com/cloudflare/cloudflared/k8s"
"github.com/cloudflare/cloudflared/logger"
"github.com/cloudflare/cloudflared/management"
"github.com/cloudflare/cloudflared/metrics"
Expand Down Expand Up @@ -174,6 +176,7 @@ func Commands() []*cli.Command {
buildCleanupCommand(),
buildTokenCommand(),
buildDiagCommand(),
buildKubernetesSubcommand(),
proxydns.Command(), // removed feature, only here for error message
cliutil.RemovedCommand("db-connect"),
}
Expand Down Expand Up @@ -445,6 +448,45 @@ func StartServer(
return err
}

// Start Kubernetes service watcher if enabled
cfg := config.GetConfiguration()
if cfg.Kubernetes.Enabled {
k8sCfg := &k8s.Config{
Enabled: cfg.Kubernetes.Enabled,
Namespace: cfg.Kubernetes.Namespace,
BaseDomain: cfg.Kubernetes.BaseDomain,
KubeconfigPath: cfg.Kubernetes.KubeconfigPath,
ExposeAPIServer: cfg.Kubernetes.ExposeAPIServer,
APIServerHostname: cfg.Kubernetes.APIServerHostname,
LabelSelector: cfg.Kubernetes.LabelSelector,
}
if err := k8sCfg.Validate(); err != nil {
log.Warn().Err(err).Msg("Kubernetes config validation failed, watcher will not start")
} else {
k8sWatcher := k8s.NewWatcher(k8sCfg, log, func(services []k8s.ServiceInfo) {
log.Info().Int("count", len(services)).Msg("Kubernetes service change detected, updating ingress rules")
k8sRules := k8s.GenerateIngressRules(services, log)
updatedIngress := k8s.MergeWithExistingRules(cfg.Ingress, k8sRules)
newConfigBytes, err := json.Marshal(ingress.RemoteConfigJSON{
IngressRules: updatedIngress,
WarpRouting: cfg.WarpRouting,
})
if err != nil {
log.Err(err).Msg("Failed to marshal updated K8s ingress config")
return
}
resp := orchestrator.UpdateK8sConfig(newConfigBytes)
if resp.Err != nil {
log.Err(resp.Err).Msg("Failed to apply K8s ingress config update")
} else {
log.Info().Int("services", len(services)).Msg("Successfully applied K8s ingress config update")
}
})
go k8sWatcher.Run(ctx)
log.Info().Msg("Kubernetes service watcher started")
}
}

metricsListener, err := metrics.CreateMetricsListener(&listeners, c.String("metrics"))
if err != nil {
log.Err(err).Msg("Error opening metrics server listener")
Expand Down
31 changes: 31 additions & 0 deletions cmd/cloudflared/tunnel/configuration.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/cloudflare/cloudflared/features"
"github.com/cloudflare/cloudflared/ingress"
"github.com/cloudflare/cloudflared/ingress/origins"
"github.com/cloudflare/cloudflared/k8s"
"github.com/cloudflare/cloudflared/orchestration"
"github.com/cloudflare/cloudflared/supervisor"
"github.com/cloudflare/cloudflared/tlsconfig"
Expand Down Expand Up @@ -151,6 +152,36 @@ func prepareTunnelConfig(
}

cfg := config.GetConfiguration()

// If Kubernetes integration is enabled, discover services and merge with config
if cfg.Kubernetes.Enabled {
k8sCfg := &k8s.Config{
Enabled: cfg.Kubernetes.Enabled,
Namespace: cfg.Kubernetes.Namespace,
BaseDomain: cfg.Kubernetes.BaseDomain,
KubeconfigPath: cfg.Kubernetes.KubeconfigPath,
ExposeAPIServer: cfg.Kubernetes.ExposeAPIServer,
APIServerHostname: cfg.Kubernetes.APIServerHostname,
LabelSelector: cfg.Kubernetes.LabelSelector,
}
if err := k8sCfg.Validate(); err != nil {
log.Warn().Err(err).Msg("Kubernetes integration config validation failed, skipping K8s discovery")
} else {
// Use a timeout so K8s discovery doesn't block tunnel startup indefinitely.
k8sCtx, k8sCancel := context.WithTimeout(ctx, 30*time.Second)
services, err := k8s.DiscoverServices(k8sCtx, k8sCfg, log)
k8sCancel()
if err != nil {
log.Warn().Err(err).Msg("Failed to discover Kubernetes services, continuing without K8s rules")
} else if len(services) > 0 {
k8sRules := k8s.GenerateIngressRules(services, log)
cfg.Ingress = k8s.MergeWithExistingRules(cfg.Ingress, k8sRules)
log.Info().Int("k8sServices", len(services)).Int("totalRules", len(cfg.Ingress)).
Msg("Merged Kubernetes-discovered services into ingress rules")
}
}
}

ingressRules, err := ingress.ParseIngressFromConfigAndCLI(cfg, c, log)
if err != nil {
return nil, nil, err
Expand Down
Loading