TUR-21619: Update to latest upstream changes, correct lint errors #5

Closed
wants to merge 10 commits into from
6 changes: 4 additions & 2 deletions .github/workflows/deploy.yml
@@ -19,7 +19,7 @@ jobs:
- uses: actions/checkout@v4

- name: Build and Check Package
uses: hynek/build-and-inspect-python-package@v2.8
uses: hynek/build-and-inspect-python-package@v2.9

deploy:
needs: package
@@ -39,7 +39,9 @@ jobs:
path: dist

- name: Publish package to PyPI
uses: pypa/[email protected]
uses: pypa/[email protected]
with:
attestations: true

- name: Push tag
run: |
2 changes: 1 addition & 1 deletion .github/workflows/test.yml
@@ -22,7 +22,7 @@ jobs:
steps:
- uses: actions/checkout@v4
- name: Build and Check Package
uses: hynek/build-and-inspect-python-package@v2.8
uses: hynek/build-and-inspect-python-package@v2.9

test:

4 changes: 2 additions & 2 deletions .pre-commit-config.yaml
@@ -1,6 +1,6 @@
repos:
- repo: https://github.com/astral-sh/ruff-pre-commit
rev: "v0.6.1"
rev: "v0.6.5"
hooks:
- id: ruff
args: ["--fix"]
@@ -23,7 +23,7 @@ repos:
language: python
additional_dependencies: [pygments, restructuredtext_lint]
- repo: https://github.com/pre-commit/mirrors-mypy
rev: v1.11.1
rev: v1.11.2
hooks:
- id: mypy
files: ^(src/|testing/)
129 changes: 122 additions & 7 deletions src/xdist/dsession.py
@@ -8,6 +8,7 @@
from typing import Any
from typing import Sequence
import warnings
import traceback

import execnet
import pytest
@@ -21,6 +22,7 @@
from xdist.scheduler import LoadScopeScheduling
from xdist.scheduler import Scheduling
from xdist.scheduler import WorkStealingScheduling
from xdist.scheduler import CustomGroup
from xdist.workermanage import NodeManager
from xdist.workermanage import WorkerController

@@ -58,10 +60,14 @@ def __init__(self, config: pytest.Config) -> None:
self._failed_collection_errors: dict[object, bool] = {}
self._active_nodes: set[WorkerController] = set()
self._failed_nodes_count = 0
self.saved_put = None
self.remake_nodes = False
self.ready_to_run_tests = False
self._max_worker_restart = get_default_max_worker_restart(self.config)
# summary message to print at the end of the session
self._summary_report: str | None = None
self.terminal = config.pluginmanager.getplugin("terminalreporter")
self.worker_status: dict[WorkerController, str] = {}
if self.terminal:
self.trdist = TerminalDistReporter(config)
config.pluginmanager.register(self.trdist, "terminaldistreporter")
@@ -87,6 +93,7 @@ def pytest_sessionstart(self, session: pytest.Session) -> None:
soon as nodes start they will emit the worker_workerready event.
"""
self.nodemanager = NodeManager(self.config)
self.saved_put = self.queue.put
nodes = self.nodemanager.setup_nodes(putevent=self.queue.put)
self._active_nodes.update(nodes)
self._session = session
@@ -123,6 +130,8 @@ def pytest_xdist_make_scheduler(
return LoadGroupScheduling(config, log)
if dist == "worksteal":
return WorkStealingScheduling(config, log)
if dist == "customgroup":
return CustomGroup(config, log)
return None

@pytest.hookimpl
@@ -147,14 +156,19 @@ def loop_once(self) -> None:
"""Process one callback from one of the workers."""
while 1:
if not self._active_nodes:
# If everything has died stop looping
self.triggershutdown()
raise RuntimeError("Unexpectedly no active workers available")
# Worker teardown + recreation only occurs for CustomGroup Scheduler
if isinstance(self.sched, CustomGroup) and self.remake_nodes:
pass
else:
# We aren't using CustomGroup scheduler and everything has died: stop looping
self.triggershutdown()
raise RuntimeError("Unexpectedly no active workers available")
try:
eventcall = self.queue.get(timeout=2.0)
break
except Empty:
continue

callname, kwargs = eventcall
assert callname, kwargs
method = "worker_" + callname
@@ -165,6 +179,71 @@ def loop_once(self) -> None:
if self.sched.tests_finished:
self.triggershutdown()


def is_node_finishing(self, node: WorkerController):
"""Check if a test worker is considered to be finishing.

Evaluate whether it's on its last test, or if no tests are pending.
"""
pending = self.sched.node2pending.get(node)
return pending is not None and len(pending) < 2


def is_node_clear(self, node: WorkerController):
"""Check if a test worker has no pending tests."""
pending = self.sched.node2pending.get(node)
return pending is None or len(pending) == 0


def are_all_nodes_finishing(self):
"""Check if all workers are finishing (See 'is_node_finishing' above)."""
return all(self.is_node_finishing(node) for node in self.sched.nodes)


def are_all_nodes_done(self):
"""Check if all nodes have reported to finish."""
return all(s == "finished" for s in self.worker_status.values())


def are_all_active_nodes_collected(self):
"""Check if all nodes have reported collection to be complete."""
if not all(n.gateway.id in self.worker_status for n in self._active_nodes):
return False
return all(self.worker_status[n.gateway.id] == "collected" for n in self._active_nodes)


def reset_nodes_if_needed(self):
if self.are_all_nodes_finishing() and self.ready_to_run_tests and not self.sched.do_resched:
self.reset_nodes()


def reset_nodes(self):
"""Issue shutdown notices to workers for rescheduling purposes."""
if len(self.sched.pending) != 0:
self.remake_nodes = True
for node in self.sched.nodes:
if self.is_node_finishing(node):
node.shutdown()


def reschedule(self):
"""Reschedule tests."""
self.sched.do_resched = False
self.sched.check_schedule(self.sched.nodes[0], 1.0, True)


def prepare_for_reschedule(self):
"""Update test workers and their status tracking so rescheduling is ready."""
self.remake_nodes = False
num_workers = self.sched.dist_groups[self.sched.pending_groups[0]]['group_workers']
self.trdist._status = {}
new_nodes = self.nodemanager.setup_nodes(self.saved_put, num_workers)
self.worker_status = {}
self._active_nodes = set()
self._active_nodes.update(new_nodes)
self.sched.node2pending = {}
self.sched.do_resched = True

#
# callbacks for processing events from workers
#
@@ -182,6 +261,7 @@ def worker_workerready(
node.workerinfo = workerinfo
node.workerinfo["id"] = node.gateway.id
node.workerinfo["spec"] = node.gateway.spec
self.update_worker_status(node, "ready")

self.config.hook.pytest_testnodeready(node=node)
if self.shuttingdown:
@@ -198,6 +278,17 @@ def worker_workerfinished(self, node: WorkerController) -> None:
The node might not be in the scheduler if it had not emitted
workerready before shutdown was triggered.
"""
self.update_worker_status(node, "finished")

if isinstance(self.sched, CustomGroup) and self.remake_nodes:
node.ensure_teardown()
self._active_nodes.remove(node)
if self.are_all_nodes_done():
try:
self.prepare_for_reschedule()
except Exception as e:
self.shouldstop = f"Exception caught during preparation for rescheduling. Giving up.\n{''.join(traceback.format_exception(e))}"
return
self.config.hook.pytest_testnodedown(node=node, error=None)
if node.workeroutput["exitstatus"] == 2: # keyboard-interrupt
self.shouldstop = f"{node} received keyboard-interrupt"
@@ -217,6 +308,13 @@ def worker_workerfinished(self, node: WorkerController) -> None:
assert not crashitem, (crashitem, node)
self._active_nodes.remove(node)

def update_worker_status(self, node, status):
"""Track the worker status.

Can be used in callbacks like 'worker_workerfinished' so we remember which event was reported last by each worker.
"""
self.worker_status[node.workerinfo["id"]] = status

def worker_internal_error(
self, node: WorkerController, formatted_error: str
) -> None:
@@ -283,7 +381,10 @@ def worker_collectionfinish(
scheduling the first time it logs which scheduler is in use.
"""
if self.shuttingdown:
self.report_line(f"[-] [dse] collectionfinish while closing {node.gateway.id}")
return
self.update_worker_status(node, "collected")

self.config.hook.pytest_xdist_node_collection_finished(node=node, ids=ids)
# tell session which items were effectively collected otherwise
# the controller node will finish the session with EXIT_NOTESTSCOLLECTED
@@ -300,10 +401,18 @@ def worker_collectionfinish(
self.trdist.ensure_show_status()
self.terminal.write_line("")
if self.config.option.verbose > 0:
self.terminal.write_line(
f"scheduling tests via {self.sched.__class__.__name__}"
)
self.sched.schedule()
self.report_line(f"[-] [dse] scheduling tests via {self.sched.__class__.__name__}")
if isinstance(self.sched, CustomGroup):
if self.ready_to_run_tests and self.are_all_active_nodes_collected():
# we're coming back here after finishing a batch of tests - so start the next batch
self.reschedule()
self.reset_nodes_if_needed()
else:
self.ready_to_run_tests = True
self.sched.schedule()
self.reset_nodes_if_needed()
else:
self.sched.schedule()

def worker_logstart(
self,
@@ -339,6 +448,12 @@ def worker_runtest_protocol_complete(
"""
assert self.sched is not None
self.sched.mark_test_complete(node, item_index, duration)
if isinstance(self.sched, CustomGroup):
if self.are_all_nodes_finishing():
if self.shouldstop:
self.report_line("Won't reschedule - should stop.")
else:
self.reset_nodes()

def worker_unscheduled(
self, node: WorkerController, indices: Sequence[int]
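
Taken together, the new DSession helpers track each worker through a small status lifecycle ("ready" -> "collected" -> "finished") and only tear workers down and spin up replacements once every tracked worker has reported "finished". A condensed, standalone sketch of that status tracking follows; the function names and the driver loop are illustrative only, not part of the PR:

    # Editor's sketch of the status values written by worker_workerready,
    # worker_collectionfinish and worker_workerfinished in the diff above.
    worker_status: dict[str, str] = {}

    def update_worker_status(worker_id: str, status: str) -> None:
        # Mirrors DSession.update_worker_status, keyed by gateway id.
        assert status in {"ready", "collected", "finished"}
        worker_status[worker_id] = status

    def are_all_nodes_done() -> bool:
        # Rescheduling of the next group only begins once every tracked
        # worker has reported "finished".
        return all(s == "finished" for s in worker_status.values())

    for worker_id in ("gw0", "gw1"):
        for status in ("ready", "collected", "finished"):
            update_worker_status(worker_id, status)
    print(are_all_nodes_done())  # True
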
3 changes: 3 additions & 0 deletions src/xdist/plugin.py
@@ -108,6 +108,7 @@ def pytest_addoption(parser: pytest.Parser) -> None:
"loadfile",
"loadgroup",
"worksteal",
"customgroup",
"no",
],
dest="dist",
@@ -124,6 +125,8 @@ def pytest_addoption(parser: pytest.Parser) -> None:
"loadgroup: Like 'load', but sends tests marked with 'xdist_group' to the same worker.\n\n"
"worksteal: Split the test suite between available environments,"
" then re-balance when any worker runs out of tests.\n\n"
# TODO: Update docstring
"customgroup: TODO: add docs here"
"(default) no: Run tests inprocess, don't distribute."
),
)
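
For context, the new "customgroup" value is selected the same way as the existing --dist modes. A minimal, illustrative invocation (the worker count and test path are placeholders, not taken from this PR):

    # Illustrative only: run pytest with the new distribution mode enabled.
    # Equivalent to the command line: pytest -n 4 --dist customgroup tests/
    import pytest

    if __name__ == "__main__":
        raise SystemExit(pytest.main(["-n", "4", "--dist", "customgroup", "tests/"]))
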
7 changes: 5 additions & 2 deletions src/xdist/remote.py
@@ -201,15 +201,17 @@ def run_one_test(self) -> None:
"runtest_protocol_complete", item_index=self.item_index, duration=duration
)

@pytest.mark.trylast
def pytest_collection_modifyitems(
self,
config: pytest.Config,
items: list[pytest.Item],
) -> None:
# add the group name to nodeid as suffix if --dist=loadgroup
if config.getvalue("loadgroup"):
if config.getvalue("loadgroup") or config.getvalue("customgroup"):
functional_mark = "xdist_group" if config.getvalue("loadgroup") else "xdist_custom"
for item in items:
mark = item.get_closest_marker("xdist_group")
mark = item.get_closest_marker(functional_mark)
if not mark:
continue
gname = (
@@ -357,6 +359,7 @@ def getinfodict() -> WorkerInfo:

def setup_config(config: pytest.Config, basetemp: str | None) -> None:
config.option.loadgroup = config.getvalue("dist") == "loadgroup"
config.option.customgroup = config.getvalue("dist") == "customgroup"
config.option.looponfail = False
config.option.usepdb = False
config.option.dist = "no"
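
The pytest_collection_modifyitems change above looks up an "xdist_custom" marker when --dist=customgroup is active, mirroring the existing "xdist_group" handling. A hedged sketch of how a test might opt in, assuming the marker takes the same name= keyword as xdist_group; how the CustomGroup scheduler turns that name into a worker group lives in the new scheduler module, which is not shown in this diff:

    # Hypothetical usage, not taken from this PR's documentation.
    import pytest

    @pytest.mark.xdist_custom(name="low_resource_tests")
    def test_lightweight() -> None:
        assert 1 + 1 == 2

Per the comment in the hunk above, the group name is appended to the test's node id as a suffix, now for customgroup as well as loadgroup, so the scheduler can keep the group together.
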
1 change: 1 addition & 0 deletions src/xdist/scheduler/__init__.py
@@ -1,5 +1,6 @@
from xdist.scheduler.each import EachScheduling as EachScheduling
from xdist.scheduler.load import LoadScheduling as LoadScheduling
from xdist.scheduler.customgroup import CustomGroup as CustomGroup
from xdist.scheduler.loadfile import LoadFileScheduling as LoadFileScheduling
from xdist.scheduler.loadgroup import LoadGroupScheduling as LoadGroupScheduling
from xdist.scheduler.loadscope import LoadScopeScheduling as LoadScopeScheduling