
Commit 054b5ad

feat: add nodeselector annotation to limit LB pool members
1 parent 16fd5c0 commit 054b5ad

9 files changed: +366 −89 lines

README.md

Lines changed: 1 addition & 0 deletions
@@ -429,6 +429,7 @@ Changes to the following annotations causes pools to be recreated and cause an e
 - `k8s.cloudscale.ch/loadbalancer-pool-algorithm`
 - `k8s.cloudscale.ch/loadbalancer-pool-protocol`
 - `k8s.cloudscale.ch/loadbalancer-listener-allowed-subnets`
+- `k8s.cloudscale.ch/loadbalancer-node-selector`
 
 Additionally, changes to `spec.externalTrafficPolicy` have the same effect.
 

Lines changed: 57 additions & 0 deletions
@@ -0,0 +1,57 @@
+# Deploys the docker.io/nginxdemos/hello:plain-text container and creates a
+# loadbalancer service with a node-selector annotation for it:
+#
+# export KUBECONFIG=path/to/kubeconfig
+# kubectl apply -f nginx-hello.yml
+#
+# Wait for `kubectl describe service hello` to show "Loadbalancer Ensured",
+# then use the IP address found under "LoadBalancer Ingress" to connect to the
+# service.
+#
+# You can also use the following shortcut:
+#
+# curl http://$(kubectl get service hello -o jsonpath='{.status.loadBalancer.ingress[0].ip}')
+#
+# If you follow the nginx log, you will see that nginx sees a cluster internal
+# IP address as source of requests:
+#
+# kubectl logs -l "app=hello"
+#
+---
+apiVersion: apps/v1
+kind: Deployment
+metadata:
+  name: hello
+spec:
+  replicas: 2
+  selector:
+    matchLabels:
+      app: hello
+  template:
+    metadata:
+      labels:
+        app: hello
+    spec:
+      containers:
+        - name: hello
+          image: docker.io/nginxdemos/hello:plain-text
+      nodeSelector:
+        kubernetes.io/hostname: k8test-worker-2
+---
+apiVersion: v1
+kind: Service
+metadata:
+  labels:
+    app: hello
+  annotations:
+    k8s.cloudscale.ch/loadbalancer-node-selector: "kubernetes.io/hostname=k8test-worker-2"
+  name: hello
+spec:
+  ports:
+    - port: 80
+      protocol: TCP
+      targetPort: 80
+      name: http
+  selector:
+    app: hello
+  type: LoadBalancer

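The annotation in the example above uses the simple `key=value` form, but the value is parsed with the standard Kubernetes label-selector syntax (see `filterNodesBySelector` further down), so set-based expressions work as well. A minimal sketch, not part of the commit, assuming a Go module with `k8s.io/apimachinery` available and purely illustrative label names:

```go
package main

import (
    "fmt"

    "k8s.io/apimachinery/pkg/labels"
)

func main() {
    // Hypothetical annotation value using a set-based expression instead of
    // the plain "kubernetes.io/hostname=k8test-worker-2" from the manifest.
    selector, err := labels.Parse("role in (worker,edge),!exclude-from-lb")
    if err != nil {
        panic(err)
    }

    // Labels as they might appear on a node; only matching nodes end up as
    // load balancer pool members.
    nodeLabels := labels.Set{"role": "worker"}
    fmt.Println(selector.Matches(nodeLabels)) // true
}
```
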
pkg/cloudscale_ccm/cloud.go

Lines changed: 28 additions & 4 deletions
@@ -9,9 +9,12 @@ import (
     "strings"
     "time"
 
-    cloudscale "github.com/cloudscale-ch/cloudscale-go-sdk/v6"
+    "github.com/cloudscale-ch/cloudscale-go-sdk/v6"
     "golang.org/x/oauth2"
-    "k8s.io/client-go/kubernetes"
+    corev1 "k8s.io/api/core/v1"
+    "k8s.io/client-go/kubernetes/scheme"
+    v1 "k8s.io/client-go/kubernetes/typed/core/v1"
+    "k8s.io/client-go/tools/record"
     "k8s.io/klog/v2"
 
     cloudprovider "k8s.io/cloud-provider"
@@ -32,6 +35,8 @@ const (
 type cloud struct {
     instances    *instances
     loadbalancer *loadbalancer
+
+    eventRecorder record.EventRecorder
 }
 
 // Register this provider with Kubernetes.
@@ -112,8 +117,27 @@ func (c *cloud) Initialize(
 
     // This cannot be configured earlier, even though it seems better situated
     // in newCloudscaleClient
-    c.loadbalancer.k8s = kubernetes.NewForConfigOrDie(
-        clientBuilder.ConfigOrDie("cloudscale-cloud-controller-manager"))
+    c.loadbalancer.k8s = clientBuilder.ClientOrDie(
+        "cloudscale-cloud-controller-manager",
+    )
+
+    eventBroadcaster := record.NewBroadcaster()
+    eventBroadcaster.StartRecordingToSink(&v1.EventSinkImpl{
+        Interface: c.loadbalancer.k8s.CoreV1().Events(""),
+    })
+    c.eventRecorder = eventBroadcaster.NewRecorder(scheme.Scheme,
+        corev1.EventSource{
+            Component: "cloudscale-cloud-controller-manager",
+        },
+    )
+
+    go func() {
+        // wait until stop chan closes
+        <-stop
+        eventBroadcaster.Shutdown()
+    }()
+
+    c.loadbalancer.recorder = c.eventRecorder
 }
 
 // LoadBalancer returns a balancer interface. Also returns true if the

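The recorder wired up in `Initialize` above is what lets the load balancer code below report a missing-nodes situation as a Kubernetes event on the Service rather than as a hard error. A standalone sketch of the same broadcaster/recorder pattern, not part of the commit and assuming only `client-go`; it logs events locally instead of recording them to the API server:

```go
package main

import (
    "log"
    "time"

    corev1 "k8s.io/api/core/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/client-go/kubernetes/scheme"
    "k8s.io/client-go/tools/record"
)

func main() {
    // The broadcaster fans events out to registered sinks. Here we only log
    // them; the CCM additionally records them via v1.EventSinkImpl.
    broadcaster := record.NewBroadcaster()
    broadcaster.StartLogging(log.Printf)
    defer broadcaster.Shutdown()

    recorder := broadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{
        Component: "cloudscale-cloud-controller-manager",
    })

    // Hypothetical service; in the CCM this is the *v1.Service passed to
    // EnsureLoadBalancer / UpdateLoadBalancer.
    svc := &corev1.Service{ObjectMeta: metav1.ObjectMeta{
        Name:      "hello",
        Namespace: "default",
    }}

    // Same shape as the "NoValidNodes" warning emitted by the CCM.
    recorder.Event(svc, corev1.EventTypeWarning, "NoValidNodes",
        "No valid nodes for service found, double-check node-selector annotation")

    // Event delivery is asynchronous; give the broadcaster a moment to flush
    // before the program exits.
    time.Sleep(100 * time.Millisecond)
}
```

When recorded against the real API server, such events show up in `kubectl describe service hello`.
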
pkg/cloudscale_ccm/loadbalancer.go

Lines changed: 61 additions & 11 deletions
@@ -2,7 +2,6 @@ package cloudscale_ccm
 
 import (
     "context"
-    "errors"
     "fmt"
     "slices"
     "strings"
@@ -11,7 +10,9 @@ import (
     "github.com/cloudscale-ch/cloudscale-go-sdk/v6"
     v1 "k8s.io/api/core/v1"
     metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+    "k8s.io/apimachinery/pkg/labels"
     "k8s.io/client-go/kubernetes"
+    "k8s.io/client-go/tools/record"
     "k8s.io/klog/v2"
     "k8s.io/utils/ptr"
 )
@@ -208,7 +209,7 @@ const (
     // connections timing out while the monitor is updated.
     LoadBalancerHealthMonitorTimeoutS = "k8s.cloudscale.ch/loadbalancer-health-monitor-timeout-s"
 
-    // LoadBalancerHealthMonitorDownThreshold is the number of the checks that
+    // LoadBalancerHealthMonitorUpThreshold is the number of the checks that
     // need to succeed before a pool member is considered up. Defaults to 2.
     LoadBalancerHealthMonitorUpThreshold = "k8s.cloudscale.ch/loadbalancer-health-monitor-up-threshold"
 
@@ -278,7 +279,7 @@ const (
     // Changing this annotation on an established service is considered safe.
     LoadBalancerListenerTimeoutMemberDataMS = "k8s.cloudscale.ch/loadbalancer-timeout-member-data-ms"
 
-    // LoadBalancerSubnetLimit is a JSON list of subnet UUIDs that the
+    // LoadBalancerListenerAllowedSubnets is a JSON list of subnet UUIDs that the
     // loadbalancer should use. By default, all subnets of a node are used:
     //
     // * `[]` means that anyone is allowed to connect (default).
@@ -291,12 +292,17 @@ const (
     // This is an advanced feature, useful if you have nodes that are in
     // multiple private subnets.
     LoadBalancerListenerAllowedSubnets = "k8s.cloudscale.ch/loadbalancer-listener-allowed-subnets"
+
+    // LoadBalancerNodeSelector can be set to restrict which nodes are added to the LB pool.
+    // It accepts a standard Kubernetes label selector string.
+    LoadBalancerNodeSelector = "k8s.cloudscale.ch/loadbalancer-node-selector"
 )
 
 type loadbalancer struct {
-    lbs lbMapper
-    srv serverMapper
-    k8s kubernetes.Interface
+    lbs      lbMapper
+    srv      serverMapper
+    k8s      kubernetes.Interface
+    recorder record.EventRecorder
 }
 
 // GetLoadBalancer returns whether the specified load balancer exists, and
@@ -387,16 +393,23 @@ func (l *loadbalancer) EnsureLoadBalancer(
         return nil, err
     }
 
-    // Refuse to do anything if there are no nodes
+    nodes, err := filterNodesBySelector(serviceInfo, nodes)
+    if err != nil {
+        return nil, err
+    }
+
     if len(nodes) == 0 {
-        return nil, errors.New(
-            "no valid nodes for service found, please verify there is " +
-                "at least one that allows load balancers",
+        l.recorder.Event(
+            service,
+            v1.EventTypeWarning,
+            "NoValidNodes",
+            "No valid nodes for service found, "+
+                "double-check node-selector annotation",
         )
     }
 
     // Reconcile
-    err := reconcileLbState(ctx, l.lbs.client, func() (*lbState, error) {
+    err = reconcileLbState(ctx, l.lbs.client, func() (*lbState, error) {
         // Get the desired state from Kubernetes
         servers, err := l.srv.mapNodes(ctx, nodes).All()
         if err != nil {
@@ -442,6 +455,28 @@ func (l *loadbalancer) EnsureLoadBalancer(
     return result, nil
 }
 
+func filterNodesBySelector(
+    serviceInfo *serviceInfo,
+    nodes []*v1.Node,
+) ([]*v1.Node, error) {
+    selector := labels.Everything()
+    if v := serviceInfo.annotation(LoadBalancerNodeSelector); v != "" {
+        var err error
+        selector, err = labels.Parse(v)
+        if err != nil {
+            return nil, fmt.Errorf("unable to parse selector: %w", err)
+        }
+    }
+    selectedNodes := make([]*v1.Node, 0, len(nodes))
+    for _, node := range nodes {
+        if selector.Matches(labels.Set(node.Labels)) {
+            selectedNodes = append(selectedNodes, node)
+        }
+    }
+
+    return selectedNodes, nil
+}
+
 // UpdateLoadBalancer updates hosts under the specified load balancer.
 // Implementations must treat the *v1.Service and *v1.Node
 // parameters as read-only and not modify them.
@@ -461,6 +496,21 @@ func (l *loadbalancer) UpdateLoadBalancer(
         return err
     }
 
+    nodes, err := filterNodesBySelector(serviceInfo, nodes)
+    if err != nil {
+        return err
+    }
+
+    if len(nodes) == 0 {
+        l.recorder.Event(
+            service,
+            v1.EventTypeWarning,
+            "NoValidNodes",
+            "No valid nodes for service found, "+
+                "double-check node-selector annotation",
+        )
+    }
+
     // Reconcile
     return reconcileLbState(ctx, l.lbs.client, func() (*lbState, error) {
         // Get the desired state from Kubernetes

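To make the effect of the new `filterNodesBySelector` concrete, here is a standalone sketch, not part of the commit, with the `serviceInfo` plumbing stripped out and node names borrowed from the example manifest:

```go
package main

import (
    "fmt"

    corev1 "k8s.io/api/core/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/apimachinery/pkg/labels"
)

// filterNodes mirrors the selection loop of filterNodesBySelector, without
// reading the selector from a service annotation.
func filterNodes(selector labels.Selector, nodes []*corev1.Node) []*corev1.Node {
    selected := make([]*corev1.Node, 0, len(nodes))
    for _, node := range nodes {
        if selector.Matches(labels.Set(node.Labels)) {
            selected = append(selected, node)
        }
    }
    return selected
}

func main() {
    nodes := []*corev1.Node{
        {ObjectMeta: metav1.ObjectMeta{
            Name:   "k8test-worker-1",
            Labels: map[string]string{"kubernetes.io/hostname": "k8test-worker-1"},
        }},
        {ObjectMeta: metav1.ObjectMeta{
            Name:   "k8test-worker-2",
            Labels: map[string]string{"kubernetes.io/hostname": "k8test-worker-2"},
        }},
    }

    // Matching selector: only the selected node becomes a pool member.
    sel, _ := labels.Parse("kubernetes.io/hostname=k8test-worker-2")
    fmt.Println(len(filterNodes(sel, nodes))) // 1

    // Non-matching selector: the result is empty; the CCM now emits the
    // "NoValidNodes" warning event instead of returning an error.
    sel, _ = labels.Parse("kubernetes.io/hostname=does-not-exist")
    fmt.Println(len(filterNodes(sel, nodes))) // 0
}
```
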
pkg/cloudscale_ccm/reconcile.go

Lines changed: 4 additions & 16 deletions
@@ -22,16 +22,13 @@ type lbState struct {
 
     // Pool pointers are used to refer to members by pool, therefore use a
     // pointer here as well, to not accidentally copy the struct.
-    pools    []*cloudscale.LoadBalancerPool
-    members  map[*cloudscale.LoadBalancerPool][]cloudscale.
-        LoadBalancerPoolMember
-    monitors map[*cloudscale.LoadBalancerPool][]cloudscale.
-        LoadBalancerHealthMonitor
+    pools    []*cloudscale.LoadBalancerPool
+    members  map[*cloudscale.LoadBalancerPool][]cloudscale.LoadBalancerPoolMember
+    monitors map[*cloudscale.LoadBalancerPool][]cloudscale.LoadBalancerHealthMonitor
 
     // Though not currently used that way, listeners are not
     // necessarily bound to any given pool.
-    listeners map[*cloudscale.LoadBalancerPool][]cloudscale.
-        LoadBalancerListener
+    listeners map[*cloudscale.LoadBalancerPool][]cloudscale.LoadBalancerListener
 
     // The assigned floating IPs
     floatingIPs []string
@@ -201,15 +198,6 @@ func desiredLbState(
         }
     }
 
-    // If there are no pool members, return an error. It would be possible
-    // to just put a load balancer up that has no function, but it seems
-    // more useful to err instead, as there's likely something wrong.
-    if len(s.members[&pool]) == 0 {
-        return nil, fmt.Errorf(
-            "service %s: no private address found on any node",
-            serviceInfo.Service.Name)
-    }
-
     // Add a health monitor for each pool
     monitor, err := healthMonitorForPort(serviceInfo)
     if err != nil {

pkg/cloudscale_ccm/reconcile_test.go

Lines changed: 2 additions & 2 deletions
@@ -1156,10 +1156,10 @@ func TestLimitSubnets(t *testing.T) {
     assert.Equal(t, "10.0.1.1", state.members[state.pools[0]][0].Address)
     assert.Equal(t, "10.0.1.2", state.members[state.pools[0]][1].Address)
 
-    // If we have no valid addresses, we get an error
+    // If we have no valid addresses, we get no error
     s.Annotations[LoadBalancerListenerAllowedSubnets] = `
         ["00000000-0000-0000-0000-000000000003"]`
 
     _, err = desiredLbState(i, nodes, servers)
-    assert.Error(t, err)
+    assert.NoError(t, err)
 }

pkg/cloudscale_ccm/service_info.go

Lines changed: 2 additions & 0 deletions
@@ -118,6 +118,8 @@ func (s serviceInfo) annotation(key string) string {
         return s.annotationOrDefault(key, "50000")
     case LoadBalancerListenerAllowedSubnets:
         return s.annotationOrDefault(key, "[]")
+    case LoadBalancerNodeSelector:
+        return s.annotationOrDefault(key, "")
     default:
         return s.annotationOrElse(key, func() string {
             klog.Warning("unknown annotation:", key)

0 commit comments