Skip to content

Commit

Permalink
fix(controller): prevent zfs volume cr deletion if snapshot exists
Browse files Browse the repository at this point in the history
Signed-off-by: sinhaashish <[email protected]>
  • Loading branch information
sinhaashish committed Feb 3, 2025
1 parent da8b7ad commit 0964543
Show file tree
Hide file tree
Showing 11 changed files with 754 additions and 113 deletions.
12 changes: 12 additions & 0 deletions pkg/builder/volbuilder/build.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@ import (
apis "github.com/openebs/zfs-localpv/pkg/apis/openebs.io/zfs/v1"
)

// MarkForDeletionAnnotation is the annotation key
const MarkForDeletionAnnotation string = "openebs.io/marked-for-deletion"

// Builder is the builder object for ZFSVolume
type Builder struct {
volume *ZFSVolume
Expand Down Expand Up @@ -154,6 +157,15 @@ func (b *Builder) WithVolBlockSize(bs string) *Builder {
return b
}

// WithAnnotation sets the annotation of ZFSVolume
func (b *Builder) WithAnnotation() *Builder {
if b.volume.Object.Annotations == nil {
b.volume.Object.Annotations = make(map[string]string)
}
b.volume.Object.Annotations[MarkForDeletionAnnotation] = "true"
return b
}

// WithVolumeType sets if ZFSVolume needs to be thin provisioned
func (b *Builder) WithVolumeType(vtype string) *Builder {
b.volume.Object.Spec.VolumeType = vtype
Expand Down
103 changes: 92 additions & 11 deletions pkg/driver/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -503,14 +503,14 @@ func (cs *controller) DeleteVolume(
defer unlock()

// verify if the volume has already been deleted
vol, err := zfs.GetVolume(volumeID)
vol, err := zfs.GetZFSVolume(volumeID)
if vol != nil && vol.DeletionTimestamp != nil {
goto deleteResponse
return csipayload.NewDeleteVolumeResponseBuilder().Build(), nil
}

if err != nil {
if k8serror.IsNotFound(err) {
goto deleteResponse
return csipayload.NewDeleteVolumeResponseBuilder().Build(), nil
}
return nil, errors.Wrapf(
err,
Expand All @@ -524,19 +524,45 @@ func (cs *controller) DeleteVolume(
return nil, status.Error(codes.Internal, "can not delete, volume creation is in progress")
}

// Delete the corresponding ZV CR
err = zfs.DeleteVolume(volumeID)
// Fetch the list of snapshot for the given volume
snapList, err := zfs.GetSnapshotForVolume(volumeID)
if err != nil {
return nil, errors.Wrapf(
err,
"failed to handle delete volume request for {%s}",
volumeID,
return nil, status.Errorf(
codes.NotFound,
"failed to handle delete volume request for {%s}, "+
"validation failed checking for snapshots. Error: %s",
req.VolumeId,
err.Error(),
)
}

// Delete the corresponding ZV CR only if there are no snapshots present for the volume
if len(snapList.Items) == 0 {
err = zfs.DeleteVolume(volumeID)
if err != nil {
return nil, errors.Wrapf(
err,
"failed to handle delete volume request for {%s}",
volumeID,
)
}
} else {
// add annotation to the volume to indicate that it is eligible for deletion
// once all the snapshots are deleted and the reclaim policy is not Retain
// this volume will be deleted
err = zfs.MarkForDeletion(volumeID)
if err != nil {
return nil, errors.Wrapf(
err,
"failed to annotate volume on deletion request for {%s}",
volumeID,
)
}

}

sendEventOrIgnore("", volumeID, vol.Spec.Capacity, analytics.VolumeDeprovision)

deleteResponse:
return csipayload.NewDeleteVolumeResponseBuilder().Build(), nil
}

Expand Down Expand Up @@ -815,8 +841,36 @@ func (cs *controller) DeleteSnapshot(
// should succeed when an invalid snapshot id is used
return &csi.DeleteSnapshotResponse{}, nil
}
volumeID := snapshotID[0]
unlock := cs.volumeLock.LockVolumeWithSnapshot(snapshotID[0], snapshotID[1])
defer unlock()

// verify if the snapshot has already been deleted
_, err := zfs.GetZFSSnapshot(snapshotID[1])

if err != nil {
if k8serror.IsNotFound(err) {
return csipayload.NewDeleteSnapshotResponseBuilder().Build(), nil
}
return nil, errors.Wrapf(
err,
"failed to get snapshot for {%s}",
snapshotID[1],
)
}

// Fetch the list of snapshot for the given volume
snapList, err := zfs.GetSnapshotForVolume(volumeID)
if err != nil {
return nil, status.Errorf(
codes.FailedPrecondition,
"failed to handle delete snapshot request for {%s}, "+
"validation failed checking for snapshot list for volume. Error: %s",
volumeID,
err.Error(),
)
}

if err := zfs.DeleteSnapshot(snapshotID[1]); err != nil {
return nil, status.Errorf(
codes.Internal,
Expand All @@ -825,7 +879,34 @@ func (cs *controller) DeleteSnapshot(
err.Error(),
)
}
return &csi.DeleteSnapshotResponse{}, nil

eligibleForDeletion, err := zfs.IsVolumeEligibleForDeletion(volumeID)
if err != nil {
return nil, status.Errorf(
codes.FailedPrecondition,
"failed to handle delete snapshot request for {%s}, "+
"validation failed checking for eligible for deletion. Error: %s",
volumeID,
err.Error(),
)
}

klog.Infof(" The snap list size %v and eligibleForDeletion %v", len(snapList.Items), eligibleForDeletion)
// Delete the corresponding ZV CR only if this is the last snapshot
// for the volume and the corresponding pvc is deleted
if len(snapList.Items) == 1 && eligibleForDeletion {
err = zfs.DeleteVolume(volumeID)
if err != nil {
return nil, errors.Wrapf(
err,
"failed to handle delete volume request for {%s}",
volumeID,
)
}
klog.Infof("volume %s deleted after the deletion of last snapshot %s ", volumeID, snapshotID[1])
}

return csipayload.NewDeleteSnapshotResponseBuilder().Build(), nil
}

// ListSnapshots lists all snapshots for the
Expand Down
20 changes: 20 additions & 0 deletions pkg/response/delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,3 +39,23 @@ func NewDeleteVolumeResponseBuilder() *DeleteVolumeResponseBuilder {
func (b *DeleteVolumeResponseBuilder) Build() *csi.DeleteVolumeResponse {
return b.response
}

// DeleteSnapshotResponseBuilder helps building an
// instance of csi DeleteSnapshotResponse
type DeleteSnapshotResponseBuilder struct {
response *csi.DeleteSnapshotResponse
}

// NewDeleteSnapshotResponseBuilder returns a new
// instance of DeleteSnapshotResponseBuilder
func NewDeleteSnapshotResponseBuilder() *DeleteSnapshotResponseBuilder {
return &DeleteSnapshotResponseBuilder{
response: &csi.DeleteSnapshotResponse{},
}
}

// Build returns the constructed instance
// of csi DeleteSnapshotResponse
func (b *DeleteSnapshotResponseBuilder) Build() *csi.DeleteSnapshotResponse {
return b.response
}
64 changes: 57 additions & 7 deletions pkg/zfs/volume.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ import (
"github.com/openebs/zfs-localpv/pkg/builder/restorebuilder"
"github.com/openebs/zfs-localpv/pkg/builder/snapbuilder"
"github.com/openebs/zfs-localpv/pkg/builder/volbuilder"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/klog/v2"
)
Expand Down Expand Up @@ -213,13 +215,6 @@ func DeleteSnapshot(snapname string) (err error) {
return
}

// GetVolume the corresponding ZFSVolume CR
func GetVolume(volumeID string) (*apis.ZFSVolume, error) {
return volbuilder.NewKubeclient().
WithNamespace(OpenEBSNamespace).
Get(volumeID, metav1.GetOptions{})
}

// DeleteVolume deletes the corresponding ZFSVol CR
func DeleteVolume(volumeID string) (err error) {
err = volbuilder.NewKubeclient().WithNamespace(OpenEBSNamespace).Delete(volumeID)
Expand Down Expand Up @@ -251,6 +246,18 @@ func GetZFSVolume(volumeID string) (*apis.ZFSVolume, error) {
return vol, err
}

// UpdateZFSVolumeAnnotation updtates the ZFSVolume CR with the given annotation
func UpdateZFSVolumeAnnotation(vol *apis.ZFSVolume) error {
newVol, err := volbuilder.BuildFrom(vol).
WithAnnotation().Build()

if err != nil {
return err
}
_, err = volbuilder.NewKubeclient().WithNamespace(OpenEBSNamespace).Update(newVol)
return err
}

// GetZFSVolumeState returns ZFSVolume OwnerNode and State for
// the given volume. CreateVolume request may call it again and
// again until volume is "Ready".
Expand Down Expand Up @@ -441,3 +448,46 @@ func IsVolumeReady(vol *apis.ZFSVolume) bool {

return false
}

// GetSnapshotForVolume fetches all the snapshots for the given volume
func GetSnapshotForVolume(volumeID string) (*apis.ZFSSnapshotList, error) {
listOptions := metav1.ListOptions{
LabelSelector: ZFSVolKey + "=" + volumeID,
}
snapList, err := snapbuilder.NewKubeclient().WithNamespace(OpenEBSNamespace).List(listOptions)
return snapList, err
}

// MarkForDeletion marks the volume for deletion by adding the annotation
func MarkForDeletion(volumeName string) error {
zv, err := GetZFSVolume(volumeName)
if err != nil {
klog.Errorf("failed to get ZV %s: %v", volumeName, err)
return err
}

err = UpdateZFSVolumeAnnotation(zv)
if err != nil {
klog.Errorf("Failed to annotate the ZV with marked for deletion %s: %v", volumeName, err)
return err
}
return nil
}

// IsVolumeEligibleForDeletion checks if the volume can be deleted or not
func IsVolumeEligibleForDeletion(volumeName string) (bool, error) {

zfsVol, err := GetZFSVolume(volumeName)
if err != nil {
return false, status.Errorf(
codes.Internal,
"failed to get ZFSVolume %s: %v",
volumeName,
err,
)
}
if zfsVol.Annotations[volbuilder.MarkForDeletionAnnotation] == "true" {
return true, nil
}
return false, nil
}
Loading

0 comments on commit 0964543

Please sign in to comment.