diff --git a/lapis/controller.py b/lapis/controller.py index 32681bf..6089437 100644 --- a/lapis/controller.py +++ b/lapis/controller.py @@ -5,8 +5,8 @@ class SimulatedLinearController(LinearController): - def __init__(self, target: Pool, low_utilisation=0.5, high_allocation=0.5, - rate=1, interval=1): + def __init__(self, target: Pool, low_utilisation: float = 0.5, + high_allocation: float = 0.5, rate: float = 1, interval: float = 1): super(SimulatedLinearController, self).__init__( target, low_utilisation, high_allocation, rate, interval) @@ -17,9 +17,9 @@ async def run(self): class SimulatedRelativeSupplyController(RelativeSupplyController): - def __init__(self, target: Pool, low_utilisation=0.5, high_allocation=0.5, - low_scale=0.9, high_scale=1.1, - interval=1): + def __init__(self, target: Pool, low_utilisation: float = 0.5, + high_allocation: float = 0.5, low_scale: float = 0.9, + high_scale: float = 1.1, interval: float = 1): super(SimulatedRelativeSupplyController, self).__init__( target=target, low_utilisation=low_utilisation, high_allocation=high_allocation, low_scale=low_scale, @@ -32,13 +32,13 @@ async def run(self): class SimulatedCostController(SimulatedLinearController): - def __init__(self, target: Pool, low_utilisation=0.5, high_allocation=0.5, - rate=1, interval=1): + def __init__(self, target: Pool, low_utilisation: float = 0.5, + high_allocation: float = 0.5, rate: float = 1, interval: float = 1): self.current_cost = 1 super(SimulatedCostController, self).__init__( target, low_utilisation, high_allocation, rate, interval) - def regulate(self, interval): + def regulate(self, interval: float): allocation = 0 for drone in self.target.drones: allocation += drone.allocation diff --git a/lapis/cost.py b/lapis/cost.py deleted file mode 100644 index f391ff7..0000000 --- a/lapis/cost.py +++ /dev/null @@ -1,22 +0,0 @@ -def cobald_cost(simulator): - result = len(list(simulator.job_scheduler.drone_list)) - for drone in simulator.job_scheduler.drone_list: - result += 1 - tmp = 0 - for resource_key in drone.pool_resources: - tmp += drone.resources[resource_key] / drone.pool_resources[resource_key] - tmp /= len(drone.pool_resources) - result -= tmp - return result - - -def local_cobald_cost(pool): - result = 0 - for drone in pool.drones: - result += 1 - tmp = 0 - for resource_key in pool.resources: - tmp += drone.resources[resource_key] / pool.resources[resource_key] - tmp /= len(pool.resources) - result -= tmp - return result diff --git a/lapis/drone.py b/lapis/drone.py index 3b54cb1..e94506d 100644 --- a/lapis/drone.py +++ b/lapis/drone.py @@ -1,6 +1,7 @@ import logging from cobald import interfaces +from typing import Dict from usim import time, Scope, instant from usim.basics import Capacities, ResourcesUnavailable @@ -12,9 +13,8 @@ class ResourcesExceeded(Exception): class Drone(interfaces.Pool): - def __init__(self, scheduler, pool_resources: dict, - scheduling_duration: float, - ignore_resources: list = None): + def __init__(self, scheduler, pool_resources: Dict[str, float], + scheduling_duration: float, ignore_resources: list = None): """ :param scheduler: :param pool_resources: @@ -44,11 +44,11 @@ def __init__(self, scheduler, pool_resources: dict, self._utilisation = None @property - def theoretical_available_resources(self): + def theoretical_available_resources(self) -> Dict[str, float]: return dict(self.resources.levels) @property - def available_resources(self): + def available_resources(self) -> Dict[str, float]: return dict(self.used_resources.levels) async def run(self): diff --git a/lapis/job.py b/lapis/job.py index 758fd0c..6b678d2 100644 --- a/lapis/job.py +++ b/lapis/job.py @@ -1,7 +1,8 @@ import logging -from usim import time -from usim import TaskCancelled +from typing import Dict +from usim import time, TaskCancelled +from usim.basics import Queue from lapis.monitor import sampling_required @@ -10,8 +11,8 @@ class Job(object): __slots__ = ("resources", "used_resources", "walltime", "requested_walltime", "queue_date", "in_queue_since", "in_queue_until", "_name", "_success") - def __init__(self, resources: dict, used_resources: dict, in_queue_since: float = 0, - queue_date: float = 0, name: str = None): + def __init__(self, resources: Dict[str, float], used_resources: Dict[str, float], + in_queue_since: float = 0, queue_date: float = 0, name: str = None): """ Definition of a job that uses a specified amount of resources `used_resources` over a given amount of time, `walltime`. A job is described by its user @@ -90,7 +91,7 @@ def __repr__(self): return '<%s: %s>' % (self.__class__.__name__, self._name or id(self)) -async def job_to_queue_scheduler(job_generator, job_queue): +async def job_to_queue_scheduler(job_generator, job_queue: Queue): base_date = None for job in job_generator: if base_date is None: diff --git a/lapis/job_io/htcondor.py b/lapis/job_io/htcondor.py index 1115f53..94a4084 100644 --- a/lapis/job_io/htcondor.py +++ b/lapis/job_io/htcondor.py @@ -1,21 +1,24 @@ import csv import logging -from lapis.job import Job +from typing import Dict +from lapis.job import Job -def htcondor_job_reader(iterable, resource_name_mapping={ # noqa: B006 +default_resource_name_mapping: Dict[str, str] = { "cores": "RequestCpus", "walltime": "RequestWalltime", # s "memory": "RequestMemory", # MiB "disk": "RequestDisk" # KiB -}, used_resource_name_mapping={ # noqa: B006 +} +default_used_resource_name_mapping: Dict[str, str] = { "queuetime": "QDate", "walltime": "RemoteWallClockTime", # s "cores": "Number of Allocated Processors", "memory": "MemoryUsage", # MB "disk": "DiskUsage_RAW" # KiB -}, unit_conversion_mapping={ # noqa: B006 +} +default_unit_conversion_mapping: Dict[str, float] = { "RequestCpus": 1, "RequestWalltime": 1, "RequestMemory": 1.024 / 1024, @@ -25,7 +28,18 @@ def htcondor_job_reader(iterable, resource_name_mapping={ # noqa: B006 "Number of Allocated Processors": 1, "MemoryUsage": 1 / 1024, "DiskUsage_RAW": 1.024 / 1024 / 1024 -}): +} + + +def htcondor_job_reader(iterable, resource_name_mapping: Dict[str, str] = None, + used_resource_name_mapping: Dict[str, str] = None, + unit_conversion_mapping: Dict[str, float] = None): + if resource_name_mapping is None: + resource_name_mapping = default_resource_name_mapping + if used_resource_name_mapping is None: + used_resource_name_mapping = default_used_resource_name_mapping + if unit_conversion_mapping is None: + unit_conversion_mapping = default_unit_conversion_mapping htcondor_reader = csv.DictReader(iterable, delimiter=' ', quotechar="'") for row in htcondor_reader: diff --git a/lapis/job_io/swf.py b/lapis/job_io/swf.py index 4d5e2d6..ee177f0 100644 --- a/lapis/job_io/swf.py +++ b/lapis/job_io/swf.py @@ -5,22 +5,36 @@ """ import csv -from lapis.job import Job +from typing import Dict +from lapis.job import Job -def swf_job_reader(iterable, resource_name_mapping={ # noqa: B006 +default_resource_name_mapping: Dict[str, str] = { "cores": "Requested Number of Processors", "walltime": "Requested Time", "memory": "Requested Memory" -}, used_resource_name_mapping={ # noqa: B006 +} +default_used_resource_name_mapping: Dict[str, str] = { "walltime": "Run Time", "cores": "Number of Allocated Processors", "memory": "Used Memory", "queuetime": "Submit Time" -}, unit_conversion_mapping={ # noqa: B006 +} +default_unit_conversion_mapping: Dict[str, float] = { "Used Memory": 1 / 1024 / 1024, "Requested Memory": 1 / 2114 / 1024 -}): +} + + +def swf_job_reader(iterable, resource_name_mapping: Dict[str, str] = None, + used_resource_name_mapping: Dict[str, str] = None, + unit_conversion_mapping: Dict[str, float] = None): + if resource_name_mapping is None: + resource_name_mapping = default_resource_name_mapping + if used_resource_name_mapping is None: + used_resource_name_mapping = default_used_resource_name_mapping + if unit_conversion_mapping is None: + unit_conversion_mapping = default_unit_conversion_mapping header = { "Job Number": 0, "Submit Time": 1, diff --git a/lapis/monitor/cobald.py b/lapis/monitor/cobald.py index 3c6c879..9f4355a 100644 --- a/lapis/monitor/cobald.py +++ b/lapis/monitor/cobald.py @@ -18,7 +18,7 @@ def drone_statistics(simulator: "Simulator") -> list: :return: list of records for logging """ results = [] - for drone in simulator.job_scheduler.drone_list: + for drone in simulator.job_scheduler.drones: results.append({ "pool_configuration": "None", "pool_type": "drone", diff --git a/lapis/monitor/general.py b/lapis/monitor/general.py index 13a417a..1f5376e 100644 --- a/lapis/monitor/general.py +++ b/lapis/monitor/general.py @@ -19,7 +19,7 @@ def resource_statistics(simulator: "Simulator") -> list: :return: list of records for logging """ results = [] - for drone in simulator.job_scheduler.drone_list: + for drone in simulator.job_scheduler.drones: resources = drone.theoretical_available_resources used_resources = drone.available_resources for resource_type in resources: @@ -80,7 +80,7 @@ def job_statistics(simulator: "Simulator") -> list: :return: list of records for logging """ result = 0 - for drone in simulator.job_scheduler.drone_list: + for drone in simulator.job_scheduler.drones: result += drone.jobs return [{ "job_count": result diff --git a/lapis/pool_io/htcondor.py b/lapis/pool_io/htcondor.py index 0dd3e90..90616f0 100644 --- a/lapis/pool_io/htcondor.py +++ b/lapis/pool_io/htcondor.py @@ -1,19 +1,24 @@ import csv from functools import partial -from typing import Callable +from typing import Callable, Dict from ..pool import Pool - -def htcondor_pool_reader(iterable, resource_name_mapping: dict = { # noqa: B006 +default_resource_name_mapping: Dict[str, str] = { "cores": "TotalSlotCPUs", "disk": "TotalSlotDisk", # MiB "memory": "TotalSlotMemory" # MiB -}, unit_conversion_mapping: dict = { # noqa: B006 +} +default_unit_conversion_mapping: Dict[str, float] = { "TotalSlotCPUs": 1, "TotalSlotDisk": 1.024 / 1024, "TotalSlotMemory": 1.024 / 1024 -}, pool_type: Callable = Pool, make_drone: Callable = None): +} + + +def htcondor_pool_reader(iterable, resource_name_mapping: Dict[str, str] = None, + unit_conversion_mapping: Dict[str, float] = None, + pool_type: Callable = Pool, make_drone: Callable = None): """ Load a pool configuration that was exported via htcondor from files or iterables @@ -21,11 +26,16 @@ def htcondor_pool_reader(iterable, resource_name_mapping: dict = { # noqa: B006 :param iterable: an iterable yielding lines of CSV, such as an open file :param resource_name_mapping: Mapping from given header names to well-defined resources in simulation + :param unit_conversion_mapping: Mapping of units conversion :param pool_type: The type of pool to be yielded :param make_drone: :return: Yields the :py:class:`Pool`s found in the given iterable """ assert make_drone + if resource_name_mapping is None: + resource_name_mapping = default_resource_name_mapping + if unit_conversion_mapping is None: + unit_conversion_mapping = default_unit_conversion_mapping reader = csv.DictReader(iterable, delimiter=' ', skipinitialspace=True) for row in reader: try: diff --git a/lapis/pool_io/machines.py b/lapis/pool_io/machines.py index d7272ac..61ca586 100644 --- a/lapis/pool_io/machines.py +++ b/lapis/pool_io/machines.py @@ -1,14 +1,17 @@ import csv from functools import partial -from typing import Callable +from typing import Callable, Dict from ..pool import Pool - -def machines_pool_reader(iterable, resource_name_mapping: dict = { # noqa: B006 +default_resource_name_mapping: Dict[str, str] = { "cores": "CPUs_per_node", "memory": "RAM_per_node_in_KB" -}, pool_type: Callable = Pool, make_drone: Callable = None): +} + + +def machines_pool_reader(iterable, resource_name_mapping: Dict[str, str] = None, + pool_type: Callable = Pool, make_drone: Callable = None): """ Load a pool configuration that was exported via htcondor from files or iterables @@ -21,6 +24,8 @@ def machines_pool_reader(iterable, resource_name_mapping: dict = { # noqa: B006 :return: Yields the :py:class:`StaticPool`s found in the given iterable """ assert make_drone + if resource_name_mapping is None: + resource_name_mapping = default_resource_name_mapping reader = csv.DictReader(iterable, delimiter=' ', skipinitialspace=True) for row in reader: yield pool_type( diff --git a/lapis/scheduler.py b/lapis/scheduler.py index de86401..4ae6aa9 100644 --- a/lapis/scheduler.py +++ b/lapis/scheduler.py @@ -1,4 +1,7 @@ +from typing import Optional, Generator + from usim import Scope, each, instant +from usim.basics import Queue from lapis.drone import Drone @@ -31,7 +34,7 @@ class CondorJobScheduler(object): :return: """ - def __init__(self, job_queue): + def __init__(self, job_queue: Queue): self._stream_queue = job_queue self.drone_cluster = [] self.interval = 60 @@ -39,7 +42,7 @@ def __init__(self, job_queue): self._collecting = True @property - def drone_list(self): + def drones(self) -> Generator[Drone, None, None]: for cluster in self.drone_cluster: for drone in cluster: yield drone @@ -100,7 +103,7 @@ async def _collect_jobs(self): self.job_queue.append(job) self._collecting = False - def _schedule_job(self, job) -> Drone: + def _schedule_job(self, job) -> Optional[Drone]: priorities = {} for cluster in self.drone_cluster: drone = cluster[0]