Dev/ci/tur 21619 lint merge 2 #7

Merged
merged 9 commits on Sep 20, 2024
36 changes: 21 additions & 15 deletions src/xdist/dsession.py
@@ -180,7 +180,6 @@ def loop_once(self) -> None:
if self.sched.tests_finished:
self.triggershutdown()


def is_node_finishing(self, node: WorkerController) -> bool:
"""Check if a test worker is considered to be finishing.

@@ -191,32 +190,33 @@ def is_node_finishing(self, node: WorkerController) -> bool:
pending = self.sched.node2pending.get(node)
return pending is not None and len(pending) < 2
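For context: the heuristic above counts a worker as finishing once fewer than two tests remain queued for it. A minimal standalone sketch of the same check, where the dict of worker ids to pending test indices is an illustrative stand-in for self.sched.node2pending:

# Hedged sketch of the "finishing" heuristic: 0 or 1 pending tests left.
node2pending = {"gw0": [], "gw1": [17], "gw2": [3, 9]}

def is_finishing(worker_id: str) -> bool:
    pending = node2pending.get(worker_id)
    return pending is not None and len(pending) < 2

assert is_finishing("gw0") and is_finishing("gw1") and not is_finishing("gw2")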


def are_all_nodes_finishing(self) -> bool:
"""Check if all workers are finishing (See 'is_node_finishing' above)."""
assert self.sched is not None
return all(self.is_node_finishing(node) for node in self.sched.nodes)


def are_all_nodes_done(self) -> bool:
"""Check if all nodes have reported to finish."""
return all(s == "finished" for s in self.worker_status.values())


def are_all_active_nodes_collected(self) -> bool:
"""Check if all nodes have reported collection to be complete."""
if not all(n.gateway.id in self.worker_status for n in self._active_nodes):
return False
return all(self.worker_status[n.gateway.id] == "collected" for n in self._active_nodes)

return all(
self.worker_status[n.gateway.id] == "collected" for n in self._active_nodes
)
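The done/collected checks above lean on the worker_status map keyed by gateway id. A hedged snapshot of that bookkeeping, using only the status strings visible in this diff ("collected", "finished"); the gateway ids are stand-ins:

worker_status = {"gw0": "collected", "gw1": "collected", "gw2": "finished"}
active_ids = ["gw0", "gw1"]  # ids of self._active_nodes

all_done = all(s == "finished" for s in worker_status.values())
all_collected = all(worker_status[i] == "collected" for i in active_ids)
assert all_collected and not all_done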

def reset_nodes_if_needed(self) -> None:
assert self.sched is not None
assert type(self.sched) is CustomGroup
if self.are_all_nodes_finishing() and self.ready_to_run_tests and not self.sched.do_resched:
if (
self.are_all_nodes_finishing()
and self.ready_to_run_tests
and not self.sched.do_resched
):
self.reset_nodes()


def reset_nodes(self) -> None:
"""Issue shutdown notices to workers for rescheduling purposes."""
assert self.sched is not None
@@ -227,21 +227,21 @@ def reset_nodes(self) -> None:
if self.is_node_finishing(node):
node.shutdown()


def reschedule(self) -> None:
"""Reschedule tests."""
assert self.sched is not None
assert type(self.sched) is CustomGroup
self.sched.do_resched = False
self.sched.check_schedule(self.sched.nodes[0], 1.0, True)
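Taken together, the reset/reschedule helpers drive the batch lifecycle this PR introduces. A hedged sketch of the intended ordering; the method names are real, but this driver loop is illustrative, since the real flow is event-driven from loop_once() and the worker callbacks:

def run_remaining_groups(session) -> None:
    # Illustrative only: cycle batches until no pending groups remain.
    while session.sched.pending_groups:
        if session.are_all_nodes_finishing() and session.ready_to_run_tests:
            session.reset_nodes()             # issue shutdown notices
        if session.are_all_nodes_done():
            session.prepare_for_reschedule()  # restart workers for next group
        if session.sched.do_resched:
            session.reschedule()              # dispatch the next group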


def prepare_for_reschedule(self) -> None:
"""Update test workers and their status tracking so rescheduling is ready."""
assert type(self.sched) is CustomGroup
assert self.sched is not None
self.remake_nodes = False
num_workers = self.sched.dist_groups[self.sched.pending_groups[0]]['group_workers']
num_workers = self.sched.dist_groups[self.sched.pending_groups[0]][
"group_workers"
]
self.trdist._status = {}
assert self.nodemanager is not None
new_nodes = self.nodemanager.setup_nodes(self.saved_put, num_workers)
@@ -295,8 +295,10 @@ def worker_workerfinished(self, node: WorkerController) -> None:
try:
self.prepare_for_reschedule()
except Exception as e:
msg = ("Exception caught during preparation for rescheduling. Giving up."
f"\n{''.join(traceback.format_exception(e))}")
msg = (
"Exception caught during preparation for rescheduling. Giving up."
f"\n{''.join(traceback.format_exception(e))}"
)
self.shouldstop = msg
return
self.config.hook.pytest_testnodedown(node=node, error=None)
@@ -392,7 +394,9 @@ def worker_collectionfinish(
scheduling the first time it logs which scheduler is in use.
"""
if self.shuttingdown:
self.report_line(f"[-] [dse] collectionfinish while closing {node.gateway.id}")
self.report_line(
f"[-] [dse] collectionfinish while closing {node.gateway.id}"
)
return
self.update_worker_status(node, "collected")

@@ -412,7 +416,9 @@ def worker_collectionfinish(
self.trdist.ensure_show_status()
self.terminal.write_line("")
if self.config.option.verbose > 0:
self.report_line(f"[-] [dse] scheduling tests via {self.sched.__class__.__name__}")
self.report_line(
f"[-] [dse] scheduling tests via {self.sched.__class__.__name__}"
)
if isinstance(self.sched, CustomGroup):
if self.ready_to_run_tests and self.are_all_active_nodes_collected():
# we're coming back here after finishing a batch of tests - so start the next batch
4 changes: 3 additions & 1 deletion src/xdist/remote.py
@@ -209,7 +209,9 @@ def pytest_collection_modifyitems(
) -> None:
# add the group name to nodeid as suffix if --dist=loadgroup
if config.getvalue("loadgroup") or config.getvalue("customgroup"):
functional_mark = "xdist_group" if config.getvalue("loadgroup") else "xdist_custom"
functional_mark = (
"xdist_group" if config.getvalue("loadgroup") else "xdist_custom"
)
for item in items:
mark = item.get_closest_marker(functional_mark)
if not mark:
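The nodeid suffix added here is what lets the scheduler recover both the group name and its worker budget later. A hedged sketch of the string convention, with an illustrative nodeid; the split logic mirrors CustomGroup.schedule in the next file:

# A mark like @pytest.mark.xdist_custom(name="low_4") ends up as an "@low_4"
# suffix on the test's nodeid; the scheduler parses it back apart.
nodeid = "xdist-testing-ntop/test.py::test_2"
suffixed = f"{nodeid}@low_4"

group_mark = suffixed.split("@")[-1]            # "low_4"
group_workers = int(group_mark.split("_")[-1])  # 4 workers requested
assert (group_mark, group_workers) == ("low_4", 4)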
77 changes: 47 additions & 30 deletions src/xdist/scheduler/customgroup.py
@@ -189,7 +189,6 @@ def mark_test_complete(
self.check_schedule(node, duration=duration)

def mark_test_pending(self, item: str) -> None:

assert self.collection is not None
self.pending.insert(
0,
@@ -205,7 +204,9 @@ def remove_pending_tests_from_node(
) -> None:
raise NotImplementedError()

def check_schedule(self, node: WorkerController, duration: float = 0, from_dsession: bool = False) -> None:
def check_schedule(
self, node: WorkerController, duration: float = 0, from_dsession: bool = False
) -> None:
"""Maybe schedule new items on the node.

If there are any globally pending nodes left then this will
@@ -214,7 +215,9 @@ def check_schedule(self, node: WorkerController, duration: float = 0, from_dsess
heuristic to influence how many tests the node is assigned.
"""
if node.shutting_down:
self.report_line(f"[-] [csg] {node.workerinput['workerid']} is already shutting down")
self.report_line(
f"[-] [csg] {node.workerinput['workerid']} is already shutting down"
)
return

if self.pending:
@@ -227,18 +230,25 @@ def check_schedule(self, node: WorkerController, duration: float = 0, from_dsess
if self.pending_groups:
dist_group_key = self.pending_groups.pop(0)
dist_group = self.dist_groups[dist_group_key]
nodes = cycle(self.nodes[0:dist_group['group_workers']])
schedule_log: dict[str, Any] = {n.gateway.id:[] for n in self.nodes[0:dist_group['group_workers']]}
for _ in range(len(dist_group['test_indices'])):
nodes = cycle(self.nodes[0 : dist_group["group_workers"]])
schedule_log: dict[str, Any] = {
n.gateway.id: []
for n in self.nodes[0 : dist_group["group_workers"]]
}
for _ in range(len(dist_group["test_indices"])):
n = next(nodes)
#needs cleaner way to be identified
tests_per_node = self.dist_groups[dist_group_key]['pending_indices'][:1]
# needs cleaner way to be identified
tests_per_node = self.dist_groups[dist_group_key][
"pending_indices"
][:1]
schedule_log[n.gateway.id].extend(tests_per_node)

self._send_tests_group(n, 1, dist_group_key)
del self.dist_groups[dist_group_key]
message = (f"\n[-] [csg] check_schedule: processed scheduling for {dist_group_key}:"
f" {' '.join([f'{nid} ({len(nt)})' for nid,nt in schedule_log.items()])}")
message = (
f"\n[-] [csg] check_schedule: processed scheduling for {dist_group_key}:"
f" {' '.join([f'{nid} ({len(nt)})' for nid,nt in schedule_log.items()])}"
)
self.report_line(message)

else:
@@ -310,26 +320,28 @@ def schedule(self) -> None:

if self.is_first_time:
for i, test in enumerate(self.collection):
if '@' in test:
group_mark = test.split('@')[-1]
group_workers = int(group_mark.split('_')[-1])
if "@" in test:
group_mark = test.split("@")[-1]
group_workers = int(group_mark.split("_")[-1])
if group_workers > len(self.nodes):
# We can only distribute across as many nodes as we have available
# If a group requests more, we fall back to our actual max
group_workers = len(self.nodes)
else:
group_mark = 'default'
group_mark = "default"
group_workers = len(self.nodes)
existing_tests = dist_groups.get(group_mark, {}).get('tests', [])
existing_tests = dist_groups.get(group_mark, {}).get("tests", [])
existing_tests.append(test)
existing_indices = dist_groups.get(group_mark, {}).get('test_indices', [])
existing_indices = dist_groups.get(group_mark, {}).get(
"test_indices", []
)
existing_indices.append(i)

dist_groups[group_mark] = {
'tests': existing_tests,
'group_workers': group_workers,
'test_indices': existing_indices,
'pending_indices': existing_indices
"tests": existing_tests,
"group_workers": group_workers,
"test_indices": existing_indices,
"pending_indices": existing_indices,
}
self.dist_groups = dist_groups
self.pending_groups = list(dist_groups.keys())
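For a concrete picture of the structure built above, here is a hedged sketch of one dist_groups entry for the marks used in xdist-testing-ntop/test.py; the collection indices and the 8-worker session are assumptions:

num_nodes = 8           # assumed -n value
indices = [0, 1, 2, 3]  # assumed positions of test_1..test_4 in self.collection
dist_groups = {
    "low_4": {
        "tests": [f"xdist-testing-ntop/test.py::test_{i + 1}@low_4" for i in indices],
        "group_workers": min(4, num_nodes),  # the mark asks for 4 workers
        "test_indices": indices,
        "pending_indices": list(indices),    # drained as tests are dispatched
    },
}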
@@ -342,17 +354,21 @@ def schedule(self) -> None:
return
dist_group_key = self.pending_groups.pop(0)
dist_group = self.dist_groups[dist_group_key]
nodes = cycle(self.nodes[0:dist_group['group_workers']])
schedule_log: dict[str, Any] = {n.gateway.id: [] for n in self.nodes[0:dist_group['group_workers']]}
for _ in range(len(dist_group['test_indices'])):
nodes = cycle(self.nodes[0 : dist_group["group_workers"]])
schedule_log: dict[str, Any] = {
n.gateway.id: [] for n in self.nodes[0 : dist_group["group_workers"]]
}
for _ in range(len(dist_group["test_indices"])):
n = next(nodes)
# needs cleaner way to be identified
tests_per_node = self.dist_groups[dist_group_key]['pending_indices'][:1]
tests_per_node = self.dist_groups[dist_group_key]["pending_indices"][:1]
schedule_log[n.gateway.id].extend(tests_per_node)
self._send_tests_group(n, 1, dist_group_key)
del self.dist_groups[dist_group_key]
message = ("\n[-] [csg] schedule: processed scheduling for "
f"{dist_group_key}: {' '.join([f'{nid} ({len(nt)})' for nid, nt in schedule_log.items()])}")
message = (
"\n[-] [csg] schedule: processed scheduling for "
f"{dist_group_key}: {' '.join([f'{nid} ({len(nt)})' for nid, nt in schedule_log.items()])}"
)
self.report_line(message)
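The dispatch loop above fans a group's tests out one at a time across its first group_workers nodes. A minimal reproduction of that round-robin pattern, with illustrative worker ids and indices:

from itertools import cycle

node_ids = ["gw0", "gw1"]  # the group's first group_workers nodes
pending = [4, 5, 6, 7]     # the group's pending test indices
schedule_log: dict[str, list[int]] = {nid: [] for nid in node_ids}

workers = cycle(node_ids)
for _ in range(len(pending)):
    nid = next(workers)
    batch, pending = pending[:1], pending[1:]  # real code: _send_tests_group(n, 1, key)
    schedule_log[nid].extend(batch)

assert schedule_log == {"gw0": [4, 6], "gw1": [5, 7]}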

def _send_tests(self, node: WorkerController, num: int) -> None:
@@ -362,16 +378,17 @@ def _send_tests(self, node: WorkerController, num: int) -> None:
self.node2pending[node].extend(tests_per_node)
node.send_runtest_some(tests_per_node)

def _send_tests_group(self, node: WorkerController, num: int, dist_group_key: str) -> None:
tests_per_node = self.dist_groups[dist_group_key]['pending_indices'][:num]
def _send_tests_group(
self, node: WorkerController, num: int, dist_group_key: str
) -> None:
tests_per_node = self.dist_groups[dist_group_key]["pending_indices"][:num]
if tests_per_node:
del self.dist_groups[dist_group_key]['pending_indices'][:num]
del self.dist_groups[dist_group_key]["pending_indices"][:num]
for test_index in tests_per_node:
self.pending.remove(test_index)
self.node2pending[node].extend(tests_per_node)
node.send_runtest_some(tests_per_node)


def _check_nodes_have_same_collection(self) -> bool:
"""Return True if all nodes have collected the same items.

2 changes: 1 addition & 1 deletion src/xdist/workermanage.py
@@ -82,7 +82,7 @@ def rsync_roots(self, gateway: execnet.Gateway) -> None:
def setup_nodes(
self,
putevent: Callable[[tuple[str, dict[str, Any]]], None],
max_nodes: int | None = None
max_nodes: int | None = None,
) -> list[WorkerController]:
self.config.hook.pytest_xdist_setupnodes(config=self.config, specs=self.specs)
self.trace("setting up nodes")
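The trailing comma above touches the max_nodes parameter this PR leans on: prepare_for_reschedule passes the next group's worker budget so only that many workers are restarted. A hedged sketch of the assumed capping behavior; the real method also performs rsync and setup hooks:

from typing import Any, Callable

def setup_nodes_sketch(
    specs: list[str],
    putevent: Callable[[tuple[str, dict[str, Any]]], None],
    max_nodes: int | None = None,
) -> list[str]:
    # Assumption: max_nodes caps how many specs are (re)started.
    chosen = specs if max_nodes is None else specs[:max_nodes]
    return [f"worker:{spec}" for spec in chosen]  # stand-ins for WorkerController

assert len(setup_nodes_sketch(["popen"] * 8, lambda ev: None, max_nodes=2)) == 2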
14 changes: 14 additions & 0 deletions xdist-testing-ntop/test.py
@@ -8,21 +8,25 @@ def test_1():
time.sleep(2)
assert True


@pytest.mark.xdist_custom(name="low_4")
def test_2():
time.sleep(2)
assert True


@pytest.mark.xdist_custom(name="low_4")
def test_3():
time.sleep(2)
assert True


@pytest.mark.xdist_custom(name="low_4")
def test_4():
time.sleep(2)
assert True


# @pytest.mark.xdist_custom(name="low_4")
# def test_4a():
# time.sleep(2)
@@ -48,48 +52,58 @@ def test_4():
# time.sleep(2)
# assert True


@pytest.mark.xdist_custom(name="med_2")
def test_5():
time.sleep(3)
assert True


@pytest.mark.xdist_custom(name="med_2")
def test_6():
time.sleep(3)
assert True


@pytest.mark.xdist_custom(name="med_2")
def test_7():
time.sleep(3)
assert True


@pytest.mark.xdist_custom(name="med_2")
def test_8():
time.sleep(3)
assert True


@pytest.mark.xdist_custom(name="high_1")
def test_9():
time.sleep(5)
assert True


@pytest.mark.xdist_custom(name="high_1")
def test_10():
time.sleep(5)
assert True


def test_11():
time.sleep(1)
assert True


def test_12():
time.sleep(1)
assert True


def test_13():
time.sleep(1)
assert True


def test_14():
time.sleep(1)
assert True
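Assuming the option wiring matches config.getvalue("customgroup") in remote.py, a session over this file would presumably be launched along the lines of "pytest -n 8 --dist customgroup xdist-testing-ntop/test.py" (flag name inferred, not confirmed by this diff), with low_4 tests fanned across four workers, med_2 across two, and high_1 pinned to one.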