Skip to content

Commit

Permalink
Process future tasks by eta ordering + use time machine in tests
Browse files Browse the repository at this point in the history
  • Loading branch information
christophbuermann committed Oct 11, 2024
1 parent acecd28 commit d35f4f2
Show file tree
Hide file tree
Showing 5 changed files with 132 additions and 96 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ def _handle_options(self, options):
def tasks_for_processing():
return FutureTask.objects.filter(
eta__lte=timezone.now(), status=FutureTask.FUTURE_TASK_STATUS_OPEN
)
).order_by("eta")

@staticmethod
def _convert_exception_args(args):
Expand Down
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

# Development dependencies
coverage>=7.2.3,<7.3
time-machine>=2.16.0,<2.17.0

# Linters and formatters
pre-commit>=3.2.2,<3.3
Expand Down
2 changes: 2 additions & 0 deletions tests/core/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,12 +103,14 @@
FUTURE_TASK_TYPE_TWO = "task_two"
FUTURE_TASK_TYPE_ERROR = "task_error"
FUTURE_TASK_TYPE_INTERRUPTION = "task_interruption"
FUTURE_TASK_TYPE_ETA_ORDERING = "task_eta_ordering"

FUTURE_TASK_TYPES = (
(FUTURE_TASK_TYPE_ONE, "Task 1"),
(FUTURE_TASK_TYPE_TWO, "Task 2"),
(FUTURE_TASK_TYPE_ERROR, "Task Error"),
(FUTURE_TASK_TYPE_INTERRUPTION, "Task Interruption"),
(FUTURE_TASK_TYPE_ETA_ORDERING, "Task ETA Ordering"),
)

STATIC_URL = "/static/"
Expand Down
9 changes: 8 additions & 1 deletion tests/testapp/handlers.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import time
from sys import intern
from time import monotonic_ns

from django.dispatch import receiver

Expand All @@ -9,7 +10,7 @@

@receiver(future_task_signal, sender=intern(settings.FUTURE_TASK_TYPE_ONE))
def my_task_function1(sender, instance, **kwargs):
time.sleep(0.5)
pass


@receiver(future_task_signal, sender=intern(settings.FUTURE_TASK_TYPE_TWO))
Expand All @@ -25,3 +26,9 @@ def my_task_function_error(sender, instance, **kwargs):
@receiver(future_task_signal, sender=intern(settings.FUTURE_TASK_TYPE_INTERRUPTION))
def my_task_function_interruption(sender, instance, **kwargs):
time.sleep(10)


@receiver(future_task_signal, sender=intern(settings.FUTURE_TASK_TYPE_ETA_ORDERING))
def my_task_function_eta_ordering(sender, instance, **kwargs):
instance.result = monotonic_ns()
instance.save()
214 changes: 120 additions & 94 deletions tests/testapp/tests/test_future_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@
import signal
import time
from datetime import timedelta
from timeit import default_timer

import time_machine
from django.core.management import call_command
from django.test import TestCase, TransactionTestCase
from django.utils import timezone
Expand All @@ -11,138 +13,162 @@
from tests.core import settings
from tests.testapp.mixins import ProcessTasksCommandMixin

SLEEP_TIME = 2.2

class WaitForTaskStatusTimeout(Exception):
pass

class TestFutureTasks(ProcessTasksCommandMixin, TransactionTestCase):
def setUp(self):
super().setUp()

today = timezone.now()
yesterday = today - timedelta(days=1)
tomorrow = today + timedelta(days=1)

self.task1 = FutureTask.objects.create(
task_id="task1", eta=yesterday, type=settings.FUTURE_TASK_TYPE_ONE
)
def _wait_for_task_status(task, status, tick_seconds=0.1, timeout_seconds=2):
start_time = default_timer()
while task.status != status:
if default_timer() - start_time >= timeout_seconds:
raise WaitForTaskStatusTimeout(
f"Timeout while waiting for task status. Actual: '{task.status}' Expected: '{status}'"
)
task.refresh_from_db()
time.sleep(tick_seconds)

self.task2 = FutureTask.objects.create(
task_id="task2", eta=tomorrow, type=settings.FUTURE_TASK_TYPE_TWO
)

