Skip to content

Commit 12bfa1d

Browse files
pedro-psbmdellweg
andcommitted
Enable immediate task from content app
When requiring immediate tasks to be async, we had a problem dispatching them from the content app due to the use of django transactions in the dispatch code and how that plays with asyncio. Now that we no longer have that (due to the app_lock implementation) we can remove the workaround we had in place and prioritize immediate tasks on the worker. Co-authored-by: Matthias Dellweg <[email protected]>
1 parent d3bfcec commit 12bfa1d

File tree

3 files changed

+11
-14
lines changed

3 files changed

+11
-14
lines changed

CHANGES/+task-immediate.misc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Enabled immediate tasks from content app to be prioritized.

pulpcore/tasking/tasks.py

Lines changed: 9 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@
66
import sys
77
import traceback
88
import tempfile
9-
import threading
109
from gettext import gettext as _
1110
from contextlib import contextmanager
1211
from asgiref.sync import sync_to_async, async_to_sync
@@ -187,14 +186,6 @@ async def task_wrapper(): # asyncio.wait_for + async_to_sync requires wrapping
187186
return async_to_sync(task_wrapper)
188187

189188

190-
def running_from_thread_pool() -> bool:
191-
# TODO: this needs an alternative approach ASAP!
192-
# Currently we rely on the weak fact that ThreadPoolExecutor names threads like:
193-
# "ThreadPoolExecutor-0_0"
194-
thread_name = threading.current_thread().name
195-
return "ThreadPoolExecutor" in thread_name
196-
197-
198189
def dispatch(
199190
func,
200191
args=None,
@@ -242,8 +233,7 @@ def dispatch(
242233
ValueError: When `resources` is an unsupported type.
243234
"""
244235

245-
# Can't run short tasks immediately if running from thread pool
246-
immediate = immediate and not running_from_thread_pool()
236+
execute_now = immediate and not called_from_content_app()
247237
assert deferred or immediate, "A task must be at least `deferred` or `immediate`."
248238
send_wakeup_signal = True if not immediate else False
249239
function_name = get_function_name(func)
@@ -254,7 +244,7 @@ def dispatch(
254244
)
255245
task = Task.objects.create(**task_payload)
256246
task.refresh_from_db() # The database will have assigned a timestamp for us.
257-
if immediate:
247+
if execute_now:
258248
if are_resources_available(colliding_resources, task):
259249
send_wakeup_signal = True if resources else False
260250
task.unblock()
@@ -283,6 +273,7 @@ async def adispatch(
283273
versions=None,
284274
):
285275
"""Async version of dispatch."""
276+
execute_now = immediate and not called_from_content_app()
286277
assert deferred or immediate, "A task must be at least `deferred` or `immediate`."
287278
function_name = get_function_name(func)
288279
versions = get_version(versions, function_name)
@@ -293,7 +284,7 @@ async def adispatch(
293284
)
294285
task = await Task.objects.acreate(**task_payload)
295286
await task.arefresh_from_db() # The database will have assigned a timestamp for us.
296-
if immediate:
287+
if execute_now:
297288
if await async_are_resources_available(colliding_resources, task):
298289
send_wakeup_signal = True if resources else False
299290
await task.aunblock()
@@ -363,6 +354,11 @@ def are_resources_available(colliding_resources, task: Task) -> bool:
363354
return not colliding_resources or not colliding_resources_taken
364355

365356

357+
def called_from_content_app() -> bool:
358+
current_app = AppStatus.objects.current()
359+
return current_app is not None and current_app.app_type == "content"
360+
361+
366362
def get_function_name(func):
367363
if callable(func):
368364
function_name = f"{func.__module__}.{func.__name__}"

pulpcore/tests/unit/content/test_handler.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -561,7 +561,7 @@ async def test_pull_through_repository_add(request123, monkeypatch):
561561
@pytest_asyncio.fixture
562562
async def app_status(monkeypatch):
563563
monkeypatch.setattr(AppStatus.objects, "_current_app_status", None)
564-
app_status = await AppStatus.objects.acreate(app_type="content", name="test_runner")
564+
app_status = await AppStatus.objects.acreate(app_type="api", name="test_runner")
565565
yield app_status
566566
await app_status.adelete()
567567

0 commit comments

Comments
 (0)