Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merge pytest-xdist latest changes + correct lint errors in nTop fork #6

Merged
merged 13 commits into from
Sep 20, 2024
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions .github/workflows/deploy.yml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ jobs:
- uses: actions/checkout@v4

- name: Build and Check Package
uses: hynek/build-and-inspect-python-package@v2.8
uses: hynek/build-and-inspect-python-package@v2.9

deploy:
needs: package
Expand All @@ -39,7 +39,9 @@ jobs:
path: dist

- name: Publish package to PyPI
uses: pypa/gh-action-pypi-publish@<version>
uses: pypa/gh-action-pypi-publish@<version>
with:
attestations: true

- name: Push tag
run: |
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ jobs:
steps:
- uses: actions/checkout@v4
- name: Build and Check Package
uses: hynek/build-and-inspect-python-package@v2.8
uses: hynek/build-and-inspect-python-package@v2.9

test:

Expand Down
4 changes: 2 additions & 2 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
repos:
- repo: https://github.com/astral-sh/ruff-pre-commit
rev: "v0.6.1"
rev: "v0.6.5"
hooks:
- id: ruff
args: ["--fix"]
Expand All @@ -23,7 +23,7 @@ repos:
language: python
additional_dependencies: [pygments, restructuredtext_lint]
- repo: https://github.com/pre-commit/mirrors-mypy
rev: v1.11.1
rev: v1.11.2
hooks:
- id: mypy
files: ^(src/|testing/)
Expand Down
44 changes: 28 additions & 16 deletions src/xdist/dsession.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,24 +5,25 @@
from queue import Empty
from queue import Queue
import sys
import traceback
from typing import Any
from typing import Callable
from typing import Sequence
import warnings
import traceback

import execnet
import pytest

from xdist.remote import Producer
from xdist.remote import WorkerInfo
from xdist.scheduler import CustomGroup
from xdist.scheduler import EachScheduling
from xdist.scheduler import LoadFileScheduling
from xdist.scheduler import LoadGroupScheduling
from xdist.scheduler import LoadScheduling
from xdist.scheduler import LoadScopeScheduling
from xdist.scheduler import Scheduling
from xdist.scheduler import WorkStealingScheduling
from xdist.scheduler import CustomGroup
from xdist.workermanage import NodeManager
from xdist.workermanage import WorkerController

Expand Down Expand Up @@ -60,14 +61,14 @@ def __init__(self, config: pytest.Config) -> None:
self._failed_collection_errors: dict[object, bool] = {}
self._active_nodes: set[WorkerController] = set()
self._failed_nodes_count = 0
self.saved_put = None
self.saved_put: Callable[[tuple[str, dict[str, Any]]], None]
self.remake_nodes = False
self.ready_to_run_tests = False
self._max_worker_restart = get_default_max_worker_restart(self.config)
# summary message to print at the end of the session
self._summary_report: str | None = None
self.terminal = config.pluginmanager.getplugin("terminalreporter")
self.worker_status: dict[WorkerController, str] = {}
self.worker_status: dict[str, str] = {}
if self.terminal:
self.trdist = TerminalDistReporter(config)
config.pluginmanager.register(self.trdist, "terminaldistreporter")
Expand Down Expand Up @@ -180,63 +181,71 @@ def loop_once(self) -> None:
self.triggershutdown()


def is_node_finishing(self, node: WorkerController):
def is_node_finishing(self, node: WorkerController) -> bool:
"""Check if a test worker is considered to be finishing.

Evaluate whether it's on its last test, or if no tests are pending.
"""
assert self.sched is not None
pending = self.sched.node2pending.get(node)
return pending is not None and len(pending) < 2


def is_node_clear(self, node: WorkerController):
def is_node_clear(self, node: WorkerController) -> bool:
"""Check if a test worker has no pending tests."""
assert self.sched is not None
pending = self.sched.node2pending.get(node)
return pending is None or len(pending) == 0


def are_all_nodes_finishing(self):
def are_all_nodes_finishing(self) -> bool:
"""Check if all workers are finishing (See 'is_node_finishing' above)."""
assert self.sched is not None
return all(self.is_node_finishing(node) for node in self.sched.nodes)


