Skip to content

Commit

Permalink
feat(*): add graceful shutdown (apache#1470)
Browse files Browse the repository at this point in the history
* feat(*): add graceful shutdown

* feat(*): merge destroyProviderProtocols and destroyConsumerProtocols

Co-authored-by: dongjianhui03 <[email protected]>
  • Loading branch information
Mulavar and dongjianhui03 authored Sep 24, 2021
1 parent 526ea1a commit 0697950
Show file tree
Hide file tree
Showing 4 changed files with 124 additions and 157 deletions.
39 changes: 20 additions & 19 deletions common/constant/key.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,25 +65,26 @@ const (

// Filter Keys
const (
AccessLogFilterKey = "accesslog"
ActiveFilterKey = "active"
AuthConsumerFilterKey = "sign"
AuthProviderFilterKey = "auth"
EchoFilterKey = "echo"
ExecuteLimitFilterKey = "execute"
GenericFilterKey = "generic"
GenericServiceFilterKey = "generic_service"
GracefulShutdownProviderFilterKey = "pshutdown"
GracefulShutdownConsumerFilterKey = "cshutdown"
HystrixConsumerFilterKey = "hystrix_consumer"
HystrixProviderFilterKey = "hystrix_provider"
MetricsFilterKey = "metrics"
SeataFilterKey = "seata"
SentinelProviderFilterKey = "sentinel-provider"
SentinelConsumerFilterKey = "sentinel-consumer"
TokenFilterKey = "token"
TpsLimitFilterKey = "tps"
TracingFilterKey = "tracing"
AccessLogFilterKey = "accesslog"
ActiveFilterKey = "active"
AuthConsumerFilterKey = "sign"
AuthProviderFilterKey = "auth"
EchoFilterKey = "echo"
ExecuteLimitFilterKey = "execute"
GenericFilterKey = "generic"
GenericServiceFilterKey = "generic_service"
GracefulShutdownProviderFilterKey = "pshutdown"
GracefulShutdownConsumerFilterKey = "cshutdown"
GracefulShutdownFilterShutdownConfig = "GracefulShutdownFilterShutdownConfig"
HystrixConsumerFilterKey = "hystrix_consumer"
HystrixProviderFilterKey = "hystrix_provider"
MetricsFilterKey = "metrics"
SeataFilterKey = "seata"
SentinelProviderFilterKey = "sentinel-provider"
SentinelConsumerFilterKey = "sentinel-consumer"
TokenFilterKey = "token"
TpsLimitFilterKey = "tps"
TracingFilterKey = "tracing"
)

const (
Expand Down
4 changes: 0 additions & 4 deletions config/config_setter.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,6 @@

package config

const (
GracefulShutdownFilterShutdownConfig = "GracefulShutdownFilterShutdownConfig"
)

type Setter interface {
Set(name string, config interface{})
}
234 changes: 102 additions & 132 deletions config/graceful_shutdown.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@
package config

import (
"os"
"os/signal"
"runtime/debug"
"time"
)

Expand Down Expand Up @@ -51,61 +54,49 @@ import (
const defaultShutDownTime = time.Second * 60

// nolint
//func GracefulShutdownInit() {
// signals := make(chan os.Signal, 1)
//
// signal.Notify(signals, ShutdownSignals...)
//
// // retrieve ShutdownConfig for gracefulShutdownFilter
// if filter, ok := extension.GetFilter(constant.GracefulShutdownConsumerFilterKey).(config.Setter); ok && config.GetConsumerConfig().ShutdownConfig != nil {
// filter.Set(config.GracefulShutdownFilterShutdownConfig, config.GetConsumerConfig().ShutdownConfig)
// }
// if filter, ok := extension.GetFilter(constant.GracefulShutdownProviderFilterKey).(config.Setter); ok && config.GetProviderConfig().ShutdownConfig != nil {
// filter.Set(config.GracefulShutdownFilterShutdownConfig, config.GetProviderConfig().ShutdownConfig)
// }
//
// go func() {
// select {
// case sig := <-signals:
// logger.Infof("get signal %s, applicationConfig will shutdown.", sig)
// // gracefulShutdownOnce.Do(func() {
// time.AfterFunc(totalTimeout(), func() {
// logger.Warn("Shutdown gracefully timeout, applicationConfig will shutdown immediately. ")
// os.Exit(0)
// })
// BeforeShutdown()
// // those signals' original behavior is exit with dump ths stack, so we try to keep the behavior
// for _, dumpSignal := range DumpHeapShutdownSignals {
// if sig == dumpSignal {
// debug.WriteHeapDump(os.Stdout.Fd())
// }
// }
// os.Exit(0)
// }
// }()
//}
func GracefulShutdownInit() {
signals := make(chan os.Signal, 1)

signal.Notify(signals, ShutdownSignals...)

// retrieve ShutdownConfig for gracefulShutdownFilter
if filter, ok := extension.GetFilter(constant.GracefulShutdownConsumerFilterKey).(Setter); ok && rootConfig.Shutdown != nil {
filter.Set(constant.GracefulShutdownFilterShutdownConfig, rootConfig.Shutdown)
}

go func() {
select {
case sig := <-signals:
logger.Infof("get signal %s, applicationConfig will shutdown.", sig)
// gracefulShutdownOnce.Do(func() {
time.AfterFunc(totalTimeout(), func() {
logger.Warn("Shutdown gracefully timeout, applicationConfig will shutdown immediately. ")
os.Exit(0)
})
BeforeShutdown()
// those signals' original behavior is exit with dump ths stack, so we try to keep the behavior
for _, dumpSignal := range DumpHeapShutdownSignals {
if sig == dumpSignal {
debug.WriteHeapDump(os.Stdout.Fd())
}
}
os.Exit(0)
}
}()
}

// BeforeShutdown provides processing flow before shutdown
func BeforeShutdown() {
destroyAllRegistries()
// waiting for a short time so that the clients have enough time to get the notification that server shutdowns
// The value of configuration depends on how long the clients will get notification.
//waitAndAcceptNewRequests()

// reject the new request, but keeping waiting for accepting requests
//waitForReceivingRequests()

// we fetch the protocols from Consumer.References. Consumer.ProtocolConfig doesn't contains all protocol, like jsonrpc
//consumerProtocols := getConsumerProtocols()

// If this applicationConfig is not the provider, it will do nothing
//destroyProviderProtocols(consumerProtocols)
waitAndAcceptNewRequests()

// reject sending the new request, and waiting for response of sending requests
//waitForSendingRequests()
// reject sending/receiving the new request, but keeping waiting for accepting requests
waitForSendingAndReceivingRequests()

// If this applicationConfig is not the consumer, it will do nothing
//destroyConsumerProtocols(consumerProtocols)
// destroy all protocols
destroyProtocols()

logger.Info("Graceful shutdown --- Execute the custom callbacks.")
customCallbacks := extension.GetAllCustomShutdownCallbacks()
Expand All @@ -120,68 +111,56 @@ func destroyAllRegistries() {
registryProtocol.Destroy()
}

func destroyConsumerProtocols(consumerProtocols *gxset.HashSet) {
logger.Info("Graceful shutdown --- Destroy consumer's protocols. ")
// destroyProtocols destroys protocols.
// First we destroy provider's protocols, and then we destroy the consumer protocols.
func destroyProtocols() {
logger.Info("Graceful shutdown --- Destroy protocols. ")
logger.Info("Graceful shutdown --- First destroy provider's protocols. ")

consumerProtocols := getConsumerProtocols()
if rootConfig.Protocols == nil {
return
}

for _, protocol := range rootConfig.Protocols {
// the protocol is the consumer's protocol too, we can not destroy it.
if consumerProtocols.Contains(protocol.Name) {
continue
}
extension.GetProtocol(protocol.Name).Destroy()
}

logger.Info("Graceful shutdown --- Second destroy consumer's protocols. ")
for name := range consumerProtocols.Items {
extension.GetProtocol(name.(string)).Destroy()
}
}

// destroyProviderProtocols destroys the provider's protocol.
// if the protocol is consumer's protocol too, we will keep it
//func destroyProviderProtocols(consumerProtocols *gxset.HashSet) {
// logger.Info("Graceful shutdown --- Destroy provider's protocols. ")
//
// if config.providerConfig == nil || config.providerConfig.Protocols == nil {
// return
// }
//
// for _, protocol := range config.providerConfig.Protocols {
//
// // the protocol is the consumer's protocol too, we can not destroy it.
// if consumerProtocols.Contains(protocol.Name) {
// continue
// }
// extension.GetProtocol(protocol.Name).Destroy()
// }
//}

//func waitAndAcceptNewRequests() {
// logger.Info("Graceful shutdown --- Keep waiting and accept new requests for a short time. ")
// if config.providerConfig == nil || config.providerConfig.ShutdownConfig == nil {
// return
// }
//
// timeout := config.providerConfig.ShutdownConfig.GetStepTimeout()
//
// // ignore this step
// if timeout < 0 {
// return
// }
// time.Sleep(timeout)
//}

// for provider. It will wait for processing receiving requests
//func waitForReceivingRequests() {
// logger.Info("Graceful shutdown --- Keep waiting until accepting requests finish or timeout. ")
// if config.providerConfig == nil || config.providerConfig.ShutdownConfig == nil {
// // ignore this step
// return
// }
// config.providerConfig.ShutdownConfig.RejectRequest = true
// waitingProcessedTimeout(config.providerConfig.ShutdownConfig)
//}

// for consumer. It will wait for the response of sending requests
//func waitForSendingRequests() {
// logger.Info("Graceful shutdown --- Keep waiting until sending requests getting response or timeout ")
// if config.consumerConfig == nil || config.consumerConfig.ShutdownConfig == nil {
// // ignore this step
// return
// }
// config.consumerConfig.ShutdownConfig.RejectRequest = true
// waitingProcessedTimeout(config.consumerConfig.ShutdownConfig)
//}
func waitAndAcceptNewRequests() {
logger.Info("Graceful shutdown --- Keep waiting and accept new requests for a short time. ")
if rootConfig.Shutdown == nil {
return
}

timeout := rootConfig.Shutdown.GetStepTimeout()

// ignore this step
if timeout < 0 {
return
}
time.Sleep(timeout)
}

//for provider. It will wait for processing receiving requests
func waitForSendingAndReceivingRequests() {
logger.Info("Graceful shutdown --- Keep waiting until sending/accepting requests finish or timeout. ")
if rootConfig == nil || rootConfig.Shutdown == nil {
// ignore this step
return
}
rootConfig.Shutdown.RejectRequest = true
waitingProcessedTimeout(rootConfig.Shutdown)
}

func waitingProcessedTimeout(shutdownConfig *ShutdownConfig) {
timeout := shutdownConfig.GetStepTimeout()
Expand All @@ -196,33 +175,24 @@ func waitingProcessedTimeout(shutdownConfig *ShutdownConfig) {
}
}

//func totalTimeout() time.Duration {
// providerShutdown := defaultShutDownTime
// if config.providerConfig != nil && config.providerConfig.ShutdownConfig != nil {
// providerShutdown = config.providerConfig.ShutdownConfig.GetTimeout()
// }
//
// var consumerShutdown time.Duration
// if config.consumerConfig != nil && config.consumerConfig.ShutdownConfig != nil {
// consumerShutdown = config.consumerConfig.ShutdownConfig.GetTimeout()
// }
//
// timeout := providerShutdown
// if consumerShutdown > providerShutdown {
// timeout = consumerShutdown
// }
// return timeout
//}
func totalTimeout() time.Duration {
timeout := defaultShutDownTime
if rootConfig.Shutdown != nil && rootConfig.Shutdown.GetTimeout() > timeout {
timeout = rootConfig.Shutdown.GetTimeout()
}

return timeout
}

// we can not get the protocols from consumerConfig because some protocol don't have configuration, like jsonrpc.
//func getConsumerProtocols() *gxset.HashSet {
// result := gxset.NewSet()
// if config.consumerConfig == nil || config.consumerConfig.References == nil {
// return result
// }
//
// for _, reference := range config.consumerConfig.References {
// result.Add(reference.Protocol)
// }
// return result
//}
func getConsumerProtocols() *gxset.HashSet {
result := gxset.NewSet()
if rootConfig.Consumer == nil || rootConfig.Consumer.References == nil {
return result
}

for _, reference := range rootConfig.Consumer.References {
result.Add(reference.Protocol)
}
return result
}
4 changes: 2 additions & 2 deletions filter/gshutdown/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,12 +68,12 @@ func (f *Filter) OnResponse(ctx context.Context, result protocol.Result, invoker

func (f *Filter) Set(name string, conf interface{}) {
switch name {
case config.GracefulShutdownFilterShutdownConfig:
case constant.GracefulShutdownFilterShutdownConfig:
if shutdownConfig, ok := conf.(*config.ShutdownConfig); !ok {
f.shutdownConfig = shutdownConfig
return
}
logger.Warnf("the type of config for {%s} should be *config.ShutdownConfig", config.GracefulShutdownFilterShutdownConfig)
logger.Warnf("the type of config for {%s} should be *config.ShutdownConfig", constant.GracefulShutdownFilterShutdownConfig)
default:
// do nothing
}
Expand Down

0 comments on commit 0697950

Please sign in to comment.