-
Notifications
You must be signed in to change notification settings - Fork 276
Add new env var for sqsMsgVisibilityTimeoutSec #1220
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 2 commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -117,6 +117,9 @@ const ( | |
| // heartbeat | ||
| heartbeatIntervalKey = "HEARTBEAT_INTERVAL" | ||
| heartbeatUntilKey = "HEARTBEAT_UNTIL" | ||
| // sqs monitor | ||
| sqsMsgVisibilityTimeoutSecConfigKey = "SQS_MSG_VISIBILITY_TIMEOUT_SEC" | ||
| sqsMsgVisibilityTimeoutSecDefault = 20 | ||
| ) | ||
|
|
||
| // Config arguments set via CLI, environment variables, or defaults | ||
|
|
@@ -174,6 +177,7 @@ type Config struct { | |
| UseAPIServerCacheToListPods bool | ||
| HeartbeatInterval int | ||
| HeartbeatUntil int | ||
| SqsMsgVisibilityTimeoutSec int | ||
| } | ||
|
|
||
| // ParseCliArgs parses cli arguments and uses environment variables as fallback values | ||
|
|
@@ -241,6 +245,7 @@ func ParseCliArgs() (config Config, err error) { | |
| flag.BoolVar(&config.UseAPIServerCacheToListPods, "use-apiserver-cache", getBoolEnv(useAPIServerCache, false), "If true, leverage the k8s apiserver's index on pod's spec.nodeName to list pods on a node, instead of doing an etcd quorum read.") | ||
| flag.IntVar(&config.HeartbeatInterval, "heartbeat-interval", getIntEnv(heartbeatIntervalKey, -1), "The time period in seconds between consecutive heartbeat signals. Valid range: 30-3600 seconds (30 seconds to 1 hour).") | ||
| flag.IntVar(&config.HeartbeatUntil, "heartbeat-until", getIntEnv(heartbeatUntilKey, -1), "The duration in seconds over which heartbeat signals are sent. Valid range: 60-172800 seconds (1 minute to 48 hours).") | ||
| flag.IntVar(&config.SqsMsgVisibilityTimeoutSec, "sqs-msg-visibility-timeout-sec", getIntEnv(sqsMsgVisibilityTimeoutSecConfigKey, sqsMsgVisibilityTimeoutSecDefault), "Duration in seconds that a message is hidden from other consumers after being retrieved from the SQS queue by sqs-monitor.") | ||
|
||
| flag.Parse() | ||
|
|
||
| if isConfigProvided("pod-termination-grace-period", podTerminationGracePeriodConfigKey) && isConfigProvided("grace-period", gracePeriodConfigKey) { | ||
|
|
@@ -367,6 +372,7 @@ func (c Config) PrintJsonConfigArgs() { | |
| Bool("use_apiserver_cache", c.UseAPIServerCacheToListPods). | ||
| Int("heartbeat_interval", c.HeartbeatInterval). | ||
| Int("heartbeat_until", c.HeartbeatUntil). | ||
| Int("sqs_msg_visibility_timeout_sec", c.SqsMsgVisibilityTimeoutSec). | ||
| Msg("aws-node-termination-handler arguments") | ||
| } | ||
|
|
||
|
|
@@ -421,7 +427,8 @@ func (c Config) PrintHumanConfigArgs() { | |
| "\taws-endpoint: %s,\n"+ | ||
| "\tuse-apiserver-cache: %t,\n"+ | ||
| "\theartbeat-interval: %d,\n"+ | ||
| "\theartbeat-until: %d\n", | ||
| "\theartbeat-until: %d\n"+ | ||
| "\tsqs-msg-visibility-timeout-sec: %d\n", | ||
| c.DryRun, | ||
| c.NodeName, | ||
| c.PodName, | ||
|
|
@@ -465,6 +472,7 @@ func (c Config) PrintHumanConfigArgs() { | |
| c.UseAPIServerCacheToListPods, | ||
| c.HeartbeatInterval, | ||
| c.HeartbeatUntil, | ||
| c.SqsMsgVisibilityTimeoutSec, | ||
| ) | ||
| } | ||
|
|
||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -54,6 +54,7 @@ type SQSMonitor struct { | |
| CheckIfManaged bool | ||
| ManagedTag string | ||
| BeforeCompleteLifecycleAction func() | ||
| SqsMsgVisibilityTimeoutSec int | ||
| } | ||
|
|
||
| // InterruptionEventWrapper is a convenience wrapper for associating an interruption event with its error, if any | ||
|
|
@@ -294,6 +295,11 @@ func (m SQSMonitor) processInterruptionEvents(interruptionEventWrappers []Interr | |
|
|
||
| // receiveQueueMessages checks the configured SQS queue for new messages | ||
| func (m SQSMonitor) receiveQueueMessages(qURL string) ([]*sqs.Message, error) { | ||
| visibilityTimeout := m.SqsMsgVisibilityTimeoutSec | ||
| if visibilityTimeout <= 0 { | ||
| visibilityTimeout = 20 | ||
|
||
| } | ||
|
|
||
| result, err := m.SQS.ReceiveMessage(&sqs.ReceiveMessageInput{ | ||
| AttributeNames: []*string{ | ||
| aws.String(sqs.MessageSystemAttributeNameSentTimestamp), | ||
|
|
@@ -303,7 +309,7 @@ func (m SQSMonitor) receiveQueueMessages(qURL string) ([]*sqs.Message, error) { | |
| }, | ||
| QueueUrl: &qURL, | ||
| MaxNumberOfMessages: aws.Int64(10), | ||
| VisibilityTimeout: aws.Int64(20), // 20 seconds | ||
| VisibilityTimeout: aws.Int64(int64(visibilityTimeout)), | ||
| WaitTimeSeconds: aws.Int64(20), // Max long polling | ||
| }) | ||
|
|
||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
update comment here as well after including the range
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed.