From b64d8b0cd3b3c668b41681cd93deb96d57a1ea36 Mon Sep 17 00:00:00 2001 From: Tertsonen Toni Date: Thu, 29 Jun 2023 15:32:48 +0300 Subject: [PATCH 1/3] Feat: add pagination on-behalf-of: @insta-advance toni.tertsonen@insta.fi --- .gitignore | 2 +- docs/index.asciidoc | 125 ++++++-- lib/logstash/inputs/http_poller.rb | 217 +++++++++++-- .../inputs/http_poller/state_handler.rb | 138 ++++++++ logstash-input-http_poller.gemspec | 1 + spec/inputs/http_poller_spec.rb | 299 ++++++++++++++++++ 6 files changed, 730 insertions(+), 52 deletions(-) create mode 100644 lib/logstash/inputs/http_poller/state_handler.rb diff --git a/.gitignore b/.gitignore index 543ae90..9670395 100644 --- a/.gitignore +++ b/.gitignore @@ -4,4 +4,4 @@ Gemfile.lock vendor *~ .idea -*.log \ No newline at end of file +*.log diff --git a/docs/index.asciidoc b/docs/index.asciidoc index f78ff8c..9151454 100644 --- a/docs/index.asciidoc +++ b/docs/index.asciidoc @@ -163,7 +163,7 @@ input plugins.   [id="plugins-{type}s-{plugin}-automatic_retries"] -===== `automatic_retries` +===== `automatic_retries` * Value type is <> * Default value is `1` @@ -173,7 +173,7 @@ to zero if keepalive is enabled. Some servers incorrectly end keepalives early r Note: if `retry_non_idempotent` is set only GET, HEAD, PUT, DELETE, OPTIONS, and TRACE requests will be retried. [id="plugins-{type}s-{plugin}-cacert"] -===== `cacert` +===== `cacert` * Value type is <> * There is no default value for this setting. @@ -181,7 +181,7 @@ Note: if `retry_non_idempotent` is set only GET, HEAD, PUT, DELETE, OPTIONS, and If you need to use a custom X.509 CA (.pem certs) specify the path to that here [id="plugins-{type}s-{plugin}-client_cert"] -===== `client_cert` +===== `client_cert` * Value type is <> * There is no default value for this setting. 
@@ -189,7 +189,7 @@ If you need to use a custom X.509 CA (.pem certs) specify the path to that here If you'd like to use a client certificate (note, most people don't want this) set the path to the x509 cert here [id="plugins-{type}s-{plugin}-client_key"] -===== `client_key` +===== `client_key` * Value type is <> * There is no default value for this setting. @@ -197,7 +197,7 @@ If you'd like to use a client certificate (note, most people don't want this) se If you're using a client certificate specify the path to the encryption key here [id="plugins-{type}s-{plugin}-connect_timeout"] -===== `connect_timeout` +===== `connect_timeout` * Value type is <> * Default value is `10` @@ -205,7 +205,7 @@ If you're using a client certificate specify the path to the encryption key here Timeout (in seconds) to wait for a connection to be established. Default is `10s` [id="plugins-{type}s-{plugin}-cookies"] -===== `cookies` +===== `cookies` * Value type is <> * Default value is `true` @@ -289,7 +289,7 @@ Example output: ---- [id="plugins-{type}s-{plugin}-follow_redirects"] -===== `follow_redirects` +===== `follow_redirects` * Value type is <> * Default value is `true` @@ -297,7 +297,7 @@ Example output: Should redirects be followed? Defaults to `true` [id="plugins-{type}s-{plugin}-keepalive"] -===== `keepalive` +===== `keepalive` * Value type is <> * Default value is `true` @@ -306,7 +306,7 @@ Turn this on to enable HTTP keepalive support. We highly recommend setting `auto one with this to fix interactions with broken keepalive implementations. [id="plugins-{type}s-{plugin}-keystore"] -===== `keystore` +===== `keystore` * Value type is <> * There is no default value for this setting. @@ -314,7 +314,7 @@ one with this to fix interactions with broken keepalive implementations. If you need to use a custom keystore (`.jks`) specify that here. This does not work with .pem keys! 
[id="plugins-{type}s-{plugin}-keystore_password"] -===== `keystore_password` +===== `keystore_password` * Value type is <> * There is no default value for this setting. @@ -323,7 +323,7 @@ Specify the keystore password here. Note, most .jks files created with keytool require a password! [id="plugins-{type}s-{plugin}-keystore_type"] -===== `keystore_type` +===== `keystore_type` * Value type is <> * Default value is `"JKS"` @@ -331,7 +331,7 @@ Note, most .jks files created with keytool require a password! Specify the keystore type here. One of `JKS` or `PKCS12`. Default is `JKS` [id="plugins-{type}s-{plugin}-metadata_target"] -===== `metadata_target` +===== `metadata_target` * Value type is <> * Default value is `"@metadata"` @@ -341,7 +341,7 @@ Set this value to the name of the field you'd like to store a nested hash of metadata. [id="plugins-{type}s-{plugin}-password"] -===== `password` +===== `password` * Value type is <> * There is no default value for this setting. @@ -349,7 +349,7 @@ hash of metadata. Password to be used in conjunction with <> for HTTP authentication. [id="plugins-{type}s-{plugin}-pool_max"] -===== `pool_max` +===== `pool_max` * Value type is <> * Default value is `50` @@ -357,7 +357,7 @@ Password to be used in conjunction with <> for HT Max number of concurrent connections. Defaults to `50` [id="plugins-{type}s-{plugin}-pool_max_per_route"] -===== `pool_max_per_route` +===== `pool_max_per_route` * Value type is <> * Default value is `25` @@ -365,7 +365,7 @@ Max number of concurrent connections. Defaults to `50` Max number of concurrent connections to a single host. Defaults to `25` [id="plugins-{type}s-{plugin}-proxy"] -===== `proxy` +===== `proxy` * Value type is <> * There is no default value for this setting. @@ -377,7 +377,7 @@ If you'd like to use an HTTP proxy . This supports multiple configuration syntax 3. 
Proxy host in form: `{url => 'http://proxy.org:1234', user => 'username@host', password => 'password'}` [id="plugins-{type}s-{plugin}-request_timeout"] -===== `request_timeout` +===== `request_timeout` * Value type is <> * Default value is `60` @@ -385,7 +385,7 @@ If you'd like to use an HTTP proxy . This supports multiple configuration syntax Timeout (in seconds) for the entire request. [id="plugins-{type}s-{plugin}-retry_non_idempotent"] -===== `retry_non_idempotent` +===== `retry_non_idempotent` * Value type is <> * Default value is `false` @@ -393,7 +393,7 @@ Timeout (in seconds) for the entire request. If `automatic_retries` is enabled this will cause non-idempotent HTTP verbs (such as POST) to be retried. [id="plugins-{type}s-{plugin}-schedule"] -===== `schedule` +===== `schedule` * Value type is <> * There is no default value for this setting. @@ -408,7 +408,7 @@ Examples: See: rufus/scheduler for details about different schedule options and value string format [id="plugins-{type}s-{plugin}-socket_timeout"] -===== `socket_timeout` +===== `socket_timeout` * Value type is <> * Default value is `10` @@ -449,7 +449,7 @@ It is primarily intended as a temporary diagnostic mechanism when attempting to Using `none` in production environments is strongly discouraged. [id="plugins-{type}s-{plugin}-target"] -===== `target` +===== `target` * Value type is <> * There is no default value for this setting. @@ -461,7 +461,7 @@ Example: `codec => json { target => "TARGET_FIELD_NAME" }` [id="plugins-{type}s-{plugin}-truststore"] -===== `truststore` +===== `truststore` * Value type is <> * There is no default value for this setting. @@ -469,7 +469,7 @@ Example: `codec => json { target => "TARGET_FIELD_NAME" }` If you need to use a custom truststore (`.jks`) specify that here. This does not work with .pem certs! 
[id="plugins-{type}s-{plugin}-truststore_password"] -===== `truststore_password` +===== `truststore_password` * Value type is <> * There is no default value for this setting. @@ -478,7 +478,7 @@ Specify the truststore password here. Note, most .jks files created with keytool require a password! [id="plugins-{type}s-{plugin}-truststore_type"] -===== `truststore_type` +===== `truststore_type` * Value type is <> * Default value is `"JKS"` @@ -486,7 +486,7 @@ Note, most .jks files created with keytool require a password! Specify the truststore type here. One of `JKS` or `PKCS12`. Default is `JKS` [id="plugins-{type}s-{plugin}-urls"] -===== `urls` +===== `urls` * This is a required setting. * Value type is <> @@ -501,13 +501,17 @@ The values in urls can be either: * a sub-hash containing many useful keys provided by the Manticore backend: ** url: the String url ** method: (optional) the HTTP method to use (defaults to GET) -** user: (optional) the HTTP Basic Auth user. The user must be under +** user: (optional) the HTTP Basic Auth user. The user must be under an auth sub-hash for Manticore, but this plugin also accepts it either way. -** password: (optional) the HTTP Basic Auth password. The password +** password: (optional) the HTTP Basic Auth password. The password must be under an auth sub-hash for Manticore, but this plugin accepts it either way. ** headers: a hash containing key-value pairs of headers. 
** body: a string (supported only on POST and PUT requests) -** possibly other options mentioned in the +** pagination: (optional) a hash containing options for pagination handling +** failure_mode: (optional) a string specifying what to do on failure +** retry_delay: (optional) a number specifying the amount of seconds to wait to retry if failure_mode = retry +** success_status_codes: (optional) array of HTTP status codes (integers) to be considered as successful +** possibly other options mentioned in the https://www.rubydoc.info/github/cheald/manticore/Manticore/Client#http-instance_method[Manticore docs]. Note that Manticore options that are not explicitly documented above are not thoroughly tested and therefore liable to break in unexpected ways if we @@ -515,7 +519,7 @@ The values in urls can be either: *Notes:* -* Passwords specified as a part of `urls` are prone to exposure in plugin log output. +* Passwords specified as a part of `urls` are prone to exposure in plugin log output. The plugin does not declare them as passwords, and therefore doesn't wrap them in leak-reducing wrappers as we do elsewhere. * We don't guarantee that boolean-type options like Manticore's `follow_redirects` are supported @@ -525,7 +529,7 @@ string is "truthy." as anything other than true [id="plugins-{type}s-{plugin}-user"] -===== `user` +===== `user` * Value type is <> * There is no default value for this setting. @@ -534,7 +538,7 @@ Username to use with HTTP authentication for ALL requests. Note that you can als If you set this you must also set the <> option. [id="plugins-{type}s-{plugin}-validate_after_inactivity"] -===== `validate_after_inactivity` +===== `validate_after_inactivity` * Value type is <> * Default value is `200` @@ -550,6 +554,65 @@ being leased to the consumer. Non-positive value passed to this method disables connection validation. This check helps detect connections that have become stale (half-closed) while kept inactive in the pool." 
+[id="plugins-{type}s-{plugin}-failure_mode"] +===== `failure_mode` + * Value type is <> + * Default value is `continue` + +Specifies what should be done if requests fail. Request failures (server not responding etc.) are classified as failures; certain status codes can also be set to trigger failure handling with the success_status_codes option. + +Allowed values: + +* continue: On failure, emit the event normally to the pipeline. +* retry: On failure, wait for the amount of seconds specified in retry_delay and try again. +* stop: On failure, stop the plugin. + +[id="plugins-{type}s-{plugin}-retry_delay"] +===== `retry_delay` + * Value type is <> + * There is no default value for this setting. + +The amount of time (in seconds) to wait if the request fails and failure_mode is set to `retry`. + +[id="plugins-{type}s-{plugin}-success_status_codes"] +===== `success_status_codes` + + * Value type is <> + * There is no default value for this setting. + +If specified, all requests with a response code not defined here will go to the failure handling defined with failure_mode. + +[id="plugins-{type}s-{plugin}-pagination"] +===== `pagination` + + * Value type is <> + * There is no default value for this setting. + +If pagination is set, the request is handled as a paginated request, creating separate events for each page. The current page is stored to a file to allow continuing in case the process gets closed. The state saving follows the at-least-once principle, so if Logstash gets shut down abnormally, some events might get queried twice. + +The following required values must be set to use pagination: + +* start_page: Number of page to start from +* end_page: Number of page to end at +* page_parameter: The name of the query parameter where to send the page to the server + +The following values can be optionally set: + +* concurrent_requests: Amount of requests processed by the HTTP client at the same time. 
Note that the amount of requests actually sent to the server concurrently also depends on the HTTP client's settings, set separately. The default value is 1. +* last_run_metadata_path: Path to a file that will be created to persist the current page, if the pipeline gets stopped for some reason. If not specified, the file will be created in the Logstash data directory in data/plugins/inputs/http_poller/state/state_. Note that if multiple urls with the same name are run in pipelines with Logstash instances with the same data directory, last_run_metadata_path should be set to prevent multiple pipelines from overwriting the file. +* delete_last_run_metadata: Whether to delete the last run metadata file when all pages are queried, so the query starts from the first page on next run. Defaults to true. + +For example, with the values + +* start_page = 1 +* end_page = 2 +* page_parameter = page + +two requests will be sent to the server with these URLs: + +http://example.com/example?page=1 and +http://example.com/example?page=2 + [id="plugins-{type}s-{plugin}-common-options"] include::{include_path}/{type}.asciidoc[] diff --git a/lib/logstash/inputs/http_poller.rb b/lib/logstash/inputs/http_poller.rb index dc8466a..17a0780 100644 --- a/lib/logstash/inputs/http_poller.rb +++ b/lib/logstash/inputs/http_poller.rb @@ -9,6 +9,9 @@ require 'logstash/plugin_mixins/validator_support/field_reference_validation_adapter' require 'logstash/plugin_mixins/event_support/event_factory_adapter' require 'logstash/plugin_mixins/scheduler' +require 'logstash/inputs/http_poller/state_handler' +require 'yaml' +require 'java' class LogStash::Inputs::HTTP_Poller < LogStash::Inputs::Base include LogStash::PluginMixins::HttpClient @@ -49,14 +52,16 @@ class LogStash::Inputs::HTTP_Poller < LogStash::Inputs::Base public def register @host = Socket.gethostname.force_encoding(Encoding::UTF_8) - setup_ecs_field! setup_requests! 
+ @state_handler = LogStash::Inputs::HTTPPoller::StateHandler.new(@logger, @requests) + @persist_in_progress_on_stop = false end # @overload def stop close_client + @state_handler.signal_waiting_threads end # @overload @@ -130,17 +135,17 @@ def normalize_request(url_or_spec) auth = spec[:auth] user = spec.delete(:user) || (auth && auth["user"]) password = spec.delete(:password) || (auth && auth["password"]) - + if user.nil? ^ password.nil? raise LogStash::ConfigurationError, "'user' and 'password' must both be specified for input HTTP poller!" end if user && password spec[:auth] = { - user: user, + user: user, pass: password, eager: true - } + } end res = [method, url, spec] else @@ -158,15 +163,35 @@ def validate_request!(url_or_spec, request) raise LogStash::ConfigurationError, "Invalid URL #{url}" unless URI::DEFAULT_PARSER.regexp[:ABS_URI].match(url) raise LogStash::ConfigurationError, "No URL provided for request! #{url_or_spec}" unless url - if spec && spec[:auth] - if !spec[:auth][:user] - raise LogStash::ConfigurationError, "Auth was specified, but 'user' was not!" + if spec + if spec[:auth] + if !spec[:auth][:user] + raise LogStash::ConfigurationError, "Auth was specified, but 'user' was not!" + end + if !spec[:auth][:pass] + raise LogStash::ConfigurationError, "Auth was specified, but 'password' was not!" + end + end + if spec[:pagination] + err_msg = "Pagination had a invalid value for concurrent_threads, start_page, end_page, page_parameter, last_run_metadata_path or delete_last_run_metadata!" 
+ raise LogStash::ConfigurationError, err_msg if !spec[:pagination]["start_page"] || !spec[:pagination]["start_page"].is_a?(Integer) + raise LogStash::ConfigurationError, err_msg if !spec[:pagination]["end_page"] || !spec[:pagination]["end_page"].is_a?(Integer) + raise LogStash::ConfigurationError, err_msg if !spec[:pagination]["page_parameter"] || !spec[:pagination]["page_parameter"].is_a?(String) + raise LogStash::ConfigurationError, err_msg if spec[:pagination]["concurrent_threads"] && !spec[:pagination]["concurrent_threads"].is_a?(Integer) + raise LogStash::ConfigurationError, err_msg if spec[:pagination]["last_run_metadata_path"] && !spec[:pagination]["last_run_metadata_path"].is_a?(String) + raise LogStash::ConfigurationError, err_msg if spec[:pagination]["delete_last_run_metadata"] && !["true", "false"].include?(spec[:pagination]["delete_last_run_metadata"]) + end + if spec[:failure_mode] + raise LogStash::ConfigurationError, "Invalid value for failure_mode!" if !["retry", "stop", "continue"].include?(spec[:failure_mode]) + raise LogStash::ConfigurationError, "failure_mode was set to retry, but no retry_delay was specified!" if spec[:failure_mode] == "retry" && !spec[:retry_delay] end - if !spec[:auth][:pass] - raise LogStash::ConfigurationError, "Auth was specified, but 'password' was not!" + if spec[:retry_delay] && !(spec[:retry_delay].is_a?(Float) || spec[:retry_delay].is_a?(Integer)) + raise LogStash::ConfigurationError, "retry_delay should be a float or an integer" + end + if spec[:success_status_codes] && !spec[:success_status_codes].is_a?(Array) + raise LogStash::ConfigurationError, "success_status_codes should be an array of integers" end end - request end @@ -175,6 +200,7 @@ def run(queue) setup_schedule(queue) end + private def setup_schedule(queue) #schedule hash must contain exactly one of the allowed keys msg_invalid_schedule = "Invalid config. 
schedule hash must contain " + @@ -183,20 +209,70 @@ def setup_schedule(queue) schedule_type = @schedule.keys.first schedule_value = @schedule[schedule_type] raise LogStash::ConfigurationError, msg_invalid_schedule unless %w(cron every at in).include?(schedule_type) - opts = schedule_type == "every" ? { first_in: 0.01 } : {} scheduler.public_send(schedule_type, schedule_value, opts) { run_once(queue) } scheduler.join end + private + def handle_pagination(queue, name, request) + method = request[0] + url = request[1] + pagination = request[2][:pagination] + max_threads = !pagination["concurrent_requests"].nil? ? pagination["concurrent_requests"] : 1 + state_file = pagination["last_run_metadata_path"] + current_page, in_progress_pages = @state_handler.start_paginated_request(name, state_file, pagination["start_page"] - 1, 1) + in_progress_pages.each do |page| + create_paginated_request(queue, name, request, page, max_threads, pagination) + end + client.execute! + + while current_page.get < pagination["end_page"] do + break if stop? + current_page.getAndIncrement + @state_handler.add_page(name, current_page) + create_paginated_request(queue, name, request, current_page.get, max_threads, pagination) + if @state_handler.in_progress_pages.size >= max_threads + client.execute! + end + @state_handler.wait_for_change(name, max_threads - 1, self) + end + + client.execute! unless stop? + @state_handler.wait_for_change(name, 0, self) unless @persist_in_progress_on_stop + @state_handler.stop_pagination_state_writer + if stop? 
|| pagination["delete_last_run_metadata"] == "false" + @state_handler.write_state(name, state_file, current_page) + @state_handler.stop_paginated_request() + return + end + @state_handler.delete_state(name, state_file) + @state_handler.stop_paginated_request() + end + + private + def create_paginated_request(queue, name, request, current_page, max_threads, pagination) + # These have to be cloned so different requests don't use the same instance + request = request.clone + request[2] = request[2].clone + query = request[2][:query].nil? ? {} : request[2][:query].clone + query[pagination["page_parameter"]] = current_page + request[2][:query] = query + request_bg(queue, name, request) + end + def run_once(queue) @requests.each do |name, request| # prevent executing a scheduler kick after the plugin has been stop-ed # this could easily happen as the scheduler shutdown is not immediate return if stop? - request_async(queue, name, request) + opts = request[2] + if !opts.nil? && opts.key?(:pagination) + handle_pagination(queue, name, request.clone) + else + request_async(queue, name, request) + end end - client.execute! unless stop? end @@ -207,7 +283,18 @@ def request_async(queue, name, request) method, *request_opts = request client.async.send(method, *request_opts). - on_success {|response| handle_success(queue, name, request, response, Time.now - started) }. + on_success {|response| handle_success(queue, name, request, response, Time.now - started)}. + on_failure {|exception| handle_failure(queue, name, request, exception, Time.now - started) } + end + + # Runs requests and event processing on multiple threads + private + def request_bg(queue, name, request) + started = Time.now + + method, *request_opts = request + client.async.background.send(method, *request_opts). + on_success {|response| handle_success(queue, name, request, response, Time.now - started)}. 
on_failure {|exception| handle_failure(queue, name, request, exception, Time.now - started) } end @@ -219,6 +306,7 @@ def to_nanoseconds(time_diff) private def handle_success(queue, name, request, response, execution_time) + failed = check_failure_state(queue, name, request, nil, response) @logger.debug? && @logger.debug("success fetching url", name: name, url: request) body = response.body # If there is a usable response. HEAD requests are `nil` and empty get @@ -226,12 +314,36 @@ def handle_success(queue, name, request, response, execution_time) if body && body.size > 0 decode_and_flush(@codec, body) do |decoded| event = @target ? targeted_event_factory.new_event(decoded.to_hash) : decoded + if failed + event.tag("_http_request_failure") + apply_failure_fields(event, name, request, nil, execution_time) + end handle_decoded_event(queue, name, request, response, event, execution_time) end else event = event_factory.new_event + if failed + event.tag("_http_request_failure") + apply_failure_fields(event, name, request, nil, execution_time) + end handle_decoded_event(queue, name, request, response, event, execution_time) end + rescue => e + @logger.error? && @logger.error("Cannot process event!", + :exception => e, + :exception_message => e.message, + :exception_backtrace => e.backtrace, + :name => name) + + # If we are running in debug mode we can display more information about the + # specific request which could give more details about the connection. + @logger.debug? && @logger.debug("Cannot process event", + :exception => e, + :exception_message => e.message, + :exception_backtrace => e.backtrace, + :name => name, + :url => request) + handle_failure(queue, name, request, e, execution_time) end private @@ -245,25 +357,86 @@ def handle_decoded_event(queue, name, request, response, event, execution_time) apply_metadata(event, name, request, response, execution_time) decorate(event) queue << event - rescue StandardError, java.lang.Exception => e - @logger.error? 
&& @logger.error("Error eventifying response!", + rescue StandardError, java.lang.Exception => e + @logger.error? && @logger.error("Error eventifying response!", :exception => e, :exception_message => e.message, :name => name, :url => request, :response => response - ) + ) + ensure + @state_handler.delete_page(name, request) unless @persist_in_progress_on_stop || !request[2].nil? && !request[2][:retried].nil? + end + + # returns true if request failed + private + def check_failure_state(queue, name, request, exception, response) + req_opts = request[2] + if exception.nil? && ( + req_opts.nil? || + req_opts[:success_status_codes].nil? || + req_opts[:success_status_codes].include?(response.code)) + + req_opts.delete(:retried) if !req_opts.nil? + return false + end + if req_opts.nil? || + req_opts[:failure_mode].nil? || + req_opts[:failure_mode] == "continue" + + req_opts.delete(:retried) if !req_opts.nil? + return true + end + if @logger.debug? + @logger.debug("Encountered request failure: ", :name => name, :request => request) + if exception + logger.debug("Exception: ", :exception => exception, + :exception_message => exception.message, + :exception_backtrace => exception.backtrace) + end + if response + logger.debug("Response: ", :response => response) + end + end + if stop? + return true + end + case req_opts[:failure_mode] + when "retry" + @logger.warn? && @logger.warn("Encountered request failure with url '%s', trying again after %d seconds.." % [name, req_opts[:retry_delay]]) + begin + sleep req_opts[:retry_delay] + req_opts[:retried] = true + return true if stop? + if req_opts[:pagination] + request_bg(queue, name, request) + else + request_async(queue, name, request) + end + client.execute! + rescue Java::JavaLang::InterruptedException + # this is raised when the plugin is stopped + end + when "stop" + @persist_in_progress_on_stop = true + @logger.error? 
&& @logger.error("Encountered request failure, stopping plugin..") + # We have to call do_stop on a a separate thread, since it waits for the handlers to finish + Thread.new {do_stop} + return true + end + return true end private # Beware, on old versions of manticore some uncommon failures are not handled def handle_failure(queue, name, request, exception, execution_time) + check_failure_state(queue, name, request, exception, nil) @logger.debug? && @logger.debug("failed fetching url", name: name, url: request) event = event_factory.new_event event.tag("_http_request_failure") apply_metadata(event, name, request, nil, execution_time) apply_failure_fields(event, name, request, exception, execution_time) - queue << event rescue StandardError, java.lang.Exception => e @logger.error? && @logger.error("Cannot read URL or send the error as an event!", @@ -280,6 +453,8 @@ def handle_failure(queue, name, request, exception, execution_time) :exception_backtrace => e.backtrace, :name => name, :url => request) + ensure + @state_handler.delete_page(name, request) unless @persist_in_progress_on_stop || !request[2].nil? && !request[2][:retried].nil? end private @@ -314,8 +489,10 @@ def apply_failure_fields(event, name, request, exception, execution_time) event.set(@fail_response_time_s_field, execution_time) if @fail_response_time_s_field event.set(@fail_response_time_ns_field, to_nanoseconds(execution_time)) if @fail_response_time_ns_field - event.set(@error_msg_field, exception.to_s) - event.set(@stack_trace_field, exception.backtrace) + if not exception.nil? 
+ event.set(@error_msg_field, exception.to_s) + event.set(@stack_trace_field, exception.backtrace) + end end private diff --git a/lib/logstash/inputs/http_poller/state_handler.rb b/lib/logstash/inputs/http_poller/state_handler.rb new file mode 100644 index 0000000..e2a8467 --- /dev/null +++ b/lib/logstash/inputs/http_poller/state_handler.rb @@ -0,0 +1,138 @@ +# encoding: utf-8 +require "logstash/namespace" +require "immutable/sorted_set" +require "fileutils" + +module LogStash + module Inputs + module HTTPPoller + class StateHandler + def initialize(logger, requests) + @logger = logger + @state_path = ::File.join(LogStash::SETTINGS.get_value("path.data"), "plugins", "inputs", "http_poller", "state") + FileUtils.mkpath @state_path + @requests = requests + @pages_mutex = Mutex.new + @in_progress_pages = nil + @pages_signal = ConditionVariable.new + @stop_writer = false + @state_writer_thread = nil + end + + attr_reader :in_progress_pages + + public + def signal_waiting_threads + @pages_signal.broadcast + end + + private + def get_state_file_path(name) + return ::File.join(@state_path, "state_" + name) + end + + private + def atomic_state_write(file_path, current_page) + ::File.write(file_path + ".tmp", YAML.dump([@in_progress_pages.to_a, current_page.get]), mode: "w") + ::File.rename(file_path + ".tmp", file_path) + end + + public + def write_state(name, last_run_metadata_path, current_page) + file_path = last_run_metadata_path.nil? ? get_state_file_path(name) : last_run_metadata_path + atomic_state_write(file_path, current_page) + end + + private + def start_pagination_state_writer(write_interval, name, last_run_metadata_path, current_page) + thr = Thread.new { + file_path = last_run_metadata_path.nil? ? 
get_state_file_path(name) : last_run_metadata_path + while true do + if @stop_writer + break + end + atomic_state_write(file_path, current_page) + sleep write_interval + end + } + return thr + end + + public + def stop_pagination_state_writer() + @stop_writer = true + @state_writer_thread.join(25) + @stop_writer = false + end + + private + def start_with_default_values(name, last_run_metadata_path, default_start_page, write_interval) + @in_progress_pages = Immutable::SortedSet.new + start_page = java.util.concurrent.atomic.AtomicInteger.new(default_start_page) + @state_writer_thread = start_pagination_state_writer(write_interval, name, last_run_metadata_path, start_page) + return start_page, [] + end + + public + def start_paginated_request(name, file_path, default_start_page, write_interval) + file_path = get_state_file_path(name) if file_path.nil? + begin + pages, current_page = YAML.load_file(file_path) + return start_with_default_values(name, file_path, default_start_page, write_interval) if !pages.is_a?(Array) || !current_page.is_a?(Integer) + current_page_atomic = java.util.concurrent.atomic.AtomicInteger.new(current_page) + @state_writer_thread = start_pagination_state_writer(write_interval, name, file_path, current_page_atomic) + @in_progress_pages = Immutable::SortedSet.new(pages) + @logger.info? && @logger.info("Read status from file for url %s" % [name]) + return current_page_atomic, pages + end + rescue Errno::ENOENT, SyntaxError + return start_with_default_values(name, file_path, default_start_page, write_interval) + end + + public + def add_page(name, page) + @pages_mutex.synchronize { + @in_progress_pages = @in_progress_pages.add(page.get) + } + end + + public + def delete_page(name, request) + if not @in_progress_pages.nil? 
+ request_opts = request[2] + @pages_mutex.synchronize { + page = request_opts[:query][request_opts[:pagination]["page_parameter"]] + @in_progress_pages = @in_progress_pages.delete(Integer(page)) + @pages_signal.broadcast + } + end + end + + public + def wait_for_change(name, max_value, plugin) + @pages_mutex.synchronize { + while @in_progress_pages.size > max_value && !plugin.stop? do + @pages_signal.wait(@pages_mutex) + end + } + end + + public + def delete_state(name, last_run_metadata_path) + if not last_run_metadata_path.nil? + ::File.delete(last_run_metadata_path) + return + end + file_path = get_state_file_path(name) + ::File.delete(file_path) + rescue Errno::ENOENT + # it doesn't matter if the file does not exist + end + + def stop_paginated_request() + @in_progress_pages = nil + end + end + end + end +end diff --git a/logstash-input-http_poller.gemspec b/logstash-input-http_poller.gemspec index 027368f..989eedf 100644 --- a/logstash-input-http_poller.gemspec +++ b/logstash-input-http_poller.gemspec @@ -25,6 +25,7 @@ Gem::Specification.new do |s| s.add_runtime_dependency 'logstash-mixin-ecs_compatibility_support', '~>1.3' s.add_runtime_dependency 'logstash-mixin-event_support', '~> 1.0', '>= 1.0.1' s.add_runtime_dependency 'logstash-mixin-validator_support', '~> 1.0' + s.add_runtime_dependency "immutable-ruby" s.add_development_dependency 'logstash-codec-json' s.add_development_dependency 'logstash-codec-line' diff --git a/spec/inputs/http_poller_spec.rb b/spec/inputs/http_poller_spec.rb index 2f18710..4ade73e 100644 --- a/spec/inputs/http_poller_spec.rb +++ b/spec/inputs/http_poller_spec.rb @@ -6,6 +6,9 @@ # Workaround for the bug reported in https://github.com/jruby/jruby/issues/4637 require 'rspec/matchers/built_in/raise_error.rb' require 'logstash/plugin_mixins/ecs_compatibility_support/spec_helper' +require 'immutable/sorted_set' +require 'immutable/hash' +require 'securerandom' describe LogStash::Inputs::HTTP_Poller do let(:metadata_target) { 
"_http_poller_metadata" } @@ -61,6 +64,113 @@ end end + describe "#run_once using pagination" do + shared_examples "with 5 pages" do + shared_examples "send requests" do + context "with empty data folder" do + before(:example) do + subject { described_class.new(default_opts) } + default_urls.each do |name, url| + subject.instance_variable_get(:@state_handler).delete_state(name, url["pagination"]["last_run_metadata_path"]) + end + end + + it "should normalize the requests correctly" do + default_urls.each do |name, url| + subject.send(:normalize_request, url) + end + end + + it "should load the state from file" do + default_urls.each do |name, url| + # Write state to file + writer = described_class.new(default_opts) + writer.register + state_handler = writer.instance_variable_get(:@state_handler) + state_handler.instance_variable_set(:@in_progress_pages, Immutable::Set.new([9])) + writer.instance_variable_get(:@state_handler).write_state(name, pagination["last_run_metadata_path"], java.util.concurrent.atomic.AtomicInteger.new(9)) + + expect(subject).to receive(:request_bg).with(queue, name, anything).exactly(2).times + .and_invoke(lambda { |queue, name, request| + subject.instance_variable_get(:@state_handler).delete_page(name, request) + }) + subject.send(:run_once, queue) # :run_once is a private method + end + end + + it "should issue a request for each page" do + default_urls.each do |name, url| + expect(subject).to receive(:request_bg).with(queue, name, anything).exactly(5).times + .and_invoke(lambda { |queue, name, request| + subject.instance_variable_get(:@state_handler).delete_page(name, request) + }) + end + subject.send(:run_once, queue) # :run_once is a private method + # Check that each state file gets deleted + default_urls.each do |name, url| + if pagination["last_run_metadata_path"] + expect(File.exists? pagination["last_run_metadata_path"]).to eq(false) + else + expect(File.exists? 
subject.instance_variable_get(:@state_handler).send(:get_state_file_path, name)).to eq(false) + end + end + end + end + end + let(:default_urls) { + { + default_name => { + "url" => default_url, + "method" => "POST", + "pagination" => pagination + } + } + } + context "with default state file" do + let(:pagination) { + { + "start_page" => 6, + "end_page" => 10, + "concurrent_requests" => concurrent_requests, + "page_parameter" => "page" + } + } + include_examples "send requests" + end + + context "with custom state file" do + # Create unique directory per test + path = ::Dir.tmpdir + "/" + SecureRandom.uuid + ::Dir.mkdir(path) + let(:pagination) { + { + "start_page" => 6, + "end_page" => 10, + "concurrent_requests" => concurrent_requests, + "page_parameter" => "page", + "last_run_metadata_path" => path + "/rspec_custom_state_file" + } + } + include_examples "send requests" + end + end + + describe "with a big enough concurrent_requests value to run everything in one go" do + let(:concurrent_requests) { 5 } + include_examples "with 5 pages" + end + + describe "with a small concurrent_requests value to not run everything concurrently" do + let(:concurrent_requests) { 2 } + include_examples "with 5 pages" + end + + describe "with only 1 concurrent request" do + let(:concurrent_requests) { 1 } + include_examples "with 5 pages" + end + end + describe "normalizing a request spec" do shared_examples "a normalized request" do it "should set the method correctly" do @@ -112,6 +222,126 @@ end end + context "with failure handling parameters" do + let(:failure_mode) {"retry"} + let(:spec_url) { "http://localhost:3000" } + let(:spec_method) { "post" } + let(:spec_opts) { {:"failure_mode" => failure_mode} } + let(:url) do + { + "url" => spec_url, + "method" => spec_method, + }.merge(Hash[spec_opts.map {|k,v| [k.to_s,v]}]) + end + + context "with failure_mode stop" do + let(:failure_mode) {"stop"} + include_examples "a normalized request" + end + + context "with failure_mode 
retry" do + let(:spec_opts) { + { + :"failure_mode" => failure_mode, + :"retry_delay" => 3.3 + } + } + let(:failure_mode) {"retry"} + include_examples "a normalized request" + end + + context "with failure_mode continue" do + let(:failure_mode) {"continue"} + include_examples "a normalized request" + end + + context "with invalid failure_mode" do + let(:failure_mode) {"invalid"} + it "raises an error" do + expect { normalized }.to raise_error(LogStash::ConfigurationError) + end + end + context "with invalid success_status_codes" do + let(:spec_opts) { + { + :"failure_mode" => "stop", + :"success_status_codes" => "asd" + } + } + it "raises an error" do + expect { normalized }.to raise_error(LogStash::ConfigurationError) + end + end + end + + context "pagination" do + let(:spec_url) { "http://localhost:3000" } + let(:spec_method) { "post" } + let(:pagination) { + { + "start_page" => 2, + "end_page" => 3, + "page_parameter" => "page" + } + } + let(:url) do + { + "url" => spec_url, + "method" => spec_method, + "pagination" => pagination + } + end + context "with missing end_page" do + let(:pagination) { + { + "start_page" => 2, + "page_parameter" => "page" + } + } + it "should raise an error" do + expect { normalized }.to raise_error(LogStash::ConfigurationError) + end + end + context "with wrong type for start_page" do + let(:pagination) { + { + "start_page" => "wrong", + "end_page" => 1234, + "page_parameter" => "page" + } + } + it "should raise an error" do + expect { normalized }.to raise_error(LogStash::ConfigurationError) + end + end + context "with wrong type for concurrent_threads" do + let(:pagination) { + { + "start_page" => "wrong", + "end_page" => 1234, + "page_parameter" => "page", + "concurrent_threads" => [] + } + } + it "should raise an error" do + expect { normalized }.to raise_error(LogStash::ConfigurationError) + end + end + context "with wrong type for last_run_metadata_path" do + let(:pagination) { + { + "start_page" => 1, + "end_page" => 1234, + 
"page_parameter" => "page", + "last_run_metadata_path" => 123 + } + } + it "should raise an error" do + expect { normalized }.to raise_error(LogStash::ConfigurationError) + end + end + end + shared_examples "auth" do context "with auth enabled but no pass" do let(:auth) { {"user" => "foo"} } @@ -284,6 +514,75 @@ def run_plugin_and_yield_queue(plugin, sleep: nil) end end + describe "failure handling" do + let(:payload) { {"a" => 2, "hello" => ["a", "b", "c"]} } + let(:response_body) { LogStash::Json.dump(payload) } + let(:url) {"http://localhost:3000"} + let(:failure_mode) {"stop"} + let(:retry_delay) { 0.2 } + let(:default_url) { + { + "method" => "post", + "url" => url, + "failure_mode" => failure_mode, + "success_status_codes" => [200], + "retry_delay" => retry_delay + } + } + let(:opts) { default_opts } + let(:name) { default_name } + let(:code) { 202 } + + before do + plugin.register + u = url.is_a?(Hash) ? url["url"] : url # handle both complex specs and simple string URLs + plugin.client.stub(u, :body => response_body, :code => code) + allow(plugin).to receive(:decorate) + end + + after do + plugin.close + end + + context "with failure_mode stop" do + let(:failure_mode) { "stop" } + it "stops the plugin" do + expect(plugin.instance_variable_get(:@logger)).to receive(:error).with("Encountered request failure, stopping plugin..").once + plugin.send(:run_once, queue) + # stop gets called from another thread + sleep 0.5 + expect(plugin.stop?).to eq(true) + allow($stderr).to receive(:puts).and_call_original + end + end + + context "with failure_mode retry" do + let(:failure_mode) { "retry" } + it "retries the request" do + rcount = 0 + original = plugin.method(:request_async) + expect(plugin).to receive(:request_async).exactly(4).times.and_invoke(lambda { |queue, name, request| + rcount = rcount + 1 + return if rcount == 4 + allow(plugin.instance_variable_get(:@logger)).to receive(:warn?).and_return(true) + expect(plugin.instance_variable_get(:@logger)).to 
receive(:warn).once.with("Encountered request failure with url '%s', trying again after %d seconds.." % [default_name, retry_delay]) + original.call(queue, name, request) + }) + plugin.send(:run_once, queue) + end + end + + context "with failure_mode continue" do + let(:failure_mode) { "continue" } + it "passes the event to the queue normally" do + plugin.send(:run_once, queue) + expect(queue.size).to eq(1) + event = queue.pop(true) + expect(event.get("tags")).to eq(["_http_request_failure"]) + end + end + end + describe "events", :ecs_compatibility_support, :aggregate_failures do ecs_compatibility_matrix(:disabled, :v1, :v8 => :v1) do |ecs_select| before do From be746a9d8bee983c7e6f91c9f1828d20615c201d Mon Sep 17 00:00:00 2001 From: Tertsonen Toni Date: Thu, 29 Jun 2023 16:42:03 +0300 Subject: [PATCH 2/3] refactor: refactor http_poller.rb on-behalf-of: @insta-advance toni.tertsonen@insta.fi --- lib/logstash/inputs/http_poller.rb | 49 +++++++++++++++--------------- 1 file changed, 24 insertions(+), 25 deletions(-) diff --git a/lib/logstash/inputs/http_poller.rb b/lib/logstash/inputs/http_poller.rb index 17a0780..87cafd0 100644 --- a/lib/logstash/inputs/http_poller.rb +++ b/lib/logstash/inputs/http_poller.rb @@ -174,12 +174,12 @@ def validate_request!(url_or_spec, request) end if spec[:pagination] err_msg = "Pagination had a invalid value for concurrent_threads, start_page, end_page, page_parameter, last_run_metadata_path or delete_last_run_metadata!" 
- raise LogStash::ConfigurationError, err_msg if !spec[:pagination]["start_page"] || !spec[:pagination]["start_page"].is_a?(Integer) - raise LogStash::ConfigurationError, err_msg if !spec[:pagination]["end_page"] || !spec[:pagination]["end_page"].is_a?(Integer) - raise LogStash::ConfigurationError, err_msg if !spec[:pagination]["page_parameter"] || !spec[:pagination]["page_parameter"].is_a?(String) - raise LogStash::ConfigurationError, err_msg if spec[:pagination]["concurrent_threads"] && !spec[:pagination]["concurrent_threads"].is_a?(Integer) - raise LogStash::ConfigurationError, err_msg if spec[:pagination]["last_run_metadata_path"] && !spec[:pagination]["last_run_metadata_path"].is_a?(String) - raise LogStash::ConfigurationError, err_msg if spec[:pagination]["delete_last_run_metadata"] && !["true", "false"].include?(spec[:pagination]["delete_last_run_metadata"]) + raise LogStash::ConfigurationError, err_msg if (!spec[:pagination]["start_page"] || !spec[:pagination]["start_page"].is_a?(Integer)) || + (!spec[:pagination]["end_page"] || !spec[:pagination]["end_page"].is_a?(Integer)) || + (!spec[:pagination]["page_parameter"] || !spec[:pagination]["page_parameter"].is_a?(String)) || + (spec[:pagination]["concurrent_threads"] && !spec[:pagination]["concurrent_threads"].is_a?(Integer)) || + (spec[:pagination]["last_run_metadata_path"] && !spec[:pagination]["last_run_metadata_path"].is_a?(String)) || + (spec[:pagination]["delete_last_run_metadata"] && !["true", "false"].include?(spec[:pagination]["delete_last_run_metadata"])) end if spec[:failure_mode] raise LogStash::ConfigurationError, "Invalid value for failure_mode!" if !["retry", "stop", "continue"].include?(spec[:failure_mode]) @@ -216,14 +216,13 @@ def setup_schedule(queue) private def handle_pagination(queue, name, request) - method = request[0] - url = request[1] - pagination = request[2][:pagination] - max_threads = !pagination["concurrent_requests"].nil? ? 
pagination["concurrent_requests"] : 1 + method, url, pagination = request + pagination = pagination[:pagination] + concurrent_requests = !pagination["concurrent_requests"].nil? ? pagination["concurrent_requests"] : 1 state_file = pagination["last_run_metadata_path"] current_page, in_progress_pages = @state_handler.start_paginated_request(name, state_file, pagination["start_page"] - 1, 1) in_progress_pages.each do |page| - create_paginated_request(queue, name, request, page, max_threads, pagination) + create_paginated_request(queue, name, request, page, pagination) end client.execute! @@ -231,11 +230,11 @@ def handle_pagination(queue, name, request) break if stop? current_page.getAndIncrement @state_handler.add_page(name, current_page) - create_paginated_request(queue, name, request, current_page.get, max_threads, pagination) - if @state_handler.in_progress_pages.size >= max_threads + create_paginated_request(queue, name, request, current_page.get, pagination) + if @state_handler.in_progress_pages.size >= concurrent_requests client.execute! end - @state_handler.wait_for_change(name, max_threads - 1, self) + @state_handler.wait_for_change(name, concurrent_requests - 1, self) end client.execute! unless stop? @@ -251,7 +250,7 @@ def handle_pagination(queue, name, request) end private - def create_paginated_request(queue, name, request, current_page, max_threads, pagination) + def create_paginated_request(queue, name, request, current_page, pagination) # These have to be cloned so different requests don't use the same instance request = request.clone request[2] = request[2].clone @@ -357,16 +356,16 @@ def handle_decoded_event(queue, name, request, response, event, execution_time) apply_metadata(event, name, request, response, execution_time) decorate(event) queue << event - rescue StandardError, java.lang.Exception => e - @logger.error? 
&& @logger.error("Error eventifying response!", - :exception => e, - :exception_message => e.message, - :name => name, - :url => request, - :response => response - ) - ensure - @state_handler.delete_page(name, request) unless @persist_in_progress_on_stop || !request[2].nil? && !request[2][:retried].nil? + rescue StandardError, java.lang.Exception => e + @logger.error? && @logger.error("Error eventifying response!", + :exception => e, + :exception_message => e.message, + :name => name, + :url => request, + :response => response + ) + ensure + @state_handler.delete_page(name, request) unless @persist_in_progress_on_stop || !request[2].nil? && !request[2][:retried].nil? end # returns true if request failed From bc17911536f5f4a17e1bbf73a0bd19fa74eb8a27 Mon Sep 17 00:00:00 2001 From: Toni Tertsonen Date: Wed, 19 Jul 2023 13:33:44 +0300 Subject: [PATCH 3/3] fix: fix tests fix tests by using java sortedset instead of immutable-ruby on-behalf-of: @insta-advance toni.tertsonen@insta.fi --- .../inputs/http_poller/state_handler.rb | 17 ++++++----------- logstash-input-http_poller.gemspec | 1 - spec/inputs/http_poller_spec.rb | 4 +--- 3 files changed, 7 insertions(+), 15 deletions(-) diff --git a/lib/logstash/inputs/http_poller/state_handler.rb b/lib/logstash/inputs/http_poller/state_handler.rb index e2a8467..2747412 100644 --- a/lib/logstash/inputs/http_poller/state_handler.rb +++ b/lib/logstash/inputs/http_poller/state_handler.rb @@ -1,6 +1,5 @@ # encoding: utf-8 require "logstash/namespace" -require "immutable/sorted_set" require "fileutils" module LogStash @@ -67,7 +66,7 @@ def stop_pagination_state_writer() private def start_with_default_values(name, last_run_metadata_path, default_start_page, write_interval) - @in_progress_pages = Immutable::SortedSet.new + @in_progress_pages = java.util.concurrent.ConcurrentSkipListSet.new start_page = java.util.concurrent.atomic.AtomicInteger.new(default_start_page) @state_writer_thread = start_pagination_state_writer(write_interval, 
name, last_run_metadata_path, start_page) return start_page, [] @@ -81,7 +80,7 @@ def start_paginated_request(name, file_path, default_start_page, write_interval) return start_with_default_values(name, file_path, default_start_page, write_interval) if !pages.is_a?(Array) || !current_page.is_a?(Integer) current_page_atomic = java.util.concurrent.atomic.AtomicInteger.new(current_page) @state_writer_thread = start_pagination_state_writer(write_interval, name, file_path, current_page_atomic) - @in_progress_pages = Immutable::SortedSet.new(pages) + @in_progress_pages = java.util.concurrent.ConcurrentSkipListSet.new(pages) @logger.info? && @logger.info("Read status from file for url %s" % [name]) return current_page_atomic, pages end @@ -91,20 +90,16 @@ def start_paginated_request(name, file_path, default_start_page, write_interval) public def add_page(name, page) - @pages_mutex.synchronize { - @in_progress_pages = @in_progress_pages.add(page.get) - } + @in_progress_pages.add(page.get) end public def delete_page(name, request) if not @in_progress_pages.nil? 
request_opts = request[2] - @pages_mutex.synchronize { - page = request_opts[:query][request_opts[:pagination]["page_parameter"]] - @in_progress_pages = @in_progress_pages.delete(Integer(page)) - @pages_signal.broadcast - } + page = request_opts[:query][request_opts[:pagination]["page_parameter"]] + @in_progress_pages.remove(Integer(page)) + @pages_signal.broadcast end end diff --git a/logstash-input-http_poller.gemspec b/logstash-input-http_poller.gemspec index 989eedf..027368f 100644 --- a/logstash-input-http_poller.gemspec +++ b/logstash-input-http_poller.gemspec @@ -25,7 +25,6 @@ Gem::Specification.new do |s| s.add_runtime_dependency 'logstash-mixin-ecs_compatibility_support', '~>1.3' s.add_runtime_dependency 'logstash-mixin-event_support', '~> 1.0', '>= 1.0.1' s.add_runtime_dependency 'logstash-mixin-validator_support', '~> 1.0' - s.add_runtime_dependency "immutable-ruby" s.add_development_dependency 'logstash-codec-json' s.add_development_dependency 'logstash-codec-line' diff --git a/spec/inputs/http_poller_spec.rb b/spec/inputs/http_poller_spec.rb index 4ade73e..d8cb335 100644 --- a/spec/inputs/http_poller_spec.rb +++ b/spec/inputs/http_poller_spec.rb @@ -6,8 +6,6 @@ # Workaround for the bug reported in https://github.com/jruby/jruby/issues/4637 require 'rspec/matchers/built_in/raise_error.rb' require 'logstash/plugin_mixins/ecs_compatibility_support/spec_helper' -require 'immutable/sorted_set' -require 'immutable/hash' require 'securerandom' describe LogStash::Inputs::HTTP_Poller do @@ -87,7 +85,7 @@ writer = described_class.new(default_opts) writer.register state_handler = writer.instance_variable_get(:@state_handler) - state_handler.instance_variable_set(:@in_progress_pages, Immutable::Set.new([9])) + state_handler.instance_variable_set(:@in_progress_pages, java.util.concurrent.ConcurrentSkipListSet.new([9])) writer.instance_variable_get(:@state_handler).write_state(name, pagination["last_run_metadata_path"], 
java.util.concurrent.atomic.AtomicInteger.new(9)) expect(subject).to receive(:request_bg).with(queue, name, anything).exactly(2).times