import fnmatch import os import py import pytest import execnet import xdist.remote from _pytest import runner # XXX load dynamically class NodeManager(object): EXIT_TIMEOUT = 10 DEFAULT_IGNORES = ['.*', '*.pyc', '*.pyo', '*~'] def __init__(self, config, specs=None, defaultchdir="pyexecnetcache"): self.config = config self._nodesready = py.std.threading.Event() self.trace = self.config.trace.get("nodemanager") self.group = execnet.Group() if specs is None: specs = self._getxspecs() self.specs = [] for spec in specs: if not isinstance(spec, execnet.XSpec): spec = execnet.XSpec(spec) if not spec.chdir and not spec.popen: spec.chdir = defaultchdir self.group.allocate_id(spec) self.specs.append(spec) self.roots = self._getrsyncdirs() self.rsyncoptions = self._getrsyncoptions() def rsync_roots(self): """ make sure that all remote gateways have the same set of roots in their current directory. """ if self.roots: # send each rsync root for root in self.roots: self.rsync(root, **self.rsyncoptions) def makegateways(self): assert not list(self.group) self.config.hook.pytest_xdist_setupnodes(config=self.config, specs=self.specs) for spec in self.specs: gw = self.group.makegateway(spec) self.config.hook.pytest_xdist_newgateway(gateway=gw) def setup_nodes(self, putevent): self.makegateways() self.rsync_roots() self.trace("setting up nodes") for gateway in self.group: node = SlaveController(self, gateway, self.config, putevent) gateway.node = node # to keep node alive node.setup() self.trace("started node %r" % node) def teardown_nodes(self): self.group.terminate(self.EXIT_TIMEOUT) def _getxspecs(self): xspeclist = [] for xspec in self.config.getvalue("tx"): i = xspec.find("*") try: num = int(xspec[:i]) except ValueError: xspeclist.append(xspec) else: xspeclist.extend([xspec[i+1:]] * num) if not xspeclist: raise pytest.UsageError( "MISSING test execution (tx) nodes: please specify --tx") return [execnet.XSpec(x) for x in xspeclist] def _getrsyncdirs(self): for spec in self.specs: if not spec.popen or spec.chdir: break else: return [] import pytest, _pytest pytestpath = pytest.__file__.rstrip("co") pytestdir = py.path.local(_pytest.__file__).dirpath() config = self.config candidates = [py._pydir,pytestpath,pytestdir] candidates += config.option.rsyncdir rsyncroots = config.getini("rsyncdirs") if rsyncroots: candidates.extend(rsyncroots) roots = [] for root in candidates: root = py.path.local(root).realpath() if not root.check(): raise pytest.UsageError("rsyncdir doesn't exist: %r" %(root,)) if root not in roots: roots.append(root) return roots def _getrsyncoptions(self): """Get options to be passed for rsync.""" ignores = list(self.DEFAULT_IGNORES) ignores += self.config.option.rsyncignore ignores += self.config.getini("rsyncignore") return { 'ignores': ignores, 'verbose': self.config.option.verbose, } def rsync(self, source, notify=None, verbose=False, ignores=None): """ perform rsync to all remote hosts. """ rsync = HostRSync(source, verbose=verbose, ignores=ignores) seen = py.builtin.set() gateways = [] for gateway in self.group: spec = gateway.spec if spec.popen and not spec.chdir: # XXX this assumes that sources are python-packages # and that adding the basedir does not hurt gateway.remote_exec(""" import sys ; sys.path.insert(0, %r) """ % os.path.dirname(str(source))).waitclose() continue if spec not in seen: def finished(): if notify: notify("rsyncrootready", spec, source) rsync.add_target_host(gateway, finished=finished) seen.add(spec) gateways.append(gateway) if seen: self.config.hook.pytest_xdist_rsyncstart( source=source, gateways=gateways, ) rsync.send() self.config.hook.pytest_xdist_rsyncfinish( source=source, gateways=gateways, ) class HostRSync(execnet.RSync): """ RSyncer that filters out common files """ def __init__(self, sourcedir, *args, **kwargs): self._synced = {} ignores= None if 'ignores' in kwargs: ignores = kwargs.pop('ignores') self._ignores = ignores or [] super(HostRSync, self).__init__(sourcedir=sourcedir, **kwargs) def filter(self, path): path = py.path.local(path) for x in self._ignores: x = getattr(x, 'strpath', x) if fnmatch.fnmatch(path.basename, x) or fnmatch.fnmatch(path.strpath, x): return False else: return True def add_target_host(self, gateway, finished=None): remotepath = os.path.basename(self._sourcedir) super(HostRSync, self).add_target(gateway, remotepath, finishedcallback=finished, delete=True,) def _report_send_file(self, gateway, modified_rel_path): if self._verbose: path = os.path.basename(self._sourcedir) + "/" + modified_rel_path remotepath = gateway.spec.chdir py.builtin.print_('%s:%s <= %s' % (gateway.spec, remotepath, path)) def make_reltoroot(roots, args): # XXX introduce/use public API for splitting py.test args splitcode = "::" l = [] for arg in args: parts = arg.split(splitcode) fspath = py.path.local(parts[0]) for root in roots: x = fspath.relto(root) if x or fspath == root: parts[0] = root.basename + "/" + x break else: raise ValueError("arg %s not relative to an rsync root" % (arg,)) l.append(splitcode.join(parts)) return l class SlaveController(object): ENDMARK = -1 def __init__(self, nodemanager, gateway, config, putevent): self.nodemanager = nodemanager self.putevent = putevent self.gateway = gateway self.config = config self.slaveinput = {'slaveid': gateway.id} self._down = False self.log = py.log.Producer("slavectl-%s" % gateway.id) if not self.config.option.debug: py.log.setconsumer(self.log._keywords, None) def __repr__(self): return "<%s %s>" %(self.__class__.__name__, self.gateway.id,) def setup(self): self.log("setting up slave session") spec = self.gateway.spec args = self.config.args if not spec.popen or spec.chdir: args = make_reltoroot(self.nodemanager.roots, args) option_dict = vars(self.config.option) if spec.popen: name = "popen-%s" % self.gateway.id basetemp = self.config._tmpdirhandler.getbasetemp() option_dict['basetemp'] = str(basetemp.join(name)) self.config.hook.pytest_configure_node(node=self) self.channel = self.gateway.remote_exec(xdist.remote) self.channel.send((self.slaveinput, args, option_dict)) if self.putevent: self.channel.setcallback(self.process_from_remote, endmarker=self.ENDMARK) def ensure_teardown(self): if hasattr(self, 'channel'): if not self.channel.isclosed(): self.log("closing", self.channel) self.channel.close() #del self.channel if hasattr(self, 'gateway'): self.log("exiting", self.gateway) self.gateway.exit() #del self.gateway def send_runtest_some(self, indices): self.sendcommand("runtests", indices=indices) def send_runtest_all(self): self.sendcommand("runtests_all",) def shutdown(self): if not self._down: try: self.sendcommand("shutdown") except IOError: pass def sendcommand(self, name, **kwargs): """ send a named parametrized command to the other side. """ self.log("sending command %s(**%s)" % (name, kwargs)) self.channel.send((name, kwargs)) def notify_inproc(self, eventname, **kwargs): self.log("queuing %s(**%s)" % (eventname, kwargs)) self.putevent((eventname, kwargs)) def process_from_remote(self, eventcall): """ this gets called for each object we receive from the other side and if the channel closes. Note that channel callbacks run in the receiver thread of execnet gateways - we need to avoid raising exceptions or doing heavy work. """ try: if eventcall == self.ENDMARK: err = self.channel._getremoteerror() if not self._down: if not err or isinstance(err, EOFError): err = "Not properly terminated" # lost connection? self.notify_inproc("errordown", node=self, error=err) self._down = True return eventname, kwargs = eventcall if eventname in ("collectionstart"): self.log("ignoring %s(%s)" %(eventname, kwargs)) elif eventname == "slaveready": self.notify_inproc(eventname, node=self, **kwargs) elif eventname == "slavefinished": self._down = True self.slaveoutput = kwargs['slaveoutput'] self.notify_inproc("slavefinished", node=self) elif eventname == "logstart": self.notify_inproc(eventname, node=self, **kwargs) elif eventname in ("testreport", "collectreport", "teardownreport"): item_index = kwargs.pop("item_index", None) rep = unserialize_report(eventname, kwargs['data']) if item_index is not None: rep.item_index = item_index self.notify_inproc(eventname, node=self, rep=rep) elif eventname == "collectionfinish": self.notify_inproc(eventname, node=self, ids=kwargs['ids']) else: raise ValueError("unknown event: %s" %(eventname,)) except KeyboardInterrupt: # should not land in receiver-thread raise except: excinfo = py.code.ExceptionInfo() py.builtin.print_("!" * 20, excinfo) self.config.pluginmanager.notify_exception(excinfo) def unserialize_report(name, reportdict): if name == "testreport": return runner.TestReport(**reportdict) elif name == "collectreport": return runner.CollectReport(**reportdict)