16 changes: 8 additions & 8 deletions lapis/controller.py
@@ -5,8 +5,8 @@


 class SimulatedLinearController(LinearController):
-    def __init__(self, target: Pool, low_utilisation=0.5, high_allocation=0.5,
-                 rate=1, interval=1):
+    def __init__(self, target: Pool, low_utilisation: float = 0.5,
+                 high_allocation: float = 0.5, rate: float = 1, interval: float = 1):
         super(SimulatedLinearController, self).__init__(
             target, low_utilisation, high_allocation, rate, interval)

@@ -17,9 +17,9 @@ async def run(self):


 class SimulatedRelativeSupplyController(RelativeSupplyController):
-    def __init__(self, target: Pool, low_utilisation=0.5, high_allocation=0.5,
-                 low_scale=0.9, high_scale=1.1,
-                 interval=1):
+    def __init__(self, target: Pool, low_utilisation: float = 0.5,
+                 high_allocation: float = 0.5, low_scale: float = 0.9,
+                 high_scale: float = 1.1, interval: float = 1):
         super(SimulatedRelativeSupplyController, self).__init__(
             target=target, low_utilisation=low_utilisation,
             high_allocation=high_allocation, low_scale=low_scale,
@@ -32,13 +32,13 @@ async def run(self):


 class SimulatedCostController(SimulatedLinearController):
-    def __init__(self, target: Pool, low_utilisation=0.5, high_allocation=0.5,
-                 rate=1, interval=1):
+    def __init__(self, target: Pool, low_utilisation: float = 0.5,
+                 high_allocation: float = 0.5, rate: float = 1, interval: float = 1):
         self.current_cost = 1
         super(SimulatedCostController, self).__init__(
             target, low_utilisation, high_allocation, rate, interval)

-    def regulate(self, interval):
+    def regulate(self, interval: float):
         allocation = 0
         for drone in self.target.drones:
             allocation += drone.allocation
22 changes: 0 additions & 22 deletions lapis/cost.py

This file was deleted.

10 changes: 5 additions & 5 deletions lapis/drone.py
@@ -1,6 +1,7 @@
 import logging

 from cobald import interfaces
+from typing import Dict
 from usim import time, Scope, instant
 from usim.basics import Capacities, ResourcesUnavailable

@@ -12,9 +13,8 @@ class ResourcesExceeded(Exception):


 class Drone(interfaces.Pool):
-    def __init__(self, scheduler, pool_resources: dict,
-                 scheduling_duration: float,
-                 ignore_resources: list = None):
+    def __init__(self, scheduler, pool_resources: Dict[str, float],
+                 scheduling_duration: float, ignore_resources: list = None):
         """
         :param scheduler:
         :param pool_resources:
@@ -44,11 +44,11 @@ def __init__(self, scheduler, pool_resources: dict,
         self._utilisation = None

     @property
-    def theoretical_available_resources(self):
+    def theoretical_available_resources(self) -> Dict[str, float]:
         return dict(self.resources.levels)

     @property
-    def available_resources(self):
+    def available_resources(self) -> Dict[str, float]:
         return dict(self.used_resources.levels)

     async def run(self):
11 changes: 6 additions & 5 deletions lapis/job.py
@@ -1,7 +1,8 @@
 import logging

-from usim import time
-from usim import TaskCancelled
+from typing import Dict
+from usim import time, TaskCancelled
 from usim.basics import Queue

 from lapis.monitor import sampling_required

@@ -10,8 +11,8 @@ class Job(object):
     __slots__ = ("resources", "used_resources", "walltime", "requested_walltime",
                  "queue_date", "in_queue_since", "in_queue_until", "_name", "_success")

-    def __init__(self, resources: dict, used_resources: dict, in_queue_since: float = 0,
-                 queue_date: float = 0, name: str = None):
+    def __init__(self, resources: Dict[str, float], used_resources: Dict[str, float],
+                 in_queue_since: float = 0, queue_date: float = 0, name: str = None):
         """
         Definition of a job that uses a specified amount of resources `used_resources`
         over a given amount of time, `walltime`. A job is described by its user
@@ -90,7 +91,7 @@ def __repr__(self):
         return '<%s: %s>' % (self.__class__.__name__, self._name or id(self))


-async def job_to_queue_scheduler(job_generator, job_queue):
+async def job_to_queue_scheduler(job_generator, job_queue: Queue):
     base_date = None
     for job in job_generator:
         if base_date is None:
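
The annotated signature makes the call contract explicit. A minimal construction sketch for orientation (the resource keys and values here are illustrative, not taken from this PR):

    from lapis.job import Job

    # hypothetical values; keys follow the Dict[str, float] annotations above
    job = Job(
        resources={"cores": 1, "walltime": 3600, "memory": 2048},
        used_resources={"cores": 1, "walltime": 2700, "memory": 1024},
        queue_date=0,
        name="example-job",
    )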
24 changes: 19 additions & 5 deletions lapis/job_io/htcondor.py
@@ -1,21 +1,24 @@
 import csv
 import logging

-from lapis.job import Job
+from typing import Dict
+
+from lapis.job import Job

-def htcondor_job_reader(iterable, resource_name_mapping={  # noqa: B006
+default_resource_name_mapping: Dict[str, str] = {
     "cores": "RequestCpus",
     "walltime": "RequestWalltime",  # s
     "memory": "RequestMemory",  # MiB
     "disk": "RequestDisk"  # KiB
-}, used_resource_name_mapping={  # noqa: B006
+}
+default_used_resource_name_mapping: Dict[str, str] = {
     "queuetime": "QDate",
     "walltime": "RemoteWallClockTime",  # s
     "cores": "Number of Allocated Processors",
     "memory": "MemoryUsage",  # MB
     "disk": "DiskUsage_RAW"  # KiB
-}, unit_conversion_mapping={  # noqa: B006
+}
+default_unit_conversion_mapping: Dict[str, float] = {
     "RequestCpus": 1,
     "RequestWalltime": 1,
     "RequestMemory": 1.024 / 1024,
@@ -25,7 +28,18 @@ def htcondor_job_reader(iterable, resource_name_mapping={  # noqa: B006
     "Number of Allocated Processors": 1,
     "MemoryUsage": 1 / 1024,
     "DiskUsage_RAW": 1.024 / 1024 / 1024
-}):
+}
+
+
+def htcondor_job_reader(iterable, resource_name_mapping: Dict[str, str] = None,
+                        used_resource_name_mapping: Dict[str, str] = None,
+                        unit_conversion_mapping: Dict[str, float] = None):
+    if resource_name_mapping is None:
+        resource_name_mapping = default_resource_name_mapping
+    if used_resource_name_mapping is None:
+        used_resource_name_mapping = default_used_resource_name_mapping
+    if unit_conversion_mapping is None:
+        unit_conversion_mapping = default_unit_conversion_mapping
     htcondor_reader = csv.DictReader(iterable, delimiter=' ', quotechar="'")

     for row in htcondor_reader:
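
The restructuring above, repeated below for swf.py and both pool readers, replaces mutable default arguments (the reason for the old `# noqa: B006` markers) with module-level defaults plus a None sentinel. A minimal sketch of the pitfall and the fix, using a hypothetical reader rather than lapis code:

    from typing import Dict

    def bad_reader(mapping: Dict[str, str] = {}):
        # the default dict is created once, at definition time,
        # so every call without an argument shares and mutates it
        mapping.setdefault("cores", "RequestCpus")
        return mapping

    DEFAULT_MAPPING: Dict[str, str] = {"cores": "RequestCpus"}

    def good_reader(mapping: Dict[str, str] = None):
        if mapping is None:  # resolve the default at call time instead
            mapping = DEFAULT_MAPPING
        return mapping

The sentinel version also lets the defaults be typed, documented, and reused by callers, which is exactly what the new module-level names provide.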
24 changes: 19 additions & 5 deletions lapis/job_io/swf.py
@@ -5,22 +5,36 @@
 """
 import csv

-from lapis.job import Job
+from typing import Dict
+
+from lapis.job import Job

-def swf_job_reader(iterable, resource_name_mapping={  # noqa: B006
+default_resource_name_mapping: Dict[str, str] = {
     "cores": "Requested Number of Processors",
     "walltime": "Requested Time",
     "memory": "Requested Memory"
-}, used_resource_name_mapping={  # noqa: B006
+}
+default_used_resource_name_mapping: Dict[str, str] = {
     "walltime": "Run Time",
     "cores": "Number of Allocated Processors",
     "memory": "Used Memory",
     "queuetime": "Submit Time"
-}, unit_conversion_mapping={  # noqa: B006
+}
+default_unit_conversion_mapping: Dict[str, float] = {
     "Used Memory": 1 / 1024 / 1024,
     "Requested Memory": 1 / 2114 / 1024
-}):
+}
+
+
+def swf_job_reader(iterable, resource_name_mapping: Dict[str, str] = None,
+                   used_resource_name_mapping: Dict[str, str] = None,
+                   unit_conversion_mapping: Dict[str, float] = None):
+    if resource_name_mapping is None:
+        resource_name_mapping = default_resource_name_mapping
+    if used_resource_name_mapping is None:
+        used_resource_name_mapping = default_used_resource_name_mapping
+    if unit_conversion_mapping is None:
+        unit_conversion_mapping = default_unit_conversion_mapping
     header = {
         "Job Number": 0,
         "Submit Time": 1,
2 changes: 1 addition & 1 deletion lapis/monitor/cobald.py
@@ -18,7 +18,7 @@ def drone_statistics(simulator: "Simulator") -> list:
     :return: list of records for logging
     """
     results = []
-    for drone in simulator.job_scheduler.drone_list:
+    for drone in simulator.job_scheduler.drones:
         results.append({
             "pool_configuration": "None",
             "pool_type": "drone",
4 changes: 2 additions & 2 deletions lapis/monitor/general.py
@@ -19,7 +19,7 @@ def resource_statistics(simulator: "Simulator") -> list:
     :return: list of records for logging
     """
     results = []
-    for drone in simulator.job_scheduler.drone_list:
+    for drone in simulator.job_scheduler.drones:
         resources = drone.theoretical_available_resources
         used_resources = drone.available_resources
         for resource_type in resources:
@@ -80,7 +80,7 @@ def job_statistics(simulator: "Simulator") -> list:
     :return: list of records for logging
     """
     result = 0
-    for drone in simulator.job_scheduler.drone_list:
+    for drone in simulator.job_scheduler.drones:
         result += drone.jobs
     return [{
         "job_count": result
20 changes: 15 additions & 5 deletions lapis/pool_io/htcondor.py
@@ -1,31 +1,41 @@
 import csv
 from functools import partial

-from typing import Callable
+from typing import Callable, Dict
 from ..pool import Pool


-def htcondor_pool_reader(iterable, resource_name_mapping: dict = {  # noqa: B006
+default_resource_name_mapping: Dict[str, str] = {
     "cores": "TotalSlotCPUs",
     "disk": "TotalSlotDisk",  # MiB
     "memory": "TotalSlotMemory"  # MiB
-}, unit_conversion_mapping: dict = {  # noqa: B006
+}
+default_unit_conversion_mapping: Dict[str, float] = {
     "TotalSlotCPUs": 1,
     "TotalSlotDisk": 1.024 / 1024,
     "TotalSlotMemory": 1.024 / 1024
-}, pool_type: Callable = Pool, make_drone: Callable = None):
+}
+
+
+def htcondor_pool_reader(iterable, resource_name_mapping: Dict[str, str] = None,
+                         unit_conversion_mapping: Dict[str, float] = None,
+                         pool_type: Callable = Pool, make_drone: Callable = None):
     """
     Load a pool configuration that was exported via htcondor from files or
     iterables

     :param iterable: an iterable yielding lines of CSV, such as an open file
     :param resource_name_mapping: Mapping from given header names to well-defined
                                   resources in simulation
     :param unit_conversion_mapping: Mapping of units conversion
     :param pool_type: The type of pool to be yielded
     :param make_drone:
     :return: Yields the :py:class:`Pool`s found in the given iterable
     """
     assert make_drone
+    if resource_name_mapping is None:
+        resource_name_mapping = default_resource_name_mapping
+    if unit_conversion_mapping is None:
+        unit_conversion_mapping = default_unit_conversion_mapping
     reader = csv.DictReader(iterable, delimiter=' ', skipinitialspace=True)
     for row in reader:
         try:
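
Both pool readers share an injection pattern: pool_type and make_drone are caller-supplied factories, and make_drone is asserted because no sensible default exists. A generic sketch of the idea (Box and box_reader are stand-ins, not lapis classes):

    from functools import partial
    from typing import Callable, Dict, Iterable

    class Box:
        def __init__(self, capacity: int, make_item: Callable) -> None:
            self.capacity = capacity
            self.make_item = make_item

    def box_reader(rows: Iterable[Dict[str, str]], box_type: Callable = Box,
                   make_item: Callable = None):
        assert make_item  # required despite the None default
        for row in rows:
            capacity = int(row["capacity"])
            # bind per-row arguments now, defer actual construction
            yield box_type(capacity=capacity,
                           make_item=partial(make_item, capacity))

    boxes = list(box_reader([{"capacity": "8"}], make_item=lambda c: [None] * c))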
13 changes: 9 additions & 4 deletions lapis/pool_io/machines.py
@@ -1,14 +1,17 @@
 import csv
 from functools import partial

-from typing import Callable
+from typing import Callable, Dict
 from ..pool import Pool


-def machines_pool_reader(iterable, resource_name_mapping: dict = {  # noqa: B006
+default_resource_name_mapping: Dict[str, str] = {
     "cores": "CPUs_per_node",
     "memory": "RAM_per_node_in_KB"
-}, pool_type: Callable = Pool, make_drone: Callable = None):
+}
+
+
+def machines_pool_reader(iterable, resource_name_mapping: Dict[str, str] = None,
+                         pool_type: Callable = Pool, make_drone: Callable = None):
     """
     Load a pool configuration that was exported via htcondor from files or
     iterables
@@ -21,6 +24,8 @@ def machines_pool_reader(iterable, resource_name_mapping: dict = {  # noqa: B006
     :return: Yields the :py:class:`StaticPool`s found in the given iterable
     """
     assert make_drone
+    if resource_name_mapping is None:
+        resource_name_mapping = default_resource_name_mapping
     reader = csv.DictReader(iterable, delimiter=' ', skipinitialspace=True)
     for row in reader:
         yield pool_type(
9 changes: 6 additions & 3 deletions lapis/scheduler.py
@@ -1,4 +1,7 @@
+from typing import Optional, Generator
+
 from usim import Scope, each, instant
+from usim.basics import Queue

 from lapis.drone import Drone

@@ -31,15 +34,15 @@ class CondorJobScheduler(object):
     :return:
     """

-    def __init__(self, job_queue):
+    def __init__(self, job_queue: Queue):
         self._stream_queue = job_queue
         self.drone_cluster = []
         self.interval = 60
         self.job_queue = []
         self._collecting = True

     @property
-    def drone_list(self):
+    def drones(self) -> Generator[Drone, None, None]:
         for cluster in self.drone_cluster:
             for drone in cluster:
                 yield drone
@@ -100,7 +103,7 @@ async def _collect_jobs(self):
             self.job_queue.append(job)
         self._collecting = False

-    def _schedule_job(self, job) -> Drone:
+    def _schedule_job(self, job) -> Optional[Drone]:
         priorities = {}
         for cluster in self.drone_cluster:
             drone = cluster[0]
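
The rename from drone_list to drones also corrects the interface: the property is a generator, so Generator[Drone, None, None] is the accurate annotation, and Optional[Drone] makes the no-fitting-drone case of _schedule_job explicit. A condensed sketch of both idioms (Scheduler and the string drones are illustrative stand-ins):

    from typing import Generator, List, Optional

    class Scheduler:
        def __init__(self) -> None:
            self.drone_cluster: List[List[str]] = [["drone-a"], ["drone-b"]]

        @property
        def drones(self) -> Generator[str, None, None]:
            for cluster in self.drone_cluster:  # flatten clusters lazily
                yield from cluster

        def first_matching(self, prefix: str) -> Optional[str]:
            for drone in self.drones:
                if drone.startswith(prefix):
                    return drone
            return None  # Optional[...] makes this fall-through explicit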