Skip to content

Commit 7a7d6bd

Browse files
committed
Add support for MutableCSINodeAllocatableCount
The CSI list's all PCIe devices that are not of type VIRTIO_BLOCK_DEVICE and subtracts them from the theoretically maximum, so kubernetes can report a correct dynamic max volume count that can be attached for each node. Signed-off-by: Niclas Schad <niclas.schad@stackit.cloud>
1 parent bddf6e4 commit 7a7d6bd

5 files changed

Lines changed: 113 additions & 5 deletions

File tree

pkg/csi/blockstorage/controllerserver.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -370,6 +370,11 @@ func (cs *controllerServer) ControllerPublishVolume(ctx context.Context, req *cs
370370

371371
_, err = cloud.AttachVolume(ctx, instanceID, volumeID)
372372
if err != nil {
373+
// Trigger's an immediate `NodeGetInfo` RPC call when MutableCSINodeAllocatableCount is enabled
374+
// TODO: Finish Implementation of IsTooManyDevicesError
375+
//if stackiterrors.IsTooManyDevicesError(err) {
376+
// return nil, status.Errorf(codes.ResourceExhausted, "[ControllerPublishVolume] Node can't accept any more volumes %v. All PCIe lanes are exhausted!", err)
377+
//}
373378
klog.Errorf("Failed to AttachVolume: %v", err)
374379
return nil, status.Errorf(codes.Internal, "[ControllerPublishVolume] Attach Volume failed with error %v", err)
375380
}

pkg/csi/blockstorage/nodeserver.go

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -308,9 +308,17 @@ func (ns *nodeServer) NodeGetInfo(ctx context.Context, _ *csi.NodeGetInfoRequest
308308
}
309309

310310
maxVolumesPerNode := DetermineMaxVolumesByFlavor(flavor)
311-
// Subtract 1 for root disk and another for configDrive/spare
312-
maxVolumesPerNode -= 2
313-
klog.V(4).Infof("Determined node to support %d volumes", maxVolumesPerNode)
311+
312+
// Subtract already mounted Volumes
313+
emptyPCIeRootPorts, err := mount.CountNonVirtioBlockDevices()
314+
if err != nil {
315+
klog.Errorf("[NodeGetInfo] unable to retrieve PCIe root ports %v", err)
316+
emptyPCIeRootPorts = 0
317+
}
318+
319+
maxVolumesPerNode -= emptyPCIeRootPorts
320+
klog.Infof("Determined %d PCIe ports occupied by non virtio block devices", emptyPCIeRootPorts)
321+
klog.Infof("Determined node to support %d volumes", maxVolumesPerNode)
314322

315323
nodeInfo := &csi.NodeGetInfoResponse{
316324
NodeId: nodeID,

pkg/csi/blockstorage/utils.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,7 @@ func DetermineMaxVolumesByFlavor(flavor string) int64 {
9797
return 159
9898
default:
9999
// All other flavors can mount 28 volumes
100-
return 25
100+
return 28
101101
}
102102
}
103103

pkg/csi/util/mount/mount.go

Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@ import (
2020
"fmt"
2121
"os"
2222
"path"
23+
"path/filepath"
24+
"regexp"
2325
"slices"
2426
"strings"
2527
"time"
@@ -41,6 +43,15 @@ const (
4143
operationFinishSteps = 15
4244
)
4345

46+
var (
47+
pciAddressRegex = regexp.MustCompile(`^[0-9a-fA-F]{4}:[0-9a-fA-F]{2}:[0-9a-fA-F]{2}\.[0-9a-fA-F]$`)
48+
)
49+
50+
const (
51+
REDHAT_VENDOR = "0x1af4"
52+
VIRTIO_BLOCK_DEVICE = "0x1042"
53+
)
54+
4455
type IMount interface {
4556
Mounter() *mount.SafeFormatAndMount
4657
ScanForAttach(devicePath string) error
@@ -119,6 +130,78 @@ func probeVolume() error {
119130
return nil
120131
}
121132

133+
// CountNonVirtioBlockDevices returns the number of PCIe Root ports who
134+
// are currently occupied by anything else than an VIRTIO 1.0 Block Device
135+
// returns zero when something went wrong
136+
func CountNonVirtioBlockDevices() (int64, error) {
137+
const pciPath = "/sys/bus/pci/devices"
138+
139+
// Get all PCI devices
140+
devices, err := os.ReadDir(pciPath)
141+
if err != nil {
142+
return 0, fmt.Errorf("failed to read PCI bus: %w", err)
143+
}
144+
145+
pcieSlotsOccupiedByNonBlockDevice := 0
146+
147+
for _, dev := range devices {
148+
devPath := filepath.Join(pciPath, dev.Name())
149+
150+
// 1. Identify if it's a Root Port / Bridge
151+
// We check the 'class' file. PCI Bridge class code starts with 0x0604
152+
classBuf, err := os.ReadFile(filepath.Join(devPath, "class"))
153+
if err != nil {
154+
klog.Errorf("failed to read PCI device class %s : %v", devPath, err)
155+
continue
156+
}
157+
class := strings.TrimSpace(string(classBuf))
158+
159+
// Class 0x060400 is a PCI-to-PCI bridge (standard for Root Ports)
160+
if strings.HasPrefix(class, "0x0604") {
161+
162+
// 2. Check if the port has downstream devices
163+
// If the bridge has children, they appear as subdirectories
164+
// matching the PCI address format (e.g., 0000:01:00.0)
165+
files, err2 := os.ReadDir(devPath)
166+
if err2 != nil {
167+
klog.Errorf("failed to read dir %s : %v", devPath, err2)
168+
}
169+
for _, file := range files {
170+
// Ignore PCI bus directories such as pci001 pci002 and pci010
171+
// Devices must follow <domain:bus:device.function> format
172+
if pciAddressRegex.MatchString(file.Name()) {
173+
isNonBlockDevice := IsNonBlockDevice(devPath, file)
174+
if isNonBlockDevice {
175+
pcieSlotsOccupiedByNonBlockDevice++
176+
}
177+
break
178+
}
179+
}
180+
} else {
181+
klog.Infof("skipping class %s: path: %s", class, devPath)
182+
}
183+
}
184+
185+
return int64(pcieSlotsOccupiedByNonBlockDevice), nil
186+
}
187+
188+
func IsNonBlockDevice(devPath string, file os.DirEntry) bool {
189+
var isNonBlockDevice bool
190+
pciDevicePath := filepath.Join(devPath, file.Name())
191+
vendorBuf, err := os.ReadFile(filepath.Join(pciDevicePath, "vendor"))
192+
if err != nil {
193+
klog.Errorf("failed to read PCI device vendor %s : %v", pciDevicePath, err)
194+
}
195+
deviceBuf, err := os.ReadFile(filepath.Join(pciDevicePath, "device"))
196+
if err != nil {
197+
klog.Errorf("failed to read PCI device file %s : %v", pciDevicePath, err)
198+
}
199+
if strings.TrimSpace(string(vendorBuf)) == REDHAT_VENDOR && strings.TrimSpace(string(deviceBuf)) != VIRTIO_BLOCK_DEVICE {
200+
isNonBlockDevice = true
201+
}
202+
return isNonBlockDevice
203+
}
204+
122205
// GetDevicePath returns the path of an attached block storage volume, specified by its id.
123206
func (m *Mount) GetDevicePath(volumeID string) (string, error) {
124207
backoff := wait.Backoff{

pkg/stackit/stackiterrors/errors.go

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,10 @@ import (
44
"errors"
55
"fmt"
66
"net/http"
7+
"strings"
78

89
oapiError "github.com/stackitcloud/stackit-sdk-go/core/oapierror"
9-
wait "github.com/stackitcloud/stackit-sdk-go/services/iaas/v2api/wait"
10+
"github.com/stackitcloud/stackit-sdk-go/services/iaas/v2api/wait"
1011
)
1112

1213
var ErrNotFound = errors.New("failed to find object")
@@ -20,6 +21,17 @@ func IsNotFound(err error) bool {
2021
return oAPIError.StatusCode == http.StatusNotFound
2122
}
2223

24+
func IsTooManyDevicesError(err error) bool {
25+
var oAPIError *oapiError.GenericOpenAPIError
26+
if ok := errors.As(err, &oAPIError); !ok {
27+
return false
28+
}
29+
30+
// TODO: This is just a placeholder. Implement this correctly
31+
return oAPIError.StatusCode == http.StatusInternalServerError &&
32+
strings.Contains(oAPIError.ErrorMessage, "devices")
33+
}
34+
2335
func IgnoreNotFound(err error) error {
2436
if IsNotFound(err) {
2537
return nil

0 commit comments

Comments
 (0)