Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(controller): prevent zfs volume cr deletion if snapshot exists #613

Merged
merged 1 commit into from
Feb 3, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions pkg/builder/volbuilder/build.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@ import (
apis "github.com/openebs/zfs-localpv/pkg/apis/openebs.io/zfs/v1"
)

// MarkForDeletionAnnotation is the annotation key
const MarkForDeletionAnnotation string = "openebs.io/marked-for-deletion"

// Builder is the builder object for ZFSVolume
type Builder struct {
volume *ZFSVolume
Expand Down Expand Up @@ -154,6 +157,15 @@ func (b *Builder) WithVolBlockSize(bs string) *Builder {
return b
}

// WithAnnotation sets the annotation of ZFSVolume
func (b *Builder) WithAnnotation() *Builder {
if b.volume.Object.Annotations == nil {
b.volume.Object.Annotations = make(map[string]string)
}
b.volume.Object.Annotations[MarkForDeletionAnnotation] = "true"
return b
}

// WithVolumeType sets if ZFSVolume needs to be thin provisioned
func (b *Builder) WithVolumeType(vtype string) *Builder {
b.volume.Object.Spec.VolumeType = vtype
Expand Down
103 changes: 92 additions & 11 deletions pkg/driver/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -503,14 +503,14 @@ func (cs *controller) DeleteVolume(
defer unlock()

// verify if the volume has already been deleted
vol, err := zfs.GetVolume(volumeID)
vol, err := zfs.GetZFSVolume(volumeID)
if vol != nil && vol.DeletionTimestamp != nil {
goto deleteResponse
return csipayload.NewDeleteVolumeResponseBuilder().Build(), nil
}

if err != nil {
if k8serror.IsNotFound(err) {
goto deleteResponse
return csipayload.NewDeleteVolumeResponseBuilder().Build(), nil
}
return nil, errors.Wrapf(
err,
Expand All @@ -524,19 +524,45 @@ func (cs *controller) DeleteVolume(
return nil, status.Error(codes.Internal, "can not delete, volume creation is in progress")
}

// Delete the corresponding ZV CR
err = zfs.DeleteVolume(volumeID)
// Fetch the list of snapshot for the given volume
snapList, err := zfs.GetSnapshotForVolume(volumeID)
if err != nil {
return nil, errors.Wrapf(
err,
"failed to handle delete volume request for {%s}",
volumeID,
return nil, status.Errorf(
codes.NotFound,
"failed to handle delete volume request for {%s}, "+
"validation failed checking for snapshots. Error: %s",
req.VolumeId,
err.Error(),
)
}

// Delete the corresponding ZV CR only if there are no snapshots present for the volume
if len(snapList.Items) == 0 {
err = zfs.DeleteVolume(volumeID)
if err != nil {
return nil, errors.Wrapf(
err,
"failed to handle delete volume request for {%s}",
volumeID,
)
}
} else {
// add annotation to the volume to indicate that it is eligible for deletion
// once all the snapshots are deleted and the reclaim policy is not Retain
// this volume will be deleted
err = zfs.MarkForDeletion(volumeID)
if err != nil {
return nil, errors.Wrapf(
err,
"failed to annotate volume on deletion request for {%s}",
volumeID,
)
}

}

sendEventOrIgnore("", volumeID, vol.Spec.Capacity, analytics.VolumeDeprovision)

deleteResponse:
return csipayload.NewDeleteVolumeResponseBuilder().Build(), nil
}

Expand Down Expand Up @@ -815,8 +841,36 @@ func (cs *controller) DeleteSnapshot(
// should succeed when an invalid snapshot id is used
return &csi.DeleteSnapshotResponse{}, nil
}
volumeID := snapshotID[0]
unlock := cs.volumeLock.LockVolumeWithSnapshot(snapshotID[0], snapshotID[1])
defer unlock()

// verify if the snapshot has already been deleted
_, err := zfs.GetZFSSnapshot(snapshotID[1])

if err != nil {
if k8serror.IsNotFound(err) {
return csipayload.NewDeleteSnapshotResponseBuilder().Build(), nil
}
return nil, errors.Wrapf(
err,
"failed to get snapshot for {%s}",
snapshotID[1],
)
}

// Fetch the list of snapshot for the given volume
snapList, err := zfs.GetSnapshotForVolume(volumeID)
if err != nil {
return nil, status.Errorf(
codes.FailedPrecondition,
"failed to handle delete snapshot request for {%s}, "+
"validation failed checking for snapshot list for volume. Error: %s",
volumeID,
err.Error(),
)
}

if err := zfs.DeleteSnapshot(snapshotID[1]); err != nil {
return nil, status.Errorf(
codes.Internal,
Expand All @@ -825,7 +879,34 @@ func (cs *controller) DeleteSnapshot(
err.Error(),
)
}
return &csi.DeleteSnapshotResponse{}, nil

eligibleForDeletion, err := zfs.IsVolumeEligibleForDeletion(volumeID)
if err != nil {
return nil, status.Errorf(
codes.FailedPrecondition,
"failed to handle delete snapshot request for {%s}, "+
"validation failed checking for eligible for deletion. Error: %s",
volumeID,
err.Error(),
)
}

klog.Infof(" The snap list size %v and eligibleForDeletion %v", len(snapList.Items), eligibleForDeletion)
// Delete the corresponding ZV CR only if this is the last snapshot
// for the volume and the corresponding pvc is deleted
if len(snapList.Items) == 1 && eligibleForDeletion {
err = zfs.DeleteVolume(volumeID)
if err != nil {
return nil, errors.Wrapf(
err,
"failed to handle delete volume request for {%s}",
volumeID,
)
}
klog.Infof("volume %s deleted after the deletion of last snapshot %s ", volumeID, snapshotID[1])
}

return csipayload.NewDeleteSnapshotResponseBuilder().Build(), nil
}

// ListSnapshots lists all snapshots for the
Expand Down
20 changes: 20 additions & 0 deletions pkg/response/delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,3 +39,23 @@ func NewDeleteVolumeResponseBuilder() *DeleteVolumeResponseBuilder {
func (b *DeleteVolumeResponseBuilder) Build() *csi.DeleteVolumeResponse {
return b.response
}

// DeleteSnapshotResponseBuilder helps building an
// instance of csi DeleteSnapshotResponse
type DeleteSnapshotResponseBuilder struct {
response *csi.DeleteSnapshotResponse
}

// NewDeleteSnapshotResponseBuilder returns a new
// instance of DeleteSnapshotResponseBuilder
func NewDeleteSnapshotResponseBuilder() *DeleteSnapshotResponseBuilder {
return &DeleteSnapshotResponseBuilder{
response: &csi.DeleteSnapshotResponse{},
}
}

// Build returns the constructed instance
// of csi DeleteSnapshotResponse
func (b *DeleteSnapshotResponseBuilder) Build() *csi.DeleteSnapshotResponse {
return b.response
}
64 changes: 57 additions & 7 deletions pkg/zfs/volume.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ import (
"github.com/openebs/zfs-localpv/pkg/builder/restorebuilder"
"github.com/openebs/zfs-localpv/pkg/builder/snapbuilder"
"github.com/openebs/zfs-localpv/pkg/builder/volbuilder"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/klog/v2"
)
Expand Down Expand Up @@ -213,13 +215,6 @@ func DeleteSnapshot(snapname string) (err error) {
return
}

// GetVolume the corresponding ZFSVolume CR
func GetVolume(volumeID string) (*apis.ZFSVolume, error) {
return volbuilder.NewKubeclient().
WithNamespace(OpenEBSNamespace).
Get(volumeID, metav1.GetOptions{})
}

// DeleteVolume deletes the corresponding ZFSVol CR
func DeleteVolume(volumeID string) (err error) {
err = volbuilder.NewKubeclient().WithNamespace(OpenEBSNamespace).Delete(volumeID)
Expand Down Expand Up @@ -251,6 +246,18 @@ func GetZFSVolume(volumeID string) (*apis.ZFSVolume, error) {
return vol, err
}

// UpdateZFSVolumeAnnotation updtates the ZFSVolume CR with the given annotation
func UpdateZFSVolumeAnnotation(vol *apis.ZFSVolume) error {
newVol, err := volbuilder.BuildFrom(vol).
WithAnnotation().Build()

if err != nil {
return err
}
_, err = volbuilder.NewKubeclient().WithNamespace(OpenEBSNamespace).Update(newVol)
return err
}

// GetZFSVolumeState returns ZFSVolume OwnerNode and State for
// the given volume. CreateVolume request may call it again and
// again until volume is "Ready".
Expand Down Expand Up @@ -441,3 +448,46 @@ func IsVolumeReady(vol *apis.ZFSVolume) bool {

return false
}

// GetSnapshotForVolume fetches all the snapshots for the given volume
func GetSnapshotForVolume(volumeID string) (*apis.ZFSSnapshotList, error) {
listOptions := metav1.ListOptions{
LabelSelector: ZFSVolKey + "=" + volumeID,
}
snapList, err := snapbuilder.NewKubeclient().WithNamespace(OpenEBSNamespace).List(listOptions)
return snapList, err
}

// MarkForDeletion marks the volume for deletion by adding the annotation
func MarkForDeletion(volumeName string) error {
zv, err := GetZFSVolume(volumeName)
if err != nil {
klog.Errorf("failed to get ZV %s: %v", volumeName, err)
return err
}

err = UpdateZFSVolumeAnnotation(zv)
if err != nil {
klog.Errorf("Failed to annotate the ZV with marked for deletion %s: %v", volumeName, err)
return err
}
return nil
}

// IsVolumeEligibleForDeletion checks if the volume can be deleted or not
func IsVolumeEligibleForDeletion(volumeName string) (bool, error) {

zfsVol, err := GetZFSVolume(volumeName)
if err != nil {
return false, status.Errorf(
codes.Internal,
"failed to get ZFSVolume %s: %v",
volumeName,
err,
)
}
if zfsVol.Annotations[volbuilder.MarkForDeletionAnnotation] == "true" {
return true, nil
}
return false, nil
}
Loading
Loading