From c5e1a8ec7b6c81e4c7a760a31c5f79700c8bafed Mon Sep 17 00:00:00 2001 From: Alex Velez Date: Wed, 8 Oct 2025 18:04:46 -0500 Subject: [PATCH 01/13] Add retry_on to task decorator --- kolibri/core/auth/tasks.py | 4 ++++ kolibri/core/tasks/decorators.py | 3 +++ kolibri/core/tasks/job.py | 33 +++++++++++++++++++++++++++++++- kolibri/core/tasks/registry.py | 14 ++++++++++++++ kolibri/core/tasks/utils.py | 17 ++++++++++++++++ 5 files changed, 70 insertions(+), 1 deletion(-) diff --git a/kolibri/core/auth/tasks.py b/kolibri/core/auth/tasks.py index d362a035024..d6d473ef594 100644 --- a/kolibri/core/auth/tasks.py +++ b/kolibri/core/auth/tasks.py @@ -7,6 +7,8 @@ from django.core.management import call_command from django.core.management.base import CommandError from django.utils import timezone +from morango.errors import MorangoError +from requests.exceptions import HTTPError from rest_framework import serializers from rest_framework.exceptions import AuthenticationFailed from rest_framework.exceptions import ValidationError @@ -41,6 +43,7 @@ from kolibri.core.tasks.permissions import IsAdminForJob from kolibri.core.tasks.permissions import IsSuperAdmin from kolibri.core.tasks.permissions import NotProvisioned +from kolibri.core.tasks.utils import DatabaseLockedError from kolibri.core.tasks.utils import get_current_job from kolibri.core.tasks.validation import JobValidator from kolibri.utils.time_utils import naive_utc_datetime @@ -607,6 +610,7 @@ def validate(self, data): permission_classes=[IsSuperAdmin() | NotProvisioned()], status_fn=status_fn, long_running=True, + retry_on=[DatabaseLockedError, MorangoError, HTTPError], ) def peeruserimport(command, **kwargs): call_command(command, **kwargs) diff --git a/kolibri/core/tasks/decorators.py b/kolibri/core/tasks/decorators.py index 74c115debad..4e4770666b3 100644 --- a/kolibri/core/tasks/decorators.py +++ b/kolibri/core/tasks/decorators.py @@ -17,6 +17,7 @@ def register_task( permission_classes=None, long_running=False, status_fn=None, + retry_on=None, ): """ Registers the decorated function as task. 
@@ -36,6 +37,7 @@ def register_task( permission_classes=permission_classes, long_running=long_running, status_fn=status_fn, + retry_on=retry_on, ) return RegisteredTask( @@ -49,4 +51,5 @@ def register_task( permission_classes=permission_classes, long_running=long_running, status_fn=status_fn, + retry_on=retry_on, ) diff --git a/kolibri/core/tasks/job.py b/kolibri/core/tasks/job.py index 6147266d466..8766c550c17 100644 --- a/kolibri/core/tasks/job.py +++ b/kolibri/core/tasks/job.py @@ -4,6 +4,7 @@ import traceback import uuid from collections import namedtuple +from datetime import timedelta from kolibri.core.tasks.constants import ( # noqa F401 - imported for backwards compatibility Priority, @@ -101,6 +102,13 @@ def default_status_text(job): ALLOWED_RETRY_IN_KWARGS = {"priority", "repeat", "interval", "retry_interval"} +RETRY_ON_DELAY = timedelta( + seconds=5 +) # Delay before retrying a job that failed due to a retryable exception +MAX_RETRIES = ( + 3 # Maximum number of retries for a job that failed due to a retryable exception +) + class Job(object): """ @@ -129,6 +137,7 @@ class Job(object): "facility_id", "func", "long_running", + "retry_on", } def to_json(self): @@ -169,6 +178,7 @@ def from_job(cls, job, **kwargs): kwargs["track_progress"] = job.track_progress kwargs["cancellable"] = job.cancellable kwargs["long_running"] = job.long_running + kwargs["retry_on"] = job.retry_on.copy() kwargs["extra_metadata"] = job.extra_metadata.copy() kwargs["facility_id"] = job.facility_id return cls(job.func, **kwargs) @@ -190,6 +200,7 @@ def __init__( total_progress=0, result=None, long_running=False, + retry_on=None, ): """ Create a new Job that will run func given the arguments passed to Job(). If the track_progress keyword parameter @@ -231,6 +242,7 @@ def __init__( self.kwargs = kwargs or {} self._storage = None self.func = callable_to_import_path(func) + self.retry_on = [callable_to_import_path(exc) for exc in (retry_on or [])] def _check_storage_attached(self): if self._storage is None: @@ -362,6 +374,8 @@ def execute(self): args, kwargs = copy.copy(self.args), copy.copy(self.kwargs) + should_retry = False + try: # First check whether the job has been cancelled self.check_for_cancel() @@ -370,6 +384,7 @@ def execute(self): except UserCancelledError: self.storage.mark_job_as_canceled(self.job_id) except Exception as e: + should_retry = self.should_retry(e) # If any error occurs, mark the job as failed and save the exception traceback_str = traceback.format_exc() e.traceback = traceback_str @@ -379,10 +394,26 @@ def execute(self): self.storage.mark_job_as_failed(self.job_id, e, traceback_str) self.storage.reschedule_finished_job_if_needed( - self.job_id, delay=self._retry_in_delay, **self._retry_in_kwargs + self.job_id, + delay=RETRY_ON_DELAY if should_retry else self._retry_in_delay, + **self._retry_in_kwargs, ) setattr(current_state_tracker, "job", None) + def should_retry(self, exception): + retries = self.extra_metadata.get("retries", 0) + 1 + self.extra_metadata["retries"] = retries + self.save_meta() + if retries > MAX_RETRIES: + return False + + for retry_exception in self.retry_on: + exc = import_path_to_callable(retry_exception) + if isinstance(exception, exc): + logger.info(f"Retrying job {self.job_id} due to exception {exception}") + return True + return False + @property def task(self): """ diff --git a/kolibri/core/tasks/registry.py b/kolibri/core/tasks/registry.py index 7c2161309e0..1170ec61bdf 100644 --- a/kolibri/core/tasks/registry.py +++ b/kolibri/core/tasks/registry.py @@ 
-159,6 +159,7 @@ def __init__( # noqa: C901 permission_classes=None, long_running=False, status_fn=None, + retry_on=None, ): """ :param func: Function to be wrapped as a Registered task @@ -229,6 +230,7 @@ def __init__( # noqa: C901 self.track_progress = track_progress self.long_running = long_running self._status_fn = status_fn + self.retry_on = self._validate_retry_on(retry_on) # Make this wrapper object look seamlessly like the wrapped function update_wrapper(self, func) @@ -258,6 +260,17 @@ def _validate_permissions_classes(self, permission_classes): else: yield permission_class + def _validate_retry_on(self, retry_on): + if retry_on is None: + return [] + + if not isinstance(retry_on, list): + raise TypeError("retry_on must be a list of exceptions") + for item in retry_on: + if not issubclass(item, Exception): + raise TypeError("Each item in retry_on must be an Exception subclass") + return retry_on + def check_job_permissions(self, user, job, view): for permission in self.permissions: if not permission.has_permission(user, job, view): @@ -395,6 +408,7 @@ def _ready_job(self, **job_kwargs): cancellable=job_kwargs.pop("cancellable", self.cancellable), track_progress=job_kwargs.pop("track_progress", self.track_progress), long_running=job_kwargs.pop("long_running", self.long_running), + retry_on=self.retry_on, **job_kwargs ) return job_obj diff --git a/kolibri/core/tasks/utils.py b/kolibri/core/tasks/utils.py index f2adab6084e..791c809348d 100644 --- a/kolibri/core/tasks/utils.py +++ b/kolibri/core/tasks/utils.py @@ -7,6 +7,7 @@ from threading import Thread import click +from django.db.utils import OperationalError from django.utils.functional import SimpleLazyObject from django.utils.module_loading import import_string from sqlalchemy import create_engine @@ -383,3 +384,19 @@ def fd_safe_executor(fds_per_task=2): ) return executor(max_workers=max_workers) + + +class DatabaseLockedError(OperationalError): + """ + Custom exception that is only raised when the underlying error + is an OperationalError whose message contains 'database is locked'. + """ + + def __init__(self, *args, **kwargs): + error_message = str(args[0]) if args else "" + + if "database is locked" not in error_message.lower(): + # If the condition is not met, re-raise the original error. 
+ raise OperationalError(*args, **kwargs) + + super().__init__(*args, **kwargs) From e6879c613f0cb366c7d172cc21e601b59793adeb Mon Sep 17 00:00:00 2001 From: Alex Velez Date: Wed, 8 Oct 2025 18:07:04 -0500 Subject: [PATCH 02/13] Handle user import errors in setup wizard plugin --- .../assets/src/views/ImportMultipleUsers.vue | 54 +++++++++++++++---- .../assets/src/views/OnboardingStepBase.vue | 5 ++ 2 files changed, 50 insertions(+), 9 deletions(-) diff --git a/kolibri/plugins/setup_wizard/assets/src/views/ImportMultipleUsers.vue b/kolibri/plugins/setup_wizard/assets/src/views/ImportMultipleUsers.vue index b63544532e8..cbe1b4c4aed 100644 --- a/kolibri/plugins/setup_wizard/assets/src/views/ImportMultipleUsers.vue +++ b/kolibri/plugins/setup_wizard/assets/src/views/ImportMultipleUsers.vue @@ -5,6 +5,7 @@ :step="step" :steps="steps" :showBackArrow="true" + :backArrowDisabled="learnersBeingImported.length > 0" :eventOnGoBack="backArrowEvent" :title="selectAUser$()" :description="facilityDescription" @@ -52,6 +53,7 @@ + @@ -65,8 +67,10 @@ import PaginatedListContainer from 'kolibri-common/components/PaginatedListContainer'; import { lodUsersManagementStrings } from 'kolibri-common/strings/lodUsersManagementStrings'; import { DemographicConstants } from 'kolibri/constants'; - import { TaskStatuses } from 'kolibri-common/utils/syncTaskUtils'; + import { TaskStatuses, TaskTypes } from 'kolibri-common/utils/syncTaskUtils'; import UserTable from 'kolibri-common/components/UserTable'; + import useSnackbar from 'kolibri/composables/useSnackbar'; + import GlobalSnackbar from 'kolibri/components/GlobalSnackbar'; import { FooterMessageTypes, SoudQueue } from '../constants'; import OnboardingStepBase from './OnboardingStepBase'; @@ -85,14 +89,19 @@ OnboardingStepBase, PaginatedListContainer, UserTable, + GlobalSnackbar, }, mixins: [commonCoreStrings, commonSyncElements], setup() { - const { selectAUser$, importedLabel$ } = lodUsersManagementStrings; + const { selectAUser$, importedLabel$, importUserError$ } = lodUsersManagementStrings; + const { createSnackbar } = useSnackbar(); return { + createSnackbar, + selectAUser$, importedLabel$, + importUserError$, }; }, data() { @@ -151,6 +160,9 @@ beforeMount() { this.isPolling = true; this.pollImportTask(); + this.learnersBeingImported = this.wizardService.state.context.usersBeingImported.map( + u => u.id, + ); }, methods: { importedLearners() { @@ -159,14 +171,20 @@ pollImportTask() { TaskResource.list({ queue: SoudQueue }).then(tasks => { if (tasks.length) { + let isFailingTasks = false; tasks.forEach(task => { - if (task.status === TaskStatuses.COMPLETED) { - // Remove completed user id from 'being imported' + if ([TaskStatuses.COMPLETED, TaskStatuses.FAILED].includes(task.status)) { + // Remove completed/failed user id from 'being imported' const taskUserId = task.extra_metadata.user_id; this.learnersBeingImported = this.learnersBeingImported.filter( id => id != taskUserId, ); - + this.wizardService.send({ + type: 'REMOVE_USER_BEING_IMPORTED', + value: taskUserId, + }); + } + if (task.status === TaskStatuses.COMPLETED) { // Update the wizard context to know this user has been imported - only if they // haven't already been added to the list (ie, imported by other means) const taskUsername = task.extra_metadata.username; @@ -186,8 +204,14 @@ value: taskUsername, }); } + } else if (task.status === TaskStatuses.FAILED) { + isFailingTasks = true; } }); + if (isFailingTasks) { + this.createSnackbar(this.importUserError$()); + TaskResource.clearAll(SoudQueue); + } 
} }); if (this.isPolling) { @@ -196,11 +220,11 @@ }, 2000); } }, - startImport(learner) { + async startImport(learner) { // Push the learner into being imported, we'll remove it if we get an error later on this.learnersBeingImported.push(learner.id); - const task_name = 'kolibri.core.auth.tasks.peeruserimport'; + const task_name = TaskTypes.IMPORTLODUSER; const params = { type: task_name, ...this.wizardService.state.context.remoteAdmin, @@ -216,9 +240,21 @@ value: { username: learner.username, password: DemographicConstants.NOT_SPECIFIED }, }); } - TaskResource.startTask(params).catch(() => { + try { + const newTask = await TaskResource.startTask(params); + this.wizardService.send({ + type: 'ADD_USER_BEING_IMPORTED', + value: { + id: learner.id, + full_name: learner.full_name, + username: learner.username, + taskId: newTask.id, + }, + }); + } catch (error) { + this.createSnackbar(this.importUserError$()); this.learnersBeingImported = this.learnersBeingImported.filter(id => id != learner.id); - }); + } }, isImported(learner) { return this.importedLearners().find(u => u === learner.username); diff --git a/kolibri/plugins/setup_wizard/assets/src/views/OnboardingStepBase.vue b/kolibri/plugins/setup_wizard/assets/src/views/OnboardingStepBase.vue index 1b27c5a2ba9..79cc7c8bf87 100644 --- a/kolibri/plugins/setup_wizard/assets/src/views/OnboardingStepBase.vue +++ b/kolibri/plugins/setup_wizard/assets/src/views/OnboardingStepBase.vue @@ -67,6 +67,7 @@ v-if="showBackArrow" icon="back" style="margin-left: -12px" + :disabled="backArrowDisabled" @click="wizardService.send(eventOnGoBack)" /> @@ -188,6 +189,10 @@ type: Boolean, default: false, }, + backArrowDisabled: { + type: Boolean, + default: false, + }, noBackAction: { type: Boolean, default: false, From eb53ac5f36aed5d4c9ed81b272c8f2bb058346c9 Mon Sep 17 00:00:00 2001 From: Alex Velez Date: Wed, 8 Oct 2025 18:59:57 -0500 Subject: [PATCH 03/13] Add tests --- kolibri/core/tasks/test/test_decorators.py | 2 + kolibri/core/tasks/test/test_job.py | 71 ++++++++++++++++++++++ 2 files changed, 73 insertions(+) diff --git a/kolibri/core/tasks/test/test_decorators.py b/kolibri/core/tasks/test/test_decorators.py index 17105a7bafe..ac9b2e590ba 100644 --- a/kolibri/core/tasks/test/test_decorators.py +++ b/kolibri/core/tasks/test/test_decorators.py @@ -26,6 +26,7 @@ def add(x, y): cancellable=True, track_progress=True, status_fn=status_fn, + retry_on=[], )(add) MockRegisteredTask.assert_called_once_with( @@ -39,6 +40,7 @@ def add(x, y): track_progress=True, long_running=False, status_fn=status_fn, + retry_on=[], ) def test_register_decorator_registers_without_args(self): diff --git a/kolibri/core/tasks/test/test_job.py b/kolibri/core/tasks/test/test_job.py index 91051a8113d..848f5a7ad0e 100644 --- a/kolibri/core/tasks/test/test_job.py +++ b/kolibri/core/tasks/test/test_job.py @@ -3,10 +3,12 @@ import mock from django.test.testcases import TestCase +from requests.exceptions import HTTPError from kolibri.core.tasks.constants import Priority from kolibri.core.tasks.exceptions import JobNotRunning from kolibri.core.tasks.job import Job +from kolibri.core.tasks.job import MAX_RETRIES from kolibri.core.tasks.permissions import IsSuperAdmin from kolibri.core.tasks.registry import RegisteredTask from kolibri.core.tasks.utils import current_state_tracker @@ -17,6 +19,10 @@ def status_fn(job): pass +def fn_with_http_error(): + raise HTTPError("Test exception") + + class JobTest(TestCase): def setUp(self): self.job = Job(id, track_progress=True) @@ -181,6 +187,70 @@ def 
test_job_retry_in_all_allowable_values(self): except Exception: setattr(current_state_tracker, "job", None) + def test_job_retry_on_matching_exception(self): + # The task raises an HTTPError, which is in the retry_on list, so it should be rescheduled + job = Job(fn_with_http_error, retry_on=[HTTPError]) + job.storage = mock.MagicMock() + setattr(current_state_tracker, "job", job) + + job.execute() + + job.storage.reschedule_finished_job_if_needed.assert_called_once() + + # If delay was set to the reschedule call, it means it will be retried + args, kwargs = job.storage.reschedule_finished_job_if_needed.call_args + self.assertEqual(args[0], job.job_id) + self.assertEqual(job.extra_metadata.get("retries"), 1) + self.assertIsNotNone(kwargs.get("delay")) + + setattr(current_state_tracker, "job", None) + + def test_job_retry_on_max_retries_not_exceeded(self): + job = Job(fn_with_http_error, retry_on=[HTTPError]) + job.storage = mock.MagicMock() + job.extra_metadata["retries"] = MAX_RETRIES - 1 # Still allowed to retry + setattr(current_state_tracker, "job", job) + + job.execute() + + job.storage.reschedule_finished_job_if_needed.assert_called_once() + args, kwargs = job.storage.reschedule_finished_job_if_needed.call_args + self.assertEqual(args[0], job.job_id) + self.assertEqual(job.extra_metadata.get("retries"), MAX_RETRIES) + self.assertIsNotNone(kwargs.get("delay")) + + setattr(current_state_tracker, "job", None) + + def test_job_retry_on_max_retries_exceeded(self): + job = Job(fn_with_http_error, retry_on=[HTTPError]) + job.storage = mock.MagicMock() + job.extra_metadata["retries"] = MAX_RETRIES # Already reached max retries + setattr(current_state_tracker, "job", job) + + job.execute() + + job.storage.reschedule_finished_job_if_needed.assert_called_once() + args, kwargs = job.storage.reschedule_finished_job_if_needed.call_args + self.assertEqual(args[0], job.job_id) + self.assertIsNone(kwargs.get("delay")) # No delay means no retry + + setattr(current_state_tracker, "job", None) + + def test_job_retry_on_non_matching_exception(self): + # The task raises an HTTPError, which is not in the retry_on list, so it should not be rescheduled + job = Job(fn_with_http_error, retry_on=[ValueError]) # Different exception type + job.storage = mock.MagicMock() + setattr(current_state_tracker, "job", job) + + job.execute() + + job.storage.reschedule_finished_job_if_needed.assert_called_once() + args, kwargs = job.storage.reschedule_finished_job_if_needed.call_args + self.assertEqual(args[0], job.job_id) + self.assertIsNone(kwargs.get("delay")) # No delay means no retry + + setattr(current_state_tracker, "job", None) + # Test generated by Claude 3.7 Sonnet and tweaked def test_job_update_progress_throttles_small_updates(self): @@ -402,6 +472,7 @@ def test__ready_job(self, MockJob): track_progress=True, long_running=True, kwargs=dict(base=10), # kwarg that was passed to _ready_job() + retry_on=[], ) # Do we return the job object? 
From 57ba0d32e878bdccbeaf20506cf45fc0cfb5f744 Mon Sep 17 00:00:00 2001 From: Alex Velez Date: Wed, 15 Oct 2025 19:36:59 -0500 Subject: [PATCH 04/13] Migrations for sqlalchemy job --- kolibri/core/tasks/job.py | 4 +- kolibri/core/tasks/migrations/0001_initial.py | 53 ++++++++ .../migrations/0002_add_retries_fields.py | 27 ++++ kolibri/core/tasks/migrations/__init__.py | 0 kolibri/core/tasks/models.py | 116 ++++++++++++++++++ kolibri/core/tasks/operations.py | 42 +++++++ kolibri/core/tasks/schema_utils.py | 60 +++++++++ kolibri/core/tasks/storage.py | 27 ++++ kolibri/core/tasks/validation.py | 5 + kolibri/deployment/default/settings/base.py | 12 ++ kolibri/deployment/default/sqlite_db_names.py | 2 + kolibri/utils/main.py | 16 ++- kolibri/utils/sanity_checks.py | 20 ++- 13 files changed, 375 insertions(+), 9 deletions(-) create mode 100644 kolibri/core/tasks/migrations/0001_initial.py create mode 100644 kolibri/core/tasks/migrations/0002_add_retries_fields.py create mode 100644 kolibri/core/tasks/migrations/__init__.py create mode 100644 kolibri/core/tasks/models.py create mode 100644 kolibri/core/tasks/operations.py create mode 100644 kolibri/core/tasks/schema_utils.py diff --git a/kolibri/core/tasks/job.py b/kolibri/core/tasks/job.py index 8766c550c17..bd2330c1a22 100644 --- a/kolibri/core/tasks/job.py +++ b/kolibri/core/tasks/job.py @@ -375,6 +375,7 @@ def execute(self): args, kwargs = copy.copy(self.args), copy.copy(self.kwargs) should_retry = False + exception = None try: # First check whether the job has been cancelled @@ -384,6 +385,7 @@ def execute(self): except UserCancelledError: self.storage.mark_job_as_canceled(self.job_id) except Exception as e: + exception = e should_retry = self.should_retry(e) # If any error occurs, mark the job as failed and save the exception traceback_str = traceback.format_exc() @@ -392,10 +394,10 @@ def execute(self): "Job {} raised an exception: {}".format(self.job_id, traceback_str) ) self.storage.mark_job_as_failed(self.job_id, e, traceback_str) - self.storage.reschedule_finished_job_if_needed( self.job_id, delay=RETRY_ON_DELAY if should_retry else self._retry_in_delay, + exception=exception, **self._retry_in_kwargs, ) setattr(current_state_tracker, "job", None) diff --git a/kolibri/core/tasks/migrations/0001_initial.py b/kolibri/core/tasks/migrations/0001_initial.py new file mode 100644 index 00000000000..bdf8521fda7 --- /dev/null +++ b/kolibri/core/tasks/migrations/0001_initial.py @@ -0,0 +1,53 @@ +# Initial migration for the jobs table. This migration should be +# skipped if the table was created by SQLAlchemy. 
+from django.db import migrations +from django.db import models + + +class Migration(migrations.Migration): + + initial = True + + dependencies = [] + + operations = [ + migrations.CreateModel( + name="Job", + fields=[ + ("id", models.CharField(max_length=36, primary_key=True)), + ("state", models.CharField(max_length=20, db_index=True)), + ("func", models.CharField(max_length=200, db_index=True)), + ("priority", models.IntegerField(db_index=True)), + ("queue", models.CharField(max_length=50, db_index=True)), + ("saved_job", models.TextField()), + ("time_created", models.DateTimeField(null=True, blank=True)), + ("time_updated", models.DateTimeField(null=True, blank=True)), + ("interval", models.IntegerField(default=0)), + ("retry_interval", models.IntegerField(null=True, blank=True)), + ("repeat", models.IntegerField(null=True, blank=True)), + ("scheduled_time", models.DateTimeField(null=True, blank=True)), + ( + "worker_host", + models.CharField(max_length=100, null=True, blank=True), + ), + ( + "worker_process", + models.CharField(max_length=50, null=True, blank=True), + ), + ( + "worker_thread", + models.CharField(max_length=50, null=True, blank=True), + ), + ("worker_extra", models.TextField(null=True, blank=True)), + ], + options={ + "db_table": "jobs", + }, + ), + migrations.AddIndex( + model_name="job", + index=models.Index( + fields=["queue", "scheduled_time"], name="queue__scheduled_time" + ), + ), + ] diff --git a/kolibri/core/tasks/migrations/0002_add_retries_fields.py b/kolibri/core/tasks/migrations/0002_add_retries_fields.py new file mode 100644 index 00000000000..bb7159bb670 --- /dev/null +++ b/kolibri/core/tasks/migrations/0002_add_retries_fields.py @@ -0,0 +1,27 @@ +# Generated by Django 3.2.25 on 2025-10-15 21:14 +# Modified to use AddColumnIfNotExists operation to prevent errors if columns were +# added by SQLAlchemy. +from django.db import migrations +from django.db import models + +from kolibri.core.tasks.operations import AddFieldIfNotExists + + +class Migration(migrations.Migration): + + dependencies = [ + ("kolibritasks", "0001_initial"), + ] + + operations = [ + AddFieldIfNotExists( + model_name="job", + name="max_retries", + field=models.IntegerField(blank=True, null=True), + ), + AddFieldIfNotExists( + model_name="job", + name="retries", + field=models.IntegerField(blank=True, null=True), + ), + ] diff --git a/kolibri/core/tasks/migrations/__init__.py b/kolibri/core/tasks/migrations/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/kolibri/core/tasks/models.py b/kolibri/core/tasks/models.py new file mode 100644 index 00000000000..62aa3ae1514 --- /dev/null +++ b/kolibri/core/tasks/models.py @@ -0,0 +1,116 @@ +from django.db import models + +from kolibri.deployment.default.sqlite_db_names import JOB_STORAGE + + +class KolibriTasksRouter(object): + """ + Determine how to route database calls for the kolibritasks app. + All other models will be routed to the default database. 
+ """ + + def db_for_read(self, model, **hints): + """Send all read operations on kolibritasks app models to JOB_STORAGE.""" + if model._meta.app_label == "kolibritasks": + return JOB_STORAGE + return None + + def db_for_write(self, model, **hints): + """Send all write operations on kolibritasks app models to JOB_STORAGE.""" + if model._meta.app_label == "kolibritasks": + return JOB_STORAGE + return None + + def allow_relation(self, obj1, obj2, **hints): + """Determine if relationship is allowed between two objects.""" + + # Allow any relation between two models that are both in the Notifications app. + if ( + obj1._meta.app_label == "kolibritasks" + and obj2._meta.app_label == "kolibritasks" + ): + return True + # No opinion if neither object is in the Notifications app (defer to default or other routers). + elif "kolibritasks" not in [obj1._meta.app_label, obj2._meta.app_label]: + return None + + # Block relationship if one object is in the Notifications app and the other isn't. + return False + + def allow_migrate(self, db, app_label, model_name=None, **hints): + """Ensure that the Notifications app's models get created on the right database.""" + if app_label == "kolibritasks": + # The kolibritasks app should be migrated only on the JOB_STORAGE database. + return db == JOB_STORAGE + elif db == JOB_STORAGE: + # Ensure that all other apps don't get migrated on the JOB_STORAGE database. + return False + + # No opinion for all other scenarios + return None + + +class Job(models.Model): + """ + Django model corresponding to the 'jobs' table in SQLAlchemy. + + This model is not meant to be used for normal CRUD operations. + It exists solely for Django to manage the migrations + of the 'jobs' table, which is handled by SQLAlchemy. + """ + + # The hex UUID given to the job upon first creation. + id = models.CharField(max_length=36, primary_key=True) + + # The job's state. Inflated here for easier querying to the job's state. + state = models.CharField(max_length=20, db_index=True) + + # The job's function string. Inflated here for easier querying of which task type it is. + func = models.CharField(max_length=200, db_index=True) + + # The job's priority. Helps to decide which job to run next. + priority = models.IntegerField(db_index=True) + + # The queue name passed to the client when the job is scheduled. + queue = models.CharField(max_length=50, db_index=True) + + # The JSON string that represents the job + saved_job = models.TextField() + + time_created = models.DateTimeField(null=True, blank=True) + time_updated = models.DateTimeField(null=True, blank=True) + + # Repeat interval in seconds. + interval = models.IntegerField(default=0) + + # Retry interval in seconds. + retry_interval = models.IntegerField(null=True, blank=True) + + # Number of times to repeat - None means repeat forever. + repeat = models.IntegerField(null=True, blank=True) + + scheduled_time = models.DateTimeField(null=True, blank=True) + + # Optional references to the worker host, process and thread that are running this job, + # and any extra metadata that can be used by specific worker implementations. 
+ worker_host = models.CharField(max_length=100, null=True, blank=True) + worker_process = models.CharField(max_length=50, null=True, blank=True) + worker_thread = models.CharField(max_length=50, null=True, blank=True) + worker_extra = models.TextField(null=True, blank=True) + + # Columns for retry logic + # Number of times the job has been retried + retries = models.IntegerField(null=True, blank=True) + # Maximum number of retries allowed for the job + max_retries = models.IntegerField(null=True, blank=True) + + class Meta: + db_table = "jobs" # Usar la misma tabla que SQLAlchemy + indexes = [ + models.Index( + fields=["queue", "scheduled_time"], name="queue__scheduled_time" + ), + ] + + def __str__(self): + return f"Job {self.id} - {self.func} ({self.state})" diff --git a/kolibri/core/tasks/operations.py b/kolibri/core/tasks/operations.py new file mode 100644 index 00000000000..c32c51680e5 --- /dev/null +++ b/kolibri/core/tasks/operations.py @@ -0,0 +1,42 @@ +import logging + +from django.db import migrations + +logger = logging.getLogger(__name__) + + +class AddFieldIfNotExists(migrations.AddField): + """ + Migration operation that adds a field to a model if it does not already exist in the database. + This is useful for ensuring compatibility between SQLAlchemy and Django migrations, + preventing errors when the field has already been added by SQLAlchemy. + """ + + def database_forwards(self, app_label, schema_editor, from_state, to_state): + """ + Executes the real logic in the database. + """ + to_model = to_state.apps.get_model(app_label, self.model_name) + + from_model = from_state.apps.get_model(app_label, self.model_name) + + table_name = from_model._meta.db_table + field = to_model._meta.get_field(self.name) + column_name = field.column + + with schema_editor.connection.cursor() as cursor: + columns = [ + col.name + for col in schema_editor.connection.introspection.get_table_description( + cursor, table_name + ) + ] + + if column_name not in columns: + super().database_forwards(app_label, schema_editor, from_state, to_state) + logger.info(f"Column '{field.column}' did not exist. Creating it...") + else: + logger.info(f"Column '{field.column}' already exists. Skipping creation.") + + def describe(self): + return f"Adds the field {self.name} to {self.model_name} if it does not exist." diff --git a/kolibri/core/tasks/schema_utils.py b/kolibri/core/tasks/schema_utils.py new file mode 100644 index 00000000000..dc3d4775a64 --- /dev/null +++ b/kolibri/core/tasks/schema_utils.py @@ -0,0 +1,60 @@ +""" +Utilidades para sincronizar cambios de esquema entre SQLAlchemy y Django. + +Este módulo contiene funciones helper para aplicar cambios de esquema +a las tablas de jobs usando tanto SQLAlchemy como migraciones Django. +""" +import logging + +from django.core.management import call_command + +from kolibri.deployment.default.sqlite_db_names import JOB_STORAGE +from kolibri.utils import conf + + +logger = logging.getLogger(__name__) + + +def _get_migration_database(): + database_engine = conf.OPTIONS["Database"]["DATABASE_ENGINE"] + + if database_engine == "sqlite": + return JOB_STORAGE + else: + return None + + +def sync_initial_migration_state(): + """ + Syncs the initial migration state without executing SQL. + This is useful to prevent the initial table creation migration if a jobs + table was already created via SQLAlchemy. 
+ """ + try: + call_command( + "migrate", + "kolibritasks", + "0001", + "--fake", + database=_get_migration_database(), + ) + except Exception as e: + logger.error(f"Error syncing initial migration state: {e}") + raise + + +def sync_django_migration_state(): + """ + Syncs the Django migration state without executing SQL. + This is useful to mark all migrations as applied if the tables + were already created via SQLAlchemy. + """ + try: + # Ejecutar fake migrate para marcar como aplicada + call_command( + "migrate", "kolibritasks", "--fake", database=_get_migration_database() + ) + + except Exception as e: + logger.error(f"Error syncing Django migration state: {e}") + raise diff --git a/kolibri/core/tasks/storage.py b/kolibri/core/tasks/storage.py index 96328ec5854..4d4aebe49c7 100644 --- a/kolibri/core/tasks/storage.py +++ b/kolibri/core/tasks/storage.py @@ -27,6 +27,7 @@ from kolibri.core.tasks.hooks import StorageHook from kolibri.core.tasks.job import Job from kolibri.core.tasks.job import State +from kolibri.core.tasks.validation import validate_exception from kolibri.core.tasks.validation import validate_interval from kolibri.core.tasks.validation import validate_priority from kolibri.core.tasks.validation import validate_repeat @@ -45,6 +46,10 @@ class ORMJob(Base): The DB representation of a common.classes.Job object, storing the relevant details needed by the job storage backend. + + Migrations are carried out using Django's migration framework, so + any changes to this model should also be done in the django model + in kolibri.core.tasks.models.Job """ __tablename__ = "jobs" @@ -88,6 +93,12 @@ class ORMJob(Base): worker_thread = Column(String, nullable=True) worker_extra = Column(String, nullable=True) + # Columns for retry logic + # Number of times the job has been retried + retries = Column(Integer, nullable=True) + # Maximum number of retries allowed for the job + max_retries = Column(Integer, nullable=True) + __table_args__ = (Index("queue__scheduled_time", "queue", "scheduled_time"),) @@ -137,9 +148,18 @@ def __contains__(self, item): is not None ) + @staticmethod + def create_default_tables(engine): + """ + Creates the default tables for the job storage backend. If the tables + already exist, this does nothing. + """ + Base.metadata.create_all(engine) + @staticmethod def recreate_default_tables(engine): """ + @deprecated Recreates the default tables for the job storage backend. """ Base.metadata.drop_all(engine) @@ -587,6 +607,7 @@ def reschedule_finished_job_if_needed( # noqa: C901 interval=None, repeat=NO_VALUE, retry_interval=NO_VALUE, + exception=None, ): """ Because repeat and retry_interval are nullable, None is a semantic value, so we need to use a sentinel value NO_VALUE @@ -609,6 +630,9 @@ def reschedule_finished_job_if_needed( # noqa: C901 if delay is not None: validate_timedelay(delay) + if exception is not None: + validate_exception(exception) + orm_job = self.get_orm_job(job_id) # Only allow this function to be run on a job that is in a finished state. @@ -628,6 +652,9 @@ def reschedule_finished_job_if_needed( # noqa: C901 else orm_job.retry_interval, ) + if exception is not None: + # TODO: Implement retry on exception logic. + pass # Set a null new_scheduled_time so that we finish processing if none of the cases below pertain. 
new_scheduled_time = None if delay is not None: diff --git a/kolibri/core/tasks/validation.py b/kolibri/core/tasks/validation.py index b914c3d7f23..dae5f78d126 100644 --- a/kolibri/core/tasks/validation.py +++ b/kolibri/core/tasks/validation.py @@ -33,6 +33,11 @@ def validate_timedelay(value): raise TypeError("time delay must be a datetime.timedelta object") +def validate_exception(value): + if not isinstance(value, BaseException): + raise TypeError("exception must be an error object") + + class EnqueueArgsSerializer(serializers.Serializer): """ A serializer for `enqueue_args` object of incoming user request data. diff --git a/kolibri/deployment/default/settings/base.py b/kolibri/deployment/default/settings/base.py index 57b142380d4..bd62f914bb4 100644 --- a/kolibri/deployment/default/settings/base.py +++ b/kolibri/deployment/default/settings/base.py @@ -21,6 +21,7 @@ import kolibri from kolibri.deployment.default.cache import CACHES from kolibri.deployment.default.sqlite_db_names import ADDITIONAL_SQLITE_DATABASES +from kolibri.deployment.default.sqlite_db_names import JOB_STORAGE from kolibri.plugins.utils.settings import apply_settings from kolibri.utils import conf from kolibri.utils import i18n @@ -140,6 +141,11 @@ # https://docs.djangoproject.com/en/3.2/ref/settings/#databases if conf.OPTIONS["Database"]["DATABASE_ENGINE"] == "sqlite": + job_storage_path = conf.OPTIONS["Tasks"]["JOB_STORAGE_FILEPATH"] + # if job_storage_path is relative, make it relative to KOLIBRI_HOME + if not os.path.isabs(job_storage_path): + job_storage_path = os.path.join(conf.KOLIBRI_HOME, job_storage_path) + # Using custom SQLite backend that uses BEGIN IMMEDIATE transactions. # Once upgraded to Django 5.2+, revert to "django.db.backends.sqlite3" and use # the transaction_mode option instead. 
@@ -152,6 +158,11 @@ ), "OPTIONS": {"timeout": 100}, }, + JOB_STORAGE: { + "ENGINE": "django.db.backends.sqlite3", + "NAME": job_storage_path, + "OPTIONS": {"timeout": 100}, + }, } for additional_db in ADDITIONAL_SQLITE_DATABASES: @@ -166,6 +177,7 @@ "kolibri.core.notifications.models.NotificationsRouter", "kolibri.core.device.models.SyncQueueRouter", "kolibri.core.discovery.models.NetworkLocationRouter", + "kolibri.core.tasks.models.KolibriTasksRouter", ) elif conf.OPTIONS["Database"]["DATABASE_ENGINE"] == "postgres": diff --git a/kolibri/deployment/default/sqlite_db_names.py b/kolibri/deployment/default/sqlite_db_names.py index 8514d42926b..e527462ed7d 100644 --- a/kolibri/deployment/default/sqlite_db_names.py +++ b/kolibri/deployment/default/sqlite_db_names.py @@ -12,5 +12,7 @@ SESSIONS = "sessions" +JOB_STORAGE = "job_storage" + ADDITIONAL_SQLITE_DATABASES = (SYNC_QUEUE, NETWORK_LOCATION, NOTIFICATIONS, SESSIONS) diff --git a/kolibri/utils/main.py b/kolibri/utils/main.py index 7eb8065cf2a..0c671fac929 100644 --- a/kolibri/utils/main.py +++ b/kolibri/utils/main.py @@ -31,7 +31,6 @@ from kolibri.utils.debian_check import check_debian_user from kolibri.utils.logger import get_base_logging_config from kolibri.utils.sanity_checks import check_content_directory_exists_and_writable -from kolibri.utils.sanity_checks import check_database_is_migrated from kolibri.utils.sanity_checks import check_default_options_exist from kolibri.utils.sanity_checks import check_django_stack_ready from kolibri.utils.sanity_checks import check_log_file_location @@ -41,6 +40,8 @@ from kolibri.utils.server import get_status from kolibri.utils.server import NotRunning +# from kolibri.utils.sanity_checks import check_database_is_migrated + logger = logging.getLogger(__name__) @@ -288,6 +289,11 @@ def initialize( # noqa C901 if not skip_update: _upgrades_before_django_setup(updated, version) + _setup_django() + + _post_django_initialization() + + if not skip_update: try: ensure_job_tables_created() except Exception as e: @@ -298,10 +304,6 @@ def initialize( # noqa C901 ) raise - _setup_django() - - _post_django_initialization() - if updated and not skip_update: conditional_backup(kolibri.__version__, version) @@ -325,7 +327,9 @@ def initialize( # noqa C901 check_django_stack_ready() try: - check_database_is_migrated() + # check_database_is_migrated() + # Temporary replacing this check to always migrate + _migrate_databases() except DatabaseNotMigrated: try: _migrate_databases() diff --git a/kolibri/utils/sanity_checks.py b/kolibri/utils/sanity_checks.py index 5005939163f..fab3b30673a 100644 --- a/kolibri/utils/sanity_checks.py +++ b/kolibri/utils/sanity_checks.py @@ -123,12 +123,28 @@ def ensure_job_tables_created(): from kolibri.core.tasks.main import job_storage from kolibri.core.tasks.main import connection from kolibri.core.tasks.storage import Storage + from kolibri.core.tasks.schema_utils import sync_django_migration_state + from kolibri.core.tasks.schema_utils import sync_initial_migration_state try: job_storage.test_table_readable() + try: + # If the table is created and up to date, mark all migrations as applied + sync_django_migration_state() + except Exception as e: + logger.warning(f"Could not sync Django migration state: {e}") except (SQLAlchemyOperationalError, SQLAlchemyProgrammingError, DBSchemaError): - logger.warning("Database table for job storage was not accessible, recreating.") - Storage.recreate_default_tables(connection) + logger.warning("Database table for job storage was not accessible, 
creating.") + # If the table doesn't exist, create it + # If the table exists but is not up to date, mark initial migration (table creation) as applied + # and migrations will take care of updating the schema + Storage.create_default_tables(connection) + try: + sync_initial_migration_state() + except Exception as e: + logger.warning( + f"Could not sync Django migration state after recreation: {e}" + ) except Exception as e: raise DatabaseInaccessible(db_exception=e) From 486c89608c35dc974fa7e8c2d667d11c52a564ce Mon Sep 17 00:00:00 2001 From: Alex Velez Date: Thu, 16 Oct 2025 00:48:21 -0500 Subject: [PATCH 05/13] Migrate ensuring jobs database exists to django migrations --- kolibri/core/tasks/migrations/0001_initial.py | 11 ++- .../migrations/0002_add_retries_fields.py | 8 +- kolibri/core/tasks/models.py | 9 +-- kolibri/core/tasks/operations.py | 75 +++++++++++++------ kolibri/core/tasks/schema_utils.py | 60 --------------- kolibri/core/tasks/storage.py | 12 +-- kolibri/utils/main.py | 12 --- kolibri/utils/sanity_checks.py | 24 ++---- 8 files changed, 74 insertions(+), 137 deletions(-) delete mode 100644 kolibri/core/tasks/schema_utils.py diff --git a/kolibri/core/tasks/migrations/0001_initial.py b/kolibri/core/tasks/migrations/0001_initial.py index bdf8521fda7..4248ad572ec 100644 --- a/kolibri/core/tasks/migrations/0001_initial.py +++ b/kolibri/core/tasks/migrations/0001_initial.py @@ -1,8 +1,11 @@ -# Initial migration for the jobs table. This migration should be -# skipped if the table was created by SQLAlchemy. +# Initial migration for the jobs table. This model/index creation should be +# skipped if the table was previously created by SQLAlchemy. from django.db import migrations from django.db import models +from kolibri.core.tasks.operations import AddIndexIfNotExists +from kolibri.core.tasks.operations import CreateModelIfNotExists + class Migration(migrations.Migration): @@ -11,7 +14,7 @@ class Migration(migrations.Migration): dependencies = [] operations = [ - migrations.CreateModel( + CreateModelIfNotExists( name="Job", fields=[ ("id", models.CharField(max_length=36, primary_key=True)), @@ -44,7 +47,7 @@ class Migration(migrations.Migration): "db_table": "jobs", }, ), - migrations.AddIndex( + AddIndexIfNotExists( model_name="job", index=models.Index( fields=["queue", "scheduled_time"], name="queue__scheduled_time" diff --git a/kolibri/core/tasks/migrations/0002_add_retries_fields.py b/kolibri/core/tasks/migrations/0002_add_retries_fields.py index bb7159bb670..8c57c0c9820 100644 --- a/kolibri/core/tasks/migrations/0002_add_retries_fields.py +++ b/kolibri/core/tasks/migrations/0002_add_retries_fields.py @@ -1,11 +1,7 @@ # Generated by Django 3.2.25 on 2025-10-15 21:14 -# Modified to use AddColumnIfNotExists operation to prevent errors if columns were -# added by SQLAlchemy. 
from django.db import migrations from django.db import models -from kolibri.core.tasks.operations import AddFieldIfNotExists - class Migration(migrations.Migration): @@ -14,12 +10,12 @@ class Migration(migrations.Migration): ] operations = [ - AddFieldIfNotExists( + migrations.AddField( model_name="job", name="max_retries", field=models.IntegerField(blank=True, null=True), ), - AddFieldIfNotExists( + migrations.AddField( model_name="job", name="retries", field=models.IntegerField(blank=True, null=True), diff --git a/kolibri/core/tasks/models.py b/kolibri/core/tasks/models.py index 62aa3ae1514..0bc263b6485 100644 --- a/kolibri/core/tasks/models.py +++ b/kolibri/core/tasks/models.py @@ -24,21 +24,18 @@ def db_for_write(self, model, **hints): def allow_relation(self, obj1, obj2, **hints): """Determine if relationship is allowed between two objects.""" - # Allow any relation between two models that are both in the Notifications app. if ( obj1._meta.app_label == "kolibritasks" and obj2._meta.app_label == "kolibritasks" ): return True - # No opinion if neither object is in the Notifications app (defer to default or other routers). elif "kolibritasks" not in [obj1._meta.app_label, obj2._meta.app_label]: return None - # Block relationship if one object is in the Notifications app and the other isn't. return False def allow_migrate(self, db, app_label, model_name=None, **hints): - """Ensure that the Notifications app's models get created on the right database.""" + """Ensure that the kolibritasks app's models get created on the right database.""" if app_label == "kolibritasks": # The kolibritasks app should be migrated only on the JOB_STORAGE database. return db == JOB_STORAGE @@ -54,7 +51,7 @@ class Job(models.Model): """ Django model corresponding to the 'jobs' table in SQLAlchemy. - This model is not meant to be used for normal CRUD operations. + This model is not meant to be used for normal CRUD operations (yet). It exists solely for Django to manage the migrations of the 'jobs' table, which is handled by SQLAlchemy. """ @@ -105,7 +102,7 @@ class Job(models.Model): max_retries = models.IntegerField(null=True, blank=True) class Meta: - db_table = "jobs" # Usar la misma tabla que SQLAlchemy + db_table = "jobs" indexes = [ models.Index( fields=["queue", "scheduled_time"], name="queue__scheduled_time" diff --git a/kolibri/core/tasks/operations.py b/kolibri/core/tasks/operations.py index c32c51680e5..0b25654daec 100644 --- a/kolibri/core/tasks/operations.py +++ b/kolibri/core/tasks/operations.py @@ -5,38 +5,69 @@ logger = logging.getLogger(__name__) -class AddFieldIfNotExists(migrations.AddField): +def _get_db_tables(schema_editor): + with schema_editor.connection.cursor() as cursor: + return [ + table.name + for table in schema_editor.connection.introspection.get_table_list(cursor) + ] + + +class CreateModelIfNotExists(migrations.CreateModel): """ - Migration operation that adds a field to a model if it does not already exist in the database. + Migration operation that creates a model if it does not already exist in the database. This is useful for ensuring compatibility between SQLAlchemy and Django migrations, - preventing errors when the field has already been added by SQLAlchemy. + preventing errors when the model has already been created by SQLAlchemy. 
""" + def _model_exists(self, schema_editor, table_name): + tables = _get_db_tables(schema_editor) + return table_name in tables + def database_forwards(self, app_label, schema_editor, from_state, to_state): - """ - Executes the real logic in the database. - """ - to_model = to_state.apps.get_model(app_label, self.model_name) + to_model = to_state.apps.get_model(app_label, self.name) + table_name = to_model._meta.db_table + + if not self._model_exists(schema_editor, table_name): + # Let the parent class handle the creation + super().database_forwards(app_label, schema_editor, from_state, to_state) + else: + logger.info(f"Table '{table_name}' already exists. Skipping creation.") - from_model = from_state.apps.get_model(app_label, self.model_name) + def describe(self): + return f"Creates the model {self.name} if it does not exist." - table_name = from_model._meta.db_table - field = to_model._meta.get_field(self.name) - column_name = field.column + +class AddIndexIfNotExists(migrations.AddIndex): + """ + Migration operation that adds an index to a model if it does not already exist in the database. + This is useful for ensuring compatibility between SQLAlchemy and Django migrations, + preventing errors when the index has already been added by SQLAlchemy. + """ + + def _index_exists(self, schema_editor, table_name, index_name): + # Check existing tables in the database + tables = _get_db_tables(schema_editor) + if table_name not in tables: + return False with schema_editor.connection.cursor() as cursor: - columns = [ - col.name - for col in schema_editor.connection.introspection.get_table_description( - cursor, table_name - ) - ] - - if column_name not in columns: + # Get existing constraints (including indexes) on the table + constraints = schema_editor.connection.introspection.get_constraints( + cursor, table_name + ) + return constraints.get(index_name) is not None + + def database_forwards(self, app_label, schema_editor, from_state, to_state): + model = to_state.apps.get_model(app_label, self.model_name) + table_name = model._meta.db_table + index_name = self.index.name + + if not self._index_exists(schema_editor, table_name, index_name): + # Let the parent class handle the index creation super().database_forwards(app_label, schema_editor, from_state, to_state) - logger.info(f"Column '{field.column}' did not exist. Creating it...") else: - logger.info(f"Column '{field.column}' already exists. Skipping creation.") + logger.info(f"Index '{index_name}' already exists. Skipping creation.") def describe(self): - return f"Adds the field {self.name} to {self.model_name} if it does not exist." + return f"Adds the index {self.index} to {self.model_name} if it does not exist." diff --git a/kolibri/core/tasks/schema_utils.py b/kolibri/core/tasks/schema_utils.py deleted file mode 100644 index dc3d4775a64..00000000000 --- a/kolibri/core/tasks/schema_utils.py +++ /dev/null @@ -1,60 +0,0 @@ -""" -Utilidades para sincronizar cambios de esquema entre SQLAlchemy y Django. - -Este módulo contiene funciones helper para aplicar cambios de esquema -a las tablas de jobs usando tanto SQLAlchemy como migraciones Django. 
-""" -import logging - -from django.core.management import call_command - -from kolibri.deployment.default.sqlite_db_names import JOB_STORAGE -from kolibri.utils import conf - - -logger = logging.getLogger(__name__) - - -def _get_migration_database(): - database_engine = conf.OPTIONS["Database"]["DATABASE_ENGINE"] - - if database_engine == "sqlite": - return JOB_STORAGE - else: - return None - - -def sync_initial_migration_state(): - """ - Syncs the initial migration state without executing SQL. - This is useful to prevent the initial table creation migration if a jobs - table was already created via SQLAlchemy. - """ - try: - call_command( - "migrate", - "kolibritasks", - "0001", - "--fake", - database=_get_migration_database(), - ) - except Exception as e: - logger.error(f"Error syncing initial migration state: {e}") - raise - - -def sync_django_migration_state(): - """ - Syncs the Django migration state without executing SQL. - This is useful to mark all migrations as applied if the tables - were already created via SQLAlchemy. - """ - try: - # Ejecutar fake migrate para marcar como aplicada - call_command( - "migrate", "kolibritasks", "--fake", database=_get_migration_database() - ) - - except Exception as e: - logger.error(f"Error syncing Django migration state: {e}") - raise diff --git a/kolibri/core/tasks/storage.py b/kolibri/core/tasks/storage.py index 4d4aebe49c7..ade76f430fd 100644 --- a/kolibri/core/tasks/storage.py +++ b/kolibri/core/tasks/storage.py @@ -111,7 +111,6 @@ def __init__(self, connection, Base=Base): if self.engine.name == "sqlite": self.set_sqlite_pragmas() self.Base = Base - self.Base.metadata.create_all(self.engine) self.sessionmaker = sessionmaker(bind=self.engine) self._hooks = list(StorageHook.registered_hooks) @@ -148,14 +147,6 @@ def __contains__(self, item): is not None ) - @staticmethod - def create_default_tables(engine): - """ - Creates the default tables for the job storage backend. If the tables - already exist, this does nothing. - """ - Base.metadata.create_all(engine) - @staticmethod def recreate_default_tables(engine): """ @@ -394,6 +385,9 @@ def get_all_jobs(self, queue=None, repeating=None): return self.filter_jobs(queue=queue, repeating=repeating) def test_table_readable(self): + """ + @deprecated + """ # Have to use the self-referential `self.engine.engine` as the inspection # used inside this function complains if we use the `self.engine` object # as it is a Django SimpleLazyObject and it doesn't like it! diff --git a/kolibri/utils/main.py b/kolibri/utils/main.py index 0c671fac929..bc9e09fbf2b 100644 --- a/kolibri/utils/main.py +++ b/kolibri/utils/main.py @@ -36,7 +36,6 @@ from kolibri.utils.sanity_checks import check_log_file_location from kolibri.utils.sanity_checks import DatabaseInaccessible from kolibri.utils.sanity_checks import DatabaseNotMigrated -from kolibri.utils.sanity_checks import ensure_job_tables_created from kolibri.utils.server import get_status from kolibri.utils.server import NotRunning @@ -293,17 +292,6 @@ def initialize( # noqa C901 _post_django_initialization() - if not skip_update: - try: - ensure_job_tables_created() - except Exception as e: - logging.error( - "The job tables were not fully migrated. 
Tried to " - "create them in the database and an error occurred: " - "{}".format(e) - ) - raise - if updated and not skip_update: conditional_backup(kolibri.__version__, version) diff --git a/kolibri/utils/sanity_checks.py b/kolibri/utils/sanity_checks.py index fab3b30673a..dbd6cf451b9 100644 --- a/kolibri/utils/sanity_checks.py +++ b/kolibri/utils/sanity_checks.py @@ -120,31 +120,19 @@ def check_database_is_migrated(): def ensure_job_tables_created(): + """ + @deprecated: This is no longer needed as of Kolibri 0.19, because we + now use django migrations to create the jobs table. + """ from kolibri.core.tasks.main import job_storage from kolibri.core.tasks.main import connection from kolibri.core.tasks.storage import Storage - from kolibri.core.tasks.schema_utils import sync_django_migration_state - from kolibri.core.tasks.schema_utils import sync_initial_migration_state try: job_storage.test_table_readable() - try: - # If the table is created and up to date, mark all migrations as applied - sync_django_migration_state() - except Exception as e: - logger.warning(f"Could not sync Django migration state: {e}") except (SQLAlchemyOperationalError, SQLAlchemyProgrammingError, DBSchemaError): - logger.warning("Database table for job storage was not accessible, creating.") - # If the table doesn't exist, create it - # If the table exists but is not up to date, mark initial migration (table creation) as applied - # and migrations will take care of updating the schema - Storage.create_default_tables(connection) - try: - sync_initial_migration_state() - except Exception as e: - logger.warning( - f"Could not sync Django migration state after recreation: {e}" - ) + logger.warning("Database table for job storage was not accessible, recreating.") + Storage.recreate_default_tables(connection) except Exception as e: raise DatabaseInaccessible(db_exception=e) From 58597ebfcb473d279146562bc2c6bf1119a1785f Mon Sep 17 00:00:00 2001 From: Alex Velez Date: Thu, 16 Oct 2025 13:24:22 -0500 Subject: [PATCH 06/13] Refactor retry_on logic --- kolibri/core/tasks/api.py | 3 ++ kolibri/core/tasks/job.py | 22 +------- kolibri/core/tasks/registry.py | 1 - kolibri/core/tasks/storage.py | 54 +++++++++++++++++-- .../assets/src/views/ImportMultipleUsers.vue | 4 ++ kolibri/utils/main.py | 7 +-- 6 files changed, 61 insertions(+), 30 deletions(-) diff --git a/kolibri/core/tasks/api.py b/kolibri/core/tasks/api.py index a2c4303049c..10a6da9b15a 100644 --- a/kolibri/core/tasks/api.py +++ b/kolibri/core/tasks/api.py @@ -162,6 +162,7 @@ def _enqueue_job_based_on_enqueue_args(self, registered_task, job, enqueue_args) interval=enqueue_args.get("repeat_interval", 0), repeat=enqueue_args.get("repeat", 0), retry_interval=enqueue_args.get("retry_interval", None), + max_retries=enqueue_args.get("max_retries", None), ) elif enqueue_args.get("enqueue_in"): return job_storage.enqueue_in( @@ -172,12 +173,14 @@ def _enqueue_job_based_on_enqueue_args(self, registered_task, job, enqueue_args) interval=enqueue_args.get("repeat_interval", 0), repeat=enqueue_args.get("repeat", 0), retry_interval=enqueue_args.get("retry_interval", None), + max_retries=enqueue_args.get("max_retries", None), ) return job_storage.enqueue_job( job, queue=registered_task.queue, priority=enqueue_args.get("priority", registered_task.priority), retry_interval=enqueue_args.get("retry_interval", None), + max_retries=enqueue_args.get("max_retries", None), ) def create(self, request): diff --git a/kolibri/core/tasks/job.py b/kolibri/core/tasks/job.py index 
bd2330c1a22..e3833d3d6be 100644 --- a/kolibri/core/tasks/job.py +++ b/kolibri/core/tasks/job.py @@ -137,7 +137,6 @@ class Job(object): "facility_id", "func", "long_running", - "retry_on", } def to_json(self): @@ -178,7 +177,6 @@ def from_job(cls, job, **kwargs): kwargs["track_progress"] = job.track_progress kwargs["cancellable"] = job.cancellable kwargs["long_running"] = job.long_running - kwargs["retry_on"] = job.retry_on.copy() kwargs["extra_metadata"] = job.extra_metadata.copy() kwargs["facility_id"] = job.facility_id return cls(job.func, **kwargs) @@ -200,7 +198,6 @@ def __init__( total_progress=0, result=None, long_running=False, - retry_on=None, ): """ Create a new Job that will run func given the arguments passed to Job(). If the track_progress keyword parameter @@ -242,7 +239,6 @@ def __init__( self.kwargs = kwargs or {} self._storage = None self.func = callable_to_import_path(func) - self.retry_on = [callable_to_import_path(exc) for exc in (retry_on or [])] def _check_storage_attached(self): if self._storage is None: @@ -374,7 +370,6 @@ def execute(self): args, kwargs = copy.copy(self.args), copy.copy(self.kwargs) - should_retry = False exception = None try: @@ -386,7 +381,6 @@ def execute(self): self.storage.mark_job_as_canceled(self.job_id) except Exception as e: exception = e - should_retry = self.should_retry(e) # If any error occurs, mark the job as failed and save the exception traceback_str = traceback.format_exc() e.traceback = traceback_str @@ -396,26 +390,12 @@ def execute(self): self.storage.mark_job_as_failed(self.job_id, e, traceback_str) self.storage.reschedule_finished_job_if_needed( self.job_id, - delay=RETRY_ON_DELAY if should_retry else self._retry_in_delay, + delay=self._retry_in_delay, exception=exception, **self._retry_in_kwargs, ) setattr(current_state_tracker, "job", None) - def should_retry(self, exception): - retries = self.extra_metadata.get("retries", 0) + 1 - self.extra_metadata["retries"] = retries - self.save_meta() - if retries > MAX_RETRIES: - return False - - for retry_exception in self.retry_on: - exc = import_path_to_callable(retry_exception) - if isinstance(exception, exc): - logger.info(f"Retrying job {self.job_id} due to exception {exception}") - return True - return False - @property def task(self): """ diff --git a/kolibri/core/tasks/registry.py b/kolibri/core/tasks/registry.py index 1170ec61bdf..cd629838a10 100644 --- a/kolibri/core/tasks/registry.py +++ b/kolibri/core/tasks/registry.py @@ -408,7 +408,6 @@ def _ready_job(self, **job_kwargs): cancellable=job_kwargs.pop("cancellable", self.cancellable), track_progress=job_kwargs.pop("track_progress", self.track_progress), long_running=job_kwargs.pop("long_running", self.long_running), - retry_on=self.retry_on, **job_kwargs ) return job_obj diff --git a/kolibri/core/tasks/storage.py b/kolibri/core/tasks/storage.py index ade76f430fd..55ca90087e6 100644 --- a/kolibri/core/tasks/storage.py +++ b/kolibri/core/tasks/storage.py @@ -187,7 +187,12 @@ def _orm_to_job(self, orm_job): return job def enqueue_job( - self, job, queue=DEFAULT_QUEUE, priority=Priority.REGULAR, retry_interval=None + self, + job, + queue=DEFAULT_QUEUE, + priority=Priority.REGULAR, + retry_interval=None, + max_retries=None, ): """ Add the job given by j to the job queue. 
@@ -204,6 +209,7 @@ def enqueue_job( interval=0, repeat=0, retry_interval=retry_interval, + max_retries=max_retries, ) except JobRunning: logger.debug( @@ -214,7 +220,12 @@ def enqueue_job( return job.job_id def enqueue_lifo( - self, job, queue=DEFAULT_QUEUE, priority=Priority.REGULAR, retry_interval=None + self, + job, + queue=DEFAULT_QUEUE, + priority=Priority.REGULAR, + retry_interval=None, + max_retries=None, ): naive_utc_now = datetime.utcnow() with self.session_scope() as session: @@ -240,6 +251,7 @@ def enqueue_lifo( interval=0, repeat=0, retry_interval=retry_interval, + max_retries=max_retries, ) except JobRunning: logger.debug( @@ -644,6 +656,7 @@ def reschedule_finished_job_if_needed( # noqa: C901 retry_interval=retry_interval if retry_interval is not NO_VALUE else orm_job.retry_interval, + max_retries=orm_job.max_retries, ) if exception is not None: @@ -658,12 +671,19 @@ def reschedule_finished_job_if_needed( # noqa: C901 # enqueuing changes - so if it is still set to repeat, it will repeat again after the # delayed rerun. new_scheduled_time = self._now() + delay - elif orm_job.state == State.FAILED and kwargs["retry_interval"] is not None: + elif self._should_retry_on_task_failed( + orm_job, exception, kwargs["retry_interval"] + ): # If the task has failed, and a retry interval has been specified (either in the original enqueue, # or from the passed in kwargs) then requeue as a retry. new_scheduled_time = self._now() + timedelta( seconds=kwargs["retry_interval"] + if kwargs["retry_interval"] is not None + else 10 ) + # Increment the retries count. + current_retries = orm_job.retries if orm_job.retries is not None else 0 + kwargs["retries"] = current_retries + 1 elif ( orm_job.state in {State.COMPLETED, State.FAILED, State.CANCELED} @@ -682,6 +702,28 @@ def reschedule_finished_job_if_needed( # noqa: C901 # Use the schedule method so that any scheduling hooks are run for this next run of the job. self.schedule(new_scheduled_time, job, **kwargs) + def _should_retry_on_task_failed(self, orm_job, exception, retry_interval): + """ + Determine if a job should be retried based on its retry settings and the exception raised. + """ + if orm_job.state != State.FAILED: + return False + + if retry_interval is None and orm_job.max_retries is None: + # retry_interval or max_retries should be set to enable retries + return False + + current_retries = orm_job.retries if orm_job.retries is not None else 0 + if orm_job.max_retries is not None and current_retries >= orm_job.max_retries: + return False + + job = self._orm_to_job(orm_job) + retry_on = job.task.retry_on + if retry_on: + return any(issubclass(exception, exc) for exc in retry_on) + + return True + def _update_job(self, job_id, state=None, **kwargs): with self.session_scope() as session: try: @@ -736,6 +778,7 @@ def enqueue_at( interval=0, repeat=0, retry_interval=None, + max_retries=None, ): """ Add the job for the specified time @@ -748,6 +791,7 @@ def enqueue_at( interval=interval, repeat=repeat, retry_interval=retry_interval, + max_retries=max_retries, ) def enqueue_in( @@ -759,6 +803,7 @@ def enqueue_in( interval=0, repeat=0, retry_interval=None, + max_retries=None, ): """ Add the job in the specified time delta @@ -774,6 +819,7 @@ def enqueue_in( interval=interval, repeat=repeat, retry_interval=retry_interval, + max_retries=max_retries, ) def schedule( @@ -785,6 +831,7 @@ def schedule( interval=0, repeat=0, retry_interval=None, + max_retries=None, ): """ Add the job for the specified time, interval, and number of repeats. 
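
Taken together: a failed job is re-queued only when retry_interval or
max_retries was supplied at enqueue time, the recorded retry count is still
below max_retries, and, where the registered task declares retry_on, the
raised exception matches one of the declared classes. A minimal sketch of
opting a task in, assuming a hypothetical sync_users task:

    from requests.exceptions import HTTPError

    from kolibri.core.tasks.decorators import register_task

    @register_task(retry_on=[HTTPError])
    def sync_users(server_url):
        ...  # an HTTPError raised here makes a failed run eligible for retry
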
@@ -820,6 +867,7 @@ def schedule( interval=interval, repeat=repeat, retry_interval=retry_interval, + max_retries=max_retries, scheduled_time=naive_utc_datetime(dt), saved_job=job.to_json(), ) diff --git a/kolibri/plugins/setup_wizard/assets/src/views/ImportMultipleUsers.vue b/kolibri/plugins/setup_wizard/assets/src/views/ImportMultipleUsers.vue index cbe1b4c4aed..d7967db99e4 100644 --- a/kolibri/plugins/setup_wizard/assets/src/views/ImportMultipleUsers.vue +++ b/kolibri/plugins/setup_wizard/assets/src/views/ImportMultipleUsers.vue @@ -233,6 +233,10 @@ device_id: this.device.id, user_id: learner.id, using_admin: true, + enqueue_args: { + retry_interval: 5, + max_retries: 3, + }, }; if (!this.wizardService.state.context.firstImportedLodUser) { this.wizardService.send({ diff --git a/kolibri/utils/main.py b/kolibri/utils/main.py index bc9e09fbf2b..15b979e6a9a 100644 --- a/kolibri/utils/main.py +++ b/kolibri/utils/main.py @@ -31,6 +31,7 @@ from kolibri.utils.debian_check import check_debian_user from kolibri.utils.logger import get_base_logging_config from kolibri.utils.sanity_checks import check_content_directory_exists_and_writable +from kolibri.utils.sanity_checks import check_database_is_migrated from kolibri.utils.sanity_checks import check_default_options_exist from kolibri.utils.sanity_checks import check_django_stack_ready from kolibri.utils.sanity_checks import check_log_file_location @@ -39,8 +40,6 @@ from kolibri.utils.server import get_status from kolibri.utils.server import NotRunning -# from kolibri.utils.sanity_checks import check_database_is_migrated - logger = logging.getLogger(__name__) @@ -315,9 +314,7 @@ def initialize( # noqa C901 check_django_stack_ready() try: - # check_database_is_migrated() - # Temporary replacing this check to always migrate - _migrate_databases() + check_database_is_migrated() except DatabaseNotMigrated: try: _migrate_databases() From 579b9e71665359a403c680d86741fe734b32c5ef Mon Sep 17 00:00:00 2001 From: Alex Velez Date: Fri, 24 Oct 2025 15:09:43 -0500 Subject: [PATCH 07/13] Migrate Jobs model to django --- kolibri/core/auth/tasks.py | 3 +- kolibri/core/tasks/api.py | 5 +- kolibri/core/tasks/job.py | 8 - kolibri/core/tasks/main.py | 9 +- kolibri/core/tasks/models.py | 9 +- kolibri/core/tasks/storage.py | 398 +++++++------------- kolibri/core/tasks/worker.py | 11 +- kolibri/deployment/default/settings/base.py | 4 + kolibri/utils/sanity_checks.py | 21 -- 9 files changed, 145 insertions(+), 323 deletions(-) diff --git a/kolibri/core/auth/tasks.py b/kolibri/core/auth/tasks.py index d6d473ef594..418b178426e 100644 --- a/kolibri/core/auth/tasks.py +++ b/kolibri/core/auth/tasks.py @@ -46,7 +46,6 @@ from kolibri.core.tasks.utils import DatabaseLockedError from kolibri.core.tasks.utils import get_current_job from kolibri.core.tasks.validation import JobValidator -from kolibri.utils.time_utils import naive_utc_datetime from kolibri.utils.translation import gettext as _ @@ -472,7 +471,7 @@ def enqueue_soud_sync_processing(): # Check if there is already an enqueued job try: - converted_next_run = naive_utc_datetime(timezone.now() + next_run) + converted_next_run = timezone.now() + next_run orm_job = job_storage.get_orm_job(SOUD_SYNC_PROCESSING_JOB_ID) if ( orm_job.state not in (State.COMPLETED, State.FAILED, State.CANCELED) diff --git a/kolibri/core/tasks/api.py b/kolibri/core/tasks/api.py index 10a6da9b15a..d40af9c0ade 100644 --- a/kolibri/core/tasks/api.py +++ b/kolibri/core/tasks/api.py @@ -2,9 +2,7 @@ from django.http.response import Http404 from 
django.utils.decorators import method_decorator
-from django.utils.timezone import make_aware
 from django.views.decorators.csrf import csrf_protect
-from pytz import utc
 from rest_framework import decorators
 from rest_framework import serializers
 from rest_framework import status
@@ -100,8 +98,7 @@ def _job_to_response(self, job):
             "args": job.args,
             "kwargs": job.kwargs,
             "extra_metadata": job.extra_metadata,
-            # Output is UTC naive, coerce to UTC aware.
-            "scheduled_datetime": make_aware(orm_job.scheduled_time, utc).isoformat(),
+            "scheduled_datetime": orm_job.scheduled_time.isoformat(),
             "repeat": orm_job.repeat,
             "repeat_interval": orm_job.interval,
             "retry_interval": orm_job.retry_interval,
diff --git a/kolibri/core/tasks/job.py b/kolibri/core/tasks/job.py
index e3833d3d6be..8677a81730c 100644
--- a/kolibri/core/tasks/job.py
+++ b/kolibri/core/tasks/job.py
@@ -4,7 +4,6 @@
 import traceback
 import uuid
 from collections import namedtuple
-from datetime import timedelta
 
 from kolibri.core.tasks.constants import (  # noqa F401 - imported for backwards compatibility
     Priority,
@@ -102,13 +101,6 @@ def default_status_text(job):
 ALLOWED_RETRY_IN_KWARGS = {"priority", "repeat", "interval", "retry_interval"}
 
 
-RETRY_ON_DELAY = timedelta(
-    seconds=5
-)  # Delay before retrying a job that failed due to a retryable exception
-MAX_RETRIES = (
-    3  # Maximum number of retries for a job that failed due to a retryable exception
-)
-
-
 class Job(object):
     """
diff --git a/kolibri/core/tasks/main.py b/kolibri/core/tasks/main.py
index d06f977b484..98c500fca15 100644
--- a/kolibri/core/tasks/main.py
+++ b/kolibri/core/tasks/main.py
@@ -3,7 +3,6 @@
 from django.utils.functional import SimpleLazyObject
 
 from kolibri.core.tasks.storage import Storage
-from kolibri.core.tasks.utils import db_connection
 from kolibri.core.tasks.worker import Worker
 from kolibri.utils import conf
 
@@ -11,13 +10,8 @@
 logger = logging.getLogger(__name__)
 
 
-connection = SimpleLazyObject(db_connection)
-
-
 def __job_storage():
-    return Storage(
-        connection=connection,
-    )
+    return Storage()
 
 
 # This storage instance should be used to access job_storage db.
@@ -28,7 +22,6 @@
 def initialize_workers(log_queue=None):
     logger.info("Starting async task workers.")
     return Worker(
-        connection=connection,
        regular_workers=conf.OPTIONS["Tasks"]["REGULAR_PRIORITY_WORKERS"],
         high_workers=conf.OPTIONS["Tasks"]["HIGH_PRIORITY_WORKERS"],
         log_queue=log_queue,
diff --git a/kolibri/core/tasks/models.py b/kolibri/core/tasks/models.py
index 0bc263b6485..35ac0539f17 100644
--- a/kolibri/core/tasks/models.py
+++ b/kolibri/core/tasks/models.py
@@ -47,14 +47,9 @@ def allow_migrate(self, db, app_label, model_name=None, **hints):
         return None
 
 
+# This Job model has been migrated from SQLAlchemy to a Django model.
+# It was initially generated by Copilot and then tweaked by hand.
 class Job(models.Model):
-    """
-    Django model corresponding to the 'jobs' table in SQLAlchemy.
-
-    This model is not meant to be used for normal CRUD operations (yet).
-    It exists solely for Django to manage the migrations
-    of the 'jobs' table, which is handled by SQLAlchemy.
-    """
 
     # The hex UUID given to the job upon first creation.
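    # Kept as a CharField rather than a UUIDField to mirror the legacy
    # SQLAlchemy schema, where this was Column(String, primary_key=True).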
id = models.CharField(max_length=36, primary_key=True) diff --git a/kolibri/core/tasks/storage.py b/kolibri/core/tasks/storage.py index 55ca90087e6..02ac4b61617 100644 --- a/kolibri/core/tasks/storage.py +++ b/kolibri/core/tasks/storage.py @@ -1,23 +1,11 @@ import logging -from contextlib import contextmanager from datetime import datetime from datetime import timedelta -import pytz -from sqlalchemy import Column -from sqlalchemy import DateTime -from sqlalchemy import func as sql_func -from sqlalchemy import Index -from sqlalchemy import Integer -from sqlalchemy import or_ -from sqlalchemy import select -from sqlalchemy import String -from sqlalchemy import Table -from sqlalchemy import text -from sqlalchemy import update -from sqlalchemy.exc import OperationalError -from sqlalchemy.orm import declarative_base -from sqlalchemy.orm import sessionmaker +from django.db import connections +from django.db import transaction +from django.db.models import Q +from django.db.utils import OperationalError from kolibri.core.tasks.constants import DEFAULT_QUEUE from kolibri.core.tasks.constants import Priority @@ -27,111 +15,30 @@ from kolibri.core.tasks.hooks import StorageHook from kolibri.core.tasks.job import Job from kolibri.core.tasks.job import State +from kolibri.core.tasks.models import Job as ORMJob from kolibri.core.tasks.validation import validate_exception from kolibri.core.tasks.validation import validate_interval from kolibri.core.tasks.validation import validate_priority from kolibri.core.tasks.validation import validate_repeat from kolibri.core.tasks.validation import validate_timedelay -from kolibri.utils.sql_alchemy import db_matches_schema from kolibri.utils.time_utils import local_now -from kolibri.utils.time_utils import naive_utc_datetime - -Base = declarative_base() logger = logging.getLogger(__name__) -class ORMJob(Base): - """ - The DB representation of a common.classes.Job object, - storing the relevant details needed by the job storage - backend. - - Migrations are carried out using Django's migration framework, so - any changes to this model should also be done in the django model - in kolibri.core.tasks.models.Job - """ - - __tablename__ = "jobs" - - # The hex UUID given to the job upon first creation. - id = Column(String, primary_key=True, autoincrement=False) - - # The job's state. Inflated here for easier querying to the job's state. - state = Column(String, index=True) - - # The job's function string. Inflated here for easier querying of which task type it is. - func = Column(String, index=True) - - # The job's priority. Helps to decide which job to run next. - priority = Column(Integer, index=True) - - # The queue name passed to the client when the job is scheduled. - queue = Column(String, index=True) - - # The JSON string that represents the job - saved_job = Column(String) - - time_created = Column(DateTime(timezone=True), server_default=sql_func.now()) - time_updated = Column(DateTime(timezone=True), onupdate=sql_func.now()) - - # Repeat interval in seconds. - interval = Column(Integer, default=0) - - # Retry interval in seconds. - retry_interval = Column(Integer, nullable=True) - - # Number of times to repeat - None means repeat forever. - repeat = Column(Integer, nullable=True) - - scheduled_time = Column(DateTime()) - - # Optional references to the worker host, process and thread that are running this job, - # and any extra metadata that can be used by specific worker implementations. 
- worker_host = Column(String, nullable=True) - worker_process = Column(String, nullable=True) - worker_thread = Column(String, nullable=True) - worker_extra = Column(String, nullable=True) - - # Columns for retry logic - # Number of times the job has been retried - retries = Column(Integer, nullable=True) - # Maximum number of retries allowed for the job - max_retries = Column(Integer, nullable=True) - - __table_args__ = (Index("queue__scheduled_time", "queue", "scheduled_time"),) - - NO_VALUE = object() class Storage(object): - def __init__(self, connection, Base=Base): - self.engine = connection - if self.engine.name == "sqlite": - self.set_sqlite_pragmas() - self.Base = Base - self.sessionmaker = sessionmaker(bind=self.engine) + def __init__(self): + self.set_sqlite_pragmas() self._hooks = list(StorageHook.registered_hooks) - @contextmanager - def session_scope(self): - session = self.sessionmaker() - try: - yield session - session.commit() - except Exception: - session.rollback() - raise - finally: - session.close() - def __len__(self): """ Returns the number of jobs currently in the storage. """ - with self.engine.connect() as conn: - return conn.execute(sql_func.count(ORMJob.id)).scalar() + return ORMJob.objects.count() def __contains__(self, item): """ @@ -141,38 +48,27 @@ def __contains__(self, item): job_id = item if isinstance(item, Job): job_id = item.job_id - with self.engine.connect() as connection: - return ( - connection.execute(select(ORMJob).where(ORMJob.id == job_id)).fetchone() - is not None - ) - - @staticmethod - def recreate_default_tables(engine): - """ - @deprecated - Recreates the default tables for the job storage backend. - """ - Base.metadata.drop_all(engine) - scheduledjobs_base = declarative_base() - scheduledjobs_table = Table("scheduledjobs", scheduledjobs_base.metadata) - scheduledjobs_table.drop(engine, checkfirst=True) - Base.metadata.create_all(engine) + return ORMJob.objects.filter(id=job_id).exists() def set_sqlite_pragmas(self): """ - Sets the connection PRAGMAs for the sqlalchemy engine stored in self.engine. + Sets the connection PRAGMAs for the sqlite database. 
It currently sets: - journal_mode to WAL :return: None """ - try: - with self.engine.connect() as conn: - conn.execute(text("PRAGMA journal_mode = WAL;")) - except OperationalError: - pass + from django.db import connections + + connection = connections[ORMJob.objects.db] + if connection.vendor == "sqlite": + try: + cursor = connection.cursor() + cursor.execute("PRAGMA journal_mode=WAL;") + cursor.close() + except OperationalError: + pass def _orm_to_job(self, orm_job): """ @@ -227,21 +123,18 @@ def enqueue_lifo( retry_interval=None, max_retries=None, ): - naive_utc_now = datetime.utcnow() - with self.session_scope() as session: - soonest_job = ( - session.query(ORMJob) - .filter(ORMJob.state == State.QUEUED) - .filter(ORMJob.scheduled_time <= naive_utc_now) - .order_by(ORMJob.scheduled_time) - .first() - ) - dt = ( - pytz.timezone("UTC").localize(soonest_job.scheduled_time) - - timedelta(microseconds=1) - if soonest_job - else self._now() - ) + now = self._now() + soonest_job = ( + ORMJob.objects.filter(state=State.QUEUED) + .filter(scheduled_time__lte=now) + .order_by("scheduled_time") + .first() + ) + dt = ( + soonest_job.scheduled_time - timedelta(microseconds=1) + if soonest_job + else self._now() + ) try: return self.schedule( dt, @@ -295,16 +188,13 @@ def mark_job_as_canceling(self, job_id): """ self._update_job(job_id, State.CANCELING) - def _filter_next_query(self, query, priority): - naive_utc_now = datetime.utcnow() - return ( - query.filter(ORMJob.state == State.QUEUED) - .filter(ORMJob.scheduled_time <= naive_utc_now) - .filter(ORMJob.priority <= priority) - .order_by(ORMJob.priority, ORMJob.scheduled_time, ORMJob.time_created) - ) + def _filter_next_query(self, queryset, priority): + now = self._now() + return queryset.filter( + Q(scheduled_time__lte=now), state=State.QUEUED, priority__lte=priority + ).order_by("priority", "scheduled_time", "time_created") - def _postgres_next_queued_job(self, session, priority): + def _postgres_next_queued_job(self, priority): """ For postgres we are doing our best to ensure that the selected job is not then also selected by another potentially concurrent worker controller @@ -312,77 +202,72 @@ def _postgres_next_queued_job(self, session, priority): This should work as long as our connection uses the default isolation level of READ_COMMITTED. More details here: https://dba.stackexchange.com/a/69497 - For SQLAlchemy details here: https://stackoverflow.com/a/25943713 """ - subquery = ( - self._filter_next_query(session.query(ORMJob.id), priority) - .limit(1) - .with_for_update(skip_locked=True) - ) - return self.engine.execute( - update(ORMJob) - .values(state=State.SELECTED) - .where(ORMJob.id == subquery.scalar_subquery()) - .returning(ORMJob.saved_job) - ).fetchone() + with transaction.atomic(): + next_job = ( + self._filter_next_query(ORMJob.objects.all(), priority) + .select_for_update(skip_locked=True) + .first() + ) - def _sqlite_next_queued_job(self, session, priority): + if next_job: + next_job.state = State.SELECTED + next_job.save() + return next_job + return None + + def _sqlite_next_queued_job(self, priority): """ Due to the difficulty in appropriately locking the task row we do not support multiple task runners potentially duelling to lock tasks for SQLite, so here we just do a minimal best effort to mark the job as selected for running. 
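        Under concurrent runners this means the same job could occasionally
        be selected twice.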
""" - orm_job = self._filter_next_query(session.query(ORMJob), priority).first() + orm_job = self._filter_next_query(ORMJob.objects.all(), priority).first() + if orm_job: orm_job.state = State.SELECTED - session.add(orm_job) + orm_job.save() return orm_job def get_next_queued_job(self, priority=Priority.REGULAR): - with self.session_scope() as s: - method = ( - self._sqlite_next_queued_job - if self.engine.dialect.name == "sqlite" - else self._postgres_next_queued_job - ) - orm_job = method(s, priority) + db_backend = connections[ORMJob.objects.db].vendor - if orm_job: - job = self._orm_to_job(orm_job) - else: - job = None + if db_backend == "sqlite": + orm_job = self._sqlite_next_queued_job(priority) + else: + orm_job = self._postgres_next_queued_job(priority) - return job + if orm_job: + return self._orm_to_job(orm_job) + return None def filter_jobs( self, queue=None, queues=None, state=None, repeating=None, func=None ): if queue and queues: raise ValueError("Cannot specify both queue and queues") - with self.engine.connect() as conn: - q = select(ORMJob) - if queue: - q = q.where(ORMJob.queue == queue) + queryset = ORMJob.objects.all() - if queues: - q = q.where(ORMJob.queue.in_(queues)) + if queue: + queryset = queryset.filter(queue=queue) - if state: - q = q.where(ORMJob.state == state) + if queues: + queryset = queryset.filter(queue__in=queues) - if repeating is True: - q = q.where(or_(ORMJob.repeat > 0, ORMJob.repeat == None)) # noqa E711 - elif repeating is False: - q = q.where(ORMJob.repeat == 0) + if state: + queryset = queryset.filter(state=state) - if func: - q = q.where(ORMJob.func == func) + if repeating is True: + queryset = queryset.filter(Q(repeat__gt=0) | Q(repeat__isnull=True)) + elif repeating is False: + queryset = queryset.filter(repeat=0) - orm_jobs = conn.execute(q) + if func: + queryset = queryset.filter(func=func) - return [self._orm_to_job(o) for o in orm_jobs] + return [self._orm_to_job(o) for o in queryset] def get_canceling_jobs(self, queues=None): return self.get_jobs_by_state(state=State.CANCELING, queues=queues) @@ -396,28 +281,17 @@ def get_jobs_by_state(self, state, queues=None): def get_all_jobs(self, queue=None, repeating=None): return self.filter_jobs(queue=queue, repeating=repeating) - def test_table_readable(self): - """ - @deprecated - """ - # Have to use the self-referential `self.engine.engine` as the inspection - # used inside this function complains if we use the `self.engine` object - # as it is a Django SimpleLazyObject and it doesn't like it! - db_matches_schema({ORMJob.__tablename__: ORMJob}, self.engine.engine) - def get_job(self, job_id): orm_job = self.get_orm_job(job_id) job = self._orm_to_job(orm_job) return job def get_orm_job(self, job_id): - with self.engine.connect() as connection: - orm_job = connection.execute( - select(ORMJob).where(ORMJob.id == job_id) - ).fetchone() - if orm_job is None: + try: + orm_job = ORMJob.objects.get(id=job_id) + return orm_job + except ORMJob.DoesNotExist: raise JobNotFound() - return orm_job def restart_job(self, job_id): """ @@ -497,28 +371,28 @@ def clear(self, queue=None, job_id=None, force=False): :type force: bool :param force: If True, clear the job (or jobs), even if it hasn't completed, failed or been cancelled. 
""" - with self.session_scope() as s: - q = s.query(ORMJob) + with transaction.atomic(): + queryset = ORMJob.objects.all() if queue: - q = q.filter_by(queue=queue) + queryset = queryset.filter(queue=queue) if job_id: - q = q.filter_by(id=job_id) + queryset = queryset.filter(id=job_id) # filter only by the finished jobs, if we are not specified to force if not force: - q = q.filter( - or_( - ORMJob.state == State.COMPLETED, - ORMJob.state == State.FAILED, - ORMJob.state == State.CANCELED, - ) + queryset = queryset.filter( + Q(state=State.COMPLETED) + | Q(state=State.FAILED) + | Q(state=State.CANCELED) ) + if self._hooks: - for orm_job in q: + for orm_job in queryset: job = self._orm_to_job(orm_job) for hook in self._hooks: hook.clear(job, orm_job) - q.delete(synchronize_session=False) + + queryset.delete() def update_job_progress( self, job_id, progress, total_progress, extra_metadata=None @@ -582,9 +456,9 @@ def save_worker_info( # nothing to do return - with self.session_scope() as session: + with transaction.atomic(): try: - _, orm_job = self._get_job_and_orm_job(job_id, session) + _, orm_job = self._get_job_and_orm_job(job_id) if host is not None: orm_job.worker_host = host if process is not None: @@ -593,11 +467,7 @@ def save_worker_info( orm_job.worker_thread = thread if extra is not None: orm_job.worker_extra = extra - session.add(orm_job) - try: - session.commit() - except Exception as e: - logger.error("Got an error running session.commit(): {}".format(e)) + orm_job.save() except JobNotFound: logger.error( "Tried to update job with id {} but it was not found".format(job_id) @@ -657,11 +527,9 @@ def reschedule_finished_job_if_needed( # noqa: C901 if retry_interval is not NO_VALUE else orm_job.retry_interval, max_retries=orm_job.max_retries, + retries=orm_job.retries, ) - if exception is not None: - # TODO: Implement retry on exception logic. - pass # Set a null new_scheduled_time so that we finish processing if none of the cases below pertain. new_scheduled_time = None if delay is not None: @@ -671,11 +539,9 @@ def reschedule_finished_job_if_needed( # noqa: C901 # enqueuing changes - so if it is still set to repeat, it will repeat again after the # delayed rerun. new_scheduled_time = self._now() + delay - elif self._should_retry_on_task_failed( + elif self._should_retry_on_failed_task( orm_job, exception, kwargs["retry_interval"] ): - # If the task has failed, and a retry interval has been specified (either in the original enqueue, - # or from the passed in kwargs) then requeue as a retry. new_scheduled_time = self._now() + timedelta( seconds=kwargs["retry_interval"] if kwargs["retry_interval"] is not None @@ -702,7 +568,7 @@ def reschedule_finished_job_if_needed( # noqa: C901 # Use the schedule method so that any scheduling hooks are run for this next run of the job. self.schedule(new_scheduled_time, job, **kwargs) - def _should_retry_on_task_failed(self, orm_job, exception, retry_interval): + def _should_retry_on_failed_task(self, orm_job, exception, retry_interval): """ Determine if a job should be retried based on its retry settings and the exception raised. 
""" @@ -719,15 +585,15 @@ def _should_retry_on_task_failed(self, orm_job, exception, retry_interval): job = self._orm_to_job(orm_job) retry_on = job.task.retry_on - if retry_on: - return any(issubclass(exception, exc) for exc in retry_on) + if retry_on and exception: + return any(isinstance(exception, exc) for exc in retry_on) return True def _update_job(self, job_id, state=None, **kwargs): - with self.session_scope() as session: + with transaction.atomic(): try: - job, orm_job = self._get_job_and_orm_job(job_id, session) + job, orm_job = self._get_job_and_orm_job(job_id) if state is not None: orm_job.state = job.state = state for kwarg in kwargs: @@ -740,11 +606,7 @@ def _update_job(self, job_id, state=None, **kwargs): ) ) orm_job.saved_job = job.to_json() - session.add(orm_job) - try: - session.commit() - except Exception as e: - logger.error("Got an error running session.commit(): {}".format(e)) + orm_job.save() for hook in self._hooks: hook.update(job, orm_job, state=state, **kwargs) return job, orm_job @@ -762,9 +624,10 @@ def _update_job(self, job_id, state=None, **kwargs): ) ) - def _get_job_and_orm_job(self, job_id, session): - orm_job = session.query(ORMJob).filter_by(id=job_id).one_or_none() - if orm_job is None: + def _get_job_and_orm_job(self, job_id): + try: + orm_job = ORMJob.objects.get(id=job_id) + except ORMJob.DoesNotExist: raise JobNotFound() job = self._orm_to_job(orm_job) return job, orm_job @@ -831,6 +694,7 @@ def schedule( interval=0, repeat=0, retry_interval=None, + retries=None, max_retries=None, ): """ @@ -852,34 +716,38 @@ def schedule( if not isinstance(job, Job): raise ValueError("Job argument must be a Job object.") - with self.session_scope() as session: - orm_job = session.get(ORMJob, job.job_id) + with transaction.atomic(): + orm_job = ORMJob.objects.filter(id=job.job_id).first() if orm_job and orm_job.state == State.RUNNING: raise JobRunning() job.state = State.QUEUED - orm_job = ORMJob( - id=job.job_id, - state=job.state, - func=job.func, - priority=priority, - queue=queue, - interval=interval, - repeat=repeat, - retry_interval=retry_interval, - max_retries=max_retries, - scheduled_time=naive_utc_datetime(dt), - saved_job=job.to_json(), - ) - session.merge(orm_job) - try: - session.commit() - except Exception as e: - logger.error("Got an error running session.commit(): {}".format(e)) + orm_job_data = { + "id": job.job_id, + "state": job.state, + "func": job.func, + "priority": priority, + "queue": queue, + "interval": interval, + "repeat": repeat, + "retry_interval": retry_interval, + "retries": retries, + "max_retries": max_retries, + "scheduled_time": dt, + "saved_job": job.to_json(), + } + + if orm_job: + # Update existing job + for key, value in orm_job_data.items(): + setattr(orm_job, key, value) + orm_job.save() + else: + orm_job = ORMJob.objects.create(**orm_job_data) self._run_scheduled_hooks(orm_job) - return job.job_id + return job.job_id def _run_scheduled_hooks(self, orm_job): job = self._orm_to_job(orm_job) diff --git a/kolibri/core/tasks/worker.py b/kolibri/core/tasks/worker.py index 252792ff1e4..6e1797e65b0 100644 --- a/kolibri/core/tasks/worker.py +++ b/kolibri/core/tasks/worker.py @@ -5,7 +5,6 @@ from kolibri.core.tasks.constants import Priority from kolibri.core.tasks.storage import Storage -from kolibri.core.tasks.utils import db_connection from kolibri.core.tasks.utils import InfiniteLoopThread from kolibri.utils.multiprocessing_compat import PoolExecutor @@ -25,9 +24,7 @@ def execute_job( :return: None """ - connection = 
db_connection() - - storage = Storage(connection) + storage = Storage() job = storage.get_job(job_id) @@ -35,8 +32,6 @@ def execute_job( job.execute() - connection.dispose() - # Close any django connections opened here django_connection.close() @@ -60,7 +55,7 @@ def execute_job_with_python_worker(job_id, log_queue=None): class Worker(object): - def __init__(self, connection, regular_workers=2, high_workers=1, log_queue=None): + def __init__(self, regular_workers=2, high_workers=1, log_queue=None): # Internally, we use concurrent.future.Future to run and track # job executions. We need to keep track of which future maps to which # job they were made from, and we use the job_future_mapping dict to do @@ -72,7 +67,7 @@ def __init__(self, connection, regular_workers=2, high_workers=1, log_queue=None # Key: job_id, Value: future object self.future_job_mapping = {} - self.storage = Storage(connection) + self.storage = Storage() self.requeue_stalled_jobs() diff --git a/kolibri/deployment/default/settings/base.py b/kolibri/deployment/default/settings/base.py index bd62f914bb4..8727fdecbb5 100644 --- a/kolibri/deployment/default/settings/base.py +++ b/kolibri/deployment/default/settings/base.py @@ -162,10 +162,14 @@ "ENGINE": "django.db.backends.sqlite3", "NAME": job_storage_path, "OPTIONS": {"timeout": 100}, + "TEST": { + "NAME": os.path.join(conf.KOLIBRI_HOME, "test_job_storage.sqlite3") + }, }, } for additional_db in ADDITIONAL_SQLITE_DATABASES: + # TODO ADD JOB_STORAGE to ADDITIONAL_SQLITE_DATABASES and check it here DATABASES[additional_db] = { "ENGINE": "kolibri.deployment.default.db.backends.sqlite3", "NAME": os.path.join(conf.KOLIBRI_HOME, "{}.sqlite3".format(additional_db)), diff --git a/kolibri/utils/sanity_checks.py b/kolibri/utils/sanity_checks.py index dbd6cf451b9..98998f8116f 100644 --- a/kolibri/utils/sanity_checks.py +++ b/kolibri/utils/sanity_checks.py @@ -6,13 +6,10 @@ from django.apps import apps from django.db.utils import OperationalError from django.db.utils import ProgrammingError -from sqlalchemy.exc import OperationalError as SQLAlchemyOperationalError -from sqlalchemy.exc import ProgrammingError as SQLAlchemyProgrammingError from .conf import KOLIBRI_HOME from .conf import OPTIONS from .options import generate_empty_options_file -from kolibri.utils.sql_alchemy import DBSchemaError logger = logging.getLogger(__name__) @@ -119,24 +116,6 @@ def check_database_is_migrated(): raise DatabaseInaccessible(db_exception=e) -def ensure_job_tables_created(): - """ - @deprecated: This is no longer needed as of Kolibri 0.19, because we - now use django migrations to create the jobs table. 
- """ - from kolibri.core.tasks.main import job_storage - from kolibri.core.tasks.main import connection - from kolibri.core.tasks.storage import Storage - - try: - job_storage.test_table_readable() - except (SQLAlchemyOperationalError, SQLAlchemyProgrammingError, DBSchemaError): - logger.warning("Database table for job storage was not accessible, recreating.") - Storage.recreate_default_tables(connection) - except Exception as e: - raise DatabaseInaccessible(db_exception=e) - - def check_default_options_exist(): options_path = os.path.join(KOLIBRI_HOME, "options.ini") if not os.path.exists(options_path): From 82167f7b625a4ba1af5c73cfdcf425f1f4de5b62 Mon Sep 17 00:00:00 2001 From: Alex Velez Date: Fri, 24 Oct 2025 15:10:32 -0500 Subject: [PATCH 08/13] Migrate tests and add tests to retry_on param --- kolibri/conftest.py | 13 -- kolibri/core/auth/test/test_auth_tasks.py | 19 +- kolibri/core/tasks/test/base.py | 21 --- .../tasks/test/taskrunner/test_job_running.py | 14 +- .../tasks/test/taskrunner/test_scheduler.py | 69 ++++--- .../tasks/test/taskrunner/test_storage.py | 170 ++++++++++++++++-- .../core/tasks/test/taskrunner/test_worker.py | 17 +- kolibri/core/tasks/test/test_api.py | 40 ++--- kolibri/core/tasks/test/test_job.py | 66 ------- kolibri/core/tasks/test/test_no_connection.py | 4 + kolibri/utils/tests/test_sanity_check.py | 20 --- kolibri/utils/tests/test_server.py | 11 +- 12 files changed, 224 insertions(+), 240 deletions(-) delete mode 100644 kolibri/core/tasks/test/base.py diff --git a/kolibri/conftest.py b/kolibri/conftest.py index ab843ecd57d..0f06da472bb 100644 --- a/kolibri/conftest.py +++ b/kolibri/conftest.py @@ -7,19 +7,6 @@ TEMP_KOLIBRI_HOME = "./.pytest_kolibri_home" -@pytest.fixture(scope="session") -def django_db_setup( - request, - django_db_setup, -): - def dispose_sqlalchemy(): - from kolibri.core.tasks.main import connection - - connection.dispose() - - request.addfinalizer(dispose_sqlalchemy) - - @pytest.fixture(scope="session", autouse=True) def global_fixture(): if not os.path.exists(TEMP_KOLIBRI_HOME): diff --git a/kolibri/core/auth/test/test_auth_tasks.py b/kolibri/core/auth/test/test_auth_tasks.py index b907e9e3706..938e4518045 100644 --- a/kolibri/core/auth/test/test_auth_tasks.py +++ b/kolibri/core/auth/test/test_auth_tasks.py @@ -37,7 +37,6 @@ from kolibri.core.tasks.exceptions import JobRunning from kolibri.core.tasks.job import Job from kolibri.core.tasks.job import State -from kolibri.utils.time_utils import naive_utc_datetime DUMMY_PASSWORD = "password" @@ -64,7 +63,7 @@ def fake_job(**kwargs): class dummy_orm_job_data(object): - scheduled_time = datetime.datetime(year=2023, month=1, day=1, tzinfo=None) + scheduled_time = datetime.datetime(year=2023, month=1, day=1) repeat = 5 interval = 8600 retry_interval = 5 @@ -701,7 +700,7 @@ def test_enqueue_soud_sync_processing__future__scheduled( mock_soud.get_time_to_next_attempt.return_value = datetime.timedelta(seconds=30) mock_job = mock_job_storage.get_orm_job.return_value mock_job.state = State.QUEUED - mock_job.scheduled_time = naive_utc_datetime(timezone.now()) + mock_job.scheduled_time = timezone.now() enqueue_soud_sync_processing() mock_task.enqueue_in.assert_not_called() @@ -714,7 +713,7 @@ def test_enqueue_soud_sync_processing__future__running( mock_soud.get_time_to_next_attempt.return_value = datetime.timedelta(seconds=1) mock_job = mock_job_storage.get_orm_job.return_value mock_job.state = State.RUNNING - mock_job.scheduled_time = naive_utc_datetime(timezone.now()) + mock_job.scheduled_time = 
timezone.now() enqueue_soud_sync_processing() mock_task.enqueue_in.assert_not_called() @@ -727,9 +726,7 @@ def test_enqueue_soud_sync_processing__future__reschedule( mock_soud.get_time_to_next_attempt.return_value = datetime.timedelta(seconds=10) mock_job = mock_job_storage.get_orm_job.return_value mock_job.state = State.QUEUED - mock_job.scheduled_time = naive_utc_datetime( - timezone.now() + datetime.timedelta(seconds=15) - ) + mock_job.scheduled_time = timezone.now() + datetime.timedelta(seconds=15) enqueue_soud_sync_processing() mock_task.enqueue_in.assert_called_once_with(datetime.timedelta(seconds=10)) @@ -743,9 +740,7 @@ def test_enqueue_soud_sync_processing__completed__enqueue( mock_job = mock_job_storage.get_orm_job.return_value mock_job.state = State.COMPLETED # far in the past - mock_job.scheduled_time = naive_utc_datetime( - timezone.now() - datetime.timedelta(seconds=100) - ) + mock_job.scheduled_time = timezone.now() - datetime.timedelta(seconds=100) enqueue_soud_sync_processing() mock_task.enqueue_in.assert_called_once_with(datetime.timedelta(seconds=10)) @@ -759,9 +754,7 @@ def test_enqueue_soud_sync_processing__race__already_running( mock_job = mock_job_storage.get_orm_job.return_value mock_job.state = State.COMPLETED # far in the past - mock_job.scheduled_time = naive_utc_datetime( - timezone.now() - datetime.timedelta(seconds=100) - ) + mock_job.scheduled_time = timezone.now() - datetime.timedelta(seconds=100) mock_task.enqueue_in.side_effect = JobRunning() enqueue_soud_sync_processing() mock_task.enqueue_in.assert_called_once_with(datetime.timedelta(seconds=10)) diff --git a/kolibri/core/tasks/test/base.py b/kolibri/core/tasks/test/base.py deleted file mode 100644 index 05f387e4c43..00000000000 --- a/kolibri/core/tasks/test/base.py +++ /dev/null @@ -1,21 +0,0 @@ -import os -import tempfile -from contextlib import contextmanager - -from kolibri.core.tasks.utils import db_connection -from kolibri.utils.tests.helpers import override_option - - -@contextmanager -def connection(): - fd, filepath = tempfile.mkstemp() - with override_option("Tasks", "JOB_STORAGE_FILEPATH", filepath): - engine = db_connection() - yield engine - engine.dispose() - os.close(fd) - try: - os.remove(filepath) - except OSError: - # Don't fail test because of difficulty cleaning up. 
- pass diff --git a/kolibri/core/tasks/test/taskrunner/test_job_running.py b/kolibri/core/tasks/test/taskrunner/test_job_running.py index b5c6a7c640c..0892ce7888b 100644 --- a/kolibri/core/tasks/test/taskrunner/test_job_running.py +++ b/kolibri/core/tasks/test/taskrunner/test_job_running.py @@ -6,8 +6,6 @@ from kolibri.core.tasks.exceptions import JobNotFound from kolibri.core.tasks.job import Job from kolibri.core.tasks.job import State -from kolibri.core.tasks.storage import Storage -from kolibri.core.tasks.test.base import connection from kolibri.core.tasks.utils import callable_to_import_path from kolibri.core.tasks.utils import get_current_job from kolibri.core.tasks.utils import import_path_to_callable @@ -17,12 +15,11 @@ @pytest.fixture def storage_fixture(): - with connection() as conn: - e = Worker(connection=conn) - b = Storage(conn) - b.clear(force=True) - yield b - e.shutdown() + e = Worker() + b = e.storage + b.clear(force=True) + yield b + e.shutdown() @pytest.fixture @@ -162,6 +159,7 @@ def update_progress_cancelable_job(): return +@pytest.mark.django_db(databases="__all__", transaction=True) class TestJobStorage(object): def test_does_not_enqueue_a_function(self, storage_fixture): try: diff --git a/kolibri/core/tasks/test/taskrunner/test_scheduler.py b/kolibri/core/tasks/test/taskrunner/test_scheduler.py index 3d4b0156679..c7d9735075c 100644 --- a/kolibri/core/tasks/test/taskrunner/test_scheduler.py +++ b/kolibri/core/tasks/test/taskrunner/test_scheduler.py @@ -2,27 +2,31 @@ import pytest +from kolibri.core.tasks.decorators import register_task from kolibri.core.tasks.exceptions import JobRunning from kolibri.core.tasks.job import Job from kolibri.core.tasks.storage import Storage -from kolibri.core.tasks.test.base import connection from kolibri.utils.time_utils import local_now -from kolibri.utils.time_utils import naive_utc_datetime @pytest.fixture def job_storage(): - with connection() as c: - s = Storage(connection=c) - s.clear(force=True) - yield s - s.clear(force=True) + s = Storage() + s.clear(force=True) + yield s + s.clear(force=True) +@register_task +def add(x, y): + return x + y + + +@pytest.mark.django_db(databases="__all__") class TestScheduler(object): @pytest.fixture def job(self): - return Job(id) + return Job(add) def test_enqueue_at_a_function(self, job_storage, job): job_id = job_storage.enqueue_at(local_now(), job) @@ -34,10 +38,9 @@ def test_enqueue_at_a_function_sets_time(self, job_storage, job): now = local_now() job_id = job_storage.enqueue_at(now, job) - with job_storage.session_scope() as session: - _, scheduled_job = job_storage._get_job_and_orm_job(job_id, session) - scheduled_time = scheduled_job.scheduled_time - assert scheduled_time == naive_utc_datetime(now) + _, scheduled_job = job_storage._get_job_and_orm_job(job_id) + scheduled_time = scheduled_job.scheduled_time + assert scheduled_time == now def test_enqueue_at_preserves_extra_metadata(self, job_storage, job): metadata = {"saved": True} @@ -59,19 +62,17 @@ def test_enqueue_in_a_function_sets_time(self, job_storage, job): job_storage._now = lambda: now job_id = job_storage.enqueue_in(diff, job) - with job_storage.session_scope() as session: - _, scheduled_job = job_storage._get_job_and_orm_job(job_id, session) - scheduled_time = scheduled_job.scheduled_time - assert scheduled_time == naive_utc_datetime(now) + diff + _, scheduled_job = job_storage._get_job_and_orm_job(job_id) + scheduled_time = scheduled_job.scheduled_time + assert scheduled_time == now + diff def 
test_schedule_a_function_sets_time(self, job_storage, job): now = local_now() job_id = job_storage.schedule(now, job) - with job_storage.session_scope() as session: - _, scheduled_job = job_storage._get_job_and_orm_job(job_id, session) - scheduled_time = scheduled_job.scheduled_time - assert scheduled_time == naive_utc_datetime(now) + _, scheduled_job = job_storage._get_job_and_orm_job(job_id) + scheduled_time = scheduled_job.scheduled_time + assert scheduled_time == now def test_schedule_a_function_gives_value_error_without_datetime( self, job_storage, job @@ -112,9 +113,8 @@ def test_scheduled_repeating_function_sets_endless_repeat_new_job( job_id = job_storage.schedule(now, job, interval=1000, repeat=None) job_storage.complete_job(job_id) job_storage.reschedule_finished_job_if_needed(job_id) - with job_storage.session_scope() as session: - _, scheduled_job = job_storage._get_job_and_orm_job(job_id, session) - repeat = scheduled_job.repeat + _, scheduled_job = job_storage._get_job_and_orm_job(job_id) + repeat = scheduled_job.repeat assert repeat is None def test_scheduled_repeating_function_enqueues_job(self, job_storage, job): @@ -131,9 +131,8 @@ def test_scheduled_repeating_function_sets_new_job_with_one_fewer_repeats( job_id = job_storage.schedule(now, job, interval=1000, repeat=1) job_storage.complete_job(job_id) job_storage.reschedule_finished_job_if_needed(job_id) - with job_storage.session_scope() as session: - _, scheduled_job = job_storage._get_job_and_orm_job(job_id, session) - repeat = scheduled_job.repeat + _, scheduled_job = job_storage._get_job_and_orm_job(job_id) + repeat = scheduled_job.repeat assert repeat == 0 def test_scheduled_repeating_function_sets_new_job_at_interval( @@ -144,12 +143,9 @@ def test_scheduled_repeating_function_sets_new_job_at_interval( job_storage._now = lambda: now job_storage.complete_job(job_id) job_storage.reschedule_finished_job_if_needed(job_id) - with job_storage.session_scope() as session: - _, scheduled_job = job_storage._get_job_and_orm_job(job_id, session) - scheduled_time = scheduled_job.scheduled_time - assert scheduled_time == naive_utc_datetime(now) + datetime.timedelta( - seconds=1000 - ) + _, scheduled_job = job_storage._get_job_and_orm_job(job_id) + scheduled_time = scheduled_job.scheduled_time + assert scheduled_time == now + datetime.timedelta(seconds=1000) def test_scheduled_repeating_function_failure_sets_new_job_at_retry_interval( self, job_storage, job @@ -161,17 +157,16 @@ def test_scheduled_repeating_function_failure_sets_new_job_at_retry_interval( job_storage._now = lambda: now job_storage.mark_job_as_failed(job_id, "Exception", "Traceback") job_storage.reschedule_finished_job_if_needed(job_id) - with job_storage.session_scope() as session: - _, scheduled_job = job_storage._get_job_and_orm_job(job_id, session) - scheduled_time = scheduled_job.scheduled_time - assert scheduled_time == naive_utc_datetime(now) + datetime.timedelta(seconds=5) + _, scheduled_job = job_storage._get_job_and_orm_job(job_id) + scheduled_time = scheduled_job.scheduled_time + assert scheduled_time == now + datetime.timedelta(seconds=5) class TestReschedule(TestScheduler): @pytest.fixture def job(self, job_storage): now = local_now() - job_id = job_storage.schedule(now, Job(id), interval=1, repeat=123) + job_id = job_storage.schedule(now, Job(add), interval=1, repeat=123) return job_storage.get_job(job_id) def test_reschedule_a_function_gives_job_running_error(self, job_storage, job): diff --git a/kolibri/core/tasks/test/taskrunner/test_storage.py 
b/kolibri/core/tasks/test/taskrunner/test_storage.py index 873ab439b0e..76155a96565 100644 --- a/kolibri/core/tasks/test/taskrunner/test_storage.py +++ b/kolibri/core/tasks/test/taskrunner/test_storage.py @@ -5,6 +5,7 @@ import pytest import pytz from mock import patch +from requests.exceptions import HTTPError from kolibri.core.tasks.constants import DEFAULT_QUEUE from kolibri.core.tasks.constants import Priority @@ -14,7 +15,6 @@ from kolibri.core.tasks.job import State from kolibri.core.tasks.registry import TaskRegistry from kolibri.core.tasks.storage import Storage -from kolibri.core.tasks.test.base import connection from kolibri.core.tasks.utils import callable_to_import_path from kolibri.utils.time_utils import local_now @@ -24,38 +24,44 @@ @pytest.fixture def defaultbackend(): - with connection() as c: - b = Storage(c) - b.clear(force=True) - yield b - b.clear(force=True) + b = Storage() + b.clear(force=True) + yield b + b.clear(force=True) -@pytest.fixture -def func(): - @register_task - def add(x, y): - return x + y +@register_task( + retry_on=[ValueError, TypeError], +) +def add(x, y): + return x + y - TaskRegistry["kolibri.core.tasks.test.taskrunner.test_storage.add"] = add - yield add - TaskRegistry.clear() +@pytest.fixture(autouse=True) +def register_add_task(): + # register before tests + TaskRegistry[callable_to_import_path(add)] = add + try: + yield + finally: + # clear after tests + TaskRegistry.clear() @pytest.fixture -def simplejob(func): - return Job(func) +def simplejob(): + return Job(add) +@pytest.mark.django_db(databases="__all__") class TestBackend: - def test_can_enqueue_single_job(self, defaultbackend, simplejob, func): + def test_can_enqueue_single_job(self, defaultbackend, simplejob): job_id = defaultbackend.enqueue_job(simplejob, QUEUE) new_job = defaultbackend.get_job(job_id) # Does the returned job record the function we set to run? - assert str(new_job.func) == callable_to_import_path(func) + assert str(new_job.func) == callable_to_import_path(add) # Does the job have the right state (QUEUED)? 
assert new_job.state == State.QUEUED @@ -310,6 +316,134 @@ def test_can_reschedule_finished_job(self, defaultbackend, simplejob): assert requeued_job.state == State.QUEUED assert requeued_orm_job.scheduled_time > previous_scheduled_time + def test_job_retry_on_matching_exception(self, defaultbackend, simplejob): + exception = ValueError("Error") + job_id = defaultbackend.enqueue_job( + simplejob, QUEUE, retry_interval=5, max_retries=3 + ) + defaultbackend.mark_job_as_failed(job_id, exception, "Traceback") + + orm_job = defaultbackend.get_orm_job(job_id) + previous_scheduled_time = orm_job.scheduled_time + + defaultbackend.reschedule_finished_job_if_needed( + simplejob.job_id, exception=exception + ) + requeued_orm_job = defaultbackend.get_orm_job(job_id) + requeued_job = defaultbackend.get_job(job_id) + + assert requeued_job.state == State.QUEUED + assert requeued_orm_job.scheduled_time > previous_scheduled_time + assert requeued_orm_job.retries == 1 + + def test_job_retry_on_matching_exception__no_max_retries( + self, defaultbackend, simplejob + ): + exception = ValueError("Error") + job_id = defaultbackend.enqueue_job(simplejob, QUEUE, retry_interval=5) + defaultbackend.mark_job_as_failed(job_id, exception, "Traceback") + + orm_job = defaultbackend.get_orm_job(job_id) + previous_scheduled_time = orm_job.scheduled_time + + defaultbackend.reschedule_finished_job_if_needed( + simplejob.job_id, exception=exception + ) + requeued_orm_job = defaultbackend.get_orm_job(job_id) + requeued_job = defaultbackend.get_job(job_id) + + assert requeued_job.state == State.QUEUED + assert requeued_orm_job.scheduled_time > previous_scheduled_time + assert requeued_orm_job.retries == 1 + + def test_job_retry_on_matching_exception__no_retry_interval( + self, defaultbackend, simplejob + ): + exception = TypeError("Error") + job_id = defaultbackend.enqueue_job(simplejob, QUEUE, max_retries=3) + defaultbackend.mark_job_as_failed(job_id, exception, "Traceback") + + orm_job = defaultbackend.get_orm_job(job_id) + previous_scheduled_time = orm_job.scheduled_time + + defaultbackend.reschedule_finished_job_if_needed( + simplejob.job_id, exception=exception + ) + requeued_orm_job = defaultbackend.get_orm_job(job_id) + requeued_job = defaultbackend.get_job(job_id) + + assert requeued_job.state == State.QUEUED + assert requeued_orm_job.scheduled_time > previous_scheduled_time + assert requeued_orm_job.retries == 1 + + def test_job_not_retry_on_matching_exception__no_retry_params( + self, defaultbackend, simplejob + ): + # If job has no retry params, it should not retry even if exception matches + exception = ValueError("Error") + job_id = defaultbackend.enqueue_job(simplejob, QUEUE) + defaultbackend.mark_job_as_failed(job_id, exception, "Traceback") + + orm_job = defaultbackend.get_orm_job(job_id) + previous_scheduled_time = orm_job.scheduled_time + + defaultbackend.reschedule_finished_job_if_needed( + simplejob.job_id, exception=exception + ) + requeued_orm_job = defaultbackend.get_orm_job(job_id) + requeued_job = defaultbackend.get_job(job_id) + + assert requeued_job.state == State.FAILED + assert requeued_orm_job.scheduled_time == previous_scheduled_time + assert requeued_orm_job.retries is None + + def test_job_not_retry_on_non_matching_exception(self, defaultbackend, simplejob): + exception = HTTPError("Error") + job_id = defaultbackend.enqueue_job( + simplejob, QUEUE, retry_interval=5, max_retries=3 + ) + defaultbackend.mark_job_as_failed(job_id, exception, "Traceback") + + orm_job = 
defaultbackend.get_orm_job(job_id) + previous_scheduled_time = orm_job.scheduled_time + + defaultbackend.reschedule_finished_job_if_needed( + simplejob.job_id, exception=exception + ) + requeued_orm_job = defaultbackend.get_orm_job(job_id) + requeued_job = defaultbackend.get_job(job_id) + + assert requeued_job.state == State.FAILED + assert requeued_orm_job.scheduled_time == previous_scheduled_time + assert requeued_orm_job.retries is None + + def test_job_not_retry_on_limit_max_retries(self, defaultbackend, simplejob): + exception = ValueError("Error") + job_id = defaultbackend.enqueue_job(simplejob, QUEUE, max_retries=1) + defaultbackend.mark_job_as_failed(job_id, exception, "Traceback") + + # Retry first time + defaultbackend.reschedule_finished_job_if_needed( + simplejob.job_id, exception=exception + ) + defaultbackend.mark_job_as_failed(job_id, exception, "Traceback") + + orm_job = defaultbackend.get_orm_job(job_id) + previous_scheduled_time = orm_job.scheduled_time + + # When trying to retry second time, it should not retry as max_retries is reached + defaultbackend.reschedule_finished_job_if_needed( + simplejob.job_id, exception=exception + ) + + requeued_orm_job = defaultbackend.get_orm_job(job_id) + requeued_job = defaultbackend.get_job(job_id) + + retries = requeued_orm_job.retries + assert requeued_job.state == State.FAILED + assert requeued_orm_job.scheduled_time == previous_scheduled_time + assert retries == 1 + def test_reschedule_finished_job_canceled(self, defaultbackend, simplejob): # Test case where the job is canceled. job_id = defaultbackend.enqueue_job(simplejob, QUEUE) diff --git a/kolibri/core/tasks/test/taskrunner/test_worker.py b/kolibri/core/tasks/test/taskrunner/test_worker.py index 49d10e0d9dc..965ea0d64ae 100644 --- a/kolibri/core/tasks/test/taskrunner/test_worker.py +++ b/kolibri/core/tasks/test/taskrunner/test_worker.py @@ -6,7 +6,6 @@ from kolibri.core.tasks.constants import Priority from kolibri.core.tasks.job import Job from kolibri.core.tasks.job import State -from kolibri.core.tasks.test.base import connection from kolibri.core.tasks.test.taskrunner.test_job_running import EventProxy from kolibri.core.tasks.worker import Worker from kolibri.utils import conf @@ -42,14 +41,14 @@ def toggle_flag(flag_id): @pytest.fixture def worker(): - with connection() as c: - b = Worker(c, regular_workers=1, high_workers=1) - b.storage.clear(force=True) - yield b - b.storage.clear(force=True) - b.shutdown() + b = Worker(regular_workers=1, high_workers=1) + b.storage.clear(force=True) + yield b + b.storage.clear(force=True) + b.shutdown() +@pytest.mark.django_db(databases="__all__", transaction=True) def test_keyerror_prevention(worker): # Create a job with the same ID as the one in worker.enqueue_job_runs_job job = Job(id, args=(9,)) @@ -64,6 +63,7 @@ def test_keyerror_prevention(worker): assert job.state == "COMPLETED" +@pytest.mark.django_db(databases="__all__", transaction=True) def test_keyerror_prevention_multiple_jobs(worker): # Create multiple jobs with the same ID to trigger the race condition job1 = Job(id, args=(9,)) @@ -91,6 +91,7 @@ def test_keyerror_prevention_multiple_jobs(worker): assert job2.state == "COMPLETED" +@pytest.mark.django_db(databases="__all__", transaction=True) class TestWorker: def test_enqueue_job_runs_job(self, worker): job = Job(id, args=(9,)) @@ -106,7 +107,7 @@ def test_enqueue_job_runs_job_once(self, worker, flag): # Do conditional check in here, as it seems to not work properly # inside a pytest.mark.skipIf if 
conf.OPTIONS["Database"]["DATABASE_ENGINE"] == "postgres": - b = Worker(worker.storage.engine, regular_workers=1, high_workers=1) + b = Worker(regular_workers=1, high_workers=1) job = Job(toggle_flag, args=(flag.event_id,)) worker.storage.enqueue_job(job, QUEUE) diff --git a/kolibri/core/tasks/test/test_api.py b/kolibri/core/tasks/test/test_api.py index 7e36483f27f..ede5c3569a7 100644 --- a/kolibri/core/tasks/test/test_api.py +++ b/kolibri/core/tasks/test/test_api.py @@ -2,11 +2,9 @@ import pytz from django.urls import reverse -from django.utils.timezone import make_aware from mock import call from mock import Mock from mock import patch -from pytz import utc from rest_framework import serializers from rest_framework import status from rest_framework.test import APIClient @@ -53,7 +51,7 @@ def fake_job(**kwargs): class dummy_orm_job_data(object): - scheduled_time = datetime.datetime(year=2023, month=1, day=1, tzinfo=None) + scheduled_time = datetime.datetime(year=2023, month=1, day=1) repeat = 5 interval = 8600 retry_interval = 5 @@ -281,9 +279,7 @@ def add(x, y): "kwargs": {}, "extra_metadata": {}, "facility_id": None, - "scheduled_datetime": make_aware( - dummy_orm_job_data.scheduled_time, utc - ).isoformat(), + "scheduled_datetime": dummy_orm_job_data.scheduled_time.isoformat(), "repeat": dummy_orm_job_data.repeat, "repeat_interval": dummy_orm_job_data.interval, "retry_interval": dummy_orm_job_data.retry_interval, @@ -342,9 +338,7 @@ def add(**kwargs): "kwargs": {}, "extra_metadata": {}, "facility_id": None, - "scheduled_datetime": make_aware( - dummy_orm_job_data.scheduled_time, utc - ).isoformat(), + "scheduled_datetime": dummy_orm_job_data.scheduled_time.isoformat(), "repeat": dummy_orm_job_data.repeat, "repeat_interval": dummy_orm_job_data.interval, "retry_interval": dummy_orm_job_data.retry_interval, @@ -362,9 +356,7 @@ def add(**kwargs): "kwargs": {}, "extra_metadata": {}, "facility_id": None, - "scheduled_datetime": make_aware( - dummy_orm_job_data.scheduled_time, utc - ).isoformat(), + "scheduled_datetime": dummy_orm_job_data.scheduled_time.isoformat(), "repeat": dummy_orm_job_data.repeat, "repeat_interval": dummy_orm_job_data.interval, "retry_interval": dummy_orm_job_data.retry_interval, @@ -448,9 +440,7 @@ def add(**kwargs): "extra_metadata": { "facility": "kolibri HQ", }, - "scheduled_datetime": make_aware( - dummy_orm_job_data.scheduled_time, utc - ).isoformat(), + "scheduled_datetime": dummy_orm_job_data.scheduled_time.isoformat(), "repeat": dummy_orm_job_data.repeat, "repeat_interval": dummy_orm_job_data.interval, "retry_interval": dummy_orm_job_data.retry_interval, @@ -542,9 +532,7 @@ def add(x, y): "extra_metadata": { "facility": "kolibri HQ", }, - "scheduled_datetime": make_aware( - dummy_orm_job_data.scheduled_time, utc - ).isoformat(), + "scheduled_datetime": dummy_orm_job_data.scheduled_time.isoformat(), "repeat": dummy_orm_job_data.repeat, "repeat_interval": dummy_orm_job_data.interval, "retry_interval": dummy_orm_job_data.retry_interval, @@ -564,9 +552,7 @@ def add(x, y): "extra_metadata": { "facility": "kolibri HQ", }, - "scheduled_datetime": make_aware( - dummy_orm_job_data.scheduled_time, utc - ).isoformat(), + "scheduled_datetime": dummy_orm_job_data.scheduled_time.isoformat(), "repeat": dummy_orm_job_data.repeat, "repeat_interval": dummy_orm_job_data.interval, "retry_interval": dummy_orm_job_data.retry_interval, @@ -1230,9 +1216,7 @@ def subtract(x, y): "args": (), "kwargs": {}, "extra_metadata": {}, - "scheduled_datetime": make_aware( - 
dummy_orm_job_data.scheduled_time, utc - ).isoformat(), + "scheduled_datetime": dummy_orm_job_data.scheduled_time.isoformat(), "repeat": dummy_orm_job_data.repeat, "repeat_interval": dummy_orm_job_data.interval, "retry_interval": dummy_orm_job_data.retry_interval, @@ -1250,9 +1234,7 @@ def subtract(x, y): "args": (), "kwargs": {}, "extra_metadata": {}, - "scheduled_datetime": make_aware( - dummy_orm_job_data.scheduled_time, utc - ).isoformat(), + "scheduled_datetime": dummy_orm_job_data.scheduled_time.isoformat(), "repeat": dummy_orm_job_data.repeat, "repeat_interval": dummy_orm_job_data.interval, "retry_interval": dummy_orm_job_data.retry_interval, @@ -1270,9 +1252,7 @@ def subtract(x, y): "args": (), "kwargs": {}, "extra_metadata": {}, - "scheduled_datetime": make_aware( - dummy_orm_job_data.scheduled_time, utc - ).isoformat(), + "scheduled_datetime": dummy_orm_job_data.scheduled_time.isoformat(), "repeat": dummy_orm_job_data.repeat, "repeat_interval": dummy_orm_job_data.interval, "retry_interval": dummy_orm_job_data.retry_interval, diff --git a/kolibri/core/tasks/test/test_job.py b/kolibri/core/tasks/test/test_job.py index 848f5a7ad0e..3d275426c93 100644 --- a/kolibri/core/tasks/test/test_job.py +++ b/kolibri/core/tasks/test/test_job.py @@ -8,7 +8,6 @@ from kolibri.core.tasks.constants import Priority from kolibri.core.tasks.exceptions import JobNotRunning from kolibri.core.tasks.job import Job -from kolibri.core.tasks.job import MAX_RETRIES from kolibri.core.tasks.permissions import IsSuperAdmin from kolibri.core.tasks.registry import RegisteredTask from kolibri.core.tasks.utils import current_state_tracker @@ -187,70 +186,6 @@ def test_job_retry_in_all_allowable_values(self): except Exception: setattr(current_state_tracker, "job", None) - def test_job_retry_on_matching_exception(self): - # The task raises an HTTPError, which is in the retry_on list, so it should be rescheduled - job = Job(fn_with_http_error, retry_on=[HTTPError]) - job.storage = mock.MagicMock() - setattr(current_state_tracker, "job", job) - - job.execute() - - job.storage.reschedule_finished_job_if_needed.assert_called_once() - - # If delay was set to the reschedule call, it means it will be retried - args, kwargs = job.storage.reschedule_finished_job_if_needed.call_args - self.assertEqual(args[0], job.job_id) - self.assertEqual(job.extra_metadata.get("retries"), 1) - self.assertIsNotNone(kwargs.get("delay")) - - setattr(current_state_tracker, "job", None) - - def test_job_retry_on_max_retries_not_exceeded(self): - job = Job(fn_with_http_error, retry_on=[HTTPError]) - job.storage = mock.MagicMock() - job.extra_metadata["retries"] = MAX_RETRIES - 1 # Still allowed to retry - setattr(current_state_tracker, "job", job) - - job.execute() - - job.storage.reschedule_finished_job_if_needed.assert_called_once() - args, kwargs = job.storage.reschedule_finished_job_if_needed.call_args - self.assertEqual(args[0], job.job_id) - self.assertEqual(job.extra_metadata.get("retries"), MAX_RETRIES) - self.assertIsNotNone(kwargs.get("delay")) - - setattr(current_state_tracker, "job", None) - - def test_job_retry_on_max_retries_exceeded(self): - job = Job(fn_with_http_error, retry_on=[HTTPError]) - job.storage = mock.MagicMock() - job.extra_metadata["retries"] = MAX_RETRIES # Already reached max retries - setattr(current_state_tracker, "job", job) - - job.execute() - - job.storage.reschedule_finished_job_if_needed.assert_called_once() - args, kwargs = job.storage.reschedule_finished_job_if_needed.call_args - self.assertEqual(args[0], 
job.job_id) - self.assertIsNone(kwargs.get("delay")) # No delay means no retry - - setattr(current_state_tracker, "job", None) - - def test_job_retry_on_non_matching_exception(self): - # The task raises an HTTPError, which is not in the retry_on list, so it should not be rescheduled - job = Job(fn_with_http_error, retry_on=[ValueError]) # Different exception type - job.storage = mock.MagicMock() - setattr(current_state_tracker, "job", job) - - job.execute() - - job.storage.reschedule_finished_job_if_needed.assert_called_once() - args, kwargs = job.storage.reschedule_finished_job_if_needed.call_args - self.assertEqual(args[0], job.job_id) - self.assertIsNone(kwargs.get("delay")) # No delay means no retry - - setattr(current_state_tracker, "job", None) - # Test generated by Claude 3.7 Sonnet and tweaked def test_job_update_progress_throttles_small_updates(self): @@ -472,7 +407,6 @@ def test__ready_job(self, MockJob): track_progress=True, long_running=True, kwargs=dict(base=10), # kwarg that was passed to _ready_job() - retry_on=[], ) # Do we return the job object? diff --git a/kolibri/core/tasks/test/test_no_connection.py b/kolibri/core/tasks/test/test_no_connection.py index 0edddee1292..a2249ef1bec 100644 --- a/kolibri/core/tasks/test/test_no_connection.py +++ b/kolibri/core/tasks/test/test_no_connection.py @@ -1,3 +1,7 @@ +import pytest + + +@pytest.mark.django_db(databases="__all__", transaction=True) def test_importing_job_storage_no_open_connection(): from kolibri.core.tasks.main import job_storage diff --git a/kolibri/utils/tests/test_sanity_check.py b/kolibri/utils/tests/test_sanity_check.py index b288d814e22..41edac1660e 100644 --- a/kolibri/utils/tests/test_sanity_check.py +++ b/kolibri/utils/tests/test_sanity_check.py @@ -4,8 +4,6 @@ from django.db.utils import OperationalError from django.test import TestCase from mock import patch -from sqlalchemy.exc import OperationalError as SQLAlchemyOperationalError -from sqlalchemy.exc import ProgrammingError as SQLAlchemyProgrammingError from kolibri.utils import sanity_checks from kolibri.utils.sanity_checks import DatabaseNotMigrated @@ -56,21 +54,3 @@ def test_check_database_is_migrated(self): get_or_create_current_instance.side_effect = OperationalError("Test") with self.assertRaises(DatabaseNotMigrated): sanity_checks.check_database_is_migrated() - - @patch("kolibri.core.tasks.storage.Storage") - def test_ensure_job_tables_created_operational_error(self, Storage): - with patch("kolibri.core.tasks.main.job_storage") as job_storage: - job_storage.test_table_readable.side_effect = SQLAlchemyOperationalError( - "Test", "", "" - ) - sanity_checks.ensure_job_tables_created() - Storage.recreate_default_tables.assert_called_once() - - @patch("kolibri.core.tasks.storage.Storage") - def test_ensure_job_tables_created_programming_error(self, Storage): - with patch("kolibri.core.tasks.main.job_storage") as job_storage: - job_storage.test_table_readable.side_effect = SQLAlchemyProgrammingError( - "Test", "", "" - ) - sanity_checks.ensure_job_tables_created() - Storage.recreate_default_tables.assert_called_once() diff --git a/kolibri/utils/tests/test_server.py b/kolibri/utils/tests/test_server.py index 53e63836a6f..9708f0c80c5 100755 --- a/kolibri/utils/tests/test_server.py +++ b/kolibri/utils/tests/test_server.py @@ -9,7 +9,6 @@ from kolibri.core.tasks.job import Job from kolibri.core.tasks.storage import Storage -from kolibri.core.tasks.test.base import connection from kolibri.utils import server from kolibri.utils.constants import 
installation_types @@ -81,11 +80,10 @@ def test_whl(self): @pytest.fixture def job_storage(): - with connection() as c: - s = Storage(connection=c) - s.clear() - yield s - s.clear() + s = Storage() + s.clear() + yield s + s.clear() class TestServerServices(object): @@ -128,6 +126,7 @@ def test_services_shutdown_on_stop(self): ] +@pytest.mark.django_db(databases="__all__", transaction=True) class TestServerDefaultScheduledTasks(object): @mock.patch("kolibri.core.discovery.utils.network.broadcast.KolibriBroadcast") def test_scheduled_jobs_persist_on_restart( From c6c00e484285d193369b6350f57a8a6592eb940d Mon Sep 17 00:00:00 2001 From: Alex Velez Date: Tue, 9 Dec 2025 21:43:20 -0500 Subject: [PATCH 09/13] Use KolibriModelRouter to define the KolibriTasksRouter class --- kolibri/core/tasks/models.py | 55 ++++++++---------------------------- 1 file changed, 11 insertions(+), 44 deletions(-) diff --git a/kolibri/core/tasks/models.py b/kolibri/core/tasks/models.py index 35ac0539f17..1cb0a6eb197 100644 --- a/kolibri/core/tasks/models.py +++ b/kolibri/core/tasks/models.py @@ -1,52 +1,9 @@ from django.db import models +from kolibri.core.utils.model_router import KolibriModelRouter from kolibri.deployment.default.sqlite_db_names import JOB_STORAGE -class KolibriTasksRouter(object): - """ - Determine how to route database calls for the kolibritasks app. - All other models will be routed to the default database. - """ - - def db_for_read(self, model, **hints): - """Send all read operations on kolibritasks app models to JOB_STORAGE.""" - if model._meta.app_label == "kolibritasks": - return JOB_STORAGE - return None - - def db_for_write(self, model, **hints): - """Send all write operations on kolibritasks app models to JOB_STORAGE.""" - if model._meta.app_label == "kolibritasks": - return JOB_STORAGE - return None - - def allow_relation(self, obj1, obj2, **hints): - """Determine if relationship is allowed between two objects.""" - - if ( - obj1._meta.app_label == "kolibritasks" - and obj2._meta.app_label == "kolibritasks" - ): - return True - elif "kolibritasks" not in [obj1._meta.app_label, obj2._meta.app_label]: - return None - - return False - - def allow_migrate(self, db, app_label, model_name=None, **hints): - """Ensure that the kolibritasks app's models get created on the right database.""" - if app_label == "kolibritasks": - # The kolibritasks app should be migrated only on the JOB_STORAGE database. - return db == JOB_STORAGE - elif db == JOB_STORAGE: - # Ensure that all other apps don't get migrated on the JOB_STORAGE database. - return False - - # No opinion for all other scenarios - return None - - # The Job model has been migrated from SqlAlchemy to use django models # generated by Copilot and tweaked class Job(models.Model): @@ -106,3 +63,13 @@ class Meta: def __str__(self): return f"Job {self.id} - {self.func} ({self.state})" + + +class KolibriTasksRouter(KolibriModelRouter): + """ + Determine how to route database calls for the kolibritasks app. + All other models will be routed to the default database. 
+ """ + + MODEL_CLASSES = {Job} + DB_NAME = JOB_STORAGE From b579ed4a2c3655ff7ca43e99648016bd6f65fb0d Mon Sep 17 00:00:00 2001 From: Alex Velez Date: Tue, 9 Dec 2025 21:44:22 -0500 Subject: [PATCH 10/13] Fix jobs tests running on multiple threads --- .../core/content/test/test_content_request.py | 4 ++++ kolibri/core/tasks/storage.py | 17 ++++++++++++----- 2 files changed, 16 insertions(+), 5 deletions(-) diff --git a/kolibri/core/content/test/test_content_request.py b/kolibri/core/content/test/test_content_request.py index 29146d161b0..6b7d2e15803 100644 --- a/kolibri/core/content/test/test_content_request.py +++ b/kolibri/core/content/test/test_content_request.py @@ -13,6 +13,8 @@ class ContentDownloadRequestSerializerTestCase(TestCase): + databases = "__all__" + @classmethod def setUpTestData(cls): super().setUpTestData() @@ -107,6 +109,8 @@ def test_create_content_download_request__with_invalid_source_instance_id(self): class ContentDownloadRequestViewsetTest(APITestCase): + databases = "__all__" + @classmethod def setUpTestData(cls): super().setUpTestData() diff --git a/kolibri/core/tasks/storage.py b/kolibri/core/tasks/storage.py index 02ac4b61617..cf284898fda 100644 --- a/kolibri/core/tasks/storage.py +++ b/kolibri/core/tasks/storage.py @@ -21,6 +21,7 @@ from kolibri.core.tasks.validation import validate_priority from kolibri.core.tasks.validation import validate_repeat from kolibri.core.tasks.validation import validate_timedelay +from kolibri.deployment.default.sqlite_db_names import JOB_STORAGE from kolibri.utils.time_utils import local_now logger = logging.getLogger(__name__) @@ -50,6 +51,12 @@ def __contains__(self, item): job_id = item.job_id return ORMJob.objects.filter(id=job_id).exists() + def _get_job_database_alias(self): + db_backend = connections[ORMJob.objects.db].vendor + if db_backend == "sqlite": + return JOB_STORAGE + return None # Use default database + def set_sqlite_pragmas(self): """ Sets the connection PRAGMAs for the sqlite database. @@ -203,7 +210,7 @@ def _postgres_next_queued_job(self, priority): of READ_COMMITTED. More details here: https://dba.stackexchange.com/a/69497 """ - with transaction.atomic(): + with transaction.atomic(using=self._get_job_database_alias()): next_job = ( self._filter_next_query(ORMJob.objects.all(), priority) .select_for_update(skip_locked=True) @@ -371,7 +378,7 @@ def clear(self, queue=None, job_id=None, force=False): :type force: bool :param force: If True, clear the job (or jobs), even if it hasn't completed, failed or been cancelled. 
""" - with transaction.atomic(): + with transaction.atomic(using=self._get_job_database_alias()): queryset = ORMJob.objects.all() if queue: queryset = queryset.filter(queue=queue) @@ -456,7 +463,7 @@ def save_worker_info( # nothing to do return - with transaction.atomic(): + with transaction.atomic(using=self._get_job_database_alias()): try: _, orm_job = self._get_job_and_orm_job(job_id) if host is not None: @@ -591,7 +598,7 @@ def _should_retry_on_failed_task(self, orm_job, exception, retry_interval): return True def _update_job(self, job_id, state=None, **kwargs): - with transaction.atomic(): + with transaction.atomic(using=self._get_job_database_alias()): try: job, orm_job = self._get_job_and_orm_job(job_id) if state is not None: @@ -716,7 +723,7 @@ def schedule( if not isinstance(job, Job): raise ValueError("Job argument must be a Job object.") - with transaction.atomic(): + with transaction.atomic(using=self._get_job_database_alias()): orm_job = ORMJob.objects.filter(id=job.job_id).first() if orm_job and orm_job.state == State.RUNNING: raise JobRunning() From 5a8b0cd942621a3dd4521cfc1aeac52ef57a9850 Mon Sep 17 00:00:00 2001 From: Alex Velez Date: Tue, 9 Dec 2025 21:48:28 -0500 Subject: [PATCH 11/13] Add JOB_STORAGE to ADDITIONAL_SQLITE_DATABASES array --- kolibri/deployment/default/settings/base.py | 34 ++++++++++--------- kolibri/deployment/default/sqlite_db_names.py | 8 ++++- kolibri/utils/main.py | 6 +++- 3 files changed, 30 insertions(+), 18 deletions(-) diff --git a/kolibri/deployment/default/settings/base.py b/kolibri/deployment/default/settings/base.py index 8727fdecbb5..eedc53a0eee 100644 --- a/kolibri/deployment/default/settings/base.py +++ b/kolibri/deployment/default/settings/base.py @@ -141,11 +141,6 @@ # https://docs.djangoproject.com/en/3.2/ref/settings/#databases if conf.OPTIONS["Database"]["DATABASE_ENGINE"] == "sqlite": - job_storage_path = conf.OPTIONS["Tasks"]["JOB_STORAGE_FILEPATH"] - # if job_storage_path is relative, make it relative to KOLIBRI_HOME - if not os.path.isabs(job_storage_path): - job_storage_path = os.path.join(conf.KOLIBRI_HOME, job_storage_path) - # Using custom SQLite backend that uses BEGIN IMMEDIATE transactions. # Once upgraded to Django 5.2+, revert to "django.db.backends.sqlite3" and use # the transaction_mode option instead. 
@@ -157,25 +152,32 @@
             conf.OPTIONS["Database"]["DATABASE_NAME"] or "db.sqlite3",
         ),
         "OPTIONS": {"timeout": 100},
-        },
-        JOB_STORAGE: {
-            "ENGINE": "django.db.backends.sqlite3",
-            "NAME": job_storage_path,
-            "OPTIONS": {"timeout": 100},
-            "TEST": {
-                "NAME": os.path.join(conf.KOLIBRI_HOME, "test_job_storage.sqlite3")
-            },
-        },
+        }
     }

     for additional_db in ADDITIONAL_SQLITE_DATABASES:
-        # TODO ADD JOB_STORAGE to ADDITIONAL_SQLITE_DATABASES and check it here
-        DATABASES[additional_db] = {
+        db_config = {
             "ENGINE": "kolibri.deployment.default.db.backends.sqlite3",
             "NAME": os.path.join(conf.KOLIBRI_HOME, "{}.sqlite3".format(additional_db)),
             "OPTIONS": {"timeout": 100},
         }

+        if additional_db == JOB_STORAGE:
+            # JOB_STORAGE uses a custom file path from the config
+            job_storage_path = conf.OPTIONS["Tasks"]["JOB_STORAGE_FILEPATH"]
+            if not os.path.isabs(job_storage_path):
+                job_storage_path = os.path.join(conf.KOLIBRI_HOME, job_storage_path)
+
+            db_config["NAME"] = job_storage_path
+
+            # Override test config for job storage to use a sqlite file in KOLIBRI_HOME rather than in-memory
+            # as job tasks may be run in a separate thread/process
+            db_config["TEST"] = {
+                "NAME": os.path.join(conf.KOLIBRI_HOME, "test_job_storage.sqlite3")
+            }
+
+        DATABASES[additional_db] = db_config
+
 DATABASE_ROUTERS = (
     "kolibri.core.auth.models.SessionRouter",
     "kolibri.core.notifications.models.NotificationsRouter",
diff --git a/kolibri/deployment/default/sqlite_db_names.py b/kolibri/deployment/default/sqlite_db_names.py
index e527462ed7d..c9031ce35ed 100644
--- a/kolibri/deployment/default/sqlite_db_names.py
+++ b/kolibri/deployment/default/sqlite_db_names.py
@@ -15,4 +15,10 @@

 JOB_STORAGE = "job_storage"

-ADDITIONAL_SQLITE_DATABASES = (SYNC_QUEUE, NETWORK_LOCATION, NOTIFICATIONS, SESSIONS)
+ADDITIONAL_SQLITE_DATABASES = (
+    SYNC_QUEUE,
+    NETWORK_LOCATION,
+    NOTIFICATIONS,
+    SESSIONS,
+    JOB_STORAGE,
+)
diff --git a/kolibri/utils/main.py b/kolibri/utils/main.py
index 15b979e6a9a..b75a1bff7a5 100644
--- a/kolibri/utils/main.py
+++ b/kolibri/utils/main.py
@@ -203,7 +203,9 @@ def _upgrades_before_django_setup(updated, version):
     ]
     DATABASE_NAMES += [
         os.path.join(KOLIBRI_HOME, "{}.sqlite3".format(db))
+        # TODO: Figure out a way to handle custom db names
         for db in ADDITIONAL_SQLITE_DATABASES
+        if db != "job_storage"
     ]
     sqlite_check_foreign_keys(DATABASE_NAMES)
     # If we are using sqlite,
@@ -218,7 +220,9 @@
     # If this is an upgrade, it is possible we've added an additional
     # database, so we can attempt to copy a preseeded database here.
for db_name in ADDITIONAL_SQLITE_DATABASES:
-        _copy_preseeded_db(db_name)
+        if db_name != "job_storage":
+            # TODO: Figure out a way to handle custom db names
+            _copy_preseeded_db(db_name)


 def _post_django_initialization():

From 283ffdea8c272998a51378e59b920dc30e1f8280 Mon Sep 17 00:00:00 2001
From: Alex Velez
Date: Wed, 10 Dec 2025 09:39:29 -0500
Subject: [PATCH 12/13] Standardize database path computation

---
 kolibri/deployment/default/settings/base.py   | 15 ++--------
 kolibri/deployment/default/sqlite_db_names.py | 30 +++++++++++++++++++
 kolibri/utils/main.py                         | 28 +++++++----------
 3 files changed, 43 insertions(+), 30 deletions(-)

diff --git a/kolibri/deployment/default/settings/base.py b/kolibri/deployment/default/settings/base.py
index eedc53a0eee..f9d98649b68 100644
--- a/kolibri/deployment/default/settings/base.py
+++ b/kolibri/deployment/default/settings/base.py
@@ -21,6 +21,7 @@
 import kolibri
 from kolibri.deployment.default.cache import CACHES
 from kolibri.deployment.default.sqlite_db_names import ADDITIONAL_SQLITE_DATABASES
+from kolibri.deployment.default.sqlite_db_names import get_sqlite_database_path
 from kolibri.deployment.default.sqlite_db_names import JOB_STORAGE
 from kolibri.plugins.utils.settings import apply_settings
 from kolibri.utils import conf
@@ -147,10 +148,7 @@
     DATABASES = {
         "default": {
             "ENGINE": "kolibri.deployment.default.db.backends.sqlite3",
-            "NAME": os.path.join(
-                conf.KOLIBRI_HOME,
-                conf.OPTIONS["Database"]["DATABASE_NAME"] or "db.sqlite3",
-            ),
+            "NAME": get_sqlite_database_path("default"),
             "OPTIONS": {"timeout": 100},
         }
     }
@@ -158,18 +156,11 @@
     for additional_db in ADDITIONAL_SQLITE_DATABASES:
         db_config = {
             "ENGINE": "kolibri.deployment.default.db.backends.sqlite3",
-            "NAME": os.path.join(conf.KOLIBRI_HOME, "{}.sqlite3".format(additional_db)),
+            "NAME": get_sqlite_database_path(additional_db),
             "OPTIONS": {"timeout": 100},
         }

         if additional_db == JOB_STORAGE:
-            # JOB_STORAGE uses a custom file path from the config
-            job_storage_path = conf.OPTIONS["Tasks"]["JOB_STORAGE_FILEPATH"]
-            if not os.path.isabs(job_storage_path):
-                job_storage_path = os.path.join(conf.KOLIBRI_HOME, job_storage_path)
-
-            db_config["NAME"] = job_storage_path
-
             # Override test config for job storage to use a sqlite file in KOLIBRI_HOME rather than in-memory
             # as job tasks may be run in a separate thread/process
             db_config["TEST"] = {
diff --git a/kolibri/deployment/default/sqlite_db_names.py b/kolibri/deployment/default/sqlite_db_names.py
index c9031ce35ed..9261f99090e 100644
--- a/kolibri/deployment/default/sqlite_db_names.py
+++ b/kolibri/deployment/default/sqlite_db_names.py
@@ -3,6 +3,10 @@
 Keep them here for a single source of truth that can be referenced by apps and
 our default settings.
 """
+import os
+
+from kolibri.utils.conf import KOLIBRI_HOME
+from kolibri.utils.conf import OPTIONS

 SYNC_QUEUE = "syncqueue"

@@ -22,3 +26,29 @@
     SESSIONS,
     JOB_STORAGE,
 )
+
+
+def get_sqlite_database_path(db_name):
+    """
+    Get the path for a specific SQLite database.
+    """
+
+    if db_name == "default":
+        main_db_name = OPTIONS["Database"]["DATABASE_NAME"] or "db.sqlite3"
+        return os.path.join(KOLIBRI_HOME, main_db_name)
+
+    if db_name not in ADDITIONAL_SQLITE_DATABASES:
+        raise ValueError(
+            f"Unknown database name '{db_name}'. 
" + f"Use 'default' or one of: {', '.join(ADDITIONAL_SQLITE_DATABASES)}" + ) + + if db_name == JOB_STORAGE: + # JOB_STORAGE uses a custom file path from the config + job_storage_path = OPTIONS["Tasks"]["JOB_STORAGE_FILEPATH"] + if not os.path.isabs(job_storage_path): + job_storage_path = os.path.join(KOLIBRI_HOME, job_storage_path) + + return job_storage_path + + return os.path.join(KOLIBRI_HOME, "{}.sqlite3".format(db_name)) diff --git a/kolibri/utils/main.py b/kolibri/utils/main.py index b75a1bff7a5..46918f3df55 100644 --- a/kolibri/utils/main.py +++ b/kolibri/utils/main.py @@ -20,6 +20,7 @@ from kolibri.core.upgrade import run_upgrades from kolibri.core.utils.cache import process_cache from kolibri.deployment.default.sqlite_db_names import ADDITIONAL_SQLITE_DATABASES +from kolibri.deployment.default.sqlite_db_names import get_sqlite_database_path from kolibri.plugins.utils import autoremove_unavailable_plugins from kolibri.plugins.utils import check_plugin_config_file_location from kolibri.plugins.utils import enable_new_default_plugins @@ -151,9 +152,8 @@ def _setup_django(): raise -def _copy_preseeded_db(db_name, target=None): - target = target or "{}.sqlite3".format(db_name) - target = os.path.join(KOLIBRI_HOME, target) +def _copy_preseeded_db(db_name): + target = get_sqlite_database_path(db_name) if not os.path.exists(target): try: import kolibri.dist @@ -196,17 +196,11 @@ def _upgrades_before_django_setup(updated, version): check_default_options_exist() if OPTIONS["Database"]["DATABASE_ENGINE"] == "sqlite": - DATABASE_NAMES = [ - os.path.join( - KOLIBRI_HOME, OPTIONS["Database"]["DATABASE_NAME"] or "db.sqlite3" - ) - ] - DATABASE_NAMES += [ - os.path.join(KOLIBRI_HOME, "{}.sqlite3".format(db)) - # TODO: Figure out a way to handle custom db names - for db in ADDITIONAL_SQLITE_DATABASES - if db != "job_storage" - ] + DATABASE_NAMES = [get_sqlite_database_path("default")] + + for db_name in ADDITIONAL_SQLITE_DATABASES: + DATABASE_NAMES.append(get_sqlite_database_path(db_name)) + sqlite_check_foreign_keys(DATABASE_NAMES) # If we are using sqlite, # we can shortcut migrations by using the preseeded databases @@ -214,15 +208,13 @@ def _upgrades_before_django_setup(updated, version): if not version: logger.info("Attempting to setup using pre-migrated databases") # Only copy the default database if this is a fresh install - _copy_preseeded_db("db", target=OPTIONS["Database"]["DATABASE_NAME"]) + _copy_preseeded_db("default") if not version or updated: # If this is an upgrade, it is possible we've added an additional # database, so we can attempt to copy a preseeded database here. 
for db_name in ADDITIONAL_SQLITE_DATABASES:
-        if db_name != "job_storage":
-            # TODO: Figure out a way to handle custom db names
-            _copy_preseeded_db(db_name)
+        _copy_preseeded_db(db_name)


 def _post_django_initialization():

From 1a2f204267b67343f57d4aa76f00904f49d48bd0 Mon Sep 17 00:00:00 2001
From: Alex Velez
Date: Wed, 10 Dec 2025 09:40:55 -0500
Subject: [PATCH 13/13] Fix db connections overrides that cause side effects
 in subsequent tests

---
 .../core/deviceadmin/tests/test_dbrestore.py | 22 +++++++------------
 1 file changed, 8 insertions(+), 14 deletions(-)

diff --git a/kolibri/core/deviceadmin/tests/test_dbrestore.py b/kolibri/core/deviceadmin/tests/test_dbrestore.py
index 365a5582b6c..ea584d7f98b 100644
--- a/kolibri/core/deviceadmin/tests/test_dbrestore.py
+++ b/kolibri/core/deviceadmin/tests/test_dbrestore.py
@@ -142,17 +142,14 @@ def test_restore_from_latest():
     # Restore it into a new test database setting
     with override_settings(DATABASES=MOCK_DATABASES):
-        from django import db
-
-        # Destroy current connections and create new ones:
-        db.connections.close_all()
-        db.connections = db.ConnectionHandler()
-        call_command("dbrestore", "-l")
-        # Test that the user has been restored!
-        assert (
-            Facility.objects.filter(name="test latest", kind=FACILITY).count() == 1
-        )
-        _clear_backups(default_backup_folder())
+        with patch("django.db.connections", ConnectionHandler()):
+            call_command("dbrestore", "-l")
+            # Test that the user has been restored!
+            assert (
+                Facility.objects.filter(name="test latest", kind=FACILITY).count()
+                == 1
+            )
+            _clear_backups(default_backup_folder())


 @pytest.mark.django_db(transaction=True)
@@ -203,9 +200,6 @@ def test_restore_from_file_to_file():
     dest_folder = tempfile.mkdtemp()
     # Purposefully destroy the connection pointer, which is the default
     # state of an unopened connection
-    from django import db
-
-    db.connections["default"].connection = None
     backup = dbbackup(kolibri.__version__, dest_folder=dest_folder)
     # Restore it into a new test database setting
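
A note on the pattern in this last patch: the old tests reassigned the
module-level django.db.connections object directly, so the override leaked
into whatever test ran next. Scoping the swap with mock.patch restores the
original connection handler on exit, even if the block raises. A minimal
sketch of the idea, assuming a configured Django test settings module
(with_temporary_connections is an illustrative helper, not Kolibri code):

    from django.db.utils import ConnectionHandler
    from django.test import override_settings
    from mock import patch

    def with_temporary_connections(mock_databases, fn):
        # Build a fresh connection handler against the overridden DATABASES
        # setting; patch() puts the original django.db.connections back on
        # exit, so no state leaks into subsequent tests.
        with override_settings(DATABASES=mock_databases):
            with patch("django.db.connections", ConnectionHandler()):
                return fn()

Using a context manager instead of direct assignment makes the cleanup
automatic, which is the side effect the commit message is fixing.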