From a53c53390d8cf20d165065251527cd76edb8a73c Mon Sep 17 00:00:00 2001 From: Digvijay Singh Rawat Date: Sun, 1 Mar 2026 03:27:11 +0530 Subject: [PATCH] [Deferred] Add configurable retry options with max_retries and retry_interval Add two class-level APIs for customizing retry behavior in deferred tasks: - max_retries: sets the maximum number of retries (max_retries 3 means the task will be executed up to 4 times total). Validates the argument via to_i at class-load time to fail fast on invalid types. - retry_interval: an overridable class method that receives the exception and attempt number, returning an interval in seconds (Numeric), or false/nil to abort retries Consolidate retry logic into __next_retry_in as the single source of truth for both whether and when to retry, removing the separate __should_retry? method. The max-retries cap is enforced in __next_retry_in so that custom retry_interval overrides cannot create unbounded retries. Extract __default_backoff as the single source for the exponential backoff formula, eliminating duplication between retry_interval and __next_retry_in. Validate retry_interval return values: accept any Numeric (Integer or Float), treat false/nil as abort, and log a warning for unexpected types while falling back to the default exponential backoff. Update __perform to return the exception object on failure instead of false, enabling retry_interval to make decisions based on exception type. Add comprehensive specs covering custom retry intervals, exception-based routing, input validation, bounded retries with custom intervals, and edge cases (Float, nil, false, 0, negative, String, Array). --- CHANGELOG.md | 1 + lib/rage/deferred/metadata.rb | 2 +- lib/rage/deferred/queue.rb | 9 +- lib/rage/deferred/task.rb | 77 ++++++++++++- spec/deferred/metadata_spec.rb | 11 +- spec/deferred/queue_spec.rb | 12 +- spec/deferred/task_spec.rb | 195 ++++++++++++++++++++++++++++++--- 7 files changed, 275 insertions(+), 32 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index c141ddcf..831ca6f1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,7 @@ ### Added +- [Deferred] Support configurable retry options with `max_retries` and `retry_interval` by [@Digvijay](https://github.com/Digvijay-x1) (#215). - [Cable] Add support for `stop_stream_from` and `stop_stream_for` by [@Digvijay](https://github.com/Digvijay-x1) (#217). ## [1.21.1] - 2026-02-27 diff --git a/lib/rage/deferred/metadata.rb b/lib/rage/deferred/metadata.rb index 5c415ab7..00dec22a 100644 --- a/lib/rage/deferred/metadata.rb +++ b/lib/rage/deferred/metadata.rb @@ -27,7 +27,7 @@ def retrying? # @return [Boolean] `true` if a failure will schedule another attempt, `false` otherwise def will_retry? task = Rage::Deferred::Context.get_task(context) - task.__should_retry?(attempts) + !!task.__next_retry_in(attempts, nil) end private diff --git a/lib/rage/deferred/queue.rb b/lib/rage/deferred/queue.rb index d5867de5..1912c71d 100644 --- a/lib/rage/deferred/queue.rb +++ b/lib/rage/deferred/queue.rb @@ -38,14 +38,15 @@ def schedule(task_id, context, publish_in: nil) Fiber.schedule do Iodine.task_inc! - is_completed = task.new.__perform(context) + result = task.new.__perform(context) - if is_completed + if result == true @backend.remove(task_id) else attempts = Rage::Deferred::Context.inc_attempts(context) - if task.__should_retry?(attempts) - enqueue(context, delay: task.__next_retry_in(attempts), task_id:) + retry_in = task.__next_retry_in(attempts, result) + if retry_in + enqueue(context, delay: retry_in, task_id:) else @backend.remove(task_id) end diff --git a/lib/rage/deferred/task.rb b/lib/rage/deferred/task.rb index 90defc06..820b98cd 100644 --- a/lib/rage/deferred/task.rb +++ b/lib/rage/deferred/task.rb @@ -85,7 +85,7 @@ def __perform(context) Rage.logger.error("Deferred task failed with exception: #{e.class} (#{e.message}):\n#{e.backtrace.join("\n")}") end end - false + e end private def restore_log_info(context) @@ -105,6 +105,62 @@ def self.included(klass) end module ClassMethods + # Set the maximum number of retry attempts for this task. + # + # @param count [Integer] the maximum number of retry attempts + # @example + # class SendWelcomeEmail + # include Rage::Deferred::Task + # max_retries 10 + # + # def perform(email) + # # ... + # end + # end + def max_retries(count) + value = Integer(count) + + if value.negative? + raise ArgumentError, "max_retries should be a valid non-negative integer" + end + + @__max_retries = value + rescue ArgumentError, TypeError + raise ArgumentError, "max_retries should be a valid non-negative integer" + end + + # Override this method to customize retry behavior per exception. + # + # Return an Integer to retry in that many seconds. + # Return `super` to use the default exponential backoff. + # Return `false` or `nil` to abort retries. + # + # @param exception [Exception] the exception that caused the failure + # @param attempt [Integer] the current attempt number (1-indexed) + # @return [Integer, false, nil] the retry interval in seconds, or false/nil to abort + # @example + # class ProcessPayment + # include Rage::Deferred::Task + # + # def self.retry_interval(exception, attempt:) + # case exception + # when TemporaryNetworkError + # 10 # Retry in 10 seconds + # when InvalidDataError + # false # Do not retry + # else + # super # Default backoff strategy + # end + # end + # + # def perform(payment_id) + # # ... + # end + # end + def retry_interval(exception, attempt:) + __default_backoff(attempt) + end + def enqueue(*args, delay: nil, delay_until: nil, **kwargs) context = Rage::Deferred::Context.build(self, args, kwargs) @@ -118,13 +174,24 @@ def enqueue(*args, delay: nil, delay_until: nil, **kwargs) end # @private - def __should_retry?(attempts) - attempts < MAX_ATTEMPTS + def __next_retry_in(attempts, exception) + max = @__max_retries || MAX_ATTEMPTS + return if attempts > max + + interval = retry_interval(exception, attempt: attempts) + return if !interval + + unless interval.is_a?(Numeric) + Rage.logger.warn("#{name}.retry_interval returned #{interval.class}, expected Numeric, false, or nil; falling back to default backoff") + return __default_backoff(attempts) + end + + interval end # @private - def __next_retry_in(attempts) - rand(BACKOFF_INTERVAL * 2**attempts.to_i) + 1 + def __default_backoff(attempt) + rand(BACKOFF_INTERVAL * 2**attempt) + 1 end end end diff --git a/spec/deferred/metadata_spec.rb b/spec/deferred/metadata_spec.rb index e69dd16f..a3cc9ac8 100644 --- a/spec/deferred/metadata_spec.rb +++ b/spec/deferred/metadata_spec.rb @@ -69,9 +69,14 @@ Rage::Deferred::Context.inc_attempts(context) end - it "delegates to Task.__should_retry?" do - expect(task).to receive(:__should_retry?).with(2).and_return(:should_retry_result) - expect(subject.will_retry?).to eq(:should_retry_result) + it "delegates to Task.__next_retry_in" do + expect(task).to receive(:__next_retry_in).with(2, nil).and_return(10) + expect(subject.will_retry?).to eq(true) + end + + it "returns false when __next_retry_in returns nil" do + expect(task).to receive(:__next_retry_in).with(2, nil).and_return(nil) + expect(subject.will_retry?).to eq(false) end end end diff --git a/spec/deferred/queue_spec.rb b/spec/deferred/queue_spec.rb index 06061271..67e55ae6 100644 --- a/spec/deferred/queue_spec.rb +++ b/spec/deferred/queue_spec.rb @@ -2,8 +2,9 @@ RSpec.describe Rage::Deferred::Queue do let(:backend) { double("backend", add: "task-id", remove: nil) } - let(:task_class) { double("task_class", __should_retry?: false) } + let(:task_class) { double("task_class") } let(:task_instance) { double("task_instance", __perform: true) } + let(:task_context) { "TestTask" } let(:backpressure_config) { nil } @@ -99,15 +100,16 @@ end context "when task fails" do + let(:error) { StandardError.new("Something went wrong") } + before do - allow(task_instance).to receive(:__perform).and_return(false) + allow(task_instance).to receive(:__perform).and_return(error) allow(Rage::Deferred::Context).to receive(:inc_attempts).with(task_context).and_return(1) end context "and should be retried" do it "re-enqueues the task with a delay" do - allow(task_class).to receive(:__should_retry?).with(1).and_return(true) - allow(task_class).to receive(:__next_retry_in).with(1).and_return(30) + allow(task_class).to receive(:__next_retry_in).with(1, error).and_return(30) expect(subject).to receive(:enqueue).with(task_context, delay: 30, task_id: "task-id") subject.schedule("task-id", task_context) end @@ -115,7 +117,7 @@ context "and should not be retried" do it "removes the task from the backend" do - allow(task_class).to receive(:__should_retry?).with(1).and_return(false) + allow(task_class).to receive(:__next_retry_in).with(1, error).and_return(nil) expect(backend).to receive(:remove).with("task-id") subject.schedule("task-id", task_context) end diff --git a/spec/deferred/task_spec.rb b/spec/deferred/task_spec.rb index a2572b4b..8cce2131 100644 --- a/spec/deferred/task_spec.rb +++ b/spec/deferred/task_spec.rb @@ -43,23 +43,190 @@ def perform(arg, kwarg:) end end - describe ".__should_retry?" do - it "returns true if attempts are less than max" do - expect(task_class.__should_retry?(4)).to be(true) + describe ".__next_retry_in" do + it "returns the next retry interval with exponential backoff" do + expect(task_class.__next_retry_in(0, nil)).to be_between(1, 5) + expect(task_class.__next_retry_in(1, nil)).to be_between(1, 10) + expect(task_class.__next_retry_in(2, nil)).to be_between(1, 20) + expect(task_class.__next_retry_in(3, nil)).to be_between(1, 40) + expect(task_class.__next_retry_in(4, nil)).to be_between(1, 80) end - it "returns false if attempts are equal to max" do - expect(task_class.__should_retry?(5)).to be(false) + it "returns nil when attempts exceed max" do + expect(task_class.__next_retry_in(5, nil)).to be_between(1, 160) + expect(task_class.__next_retry_in(6, nil)).to be_nil end end - describe ".__next_retry_in" do - it "returns the next retry interval with exponential backoff" do - expect(task_class.__next_retry_in(0)).to be_between(1, 5) - expect(task_class.__next_retry_in(1)).to be_between(1, 10) - expect(task_class.__next_retry_in(2)).to be_between(1, 20) - expect(task_class.__next_retry_in(3)).to be_between(1, 40) - expect(task_class.__next_retry_in(4)).to be_between(1, 80) + describe ".max_retries" do + context "with custom max" do + before { task_class.max_retries(3) } + + it "retries up to custom max" do + expect(task_class.__next_retry_in(3, StandardError.new)).to be_a(Numeric) + end + + it "stops after custom max" do + expect(task_class.__next_retry_in(4, StandardError.new)).to be_nil + end + + it "means the task is executed up to 4 times total" do + # attempt 1 = original, attempt 2-4 = retries + expect(task_class.__next_retry_in(1, StandardError.new)).to be_a(Numeric) + expect(task_class.__next_retry_in(2, StandardError.new)).to be_a(Numeric) + expect(task_class.__next_retry_in(3, StandardError.new)).to be_a(Numeric) + expect(task_class.__next_retry_in(4, StandardError.new)).to be_nil + end + end + + context "input validation" do + it "converts string to integer" do + task_class.max_retries("3") + expect(task_class.__next_retry_in(3, StandardError.new)).to be_a(Numeric) + expect(task_class.__next_retry_in(4, StandardError.new)).to be_nil + end + + it "converts float to integer" do + task_class.max_retries(2.9) + expect(task_class.__next_retry_in(2, StandardError.new)).to be_a(Numeric) + expect(task_class.__next_retry_in(3, StandardError.new)).to be_nil + end + + it "raises ArgumentError for negative values" do + expect { task_class.max_retries(-1) }. + to raise_error(ArgumentError, /max_retries should be a valid non-negative integer/) + end + + it "raises ArgumentError for non-integer strings" do + expect { task_class.max_retries("abc") }. + to raise_error(ArgumentError, /max_retries should be a valid non-negative integer/) + end + + it "raises ArgumentError for nil" do + expect { task_class.max_retries(nil) }. + to raise_error(ArgumentError, /max_retries should be a valid non-negative integer/) + end + end + end + + describe ".retry_interval" do + context "default behavior (no override)" do + it "returns an interval for any attempt" do + interval = task_class.retry_interval(StandardError.new, attempt: 1) + expect(interval).to be_a(Integer) + expect(interval).to be >= 1 + end + + it "always returns a backoff (max check is in __next_retry_in)" do + expect(task_class.retry_interval(StandardError.new, attempt: 5)).to be_a(Integer) + expect(task_class.retry_interval(StandardError.new, attempt: 6)).to be_a(Integer) + end + end + + context "with override" do + let(:temporary_error) { Class.new(StandardError) } + let(:fatal_error) { Class.new(StandardError) } + + before do + tmp_err = temporary_error + fat_err = fatal_error + + task_class.define_singleton_method(:retry_interval) do |exception, attempt:| + case exception + when tmp_err + 10 + when fat_err + false + else + super(exception, attempt: attempt) + end + end + end + + it "returns custom interval for matching exception" do + expect(task_class.retry_interval(temporary_error.new, attempt: 1)).to eq(10) + end + + it "returns false for non-retryable exception" do + expect(task_class.retry_interval(fatal_error.new, attempt: 1)).to be(false) + end + + it "falls back to default for unmatched exception" do + interval = task_class.retry_interval(StandardError.new, attempt: 1) + expect(interval).to be_a(Integer) + expect(interval).to be >= 1 + end + + it "__next_retry_in returns interval for retryable" do + expect(task_class.__next_retry_in(1, temporary_error.new)).to eq(10) + end + + it "__next_retry_in returns nil for non-retryable" do + expect(task_class.__next_retry_in(1, fatal_error.new)).to be_nil + end + + it "__next_retry_in uses default backoff for unmatched" do + interval = task_class.__next_retry_in(1, StandardError.new) + expect(interval).to be_between(1, 10) + end + + it "__next_retry_in enforces max_retries even with custom interval" do + task_class.max_retries(2) + # attempt 1 & 2 should retry with custom interval + expect(task_class.__next_retry_in(1, temporary_error.new)).to eq(10) + expect(task_class.__next_retry_in(2, temporary_error.new)).to eq(10) + # attempt 3 should be capped by max_retries + expect(task_class.__next_retry_in(3, temporary_error.new)).to be_nil + end + end + + context "with edge case return values" do + let(:logger) { double(warn: nil) } + + before do + allow(Rage).to receive(:logger).and_return(logger) + end + + it "accepts a Float return value" do + task_class.define_singleton_method(:retry_interval) { |_exception, attempt:| 2.5 } + expect(task_class.__next_retry_in(1, StandardError.new)).to eq(2.5) + end + + it "returns nil when retry_interval returns nil" do + task_class.define_singleton_method(:retry_interval) { |_exception, attempt:| nil } + expect(task_class.__next_retry_in(1, StandardError.new)).to be_nil + end + + it "returns nil when retry_interval returns false" do + task_class.define_singleton_method(:retry_interval) { |_exception, attempt:| false } + expect(task_class.__next_retry_in(1, StandardError.new)).to be_nil + end + + it "accepts zero as a valid interval" do + task_class.define_singleton_method(:retry_interval) { |_exception, attempt:| 0 } + expect(task_class.__next_retry_in(1, StandardError.new)).to eq(0) + end + + it "accepts a negative number as a Numeric" do + task_class.define_singleton_method(:retry_interval) { |_exception, attempt:| -5 } + expect(task_class.__next_retry_in(1, StandardError.new)).to eq(-5) + end + + it "logs a warning and falls back to default backoff for String" do + task_class.define_singleton_method(:retry_interval) { |_exception, attempt:| "invalid" } + result = task_class.__next_retry_in(1, StandardError.new) + expect(result).to be_a(Numeric) + expect(result).to be >= 1 + expect(logger).to have_received(:warn).with(/returned String, expected Numeric/) + end + + it "logs a warning and falls back to default backoff for Array" do + task_class.define_singleton_method(:retry_interval) { |_exception, attempt:| [10] } + result = task_class.__next_retry_in(1, StandardError.new) + expect(result).to be_a(Numeric) + expect(result).to be >= 1 + expect(logger).to have_received(:warn).with(/returned Array, expected Numeric/) + end end end @@ -145,8 +312,8 @@ def perform(arg, kwarg:) expect(logger).to have_received(:error).with("Deferred task failed with exception: StandardError (Something went wrong):\nline 1\nline 2") end - it "returns false" do - expect(task.__perform(context)).to be(false) + it "returns the exception" do + expect(task.__perform(context)).to be(error) end context "with suppressed exception logging" do