diff --git a/.rubocop.yml b/.rubocop.yml
index 7a2a9333..22202618 100644
--- a/.rubocop.yml
+++ b/.rubocop.yml
@@ -48,3 +48,5 @@ Style/AccessorGrouping:
   EnforcedStyle: separated
 Style/NegatedIfElseCondition:
   Enabled: false
+Style/Semicolon:
+  Exclude: ["test/**/*.rb"]
diff --git a/README.md b/README.md
index 4be477ad..aa9d6831 100644
--- a/README.md
+++ b/README.md
@@ -812,6 +812,20 @@ to be substituted. Note that for a required parameter that does not provide a ge
 client.process_template template
 ```
 
+### Informer
+
+A cached list that is kept up to date by a watch running in the background.
+A single list+watch can also be shared by multiple threads.
+
+```ruby
+client = Kubeclient::Client.new('http://localhost:8080/api/', 'v1')
+informer = Kubeclient::Informer.new(client, "pods", reconcile_timeout: 15 * 60, logger: Logger.new(STDOUT))
+informer.start_worker
+
+informer.list # all current pods
+informer.watch { |notice| } # watch for changes (hides restarts and errors)
+```
+
 ## Contributing
 
 1. Fork it ( https://github.com/[my-github-username]/kubeclient/fork )
diff --git a/lib/kubeclient.rb b/lib/kubeclient.rb
index 15f3db19..a2c92f40 100644
--- a/lib/kubeclient.rb
+++ b/lib/kubeclient.rb
@@ -9,6 +9,7 @@
 require_relative 'kubeclient/exec_credentials'
 require_relative 'kubeclient/gcp_auth_provider'
 require_relative 'kubeclient/http_error'
+require_relative 'kubeclient/informer'
 require_relative 'kubeclient/missing_kind_compatibility'
 require_relative 'kubeclient/oidc_auth_provider'
 require_relative 'kubeclient/resource'
diff --git a/lib/kubeclient/informer.rb b/lib/kubeclient/informer.rb
new file mode 100644
index 00000000..b94c0b9c
--- /dev/null
+++ b/lib/kubeclient/informer.rb
@@ -0,0 +1,93 @@
+module Kubeclient
+  # caches results for multiple consumers to share and keeps them updated with a watch
+  class Informer
+    def initialize(client, resource_name, reconcile_timeout: 15 * 60, logger: nil)
+      @client = client
+      @resource_name = resource_name
+      @reconcile_timeout = reconcile_timeout
+      @logger = logger
+      @cache = nil
+      @started = nil
+      @watching = []
+    end
+
+    def list
+      @cache.values
+    end
+
+    def watch(&block)
+      with_watching(&block)
+    end
+
+    # not started implicitly so users know they have to call `stop_worker`
+    def start_worker
+      @worker = Thread.new do
+        loop do
+          fill_cache
+          watch_to_update_cache
+        rescue StandardError => e
+          # need to keep retrying since we work in the background
+          @logger&.error("ignoring error during background work #{e}")
+        ensure
+          sleep(1) # do not overwhelm the api-server if we are somehow broken
+        end
+      end
+      sleep(0.01) until @cache
+    end
+
+    def stop_worker
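+      # NOTE: Thread#kill is abrupt; the background thread and any in-flight
+      # watch connection are terminated instead of being shut down cleanly.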
+      @worker&.kill # TODO: be nicer ?
+    end
+
+    private
+
+    def with_watching
+      queue = Queue.new
+      @watching << queue
+      loop do
+        x = queue.pop
+        yield(x)
+      end
+    ensure
+      @watching.delete(queue)
+    end
+
+    def cache_key(resource)
+      resource.dig(:metadata, :uid)
+    end
+
+    def fill_cache
+      reply = @client.get_entities(nil, @resource_name, raw: true, resource_version: '0')
+      @cache = reply[:items].each_with_object({}) do |item, h|
+        h[cache_key(item)] = item
+      end
+      @started = reply.dig(:metadata, :resourceVersion)
+    end
+
+    def watch_to_update_cache
+      watcher = @client.watch_entities(@resource_name, watch: true, resource_version: @started)
+      stop_reason = 'disconnect'
+
+      # stop the watcher after the reconcile timeout, without using a request timeout
+      Thread.new do
+        sleep(@reconcile_timeout)
+        stop_reason = 'reconcile'
+        watcher.finish
+      end
+
+      watcher.each do |notice|
+        case notice[:type]
+        when 'ADDED', 'MODIFIED' then @cache[cache_key(notice[:object])] = notice[:object]
+        when 'DELETED' then @cache.delete(cache_key(notice[:object]))
+        when 'ERROR'
+          stop_reason = 'error'
+          break
+        else
+          @logger&.error("Unsupported event type #{notice[:type]}")
+        end
+        @watching.each { |q| q << notice }
+      end
+      @logger&.info("watch restarted: #{stop_reason}")
+    end
+  end
+end
diff --git a/test/test_informer.rb b/test/test_informer.rb
new file mode 100644
index 00000000..e03f32e0
--- /dev/null
+++ b/test/test_informer.rb
@@ -0,0 +1,163 @@
+# frozen_string_literal: true
+
+require_relative 'test_helper'
+
+# tests Kubeclient::Informer in lib/kubeclient/informer.rb
+class TestInformer < MiniTest::Test
+  def setup
+    super
+    @slept = []
+    stub_core_api_list
+  end
+
+  # prevent leftover threads from causing trouble
+  def teardown
+    (Thread.list - [Thread.current]).each(&:kill)
+    super
+  end
+
+  def test_lists_at_start
+    list = stub_list
+    watch = stub_request(:get, %r{/v1/watch/pods}).to_return(body: '', status: 200)
+    with_worker do
+      assert_equal(['a'], informer.list.map { |p| p.metadata.name })
+    end
+    assert_requested(list, times: 1)
+    assert_requested(watch, times: 1)
+  end
+
+  def test_watches_for_updates
+    lock = Mutex.new
+    lock.lock
+    list = stub_list
+    watch = stub_request(:get, %r{/v1/watch/pods}).with { lock.lock }.to_return(
+      body: {
+        type: 'MODIFIED', object: { metadata: { name: 'b', uid: 'id1' } }
+      }.to_json << "\n",
+      status: 200
+    )
+
+    with_worker do
+      assert_equal(['a'], informer.list.map { |p| p.metadata.name })
+      lock.unlock # trigger watch
+      sleep(0.02) # wait for watch to finish
+      assert_equal(['b'], informer.list.map { |p| p.metadata.name })
+    end
+
+    assert_requested(list, times: 1)
+    assert_requested(watch, times: 1)
+  end
+
+  def test_watches_for_add
+    stub_list
+    stub_request(:get, %r{/v1/watch/pods}).to_return(
+      body: {
+        type: 'ADDED', object: { metadata: { name: 'b', uid: 'id2' } }
+      }.to_json << "\n",
+      status: 200
+    )
+
+    with_worker do
+      assert_equal(['a', 'b'], informer.list.map { |p| p.metadata.name })
+    end
+  end
+
+  def test_watches_for_delete
+    stub_list
+    stub_request(:get, %r{/v1/watch/pods}).to_return(
+      body: {
+        type: 'DELETED', object: { metadata: { name: 'b', uid: 'id1' } }
+      }.to_json << "\n",
+      status: 200
+    )
+
+    with_worker do
+      assert_equal([], informer.list.map { |p| p.metadata.name })
+    end
+  end
+
+  def test_restarts_on_error
+    list = stub_list
+    watch = stub_request(:get, %r{/v1/watch/pods}).to_return(
+      body: { type: 'ERROR' }.to_json << "\n",
+      status: 200
+    )
+    slept = []
+    informer.stubs(:sleep).with { |x| slept << x; sleep(0.01) }
+
+    with_worker do
+      assert_equal(['a'], informer.list.map { |p| p.metadata.name })
+      sleep(0.05)
+    end
+
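+    # the worker kept retrying: it slept between attempts and re-issued both the list and the watch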
+    assert slept.size >= 2, slept
+    assert_requested(list, at_least_times: 2)
+    assert_requested(watch, at_least_times: 2)
+  end
+
+  def test_can_watch_watches
+    skip if RUBY_ENGINE == 'truffleruby' # TODO: some race condition in truffle-ruby
+
+    list = stub_list
+    watch = stub_request(:get, %r{/v1/watch/pods}).to_return(
+      body: {
+        type: 'ADDED', object: { metadata: { name: 'b', uid: 'id2' } }
+      }.to_json << "\n",
+      status: 200
+    )
+
+    seen1 = []
+    seen2 = []
+    seeer1 = Thread.new { informer.watch { |n| seen1 << n; break } }
+    seeer2 = Thread.new { informer.watch { |n| seen2 << n; break } }
+    sleep(0.01) until informer.instance_variable_get(:@watching).size == 2
+
+    with_worker do
+      assert_equal([['ADDED'], ['ADDED']], [seen1.map(&:type), seen2.map(&:type)])
+    end
+
+    assert_requested(list, times: 1)
+    assert_requested(watch, times: 1)
+  ensure
+    seeer1&.kill
+    seeer2&.kill
+  end
+
+  def test_timeout
+    timeout = 0.1
+    informer.instance_variable_set(:@reconcile_timeout, timeout)
+    stub_list
+    Kubeclient::Common::WatchStream.any_instance.expects(:finish)
+    stub_request(:get, %r{/v1/watch/pods})
+    with_worker { sleep(timeout * 1.9) }
+  end
+
+  private
+
+  def with_worker
+    informer.start_worker
+    sleep(0.03) # wait for worker to watch
+    yield
+  ensure
+    informer.stop_worker
+  end
+
+  def stub_list
+    stub_request(:get, %r{/v1/pods}).to_return(body: pods_reply.to_json, status: 200)
+  end
+
+  def client
+    @client ||= Kubeclient::Client.new('http://localhost:8080/api/', 'v1')
+  end
+
+  def informer
+    @informer ||= Kubeclient::Informer.new(client, 'pods')
+  end
+
+  def pods_reply
+    @pods_reply ||= {
+      metadata: { resourceVersion: 1 },
+      items: [{ metadata: { name: 'a', uid: 'id1' } }]
+    }
+  end
+end
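A minimal consumer sketch of the API added above. Only `Kubeclient::Informer.new`, `start_worker`, `list`, `watch`, and `stop_worker` come from this diff; the notice/resource accessors (`notice.type`, `notice.object`, `pod.metadata.name`) follow what the tests exercise, and the cluster URL, thread name, and sleep duration are illustrative:

```ruby
require 'kubeclient'
require 'logger'

client   = Kubeclient::Client.new('http://localhost:8080/api/', 'v1')
informer = Kubeclient::Informer.new(
  client, 'pods',
  reconcile_timeout: 15 * 60, logger: Logger.new(STDOUT)
)

# fills the cache with an initial list, then keeps it updated via a background watch
informer.start_worker

# consumer 1: read the shared, always-current cache
puts informer.list.map { |pod| pod.metadata.name }

# consumer 2: stream change notices; watch restarts and ERROR notices are handled by the worker
streamer = Thread.new do
  informer.watch do |notice|
    puts "#{notice.type} #{notice.object.metadata.name}"
  end
end

sleep(60)     # run for a while (illustrative)
streamer.kill # `watch` loops forever, so stop the consumer thread explicitly
informer.stop_worker
```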