diff --git a/pyfarm/agent/http/api/assign.py b/pyfarm/agent/http/api/assign.py index 99868818..abce73ab 100644 --- a/pyfarm/agent/http/api/assign.py +++ b/pyfarm/agent/http/api/assign.py @@ -27,7 +27,6 @@ import traceback from functools import partial -from twisted.web.server import NOT_DONE_YET from twisted.internet import reactor from twisted.internet.defer import DeferredList from voluptuous import Schema, Required @@ -64,71 +63,66 @@ def __init__(self, agent): self.agent = agent def post(self, **kwargs): + if request_from_master(kwargs["request"]): + config.master_contacted() + request = kwargs["request"] request_data = kwargs["data"] - if request_from_master(request): - config.master_contacted() + # First, get the resources we have *right now*. In some cases + # this means using the functions in pyfarm.core.sysinfo because + # entries in `config` could be slightly out of sync with the system. + memory_free = free_ram() + cpus = config["agent_cpus"] + requires_ram = request_data["job"].get("ram") + requires_cpus = request_data["job"].get("cpus") if ("agent_id" in request_data and request_data["agent_id"] != config["agent_id"]): logger.error("Wrong agent_id in assignment: %s. Our id is %s", request_data["agent_id"], config["agent_id"]) - request.setResponseCode(BAD_REQUEST) - request.write(dumps( - {"error": "You have the wrong agent. I am %s." % - config["agent_id"], - "agent_id": config["agent_id"]})) - request.finish() - return NOT_DONE_YET - - if self.agent.reannounce_lock.locked: + return ( + dumps({"error": "You have the wrong agent. " + "I am %s." % config["agent_id"], + "agent_id": config["agent_id"]}), + BAD_REQUEST + ) + + elif self.agent.reannounce_lock.locked: logger.warning("Temporarily rejecting assignment because we " "are in the middle of a reannounce.") - request.setResponseCode(SERVICE_UNAVAILABLE) - request.write( + return ( dumps({"error": "Agent cannot accept assignments because of a " - "reannounce in progress. Try again shortly."})) - request.finish() - return NOT_DONE_YET + "reannounce in progress. Try again shortly."}), + SERVICE_UNAVAILABLE + ) - # First, get the resources we have *right now*. In some cases - # this means using the functions in pyfarm.core.sysinfo because - # entries in `config` could be slightly out of sync with the system. - memory_free = free_ram() - cpus = config["agent_cpus"] - requires_ram = request_data["job"].get("ram") - requires_cpus = request_data["job"].get("cpus") - - if self.agent.shutting_down: + elif self.agent.shutting_down: logger.error("Rejecting assignment because the agent is in the " "process of shutting down.") - request.setResponseCode(SERVICE_UNAVAILABLE) - request.write( + return ( dumps({"error": "Agent cannot accept assignments because it is " - "shutting down"})) - request.finish() - return NOT_DONE_YET + "shutting down."}), + SERVICE_UNAVAILABLE + ) - if "restart_requested" in config \ + elif "restart_requested" in config \ and config["restart_requested"] is True: logger.error("Rejecting assignment because of scheduled restart.") - request.setResponseCode(SERVICE_UNAVAILABLE) - request.write( + return ( dumps({"error": "Agent cannot accept assignments because of a " - "pending restart"})) - request.finish() - return NOT_DONE_YET + "pending restart."}), + SERVICE_UNAVAILABLE + ) elif "agent_id" not in config: logger.error( "Agent has not yet connected to the master or `agent_id` " "has not been set yet.") - request.setResponseCode(SERVICE_UNAVAILABLE) - request.write( - dumps({"error": "agent_id has not been set in the config"})) - request.finish() - return NOT_DONE_YET + return ( + dumps({"error": "agent_id has not been set in the config"}), + SERVICE_UNAVAILABLE + ) # Do we have enough ram? elif requires_ram is not None and requires_ram > memory_free: @@ -137,16 +131,13 @@ def post(self, **kwargs): "Rejecting Task %s.", request_data["job"]["id"], requires_ram, memory_free, request_data["job"]["id"]) - request.setResponseCode(BAD_REQUEST) - request.write( + config["free_ram"] = memory_free + return ( dumps({"error": "Not enough ram", "agent_ram": memory_free, - "requires_ram": requires_ram})) - request.finish() - - # touch the config - config["free_ram"] = memory_free - return NOT_DONE_YET + "requires_ram": requires_ram}), + BAD_REQUEST + ) # Do we have enough cpus (count wise)? elif requires_cpus is not None and requires_cpus > cpus: @@ -155,72 +146,53 @@ def post(self, **kwargs): "Rejecting Task %s.", request_data["job"]["id"], requires_cpus, cpus, request_data["job"]["id"]) - request.setResponseCode(BAD_REQUEST) - request.write( + return ( dumps({"error": "Not enough cpus", "agent_cpus": cpus, - "requires_cpus": requires_cpus})) - request.finish() - return NOT_DONE_YET - - # Check for double assignments - try: - current_assignments = config["current_assignments"].itervalues - except AttributeError: # pragma: no cover - current_assignments = config["current_assignments"].values + "requires_cpus": requires_cpus}), + BAD_REQUEST + ) new_task_ids = set(task["id"] for task in request_data["tasks"]) - for assignment in current_assignments(): + for assignment in config["current_assignments"].itervalues(): existing_task_ids = set(x["id"] for x in assignment["tasks"]) # If the assignment is identical to one we already have if existing_task_ids == new_task_ids: - logger.debug("Ignoring repeated assignment of the same batch") - request.setResponseCode(ACCEPTED) - request.write(dumps({"id": assignment["id"]})) - request.finish() - return NOT_DONE_YET + logger.debug( + "Ignoring repeated assignment of the same batch") + return dumps({"id": assignment["id"]}), ACCEPTED + # If there is only a partial overlap elif existing_task_ids & new_task_ids: logger.error("Rejecting assignment with partial overlap with " "existing assignment.") unknown_task_ids = new_task_ids - existing_task_ids - request.setResponseCode(CONFLICT) - request.write(dumps( - {"error": "Partial overlap of tasks", - "rejected_task_ids": list(unknown_task_ids)})) - request.finish() - return NOT_DONE_YET + return ( + dumps({"error": "Partial overlap of tasks", + "rejected_task_ids": list(unknown_task_ids)}), + CONFLICT + ) if not config["agent_allow_sharing"]: - try: - current_jobtypes = config["jobtypes"].itervalues - except AttributeError: # pragma: no cover - current_jobtypes = config["jobtypes"].values - for jobtype in current_jobtypes(): + for jobtype in config["jobtypes"].itervalues(): num_finished_tasks = (len(jobtype.finished_tasks) + len(jobtype.failed_tasks)) if len(jobtype.assignment["tasks"]) > num_finished_tasks: logger.error("Rejecting an assignment that would require " "agent sharing") - request.setResponseCode(CONFLICT) - request.write( - dumps({"error": - "Agent does not allow multiple assignments", - "rejected_task_ids": list(new_task_ids)})) - request.finish() - return NOT_DONE_YET + return ( + dumps({ + "error": "Agent does not allow multiple " + "assignments", + "rejected_task_ids": list(new_task_ids)}), + CONFLICT + ) assignment_uuid = uuid4() request_data.update(id=assignment_uuid) config["current_assignments"][assignment_uuid] = request_data - - # In all other cases we have some work to do inside of - # deferreds so we just have to respond - request.setResponseCode(ACCEPTED) - request.write(dumps({"id": assignment_uuid})) - request.finish() logger.debug("Accepted assignment %s: %r", assignment_uuid, request_data) logger.info("Accept assignment from job %s with %s tasks", @@ -393,4 +365,4 @@ def loaded_jobtype(jobtype_class, assign_id): jobtype_loader.addCallback(loaded_jobtype, assignment_uuid) jobtype_loader.addErrback(load_jobtype_failed, assignment_uuid) - return NOT_DONE_YET + return dumps({"id": assignment_uuid}), ACCEPTED diff --git a/tests/test_agent/test_http_api_assign.py b/tests/test_agent/test_http_api_assign.py index 6bccc1e6..058b5fa0 100644 --- a/tests/test_agent/test_http_api_assign.py +++ b/tests/test_agent/test_http_api_assign.py @@ -124,7 +124,7 @@ def test_restarting(self): self.assertEqual(len(request.written), 1) self.assertEqual( loads(request.written[0])["error"], - "Agent cannot accept assignments because of a pending restart") + u"Agent cannot accept assignments because of a pending restart.") def test_agent_id_not_set(self): config.pop("agent_id", None)