Skip to content

Commit

Permalink
Merge pull request #5 from galipnik/SIANXKE-338-status-interrupted
Browse files Browse the repository at this point in the history
SIANXKE-338: Introduce status interrupted
  • Loading branch information
nezhar authored Oct 29, 2023
2 parents fcb9d2d + 045ff44 commit 1ff5935
Show file tree
Hide file tree
Showing 8 changed files with 183 additions and 17 deletions.
10 changes: 10 additions & 0 deletions django_future_tasks/management/commands/process_future_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
class Command(BaseCommand):
help = "Process future tasks from database"

current_task_pk = None

def add_arguments(self, parser):
parser.add_argument(
"--onetimerun",
Expand All @@ -29,6 +31,12 @@ def add_arguments(self, parser):
)

def _handle_termination(self, *args, **kwargs):
try:
current_task = FutureTask.objects.get(pk=self.current_task_pk)
current_task.status = FutureTask.FUTURE_TASK_STATUS_INTERRUPTED
current_task.save()
except FutureTask.DoesNotExist:
pass
self._running = False

def _handle_options(self, options):
Expand All @@ -52,6 +60,7 @@ def handle_tick(self):
for task in task_list:
task.status = FutureTask.FUTURE_TASK_STATUS_IN_PROGRESS
task.save()
self.current_task_pk = task.pk
try:
start_time = timeit.default_timer()
future_task_signal.send(sender=intern(task.type), instance=task)
Expand All @@ -68,6 +77,7 @@ def handle_tick(self):
*sys.exc_info(), limit=None, chain=None
),
}
self.current_task_pk = None
task.save()

time.sleep(self.tick)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
# Generated by Django 3.2.22 on 2023-10-13 13:40

from django.db import migrations, models


class Migration(migrations.Migration):
dependencies = [
("django_future_tasks", "0003_futuretask_execution_time"),
]

operations = [
migrations.AlterField(
model_name="futuretask",
name="execution_time",
field=models.FloatField(blank=True, help_text="in seconds", null=True),
),
migrations.AlterField(
model_name="futuretask",
name="status",
field=models.CharField(
choices=[
("open", "Status open"),
("in_progress", "Status in progress"),
("interrupted", "Status interrupted"),
("done", "Status done"),
("error", "Status error"),
],
default="open",
max_length=255,
verbose_name="Status",
),
),
]
2 changes: 2 additions & 0 deletions django_future_tasks/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,14 @@
class FutureTask(models.Model):
FUTURE_TASK_STATUS_OPEN = "open"
FUTURE_TASK_STATUS_IN_PROGRESS = "in_progress"
FUTURE_TASK_STATUS_INTERRUPTED = "interrupted"
FUTURE_TASK_STATUS_DONE = "done"
FUTURE_TASK_STATUS_ERROR = "error"

FUTURE_TASK_STATUS = (
(FUTURE_TASK_STATUS_OPEN, _("Status open")),
(FUTURE_TASK_STATUS_IN_PROGRESS, _("Status in progress")),
(FUTURE_TASK_STATUS_INTERRUPTED, _("Status interrupted")),
(FUTURE_TASK_STATUS_DONE, _("Status done")),
(FUTURE_TASK_STATUS_ERROR, _("Status error")),
)
Expand Down
2 changes: 2 additions & 0 deletions tests/core/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,9 +101,11 @@
FUTURE_TASK_TYPE_ONE = "task_one"
FUTURE_TASK_TYPE_TWO = "task_two"
FUTURE_TASK_TYPE_ERROR = "task_error"
FUTURE_TASK_TYPE_INTERRUPTION = "task_interruption"

FUTURE_TASK_TYPES = (
(FUTURE_TASK_TYPE_ONE, "Task 1"),
(FUTURE_TASK_TYPE_TWO, "Task 2"),
(FUTURE_TASK_TYPE_ERROR, "Task Error"),
(FUTURE_TASK_TYPE_INTERRUPTION, "Task Interruption"),
)
8 changes: 7 additions & 1 deletion tests/testapp/handlers.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import time
from sys import intern

from django.dispatch import receiver
Expand All @@ -8,7 +9,7 @@

