Merged
156 changes: 64 additions & 92 deletions pyfarm/agent/http/api/assign.py
@@ -27,7 +27,6 @@
import traceback
from functools import partial

from twisted.web.server import NOT_DONE_YET
from twisted.internet import reactor
from twisted.internet.defer import DeferredList
from voluptuous import Schema, Required
@@ -64,71 +63,66 @@ def __init__(self, agent):
self.agent = agent

def post(self, **kwargs):
if request_from_master(kwargs["request"]):
config.master_contacted()

request = kwargs["request"]
request_data = kwargs["data"]

if request_from_master(request):
config.master_contacted()
# First, get the resources we have *right now*. In some cases
# this means using the functions in pyfarm.core.sysinfo because
# entries in `config` could be slightly out of sync with the system.
memory_free = free_ram()
cpus = config["agent_cpus"]
requires_ram = request_data["job"].get("ram")
requires_cpus = request_data["job"].get("cpus")

if ("agent_id" in request_data and
request_data["agent_id"] != config["agent_id"]):
logger.error("Wrong agent_id in assignment: %s. Our id is %s",
request_data["agent_id"], config["agent_id"])
request.setResponseCode(BAD_REQUEST)
request.write(dumps(
{"error": "You have the wrong agent. I am %s." %
config["agent_id"],
"agent_id": config["agent_id"]}))
request.finish()
return NOT_DONE_YET

if self.agent.reannounce_lock.locked:
return (
dumps({"error": "You have the wrong agent. "
"I am %s." % config["agent_id"],
"agent_id": config["agent_id"]}),
BAD_REQUEST
)

elif self.agent.reannounce_lock.locked:
logger.warning("Temporarily rejecting assignment because we "
"are in the middle of a reannounce.")
request.setResponseCode(SERVICE_UNAVAILABLE)
request.write(
return (
dumps({"error": "Agent cannot accept assignments because of a "
"reannounce in progress. Try again shortly."}))
request.finish()
return NOT_DONE_YET
"reannounce in progress. Try again shortly."}),
SERVICE_UNAVAILABLE
)

# First, get the resources we have *right now*. In some cases
# this means using the functions in pyfarm.core.sysinfo because
# entries in `config` could be slightly out of sync with the system.
memory_free = free_ram()
cpus = config["agent_cpus"]
requires_ram = request_data["job"].get("ram")
requires_cpus = request_data["job"].get("cpus")

if self.agent.shutting_down:
elif self.agent.shutting_down:
logger.error("Rejecting assignment because the agent is in the "
"process of shutting down.")
request.setResponseCode(SERVICE_UNAVAILABLE)
request.write(
return (
dumps({"error": "Agent cannot accept assignments because it is "
"shutting down"}))
request.finish()
return NOT_DONE_YET
"shutting down."}),
SERVICE_UNAVAILABLE
)

if "restart_requested" in config \
elif "restart_requested" in config \
and config["restart_requested"] is True:
logger.error("Rejecting assignment because of scheduled restart.")
request.setResponseCode(SERVICE_UNAVAILABLE)
request.write(
return (
dumps({"error": "Agent cannot accept assignments because of a "
"pending restart"}))
request.finish()
return NOT_DONE_YET
"pending restart."}),
SERVICE_UNAVAILABLE
)

elif "agent_id" not in config:
logger.error(
"Agent has not yet connected to the master or `agent_id` "
"has not been set yet.")
request.setResponseCode(SERVICE_UNAVAILABLE)
request.write(
dumps({"error": "agent_id has not been set in the config"}))
request.finish()
return NOT_DONE_YET
return (
dumps({"error": "agent_id has not been set in the config"}),
SERVICE_UNAVAILABLE
)

# Do we have enough ram?
elif requires_ram is not None and requires_ram > memory_free:
@@ -137,16 +131,13 @@ def post(self, **kwargs):
"Rejecting Task %s.",
request_data["job"]["id"], requires_ram, memory_free,
request_data["job"]["id"])
request.setResponseCode(BAD_REQUEST)
request.write(
config["free_ram"] = memory_free
return (
dumps({"error": "Not enough ram",
"agent_ram": memory_free,
"requires_ram": requires_ram}))
request.finish()

# touch the config
config["free_ram"] = memory_free
return NOT_DONE_YET
"requires_ram": requires_ram}),
BAD_REQUEST
)

