Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pkg/nfs/nfs.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ func NewDriver(options *DriverOptions) *Driver {
n.AddNodeServiceCapabilities([]csi.NodeServiceCapability_RPC_Type{
csi.NodeServiceCapability_RPC_GET_VOLUME_STATS,
csi.NodeServiceCapability_RPC_SINGLE_NODE_MULTI_WRITER,
csi.NodeServiceCapability_RPC_STAGE_UNSTAGE_VOLUME,
csi.NodeServiceCapability_RPC_UNKNOWN,
})
n.volumeLocks = NewVolumeLocks()
Expand Down
217 changes: 192 additions & 25 deletions pkg/nfs/nodeserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ func (ns *NodeServer) NodePublishVolume(_ context.Context, req *csi.NodePublishV
if len(targetPath) == 0 {
return nil, status.Error(codes.InvalidArgument, "Target path not provided")
}
stagingTargetPath := req.GetStagingTargetPath()

lockKey := fmt.Sprintf("%s-%s", volumeID, targetPath)
if acquired := ns.Driver.volumeLocks.TryAcquire(lockKey); !acquired {
Expand Down Expand Up @@ -105,15 +106,6 @@ func (ns *NodeServer) NodePublishVolume(_ context.Context, req *csi.NodePublishV
if baseDir == "" {
return nil, status.Error(codes.InvalidArgument, fmt.Sprintf("%v is a required parameter", paramShare))
}
server = getServerFromSource(server)
source := fmt.Sprintf("%s:%s", server, baseDir)
if subDir != "" {
// replace pv/pvc name namespace metadata in subDir
subDir = replaceWithMap(subDir, subDirReplaceMap)

source = strings.TrimRight(source, "/")
source = fmt.Sprintf("%s/%s", source, subDir)
}

notMnt, err := ns.mounter.IsLikelyNotMountPoint(targetPath)
if err != nil {
Expand All @@ -130,19 +122,72 @@ func (ns *NodeServer) NodePublishVolume(_ context.Context, req *csi.NodePublishV
return &csi.NodePublishVolumeResponse{}, nil
}

klog.V(2).Infof("NodePublishVolume: volumeID(%v) source(%s) targetPath(%s) mountflags(%v)", volumeID, source, targetPath, mountOptions)
execFunc := func() error {
return ns.mounter.Mount(source, targetPath, "nfs", mountOptions)
}
timeoutFunc := func() error { return fmt.Errorf("time out") }
if err := WaitUntilTimeout(90*time.Second, execFunc, timeoutFunc); err != nil {
if os.IsPermission(err) {
return nil, status.Error(codes.PermissionDenied, err.Error())
// If stagingTargetPath is provided, bind mount from staging to target
// and remount with security options to ensure they are applied
if stagingTargetPath != "" {
klog.V(2).Infof("NodePublishVolume: volumeID(%v) bind mounting from stagingPath(%s) to targetPath(%s) with mountflags(%v)", volumeID, stagingTargetPath, targetPath, mountOptions)

// Perform bind mount
if err := ns.mounter.Mount(stagingTargetPath, targetPath, "", []string{"bind"}); err != nil {
if os.IsPermission(err) {
return nil, status.Error(codes.PermissionDenied, err.Error())
}
return nil, status.Error(codes.Internal, err.Error())
}
if strings.Contains(err.Error(), "invalid argument") {
return nil, status.Error(codes.InvalidArgument, err.Error())

// Remount with security options to ensure they are applied to the bind mount
// Extract security-related mount options that need to be re-applied
securityOpts := []string{"remount"}
for _, opt := range mountOptions {
// Include security options and readonly flag
if opt == "noexec" || opt == "nosuid" || opt == "nodev" || opt == "ro" {
securityOpts = append(securityOpts, opt)
}
}

// Only remount if there are security options to apply
if len(securityOpts) > 1 {
klog.V(2).Infof("NodePublishVolume: remounting targetPath(%s) with security options(%v)", targetPath, securityOpts)
if err := ns.mounter.Mount("", targetPath, "", securityOpts); err != nil {
// Attempt to cleanup the bind mount on failure
forceUnmounter, ok := ns.mounter.(mount.MounterForceUnmounter)
if ok {
mount.CleanupMountWithForce(targetPath, forceUnmounter, false, 30*time.Second)
} else {
mount.CleanupMountPoint(targetPath, ns.mounter, false)
}
if os.IsPermission(err) {
return nil, status.Error(codes.PermissionDenied, err.Error())
}
return nil, status.Error(codes.Internal, err.Error())
}
}
} else {
// Legacy path: direct NFS mount (for backward compatibility when staging is not used)
server = getServerFromSource(server)
source := fmt.Sprintf("%s:%s", server, baseDir)
if subDir != "" {
// replace pv/pvc name namespace metadata in subDir
subDir = replaceWithMap(subDir, subDirReplaceMap)

source = strings.TrimRight(source, "/")
source = fmt.Sprintf("%s/%s", source, subDir)
}

klog.V(2).Infof("NodePublishVolume: volumeID(%v) source(%s) targetPath(%s) mountflags(%v)", volumeID, source, targetPath, mountOptions)
execFunc := func() error {
return ns.mounter.Mount(source, targetPath, "nfs", mountOptions)
}
timeoutFunc := func() error { return fmt.Errorf("time out") }
if err := WaitUntilTimeout(90*time.Second, execFunc, timeoutFunc); err != nil {
if os.IsPermission(err) {
return nil, status.Error(codes.PermissionDenied, err.Error())
}
if strings.Contains(err.Error(), "invalid argument") {
return nil, status.Error(codes.InvalidArgument, err.Error())
}
return nil, status.Error(codes.Internal, err.Error())
}
return nil, status.Error(codes.Internal, err.Error())
}

if mountPermissions > 0 {
Expand All @@ -152,7 +197,7 @@ func (ns *NodeServer) NodePublishVolume(_ context.Context, req *csi.NodePublishV
} else {
klog.V(2).Infof("skip chmod on targetPath(%s) since mountPermissions is set as 0", targetPath)
}
klog.V(2).Infof("volume(%s) mount %s on %s succeeded", volumeID, source, targetPath)
klog.V(2).Infof("volume(%s) mounted to %s successfully", volumeID, targetPath)
return &csi.NodePublishVolumeResponse{}, nil
}

Expand Down Expand Up @@ -286,13 +331,135 @@ func (ns *NodeServer) NodeGetVolumeStats(_ context.Context, req *csi.NodeGetVolu
}

// NodeUnstageVolume unstage volume
func (ns *NodeServer) NodeUnstageVolume(_ context.Context, _ *csi.NodeUnstageVolumeRequest) (*csi.NodeUnstageVolumeResponse, error) {
return nil, status.Error(codes.Unimplemented, "")
func (ns *NodeServer) NodeUnstageVolume(_ context.Context, req *csi.NodeUnstageVolumeRequest) (*csi.NodeUnstageVolumeResponse, error) {
volumeID := req.GetVolumeId()
if len(volumeID) == 0 {
return nil, status.Error(codes.InvalidArgument, "Volume ID missing in request")
}
stagingTargetPath := req.GetStagingTargetPath()
if len(stagingTargetPath) == 0 {
return nil, status.Error(codes.InvalidArgument, "Staging target path missing in request")
}

lockKey := fmt.Sprintf("%s-%s", volumeID, stagingTargetPath)
if acquired := ns.Driver.volumeLocks.TryAcquire(lockKey); !acquired {
return nil, status.Errorf(codes.Aborted, volumeOperationAlreadyExistsFmt, volumeID)
}
defer ns.Driver.volumeLocks.Release(lockKey)

klog.V(2).Infof("NodeUnstageVolume: unmounting volume %s on %s", volumeID, stagingTargetPath)
var err error
extensiveMountPointCheck := true
forceUnmounter, ok := ns.mounter.(mount.MounterForceUnmounter)
if ok {
klog.V(2).Infof("force unmount %s on %s", volumeID, stagingTargetPath)
err = mount.CleanupMountWithForce(stagingTargetPath, forceUnmounter, extensiveMountPointCheck, 30*time.Second)
} else {
err = mount.CleanupMountPoint(stagingTargetPath, ns.mounter, extensiveMountPointCheck)
}
if err != nil {
return nil, status.Errorf(codes.Internal, "failed to unmount staging target %q: %v", stagingTargetPath, err)
}
klog.V(2).Infof("NodeUnstageVolume: unmount volume %s on %s successfully", volumeID, stagingTargetPath)

return &csi.NodeUnstageVolumeResponse{}, nil
}

// NodeStageVolume stage volume
func (ns *NodeServer) NodeStageVolume(_ context.Context, _ *csi.NodeStageVolumeRequest) (*csi.NodeStageVolumeResponse, error) {
return nil, status.Error(codes.Unimplemented, "")
func (ns *NodeServer) NodeStageVolume(_ context.Context, req *csi.NodeStageVolumeRequest) (*csi.NodeStageVolumeResponse, error) {
volCap := req.GetVolumeCapability()
if volCap == nil {
return nil, status.Error(codes.InvalidArgument, "Volume capability missing in request")
}
volumeID := req.GetVolumeId()
if len(volumeID) == 0 {
return nil, status.Error(codes.InvalidArgument, "Volume ID missing in request")
}
stagingTargetPath := req.GetStagingTargetPath()
if len(stagingTargetPath) == 0 {
return nil, status.Error(codes.InvalidArgument, "Staging target path not provided")
}

lockKey := fmt.Sprintf("%s-%s", volumeID, stagingTargetPath)
if acquired := ns.Driver.volumeLocks.TryAcquire(lockKey); !acquired {
return nil, status.Errorf(codes.Aborted, volumeOperationAlreadyExistsFmt, volumeID)
}
defer ns.Driver.volumeLocks.Release(lockKey)

mountOptions := volCap.GetMount().GetMountFlags()

var server, baseDir, subDir string
subDirReplaceMap := map[string]string{}

for k, v := range req.GetVolumeContext() {
switch strings.ToLower(k) {
case paramServer:
server = v
case paramShare:
baseDir = v
case paramSubDir:
subDir = v
case pvcNamespaceKey:
subDirReplaceMap[pvcNamespaceMetadata] = v
case pvcNameKey:
subDirReplaceMap[pvcNameMetadata] = v
case pvNameKey:
subDirReplaceMap[pvNameMetadata] = v
case mountOptionsField:
if v != "" {
mountOptions = append(mountOptions, v)
}
}
}

if server == "" {
return nil, status.Error(codes.InvalidArgument, fmt.Sprintf("%v is a required parameter", paramServer))
}
if baseDir == "" {
return nil, status.Error(codes.InvalidArgument, fmt.Sprintf("%v is a required parameter", paramShare))
}
server = getServerFromSource(server)
source := fmt.Sprintf("%s:%s", server, baseDir)
if subDir != "" {
// replace pv/pvc name namespace metadata in subDir
subDir = replaceWithMap(subDir, subDirReplaceMap)

source = strings.TrimRight(source, "/")
source = fmt.Sprintf("%s/%s", source, subDir)
}

notMnt, err := ns.mounter.IsLikelyNotMountPoint(stagingTargetPath)
if err != nil {
if os.IsNotExist(err) {
if err := os.MkdirAll(stagingTargetPath, os.FileMode(0755)); err != nil {
return nil, status.Error(codes.Internal, err.Error())
}
notMnt = true
} else {
return nil, status.Error(codes.Internal, err.Error())
}
}
if !notMnt {
return &csi.NodeStageVolumeResponse{}, nil
}

klog.V(2).Infof("NodeStageVolume: volumeID(%v) source(%s) stagingTargetPath(%s) mountflags(%v)", volumeID, source, stagingTargetPath, mountOptions)
execFunc := func() error {
return ns.mounter.Mount(source, stagingTargetPath, "nfs", mountOptions)
}
timeoutFunc := func() error { return fmt.Errorf("time out") }
if err := WaitUntilTimeout(90*time.Second, execFunc, timeoutFunc); err != nil {
if os.IsPermission(err) {
return nil, status.Error(codes.PermissionDenied, err.Error())
}
if strings.Contains(err.Error(), "invalid argument") {
return nil, status.Error(codes.InvalidArgument, err.Error())
}
return nil, status.Error(codes.Internal, err.Error())
}

klog.V(2).Infof("volume(%s) mount %s on %s succeeded", volumeID, source, stagingTargetPath)
return &csi.NodeStageVolumeResponse{}, nil
}

// NodeExpandVolume node expand volume
Expand Down
Loading