def are_all_nodes_done(self):
def are_all_nodes_done(self) -> bool:
"""Check if all nodes have reported that they are finished."""
return all(s == "finished" for s in self.worker_status.values())


def are_all_active_nodes_collected(self):
def are_all_active_nodes_collected(self) -> bool:
"""Check if all nodes have reported collection to be complete."""
if not all(n.gateway.id in self.worker_status for n in self._active_nodes):
return False
return all(self.worker_status[n.gateway.id] == "collected" for n in self._active_nodes)


def reset_nodes_if_needed(self):
def reset_nodes_if_needed(self) -> None:
assert self.sched is not None
if self.are_all_nodes_finishing() and self.ready_to_run_tests and not self.sched.do_resched:
self.reset_nodes()


def reset_nodes(self):
def reset_nodes(self) -> None:
"""Issue shutdown notices to workers for rescheduling purposes."""
assert self.sched is not None
if len(self.sched.pending) != 0:
self.remake_nodes = True
for node in self.sched.nodes:
if self.is_node_finishing(node):
node.shutdown()


def reschedule(self):
def reschedule(self) -> None:
"""Reschedule tests."""
assert self.sched is not None
self.sched.do_resched = False
self.sched.check_schedule(self.sched.nodes[0], 1.0, True)


def prepare_for_reschedule(self):
def prepare_for_reschedule(self) -> None:
"""Update test workers and their status tracking so rescheduling is ready."""
assert self.sched is not None
self.remake_nodes = False
num_workers = self.sched.dist_groups[self.sched.pending_groups[0]]['group_workers']
self.trdist._status = {}
assert self.nodemanager is not None
new_nodes = self.nodemanager.setup_nodes(self.saved_put, num_workers)
self.worker_status = {}
self._active_nodes = set()
Expand Down Expand Up @@ -287,7 +296,9 @@ def worker_workerfinished(self, node: WorkerController) -> None:
try:
self.prepare_for_reschedule()
except Exception as e:
self.shouldstop = f"Exception caught during preparation for rescheduling. Giving up.\n{''.join(traceback.format_exception(e))}"
msg = ("Exception caught during preparation for rescheduling. Giving up."
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

😢

f"\n{''.join(traceback.format_exception(e))}")
self.shouldstop = msg
return
self.config.hook.pytest_testnodedown(node=node, error=None)
if node.workeroutput["exitstatus"] == 2: # keyboard-interrupt
Expand All @@ -308,10 +319,11 @@ def worker_workerfinished(self, node: WorkerController) -> None:
assert not crashitem, (crashitem, node)
self._active_nodes.remove(node)

def update_worker_status(self, node, status):
def update_worker_status(self, node: WorkerController, status: str) -> None:
"""Track the worker status.

Can be used at callbacks like 'worker_workerfinished' so we remember which event was reported last by each worker.
Can be used at callbacks like 'worker_workerfinished' so we remember which event
was reported last by each worker.
"""
self.worker_status[node.workerinfo["id"]] = status

Expand Down
2 changes: 1 addition & 1 deletion src/xdist/scheduler/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from xdist.scheduler.customgroup import CustomGroup as CustomGroup
from xdist.scheduler.each import EachScheduling as EachScheduling
from xdist.scheduler.load import LoadScheduling as LoadScheduling
from xdist.scheduler.customgroup import CustomGroup as CustomGroup
from xdist.scheduler.loadfile import LoadFileScheduling as LoadFileScheduling
from xdist.scheduler.loadgroup import LoadGroupScheduling as LoadGroupScheduling
from xdist.scheduler.loadscope import LoadScopeScheduling as LoadScopeScheduling
Expand Down
28 changes: 17 additions & 11 deletions src/xdist/scheduler/customgroup.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
from __future__ import annotations

from itertools import cycle
from typing import Sequence, Any
from typing import Any
from typing import Sequence

import pytest

Expand All @@ -10,6 +11,7 @@
from xdist.workermanage import parse_spec_config
from xdist.workermanage import WorkerController


