diff --git a/pyfarm/agent/http/api/assign.py b/pyfarm/agent/http/api/assign.py index 99868818..7819e8fc 100644 --- a/pyfarm/agent/http/api/assign.py +++ b/pyfarm/agent/http/api/assign.py @@ -29,7 +29,7 @@ from twisted.web.server import NOT_DONE_YET from twisted.internet import reactor -from twisted.internet.defer import DeferredList +from twisted.internet.defer import DeferredList, inlineCallbacks from voluptuous import Schema, Required from pyfarm.core.enums import WorkState, AgentState @@ -62,169 +62,15 @@ class Assign(APIResource): def __init__(self, agent): self.agent = agent + self.tasks = [] - def post(self, **kwargs): - request = kwargs["request"] - request_data = kwargs["data"] - - if request_from_master(request): - config.master_contacted() - - if ("agent_id" in request_data and - request_data["agent_id"] != config["agent_id"]): - logger.error("Wrong agent_id in assignment: %s. Our id is %s", - request_data["agent_id"], config["agent_id"]) - request.setResponseCode(BAD_REQUEST) - request.write(dumps( - {"error": "You have the wrong agent. I am %s." % - config["agent_id"], - "agent_id": config["agent_id"]})) - request.finish() - return NOT_DONE_YET - - if self.agent.reannounce_lock.locked: - logger.warning("Temporarily rejecting assignment because we " - "are in the middle of a reannounce.") - request.setResponseCode(SERVICE_UNAVAILABLE) - request.write( - dumps({"error": "Agent cannot accept assignments because of a " - "reannounce in progress. Try again shortly."})) - request.finish() - return NOT_DONE_YET - - # First, get the resources we have *right now*. In some cases - # this means using the functions in pyfarm.core.sysinfo because - # entries in `config` could be slightly out of sync with the system. - memory_free = free_ram() - cpus = config["agent_cpus"] - requires_ram = request_data["job"].get("ram") - requires_cpus = request_data["job"].get("cpus") - - if self.agent.shutting_down: - logger.error("Rejecting assignment because the agent is in the " - "process of shutting down.") - request.setResponseCode(SERVICE_UNAVAILABLE) - request.write( - dumps({"error": "Agent cannot accept assignments because it is " - "shutting down"})) - request.finish() - return NOT_DONE_YET - - if "restart_requested" in config \ - and config["restart_requested"] is True: - logger.error("Rejecting assignment because of scheduled restart.") - request.setResponseCode(SERVICE_UNAVAILABLE) - request.write( - dumps({"error": "Agent cannot accept assignments because of a " - "pending restart"})) - request.finish() - return NOT_DONE_YET - - elif "agent_id" not in config: - logger.error( - "Agent has not yet connected to the master or `agent_id` " - "has not been set yet.") - request.setResponseCode(SERVICE_UNAVAILABLE) - request.write( - dumps({"error": "agent_id has not been set in the config"})) - request.finish() - return NOT_DONE_YET - - # Do we have enough ram? - elif requires_ram is not None and requires_ram > memory_free: - logger.error( - "Task %s requires %sMB of ram, this agent has %sMB free. " - "Rejecting Task %s.", - request_data["job"]["id"], requires_ram, memory_free, - request_data["job"]["id"]) - request.setResponseCode(BAD_REQUEST) - request.write( - dumps({"error": "Not enough ram", - "agent_ram": memory_free, - "requires_ram": requires_ram})) - request.finish() - - # touch the config - config["free_ram"] = memory_free - return NOT_DONE_YET - - # Do we have enough cpus (count wise)? 
- elif requires_cpus is not None and requires_cpus > cpus: - logger.error( - "Task %s requires %s CPUs, this agent has %s CPUs. " - "Rejecting Task %s.", - request_data["job"]["id"], requires_cpus, cpus, - request_data["job"]["id"]) - request.setResponseCode(BAD_REQUEST) - request.write( - dumps({"error": "Not enough cpus", - "agent_cpus": cpus, - "requires_cpus": requires_cpus})) - request.finish() - return NOT_DONE_YET - - # Check for double assignments - try: - current_assignments = config["current_assignments"].itervalues - except AttributeError: # pragma: no cover - current_assignments = config["current_assignments"].values - - new_task_ids = set(task["id"] for task in request_data["tasks"]) - - for assignment in current_assignments(): - existing_task_ids = set(x["id"] for x in assignment["tasks"]) - - # If the assignment is identical to one we already have - if existing_task_ids == new_task_ids: - logger.debug("Ignoring repeated assignment of the same batch") - request.setResponseCode(ACCEPTED) - request.write(dumps({"id": assignment["id"]})) - request.finish() - return NOT_DONE_YET - # If there is only a partial overlap - elif existing_task_ids & new_task_ids: - logger.error("Rejecting assignment with partial overlap with " - "existing assignment.") - unknown_task_ids = new_task_ids - existing_task_ids - request.setResponseCode(CONFLICT) - request.write(dumps( - {"error": "Partial overlap of tasks", - "rejected_task_ids": list(unknown_task_ids)})) - request.finish() - return NOT_DONE_YET - - if not config["agent_allow_sharing"]: - try: - current_jobtypes = config["jobtypes"].itervalues - except AttributeError: # pragma: no cover - current_jobtypes = config["jobtypes"].values - for jobtype in current_jobtypes(): - num_finished_tasks = (len(jobtype.finished_tasks) + - len(jobtype.failed_tasks)) - if len(jobtype.assignment["tasks"]) > num_finished_tasks: - logger.error("Rejecting an assignment that would require " - "agent sharing") - request.setResponseCode(CONFLICT) - request.write( - dumps({"error": - "Agent does not allow multiple assignments", - "rejected_task_ids": list(new_task_ids)})) - request.finish() - return NOT_DONE_YET - - assignment_uuid = uuid4() - request_data.update(id=assignment_uuid) - config["current_assignments"][assignment_uuid] = request_data - - # In all other cases we have some work to do inside of - # deferreds so we just have to respond - request.setResponseCode(ACCEPTED) - request.write(dumps({"id": assignment_uuid})) - request.finish() - logger.debug("Accepted assignment %s: %r", - assignment_uuid, request_data) - logger.info("Accept assignment from job %s with %s tasks", - request_data["job"]["title"], len(request_data["tasks"])) + @inlineCallbacks + def handle_assignment(self, request_data, assignment_uuid): + logger.debug( + "Accepting assignment %s: %r", assignment_uuid, request_data) + logger.info( + "Accept assignment from job %s with %s tasks", + request_data["job"]["title"], len(request_data["tasks"])) def assignment_failed(result, assign_id): logger.error( @@ -340,7 +186,14 @@ def error_callback(cburl, cbdata, task, failure_reason): if jobtype_id: config["jobtypes"].pop(jobtype_id, None) - def loaded_jobtype(jobtype_class, assign_id): + # Load the job type then pass the class along to the + # callback. No errback here because all the errors + # are handled internally in this case. 
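+        # (With inlineCallbacks the load below happens inline rather
+        # than via a deferred callback; failures surface as exceptions
+        # in the try/except that follows.)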
+        try:
+            jobtype_class = yield JobType.load(request_data)
+        except Exception as error:
+            # The old deferred-based code routed this failure into the
+            # load_jobtype_failed errback.  Log it and drop the
+            # assignment so it does not linger in current_assignments.
+            logger.error(
+                "Failed to load job type for assignment %s: %s",
+                assignment_uuid, error)
+            config["current_assignments"].pop(assignment_uuid, None)
+        else:
             # TODO: report error to master
             if hasattr(jobtype_class, "getTraceback"):
                 logger.error(jobtype_class.getTraceback())
@@ -361,10 +214,10 @@ def loaded_jobtype(jobtype_class, assign_id):
             # TODO: add callback to stop persistent process
             try:
                 started_deferred, stopped_deferred = instance._start()
-                started_deferred.addCallback(assignment_started, assign_id)
-                started_deferred.addErrback(assignment_failed, assign_id)
-                stopped_deferred.addCallback(assignment_stopped, assign_id)
-                stopped_deferred.addErrback(assignment_failed, assign_id)
+                started_deferred.addCallback(assignment_started, assignment_uuid)
+                started_deferred.addErrback(assignment_failed, assignment_uuid)
+                stopped_deferred.addCallback(assignment_stopped, assignment_uuid)
+                stopped_deferred.addErrback(assignment_failed, assignment_uuid)
                 stopped_deferred.addBoth(restart_if_necessary)
                 stopped_deferred.addBoth(
                     lambda *args: instance._remove_tempdirs())
@@ -372,6 +225,7 @@ def loaded_jobtype(jobtype_class, assign_id):
                     lambda *args: instance._close_logs())
                 stopped_deferred.addBoth(
                     lambda *args: instance._upload_logfile())
+
             except Exception as e:
                 logger.error("Error on starting jobtype, stopping it now. "
                              "Error was: %r. Traceback: %s", e,
@@ -380,17 +234,159 @@ def loaded_jobtype(jobtype_class, assign_id):
                     error="Error while loading jobtype: %r. "
                           "Traceback: %s" % (e, traceback.format_exc()))
-            assignment = config["current_assignments"].pop(assign_id)
+            assignment = config["current_assignments"].pop(assignment_uuid)
             if "jobtype" in assignment:
                 jobtype_id = assignment["jobtype"].pop("id", None)
                 if jobtype_id:
                     config["jobtypes"].pop(jobtype_id, None)
 
-        # Load the job type then pass the class along to the
-        # callback.  No errback here because all the errors
-        # are handled internally in this case.
-        jobtype_loader = JobType.load(request_data)
-        jobtype_loader.addCallback(loaded_jobtype, assignment_uuid)
-        jobtype_loader.addErrback(load_jobtype_failed, assignment_uuid)
+
+    def post(self, **kwargs):
+        request_data = kwargs["data"]
+
+        # First, get the resources we have *right now*.  In some cases
+        # this means using the functions in pyfarm.core.sysinfo because
+        # entries in `config` could be slightly out of sync with the system.
+        memory_free = free_ram()
+        cpus = config["agent_cpus"]
+        requires_ram = request_data["job"].get("ram")
+        requires_cpus = request_data["job"].get("cpus")
+
+        if request_from_master(kwargs["request"]):
+            config.master_contacted()
+
+        if ("agent_id" in request_data and
+                request_data["agent_id"] != config["agent_id"]):
+            logger.error("Wrong agent_id in assignment: %s. Our id is %s",
+                         request_data["agent_id"], config["agent_id"])
+            return (
+                dumps({"error": "You have the wrong agent. "
+                                "I am %s." % config["agent_id"],
+                       "agent_id": config["agent_id"]}),
+                BAD_REQUEST
+            )
+
+        elif self.agent.reannounce_lock.locked:
+            logger.warning("Temporarily rejecting assignment because we "
+                           "are in the middle of a reannounce.")
+            return (
+                dumps({"error": "Agent cannot accept assignments because of a "
+                                "reannounce in progress. Try again shortly."}),
+                SERVICE_UNAVAILABLE
+            )
+
+        elif self.agent.shutting_down:
+            logger.error("Rejecting assignment because the agent is in the "
+                         "process of shutting down.")
+            return (
+                dumps({"error": "Agent cannot accept assignments because it is "
+                                "shutting down."}),
+                SERVICE_UNAVAILABLE
+            )
+
+        elif "restart_requested" in config \
+                and config["restart_requested"] is True:
+            logger.error("Rejecting assignment because of scheduled restart.")
+            return (
+                dumps({"error": "Agent cannot accept assignments because of a "
+                                "pending restart."}),
+                SERVICE_UNAVAILABLE
+            )
+
+        elif "agent_id" not in config:
+            logger.error(
+                "Agent has not yet connected to the master or `agent_id` "
+                "has not been set yet.")
+            return (
+                dumps({"error": "agent_id has not been set in the config"}),
+                SERVICE_UNAVAILABLE
+            )
+
+        # Do we have enough ram?
+        elif requires_ram is not None and requires_ram > memory_free:
+            logger.error(
+                "Task %s requires %sMB of ram, this agent has %sMB free. "
+                "Rejecting Task %s.",
+                request_data["job"]["id"], requires_ram, memory_free,
+                request_data["job"]["id"])
+            config["free_ram"] = memory_free
+            return (
+                dumps({"error": "Not enough ram",
+                       "agent_ram": memory_free,
+                       "requires_ram": requires_ram}),
+                BAD_REQUEST
+            )
+
+        # Do we have enough cpus (count wise)?
+        elif requires_cpus is not None and requires_cpus > cpus:
+            logger.error(
+                "Task %s requires %s CPUs, this agent has %s CPUs. "
+                "Rejecting Task %s.",
+                request_data["job"]["id"], requires_cpus, cpus,
+                request_data["job"]["id"])
+            return (
+                dumps({"error": "Not enough cpus",
+                       "agent_cpus": cpus,
+                       "requires_cpus": requires_cpus}),
+                BAD_REQUEST
+            )
+
+        # Check for double assignments
+        try:
+            current_assignments = config["current_assignments"].itervalues
+        except AttributeError:  # pragma: no cover
+            current_assignments = config["current_assignments"].values
+
+        new_task_ids = set(task["id"] for task in request_data["tasks"])
+
+        for assignment in current_assignments():
+            existing_task_ids = set(x["id"] for x in assignment["tasks"])
+
+            # If the assignment is identical to one we already have
+            if existing_task_ids == new_task_ids:
+                logger.debug(
+                    "Ignoring repeated assignment of the same batch")
+                return dumps({"id": assignment["id"]}), ACCEPTED
+
+            # If there is only a partial overlap
+            elif existing_task_ids & new_task_ids:
+                logger.error("Rejecting assignment with partial overlap with "
+                             "existing assignment.")
+                unknown_task_ids = new_task_ids - existing_task_ids
+                return (
+                    dumps({"error": "Partial overlap of tasks",
+                           "rejected_task_ids": list(unknown_task_ids)}),
+                    CONFLICT
+                )
+
+        if not config["agent_allow_sharing"]:
+            try:
+                current_jobtypes = config["jobtypes"].itervalues
+            except AttributeError:  # pragma: no cover
+                current_jobtypes = config["jobtypes"].values
+
+            for jobtype in current_jobtypes():
+                num_finished_tasks = (len(jobtype.finished_tasks) +
+                                      len(jobtype.failed_tasks))
+                if len(jobtype.assignment["tasks"]) > num_finished_tasks:
+                    logger.error("Rejecting an assignment that would require "
+                                 "agent sharing")
+                    return (
+                        dumps({
+                            "error": "Agent does not allow multiple "
+                                     "assignments",
+                            "rejected_task_ids": list(new_task_ids)}),
+                        CONFLICT
+                    )
+
+        assignment_uuid = uuid4()
+        request_data.update(id=assignment_uuid)
+        config["current_assignments"][assignment_uuid] = request_data
+
+        # Schedule the method to handle the assignment itself.  The
+        # argument order must match the handle_assignment signature
+        # above: (request_data, assignment_uuid).
+        task = reactor.callLater(
+            0, self.handle_assignment, request_data, assignment_uuid)
+        self.tasks.append(task)
+
+        # Let the client know we're going to handle the assignment.
+        return dumps({"id": assignment_uuid}), ACCEPTED
 
-        return NOT_DONE_YET
diff --git a/pyfarm/jobtypes/core/internals.py b/pyfarm/jobtypes/core/internals.py
index cfcde1d0..b32caedb 100644
--- a/pyfarm/jobtypes/core/internals.py
+++ b/pyfarm/jobtypes/core/internals.py
@@ -23,13 +23,14 @@
 """
 
 import imp
+import json
 import os
 import sys
 import tempfile
 import time
 from collections import namedtuple
-from errno import EEXIST
 from datetime import datetime
+from errno import EEXIST, EPERM, ENOSPC, EACCES, EAGAIN, EINTR, ENXIO
 from os.path import dirname, join, isfile, basename
 from uuid import UUID
 from functools import partial
@@ -44,17 +45,26 @@
 except ImportError:  # pragma: no cover
     grp = NotImplemented
 
+try:
+    WindowsError
+except NameError:  # pragma: no cover
+    WindowsError = OSError
+
 try:
     from httplib import (
-        OK, INTERNAL_SERVER_ERROR, CREATED, ACCEPTED, CONFLICT)
+        OK, INTERNAL_SERVER_ERROR, CREATED, ACCEPTED, CONFLICT, NOT_FOUND,
+        BAD_REQUEST)
 except ImportError:  # pragma: no cover
     from http.client import (
-        OK, INTERNAL_SERVER_ERROR, CREATED, ACCEPTED, CONFLICT)
+        OK, INTERNAL_SERVER_ERROR, CREATED, ACCEPTED, CONFLICT, NOT_FOUND,
+        BAD_REQUEST)
 
+import treq
 from psutil import disk_usage
-from twisted.internet import reactor, threads
-from twisted.internet.defer import Deferred, DeferredList, succeed
+from twisted.internet import reactor
+from twisted.internet.threads import deferToThread
+from twisted.internet.defer import (
+    Deferred, DeferredList, succeed, inlineCallbacks, returnValue)
 from twisted.web._newclient import (
     ResponseNeverReceived, RequestTransmissionFailed)
 
@@ -63,7 +73,7 @@
 from pyfarm.core.enums import WINDOWS, INTEGER_TYPES, STRING_TYPES, WorkState
 from pyfarm.agent.config import config
 from pyfarm.agent.logger import getLogger
-from pyfarm.agent.http.core.client import get, post, http_retry_delay
+from pyfarm.agent.http.core.client import get_direct, post, http_retry_delay
 from pyfarm.agent.utility import remove_file, remove_directory
 from pyfarm.jobtypes.core.log import STDOUT, STDERR, logpool
 from pyfarm.jobtypes.core.process import ReplaceEnvironment, ProcessProtocol
@@ -83,208 +93,317 @@
 class InsufficientSpaceError(Exception):
     pass
 
 
-class Cache(object):
-    """Internal methods for caching job types"""
-    cache = {}
-    JOBTYPE_VERSION_URL = \
-        "%(master_api)s/jobtypes/%(name)s/versions/%(version)s"
-    CACHE_DIRECTORY = config.get("jobtype_cache_directory", "")
+class JobTypeDownloadError(Exception):
+    """
+    Raised when there is some problem downloading a job type.
+    """
 
-    if not CACHE_DIRECTORY:  # pragma: no cover
-        CACHE_DIRECTORY = None  # make sure it's None
-        logger.warning("Job type cache directory has been disabled.")
-    else:
-        try:
-            os.makedirs(CACHE_DIRECTORY)
 
+class JobTypeNotFound(JobTypeDownloadError):
+    """
+    Raised when we fail to download a job type because
+    it no longer exists on the master.
+    """
+    def __init__(self, name, version):
+        super(JobTypeNotFound, self).__init__(
+            "Job type %s v%s not found" % (name, version))
+        self.name = name
+        self.version = version
 
-        except OSError as e:  # pragma: no cover
-            if e.errno != EEXIST:
-                logger.error(
-                    "Failed to create %r.
Job type caching is " - "now disabled.", CACHE_DIRECTORY) - raise + +class JobTypeLoader(object): + """Class for retrieval, loading and caching of job types.""" + API_URL = "{master_api}/jobtypes/{name}/versions/{version}" + MODULE_NAME = "{name}_{version}_{classname}_{id}" + + def __init__(self): + if not config["jobtype_enable_cache"]: + cache_directory = None else: - logger.info("Created job type cache directory %r", CACHE_DIRECTORY) + try: + cache_directory = config["jobtype_cache_directory"] - logger.debug("Job type cache directory is %r", CACHE_DIRECTORY) + # Include the farm name in the config path as well. This + # should prevent the wrong cache from being used if an agent + # switches farms. + farm_name = config.get("farm_name") + if farm_name is not None and farm_name: + cache_directory = join(cache_directory, farm_name) - @classmethod - def _download_jobtype(cls, name, version): - """ - Downloads the job type specified in ``assignment``. This - method will pass the response it receives to :meth:`_cache_jobtype` - however failures will be retried. - """ - logger.debug("Downloading job type %r version %s", name, version) - url = str(cls.JOBTYPE_VERSION_URL % { - "master_api": config["master_api"], - "name": name, "version": version}) - - result = Deferred() - download = lambda *_: \ - get(url, - callback=result.callback, - errback=lambda *_: reactor.callLater(http_retry_delay(), - download)) - download() - return result + except KeyError: + cache_directory = None - @classmethod - def _cache_filepath(cls, cache_key, classname, version): - return str(join( - cls.CACHE_DIRECTORY, "%s_%s_v%s.py" % ( - cache_key, classname, version))) + if not cache_directory or ( + isinstance(cache_directory, STRING_TYPES) + and not cache_directory.strip()): + logger.warning("Cache directory is blank, disabling cache.") + cache_directory = None - @classmethod - def _cache_key(cls, assignment): - return assignment["jobtype"]["name"], assignment["jobtype"]["version"] + if cache_directory is not None: + try: + os.makedirs(cache_directory) + except OSError as e: # pragma: no cover + if e.errno != EEXIST: + logger.error( + "Failed to create %r. Job type caching is " + "now disabled.", cache_directory) + raise + + self.cache_directory = cache_directory @classmethod - def _jobtype_download_complete(cls, response, cache_key): - # Server is offline or experiencing issues right - # now so we should retry the request. - if response.code >= INTERNAL_SERVER_ERROR: - logger.debug( - "Could not download jobtype because of internal server error.") - return reactor.callLater( - http_retry_delay(), - response.request.retry) - - downloaded_data = response.json() + @inlineCallbacks + def create_module(cls, job_type): + """ + Creates a module for the given ``job_type``. This module will + contain the loaded job type class. - if not config["jobtype_enable_cache"]: - logger.debug("Jobtype cache is disabled, loading the jobtype.") - return cls._load_jobtype(downloaded_data, None) + :param dict job_type: + The source code to compile to a code object - else: - # When the download is complete, cache the results - logger.debug("Caching the jobtype locally.") - caching = cls._cache_jobtype(cache_key, downloaded_data) - caching.addCallback( - lambda result: cls._load_jobtype(*result)) - return caching + :raise TypeError: + Raised when ``job_type`` is not a dictionary. + + :returns: + Returns a module like object with the job type. 
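+
+        Usage sketch, mirroring the unit tests (``name``, ``version``,
+        ``classname`` and ``code`` are the keys this method reads)::
+
+            module = yield JobTypeLoader.create_module(
+                {"name": "Foo", "version": 1, "classname": "Foo",
+                 "code": "class Foo(object): pass"})
+            foo_class = module.Foo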
+ """ + if not isinstance(job_type, dict): + raise TypeError("Expected dictionary instance for `job_type`") + + module_name = cls.MODULE_NAME.format( + name=job_type["name"], version=job_type["version"], + classname=job_type["classname"], id=os.urandom(8).encode("hex") + ) + module = imp.new_module(module_name) + compiled = \ + yield deferToThread(compile, job_type["code"], "", "exec") + exec compiled in module.__dict__ + returnValue(module) + + def cache_path(self, name, version): + """ + Returns the path to create a cache file for the given job type + name and version. This function will return None if caching is + currently disabled. + + :param str name: + Name of the job type to cache + :param str version: + Version of the job type to cache + """ + if self.cache_directory: + return join( + self.cache_directory, + "{name}_{version}.json".format(name=name, version=version)) + @inlineCallbacks + def load(self, name, version): + """ + Main method used by a job type to load the job type class. Internally + this handles retrieval of the job type either from the cache or from + the master. This will also cache the job type we retrieve from the + master if caching is enabled. - @classmethod - def _cache_jobtype(cls, cache_key, jobtype): + :param str name: + The name of the job type to load and return. + + :param str version: + The version of the job type to load and return. """ - Once the job type is downloaded this classmethod is called - to store it on disk. In the rare even that we fail to write it - to disk, we store it in memory instead. + job_type = yield self.cached_data(name, version) + + # Caching may be disabled if source_code is still None + if job_type is None: + job_type = yield self.download_source(name, version) + yield self.write_cache(job_type) + + module = yield self.create_module(job_type) + + # TODO: this is a bit of an odd case. The old code didn't implement + # this but the classname field is nullable so we need to handle it + if job_type["classname"] is None: + classname = None # FIXME + raise NotImplementedError( + "FIXME: `classname` missing not implemented") + + elif not hasattr(module, job_type["classname"]): + raise AttributeError( + "No such attribute {classname} in job " + "type {name} v{version}".format( + classname=job_type["classname"], + name=job_type["name"], + version=job_type["version"])) + else: + classname = job_type["classname"] + + returnValue(getattr(module, classname)) + + @inlineCallbacks + def download_source(self, name, version): """ - if isinstance(cache_key, tuple): - cache_key = cache_key[0] + Downloads and returns the source code for the given name and version + of a job type. - assert isinstance(cache_key, STRING_TYPES) - assert isinstance(jobtype, dict) - filename = cls._cache_filepath( - cache_key, jobtype["classname"], jobtype["version"]) - success = Deferred() - jobtype = jobtype.copy() + :param str name: + The name of the job type to download the source code for. - def write_to_disk(filename): - parent_dir = dirname(filename) + :param str version: + The version of the job type to download the source code for. + """ + url = self.API_URL.format( + master_api=config["master_api"], name=name, version=version + ) + logger.debug("Downloading %s", url) + + while True: try: - os.makedirs(parent_dir) - except (IOError, OSError) as e: # pragma: no cover - if e.errno != EEXIST: - logger.error("Failed to create %s: %s", parent_dir, e) + response = yield get_direct(url) + except Exception as error: + logger.error( + "Failed to download %s: %s. 
Request will be retried.", + url, error) + delay = Deferred() + reactor.callLater(http_retry_delay(), delay.callback, None) + yield delay else: - logger.debug("Created %s", parent_dir) + if response.code == OK: + data = yield treq.json_content(response) + returnValue(data) - if isfile(filename): # pragma: no cover - logcache.debug("%s is already cached on disk", filename) - jobtype.pop("code", None) - return jobtype, filename + elif response.code == NOT_FOUND: + raise JobTypeNotFound(name, version) - try: - with open(filename, "w") as stream: - stream.write(jobtype["code"]) + # Server is offline or experiencing issues right + # now so we should retry the request. + elif response.code >= INTERNAL_SERVER_ERROR: + logger.debug( + "Could not download job type, response %r while " + "downloading %s. Request will be retried.", + response.code, url + ) + delay = Deferred() + reactor.callLater(http_retry_delay(), delay.callback, None) + yield delay + + # If we got a bad request coming back from the + # master we've done something wrong. Don't + # retry the request because we shouldn't + # expect the response code to change with + # a retry. + elif response.code >= BAD_REQUEST: + raise JobTypeDownloadError( + "HTTP %s error when downloading %s" % ( + response.code, url)) - # If the above fails, use a temp file instead - except (IOError, OSError) as e: # pragma: no cover - fd, tmpfilepath = tempfile.mkstemp(suffix=".py") - logcache.warning( - "Failed to write %s, using %s instead: %s", - filename, tmpfilepath, e) + else: + logger.warning( + "Unknown response %r while downloading %s. Request " + "will be retried.", response.code, url) + delay = Deferred() + reactor.callLater(http_retry_delay(), delay.callback, None) + yield delay + + @inlineCallbacks + def cached_data(self, name, version): + """ + Searches for and returns the cached data for the given name and + version of a job type. - with os.fdopen(fd, "w") as stream: - stream.write(jobtype["code"]) + :param str name: + The name of the job type to return the cached entry for. - jobtype.pop("code", None) - return jobtype, tmpfilepath + :param str version: + The version of the job type to return the cached entry for. - else: - logger.debug( - "Wrote job type %s version %s to %s", - jobtype["name"], jobtype["version"], filename) - jobtype.pop("code", None) - return jobtype, filename - - def written_to_disk(results): - jobtype, filename = results - cls.cache[cache_key] = (jobtype, filename) - logcache.info("Created cache for %r at %s", cache_key, filename) - success.callback((jobtype, filename)) - - def failed_to_write_to_disk(error): # pragma: no cover - logcache.error( - "Failed to write job type cache to disk, will use " - "memory instead: %s", error) - - # The code exists in the job type because it's - # only removed on success. - cls.cache[cache_key] = (jobtype, None) - success.callback((jobtype, None)) - - # Defer the write process to a thread so we don't - # block the reactor if the write is slow - logger.debug( - "Caching job type %s version %s to %s", - jobtype["classname"], jobtype.get("version", "?"), filename) - writer = threads.deferToThread(write_to_disk, filename) - writer.addCallbacks(written_to_disk, failed_to_write_to_disk) - return success + :returns: + Returns the cached job type dictionary response from disk. May + also return None if the cache does not exist or caching is disabled. 
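+
+        Expected call pattern, as :meth:`load` uses it::
+
+            job_type = yield loader.cached_data(name, version)
+            if job_type is None:
+                job_type = yield loader.download_source(name, version)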
+        """
+        cache_path = self.cache_path(name, version)
 
-    @classmethod
-    def _module_for_jobtype(cls, jobtype):
-        return "pyfarm.jobtypes.cached.%s%s%s" % (
-            jobtype["classname"], jobtype["name"], jobtype["version"])
+        if not self.cache_directory or not isfile(cache_path):
+            returnValue(None)
+
+        stream = yield deferToThread(open, cache_path, "rb")
+        try:
+            data = yield deferToThread(json.load, stream)
+        finally:
+            stream.close()
+        returnValue(data)
+
+    @inlineCallbacks
+    def write_cache(self, job_type):
+        """
+        Writes the given ``job_type`` to the disk cache.
+
+        :param dict job_type:
+            A dictionary containing information about the job type,
+            including its name, version, classname, etc.
+
+        :raises TypeError:
+            Raised if ``job_type`` is not a dictionary.
+
+        :returns:
+            This function does not return anything.
+        """
+        if not self.cache_directory:
+            returnValue(None)
+
+        if not isinstance(job_type, dict):
+            raise TypeError("Expected dictionary instance for `job_type`")
+
+        error = False
+        cache_path = self.cache_path(job_type["name"], job_type["version"])
+        output = None
+        try:
+            output = yield deferToThread(open, cache_path, "wb")
+
+            # Write the json data to disk in a consistent minified format.
+            yield deferToThread(
+                json.dump, job_type, output,
+                separators=(",", ":"), sort_keys=True)
+
+        # Catch some high level file system errors.  In some cases we
+        # ignore the problem because we can still load the job type,
+        # we just can't cache it.
+        except (OSError, IOError, WindowsError) as error:
+            if error.errno in (EPERM, EACCES):
+                logger.warning(
+                    "Cannot write cache to %s. "
+                    "Permission error.", cache_path)
+                returnValue(None)
+
+            elif error.errno in (EAGAIN, EINTR):
+                logger.warning(
+                    "Will not cache to %s. System call failed or Python was "
+                    "told to try again.", cache_path)
+
+            elif error.errno == ENOSPC:
+                logger.warning(
+                    "Cannot write cache to %s. "
+                    "No space left on device.", cache_path)
+                returnValue(None)
+
+            elif error.errno == ENXIO:
+                logger.warning(
+                    "Cannot write cache to %s. No such device.", cache_path)
+                returnValue(None)
 
-    @classmethod
-    def _load_jobtype(cls, jobtype, filepath):
-        def load_jobtype(jobtype_data, path):
-            module_name = cls._module_for_jobtype(jobtype_data)
-
-            # Create or load the module
-            if filepath is not None:
-                logger.debug("Attempting to load module from file path %s",
-                             filepath)
-                try:
-                    module = imp.load_source(module_name, path)
-                except Exception as e:
-                    type = sys.exc_info()[0]
-                    value = sys.exc_info()[1]
-                    logger.error("Importing module from jobtype file failed: "
-                                 "%s, value: %s", type, value)
-                    raise
             else:
-                logcache.warning(
-                    "Loading (%s, %s) directly from memory",
-                    jobtype_data["name"], jobtype_data["version"])
+                raise
 
-                module = imp.new_module(module_name)
-                exec jobtype_data["code"] in module.__dict__
-                sys.modules[module_name] = module
+        except Exception as error:
+            logger.error(
+                "Failed to write %r v%s to %s: %s",
+                job_type["name"], job_type["version"], cache_path, error)
+            raise
 
-            logger.debug("Returning class %s from module",
-                         jobtype_data["classname"])
-            return getattr(module, jobtype_data["classname"])
+        finally:
+            if output is not None:
+                output.close()
 
-        # Load the job type itself in a thread so we limit disk I/O
-        # and other blocking issues in the main thread.
- return threads.deferToThread(load_jobtype, jobtype, filepath) + if error: + remove_file(output.name, raise_=False) class Process(object): diff --git a/pyfarm/jobtypes/core/jobtype.py b/pyfarm/jobtypes/core/jobtype.py index 914333cd..6de56a55 100644 --- a/pyfarm/jobtypes/core/jobtype.py +++ b/pyfarm/jobtypes/core/jobtype.py @@ -44,7 +44,6 @@ import treq from twisted.internet import reactor -from twisted.internet.defer import Deferred from twisted.internet.error import ProcessDone, ProcessTerminated from twisted.python.failure import Failure from twisted.internet.defer import inlineCallbacks, Deferred, returnValue @@ -55,14 +54,14 @@ from pyfarm.core.enums import INTEGER_TYPES, STRING_TYPES, WorkState, WINDOWS from pyfarm.core.utility import ImmutableDict from pyfarm.agent.config import config -from pyfarm.agent.http.core.client import post, http_retry_delay, post_direct +from pyfarm.agent.http.core.client import http_retry_delay, post_direct from pyfarm.agent.logger import getLogger from pyfarm.agent.sysinfo import memory, system from pyfarm.agent.sysinfo.user import is_administrator, username from pyfarm.agent.utility import ( TASKS_SCHEMA, JOBTYPE_SCHEMA, JOB_SCHEMA, validate_uuid) from pyfarm.jobtypes.core.internals import ( - USER_GROUP_TYPES, Cache, Process, TypeChecks, System, pwd, grp) + USER_GROUP_TYPES, JobTypeLoader, Process, TypeChecks, System, pwd, grp) from pyfarm.jobtypes.core.log import STDOUT, STDERR, logpool from pyfarm.jobtypes.core.process import ProcessProtocol @@ -206,7 +205,7 @@ def set_default_environment(self, value): self.env = value -class JobType(Cache, System, Process, TypeChecks): +class JobType(System, Process, TypeChecks): """ Base class for all other job types. This class is intended to abstract away many of the asynchronous necessary to run @@ -254,6 +253,7 @@ class JobType(Cache, System, Process, TypeChecks): This is analogous to ``finished_tasks`` except it contains failed tasks only. """ + LOADER = JobTypeLoader() PERSISTENT_JOB_DATA = {} COMMAND_DATA = CommandData PROCESS_PROTOCOL = ProcessProtocol @@ -311,6 +311,7 @@ def __repr__(self): self.assignment["job"]["title"]) @classmethod + @inlineCallbacks def load(cls, assignment): """ Given an assignment this class method will load the job type either @@ -322,20 +323,11 @@ def load(cls, assignment): that the internal data is correct. """ cls.ASSIGNMENT_SCHEMA(assignment) - - cache_key = cls._cache_key(assignment) - logger.debug("Cache key for assignment is %s", cache_key) - - if config["jobtype_enable_cache"] or cache_key not in cls.cache: - logger.debug("Jobtype not in cache or cache disabled") - download = cls._download_jobtype( - assignment["jobtype"]["name"], - assignment["jobtype"]["version"]) - download.addCallback(cls._jobtype_download_complete, cache_key) - return download - else: - logger.debug("Caching jobtype") - return cls._load_jobtype(cls.cache[cache_key], None) + jobtype = yield cls.LOADER.load( + assignment["jobtype"]["name"], + assignment["jobtype"]["version"] + ) + returnValue(jobtype) @classmethod def prepare_for_job(cls, job): diff --git a/tests/test_agent/test_http_api_assign.py b/tests/test_agent/test_http_api_assign.py index 6bccc1e6..b260852b 100644 --- a/tests/test_agent/test_http_api_assign.py +++ b/tests/test_agent/test_http_api_assign.py @@ -14,26 +14,27 @@ # See the License for the specific language governing permissions and # limitations under the License. 
-from json import loads +from json import loads, dumps from os import urandom from random import randint from uuid import UUID try: - from httplib import ACCEPTED, BAD_REQUEST, CONFLICT, SERVICE_UNAVAILABLE + from httplib import ACCEPTED, BAD_REQUEST, CONFLICT, SERVICE_UNAVAILABLE, OK except ImportError: # pragma: no cover - from http.client import ACCEPTED, BAD_REQUEST, CONFLICT, SERVICE_UNAVAILABLE + from http.client import ( + ACCEPTED, BAD_REQUEST, CONFLICT, SERVICE_UNAVAILABLE, OK) from twisted.web.server import NOT_DONE_YET -from twisted.internet.defer import DeferredLock +from twisted.internet.defer import DeferredLock, inlineCallbacks from pyfarm.agent.config import config from pyfarm.agent.http.api.assign import Assign from pyfarm.agent.sysinfo.memory import total_ram from pyfarm.agent.sysinfo.cpu import total_cpus -from pyfarm.agent.testutil import BaseAPITestCase -from pyfarm.jobtypes.core.jobtype import JobType +from pyfarm.agent.testutil import BaseAPITestCase, APITestServer +from pyfarm.jobtypes.core.internals import JobTypeLoader FAKE_JOBTYPE = """ from twisted.internet.defer import Deferred @@ -124,7 +125,7 @@ def test_restarting(self): self.assertEqual(len(request.written), 1) self.assertEqual( loads(request.written[0])["error"], - "Agent cannot accept assignments because of a pending restart") + u"Agent cannot accept assignments because of a pending restart.") def test_agent_id_not_set(self): config.pop("agent_id", None) @@ -196,50 +197,54 @@ def test_duplicate_task(self): def test_accepted(self): # Cache the fake job type and make sure the config # turns off caching - jobtype = { + job_type = { "classname": "FakeJobType", "code": FAKE_JOBTYPE, "name": self.data["jobtype"]["name"], "version": self.data["jobtype"]["version"]} - JobType.cache[(self.data["jobtype"]["name"], - self.data["jobtype"]["version"])] = (jobtype, None) - config.update( - jobtype_enable_cache=False, - current_assignments={}) - request = self.post( - data=self.data, - headers={"User-Agent": config["master_user_agent"]}) - assign = self.instance_class() - result = assign.render(request) - self.assertEqual(result, NOT_DONE_YET) - self.assertTrue(request.finished) - self.assertEqual(request.responseCode, ACCEPTED) - self.assertEqual(len(request.written), 1) - response_id = UUID(loads(request.written[0])["id"]) - self.assertIn(response_id, config["current_assignments"]) - - # An assignment uuid has been added - test_data = self.data.copy() - current_assignment = config["current_assignments"][response_id].copy() - - # Update the original test data with the new assignment data - # and make sure it matches - test_data.update(id=response_id) - # TODO: The jobtype instance is created asynchronously in a deferred, so - # checking its behaviour from this test is quite hairy. 
Find a better - # solution than simply not testing it if it's not there yet - if "id" in current_assignment["jobtype"]: - test_data["jobtype"].update(id=current_assignment["jobtype"]["id"]) - self.assertEqual(current_assignment, test_data) - if "id" in current_assignment["jobtype"]: - self.assertIn(current_assignment["jobtype"]["id"], config["jobtypes"]) - - # Now trigger the started callback so we can make sure the job - # type gets removed - jobtype = config["jobtypes"][current_assignment["jobtype"]["id"]] - jobtype.fake_started.callback(None) - jobtype.fake_stopped.callback(None) - self.assertNotIn(response_id, config["current_assignments"]) + config.update(jobtype_enable_cache=False, current_assignments={}) + + url = "/jobtypes/{name}/versions/{version}".format( + name=job_type["name"], version=job_type["version"]) + with APITestServer(url, code=OK, response=dumps(job_type)): + request = self.post( + data=self.data, + headers={"User-Agent": config["master_user_agent"]}) + + assign = self.instance_class() + result = assign.render(request) + + self.assertEqual(result, NOT_DONE_YET) + self.assertTrue(request.finished) + self.assertEqual(request.responseCode, ACCEPTED) + self.assertEqual(len(request.written), 1) + response_id = UUID(loads(request.written[0])["id"]) + self.assertIn(response_id, config["current_assignments"]) + + # An assignment uuid has been added + test_data = self.data.copy() + current_assignment = config["current_assignments"][response_id].copy() + + # Update the original test data with the new assignment data + # and make sure it matches + test_data.update(id=response_id) + + # TODO: The jobtype instance is created asynchronously in a + # deferred, so checking its behaviour from this test is quite + # hairy. Find a better solution than simply not testing it if it's + # not there yet + if "id" in current_assignment["jobtype"]: + test_data["jobtype"].update(id=current_assignment["jobtype"]["id"]) + self.assertEqual(current_assignment, test_data) + if "id" in current_assignment["jobtype"]: + self.assertIn(current_assignment["jobtype"]["id"], config["jobtypes"]) + + # Now trigger the started callback so we can make sure the job + # type gets removed + job_type = config["jobtypes"][current_assignment["jobtype"]["id"]] + job_type.fake_started.callback(None) + job_type.fake_stopped.callback(None) + self.assertNotIn(response_id, config["current_assignments"]) """ def test_accepted_type_error(self): # Cache the fake job type and make sure the config diff --git a/tests/test_jobtypes/test_core_internals.py b/tests/test_jobtypes/test_core_internals.py index 578eeb3e..3c8b9317 100644 --- a/tests/test_jobtypes/test_core_internals.py +++ b/tests/test_jobtypes/test_core_internals.py @@ -19,17 +19,22 @@ import re import os import shutil +import sys import tempfile from collections import namedtuple +from contextlib import nested from datetime import timedelta -from os.path import isdir, join, isfile from errno import EEXIST +from os.path import isdir, join, isfile +from textwrap import dedent from uuid import uuid4 try: - from httplib import CREATED, OK + from httplib import ( + CREATED, OK, NOT_FOUND, INTERNAL_SERVER_ERROR, BAD_REQUEST) except ImportError: # pragma: no cover - from http.client import CREATED, OK + from http.client import ( + CREATED, OK, NOT_FOUND, INTERNAL_SERVER_ERROR, BAD_REQUEST) try: import pwd @@ -45,16 +50,19 @@ from mock import patch from twisted.internet import reactor from twisted.internet.defer import Deferred, inlineCallbacks +from twisted.internet.threads 
import deferToThread from pyfarm.core.enums import STRING_TYPES, LINUX, MAC, WINDOWS, BSD, WorkState -from pyfarm.agent.testutil import APITestServer -from pyfarm.agent.testutil import TestCase, skipIf from pyfarm.agent.config import config +from pyfarm.agent.http.core.client import get_direct from pyfarm.agent.sysinfo import user -from pyfarm.jobtypes.core.internals import ( - ITERABLE_CONTAINERS, InsufficientSpaceError, Cache, Process, System, - TypeChecks, pwd, grp, logger) +from pyfarm.agent.testutil import APITestServer, TestCase, skipIf from pyfarm.jobtypes.core.log import logpool +from pyfarm.jobtypes.core import internals # Used by mocks in some tests +from pyfarm.jobtypes.core.internals import ( + ITERABLE_CONTAINERS, InsufficientSpaceError, JobTypeLoader, Process, System, + TypeChecks, pwd, grp, logger, JobTypeNotFound, JobTypeDownloadError) +from pyfarm.agent.utility import remove_directory FakeExitCode = namedtuple("FakeExitCode", ("exitCode", )) FakeProcessResult = namedtuple("FakeProcessResult", ("value", )) @@ -113,50 +121,311 @@ def test_posix(self): self.assertIsNot(grp, NotImplemented) -class TestCache(TestCase): - def test_cache_directory(self): - self.assertTrue(isdir(Cache.CACHE_DIRECTORY)) +class TestJobTypeLoader(TestCase): + def setUp(self): + super(TestJobTypeLoader, self).setUp() + parent_dir = tempfile.mkdtemp() + config.update( + jobtype_enable_cache=True, + jobtype_cache_directory=parent_dir, + ) + self.addCleanup(remove_directory, parent_dir, raise_=False) + + def jobtype_dict(self, name, version, classname, code): + return { + "code": code, + "name": name, + "version": version, + "classname": classname + } + + # + # Tests for JobTypeLoader.cache_directory + # + def test_cache_directory_config_caching_disabled(self): + config["jobtype_enable_cache"] = False + loader = JobTypeLoader() + self.assertIsNone(loader.cache_directory) + + def test_cache_directory_config_cache_directory_is_blank_string(self): + config["jobtype_cache_directory"] = " " + loader = JobTypeLoader() + self.assertIsNone(loader.cache_directory) + + def test_cache_directory_config_cache_directory_not_set(self): + config.pop("jobtype_cache_directory") + loader = JobTypeLoader() + self.assertIsNone(loader.cache_directory) + + def test_cache_directory_created(self): + config.pop("farm_name", None) + cache_dir = join(config["jobtype_cache_directory"], "foo_cache") + self.assertFalse(isdir(cache_dir)) + config["jobtype_cache_directory"] = cache_dir + loader = JobTypeLoader() + self.assertEqual(loader.cache_directory, cache_dir) + self.assertTrue(isdir(cache_dir)) + + def test_cache_directory_with_farm_name_created(self): + config["farm_name"] = "main" + cache_dir = join( + config["jobtype_cache_directory"], config["farm_name"]) + self.assertFalse(isdir(cache_dir)) + loader = JobTypeLoader() + self.assertEqual(loader.cache_directory, cache_dir) + self.assertTrue(isdir(cache_dir)) + + # + # Tests for JobTypeLoader.create_module + # + @inlineCallbacks + def test_create_module_creates_functioning_class(self): + job_type = self.jobtype_dict("Foobar", 1, "foobar", dedent(""" + from pyfarm.core.enums import WorkState + class Foobar(object): + def __init__(self, value): self.value = value + def state(self): return WorkState.DONE + """).strip()) + module = yield JobTypeLoader.create_module(job_type) + foobar = module.Foobar(42) + self.assertEqual(foobar.state(), WorkState.DONE) + self.assertEqual(foobar.value, 42) + + @inlineCallbacks + def test_create_module_does_not_modify_sys_modules(self): + 
original_sys_modules = sys.modules.copy()
+
+        job_type = self.jobtype_dict("Foobar", 1, "foobar", "FOO = True")
+        module = yield JobTypeLoader.create_module(job_type)
+        self.assertTrue(module.FOO)
+
+        # Calling JobTypeLoader.create_module should not cause a new
+        # module to be created in sys.modules that matches the new module
+        # name.  If it does then the agent could hold onto the job type
+        # as long as the service is running.
+        for key, value in sys.modules.iteritems():
+            self.assertNotEqual(key, module.__name__)
+            if value is not None:
+                self.assertNotEqual(value.__name__, module.__name__)
+
+        # It also should not modify the overall content of sys.modules
+        # either.
+        self.assertEqual(original_sys_modules, sys.modules)
+
+    #
+    # Tests for JobTypeLoader.cache_path()
+    #
+    def test_cache_path_caching_disabled(self):
+        loader = JobTypeLoader()
+        loader.cache_directory = None
+        self.assertIsNone(loader.cache_path("a", "b"))
+
+    def test_cache_path_caching_enabled(self):
+        loader = JobTypeLoader()
+        self.assertEqual(
+            loader.cache_path("a", "b"),
+            join(loader.cache_directory, "a_b.json"))
+
+    #
+    # Tests for JobTypeLoader.load
+    #
+    @inlineCallbacks
+    def test_load_from_cache(self):
+        job_type = self.jobtype_dict("foobar", 1, "Foobar", dedent("""
+        from pyfarm.core.enums import WorkState
+        class Foobar(object):
+            def __init__(self, value): self.value = value
+            def state(self): return WorkState.FAILED
+        """).strip())
+        loader = JobTypeLoader()
+        fd, path = tempfile.mkstemp()
+        os.close(fd)
+        self.addCleanup(os.remove, path)
+
+        with open(path, "wb") as cache_file:
+            cache_file.write(json.dumps(job_type))
+
+        with patch.object(loader, "cache_path", return_value=path):
+            Foobar = yield loader.load("foobar", "1")
+
+        foobar = Foobar(43)
+        self.assertEqual(foobar.state(), WorkState.FAILED)
+        self.assertEqual(foobar.value, 43)
+
+    @inlineCallbacks
+    def test_load_from_master(self):
+        job_type = self.jobtype_dict("foobar", 1, "Foobar", dedent("""
+        from pyfarm.core.enums import WorkState
+        class Foobar(object):
+            def __init__(self, value): self.value = value
+            def state(self): return WorkState.PAUSED
+        """).strip())
+
+        loader = JobTypeLoader()
+        with APITestServer("/jobtypes/foobar/versions/3", code=OK,
+                           response=json.dumps(job_type)):
+            Foobar = yield loader.load("foobar", "3")
+
+        foobar = Foobar(44)
+        self.assertEqual(foobar.state(), WorkState.PAUSED)
+        self.assertEqual(foobar.value, 44)
+
+    @inlineCallbacks
+    def test_load_from_master_writes_cache(self):
+        job_type = self.jobtype_dict("foobar", 5, "Foobar", dedent("""
+        from pyfarm.core.enums import WorkState
+        class Foobar(object):
+            def __init__(self, value): self.value = value
+            def state(self): return WorkState.PAUSED
+        """).strip())
+
+        cache_directory = tempfile.mkdtemp()
+        self.addCleanup(shutil.rmtree, cache_directory)
+        loader = JobTypeLoader()
+        loader.cache_directory = cache_directory
+        with APITestServer("/jobtypes/foobar/versions/5", code=OK,
+                           response=json.dumps(job_type)):
+            yield loader.load("foobar", "5")
+
+        self.assertTrue(isfile(loader.cache_path("foobar", "5")))
+
+    @inlineCallbacks
+    def test_load_no_such_class_raises_attributeerror(self):
+        job_type = self.jobtype_dict("foobar", 1, "Foobar", "FOO = 1")
+        loader = JobTypeLoader()
+        with nested(
+            APITestServer(
+                "/jobtypes/foobar/versions/6", code=OK,
+                response=json.dumps(job_type)),
+            self.assertRaises(AttributeError)
+        ):
+            yield loader.load("foobar", "6")
+
+    #
+    # Tests for JobTypeLoader.download_source
+    #
     @inlineCallbacks
-    def test_download(self):
-        classname = "AgentUnittest" + os.urandom(8).encode("hex")
-        cache = Cache()
+    def test_download_source_exception_causes_retry(self):
+        class GetDirect(object):
+            def __init__(self):
+                self.hits = 0
+
+            def __call__(self, *args, **kwargs):
+                self.hits += 1
+                if self.hits < 2:
+                    raise Exception("Fail!")
+                return get_direct(*args, **kwargs)
+
+        loader = JobTypeLoader()
+        alt_get_direct = GetDirect()
+        job_type = self.jobtype_dict("foo", 1, "Foobar", dedent("""
+        from pyfarm.core.enums import WorkState
+        class Foobar(object):
+            def __init__(self, value): self.value = value
+            def state(self): return WorkState.FAILED
+        """).strip())
+        with nested(
+            APITestServer(
+                "/jobtypes/foo/versions/1", code=OK,
+                response=json.dumps(job_type)),
+            patch.object(internals, "get_direct", alt_get_direct)
+        ):
+            response = yield loader.download_source("foo", "1")
+
+        # If the request was retried then alt_get_direct.hits should
+        # have been incremented a couple of times.
+        self.assertGreaterEqual(alt_get_direct.hits, 1)
+        self.assertEqual(response, json.loads(json.dumps(job_type)))
+
+    @inlineCallbacks
+    def test_download_source_ok(self):
+        job_type = self.jobtype_dict("foo", 1, "Foobar", dedent("""
+        from pyfarm.core.enums import WorkState
+        class Foobar(object):
+            def __init__(self, value): self.value = value
+            def state(self): return WorkState.FAILED
+        """).strip())
+        loader = JobTypeLoader()
         with APITestServer(
-            "/jobtypes/%s/versions/1" % classname,
-            headers={"Content-Type": "application/json"},
-            code=OK, response=json.dumps({"data": "This is a job type"})
-        ):
-            response = yield cache._download_jobtype(classname, 1)
-            self.assertEqual(response.code, OK)
-            data = response.json()
-            self.assertEqual(data.get("data"), "This is a job type")
+                "/jobtypes/foo/versions/1", code=OK,
+                response=json.dumps(job_type)):
+            response = yield loader.download_source("foo", "1")
+
+        self.assertEqual(response, json.loads(json.dumps(job_type)))
+
+    @inlineCallbacks
+    def test_download_source_not_found(self):
+        loader = JobTypeLoader()
+        with nested(
+            APITestServer("/jobtypes/foo/versions/1", code=NOT_FOUND),
+            self.assertRaises(JobTypeNotFound)
+        ):
+            yield loader.download_source("foo", "1")
 
-    def test_filename(self):
-        cache = Cache()
-        self.assertEqual(
-            cache._cache_filepath("foobar", "someclass", 1),
-            str(join(
-                Cache.CACHE_DIRECTORY, "foobar_someclass_v1.py")))
-
-    def test_cache(self):
-        cache = Cache()
-        classname = "Test%s" % os.urandom(8).encode("hex")
-        version = 1
-        code = os.urandom(8).encode("hex")
-        cache_key = "Key%s" % classname
-        filepath = cache._cache_filepath(cache_key, classname, version)
-        jobtype = {"classname": classname, "code": code, "version": version}
-
-        def written(data):
-            self.assertEqual(data[0]["classname"], classname)
-            if data[1] is not None:
-                self.assertEqual(data[1], filepath)
-                self.assertTrue(isfile(filepath))
-
-        cached = cache._cache_jobtype(cache_key, jobtype)
-        cached.addCallback(written)
-        return cached
+    @inlineCallbacks
+    def test_download_source_internal_server_error_retries(self):
+        job_type = self.jobtype_dict("foo", 1, "Foobar", dedent("""
+        from pyfarm.core.enums import WorkState
+        class Foobar(object):
+            def __init__(self, value): self.value = value
+            def state(self): return WorkState.FAILED
+        """).strip())
+        loader = JobTypeLoader()
+        with nested(
+            APITestServer("/jobtypes/foo/versions/1",
+                          code=INTERNAL_SERVER_ERROR,
+                          response=json.dumps(job_type)),
+            patch.object(logger, "debug")
+        ) as (server, mock_debug):
+            reactor.callLater(1, setattr, server.resource, "code", OK)
+            response = yield loader.download_source("foo", "1")
+
+        self.assertEqual(response, json.loads(json.dumps(job_type)))
+
+        # The logger will let us know if the request is being retried
+        # in this case.  From outside the download_source call, it's
+        # transparent that a retry is occurring.
+        for mock_call in mock_debug.mock_calls:
+            if "Request will be retried." in mock_call[1][0]:
+                break
+        else:
+            self.fail("'Request will be retried' never logged")
+
+    @inlineCallbacks
+    def test_download_source_bad_request(self):
+        loader = JobTypeLoader()
+        with nested(
+            APITestServer("/jobtypes/foo/versions/1", code=BAD_REQUEST),
+            self.assertRaises(JobTypeDownloadError)
+        ):
+            yield loader.download_source("foo", "1")
+
+    @inlineCallbacks
+    def test_download_source_other_error_retries(self):
+        job_type = self.jobtype_dict("foo", 1, "Foobar", dedent("""
+        from pyfarm.core.enums import WorkState
+        class Foobar(object):
+            def __init__(self, value): self.value = value
+            def state(self): return WorkState.FAILED
+        """).strip())
+        loader = JobTypeLoader()
+        with nested(
+            APITestServer("/jobtypes/foo/versions/1",
+                          code=799, response=json.dumps(job_type)),
+            patch.object(logger, "debug")
+        ) as (server, mock_debug):
+            reactor.callLater(1, setattr, server.resource, "code", OK)
+            response = yield loader.download_source("foo", "1")
+
+        self.assertEqual(response, json.loads(json.dumps(job_type)))
+
+        # The logger will let us know if the request is being retried
+        # in this case.  From outside the download_source call, it's
+        # transparent that a retry is occurring.
+        for mock_call in mock_debug.mock_calls:
+            if "Request will be retried." in mock_call[1][0]:
+                break
+        else:
+            self.fail("'Request will be retried' never logged")
 
 
 class TestProcess(TestCase):
diff --git a/tests/test_jobtypes/test_core_jobtype.py b/tests/test_jobtypes/test_core_jobtype.py
index 08082c08..fcb01982 100644
--- a/tests/test_jobtypes/test_core_jobtype.py
+++ b/tests/test_jobtypes/test_core_jobtype.py
@@ -18,6 +18,7 @@
 import re
 from uuid import UUID, uuid4
 
+from twisted.internet.defer import inlineCallbacks
 from voluptuous import Schema, MultipleInvalid
 
 from pyfarm.core.utility import ImmutableDict
@@ -208,6 +209,7 @@ def test_set_default_environment(self):
 
 
 class TestJobTypeLoad(TestCase):
+    @inlineCallbacks
     def test_schema(self):
         with self.assertRaises(MultipleInvalid):
-            JobType.load({})
+            yield JobType.load({})
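
A minimal usage sketch of the new loading path, for orientation only (it
mirrors the calls made by Assign.handle_assignment and the tests above and
is not part of the patch itself):

    from twisted.internet.defer import inlineCallbacks, returnValue

    from pyfarm.jobtypes.core.jobtype import JobType

    @inlineCallbacks
    def load_jobtype_class(assignment):
        # JobType.load() validates the assignment against
        # ASSIGNMENT_SCHEMA, then asks the shared JobType.LOADER for the
        # class: the on-disk cache is checked first and, on a miss, the
        # source is downloaded from
        # {master_api}/jobtypes/<name>/versions/<version> and cached.
        jobtype_class = yield JobType.load(assignment)
        returnValue(jobtype_class)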