diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml
index 31e2e6e..36909aa 100644
--- a/.pre-commit-config.yaml
+++ b/.pre-commit-config.yaml
@@ -6,8 +6,7 @@ repos:
   - id: end-of-file-fixer
   - id: flake8
 - repo: https://github.com/psf/black
-  rev: 19.3b0
+  rev: 22.3.0
   hooks:
   - id: black
-  args:
-  - --py36
+  args: ["--target-version", "py36"]
diff --git a/lapis/cli/simulate.py b/lapis/cli/simulate.py
index aa176ee..d1a9030 100644
--- a/lapis/cli/simulate.py
+++ b/lapis/cli/simulate.py
@@ -10,10 +10,10 @@
 from lapis.pool_io.htcondor import htcondor_pool_reader
 from lapis.job_io.swf import swf_job_reader
 
-from lapis.scheduler import CondorJobScheduler
+from lapis.scheduler.base import CondorJobScheduler
 from lapis.simulator import Simulator
 
-from lapis.monitor import (
+from lapis.monitor.core import (
     LoggingSocketHandler,
     LoggingUDPSocketHandler,
     SimulationTimeFilter,
diff --git a/lapis/controller.py b/lapis/controller.py
index 9a44b46..5901415 100644
--- a/lapis/controller.py
+++ b/lapis/controller.py
@@ -1,12 +1,17 @@
+from lapis.pool import Pool
 from cobald.controller.linear import LinearController
 from cobald.controller.relative_supply import RelativeSupplyController
-from cobald.interfaces import Pool
 from usim import time
 
 
 class SimulatedLinearController(LinearController):
     def __init__(
-        self, target: Pool, low_utilisation=0.5, high_allocation=0.5, rate=1, interval=1
+        self,
+        target: Pool,
+        low_utilisation: float = 0.5,
+        high_allocation: float = 0.5,
+        rate: float = 1,
+        interval: float = 1,
     ):
         super(SimulatedLinearController, self).__init__(
             target, low_utilisation, high_allocation, rate, interval
@@ -22,11 +27,11 @@ class SimulatedRelativeSupplyController(RelativeSupplyController):
     def __init__(
         self,
         target: Pool,
-        low_utilisation=0.5,
-        high_allocation=0.5,
-        low_scale=0.9,
-        high_scale=1.1,
-        interval=1,
+        low_utilisation: float = 0.5,
+        high_allocation: float = 0.5,
+        low_scale: float = 0.9,
+        high_scale: float = 1.1,
+        interval: float = 1,
     ):
         super(SimulatedRelativeSupplyController, self).__init__(
             target=target,
@@ -45,14 +50,19 @@ async def run(self):
 
 
 class SimulatedCostController(SimulatedLinearController):
     def __init__(
-        self, target: Pool, low_utilisation=0.5, high_allocation=0.5, rate=1, interval=1
+        self,
+        target: Pool,
+        low_utilisation: float = 0.5,
+        high_allocation: float = 0.5,
+        rate: float = 1,
+        interval: float = 1,
     ):
         self.current_cost = 1
         super(SimulatedCostController, self).__init__(
             target, low_utilisation, high_allocation, rate, interval
         )
 
-    def regulate(self, interval):
+    def regulate(self, interval: float):
         allocation = 0
         for drone in self.target.drones:
             allocation += drone.allocation
@@ -64,8 +74,3 @@ def regulate(self, interval):
             self.target.demand = allocation
             if self.current_cost > 1:
                 self.current_cost -= 1
-        # self.target.demand = allocation + self.current_cost
-        # else:
-        #     if self.current_cost > 1:
-        #         self.current_cost -= 1
-        #         self.target.demand = allocation + self.current_cost
diff --git a/lapis/cost.py b/lapis/cost.py
deleted file mode 100644
index f391ff7..0000000
--- a/lapis/cost.py
+++ /dev/null
@@ -1,22 +0,0 @@
-def cobald_cost(simulator):
-    result = len(list(simulator.job_scheduler.drone_list))
-    for drone in simulator.job_scheduler.drone_list:
-        result += 1
-        tmp = 0
-        for resource_key in drone.pool_resources:
-            tmp += drone.resources[resource_key] / drone.pool_resources[resource_key]
-        tmp /= len(drone.pool_resources)
-        result -= tmp
-    return result
-
-
-def local_cobald_cost(pool):
-    result = 0
-    for drone in pool.drones:
-        result += 1
-        tmp = 0
-        for resource_key in pool.resources:
-            tmp += drone.resources[resource_key] / pool.resources[resource_key]
-        tmp /= len(pool.resources)
-        result -= tmp
-    return result
diff --git a/lapis/job.py b/lapis/job.py
index c4627e0..e3c37de 100644
--- a/lapis/job.py
+++ b/lapis/job.py
@@ -1,13 +1,13 @@
 import logging
-from typing import Optional, TYPE_CHECKING
+from typing import Optional, TYPE_CHECKING, Dict
 
-from usim import time
+from usim import time, Queue
 from usim import CancelTask
 
-from lapis.monitor import sampling_required
+from lapis.monitor.core import sampling_required
 
 if TYPE_CHECKING:
-    from lapis.drone import Drone
+    from lapis.workernode import WorkerNode
 
 
 class Job(object):
@@ -17,8 +17,6 @@ class Job(object):
         "walltime",
         "requested_walltime",
         "queue_date",
-        "requested_inputfiles",
-        "used_inputfiles",
         "in_queue_since",
         "in_queue_until",
         "_name",
@@ -28,12 +26,11 @@ def __init__(
         self,
-        resources: dict,
-        used_resources: dict,
+        resources: Dict[str, float],
+        used_resources: Dict[str, float],
         in_queue_since: float = 0,
         queue_date: float = 0,
         name: str = None,
-        drone: "Drone" = None,
     ):
         """
         Definition of a job that uses a specified amount of resources `used_resources`
@@ -47,7 +44,6 @@ def __init__(
             simulation scheduler
         :param queue_date: Time when job was inserted into queue in real life
         :param name: Name of the job
-        :param drone: Drone where the job is running on
         """
         self.resources = resources
         self.used_resources = used_resources
@@ -59,17 +55,22 @@ def __init__(
                     self.used_resources[key],
                 )
                 self.resources[key] = self.used_resources[key]
-        self.walltime = used_resources.pop("walltime")
-        self.requested_walltime = resources.pop("walltime", None)
-        self.requested_inputfiles = resources.pop("inputfiles", None)
-        self.used_inputfiles = used_resources.pop("inputfiles", None)
+        self.walltime: float = used_resources.pop("walltime")
+        """the job's runtime, in reality as well as in the simulation"""
+        self.requested_walltime: Optional[float] = resources.pop("walltime", None)
+        """estimate of the job's walltime"""
         self.queue_date = queue_date
+        """point in time when the job was submitted to the simulated job queue"""
         assert in_queue_since >= 0, "Queue time cannot be negative"
         self.in_queue_since = in_queue_since
-        self.in_queue_until = None
-        self.drone = drone
+        """Time when job was inserted into the queue of the simulation scheduler"""
+        self.in_queue_until: Optional[float] = None
+        """point in time when the job left the job queue"""
+        self.drone = None
         self._name = name
+        """identifier of the job"""
         self._success: Optional[bool] = None
+        """flag indicating whether the job was completed successfully"""
 
     @property
     def name(self) -> str:
@@ -83,7 +84,7 @@ def successful(self) -> Optional[bool]:
     def waiting_time(self) -> float:
         """
         The time the job spent in the simulators scheduling queue. `Inf` when
-        the job is still waitiing.
+        the job is still waiting.
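+        For example, a job queued at `t = 10` and scheduled at `t = 25` reports a
+        waiting time of `15`; while the job is still queued, `float("Inf")` is
+        returned.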
 
         :return: Time in queue
         """
@@ -91,7 +92,7 @@
         if self.in_queue_until is not None:
             return self.in_queue_until - self.in_queue_since
         return float("Inf")
 
-    async def run(self, drone: "Drone"):
+    async def run(self, drone: "WorkerNode"):
         assert drone, "Jobs cannot run without a drone being assigned"
         self.drone = drone
         self.in_queue_until = time.now
@@ -115,7 +116,13 @@ def __repr__(self):
         return "<%s: %s>" % (self.__class__.__name__, self._name or id(self))
 
 
-async def job_to_queue_scheduler(job_generator, job_queue):
+async def job_to_queue_scheduler(job_generator, job_queue: Queue):
+    """
+    Handles reading the simulation's job input and puts the jobs into the job queue
+
+    :param job_generator: reader object that yields jobs from input
+    :param job_queue: queue the jobs are added to
+    """
     base_date = None
     for job in job_generator:
         if base_date is None:
diff --git a/lapis/job_io/htcondor.py b/lapis/job_io/htcondor.py
index 60a3684..496bf75 100644
--- a/lapis/job_io/htcondor.py
+++ b/lapis/job_io/htcondor.py
@@ -1,36 +1,48 @@
 import csv
 import json
 import logging
+from typing import Dict, Iterable
 
 from lapis.job import Job
-from copy import deepcopy
+
+
+default_resource_name_mapping: Dict[str, str] = {
+    "cores": "RequestCpus",
+    "walltime": "RequestWalltime",  # s
+    "memory": "RequestMemory",  # MiB
+    "disk": "RequestDisk",  # KiB
+}
+default_used_resource_name_mapping: Dict[str, str] = {
+    "queuetime": "QDate",
+    "walltime": "RemoteWallClockTime",  # s
+    "memory": "MemoryUsage",  # MB
+    "disk": "DiskUsage_RAW",  # KiB
+}
+default_unit_conversion_mapping: Dict[str, float] = {
+    "RequestCpus": 1,
+    "RequestWalltime": 1,
+    "RequestMemory": 1024 * 1024,
+    "RequestDisk": 1024,
+    "queuetime": 1,
+    "RemoteWallClockTime": 1,
+    "MemoryUsage": 1000 * 1000,
+    "DiskUsage_RAW": 1024,
+}
 
 
 def htcondor_job_reader(
     iterable,
-    resource_name_mapping={  # noqa: B006
-        "cores": "RequestCpus",
-        "walltime": "RequestWalltime",  # s
-        "memory": "RequestMemory",  # MiB
-        "disk": "RequestDisk",  # KiB
-    },
-    used_resource_name_mapping={  # noqa: B006
-        "queuetime": "QDate",
-        "walltime": "RemoteWallClockTime",  # s
-        "memory": "MemoryUsage",  # MB
-        "disk": "DiskUsage_RAW",  # KiB
-    },
-    unit_conversion_mapping={  # noqa: B006
-        "RequestCpus": 1,
-        "RequestWalltime": 1,
-        "RequestMemory": 1024 * 1024,
-        "RequestDisk": 1024,
-        "queuetime": 1,
-        "RemoteWallClockTime": 1,
-        "MemoryUsage": 1000 * 1000,
-        "DiskUsage_RAW": 1024,
-    },
-):
+    resource_name_mapping: Dict[str, str] = None,
+    used_resource_name_mapping: Dict[str, str] = None,
+    unit_conversion_mapping: Dict[str, float] = None,
+) -> Iterable[Job]:
+    if resource_name_mapping is None:
+        resource_name_mapping = default_resource_name_mapping
+    if used_resource_name_mapping is None:
+        used_resource_name_mapping = default_used_resource_name_mapping
+    if unit_conversion_mapping is None:
+        unit_conversion_mapping = default_unit_conversion_mapping
+
     input_file_type = iterable.name.split(".")[-1].lower()
     if input_file_type == "json":
         htcondor_reader = json.load(iterable)
@@ -40,6 +52,7 @@ def htcondor_job_reader(
         logging.getLogger("implementation").error(
             "Invalid input file %s. Job input file can not be read."
             % iterable.name
         )
+        return
     for entry in htcondor_reader:
         if float(entry[used_resource_name_mapping["walltime"]]) <= 0:
             logging.getLogger("implementation").warning(
@@ -69,22 +82,6 @@ def htcondor_job_reader(
                     float(entry[original_key])
                     * unit_conversion_mapping.get(original_key, 1)
                 )
-
-        try:
-            resources["inputfiles"] = deepcopy(entry["Inputfiles"])
-            used_resources["inputfiles"] = deepcopy(entry["Inputfiles"])
-            for filename, filespecs in entry["Inputfiles"].items():
-                if "usedsize" in filespecs:
-                    del resources["inputfiles"][filename]["usedsize"]
-                if "filesize" in filespecs:
-                    if "usedsize" not in filespecs:
-                        used_resources["inputfiles"][filename]["usedsize"] = filespecs[
-                            "filesize"
-                        ]
-                    del used_resources["inputfiles"][filename]["filesize"]
-
-        except KeyError:
-            pass
         yield Job(
             resources=resources,
             used_resources=used_resources,
diff --git a/lapis/job_io/swf.py b/lapis/job_io/swf.py
index 99124bd..1bc0d15 100644
--- a/lapis/job_io/swf.py
+++ b/lapis/job_io/swf.py
@@ -4,28 +4,40 @@
 [Standard Workload Format](http://www.cs.huji.ac.il/labs/parallel/workload/swf.html).
 """
 import csv
+from typing import Dict, Iterable
 
 from lapis.job import Job
 
+default_resource_name_mapping: Dict[str, str] = {
+    "cores": "Requested Number of Processors",
+    "walltime": "Requested Time",  # s
+    "memory": "Requested Memory",  # KiB
+}
+default_used_resource_name_mapping: Dict[str, str] = {
+    "walltime": "Run Time",  # s
+    "cores": "Number of Allocated Processors",
+    "memory": "Used Memory",  # KiB
+    "queuetime": "Submit Time",
+}
+default_unit_conversion_mapping: Dict[str, float] = {
+    "Used Memory": 1024,
+    "Requested Memory": 1024,
+}
+
+
 def swf_job_reader(
     iterable,
-    resource_name_mapping={  # noqa: B006
-        "cores": "Requested Number of Processors",
-        "walltime": "Requested Time",  # s
-        "memory": "Requested Memory",  # KiB
-    },
-    used_resource_name_mapping={  # noqa: B006
-        "walltime": "Run Time",  # s
-        "cores": "Number of Allocated Processors",
-        "memory": "Used Memory",  # KiB
-        "queuetime": "Submit Time",
-    },
-    unit_conversion_mapping={  # noqa: B006
-        "Used Memory": 1024,
-        "Requested Memory": 1024,
-    },
-):
+    resource_name_mapping: Dict[str, str] = None,
+    used_resource_name_mapping: Dict[str, str] = None,
+    unit_conversion_mapping: Dict[str, float] = None,
+) -> Iterable[Job]:
+    if resource_name_mapping is None:
+        resource_name_mapping = default_resource_name_mapping
+    if used_resource_name_mapping is None:
+        used_resource_name_mapping = default_used_resource_name_mapping
+    if unit_conversion_mapping is None:
+        unit_conversion_mapping = default_unit_conversion_mapping
     header = {
         "Job Number": 0,
         "Submit Time": 1,
diff --git a/lapis/monitor/cobald.py b/lapis/monitor/cobald.py
index 21dfee0..a10776e 100644
--- a/lapis/monitor/cobald.py
+++ b/lapis/monitor/cobald.py
@@ -4,12 +4,12 @@
 from cobald.monitor.format_line import LineProtocolFormatter
 from typing import List, Dict
 
-from lapis.drone import Drone
-from lapis.monitor import LoggingSocketHandler, LoggingUDPSocketHandler
+from lapis.workernode import WorkerNode
+from lapis.monitor.core import LoggingSocketHandler, LoggingUDPSocketHandler
 from lapis.pool import Pool
 
 
-def drone_statistics(drone: Drone) -> List[Dict]:
+def drone_statistics(drone: WorkerNode) -> List[Dict]:
     """
     Collect allocation, utilisation, demand and supply of drones.
@@ -32,7 +32,7 @@ def drone_statistics(drone: Drone) -> List[Dict]:
 
 drone_statistics.name = "cobald_status"
-drone_statistics.whitelist = (Drone,)
+drone_statistics.whitelist = (WorkerNode,)
 drone_statistics.logging_formatter = {
     LoggingSocketHandler.__name__: JsonFormatter(),
     logging.StreamHandler.__name__: JsonFormatter(),
diff --git a/lapis/monitor/__init__.py b/lapis/monitor/core.py
similarity index 80%
rename from lapis/monitor/__init__.py
rename to lapis/monitor/core.py
index c7e4039..4a9b0a6 100644
--- a/lapis/monitor/__init__.py
+++ b/lapis/monitor/core.py
@@ -7,6 +7,9 @@
 from usim import time, Queue
 
 
+SIMULATION_START = None
+
+
 class LoggingSocketHandler(logging.handlers.SocketHandler):
     def makePickle(self, record):
         return self.format(record).encode()
@@ -42,11 +45,19 @@ def __init__(self):
         self._statistics = {}
 
     async def run(self):
-        async for log_object in sampling_required:
-            for statistic in self._statistics.get(type(log_object), set()):
-                # do the logging
-                for record in statistic(log_object):
-                    logging.getLogger(statistic.name).info(statistic.name, record)
+        # The Queue.__aiter__ cannot safely be finalised unless closed. We
+        # explicitly create the iterator and swallow its GeneratorExit, to ensure
+        # finalisation happens while the Scope collects us and the event loop is
+        # still around.
+        log_iter = sampling_required.__aiter__()
+        try:
+            async for log_object in log_iter:
+                for statistic in self._statistics.get(type(log_object), set()):
+                    # do the logging
+                    for record in statistic(log_object):
+                        record["tardis"] = "lapis-%s" % SIMULATION_START
+                        logging.getLogger(statistic.name).info(statistic.name, record)
+        except GeneratorExit:
+            pass
 
     def register_statistic(self, statistic: Callable) -> None:
         """
diff --git a/lapis/monitor/general.py b/lapis/monitor/general.py
index be6d24d..bf8acdb 100644
--- a/lapis/monitor/general.py
+++ b/lapis/monitor/general.py
@@ -1,21 +1,22 @@
 from typing import TYPE_CHECKING, List, Dict
+import logging
 import logging.handlers
 
 from cobald.monitor.format_json import JsonFormatter
 from cobald.monitor.format_line import LineProtocolFormatter
 
-from lapis.drone import Drone
+from lapis.workernode import WorkerNode
 from lapis.job import Job
-from lapis.monitor import LoggingSocketHandler, LoggingUDPSocketHandler
+from lapis.monitor.core import LoggingSocketHandler, LoggingUDPSocketHandler
 from lapis.pool import Pool
-from lapis.scheduler import CondorJobScheduler, JobQueue
+from lapis.scheduler.base import CondorJobScheduler, JobQueue
 
 if TYPE_CHECKING:
     from lapis.simulator import Simulator
 
 
-def resource_statistics(drone: Drone) -> List[Dict]:
+def resource_statistics(drone: WorkerNode) -> List[Dict]:
     """
     Log ratio of used and requested resources for drones.
@@ -23,7 +24,7 @@ def resource_statistics(drone: Drone) -> List[Dict]:
     :return: list of records for logging
     """
     results = []
-    resources = drone.theoretical_available_resources
+    resources = drone.unallocated_resources
     used_resources = drone.available_resources
     for resource_type in resources:
         results.append(
@@ -42,7 +43,7 @@
 
 resource_statistics.name = "resource_status"
-resource_statistics.whitelist = (Drone,)
+resource_statistics.whitelist = (WorkerNode,)
 resource_statistics.logging_formatter = {
     LoggingSocketHandler.__name__: JsonFormatter(),
     logging.StreamHandler.__name__: JsonFormatter(),
diff --git a/lapis/pool.py b/lapis/pool.py
index 2a3e465..4fe9128 100644
--- a/lapis/pool.py
+++ b/lapis/pool.py
@@ -2,7 +2,7 @@
 
 from cobald import interfaces
 from usim import eternity, Scope, interval
-from .drone import Drone
+from .workernode import WorkerNode
 
 
 class Pool(interfaces.Pool):
@@ -34,10 +34,11 @@ def __init__(
         self._capacity = capacity
         self._name = name
 
-    async def init_pool(self, scope: Scope, init: int = 0):
+    def _init_pool(self, scope: Scope, init: int = 0):
        """
        Initialisation of existing drones at creation time of pool.
 
+        :param scope: scope the created drones are run in
         :param init: Number of drones to create.
         """
         for _ in range(init):
@@ -53,7 +54,7 @@ async def run(self):
         initialising new drones. Otherwise drones get removed.
         """
         async with Scope() as scope:
-            await self.init_pool(scope=scope, init=self._level)
+            self._init_pool(scope=scope, init=self._level)
             async for _ in interval(1):
                 drones_required = min(self._demand, self._capacity) - self._level
                 while drones_required > 0:
@@ -74,7 +75,7 @@ async def run(self):
                     break
 
     @property
-    def drones(self) -> Generator[Drone, None, None]:
+    def drones(self) -> Generator[WorkerNode, None, None]:
         for drone in self._drones:
             if drone.supply > 0:
                 yield drone
@@ -148,5 +149,5 @@ async def run(self):
         Pool runs forever and does not check if number of drones needs to be adapted.
""" async with Scope() as scope: - await self.init_pool(scope=scope, init=self._level) + self._init_pool(scope=scope, init=self._level) await eternity diff --git a/lapis/pool_io/htcondor.py b/lapis/pool_io/htcondor.py index 0dba5c1..a6cb39b 100644 --- a/lapis/pool_io/htcondor.py +++ b/lapis/pool_io/htcondor.py @@ -1,29 +1,34 @@ import csv from functools import partial -from typing import Callable +from typing import Callable, Dict, Iterable from ..pool import Pool +default_resource_name_mapping: Dict[str, str] = { + "cores": "TotalSlotCPUs", + "disk": "TotalSlotDisk", # MiB + "memory": "TotalSlotMemory", # MiB +} +default_unit_conversion_mapping: Dict[str, float] = { + "TotalSlotCPUs": 1, + "TotalSlotDisk": 1024 * 1024, + "TotalSlotMemory": 1024 * 1024, +} + + def htcondor_pool_reader( iterable, - resource_name_mapping: dict = { # noqa: B006 - "cores": "TotalSlotCPUs", - "disk": "TotalSlotDisk", # MiB - "memory": "TotalSlotMemory", # MiB - }, - unit_conversion_mapping: dict = { # noqa: B006 - "TotalSlotCPUs": 1, - "TotalSlotDisk": 1024 * 1024, - "TotalSlotMemory": 1024 * 1024, - }, + resource_name_mapping: Dict[str, str] = None, + unit_conversion_mapping: Dict[str, float] = None, pool_type: Callable = Pool, make_drone: Callable = None, -): +) -> Iterable[Pool]: """ Load a pool configuration that was exported via htcondor from files or iterables + :param unit_conversion_mapping: :param iterable: an iterable yielding lines of CSV, such as an open file :param resource_name_mapping: Mapping from given header names to well-defined resources in simulation @@ -31,6 +36,11 @@ def htcondor_pool_reader( :param make_drone: :return: Yields the :py:class:`Pool`s found in the given iterable """ + if resource_name_mapping is None: + resource_name_mapping = default_resource_name_mapping + if unit_conversion_mapping is None: + unit_conversion_mapping = default_unit_conversion_mapping + assert make_drone reader = csv.DictReader(iterable, delimiter=" ", skipinitialspace=True) for row in reader: diff --git a/lapis/pool_io/machines.py b/lapis/pool_io/machines.py index 38e0e94..61b3333 100644 --- a/lapis/pool_io/machines.py +++ b/lapis/pool_io/machines.py @@ -1,23 +1,27 @@ import csv from functools import partial -from typing import Callable +from typing import Callable, Dict, Iterable from ..pool import Pool +default_resource_name_mapping: Dict[str, str] = { + "cores": "CPUs_per_node", + "memory": "RAM_per_node_in_KB", +} +default_unit_conversion_mapping: Dict[str, float] = { + "CPUs_per_node": 1, + "RAM_per_node_in_KB": 1000, +} + + def machines_pool_reader( iterable, - resource_name_mapping: dict = { # noqa: B006 - "cores": "CPUs_per_node", - "memory": "RAM_per_node_in_KB", - }, - unit_conversion_mapping={ # noqa: B006 - "CPUs_per_node": 1, - "RAM_per_node_in_KB": 1000, - }, + resource_name_mapping: Dict[str, str] = None, + unit_conversion_mapping: Dict[str, float] = None, pool_type: Callable = Pool, make_drone: Callable = None, -): +) -> Iterable[Pool]: """ Load a pool configuration that was exported via htcondor from files or iterables @@ -29,6 +33,11 @@ def machines_pool_reader( :param pool_type: The type of pool to be yielded :return: Yields the :py:class:`StaticPool`s found in the given iterable """ + if resource_name_mapping is None: + resource_name_mapping = default_resource_name_mapping + if unit_conversion_mapping is None: + unit_conversion_mapping = default_unit_conversion_mapping + assert make_drone reader = csv.DictReader(iterable, delimiter=" ", skipinitialspace=True) for row in reader: diff 
diff --git a/lapis/scheduler/__init__.py b/lapis/scheduler/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/lapis/scheduler.py b/lapis/scheduler/base.py
similarity index 73%
rename from lapis/scheduler.py
rename to lapis/scheduler/base.py
index b38d53e..2fbf137 100644
--- a/lapis/scheduler.py
+++ b/lapis/scheduler/base.py
@@ -1,15 +1,50 @@
-from typing import Dict
-from usim import Scope, interval, Resources
+from abc import ABC
+from typing import Dict, Iterator, Optional
+from usim import Scope, interval, Resources, Queue
 
-from lapis.drone import Drone
-from lapis.monitor import sampling_required
+from lapis.workernode import WorkerNode
+from lapis.job import Job
+from lapis.monitor.core import sampling_required
 
 
 class JobQueue(list):
     pass
 
 
-class CondorJobScheduler(object):
+class JobScheduler(ABC):
+    __slots__ = ()
+
+    @property
+    def drone_list(self) -> Iterator[WorkerNode]:
+        """Yields the registered drones"""
+        raise NotImplementedError
+
+    def register_drone(self, drone: WorkerNode):
+        """Register a drone at the scheduler"""
+        raise NotImplementedError
+
+    def unregister_drone(self, drone: WorkerNode):
+        """Unregister a drone at the scheduler"""
+        raise NotImplementedError
+
+    def update_drone(self, drone: WorkerNode):
+        """Update parameters of a drone"""
+        raise NotImplementedError
+
+    async def run(self):
+        """Run method of the scheduler"""
+        raise NotImplementedError
+
+    async def job_finished(self, job):
+        """
+        Declare a job as finished by a drone. This might even mean that the job
+        has failed and that the scheduler needs to requeue the job for further
+        processing.
+        """
+        raise NotImplementedError
+
+
+class CondorJobScheduler(JobScheduler):
     """
     Goal of the htcondor job scheduler is to have a scheduler that somehow
     mimics how htcondor does schedule jobs.
@@ -26,7 +61,7 @@ class CondorJobScheduler(object):
     :return:
     """
 
-    def __init__(self, job_queue):
+    def __init__(self, job_queue: Queue):
         self._stream_queue = job_queue
         self.drone_cluster = []
         self.interval = 60
@@ -35,15 +70,15 @@ def __init__(self, job_queue):
         self._processing = Resources(jobs=0)
 
     @property
-    def drone_list(self):
+    def drone_list(self) -> Iterator[WorkerNode]:
         for cluster in self.drone_cluster:
             for drone in cluster:
                 yield drone
 
-    def register_drone(self, drone: Drone):
+    def register_drone(self, drone: WorkerNode):
         self._add_drone(drone)
 
-    def unregister_drone(self, drone: Drone):
+    def unregister_drone(self, drone: WorkerNode):
         for cluster in self.drone_cluster:
             try:
                 cluster.remove(drone)
@@ -53,7 +88,7 @@
                 if len(cluster) == 0:
                     self.drone_cluster.remove(cluster)
 
-    def _add_drone(self, drone: Drone, drone_resources: Dict = None):
+    def _add_drone(self, drone: WorkerNode, drone_resources: Dict = None):
         minimum_distance_cluster = None
         distance = float("Inf")
         if len(self.drone_cluster) > 0:
@@ -62,13 +97,13 @@
                 for key in {*cluster[0].pool_resources, *drone.pool_resources}:
                     if drone_resources:
                         current_distance += abs(
-                            cluster[0].theoretical_available_resources.get(key, 0)
+                            cluster[0].unallocated_resources.get(key, 0)
                             - drone_resources.get(key, 0)
                         )
                     else:
                         current_distance += abs(
-                            cluster[0].theoretical_available_resources.get(key, 0)
-                            - drone.theoretical_available_resources.get(key, 0)
+                            cluster[0].unallocated_resources.get(key, 0)
+                            - drone.unallocated_resources.get(key, 0)
                         )
                 if current_distance < distance:
                     minimum_distance_cluster = cluster
@@ -80,7 +115,7 @@
         else:
             self.drone_cluster.append([drone])
 
-    def update_drone(self, drone: Drone):
+    def update_drone(self, drone: WorkerNode):
        self.unregister_drone(drone)
        self._add_drone(drone)
 
@@ -95,7 +130,7 @@ async def run(self):
                     self.job_queue.remove(job)
                     await sampling_required.put(self.job_queue)
                     self.unregister_drone(best_match)
-                    left_resources = best_match.theoretical_available_resources
+                    left_resources = best_match.unallocated_resources
                     left_resources = {
                         key: value - job.resources.get(key, 0)
                         for key, value in left_resources.items()
@@ -117,18 +152,18 @@ async def _collect_jobs(self):
             await sampling_required.put(self.job_queue)
         self._collecting = False
 
-    async def job_finished(self, job):
+    async def job_finished(self, job: Job):
         if job.successful:
             await self._processing.decrease(jobs=1)
         else:
             await self._stream_queue.put(job)
 
-    def _schedule_job(self, job) -> Drone:
+    def _schedule_job(self, job: Job) -> Optional[WorkerNode]:
         priorities = {}
         for cluster in self.drone_cluster:
             drone = cluster[0]
             cost = 0
-            resources = drone.theoretical_available_resources
+            resources = drone.unallocated_resources
             for resource_type in job.resources:
                 if resources.get(resource_type, 0) < job.resources[resource_type]:
                     # Inf for all job resources that a drone does not support
diff --git a/lapis/scheduler/classad.py b/lapis/scheduler/classad.py
new file mode 100644
index 0000000..3cc698b
--- /dev/null
+++ b/lapis/scheduler/classad.py
@@ -0,0 +1,689 @@
+from abc import abstractmethod
+import random
+from typing import Any, Dict, Generic, Iterator, List, NamedTuple, Set, Tuple, TypeVar
+from weakref import WeakKeyDictionary
+
+from sortedcontainers import SortedDict
+
+from classad import parse
+from classad._primitives import HTCInt, Undefined
+from classad._expression import ClassAd
+from classad._base_expression import Expression
+from classad._functions import quantize
+from usim import Scope, interval, Resources
+
+from lapis.workernode import WorkerNode
+from lapis.job import Job
+from lapis.monitor.core import sampling_required
+from lapis.scheduler.base import JobScheduler, JobQueue
+
+quantization_defaults = {
+    "memory": HTCInt(128 * 1024 * 1024),
+    "disk": HTCInt(1024 * 1024),
+    "cores": HTCInt(1),
+}
+
+# ClassAd attributes are not case sensitive
+machine_ad_defaults = """
+requirements = target.requestcpus <= my.cpus
+""".strip()
+
+job_ad_defaults = """
+requirements = my.requestcpus <= target.cpus && my.requestmemory <= target.memory
+"""
+
+T = TypeVar("T")
+DJ = TypeVar("DJ", WorkerNode, Job)
+
+
+class WrappedClassAd(ClassAd, Generic[DJ]):
+    """
+    Combines the original job/drone object and the associated ClassAd.
+    """
+
+    __slots__ = "_wrapped", "_temp"
+
+    def __init__(self, classad: ClassAd, wrapped: DJ):
+        """
+        Initialization for wrapped ClassAd
+
+        :param classad: the wrapped object's ClassAd description
+        :param wrapped: wrapped object, either job or drone
+        """
+        super(WrappedClassAd, self).__init__()
+        self._wrapped = wrapped
+        self._data = classad._data
+        self._temp = {}
+
+    def empty(self):
+        """
+        Only relevant for wrapped drones to determine whether there are no more
+        resources left on a drone.
+
+        :return: true if no CPU cores are available, false otherwise
+        """
+        try:
+            return self._temp["cores"] < 1
+        except KeyError:
+            return self._wrapped.unallocated_resources["cores"] < 1
+
+    def __getitem__(self, item):
+        """
+        This method is used when evaluating classad expressions.
+
+        :param item: name of a quantity in the classad expression
+        :return: current value of this item
+        """
+
+        def access_wrapped(name, requested=True):
+            """
+            Extracts the wrapped object's current quantity of a certain resource
+            (cores, memory, disk)
+
+            :param name: name of the resource that is to be accessed
+            :param requested: false if name is a resource of the drone, true if name
+                is a resource requested by a job
+            :return: value of respective resource
+            """
+            if isinstance(self._wrapped, WorkerNode):
+                return self._wrapped.unallocated_resources[name]
+            if requested:
+                return self._wrapped.resources[name]
+            return self._wrapped.used_resources[name]
+
+        if "target" not in item:
+            if "requestcpus" == item:
+                return access_wrapped("cores", requested=True)
+            elif "requestmemory" == item:
+                return (1 / 1024 / 1024) * access_wrapped("memory", requested=True)
+            elif "requestdisk" == item:
+                return (1 / 1024) * access_wrapped("disk", requested=True)
+            elif "requestwalltime" == item:
+                return self._wrapped.requested_walltime
+            elif "cpus" == item:
+                try:
+                    return self._temp["cores"]
+                except KeyError:
+                    return access_wrapped("cores", requested=False)
+            elif "memory" == item:
+                try:
+                    return (1 / 1000 / 1000) * self._temp["memory"]
+                except KeyError:
+                    return (1 / 1000 / 1000) * access_wrapped("memory", requested=False)
+            elif "disk" == item:
+                try:
+                    return (1 / 1024) * self._temp["disk"]
+                except KeyError:
+                    return (1 / 1024) * access_wrapped("disk", requested=False)
+
+        return super(WrappedClassAd, self).__getitem__(item)
+
+    def clear_temporary_resources(self):
+        self._temp.clear()
+
+    def __repr__(self):
+        return f"<{self.__class__.__name__}>: {self._wrapped}"
+
+    def __eq__(self, other):
+        return super().__eq__(other) and self._wrapped == other._wrapped
+
+    def __hash__(self):
+        return id(self._wrapped)
+
+
+class Cluster(List[WrappedClassAd[DJ]], Generic[DJ]):
+    pass
+
+
+class Bucket(List[Cluster[DJ]], Generic[DJ]):
+    pass
+
+
+class NoMatch(Exception):
+    """A job could not be matched to any drone"""
+
+
+class RankedClusterKey(NamedTuple):
+    rank: float
+    key: Tuple[float, ...]
+
+
+RC = TypeVar("RC", bound="RankedClusters")
+
+
+class RankedClusters(Generic[DJ]):
+    """Automatically cluster drones by rank"""
+
+    @abstractmethod
+    def __init__(self, quantization: Dict[str, HTCInt], ranking: Expression):
+        raise NotImplementedError
+
+    @abstractmethod
+    def empty(self) -> bool:
+        """Whether there are no resources available"""
+        raise NotImplementedError
+
+    @abstractmethod
+    def copy(self: RC) -> RC:
+        """Copy the entire ranked clusters"""
+        raise NotImplementedError
+
+    @abstractmethod
+    def add(self, item: WrappedClassAd[DJ]) -> None:
+        """Add a new item"""
+        raise NotImplementedError
+
+    @abstractmethod
+    def remove(self, item: WrappedClassAd[DJ]) -> None:
+        """Remove an existing item"""
+        raise NotImplementedError
+
+    def update(self, item) -> None:
+        """Update an existing item with its current state"""
+        self.remove(item)
+        self.add(item)
+
+    @abstractmethod
+    def clusters(self) -> Iterator[Set[WrappedClassAd[DJ]]]:
+        raise NotImplementedError
+
+    @abstractmethod
+    def items(self) -> Iterator[Tuple[Any, Set[WrappedClassAd[DJ]]]]:
+        raise NotImplementedError
+
+    @abstractmethod
+    def cluster_groups(self) -> Iterator[List[Set[WrappedClassAd[WorkerNode]]]]:
+        """Group autoclusters by PreJobRank"""
+        raise NotImplementedError
+
+
+class RankedAutoClusters(RankedClusters[DJ]):
+    """Automatically cluster similar jobs or drones"""
+
+    def __init__(self, quantization: Dict[str, HTCInt], ranking: Expression):
+        """
+        :param quantization: factors to convert resources into HTCondor scaling
+        :param ranking: prejobrank expression
+        """
+        self._quantization = quantization
+        self._ranking = ranking
+        self._clusters: Dict[RankedClusterKey, Set[WrappedClassAd[DJ]]] = SortedDict()
+        self._inverse: Dict[WrappedClassAd[DJ], RankedClusterKey] = {}
+
+    def empty(self) -> bool:
+        """
+        Checks whether all drones in the RankedCluster are empty and currently not
+        running any jobs.
+
+        :return:
+        """
+        for drones in self._clusters.values():
+            if not next(iter(drones)).empty():
+                return False
+        return True
+
+    def copy(self) -> "RankedAutoClusters[DJ]":
+        clone = type(self)(quantization=self._quantization, ranking=self._ranking)
+        clone._clusters = SortedDict(
+            (key, value.copy()) for key, value in self._clusters.items()
+        )
+        clone._inverse = self._inverse.copy()
+        return clone
+
+    def add(self, item: WrappedClassAd[DJ]):
+        """
+        Add a new wrapped item, usually a drone, to the RankedAutoCluster.
+        Unless the item is already contained, the item's key is generated and it is
+        sorted into the clusters accordingly. If there are already items with the
+        same key, the new item is added to the existing cluster. If not,
+        a new cluster is created.
+
+        :param item:
+        :return:
+        """
+        if item in self._inverse:
+            raise ValueError(f"{item!r} already stored; use `.update(item)` instead")
+        item_key = self._clustering_key(item)
+        try:
+            self._clusters[item_key].add(item)
+        except KeyError:
+            self._clusters[item_key] = {item}
+        self._inverse[item] = item_key
+
+    def remove(self, item: WrappedClassAd[DJ]):
+        """
+        Removes the item.
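+        If the removed item was the last member of its cluster, the now empty
+        cluster is dropped as well.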
+
+        :param item:
+        :return:
+        """
+        item_key = self._inverse.pop(item)
+        cluster = self._clusters[item_key]
+        cluster.remove(item)
+        if not cluster:
+            del self._clusters[item_key]
+
+    def _clustering_key(self, item: WrappedClassAd[DJ]):
+        """
+        Calculates an item's clustering key based on the specified ranking (in my use
+        case the prejobrank) and the item's available resources. The resulting key's
+        structure is (prejobrank value, (available cpus, available memory, available
+        disk space)). The clustering key is negated as the SortedDict sorts its
+        entries from low keys to high keys.
+
+        :param item: drone for which the clustering key is calculated.
+        :return: (prejobrank value, (available cpus, available memory, available
+            disk space))
+        """
+        # TODO: assert that order is consistent
+        quantization = self._quantization
+        return RankedClusterKey(
+            rank=-1.0 * self._ranking.evaluate(my=item),
+            key=tuple(
+                int(quantize(item[key], quantization.get(key, 1)))
+                for key in ("cpus", "memory", "disk")
+            ),
+        )
+
+    def clusters(self) -> Iterator[Set[WrappedClassAd[DJ]]]:
+        """
+        :return: iterator of all clusters
+        """
+        return iter(self._clusters.values())
+
+    def items(self) -> Iterator[Tuple[RankedClusterKey, Set[WrappedClassAd[DJ]]]]:
+        """
+        :return: iterator of all clusters and corresponding keys
+        """
+        return iter(self._clusters.items())
+
+    def cluster_groups(self) -> Iterator[List[Set[WrappedClassAd[WorkerNode]]]]:
+        """
+        Sort clusters by the ranking key and then by the amount of available
+        resources into nested lists of sets.
+
+        :return:
+        """
+        group = []
+        current_rank = None
+        for ranked_key, drones in self._clusters.items():
+            if next(iter(drones)).empty():
+                continue
+            if ranked_key.rank != current_rank:
+                current_rank = ranked_key.rank
+                if group:
+                    yield group
+                group = []
+            group.append(drones)
+        if group:
+            yield group
+
+
+class RankedNonClusters(RankedClusters[DJ]):
+    """Automatically cluster jobs or drones by rank only"""
+
+    def __init__(self, quantization: Dict[str, HTCInt], ranking: Expression):
+        self._quantization = quantization
+        self._ranking = ranking
+        self._clusters: Dict[float, Set[WrappedClassAd[DJ]]] = SortedDict()
+        self._inverse: Dict[WrappedClassAd[DJ], float] = {}
+
+    def empty(self) -> bool:
+        for drones in self._clusters.values():
+            for drone in drones:
+                if not drone.empty():
+                    return False
+        return True
+
+    def copy(self) -> "RankedNonClusters[DJ]":
+        clone = type(self)(quantization=self._quantization, ranking=self._ranking)
+        clone._clusters = SortedDict(
+            (key, value.copy()) for key, value in self._clusters.items()
+        )
+        clone._inverse = self._inverse.copy()
+        return clone
+
+    def add(self, item: WrappedClassAd[DJ]):
+        if item in self._inverse:
+            raise ValueError(f"{item!r} already stored; use `.update(item)` instead")
+        item_key = self._clustering_key(item)
+        try:
+            self._clusters[item_key].add(item)
+        except KeyError:
+            self._clusters[item_key] = {item}
+        self._inverse[item] = item_key
+
+    def remove(self, item: WrappedClassAd[DJ]):
+        item_key = self._inverse.pop(item)
+        cluster = self._clusters[item_key]
+        cluster.remove(item)
+        if not cluster:
+            del self._clusters[item_key]
+
+    def update(self, item):
+        self.remove(item)
+        self.add(item)
+
+    def _clustering_key(self, item: WrappedClassAd[DJ]):
+        """
+        For RankedNonClusters there is only one clustering key: the object's defined
+        ranking. The clustering key is negated as the SortedDict sorts its entries
+        from low keys to high keys.
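+
+        For example, a ranking value of `5` is stored under key `-5.0` and is
+        therefore sorted before a ranking value of `3` (key `-3.0`).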
+ """ + return -1.0 * self._ranking.evaluate(my=item) + + def clusters(self) -> Iterator[Set[WrappedClassAd[DJ]]]: + return iter(self._clusters.values()) + + def items(self) -> Iterator[Tuple[float, Set[WrappedClassAd[DJ]]]]: + return iter(self._clusters.items()) + + def cluster_groups(self) -> Iterator[List[Set[WrappedClassAd[WorkerNode]]]]: + """ + Sorts cluster by the ranking key. As there is no autoclustering, every drone + is in a dedicated set and drones of the same ranking are combined into a list. + These lists are then sorted by increasing ranking. + + :return: iterator of the lists containing drones with identical key + """ + for _ranked_key, drones in self._clusters.items(): + yield [{item} for item in drones] + + +class CondorClassadJobScheduler(JobScheduler): + """ + Goal of the htcondor job scheduler is to have a scheduler that somehow + mimics how htcondor does schedule jobs. + Htcondor does scheduling based on a priority queue. The priorities itself + are managed by operators of htcondor. + So different instances can apparently behave very different. + In this case a priority queue that sorts job slots + by increasing cost is built. The scheduler checks if a job either + exactly fits a slot or if it does fit into it several times. The cost for + putting a job at a given slot is given by the amount of resources that + might remain unallocated. + """ + + def __init__( + self, + job_queue, + machine_ad: str = machine_ad_defaults, + job_ad: str = job_ad_defaults, + pre_job_rank: str = "0", + interval: float = 60, + ): + """ + Initializes the CondorClassadJobScheduler + + :param job_queue: queue of jobs that are scheduled in the following simulation + :param machine_ad: ClassAd that is used with every drone + :param job_ad: ClassAd that is used with every job + :param pre_job_rank: ClassAd attribute that all drones are sorted by + :param interval: time between scheduling cycles + """ + self._stream_queue = job_queue + self._drones: RankedClusters[WorkerNode] = RankedNonClusters( + quantization=quantization_defaults, ranking=parse(pre_job_rank) + ) + self.interval = interval + self.job_queue = JobQueue() + self._collecting = True + self._processing = Resources(jobs=0) + + # temporary solution + self._wrapped_classads = WeakKeyDictionary() + self._machine_classad = parse(machine_ad) + self._job_classad = parse(job_ad) + + @property + def drone_list(self) -> Iterator[WorkerNode]: + """ + Takes an iterator over the WrappedClassAd objects of drones known to the + scheduler, extracts the drones and returns an iterator over the drone objects. + + :return: + """ + for cluster in self._drones.clusters(): + for drone in cluster: + yield drone._wrapped + + def register_drone(self, drone: WorkerNode): + """ + Provides the drones with the drone ClassAd, combines both into one object and + adds the resulting WrappedClassAd object to the drones known to the scheduler as + well as the dictionary containing all WrappedClassAd objects the scheduler + works with. + + :param drone: + """ + wrapped_drone = WrappedClassAd(classad=self._machine_classad, wrapped=drone) + self._drones.add(wrapped_drone) + self._wrapped_classads[drone] = wrapped_drone + + def unregister_drone(self, drone: WorkerNode): + """ + Remove a drone's representation from the scheduler's scope. + + :param drone: + :return: + """ + drone_wrapper = self._wrapped_classads[drone] + self._drones.remove(drone_wrapper) + + def update_drone(self, drone: WorkerNode): + """ + Update a drone's representation in the scheduler scope. 
+
+        :param drone:
+        :return:
+        """
+        drone_wrapper = self._wrapped_classads[drone]
+        self._drones.update(drone_wrapper)
+
+    async def run(self):
+        """
+        Runs the scheduler's functionality. Once executed, the scheduler starts
+        collecting jobs from the stream queue and schedules the queued jobs in
+        regular intervals.
+
+        :return:
+        """
+        async with Scope() as scope:
+            scope.do(self._collect_jobs())
+            async for _ in interval(self.interval):
+                await self._schedule_jobs()
+                if (
+                    not self._collecting
+                    and not self.job_queue
+                    and self._processing.levels.jobs == 0
+                ):
+                    break
+
+    @staticmethod
+    def _match_job(
+        job: ClassAd, pre_job_clusters: Iterator[List[Set[WrappedClassAd[WorkerNode]]]]
+    ):
+        """
+        Tries to find a match for the given job among the available drones.
+
+        :param job: job to match
+        :param pre_job_clusters: list of clusters of wrapped drones that are
+            presorted by a clustering mechanism of RankedAutoClusters/RankedNonClusters
+            that mimics the HTCondor NEGOTIATOR_PRE_JOB_RANK, short prejobrank. The
+            clusters contain drones that are considered to be equivalent with respect
+            to all Requirements and Ranks that are used during the matchmaking
+            process. This mimics the Autoclustering functionality of HTCondor.
+            [[highest prejobrank {autocluster}, {autocluster}], ..., [lowest
+            prejobrank {autocluster}, {autocluster}]]
+        :return: drone that is the best match for the job
+
+        The matching is performed in several steps:
+        1. The job's requirements are evaluated and only drones that meet them are
+        considered further. A representative drone of every autocluster is checked,
+        and only autoclusters whose representative meets the job's requirements
+        remain in pre_job_clusters.
+        2. The autoclusters that are equivalent with respect to the prejobrank are
+        then sorted by the job's rank expression. The resulting format of
+        pre_job_clusters is
+        [[(highest prejobrank, highest jobrank) {autocluster} {autocluster},
+        ..., (highest prejobrank, lowest jobrank) {autocluster}], ...]
+        3. The resulting pre_job_clusters are then iterated and the drone with the
+        highest (prejobrank, jobrank) whose requirements are also compatible with the
+        job is returned as best match.
+        """
+
+        def debug_evaluate(expr, my, target=None):
+            """
+            Reimplementation of the classad package's evaluate function. Having it
+            here enables developers to inspect the ClassAd evaluation process more
+            closely and to add debug output if necessary.
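+            If `expr` is passed as a string, it is first resolved to the
+            corresponding expression on `my` before being evaluated.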
+
+            :param expr:
+            :param my:
+            :param target:
+            :return:
+            """
+            if type(expr) is str:
+                expr = my[expr]
+            result = expr.evaluate(my=my, target=target)
+            return result
+
+        if job["Requirements"] != Undefined():
+            pre_job_clusters_tmp = []
+            for cluster_group in pre_job_clusters:
+                cluster_group_tmp = []
+                for cluster in cluster_group:
+                    if debug_evaluate(
+                        "Requirements", my=job, target=next(iter(cluster))
+                    ):
+                        cluster_group_tmp.append(cluster)
+                pre_job_clusters_tmp.append(cluster_group_tmp)
+            pre_job_clusters = pre_job_clusters_tmp
+
+        if job["Rank"] != Undefined():
+            pre_job_clusters_tmp = []
+            for cluster_group in pre_job_clusters:
+                pre_job_clusters_tmp.append(
+                    sorted(
+                        cluster_group,
+                        key=lambda cluster: (
+                            debug_evaluate("Rank", my=job, target=next(iter(cluster))),
+                            random.random(),
+                        ),
+                        reverse=True,
+                    )
+                )
+            pre_job_clusters = pre_job_clusters_tmp
+
+        for cluster_group in pre_job_clusters:
+            # TODO: if we have POST_JOB_RANK, collect *all* matches of a group
+            for cluster in cluster_group:
+                for drone in cluster:
+                    if drone["Requirements"] == Undefined() or drone.evaluate(
+                        "Requirements", my=drone, target=job
+                    ):
+                        return drone
+
+        raise NoMatch()
+
+    async def _schedule_jobs(self):
+        """
+        Handles the scheduling of jobs. Tries to match the jobs in the job queue to
+        available resources. This occurs in several steps.
+        1. The list of drones known to the scheduler is copied. The copy can then be
+        used to keep track of the drones' available resources while matching jobs as
+        the jobs allocate resources on the original drones before being processed but
+        not during scheduling.
+        2. The jobs in the job queue are matched iteratively to the (copied)
+        resources. The actual matching is performed by the `_match_job` method that
+        returns the most suitable drone unless no drone is compatible with the job's
+        requirements. If a match was found, the resources requested by the job are
+        allocated on the matched drone. If no resources remain unallocated after the
+        last job's allocation, the matching process is ended for this scheduler
+        interval.
+        3. After the matching is finished, the matched jobs are removed from the job
+        queue in reverse order, as the index of a job in the job queue changes once a
+        job with a lower index is removed from the queue.
+        4. The matched jobs' execution is triggered.
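+
+        A hypothetical cycle for illustration: with one drone offering 8 cores and
+        two queued jobs requesting 4 cores each, the first match lowers the copied
+        drone's temporary core count to 4 and the second to 0; the copy then reports
+        empty and matching stops for this interval.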
+ + """ + # Pre CachingJob Rank is the same for all jobs + # Use a copy to allow temporary "remainder after match" estimates + if self._drones.empty(): + return # early exit in case of no more available resources + pre_job_drones = self._drones.copy() + matches: List[Tuple[int, WrappedClassAd[Job], WrappedClassAd[WorkerNode]]] = [] + for queue_index, candidate_job in enumerate(self.job_queue): + try: + matched_drone = self._match_job( + candidate_job, pre_job_drones.cluster_groups() + ) + except NoMatch: + candidate_job._wrapped.failed_matches += 1 + continue + else: + matches.append((queue_index, candidate_job, matched_drone)) + for key, value in candidate_job._wrapped.resources.items(): + matched_drone._temp[key] = ( + matched_drone._temp.get( + key, + matched_drone._wrapped.unallocated_resources[key], + ) + - value + ) + pre_job_drones.update(matched_drone) + + if pre_job_drones.empty(): + break + if not matches: + return + # TODO: optimize for few matches, many matches, all matches + for queue_index, _, _ in reversed(matches): + del self.job_queue[queue_index] + for _, job, drone in matches: + drone.clear_temporary_resources() + await self._execute_job(job=job, drone=drone) + await sampling_required.put(self) + # NOTE: Is this correct? Triggers once instead of for each job + await sampling_required.put(self.job_queue) + + async def _execute_job(self, job: WrappedClassAd, drone: WrappedClassAd): + """ + Schedules a job on a drone by extracting both objects from the + respective WrappedClassAd and using the drone's scheduling functionality + + :param job: + :param drone: + """ + wrapped_job = job._wrapped + wrapped_drone = drone._wrapped + await wrapped_drone.schedule_job(wrapped_job) + + async def _collect_jobs(self): + """ + Combines jobs that are imported from the simulation's job config with a job + ClassAd and adds the resulting WrappedClassAd objects to the scheduler's job + queue. + """ + async for job in self._stream_queue: + wrapped_job = WrappedClassAd(classad=self._job_classad, wrapped=job) + self._wrapped_classads[job] = wrapped_job + self.job_queue.append(wrapped_job) + await self._processing.increase(jobs=1) + # TODO: logging happens with each job + # TODO: job queue to the outside now contains wrapped classads... + await sampling_required.put(self.job_queue) + self._collecting = False + + async def job_finished(self, job): + """ + Handles the impact of finishing jobs on the scheduler. If the job is completed + successfully, the amount of running jobs matched by the current scheduler + instance is reduced. If the job is not finished successfully, + it is resubmitted to the scheduler's job queue. 
+
+        :param job:
+        """
+        if job.successful:
+            await self._processing.decrease(jobs=1)
+        else:
+            self.job_queue.append(self._wrapped_classads[job])
diff --git a/lapis/simulator.py b/lapis/simulator.py
index 2920202..e8857fc 100644
--- a/lapis/simulator.py
+++ b/lapis/simulator.py
@@ -1,10 +1,12 @@
 import logging
 import random
+import time as pytime
 from functools import partial
 
 from usim import run, time, until, Scope, Queue
 
-from lapis.drone import Drone
+import lapis.monitor.core as monitor
+from lapis.workernode import WorkerNode
 from lapis.job import job_to_queue_scheduler
 from lapis.monitor.general import (
     user_demand,
@@ -14,7 +16,7 @@
     configuration_information,
     job_events,
 )
-from lapis.monitor import Monitoring
+from lapis.monitor.core import Monitoring
 from lapis.monitor.cobald import drone_statistics, pool_statistics
 
 
@@ -54,7 +56,7 @@ def create_pools(self, pool_input, pool_reader, pool_type, controller=None):
         for pool in pool_reader(
             iterable=pool_input,
             pool_type=pool_type,
-            make_drone=partial(Drone, self.job_scheduler),
+            make_drone=partial(WorkerNode, self.job_scheduler),
         ):
             self.pools.append(pool)
             if controller:
@@ -64,11 +66,12 @@ def create_scheduler(self, scheduler_type):
         self.job_scheduler = scheduler_type(job_queue=self.job_queue)
 
     def run(self, until=None):
-        print(f"running until {until}")
+        monitor.SIMULATION_START = pytime.time()
+        print(f"[lapis-{monitor.SIMULATION_START}] running until {until}")
         run(self._simulate(until))
 
     async def _simulate(self, end):
-        print(f"Starting simulation at {time.now}")
+        print(f"[lapis-{monitor.SIMULATION_START}] Starting simulation at {time.now}")
         async with until(time == end) if end else Scope() as while_running:
             for pool in self.pools:
                 while_running.do(pool.run(), volatile=True)
@@ -79,7 +82,9 @@ async def _simulate(self, end):
                 while_running.do(controller.run(), volatile=True)
             while_running.do(self.monitoring.run(), volatile=True)
         self.duration = time.now
-        print(f"Finished simulation at {self.duration}")
+        print(
+            f"[lapis-{monitor.SIMULATION_START}] Finished simulation at {self.duration}"
+        )
 
     async def _queue_jobs(self, job_input, job_reader):
         await job_to_queue_scheduler(
diff --git a/lapis/drone.py b/lapis/workernode.py
similarity index 68%
rename from lapis/drone.py
rename to lapis/workernode.py
index 48142c6..a1a7d39 100644
--- a/lapis/drone.py
+++ b/lapis/workernode.py
@@ -1,3 +1,5 @@
+from typing import Dict, List, Optional
+
 from cobald import interfaces
 from usim import time, Scope, instant, Capacities, ResourcesUnavailable, Queue
 
@@ -8,25 +10,31 @@ class ResourcesExceeded(Exception):
     ...
 
 
-class Drone(interfaces.Pool):
+class WorkerNode(interfaces.Pool):
     def __init__(
         self,
         scheduler,
-        pool_resources: dict,
+        pool_resources: Dict[str, float],
         scheduling_duration: float,
-        ignore_resources: list = None,
+        ignore_resources: List[str] = None,
     ):
         """
         :param scheduler:
         :param pool_resources:
         :param scheduling_duration:
         """
-        super(Drone, self).__init__()
+        super(WorkerNode, self).__init__()
         self.scheduler = scheduler
+        """scheduler that assigns jobs to the workernode"""
         self.pool_resources = pool_resources
+        """dict stating the workernode's resources"""
         self.resources = Capacities(**pool_resources)
+        """available resources, based on the amount of resources requested by
+        jobs running on the workernode"""
         # shadowing requested resources to determine jobs to be killed
         self.used_resources = Capacities(**pool_resources)
+        """available resources, based on the amount of resources actually used by
+        jobs running on the workernode"""
         if ignore_resources:
             self._valid_resource_keys = [
                 resource
@@ -36,22 +44,43 @@ def __init__(
         else:
             self._valid_resource_keys = self.pool_resources.keys()
         self.scheduling_duration = scheduling_duration
+        """amount of time that passes between the drone's
+        start-up and its registration at the scheduler"""
         self._supply = 0
         self.jobs = 0
-        self._allocation = None
-        self._utilisation = None
+        """number of jobs running on the drone"""
+        self._allocation: Optional[float] = None
+        self._utilisation: Optional[float] = None
         self._job_queue = Queue()
 
     @property
-    def theoretical_available_resources(self):
+    def unallocated_resources(self) -> Dict[str, float]:
+        """
+        Returns the amount of resources of the drone that would be available if all
+        jobs used exactly the amount of resources they requested
+
+        :return: dictionary of theoretically available resources
+        """
         return dict(self.resources.levels)
 
     @property
-    def available_resources(self):
+    def available_resources(self) -> Dict[str, float]:
+        """
+        Returns the amount of resources of the drone that are available based on the
+        amount of resources the running jobs actually use.
+
+        :return: dictionary of available resources
+        """
         return dict(self.used_resources.levels)
 
     async def run(self):
-        from lapis.monitor import sampling_required
+        """
+        Handles the drone's activity during simulation. Upon execution the drone
+        registers itself at the scheduler and once jobs are scheduled to the drone's
+        job queue, these jobs are executed. Starting jobs via a job queue was
+        introduced to avoid errors in resource allocation and monitoring.
+        """
+        from lapis.monitor.core import sampling_required
 
         await (time + self.scheduling_duration)
         self._supply = 1
@@ -96,7 +125,8 @@ def _init_allocation_and_utilisation(self):
         self._utilisation = min(resources)
 
     async def shutdown(self):
-        from lapis.monitor import sampling_required
+        """Upon shutdown, the drone unregisters from the scheduler."""
+        from lapis.monitor.core import sampling_required
 
         self._supply = 0
         self.scheduler.unregister_drone(self)
@@ -104,6 +134,13 @@ async def shutdown(self):
         await (time + 1)
 
     async def schedule_job(self, job: Job, kill: bool = False):
+        """
+        A job is scheduled to a drone by putting it in the drone's job queue.
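+        The drone's `run` loop consumes this queue and starts each job it receives.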
+
+        :param job: job that was matched to the drone
+        :param kill: flag, if true jobs can be killed if they use more resources
+            than they requested
+        """
         await self._job_queue.put((job, kill))
 
     async def _run_job(self, job: Job, kill: bool):
@@ -119,7 +156,7 @@ async def _run_job(self, job: Job, kill: bool):
         """
         job.drone = self
         async with Scope() as scope:
-            from lapis.monitor import sampling_required
+            from lapis.monitor.core import sampling_required
 
             self._utilisation = self._allocation = None
diff --git a/lapis_tests/__init__.py b/lapis_tests/__init__.py
index 722b3a2..b353281 100644
--- a/lapis_tests/__init__.py
+++ b/lapis_tests/__init__.py
@@ -3,7 +3,7 @@
 
 from usim import run
 
-from lapis.drone import Drone
+from lapis.workernode import WorkerNode
 
 
 def via_usim(test_case: Callable[..., Coroutine]):
@@ -33,15 +33,15 @@ def run_test(*args, **kwargs):
 
 class DummyScheduler:
     @staticmethod
-    def register_drone(drone: Drone):
+    def register_drone(drone: WorkerNode):
         pass
 
     @staticmethod
-    def unregister_drone(drone: Drone):
+    def unregister_drone(drone: WorkerNode):
         pass
 
     @staticmethod
-    def update_drone(drone: Drone):
+    def update_drone(drone: WorkerNode):
         pass
diff --git a/lapis_tests/job_io/test_htcondor.py b/lapis_tests/job_io/test_htcondor.py
index d86e6c3..0fad27f 100644
--- a/lapis_tests/job_io/test_htcondor.py
+++ b/lapis_tests/job_io/test_htcondor.py
@@ -1,5 +1,4 @@
 import os
-import json
 
 from lapis.job_io.htcondor import htcondor_job_reader
 
@@ -19,29 +18,3 @@ def test_simple_read(self):
         # ensure that one job was removed by importer (wrong walltime given)
         lines = sum(1 for _ in input_file)
         assert jobs == (lines - 2)
-
-    def test_read_with_inputfiles(self):
-        with open(
-            os.path.join(
-                os.path.dirname(__file__), "..", "data", "job_list_minimal.json"
-            )
-        ) as input_file:
-            jobs = 0
-            for job in htcondor_job_reader(input_file):
-                assert job is not None
-                jobs += 1
-                if "inputfiles" in job.resources.keys():
-                    assert "filesize" in job.resources["inputfiles"].keys()
-                if "inputfiles" in job.used_resources.keys():
-                    assert "usedsize" in job.used_resources["inputfiles"].keys()
-
-            assert jobs > 0
-
-        with open(
-            os.path.join(
-                os.path.dirname(__file__), "..", "data", "job_list_minimal.json"
-            )
-        ) as input_file:
-            readout = json.load(input_file)
-            lines = sum(1 for _ in readout)
-            assert jobs == (lines - 1)
diff --git a/lapis_tests/test_job.py b/lapis_tests/test_job.py
index 181bb1a..6ba9f10 100644
--- a/lapis_tests/test_job.py
+++ b/lapis_tests/test_job.py
@@ -1,7 +1,7 @@
 import pytest
 from usim import Scope, time
 
-from lapis.drone import Drone
+from lapis.workernode import WorkerNode
 from lapis.job import Job
 from lapis_tests import via_usim, DummyScheduler, DummyDrone
 
@@ -43,7 +43,7 @@ async def test_job_in_drone(self):
             resources={"walltime": 50, "cores": 1, "memory": 1},
             used_resources={"walltime": 10, "cores": 1, "memory": 1},
         )
-        drone = Drone(
+        drone = WorkerNode(
             scheduler=scheduler,
             pool_resources={"cores": 1, "memory": 1},
             scheduling_duration=0,
@@ -62,7 +62,7 @@ async def test_nonmatching_job_in_drone(self):
             resources={"walltime": 50, "cores": 2, "memory": 1},
             used_resources={"walltime": 10, "cores": 1, "memory": 1},
         )
-        drone = Drone(
+        drone = WorkerNode(
             scheduler=scheduler,
             pool_resources={"cores": 1, "memory": 1},
             scheduling_duration=0,
@@ -85,7 +85,7 @@ async def test_two_nonmatching_jobs(self):
             resources={"walltime": 50, "cores": 1, "memory": 1},
             used_resources={"walltime": 10, "cores": 1, "memory": 1},
         )
-        drone = Drone(
+        drone = WorkerNode(
             scheduler=scheduler,
pool_resources={"cores": 1, "memory": 1}, scheduling_duration=0, @@ -111,7 +111,7 @@ async def test_two_matching_jobs(self): resources={"walltime": 50, "cores": 1, "memory": 1}, used_resources={"walltime": 10, "cores": 1, "memory": 1}, ) - drone = Drone( + drone = WorkerNode( scheduler=scheduler, pool_resources={"cores": 2, "memory": 2}, scheduling_duration=0, diff --git a/lapis_tests/test_simulator.py b/lapis_tests/test_simulator.py index 4875d6b..1b9fa5b 100644 --- a/lapis_tests/test_simulator.py +++ b/lapis_tests/test_simulator.py @@ -3,7 +3,7 @@ from lapis.job_io.htcondor import htcondor_job_reader from lapis.pool import StaticPool from lapis.pool_io.htcondor import htcondor_pool_reader -from lapis.scheduler import CondorJobScheduler +from lapis.scheduler.base import CondorJobScheduler from lapis.simulator import Simulator diff --git a/lapis_tests/utility/test_monitor.py b/lapis_tests/utility/test_monitor.py index 533e9eb..53e73d1 100644 --- a/lapis_tests/utility/test_monitor.py +++ b/lapis_tests/utility/test_monitor.py @@ -10,7 +10,7 @@ from . import make_test_logger from lapis.monitor.general import resource_statistics -from lapis.monitor import SimulationTimeFilter, Monitoring +from lapis.monitor.core import SimulationTimeFilter, Monitoring def parse_line_protocol(literal: str): diff --git a/pyproject.toml b/pyproject.toml index 44d01a8..65cd53b 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -37,6 +37,7 @@ pre-commit = "^2.9.3" python = "^3.6.1" cobald = "^0.12" usim = "^0.4" +classad = "^0.4.0" click = "^7.1" Sphinx = { version = "^3.3.1", optional = true }