Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 25 additions & 13 deletions plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,8 @@ func (d plugin) Create(r *volume.CreateRequest) error {
logger.Infof("Creating volume '%s' ...", r.Name)
logger.Debugf("Create: %+v", r)

ctx := context.TODO()

d.mutex.Lock()
defer d.mutex.Unlock()

Expand All @@ -98,7 +100,7 @@ func (d plugin) Create(r *volume.CreateRequest) error {
}
}

vol, err := volumes.Create(context.TODO(), d.blockClient, volumes.CreateOpts{
vol, err := volumes.Create(ctx, d.blockClient, volumes.CreateOpts{
Size: size,
Name: r.Name,
}, volumes.SchedulerHintOpts{}).Extract()
Expand Down Expand Up @@ -139,10 +141,12 @@ func (d plugin) List() (*volume.ListResponse, error) {
logger := log.WithFields(log.Fields{"action": "list"})
logger.Debugf("List")

ctx := context.TODO()

var vols []*volume.Volume

pager := volumes.List(d.blockClient, volumes.ListOpts{})
err := pager.EachPage(context.TODO(), func(ctx context.Context, page pagination.Page) (bool, error) {
err := pager.EachPage(ctx, func(ctx context.Context, page pagination.Page) (bool, error) {
vList, _ := volumes.ExtractVolumes(page)

for _, v := range vList {
Expand Down Expand Up @@ -170,6 +174,8 @@ func (d plugin) Mount(r *volume.MountRequest) (*volume.MountResponse, error) {
logger.Infof("Mounting volume '%s' ...", r.Name)
logger.Debugf("Mount: %+v", r)

ctx := context.TODO()

d.mutex.Lock()
defer d.mutex.Unlock()

Expand All @@ -183,24 +189,24 @@ func (d plugin) Mount(r *volume.MountRequest) (*volume.MountResponse, error) {

if vol.Status == "creating" || vol.Status == "detaching" {
logger.Infof("Volume is in '%s' state, wait for 'available'...", vol.Status)
if vol, err = d.waitOnVolumeState(logger.Context, vol, "available"); err != nil {
if vol, err = d.waitOnVolumeState(ctx, vol, "available"); err != nil {
logger.Error(err.Error())
return nil, err
}
}

if vol, err = volumes.Get(context.TODO(), d.blockClient, vol.ID).Extract(); err != nil {
if vol, err = volumes.Get(ctx, d.blockClient, vol.ID).Extract(); err != nil {
return nil, err
}

if len(vol.Attachments) > 0 {
logger.Debug("Volume already attached, detaching first")
if vol, err = d.detachVolume(logger.Context, vol); err != nil {
if vol, err = d.detachVolume(ctx, vol); err != nil {
logger.WithError(err).Error("Error detaching volume")
return nil, err
}

if vol, err = d.waitOnVolumeState(logger.Context, vol, "available"); err != nil {
if vol, err = d.waitOnVolumeState(ctx, vol, "available"); err != nil {
logger.WithError(err).Error("Error detaching volume")
return nil, err
}
Expand All @@ -216,7 +222,7 @@ func (d plugin) Mount(r *volume.MountRequest) (*volume.MountResponse, error) {
// Attaching block volume to compute instance

opts := volumeattach.CreateOpts{VolumeID: vol.ID}
_, err = volumeattach.Create(context.TODO(), d.computeClient, d.config.MachineID, opts).Extract()
_, err = volumeattach.Create(ctx, d.computeClient, d.config.MachineID, opts).Extract()

if err != nil {
logger.WithError(err).Errorf("Error attaching volume: %s", err.Error())
Expand Down Expand Up @@ -293,6 +299,8 @@ func (d plugin) Remove(r *volume.RemoveRequest) error {
logger.Infof("Removing volume '%s' ...", r.Name)
logger.Debugf("Remove: %+v", r)

ctx := context.TODO()

vol, err := d.getByName(r.Name)

if err != nil {
Expand All @@ -304,15 +312,15 @@ func (d plugin) Remove(r *volume.RemoveRequest) error {

if len(vol.Attachments) > 0 {
logger.Debug("Volume still attached, detaching first")
if vol, err = d.detachVolume(logger.Context, vol); err != nil {
if vol, err = d.detachVolume(ctx, vol); err != nil {
logger.WithError(err).Error("Error detaching volume")
return err
}
}

logger.Debug("Deleting block volume...")

err = volumes.Delete(context.TODO(), d.blockClient, vol.ID, volumes.DeleteOpts{}).ExtractErr()
err = volumes.Delete(ctx, d.blockClient, vol.ID, volumes.DeleteOpts{}).ExtractErr()
if err != nil {
logger.WithError(err).Errorf("Error deleting volume: %s", err.Error())
return err
Expand All @@ -328,6 +336,8 @@ func (d plugin) Unmount(r *volume.UnmountRequest) error {
logger.Infof("Unmounting volume '%s' ...", r.Name)
logger.Debugf("Unmount: %+v", r)

ctx := context.TODO()

d.mutex.Lock()
defer d.mutex.Unlock()

Expand All @@ -348,7 +358,7 @@ func (d plugin) Unmount(r *volume.UnmountRequest) error {
if err != nil {
logger.WithError(err).Error("Error retriving volume")
} else {
_, err = d.detachVolume(logger.Context, vol)
_, err = d.detachVolume(ctx, vol)
if err != nil {
logger.WithError(err).Error("Error detaching volume")
}
Expand All @@ -360,8 +370,10 @@ func (d plugin) Unmount(r *volume.UnmountRequest) error {
func (d plugin) getByName(name string) (*volumes.Volume, error) {
var volume *volumes.Volume

ctx := context.TODO()

pager := volumes.List(d.blockClient, volumes.ListOpts{Name: name})
err := pager.EachPage(context.TODO(), func(ctx context.Context, page pagination.Page) (bool, error) {
err := pager.EachPage(ctx, func(ctx context.Context, page pagination.Page) (bool, error) {
vList, err := volumes.ExtractVolumes(page)

if err != nil {
Expand Down Expand Up @@ -414,7 +426,7 @@ func (d plugin) waitOnVolumeState(ctx context.Context, vol *volumes.Volume, stat
}
}

log.WithContext(ctx).Debugf("Volume did not become %s: %+v", status, vol)
log.Debugf("Volume status did not change to %s: %+v", status, vol)

return nil, fmt.Errorf("Volume status did became %s", status)
return nil, fmt.Errorf("Volume status changed to %s", status)
}
Loading