From 510f212298bd7fc8a47fa3219deb2706988f6d4e Mon Sep 17 00:00:00 2001 From: Ronny Pfannschmidt Date: Tue, 9 Feb 2021 21:49:52 +0100 Subject: [PATCH] initial extract of the execnet gateway WIP --- src/xdist/backends.py | 87 ++++++++++++++++++++++++++++++++++++ src/xdist/dsession.py | 2 +- src/xdist/looponfail.py | 3 +- src/xdist/workermanage.py | 69 ++++++---------------------- testing/test_plugin.py | 4 +- testing/test_remote.py | 4 +- testing/test_workermanage.py | 41 ++++++++++------- 7 files changed, 133 insertions(+), 77 deletions(-) create mode 100644 src/xdist/backends.py diff --git a/src/xdist/backends.py b/src/xdist/backends.py new file mode 100644 index 00000000..1369e9ce --- /dev/null +++ b/src/xdist/backends.py @@ -0,0 +1,87 @@ +import re +import fnmatch +import os +import py # TODO remove +import pytest + + +def parse_spec_config(config): + xspeclist = [] + for xspec in config.getvalue("tx"): + i = xspec.find("*") + try: + num = int(xspec[:i]) + except ValueError: + xspeclist.append(xspec) + else: + xspeclist.extend([xspec[i + 1 :]] * num) + if not xspeclist: + raise pytest.UsageError( + "MISSING test execution (tx) nodes: please specify --tx" + ) + return xspeclist + + +class ExecnetNodeControl: + @classmethod + def from_config(cls, config, specs, defaultchdir): + final_specs = [] + + import execnet + + group = execnet.Group() + if specs is None: + specs = [execnet.XSpec(x) for x in parse_spec_config(config)] + for spec in specs: + if not isinstance(spec, execnet.XSpec): + spec = execnet.XSpec(spec) + if not spec.chdir and not spec.popen: + spec.chdir = defaultchdir + group.allocate_id(spec) + final_specs.append(spec) + + return cls(group, final_specs) + + def __init__(self, group, specs): + self.group = group + self.specs = specs + + @staticmethod + def get_rsync(source, verbose=False, ignores=None): + import execnet + + # todo: cache the class + class HostRSync(execnet.RSync): + """ RSyncer that filters out common files + """ + + def __init__(self, sourcedir, *args, **kwargs): + self._synced = {} + ignores = kwargs.pop("ignores", None) or [] + self._ignores = [ + re.compile(fnmatch.translate(getattr(x, "strpath", x))) + for x in ignores + ] + super().__init__(sourcedir=sourcedir, **kwargs) + + def filter(self, path): + path = py.path.local(path) + for cre in self._ignores: + if cre.match(path.basename) or cre.match(path.strpath): + return False + else: + return True + + def add_target_host(self, gateway, finished=None): + remotepath = os.path.basename(self._sourcedir) + super().add_target( + gateway, remotepath, finishedcallback=finished, delete=True + ) + + def _report_send_file(self, gateway, modified_rel_path): + if self._verbose > 0: + path = os.path.basename(self._sourcedir) + "/" + modified_rel_path + remotepath = gateway.spec.chdir + print("{}:{} <= {}".format(gateway.spec, remotepath, path)) + + return HostRSync(source, verbose=verbose, ignores=ignores) diff --git a/src/xdist/dsession.py b/src/xdist/dsession.py index 3db8e025..e13d96fe 100644 --- a/src/xdist/dsession.py +++ b/src/xdist/dsession.py @@ -308,7 +308,7 @@ def _clone_node(self, node): """ spec = node.gateway.spec spec.id = None - self.nodemanager.group.allocate_id(spec) + self.nodemanager._execnet.group.allocate_id(spec) node = self.nodemanager.setup_node(spec, self.queue.put) self._active_nodes.add(node) return node diff --git a/src/xdist/looponfail.py b/src/xdist/looponfail.py index 19b9313e..4172420d 100644 --- a/src/xdist/looponfail.py +++ b/src/xdist/looponfail.py @@ -10,7 +10,6 @@ import pytest import sys import time -import execnet def pytest_addoption(parser): @@ -65,6 +64,8 @@ def trace(self, *args): print("RemoteControl:", msg) def initgateway(self): + import execnet + return execnet.makegateway("popen") def setup(self, out=None): diff --git a/src/xdist/workermanage.py b/src/xdist/workermanage.py index 6a705d34..7b3ef491 100644 --- a/src/xdist/workermanage.py +++ b/src/xdist/workermanage.py @@ -1,14 +1,12 @@ -import fnmatch import os -import re import sys import uuid import py import pytest -import execnet import xdist.remote +from .backends import ExecnetNodeControl def parse_spec_config(config): @@ -38,17 +36,9 @@ def __init__(self, config, specs=None, defaultchdir="pyexecnetcache"): self.testrunuid = self.config.getoption("testrunuid") if self.testrunuid is None: self.testrunuid = uuid.uuid4().hex - self.group = execnet.Group() - if specs is None: - specs = self._getxspecs() - self.specs = [] - for spec in specs: - if not isinstance(spec, execnet.XSpec): - spec = execnet.XSpec(spec) - if not spec.chdir and not spec.popen: - spec.chdir = defaultchdir - self.group.allocate_id(spec) - self.specs.append(spec) + + self._execnet = ExecnetNodeControl.from_config(config, specs, defaultchdir) + self.roots = self._getrsyncdirs() self.rsyncoptions = self._getrsyncoptions() self._rsynced_specs = set() @@ -60,12 +50,14 @@ def rsync_roots(self, gateway): self.rsync(gateway, root, **self.rsyncoptions) def setup_nodes(self, putevent): - self.config.hook.pytest_xdist_setupnodes(config=self.config, specs=self.specs) + self.config.hook.pytest_xdist_setupnodes( + config=self.config, specs=self._execnet.specs + ) self.trace("setting up nodes") - return [self.setup_node(spec, putevent) for spec in self.specs] + return [self.setup_node(spec, putevent) for spec in self._execnet.specs] def setup_node(self, spec, putevent): - gw = self.group.makegateway(spec) + gw = self._execnet.group.makegateway(spec) self.config.hook.pytest_xdist_newgateway(gateway=gw) self.rsync_roots(gw) node = WorkerController(self, gw, self.config, putevent) @@ -75,13 +67,11 @@ def setup_node(self, spec, putevent): return node def teardown_nodes(self): - self.group.terminate(self.EXIT_TIMEOUT) - - def _getxspecs(self): - return [execnet.XSpec(x) for x in parse_spec_config(self.config)] + self._execnet.group.terminate(self.EXIT_TIMEOUT) def _getrsyncdirs(self): - for spec in self.specs: + # todo: move to backends ? + for spec in self._execnet.specs: if not spec.popen or spec.chdir: break else: @@ -130,7 +120,7 @@ def rsync(self, gateway, source, notify=None, verbose=False, ignores=None): # XXX This changes the calling behaviour of # pytest_xdist_rsyncstart and pytest_xdist_rsyncfinish to # be called once per rsync target. - rsync = HostRSync(source, verbose=verbose, ignores=ignores) + rsync = self._execnet.get_rsync(source, verbose=verbose, ignores=ignores) spec = gateway.spec if spec.popen and not spec.chdir: # XXX This assumes that sources are python-packages @@ -156,37 +146,6 @@ def finished(): self.config.hook.pytest_xdist_rsyncfinish(source=source, gateways=[gateway]) -class HostRSync(execnet.RSync): - """ RSyncer that filters out common files - """ - - def __init__(self, sourcedir, *args, **kwargs): - self._synced = {} - ignores = kwargs.pop("ignores", None) or [] - self._ignores = [ - re.compile(fnmatch.translate(getattr(x, "strpath", x))) for x in ignores - ] - super().__init__(sourcedir=sourcedir, **kwargs) - - def filter(self, path): - path = py.path.local(path) - for cre in self._ignores: - if cre.match(path.basename) or cre.match(path.strpath): - return False - else: - return True - - def add_target_host(self, gateway, finished=None): - remotepath = os.path.basename(self._sourcedir) - super().add_target(gateway, remotepath, finishedcallback=finished, delete=True) - - def _report_send_file(self, gateway, modified_rel_path): - if self._verbose > 0: - path = os.path.basename(self._sourcedir) + "/" + modified_rel_path - remotepath = gateway.spec.chdir - print("{}:{} <= {}".format(gateway.spec, remotepath, path)) - - def make_reltoroot(roots, args): # XXX introduce/use public API for splitting pytest args splitcode = "::" @@ -224,7 +183,7 @@ def __init__(self, nodemanager, gateway, config, putevent): self.config = config self.workerinput = { "workerid": gateway.id, - "workercount": len(nodemanager.specs), + "workercount": len(nodemanager._execnet.specs), "testrunuid": nodemanager.testrunuid, "mainargv": sys.argv, } diff --git a/testing/test_plugin.py b/testing/test_plugin.py index c1aac652..ff401c10 100644 --- a/testing/test_plugin.py +++ b/testing/test_plugin.py @@ -149,7 +149,7 @@ class TestDistOptions: def test_getxspecs(self, testdir): config = testdir.parseconfigure("--tx=popen", "--tx", "ssh=xyz") nodemanager = NodeManager(config) - xspecs = nodemanager._getxspecs() + xspecs = nodemanager._execnet.specs assert len(xspecs) == 2 print(xspecs) assert xspecs[0].popen @@ -157,7 +157,7 @@ def test_getxspecs(self, testdir): def test_xspecs_multiplied(self, testdir): config = testdir.parseconfigure("--tx=3*popen") - xspecs = NodeManager(config)._getxspecs() + xspecs = NodeManager(config)._execnet.specs assert len(xspecs) == 3 assert xspecs[1].popen diff --git a/testing/test_remote.py b/testing/test_remote.py index da2f6a86..7cdae94f 100644 --- a/testing/test_remote.py +++ b/testing/test_remote.py @@ -46,7 +46,9 @@ def setup(self,): class DummyMananger: testrunuid = uuid.uuid4().hex - specs = [0, 1] + + class _execnet: + specs = [0, 1] self.slp = WorkerController(DummyMananger, self.gateway, config, putevent) self.request.addfinalizer(self.slp.ensure_teardown) diff --git a/testing/test_workermanage.py b/testing/test_workermanage.py index 3cf19a8f..ba3ce3f6 100644 --- a/testing/test_workermanage.py +++ b/testing/test_workermanage.py @@ -4,7 +4,8 @@ import execnet from _pytest.pytester import HookRecorder from xdist import workermanage, newhooks -from xdist.workermanage import HostRSync, NodeManager +from xdist.workermanage import NodeManager +from xdist.backends import ExecnetNodeControl pytest_plugins = "pytester" @@ -48,13 +49,13 @@ def setup(self): class TestNodeManagerPopen: def test_popen_no_default_chdir(self, config): gm = NodeManager(config, ["popen"]) - assert gm.specs[0].chdir is None + assert gm._execnet.specs[0].chdir is None def test_default_chdir(self, config): specs = ["ssh=noco", "socket=xyz"] - for spec in NodeManager(config, specs).specs: + for spec in NodeManager(config, specs)._execnet.specs: assert spec.chdir == "pyexecnetcache" - for spec in NodeManager(config, specs, defaultchdir="abc").specs: + for spec in NodeManager(config, specs, defaultchdir="abc")._execnet.specs: assert spec.chdir == "abc" def test_popen_makegateway_events(self, config, hookrecorder, workercontroller): @@ -68,16 +69,16 @@ def test_popen_makegateway_events(self, config, hookrecorder, workercontroller): assert call.gateway.id == "gw0" call = hookrecorder.popcall("pytest_xdist_newgateway") assert call.gateway.id == "gw1" - assert len(hm.group) == 2 + assert len(hm._execnet.group) == 2 hm.teardown_nodes() - assert not len(hm.group) + assert not len(hm._execnet.group) def test_popens_rsync(self, config, mysetup, workercontroller): source = mysetup.source hm = NodeManager(config, ["popen"] * 2) hm.setup_nodes(None) - assert len(hm.group) == 2 - for gw in hm.group: + assert len(hm._execnet.group) == 2 + for gw in hm._execnet.group: class pseudoexec: args = [] @@ -90,11 +91,11 @@ def waitclose(self): gw.remote_exec = pseudoexec notifications = [] - for gw in hm.group: + for gw in hm._execnet.group: hm.rsync(gw, source, notify=lambda *args: notifications.append(args)) assert not notifications hm.teardown_nodes() - assert not len(hm.group) + assert not len(hm._execnet.group) assert "sys.path.insert" in gw.remote_exec.args[0] def test_rsync_popen_with_path(self, config, mysetup, workercontroller): @@ -103,10 +104,14 @@ def test_rsync_popen_with_path(self, config, mysetup, workercontroller): hm.setup_nodes(None) source.ensure("dir1", "dir2", "hello") notifications = [] - for gw in hm.group: + for gw in hm._execnet.group: hm.rsync(gw, source, notify=lambda *args: notifications.append(args)) assert len(notifications) == 1 - assert notifications[0] == ("rsyncrootready", hm.group["gw0"].spec, source) + assert notifications[0] == ( + "rsyncrootready", + hm._execnet.group["gw0"].spec, + source, + ) hm.teardown_nodes() dest = dest.join(source.basename) assert dest.join("dir1").check() @@ -121,12 +126,12 @@ def test_rsync_same_popen_twice( hm.roots = [] hm.setup_nodes(None) source.ensure("dir1", "dir2", "hello") - gw = hm.group[0] + gw = hm._execnet.group[0] hm.rsync(gw, source) call = hookrecorder.popcall("pytest_xdist_rsyncstart") assert call.source == source assert len(call.gateways) == 1 - assert call.gateways[0] in hm.group + assert call.gateways[0] in hm._execnet.group call = hookrecorder.popcall("pytest_xdist_rsyncfinish") @@ -137,7 +142,9 @@ def test_hrsync_filter(self, mysetup): source.ensure(".svn", "entries") source.ensure(".somedotfile", "moreentries") source.ensure("somedir", "editfile~") - syncer = HostRSync(source, ignores=NodeManager.DEFAULT_IGNORES) + syncer = ExecnetNodeControl.get_rsync( + source, ignores=NodeManager.DEFAULT_IGNORES + ) files = list(source.visit(rec=syncer.filter, fil=syncer.filter)) assert len(files) == 3 basenames = [x.basename for x in files] @@ -149,7 +156,7 @@ def test_hrsync_one_host(self, mysetup): source, dest = mysetup.source, mysetup.dest gw = execnet.makegateway("popen//chdir=%s" % dest) finished = [] - rsync = HostRSync(source) + rsync = ExecnetNodeControl.get_rsync(source) rsync.add_target_host(gw, finished=lambda: finished.append(1)) source.join("hello.py").write("world") rsync.send() @@ -272,7 +279,7 @@ def test_optimise_popen(self, testdir, mysetup, workercontroller): config = testdir.parseconfig(source) nodemanager = NodeManager(config, specs) nodemanager.setup_nodes(None) # calls .rysnc_roots() - for gwspec in nodemanager.specs: + for gwspec in nodemanager._execnet.specs: assert gwspec._samefilesystem() assert not gwspec.chdir