Skip to content

Commit 39b17bf

Browse files
committed
improve node stage volume mounting
- add volume locks to avoid concurrent operations on the same volume - add detection if symlinks don't add up to the right volume - increase logging once more
1 parent 9e203fd commit 39b17bf

File tree

7 files changed

+172
-36
lines changed

7 files changed

+172
-36
lines changed

driver/driver.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,10 @@ type Driver struct {
6565
mounter Mounter
6666
log *logrus.Entry
6767

68+
// A map storing all volumes with ongoing operations so that additional operations
69+
// for that same volume (as defined by VolumeID) return an Aborted error
70+
volumeLocks *VolumeLocks
71+
6872
// ready defines whether the driver is ready to function. This value will
6973
// be used by the `Identity` service via the `Probe()` method.
7074
readyMu sync.Mutex // protects ready
@@ -113,6 +117,7 @@ func NewDriver(ep, token, urlstr string, logLevel logrus.Level) (*Driver, error)
113117
cloudscaleClient: cloudscaleClient,
114118
mounter: newMounter(log),
115119
log: log,
120+
volumeLocks: NewVolumeLocks(),
116121
}, nil
117122
}
118123

driver/driver_test.go

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -29,15 +29,11 @@ import (
2929
"time"
3030

3131
"github.com/cenkalti/backoff/v5"
32-
"github.com/google/uuid"
33-
"k8s.io/mount-utils"
34-
3532
"github.com/cloudscale-ch/cloudscale-go-sdk/v6"
33+
"github.com/google/uuid"
3634
"github.com/kubernetes-csi/csi-test/v5/pkg/sanity"
3735
"github.com/sirupsen/logrus"
3836
"k8s.io/mount-utils"
39-
40-
"github.com/cloudscale-ch/cloudscale-go-sdk/v4"
4137
)
4238

4339
func init() {
@@ -74,6 +70,7 @@ func TestDriverSuite(t *testing.T) {
7470
cloudscaleClient: cloudscaleClient,
7571
mounter: fm,
7672
log: logrus.New().WithField("test_enabed", true),
73+
volumeLocks: NewVolumeLocks(),
7774
}
7875
defer driver.Stop()
7976

driver/driver_volume_type_test.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,7 @@ func TestCreateVolumeInvalidType(t *testing.T) {
9090
)
9191

9292
assert.Error(t, err)
93-
//assert.Error(t, err, "invalid volume capabilities requested for LUKS xx.")
93+
// assert.Error(t, err, "invalid volume capabilities requested for LUKS xx.")
9494
}
9595

9696
func TestCreateVolumeInvalidLUKSAndRaw(t *testing.T) {
@@ -183,5 +183,6 @@ func createDriverForTest(t *testing.T) *Driver {
183183
mounter: &fakeMounter{},
184184
log: logrus.New().WithField("test_enabled", true),
185185
cloudscaleClient: cloudscaleClient,
186+
volumeLocks: NewVolumeLocks(),
186187
}
187188
}

