Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
125 changes: 110 additions & 15 deletions pkg/service/controller.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package service

import (
"fmt"
"strconv"
"time"

Expand Down Expand Up @@ -66,7 +67,10 @@ func (c *ControllerService) validateCreateVolumeRequest(req *csi.CreateVolumeReq
return false, status.Error(codes.InvalidArgument, "volume capabilities missing in request")
}

isBlock, isRWX := getAccessMode(caps)
isBlock, isRWX, err := getAccessMode(caps)
if err != nil {
return false, err
}

if isRWX && !isBlock {
return false, status.Error(codes.InvalidArgument, "non-block volume with RWX access mode is not supported")
Expand All @@ -91,25 +95,46 @@ func (c *ControllerService) validateCreateVolumeRequest(req *csi.CreateVolumeReq
return isRWX, nil
}

func getAccessMode(caps []*csi.VolumeCapability) (bool, bool) {
isBlock := false
isRWX := false

func getAccessMode(caps []*csi.VolumeCapability) (isBlock, isRWX bool, err error) {
for _, capability := range caps {
if capability != nil {
if capability.GetBlock() != nil {
isBlock = true
}

if am := capability.GetAccessMode(); am != nil {
if am.Mode == csi.VolumeCapability_AccessMode_MULTI_NODE_MULTI_WRITER {
isRWX = true
}
isRWX, err = hasRWXCapabiltyAccessMode(am)
}
}
}

return isBlock, isRWX
return isBlock, isRWX, err
}

// hasRWXCapabiltyAccessMode will return whether a volume is RWX. It may also
// return an error if the received access mode is unknown.
//
// Parameters:
// - cap: Volume Capability AccessMode.
//
// Returns:
// - bool: True if the capability represents an RWX volume.
// - error: Unsupported capability.
func hasRWXCapabiltyAccessMode(cap *csi.VolumeCapability_AccessMode) (bool, error) {
switch cap.GetMode() {
case
csi.VolumeCapability_AccessMode_SINGLE_NODE_WRITER,
csi.VolumeCapability_AccessMode_SINGLE_NODE_READER_ONLY,
csi.VolumeCapability_AccessMode_SINGLE_NODE_SINGLE_WRITER,
csi.VolumeCapability_AccessMode_SINGLE_NODE_MULTI_WRITER:
return false, nil
case
csi.VolumeCapability_AccessMode_MULTI_NODE_READER_ONLY,
csi.VolumeCapability_AccessMode_MULTI_NODE_SINGLE_WRITER,
csi.VolumeCapability_AccessMode_MULTI_NODE_MULTI_WRITER:
return true, nil
}
return false, fmt.Errorf("unknown volume capability")
}

// CreateVolume Create a new DataVolume.
Expand Down Expand Up @@ -326,7 +351,31 @@ func (c *ControllerService) ControllerPublishVolume(
if err := c.validateControllerPublishVolumeRequest(req); err != nil {
return nil, err
}

dvName := req.GetVolumeId()

// Get VM name from node ID which is a namespace/name
_, vmName, err := cache.SplitMetaNamespaceKey(req.NodeId)
if err != nil {
klog.Error("failed getting VM Name for node ID " + req.NodeId)
return nil, err
}

// Check if the volume is RWO, and if it is, check if its in a different Virtual Machine Instance.
isRWX, err := hasRWXCapabiltyAccessMode(req.GetVolumeCapability().GetAccessMode())
if err != nil {
return nil, fmt.Errorf("error checking access mode: %w", err)
}
if !isRWX {
alreadyAttached, err := c.IsVolumeAttachedToOtherVMI(ctx, dvName, c.infraClusterNamespace, vmName)
if err != nil {
return nil, status.Errorf(codes.FailedPrecondition, "failed to check if volume is already attached: %s", err)
}
if alreadyAttached {
return nil, status.Errorf(codes.FailedPrecondition, "volume is attached to another VM")
}
}

if _, err := c.virtClient.GetDataVolume(ctx, c.infraClusterNamespace, dvName); errors.IsNotFound(err) {
return nil, status.Errorf(codes.NotFound, "volume %s not found", req.GetVolumeId())
} else if err != nil {
Expand All @@ -335,12 +384,6 @@ func (c *ControllerService) ControllerPublishVolume(

klog.V(3).Infof("Attaching DataVolume %s to Node ID %s", dvName, req.NodeId)

// Get VM name from node ID which is a namespace/name
_, vmName, err := cache.SplitMetaNamespaceKey(req.NodeId)
if err != nil {
klog.Error("failed getting VM Name for node ID " + req.NodeId)
return nil, err
}
_, err = c.virtClient.GetWorkloadManagingVirtualMachine(ctx, c.infraClusterNamespace, vmName)
if err != nil {
if !errors.IsNotFound(err) {
Expand Down Expand Up @@ -850,3 +893,55 @@ func (c *ControllerService) ControllerGetCapabilities(context.Context, *csi.Cont
func (c *ControllerService) ControllerGetVolume(_ context.Context, _ *csi.ControllerGetVolumeRequest) (*csi.ControllerGetVolumeResponse, error) {
return nil, status.Error(codes.Unimplemented, "")
}

// IsVolumeAttachedToOtherVMI checks if a PVC is actively
// used by any VirtualMachineInstance other than the current one.
//
// NOTE: This function uses vmi.Status.VolumeStatus as the source of truth for
// what is currently attached. It directly compares the volume name in the status
// with the target PVC name.
//
// Parameters:
// - ctx: The context for cancellation.
// - dvName: The name of the PersistentVolumeClaim to check for.
// - infraNamespace: The namespace of the PersistentVolumeClaim.
// - currentVMIName: The name of the VMI for the current ControllerPublishVolume
// request. We want to ignore this VMI in our check.
//
// Returns:
// - bool: True if the volume is attached to another VMI.
// - error: An error if listing VMIs fails.
func (c *ControllerService) IsVolumeAttachedToOtherVMI(
ctx context.Context,
dvName string,
infraNamespace string,
currentVMIName string,
) (bool, error) {
vmis, err := c.virtClient.ListVirtualMachines(ctx, infraNamespace)
if err != nil {
return false, fmt.Errorf("failed to list Virtual Machine Instances in namespace %s: %w", infraNamespace, err)
}

for _, vmi := range vmis {
// Skip the VMI that the volume is intended for.
if vmi.Name == currentVMIName {
continue
}

// The source of truth is the VMI's status. We iterate through the volumes
// that are reported as active in the status.
for _, volumeStatus := range vmi.Status.VolumeStatus {
// If the name in the status matches our PVC name, it means the volume
// is actively attached to this other VMI.
if volumeStatus.Name == dvName {
klog.Infof(
"CONFLICT: PVC %s/%s is in use by VMI %s/%s",
infraNamespace, dvName, vmi.Namespace, vmi.Name,
)
return true, nil
}
}
}

return false, nil
}
Loading