Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 11 additions & 1 deletion lib/fluent/plugin/kubernetes_metadata_watch_namespaces.rb
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ def set_up_namespace_thread
# the configuration.
namespace_watcher = start_namespace_watch
Thread.current[:namespace_watch_retry_backoff_interval] = @watch_retry_interval
Thread.current[:namespace_watch_retry_count] = 0

# Any failures / exceptions in the followup watcher notice
# processing will be swallowed and retried. These failures /
Expand Down Expand Up @@ -96,11 +95,19 @@ def get_namespaces_and_start_watcher
watcher
end

# Reset namespace watch retry count and backoff interval as there is a
# successful watch notice.
def reset_namespace_watch_retry_stats
Thread.current[:namespace_watch_retry_count] = 0
Thread.current[:namespace_watch_retry_backoff_interval] = @watch_retry_interval
end

# Process a watcher notice and potentially raise an exception.
def process_namespace_watcher_notices(watcher)
watcher.each do |notice|
case notice.type
when 'MODIFIED'
reset_namespace_watch_retry_stats
cache_key = notice.object['metadata']['uid']
cached = @namespace_cache[cache_key]
if cached
Expand All @@ -110,13 +117,16 @@ def process_namespace_watcher_notices(watcher)
@stats.bump(:namespace_cache_watch_misses)
end
when 'DELETED'
reset_namespace_watch_retry_stats
# ignore and let age out for cases where
# deleted but still processing logs
@stats.bump(:namespace_cache_watch_deletes_ignored)
when 'ERROR'
@stats.bump(:namespace_watch_error_type_notices)
message = notice['object']['message'] if notice['object'] && notice['object']['message']
raise "Error while watching namespaces: #{message}"
else
reset_namespace_watch_retry_stats
# Don't pay attention to creations, since the created namespace may not
# be used by any namespace on this node.
@stats.bump(:namespace_cache_watch_ignored)
Expand Down
14 changes: 11 additions & 3 deletions lib/fluent/plugin/kubernetes_metadata_watch_pods.rb
Original file line number Diff line number Diff line change
Expand Up @@ -98,12 +98,19 @@ def get_pods_and_start_watcher
watcher
end

# Reset pod watch retry count and backoff interval as there is a
# successful watch notice.
def reset_pod_watch_retry_stats
Thread.current[:pod_watch_retry_count] = 0
Thread.current[:pod_watch_retry_backoff_interval] = @watch_retry_interval
end

# Process a watcher notice and potentially raise an exception.
def process_pod_watcher_notices(watcher)
watcher.each do |notice|
case notice.type
when 'MODIFIED'
Thread.current[:pod_watch_retry_count] = 0
reset_pod_watch_retry_stats
cache_key = notice.object['metadata']['uid']
cached = @cache[cache_key]
if cached
Expand All @@ -116,15 +123,16 @@ def process_pod_watcher_notices(watcher)
@stats.bump(:pod_cache_watch_misses)
end
when 'DELETED'
Thread.current[:pod_watch_retry_count] = 0
reset_pod_watch_retry_stats
# ignore and let age out for cases where pods
# deleted but still processing logs
@stats.bump(:pod_cache_watch_delete_ignored)
when 'ERROR'
@stats.bump(:pod_watch_error_type_notices)
message = notice['object']['message'] if notice['object'] && notice['object']['message']
raise "Error while watching pods: #{message}"
else
Thread.current[:pod_watch_retry_count] = 0
reset_pod_watch_retry_stats
# Don't pay attention to creations, since the created pod may not
# end up on this node.
@stats.bump(:pod_cache_watch_ignored)
Expand Down
39 changes: 39 additions & 0 deletions test/plugin/test_watch_namespaces.rb
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,12 @@ class WatchNamespacesTestTest < WatchTest
}
}
)
@error = OpenStruct.new(
type: 'ERROR',
object: {
'message' => 'some error message'
}
)
end