class CustomGroup:
"""Implement grouped load scheduling across a variable number of nodes.

Expand Down Expand Up @@ -203,7 +205,7 @@ def remove_pending_tests_from_node(
) -> None:
raise NotImplementedError()

def check_schedule(self, node: WorkerController, duration: float = 0, from_dsession=False) -> None:
def check_schedule(self, node: WorkerController, duration: float = 0, from_dsession: bool = False) -> None:
"""Maybe schedule new items on the node.

If there are any globally pending nodes left then this will
Expand All @@ -226,7 +228,7 @@ def check_schedule(self, node: WorkerController, duration: float = 0, from_dsess
dist_group_key = self.pending_groups.pop(0)
dist_group = self.dist_groups[dist_group_key]
nodes = cycle(self.nodes[0:dist_group['group_workers']])
schedule_log = {n.gateway.id:[] for n in self.nodes[0:dist_group['group_workers']]}
schedule_log: dict[str, Any] = {n.gateway.id:[] for n in self.nodes[0:dist_group['group_workers']]}
for _ in range(len(dist_group['test_indices'])):
n = next(nodes)
#needs cleaner way to be identified
Expand All @@ -235,13 +237,16 @@ def check_schedule(self, node: WorkerController, duration: float = 0, from_dsess

self._send_tests_group(n, 1, dist_group_key)
del self.dist_groups[dist_group_key]
message = f"\n[-] [csg] check_schedule: processed scheduling for {dist_group_key}: {' '.join([f'{nid} ({len(nt)})' for nid,nt in schedule_log.items()])}"
message = (f"\n[-] [csg] check_schedule: processed scheduling for {dist_group_key}:"
f" {' '.join([f'{nid} ({len(nt)})' for nid,nt in schedule_log.items()])}")
self.report_line(message)

else:
pending = self.node2pending.get(node)
pending = self.node2pending.get(node, [])
if len(pending) < 2:
self.report_line(f"[-] [csg] Shutting down {node.workerinput['workerid']} because only one case is pending")
self.report_line(
f"[-] [csg] Shutting down {node.workerinput['workerid']} because only one case is pending"
)
node.shutdown()

self.log("num items waiting for node:", len(self.pending))
Expand Down Expand Up @@ -301,7 +306,7 @@ def schedule(self) -> None:
if not self.collection:
return

dist_groups = {}
dist_groups: dict[str, dict[Any, Any]] = {}

if self.is_first_time:
for i, test in enumerate(self.collection):
Expand Down Expand Up @@ -338,15 +343,16 @@ def schedule(self) -> None:
dist_group_key = self.pending_groups.pop(0)
dist_group = self.dist_groups[dist_group_key]
nodes = cycle(self.nodes[0:dist_group['group_workers']])
schedule_log = {n.gateway.id: [] for n in self.nodes[0:dist_group['group_workers']]}
schedule_log: dict[str, Any] = {n.gateway.id: [] for n in self.nodes[0:dist_group['group_workers']]}
for _ in range(len(dist_group['test_indices'])):
n = next(nodes)
# needs cleaner way to be identified
tests_per_node = self.dist_groups[dist_group_key]['pending_indices'][:1]
schedule_log[n.gateway.id].extend(tests_per_node)
self._send_tests_group(n, 1, dist_group_key)
del self.dist_groups[dist_group_key]
message = f"\n[-] [csg] schedule: processed scheduling for {dist_group_key}: {' '.join([f'{nid} ({len(nt)})' for nid, nt in schedule_log.items()])}"
message = ("\n[-] [csg] schedule: processed scheduling for "
f"{dist_group_key}: {' '.join([f'{nid} ({len(nt)})' for nid, nt in schedule_log.items()])}")
self.report_line(message)

def _send_tests(self, node: WorkerController, num: int) -> None:
Expand All @@ -356,7 +362,7 @@ def _send_tests(self, node: WorkerController, num: int) -> None:
self.node2pending[node].extend(tests_per_node)
node.send_runtest_some(tests_per_node)

def _send_tests_group(self, node: WorkerController, num: int, dist_group_key) -> None:
def _send_tests_group(self, node: WorkerController, num: int, dist_group_key: str) -> None:
tests_per_node = self.dist_groups[dist_group_key]['pending_indices'][:num]
if tests_per_node:
del self.dist_groups[dist_group_key]['pending_indices'][:num]
Expand Down Expand Up @@ -396,4 +402,4 @@ def _check_nodes_have_same_collection(self) -> bool:

def report_line(self, line: str) -> None:
if self.terminal and self.config.option.verbose >= 0:
self.terminal.write_line(line)
self.terminal.write_line(line)
8 changes: 8 additions & 0 deletions src/xdist/scheduler/each.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from __future__ import annotations

from typing import Any
from typing import Sequence

import pytest
Expand Down Expand Up @@ -29,6 +30,10 @@ def __init__(self, config: pytest.Config, log: Producer | None = None) -> None:
self.numnodes = len(parse_spec_config(config))
self.node2collection: dict[WorkerController, list[str]] = {}
self.node2pending: dict[WorkerController, list[int]] = {}
self.do_resched: bool = False
self.pending: list[int] = []
self.dist_groups: dict[str, Any] = {}
self.pending_groups: list[str] = []
self._started: list[WorkerController] = []
self._removed2pending: dict[WorkerController, list[int]] = {}
if log is None:
Expand Down Expand Up @@ -106,6 +111,9 @@ def add_node_collection(
self.node2pending[node] = pending
break

def check_schedule(self, node: WorkerController, duration: float = 0, from_dsession: bool = False) -> None:
raise NotImplementedError()

def mark_test_complete(
self, node: WorkerController, item_index: int, duration: float = 0
) -> None:
Expand Down
7 changes: 6 additions & 1 deletion src/xdist/scheduler/load.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from __future__ import annotations

from itertools import cycle
from typing import Any
from typing import Sequence

import pytest
Expand Down Expand Up @@ -62,6 +63,9 @@ def __init__(self, config: pytest.Config, log: Producer | None = None) -> None:
self.node2collection: dict[WorkerController, list[str]] = {}
self.node2pending: dict[WorkerController, list[int]] = {}
self.pending: list[int] = []
self.do_resched: bool = False
self.dist_groups: dict[str, Any] = {}
self.pending_groups: list[str] = []
self.collection: list[str] | None = None
if log is None:
self.log = Producer("loadsched")
Expand Down Expand Up @@ -176,7 +180,8 @@ def remove_pending_tests_from_node(
) -> None:
raise NotImplementedError()

def check_schedule(self, node: WorkerController, duration: float = 0) -> None:

def check_schedule(self, node: WorkerController, duration: float = 0, from_dsession: bool = False) -> None:
"""Maybe schedule new items on the node.

If there are any globally pending nodes left then this will
Expand Down
11 changes: 9 additions & 2 deletions src/xdist/scheduler/loadscope.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from __future__ import annotations

from collections import OrderedDict
from typing import Any
from typing import NoReturn
from typing import Sequence

Expand Down Expand Up @@ -93,11 +94,14 @@ class LoadScopeScheduling:
def __init__(self, config: pytest.Config, log: Producer | None = None) -> None:
self.numnodes = len(parse_spec_config(config))
self.collection: list[str] | None = None

self.node2pending: dict[WorkerController, list[int]] = {}
self.workqueue: OrderedDict[str, dict[str, bool]] = OrderedDict()
self.assigned_work: dict[WorkerController, dict[str, dict[str, bool]]] = {}
self.registered_collections: dict[WorkerController, list[str]] = {}

self.do_resched: bool = False
self.pending: list[int] = []
self.dist_groups: dict[str, Any] = {}
self.pending_groups: list[str] = []
if log is None:
self.log = Producer("loadscopesched")
else:
Expand Down Expand Up @@ -163,6 +167,9 @@ def add_node(self, node: WorkerController) -> None:
assert node not in self.assigned_work
self.assigned_work[node] = {}

def check_schedule(self, node: WorkerController, duration: float = 0, from_dsession: bool = False) -> None:
raise NotImplementedError()

def remove_node(self, node: WorkerController) -> str | None:
"""Remove a node from the scheduler.

Expand Down
Loading