@receiver(future_task_signal, sender=intern(settings.FUTURE_TASK_TYPE_ONE))
def my_task_function1(sender, instance, **kwargs):
pass
time.sleep(0.5)


@receiver(future_task_signal, sender=intern(settings.FUTURE_TASK_TYPE_TWO))
Expand All @@ -19,3 +20,8 @@ def my_task_function2(sender, instance, **kwargs):
@receiver(future_task_signal, sender=intern(settings.FUTURE_TASK_TYPE_ERROR))
def my_task_function_error(sender, instance, **kwargs):
raise Exception("task error")


@receiver(future_task_signal, sender=intern(settings.FUTURE_TASK_TYPE_INTERRUPTION))
def my_task_function_interruption(sender, instance, **kwargs):
time.sleep(10)
34 changes: 31 additions & 3 deletions tests/testapp/mixins.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,10 @@
from django.db import connection

from django_future_tasks.management.commands.populate_periodic_future_tasks import (
Command,
Command as PopulatePeriodicTasksCommand,
)
from django_future_tasks.management.commands.process_future_tasks import (
Command as ProcessTasksCommand,
)


Expand All @@ -14,15 +17,40 @@ def run(self):
connection.close()


class PeriodicTaskCommandMixin(object):
class ProcessTasksCommandMixin(object):
@classmethod
def setUpClass(cls):
assert (
not hasattr(cls, "command_instance") or cls.command_instance is None
), "process_future_tasks has already been started"
print("Starting process_future_tasks...")

cls.command_instance = ProcessTasksCommand()
cls.thread = TestThread(target=call_command, args=(cls.command_instance,))
cls.thread.start()
super().setUpClass()

@classmethod
def tearDownClass(cls):
assert (
cls.command_instance is not None
), "process_future_tasks has not been started and can therefore not be stopped"
print("Stopping process_future_tasks...")

super().tearDownClass()
cls.command_instance._handle_termination()
cls.thread.join()


class PopulatePeriodicTaskCommandMixin(object):
@classmethod
def setUpClass(cls):
assert (
not hasattr(cls, "command_instance") or cls.command_instance is None
), "populate_periodic_future_tasks has already been started"
print("Starting populate_periodic_future_tasks...")

cls.command_instance = Command()
cls.command_instance = PopulatePeriodicTasksCommand()
cls.thread = TestThread(target=call_command, args=(cls.command_instance,))
cls.thread.start()
super().setUpClass()
Expand Down
107 changes: 96 additions & 11 deletions tests/testapp/tests/test_future_tasks.py
Original file line number Diff line number Diff line change
@@ -1,23 +1,20 @@
import os
import signal
import time
from datetime import timedelta
from sys import intern

from django.core.management import call_command
from django.dispatch import receiver
from django.test import TestCase
from django.test import TestCase, TransactionTestCase
from django.utils import timezone

from django_future_tasks.handlers import future_task_signal
from django_future_tasks.models import FutureTask
from tests.core import settings
from tests.testapp.mixins import ProcessTasksCommandMixin

SLEEP_TIME = 2.2

@receiver(future_task_signal, sender=intern(settings.FUTURE_TASK_TYPE_ONE))
def test_function(sender, instance, **kwargs):
time.sleep(0.5)


class TestFutureTasks(TestCase):
class TestFutureTasks(ProcessTasksCommandMixin, TransactionTestCase):
def setUp(self):
super().setUp()

Expand All @@ -40,7 +37,10 @@ def setUp(self):
def test_future_task_process_task(self):
task = FutureTask.objects.get(pk=self.task1.pk)
self.assertEqual(task.status, FutureTask.FUTURE_TASK_STATUS_OPEN)
call_command("process_future_tasks", onetimerun=True)

# Make sure that task has been processed.
time.sleep(SLEEP_TIME)

task.refresh_from_db()
self.assertEqual(task.status, FutureTask.FUTURE_TASK_STATUS_DONE)
self.assertIsNotNone(task.execution_time)
Expand All @@ -50,14 +50,99 @@ def test_future_task_process_task(self):
def test_future_task_no_task_to_process(self):
task = FutureTask.objects.get(pk=self.task2.pk)
self.assertEqual(task.status, FutureTask.FUTURE_TASK_STATUS_OPEN)
call_command("process_future_tasks", onetimerun=True)