self.task_error = FutureTask.objects.create(
task_id="task_error", eta=yesterday, type=settings.FUTURE_TASK_TYPE_ERROR
class TestProcessFutureTasks(ProcessTasksCommandMixin, TransactionTestCase):
@time_machine.travel("2024-01-01 00:00 +0000", tick=False)
def test_process_future_tasks_eta_now(self):
start_time = default_timer()
task = FutureTask.objects.create(
task_id="task", eta=timezone.now(), type=settings.FUTURE_TASK_TYPE_ONE
)

def test_future_task_process_task(self):
task = FutureTask.objects.get(pk=self.task1.pk)
self.assertEqual(task.status, FutureTask.FUTURE_TASK_STATUS_OPEN)

# Make sure that task has been processed.
time.sleep(SLEEP_TIME)

task.refresh_from_db()
self.assertEqual(task.status, FutureTask.FUTURE_TASK_STATUS_DONE)
_wait_for_task_status(task, FutureTask.FUTURE_TASK_STATUS_DONE)
end_time = default_timer()
self.assertIsNotNone(task.execution_time)
self.assertGreater(task.execution_time, 0.5)
self.assertLess(task.execution_time, 1)

def test_future_task_no_task_to_process(self):
task = FutureTask.objects.get(pk=self.task2.pk)
self.assertGreater(task.execution_time, 0.0)
self.assertLess(task.execution_time, end_time - start_time)

@time_machine.travel("2024-01-01 00:00 +0000", tick=False)
def test_process_future_tasks_eta_future(self):
task = FutureTask.objects.create(
task_id="task",
eta=timezone.now() + timedelta(microseconds=1),
type=settings.FUTURE_TASK_TYPE_TWO,
)
self.assertEqual(task.status, FutureTask.FUTURE_TASK_STATUS_OPEN)

# Make sure that task has been processed.
time.sleep(SLEEP_TIME)

try:
_wait_for_task_status(task, FutureTask.FUTURE_TASK_STATUS_DONE)
except WaitForTaskStatusTimeout:
pass
task.refresh_from_db()
self.assertEqual(task.status, FutureTask.FUTURE_TASK_STATUS_OPEN)

def test_future_task_process_error(self):
task = FutureTask.objects.get(pk=self.task_error.pk)
@time_machine.travel("2024-01-01 00:00 +0000", tick=False)
def test_process_future_tasks_error(self):
task = FutureTask.objects.create(
task_id="task", eta=timezone.now(), type=settings.FUTURE_TASK_TYPE_ERROR
)
print(FutureTask.objects.all())
self.assertEqual(task.status, FutureTask.FUTURE_TASK_STATUS_OPEN)

# Make sure that task has been processed.
time.sleep(SLEEP_TIME)

task.refresh_from_db()
self.assertEqual(task.status, FutureTask.FUTURE_TASK_STATUS_ERROR)
_wait_for_task_status(task, FutureTask.FUTURE_TASK_STATUS_ERROR)
self.assertEqual(task.result["args"], ["task error"])


class TestFutureTaskInterruption(ProcessTasksCommandMixin, TransactionTestCase):
def setUp(self):
super().setUp()

yesterday = timezone.now() - timedelta(days=1)

self.task1 = FutureTask.objects.create(
task_id="task", eta=yesterday, type=settings.FUTURE_TASK_TYPE_INTERRUPTION
@time_machine.travel("2024-01-01 00:00 +0000", tick=True)
def test_process_future_tasks_eta_ordering(self):
_now = timezone.now()
task_late = FutureTask.objects.create(
task_id="task_late",
eta=_now,
type=settings.FUTURE_TASK_TYPE_ETA_ORDERING,
)
task_early = FutureTask.objects.create(
task_id="task_early",
eta=_now - timedelta(microseconds=1),
type=settings.FUTURE_TASK_TYPE_ETA_ORDERING,
)
self.assertEqual(task_late.status, FutureTask.FUTURE_TASK_STATUS_OPEN)
self.assertEqual(task_early.status, FutureTask.FUTURE_TASK_STATUS_OPEN)
_wait_for_task_status(task_late, FutureTask.FUTURE_TASK_STATUS_DONE)
_wait_for_task_status(task_early, FutureTask.FUTURE_TASK_STATUS_DONE)
self.assertGreater(task_late.result, task_early.result)


class TestProcessFutureTasksInterruption(ProcessTasksCommandMixin, TransactionTestCase):
@time_machine.travel("2024-01-01 00:00 +0000", tick=False)
def test_future_task_process_interruption(self):
task = FutureTask.objects.create(
task_id="task",
eta=timezone.now(),
type=settings.FUTURE_TASK_TYPE_INTERRUPTION,
)

def test_future_task_process_task(self):
task = FutureTask.objects.get(pk=self.task1.pk)
self.assertEqual(task.status, FutureTask.FUTURE_TASK_STATUS_OPEN)

# Make sure that task is in progression.
time.sleep(SLEEP_TIME)

_wait_for_task_status(task, FutureTask.FUTURE_TASK_STATUS_IN_PROGRESS)
pid = os.getpid()
os.kill(pid, signal.SIGINT)

task.refresh_from_db()
self.assertEqual(task.status, FutureTask.FUTURE_TASK_STATUS_INTERRUPTED)


class TestFutureTasksOnetimeRun(TestCase):
def setUp(self):
super().setUp()

today = timezone.now()
yesterday = today - timedelta(days=1)
tomorrow = today + timedelta(days=1)
@time_machine.travel("2024-01-01 00:00 +0000", tick=False)
def test_process_future_tasks_onetimerun_no_task(self):
call_command("process_future_tasks", onetimerun=True)

self.task1 = FutureTask.objects.create(
task_id="onetimerun_task1",
eta=yesterday,
@time_machine.travel("2024-01-01 00:00 +0000", tick=False)
def test_process_future_tasks_onetimerun_eta_now(self):
start_time = default_timer()
task = FutureTask.objects.create(
task_id="task",
eta=timezone.now(),
type=settings.FUTURE_TASK_TYPE_ONE,
)

self.task2 = FutureTask.objects.create(
task_id="onetimerun_task2", eta=tomorrow, type=settings.FUTURE_TASK_TYPE_TWO
)

self.task_error = FutureTask.objects.create(
task_id="onetimerun_task_error",
eta=yesterday,
type=settings.FUTURE_TASK_TYPE_ERROR,
)

def test_future_task_process_task_onetimerun(self):
task = FutureTask.objects.get(pk=self.task1.pk)
self.assertEqual(task.status, FutureTask.FUTURE_TASK_STATUS_OPEN)

call_command("process_future_tasks", onetimerun=True)

end_time = default_timer()
task.refresh_from_db()
self.assertEqual(task.status, FutureTask.FUTURE_TASK_STATUS_DONE)
self.assertGreater(task.execution_time, 0.5)
self.assertLess(task.execution_time, 1)

def test_future_task_no_task_to_process_onetimerun(self):
task = FutureTask.objects.get(pk=self.task2.pk)
self.assertIsNotNone(task.execution_time)
self.assertGreater(task.execution_time, 0.0)
self.assertLess(task.execution_time, end_time - start_time)

@time_machine.travel("2024-01-01 00:00 +0000", tick=False)
def test_process_future_tasks_onetimerun_eta_future(self):
_now = timezone.now()
task = FutureTask.objects.create(
task_id="task",
eta=_now + timedelta(microseconds=1),
type=settings.FUTURE_TASK_TYPE_TWO,
)
self.assertEqual(task.status, FutureTask.FUTURE_TASK_STATUS_OPEN)

call_command("process_future_tasks", onetimerun=True)

task.refresh_from_db()
self.assertEqual(task.status, FutureTask.FUTURE_TASK_STATUS_OPEN)

def test_future_task_process_error_onetimerun(self):
task = FutureTask.objects.get(pk=self.task_error.pk)
@time_machine.travel("2024-01-01 00:00 +0000", tick=False)
def test_process_future_tasks_onetimerun_error(self):
_now = timezone.now()
task = FutureTask.objects.create(
task_id="task",
eta=_now,
type=settings.FUTURE_TASK_TYPE_ERROR,
)
self.assertEqual(task.status, FutureTask.FUTURE_TASK_STATUS_OPEN)

call_command("process_future_tasks", onetimerun=True)

task.refresh_from_db()
self.assertEqual(task.status, FutureTask.FUTURE_TASK_STATUS_ERROR)
self.assertEqual(task.result["args"], ["task error"])

@time_machine.travel("2024-01-01 00:00 +0000", tick=True)
def test_process_future_tasks_onetimerun_eta_ordering(self):
_now = timezone.now()
task_late = FutureTask.objects.create(
task_id="task_late",
eta=_now,
type=settings.FUTURE_TASK_TYPE_ETA_ORDERING,
)
task_early = FutureTask.objects.create(
task_id="task_early",
eta=_now - timedelta(microseconds=1),
type=settings.FUTURE_TASK_TYPE_ETA_ORDERING,
)
self.assertEqual(task_late.status, FutureTask.FUTURE_TASK_STATUS_OPEN)
self.assertEqual(task_early.status, FutureTask.FUTURE_TASK_STATUS_OPEN)
call_command("process_future_tasks", onetimerun=True)
task_late.refresh_from_db()
task_early.refresh_from_db()
self.assertGreater(task_late.result, task_early.result)

0 comments on commit d35f4f2

Please sign in to comment.