# Do we have enough cpus (count wise)?
elif requires_cpus is not None and requires_cpus > cpus:
@@ -155,72 +146,53 @@ def post(self, **kwargs):
"Rejecting Task %s.",
request_data["job"]["id"], requires_cpus, cpus,
request_data["job"]["id"])
request.setResponseCode(BAD_REQUEST)
request.write(
return (
dumps({"error": "Not enough cpus",
"agent_cpus": cpus,
"requires_cpus": requires_cpus}))
request.finish()
return NOT_DONE_YET

# Check for double assignments
try:
current_assignments = config["current_assignments"].itervalues
except AttributeError: # pragma: no cover
current_assignments = config["current_assignments"].values
"requires_cpus": requires_cpus}),
BAD_REQUEST
)

new_task_ids = set(task["id"] for task in request_data["tasks"])

for assignment in current_assignments():
for assignment in config["current_assignments"].itervalues():
existing_task_ids = set(x["id"] for x in assignment["tasks"])

# If the assignment is identical to one we already have
if existing_task_ids == new_task_ids:
logger.debug("Ignoring repeated assignment of the same batch")
request.setResponseCode(ACCEPTED)
request.write(dumps({"id": assignment["id"]}))
request.finish()
return NOT_DONE_YET
logger.debug(
"Ignoring repeated assignment of the same batch")
return dumps({"id": assignment["id"]}), ACCEPTED

# If there is only a partial overlap
elif existing_task_ids & new_task_ids:
logger.error("Rejecting assignment with partial overlap with "
"existing assignment.")
unknown_task_ids = new_task_ids - existing_task_ids
request.setResponseCode(CONFLICT)
request.write(dumps(
{"error": "Partial overlap of tasks",
"rejected_task_ids": list(unknown_task_ids)}))
request.finish()
return NOT_DONE_YET
return (
dumps({"error": "Partial overlap of tasks",
"rejected_task_ids": list(unknown_task_ids)}),
CONFLICT
)

if not config["agent_allow_sharing"]:
try:
current_jobtypes = config["jobtypes"].itervalues
except AttributeError: # pragma: no cover
current_jobtypes = config["jobtypes"].values
for jobtype in current_jobtypes():
for jobtype in config["jobtypes"].itervalues():
num_finished_tasks = (len(jobtype.finished_tasks) +
len(jobtype.failed_tasks))
if len(jobtype.assignment["tasks"]) > num_finished_tasks:
logger.error("Rejecting an assignment that would require "
"agent sharing")
request.setResponseCode(CONFLICT)
request.write(
dumps({"error":
"Agent does not allow multiple assignments",
"rejected_task_ids": list(new_task_ids)}))
request.finish()
return NOT_DONE_YET
return (
dumps({
"error": "Agent does not allow multiple "
"assignments",
"rejected_task_ids": list(new_task_ids)}),
CONFLICT
)

assignment_uuid = uuid4()
request_data.update(id=assignment_uuid)
config["current_assignments"][assignment_uuid] = request_data

# In all other cases we have some work to do inside of
# deferreds so we just have to respond
request.setResponseCode(ACCEPTED)
request.write(dumps({"id": assignment_uuid}))
request.finish()
logger.debug("Accepted assignment %s: %r",
assignment_uuid, request_data)
logger.info("Accept assignment from job %s with %s tasks",
@@ -393,4 +365,4 @@ def loaded_jobtype(jobtype_class, assign_id):
jobtype_loader.addCallback(loaded_jobtype, assignment_uuid)
jobtype_loader.addErrback(load_jobtype_failed, assignment_uuid)

return NOT_DONE_YET
return dumps({"id": assignment_uuid}), ACCEPTED
2 changes: 1 addition & 1 deletion tests/test_agent/test_http_api_assign.py
@@ -124,7 +124,7 @@ def test_restarting(self):
self.assertEqual(len(request.written), 1)
self.assertEqual(
loads(request.written[0])["error"],
"Agent cannot accept assignments because of a pending restart")
u"Agent cannot accept assignments because of a pending restart.")

def test_agent_id_not_set(self):
config.pop("agent_id", None)