driver/luks_util.go

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@ package driver
2020
import (
2121
"errors"
2222
"fmt"
23-
"io/ioutil"
2423
"os"
2524
"os/exec"
2625
"strings"
@@ -204,9 +203,9 @@ func luksPrepareMount(source string, ctx LuksContext, log *logrus.Entry) (string
204203
return "", err
205204
}
206205
defer func() {
207-
e := os.Remove(filename)
208-
if e != nil {
209-
log.Errorf("cannot delete temporary file %s: %s", filename, e.Error())
206+
err := os.Remove(filename)
207+
if err != nil {
208+
log.Errorf("cannot delete temporary file %s: %s", filename, err.Error())
210209
}
211210
}()
212211

@@ -378,10 +377,14 @@ func writeLuksKey(key string, log *logrus.Entry) (string, error) {
378377
if !checkTmpFs("/tmp") {
379378
return "", errors.New("temporary directory /tmp is not a tmpfs volume; refusing to write luks key to a volume backed by a disk")
380379
}
381-
tmpFile, err := ioutil.TempFile("/tmp", "luks-")
380+
tmpFile, err := os.CreateTemp("/tmp", "luks-")
382381
if err != nil {
383382
return "", err
384383
}
384+
defer func() {
385+
_ = tmpFile.Close()
386+
}()
387+
385388
_, err = tmpFile.WriteString(key)
386389
if err != nil {
387390
log.WithField("tmp_file", tmpFile.Name()).Warnf("Unable to write luks key file: %s", err.Error())

driver/mounter.go

Lines changed: 67 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -25,15 +25,13 @@ import (
2525
"os"
2626
"os/exec"
2727
"path/filepath"
28+
"regexp"
2829
"strconv"
2930
"strings"
3031
"sync"
3132
"syscall"
3233
"time"
3334

34-
"k8s.io/mount-utils"
35-
kexec "k8s.io/utils/exec"
36-
3735
"github.com/sirupsen/logrus"
3836
"golang.org/x/sys/unix"
3937
"k8s.io/mount-utils"
@@ -204,7 +202,9 @@ func (m *mounter) Mount(source, target, fsType string, luksContext LuksContext,
204202
if err != nil {
205203
return fmt.Errorf("failed to create target file for raw block bind mount: %v", err)
206204
}
207-
file.Close()
205+
if err := file.Close(); err != nil {
206+
m.log.WithFields(logrus.Fields{"target": target}).Error("failed to close file handle")
207+
}
208208
} else {
209209
// create target, os.Mkdirall is noop if directory exists
210210
err := os.MkdirAll(target, 0750)
@@ -222,6 +222,7 @@ func (m *mounter) Mount(source, target, fsType string, luksContext LuksContext,
222222
}).Error("failed to prepare luks volume for mounting")
223223
return err
224224
}
225+
// source is /dev/mapper/<volumeName> now
225226
source = luksSource
226227
}
227228

@@ -267,6 +268,9 @@ func (m *mounter) Unmount(target string, luksContext LuksContext) error {
267268
// a luks volume needs to be closed after unmounting; get the source
268269
// of the mount to check if that is a luks volume
269270
mountSources, err := getMountSources(target)
271+
if err != nil {
272+
return fmt.Errorf("failed to get mount sources for target %q: %v", target, err)
273+
}
270274

271275
err = mount.CleanupMountPoint(target, m.kMounter, true)
272276
if err != nil {
@@ -476,15 +480,35 @@ func (m *mounter) FinalizeVolumeAttachmentAndFindPath(logger *logrus.Entry, volu
476480
logger.WithFields(logrus.Fields{
477481
"disk_id_path": diskIDPath,
478482
"error": err,
479-
}).Debug("FinalizeVolumeAttachmentAndFindPath: found path but failed to resolve symlink")
480-
} else {
483+
}).Error("FinalizeVolumeAttachmentAndFindPath: found path but failed to resolve symlink")
484+
return "", fmt.Errorf("FinalizeVolumeAttachmentAndFindPath: found path %s but failed to resolve symlink: %w", diskIDPath, err)
485+
}
486+
logger.WithFields(logrus.Fields{
487+
"disk_id_path": diskIDPath,
488+
"resolved_device": resolved,
489+
"num_tries": numTries,
490+
}).Debug("FinalizeVolumeAttachmentAndFindPath: found device path")
491+
492+
devFsSerial, innerErr := getScsiSerial(resolved)
493+
if innerErr != nil {
494+
logger.WithFields(logrus.Fields{
495+
"disk_id_path": diskIDPath,
496+
"resolved_device": resolved,
497+
"num_tries": numTries,
498+
}).Error("FinalizeVolumeAttachmentAndFindPath: unable to get device serial")
499+
return "", fmt.Errorf("FinalizeVolumeAttachmentAndFindPath: unable to get serial number for disk %s at path %s: %w", diskIDPath, resolved, innerErr)
500+
}
501+
// success: found a path in /dev/disk/by-id/* which resolved to a symlink in /dev/* and that returned the right serial.
502+
if devFsSerial != "" && devFsSerial == volumeID {
481503
logger.WithFields(logrus.Fields{
482504
"disk_id_path": diskIDPath,
483505
"resolved_device": resolved,
506+
"serial": devFsSerial,
484507
"num_tries": numTries,
485-
}).Debug("FinalizeVolumeAttachmentAndFindPath: found device path")
508+
}).Debug("FinalizeVolumeAttachmentAndFindPath: found device and resolved serial")
509+
return diskIDPath, nil
486510
}
487-
return diskIDPath, nil
511+
// A /dev/* path exists, but it's not matching the right serial. Attempt to repair by triggering udevadm.
488512
}
489513

