Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

### Added

- [Deferred] Support configurable retry options with `max_retries` and `retry_interval` by [@Digvijay](https://github.com/Digvijay-x1) (#215).
- [Cable] Add support for `stop_stream_from` and `stop_stream_for` by [@Digvijay](https://github.com/Digvijay-x1) (#217).

## [1.21.1] - 2026-02-27
Expand Down
2 changes: 1 addition & 1 deletion lib/rage/deferred/metadata.rb
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ def retrying?
# @return [Boolean] `true` if a failure will schedule another attempt, `false` otherwise
def will_retry?
task = Rage::Deferred::Context.get_task(context)
task.__should_retry?(attempts)
!!task.__next_retry_in(attempts, nil)
end

private
Expand Down
9 changes: 5 additions & 4 deletions lib/rage/deferred/queue.rb
Original file line number Diff line number Diff line change
Expand Up @@ -38,14 +38,15 @@ def schedule(task_id, context, publish_in: nil)
Fiber.schedule do
Iodine.task_inc!

is_completed = task.new.__perform(context)
result = task.new.__perform(context)

if is_completed
if result == true
@backend.remove(task_id)
else
attempts = Rage::Deferred::Context.inc_attempts(context)
if task.__should_retry?(attempts)
enqueue(context, delay: task.__next_retry_in(attempts), task_id:)
retry_in = task.__next_retry_in(attempts, result)
if retry_in
enqueue(context, delay: retry_in, task_id:)
else
@backend.remove(task_id)
end
Expand Down
77 changes: 72 additions & 5 deletions lib/rage/deferred/task.rb
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ def __perform(context)
Rage.logger.error("Deferred task failed with exception: #{e.class} (#{e.message}):\n#{e.backtrace.join("\n")}")
end
end
false
e
end

private def restore_log_info(context)
Expand All @@ -105,6 +105,62 @@ def self.included(klass)
end

module ClassMethods
# Set the maximum number of retry attempts for this task.
#
# @param count [Integer] the maximum number of retry attempts
# @example
# class SendWelcomeEmail
# include Rage::Deferred::Task
# max_retries 10
#
# def perform(email)
# # ...
# end
# end
def max_retries(count)
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right now, this actually defines attempts rather than retries. Let's ensure that max_retries 3 means the task will be executed up to 4 times.

value = Integer(count)

if value.negative?
raise ArgumentError, "max_retries should be a valid non-negative integer"
end

@__max_retries = value
rescue ArgumentError, TypeError
raise ArgumentError, "max_retries should be a valid non-negative integer"
end

# Override this method to customize retry behavior per exception.
#
# Return an Integer to retry in that many seconds.
# Return `super` to use the default exponential backoff.
# Return `false` or `nil` to abort retries.
#
# @param exception [Exception] the exception that caused the failure
# @param attempt [Integer] the current attempt number (1-indexed)
# @return [Integer, false, nil] the retry interval in seconds, or false/nil to abort
# @example
# class ProcessPayment
# include Rage::Deferred::Task
#
# def self.retry_interval(exception, attempt:)
# case exception
# when TemporaryNetworkError
# 10 # Retry in 10 seconds
# when InvalidDataError
# false # Do not retry
# else
# super # Default backoff strategy
# end
# end
#
# def perform(payment_id)
# # ...
# end
# end
def retry_interval(exception, attempt:)
__default_backoff(attempt)
end

def enqueue(*args, delay: nil, delay_until: nil, **kwargs)
context = Rage::Deferred::Context.build(self, args, kwargs)

Expand All @@ -118,13 +174,24 @@ def enqueue(*args, delay: nil, delay_until: nil, **kwargs)
end

# @private
def __should_retry?(attempts)
attempts < MAX_ATTEMPTS
def __next_retry_in(attempts, exception)
max = @__max_retries || MAX_ATTEMPTS
return if attempts > max

interval = retry_interval(exception, attempt: attempts)
return if !interval

unless interval.is_a?(Numeric)
Rage.logger.warn("#{name}.retry_interval returned #{interval.class}, expected Numeric, false, or nil; falling back to default backoff")
return __default_backoff(attempts)
end

interval
end

# @private
def __next_retry_in(attempts)
rand(BACKOFF_INTERVAL * 2**attempts.to_i) + 1
def __default_backoff(attempt)
rand(BACKOFF_INTERVAL * 2**attempt) + 1
end
end
end
11 changes: 8 additions & 3 deletions spec/deferred/metadata_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -69,9 +69,14 @@
Rage::Deferred::Context.inc_attempts(context)
end

it "delegates to Task.__should_retry?" do
expect(task).to receive(:__should_retry?).with(2).and_return(:should_retry_result)
expect(subject.will_retry?).to eq(:should_retry_result)
it "delegates to Task.__next_retry_in" do
expect(task).to receive(:__next_retry_in).with(2, nil).and_return(10)
expect(subject.will_retry?).to eq(true)
end

it "returns false when __next_retry_in returns nil" do
expect(task).to receive(:__next_retry_in).with(2, nil).and_return(nil)
expect(subject.will_retry?).to eq(false)
end
end
end
12 changes: 7 additions & 5 deletions spec/deferred/queue_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,9 @@

RSpec.describe Rage::Deferred::Queue do
let(:backend) { double("backend", add: "task-id", remove: nil) }
let(:task_class) { double("task_class", __should_retry?: false) }
let(:task_class) { double("task_class") }
let(:task_instance) { double("task_instance", __perform: true) }

let(:task_context) { "TestTask" }
let(:backpressure_config) { nil }

Expand Down Expand Up @@ -99,23 +100,24 @@
end

context "when task fails" do
let(:error) { StandardError.new("Something went wrong") }

before do
allow(task_instance).to receive(:__perform).and_return(false)
allow(task_instance).to receive(:__perform).and_return(error)
allow(Rage::Deferred::Context).to receive(:inc_attempts).with(task_context).and_return(1)
end

context "and should be retried" do
it "re-enqueues the task with a delay" do
allow(task_class).to receive(:__should_retry?).with(1).and_return(true)
allow(task_class).to receive(:__next_retry_in).with(1).and_return(30)
allow(task_class).to receive(:__next_retry_in).with(1, error).and_return(30)
expect(subject).to receive(:enqueue).with(task_context, delay: 30, task_id: "task-id")
subject.schedule("task-id", task_context)
end
end

context "and should not be retried" do
it "removes the task from the backend" do
allow(task_class).to receive(:__should_retry?).with(1).and_return(false)
allow(task_class).to receive(:__next_retry_in).with(1, error).and_return(nil)
expect(backend).to receive(:remove).with("task-id")
subject.schedule("task-id", task_context)
end
Expand Down
Loading
Loading