Skip to content

Commit

Permalink
feat: support redis sentinel url
Browse files Browse the repository at this point in the history
  • Loading branch information
alisterd51 authored and antoinerabany committed Jan 24, 2024
1 parent eccd485 commit 2b82b05
Show file tree
Hide file tree
Showing 8 changed files with 48 additions and 31 deletions.
6 changes: 6 additions & 0 deletions docs/source/changelog.rst
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,12 @@ Changelog

All notable changes to this project will be documented in this file.

`3.2.0`_ -- 2024-01-23
----------------------
Feat
^^^^
* Support redis sentinel url

`3.1.3`_ -- 2023-12-27
----------------------
Chore
Expand Down
6 changes: 2 additions & 4 deletions remoulade/cancel/backends/redis.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import redis

from ...helpers.redis_client import redis_client
from ..backend import CancelBackend


Expand Down Expand Up @@ -55,11 +56,8 @@ def __init__(
) -> None:
super().__init__(cancellation_ttl=cancellation_ttl)

if url:
parameters["connection_pool"] = redis.ConnectionPool.from_url(url)

self.client = client or redis_client(url=url, **parameters)
self.key = key
self.client = client or redis.Redis(**parameters)

def is_canceled(self, message_id: str, composition_id: Optional[str]) -> bool:
try:
Expand Down
22 changes: 22 additions & 0 deletions remoulade/helpers/redis_client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
import os
from typing import Optional
from urllib.parse import urlparse

import redis


def redis_client(url: Optional[str], **parameters):
""""""
if url:
url_parsed = urlparse(url)
if url_parsed.scheme == "sentinel":
sentinel = redis.Sentinel(
[(url_parsed.hostname, url_parsed.port)],
sentinel_kwargs={"password": url_parsed.password},
)
return sentinel.master_for(
service_name=os.path.normpath(url_parsed.path).split("/")[1], password=url_parsed.password
)
else:
parameters["connection_pool"] = redis.ConnectionPool.from_url(url)
return redis.Redis(**parameters)
22 changes: 10 additions & 12 deletions remoulade/rate_limits/backends/redis.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,9 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from typing import Callable, List

import redis
from redis import WatchError

from ...helpers.redis_client import redis_client
from ..backend import RateLimiterBackend


