diff --git a/kolibri/core/content/test/test_import_export.py b/kolibri/core/content/test/test_import_export.py
index 6ad47aabb35..62dedb8e688 100644
--- a/kolibri/core/content/test/test_import_export.py
+++ b/kolibri/core/content/test/test_import_export.py
@@ -1125,7 +1125,10 @@ def test_remote_import_no_space_after_first_download(
             2201062 + 336974,
         )
         get_free_space_mock.side_effect = [100000000000, 0, 0, 0, 0, 0, 0]
-        with self.assertRaises(InsufficientStorageSpaceError):
+        # Ensure single threaded operation for deterministic testing
+        with patch(
+            "kolibri.core.tasks.utils.get_fd_limit", return_value=1
+        ), self.assertRaises(InsufficientStorageSpaceError):
             manager = RemoteChannelResourceImportManager(self.the_channel_id)
             manager.run()
         self.annotation_mock.set_content_visibility.assert_called_with(
@@ -1466,7 +1469,9 @@ def test_remote_import_full_import(
             10,
         )
         manager = RemoteChannelResourceImportManager(self.the_channel_id)
-        manager.run()
+        # Ensure single threaded operation for deterministic testing
+        with patch("kolibri.core.tasks.utils.get_fd_limit", return_value=1):
+            manager.run()
         self.annotation_mock.set_content_visibility.assert_called_with(
             self.the_channel_id,
             [
diff --git a/kolibri/core/content/utils/resource_import.py b/kolibri/core/content/utils/resource_import.py
index 49031b89e5d..54f4b96c56a 100644
--- a/kolibri/core/content/utils/resource_import.py
+++ b/kolibri/core/content/utils/resource_import.py
@@ -282,6 +282,10 @@ def run_import(self):
         # Allow for two open file descriptors per download:
         # The temporary download file that the file is streamed to initially, and then
         # the actual destination file that it is moved to.
+        # Note that with the possibility of a chunked file download,
+        # the true number of file descriptors used may be higher,
+        # but this is unlikely to be a problem in practice, and we build in extra
+        # tolerance in the fd_safe_executor max worker calculation.
         with fd_safe_executor(fds_per_task=2) as executor:
             self.executor = executor
             batch_size = 100
@@ -392,8 +396,27 @@ def __init__(
         )
         self.session = requests.Session()
+        # Because we create the executor in the run method, we track whether the
+        # adapter has been mounted and mount it lazily in the create_file_transfer
+        # method, so that we can introspect the executor to configure the pool correctly.
+        self._adapter_mounted = False
+
+    def _mount_adapter(self):
+        if not self._adapter_mounted:
+            # If we are using a ThreadPoolExecutor, then we need to make sure
+            # that the requests session has enough connections to handle
+            # the number of threads.
+            max_workers = self.executor._max_workers
+            adapter = requests.adapters.HTTPAdapter(
+                pool_connections=max_workers,
+                pool_maxsize=max_workers,
+            )
+            self.session.mount("http://", adapter)
+            self.session.mount("https://", adapter)
+            self._adapter_mounted = True

     def create_file_transfer(self, f, filename, dest):
+        self._mount_adapter()
         url = paths.get_content_storage_remote_url(filename, baseurl=self.baseurl)
         return transfer.FileDownload(
             url,
diff --git a/kolibri/core/tasks/utils.py b/kolibri/core/tasks/utils.py
index eab8e5d8697..8466c432c31 100644
--- a/kolibri/core/tasks/utils.py
+++ b/kolibri/core/tasks/utils.py
@@ -358,12 +358,16 @@ def fd_safe_executor(fds_per_task=2):
         else concurrent.futures.ThreadPoolExecutor
     )

-    max_workers = 10
+    max_workers = 50

-    if not use_multiprocessing:
-        # If we're not using multiprocessing for workers, we may need
-        # to limit the number of workers depending on the number of allowed
-        # file descriptors.
+    # We may need to limit the number of workers depending
+    # on the number of allowed file descriptors.
+
+    if conf.OPTIONS["Tasks"]["USE_WORKER_MULTIPROCESSING"]:
+        # If we are using multiprocessing, then file descriptors are not shared.
+        # So we can use all the available file descriptors for this task.
+        max_descriptors_per_task = get_fd_limit()
+    else:
         # This is a heuristic method, where we know there can be issues if
         # the max number of file descriptors for a process is 256, and we use 10
         # workers, with potentially 4 concurrent tasks downloading files.
@@ -376,12 +380,11 @@ def fd_safe_executor(fds_per_task=2):
         max_descriptors_per_task = (
             get_fd_limit() - server_reserved_fd_count
         ) / conf.OPTIONS["Tasks"]["REGULAR_PRIORITY_WORKERS"]
-        # Each task only needs to have a maximum of `fds_per_task` open file descriptors at once.
-        # To add tolerance, we divide the number of file descriptors that could be allocated to
-        # this task by double this number which should give us leeway in case of unforeseen
-        # descriptor use during the process.
-        max_workers = min(
-            max_workers, min(1, max_descriptors_per_task // (fds_per_task * 2))
-        )
-
+    # Each task only needs to have a maximum of `fds_per_task` open file descriptors at once.
+    # To add tolerance, we divide the number of file descriptors that could be allocated to
+    # this task by double this number, which should give us leeway in case of unforeseen
+    # descriptor use during the process.
+    max_workers = min(
+        max_workers, max(1, max_descriptors_per_task // (fds_per_task * 2))
+    )
     return executor(max_workers=max_workers)
diff --git a/kolibri/core/tasks/worker.py b/kolibri/core/tasks/worker.py
index 252792ff1e4..3e896cd61f2 100644
--- a/kolibri/core/tasks/worker.py
+++ b/kolibri/core/tasks/worker.py
@@ -7,18 +7,25 @@
 from kolibri.core.tasks.storage import Storage
 from kolibri.core.tasks.utils import db_connection
 from kolibri.core.tasks.utils import InfiniteLoopThread
+from kolibri.utils.logger import setup_worker_logging
 from kolibri.utils.multiprocessing_compat import PoolExecutor

 logger = logging.getLogger(__name__)


+def _init_worker(log_queue):
+    """
+    Initialize worker process, setting up logging to use the given log queue.
+    """
+    setup_worker_logging(log_queue)
+
+
 def execute_job(
     job_id,
     worker_host=None,
     worker_process=None,
     worker_thread=None,
     worker_extra=None,
-    log_queue=None,
 ):
     """
     Call the function stored in the job.func.
@@ -41,7 +48,7 @@ def execute_job(
     django_connection.close()


-def execute_job_with_python_worker(job_id, log_queue=None):
+def execute_job_with_python_worker(job_id):
     """
     Call execute_job but additionally with the current host, process and thread
     information taken directly from python internals.
@@ -55,7 +62,6 @@ def execute_job_with_python_worker(job_id, log_queue=None):
         worker_host=socket.gethostname(),
         worker_process=str(os.getpid()),
         worker_thread=str(threading.get_ident()),
-        log_queue=log_queue,
     )


@@ -105,7 +111,11 @@ def shutdown_workers(self, wait=True):
         self.workers.shutdown(wait=wait)

     def start_workers(self):
-        pool = PoolExecutor(max_workers=self.max_workers)
+        pool = PoolExecutor(
+            max_workers=self.max_workers,
+            initializer=_init_worker,
+            initargs=(self.log_queue,),
+        )
         return pool

     def handle_finished_future(self, future):
@@ -199,7 +209,6 @@ def start_next_job(self, job):
             future = self.workers.submit(
                 execute_job_with_python_worker,
                 job_id=job.job_id,
-                log_queue=self.log_queue,
             )

             # Check if the job ID already exists in the future_job_mapping dictionary
diff --git a/kolibri/utils/logger.py b/kolibri/utils/logger.py
index fa14930a508..00680a952bf 100644
--- a/kolibri/utils/logger.py
+++ b/kolibri/utils/logger.py
@@ -52,7 +52,15 @@ def prepare(self, record: logging.LogRecord) -> logging.LogRecord:
             )
             record.exc_info = None
         if hasattr(record, "args"):
-            record.args = tuple(str(arg) for arg in record.args)
+            # Convert args to strings only if they aren't already pickle-safe
+            safe_args = []
+            for arg in record.args:
+                # Keep numeric types as-is for format compatibility
+                if isinstance(arg, (int, float, bool, type(None))):
+                    safe_args.append(arg)
+                else:
+                    safe_args.append(str(arg))
+            record.args = tuple(safe_args)
         record = super().prepare(record)
         record._logger_name = self.logger_name
diff --git a/kolibri/utils/options.py b/kolibri/utils/options.py
index ce94d91be2b..484b8677036 100644
--- a/kolibri/utils/options.py
+++ b/kolibri/utils/options.py
@@ -796,7 +796,7 @@ def csp_source_list(value):
     "Tasks": {
         "USE_WORKER_MULTIPROCESSING": {
             "type": "multiprocess_bool",
-            "default": False,
+            "default": True,
             "description": """
            Whether to use Python multiprocessing for worker pools. If False, then it will use threading. This may be useful, if running on a dedicated device with multiple cores, and a lot of asynchronous tasks get run.
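
A standalone sketch of the max_workers heuristic applied in fd_safe_executor above may help when reviewing the new arithmetic. The default option values, the reserved descriptor count, and the use of resource.getrlimit as a stand-in for get_fd_limit are illustrative assumptions, not the exact Kolibri implementation:

    import resource


    def estimate_max_workers(
        fds_per_task=2,
        use_multiprocessing=False,
        regular_priority_workers=4,
        server_reserved_fd_count=64,
        hard_cap=50,
        fd_limit=None,
    ):
        if fd_limit is None:
            # Soft limit on open file descriptors for this process (Unix only);
            # stands in here for kolibri.core.tasks.utils.get_fd_limit.
            fd_limit = resource.getrlimit(resource.RLIMIT_NOFILE)[0]
        if use_multiprocessing:
            # Worker processes do not share descriptors, so the whole limit
            # is available to this task.
            max_descriptors_per_task = fd_limit
        else:
            # Threads share descriptors with the server and any other
            # concurrently running tasks, so divide up what remains.
            max_descriptors_per_task = (
                fd_limit - server_reserved_fd_count
            ) / regular_priority_workers
        # Divide by double fds_per_task for tolerance, but never drop below
        # one worker (the max(1, ...) fix in the diff above).
        return min(hard_cap, max(1, int(max_descriptors_per_task // (fds_per_task * 2))))


    # A 256-descriptor limit shared with the server and 4 concurrent tasks:
    # (256 - 64) / 4 = 48 descriptors per task, 48 // 4 = 12 workers.
    print(estimate_max_workers(fd_limit=256))                            # 12
    # Multiprocessing workers get the whole limit: 256 // 4 = 64, capped at 50.
    print(estimate_max_workers(fd_limit=256, use_multiprocessing=True))  # 50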
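
The _mount_adapter change works around urllib3's default per-host connection pool of ten connections, which a larger thread pool would overflow. A minimal sketch of the same idea outside Kolibri, assuming a plain requests Session paired with a ThreadPoolExecutor:

    from concurrent.futures import ThreadPoolExecutor

    import requests

    executor = ThreadPoolExecutor(max_workers=50)
    session = requests.Session()

    # Size the connection pool to the number of worker threads; otherwise threads
    # beyond urllib3's default pool size of 10 open throwaway connections and log
    # "Connection pool is full, discarding connection" warnings under load.
    adapter = requests.adapters.HTTPAdapter(
        pool_connections=executor._max_workers,
        pool_maxsize=executor._max_workers,
    )
    session.mount("http://", adapter)
    session.mount("https://", adapter)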
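
The worker.py and logger.py changes together move log routing from a per-job argument to a pool initializer, so every record emitted in a worker process reaches the parent's handlers (with args coerced to pickle-safe values along the way). A self-contained sketch of that pattern using only the standard library; setup_worker_logging is assumed to do something along these lines with Kolibri's own handlers:

    import logging
    import logging.handlers
    import multiprocessing
    from concurrent.futures import ProcessPoolExecutor


    def _init_worker(log_queue):
        # Runs once per worker process: replace handlers with a QueueHandler so
        # records travel back to the parent over the queue.
        root = logging.getLogger()
        root.handlers = [logging.handlers.QueueHandler(log_queue)]
        root.setLevel(logging.INFO)


    def task(n):
        logging.getLogger(__name__).info("processing %s in a worker", n)
        return n * n


    if __name__ == "__main__":
        log_queue = multiprocessing.Manager().Queue()
        # The listener drains the queue in the parent and hands records to real handlers.
        listener = logging.handlers.QueueListener(log_queue, logging.StreamHandler())
        listener.start()
        with ProcessPoolExecutor(
            max_workers=2, initializer=_init_worker, initargs=(log_queue,)
        ) as pool:
            print(list(pool.map(task, range(4))))
        listener.stop()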