239 changes: 225 additions & 14 deletions test/extended/router/config_manager.go
@@ -4,6 +4,7 @@ import (
"context"
"fmt"
"net"
"strconv"
"strings"
"time"

@@ -13,9 +14,12 @@ import (
corev1 "k8s.io/api/core/v1"
rbacv1 "k8s.io/api/rbac/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
e2eoutput "k8s.io/kubernetes/test/e2e/framework/pod/output"
"k8s.io/pod-security-admission/api"
utilpointer "k8s.io/utils/pointer"

routev1 "github.com/openshift/api/route/v1"
@@ -46,12 +50,17 @@ var _ = g.Describe("[sig-network][Feature:Router][apigroup:route.openshift.io]",
}
})

oc = exutil.NewCLI("router-config-manager")
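// Intentionally small config manager limits; they are passed to the router below via the
// ROUTER_BLUEPRINT_ROUTE_POOL_SIZE and ROUTER_MAX_DYNAMIC_SERVERS environment variables,
// so the test can exceed them with only a few routes and replicas.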
const ROUTER_BLUEPRINT_ROUTE_POOL_SIZE = 3
const ROUTER_MAX_DYNAMIC_SERVERS = 2

// Defines the number of services named `insecure-concurrent-service-NN`, one replica each
const NUM_CONCURRENT_SERVICES = ROUTER_BLUEPRINT_ROUTE_POOL_SIZE + 1
// Defines the number of replicas the service named `insecure-concurrent-service-replicas` should have
const NUM_CONCURRENT_REPLICAS = ROUTER_MAX_DYNAMIC_SERVERS + 1

oc = exutil.NewCLIWithPodSecurityLevel("router-config-manager", api.LevelPrivileged)

g.BeforeEach(func() {
// the test has been skipped since July 2018 because it was flaking.
// TODO: Fix the test and re-enable it in https://issues.redhat.com/browse/NE-906.
g.Skip("HAProxy dynamic config manager tests skipped in 4.x")
ns = oc.Namespace()

routerImage, err := exutil.FindRouterImage(oc)
@@ -153,6 +162,46 @@ http {
},
},
},
{
ObjectMeta: metav1.ObjectMeta{
Name: "insecure-concurrent-service-replicas",
Labels: map[string]string{
"test": "router",
},
},
Spec: corev1.ServiceSpec{
Selector: map[string]string{
"test": "haproxy-cfgmgr",
"endpoints": "insecure-concurrent-endpoint-replicas",
},
Ports: []corev1.ServicePort{
{
Port: 9376,
},
},
},
},
}
for i := range NUM_CONCURRENT_SERVICES {
services = append(services, corev1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: fmt.Sprintf("insecure-concurrent-service-%d", i),
Labels: map[string]string{
"test": "router",
},
},
Spec: corev1.ServiceSpec{
Selector: map[string]string{
"test": "haproxy-cfgmgr",
"endpoints": fmt.Sprintf("insecure-concurrent-endpoint-%d", i),
},
Ports: []corev1.ServicePort{
{
Port: 9376,
},
},
},
})
}

for _, service := range services {
@@ -350,6 +399,14 @@ http {
},
},
},
{
Name: "ROUTER_BLUEPRINT_ROUTE_POOL_SIZE",
Value: strconv.Itoa(ROUTER_BLUEPRINT_ROUTE_POOL_SIZE),
},
{
Name: "ROUTER_MAX_DYNAMIC_SERVERS",
Value: strconv.Itoa(ROUTER_MAX_DYNAMIC_SERVERS),
},
},
Args: []string{
"--namespace=$(POD_NAMESPACE)",
@@ -462,7 +519,7 @@ http {
Name: "cert",
VolumeSource: corev1.VolumeSource{
Secret: &corev1.SecretVolumeSource{
SecretName: "service-cert",
SecretName: "serving-cert",
},
},
},
@@ -482,6 +539,61 @@
},
},
}
for i := range NUM_CONCURRENT_SERVICES {
routerPods = append(routerPods, corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: fmt.Sprintf("insecure-concurrent-endpoint-%d", i),
Labels: map[string]string{
"test": "haproxy-cfgmgr",
"endpoints": fmt.Sprintf("insecure-concurrent-endpoint-%d", i),
},
},
Spec: corev1.PodSpec{
TerminationGracePeriodSeconds: utilpointer.Int64(1),
Containers: []corev1.Container{
{
Name: "test",
Image: image.LocationFor("registry.k8s.io/e2e-test-images/agnhost:2.56"),
Args: []string{"serve-hostname"},
Ports: []corev1.ContainerPort{
{
ContainerPort: 9376,
Name: "http",
},
},
},
},
},
})
}
for i := range NUM_CONCURRENT_REPLICAS {
routerPods = append(routerPods, corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: fmt.Sprintf("insecure-concurrent-endpoint-replicas-%d", i),
Labels: map[string]string{
"test": "haproxy-cfgmgr",
// the "endpoints" label below is the service selector; it is intentionally
// omitted here and patched onto the pods later by the test itself.
// "endpoints": "insecure-concurrent-endpoint-replicas",
},
},
Spec: corev1.PodSpec{
TerminationGracePeriodSeconds: utilpointer.Int64(1),
Containers: []corev1.Container{
{
Name: "test",
Image: image.LocationFor("registry.k8s.io/e2e-test-images/agnhost:2.56"),
Args: []string{"serve-hostname"},
Ports: []corev1.ContainerPort{
{
ContainerPort: 9376,
Name: "http",
},
},
},
},
},
})
}

for _, pod := range routerPods {
_, err = oc.AdminKubeClient().CoreV1().Pods(ns).Create(context.Background(), &pod, metav1.CreateOptions{})
@@ -491,9 +603,6 @@

g.Describe("The HAProxy router", func() {
g.It("should serve the correct routes when running with the haproxy config manager", func() {
// the test has been skipped since July 2018 because it was flaking.
// TODO: Fix the test and re-enable it in https://issues.redhat.com/browse/NE-906.
g.Skip("HAProxy dynamic config manager tests skipped in 4.x")
ns := oc.KubeFramework().Namespace.Name
execPod := exutil.CreateExecPodOrFail(oc.AdminKubeClient(), ns, "execpod")
defer func() {
@@ -528,20 +637,84 @@ http {
o.Expect(err).NotTo(o.HaveOccurred())
}

g.By("adding routes beyond the number of blueprint backends")
var pendingRoutes []string
for i := range NUM_CONCURRENT_SERVICES {
// NUM_CONCURRENT_SERVICES is defined to exceed the number of available blueprint backends

name := fmt.Sprintf("hapcm-insecure-concurrent-service-%d", i)
serviceName := fmt.Sprintf("insecure-concurrent-service-%d", i)
hostName := fmt.Sprintf("insecure-concurrent-%d.hapcm.test", i)

err := createRoute(oc, routeTypeInsecure, name, serviceName, hostName, "/")
o.Expect(err).NotTo(o.HaveOccurred())
pendingRoutes = append(pendingRoutes, name)

err = waitForRouteToRespond(ns, execPod.Name, "http", hostName, "/", routerIP, 0)
o.Expect(err).NotTo(o.HaveOccurred())
}

g.By("adding replicas beyond the number of blueprint slots per backend")
{
name := "hapcm-insecure-concurrent-service-replicas"
serviceName := "insecure-concurrent-service-replicas"
hostName := "insecure-concurrent-replicas.hapcm.test"
err := createRoute(oc, routeTypeInsecure, name, serviceName, hostName, "/")
o.Expect(err).NotTo(o.HaveOccurred())
pendingRoutes = append(pendingRoutes, name)

var expectedBackendServersCount int
endpointReplicaLabelUpdate := []byte(`{"metadata":{"labels":{"endpoints":"insecure-concurrent-endpoint-replicas"}}}`)
for i := range NUM_CONCURRENT_REPLICAS {
// NUM_CONCURRENT_REPLICAS is defined to exceed the number of available blueprint servers per backend

// add one backend pod at a time - each pod becomes part of the route's backend
// as soon as its labels match the selector of the service backing the route.
podName := fmt.Sprintf("insecure-concurrent-endpoint-replicas-%d", i)
_, err := oc.AdminKubeClient().CoreV1().Pods(ns).Patch(context.Background(), podName, types.StrategicMergePatchType, endpointReplicaLabelUpdate, metav1.PatchOptions{})
o.Expect(err).NotTo(o.HaveOccurred())

expectedBackendServersCount++
allBackendServers := sets.New[string]()
err = wait.PollUntilContextTimeout(context.Background(), time.Second, timeoutSeconds*time.Second, true, func(ctx context.Context) (bool, error) {
output, err := readURL(ns, execPod.Name, "http", hostName, "/", routerIP, 0)
if err != nil {
// possible 503 due to the first pod still missing
return false, nil
}
allBackendServers.Insert(output)

// we are done as soon as we have seen as many backend servers as are configured behind the route
return allBackendServers.Len() == expectedBackendServersCount, nil
})
o.Expect(err).NotTo(o.HaveOccurred())
}
}

g.By("adding overlapping route configurations")
{
// Missing
}

g.By("removing unused routes")
for _, name := range pendingRoutes {
err := oc.AsAdmin().Run("delete").Args("route", name).Execute()
o.Expect(err).NotTo(o.HaveOccurred())
}

g.By("mini stress test by adding (and removing) different routes and checking that they are exposed")
for i := 0; i < 16; i++ {
name := fmt.Sprintf("hapcm-stress-insecure-%d", i)
hostName := fmt.Sprintf("stress.insecure-%d.hapcm.test", i)
err := oc.AsAdmin().Run("expose").Args("service", "insecure-service", "--name", name, "--hostname", hostName, "--labels", "select=haproxy-cfgmgr").Execute()
o.Expect(err).NotTo(o.HaveOccurred())
err := createRoute(oc, routeTypeInsecure, name, "insecure-service", hostName, "/")

err = waitForRouteToRespond(ns, execPod.Name, "http", hostName, "/", routerIP, 0)
o.Expect(err).NotTo(o.HaveOccurred())

err = oc.AsAdmin().Run("delete").Args("route", name).Execute()
o.Expect(err).NotTo(o.HaveOccurred())

routeTypes := []string{"edge", "reencrypt", "passthrough"}
routeTypes := []routeType{routeTypeEdge, routeTypeReencrypt, routeTypePassthrough}
for _, t := range routeTypes {
name := fmt.Sprintf("hapcm-stress-%s-%d", t, i)
hostName := fmt.Sprintf("stress.%s-%d.hapcm.test", t, i)
@@ -550,9 +723,7 @@
serviceName = "insecure-service"
}

err := oc.AsAdmin().Run("create").Args("route", t, name, "--service", serviceName, "--hostname", hostName).Execute()
o.Expect(err).NotTo(o.HaveOccurred())
err = oc.AsAdmin().Run("label").Args("route", name, "select=haproxy-cfgmgr").Execute()
err := createRoute(oc, t, name, serviceName, hostName, "/")
o.Expect(err).NotTo(o.HaveOccurred())

err = waitForRouteToRespond(ns, execPod.Name, "https", hostName, "/", routerIP, 0)
@@ -566,6 +737,46 @@
})
})

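// routeType identifies how a test route is exposed: insecure routes are created
// with `oc expose`, the TLS variants with `oc create route`.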
type routeType string

const (
routeTypeInsecure = "insecure"
routeTypeEdge = "edge"
routeTypeReencrypt = "reencrypt"
routeTypePassthrough = "passthrough"
)

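// createRoute creates a route of the given type for serviceName at hostName/path,
// then labels it select=haproxy-cfgmgr for the test router's route selector.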
func createRoute(oc *exutil.CLI, routeType routeType, routeName, serviceName, hostName, path string) error {
var err error
switch routeType {
case routeTypeInsecure:
// --labels on `oc expose` up to 4.21 does not override the labels coming from the
// service's selector, so we label the route after creating it. https://issues.redhat.com/browse/OCPBUGS-74543
err = oc.AsAdmin().Run("expose").Args("service", serviceName, "--name", routeName, "--hostname", hostName, "--path", path).Execute()
case routeTypePassthrough:
err = oc.AsAdmin().Run("create").Args("route", routeTypePassthrough, routeName, "--service", serviceName, "--hostname", hostName).Execute()
default:
err = oc.AsAdmin().Run("create").Args("route", string(routeType), routeName, "--service", serviceName, "--hostname", hostName, "--path", path).Execute()
}
if err != nil {
return err
}
return oc.AsAdmin().Run("label").Args("route", routeName, "select=haproxy-cfgmgr").Execute()
}

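// readURL fetches proto://host:port/path from inside the exec pod, resolving host
// to ipaddr via curl --resolve, and returns the response body; a port of 0 selects
// the protocol default (80 or 443).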
func readURL(ns, execPodName, proto, host, path, ipaddr string, port int) (string, error) {
host = exutil.IPUrl(host)
if port == 0 {
port = 80
if proto == "https" {
port = 443
}
}
uri := fmt.Sprintf("%s://%s:%d%s", proto, host, port, path)
cmd := fmt.Sprintf("curl -ksfL -m 5 --resolve %s:%d:%s %q", host, port, ipaddr, uri)
return e2eoutput.RunHostCmd(ns, execPodName, cmd)
}

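// waitForRouteToRespond polls the route through the exec pod until a request to it
// succeeds or the poll times out.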
func waitForRouteToRespond(ns, execPodName, proto, host, abspath, ipaddr string, port int) error {
// bracket IPv6 IPs when used as URI
host = exutil.IPUrl(host)