Expand All @@ -36,10 +37,7 @@ class RedisBackend(RateLimiterBackend):
"""

def __init__(self, *, client=None, url=None, **parameters):
if url is not None:
parameters["connection_pool"] = redis.ConnectionPool.from_url(url)

self.client = client or redis.Redis(**parameters)
self.client = client or redis_client(url=url, **parameters)

def add(self, key: str, value: int, ttl: int) -> bool:
return bool(self.client.set(key, value, px=ttl, nx=True))
Expand All @@ -49,7 +47,7 @@ def incr(self, key: str, amount: int, maximum: int, ttl: int) -> bool:
while True:
try:
pipe.watch(key)
value = int(pipe.get(key) or b"0") # type: ignore
value = int(pipe.get(key) or b"0")
value += amount
if value > maximum:
return False
Expand All @@ -58,15 +56,15 @@ def incr(self, key: str, amount: int, maximum: int, ttl: int) -> bool:
pipe.set(key, value, px=ttl)
pipe.execute()
return True
except redis.WatchError:
except WatchError:
continue

def decr(self, key: str, amount: int, minimum: int, ttl: int) -> bool:
with self.client.pipeline() as pipe:
while True:
try:
pipe.watch(key)
value = int(pipe.get(key) or b"0") # type: ignore
value = int(pipe.get(key) or b"0")
value -= amount
if value < minimum:
return False
Expand All @@ -75,7 +73,7 @@ def decr(self, key: str, amount: int, minimum: int, ttl: int) -> bool:
pipe.set(key, value, px=ttl)
pipe.execute()
return True
except redis.WatchError:
except WatchError:
continue

def incr_and_sum(self, key: str, keys: Callable[[], List[str]], amount: int, maximum: int, ttl: int) -> bool:
Expand All @@ -85,20 +83,20 @@ def incr_and_sum(self, key: str, keys: Callable[[], List[str]], amount: int, max
# TODO: Drop non-callable keys in Remoulade v2.
key_list = keys() if callable(keys) else keys
pipe.watch(key, *key_list)
value = int(pipe.get(key) or b"0") # type: ignore
value = int(pipe.get(key) or b"0")
value += amount
if value > maximum:
return False

# Fetch keys again to account for net/server latency.
values = pipe.mget(keys() if callable(keys) else keys)
total = amount + sum(int(n) for n in values if n) # type: ignore
total = amount + sum(int(n) for n in values if n)
if total > maximum:
return False

pipe.multi()
pipe.set(key, value, px=ttl)
pipe.execute()
return True
except redis.WatchError:
except WatchError:
continue
6 changes: 2 additions & 4 deletions remoulade/results/backends/redis.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import redis

from ...helpers.backoff import BackoffStrategy, compute_backoff
from ...helpers.redis_client import redis_client
from ..backend import BackendResult, ForgottenResult, Missing, ResultBackend, ResultMissing, ResultTimeout


Expand Down Expand Up @@ -59,10 +60,7 @@ def __init__(
super().__init__(namespace=namespace, encoder=encoder, default_timeout=default_timeout)

url = url or os.getenv("REMOULADE_REDIS_URL")
if url:
parameters["connection_pool"] = redis.ConnectionPool.from_url(url)

self.client = client or redis.Redis(**parameters)
self.client = client or redis_client(url=url, **parameters)
self.max_retries = max_retries
self.min_backoff = min_backoff
self.max_backoff = max_backoff
Expand Down
8 changes: 3 additions & 5 deletions remoulade/scheduler/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@

from remoulade import Broker, get_encoder, get_logger

from ..helpers.redis_client import redis_client

DEFAULT_JOB_INTERVAL = 3600 * 24
DEFAULT_JOB_STATUS = True
DEFAULT_TZ = "UTC"
Expand Down Expand Up @@ -147,11 +149,7 @@ def __init__(
self.period = period if period is not None else DEFAULT_SCHEDULER_PERIOD
self.broker = broker
self.lock_key = lock_key if lock_key is not None else DEFAULT_SCHEDULER_LOCK_KEY

if url:
redis_parameters["connection_pool"] = redis.ConnectionPool.from_url(url)

self.client = client or redis.Redis(**redis_parameters)
self.client = client or redis_client(url=url, **redis_parameters)
self.logger = get_logger(__name__, type(self))
self.stopped = True

Expand Down
7 changes: 2 additions & 5 deletions remoulade/state/backends/redis.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
import datetime
from typing import List, Optional

import redis

from remoulade.common import chunk

from ...helpers.redis_client import redis_client
from ..backend import State, StateBackend


Expand All @@ -25,9 +24,7 @@ class RedisBackend(StateBackend):

def __init__(self, *, namespace="remoulade-state", encoder=None, client=None, url=None, **parameters):
super().__init__(namespace=namespace, encoder=encoder)
if url:
parameters["connection_pool"] = redis.ConnectionPool.from_url(url)
self.client = client or redis.Redis(**parameters)
self.client = client or redis_client(url=url, **parameters)

def get_state(self, message_id):
key = self._build_message_key(message_id)
Expand Down
2 changes: 1 addition & 1 deletion setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ max-line-length = 120
max-complexity = 20
max-line-length = 120
select = C,E,F,W,B,B9
ignore = E501,E203,W503,B905,B907,B950
ignore = E501,E203,W503,B036,B905,B907,B950
exclude =
tests/mypy/plain_files
build
Expand Down

0 comments on commit 2b82b05

Please sign in to comment.