From 04f13cc3d2f48fa5d58f8c0231fc761fa5e6bbea Mon Sep 17 00:00:00 2001 From: Pedro Brochado Date: Wed, 30 Jul 2025 19:30:33 -0300 Subject: [PATCH 01/18] Add and test postgres pubsub class implementation --- pulpcore/tasking/pubsub.py | 118 ++++++++++++++++++++++ pulpcore/tests/functional/test_pubsub.py | 123 +++++++++++++++++++++++ 2 files changed, 241 insertions(+) create mode 100644 pulpcore/tasking/pubsub.py create mode 100644 pulpcore/tests/functional/test_pubsub.py diff --git a/pulpcore/tasking/pubsub.py b/pulpcore/tasking/pubsub.py new file mode 100644 index 0000000000..fff4467fde --- /dev/null +++ b/pulpcore/tasking/pubsub.py @@ -0,0 +1,118 @@ +from typing import NamedTuple +import os +import logging +from contextlib import suppress + +logger = logging.getLogger(__name__) + + +def wakeup_worker(pubsub_backend, reason="unknown"): + pubsub_backend.publish(BasePubSubBackend.WORKER_WAKEUP, reason) + + +def cancel_task(task_pk, pubsub_backend): + pubsub_backend.publish(BasePubSubBackend.TASK_CANCELLATION, str(task_pk)) + + +def record_worker_metrics(pubsub_backend, now): + pubsub_backend.publish(BasePubSubBackend.WORKER_METRIC, str(now)) + + +class BasePubSubBackend: + WORKER_WAKEUP = "pulp_worker_wakeup" + TASK_CANCELLATION = "pulp_worker_cancel" + WORKER_METRIC = "pulp_worker_metrics_heartbeat" + + def subscribe(self, channel, callback): + raise NotImplementedError() + + def unsubscribe(self, channel): + raise NotImplementedError() + + def publish(self, channel, message=None): + raise NotImplementedError() + + def fileno(self): + """Add support for being used in select loop.""" + raise NotImplementedError() + + def fetch(self): + """Fetch messages new message, if required.""" + raise NotImplementedError() + + def close(self): + raise NotImplementedError() + + +class PubsubMessage(NamedTuple): + channel: str + payload: str + + +def drain_non_blocking_fd(fd): + with suppress(BlockingIOError): + while True: + os.read(fd, 256) + + +class PostgresPubSub(BasePubSubBackend): + + def __init__(self, connection): + self.cursor = connection.cursor() + self.connection = connection.connection + assert self.cursor.connection is self.connection + self.subscriptions = [] + self.message_buffer = [] + self.connection.add_notify_handler(self._store_messages) + # Handle message readiness + # We can use os.evenfd in python >= 3.10 + self.sentinel_r, self.sentinel_w = os.pipe() + os.set_blocking(self.sentinel_r, False) + os.set_blocking(self.sentinel_w, False) + logger.debug(f"Initialized pubsub. Conn={self.connection}") + + def _store_messages(self, notification): + self.message_buffer.append( + PubsubMessage(channel=notification.channel, payload=notification.payload) + ) + + def subscribe(self, channel): + self.subscriptions.append(channel) + self.connection.execute(f"LISTEN {channel}") + + def unsubscribe(self, channel): + self.subscriptions.remove(channel) + for i in range(0, len(self.message_buffer), -1): + if self.message_buffer[i].channel == channel: + self.message_buffer.pop(i) + self.connection.execute(f"UNLISTEN {channel}") + + def publish(self, channel, message=None): + if not message: + self.cursor.execute(f"NOTIFY {channel}") + else: + self.cursor.execute("SELECT pg_notify(%s, %s)", (channel, message)) + + def fileno(self) -> int: + if self.message_buffer: + os.write(self.sentinel_w, b"0") + else: + drain_non_blocking_fd(self.sentinel_r) + return self.sentinel_r + + def fetch(self) -> list[PubsubMessage]: + self.connection.execute("SELECT 1").fetchone() + result = self.message_buffer.copy() + self.message_buffer.clear() + return result + + def close(self): + os.close(self.sentinel_r) + os.close(self.sentinel_w) + self.cursor.close() + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc_value, traceback): + self.close() diff --git a/pulpcore/tests/functional/test_pubsub.py b/pulpcore/tests/functional/test_pubsub.py new file mode 100644 index 0000000000..314d6cd189 --- /dev/null +++ b/pulpcore/tests/functional/test_pubsub.py @@ -0,0 +1,123 @@ +import django + +django.setup() + +from django.db import connection +from pulpcore.tasking import pubsub +from types import SimpleNamespace +import select +import time +import pytest + + +def test_postgres_pubsub(): + state = SimpleNamespace() + state.got_first_message = False + state.got_second_message = False + with connection.cursor() as cursor: + assert connection.connection is cursor.connection + conn = cursor.connection + conn.execute("LISTEN abc") + conn.add_notify_handler(lambda notification: setattr(state, "got_message", True)) + cursor.execute("NOTIFY abc, 'foo'") + conn.execute("SELECT 1") + conn.execute("UNLISTEN abc") + assert state.got_message is True + + +M = pubsub.PubsubMessage + + +@pytest.mark.parametrize( + "messages", + ( + [M("channel_a", "A1")], + [M("channel_a", "A1"), M("channel_a", "A2")], + [M("channel_a", "A1"), M("channel_a", "A2"), M("channel_b", "B1"), M("channel_c", "C1")], + ), +) +@pytest.mark.parametrize("same_client", (True, False), ids=("same-clients", "different-clients")) +class TestPubSub: + + def test_subscribe_publish_fetch(self, same_client, messages): + """ + GIVEN a publisher and a subscriber (which may be the same) + AND a queue of messages Q with mixed channels and payloads + WHEN the subscriber subscribes to all the channels in Q + AND the publisher publishes all the messages in Q + THEN the subscriber fetch() call returns a queue equivalent to Q + AND calling fetch() a second time returns an empty queue + """ + # Given + publisher = pubsub.PostgresPubSub(connection) + subscriber = publisher if same_client else pubsub.PostgresPubSub(connection) + + # When + for message in messages: + subscriber.subscribe(message.channel) + for message in messages: + publisher.publish(message.channel, message=message.payload) + + # Then + assert subscriber.fetch() == messages + assert subscriber.fetch() == [] + + def test_unsubscribe(self, same_client, messages): + """ + GIVEN a publisher and a subscriber (which may be the same) + AND a queue of messages Q with mixed channels and payloads + WHEN the subscriber subscribes and unsubscribes to all the channels in Q + AND the publisher publishes all the messages in Q + THEN the subscriber fetch() call returns an empty queue + """ + # Given + publisher = pubsub.PostgresPubSub(connection) + subscriber = publisher if same_client else pubsub.PostgresPubSub(connection) + + # When + for message in messages: + subscriber.subscribe(message.channel) + for message in messages: + subscriber.unsubscribe(message.channel) + for message in messages: + publisher.publish(message.channel, message=message.payload) + + # Then + assert subscriber.fetch() == [] + + def test_select_loop(self, same_client, messages): + """ + GIVEN a publisher and a subscriber (which may be the same) + AND a queue of messages Q with mixed channels and payloads + AND the subscriber is subscribed to all the channels in Q + WHEN the publisher has NOT published anything yet + THEN the select loop won't detect the subscriber readiness + AND the subscriber fetch() call returns an empty queue + BUT WHEN the publisher does publish all messages in Q + THEN the select loop detects the subscriber readiness + AND the subscriber fetch() call returns a queue equivalent to Q + """ + TIMEOUT = 0.1 + + # Given + publisher = pubsub.PostgresPubSub(connection) + subscriber = publisher if same_client else pubsub.PostgresPubSub(connection) + + # When + for message in messages: + subscriber.subscribe(message.channel) + r, w, x = select.select([subscriber], [], [], TIMEOUT) + + # Then + assert subscriber not in r + assert subscriber.fetch() == [] + + # But When + for message in messages: + publisher.publish(message.channel, message=message.payload) + r, w, x = select.select([subscriber], [], [], TIMEOUT) + + # Then + assert subscriber in r + assert subscriber.fetch() == messages + assert subscriber.fetch() == [] From 209b1c16171314939ed7c0f19f6aff88a43cc34f Mon Sep 17 00:00:00 2001 From: Pedro Brochado Date: Tue, 29 Jul 2025 13:29:50 -0300 Subject: [PATCH 02/18] Refactor listen/notify as a pubsub backend --- pulpcore/app/models/task.py | 2 +- pulpcore/constants.py | 7 ++ pulpcore/tasking/pubsub.py | 24 +++--- pulpcore/tasking/tasks.py | 23 ++--- pulpcore/tasking/worker.py | 103 ++++++++++++----------- pulpcore/tests/functional/test_pubsub.py | 5 -- 6 files changed, 83 insertions(+), 81 deletions(-) diff --git a/pulpcore/app/models/task.py b/pulpcore/app/models/task.py index 80d6ef7650..b058e3d9c3 100644 --- a/pulpcore/app/models/task.py +++ b/pulpcore/app/models/task.py @@ -71,7 +71,7 @@ class Task(BaseModel, AutoAddObjPermsMixin): The transitions to CANCELING (marked with *) are the only ones allowed to happen without holding the tasks advisory lock. Canceling is meant to be initiated asyncronously by a sparate - process before signalling the worker via Postgres LISTEN. + process before signalling the worker via a pubsub notification (e.g, Postgres LISTEN). Fields: diff --git a/pulpcore/constants.py b/pulpcore/constants.py index fb128573b9..b6b2441cba 100644 --- a/pulpcore/constants.py +++ b/pulpcore/constants.py @@ -16,6 +16,13 @@ TASK_WAKEUP_UNBLOCK = "unblock" TASK_WAKEUP_HANDLE = "handle" +#: All valid tasking pubsub channels +TASK_PUBSUB = SimpleNamespace( + WAKEUP_WORKER="pulp_worker_wakeup", + CANCEL_TASK="pulp_worker_cancel", + WORKER_METRICS="pulp_worker_metrics_heartbeat", +) + #: All valid task states. TASK_STATES = SimpleNamespace( WAITING="waiting", diff --git a/pulpcore/tasking/pubsub.py b/pulpcore/tasking/pubsub.py index fff4467fde..4265c9f011 100644 --- a/pulpcore/tasking/pubsub.py +++ b/pulpcore/tasking/pubsub.py @@ -1,4 +1,5 @@ from typing import NamedTuple +from pulpcore.constants import TASK_PUBSUB import os import logging from contextlib import suppress @@ -6,23 +7,18 @@ logger = logging.getLogger(__name__) -def wakeup_worker(pubsub_backend, reason="unknown"): - pubsub_backend.publish(BasePubSubBackend.WORKER_WAKEUP, reason) - - -def cancel_task(task_pk, pubsub_backend): - pubsub_backend.publish(BasePubSubBackend.TASK_CANCELLATION, str(task_pk)) - - -def record_worker_metrics(pubsub_backend, now): - pubsub_backend.publish(BasePubSubBackend.WORKER_METRIC, str(now)) +class BasePubSubBackend: + # Utils + def wakeup_worker(self, reason="unknown"): + self.publish(TASK_PUBSUB.WAKEUP_WORKER, reason) + def cancel_task(self, task_pk): + self.publish(TASK_PUBSUB.CANCEL_TASK, str(task_pk)) -class BasePubSubBackend: - WORKER_WAKEUP = "pulp_worker_wakeup" - TASK_CANCELLATION = "pulp_worker_cancel" - WORKER_METRIC = "pulp_worker_metrics_heartbeat" + def record_worker_metrics(self, now): + self.publish(TASK_PUBSUB.WORKER_METRICS, str(now)) + # Interface def subscribe(self, channel, callback): raise NotImplementedError() diff --git a/pulpcore/tasking/tasks.py b/pulpcore/tasking/tasks.py index e15b0d1cec..a1e5aedb8c 100644 --- a/pulpcore/tasking/tasks.py +++ b/pulpcore/tasking/tasks.py @@ -30,6 +30,7 @@ TASK_WAKEUP_UNBLOCK, ) from pulpcore.middleware import x_task_diagnostics_var +from pulpcore.tasking import pubsub from pulpcore.tasking.kafka import send_task_notification _logger = logging.getLogger(__name__) @@ -50,12 +51,6 @@ def _validate_and_get_resources(resources): return list(resource_set) -def wakeup_worker(reason): - # Notify workers - with connection.connection.cursor() as cursor: - cursor.execute("SELECT pg_notify('pulp_worker_wakeup', %s)", (reason,)) - - def execute_task(task): # This extra stack is needed to isolate the current_task ContextVar contextvars.copy_context().run(_execute_task, task) @@ -257,7 +252,8 @@ def dispatch( task.set_canceling() task.set_canceled(TASK_STATES.CANCELED, "Resources temporarily unavailable.") if send_wakeup_signal: - wakeup_worker(TASK_WAKEUP_UNBLOCK) + with pubsub.PostgresPubSub(connection) as pubsub_client: + pubsub_client.wakeup_worker(reason=TASK_WAKEUP_UNBLOCK) return task @@ -297,7 +293,8 @@ async def adispatch( task.set_canceling() task.set_canceled(TASK_STATES.CANCELED, "Resources temporarily unavailable.") if send_wakeup_signal: - await sync_to_async(wakeup_worker)(TASK_WAKEUP_UNBLOCK) + with pubsub.PostgresPubSub(connection) as pubsub_client: + pubsub_client.wakeup_worker(reason=TASK_WAKEUP_UNBLOCK) return task @@ -429,12 +426,10 @@ def cancel_task(task_id): # This is the only valid transition without holding the task lock. task.set_canceling() - # Notify the worker that might be running that task. - with connection.cursor() as cursor: - if task.app_lock is None: - wakeup_worker(TASK_WAKEUP_HANDLE) - else: - cursor.execute("SELECT pg_notify('pulp_worker_cancel', %s)", (str(task.pk),)) + # Notify the worker that might be running that task and other workers to clean up + with pubsub.PostgresPubSub(connection) as pubsub_client: + pubsub_client.cancel_task(task_pk=task.pk) + pubsub_client.wakeup_worker() return task diff --git a/pulpcore/tasking/worker.py b/pulpcore/tasking/worker.py index d6236959d8..b23006e915 100644 --- a/pulpcore/tasking/worker.py +++ b/pulpcore/tasking/worker.py @@ -25,6 +25,7 @@ TASK_METRICS_HEARTBEAT_LOCK, TASK_WAKEUP_UNBLOCK, TASK_WAKEUP_HANDLE, + TASK_PUBSUB, ) from pulpcore.metrics import init_otel_meter from pulpcore.app.apps import pulp_plugin_configs @@ -32,6 +33,7 @@ from pulpcore.app.util import PGAdvisoryLock from pulpcore.exceptions import AdvisoryLockError +from pulpcore.tasking import pubsub from pulpcore.tasking.storage import WorkerDirectory from pulpcore.tasking._util import ( delete_incomplete_resources, @@ -88,6 +90,9 @@ def __init__(self, auxiliary=False): self.worker_cleanup_countdown = random.randint( int(WORKER_CLEANUP_INTERVAL / 10), WORKER_CLEANUP_INTERVAL ) + # Pubsub handling + self.pubsub_client = pubsub.PostgresPubSub(connection) + self.pubsub_channel_callback = {} # Add a file descriptor to trigger select on signals self.sentinel, sentinel_w = os.pipe() @@ -134,29 +139,6 @@ def _signal_handler(self, thesignal, frame): ) self.shutdown_requested = True - def _pg_notify_handler(self, notification): - if notification.channel == "pulp_worker_wakeup": - if notification.payload == TASK_WAKEUP_UNBLOCK: - # Auxiliary workers don't do this. - self.wakeup_unblock = not self.auxiliary - elif notification.payload == TASK_WAKEUP_HANDLE: - self.wakeup_handle = True - else: - _logger.warning("Unknown wakeup call recieved. Reason: '%s'", notification.payload) - # We cannot be sure so assume everything happened. - self.wakeup_unblock = not self.auxiliary - self.wakeup_handle = True - - elif notification.channel == "pulp_worker_metrics_heartbeat": - self.last_metric_heartbeat = datetime.fromisoformat(notification.payload) - elif self.task and notification.channel == "pulp_worker_cancel": - if notification.payload == str(self.task.pk): - self.cancel_task = True - - def shutdown(self): - self.app_status.delete() - _logger.info(_("Worker %s was shut down."), self.name) - def handle_worker_heartbeat(self): """ Update worker heartbeat records. @@ -217,9 +199,6 @@ def beat(self): # to be able to report on a congested tasking system to produce reliable results. self.record_unblocked_waiting_tasks_metric() - def notify_workers(self, reason): - self.cursor.execute("SELECT pg_notify('pulp_worker_wakeup', %s)", (reason,)) - def cancel_abandoned_task(self, task, final_state, reason=None): """Cancel and clean up an abandoned task. @@ -247,7 +226,8 @@ def cancel_abandoned_task(self, task, final_state, reason=None): delete_incomplete_resources(task) task.set_canceled(final_state=final_state, reason=reason) if task.reserved_resources_record: - self.notify_workers(TASK_WAKEUP_UNBLOCK) + self.pubsub_client.wakeup_worker(reason=TASK_WAKEUP_UNBLOCK) + return True def is_compatible(self, task): unmatched_versions = [ @@ -366,14 +346,11 @@ def sleep(self): _logger.debug(_("Worker %s entering sleep state."), self.name) while not self.shutdown_requested and not self.wakeup_handle: r, w, x = select.select( - [self.sentinel, connection.connection], - [], - [], - 0 if self.wakeup_unblock else self.heartbeat_period.seconds, + [self.sentinel, self.pubsub_client], [], [], self.heartbeat_period.seconds ) self.beat() - if connection.connection in r: - connection.connection.execute("SELECT 1") + if self.pubsub_client in r: + self.pubsub_handle_messages(self.pubsub_client.fetch()) if self.wakeup_unblock: self.unblock_tasks() if self.sentinel in r: @@ -409,14 +386,14 @@ def supervise_task(self, task): os.kill(task_process.pid, signal.SIGUSR1) r, w, x = select.select( - [self.sentinel, connection.connection, task_process.sentinel], + [self.sentinel, self.pubsub_client, task_process.sentinel], [], [], - 0 if self.wakeup_unblock or self.cancel_task else self.heartbeat_period.seconds, + self.heartbeat_period.seconds, ) self.beat() - if connection.connection in r: - connection.connection.execute("SELECT 1") + if self.pubsub_client in r: + self.pubsub_handle_messages(self.pubsub_client.fetch()) if self.cancel_task: _logger.info( _("Received signal to cancel current task %s in domain: %s."), @@ -468,7 +445,7 @@ def supervise_task(self, task): if cancel_state: self.cancel_abandoned_task(task, cancel_state, cancel_reason) if task.reserved_resources_record: - self.notify_workers(TASK_WAKEUP_UNBLOCK) + self.pubsub_client.wakeup_worker(reason=TASK_WAKEUP_UNBLOCK) self.task = None def fetch_task(self): @@ -579,18 +556,53 @@ def _record_unblocked_waiting_tasks_metric(self): unblocked_tasks_stats["longest_unblocked_waiting_time"].seconds ) - self.cursor.execute(f"NOTIFY pulp_worker_metrics_heartbeat, '{str(now)}'") + self.pubsub_client.record_worker_metrics(now) + + def pubsub_handle_messages(self, messages: pubsub.PubsubMessage): + for message in messages: + callback = self.pubsub_channel_callback[message.channel] + callback(message.payload) + + def pubsub_setup(self): + def cancellation_callback(message): + if self.task and message == str(self.task.pk): + self.cancel_task = True + + def wakeup_callback(message): + if message == TASK_WAKEUP_UNBLOCK: + # Auxiliary workers don't do this. + self.wakeup_unblock = not self.auxiliary + elif message == TASK_WAKEUP_HANDLE: + self.wakeup_handle = True + else: + _logger.warn("Unknown wakeup call recieved. Reason: '%s'", message) + # We cannot be sure so assume everything happened. + self.wakeup_unblock = not self.auxiliary + self.wakeup_handle = True + + def metric_callback(message): + self.last_metric_heartbeat = datetime.fromisoformat(message) + + self.pubsub_client.subscribe(TASK_PUBSUB.WAKEUP_WORKER) + self.pubsub_channel_callback[TASK_PUBSUB.WAKEUP_WORKER] = wakeup_callback + self.pubsub_client.subscribe(TASK_PUBSUB.CANCEL_TASK) + self.pubsub_channel_callback[TASK_PUBSUB.CANCEL_TASK] = cancellation_callback + self.pubsub_client.subscribe(TASK_PUBSUB.WORKER_METRICS) + self.pubsub_channel_callback[TASK_PUBSUB.WORKER_METRICS] = metric_callback + + def pubsub_teardown(self): + self.pubsub_client.unsubscribe(TASK_PUBSUB.WAKEUP_WORKER) + self.pubsub_client.unsubscribe(TASK_PUBSUB.CANCEL_TASK) + self.pubsub_client.unsubscribe(TASK_PUBSUB.WORKER_METRICS) def run(self, burst=False): with WorkerDirectory(self.name): signal.signal(signal.SIGINT, self._signal_handler) signal.signal(signal.SIGTERM, self._signal_handler) signal.signal(signal.SIGHUP, self._signal_handler) - # Subscribe to pgsql channels - connection.connection.add_notify_handler(self._pg_notify_handler) - self.cursor.execute("LISTEN pulp_worker_cancel") - self.cursor.execute("LISTEN pulp_worker_metrics_heartbeat") + self.pubsub_setup() if burst: + self.pubsub_client.unsubscribe(self.pubsub_client.WORKER_WAKEUP) if not self.auxiliary: # Attempt to flush the task queue completely. # Stop iteration if no new tasks were found to unblock. @@ -598,7 +610,6 @@ def run(self, burst=False): self.handle_unblocked_tasks() self.handle_unblocked_tasks() else: - self.cursor.execute("LISTEN pulp_worker_wakeup") while not self.shutdown_requested: # do work if self.shutdown_requested: @@ -608,7 +619,5 @@ def run(self, burst=False): break # rest until notified to wakeup self.sleep() - self.cursor.execute("UNLISTEN pulp_worker_wakeup") - self.cursor.execute("UNLISTEN pulp_worker_metrics_heartbeat") - self.cursor.execute("UNLISTEN pulp_worker_cancel") + self.pubsub_teardown() self.shutdown() diff --git a/pulpcore/tests/functional/test_pubsub.py b/pulpcore/tests/functional/test_pubsub.py index 314d6cd189..1db000f3b4 100644 --- a/pulpcore/tests/functional/test_pubsub.py +++ b/pulpcore/tests/functional/test_pubsub.py @@ -1,12 +1,7 @@ -import django - -django.setup() - from django.db import connection from pulpcore.tasking import pubsub from types import SimpleNamespace import select -import time import pytest From 45f5ac01e0d58c3cdd01520fb6075c5d508e6082 Mon Sep 17 00:00:00 2001 From: Pedro Brochado Date: Thu, 31 Jul 2025 15:50:35 -0300 Subject: [PATCH 03/18] Change PubSub usage and add make use of global connection object --- pulpcore/tasking/pubsub.py | 63 +++++--- pulpcore/tasking/tasks.py | 5 +- pulpcore/tasking/worker.py | 8 +- pulpcore/tests/functional/test_pubsub.py | 178 +++++++++++------------ 4 files changed, 133 insertions(+), 121 deletions(-) diff --git a/pulpcore/tasking/pubsub.py b/pulpcore/tasking/pubsub.py index 4265c9f011..fa2a46cb85 100644 --- a/pulpcore/tasking/pubsub.py +++ b/pulpcore/tasking/pubsub.py @@ -2,6 +2,7 @@ from pulpcore.constants import TASK_PUBSUB import os import logging +from django.db import connection from contextlib import suppress logger = logging.getLogger(__name__) @@ -9,23 +10,27 @@ class BasePubSubBackend: # Utils - def wakeup_worker(self, reason="unknown"): - self.publish(TASK_PUBSUB.WAKEUP_WORKER, reason) + @classmethod + def wakeup_worker(cls, reason="unknown"): + cls.publish(TASK_PUBSUB.WAKEUP_WORKER, reason) - def cancel_task(self, task_pk): - self.publish(TASK_PUBSUB.CANCEL_TASK, str(task_pk)) + @classmethod + def cancel_task(cls, task_pk): + cls.publish(TASK_PUBSUB.CANCEL_TASK, str(task_pk)) - def record_worker_metrics(self, now): - self.publish(TASK_PUBSUB.WORKER_METRICS, str(now)) + @classmethod + def record_worker_metrics(cls, now): + cls.publish(TASK_PUBSUB.WORKER_METRICS, str(now)) # Interface - def subscribe(self, channel, callback): + def subscribe(self, channel): raise NotImplementedError() def unsubscribe(self, channel): raise NotImplementedError() - def publish(self, channel, message=None): + @staticmethod + def publish(channel, payload=None): raise NotImplementedError() def fileno(self): @@ -53,19 +58,19 @@ def drain_non_blocking_fd(fd): class PostgresPubSub(BasePubSubBackend): - def __init__(self, connection): - self.cursor = connection.cursor() - self.connection = connection.connection - assert self.cursor.connection is self.connection + def __init__(self): self.subscriptions = [] self.message_buffer = [] - self.connection.add_notify_handler(self._store_messages) + # Ensures a connection is established + if not connection.connection: + with connection.cursor(): + pass + connection.connection.add_notify_handler(self._store_messages) # Handle message readiness # We can use os.evenfd in python >= 3.10 self.sentinel_r, self.sentinel_w = os.pipe() os.set_blocking(self.sentinel_r, False) os.set_blocking(self.sentinel_w, False) - logger.debug(f"Initialized pubsub. Conn={self.connection}") def _store_messages(self, notification): self.message_buffer.append( @@ -74,20 +79,25 @@ def _store_messages(self, notification): def subscribe(self, channel): self.subscriptions.append(channel) - self.connection.execute(f"LISTEN {channel}") + with connection.cursor() as cursor: + cursor.execute(f"LISTEN {channel}") def unsubscribe(self, channel): self.subscriptions.remove(channel) for i in range(0, len(self.message_buffer), -1): if self.message_buffer[i].channel == channel: self.message_buffer.pop(i) - self.connection.execute(f"UNLISTEN {channel}") - - def publish(self, channel, message=None): - if not message: - self.cursor.execute(f"NOTIFY {channel}") + with connection.cursor() as cursor: + cursor.execute(f"UNLISTEN {channel}") + + @staticmethod + def publish(channel, payload=None): + if not payload: + with connection.cursor() as cursor: + cursor.execute(f"NOTIFY {channel}") else: - self.cursor.execute("SELECT pg_notify(%s, %s)", (channel, message)) + with connection.cursor() as cursor: + cursor.execute("SELECT pg_notify(%s, %s)", (channel, str(payload))) def fileno(self) -> int: if self.message_buffer: @@ -97,7 +107,8 @@ def fileno(self) -> int: return self.sentinel_r def fetch(self) -> list[PubsubMessage]: - self.connection.execute("SELECT 1").fetchone() + with connection.cursor() as cursor: + cursor.execute("SELECT 1").fetchone() result = self.message_buffer.copy() self.message_buffer.clear() return result @@ -105,10 +116,16 @@ def fetch(self) -> list[PubsubMessage]: def close(self): os.close(self.sentinel_r) os.close(self.sentinel_w) - self.cursor.close() + self.message_buffer.clear() + connection.connection.remove_notify_handler(self._store_messages) + for channel in self.subscriptions: + self.unsubscribe(channel) def __enter__(self): return self def __exit__(self, exc_type, exc_value, traceback): self.close() + + +backend = PostgresPubSub diff --git a/pulpcore/tasking/tasks.py b/pulpcore/tasking/tasks.py index a1e5aedb8c..65422dd125 100644 --- a/pulpcore/tasking/tasks.py +++ b/pulpcore/tasking/tasks.py @@ -427,9 +427,8 @@ def cancel_task(task_id): # This is the only valid transition without holding the task lock. task.set_canceling() # Notify the worker that might be running that task and other workers to clean up - with pubsub.PostgresPubSub(connection) as pubsub_client: - pubsub_client.cancel_task(task_pk=task.pk) - pubsub_client.wakeup_worker() + pubsub.backend.cancel_task(task_pk=task.pk) + pubsub.backend.wakeup_worker() return task diff --git a/pulpcore/tasking/worker.py b/pulpcore/tasking/worker.py index b23006e915..ebb0881298 100644 --- a/pulpcore/tasking/worker.py +++ b/pulpcore/tasking/worker.py @@ -13,7 +13,7 @@ from packaging.version import parse as parse_version from django.conf import settings -from django.db import connection, DatabaseError, IntegrityError +from django.db import DatabaseError, IntegrityError from django.db.models import Case, Count, F, Max, Value, When from django.utils import timezone @@ -91,7 +91,7 @@ def __init__(self, auxiliary=False): int(WORKER_CLEANUP_INTERVAL / 10), WORKER_CLEANUP_INTERVAL ) # Pubsub handling - self.pubsub_client = pubsub.PostgresPubSub(connection) + self.pubsub_client = pubsub.backend() self.pubsub_channel_callback = {} # Add a file descriptor to trigger select on signals @@ -591,9 +591,7 @@ def metric_callback(message): self.pubsub_channel_callback[TASK_PUBSUB.WORKER_METRICS] = metric_callback def pubsub_teardown(self): - self.pubsub_client.unsubscribe(TASK_PUBSUB.WAKEUP_WORKER) - self.pubsub_client.unsubscribe(TASK_PUBSUB.CANCEL_TASK) - self.pubsub_client.unsubscribe(TASK_PUBSUB.WORKER_METRICS) + self.pubsub_client.close() def run(self, burst=False): with WorkerDirectory(self.name): diff --git a/pulpcore/tests/functional/test_pubsub.py b/pulpcore/tests/functional/test_pubsub.py index 1db000f3b4..4fc2b38b05 100644 --- a/pulpcore/tests/functional/test_pubsub.py +++ b/pulpcore/tests/functional/test_pubsub.py @@ -1,118 +1,116 @@ from django.db import connection from pulpcore.tasking import pubsub from types import SimpleNamespace +from datetime import datetime import select import pytest def test_postgres_pubsub(): + """Testing postgres low-level implementation.""" state = SimpleNamespace() - state.got_first_message = False - state.got_second_message = False + state.got_message = False with connection.cursor() as cursor: assert connection.connection is cursor.connection conn = cursor.connection + # Listen and Notify conn.execute("LISTEN abc") conn.add_notify_handler(lambda notification: setattr(state, "got_message", True)) cursor.execute("NOTIFY abc, 'foo'") + assert state.got_message is True conn.execute("SELECT 1") + assert state.got_message is True + + # Reset and retry + state.got_message = False conn.execute("UNLISTEN abc") - assert state.got_message is True + cursor.execute("NOTIFY abc, 'foo'") + assert state.got_message is False M = pubsub.PubsubMessage +PUBSUB_BACKENDS = [ + pytest.param(pubsub.PostgresPubSub, id="and-using-postgres-backend"), +] + + +@pytest.mark.parametrize("pubsub_backend", PUBSUB_BACKENDS) +class TestPublish: + + @pytest.mark.parametrize( + "payload", + ( + pytest.param(None, id="none"), + pytest.param("", id="empty-string"), + pytest.param("payload", id="non-empty-string"), + pytest.param(123, id="int"), + pytest.param(datetime.now(), id="datetime"), + pytest.param(True, id="bool"), + ), + ) + def test_with_payload_as(self, pubsub_backend, payload): + pubsub_backend.publish("channel", payload=payload) + +@pytest.mark.parametrize("pubsub_backend", PUBSUB_BACKENDS) @pytest.mark.parametrize( "messages", ( - [M("channel_a", "A1")], - [M("channel_a", "A1"), M("channel_a", "A2")], - [M("channel_a", "A1"), M("channel_a", "A2"), M("channel_b", "B1"), M("channel_c", "C1")], + pytest.param([M("a", "A1")], id="single-message"), + pytest.param([M("a", "A1"), M("a", "A2")], id="two-messages-in-same-channel"), + pytest.param( + [M("a", "A1"), M("a", "A2"), M("b", "B1"), M("c", "C1")], + id="tree-msgs-in-different-channels", + ), ), ) -@pytest.mark.parametrize("same_client", (True, False), ids=("same-clients", "different-clients")) -class TestPubSub: - - def test_subscribe_publish_fetch(self, same_client, messages): - """ - GIVEN a publisher and a subscriber (which may be the same) - AND a queue of messages Q with mixed channels and payloads - WHEN the subscriber subscribes to all the channels in Q - AND the publisher publishes all the messages in Q - THEN the subscriber fetch() call returns a queue equivalent to Q - AND calling fetch() a second time returns an empty queue - """ - # Given - publisher = pubsub.PostgresPubSub(connection) - subscriber = publisher if same_client else pubsub.PostgresPubSub(connection) - - # When - for message in messages: - subscriber.subscribe(message.channel) - for message in messages: - publisher.publish(message.channel, message=message.payload) - - # Then - assert subscriber.fetch() == messages - assert subscriber.fetch() == [] - - def test_unsubscribe(self, same_client, messages): - """ - GIVEN a publisher and a subscriber (which may be the same) - AND a queue of messages Q with mixed channels and payloads - WHEN the subscriber subscribes and unsubscribes to all the channels in Q - AND the publisher publishes all the messages in Q - THEN the subscriber fetch() call returns an empty queue - """ - # Given - publisher = pubsub.PostgresPubSub(connection) - subscriber = publisher if same_client else pubsub.PostgresPubSub(connection) - - # When - for message in messages: - subscriber.subscribe(message.channel) - for message in messages: - subscriber.unsubscribe(message.channel) - for message in messages: - publisher.publish(message.channel, message=message.payload) - - # Then - assert subscriber.fetch() == [] - - def test_select_loop(self, same_client, messages): - """ - GIVEN a publisher and a subscriber (which may be the same) - AND a queue of messages Q with mixed channels and payloads - AND the subscriber is subscribed to all the channels in Q - WHEN the publisher has NOT published anything yet - THEN the select loop won't detect the subscriber readiness - AND the subscriber fetch() call returns an empty queue - BUT WHEN the publisher does publish all messages in Q - THEN the select loop detects the subscriber readiness - AND the subscriber fetch() call returns a queue equivalent to Q - """ +class TestSubscribeFetch: + def unsubscribe_all(self, channels, subscriber): + for channel in channels: + subscriber.unsubscribe(channel) + + def subscribe_all(self, channels, subscriber): + for channel in channels: + subscriber.subscribe(channel) + + def publish_all(self, messages, publisher): + for channel, payload in messages: + publisher.publish(channel, payload=payload) + + def test_with( + self, pubsub_backend: pubsub.BasePubSubBackend, messages: list[pubsub.PubsubMessage] + ): + channels = {m.channel for m in messages} + publisher = pubsub_backend + with pubsub_backend() as subscriber: + self.subscribe_all(channels, subscriber) + self.publish_all(messages, publisher) + assert subscriber.fetch() == messages + + self.unsubscribe_all(channels, subscriber) + assert subscriber.fetch() == [] + + def test_select_readiness_with( + self, pubsub_backend: pubsub.BasePubSubBackend, messages: list[pubsub.PubsubMessage] + ): TIMEOUT = 0.1 - - # Given - publisher = pubsub.PostgresPubSub(connection) - subscriber = publisher if same_client else pubsub.PostgresPubSub(connection) - - # When - for message in messages: - subscriber.subscribe(message.channel) - r, w, x = select.select([subscriber], [], [], TIMEOUT) - - # Then - assert subscriber not in r - assert subscriber.fetch() == [] - - # But When - for message in messages: - publisher.publish(message.channel, message=message.payload) - r, w, x = select.select([subscriber], [], [], TIMEOUT) - - # Then - assert subscriber in r - assert subscriber.fetch() == messages - assert subscriber.fetch() == [] + channels = {m.channel for m in messages} + publisher = pubsub_backend + with pubsub_backend() as subscriber: + self.subscribe_all(channels, subscriber) + r, w, x = select.select([subscriber], [], [], TIMEOUT) + assert subscriber not in r + assert subscriber.fetch() == [] + + self.publish_all(messages, publisher) + r, w, x = select.select([subscriber], [], [], TIMEOUT) + assert subscriber in r + assert subscriber.fetch() == messages + assert subscriber.fetch() == [] + + self.unsubscribe_all(channels, subscriber) + self.publish_all(messages, publisher) + r, w, x = select.select([subscriber], [], [], TIMEOUT) + assert subscriber not in r + assert subscriber.fetch() == [] From 7aad72d540d38ce7ac512cbb0355e8f6c32e06f3 Mon Sep 17 00:00:00 2001 From: Pedro Brochado Date: Thu, 31 Jul 2025 19:05:06 -0300 Subject: [PATCH 04/18] wip debugging --- pulpcore/tasking/pubsub.py | 12 ++++++++---- pulpcore/tasking/worker.py | 16 +++++++++++++++- pulpcore/tests/functional/test_pubsub.py | 16 ++++++++++------ 3 files changed, 33 insertions(+), 11 deletions(-) diff --git a/pulpcore/tasking/pubsub.py b/pulpcore/tasking/pubsub.py index fa2a46cb85..d56a4a7dfd 100644 --- a/pulpcore/tasking/pubsub.py +++ b/pulpcore/tasking/pubsub.py @@ -56,6 +56,9 @@ def drain_non_blocking_fd(fd): os.read(fd, 256) +PID = os.getpid() + + class PostgresPubSub(BasePubSubBackend): def __init__(self): @@ -73,6 +76,8 @@ def __init__(self): os.set_blocking(self.sentinel_w, False) def _store_messages(self, notification): + logger.info(f"[{PID}] Received message: {notification}") + os.write(self.sentinel_w, b"0") self.message_buffer.append( PubsubMessage(channel=notification.channel, payload=notification.payload) ) @@ -92,6 +97,7 @@ def unsubscribe(self, channel): @staticmethod def publish(channel, payload=None): + logger.info(f"[{PID}] Published message: ({channel}, {payload})") if not payload: with connection.cursor() as cursor: cursor.execute(f"NOTIFY {channel}") @@ -100,17 +106,15 @@ def publish(channel, payload=None): cursor.execute("SELECT pg_notify(%s, %s)", (channel, str(payload))) def fileno(self) -> int: - if self.message_buffer: - os.write(self.sentinel_w, b"0") - else: - drain_non_blocking_fd(self.sentinel_r) return self.sentinel_r def fetch(self) -> list[PubsubMessage]: with connection.cursor() as cursor: cursor.execute("SELECT 1").fetchone() result = self.message_buffer.copy() + drain_non_blocking_fd(self.sentinel_r) self.message_buffer.clear() + # logger.info(f"[{PID}] Fetched messages: {result}") return result def close(self): diff --git a/pulpcore/tasking/worker.py b/pulpcore/tasking/worker.py index ebb0881298..666ae3db31 100644 --- a/pulpcore/tasking/worker.py +++ b/pulpcore/tasking/worker.py @@ -385,6 +385,17 @@ def supervise_task(self, task): ) os.kill(task_process.pid, signal.SIGUSR1) + if self.cancel_task: + _logger.info( + _("Received signal to cancel current task %s in domain: %s."), + task.pk, + domain.name, + ) + cancel_state = TASK_STATES.CANCELED + self.cancel_task = False + if self.wakeup_unblock: + self.unblock_tasks() + r, w, x = select.select( [self.sentinel, self.pubsub_client, task_process.sentinel], [], @@ -558,7 +569,8 @@ def _record_unblocked_waiting_tasks_metric(self): self.pubsub_client.record_worker_metrics(now) - def pubsub_handle_messages(self, messages: pubsub.PubsubMessage): + def pubsub_handle_messages(self): + messages = self.pubsub_client.fetch() for message in messages: callback = self.pubsub_channel_callback[message.channel] callback(message.payload) @@ -612,10 +624,12 @@ def run(self, burst=False): # do work if self.shutdown_requested: break + _logger.info(_("=== Worker %s will handle unblocked tasks. ==="), self.name) self.handle_unblocked_tasks() if self.shutdown_requested: break # rest until notified to wakeup + _logger.info(_("*** Worker %s entering sleep state. ***"), self.name) self.sleep() self.pubsub_teardown() self.shutdown() diff --git a/pulpcore/tests/functional/test_pubsub.py b/pulpcore/tests/functional/test_pubsub.py index 4fc2b38b05..ea6be4fed4 100644 --- a/pulpcore/tests/functional/test_pubsub.py +++ b/pulpcore/tests/functional/test_pubsub.py @@ -99,18 +99,22 @@ def test_select_readiness_with( publisher = pubsub_backend with pubsub_backend() as subscriber: self.subscribe_all(channels, subscriber) - r, w, x = select.select([subscriber], [], [], TIMEOUT) - assert subscriber not in r + ready, _, _ = select.select([subscriber], [], [], TIMEOUT) + assert subscriber not in ready assert subscriber.fetch() == [] self.publish_all(messages, publisher) - r, w, x = select.select([subscriber], [], [], TIMEOUT) - assert subscriber in r + ready, _, _ = select.select([subscriber], [], [], TIMEOUT) + assert subscriber in ready + ready, _, _ = select.select([subscriber], [], [], TIMEOUT) + assert subscriber in ready assert subscriber.fetch() == messages + ready, _, _ = select.select([subscriber], [], [], TIMEOUT) + assert subscriber not in ready assert subscriber.fetch() == [] self.unsubscribe_all(channels, subscriber) self.publish_all(messages, publisher) - r, w, x = select.select([subscriber], [], [], TIMEOUT) - assert subscriber not in r + ready, _, _ = select.select([subscriber], [], [], TIMEOUT) + assert subscriber not in ready assert subscriber.fetch() == [] From a7394bd49c4e8bc8935a3e93bfab3d5753dec8ff Mon Sep 17 00:00:00 2001 From: Pedro Brochado Date: Thu, 31 Jul 2025 21:03:21 -0300 Subject: [PATCH 05/18] Wip more debugging --- pulpcore/tasking/pubsub.py | 39 ++++++++++++------- pulpcore/tasking/worker.py | 33 ++++++++++------ .../api/using_plugin/test_pagination.py | 14 +++++++ 3 files changed, 60 insertions(+), 26 deletions(-) diff --git a/pulpcore/tasking/pubsub.py b/pulpcore/tasking/pubsub.py index d56a4a7dfd..f9410a3403 100644 --- a/pulpcore/tasking/pubsub.py +++ b/pulpcore/tasking/pubsub.py @@ -29,8 +29,8 @@ def subscribe(self, channel): def unsubscribe(self, channel): raise NotImplementedError() - @staticmethod - def publish(channel, payload=None): + @classmethod + def publish(cls, channel, payload=None): raise NotImplementedError() def fileno(self): @@ -60,14 +60,12 @@ def drain_non_blocking_fd(fd): class PostgresPubSub(BasePubSubBackend): + cursor = None def __init__(self): self.subscriptions = [] self.message_buffer = [] - # Ensures a connection is established - if not connection.connection: - with connection.cursor(): - pass + PostgresPubSub.cursor = connection.cursor() connection.connection.add_notify_handler(self._store_messages) # Handle message readiness # We can use os.evenfd in python >= 3.10 @@ -76,7 +74,7 @@ def __init__(self): os.set_blocking(self.sentinel_w, False) def _store_messages(self, notification): - logger.info(f"[{PID}] Received message: {notification}") + # logger.info(f"[{PID}] Received message: {notification}") os.write(self.sentinel_w, b"0") self.message_buffer.append( PubsubMessage(channel=notification.channel, payload=notification.payload) @@ -95,15 +93,24 @@ def unsubscribe(self, channel): with connection.cursor() as cursor: cursor.execute(f"UNLISTEN {channel}") - @staticmethod - def publish(channel, payload=None): - logger.info(f"[{PID}] Published message: ({channel}, {payload})") - if not payload: - with connection.cursor() as cursor: - cursor.execute(f"NOTIFY {channel}") + @classmethod + def publish(cls, channel, payload=None): + # import inspect + # s = inspect.stack()[2] + # source = f"{s.filename.split('/')[-1]}@{s.lineno}:{s.function}" + # logger.info(f"[{PID}][{source}] Published message: ({channel}, {payload})") + + query = ( + (f"NOTIFY {channel}",) + if not payload + else ("SELECT pg_notify(%s, %s)", (channel, str(payload))) + ) + + if cls.cursor: + cls.cursor.execute(*query) else: with connection.cursor() as cursor: - cursor.execute("SELECT pg_notify(%s, %s)", (channel, str(payload))) + cursor.execute(*query) def fileno(self) -> int: return self.sentinel_r @@ -122,6 +129,10 @@ def close(self): os.close(self.sentinel_w) self.message_buffer.clear() connection.connection.remove_notify_handler(self._store_messages) + class_cursor = PostgresPubSub.cursor + if class_cursor and not class_cursor.closed: + class_cursor.close() + PostgresPubSub.cursor = None for channel in self.subscriptions: self.unsubscribe(channel) diff --git a/pulpcore/tasking/worker.py b/pulpcore/tasking/worker.py index 666ae3db31..5688852348 100644 --- a/pulpcore/tasking/worker.py +++ b/pulpcore/tasking/worker.py @@ -7,6 +7,7 @@ import signal import socket import contextlib +import collections from datetime import datetime, timedelta from multiprocessing import Process from tempfile import TemporaryDirectory @@ -393,8 +394,6 @@ def supervise_task(self, task): ) cancel_state = TASK_STATES.CANCELED self.cancel_task = False - if self.wakeup_unblock: - self.unblock_tasks() r, w, x = select.select( [self.sentinel, self.pubsub_client, task_process.sentinel], @@ -456,7 +455,7 @@ def supervise_task(self, task): if cancel_state: self.cancel_abandoned_task(task, cancel_state, cancel_reason) if task.reserved_resources_record: - self.pubsub_client.wakeup_worker(reason=TASK_WAKEUP_UNBLOCK) + self.unblock_tasks() self.task = None def fetch_task(self): @@ -571,16 +570,25 @@ def _record_unblocked_waiting_tasks_metric(self): def pubsub_handle_messages(self): messages = self.pubsub_client.fetch() + by_channel = collections.defaultdict(list) for message in messages: - callback = self.pubsub_channel_callback[message.channel] - callback(message.payload) + by_channel[message.channel].append(message.payload) + for channel, channel_messages in by_channel.items(): + callback = self.pubsub_channel_callback[channel] + callback(channel_messages) def pubsub_setup(self): - def cancellation_callback(message): - if self.task and message == str(self.task.pk): - self.cancel_task = True + def cancellation_callback(messages): + for message in messages: + if self.task and message == str(self.task.pk): + self.cancel_task = True + + def wakeup_callback(messages): + if len(messages) != 1: + message = "unknown" + else: + message = messages[0] - def wakeup_callback(message): if message == TASK_WAKEUP_UNBLOCK: # Auxiliary workers don't do this. self.wakeup_unblock = not self.auxiliary @@ -592,7 +600,8 @@ def wakeup_callback(message): self.wakeup_unblock = not self.auxiliary self.wakeup_handle = True - def metric_callback(message): + def metric_callback(messages): + message = messages[0] self.last_metric_heartbeat = datetime.fromisoformat(message) self.pubsub_client.subscribe(TASK_PUBSUB.WAKEUP_WORKER) @@ -624,12 +633,12 @@ def run(self, burst=False): # do work if self.shutdown_requested: break - _logger.info(_("=== Worker %s will handle unblocked tasks. ==="), self.name) + # _logger.info(_("=== Worker %s will handle unblocked tasks. ==="), self.name) self.handle_unblocked_tasks() if self.shutdown_requested: break # rest until notified to wakeup - _logger.info(_("*** Worker %s entering sleep state. ***"), self.name) + # _logger.info(_("*** Worker %s entering sleep state. ***"), self.name) self.sleep() self.pubsub_teardown() self.shutdown() diff --git a/pulpcore/tests/functional/api/using_plugin/test_pagination.py b/pulpcore/tests/functional/api/using_plugin/test_pagination.py index 333a7d3bcc..6a533df462 100644 --- a/pulpcore/tests/functional/api/using_plugin/test_pagination.py +++ b/pulpcore/tests/functional/api/using_plugin/test_pagination.py @@ -3,6 +3,20 @@ import pytest +@pytest.mark.parallel +def test_debugging( + file_bindings, + file_content_unit_with_name_factory, + file_repo, + monitor_task, +): + i = 0 + content_unit = file_content_unit_with_name_factory(f"{i}.iso") + file_bindings.RepositoriesFileApi.modify( + file_repo.pulp_href, {"add_content_units": [content_unit.pulp_href]} + ).task + + @pytest.mark.parallel def test_repo_version_pagination( file_bindings, From dfc682bf928d1232e78b51223a79fa2137eddc63 Mon Sep 17 00:00:00 2001 From: Pedro Brochado Date: Thu, 31 Jul 2025 21:24:51 -0300 Subject: [PATCH 06/18] Wip trying a workaround --- pulpcore/tasking/pubsub.py | 4 ++++ pulpcore/tests/functional/test_pubsub.py | 2 ++ 2 files changed, 6 insertions(+) diff --git a/pulpcore/tasking/pubsub.py b/pulpcore/tasking/pubsub.py index f9410a3403..8825c232fa 100644 --- a/pulpcore/tasking/pubsub.py +++ b/pulpcore/tasking/pubsub.py @@ -2,6 +2,7 @@ from pulpcore.constants import TASK_PUBSUB import os import logging +import select from django.db import connection from contextlib import suppress @@ -113,6 +114,9 @@ def publish(cls, channel, payload=None): cursor.execute(*query) def fileno(self) -> int: + has_data, _, _ = select.select([connection.connection], [], [], 0) + if has_data: + return connection.connection.fileno() return self.sentinel_r def fetch(self) -> list[PubsubMessage]: diff --git a/pulpcore/tests/functional/test_pubsub.py b/pulpcore/tests/functional/test_pubsub.py index ea6be4fed4..015c5be15e 100644 --- a/pulpcore/tests/functional/test_pubsub.py +++ b/pulpcore/tests/functional/test_pubsub.py @@ -106,9 +106,11 @@ def test_select_readiness_with( self.publish_all(messages, publisher) ready, _, _ = select.select([subscriber], [], [], TIMEOUT) assert subscriber in ready + ready, _, _ = select.select([subscriber], [], [], TIMEOUT) assert subscriber in ready assert subscriber.fetch() == messages + ready, _, _ = select.select([subscriber], [], [], TIMEOUT) assert subscriber not in ready assert subscriber.fetch() == [] From 20b5e7ffecd2374e4965afcd2d844206fc801605 Mon Sep 17 00:00:00 2001 From: Pedro Brochado Date: Mon, 4 Aug 2025 13:38:17 -0300 Subject: [PATCH 07/18] Add inter-process communication tests --- pulpcore/tasking/pubsub.py | 43 ++-- pulpcore/tests/functional/test_pubsub.py | 248 ++++++++++++++++++++++- 2 files changed, 265 insertions(+), 26 deletions(-) diff --git a/pulpcore/tasking/pubsub.py b/pulpcore/tasking/pubsub.py index 8825c232fa..768fdd02a0 100644 --- a/pulpcore/tasking/pubsub.py +++ b/pulpcore/tasking/pubsub.py @@ -61,10 +61,9 @@ def drain_non_blocking_fd(fd): class PostgresPubSub(BasePubSubBackend): - cursor = None def __init__(self): - self.subscriptions = [] + self._subscriptions = set() self.message_buffer = [] PostgresPubSub.cursor = connection.cursor() connection.connection.add_notify_handler(self._store_messages) @@ -81,18 +80,8 @@ def _store_messages(self, notification): PubsubMessage(channel=notification.channel, payload=notification.payload) ) - def subscribe(self, channel): - self.subscriptions.append(channel) - with connection.cursor() as cursor: - cursor.execute(f"LISTEN {channel}") - - def unsubscribe(self, channel): - self.subscriptions.remove(channel) - for i in range(0, len(self.message_buffer), -1): - if self.message_buffer[i].channel == channel: - self.message_buffer.pop(i) - with connection.cursor() as cursor: - cursor.execute(f"UNLISTEN {channel}") + def get_subscriptions(self): + return self._subscriptions.copy() @classmethod def publish(cls, channel, payload=None): @@ -107,11 +96,21 @@ def publish(cls, channel, payload=None): else ("SELECT pg_notify(%s, %s)", (channel, str(payload))) ) - if cls.cursor: - cls.cursor.execute(*query) - else: - with connection.cursor() as cursor: - cursor.execute(*query) + with connection.cursor() as cursor: + cursor.execute(*query) + + def subscribe(self, channel): + self._subscriptions.add(channel) + with connection.cursor() as cursor: + cursor.execute(f"LISTEN {channel}") + + def unsubscribe(self, channel): + self._subscriptions.remove(channel) + for i in range(0, len(self.message_buffer), -1): + if self.message_buffer[i].channel == channel: + self.message_buffer.pop(i) + with connection.cursor() as cursor: + cursor.execute(f"UNLISTEN {channel}") def fileno(self) -> int: has_data, _, _ = select.select([connection.connection], [], [], 0) @@ -133,11 +132,7 @@ def close(self): os.close(self.sentinel_w) self.message_buffer.clear() connection.connection.remove_notify_handler(self._store_messages) - class_cursor = PostgresPubSub.cursor - if class_cursor and not class_cursor.closed: - class_cursor.close() - PostgresPubSub.cursor = None - for channel in self.subscriptions: + for channel in self.get_subscriptions(): self.unsubscribe(channel) def __enter__(self): diff --git a/pulpcore/tests/functional/test_pubsub.py b/pulpcore/tests/functional/test_pubsub.py index 015c5be15e..4ab5032182 100644 --- a/pulpcore/tests/functional/test_pubsub.py +++ b/pulpcore/tests/functional/test_pubsub.py @@ -1,9 +1,17 @@ -from django.db import connection +from django.db import connection, connections from pulpcore.tasking import pubsub from types import SimpleNamespace from datetime import datetime import select import pytest +import threading +from functools import partial +from contextlib import contextmanager +from multiprocessing import Process, Pipe, Lock, SimpleQueue, Value +from multiprocessing.connection import Connection +import multiprocessing as mp + +# mp.set_start_method('spawn') def test_postgres_pubsub(): @@ -65,7 +73,7 @@ def test_with_payload_as(self, pubsub_backend, payload): ), ), ) -class TestSubscribeFetch: +class TestNoIpcSubscribeFetch: def unsubscribe_all(self, channels, subscriber): for channel in channels: subscriber.unsubscribe(channel) @@ -120,3 +128,239 @@ def test_select_readiness_with( ready, _, _ = select.select([subscriber], [], [], TIMEOUT) assert subscriber not in ready assert subscriber.fetch() == [] + + +class IpcUtil: + + @staticmethod + def run(host_act, child_act) -> list: + # ensures a connection from one run doesn't interfere with the other + connections.close_all() + conn_1, conn_2 = Pipe() + log = SimpleQueue() + lock = Lock() + turn_1 = partial(IpcUtil._actor_turn, conn_1, starts=True, log=log, lock=lock) + turn_2 = partial(IpcUtil._actor_turn, conn_2, starts=False, log=log, lock=lock) + proc_1 = Process(target=host_act, args=(turn_1, log)) + proc_2 = Process(target=child_act, args=(turn_2, log)) + proc_1.start() + proc_2.start() + try: + proc_1.join() + finally: + conn_1.send("1") + try: + proc_2.join() + finally: + conn_2.send("1") + conn_1.close() + conn_2.close() + result = IpcUtil.read_log(log) + log.close() + return result + + @staticmethod + @contextmanager + def _actor_turn(conn: Connection, starts: bool, log, lock: Lock, done: bool = False): + TIMEOUT = 2 + + def flush_conn(conn): + if not conn.poll(TIMEOUT): + raise TimeoutError() + conn.recv() + + if starts: + with lock: + conn.send("done") + yield + if not done: + flush_conn(conn) + else: + flush_conn(conn) + with lock: + yield + conn.send("done") + + @staticmethod + def read_log(log: SimpleQueue) -> list: + result = [] + while not log.empty(): + result.append(log.get()) + return result + + +def test_ipc_utils(): + RUNS = 1000 + errors = 0 + + def host_act(host_turn, log): + with host_turn(): + log.put(0) + + with host_turn(): + log.put(2) + + with host_turn(): + log.put(4) + + def child_act(child_turn, log): + with child_turn(): + log.put(1) + + with child_turn(): + log.put(3) + + with child_turn(): + log.put(5) + + def run(): + log = IpcUtil.run(host_act, child_act) + if log != [0, 1, 2, 3, 4, 5]: + return 1 + return 0 + + for _ in range(RUNS): + errors += run() + + error_rate = errors / RUNS + assert error_rate == 0 + + +def test_postgres_backend_ipc(): + """Asserts that we are really testing two different connections. + + From psycopg, the backend_id is: + "The process ID (PID) of the backend process handling this connection." + """ + + def host_act(host_turn, log): + with host_turn(): # 1 + from django.db import connection + + assert connection.connection is None + with connection.cursor() as cursor: + cursor.execute("select 1") + assert connection.connection is not None + log.put(connection.connection.info.backend_pid) + + def child_act(child_turn, log): + with child_turn(): # 2 + from django.db import connection + + assert connection.connection is None + with connection.cursor() as cursor: + cursor.execute("select 1") + assert connection.connection is not None + log.put(connection.connection.info.backend_pid) + + log = IpcUtil.run(host_act, child_act) + assert len(log) == 2 + host_connection_pid, child_connection_pid = log + assert host_connection_pid != child_connection_pid + + +@pytest.mark.parametrize("pubsub_backend", PUBSUB_BACKENDS) +@pytest.mark.parametrize( + "messages", + ( + pytest.param([M("a", "A1")], id="single-message"), + pytest.param([M("a", "A1")], id="test-leaking"), + pytest.param([M("b", "B1"), M("b", "B2")], id="two-messages-in-same-channel"), + pytest.param( + [M("c", "C1"), M("c", "C2"), M("d", "D1"), M("d", "D1")], + id="four-msgs-in-different-channels", + ), + ), +) +class TestIpcSubscribeFetch: + + def test_with( + self, pubsub_backend: pubsub.BasePubSubBackend, messages: list[pubsub.PubsubMessage] + ): + CHANNELS = {m.channel for m in messages} + EXPECTED_LOG = [ + "subscribe", + "publish", + "fetch", + "publish", + "fetch+unsubscribe", + "publish", + "fetch-empty", + ] + + # host + def subscriber_act(subscriber_turn, log): + with pubsub_backend() as subscriber: + with subscriber_turn(): # 1 + log.put("subscribe") + for channel in CHANNELS: + subscriber.subscribe(channel) + + with subscriber_turn(): # 3 + log.put("fetch") + assert subscriber.get_subscriptions() == CHANNELS + assert subscriber.fetch() == messages + assert subscriber.fetch() == [] + + with subscriber_turn(): # 5 + log.put("fetch+unsubscribe") + assert subscriber.fetch() == messages + assert subscriber.fetch() == [] + for channel in CHANNELS: + subscriber.unsubscribe(channel) + + with subscriber_turn(done=True): # 7 + log.put("fetch-empty") + assert subscriber.fetch() == [] + + # child + def publisher_act(publisher_turn, log): + publisher = pubsub_backend + with publisher_turn(): + log.put("publish") + for message in messages: # 2 + publisher.publish(message.channel, payload=message.payload) + + with publisher_turn(): + log.put("publish") + for message in messages: # 4 + publisher.publish(message.channel, payload=message.payload) + + with publisher_turn(): + log.put("publish") + for message in messages: # 6 + publisher.publish(message.channel, payload=message.payload) + + log = IpcUtil.run(subscriber_act, publisher_act) + assert log == EXPECTED_LOG + + @pytest.mark.skip("TODO") + def test_select_readiness_with( + self, pubsub_backend: pubsub.BasePubSubBackend, messages: list[pubsub.PubsubMessage] + ): + TIMEOUT = 0.1 + channels = {m.channel for m in messages} + publisher = pubsub_backend + with pubsub_backend() as subscriber: + self.subscribe_all(channels, subscriber) + ready, _, _ = select.select([subscriber], [], [], TIMEOUT) + assert subscriber not in ready + assert subscriber.fetch() == [] + + self.publish_all(messages, publisher) + ready, _, _ = select.select([subscriber], [], [], TIMEOUT) + assert subscriber in ready + + ready, _, _ = select.select([subscriber], [], [], TIMEOUT) + assert subscriber in ready + assert subscriber.fetch() == messages + + ready, _, _ = select.select([subscriber], [], [], TIMEOUT) + assert subscriber not in ready + assert subscriber.fetch() == [] + + self.unsubscribe_all(channels, subscriber) + self.publish_all(messages, publisher) + ready, _, _ = select.select([subscriber], [], [], TIMEOUT) + assert subscriber not in ready + assert subscriber.fetch() == [] From 7d3d1eb66a8c5940834312aed88ae972cb227fd7 Mon Sep 17 00:00:00 2001 From: Pedro Brochado Date: Mon, 4 Aug 2025 20:20:42 -0300 Subject: [PATCH 08/18] New attempt to satisfy both pubsub test and side-effects --- pulpcore/tasking/pubsub.py | 31 ++++---- pulpcore/tests/functional/test_pubsub.py | 91 +++++++++++++++--------- 2 files changed, 70 insertions(+), 52 deletions(-) diff --git a/pulpcore/tasking/pubsub.py b/pulpcore/tasking/pubsub.py index 768fdd02a0..55ec9b0b7d 100644 --- a/pulpcore/tasking/pubsub.py +++ b/pulpcore/tasking/pubsub.py @@ -65,31 +65,27 @@ class PostgresPubSub(BasePubSubBackend): def __init__(self): self._subscriptions = set() self.message_buffer = [] - PostgresPubSub.cursor = connection.cursor() - connection.connection.add_notify_handler(self._store_messages) - # Handle message readiness - # We can use os.evenfd in python >= 3.10 + self.cursor = connection.cursor() + self.backend_pid = connection.connection.info.backend_pid + # logger.info(f"{connection.connection.info.backend_pid=}") self.sentinel_r, self.sentinel_w = os.pipe() os.set_blocking(self.sentinel_r, False) os.set_blocking(self.sentinel_w, False) + connection.connection.add_notify_handler(self._store_messages) def _store_messages(self, notification): # logger.info(f"[{PID}] Received message: {notification}") - os.write(self.sentinel_w, b"0") self.message_buffer.append( PubsubMessage(channel=notification.channel, payload=notification.payload) ) + if notification.pid == self.backend_pid: + os.write(self.sentinel_w, b"1") def get_subscriptions(self): return self._subscriptions.copy() @classmethod def publish(cls, channel, payload=None): - # import inspect - # s = inspect.stack()[2] - # source = f"{s.filename.split('/')[-1]}@{s.lineno}:{s.function}" - # logger.info(f"[{PID}][{source}] Published message: ({channel}, {payload})") - query = ( (f"NOTIFY {channel}",) if not payload @@ -113,25 +109,26 @@ def unsubscribe(self, channel): cursor.execute(f"UNLISTEN {channel}") def fileno(self) -> int: - has_data, _, _ = select.select([connection.connection], [], [], 0) - if has_data: - return connection.connection.fileno() - return self.sentinel_r + ready, _, _ = select.select([self.sentinel_r], [], [], 0) + if self.sentinel_r in ready: + return self.sentinel_r + return connection.connection.fileno() def fetch(self) -> list[PubsubMessage]: with connection.cursor() as cursor: cursor.execute("SELECT 1").fetchone() result = self.message_buffer.copy() - drain_non_blocking_fd(self.sentinel_r) self.message_buffer.clear() + drain_non_blocking_fd(self.sentinel_r) # logger.info(f"[{PID}] Fetched messages: {result}") return result def close(self): - os.close(self.sentinel_r) - os.close(self.sentinel_w) self.message_buffer.clear() connection.connection.remove_notify_handler(self._store_messages) + drain_non_blocking_fd(self.sentinel_r) + os.close(self.sentinel_r) + os.close(self.sentinel_w) for channel in self.get_subscriptions(): self.unsubscribe(channel) diff --git a/pulpcore/tests/functional/test_pubsub.py b/pulpcore/tests/functional/test_pubsub.py index 4ab5032182..28edc48f67 100644 --- a/pulpcore/tests/functional/test_pubsub.py +++ b/pulpcore/tests/functional/test_pubsub.py @@ -4,14 +4,10 @@ from datetime import datetime import select import pytest -import threading from functools import partial from contextlib import contextmanager -from multiprocessing import Process, Pipe, Lock, SimpleQueue, Value +from multiprocessing import Process, Pipe, Lock, SimpleQueue from multiprocessing.connection import Connection -import multiprocessing as mp - -# mp.set_start_method('spawn') def test_postgres_pubsub(): @@ -103,10 +99,12 @@ def test_select_readiness_with( self, pubsub_backend: pubsub.BasePubSubBackend, messages: list[pubsub.PubsubMessage] ): TIMEOUT = 0.1 - channels = {m.channel for m in messages} + CHANNELS = {m.channel for m in messages} publisher = pubsub_backend with pubsub_backend() as subscriber: - self.subscribe_all(channels, subscriber) + self.subscribe_all(CHANNELS, subscriber) + assert subscriber.get_subscriptions() == CHANNELS + ready, _, _ = select.select([subscriber], [], [], TIMEOUT) assert subscriber not in ready assert subscriber.fetch() == [] @@ -123,7 +121,7 @@ def test_select_readiness_with( assert subscriber not in ready assert subscriber.fetch() == [] - self.unsubscribe_all(channels, subscriber) + self.unsubscribe_all(CHANNELS, subscriber) self.publish_all(messages, publisher) ready, _, _ = select.select([subscriber], [], [], TIMEOUT) assert subscriber not in ready @@ -162,7 +160,7 @@ def run(host_act, child_act) -> list: @staticmethod @contextmanager def _actor_turn(conn: Connection, starts: bool, log, lock: Lock, done: bool = False): - TIMEOUT = 2 + TIMEOUT = 1 def flush_conn(conn): if not conn.poll(TIMEOUT): @@ -189,6 +187,7 @@ def read_log(log: SimpleQueue) -> list: return result +# @pytest.mark.skip def test_ipc_utils(): RUNS = 1000 errors = 0 @@ -235,8 +234,6 @@ def test_postgres_backend_ipc(): def host_act(host_turn, log): with host_turn(): # 1 - from django.db import connection - assert connection.connection is None with connection.cursor() as cursor: cursor.execute("select 1") @@ -245,8 +242,6 @@ def host_act(host_turn, log): def child_act(child_turn, log): with child_turn(): # 2 - from django.db import connection - assert connection.connection is None with connection.cursor() as cursor: cursor.execute("select 1") @@ -334,33 +329,59 @@ def publisher_act(publisher_turn, log): log = IpcUtil.run(subscriber_act, publisher_act) assert log == EXPECTED_LOG - @pytest.mark.skip("TODO") def test_select_readiness_with( self, pubsub_backend: pubsub.BasePubSubBackend, messages: list[pubsub.PubsubMessage] ): TIMEOUT = 0.1 - channels = {m.channel for m in messages} - publisher = pubsub_backend - with pubsub_backend() as subscriber: - self.subscribe_all(channels, subscriber) - ready, _, _ = select.select([subscriber], [], [], TIMEOUT) - assert subscriber not in ready - assert subscriber.fetch() == [] + CHANNELS = {m.channel for m in messages} + EXPECTED_LOG = [ + "subscribe/select-empty", + "publish", + "fetch/select-ready/unsubscribe", + "publish", + "fetch/select-empty", + ] - self.publish_all(messages, publisher) - ready, _, _ = select.select([subscriber], [], [], TIMEOUT) - assert subscriber in ready + def subscriber_act(subscriber_turn, log): + with pubsub_backend() as subscriber: + with subscriber_turn(): # 1 + log.put("subscribe/select-empty") + for channel in CHANNELS: + subscriber.subscribe(channel) + assert subscriber.get_subscriptions() == CHANNELS + ready, _, _ = select.select([subscriber], [], [], TIMEOUT) + assert subscriber not in ready + assert subscriber.fetch() == [] - ready, _, _ = select.select([subscriber], [], [], TIMEOUT) - assert subscriber in ready - assert subscriber.fetch() == messages + with subscriber_turn(): # 3 + log.put("fetch/select-ready/unsubscribe") + ready, _, _ = select.select([subscriber], [], [], TIMEOUT) + assert subscriber in ready - ready, _, _ = select.select([subscriber], [], [], TIMEOUT) - assert subscriber not in ready - assert subscriber.fetch() == [] + ready, _, _ = select.select([subscriber], [], [], TIMEOUT) + assert subscriber in ready + assert subscriber.fetch() == messages + assert subscriber.fetch() == [] + for channel in CHANNELS: + subscriber.unsubscribe(channel) - self.unsubscribe_all(channels, subscriber) - self.publish_all(messages, publisher) - ready, _, _ = select.select([subscriber], [], [], TIMEOUT) - assert subscriber not in ready - assert subscriber.fetch() == [] + with subscriber_turn(): # 5 + log.put("fetch/select-empty") + ready, _, _ = select.select([subscriber], [], [], TIMEOUT) + assert subscriber not in ready + assert subscriber.fetch() == messages + + def publisher_act(publisher_turn, log): + publisher = pubsub_backend + with publisher_turn(): # 2 + log.put("publish") + for message in messages: + publisher.publish(message.channel, payload=message.payload) + + with publisher_turn(): # 4 + log.put("publish") + for message in messages: + publisher.publish(message.channel, payload=message.payload) + + log = IpcUtil.run(subscriber_act, publisher_act) + assert log == EXPECTED_LOG From 61532fdaff3c2c0b5c8dad25f321d5dea34cf065 Mon Sep 17 00:00:00 2001 From: Pedro Brochado Date: Mon, 4 Aug 2025 21:30:50 -0300 Subject: [PATCH 09/18] Try to turn IpcUtil into a fixture --- pulpcore/tests/functional/test_pubsub.py | 137 ++++++++++++++--------- 1 file changed, 83 insertions(+), 54 deletions(-) diff --git a/pulpcore/tests/functional/test_pubsub.py b/pulpcore/tests/functional/test_pubsub.py index 28edc48f67..6d27eb45cb 100644 --- a/pulpcore/tests/functional/test_pubsub.py +++ b/pulpcore/tests/functional/test_pubsub.py @@ -1,4 +1,3 @@ -from django.db import connection, connections from pulpcore.tasking import pubsub from types import SimpleNamespace from datetime import datetime @@ -10,10 +9,18 @@ from multiprocessing.connection import Connection -def test_postgres_pubsub(): +@pytest.fixture +def django_connection(): + from django.db import connection + + return connection + + +def test_postgres_pubsub(django_connection): """Testing postgres low-level implementation.""" state = SimpleNamespace() state.got_message = False + connection = django_connection with connection.cursor() as cursor: assert connection.connection is cursor.connection conn = cursor.connection @@ -128,56 +135,69 @@ def test_select_readiness_with( assert subscriber.fetch() == [] +@pytest.fixture +def SyncProcesses(): + # ensures a connection from one run doesn't interfere with the other + from django.db import connections + + connections.close_all() + ipc = IpcUtil() + yield ipc + ipc.close() + + +@contextmanager +def actor_turn(conn: Connection, starts: bool, log, lock: Lock, done: bool = False): + TIMEOUT = 1 + + def flush_conn(conn): + if not conn.poll(TIMEOUT): + raise TimeoutError() + conn.recv() + + if starts: + with lock: + conn.send("done") + yield + if not done: + flush_conn(conn) + else: + flush_conn(conn) + with lock: + yield + conn.send("done") + + class IpcUtil: - @staticmethod - def run(host_act, child_act) -> list: - # ensures a connection from one run doesn't interfere with the other - connections.close_all() - conn_1, conn_2 = Pipe() - log = SimpleQueue() - lock = Lock() - turn_1 = partial(IpcUtil._actor_turn, conn_1, starts=True, log=log, lock=lock) - turn_2 = partial(IpcUtil._actor_turn, conn_2, starts=False, log=log, lock=lock) - proc_1 = Process(target=host_act, args=(turn_1, log)) - proc_2 = Process(target=child_act, args=(turn_2, log)) - proc_1.start() - proc_2.start() + def __init__(self): + self.proc_1 = None + self.proc_2 = None + self.conn_1, self.conn_2 = Pipe() + self.log = SimpleQueue() + self.lock = Lock() + + def run(self, host_act, child_act) -> list: + turn_1 = partial(actor_turn, self.conn_1, starts=True, log=self.log, lock=self.lock) + turn_2 = partial(actor_turn, self.conn_2, starts=False, log=self.log, lock=self.lock) + self.proc_1 = Process(target=host_act, args=(turn_1, self.log)) + self.proc_2 = Process(target=child_act, args=(turn_2, self.log)) + self.proc_1.start() + self.proc_2.start() try: - proc_1.join() + self.proc_1.join() finally: - conn_1.send("1") + self.conn_1.send("1") try: - proc_2.join() + self.proc_2.join() finally: - conn_2.send("1") - conn_1.close() - conn_2.close() - result = IpcUtil.read_log(log) - log.close() - return result + self.conn_2.send("1") + return self.read_log(self.log) - @staticmethod - @contextmanager - def _actor_turn(conn: Connection, starts: bool, log, lock: Lock, done: bool = False): - TIMEOUT = 1 - - def flush_conn(conn): - if not conn.poll(TIMEOUT): - raise TimeoutError() - conn.recv() - - if starts: - with lock: - conn.send("done") - yield - if not done: - flush_conn(conn) - else: - flush_conn(conn) - with lock: - yield - conn.send("done") + def close(self): + self.conn_1.close() + self.conn_2.close() + self.log.close() @staticmethod def read_log(log: SimpleQueue) -> list: @@ -187,8 +207,7 @@ def read_log(log: SimpleQueue) -> list: return result -# @pytest.mark.skip -def test_ipc_utils(): +def test_ipc_utils(SyncProcesses): RUNS = 1000 errors = 0 @@ -213,7 +232,7 @@ def child_act(child_turn, log): log.put(5) def run(): - log = IpcUtil.run(host_act, child_act) + log = SyncProcesses.run(host_act, child_act) if log != [0, 1, 2, 3, 4, 5]: return 1 return 0 @@ -225,7 +244,7 @@ def run(): assert error_rate == 0 -def test_postgres_backend_ipc(): +def test_postgres_backend_ipc(SyncProcesses): """Asserts that we are really testing two different connections. From psycopg, the backend_id is: @@ -233,6 +252,8 @@ def test_postgres_backend_ipc(): """ def host_act(host_turn, log): + from django.db import connection + with host_turn(): # 1 assert connection.connection is None with connection.cursor() as cursor: @@ -241,6 +262,8 @@ def host_act(host_turn, log): log.put(connection.connection.info.backend_pid) def child_act(child_turn, log): + from django.db import connection + with child_turn(): # 2 assert connection.connection is None with connection.cursor() as cursor: @@ -248,7 +271,7 @@ def child_act(child_turn, log): assert connection.connection is not None log.put(connection.connection.info.backend_pid) - log = IpcUtil.run(host_act, child_act) + log = SyncProcesses.run(host_act, child_act) assert len(log) == 2 host_connection_pid, child_connection_pid = log assert host_connection_pid != child_connection_pid @@ -270,7 +293,10 @@ def child_act(child_turn, log): class TestIpcSubscribeFetch: def test_with( - self, pubsub_backend: pubsub.BasePubSubBackend, messages: list[pubsub.PubsubMessage] + self, + pubsub_backend: pubsub.BasePubSubBackend, + messages: list[pubsub.PubsubMessage], + SyncProcesses, ): CHANNELS = {m.channel for m in messages} EXPECTED_LOG = [ @@ -326,11 +352,14 @@ def publisher_act(publisher_turn, log): for message in messages: # 6 publisher.publish(message.channel, payload=message.payload) - log = IpcUtil.run(subscriber_act, publisher_act) + log = SyncProcesses.run(subscriber_act, publisher_act) assert log == EXPECTED_LOG def test_select_readiness_with( - self, pubsub_backend: pubsub.BasePubSubBackend, messages: list[pubsub.PubsubMessage] + self, + pubsub_backend: pubsub.BasePubSubBackend, + messages: list[pubsub.PubsubMessage], + SyncProcesses, ): TIMEOUT = 0.1 CHANNELS = {m.channel for m in messages} @@ -383,5 +412,5 @@ def publisher_act(publisher_turn, log): for message in messages: publisher.publish(message.channel, payload=message.payload) - log = IpcUtil.run(subscriber_act, publisher_act) + log = SyncProcesses.run(subscriber_act, publisher_act) assert log == EXPECTED_LOG From ea14bc808d882e98fc96b890585755d8da9510f9 Mon Sep 17 00:00:00 2001 From: Pedro Brochado Date: Mon, 4 Aug 2025 21:31:32 -0300 Subject: [PATCH 10/18] Revert "Try to turn IpcUtil into a fixture" This reverts commit 6e6879c2e7ca424b1b287e5b7273e64c4816c12f. --- pulpcore/tests/functional/test_pubsub.py | 137 +++++++++-------------- 1 file changed, 54 insertions(+), 83 deletions(-) diff --git a/pulpcore/tests/functional/test_pubsub.py b/pulpcore/tests/functional/test_pubsub.py index 6d27eb45cb..28edc48f67 100644 --- a/pulpcore/tests/functional/test_pubsub.py +++ b/pulpcore/tests/functional/test_pubsub.py @@ -1,3 +1,4 @@ +from django.db import connection, connections from pulpcore.tasking import pubsub from types import SimpleNamespace from datetime import datetime @@ -9,18 +10,10 @@ from multiprocessing.connection import Connection -@pytest.fixture -def django_connection(): - from django.db import connection - - return connection - - -def test_postgres_pubsub(django_connection): +def test_postgres_pubsub(): """Testing postgres low-level implementation.""" state = SimpleNamespace() state.got_message = False - connection = django_connection with connection.cursor() as cursor: assert connection.connection is cursor.connection conn = cursor.connection @@ -135,69 +128,56 @@ def test_select_readiness_with( assert subscriber.fetch() == [] -@pytest.fixture -def SyncProcesses(): - # ensures a connection from one run doesn't interfere with the other - from django.db import connections - - connections.close_all() - ipc = IpcUtil() - yield ipc - ipc.close() - - -@contextmanager -def actor_turn(conn: Connection, starts: bool, log, lock: Lock, done: bool = False): - TIMEOUT = 1 - - def flush_conn(conn): - if not conn.poll(TIMEOUT): - raise TimeoutError() - conn.recv() - - if starts: - with lock: - conn.send("done") - yield - if not done: - flush_conn(conn) - else: - flush_conn(conn) - with lock: - yield - conn.send("done") - - class IpcUtil: - def __init__(self): - self.proc_1 = None - self.proc_2 = None - self.conn_1, self.conn_2 = Pipe() - self.log = SimpleQueue() - self.lock = Lock() - - def run(self, host_act, child_act) -> list: - turn_1 = partial(actor_turn, self.conn_1, starts=True, log=self.log, lock=self.lock) - turn_2 = partial(actor_turn, self.conn_2, starts=False, log=self.log, lock=self.lock) - self.proc_1 = Process(target=host_act, args=(turn_1, self.log)) - self.proc_2 = Process(target=child_act, args=(turn_2, self.log)) - self.proc_1.start() - self.proc_2.start() + @staticmethod + def run(host_act, child_act) -> list: + # ensures a connection from one run doesn't interfere with the other + connections.close_all() + conn_1, conn_2 = Pipe() + log = SimpleQueue() + lock = Lock() + turn_1 = partial(IpcUtil._actor_turn, conn_1, starts=True, log=log, lock=lock) + turn_2 = partial(IpcUtil._actor_turn, conn_2, starts=False, log=log, lock=lock) + proc_1 = Process(target=host_act, args=(turn_1, log)) + proc_2 = Process(target=child_act, args=(turn_2, log)) + proc_1.start() + proc_2.start() try: - self.proc_1.join() + proc_1.join() finally: - self.conn_1.send("1") + conn_1.send("1") try: - self.proc_2.join() + proc_2.join() finally: - self.conn_2.send("1") - return self.read_log(self.log) + conn_2.send("1") + conn_1.close() + conn_2.close() + result = IpcUtil.read_log(log) + log.close() + return result - def close(self): - self.conn_1.close() - self.conn_2.close() - self.log.close() + @staticmethod + @contextmanager + def _actor_turn(conn: Connection, starts: bool, log, lock: Lock, done: bool = False): + TIMEOUT = 1 + + def flush_conn(conn): + if not conn.poll(TIMEOUT): + raise TimeoutError() + conn.recv() + + if starts: + with lock: + conn.send("done") + yield + if not done: + flush_conn(conn) + else: + flush_conn(conn) + with lock: + yield + conn.send("done") @staticmethod def read_log(log: SimpleQueue) -> list: @@ -207,7 +187,8 @@ def read_log(log: SimpleQueue) -> list: return result -def test_ipc_utils(SyncProcesses): +# @pytest.mark.skip +def test_ipc_utils(): RUNS = 1000 errors = 0 @@ -232,7 +213,7 @@ def child_act(child_turn, log): log.put(5) def run(): - log = SyncProcesses.run(host_act, child_act) + log = IpcUtil.run(host_act, child_act) if log != [0, 1, 2, 3, 4, 5]: return 1 return 0 @@ -244,7 +225,7 @@ def run(): assert error_rate == 0 -def test_postgres_backend_ipc(SyncProcesses): +def test_postgres_backend_ipc(): """Asserts that we are really testing two different connections. From psycopg, the backend_id is: @@ -252,8 +233,6 @@ def test_postgres_backend_ipc(SyncProcesses): """ def host_act(host_turn, log): - from django.db import connection - with host_turn(): # 1 assert connection.connection is None with connection.cursor() as cursor: @@ -262,8 +241,6 @@ def host_act(host_turn, log): log.put(connection.connection.info.backend_pid) def child_act(child_turn, log): - from django.db import connection - with child_turn(): # 2 assert connection.connection is None with connection.cursor() as cursor: @@ -271,7 +248,7 @@ def child_act(child_turn, log): assert connection.connection is not None log.put(connection.connection.info.backend_pid) - log = SyncProcesses.run(host_act, child_act) + log = IpcUtil.run(host_act, child_act) assert len(log) == 2 host_connection_pid, child_connection_pid = log assert host_connection_pid != child_connection_pid @@ -293,10 +270,7 @@ def child_act(child_turn, log): class TestIpcSubscribeFetch: def test_with( - self, - pubsub_backend: pubsub.BasePubSubBackend, - messages: list[pubsub.PubsubMessage], - SyncProcesses, + self, pubsub_backend: pubsub.BasePubSubBackend, messages: list[pubsub.PubsubMessage] ): CHANNELS = {m.channel for m in messages} EXPECTED_LOG = [ @@ -352,14 +326,11 @@ def publisher_act(publisher_turn, log): for message in messages: # 6 publisher.publish(message.channel, payload=message.payload) - log = SyncProcesses.run(subscriber_act, publisher_act) + log = IpcUtil.run(subscriber_act, publisher_act) assert log == EXPECTED_LOG def test_select_readiness_with( - self, - pubsub_backend: pubsub.BasePubSubBackend, - messages: list[pubsub.PubsubMessage], - SyncProcesses, + self, pubsub_backend: pubsub.BasePubSubBackend, messages: list[pubsub.PubsubMessage] ): TIMEOUT = 0.1 CHANNELS = {m.channel for m in messages} @@ -412,5 +383,5 @@ def publisher_act(publisher_turn, log): for message in messages: publisher.publish(message.channel, payload=message.payload) - log = SyncProcesses.run(subscriber_act, publisher_act) + log = IpcUtil.run(subscriber_act, publisher_act) assert log == EXPECTED_LOG From 627c80b3418f12dd020dcc0205238aee489fa8bf Mon Sep 17 00:00:00 2001 From: Pedro Brochado Date: Mon, 4 Aug 2025 21:38:36 -0300 Subject: [PATCH 11/18] Attempt to fix pytest-django import requirements --- pulpcore/tests/functional/test_pubsub.py | 53 ++++++++++++++---------- 1 file changed, 31 insertions(+), 22 deletions(-) diff --git a/pulpcore/tests/functional/test_pubsub.py b/pulpcore/tests/functional/test_pubsub.py index 28edc48f67..4b58776ba6 100644 --- a/pulpcore/tests/functional/test_pubsub.py +++ b/pulpcore/tests/functional/test_pubsub.py @@ -1,17 +1,26 @@ -from django.db import connection, connections -from pulpcore.tasking import pubsub from types import SimpleNamespace from datetime import datetime import select import pytest +from typing import NamedTuple from functools import partial from contextlib import contextmanager from multiprocessing import Process, Pipe, Lock, SimpleQueue from multiprocessing.connection import Connection +@pytest.fixture(autouse=True) +def django_connection_reset(): + from django.db import connections + + connections.close_all() + yield + + def test_postgres_pubsub(): """Testing postgres low-level implementation.""" + from django.db import connection + state = SimpleNamespace() state.got_message = False with connection.cursor() as cursor: @@ -32,14 +41,22 @@ def test_postgres_pubsub(): assert state.got_message is False -M = pubsub.PubsubMessage +class PubsubMessage(NamedTuple): + channel: str + payload: str + + +M = PubsubMessage + + +@pytest.fixture +def pubsub_backend(): + from pulpcore.tasking import pubsub -PUBSUB_BACKENDS = [ - pytest.param(pubsub.PostgresPubSub, id="and-using-postgres-backend"), -] + return pubsub.PostgresPubSub -@pytest.mark.parametrize("pubsub_backend", PUBSUB_BACKENDS) +# @pytest.mark.parametrize("pubsub_backend", PUBSUB_BACKENDS) class TestPublish: @pytest.mark.parametrize( @@ -57,7 +74,7 @@ def test_with_payload_as(self, pubsub_backend, payload): pubsub_backend.publish("channel", payload=payload) -@pytest.mark.parametrize("pubsub_backend", PUBSUB_BACKENDS) +# @pytest.mark.parametrize("pubsub_backend", PUBSUB_BACKENDS) @pytest.mark.parametrize( "messages", ( @@ -82,9 +99,7 @@ def publish_all(self, messages, publisher): for channel, payload in messages: publisher.publish(channel, payload=payload) - def test_with( - self, pubsub_backend: pubsub.BasePubSubBackend, messages: list[pubsub.PubsubMessage] - ): + def test_with(self, pubsub_backend, messages): channels = {m.channel for m in messages} publisher = pubsub_backend with pubsub_backend() as subscriber: @@ -95,9 +110,7 @@ def test_with( self.unsubscribe_all(channels, subscriber) assert subscriber.fetch() == [] - def test_select_readiness_with( - self, pubsub_backend: pubsub.BasePubSubBackend, messages: list[pubsub.PubsubMessage] - ): + def test_select_readiness_with(self, pubsub_backend, messages): TIMEOUT = 0.1 CHANNELS = {m.channel for m in messages} publisher = pubsub_backend @@ -133,7 +146,6 @@ class IpcUtil: @staticmethod def run(host_act, child_act) -> list: # ensures a connection from one run doesn't interfere with the other - connections.close_all() conn_1, conn_2 = Pipe() log = SimpleQueue() lock = Lock() @@ -231,6 +243,7 @@ def test_postgres_backend_ipc(): From psycopg, the backend_id is: "The process ID (PID) of the backend process handling this connection." """ + from django.db import connection def host_act(host_turn, log): with host_turn(): # 1 @@ -254,7 +267,7 @@ def child_act(child_turn, log): assert host_connection_pid != child_connection_pid -@pytest.mark.parametrize("pubsub_backend", PUBSUB_BACKENDS) +# @pytest.mark.parametrize("pubsub_backend", PUBSUB_BACKENDS) @pytest.mark.parametrize( "messages", ( @@ -269,9 +282,7 @@ def child_act(child_turn, log): ) class TestIpcSubscribeFetch: - def test_with( - self, pubsub_backend: pubsub.BasePubSubBackend, messages: list[pubsub.PubsubMessage] - ): + def test_with(self, pubsub_backend, messages): CHANNELS = {m.channel for m in messages} EXPECTED_LOG = [ "subscribe", @@ -329,9 +340,7 @@ def publisher_act(publisher_turn, log): log = IpcUtil.run(subscriber_act, publisher_act) assert log == EXPECTED_LOG - def test_select_readiness_with( - self, pubsub_backend: pubsub.BasePubSubBackend, messages: list[pubsub.PubsubMessage] - ): + def test_select_readiness_with(self, pubsub_backend, messages): TIMEOUT = 0.1 CHANNELS = {m.channel for m in messages} EXPECTED_LOG = [ From 55cbb624d8ab7ebc30f5c610c3ed2dcf796acd53 Mon Sep 17 00:00:00 2001 From: Pedro Brochado Date: Tue, 5 Aug 2025 10:58:12 -0300 Subject: [PATCH 12/18] Add better error handling for inter-process comm tests --- pulpcore/tests/functional/test_pubsub.py | 96 +++++++++++++++++++----- 1 file changed, 78 insertions(+), 18 deletions(-) diff --git a/pulpcore/tests/functional/test_pubsub.py b/pulpcore/tests/functional/test_pubsub.py index 4b58776ba6..7db191aab7 100644 --- a/pulpcore/tests/functional/test_pubsub.py +++ b/pulpcore/tests/functional/test_pubsub.py @@ -1,7 +1,10 @@ from types import SimpleNamespace from datetime import datetime +import traceback import select import pytest +import sys +import os from typing import NamedTuple from functools import partial from contextlib import contextmanager @@ -10,11 +13,19 @@ @pytest.fixture(autouse=True) -def django_connection_reset(): +def django_connection_reset(django_db_blocker): + # django_db_blocker is from pytest-django. We don't want it to try to safeguard + # us from using our functional Pulp instance. + # https://pytest-django.readthedocs.io/en/latest/database.html#django-db-blocker + django_db_blocker.unblock() + + # If we dont' reset the connections we'll get interference between tests, + # as listen/notify is connection based. from django.db import connections connections.close_all() yield + django_db_blocker.block() def test_postgres_pubsub(): @@ -50,7 +61,7 @@ class PubsubMessage(NamedTuple): @pytest.fixture -def pubsub_backend(): +def pubsub_backend(django_db_blocker): from pulpcore.tasking import pubsub return pubsub.PostgresPubSub @@ -141,6 +152,23 @@ def test_select_readiness_with(self, pubsub_backend, messages): assert subscriber.fetch() == [] +class ProcessErrorData(NamedTuple): + error: Exception + stack_trace: str + + +class RemoteTracebackError(Exception): + """An exception that wraps another exception and its remote traceback string.""" + + def __init__(self, message, remote_traceback): + super().__init__(message) + self.remote_traceback = remote_traceback + + def __str__(self): + """Override __str__ to include the remote traceback when printed.""" + return f"{super().__str__()}\n\n--- Remote Traceback ---\n{self.remote_traceback}" + + class IpcUtil: @staticmethod @@ -167,6 +195,13 @@ def run(host_act, child_act) -> list: conn_2.close() result = IpcUtil.read_log(log) log.close() + if proc_1.exitcode != 0 or proc_2.exitcode != 0: + error = Exception("General exception") + for item in result: + if isinstance(item, ProcessErrorData): + error, stacktrace = item + break + raise Exception(stacktrace) from error return result @staticmethod @@ -174,22 +209,32 @@ def run(host_act, child_act) -> list: def _actor_turn(conn: Connection, starts: bool, log, lock: Lock, done: bool = False): TIMEOUT = 1 - def flush_conn(conn): - if not conn.poll(TIMEOUT): - raise TimeoutError() - conn.recv() + try: - if starts: - with lock: - conn.send("done") - yield - if not done: + def flush_conn(conn): + if not conn.poll(TIMEOUT): + raise TimeoutError() + conn.recv() + + if starts: + with lock: + conn.send("done") + yield + if not done: + flush_conn(conn) + else: flush_conn(conn) - else: - flush_conn(conn) - with lock: - yield - conn.send("done") + with lock: + yield + conn.send("done") + except Exception as e: + traceback.print_exc(file=sys.stderr) + err_header = f"Error from sub-process (pid={os.getpid()}) on test using IpcUtil" + traceback_str = f"{err_header}\n\n{traceback.format_exc()}" + + error = ProcessErrorData(e, traceback_str) + log.put(error) + exit(1) @staticmethod def read_log(log: SimpleQueue) -> list: @@ -199,8 +244,23 @@ def read_log(log: SimpleQueue) -> list: return result -# @pytest.mark.skip -def test_ipc_utils(): +def test_ipc_utils_error_catching(): + + def host_act(host_turn, log): + with host_turn(): + log.put(0) + + def child_act(child_turn, log): + with child_turn(): + log.put(1) + assert 1 == 0 + + error_msg = "AssertionError: assert 1 == 0" + with pytest.raises(Exception, match=error_msg): + IpcUtil.run(host_act, child_act) + + +def test_ipc_utils_correctness(): RUNS = 1000 errors = 0 From 9c269ad9fb755f85a7b5ad0292e66375fe1002c6 Mon Sep 17 00:00:00 2001 From: Pedro Brochado Date: Tue, 5 Aug 2025 11:44:42 -0300 Subject: [PATCH 13/18] Fix wrong test assertion and fix hanging issue --- pulpcore/tests/functional/test_pubsub.py | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/pulpcore/tests/functional/test_pubsub.py b/pulpcore/tests/functional/test_pubsub.py index 7db191aab7..2537069b73 100644 --- a/pulpcore/tests/functional/test_pubsub.py +++ b/pulpcore/tests/functional/test_pubsub.py @@ -213,7 +213,11 @@ def _actor_turn(conn: Connection, starts: bool, log, lock: Lock, done: bool = Fa def flush_conn(conn): if not conn.poll(TIMEOUT): - raise TimeoutError() + err_msg = ( + "Tip: make sure the last 'with turn()' (in execution order) " + "is called with 'actor_turn(done=True)', otherwise it may hang." + ) + raise TimeoutError(err_msg) conn.recv() if starts: @@ -434,11 +438,11 @@ def subscriber_act(subscriber_turn, log): for channel in CHANNELS: subscriber.unsubscribe(channel) - with subscriber_turn(): # 5 + with subscriber_turn(done=True): # 5 log.put("fetch/select-empty") ready, _, _ = select.select([subscriber], [], [], TIMEOUT) assert subscriber not in ready - assert subscriber.fetch() == messages + assert subscriber.fetch() == [] def publisher_act(publisher_turn, log): publisher = pubsub_backend From 54ef324f489539eed21cd613de9e2a25ecc30d6c Mon Sep 17 00:00:00 2001 From: Pedro Brochado Date: Tue, 5 Aug 2025 13:08:18 -0300 Subject: [PATCH 14/18] Make pytest-django fixture fix optional --- .../api/using_plugin/test_pagination.py | 14 -------------- pulpcore/tests/functional/test_pubsub.py | 15 +++++++++++---- 2 files changed, 11 insertions(+), 18 deletions(-) diff --git a/pulpcore/tests/functional/api/using_plugin/test_pagination.py b/pulpcore/tests/functional/api/using_plugin/test_pagination.py index 6a533df462..333a7d3bcc 100644 --- a/pulpcore/tests/functional/api/using_plugin/test_pagination.py +++ b/pulpcore/tests/functional/api/using_plugin/test_pagination.py @@ -3,20 +3,6 @@ import pytest -@pytest.mark.parallel -def test_debugging( - file_bindings, - file_content_unit_with_name_factory, - file_repo, - monitor_task, -): - i = 0 - content_unit = file_content_unit_with_name_factory(f"{i}.iso") - file_bindings.RepositoriesFileApi.modify( - file_repo.pulp_href, {"add_content_units": [content_unit.pulp_href]} - ).task - - @pytest.mark.parallel def test_repo_version_pagination( file_bindings, diff --git a/pulpcore/tests/functional/test_pubsub.py b/pulpcore/tests/functional/test_pubsub.py index 2537069b73..d3b689ba75 100644 --- a/pulpcore/tests/functional/test_pubsub.py +++ b/pulpcore/tests/functional/test_pubsub.py @@ -13,11 +13,17 @@ @pytest.fixture(autouse=True) -def django_connection_reset(django_db_blocker): +def django_connection_reset(request): # django_db_blocker is from pytest-django. We don't want it to try to safeguard # us from using our functional Pulp instance. # https://pytest-django.readthedocs.io/en/latest/database.html#django-db-blocker - django_db_blocker.unblock() + pytest_django_installed = False + try: + django_db_blocker = request.getfixturevalue("django_db_blocker") + django_db_blocker.unblock() + pytest_django_installed = True + except pytest.FixtureLookupError: + pass # If we dont' reset the connections we'll get interference between tests, # as listen/notify is connection based. @@ -25,7 +31,8 @@ def django_connection_reset(django_db_blocker): connections.close_all() yield - django_db_blocker.block() + if pytest_django_installed: + django_db_blocker.block() def test_postgres_pubsub(): @@ -61,7 +68,7 @@ class PubsubMessage(NamedTuple): @pytest.fixture -def pubsub_backend(django_db_blocker): +def pubsub_backend(): from pulpcore.tasking import pubsub return pubsub.PostgresPubSub From 68720f87ab8f7de17a96039bf0e76262ccad2b58 Mon Sep 17 00:00:00 2001 From: Pedro Brochado Date: Tue, 5 Aug 2025 13:29:03 -0300 Subject: [PATCH 15/18] Clean pubsub test and separate ipc utils --- pulpcore/tests/functional/test_pubsub.py | 200 +++-------------------- pulpcore/tests/functional/test_utils.py | 56 +++++++ pulpcore/tests/functional/utils.py | 93 ++++++++++- 3 files changed, 170 insertions(+), 179 deletions(-) create mode 100644 pulpcore/tests/functional/test_utils.py diff --git a/pulpcore/tests/functional/test_pubsub.py b/pulpcore/tests/functional/test_pubsub.py index d3b689ba75..523cce6d72 100644 --- a/pulpcore/tests/functional/test_pubsub.py +++ b/pulpcore/tests/functional/test_pubsub.py @@ -1,15 +1,9 @@ from types import SimpleNamespace from datetime import datetime -import traceback import select import pytest -import sys -import os -from typing import NamedTuple -from functools import partial -from contextlib import contextmanager -from multiprocessing import Process, Pipe, Lock, SimpleQueue -from multiprocessing.connection import Connection +from pulpcore.tasking import pubsub +from pulpcore.tests.functional.utils import IpcUtil @pytest.fixture(autouse=True) @@ -59,22 +53,13 @@ def test_postgres_pubsub(): assert state.got_message is False -class PubsubMessage(NamedTuple): - channel: str - payload: str +M = pubsub.PubsubMessage +PUBSUB_BACKENDS = [ + pubsub.PostgresPubSub, +] -M = PubsubMessage - - -@pytest.fixture -def pubsub_backend(): - from pulpcore.tasking import pubsub - - return pubsub.PostgresPubSub - - -# @pytest.mark.parametrize("pubsub_backend", PUBSUB_BACKENDS) +@pytest.mark.parametrize("pubsub_backend", PUBSUB_BACKENDS) class TestPublish: @pytest.mark.parametrize( @@ -88,11 +73,11 @@ class TestPublish: pytest.param(True, id="bool"), ), ) - def test_with_payload_as(self, pubsub_backend, payload): + def test_with_payload_as(self, pubsub_backend: pubsub.BasePubSubBackend, payload): pubsub_backend.publish("channel", payload=payload) -# @pytest.mark.parametrize("pubsub_backend", PUBSUB_BACKENDS) +@pytest.mark.parametrize("pubsub_backend", PUBSUB_BACKENDS) @pytest.mark.parametrize( "messages", ( @@ -117,7 +102,9 @@ def publish_all(self, messages, publisher): for channel, payload in messages: publisher.publish(channel, payload=payload) - def test_with(self, pubsub_backend, messages): + def test_with( + self, pubsub_backend: pubsub.BasePubSubBackend, messages: list[pubsub.PubsubMessage] + ): channels = {m.channel for m in messages} publisher = pubsub_backend with pubsub_backend() as subscriber: @@ -128,7 +115,9 @@ def test_with(self, pubsub_backend, messages): self.unsubscribe_all(channels, subscriber) assert subscriber.fetch() == [] - def test_select_readiness_with(self, pubsub_backend, messages): + def test_select_readiness_with( + self, pubsub_backend: pubsub.BasePubSubBackend, messages: list[pubsub.PubsubMessage] + ): TIMEOUT = 0.1 CHANNELS = {m.channel for m in messages} publisher = pubsub_backend @@ -159,155 +148,6 @@ def test_select_readiness_with(self, pubsub_backend, messages): assert subscriber.fetch() == [] -class ProcessErrorData(NamedTuple): - error: Exception - stack_trace: str - - -class RemoteTracebackError(Exception): - """An exception that wraps another exception and its remote traceback string.""" - - def __init__(self, message, remote_traceback): - super().__init__(message) - self.remote_traceback = remote_traceback - - def __str__(self): - """Override __str__ to include the remote traceback when printed.""" - return f"{super().__str__()}\n\n--- Remote Traceback ---\n{self.remote_traceback}" - - -class IpcUtil: - - @staticmethod - def run(host_act, child_act) -> list: - # ensures a connection from one run doesn't interfere with the other - conn_1, conn_2 = Pipe() - log = SimpleQueue() - lock = Lock() - turn_1 = partial(IpcUtil._actor_turn, conn_1, starts=True, log=log, lock=lock) - turn_2 = partial(IpcUtil._actor_turn, conn_2, starts=False, log=log, lock=lock) - proc_1 = Process(target=host_act, args=(turn_1, log)) - proc_2 = Process(target=child_act, args=(turn_2, log)) - proc_1.start() - proc_2.start() - try: - proc_1.join() - finally: - conn_1.send("1") - try: - proc_2.join() - finally: - conn_2.send("1") - conn_1.close() - conn_2.close() - result = IpcUtil.read_log(log) - log.close() - if proc_1.exitcode != 0 or proc_2.exitcode != 0: - error = Exception("General exception") - for item in result: - if isinstance(item, ProcessErrorData): - error, stacktrace = item - break - raise Exception(stacktrace) from error - return result - - @staticmethod - @contextmanager - def _actor_turn(conn: Connection, starts: bool, log, lock: Lock, done: bool = False): - TIMEOUT = 1 - - try: - - def flush_conn(conn): - if not conn.poll(TIMEOUT): - err_msg = ( - "Tip: make sure the last 'with turn()' (in execution order) " - "is called with 'actor_turn(done=True)', otherwise it may hang." - ) - raise TimeoutError(err_msg) - conn.recv() - - if starts: - with lock: - conn.send("done") - yield - if not done: - flush_conn(conn) - else: - flush_conn(conn) - with lock: - yield - conn.send("done") - except Exception as e: - traceback.print_exc(file=sys.stderr) - err_header = f"Error from sub-process (pid={os.getpid()}) on test using IpcUtil" - traceback_str = f"{err_header}\n\n{traceback.format_exc()}" - - error = ProcessErrorData(e, traceback_str) - log.put(error) - exit(1) - - @staticmethod - def read_log(log: SimpleQueue) -> list: - result = [] - while not log.empty(): - result.append(log.get()) - return result - - -def test_ipc_utils_error_catching(): - - def host_act(host_turn, log): - with host_turn(): - log.put(0) - - def child_act(child_turn, log): - with child_turn(): - log.put(1) - assert 1 == 0 - - error_msg = "AssertionError: assert 1 == 0" - with pytest.raises(Exception, match=error_msg): - IpcUtil.run(host_act, child_act) - - -def test_ipc_utils_correctness(): - RUNS = 1000 - errors = 0 - - def host_act(host_turn, log): - with host_turn(): - log.put(0) - - with host_turn(): - log.put(2) - - with host_turn(): - log.put(4) - - def child_act(child_turn, log): - with child_turn(): - log.put(1) - - with child_turn(): - log.put(3) - - with child_turn(): - log.put(5) - - def run(): - log = IpcUtil.run(host_act, child_act) - if log != [0, 1, 2, 3, 4, 5]: - return 1 - return 0 - - for _ in range(RUNS): - errors += run() - - error_rate = errors / RUNS - assert error_rate == 0 - - def test_postgres_backend_ipc(): """Asserts that we are really testing two different connections. @@ -338,7 +178,7 @@ def child_act(child_turn, log): assert host_connection_pid != child_connection_pid -# @pytest.mark.parametrize("pubsub_backend", PUBSUB_BACKENDS) +@pytest.mark.parametrize("pubsub_backend", PUBSUB_BACKENDS) @pytest.mark.parametrize( "messages", ( @@ -353,7 +193,9 @@ def child_act(child_turn, log): ) class TestIpcSubscribeFetch: - def test_with(self, pubsub_backend, messages): + def test_with( + self, pubsub_backend: pubsub.BasePubSubBackend, messages: list[pubsub.PubsubMessage] + ): CHANNELS = {m.channel for m in messages} EXPECTED_LOG = [ "subscribe", @@ -411,7 +253,9 @@ def publisher_act(publisher_turn, log): log = IpcUtil.run(subscriber_act, publisher_act) assert log == EXPECTED_LOG - def test_select_readiness_with(self, pubsub_backend, messages): + def test_select_readiness_with( + self, pubsub_backend: pubsub.BasePubSubBackend, messages: list[pubsub.PubsubMessage] + ): TIMEOUT = 0.1 CHANNELS = {m.channel for m in messages} EXPECTED_LOG = [ diff --git a/pulpcore/tests/functional/test_utils.py b/pulpcore/tests/functional/test_utils.py new file mode 100644 index 0000000000..53548fcc35 --- /dev/null +++ b/pulpcore/tests/functional/test_utils.py @@ -0,0 +1,56 @@ +import pytest +from pulpcore.tests.functional.utils import IpcUtil + + +class TestIpcUtil: + + def test_catch_subprocess_errors(self): + + def host_act(host_turn, log): + with host_turn(): + log.put(0) + + def child_act(child_turn, log): + with child_turn(): + log.put(1) + assert 1 == 0 + + error_msg = "AssertionError: assert 1 == 0" + with pytest.raises(Exception, match=error_msg): + IpcUtil.run(host_act, child_act) + + def test_turns_are_respected(self): + RUNS = 1000 + errors = 0 + + def host_act(host_turn, log): + with host_turn(): + log.put(0) + + with host_turn(): + log.put(2) + + with host_turn(): + log.put(4) + + def child_act(child_turn, log): + with child_turn(): + log.put(1) + + with child_turn(): + log.put(3) + + with child_turn(): + log.put(5) + + def run(): + log = IpcUtil.run(host_act, child_act) + if log != [0, 1, 2, 3, 4, 5]: + return 1 + return 0 + + for _ in range(RUNS): + errors += run() + + error_rate = errors / RUNS + assert error_rate == 0 diff --git a/pulpcore/tests/functional/utils.py b/pulpcore/tests/functional/utils.py index aeab1328ca..85ae6ee318 100644 --- a/pulpcore/tests/functional/utils.py +++ b/pulpcore/tests/functional/utils.py @@ -5,7 +5,14 @@ import hashlib import os import random - +import traceback +import sys +import multiprocessing as mp + +from multiprocessing.connection import Connection +from functools import partial +from contextlib import contextmanager +from typing import NamedTuple from aiohttp import web from dataclasses import dataclass from multidict import CIMultiDict @@ -156,3 +163,87 @@ async def _get_from_url(url, auth=None, headers=None): async with aiohttp.ClientSession(auth=auth) as session: async with session.get(url, ssl=False, headers=headers) as response: return response + + +class ProcessErrorData(NamedTuple): + error: Exception + stack_trace: str + + +class IpcUtil: + + @staticmethod + def run(host_act, child_act) -> list: + # ensures a connection from one run doesn't interfere with the other + conn_1, conn_2 = mp.Pipe() + log = mp.SimpleQueue() + lock = mp.Lock() + turn_1 = partial(IpcUtil._actor_turn, conn_1, starts=True, log=log, lock=lock) + turn_2 = partial(IpcUtil._actor_turn, conn_2, starts=False, log=log, lock=lock) + proc_1 = mp.Process(target=host_act, args=(turn_1, log)) + proc_2 = mp.Process(target=child_act, args=(turn_2, log)) + proc_1.start() + proc_2.start() + try: + proc_1.join() + finally: + conn_1.send("1") + try: + proc_2.join() + finally: + conn_2.send("1") + conn_1.close() + conn_2.close() + result = IpcUtil.read_log(log) + log.close() + if proc_1.exitcode != 0 or proc_2.exitcode != 0: + error = Exception("General exception") + for item in result: + if isinstance(item, ProcessErrorData): + error, stacktrace = item + break + raise Exception(stacktrace) from error + return result + + @staticmethod + @contextmanager + def _actor_turn(conn: Connection, starts: bool, log, lock: mp.Lock, done: bool = False): + TIMEOUT = 1 + + try: + + def flush_conn(conn): + if not conn.poll(TIMEOUT): + err_msg = ( + "Tip: make sure the last 'with turn()' (in execution order) " + "is called with 'actor_turn(done=True)', otherwise it may hang." + ) + raise TimeoutError(err_msg) + conn.recv() + + if starts: + with lock: + conn.send("done") + yield + if not done: + flush_conn(conn) + else: + flush_conn(conn) + with lock: + yield + conn.send("done") + except Exception as e: + traceback.print_exc(file=sys.stderr) + err_header = f"Error from sub-process (pid={os.getpid()}) on test using IpcUtil" + traceback_str = f"{err_header}\n\n{traceback.format_exc()}" + + error = ProcessErrorData(e, traceback_str) + log.put(error) + exit(1) + + @staticmethod + def read_log(log: mp.SimpleQueue) -> list: + result = [] + while not log.empty(): + result.append(log.get()) + return result From a8840c7771e0df349ad4ca28223c56702631ba7f Mon Sep 17 00:00:00 2001 From: Pedro Brochado Date: Tue, 5 Aug 2025 13:56:56 -0300 Subject: [PATCH 16/18] More test and test_util cleanup --- pulpcore/tests/functional/test_pubsub.py | 201 +++++++++++------------ pulpcore/tests/functional/test_utils.py | 2 +- pulpcore/tests/functional/utils.py | 44 +++-- 3 files changed, 126 insertions(+), 121 deletions(-) diff --git a/pulpcore/tests/functional/test_pubsub.py b/pulpcore/tests/functional/test_pubsub.py index 523cce6d72..96c93ad7c7 100644 --- a/pulpcore/tests/functional/test_pubsub.py +++ b/pulpcore/tests/functional/test_pubsub.py @@ -29,28 +29,58 @@ def django_connection_reset(request): django_db_blocker.block() -def test_postgres_pubsub(): - """Testing postgres low-level implementation.""" - from django.db import connection - - state = SimpleNamespace() - state.got_message = False - with connection.cursor() as cursor: - assert connection.connection is cursor.connection - conn = cursor.connection - # Listen and Notify - conn.execute("LISTEN abc") - conn.add_notify_handler(lambda notification: setattr(state, "got_message", True)) - cursor.execute("NOTIFY abc, 'foo'") - assert state.got_message is True - conn.execute("SELECT 1") - assert state.got_message is True - - # Reset and retry +class TestPostgresSpecifics: + def test_listen_notify_in_same_process(self): + """Testing postgres low-level implementation.""" + from django.db import connection + + state = SimpleNamespace() state.got_message = False - conn.execute("UNLISTEN abc") - cursor.execute("NOTIFY abc, 'foo'") - assert state.got_message is False + with connection.cursor() as cursor: + assert connection.connection is cursor.connection + conn = cursor.connection + # Listen and Notify + conn.execute("LISTEN abc") + conn.add_notify_handler(lambda notification: setattr(state, "got_message", True)) + cursor.execute("NOTIFY abc, 'foo'") + assert state.got_message is True + conn.execute("SELECT 1") + assert state.got_message is True + + # Reset and retry + state.got_message = False + conn.execute("UNLISTEN abc") + cursor.execute("NOTIFY abc, 'foo'") + assert state.got_message is False + + def test_low_level_assumptions_on_multiprocess(self): + """Asserts that we are really testing two different connections. + + From psycopg, the backend_id is: + "The process ID (PID) of the backend process handling this connection." + """ + from django.db import connection + + def host_act(host_turn, log): + with host_turn(): # 1 + assert connection.connection is None + with connection.cursor() as cursor: + cursor.execute("select 1") + assert connection.connection is not None + log.put(connection.connection.info.backend_pid) + + def child_act(child_turn, log): + with child_turn(): # 2 + assert connection.connection is None + with connection.cursor() as cursor: + cursor.execute("select 1") + assert connection.connection is not None + log.put(connection.connection.info.backend_pid) + + log = IpcUtil.run(host_act, child_act) + assert len(log) == 2 + host_connection_pid, child_connection_pid = log + assert host_connection_pid != child_connection_pid M = pubsub.PubsubMessage @@ -59,20 +89,34 @@ def test_postgres_pubsub(): ] +def unsubscribe_all(channels, subscriber): + for channel in channels: + subscriber.unsubscribe(channel) + + +def subscribe_all(channels, subscriber): + for channel in channels: + subscriber.subscribe(channel) + + +def publish_all(messages, publisher): + for channel, payload in messages: + publisher.publish(channel, payload=payload) + + @pytest.mark.parametrize("pubsub_backend", PUBSUB_BACKENDS) +@pytest.mark.parametrize( + "payload", + ( + pytest.param(None, id="none"), + pytest.param("", id="empty-string"), + pytest.param("payload", id="non-empty-string"), + pytest.param(123, id="int"), + pytest.param(datetime.now(), id="datetime"), + pytest.param(True, id="bool"), + ), +) class TestPublish: - - @pytest.mark.parametrize( - "payload", - ( - pytest.param(None, id="none"), - pytest.param("", id="empty-string"), - pytest.param("payload", id="non-empty-string"), - pytest.param(123, id="int"), - pytest.param(datetime.now(), id="datetime"), - pytest.param(True, id="bool"), - ), - ) def test_with_payload_as(self, pubsub_backend: pubsub.BasePubSubBackend, payload): pubsub_backend.publish("channel", payload=payload) @@ -90,29 +134,17 @@ def test_with_payload_as(self, pubsub_backend: pubsub.BasePubSubBackend, payload ), ) class TestNoIpcSubscribeFetch: - def unsubscribe_all(self, channels, subscriber): - for channel in channels: - subscriber.unsubscribe(channel) - - def subscribe_all(self, channels, subscriber): - for channel in channels: - subscriber.subscribe(channel) - - def publish_all(self, messages, publisher): - for channel, payload in messages: - publisher.publish(channel, payload=payload) - def test_with( self, pubsub_backend: pubsub.BasePubSubBackend, messages: list[pubsub.PubsubMessage] ): channels = {m.channel for m in messages} publisher = pubsub_backend with pubsub_backend() as subscriber: - self.subscribe_all(channels, subscriber) - self.publish_all(messages, publisher) + subscribe_all(channels, subscriber) + publish_all(messages, publisher) assert subscriber.fetch() == messages - self.unsubscribe_all(channels, subscriber) + unsubscribe_all(channels, subscriber) assert subscriber.fetch() == [] def test_select_readiness_with( @@ -122,14 +154,14 @@ def test_select_readiness_with( CHANNELS = {m.channel for m in messages} publisher = pubsub_backend with pubsub_backend() as subscriber: - self.subscribe_all(CHANNELS, subscriber) + subscribe_all(CHANNELS, subscriber) assert subscriber.get_subscriptions() == CHANNELS ready, _, _ = select.select([subscriber], [], [], TIMEOUT) assert subscriber not in ready assert subscriber.fetch() == [] - self.publish_all(messages, publisher) + publish_all(messages, publisher) ready, _, _ = select.select([subscriber], [], [], TIMEOUT) assert subscriber in ready @@ -141,49 +173,19 @@ def test_select_readiness_with( assert subscriber not in ready assert subscriber.fetch() == [] - self.unsubscribe_all(CHANNELS, subscriber) - self.publish_all(messages, publisher) + unsubscribe_all(CHANNELS, subscriber) + publish_all(messages, publisher) ready, _, _ = select.select([subscriber], [], [], TIMEOUT) assert subscriber not in ready assert subscriber.fetch() == [] -def test_postgres_backend_ipc(): - """Asserts that we are really testing two different connections. - - From psycopg, the backend_id is: - "The process ID (PID) of the backend process handling this connection." - """ - from django.db import connection - - def host_act(host_turn, log): - with host_turn(): # 1 - assert connection.connection is None - with connection.cursor() as cursor: - cursor.execute("select 1") - assert connection.connection is not None - log.put(connection.connection.info.backend_pid) - - def child_act(child_turn, log): - with child_turn(): # 2 - assert connection.connection is None - with connection.cursor() as cursor: - cursor.execute("select 1") - assert connection.connection is not None - log.put(connection.connection.info.backend_pid) - - log = IpcUtil.run(host_act, child_act) - assert len(log) == 2 - host_connection_pid, child_connection_pid = log - assert host_connection_pid != child_connection_pid - - @pytest.mark.parametrize("pubsub_backend", PUBSUB_BACKENDS) @pytest.mark.parametrize( "messages", ( pytest.param([M("a", "A1")], id="single-message"), - pytest.param([M("a", "A1")], id="test-leaking"), + pytest.param([M("a", "A1")], id="test-if-leaking"), pytest.param([M("b", "B1"), M("b", "B2")], id="two-messages-in-same-channel"), pytest.param( [M("c", "C1"), M("c", "C2"), M("d", "D1"), M("d", "D1")], @@ -212,8 +214,7 @@ def subscriber_act(subscriber_turn, log): with pubsub_backend() as subscriber: with subscriber_turn(): # 1 log.put("subscribe") - for channel in CHANNELS: - subscriber.subscribe(channel) + subscribe_all(CHANNELS, subscriber) with subscriber_turn(): # 3 log.put("fetch") @@ -225,8 +226,7 @@ def subscriber_act(subscriber_turn, log): log.put("fetch+unsubscribe") assert subscriber.fetch() == messages assert subscriber.fetch() == [] - for channel in CHANNELS: - subscriber.unsubscribe(channel) + unsubscribe_all(CHANNELS, subscriber) with subscriber_turn(done=True): # 7 log.put("fetch-empty") @@ -235,20 +235,17 @@ def subscriber_act(subscriber_turn, log): # child def publisher_act(publisher_turn, log): publisher = pubsub_backend - with publisher_turn(): + with publisher_turn(): # 2 log.put("publish") - for message in messages: # 2 - publisher.publish(message.channel, payload=message.payload) + publish_all(messages, publisher) - with publisher_turn(): + with publisher_turn(): # 4 log.put("publish") - for message in messages: # 4 - publisher.publish(message.channel, payload=message.payload) + publish_all(messages, publisher) - with publisher_turn(): + with publisher_turn(): # 6 log.put("publish") - for message in messages: # 6 - publisher.publish(message.channel, payload=message.payload) + publish_all(messages, publisher) log = IpcUtil.run(subscriber_act, publisher_act) assert log == EXPECTED_LOG @@ -270,8 +267,7 @@ def subscriber_act(subscriber_turn, log): with pubsub_backend() as subscriber: with subscriber_turn(): # 1 log.put("subscribe/select-empty") - for channel in CHANNELS: - subscriber.subscribe(channel) + subscribe_all(CHANNELS, subscriber) assert subscriber.get_subscriptions() == CHANNELS ready, _, _ = select.select([subscriber], [], [], TIMEOUT) assert subscriber not in ready @@ -286,8 +282,7 @@ def subscriber_act(subscriber_turn, log): assert subscriber in ready assert subscriber.fetch() == messages assert subscriber.fetch() == [] - for channel in CHANNELS: - subscriber.unsubscribe(channel) + unsubscribe_all(CHANNELS, subscriber) with subscriber_turn(done=True): # 5 log.put("fetch/select-empty") @@ -299,13 +294,11 @@ def publisher_act(publisher_turn, log): publisher = pubsub_backend with publisher_turn(): # 2 log.put("publish") - for message in messages: - publisher.publish(message.channel, payload=message.payload) + publish_all(messages, publisher) with publisher_turn(): # 4 log.put("publish") - for message in messages: - publisher.publish(message.channel, payload=message.payload) + publish_all(messages, publisher) log = IpcUtil.run(subscriber_act, publisher_act) assert log == EXPECTED_LOG diff --git a/pulpcore/tests/functional/test_utils.py b/pulpcore/tests/functional/test_utils.py index 53548fcc35..37efe69c14 100644 --- a/pulpcore/tests/functional/test_utils.py +++ b/pulpcore/tests/functional/test_utils.py @@ -19,7 +19,7 @@ def child_act(child_turn, log): with pytest.raises(Exception, match=error_msg): IpcUtil.run(host_act, child_act) - def test_turns_are_respected(self): + def test_turns_are_deterministic(self): RUNS = 1000 errors = 0 diff --git a/pulpcore/tests/functional/utils.py b/pulpcore/tests/functional/utils.py index 85ae6ee318..9b5bf789ee 100644 --- a/pulpcore/tests/functional/utils.py +++ b/pulpcore/tests/functional/utils.py @@ -167,14 +167,31 @@ async def _get_from_url(url, auth=None, headers=None): class ProcessErrorData(NamedTuple): error: Exception - stack_trace: str + stacktrace: str class IpcUtil: + TIMEOUT_ERROR_MESSAGE = ( + "Tip: make sure the last 'with turn()' (in execution order) " + "is called with 'actor_turn(done=True)', otherwise it may hang." + ) + SUBPROCESS_ERROR_HEADER_TEMPLATE = "Error from sub-process (pid={pid}) on test using IpcUtil" + TURN_WAIT_TIMEOUT = 1 @staticmethod def run(host_act, child_act) -> list: - # ensures a connection from one run doesn't interfere with the other + """Run two processes in synchronous alternate turns. + + The act are functions with the signature (act_turn, log), where act_turn is + a context manager where each step of the act takes place, and log is a + queue where each actor can put messages in using Q.put(item). + + Args: + host_act: The function of the act that start the communication + child_act: The function of the act that follows host_act + Returns: + A list with the items collected via log. + """ conn_1, conn_2 = mp.Pipe() log = mp.SimpleQueue() lock = mp.Lock() @@ -198,6 +215,7 @@ def run(host_act, child_act) -> list: log.close() if proc_1.exitcode != 0 or proc_2.exitcode != 0: error = Exception("General exception") + stacktrace = "No stacktrace" for item in result: if isinstance(item, ProcessErrorData): error, stacktrace = item @@ -208,19 +226,12 @@ def run(host_act, child_act) -> list: @staticmethod @contextmanager def _actor_turn(conn: Connection, starts: bool, log, lock: mp.Lock, done: bool = False): - TIMEOUT = 1 + def flush_conn(conn: Connection): + if not conn.poll(IpcUtil.TURN_WAIT_TIMEOUT): + raise TimeoutError(IpcUtil.TIMEOUT_ERROR_MESSAGE) + conn.recv() try: - - def flush_conn(conn): - if not conn.poll(TIMEOUT): - err_msg = ( - "Tip: make sure the last 'with turn()' (in execution order) " - "is called with 'actor_turn(done=True)', otherwise it may hang." - ) - raise TimeoutError(err_msg) - conn.recv() - if starts: with lock: conn.send("done") @@ -234,9 +245,8 @@ def flush_conn(conn): conn.send("done") except Exception as e: traceback.print_exc(file=sys.stderr) - err_header = f"Error from sub-process (pid={os.getpid()}) on test using IpcUtil" - traceback_str = f"{err_header}\n\n{traceback.format_exc()}" - + error_header = IpcUtil.SUBPROCESS_ERROR_HEADER_TEMPLATE.format(pid=os.getpid()) + traceback_str = f"{error_header}\n\n{traceback.format_exc()}" error = ProcessErrorData(e, traceback_str) log.put(error) exit(1) @@ -246,4 +256,6 @@ def read_log(log: mp.SimpleQueue) -> list: result = [] while not log.empty(): result.append(log.get()) + for item in result: + log.put(item) return result From cecf9439e00d6e162115de2a9d5695364f5eac1f Mon Sep 17 00:00:00 2001 From: Pedro Brochado Date: Tue, 5 Aug 2025 14:07:46 -0300 Subject: [PATCH 17/18] Some adjustments to pubsub module --- pulpcore/tasking/pubsub.py | 31 ++++++++++++++++++++----------- pulpcore/tasking/worker.py | 2 -- 2 files changed, 20 insertions(+), 13 deletions(-) diff --git a/pulpcore/tasking/pubsub.py b/pulpcore/tasking/pubsub.py index 55ec9b0b7d..6a7b7d3518 100644 --- a/pulpcore/tasking/pubsub.py +++ b/pulpcore/tasking/pubsub.py @@ -30,6 +30,9 @@ def subscribe(self, channel): def unsubscribe(self, channel): raise NotImplementedError() + def get_subscriptions(self): + raise NotImplementedError() + @classmethod def publish(cls, channel, payload=None): raise NotImplementedError() @@ -57,35 +60,35 @@ def drain_non_blocking_fd(fd): os.read(fd, 256) -PID = os.getpid() - - class PostgresPubSub(BasePubSubBackend): + PID = os.getpid() def __init__(self): self._subscriptions = set() self.message_buffer = [] - self.cursor = connection.cursor() + # ensures a connection is initialized + with connection.cursor() as cursor: + cursor.execute("select 1") self.backend_pid = connection.connection.info.backend_pid - # logger.info(f"{connection.connection.info.backend_pid=}") self.sentinel_r, self.sentinel_w = os.pipe() os.set_blocking(self.sentinel_r, False) os.set_blocking(self.sentinel_w, False) connection.connection.add_notify_handler(self._store_messages) + @classmethod + def _debug(cls, message): + logger.debug(f"[{cls.PID}] {message}") + def _store_messages(self, notification): - # logger.info(f"[{PID}] Received message: {notification}") self.message_buffer.append( PubsubMessage(channel=notification.channel, payload=notification.payload) ) if notification.pid == self.backend_pid: os.write(self.sentinel_w, b"1") - - def get_subscriptions(self): - return self._subscriptions.copy() + self._debug(f"Received message: {notification}") @classmethod - def publish(cls, channel, payload=None): + def publish(cls, channel, payload=""): query = ( (f"NOTIFY {channel}",) if not payload @@ -94,6 +97,7 @@ def publish(cls, channel, payload=None): with connection.cursor() as cursor: cursor.execute(*query) + cls._debug(f"Sent message: ({channel}, {str(payload)})") def subscribe(self, channel): self._subscriptions.add(channel) @@ -108,7 +112,12 @@ def unsubscribe(self, channel): with connection.cursor() as cursor: cursor.execute(f"UNLISTEN {channel}") + def get_subscriptions(self): + return self._subscriptions.copy() + def fileno(self) -> int: + # when pub/sub clients are the same, the notification callback may be called + # asynchronously, making select on connection miss new notifications ready, _, _ = select.select([self.sentinel_r], [], [], 0) if self.sentinel_r in ready: return self.sentinel_r @@ -120,7 +129,7 @@ def fetch(self) -> list[PubsubMessage]: result = self.message_buffer.copy() self.message_buffer.clear() drain_non_blocking_fd(self.sentinel_r) - # logger.info(f"[{PID}] Fetched messages: {result}") + self._debug(f"Fetched messages: {result}") return result def close(self): diff --git a/pulpcore/tasking/worker.py b/pulpcore/tasking/worker.py index 5688852348..4e054256e2 100644 --- a/pulpcore/tasking/worker.py +++ b/pulpcore/tasking/worker.py @@ -633,12 +633,10 @@ def run(self, burst=False): # do work if self.shutdown_requested: break - # _logger.info(_("=== Worker %s will handle unblocked tasks. ==="), self.name) self.handle_unblocked_tasks() if self.shutdown_requested: break # rest until notified to wakeup - # _logger.info(_("*** Worker %s entering sleep state. ***"), self.name) self.sleep() self.pubsub_teardown() self.shutdown() From fd3e32efecd9895d78f8a9b6c9ff0f08107e66ff Mon Sep 17 00:00:00 2001 From: Pedro Brochado Date: Thu, 25 Sep 2025 12:10:55 -0300 Subject: [PATCH 18/18] Fix linting errors --- pulpcore/tasking/tasks.py | 1 - pulpcore/tasking/worker.py | 1 - 2 files changed, 2 deletions(-) diff --git a/pulpcore/tasking/tasks.py b/pulpcore/tasking/tasks.py index 65422dd125..023ba3a196 100644 --- a/pulpcore/tasking/tasks.py +++ b/pulpcore/tasking/tasks.py @@ -26,7 +26,6 @@ TASK_INCOMPLETE_STATES, TASK_STATES, IMMEDIATE_TIMEOUT, - TASK_WAKEUP_HANDLE, TASK_WAKEUP_UNBLOCK, ) from pulpcore.middleware import x_task_diagnostics_var diff --git a/pulpcore/tasking/worker.py b/pulpcore/tasking/worker.py index 4e054256e2..956c41ff7b 100644 --- a/pulpcore/tasking/worker.py +++ b/pulpcore/tasking/worker.py @@ -79,7 +79,6 @@ def __init__(self, auxiliary=False): self.heartbeat_period = timedelta(seconds=settings.WORKER_TTL / 3) self.last_metric_heartbeat = timezone.now() self.versions = {app.label: app.version for app in pulp_plugin_configs()} - self.cursor = connection.cursor() self.app_status = AppStatus.objects.create( name=self.name, app_type="worker", versions=self.versions )