Commits
22 commits
95a4054
starting initial implementation of JobTypeLoader
opalmer Sep 27, 2015
12ba1d6
adding some missing doc strings
opalmer Sep 27, 2015
d878a6f
removing TODO assertion, fixing statement in __init__
opalmer Sep 27, 2015
9096d0f
tests for cache_path(), download_source() and cache_directory
opalmer Sep 27, 2015
a15c62c
include farm_name in cache_directory path if set
opalmer Sep 29, 2015
f47e3b2
Merge branch 'master' of github.com:pyfarm/pyfarm-agent into jobtype_…
opalmer Sep 29, 2015
e8a49c6
check for None in farm_name
opalmer Sep 29, 2015
c4f736f
add cache_directory test for farm_name
opalmer Sep 29, 2015
702b46b
adding tests for compile_()
opalmer Sep 29, 2015
5ae3597
added _compile() static method, integrated it into load()
opalmer Sep 29, 2015
b5fea59
get code from correct url and with proper headers
opalmer Sep 30, 2015
0f3f85d
updating tests to act more like the master would
opalmer Sep 30, 2015
0853a80
tests for JobTypeLoader.load
opalmer Sep 30, 2015
62c189a
use inlineCallbacks for test_schema, otherwise JobType.load never runs
opalmer Oct 1, 2015
9753c18
refactor to use/cache job type dictionary rather than just the source…
opalmer Oct 1, 2015
5b2a53d
skip caching in a few cases rather than breaking
opalmer Oct 2, 2015
de45385
removing old code
opalmer Oct 2, 2015
5cdf641
rename _compile to create_module
opalmer Oct 2, 2015
256d5c3
updating tests to account for json response from master in JobTypeLoader
opalmer Oct 2, 2015
d70ad16
WIP update to test_accept
opalmer Oct 2, 2015
5b8cd76
starting to convert assign.py to inlineCallbacks so it's easier to m…
opalmer Oct 7, 2015
3e1b737
fixing test_restarting
opalmer Oct 14, 2015
286 changes: 212 additions & 74 deletions pyfarm/jobtypes/core/internals.py
@@ -46,15 +46,18 @@

try:
from httplib import (
OK, INTERNAL_SERVER_ERROR, CREATED, ACCEPTED, CONFLICT)
OK, INTERNAL_SERVER_ERROR, CREATED, ACCEPTED, CONFLICT, NOT_FOUND,
BAD_REQUEST)
except ImportError: # pragma: no cover
from http.client import (
OK, INTERNAL_SERVER_ERROR, CREATED, ACCEPTED, CONFLICT)
OK, INTERNAL_SERVER_ERROR, CREATED, ACCEPTED, CONFLICT, NOT_FOUND,
BAD_REQUEST)

from psutil import disk_usage

from twisted.internet import reactor, threads
from twisted.internet.defer import Deferred, DeferredList, succeed
from twisted.internet.defer import (
Deferred, DeferredList, succeed, inlineCallbacks, returnValue)
from twisted.web._newclient import (
ResponseNeverReceived, RequestTransmissionFailed)

@@ -63,7 +66,8 @@
from pyfarm.core.enums import WINDOWS, INTEGER_TYPES, STRING_TYPES, WorkState
from pyfarm.agent.config import config
from pyfarm.agent.logger import getLogger
from pyfarm.agent.http.core.client import get, post, http_retry_delay
from pyfarm.agent.http.core.client import (
get_direct, get, post, http_retry_delay)
from pyfarm.agent.utility import remove_file, remove_directory
from pyfarm.jobtypes.core.log import STDOUT, STDERR, logpool
from pyfarm.jobtypes.core.process import ReplaceEnvironment, ProcessProtocol
@@ -83,88 +87,222 @@ class InsufficientSpaceError(Exception):
pass


class Cache(object):
"""Internal methods for caching job types"""
cache = {}
JOBTYPE_VERSION_URL = \
"%(master_api)s/jobtypes/%(name)s/versions/%(version)s"
CACHE_DIRECTORY = config.get("jobtype_cache_directory", "")
class JobTypeDownloadError(Exception):
"""
    Raised when there is some problem downloading a job type.
"""

if not CACHE_DIRECTORY: # pragma: no cover
CACHE_DIRECTORY = None # make sure it's None
logger.warning("Job type cache directory has been disabled.")

else:
try:
os.makedirs(CACHE_DIRECTORY)
class JobTypeNotFound(JobTypeDownloadError):
"""
Raised when we fail to download a job type because
it no longer exists on the master.
"""
def __init__(self, name, version):
super(JobTypeNotFound, self).__init__(
"Job type %s v%s not found" % (name, version))
self.name = name
self.version = version

except OSError as e: # pragma: no cover
if e.errno != EEXIST:
logger.error(
"Failed to create %r. Job type caching is "
"now disabled.", CACHE_DIRECTORY)
raise

class JobTypeLoader(object):
"""Class for retrieval, loading and caching of job types."""
def __init__(self):
if not config["jobtype_enable_cache"]:
cache_directory = None
else:
logger.info("Created job type cache directory %r", CACHE_DIRECTORY)
try:
cache_directory = config["jobtype_cache_directory"]
Contributor:

Could we join config['farm_name'] to this, if it is set?

That way, if agents get switched between farms for some reason, they won't get the different jobtypes caches mixed up.

Member Author:

Yes fixed, good catch thanks.
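A sketch of what that could look like (illustrative only; the final change may use different config keys or a different directory layout):

# Illustrative sketch, not the code merged in this PR: append the farm name
# to the cache directory when it is configured, so agents moved between
# farms do not share job type caches.
farm_name = config.get("farm_name")
if farm_name:
    cache_directory = join(cache_directory, farm_name)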

except KeyError:
cache_directory = None

logger.debug("Job type cache directory is %r", CACHE_DIRECTORY)
if not cache_directory or (
isinstance(cache_directory, STRING_TYPES)
and not cache_directory.strip()):
logger.warning("Cache directory is blank, disabling cache.")
cache_directory = None

@classmethod
def _download_jobtype(cls, name, version):
"""
Downloads the job type specified in ``assignment``. This
method will pass the response it receives to :meth:`_cache_jobtype`
however failures will be retried.
"""
logger.debug("Downloading job type %r version %s", name, version)
url = str(cls.JOBTYPE_VERSION_URL % {
"master_api": config["master_api"],
"name": name, "version": version})

result = Deferred()
download = lambda *_: \
get(url,
callback=result.callback,
errback=lambda *_: reactor.callLater(http_retry_delay(),
download))
download()
return result
if cache_directory is not None:
try:
os.makedirs(cache_directory)
except OSError as e: # pragma: no cover
if e.errno != EEXIST:
logger.error(
"Failed to create %r. Job type caching is "
"now disabled.", cache_directory)
raise

@classmethod
def _cache_filepath(cls, cache_key, classname, version):
return str(join(
cls.CACHE_DIRECTORY, "%s_%s_v%s.py" % (
cache_key, classname, version)))
self.cache_directory = cache_directory

@classmethod
def _cache_key(cls, assignment):
return assignment["jobtype"]["name"], assignment["jobtype"]["version"]
def cache_path(self, name, version):
"""
Returns the path to create a cache file for the given job type
name and version. This function will return None if caching is
currently disabled.

:param str name:
Name of the job type to cache
:param str version:
Version of the job type to cache
"""
if self.cache_directory:
return join(
self.cache_directory,
"{name}_{version}.py".format(name=name, version=version))

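For illustration (not part of the diff), assuming caching is enabled and jobtype_cache_directory points at the hypothetical path /tmp/pyfarm/jobtypes:

# Illustrative values only; the directory and job type name are made up.
loader = JobTypeLoader()
loader.cache_path("TestJobType", 3)  # "/tmp/pyfarm/jobtypes/TestJobType_3.py"
# With caching disabled, cache_directory is None and cache_path() returns None.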
@classmethod
def _jobtype_download_complete(cls, response, cache_key):
# Server is offline or experiencing issues right
# now so we should retry the request.
if response.code >= INTERNAL_SERVER_ERROR:
logger.debug(
"Could not download jobtype because of internal server error.")
return reactor.callLater(
http_retry_delay(),
response.request.retry)

downloaded_data = response.json()
@inlineCallbacks
def load(self, name, version):
"""
Main method used by a job type to load the job type class. Internally
this handles retrieval of the job type either from the cache or from
the master. This will also cache the job type we retrieve from the
master if caching is enabled.

if not config["jobtype_enable_cache"]:
logger.debug("Jobtype cache is disabled, loading the jobtype.")
return cls._load_jobtype(downloaded_data, None)
:param str name:
The name of the job type to load and return.

else:
# When the download is complete, cache the results
logger.debug("Caching the jobtype locally.")
caching = cls._cache_jobtype(cache_key, downloaded_data)
caching.addCallback(
lambda result: cls._load_jobtype(*result))
return caching
:param str version:
The version of the job type to load and return.
"""
source_code = yield self.cached_source(name, version)

        # source_code is None when the job type is not cached yet or
        # caching is disabled.
if source_code is None:
source_code = yield self.download_source(name, version)
yield self.write_cache(name, version, source_code)

@inlineCallbacks
def download_source(self, name, version):
"""
Downloads and returns the source code for the given name and version
of a job type.

:param str name:
The name of the job type to download the source code for.

:param str version:
The version of the job type to download the source code for.
"""
url = "{master_api}/jobtypes/{name}/versions/{version}".format(
master_api=config["master_api"], name=name, version=version
)
logger.debug("Downloading %s", url)

while True:
try:
response = yield get_direct(url)
except Exception as error:
logger.error(
"Failed to download %s: %s. Request will be retried.",
url, error)
delay = Deferred()
reactor.callLater(http_retry_delay(), delay.callback, None)
yield delay
else:
if response.code == OK:
data = yield treq.json_content(response)
returnValue(data)

elif response.code == NOT_FOUND:
raise JobTypeNotFound(name, version)

# Server is offline or experiencing issues right
# now so we should retry the request.
elif response.code >= INTERNAL_SERVER_ERROR:
logger.debug(
"Could not download job type, response %r while "
"downloading %s. Request will be retried.",
response.code, url
)
delay = Deferred()
reactor.callLater(http_retry_delay(), delay.callback, None)
yield delay

# If we got a bad request coming back from the
# master we've done something wrong. Don't
# retry the request because we shouldn't
# expect the response code to change with
# a retry.
elif response.code >= BAD_REQUEST:
raise JobTypeDownloadError(
"HTTP %s error when downloading %s" % (
response.code, url))

else:
logger.warning(
"Unknown response %r while downloading %s. Request "
"will be retried.", response.code, url)
delay = Deferred()
reactor.callLater(http_retry_delay(), delay.callback, None)
yield delay
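
The Deferred-plus-callLater pattern above is a non-blocking sleep and is repeated three times in this loop; a small helper along the following lines (purely illustrative, not part of this change) could factor it out using Twisted's deferLater:

# Hypothetical helper, not part of this PR: a non-blocking sleep usable as
# ``yield sleep(http_retry_delay())`` inside an inlineCallbacks generator.
from twisted.internet import reactor
from twisted.internet.task import deferLater

def sleep(seconds):
    # deferLater returns a Deferred that fires (with None here) once
    # ``seconds`` have elapsed, without blocking the reactor.
    return deferLater(reactor, seconds, lambda: None)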

@inlineCallbacks
def cached_source(self, name, version):
"""
        Searches for and returns the cached source code for the given name
        and version of a job type. Returns None if the job type is not
        currently cached or if caching is disabled.

:param str name:
The name of the job type to return the cached entry for.

:param str version:
The version of the job type to return the cached entry for.
"""
cache_path = self.cache_path(name, version)

if not self.cache_directory or not isfile(cache_path):
returnValue(None)

stream = yield threads.deferToThread(open, cache_path, "rb")
try:
data = yield threads.deferToThread(stream.read)
finally:
stream.close()
returnValue(data)

@inlineCallbacks
def write_cache(self, name, version, source_code):
"""
Writes the given ``source_code`` to the disk cache for the given
job type name and version.

:param str name:
The name of the job type we're writing a cache for.

:param str version:
The version of the job type we're writing a cache for.

:param str source_code:
The source code of the job type we're writing a cache for.

:returns:
This function does not return anything.
"""
if not self.cache_directory:
returnValue(None)

        error = False
        cache_path = self.cache_path(name, version)
        output = yield threads.deferToThread(open, cache_path, "wb")

        # Write the data
        try:
            yield threads.deferToThread(output.write, source_code)
        except Exception as write_error:
            logger.error(
                "Failed to write %r v%s to %s: %s",
                name, version, cache_path, write_error)
            error = True
            raise
        finally:
            output.close()

            # The re-raise above would skip cleanup placed after this block,
            # so remove any partially written cache file here before the
            # error propagates.
            if error:
                remove_file(cache_path, raise_=False)
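
A minimal usage sketch of the new loader (illustrative only; the job type name and version are made up, and in this revision load() populates the cache rather than returning a value):

# Illustrative usage; "TestJobType" and version 1 are assumed values.
from twisted.internet.defer import inlineCallbacks

@inlineCallbacks
def warm_jobtype_cache():
    loader = JobTypeLoader()
    # Uses the local cache when the job type is already present, otherwise
    # downloads the source from the master and writes it to the cache.
    yield loader.load("TestJobType", 1)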


class CacheOld(object):
"""Internal methods for caching job types"""

@classmethod
def _cache_jobtype(cls, cache_key, jobtype):