diff --git a/lib/fluent/plugin/kubernetes_metadata_watch_namespaces.rb b/lib/fluent/plugin/kubernetes_metadata_watch_namespaces.rb index 5959392..c4b24a3 100644 --- a/lib/fluent/plugin/kubernetes_metadata_watch_namespaces.rb +++ b/lib/fluent/plugin/kubernetes_metadata_watch_namespaces.rb @@ -29,7 +29,6 @@ def set_up_namespace_thread # the configuration. namespace_watcher = start_namespace_watch Thread.current[:namespace_watch_retry_backoff_interval] = @watch_retry_interval - Thread.current[:namespace_watch_retry_count] = 0 # Any failures / exceptions in the followup watcher notice # processing will be swallowed and retried. These failures / @@ -96,11 +95,19 @@ def get_namespaces_and_start_watcher watcher end + # Reset namespace watch retry count and backoff interval as there is a + # successful watch notice. + def reset_namespace_watch_retry_stats + Thread.current[:namespace_watch_retry_count] = 0 + Thread.current[:namespace_watch_retry_backoff_interval] = @watch_retry_interval + end + # Process a watcher notice and potentially raise an exception. def process_namespace_watcher_notices(watcher) watcher.each do |notice| case notice.type when 'MODIFIED' + reset_namespace_watch_retry_stats cache_key = notice.object['metadata']['uid'] cached = @namespace_cache[cache_key] if cached @@ -110,13 +117,16 @@ def process_namespace_watcher_notices(watcher) @stats.bump(:namespace_cache_watch_misses) end when 'DELETED' + reset_namespace_watch_retry_stats # ignore and let age out for cases where # deleted but still processing logs @stats.bump(:namespace_cache_watch_deletes_ignored) when 'ERROR' + @stats.bump(:namespace_watch_error_type_notices) message = notice['object']['message'] if notice['object'] && notice['object']['message'] raise "Error while watching namespaces: #{message}" else + reset_namespace_watch_retry_stats # Don't pay attention to creations, since the created namespace may not # be used by any namespace on this node. @stats.bump(:namespace_cache_watch_ignored) diff --git a/lib/fluent/plugin/kubernetes_metadata_watch_pods.rb b/lib/fluent/plugin/kubernetes_metadata_watch_pods.rb index 0767484..f9dd64a 100644 --- a/lib/fluent/plugin/kubernetes_metadata_watch_pods.rb +++ b/lib/fluent/plugin/kubernetes_metadata_watch_pods.rb @@ -98,12 +98,19 @@ def get_pods_and_start_watcher watcher end + # Reset pod watch retry count and backoff interval as there is a + # successful watch notice. + def reset_pod_watch_retry_stats + Thread.current[:pod_watch_retry_count] = 0 + Thread.current[:pod_watch_retry_backoff_interval] = @watch_retry_interval + end + # Process a watcher notice and potentially raise an exception. def process_pod_watcher_notices(watcher) watcher.each do |notice| case notice.type when 'MODIFIED' - Thread.current[:pod_watch_retry_count] = 0 + reset_pod_watch_retry_stats cache_key = notice.object['metadata']['uid'] cached = @cache[cache_key] if cached @@ -116,15 +123,16 @@ def process_pod_watcher_notices(watcher) @stats.bump(:pod_cache_watch_misses) end when 'DELETED' - Thread.current[:pod_watch_retry_count] = 0 + reset_pod_watch_retry_stats # ignore and let age out for cases where pods # deleted but still processing logs @stats.bump(:pod_cache_watch_delete_ignored) when 'ERROR' + @stats.bump(:pod_watch_error_type_notices) message = notice['object']['message'] if notice['object'] && notice['object']['message'] raise "Error while watching pods: #{message}" else - Thread.current[:pod_watch_retry_count] = 0 + reset_pod_watch_retry_stats # Don't pay attention to creations, since the created pod may not # end up on this node. @stats.bump(:pod_cache_watch_ignored) diff --git a/test/plugin/test_watch_namespaces.rb b/test/plugin/test_watch_namespaces.rb index 67521f5..70c1f0d 100644 --- a/test/plugin/test_watch_namespaces.rb +++ b/test/plugin/test_watch_namespaces.rb @@ -70,6 +70,12 @@ class WatchNamespacesTestTest < WatchTest } } ) + @error = OpenStruct.new( + type: 'ERROR', + object: { + 'message' => 'some error message' + } + ) end test 'namespace list caches namespaces' do @@ -137,6 +143,39 @@ class WatchNamespacesTestTest < WatchTest assert_equal(3, @stats[:namespace_watch_failures]) assert_equal(2, Thread.current[:namespace_watch_retry_count]) assert_equal(4, Thread.current[:namespace_watch_retry_backoff_interval]) + assert_nil(@stats[:namespace_watch_error_type_notices]) + end + end + end + + test 'namespace watch retries when error is received' do + @client.stub :get_namespaces, @initial do + @client.stub :watch_namespaces, [@error] do + assert_raise Fluent::UnrecoverableError do + set_up_namespace_thread + end + assert_equal(3, @stats[:namespace_watch_failures]) + assert_equal(2, Thread.current[:namespace_watch_retry_count]) + assert_equal(4, Thread.current[:namespace_watch_retry_backoff_interval]) + assert_equal(3, @stats[:namespace_watch_error_type_notices]) + end + end + end + + test 'namespace watch continues after retries succeed' do + @client.stub :get_namespaces, @initial do + @client.stub :watch_namespaces, [@modified, @error, @modified] do + # Force the infinite watch loop to exit after 3 seconds. Verifies that + # no unrecoverable error was thrown during this period of time. + assert_raise Timeout::Error.new('execution expired') do + Timeout.timeout(3) do + set_up_namespace_thread + end + end + assert_operator(@stats[:namespace_watch_failures], :>=, 3) + assert_operator(Thread.current[:namespace_watch_retry_count], :<=, 1) + assert_operator(Thread.current[:namespace_watch_retry_backoff_interval], :<=, 1) + assert_operator(@stats[:namespace_watch_error_type_notices], :>=, 3) end end end diff --git a/test/plugin/test_watch_pods.rb b/test/plugin/test_watch_pods.rb index 3d1fccf..e375e1b 100644 --- a/test/plugin/test_watch_pods.rb +++ b/test/plugin/test_watch_pods.rb @@ -136,6 +136,12 @@ class DefaultPodWatchStrategyTest < WatchTest } } ) + @error = OpenStruct.new( + type: 'ERROR', + object: { + 'message' => 'some error message' + } + ) end test 'pod list caches pods' do @@ -219,6 +225,39 @@ class DefaultPodWatchStrategyTest < WatchTest assert_equal(3, @stats[:pod_watch_failures]) assert_equal(2, Thread.current[:pod_watch_retry_count]) assert_equal(4, Thread.current[:pod_watch_retry_backoff_interval]) + assert_nil(@stats[:pod_watch_error_type_notices]) + end + end + end + + test 'pod watch retries when error is received' do + @client.stub :get_pods, @initial do + @client.stub :watch_pods, [@error] do + assert_raise Fluent::UnrecoverableError do + set_up_pod_thread + end + assert_equal(3, @stats[:pod_watch_failures]) + assert_equal(2, Thread.current[:pod_watch_retry_count]) + assert_equal(4, Thread.current[:pod_watch_retry_backoff_interval]) + assert_equal(3, @stats[:pod_watch_error_type_notices]) + end + end + end + + test 'pod watch continues after retries succeed' do + @client.stub :get_pods, @initial do + @client.stub :watch_pods, [@modified, @error, @modified] do + # Force the infinite watch loop to exit after 3 seconds. Verifies that + # no unrecoverable error was thrown during this period of time. + assert_raise Timeout::Error.new('execution expired') do + Timeout.timeout(3) do + set_up_pod_thread + end + end + assert_operator(@stats[:pod_watch_failures], :>=, 3) + assert_operator(Thread.current[:pod_watch_retry_count], :<=, 1) + assert_operator(Thread.current[:pod_watch_retry_backoff_interval], :<=, 1) + assert_operator(@stats[:pod_watch_error_type_notices], :>=, 3) end end end diff --git a/test/plugin/watch_test.rb b/test/plugin/watch_test.rb index 58eaf5a..53a36bb 100644 --- a/test/plugin/watch_test.rb +++ b/test/plugin/watch_test.rb @@ -33,6 +33,8 @@ def thread_current_running? @watch_retry_exponential_backoff_base = 2 @cache = {} @stats = KubernetesMetadata::Stats.new + Thread.current[:pod_watch_retry_count] = 0 + Thread.current[:namespace_watch_retry_count] = 0 @client = OpenStruct.new def @client.resourceVersion