Skip to content

Commit 389d076

Browse files
committed
Add feature gate for restarting external-snapshotter faster by releasing the leader election lease on SIGTERM
1 parent 7935bd4 commit 389d076

File tree

5 files changed

+183
-40
lines changed

5 files changed

+183
-40
lines changed

cmd/csi-snapshotter/main.go

Lines changed: 55 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -24,13 +24,15 @@ import (
2424
"os"
2525
"os/signal"
2626
"strings"
27+
"sync"
2728
"time"
2829

2930
"google.golang.org/grpc"
3031

3132
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
3233
"k8s.io/apimachinery/pkg/labels"
3334
"k8s.io/apimachinery/pkg/runtime"
35+
server "k8s.io/apiserver/pkg/server"
3436
utilfeature "k8s.io/apiserver/pkg/util/feature"
3537
coreinformers "k8s.io/client-go/informers"
3638
"k8s.io/client-go/kubernetes"
@@ -301,19 +303,55 @@ func main() {
301303
workqueue.NewTypedItemExponentialFailureRateLimiter[string](*retryIntervalStart, *retryIntervalMax),
302304
)
303305

304-
run := func(context.Context) {
305-
// run...
306-
stopCh := make(chan struct{})
307-
snapshotContentfactory.Start(stopCh)
308-
factory.Start(stopCh)
309-
coreFactory.Start(stopCh)
310-
go ctrl.Run(*threads, stopCh)
311-
312-
// ...until SIGINT
313-
c := make(chan os.Signal, 1)
314-
signal.Notify(c, os.Interrupt)
315-
<-c
316-
close(stopCh)
306+
// handle SIGTERM and SIGINT by cancelling the context.
307+
var (
308+
terminate func() // called when all controllers are finished
309+
controllerCtx context.Context // shuts down all controllers on a signal
310+
shutdownHandler <-chan struct{} // called when the signal is received
311+
)
312+
313+
if utilfeature.DefaultFeatureGate.Enabled(features.ReleaseLeaderElectionOnExit) {
314+
// ctx is cancelled via terminate() once all controllers have finished; cancelling it shuts down the whole process, incl. leader election
315+
ctx, terminate = context.WithCancel(ctx)
316+
var cancelControllerCtx context.CancelFunc
317+
controllerCtx, cancelControllerCtx = context.WithCancel(ctx)
318+
shutdownHandler = server.SetupSignalHandler()
319+
320+
defer terminate()
321+
322+
go func() {
323+
defer cancelControllerCtx()
324+
<-shutdownHandler
325+
klog.Info("Received SIGTERM or SIGINT signal, shutting down controller.")
326+
}()
327+
}
328+
329+
run := func(ctx context.Context) {
330+
if utilfeature.DefaultFeatureGate.Enabled(features.ReleaseLeaderElectionOnExit) {
331+
// run...
332+
stopCh := controllerCtx.Done()
333+
snapshotContentfactory.Start(stopCh)
334+
factory.Start(stopCh)
335+
coreFactory.Start(stopCh)
336+
var controllerWg sync.WaitGroup
337+
go ctrl.Run(*threads, stopCh, &controllerWg)
338+
<-shutdownHandler
339+
controllerWg.Wait()
340+
terminate()
341+
} else {
342+
// run...
343+
stopCh := make(chan struct{})
344+
snapshotContentfactory.Start(stopCh)
345+
factory.Start(stopCh)
346+
coreFactory.Start(stopCh)
347+
go ctrl.Run(*threads, stopCh, nil)
348+
349+
// ...until SIGINT
350+
c := make(chan os.Signal, 1)
351+
signal.Notify(c, os.Interrupt)
352+
<-c
353+
close(stopCh)
354+
}
317355
}
318356

319357
if !*leaderElection {
@@ -338,6 +376,10 @@ func main() {
338376
le.WithLeaseDuration(*leaderElectionLeaseDuration)
339377
le.WithRenewDeadline(*leaderElectionRenewDeadline)
340378
le.WithRetryPeriod(*leaderElectionRetryPeriod)
379+
if utilfeature.DefaultFeatureGate.Enabled(features.ReleaseLeaderElectionOnExit) {
380+
le.WithReleaseOnCancel(true)
381+
le.WithContext(ctx)
382+
}
341383

342384
if err := le.Run(); err != nil {
343385
klog.Fatalf("failed to initialize leader election: %v", err)

cmd/snapshot-controller/main.go

Lines changed: 57 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ import (
3939
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
4040
"k8s.io/apimachinery/pkg/runtime"
4141
"k8s.io/apimachinery/pkg/util/wait"
42+
server "k8s.io/apiserver/pkg/server"
4243

4344
klog "k8s.io/klog/v2"
4445

@@ -259,18 +260,55 @@ func main() {
259260
os.Exit(1)
260261
}
261262

262-
run := func(context.Context) {
263-
// run...
264-
stopCh := make(chan struct{})
265-
factory.Start(stopCh)
266-
coreFactory.Start(stopCh)
267-
go ctrl.Run(*threads, stopCh)
268-
269-
// ...until SIGINT
270-
c := make(chan os.Signal, 1)
271-
signal.Notify(c, os.Interrupt)
272-
<-c
273-
close(stopCh)
263+
ctx := context.Background()
264+
265+
// handle SIGTERM and SIGINT by cancelling the context.
266+
var (
267+
terminate func() // called when all controllers are finished
268+
controllerCtx context.Context // shuts down all controllers on a signal
269+
shutdownHandler <-chan struct{} // called when the signal is received
270+
)
271+
272+
if utilfeature.DefaultFeatureGate.Enabled(features.ReleaseLeaderElectionOnExit) {
273+
// ctx is cancelled via terminate() once all controllers have finished; cancelling it shuts down the whole process, incl. leader election
274+
ctx, terminate = context.WithCancel(ctx)
275+
var cancelControllerCtx context.CancelFunc
276+
controllerCtx, cancelControllerCtx = context.WithCancel(ctx)
277+
shutdownHandler = server.SetupSignalHandler()
278+
279+
defer terminate()
280+
281+
go func() {
282+
defer cancelControllerCtx()
283+
<-shutdownHandler
284+
klog.Info("Received SIGTERM or SIGINT signal, shutting down controller.")
285+
}()
286+
}
287+
288+
run := func(ctx context.Context) {
289+
if utilfeature.DefaultFeatureGate.Enabled(features.ReleaseLeaderElectionOnExit) {
290+
// run...
291+
stopCh := controllerCtx.Done()
292+
factory.Start(stopCh)
293+
coreFactory.Start(stopCh)
294+
var controllerWg sync.WaitGroup
295+
go ctrl.Run(*threads, stopCh, &controllerWg)
296+
<-shutdownHandler
297+
controllerWg.Wait()
298+
terminate()
299+
} else {
300+
// run...
301+
stopCh := make(chan struct{})
302+
factory.Start(stopCh)
303+
coreFactory.Start(stopCh)
304+
go ctrl.Run(*threads, stopCh, nil)
305+
306+
// ...until SIGINT
307+
c := make(chan os.Signal, 1)
308+
signal.Notify(c, os.Interrupt)
309+
<-c
310+
close(stopCh)
311+
}
274312
}
275313

276314
// start listening & serving http endpoint if set
@@ -289,7 +327,7 @@ func main() {
289327
klog.Infof("Metrics http server successfully started on %s, %s", *httpEndpoint, *metricsPath)
290328

291329
defer func() {
292-
err := srv.Shutdown(context.Background())
330+
err := srv.Shutdown(ctx)
293331
if err != nil {
294332
klog.Errorf("Failed to shutdown metrics server: %s", err.Error())
295333
}
@@ -300,7 +338,7 @@ func main() {
300338
}
301339

302340
if !*leaderElection {
303-
run(context.TODO())
341+
run(ctx)
304342
} else {
305343
lockName := "snapshot-controller-leader"
306344
// Create a new clientset for leader election to prevent throttling
@@ -320,6 +358,11 @@ func main() {
320358
le.WithLeaseDuration(*leaderElectionLeaseDuration)
321359
le.WithRenewDeadline(*leaderElectionRenewDeadline)
322360
le.WithRetryPeriod(*leaderElectionRetryPeriod)
361+
if utilfeature.DefaultFeatureGate.Enabled(features.ReleaseLeaderElectionOnExit) {
362+
le.WithReleaseOnCancel(true)
363+
le.WithContext(ctx)
364+
}
365+
323366
if err := le.Run(); err != nil {
324367
klog.Fatalf("failed to initialize leader election: %v", err)
325368
}

pkg/common-controller/snapshot_controller_base.go

Lines changed: 38 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ package common_controller
1919
import (
2020
"context"
2121
"fmt"
22+
"sync"
2223
"time"
2324

2425
crdv1beta2 "github.com/kubernetes-csi/external-snapshotter/client/v8/apis/volumegroupsnapshot/v1beta2"
@@ -28,13 +29,15 @@ import (
2829
snapshotinformers "github.com/kubernetes-csi/external-snapshotter/client/v8/informers/externalversions/volumesnapshot/v1"
2930
groupsnapshotlisters "github.com/kubernetes-csi/external-snapshotter/client/v8/listers/volumegroupsnapshot/v1beta2"
3031
snapshotlisters "github.com/kubernetes-csi/external-snapshotter/client/v8/listers/volumesnapshot/v1"
32+
"github.com/kubernetes-csi/external-snapshotter/v8/pkg/features"
3133
"github.com/kubernetes-csi/external-snapshotter/v8/pkg/metrics"
3234
"github.com/kubernetes-csi/external-snapshotter/v8/pkg/utils"
3335

3436
v1 "k8s.io/api/core/v1"
3537
"k8s.io/apimachinery/pkg/api/errors"
3638
"k8s.io/apimachinery/pkg/labels"
3739
"k8s.io/apimachinery/pkg/util/wait"
40+
utilfeature "k8s.io/apiserver/pkg/util/feature"
3841
coreinformers "k8s.io/client-go/informers/core/v1"
3942
"k8s.io/client-go/kubernetes"
4043
"k8s.io/client-go/kubernetes/scheme"
@@ -244,7 +247,7 @@ func NewCSISnapshotCommonController(
244247
return ctrl
245248
}
246249

247-
func (ctrl *csiSnapshotCommonController) Run(workers int, stopCh <-chan struct{}) {
250+
func (ctrl *csiSnapshotCommonController) Run(workers int, stopCh <-chan struct{}, wg *sync.WaitGroup) {
248251
defer ctrl.snapshotQueue.ShutDown()
249252
defer ctrl.contentQueue.ShutDown()
250253
if ctrl.enableVolumeGroupSnapshots {
@@ -276,12 +279,40 @@ func (ctrl *csiSnapshotCommonController) Run(workers int, stopCh <-chan struct{}
276279

277280
ctrl.initializeCaches()
278281

279-
for i := 0; i < workers; i++ {
280-
go wait.Until(ctrl.snapshotWorker, 0, stopCh)
281-
go wait.Until(ctrl.contentWorker, 0, stopCh)
282-
if ctrl.enableVolumeGroupSnapshots {
283-
go wait.Until(ctrl.groupSnapshotWorker, 0, stopCh)
284-
go wait.Until(ctrl.groupSnapshotContentWorker, 0, stopCh)
282+
if utilfeature.DefaultFeatureGate.Enabled(features.ReleaseLeaderElectionOnExit) {
283+
for i := 0; i < workers; i++ {
284+
wg.Add(1)
285+
go func() {
286+
defer wg.Done()
287+
wait.Until(ctrl.snapshotWorker, 0, stopCh)
288+
}()
289+
wg.Add(1)
290+
go func() {
291+
defer wg.Done()
292+
wait.Until(ctrl.contentWorker, 0, stopCh)
293+
}()
294+
295+
if ctrl.enableVolumeGroupSnapshots {
296+
wg.Add(1)
297+
go func() {
298+
defer wg.Done()
299+
wait.Until(ctrl.groupSnapshotWorker, 0, stopCh)
300+
}()
301+
wg.Add(1)
302+
go func() {
303+
defer wg.Done()
304+
wait.Until(ctrl.groupSnapshotContentWorker, 0, stopCh)
305+
}()
306+
}
307+
}
308+
} else {
309+
for i := 0; i < workers; i++ {
310+
go wait.Until(ctrl.snapshotWorker, 0, stopCh)
311+
go wait.Until(ctrl.contentWorker, 0, stopCh)
312+
if ctrl.enableVolumeGroupSnapshots {
313+
go wait.Until(ctrl.groupSnapshotWorker, 0, stopCh)
314+
go wait.Until(ctrl.groupSnapshotContentWorker, 0, stopCh)
315+
}
285316
}
286317
}
287318

pkg/features/features.go

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,12 @@ import (
2424
const (
2525
// Enable usage of volume group snapshot
2626
VolumeGroupSnapshot featuregate.Feature = "CSIVolumeGroupSnapshot"
27+
28+
// owner: @rhrmo
29+
// alpha: v1.34
30+
//
31+
// Releases the leader election lease on SIGTERM / SIGINT.
32+
ReleaseLeaderElectionOnExit featuregate.Feature = "ReleaseLeaderElectionOnExit"
2733
)
2834

2935
func init() {
@@ -33,5 +39,6 @@ func init() {
3339
// defaultKubernetesFeatureGates consists of all known feature keys specific to external-snapshotter.
3440
// To add a new feature, define a key for it above and add it here.
3541
var defaultKubernetesFeatureGates = map[featuregate.Feature]featuregate.FeatureSpec{
36-
VolumeGroupSnapshot: {Default: false, PreRelease: featuregate.Beta},
42+
VolumeGroupSnapshot: {Default: false, PreRelease: featuregate.Beta},
43+
ReleaseLeaderElectionOnExit: {Default: false, PreRelease: featuregate.Alpha},
3744
}

pkg/sidecar-controller/snapshot_controller_base.go

Lines changed: 25 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -18,14 +18,17 @@ package sidecar_controller
1818

1919
import (
2020
"fmt"
21+
"sync"
2122
"time"
2223

24+
"github.com/kubernetes-csi/external-snapshotter/v8/pkg/features"
2325
"github.com/kubernetes-csi/external-snapshotter/v8/pkg/group_snapshotter"
2426

2527
v1 "k8s.io/api/core/v1"
2628
"k8s.io/apimachinery/pkg/api/errors"
2729
"k8s.io/apimachinery/pkg/labels"
2830
"k8s.io/apimachinery/pkg/util/wait"
31+
utilfeature "k8s.io/apiserver/pkg/util/feature"
2932
"k8s.io/client-go/kubernetes"
3033
"k8s.io/client-go/kubernetes/scheme"
3134
corev1 "k8s.io/client-go/kubernetes/typed/core/v1"
@@ -167,7 +170,7 @@ func NewCSISnapshotSideCarController(
167170
return ctrl
168171
}
169172

170-
func (ctrl *csiSnapshotSideCarController) Run(workers int, stopCh <-chan struct{}) {
173+
func (ctrl *csiSnapshotSideCarController) Run(workers int, stopCh <-chan struct{}, wg *sync.WaitGroup) {
171174
defer ctrl.contentQueue.ShutDown()
172175

173176
klog.Infof("Starting CSI snapshotter")
@@ -185,10 +188,27 @@ func (ctrl *csiSnapshotSideCarController) Run(workers int, stopCh <-chan struct{
185188

186189
ctrl.initializeCaches()
187190

188-
for i := 0; i < workers; i++ {
189-
go wait.Until(ctrl.contentWorker, 0, stopCh)
190-
if ctrl.enableVolumeGroupSnapshots {
191-
go wait.Until(ctrl.groupSnapshotContentWorker, 0, stopCh)
191+
if utilfeature.DefaultFeatureGate.Enabled(features.ReleaseLeaderElectionOnExit) {
192+
for i := 0; i < workers; i++ {
193+
wg.Add(1)
194+
go func() {
195+
defer wg.Done()
196+
wait.Until(ctrl.contentWorker, 0, stopCh)
197+
}()
198+
if ctrl.enableVolumeGroupSnapshots {
199+
wg.Add(1)
200+
go func() {
201+
defer wg.Done()
202+
wait.Until(ctrl.groupSnapshotContentWorker, 0, stopCh)
203+
}()
204+
}
205+
}
206+
} else {
207+
for i := 0; i < workers; i++ {
208+
go wait.Until(ctrl.contentWorker, 0, stopCh)
209+
if ctrl.enableVolumeGroupSnapshots {
210+
go wait.Until(ctrl.groupSnapshotContentWorker, 0, stopCh)
211+
}
192212
}
193213
}
194214

0 commit comments

Comments
 (0)