From f53d0475a951d5a5b8f051e9ecdd9cbabcfee565 Mon Sep 17 00:00:00 2001
From: Pierre Jambet <pierre.jambet@gmail.com>
Date: Thu, 12 Nov 2020 12:46:15 -0500
Subject: [PATCH] Delaying & Scheduling extension: Wrap ops in multi/exec

Wrap write operations in multi/exec blocks to prevent any possibility
of Redis being left in an inconsistent state. It also has the added
benefit of a small optimization by reducing the number of round-trips.

The commit uses a verbose approach of wrapping most calls to
`redis.multi` in a `redis.pipelined` block, which is _technically_
unnecessary since the redis-rb gem does not send the `multi` & `exec`
commands when it is already pipelining the wrapped commands.

The reason this commit uses both is that both methods have different
semantics, `pipelined` is meant to be an optimization whereas `multi`
provides the guarantees of a Redis transaction.

While it may seem unlikely, it is possible that future versions of
the redis gem change the underlying behavior of the `multi` and
`pipelined` methods. Additionally, it is a little bit more explicit in
terms of describing intent: "I want these commands to be run in an
atomic fashion, and I'm OK sending them all at once".

The call to `redis.del(key)` in `clean_up_timestamp` was unnecessary
since the only way the `llen` call above can return `0` is if the list
does not exist — and a non-existent list needs no deletion.

The call to `pipelined` in this example might seem even more like
overkill since we only give a single command to `multi`, but
`multi`/`exec` are themselves commands, so in the event that a future
version of the redis gem starts sending the `multi` command right away,
wrapping it in a `pipelined` call explicitly asks the gem to send
`multi`/`zrem`/`exec` all at once.
---
 lib/resque/scheduler/delaying_extensions.rb   | 32 +++++++++++--------
 lib/resque/scheduler/scheduling_extensions.rb | 24 +++++++++-----
 2 files changed, 34 insertions(+), 22 deletions(-)

diff --git a/lib/resque/scheduler/delaying_extensions.rb b/lib/resque/scheduler/delaying_extensions.rb
index 4658ed06..a7e84898 100644
--- a/lib/resque/scheduler/delaying_extensions.rb
+++ b/lib/resque/scheduler/delaying_extensions.rb
@@ -68,16 +68,20 @@ def enqueue_in_with_queue(queue, number_of_seconds_from_now,
       # insertion time complexity is O(log(n)). Returns true if it's
       # the first job to be scheduled at that time, else false.
       def delayed_push(timestamp, item)
-        # First add this item to the list for this timestamp
-        redis.rpush("delayed:#{timestamp.to_i}", encode(item))
-
-        # Store the timestamps at with this item occurs
-        redis.sadd("timestamps:#{encode(item)}", "delayed:#{timestamp.to_i}")
-
-        # Now, add this timestamp to the zsets.  The score and the value are
-        # the same since we'll be querying by timestamp, and we don't have
-        # anything else to store.
-        redis.zadd :delayed_queue_schedule, timestamp.to_i, timestamp.to_i
+        redis.pipelined do
+          redis.multi do
+            # First add this item to the list for this timestamp
+            redis.rpush("delayed:#{timestamp.to_i}", encode(item))
+
+            # Store the timestamps at with this item occurs
+            redis.sadd("timestamps:#{encode(item)}", "delayed:#{timestamp.to_i}")
+
+            # Now, add this timestamp to the zsets.  The score and the value are
+            # the same since we'll be querying by timestamp, and we don't have
+            # anything else to store.
+            redis.zadd :delayed_queue_schedule, timestamp.to_i, timestamp.to_i
+          end
+        end
       end
 
       # Returns an array of timestamps based on start and count
@@ -318,10 +322,10 @@ def clean_up_timestamp(key, timestamp)
         # queue while we're removing it.
         redis.watch(key) do
           if redis.llen(key).to_i == 0
-            # If the list is empty, remove it.
-            redis.multi do
-              redis.del(key)
-              redis.zrem(:delayed_queue_schedule, timestamp.to_i)
+            redis.pipelined do
+              redis.multi do
+                redis.zrem(:delayed_queue_schedule, timestamp.to_i)
+              end
             end
           else
             redis.redis.unwatch
diff --git a/lib/resque/scheduler/scheduling_extensions.rb b/lib/resque/scheduler/scheduling_extensions.rb
index f5aa8157..6af42f94 100644
--- a/lib/resque/scheduler/scheduling_extensions.rb
+++ b/lib/resque/scheduler/scheduling_extensions.rb
@@ -85,13 +85,17 @@ def all_schedules
       def set_schedule(name, config, reload = true)
         persist = config.delete(:persist) || config.delete('persist')
 
-        if persist
-          redis.hset(:persistent_schedules, name, encode(config))
-        else
-          non_persistent_schedules[name] = decode(encode(config))
+        redis.pipelined do
+          redis.multi do
+            if persist
+              redis.hset(:persistent_schedules, name, encode(config))
+            else
+              non_persistent_schedules[name] = decode(encode(config))
+            end
+
+            redis.sadd(:schedules_changed, name)
+          end
         end
-
-        redis.sadd(:schedules_changed, name)
         reload_schedule! if reload
       end
 
@@ -104,8 +108,12 @@ def fetch_schedule(name)
       # Preventing a reload is optional and available to batch operations
       def remove_schedule(name, reload = true)
         non_persistent_schedules.delete(name)
-        redis.hdel(:persistent_schedules, name)
-        redis.sadd(:schedules_changed, name)
+        redis.pipelined do
+          redis.multi do
+            redis.hdel(:persistent_schedules, name)
+            redis.sadd(:schedules_changed, name)
+          end
+        end
 
         reload_schedule! if reload
       end