5 changes: 2 additions & 3 deletions .pre-commit-config.yaml
@@ -6,8 +6,7 @@ repos:
   - id: end-of-file-fixer
   - id: flake8
 - repo: https://github.com/psf/black
-  rev: 19.3b0
+  rev: 22.3.0
   hooks:
   - id: black
-    args:
-    - --py36
+    args: ["--target-version", "py36"]
4 changes: 2 additions & 2 deletions lapis/cli/simulate.py
@@ -10,10 +10,10 @@
 from lapis.pool_io.htcondor import htcondor_pool_reader
 from lapis.job_io.swf import swf_job_reader

-from lapis.scheduler import CondorJobScheduler
+from lapis.scheduler.base import CondorJobScheduler
 from lapis.simulator import Simulator

-from lapis.monitor import (
+from lapis.monitor.core import (
     LoggingSocketHandler,
     LoggingUDPSocketHandler,
     SimulationTimeFilter,
33 changes: 19 additions & 14 deletions lapis/controller.py
@@ -1,12 +1,17 @@
-from lapis.pool import Pool
 from cobald.controller.linear import LinearController
 from cobald.controller.relative_supply import RelativeSupplyController
+from cobald.interfaces import Pool
 from usim import time


 class SimulatedLinearController(LinearController):
     def __init__(
-        self, target: Pool, low_utilisation=0.5, high_allocation=0.5, rate=1, interval=1
+        self,
+        target: Pool,
+        low_utilisation: float = 0.5,
+        high_allocation: float = 0.5,
+        rate: float = 1,
+        interval: float = 1,
     ):
         super(SimulatedLinearController, self).__init__(
             target, low_utilisation, high_allocation, rate, interval
@@ -22,11 +27,11 @@ class SimulatedRelativeSupplyController(RelativeSupplyController):
     def __init__(
         self,
         target: Pool,
-        low_utilisation=0.5,
-        high_allocation=0.5,
-        low_scale=0.9,
-        high_scale=1.1,
-        interval=1,
+        low_utilisation: float = 0.5,
+        high_allocation: float = 0.5,
+        low_scale: float = 0.9,
+        high_scale: float = 1.1,
+        interval: float = 1,
     ):
         super(SimulatedRelativeSupplyController, self).__init__(
             target=target,
@@ -45,14 +50,19 @@ async def run(self):

 class SimulatedCostController(SimulatedLinearController):
     def __init__(
-        self, target: Pool, low_utilisation=0.5, high_allocation=0.5, rate=1, interval=1
+        self,
+        target: Pool,
+        low_utilisation: float = 0.5,
+        high_allocation: float = 0.5,
+        rate: float = 1,
+        interval: float = 1,
     ):
         self.current_cost = 1
         super(SimulatedCostController, self).__init__(
             target, low_utilisation, high_allocation, rate, interval
         )

-    def regulate(self, interval):
+    def regulate(self, interval: float):
         allocation = 0
         for drone in self.target.drones:
             allocation += drone.allocation
@@ -64,8 +74,3 @@ def regulate(self, interval):
                 self.target.demand = allocation
                 if self.current_cost > 1:
                     self.current_cost -= 1
-                    # self.target.demand = allocation + self.current_cost
-        # else:
-        #     if self.current_cost > 1:
-        #         self.current_cost -= 1
-        #         self.target.demand = allocation + self.current_cost
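A side note on the import swap above: Pool now comes from cobald.interfaces rather than lapis.pool, so the controllers are typed against the abstract cobald pool contract instead of the concrete simulation pool. A minimal sketch of that contract (StaticPool is a hypothetical example, not part of this PR; it assumes the cobald interface exposes supply, demand, utilisation, and allocation):

    from cobald.interfaces import Pool


    class StaticPool(Pool):
        """Toy pool with a fixed supply, just enough to satisfy the interface."""

        def __init__(self, supply: float = 10.0):
            self._supply = supply
            self._demand = 0.0

        @property
        def supply(self) -> float:
            return self._supply

        @property
        def demand(self) -> float:
            return self._demand

        @demand.setter
        def demand(self, value: float):
            # controllers drive a pool by assigning to its demand
            self._demand = value

        @property
        def utilisation(self) -> float:
            return 0.5  # fraction of acquired resources actually used

        @property
        def allocation(self) -> float:
            return 0.5  # fraction of acquired resources assigned to jobs

Any such object could be handed to the simulated controllers as their target.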
22 changes: 0 additions & 22 deletions lapis/cost.py

This file was deleted.

45 changes: 26 additions & 19 deletions lapis/job.py
@@ -1,13 +1,13 @@
 import logging
-from typing import Optional, TYPE_CHECKING
+from typing import Optional, TYPE_CHECKING, Dict

-from usim import time
+from usim import time, Queue
 from usim import CancelTask

-from lapis.monitor import sampling_required
+from lapis.monitor.core import sampling_required

 if TYPE_CHECKING:
-    from lapis.drone import Drone
+    from lapis.workernode import WorkerNode


 class Job(object):
@@ -17,8 +17,6 @@ class Job(object):
         "walltime",
         "requested_walltime",
         "queue_date",
-        "requested_inputfiles",
-        "used_inputfiles",
         "in_queue_since",
         "in_queue_until",
         "_name",
@@ -28,12 +26,11 @@ class Job(object):

     def __init__(
         self,
-        resources: dict,
-        used_resources: dict,
+        resources: Dict[str, float],
+        used_resources: Dict[str, float],
         in_queue_since: float = 0,
         queue_date: float = 0,
         name: str = None,
-        drone: "Drone" = None,
     ):
         """
         Definition of a job that uses a specified amount of resources `used_resources`
@@ -47,7 +44,6 @@ def __init__(
             simulation scheduler
         :param queue_date: Time when job was inserted into queue in real life
         :param name: Name of the job
-        :param drone: Drone where the job is running on
         """
         self.resources = resources
         self.used_resources = used_resources
@@ -59,17 +55,22 @@ def __init__(
                     self.used_resources[key],
                 )
                 self.resources[key] = self.used_resources[key]
-        self.walltime = used_resources.pop("walltime")
-        self.requested_walltime = resources.pop("walltime", None)
-        self.requested_inputfiles = resources.pop("inputfiles", None)
-        self.used_inputfiles = used_resources.pop("inputfiles", None)
+        self.walltime: float = used_resources.pop("walltime")
+        """the job's runtime, in reality as well as in the simulation"""
+        self.requested_walltime: Optional[float] = resources.pop("walltime", None)
+        """estimate of the job's walltime"""
         self.queue_date = queue_date
+        """point in time when the job was submitted to the simulated job queue"""
         assert in_queue_since >= 0, "Queue time cannot be negative"
         self.in_queue_since = in_queue_since
-        self.in_queue_until = None
-        self.drone = drone
+        """time when the job was inserted into the queue of the simulation scheduler"""
+        self.in_queue_until: Optional[float] = None
+        """point in time when the job left the job queue"""
+        self.drone = None
         self._name = name
+        """identifier of the job"""
         self._success: Optional[bool] = None
+        """flag indicating whether the job was completed successfully"""

     @property
     def name(self) -> str:
@@ -83,15 +84,15 @@ def successful(self) -> Optional[bool]:
     def waiting_time(self) -> float:
         """
         The time the job spent in the simulators scheduling queue. `Inf` when
-        the job is still waitiing.
+        the job is still waiting.

         :return: Time in queue
         """
         if self.in_queue_until is not None:
             return self.in_queue_until - self.in_queue_since
         return float("Inf")

-    async def run(self, drone: "Drone"):
+    async def run(self, drone: "WorkerNode"):
         assert drone, "Jobs cannot run without a drone being assigned"
         self.drone = drone
         self.in_queue_until = time.now
@@ -115,7 +116,13 @@ def __repr__(self):
         return "<%s: %s>" % (self.__class__.__name__, self._name or id(self))


-async def job_to_queue_scheduler(job_generator, job_queue):
+async def job_to_queue_scheduler(job_generator, job_queue: Queue):
+    """
+    Handles reading the simulation's job input and puts the jobs into the job queue
+
+    :param job_generator: reader object that yields jobs from input
+    :param job_queue: queue the jobs are added to
+    """
     base_date = None
     for job in job_generator:
         if base_date is None:
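Given the reworked constructor above, a short sketch of the resulting attribute behaviour (values are hypothetical, not from the PR): walltime is popped from used_resources, requested_walltime from resources, and waiting_time stays infinite until the job leaves the queue.

    from lapis.job import Job

    job = Job(
        resources={"cores": 8, "walltime": 100, "memory": 2048},
        used_resources={"cores": 4, "walltime": 60, "memory": 1024},
        queue_date=0,
        name="demo",
    )
    assert job.walltime == 60                # popped from used_resources
    assert job.requested_walltime == 100     # popped from resources
    assert job.waiting_time == float("Inf")  # in_queue_until is still None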
77 changes: 37 additions & 40 deletions lapis/job_io/htcondor.py
@@ -1,36 +1,48 @@
 import csv
 import json
 import logging
+from typing import Dict, Iterable

 from lapis.job import Job
-from copy import deepcopy
+
+
+default_resource_name_mapping: Dict[str, str] = {
+    "cores": "RequestCpus",
+    "walltime": "RequestWalltime",  # s
+    "memory": "RequestMemory",  # MiB
+    "disk": "RequestDisk",  # KiB
+}
+default_used_resource_name_mapping: Dict[str, str] = {
+    "queuetime": "QDate",
+    "walltime": "RemoteWallClockTime",  # s
+    "memory": "MemoryUsage",  # MB
+    "disk": "DiskUsage_RAW",  # KiB
+}
+default_unit_conversion_mapping: Dict[str, float] = {
+    "RequestCpus": 1,
+    "RequestWalltime": 1,
+    "RequestMemory": 1024 * 1024,
+    "RequestDisk": 1024,
+    "queuetime": 1,
+    "RemoteWallClockTime": 1,
+    "MemoryUsage": 1000 * 1000,
+    "DiskUsage_RAW": 1024,
+}


 def htcondor_job_reader(
     iterable,
-    resource_name_mapping={  # noqa: B006
-        "cores": "RequestCpus",
-        "walltime": "RequestWalltime",  # s
-        "memory": "RequestMemory",  # MiB
-        "disk": "RequestDisk",  # KiB
-    },
-    used_resource_name_mapping={  # noqa: B006
-        "queuetime": "QDate",
-        "walltime": "RemoteWallClockTime",  # s
-        "memory": "MemoryUsage",  # MB
-        "disk": "DiskUsage_RAW",  # KiB
-    },
-    unit_conversion_mapping={  # noqa: B006
-        "RequestCpus": 1,
-        "RequestWalltime": 1,
-        "RequestMemory": 1024 * 1024,
-        "RequestDisk": 1024,
-        "queuetime": 1,
-        "RemoteWallClockTime": 1,
-        "MemoryUsage": 1000 * 1000,
-        "DiskUsage_RAW": 1024,
-    },
-):
+    resource_name_mapping: Dict[str, str] = None,
+    used_resource_name_mapping: Dict[str, str] = None,
+    unit_conversion_mapping: Dict[str, float] = None,
+) -> Iterable[Job]:
+    if resource_name_mapping is None:
+        resource_name_mapping = default_resource_name_mapping
+    if used_resource_name_mapping is None:
+        used_resource_name_mapping = default_used_resource_name_mapping
+    if unit_conversion_mapping is None:
+        unit_conversion_mapping = default_unit_conversion_mapping
+
     input_file_type = iterable.name.split(".")[-1].lower()
     if input_file_type == "json":
         htcondor_reader = json.load(iterable)
@@ -40,6 +52,7 @@ def htcondor_job_reader(
         logging.getLogger("implementation").error(
             "Invalid input file %s. Job input file can not be read." % iterable.name
         )
+        return
     for entry in htcondor_reader:
         if float(entry[used_resource_name_mapping["walltime"]]) <= 0:
             logging.getLogger("implementation").warning(
@@ -69,22 +82,6 @@ def htcondor_job_reader(
                 float(entry[original_key])
                 * unit_conversion_mapping.get(original_key, 1)
             )
-
-        try:
-            resources["inputfiles"] = deepcopy(entry["Inputfiles"])
-            used_resources["inputfiles"] = deepcopy(entry["Inputfiles"])
-            for filename, filespecs in entry["Inputfiles"].items():
-                if "usedsize" in filespecs:
-                    del resources["inputfiles"][filename]["usedsize"]
-                if "filesize" in filespecs:
-                    if "usedsize" not in filespecs:
-                        used_resources["inputfiles"][filename]["usedsize"] = filespecs[
-                            "filesize"
-                        ]
-                    del used_resources["inputfiles"][filename]["filesize"]
-
-        except KeyError:
-            pass
         yield Job(
             resources=resources,
             used_resources=used_resources,
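Both readers used to carry "# noqa: B006" to silence flake8-bugbear's mutable-default-argument warning; the new module-level defaults plus None sentinel remove the hazard instead of suppressing it. A standalone sketch of the difference (illustrative only, not lapis code):

    def counter_shared(log=[]):  # B006: the list is created once, at def time
        log.append(len(log))
        return log

    counter_shared()  # [0]
    counter_shared()  # [0, 1] -- state leaks between calls

    DEFAULTS = {"cores": "RequestCpus"}

    def counter_safe(mapping=None):
        # the None sentinel resolves the default at call time instead
        if mapping is None:
            mapping = DEFAULTS
        return mapping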
44 changes: 28 additions & 16 deletions lapis/job_io/swf.py
@@ -4,28 +4,40 @@
 [Standard Workload Format](http://www.cs.huji.ac.il/labs/parallel/workload/swf.html).
 """
 import csv
+from typing import Dict, Iterable

 from lapis.job import Job


+default_resource_name_mapping: Dict[str, str] = {
+    "cores": "Requested Number of Processors",
+    "walltime": "Requested Time",  # s
+    "memory": "Requested Memory",  # KiB
+}
+default_used_resource_name_mapping: Dict[str, str] = {
+    "walltime": "Run Time",  # s
+    "cores": "Number of Allocated Processors",
+    "memory": "Used Memory",  # KiB
+    "queuetime": "Submit Time",
+}
+default_unit_conversion_mapping: Dict[str, float] = {
+    "Used Memory": 1024,
+    "Requested Memory": 1024,
+}
+
+
 def swf_job_reader(
     iterable,
-    resource_name_mapping={  # noqa: B006
-        "cores": "Requested Number of Processors",
-        "walltime": "Requested Time",  # s
-        "memory": "Requested Memory",  # KiB
-    },
-    used_resource_name_mapping={  # noqa: B006
-        "walltime": "Run Time",  # s
-        "cores": "Number of Allocated Processors",
-        "memory": "Used Memory",  # KiB
-        "queuetime": "Submit Time",
-    },
-    unit_conversion_mapping={  # noqa: B006
-        "Used Memory": 1024,
-        "Requested Memory": 1024,
-    },
-):
+    resource_name_mapping: Dict[str, str] = None,
+    used_resource_name_mapping: Dict[str, str] = None,
+    unit_conversion_mapping: Dict[str, float] = None,
+) -> Iterable[Job]:
+    if resource_name_mapping is None:
+        resource_name_mapping = default_resource_name_mapping
+    if used_resource_name_mapping is None:
+        used_resource_name_mapping = default_used_resource_name_mapping
+    if unit_conversion_mapping is None:
+        unit_conversion_mapping = default_unit_conversion_mapping
     header = {
         "Job Number": 0,
         "Submit Time": 1,
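A sketch of calling the reworked reader ("workload.swf" is a hypothetical trace file; the mappings now default to the module-level dicts above):

    from lapis.job_io.swf import swf_job_reader

    with open("workload.swf") as trace:
        for job in swf_job_reader(trace):
            print(job.name, job.walltime)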