diff --git a/.github/workflows/deploy.yml b/.github/workflows/deploy.yml index 9b202bd4..f739d297 100644 --- a/.github/workflows/deploy.yml +++ b/.github/workflows/deploy.yml @@ -19,7 +19,7 @@ jobs: - uses: actions/checkout@v4 - name: Build and Check Package - uses: hynek/build-and-inspect-python-package@v2.8 + uses: hynek/build-and-inspect-python-package@v2.9 deploy: needs: package @@ -39,7 +39,9 @@ jobs: path: dist - name: Publish package to PyPI - uses: pypa/gh-action-pypi-publish@v1.9.0 + uses: pypa/gh-action-pypi-publish@v1.10.1 + with: + attestations: true - name: Push tag run: | diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index b81f37e3..ca33e3a1 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -22,7 +22,7 @@ jobs: steps: - uses: actions/checkout@v4 - name: Build and Check Package - uses: hynek/build-and-inspect-python-package@v2.8 + uses: hynek/build-and-inspect-python-package@v2.9 test: diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index cb240bdb..6fffdf2e 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -1,6 +1,6 @@ repos: - repo: https://github.com/astral-sh/ruff-pre-commit - rev: "v0.6.1" + rev: "v0.6.5" hooks: - id: ruff args: ["--fix"] @@ -23,7 +23,7 @@ repos: language: python additional_dependencies: [pygments, restructuredtext_lint] - repo: https://github.com/pre-commit/mirrors-mypy - rev: v1.11.1 + rev: v1.11.2 hooks: - id: mypy files: ^(src/|testing/) diff --git a/src/xdist/dsession.py b/src/xdist/dsession.py index 62079a28..2eff34c1 100644 --- a/src/xdist/dsession.py +++ b/src/xdist/dsession.py @@ -8,6 +8,7 @@ from typing import Any from typing import Sequence import warnings +import traceback import execnet import pytest @@ -21,6 +22,7 @@ from xdist.scheduler import LoadScopeScheduling from xdist.scheduler import Scheduling from xdist.scheduler import WorkStealingScheduling +from xdist.scheduler import CustomGroup from xdist.workermanage import NodeManager from xdist.workermanage import WorkerController @@ -58,10 +60,14 @@ def __init__(self, config: pytest.Config) -> None: self._failed_collection_errors: dict[object, bool] = {} self._active_nodes: set[WorkerController] = set() self._failed_nodes_count = 0 + self.saved_put = None + self.remake_nodes = False + self.ready_to_run_tests = False self._max_worker_restart = get_default_max_worker_restart(self.config) # summary message to print at the end of the session self._summary_report: str | None = None self.terminal = config.pluginmanager.getplugin("terminalreporter") + self.worker_status: dict[WorkerController, str] = {} if self.terminal: self.trdist = TerminalDistReporter(config) config.pluginmanager.register(self.trdist, "terminaldistreporter") @@ -87,6 +93,7 @@ def pytest_sessionstart(self, session: pytest.Session) -> None: soon as nodes start they will emit the worker_workerready event. 
""" self.nodemanager = NodeManager(self.config) + self.saved_put = self.queue.put nodes = self.nodemanager.setup_nodes(putevent=self.queue.put) self._active_nodes.update(nodes) self._session = session @@ -123,6 +130,8 @@ def pytest_xdist_make_scheduler( return LoadGroupScheduling(config, log) if dist == "worksteal": return WorkStealingScheduling(config, log) + if dist == "customgroup": + return CustomGroup(config, log) return None @pytest.hookimpl @@ -147,14 +156,19 @@ def loop_once(self) -> None: """Process one callback from one of the workers.""" while 1: if not self._active_nodes: - # If everything has died stop looping - self.triggershutdown() - raise RuntimeError("Unexpectedly no active workers available") + # Worker teardown + recreation only occurs for CustomGroup Scheduler + if isinstance(self.sched, CustomGroup) and self.remake_nodes: + pass + else: + # We aren't using CustomGroup scheduler and everything has died: stop looping + self.triggershutdown() + raise RuntimeError("Unexpectedly no active workers available") try: eventcall = self.queue.get(timeout=2.0) break except Empty: continue + callname, kwargs = eventcall assert callname, kwargs method = "worker_" + callname @@ -165,6 +179,71 @@ def loop_once(self) -> None: if self.sched.tests_finished: self.triggershutdown() + + def is_node_finishing(self, node: WorkerController): + """Check if a test worker is considered to be finishing. + + Evaluate whether it's on its last test, or if no tests are pending. + """ + pending = self.sched.node2pending.get(node) + return pending is not None and len(pending) < 2 + + + def is_node_clear(self, node: WorkerController): + """Check if a test worker has no pending tests.""" + pending = self.sched.node2pending.get(node) + return pending is None or len(pending) == 0 + + + def are_all_nodes_finishing(self): + """Check if all workers are finishing (See 'is_node_finishing' above).""" + return all(self.is_node_finishing(node) for node in self.sched.nodes) + + + def are_all_nodes_done(self): + """Check if all nodes have reported to finish.""" + return all(s == "finished" for s in self.worker_status.values()) + + + def are_all_active_nodes_collected(self): + """Check if all nodes have reported collection to be complete.""" + if not all(n.gateway.id in self.worker_status for n in self._active_nodes): + return False + return all(self.worker_status[n.gateway.id] == "collected" for n in self._active_nodes) + + + def reset_nodes_if_needed(self): + if self.are_all_nodes_finishing() and self.ready_to_run_tests and not self.sched.do_resched: + self.reset_nodes() + + + def reset_nodes(self): + """Issue shutdown notices to workers for rescheduling purposes.""" + if len(self.sched.pending) != 0: + self.remake_nodes = True + for node in self.sched.nodes: + if self.is_node_finishing(node): + node.shutdown() + + + def reschedule(self): + """Reschedule tests.""" + self.sched.do_resched = False + self.sched.check_schedule(self.sched.nodes[0], 1.0, True) + + + def prepare_for_reschedule(self): + """Update test workers and their status tracking so rescheduling is ready.""" + self.remake_nodes = False + num_workers = self.sched.dist_groups[self.sched.pending_groups[0]]['group_workers'] + self.trdist._status = {} + new_nodes = self.nodemanager.setup_nodes(self.saved_put, num_workers) + self.worker_status = {} + self._active_nodes = set() + self._active_nodes.update(new_nodes) + self.sched.node2pending = {} + self.sched.do_resched = True + # # callbacks for processing events from workers # @@ -182,6 +261,7 @@ def 
worker_workerready( node.workerinfo = workerinfo node.workerinfo["id"] = node.gateway.id node.workerinfo["spec"] = node.gateway.spec + self.update_worker_status(node, "ready") self.config.hook.pytest_testnodeready(node=node) if self.shuttingdown: @@ -198,6 +278,17 @@ def worker_workerfinished(self, node: WorkerController) -> None: The node might not be in the scheduler if it had not emitted workerready before shutdown was triggered. """ + self.update_worker_status(node, "finished") + + if isinstance(self.sched, CustomGroup) and self.remake_nodes: + node.ensure_teardown() + self._active_nodes.remove(node) + if self.are_all_nodes_done(): + try: + self.prepare_for_reschedule() + except Exception as e: + self.shouldstop = f"Exception caught during preparation for rescheduling. Giving up.\n{''.join(traceback.format_exception(type(e), e, e.__traceback__))}" + return self.config.hook.pytest_testnodedown(node=node, error=None) if node.workeroutput["exitstatus"] == 2: # keyboard-interrupt self.shouldstop = f"{node} received keyboard-interrupt" @@ -217,6 +308,13 @@ def worker_workerfinished(self, node: WorkerController) -> None: assert not crashitem, (crashitem, node) self._active_nodes.remove(node) + def update_worker_status(self, node: WorkerController, status: str) -> None: + """Track the worker status. + + Can be used in callbacks like 'worker_workerfinished' so we remember which event was reported last by each worker. + """ + self.worker_status[node.workerinfo["id"]] = status + def worker_internal_error( self, node: WorkerController, formatted_error: str ) -> None: @@ -283,7 +381,10 @@ def worker_collectionfinish( scheduling the first time it logs which scheduler is in use. """ if self.shuttingdown: + self.report_line(f"[-] [dse] collectionfinish while closing {node.gateway.id}") return + self.update_worker_status(node, "collected") + self.config.hook.pytest_xdist_node_collection_finished(node=node, ids=ids) # tell session which items were effectively collected otherwise # the controller node will finish the session with EXIT_NOTESTSCOLLECTED @@ -300,10 +401,18 @@ def worker_collectionfinish( self.trdist.ensure_show_status() self.terminal.write_line("") if self.config.option.verbose > 0: - self.terminal.write_line( - f"scheduling tests via {self.sched.__class__.__name__}" - ) - self.sched.schedule() + self.report_line(f"[-] [dse] scheduling tests via {self.sched.__class__.__name__}") + if isinstance(self.sched, CustomGroup): + if self.ready_to_run_tests and self.are_all_active_nodes_collected(): + # we're coming back here after finishing a batch of tests - so start the next batch + self.reschedule() + self.reset_nodes_if_needed() + else: + self.ready_to_run_tests = True + self.sched.schedule() + self.reset_nodes_if_needed() + else: + self.sched.schedule() def worker_logstart( self, @@ -339,6 +448,12 @@ def worker_runtest_protocol_complete( """ assert self.sched is not None self.sched.mark_test_complete(node, item_index, duration) + if isinstance(self.sched, CustomGroup): + if self.are_all_nodes_finishing(): + if self.shouldstop: + self.report_line("Won't reschedule - should stop.") + else: + self.reset_nodes() def worker_unscheduled( self, node: WorkerController, indices: Sequence[int] diff --git a/src/xdist/plugin.py b/src/xdist/plugin.py index f670d9de..a600a705 100644 --- a/src/xdist/plugin.py +++ b/src/xdist/plugin.py @@ -108,6 +108,7 @@ def pytest_addoption(parser: pytest.Parser) -> None: "loadfile", "loadgroup", "worksteal", + "customgroup", "no", ], dest="dist", @@ -124,6 +125,8 @@ def pytest_addoption(parser: pytest.Parser) -> None:
"loadgroup: Like 'load', but sends tests marked with 'xdist_group' to the same worker.\n\n" "worksteal: Split the test suite between available environments," " then re-balance when any worker runs out of tests.\n\n" + # TODO: Update docstring + "customgroup: TODO: add docs here" "(default) no: Run tests inprocess, don't distribute." ), ) diff --git a/src/xdist/remote.py b/src/xdist/remote.py index dd1f9883..0ec9047d 100644 --- a/src/xdist/remote.py +++ b/src/xdist/remote.py @@ -201,15 +201,17 @@ def run_one_test(self) -> None: "runtest_protocol_complete", item_index=self.item_index, duration=duration ) + @pytest.mark.trylast def pytest_collection_modifyitems( self, config: pytest.Config, items: list[pytest.Item], ) -> None: # add the group name to nodeid as suffix if --dist=loadgroup - if config.getvalue("loadgroup"): + if config.getvalue("loadgroup") or config.getvalue("customgroup"): + functional_mark = "xdist_group" if config.getvalue("loadgroup") else "xdist_custom" for item in items: - mark = item.get_closest_marker("xdist_group") + mark = item.get_closest_marker(functional_mark) if not mark: continue gname = ( @@ -357,6 +359,7 @@ def getinfodict() -> WorkerInfo: def setup_config(config: pytest.Config, basetemp: str | None) -> None: config.option.loadgroup = config.getvalue("dist") == "loadgroup" + config.option.customgroup = config.getvalue("dist") == "customgroup" config.option.looponfail = False config.option.usepdb = False config.option.dist = "no" diff --git a/src/xdist/scheduler/__init__.py b/src/xdist/scheduler/__init__.py index b4894732..34b791d7 100644 --- a/src/xdist/scheduler/__init__.py +++ b/src/xdist/scheduler/__init__.py @@ -1,5 +1,6 @@ from xdist.scheduler.each import EachScheduling as EachScheduling from xdist.scheduler.load import LoadScheduling as LoadScheduling +from xdist.scheduler.customgroup import CustomGroup as CustomGroup from xdist.scheduler.loadfile import LoadFileScheduling as LoadFileScheduling from xdist.scheduler.loadgroup import LoadGroupScheduling as LoadGroupScheduling from xdist.scheduler.loadscope import LoadScopeScheduling as LoadScopeScheduling diff --git a/src/xdist/scheduler/customgroup.py b/src/xdist/scheduler/customgroup.py new file mode 100644 index 00000000..fec7001b --- /dev/null +++ b/src/xdist/scheduler/customgroup.py @@ -0,0 +1,399 @@ +from __future__ import annotations + +from itertools import cycle +from typing import Sequence, Any + +import pytest + +from xdist.remote import Producer +from xdist.report import report_collection_diff +from xdist.workermanage import parse_spec_config +from xdist.workermanage import WorkerController + +class CustomGroup: + """Implement grouped load scheduling across a variable number of nodes. + + This distributes tests into groups based on the presence of xdist_custom pytest marks. + Groups are ran sequentially with tests within each group running in parallel. + The number of workers assigned to each group is based on the xdist_custom pytest mark. + Tests without the xdist_custom pytest mark are assigned to a "default" group and run + using all available workers. + + Example: + Consider 12 pytest test cases. + - 4 test cases are marked with @pytest.mark.xdist_custom(name="low_4") + - 2 test cases are marked with @pytest.mark.xdist_custom(name="med_2") + - 2 test cases are marked with @pytest.mark.xdist_custom(name="high_1") + - 4 test cases are not marked with a xdist_custom mark. 
+ Consider the pytest run was initiated with 4 workers (-n 4) + - The 4 test cases marked with "low_4" would run in a group using 4 workers + - The 2 test cases marked with "med_2" would run in a group using 2 workers + - The 2 test cases marked with "high_1" would run in a group with 1 worker + - The 4 unmarked test cases would run in a group using 4 workers. + Only one group would run at any given time. For example, while the "high_1" tests are executing, + the other pending test groups would not be scheduled or executing. The order in which groups + are executed is variable. For example, "high_1" may execute first, or it may execute second, etc. + If a group's pytest mark specifies more workers than the pytest run is initialized with, the + number of workers the run was initialized with is used instead (the -n argument is a maximum). + + Attributes:: + + :terminal: Terminal reporter for writing terminal output + + :numnodes: The expected number of nodes taking part. The actual + number of nodes will vary during the scheduler's lifetime as + nodes are added by the DSession as they are brought up and + removed either because of a dead node or normal shutdown. This + number is primarily used to know when the initial collection is + completed. + + :node2collection: Map of nodes and their test collection. All + collections should always be identical. + + :node2pending: Map of nodes and the indices of their pending + tests. The indices are an index into ``.pending`` (which is + identical to their own collection stored in + ``.node2collection``). + + :pending: List of indices of globally pending tests. These are + tests which have not yet been allocated to a chunk for a node + to process. + + :collection: The one collection once it is validated to be + identical between all the nodes. It is initialised to None + until ``.schedule()`` is called. + + :log: A py.log.Producer instance. + + :config: Config object, used for handling hooks. + + :dist_groups: Execution groups. Updated based on xdist_custom pytest marks. + Maps group names to tests, test indices, pending indices, and stores the number of workers to use + for that test execution group. + + :pending_groups: List of dist_group keys that are pending + + :is_first_time: Boolean to track whether we have called schedule() before or not + + :do_resched: Boolean to track whether we should schedule another distribution group. + Accessed in dsession.py + """ + + def __init__(self, config: pytest.Config, log: Producer | None = None) -> None: + self.terminal = config.pluginmanager.getplugin("terminalreporter") + self.numnodes = len(parse_spec_config(config)) + self.node2collection: dict[WorkerController, list[str]] = {} + self.node2pending: dict[WorkerController, list[int]] = {} + self.pending: list[int] = [] + self.collection: list[str] | None = None + if log is None: + self.log = Producer("loadsched") + else: + self.log = log.loadsched + self.config = config + self.dist_groups: dict[str, Any] = {} + self.pending_groups: list[str] = [] + self.is_first_time: bool = True + self.do_resched: bool = False + + @property + def nodes(self) -> list[WorkerController]: + """A list of all nodes in the scheduler.""" + return list(self.node2pending.keys()) + + @property + def collection_is_completed(self) -> bool: + """Boolean indication initial test collection is complete. + + This is a boolean indicating all initial participating nodes + have finished collection. The required number of initial + nodes is defined by ``.numnodes``.
+ """ + return len(self.node2collection) >= self.numnodes + + @property + def tests_finished(self) -> bool: + """Return True if all tests have been executed by the nodes.""" + if not self.collection_is_completed: + return False + if self.pending: + return False + for pending in self.node2pending.values(): + if len(pending) >= 2: + return False + return True + + @property + def has_pending(self) -> bool: + """Return True if there are pending test items. + + This indicates that collection has finished and nodes are + still processing test items, so this can be thought of as + "the scheduler is active". + """ + if self.pending: + return True + for pending in self.node2pending.values(): + if pending: + return True + return False + + def add_node(self, node: WorkerController) -> None: + """Add a new node to the scheduler. + + From now on the node will be allocated chunks of tests to + execute. + + Called by the ``DSession.worker_workerready`` hook when it + successfully bootstraps a new node. + """ + assert node not in self.node2pending + self.node2pending[node] = [] + + def add_node_collection( + self, node: WorkerController, collection: Sequence[str] + ) -> None: + """Add the collected test items from a node. + + The collection is stored in the ``.node2collection`` map. + Called by the ``DSession.worker_collectionfinish`` hook. + """ + assert node in self.node2pending + if self.collection_is_completed: + # A new node has been added later, perhaps an original one died. + # .schedule() should have + # been called by now + assert self.collection + if collection != self.collection: + other_node = next(iter(self.node2collection.keys())) + msg = report_collection_diff( + self.collection, collection, other_node.gateway.id, node.gateway.id + ) + self.log(msg) + return + self.node2collection[node] = list(collection) + + def mark_test_complete( + self, node: WorkerController, item_index: int, duration: float = 0 + ) -> None: + """Mark test item as completed by node. + + The duration it took to execute the item is used as a hint to + the scheduler. + + This is called by the ``DSession.worker_testreport`` hook. + """ + self.node2pending[node].remove(item_index) + self.check_schedule(node, duration=duration) + + def mark_test_pending(self, item: str) -> None: + + assert self.collection is not None + self.pending.insert( + 0, + self.collection.index(item), + ) + for node in self.node2pending: + self.check_schedule(node) + + def remove_pending_tests_from_node( + self, + node: WorkerController, + indices: Sequence[int], + ) -> None: + raise NotImplementedError() + + def check_schedule(self, node: WorkerController, duration: float = 0, from_dsession=False) -> None: + """Maybe schedule new items on the node. + + If there are any globally pending nodes left then this will + check if the given node should be given any more tests. The + ``duration`` of the last test is optionally used as a + heuristic to influence how many tests the node is assigned. 
+ """ + if node.shutting_down: + self.report_line(f"[-] [csg] {node.workerinput['workerid']} is already shutting down") + return + + if self.pending: + any_working = False + for node in self.nodes: + if len(self.node2pending[node]) not in [0, 1]: + any_working = True + + if not any_working and from_dsession: + if self.pending_groups: + dist_group_key = self.pending_groups.pop(0) + dist_group = self.dist_groups[dist_group_key] + nodes = cycle(self.nodes[0:dist_group['group_workers']]) + schedule_log = {n.gateway.id:[] for n in self.nodes[0:dist_group['group_workers']]} + for _ in range(len(dist_group['test_indices'])): + n = next(nodes) + #needs cleaner way to be identified + tests_per_node = self.dist_groups[dist_group_key]['pending_indices'][:1] + schedule_log[n.gateway.id].extend(tests_per_node) + + self._send_tests_group(n, 1, dist_group_key) + del self.dist_groups[dist_group_key] + message = f"\n[-] [csg] check_schedule: processed scheduling for {dist_group_key}: {' '.join([f'{nid} ({len(nt)})' for nid,nt in schedule_log.items()])}" + self.report_line(message) + + else: + pending = self.node2pending.get(node) + if len(pending) < 2: + self.report_line(f"[-] [csg] Shutting down {node.workerinput['workerid']} because only one case is pending") + node.shutdown() + + self.log("num items waiting for node:", len(self.pending)) + + def remove_node(self, node: WorkerController) -> str | None: + """Remove a node from the scheduler. + + This should be called either when the node crashed or at + shutdown time. In the former case any pending items assigned + to the node will be re-scheduled. Called by the + ``DSession.worker_workerfinished`` and + ``DSession.worker_errordown`` hooks. + + Return the item which was being executing while the node + crashed or None if the node has no more pending items. + + """ + pending = self.node2pending.pop(node) + if not pending: + return None + + # The node crashed, reassing pending items + assert self.collection is not None + crashitem = self.collection[pending.pop(0)] + self.pending.extend(pending) + for node in self.node2pending: + self.check_schedule(node) + return crashitem + + def schedule(self) -> None: + """Initiate distribution of the test collection. + + Initiate scheduling of the items across the nodes. If this + gets called again later it behaves the same as calling + ``.check_schedule()`` on all nodes so that newly added nodes + will start to be used. + + This is called by the ``DSession.worker_collectionfinish`` hook + if ``.collection_is_completed`` is True. + """ + assert self.collection_is_completed + + # Initial distribution already happened, reschedule on all nodes + if self.collection is not None: + for node in self.nodes: + self.check_schedule(node) + return + + # XXX allow nodes to have different collections + if not self._check_nodes_have_same_collection(): + self.log("**Different tests collected, aborting run**") + return + + # Collections are identical, create the index of pending items. 
+ self.collection = next(iter(self.node2collection.values())) + self.pending[:] = range(len(self.collection)) + if not self.collection: + return + + dist_groups = {} + + if self.is_first_time: + for i, test in enumerate(self.collection): + if '@' in test: + group_mark = test.split('@')[-1] + group_workers = int(group_mark.split('_')[-1]) + if group_workers > len(self.nodes): + # We can only distribute across as many nodes as we have available + # If a group requests more, we fallback to our actual max + group_workers = len(self.nodes) + else: + group_mark = 'default' + group_workers = len(self.nodes) + existing_tests = dist_groups.get(group_mark, {}).get('tests', []) + existing_tests.append(test) + existing_indices = dist_groups.get(group_mark, {}).get('test_indices', []) + existing_indices.append(i) + + dist_groups[group_mark] = { + 'tests': existing_tests, + 'group_workers': group_workers, + 'test_indices': existing_indices, + 'pending_indices': existing_indices + } + self.dist_groups = dist_groups + self.pending_groups = list(dist_groups.keys()) + self.is_first_time = False + else: + for node in self.nodes: + self.check_schedule(node) + + if not self.pending_groups: + return + dist_group_key = self.pending_groups.pop(0) + dist_group = self.dist_groups[dist_group_key] + nodes = cycle(self.nodes[0:dist_group['group_workers']]) + schedule_log = {n.gateway.id: [] for n in self.nodes[0:dist_group['group_workers']]} + for _ in range(len(dist_group['test_indices'])): + n = next(nodes) + # needs cleaner way to be identified + tests_per_node = self.dist_groups[dist_group_key]['pending_indices'][:1] + schedule_log[n.gateway.id].extend(tests_per_node) + self._send_tests_group(n, 1, dist_group_key) + del self.dist_groups[dist_group_key] + message = f"\n[-] [csg] schedule: processed scheduling for {dist_group_key}: {' '.join([f'{nid} ({len(nt)})' for nid, nt in schedule_log.items()])}" + self.report_line(message) + + def _send_tests(self, node: WorkerController, num: int) -> None: + tests_per_node = self.pending[:num] + if tests_per_node: + del self.pending[:num] + self.node2pending[node].extend(tests_per_node) + node.send_runtest_some(tests_per_node) + + def _send_tests_group(self, node: WorkerController, num: int, dist_group_key) -> None: + tests_per_node = self.dist_groups[dist_group_key]['pending_indices'][:num] + if tests_per_node: + del self.dist_groups[dist_group_key]['pending_indices'][:num] + for test_index in tests_per_node: + self.pending.remove(test_index) + self.node2pending[node].extend(tests_per_node) + node.send_runtest_some(tests_per_node) + + + def _check_nodes_have_same_collection(self) -> bool: + """Return True if all nodes have collected the same items. + + If collections differ, this method returns False while logging + the collection differences and posting collection errors to + pytest_collectreport hook. 
+ """ + node_collection_items = list(self.node2collection.items()) + first_node, col = node_collection_items[0] + same_collection = True + for node, collection in node_collection_items[1:]: + msg = report_collection_diff( + col, collection, first_node.gateway.id, node.gateway.id + ) + if msg: + same_collection = False + self.log(msg) + if self.config is not None: + rep = pytest.CollectReport( + nodeid=node.gateway.id, + outcome="failed", + longrepr=msg, + result=[], + ) + self.config.hook.pytest_collectreport(report=rep) + + return same_collection + + def report_line(self, line: str) -> None: + if self.terminal and self.config.option.verbose >= 0: + self.terminal.write_line(line) \ No newline at end of file diff --git a/src/xdist/workermanage.py b/src/xdist/workermanage.py index 44d1be4c..2b410108 100644 --- a/src/xdist/workermanage.py +++ b/src/xdist/workermanage.py @@ -11,6 +11,7 @@ from typing import Literal from typing import Sequence from typing import Union +from typing import Optional import uuid import warnings @@ -82,9 +83,12 @@ def rsync_roots(self, gateway: execnet.Gateway) -> None: def setup_nodes( self, putevent: Callable[[tuple[str, dict[str, Any]]], None], + max_nodes: Optional[int] = None ) -> list[WorkerController]: self.config.hook.pytest_xdist_setupnodes(config=self.config, specs=self.specs) self.trace("setting up nodes") + if max_nodes: + return [self.setup_node(spec, putevent) for spec in self.specs[0:max_nodes]] return [self.setup_node(spec, putevent) for spec in self.specs] def setup_node( diff --git a/xdist-testing-ntop/README.md b/xdist-testing-ntop/README.md new file mode 100644 index 00000000..4306d4f9 --- /dev/null +++ b/xdist-testing-ntop/README.md @@ -0,0 +1,14 @@ +# Testing pytest-xdist Scheduler Custumization +- Run with `python -m pytest test.py` to run tests +- Run with `python -m pytest test.py -n --dist customgroup --junit-xml results.xml -v` to use new scheduler + report to xml and have verbose terminal output + - Verbose terminal output is semi-required when using customgroup. It allows the user to confirm the correct tests are running with the correct number of processes. 
+ +## Notes: +- Install local pytest with `python -m pip install .` or `python -m pip install -e .` + - When run from the root of the `pytest-xdist` repository + +## Using Customgroup +- Add pytest mark `xdist_custom(name="<group_label>_<num_workers>")` (e.g. `low_4`) to tests + - Tests without this marking will use the maximum worker count specified by the `-n` argument +- Add `xdist_custom` under `markers` in `pytest.ini` to avoid warnings about unregistered marks +- Run tests as detailed above diff --git a/xdist-testing-ntop/pytest.ini b/xdist-testing-ntop/pytest.ini new file mode 100644 index 00000000..9a767524 --- /dev/null +++ b/xdist-testing-ntop/pytest.ini @@ -0,0 +1,5 @@ +[pytest] +log_cli_level=0 + +markers= + xdist_custom \ No newline at end of file diff --git a/xdist-testing-ntop/test.py b/xdist-testing-ntop/test.py new file mode 100644 index 00000000..ff8f9996 --- /dev/null +++ b/xdist-testing-ntop/test.py @@ -0,0 +1,94 @@ +import pytest +import time + + +@pytest.mark.xdist_custom(name="low_4") +def test_1(): + time.sleep(2) + assert True + +@pytest.mark.xdist_custom(name="low_4") +def test_2(): + time.sleep(2) + assert True + +@pytest.mark.xdist_custom(name="low_4") +def test_3(): + time.sleep(2) + assert True + +@pytest.mark.xdist_custom(name="low_4") +def test_4(): + time.sleep(2) + assert True + +# @pytest.mark.xdist_custom(name="low_4") +# def test_4a(): +# time.sleep(2) +# assert True +# +# @pytest.mark.xdist_custom(name="low_4") +# def test_4b(): +# time.sleep(2) +# assert True +# +# @pytest.mark.xdist_custom(name="low_4") +# def test_4c(): +# time.sleep(2) +# assert True +# +# @pytest.mark.xdist_custom(name="low_4") +# def test_4d(): +# time.sleep(2) +# assert True +# +# @pytest.mark.xdist_custom(name="low_4") +# def test_4e(): +# time.sleep(2) +# assert True + +@pytest.mark.xdist_custom(name="med_2") +def test_5(): + time.sleep(3) + assert True + +@pytest.mark.xdist_custom(name="med_2") +def test_6(): + time.sleep(3) + assert True + +@pytest.mark.xdist_custom(name="med_2") +def test_7(): + time.sleep(3) + assert True + +@pytest.mark.xdist_custom(name="med_2") +def test_8(): + time.sleep(3) + assert True + +@pytest.mark.xdist_custom(name="high_1") +def test_9(): + time.sleep(5) + assert True + +@pytest.mark.xdist_custom(name="high_1") +def test_10(): + time.sleep(5) + assert True + +def test_11(): + time.sleep(1) + assert True + +def test_12(): + time.sleep(1) + assert True + +def test_13(): + time.sleep(1) + assert True + +def test_14(): + time.sleep(1) + assert True
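For quick reference, here is a minimal sketch of how a test module is expected to be marked for the `customgroup` scheduler (file and test names here are illustrative, not taken from the change above): the integer after the final underscore in `name` is parsed as that group's worker cap, and it is clamped to the value passed via `-n`.

```python
# example_test.py -- illustrative sketch of the xdist_custom naming convention
import time

import pytest


@pytest.mark.xdist_custom(name="db_2")  # "db_2" group: runs on at most 2 workers
def test_db_first():
    time.sleep(1)


@pytest.mark.xdist_custom(name="db_2")  # same group, distributed alongside test_db_first
def test_db_second():
    time.sleep(1)


def test_unmarked():
    # no mark: joins the "default" group and uses all workers given via -n
    time.sleep(1)
```

Invoked with something like `python -m pytest example_test.py -n 2 --dist customgroup -v`, the `db_2` group and the default group execute one after the other, each spread over at most the requested number of workers.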