Skip to content

Commit

Permalink
fix: disk lun collision issue
Browse files Browse the repository at this point in the history
fix

fix

refactor

refine

feat: add throttling for disk lun check

fix

feat: add check-disk-lun-collision flag

test: add ut

chore: fix gofmt

fix panic

fix panic

fix

fix

refine
  • Loading branch information
andyzhangx committed Dec 7, 2023
1 parent a97da2d commit 962e6ce
Show file tree
Hide file tree
Showing 12 changed files with 276 additions and 17 deletions.
Binary file modified charts/latest/azuredisk-csi-driver-v0.0.0.tgz
Binary file not shown.
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,7 @@ spec:
- "--enable-traffic-manager={{ .Values.controller.enableTrafficManager }}"
- "--traffic-manager-port={{ .Values.controller.trafficManagerPort }}"
- "--enable-otel-tracing={{ .Values.controller.otelTracing.enabled }}"
- "--check-disk-lun-collision=true"
ports:
- containerPort: {{ .Values.controller.livenessProbe.healthPort }}
name: healthz
Expand Down
3 changes: 2 additions & 1 deletion pkg/azureconstants/azure_constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,8 @@ const (
StandardSsdAccountPrefix = "standardssd"
StorageAccountTypeField = "storageaccounttype"
TagsField = "tags"
ThrottlingKey = "throttlingKey"
GetDiskThrottlingKey = "getdiskthrottlingKey"
CheckDiskLunThrottlingKey = "checkdisklunthrottlingKey"
TrueValue = "true"
FalseValue = "false"
UserAgentField = "useragent"
Expand Down
82 changes: 75 additions & 7 deletions pkg/azuredisk/azuredisk.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,12 @@ import (
"google.golang.org/grpc/status"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/volume/util/hostutil"
"k8s.io/mount-utils"
"k8s.io/utils/pointer"

consts "sigs.k8s.io/azuredisk-csi-driver/pkg/azureconstants"
"sigs.k8s.io/azuredisk-csi-driver/pkg/azureutils"
Expand Down Expand Up @@ -76,6 +78,7 @@ type DriverOptions struct {
GetNodeIDFromIMDS bool
EnableOtelTracing bool
WaitForSnapshotReady bool
CheckDiskLUNCollision bool
}

// CSIDriver defines the interface for a CSI driver.
Expand Down Expand Up @@ -125,14 +128,15 @@ type DriverCore struct {
getNodeIDFromIMDS bool
enableOtelTracing bool
shouldWaitForSnapshotReady bool
checkDiskLUNCollision bool
}

// Driver is the v1 implementation of the Azure Disk CSI Driver.
type Driver struct {
DriverCore
volumeLocks *volumehelper.VolumeLocks
// a timed cache GetDisk throttling
getDiskThrottlingCache azcache.Resource
// a timed cache for throttling
throttlingCache azcache.Resource
}

// newDriverV1 Creates a NewCSIDriver object. Assumes vendor version is equal to driver version &
Expand Down Expand Up @@ -167,6 +171,7 @@ func newDriverV1(options *DriverOptions) *Driver {
driver.getNodeIDFromIMDS = options.GetNodeIDFromIMDS
driver.enableOtelTracing = options.EnableOtelTracing
driver.shouldWaitForSnapshotReady = options.WaitForSnapshotReady
driver.checkDiskLUNCollision = options.CheckDiskLUNCollision
driver.volumeLocks = volumehelper.NewVolumeLocks()
driver.ioHandler = azureutils.NewOSIOHandler()
driver.hostUtil = hostutil.NewHostUtil()
Expand All @@ -179,7 +184,7 @@ func newDriverV1(options *DriverOptions) *Driver {
if err != nil {
klog.Fatalf("%v", err)
}
driver.getDiskThrottlingCache = cache
driver.throttlingCache = cache
return &driver
}

Expand Down Expand Up @@ -289,9 +294,18 @@ func (d *Driver) Run(endpoint, kubeconfig string, disableAVSetNodes, testingMock
}

func (d *Driver) isGetDiskThrottled() bool {
cache, err := d.getDiskThrottlingCache.Get(consts.ThrottlingKey, azcache.CacheReadTypeDefault)
cache, err := d.throttlingCache.Get(consts.GetDiskThrottlingKey, azcache.CacheReadTypeDefault)
if err != nil {
klog.Warningf("getDiskThrottlingCache(%s) return with error: %s", consts.ThrottlingKey, err)
klog.Warningf("throttlingCache(%s) return with error: %s", consts.GetDiskThrottlingKey, err)
return false
}
return cache != nil
}

func (d *Driver) isCheckDiskLunThrottled() bool {
cache, err := d.throttlingCache.Get(consts.CheckDiskLunThrottlingKey, azcache.CacheReadTypeDefault)
if err != nil {
klog.Warningf("throttlingCache(%s) return with error: %s", consts.CheckDiskLunThrottlingKey, err)
return false
}
return cache != nil
Expand All @@ -317,7 +331,7 @@ func (d *Driver) checkDiskExists(ctx context.Context, diskURI string) (*compute.
if rerr != nil {
if rerr.IsThrottled() || strings.Contains(rerr.RawError.Error(), consts.RateLimited) {
klog.Warningf("checkDiskExists(%s) is throttled with error: %v", diskURI, rerr.Error())
d.getDiskThrottlingCache.Set(consts.ThrottlingKey, "")
d.throttlingCache.Set(consts.GetDiskThrottlingKey, "")
return nil, nil
}
return nil, rerr.Error()
Expand All @@ -342,7 +356,7 @@ func (d *Driver) checkDiskCapacity(ctx context.Context, subsID, resourceGroup, d
} else {
if rerr.IsThrottled() || strings.Contains(rerr.RawError.Error(), consts.RateLimited) {
klog.Warningf("checkDiskCapacity(%s, %s) is throttled with error: %v", resourceGroup, diskName, rerr.Error())
d.getDiskThrottlingCache.Set(consts.ThrottlingKey, "")
d.throttlingCache.Set(consts.GetDiskThrottlingKey, "")
}
}
return true, nil
Expand Down Expand Up @@ -470,6 +484,60 @@ func (d *DriverCore) waitForSnapshotReady(ctx context.Context, subsID, resourceG
}
}

// getUsedLunsFromVolumeAttachments returns a list of used luns from VolumeAttachments
func (d *DriverCore) getUsedLunsFromVolumeAttachments(ctx context.Context, nodeName string) ([]int, error) {
kubeClient := d.cloud.KubeClient
if kubeClient == nil || kubeClient.StorageV1() == nil || kubeClient.StorageV1().VolumeAttachments() == nil {
return nil, fmt.Errorf("kubeClient or kubeClient.StorageV1() or kubeClient.StorageV1().VolumeAttachments() is nil")
}

volumeAttachments, err := kubeClient.StorageV1().VolumeAttachments().List(ctx, metav1.ListOptions{})
if err != nil {
return nil, err
}

usedLuns := make([]int, 0)
if volumeAttachments == nil {
klog.V(2).Infof("volumeAttachments is nil")
return usedLuns, nil
}
for _, va := range volumeAttachments.Items {
klog.V(6).Infof("attacher: %s, nodeName: %s, Status: %v, PV: %s, attachmentMetadata: %v", va.Spec.Attacher, va.Spec.NodeName,
va.Status.Attached, pointer.StringDeref(va.Spec.Source.PersistentVolumeName, ""), va.Status.AttachmentMetadata)
if va.Spec.Attacher == consts.DefaultDriverName && strings.EqualFold(va.Spec.NodeName, nodeName) && va.Status.Attached {
if k, ok := va.Status.AttachmentMetadata[consts.LUN]; ok {
lun, err := strconv.Atoi(k)
if err != nil {
klog.Warningf("VolumeAttachment(%s) lun(%s) is not a valid integer", va.Name, k)
continue
}
usedLuns = append(usedLuns, lun)
}
}
}
return usedLuns, nil
}

// getUsedLunsFromNode returns a list of sorted used luns from Node
func (d *DriverCore) getUsedLunsFromNode(nodeName types.NodeName) ([]int, error) {
disks, _, err := d.cloud.GetNodeDataDisks(nodeName, azcache.CacheReadTypeDefault)
if err != nil {
klog.Errorf("error of getting data disks for node %s: %v", nodeName, err)
return nil, err
}

usedLuns := make([]int, 0)
// get all disks attached to the node
for _, disk := range disks {
if disk.Lun == nil {
klog.Warningf("disk(%s) lun is nil", *disk.Name)
continue
}
usedLuns = append(usedLuns, int(*disk.Lun))
}
return usedLuns, nil
}

// getNodeInfoFromLabels get zone, instanceType from node labels
func getNodeInfoFromLabels(ctx context.Context, nodeName string, kubeClient clientset.Interface) (string, string, error) {
if kubeClient == nil || kubeClient.CoreV1() == nil {
Expand Down
66 changes: 66 additions & 0 deletions pkg/azuredisk/azuredisk_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,13 @@ import (
"github.com/golang/mock/gomock"
"github.com/stretchr/testify/assert"
"google.golang.org/grpc/status"
"k8s.io/apimachinery/pkg/types"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/utils/pointer"
consts "sigs.k8s.io/azuredisk-csi-driver/pkg/azureconstants"
"sigs.k8s.io/cloud-provider-azure/pkg/azureclients/diskclient/mockdiskclient"
"sigs.k8s.io/cloud-provider-azure/pkg/azureclients/snapshotclient/mocksnapshotclient"
"sigs.k8s.io/cloud-provider-azure/pkg/azureclients/vmclient/mockvmclient"
azure "sigs.k8s.io/cloud-provider-azure/pkg/provider"
"sigs.k8s.io/cloud-provider-azure/pkg/retry"
)
Expand Down Expand Up @@ -427,3 +429,67 @@ func TestGetVMSSInstanceName(t *testing.T) {
}
}
}

func TestGetUsedLunsFromVolumeAttachments(t *testing.T) {
d, _ := NewFakeDriver(t)
tests := []struct {
name string
nodeName string
expectedUsedLunList []int
expectedErr error
}{
{
name: "nil kubeClient",
nodeName: "test-node",
expectedUsedLunList: nil,
expectedErr: fmt.Errorf("kubeClient or kubeClient.StorageV1() or kubeClient.StorageV1().VolumeAttachments() is nil"),
},
}
for _, test := range tests {
result, err := d.getUsedLunsFromVolumeAttachments(context.Background(), test.nodeName)
if !reflect.DeepEqual(err, test.expectedErr) {
t.Errorf("test(%s): err(%v) != expected err(%v)", test.name, err, test.expectedErr)
}
if !reflect.DeepEqual(result, test.expectedUsedLunList) {
t.Errorf("test(%s): result(%v) != expected result(%v)", test.name, result, test.expectedUsedLunList)
}
}
}

func TestGetUsedLunsFromNode(t *testing.T) {
d, _ := NewFakeDriver(t)
vm := compute.VirtualMachine{}
dataDisks := make([]compute.DataDisk, 2)
dataDisks[0] = compute.DataDisk{Lun: pointer.Int32(int32(0)), Name: &testVolumeName}
dataDisks[1] = compute.DataDisk{Lun: pointer.Int32(int32(2)), Name: &testVolumeName}
vm.VirtualMachineProperties = &compute.VirtualMachineProperties{
StorageProfile: &compute.StorageProfile{
DataDisks: &dataDisks,
},
}
mockVMsClient := d.getCloud().VirtualMachinesClient.(*mockvmclient.MockInterface)
mockVMsClient.EXPECT().Get(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(vm, nil).AnyTimes()

tests := []struct {
name string
nodeName string
expectedUsedLunList []int
expectedErr error
}{
{
name: "lun 0 and 2 are used",
nodeName: "test-node",
expectedUsedLunList: []int{0, 2},
expectedErr: nil,
},
}
for _, test := range tests {
result, err := d.getUsedLunsFromNode(types.NodeName(test.nodeName))
if !reflect.DeepEqual(err, test.expectedErr) {
t.Errorf("test(%s): err(%v) != expected err(%v)", test.name, err, test.expectedErr)
}
if !reflect.DeepEqual(result, test.expectedUsedLunList) {
t.Errorf("test(%s): result(%v) != expected result(%v)", test.name, result, test.expectedUsedLunList)
}
}
}
4 changes: 2 additions & 2 deletions pkg/azuredisk/azuredisk_v1_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,14 +42,14 @@ func TestCheckDiskCapacity_V1(t *testing.T) {
}
d.getCloud().DisksClient.(*mockdiskclient.MockInterface).EXPECT().Get(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(disk, nil).AnyTimes()

d.setDiskThrottlingCache(consts.ThrottlingKey, "")
d.setThrottlingCache(consts.GetDiskThrottlingKey, "")
flag, _ := d.checkDiskCapacity(context.TODO(), "", resourceGroup, diskName, 11)
assert.Equal(t, flag, true)
}

func TestDriver_checkDiskExists_V1(t *testing.T) {
d, _ := NewFakeDriver(t)
d.setDiskThrottlingCache(consts.ThrottlingKey, "")
d.setThrottlingCache(consts.GetDiskThrottlingKey, "")
_, err := d.checkDiskExists(context.TODO(), "testurl/subscriptions/12/resourceGroups/23/providers/Microsoft.Compute/disks/name")
assert.Equal(t, err, nil)
}
39 changes: 37 additions & 2 deletions pkg/azuredisk/controllerserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ const (
waitForSnapshotReadyInterval = 5 * time.Second
waitForSnapshotReadyTimeout = 10 * time.Minute
maxErrMsgLength = 990
checkDiskLunThrottleLatency = 1 * time.Second
)

// listVolumeStatus explains the return status of `listVolumesByResourceGroup`
Expand Down Expand Up @@ -437,6 +438,8 @@ func (d *Driver) ControllerPublishVolume(ctx context.Context, req *csi.Controlle
if cachingMode, err = azureutils.GetCachingMode(volumeContext); err != nil {
return nil, status.Errorf(codes.Internal, err.Error())
}

occupiedLuns := d.getOccupiedLunsFromNode(ctx, nodeName, diskURI)
klog.V(2).Infof("Trying to attach volume %s to node %s", diskURI, nodeName)

asyncAttach := isAsyncAttachEnabled(d.enableAsyncAttach, volumeContext)
Expand All @@ -445,7 +448,7 @@ func (d *Driver) ControllerPublishVolume(ctx context.Context, req *csi.Controlle
klog.V(2).Infof("attachDiskInitialDelayInMs is set to %d", attachDiskInitialDelay)
d.cloud.AttachDetachInitialDelayInMs = attachDiskInitialDelay
}
lun, err = d.cloud.AttachDisk(ctx, asyncAttach, diskName, diskURI, nodeName, cachingMode, disk, nil)
lun, err = d.cloud.AttachDisk(ctx, asyncAttach, diskName, diskURI, nodeName, cachingMode, disk, occupiedLuns)
if err == nil {
klog.V(2).Infof("Attach operation successful: volume %s attached to node %s.", diskURI, nodeName)
} else {
Expand All @@ -460,7 +463,7 @@ func (d *Driver) ControllerPublishVolume(ctx context.Context, req *csi.Controlle
return nil, status.Errorf(codes.Internal, "Could not detach volume %s from node %s: %v", diskURI, derr.CurrentNode, err)
}
klog.V(2).Infof("Trying to attach volume %s to node %s again", diskURI, nodeName)
lun, err = d.cloud.AttachDisk(ctx, asyncAttach, diskName, diskURI, nodeName, cachingMode, disk, nil)
lun, err = d.cloud.AttachDisk(ctx, asyncAttach, diskName, diskURI, nodeName, cachingMode, disk, occupiedLuns)
}
if err != nil {
klog.Errorf("Attach volume %s to instance %s failed with %v", diskURI, nodeName, err)
Expand Down Expand Up @@ -561,6 +564,38 @@ func (d *Driver) ValidateVolumeCapabilities(ctx context.Context, req *csi.Valida
}}, nil
}

// getOccupiedLunsFromNode returns the occupied luns from node
func (d *Driver) getOccupiedLunsFromNode(ctx context.Context, nodeName types.NodeName, diskURI string) []int {
var occupiedLuns []int
if d.checkDiskLUNCollision && !d.isCheckDiskLunThrottled() {
now := time.Now()
if usedLunsFromVA, err := d.getUsedLunsFromVolumeAttachments(ctx, string(nodeName)); err == nil {
if len(usedLunsFromVA) > 0 {
if usedLunsFromNode, err := d.getUsedLunsFromNode(nodeName); err == nil {
occupiedLuns = volumehelper.GetElementsInArray1NotInArray2(usedLunsFromVA, usedLunsFromNode)
if len(occupiedLuns) > 0 {
klog.Warningf("node: %s, usedLuns from VolumeAttachments: %v, usedLuns from Node: %v, occupiedLuns: %v, disk: %s", nodeName, usedLunsFromVA, usedLunsFromNode, occupiedLuns, diskURI)
} else {
klog.V(6).Infof("node: %s, usedLuns from VolumeAttachments: %v, usedLuns from Node: %v, occupiedLuns: %v, disk: %s", nodeName, usedLunsFromVA, usedLunsFromNode, occupiedLuns, diskURI)
}
} else {
klog.Warningf("getUsedLunsFromNode(%s, %s) failed with %v", nodeName, diskURI, err)
}
}
} else {
klog.Warningf("getUsedLunsFromVolumeAttachments(%s, %s) failed with %v", nodeName, diskURI, err)
}
latency := time.Since(now)
if latency > checkDiskLunThrottleLatency {
klog.Warningf("checkDiskLun(%s) on node %s took %v (limit: %v), disable disk lun check temporarily", diskURI, nodeName, latency, checkDiskLunThrottleLatency)
d.throttlingCache.Set(consts.CheckDiskLunThrottlingKey, "")
} else {
klog.V(6).Infof("checkDiskLun(%s) on node %s took %v", diskURI, nodeName, latency)
}
}
return occupiedLuns
}

// ControllerGetCapabilities returns the capabilities of the Controller plugin
func (d *Driver) ControllerGetCapabilities(_ context.Context, _ *csi.ControllerGetCapabilitiesRequest) (*csi.ControllerGetCapabilitiesResponse, error) {
return &csi.ControllerGetCapabilitiesResponse{
Expand Down
11 changes: 7 additions & 4 deletions pkg/azuredisk/fake_azuredisk.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/container-storage-interface/spec/lib/go/csi"
"github.com/golang/mock/gomock"
"github.com/stretchr/testify/assert"
"k8s.io/apimachinery/pkg/types"
"k8s.io/mount-utils"
testingexec "k8s.io/utils/exec/testing"

Expand Down Expand Up @@ -92,7 +93,9 @@ type FakeDriver interface {
ensureMountPoint(string) (bool, error)
ensureBlockTargetFile(string) error
getDevicePathWithLUN(lunStr string) (string, error)
setDiskThrottlingCache(key string, value string)
setThrottlingCache(key string, value string)
getUsedLunsFromVolumeAttachments(context.Context, string) ([]int, error)
getUsedLunsFromNode(nodeName types.NodeName) ([]int, error)
}

type fakeDriverV1 struct {
Expand Down Expand Up @@ -131,7 +134,7 @@ func newFakeDriverV1(t *testing.T) (*fakeDriverV1, error) {
if err != nil {
return nil, err
}
driver.getDiskThrottlingCache = cache
driver.throttlingCache = cache
driver.deviceHelper = mockoptimization.NewMockInterface(ctrl)

driver.AddControllerServiceCapabilities(
Expand Down Expand Up @@ -163,8 +166,8 @@ func (d *fakeDriverV1) setNextCommandOutputScripts(scripts ...testingexec.FakeAc
d.mounter.Exec.(*mounter.FakeSafeMounter).SetNextCommandOutputScripts(scripts...)
}

func (d *fakeDriverV1) setDiskThrottlingCache(key string, value string) {
d.getDiskThrottlingCache.Set(key, value)
func (d *fakeDriverV1) setThrottlingCache(key string, value string) {
d.throttlingCache.Set(key, value)
}

func createVolumeCapabilities(accessMode csi.VolumeCapability_AccessMode_Mode) []*csi.VolumeCapability {
Expand Down
2 changes: 1 addition & 1 deletion pkg/azuredisk/fake_azuredisk_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,5 +110,5 @@ func (d *fakeDriverV2) setNextCommandOutputScripts(scripts ...testingexec.FakeAc
d.mounter.Exec.(*mounter.FakeSafeMounter).SetNextCommandOutputScripts(scripts...)
}

func (d *DriverV2) setDiskThrottlingCache(key string, value string) {
func (d *DriverV2) setThrottlingCache(key string, value string) {
}
Loading

0 comments on commit 962e6ce

Please sign in to comment.