test 'namespace list caches namespaces' do
Expand Down Expand Up @@ -137,6 +143,39 @@ class WatchNamespacesTestTest < WatchTest
assert_equal(3, @stats[:namespace_watch_failures])
assert_equal(2, Thread.current[:namespace_watch_retry_count])
assert_equal(4, Thread.current[:namespace_watch_retry_backoff_interval])
assert_nil(@stats[:namespace_watch_error_type_notices])
end
end
end

test 'namespace watch retries when error is received' do
@client.stub :get_namespaces, @initial do
@client.stub :watch_namespaces, [@error] do
assert_raise Fluent::UnrecoverableError do
set_up_namespace_thread
end
assert_equal(3, @stats[:namespace_watch_failures])
assert_equal(2, Thread.current[:namespace_watch_retry_count])
assert_equal(4, Thread.current[:namespace_watch_retry_backoff_interval])
assert_equal(3, @stats[:namespace_watch_error_type_notices])
end
end
end

test 'namespace watch continues after retries succeed' do
@client.stub :get_namespaces, @initial do
@client.stub :watch_namespaces, [@modified, @error, @modified] do
# Force the infinite watch loop to exit after 3 seconds. Verifies that
# no unrecoverable error was thrown during this period of time.
assert_raise Timeout::Error.new('execution expired') do
Timeout.timeout(3) do
set_up_namespace_thread
end
end
assert_operator(@stats[:namespace_watch_failures], :>=, 3)
assert_operator(Thread.current[:namespace_watch_retry_count], :<=, 1)
assert_operator(Thread.current[:namespace_watch_retry_backoff_interval], :<=, 1)
assert_operator(@stats[:namespace_watch_error_type_notices], :>=, 3)
end
end
end
Expand Down
39 changes: 39 additions & 0 deletions test/plugin/test_watch_pods.rb
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,12 @@ class DefaultPodWatchStrategyTest < WatchTest
}
}
)
@error = OpenStruct.new(
type: 'ERROR',
object: {
'message' => 'some error message'
}
)
end

test 'pod list caches pods' do
Expand Down Expand Up @@ -219,6 +225,39 @@ class DefaultPodWatchStrategyTest < WatchTest
assert_equal(3, @stats[:pod_watch_failures])
assert_equal(2, Thread.current[:pod_watch_retry_count])
assert_equal(4, Thread.current[:pod_watch_retry_backoff_interval])
assert_nil(@stats[:pod_watch_error_type_notices])
end
end
end

test 'pod watch retries when error is received' do
@client.stub :get_pods, @initial do
@client.stub :watch_pods, [@error] do
assert_raise Fluent::UnrecoverableError do
set_up_pod_thread
end
assert_equal(3, @stats[:pod_watch_failures])
assert_equal(2, Thread.current[:pod_watch_retry_count])
assert_equal(4, Thread.current[:pod_watch_retry_backoff_interval])
assert_equal(3, @stats[:pod_watch_error_type_notices])
end
end
end

test 'pod watch continues after retries succeed' do
@client.stub :get_pods, @initial do
@client.stub :watch_pods, [@modified, @error, @modified] do
# Force the infinite watch loop to exit after 3 seconds. Verifies that
# no unrecoverable error was thrown during this period of time.
assert_raise Timeout::Error.new('execution expired') do
Timeout.timeout(3) do
set_up_pod_thread
end
end
assert_operator(@stats[:pod_watch_failures], :>=, 3)
assert_operator(Thread.current[:pod_watch_retry_count], :<=, 1)
assert_operator(Thread.current[:pod_watch_retry_backoff_interval], :<=, 1)
assert_operator(@stats[:pod_watch_error_type_notices], :>=, 3)
end
end
end
Expand Down
2 changes: 2 additions & 0 deletions test/plugin/watch_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ def thread_current_running?
@watch_retry_exponential_backoff_base = 2
@cache = {}
@stats = KubernetesMetadata::Stats.new
Thread.current[:pod_watch_retry_count] = 0
Thread.current[:namespace_watch_retry_count] = 0

@client = OpenStruct.new
def @client.resourceVersion
Expand Down