diff --git a/app/models/manageiq/providers/kubernetes/container_manager/streaming_refresh_mixin.rb b/app/models/manageiq/providers/kubernetes/container_manager/streaming_refresh_mixin.rb index b4b559a084..a4652378de 100644 --- a/app/models/manageiq/providers/kubernetes/container_manager/streaming_refresh_mixin.rb +++ b/app/models/manageiq/providers/kubernetes/container_manager/streaming_refresh_mixin.rb @@ -38,6 +38,7 @@ def do_work_streaming_refresh full_refresh start_watch_threads else + ensure_watch_threads targeted_refresh end end @@ -88,12 +89,22 @@ def start_watch_threads _log.info("#{log_header} Starting watch threads...") entity_types.each do |entity_type| - watch_threads[entity_type] = Thread.new { watch_thread(entity_type) } + watch_threads[entity_type] = start_watch_thread(entity_type) end _log.info("#{log_header} Starting watch threads...Complete") end + def ensure_watch_threads + entity_types.each do |entity_type| + next if watch_threads[entity_type].alive? + + _log.info("#{log_header} Restarting #{entity_type} watch thread") + + watch_threads[entity_type] = start_watch_thread(entity_type) + end + end + def stop_watch_threads safe_log("#{log_header} Stopping watch threads...") @@ -103,14 +114,26 @@ def stop_watch_threads safe_log("#{log_header} Stopping watch threads...Complete") end + def start_watch_thread(entity_type) + Thread.new { watch_thread(entity_type) } + end + def watch_thread(entity_type) _log.info("#{log_header} #{entity_type} watch thread started") resource_version = resource_versions[entity_type] || "0" watch_stream = start_watch(entity_type, resource_version) - until finish.value - watch_stream.each { |notice| queue.push(notice) } + until finished? + watch_stream.each do |notice| + # Update the collection resourceVersion to be the most recent + # object's resourceVersion so that if this watch has to be restarted + # it will pick up where it left off. + resource_version = notice.object.metadata.resourceVersion + resource_versions[entity_type] = resource_version + + queue.push(notice) + end end _log.info("#{log_header} #{entity_type} watch thread exiting") @@ -123,6 +146,10 @@ def start_watch(entity_type, resource_version = "0") connection_for_entity(entity_type).send(watch_method, :resource_version => resource_version) end + def finished? + finish.value + end + def connection_for_entity(_entity_type) kubernetes_connection end