Skip to content

Commit

Permalink
Fix: chunk MGET and DELETE calls on multiple keys (#160)
Browse files Browse the repository at this point in the history
  • Loading branch information
psrok1 authored May 10, 2022
1 parent 8f3e8e2 commit b3af248
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 10 deletions.
2 changes: 1 addition & 1 deletion karton/core/__version__.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__version__ = "4.4.0"
__version__ = "4.4.1"
24 changes: 16 additions & 8 deletions karton/core/backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
from urllib3.response import HTTPResponse

from .task import Task, TaskPriority, TaskState
from .utils import chunks

KARTON_TASKS_QUEUE = "karton.tasks"
KARTON_OPERATIONS_QUEUE = "karton.operations"
Expand Down Expand Up @@ -241,32 +242,37 @@ def get_task(self, task_uid: str) -> Optional[Task]:
return None
return Task.unserialize(task_data, backend=self)

def get_tasks(self, task_uid_list: List[str]) -> List[Task]:
def get_tasks(self, task_uid_list: List[str], chunk_size: int = 1000) -> List[Task]:
"""
Get multiple tasks for given identifier list
:param task_uid_list: List of task identifiers
:param chunk_size: Size of chunks passed to the Redis MGET command
:return: List of task objects
"""
task_list = self.redis.mget(
[f"{KARTON_TASK_NAMESPACE}:{task_uid}" for task_uid in task_uid_list]
keys = chunks(
[f"{KARTON_TASK_NAMESPACE}:{task_uid}" for task_uid in task_uid_list],
chunk_size,
)
return [
Task.unserialize(task_data, backend=self)
for task_data in task_list
for chunk in keys
for task_data in self.redis.mget(chunk)
if task_data is not None
]

def get_all_tasks(self) -> List[Task]:
def get_all_tasks(self, chunk_size: int = 1000) -> List[Task]:
"""
Get all tasks registered in Redis
:param chunk_size: Size of chunks passed to the Redis MGET command
:return: List with Task objects
"""
tasks = self.redis.keys(f"{KARTON_TASK_NAMESPACE}:*")
return [
Task.unserialize(task_data)
for task_data in self.redis.mget(tasks)
for chunk in chunks(tasks, chunk_size)
for task_data in self.redis.mget(chunk)
if task_data is not None
]

Expand Down Expand Up @@ -314,14 +320,16 @@ def delete_task(self, task: Task) -> None:
"""
self.redis.delete(f"{KARTON_TASK_NAMESPACE}:{task.uid}")

def delete_tasks(self, tasks: List[Task]) -> None:
def delete_tasks(self, tasks: List[Task], chunk_size: int = 1000) -> None:
"""
Remove multiple tasks from Redis
:param tasks: List of Task objects
:param chunk_size: Size of chunks passed to the Redis DELETE command
"""
keys = [f"{KARTON_TASK_NAMESPACE}:{task.uid}" for task in tasks]
self.redis.delete(*keys)
for chunk in chunks(keys, chunk_size):
self.redis.delete(*chunk)

def get_task_queue(self, queue: str) -> List[Task]:
"""
Expand Down
8 changes: 7 additions & 1 deletion karton/core/utils.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,18 @@
import inspect
import signal
from typing import Any, Callable
from typing import Any, Callable, Iterator, Sequence, TypeVar

T = TypeVar("T")


def get_function_arg_num(fun: Callable) -> int:
return len(inspect.signature(fun).parameters)


def chunks(seq: Sequence[T], size: int) -> Iterator[Sequence[T]]:
return (seq[pos : pos + size] for pos in range(0, len(seq), size))


class GracefulKiller:
def __init__(self, handle_func: Callable) -> None:
self.handle_func = handle_func
Expand Down

0 comments on commit b3af248

Please sign in to comment.