# Make sure that task has been processed.
time.sleep(SLEEP_TIME)

task.refresh_from_db()
self.assertEqual(task.status, FutureTask.FUTURE_TASK_STATUS_OPEN)

def test_future_task_process_error(self):
task = FutureTask.objects.get(pk=self.task_error.pk)
self.assertEqual(task.status, FutureTask.FUTURE_TASK_STATUS_OPEN)

# Make sure that task has been processed.
time.sleep(SLEEP_TIME)

task.refresh_from_db()
self.assertEqual(task.status, FutureTask.FUTURE_TASK_STATUS_ERROR)
self.assertEqual(task.result["args"], ["task error"])


class TestFutureTaskInterruption(ProcessTasksCommandMixin, TransactionTestCase):
def setUp(self):
super().setUp()

yesterday = timezone.now() - timedelta(days=1)

self.task1 = FutureTask.objects.create(
task_id="task", eta=yesterday, type=settings.FUTURE_TASK_TYPE_INTERRUPTION
)

def test_future_task_process_task(self):
task = FutureTask.objects.get(pk=self.task1.pk)
self.assertEqual(task.status, FutureTask.FUTURE_TASK_STATUS_OPEN)

# Make sure that task is in progression.
time.sleep(SLEEP_TIME)

pid = os.getpid()
os.kill(pid, signal.SIGINT)

task.refresh_from_db()
self.assertEqual(task.status, FutureTask.FUTURE_TASK_STATUS_INTERRUPTED)


class TestFutureTasksOnetimeRun(TestCase):
def setUp(self):
super().setUp()

today = timezone.now()
yesterday = today - timedelta(days=1)
tomorrow = today + timedelta(days=1)

self.task1 = FutureTask.objects.create(
task_id="onetimerun_task1",
eta=yesterday,
type=settings.FUTURE_TASK_TYPE_ONE,
)

self.task2 = FutureTask.objects.create(
task_id="onetimerun_task2", eta=tomorrow, type=settings.FUTURE_TASK_TYPE_TWO
)

self.task_error = FutureTask.objects.create(
task_id="onetimerun_task_error",
eta=yesterday,
type=settings.FUTURE_TASK_TYPE_ERROR,
)

def test_future_task_process_task_onetimerun(self):
task = FutureTask.objects.get(pk=self.task1.pk)
self.assertEqual(task.status, FutureTask.FUTURE_TASK_STATUS_OPEN)

call_command("process_future_tasks", onetimerun=True)

task.refresh_from_db()
self.assertEqual(task.status, FutureTask.FUTURE_TASK_STATUS_DONE)
self.assertGreater(task.execution_time, 0.5)
self.assertLess(task.execution_time, 1)

def test_future_task_no_task_to_process_onetimerun(self):
task = FutureTask.objects.get(pk=self.task2.pk)
self.assertEqual(task.status, FutureTask.FUTURE_TASK_STATUS_OPEN)

call_command("process_future_tasks", onetimerun=True)

task.refresh_from_db()
self.assertEqual(task.status, FutureTask.FUTURE_TASK_STATUS_OPEN)

def test_future_task_process_error_onetimerun(self):
task = FutureTask.objects.get(pk=self.task_error.pk)
self.assertEqual(task.status, FutureTask.FUTURE_TASK_STATUS_OPEN)

call_command("process_future_tasks", onetimerun=True)

task.refresh_from_db()
self.assertEqual(task.status, FutureTask.FUTURE_TASK_STATUS_ERROR)
self.assertEqual(task.result["args"], ["task error"])
4 changes: 2 additions & 2 deletions tests/testapp/tests/test_periodic_future_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,12 @@

from django_future_tasks.models import FutureTask, PeriodicFutureTask
from tests.core import settings
from tests.testapp.mixins import PeriodicTaskCommandMixin
from tests.testapp.mixins import PopulatePeriodicTaskCommandMixin

SLEEP_TIME = 1.1


class TestPeriodicFutureTasks(PeriodicTaskCommandMixin, TransactionTestCase):
class TestPeriodicFutureTasks(PopulatePeriodicTaskCommandMixin, TransactionTestCase):
def setUp(self):
super().setUp()

Expand Down

0 comments on commit 1ff5935

Please sign in to comment.