490514
logger.WithFields(logrus.Fields{
@@ -502,6 +526,41 @@ func (m *mounter) FinalizeVolumeAttachmentAndFindPath(logger *logrus.Entry, volu
502526
return "", errors.New("FinalizeVolumeAttachmentAndFindPath: Timeout after 30s")
503527
}
504528

529+
// getScsiSerial assumes that scsiIdPath exists and will error if it
530+
// doesnt. It is the callers responsibility to verify the existence of this
531+
// tool. Calls scsi_id on the given devicePath to get the serial number reported
532+
// by that device.
533+
func getScsiSerial(devicePath string) (string, error) {
534+
out, err := exec.Command(
535+
"/usr/lib/udev/scsi_id",
536+
"--page=0x83",
537+
"--whitelisted",
538+
fmt.Sprintf("--device=%v", devicePath)).CombinedOutput()
539+
if err != nil {
540+
return "", fmt.Errorf("scsi_id failed for device %q with output %s: %w", devicePath, string(out), err)
541+
}
542+
543+
return parseScsiSerial(string(out))
544+
}
545+
546+
var (
547+
// scsi_id output should be in the form of:
548+
// 0QEMU QEMU HARDDISK <disk id>
549+
scsiPattern = `^0QEMU\s+QEMU\sHARDDISK\s+([\S]+)\s*$`
550+
// regex to parse scsi_id output and extract the serial
551+
scsiRegex = regexp.MustCompile(scsiPattern)
552+
)
553+
554+
// Parse the output returned by scsi_id and extract the serial number
555+
func parseScsiSerial(output string) (string, error) {
556+
substrings := scsiRegex.FindStringSubmatch(output)
557+
if substrings == nil {
558+
return "", fmt.Errorf("scsi_id output cannot be parsed: %q", output)
559+
}
560+
561+
return substrings[1], nil
562+
}
563+
505564
func runCmdWithTimeout(name string, args []string, logger *logrus.Entry, timeout time.Duration) {
506565
ctx, cancel := context.WithTimeout(context.Background(), timeout)
507566
defer cancel()

driver/node.go

Lines changed: 34 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -15,14 +15,6 @@ See the License for the specific language governing permissions and
1515
limitations under the License.
1616
*/
1717

18-
// Code generated by protoc-gen-go. DO NOT EDIT.
19-
20-
// NOTE: THIS IS NOT GENERATED. We have to add the line above to prevent golint
21-
// checking this file. This is needed because some methods end with xxxId, but
22-
// golint wants them to be xxxID. But we're not able to change it as the
23-
// official CSI spec is that way and we have to implement the interface
24-
// exactly.
25-
2618
package driver
2719

2820
import (
@@ -40,8 +32,6 @@ import (
4032
)
4133

4234
const (
43-
diskDOPrefix = "scsi-0DO_Volume_"
44-
4535
// Current technical limit is 128
4636
// - 1 for root
4737
// - 1 for /var/lib/docker
@@ -72,6 +62,11 @@ func (d *Driver) NodeStageVolume(ctx context.Context, req *csi.NodeStageVolumeRe
7262
return nil, status.Error(codes.InvalidArgument, "NodeStageVolume Volume Capability must be provided")
7363
}
7464

65+
if acquired := d.volumeLocks.TryAcquire(req.VolumeId); !acquired {
66+
return nil, status.Errorf(codes.Aborted, "an operation with the given Volume ID %s already exists", req.VolumeId)
67+
}
68+
defer d.volumeLocks.Release(req.VolumeId)
69+
7570
// Apparently sometimes we need to call udevadm trigger to get the volume
7671
// properly registered in /dev/disk. More information can be found here:
7772
// https://github.com/cloudscale-ch/csi-cloudscale/issues/9
@@ -80,12 +75,17 @@ func (d *Driver) NodeStageVolume(ctx context.Context, req *csi.NodeStageVolumeRe
8075
return nil, err
8176
}
8277

78+
d.log.WithFields(logrus.Fields{
79+
"volume_id": req.VolumeId,
80+
"device_path": source,
81+
}).Info("successfully found attached volume_id at device_path")
82+
8383
// Debug logging to help diagnose potential race conditions with concurrent volume mounts
8484
resolvedSource, resolveErr := filepath.EvalSymlinks(source)
8585
if resolveErr != nil {
8686
d.log.WithFields(logrus.Fields{
87-
"volume_id": req.VolumeId,
88-
"source": source,
87+
"volume_id": req.VolumeId,
88+
"source": source,
8989
"resolve_error": resolveErr,
9090
}).Debug("failed to resolve source symlink")
9191
} else {
@@ -115,7 +115,7 @@ func (d *Driver) NodeStageVolume(ctx context.Context, req *csi.NodeStageVolumeRe
115115
return &csi.NodeStageVolumeResponse{}, nil
116116
}
117117

118-
target := req.StagingTargetPath
118+
stagingTargetPath := req.StagingTargetPath
119119

120120
mnt := req.VolumeCapability.GetMount()
121121
options := mnt.MountFlags
@@ -153,19 +153,21 @@ func (d *Driver) NodeStageVolume(ctx context.Context, req *csi.NodeStageVolumeRe
153153
ll.Info("source device is already formatted")
154154
}
155155

156-
ll.Info("mounting the volume for staging")
156+
ll.Info("checking if stagingTargetPath is already mounted")
157157

158-
mounted, err := d.mounter.IsMounted(target)
158+
mounted, err := d.mounter.IsMounted(stagingTargetPath)
159159
if err != nil {
160+
ll.WithError(err).Error("unable to check if already mounted")
160161
return nil, err
161162
}
162163

163164
if !mounted {
164-
if err := d.mounter.Mount(source, target, fsType, luksContext, options...); err != nil {
165+
ll.Info("not mounted yet, mounting the volume for staging")
166+
if err := d.mounter.Mount(source, stagingTargetPath, fsType, luksContext, options...); err != nil {
165167
return nil, status.Error(codes.Internal, err.Error())
166168
}
167169
} else {
168-
ll.Info("source device is already mounted to the target path")
170+
ll.Info("source device is already mounted to the stagingTargetPath path")
169171
}
170172

171173
ll.Info("formatting and mounting stage volume is finished")
@@ -182,6 +184,11 @@ func (d *Driver) NodeUnstageVolume(ctx context.Context, req *csi.NodeUnstageVolu
182184
return nil, status.Error(codes.InvalidArgument, "NodeUnstageVolume Staging Target Path must be provided")
183185
}
184186

187+
if acquired := d.volumeLocks.TryAcquire(req.VolumeId); !acquired {
188+
return nil, status.Errorf(codes.Aborted, "an operation with the given Volume ID %s already exists", req.VolumeId)
189+
}
190+
defer d.volumeLocks.Release(req.VolumeId)
191+
185192
luksContext := LuksContext{VolumeLifecycle: VolumeLifecycleNodeUnstageVolume}
186193

187194
ll := d.log.WithFields(logrus.Fields{
@@ -229,6 +236,11 @@ func (d *Driver) NodePublishVolume(ctx context.Context, req *csi.NodePublishVolu
229236
return nil, status.Error(codes.InvalidArgument, "NodePublishVolume Volume Capability must be provided")
230237
}
231238

239+
if acquired := d.volumeLocks.TryAcquire(req.VolumeId); !acquired {
240+
return nil, status.Errorf(codes.Aborted, "an operation with the given Volume ID %s already exists", req.VolumeId)
241+
}
242+
defer d.volumeLocks.Release(req.VolumeId)
243+
232244
publishContext := req.GetPublishContext()
233245
if publishContext == nil {
234246
return nil, status.Error(codes.InvalidArgument, "PublishContext must be provided")
@@ -276,6 +288,11 @@ func (d *Driver) NodeUnpublishVolume(ctx context.Context, req *csi.NodeUnpublish
276288
return nil, status.Error(codes.InvalidArgument, "NodeUnpublishVolume Target Path must be provided")
277289
}
278290

291+
if acquired := d.volumeLocks.TryAcquire(req.VolumeId); !acquired {
292+
return nil, status.Errorf(codes.Aborted, "an operation with the given Volume ID %s already exists", req.VolumeId)
293+
}
294+
defer d.volumeLocks.Release(req.VolumeId)
295+
279296
luksContext := LuksContext{VolumeLifecycle: VolumeLifecycleNodeUnpublishVolume}
280297

281298
ll := d.log.WithFields(logrus.Fields{

0 commit comments

Comments
 (0)