diff --git a/.coveragerc b/.coveragerc index 217073f..836ee80 100644 --- a/.coveragerc +++ b/.coveragerc @@ -18,4 +18,4 @@ exclude_lines = raise NotImplementedError return NotImplemented if __name__ == "__main__" - if __name__ == '__main__' \ No newline at end of file + if __name__ == '__main__' diff --git a/.gitignore b/.gitignore index 6d07713..7bc8879 100644 --- a/.gitignore +++ b/.gitignore @@ -207,4 +207,3 @@ Icon Network Trash Folder Temporary Items .apdisk - diff --git a/docs/Makefile b/docs/Makefile index 298ea9e..5128596 100644 --- a/docs/Makefile +++ b/docs/Makefile @@ -16,4 +16,4 @@ help: # Catch-all target: route all unknown targets to Sphinx using the new # "make mode" option. $(O) is meant as a shortcut for $(SPHINXOPTS). %: Makefile - @$(SPHINXBUILD) -M $@ "$(SOURCEDIR)" "$(BUILDDIR)" $(SPHINXOPTS) $(O) \ No newline at end of file + @$(SPHINXBUILD) -M $@ "$(SOURCEDIR)" "$(BUILDDIR)" $(SPHINXOPTS) $(O) diff --git a/docs/source/topics/monitoring.rst b/docs/source/topics/monitoring.rst index 6651137..e48dba3 100644 --- a/docs/source/topics/monitoring.rst +++ b/docs/source/topics/monitoring.rst @@ -77,9 +77,9 @@ COBalD-specific Monitoring Caching-specific Monitoring ~~~~~~~~~~~~~~~~~~~~~~~~~~~ -.. TODO:: - - Will be added as soon as the caching branch is merged. +.. autofunction:: lapis.monitor.caching.storage_status +.. autofunction:: lapis.monitor.caching.storage_connection +.. autofunction:: lapis.monitor.caching.remote_connection Telegraf -------- diff --git a/lapis/cachealgorithm.py b/lapis/cachealgorithm.py new file mode 100644 index 0000000..caa0d66 --- /dev/null +++ b/lapis/cachealgorithm.py @@ -0,0 +1,57 @@ +from typing import Optional, Callable, Tuple + +from lapis.files import RequestedFile, StoredFile +from lapis.storageelement import StorageElement +from lapis.utilities.cache_cleanup_implementations import sort_files_by_cachedsince + + +def check_size(file: RequestedFile, storage: StorageElement): + return storage.size >= file.filesize + + +def check_relevance(file: RequestedFile, storage: StorageElement): + return True + + +def delete_oldest( + file: RequestedFile, storage: StorageElement +) -> Tuple[bool, Tuple[StoredFile]]: + deletable_files = [] + currently_free = storage.available + if currently_free < storage.available: + sorted_files = sort_files_by_cachedsince(storage.files.items()) + while currently_free < file.filesize: + deletable_files.append(next(sorted_files)) + currently_free += deletable_files[-1].filesize + return True, tuple(deletable_files) + + +def delete_oldest_few_used( + file: RequestedFile, storage: StorageElement +) -> Tuple[bool, Optional[Tuple[StoredFile]]]: + deletable_files = [] + currently_free = storage.available + if currently_free < storage.available: + sorted_files = sort_files_by_cachedsince(storage.files.items()) + for current_file in sorted_files: + if current_file.numberofaccesses < 3: + deletable_files.append(current_file) + currently_free += deletable_files[-1].filesize + if currently_free >= file.filesize: + return True, tuple(deletable_files) + return False, None + + +class CacheAlgorithm(object): + def __init__(self, caching_strategy: Callable, deletion_strategy: Callable): + self._caching_strategy = lambda file, storage: check_size( + file, storage + ) and check_relevance(file, storage) + self._deletion_strategy = lambda file, storage: delete_oldest(file, storage) + + def consider( + self, file: RequestedFile, storage: StorageElement + ) -> Tuple[bool, Optional[Tuple[StoredFile]]]: + if self._caching_strategy(file, 
storage): + return self._deletion_strategy(file, storage) + return False, None diff --git a/lapis/cli/simulate.py b/lapis/cli/simulate.py index aa176ee..619e55f 100644 --- a/lapis/cli/simulate.py +++ b/lapis/cli/simulate.py @@ -1,3 +1,5 @@ +from functools import partial + import click import logging.handlers @@ -9,6 +11,8 @@ from lapis.pool import StaticPool, Pool from lapis.pool_io.htcondor import htcondor_pool_reader from lapis.job_io.swf import swf_job_reader +from lapis.storageelement import StorageElement, HitrateStorage +from lapis.storage_io.storage import storage_reader from lapis.scheduler import CondorJobScheduler from lapis.simulator import Simulator @@ -25,6 +29,8 @@ pool_import_mapper = {"htcondor": htcondor_pool_reader} +storage_import_mapper = {"standard": storage_reader} + @click.group() @click.option("--seed", type=int, default=1234) @@ -32,11 +38,13 @@ @click.option("--log-tcp", "log_tcp", is_flag=True) @click.option("--log-file", "log_file", type=click.File("w")) @click.option("--log-telegraf", "log_telegraf", is_flag=True) +@click.option("--calculation-efficiency", type=float) @click.pass_context -def cli(ctx, seed, until, log_tcp, log_file, log_telegraf): +def cli(ctx, seed, until, log_tcp, log_file, log_telegraf, calculation_efficiency): ctx.ensure_object(dict) ctx.obj["seed"] = seed ctx.obj["until"] = until + ctx.obj["calculation_efficiency"] = calculation_efficiency monitoring_logger = logging.getLogger() monitoring_logger.setLevel(logging.DEBUG) time_filter = SimulationTimeFilter() @@ -71,15 +79,43 @@ def cli(ctx, seed, until, log_tcp, log_file, log_telegraf): type=(click.File("r"), click.Choice(list(pool_import_mapper.keys()))), multiple=True, ) +@click.option( + "--storage-files", + "storage_files", + type=( + click.File("r"), + click.File("r"), + click.Choice(list(storage_import_mapper.keys())), + ), + default=(None, None, None), +) +@click.option("--remote-throughput", "remote_throughput", type=float, default=10) +@click.option("--cache-hitrate", "cache_hitrate", type=float, default=None) @click.pass_context -def static(ctx, job_file, pool_file): +def static(ctx, job_file, pool_file, storage_files, remote_throughput, cache_hitrate): click.echo("starting static environment") simulator = Simulator(seed=ctx.obj["seed"]) file, file_type = job_file simulator.create_job_generator( - job_input=file, job_reader=job_import_mapper[file_type] + job_input=file, + job_reader=partial( + job_import_mapper[file_type], + calculation_efficiency=ctx.obj["calculation_efficiency"], + ), ) simulator.create_scheduler(scheduler_type=CondorJobScheduler) + + if all(storage_files): + simulator.create_connection_module(remote_throughput * 1024 * 1024 * 1024) + storage_file, storage_content_file, storage_type = storage_files + simulator.create_storage( + storage_input=storage_file, + storage_content_input=storage_content_file, + storage_reader=storage_import_mapper[storage_type], + storage_type=partial(HitrateStorage, cache_hitrate) + if cache_hitrate is not None + else StorageElement, + ) for current_pool in pool_file: pool_file, pool_file_type = current_pool simulator.create_pools( @@ -87,6 +123,7 @@ def static(ctx, job_file, pool_file): pool_reader=pool_import_mapper[pool_file_type], pool_type=StaticPool, ) + simulator.enable_monitoring() simulator.run(until=ctx.obj["until"]) @@ -108,7 +145,11 @@ def dynamic(ctx, job_file, pool_file): simulator = Simulator(seed=ctx.obj["seed"]) file, file_type = job_file simulator.create_job_generator( - job_input=file, 
job_reader=job_import_mapper[file_type] + job_input=file, + job_reader=partial( + job_import_mapper[file_type], + calculation_efficiency=ctx.obj["calculation_efficiency"], + ), ) simulator.create_scheduler(scheduler_type=CondorJobScheduler) for current_pool in pool_file: @@ -119,6 +160,7 @@ def dynamic(ctx, job_file, pool_file): pool_type=Pool, controller=SimulatedLinearController, ) + simulator.enable_monitoring() simulator.run(until=ctx.obj["until"]) @@ -146,7 +188,11 @@ def hybrid(ctx, job_file, static_pool_file, dynamic_pool_file): simulator = Simulator(seed=ctx.obj["seed"]) file, file_type = job_file simulator.create_job_generator( - job_input=file, job_reader=job_import_mapper[file_type] + job_input=file, + job_reader=partial( + job_import_mapper[file_type], + calculation_efficiency=ctx.obj["calculation_efficiency"], + ), ) simulator.create_scheduler(scheduler_type=CondorJobScheduler) for current_pool in static_pool_file: @@ -164,6 +210,7 @@ def hybrid(ctx, job_file, static_pool_file, dynamic_pool_file): pool_type=Pool, controller=SimulatedLinearController, ) + simulator.enable_monitoring() simulator.run(until=ctx.obj["until"]) diff --git a/lapis/connection.py b/lapis/connection.py new file mode 100644 index 0000000..5f55838 --- /dev/null +++ b/lapis/connection.py @@ -0,0 +1,149 @@ +import random + +from typing import Union, Optional +from usim import Scope, time, Pipe + +from lapis.cachealgorithm import ( + CacheAlgorithm, + check_size, + check_relevance, + delete_oldest_few_used, +) +from lapis.storageelement import StorageElement, RemoteStorage +from lapis.files import RequestedFile, RequestedFile_HitrateBased +from lapis.monitor import sampling_required + + +class Connection(object): + + __slots__ = ("storages", "remote_connection", "caching_algorithm") + + def __init__(self, throughput=100): + self.storages = dict() + self.remote_connection = RemoteStorage(Pipe(throughput=throughput)) + self.caching_algorithm = CacheAlgorithm( + caching_strategy=lambda file, storage: check_size(file, storage) + and check_relevance(file, storage), + deletion_strategy=lambda file, storage: delete_oldest_few_used( + file, storage + ), + ) + + def add_storage_element(self, storage_element: StorageElement): + """ + Register storage element in Connetion module clustering storage elements by + sitename + :param storage_element: + :return: + """ + storage_element.remote_storage = self.remote_connection + try: + self.storages[storage_element.sitename].append(storage_element) + except KeyError: + self.storages[storage_element.sitename] = [storage_element] + + async def _determine_inputfile_source( + self, + requested_file: RequestedFile, + dronesite: Optional[str], + job_repr: Optional[str] = None, + ) -> Union[StorageElement, RemoteStorage]: + """ + Collects NamedTuples containing the amount of data of the requested file + cached in a storage element and the storage element for all reachable storage + objects on the drone's site. The tuples are sorted by amount of cached data + and the storage object where the biggest part of the file is cached is + returned. If the file is not cached in any storage object the connection module + remote connection is returned. 
+        :param requested_file:
+        :param dronesite:
+        :param job_repr:
+        :return:
+        """
+        provided_storages = self.storages.get(dronesite, None)
+        if provided_storages is not None:
+            look_up_list = []
+            for storage in provided_storages:
+                look_up_list.append(storage.find(requested_file, job_repr))
+            storage_list = sorted(
+                [entry for entry in look_up_list], key=lambda x: x[0], reverse=True
+            )
+            for entry in storage_list:
+                # TODO: check that the cached size is at least the requested size
+                if entry.cached_filesize > 0:
+                    return entry.storage
+        return self.remote_connection
+
+    async def stream_file(
+        self, requested_file: RequestedFile, dronesite, job_repr=None
+    ):
+        """
+        Determines which storage object is used to provide the requested file and
+        starts the file transfer. For files transferred via the remote connection,
+        a potential cache decides whether to cache the file and handles the
+        caching process.
+        :param requested_file:
+        :param dronesite:
+        :param job_repr:
+        :return:
+        """
+        used_connection = await self._determine_inputfile_source(
+            requested_file, dronesite, job_repr
+        )
+        await sampling_required.put(used_connection)
+        if used_connection == self.remote_connection and self.storages.get(
+            dronesite, None
+        ):
+            try:
+                potential_cache = random.choice(self.storages[dronesite])
+                cache_file, files_for_deletion = self.caching_algorithm.consider(
+                    file=requested_file, storage=potential_cache
+                )
+                if cache_file:
+                    for file in files_for_deletion:
+                        await potential_cache.remove(file, job_repr)
+                    await potential_cache.add(requested_file, job_repr)
+                else:
+                    print(
+                        f"APPLY CACHING DECISION: Job {job_repr}, "
+                        f"File {requested_file.filename}: File wasn't "
+                        f"cached @ {time.now}"
+                    )
+            except KeyError:
+                pass
+        print(f"now transferring {requested_file.filesize} from {used_connection}")
+        await used_connection.transfer(requested_file, job_repr=job_repr)
+        print(
+            "Job {}: finished transferring file {}: {}GB @ {}".format(
+                job_repr, requested_file.filename, requested_file.filesize, time.now
+            )
+        )
+
+    async def transfer_files(self, drone, requested_files: dict, job_repr=None):
+        """
+        Converts dict information about requested files into RequestedFile objects
+        and launches streaming of all files in parallel.
+        :param drone:
+        :param requested_files:
+        :param job_repr:
+        :return:
+        """
+        start_time = time.now
+        async with Scope() as scope:
+            for inputfilename, inputfilespecs in requested_files.items():
+                if "hitrates" in inputfilespecs.keys():
+                    requested_file = RequestedFile_HitrateBased(
+                        inputfilename,
+                        inputfilespecs["usedsize"],
+                        inputfilespecs["hitrates"],
+                    )
+                else:
+                    requested_file = RequestedFile(
+                        inputfilename, inputfilespecs["usedsize"]
+                    )
+                scope.do(self.stream_file(requested_file, drone.sitename, job_repr))
+        stream_time = time.now - start_time
+        print(
+            "STREAMED files {} in {}".format(list(requested_files.keys()), stream_time)
+        )
+        return stream_time
diff --git a/lapis/drone.py b/lapis/drone.py
index 1932537..eb13bf3 100644
--- a/lapis/drone.py
+++ b/lapis/drone.py
@@ -1,7 +1,9 @@
 from cobald import interfaces
+from typing import Optional
 from usim import time, Scope, instant, Capacities, ResourcesUnavailable, Queue
 
 from lapis.job import Job
+from lapis.connection import Connection
 
 
 class ResourcesExceeded(Exception):
@@ -12,9 +14,11 @@ class Drone(interfaces.Pool):
     def __init__(
         self,
         scheduler,
-        pool_resources: dict,
-        scheduling_duration: float,
+        pool_resources: Optional[dict] = None,
+        scheduling_duration: Optional[float] = None,
         ignore_resources: list =
None, + sitename: str = None, + connection: Connection = None, ): """ :param scheduler: @@ -23,6 +27,8 @@ def __init__( """ super(Drone, self).__init__() self.scheduler = scheduler + self.connection = connection + self.sitename = sitename self.pool_resources = pool_resources self.resources = Capacities(**pool_resources) # shadowing requested resources to determine jobs to be killed @@ -143,6 +149,11 @@ async def _run_job(self, job: Job, kill: bool): pass self.scheduler.update_drone(self) await job_execution.done + print( + "finished job {} on drone {} @ {}".format( + repr(job), repr(self), time.now + ) + ) except ResourcesUnavailable: await instant job_execution.cancel() diff --git a/lapis/files.py b/lapis/files.py new file mode 100644 index 0000000..e227447 --- /dev/null +++ b/lapis/files.py @@ -0,0 +1,54 @@ +from typing import Optional, NamedTuple + + +class StoredFile(object): + + __slots__ = ( + "filename", + "filesize", + "storedsize", + "cachedsince", + "lastaccessed", + "numberofaccesses", + ) + + def __init__( + self, + filename: str, + filesize: Optional[int] = None, + storedsize: Optional[int] = None, + cachedsince: Optional[int] = None, + lastaccessed: Optional[int] = None, + numberofaccesses: Optional[int] = None, + **filespecs, + ): + self.filename = filename + self.filesize = filesize + self.storedsize = storedsize or self.filesize + self.cachedsince = cachedsince + self.lastaccessed = lastaccessed + self.numberofaccesses = numberofaccesses + + def increment_accesses(self): + self.numberofaccesses += 1 + + +class RequestedFile(NamedTuple): + filename: str + filesize: Optional[int] = None + + def convert_to_stored_file_object(self, currenttime): + print(self.filesize) + return StoredFile( + self.filename, + filesize=self.filesize, + cachedsince=currenttime, + lastaccessed=currenttime, + numberofaccesses=1, + ) + + +class RequestedFile_HitrateBased(NamedTuple): + filename: str + filesize: float + cachehitrate: dict diff --git a/lapis/interfaces/_storage.py b/lapis/interfaces/_storage.py new file mode 100644 index 0000000..78f7dfc --- /dev/null +++ b/lapis/interfaces/_storage.py @@ -0,0 +1,61 @@ +import abc + +from typing import NamedTuple + +from lapis.files import RequestedFile, StoredFile + + +class LookUpInformation(NamedTuple): + cached_filesize: int + storage: "Storage" + + +class Storage(metaclass=abc.ABCMeta): + @property + @abc.abstractmethod + def size(self) -> int: + """Total size of storage in Bytes""" + raise NotImplementedError + + @property + @abc.abstractmethod + def available(self) -> int: + """Available storage in Bytes""" + raise NotImplementedError + + @property + @abc.abstractmethod + def used(self) -> int: + """Used storage in Bytes""" + raise NotImplementedError + + @abc.abstractmethod + async def transfer(self, file: RequestedFile, job_repr): + """ + Transfer size of given file via the storages' connection and update file + information. If the file was deleted since it was originally looked up + the resulting error is not raised. + + .. TODO:: What does this mean with the error? + """ + raise NotImplementedError + + @abc.abstractmethod + async def add(self, file: RequestedFile, job_repr): + """ + Add file information to storage and transfer the size of the file via + the storages' connection. + """ + raise NotImplementedError + + @abc.abstractmethod + async def remove(self, file: StoredFile, job_repr): + """ + Remove all file information and used filesize from the storage. 
+ """ + raise NotImplementedError + + @abc.abstractmethod + def find(self, file: RequestedFile, job_repr) -> LookUpInformation: + """Information if a file is stored in Storage""" + raise NotImplementedError diff --git a/lapis/job.py b/lapis/job.py index c4627e0..33bf108 100644 --- a/lapis/job.py +++ b/lapis/job.py @@ -1,7 +1,7 @@ import logging from typing import Optional, TYPE_CHECKING -from usim import time +from usim import time, Scope, instant from usim import CancelTask from lapis.monitor import sampling_required @@ -24,6 +24,7 @@ class Job(object): "_name", "drone", "_success", + "calculation_efficiency", ) def __init__( @@ -32,8 +33,9 @@ def __init__( used_resources: dict, in_queue_since: float = 0, queue_date: float = 0, - name: str = None, - drone: "Drone" = None, + name: Optional[str] = None, + drone: "Optional[Drone]" = None, + calculation_efficiency: Optional[float] = None, ): """ Definition of a job that uses a specified amount of resources `used_resources` @@ -59,17 +61,20 @@ def __init__( self.used_resources[key], ) self.resources[key] = self.used_resources[key] - self.walltime = used_resources.pop("walltime") - self.requested_walltime = resources.pop("walltime", None) - self.requested_inputfiles = resources.pop("inputfiles", None) - self.used_inputfiles = used_resources.pop("inputfiles", None) + self.walltime: int = used_resources.pop("walltime") + self.requested_walltime: Optional[int] = resources.pop("walltime", None) self.queue_date = queue_date assert in_queue_since >= 0, "Queue time cannot be negative" self.in_queue_since = in_queue_since - self.in_queue_until = None + self.in_queue_until: Optional[float] = None self.drone = drone self._name = name self._success: Optional[bool] = None + self.calculation_efficiency = calculation_efficiency + + # caching-related + self.requested_inputfiles = resources.pop("inputfiles", None) + self.used_inputfiles = used_resources.pop("inputfiles", None) @property def name(self) -> str: @@ -91,22 +96,71 @@ def waiting_time(self) -> float: return self.in_queue_until - self.in_queue_since return float("Inf") + async def _calculate(self): + """ + Determines a jobs calculation time based on the jobs CPU time and a + calculation efficiency representing inefficient programming. 
+ :param calculation_efficiency: + :return: + """ + print( + f"WALLTIME: Job {self} @ {time.now}, " + f"{self.used_resources.get('cores', None)}, " + f"{self.calculation_efficiency}" + ) + result = self.walltime + try: + result = ( + self.used_resources["cores"] / self.calculation_efficiency + ) * self.walltime + except (KeyError, TypeError): + pass + start = time.now + await (time + result) + print(f"finished calculation at {time.now - start}") + + async def _transfer_inputfiles(self): + try: + start = time.now + print(f"TRANSFERING INPUTFILES: Job {self} @ {start}") + await self.drone.connection.transfer_files( + drone=self.drone, + requested_files=self.used_inputfiles, + job_repr=repr(self), + ) + print( + f"streamed inputfiles {self.used_inputfiles.keys()} for job {self} " + f"in {time.now - start} timeunits, finished @ {time.now}" + ) + except AttributeError: + pass + async def run(self, drone: "Drone"): assert drone, "Jobs cannot run without a drone being assigned" self.drone = drone self.in_queue_until = time.now self._success = None await sampling_required.put(self) + print("running job {} in drone {}".format(repr(self), repr(self.drone))) try: - await (time + self.walltime) + start = time.now + async with Scope() as scope: + await instant + scope.do(self._transfer_inputfiles()) + scope.do(self._calculate()) except CancelTask: self.drone = None self._success = False + # TODO: in_queue_until is still set except BaseException: self.drone = None self._success = False + # TODO: in_queue_until is still set raise else: + old_walltime = self.walltime + self.walltime = time.now - start + print(f"monitored walltime of {old_walltime} changed to {self.walltime}") self.drone = None self._success = True await sampling_required.put(self) diff --git a/lapis/job_io/htcondor.py b/lapis/job_io/htcondor.py index 60a3684..87d94a6 100644 --- a/lapis/job_io/htcondor.py +++ b/lapis/job_io/htcondor.py @@ -1,6 +1,7 @@ import csv import json import logging +from typing import Optional from lapis.job import Job from copy import deepcopy @@ -8,6 +9,7 @@ def htcondor_job_reader( iterable, + calculation_efficiency: Optional[float] = None, resource_name_mapping={ # noqa: B006 "cores": "RequestCpus", "walltime": "RequestWalltime", # s @@ -29,6 +31,8 @@ def htcondor_job_reader( "RemoteWallClockTime": 1, "MemoryUsage": 1000 * 1000, "DiskUsage_RAW": 1024, + "filesize": 1024 * 1024 * 1024, + "usedsize": 1024 * 1024 * 1024, }, ): input_file_type = iterable.name.split(".")[-1].lower() @@ -70,17 +74,34 @@ def htcondor_job_reader( * unit_conversion_mapping.get(original_key, 1) ) + calculation_efficiency = entry.get( + "calculation_efficiency", calculation_efficiency + ) + try: + if not entry["Inputfiles"]: + del entry["Inputfiles"] resources["inputfiles"] = deepcopy(entry["Inputfiles"]) used_resources["inputfiles"] = deepcopy(entry["Inputfiles"]) for filename, filespecs in entry["Inputfiles"].items(): + for key in filespecs.keys(): + if key == "hitrates": + continue + resources["inputfiles"][filename][key] = filespecs[ + key + ] * unit_conversion_mapping.get(key, 1) + used_resources["inputfiles"][filename][key] = filespecs[ + key + ] * unit_conversion_mapping.get(key, 1) + if "usedsize" in filespecs: del resources["inputfiles"][filename]["usedsize"] + if "filesize" in filespecs: if "usedsize" not in filespecs: - used_resources["inputfiles"][filename]["usedsize"] = filespecs[ - "filesize" - ] + used_resources["inputfiles"][filename]["usedsize"] = resources[ + "inputfiles" + ][filename]["filesize"] del 
used_resources["inputfiles"][filename]["filesize"] except KeyError: @@ -89,4 +110,6 @@ def htcondor_job_reader( resources=resources, used_resources=used_resources, queue_date=float(entry[used_resource_name_mapping["queuetime"]]), + calculation_efficiency=calculation_efficiency, + name=entry.get("name", None), ) diff --git a/lapis/job_io/swf.py b/lapis/job_io/swf.py index 99124bd..bc75d20 100644 --- a/lapis/job_io/swf.py +++ b/lapis/job_io/swf.py @@ -4,12 +4,14 @@ [Standard Workload Format](http://www.cs.huji.ac.il/labs/parallel/workload/swf.html). """ import csv +from typing import Optional from lapis.job import Job def swf_job_reader( iterable, + calculation_efficiency: Optional[float] = None, resource_name_mapping={ # noqa: B006 "cores": "Requested Number of Processors", "walltime": "Requested Time", # s @@ -90,4 +92,5 @@ def swf_job_reader( used_resources=used_resources, queue_date=float(row[header[used_resource_name_mapping["queuetime"]]]), name=row[header["Job Number"]], + calculation_efficiency=calculation_efficiency, ) diff --git a/lapis/monitor/__init__.py b/lapis/monitor/__init__.py index c7e4039..47feb59 100644 --- a/lapis/monitor/__init__.py +++ b/lapis/monitor/__init__.py @@ -7,6 +7,9 @@ from usim import time, Queue +SIMULATION_START = None + + class LoggingSocketHandler(logging.handlers.SocketHandler): def makePickle(self, record): return self.format(record).encode() @@ -42,11 +45,19 @@ def __init__(self): self._statistics = {} async def run(self): - async for log_object in sampling_required: - for statistic in self._statistics.get(type(log_object), set()): - # do the logging - for record in statistic(log_object): - logging.getLogger(statistic.name).info(statistic.name, record) + # The Queue.__aiter__ cannot safely be finalised unless closed. + # We explicitly create and later on aclose it, to ensure this happens + # when the Scope collects us and the event loop is still around. 
+ log_iter = sampling_required.__aiter__() + try: + async for log_object in log_iter: + for statistic in self._statistics.get(type(log_object), set()): + # do the logging + for record in statistic(log_object): + record["tardis"] = "lapis-%s" % SIMULATION_START + logging.getLogger(statistic.name).info(statistic.name, record) + except GeneratorExit: + await log_iter.aclose() def register_statistic(self, statistic: Callable) -> None: """ diff --git a/lapis/monitor/caching.py b/lapis/monitor/caching.py new file mode 100644 index 0000000..22ca822 --- /dev/null +++ b/lapis/monitor/caching.py @@ -0,0 +1,91 @@ +import logging + +from cobald.monitor.format_json import JsonFormatter +from cobald.monitor.format_line import LineProtocolFormatter +from usim import Pipe + +from lapis.monitor import LoggingSocketHandler, LoggingUDPSocketHandler +from lapis.storageelement import StorageElement + + +def storage_status(storage: StorageElement) -> list: + """ + Log information about current storage object state + :param storage: + :return: list of records for logging + """ + results = [ + { + "storage": repr(storage), + "usedstorage": storage.used, + "storagesize": storage.size, + "numberoffiles": len(storage.files), + } + ] + return results + + +storage_status.name = "storage_status" +storage_status.whitelist = (StorageElement,) +storage_status.logging_formatter = { + LoggingSocketHandler.__name__: JsonFormatter(), + logging.StreamHandler.__name__: JsonFormatter(), + LoggingUDPSocketHandler.__name__: LineProtocolFormatter( + tags={"tardis", "storage"}, resolution=1 + ), +} + + +def storage_connection(storage: StorageElement) -> list: + """ + Log information about the storages connection + :param storage: + :return: + """ + results = [ + { + "storage": repr(storage), + "throughput": storage.connection.throughput, + "requested_throughput": sum(storage.connection._subscriptions.values()), + "throughput_scale": storage.connection._throughput_scale, + } + ] + return results + + +storage_connection.name = "storage_connection" +storage_connection.whitelist = (StorageElement,) +storage_connection.logging_formatter = { + LoggingSocketHandler.__name__: JsonFormatter(), + logging.StreamHandler.__name__: JsonFormatter(), + LoggingUDPSocketHandler.__name__: LineProtocolFormatter( + tags={"tardis", "storage"}, resolution=1 + ), +} + + +def remote_connection(remote: Pipe) -> list: + """ + Log information about the remote connection + :param remote: + :return: + """ + results = [ + { + "throughput": remote.throughput, + "requested_throughput": sum(remote._subscriptions.values()), + "throughput_scale": remote._throughput_scale, + } + ] + return results + + +remote_connection.name = "remote_connection" +remote_connection.whitelist = (Pipe,) +remote_connection.logging_formatter = { + LoggingSocketHandler.__name__: JsonFormatter(), + logging.StreamHandler.__name__: JsonFormatter(), + LoggingUDPSocketHandler.__name__: LineProtocolFormatter( + tags={"tardis"}, resolution=1 + ), +} diff --git a/lapis/pool.py b/lapis/pool.py index 2a3e465..74bbc9b 100644 --- a/lapis/pool.py +++ b/lapis/pool.py @@ -1,7 +1,9 @@ +from functools import partial from typing import Generator, Callable from cobald import interfaces from usim import eternity, Scope, interval +from lapis.connection import Connection from .drone import Drone @@ -24,15 +26,20 @@ def __init__( capacity: int = float("inf"), init: int = 0, name: str = None, + connection: Connection = None, ): super(Pool, self).__init__() assert init <= capacity - self.make_drone = make_drone 
self._drones = [] self._demand = 1 self._level = init self._capacity = capacity self._name = name + # TODO: Should drones have access to the pool or the connection directly? + if connection is not None: + self.make_drone = partial(make_drone, connection=connection) + else: + self.make_drone = make_drone async def init_pool(self, scope: Scope, init: int = 0): """ @@ -136,10 +143,15 @@ class StaticPool(Pool): instantiated within the pool """ - def __init__(self, make_drone: Callable, capacity: int = 0): + def __init__( + self, make_drone: Callable, capacity: int = 0, connection: Connection = None + ): assert capacity > 0, "Static pool was initialised without any resources..." super(StaticPool, self).__init__( - capacity=capacity, init=capacity, make_drone=make_drone + capacity=capacity, + init=capacity, + make_drone=make_drone, + connection=connection, ) self._demand = capacity diff --git a/lapis/pool_io/htcondor.py b/lapis/pool_io/htcondor.py index 0dba5c1..d60e92e 100644 --- a/lapis/pool_io/htcondor.py +++ b/lapis/pool_io/htcondor.py @@ -2,6 +2,8 @@ from functools import partial from typing import Callable + +from lapis.connection import Connection from ..pool import Pool @@ -19,6 +21,7 @@ def htcondor_pool_reader( }, pool_type: Callable = Pool, make_drone: Callable = None, + connection: Connection = None, ): """ Load a pool configuration that was exported via htcondor from files or @@ -48,5 +51,7 @@ def htcondor_pool_reader( for key, value in resource_name_mapping.items() }, ignore_resources=["disk"], + sitename=row.get("sitename", None), ), + connection=connection, ) diff --git a/lapis/scheduler.py b/lapis/scheduler.py index b38d53e..22bbe60 100644 --- a/lapis/scheduler.py +++ b/lapis/scheduler.py @@ -1,5 +1,5 @@ from typing import Dict -from usim import Scope, interval, Resources +from usim import Scope, interval, Resources, time from lapis.drone import Drone from lapis.monitor import sampling_required @@ -88,9 +88,17 @@ async def run(self): async with Scope() as scope: scope.do(self._collect_jobs()) async for _ in interval(self.interval): + print("NEW SCHEDULING INTERVAL @ {}".format(time.now)) + print(self.job_queue) for job in self.job_queue.copy(): + print("SCHEDULING {}".format(repr(job))) best_match = self._schedule_job(job) if best_match: + print( + "start job {} on drone {} @ {}".format( + repr(job), repr(best_match), time.now + ) + ) await best_match.schedule_job(job) self.job_queue.remove(job) await sampling_required.put(self.job_queue) @@ -129,6 +137,11 @@ def _schedule_job(self, job) -> Drone: drone = cluster[0] cost = 0 resources = drone.theoretical_available_resources + # print( + # "trying to match Job {} to {}, resources {}".format( + # repr(job), repr(drone), resources + # ) + # ) for resource_type in job.resources: if resources.get(resource_type, 0) < job.resources[resource_type]: # Inf for all job resources that a drone does not support diff --git a/lapis/simulator.py b/lapis/simulator.py index 2920202..d64671e 100644 --- a/lapis/simulator.py +++ b/lapis/simulator.py @@ -1,11 +1,18 @@ import logging import random +import time as pytime from functools import partial +from typing import List + +from cobald.interfaces import Controller from usim import run, time, until, Scope, Queue +import lapis.monitor as monitor from lapis.drone import Drone from lapis.job import job_to_queue_scheduler +from lapis.connection import Connection +from lapis.monitor.caching import storage_status, storage_connection, remote_connection from lapis.monitor.general import ( user_demand, 
job_statistics, @@ -14,9 +21,8 @@ configuration_information, job_events, ) -from lapis.monitor import Monitoring from lapis.monitor.cobald import drone_statistics, pool_statistics - +from lapis.pool import Pool logging.getLogger("implementation").propagate = False @@ -24,19 +30,17 @@ class Simulator(object): def __init__(self, seed=1234): random.seed(seed) - self.job_queue = Queue() - self.pools = [] - self.controllers = [] + self.job_queue: Queue = Queue() + self.pools: List[Pool] = [] + self.connection: Connection = None + self.controllers: List[Controller] = [] self.job_scheduler = None self.job_generator = None - self.cost = 0 self._job_generators = [] - self.monitoring = None + self.monitoring = monitor.Monitoring() self.duration = None - self.enable_monitoring() def enable_monitoring(self): - self.monitoring = Monitoring() self.monitoring.register_statistic(user_demand) self.monitoring.register_statistic(job_statistics) self.monitoring.register_statistic(job_events) @@ -45,6 +49,9 @@ def enable_monitoring(self): self.monitoring.register_statistic(resource_statistics) self.monitoring.register_statistic(pool_status) self.monitoring.register_statistic(configuration_information) + self.monitoring.register_statistic(storage_status) + self.monitoring.register_statistic(storage_connection) + self.monitoring.register_statistic(remote_connection) def create_job_generator(self, job_input, job_reader): self._job_generators.append((job_input, job_reader)) @@ -55,20 +62,36 @@ def create_pools(self, pool_input, pool_reader, pool_type, controller=None): iterable=pool_input, pool_type=pool_type, make_drone=partial(Drone, self.job_scheduler), + connection=self.connection, ): self.pools.append(pool) if controller: self.controllers.append(controller(target=pool, rate=1)) + def create_storage( + self, storage_input, storage_reader, storage_type, storage_content_input=None + ): + assert self.connection, "Connection module needs to be created before storages" + for storage in storage_reader( + storage=storage_input, + storage_content=storage_content_input, + storage_type=storage_type, + ): + self.connection.add_storage_element(storage) + def create_scheduler(self, scheduler_type): self.job_scheduler = scheduler_type(job_queue=self.job_queue) + def create_connection_module(self, remote_throughput): + self.connection = Connection(remote_throughput) + def run(self, until=None): - print(f"running until {until}") + monitor.SIMULATION_START = pytime.time() + print(f"[lapis-{monitor.SIMULATION_START}] running until {until}") run(self._simulate(until)) async def _simulate(self, end): - print(f"Starting simulation at {time.now}") + print(f"[lapis-{monitor.SIMULATION_START}] Starting simulation at {time.now}") async with until(time == end) if end else Scope() as while_running: for pool in self.pools: while_running.do(pool.run(), volatile=True) @@ -79,9 +102,11 @@ async def _simulate(self, end): while_running.do(controller.run(), volatile=True) while_running.do(self.monitoring.run(), volatile=True) self.duration = time.now - print(f"Finished simulation at {self.duration}") + print( + f"[lapis-{monitor.SIMULATION_START}] Finished simulation at {self.duration}" + ) async def _queue_jobs(self, job_input, job_reader): await job_to_queue_scheduler( - job_generator=job_reader(job_input), job_queue=self.job_queue + job_generator=partial(job_reader, job_input)(), job_queue=self.job_queue ) diff --git a/lapis/storage_io/__init__.py b/lapis/storage_io/__init__.py new file mode 100644 index 0000000..e69de29 diff --git 
a/lapis/storage_io/storage.py b/lapis/storage_io/storage.py new file mode 100644 index 0000000..b31c6b1 --- /dev/null +++ b/lapis/storage_io/storage.py @@ -0,0 +1,83 @@ +import csv +from functools import partial + +from lapis.files import StoredFile + + +def storage_reader( + storage, + storage_content, + storage_type, + unit_conversion_mapping: dict = { # noqa: B006 + "cachesizeGB": 1024 * 1024 * 1024, + "throughput_limit": 1024 * 1024 * 1024, + }, +): + try: + storage_content = storage_content_reader(storage_content) + except TypeError: + storage_content = dict() + reader = csv.DictReader(storage, delimiter=" ", quotechar="'") + for row in reader: + yield partial( + storage_type, + name=row["name"], + sitename=row["sitename"], + size=int( + float(row["cachesizeGB"]) + * unit_conversion_mapping.get("cachesizeGB", 1) + ), + throughput_limit=int( + float(row["throughput_limit"]) + * unit_conversion_mapping.get("throughput_limit", 1) + ), + files=storage_content.get(row["name"], dict()), + )() + + +def storage_content_reader( + file_name, + unit_conversion_mapping: dict = { # noqa: B006 + "filesize": 1024 * 1024 * 1024, + "storedsize": 1024 * 1024 * 1024, + }, +): + reader = csv.DictReader(file_name, delimiter=" ", quotechar="'") + cache_information = dict() + for row in reader: + for key in row: + if key not in ["filename", "cachename"]: + row[key] = int(float(row[key]) * unit_conversion_mapping.get(key, 1)) + cache_information.setdefault(row["cachename"], {})[ + row["filename"] + ] = StoredFile(**row) + return cache_information + + +def storage_reader_filebased_hitrate_caching( + storage, + storage_type, + storage_content=None, + unit_conversion_mapping: dict = { # noqa: B006 + "cachesizeGB": 1024 * 1024 * 1024, + "throughput_limit": 1024 * 1024 * 1024, + }, +): + + reader = csv.DictReader(storage, delimiter=" ", quotechar="'") + for row in reader: + print(row) + yield partial( + storage_type, + name=row["name"], + sitename=row["sitename"], + size=int( + float(row["cachesizeGB"]) + * unit_conversion_mapping.get("cachesizeGB", 1) + ), + throughput_limit=int( + float(row["throughput_limit"]) + * unit_conversion_mapping.get("throughput_limit", 1) + ), + files=dict(), + )() diff --git a/lapis/storageelement.py b/lapis/storageelement.py new file mode 100644 index 0000000..3f4f2f3 --- /dev/null +++ b/lapis/storageelement.py @@ -0,0 +1,307 @@ +from typing import Optional + +from usim import time, Resources, Pipe, Scope + +from lapis.files import StoredFile, RequestedFile, RequestedFile_HitrateBased +from lapis.interfaces._storage import Storage, LookUpInformation + +import logging + + +class RemoteStorage(Storage): + def __init__(self, pipe: Pipe): + self.connection = pipe + + @property + def size(self): + return float("Inf") + + @property + def available(self): + return float("Inf") + + @property + def used(self): + return 0 + + async def transfer(self, file: RequestedFile, **kwargs): + await self.connection.transfer(total=file.filesize) + + async def add(self, file: StoredFile, **kwargs): + raise NotImplementedError + + async def remove(self, file: StoredFile, **kwargs): + raise NotImplementedError + + def find(self, file: RequestedFile, **kwargs) -> LookUpInformation: + raise NotImplementedError + + +class StorageElement(Storage): + + __slots__ = ( + "name", + "sitename", + "_size", + "deletion_duration", + "update_duration", + "_usedstorage", + "files", + "filenames", + "connection", + "remote_storage", + ) + + def __init__( + self, + name: Optional[str] = None, + sitename: Optional[str] = 
None,
+        size: int = 1000 * 1024 * 1024 * 1024,
+        throughput_limit: int = 10 * 1024 * 1024 * 1024,
+        files: Optional[dict] = None,
+    ):
+        self.name = name
+        self.sitename = sitename
+        self.deletion_duration = 5
+        self.update_duration = 1
+        self._size = size
+        self.files = files
+        self._usedstorage = Resources(
+            size=sum(file.storedsize for file in files.values())
+        )
+        self.connection = Pipe(throughput_limit)
+        self.remote_storage = None
+
+    @property
+    def size(self):
+        return self._size
+
+    @property
+    def used(self):
+        return self._usedstorage.levels.size
+
+    @property
+    def available(self):
+        return self.size - self.used
+
+    async def remove(self, file: StoredFile, job_repr=None):
+        """
+        Deletes file from storage object. The time this operation takes is defined
+        by the storage's deletion_duration attribute.
+        :param file:
+        :param job_repr: Needed for debug output, will be replaced
+        :return:
+        """
+        print(
+            "REMOVE FROM STORAGE: Job {}, File {} @ {}".format(
+                job_repr, file.filename, time.now
+            )
+        )
+        await (time + self.deletion_duration)
+        await self._usedstorage.decrease(size=file.filesize)
+        self.files.pop(file.filename)
+
+    async def add(self, file: RequestedFile, job_repr=None):
+        """
+        Adds the file to the storage object, transferring it through the storage
+        object's connection. This is sufficient for now because files are only
+        added to the storage when they are also transferred through the
+        Connection's remote connection. If this simulator is extended to include
+        any kind of direct file placement, this has to be adapted.
+        :param file:
+        :param job_repr: Needed for debug output, will be replaced
+        :return:
+        """
+        print(
+            "ADD TO STORAGE: Job {}, File {} @ {}".format(
+                job_repr, file.filename, time.now
+            )
+        )
+        file = file.convert_to_stored_file_object(time.now)
+        await self._usedstorage.increase(size=file.filesize)
+        self.files[file.filename] = file
+        await self.connection.transfer(file.filesize)
+
+    async def _update(self, stored_file: StoredFile, job_repr):
+        """
+        Updates a stored file's information upon access.
+        :param stored_file:
+        :param job_repr: Needed for debug output, will be replaced
+        :return:
+        """
+        await (time + self.update_duration)
+        stored_file.lastaccessed = time.now
+        stored_file.increment_accesses()
+        print(
+            "UPDATE: Job {}, File {} @ {}".format(
+                job_repr, stored_file.filename, time.now
+            )
+        )
+
+    async def transfer(self, file: RequestedFile, job_repr=None):
+        """
+        Manages file transfer via the storage element's connection and updates file
+        information. If the file has been deleted since it was originally looked
+        up, the resulting error is not raised.
+        :param file:
+        :param job_repr: Needed for debug output, will be replaced
+        :return:
+        """
+        await self.connection.transfer(file.filesize)
+        try:
+            # TODO: needs handling of KeyError
+            await self._update(self.files[file.filename], job_repr)
+        except AttributeError:
+            pass
+
+    def find(self, requested_file: RequestedFile, job_repr=None):
+        """
+        Searches the storage object for the requested_file and returns the result
+        (amount of cached data, storage object).
+ :param requested_file: + :param job_repr: Needed for debug output, will be replaced + :return: (amount of cached data, storage object) + """ + print( + "LOOK UP FILE: Job {}, File {}, Storage {} @ {}".format( + job_repr, requested_file.filename, repr(self), time.now + ) + ) + try: + result = LookUpInformation( + self.files[requested_file.filename].filesize, self + ) + except KeyError: + print( + "File {} not cached on any reachable storage".format( + requested_file.filename + ) + ) + result = LookUpInformation(0, self) + return result + + def __repr__(self): + return "<%s: %s>" % (self.__class__.__name__, self.name or id(self)) + + +class HitrateStorage(StorageElement): + def __init__( + self, + hitrate, + name: Optional[str] = None, + sitename: Optional[str] = None, + size: int = 1000 * 1024 * 1024 * 1024, + throughput_limit: int = 10 * 1024 * 1024 * 1024, + files: Optional[dict] = None, + ): + super(HitrateStorage, self).__init__( + name=name, + sitename=sitename, + size=size, + throughput_limit=throughput_limit, + files=files, + ) + self._hitrate = hitrate + + @property + def available(self): + return self.size + + @property + def used(self): + return 0 + + async def transfer(self, file: RequestedFile, job_repr=None): + print( + "TRANSFER: {}, filesize {}, remote: {}/{}, cache: {}/{}".format( + self._hitrate, + file.filesize, + (1 - self._hitrate) * file.filesize, + self.remote_storage.connection.throughput, + self._hitrate * file.filesize, + self.connection.throughput, + ) + ) + async with Scope() as scope: + logging.getLogger("implementation").warning( + "{} {} @ {} in {}".format( + self._hitrate * file.filesize, + (1 - self._hitrate) * file.filesize, + time.now, + file.filename[-30:], + ) + ) + scope.do(self.connection.transfer(total=self._hitrate * file.filesize)) + scope.do( + self.remote_storage.connection.transfer( + total=(1 - self._hitrate) * file.filesize + ) + ) + + def find(self, requested_file: RequestedFile, job_repr=None): + return LookUpInformation(requested_file.filesize, self) + + async def add(self, file: RequestedFile, job_repr=None): + pass + + async def remove(self, file: StoredFile, job_repr=None): + pass + + +class FileBasedHitrateStorage(StorageElement): + def __init__( + self, + name: Optional[str] = None, + sitename: Optional[str] = None, + size: int = 1000 * 1024 * 1024 * 1024, + throughput_limit: int = 10 * 1024 * 1024 * 1024, + files: Optional[dict] = None, + ): + super(FileBasedHitrateStorage, self).__init__( + name=name, + sitename=sitename, + size=size, + throughput_limit=throughput_limit, + files=files, + ) + + @property + def available(self): + return self.size + + @property + def used(self): + return 0 + + async def transfer(self, file: RequestedFile_HitrateBased, job_repr=None): + current_cachehitrate = file.cachehitrate.get(self.name, 0) + print( + "TRANSFER: on {} with {}, filesize {}, remote: {}/{}, cache: {}/{}".format( + self.name, + file.cachehitrate.get(self.name, 0), + file.filesize, + (1 - current_cachehitrate) * file.filesize, + self.remote_storage.connection.throughput, + current_cachehitrate * file.filesize, + self.connection.throughput, + ) + ) + async with Scope() as scope: + + scope.do( + self.connection.transfer(total=current_cachehitrate * file.filesize) + ) + scope.do( + self.remote_storage.connection.transfer( + total=(1 - current_cachehitrate) * file.filesize + ) + ) + + def find(self, requested_file: RequestedFile, job_repr=None): + return LookUpInformation(requested_file.filesize, self) + + async def add(self, file: 
RequestedFile, job_repr=None): + pass + + async def remove(self, file: StoredFile, job_repr=None): + pass diff --git a/lapis/utilities/cache_algorithm_implementations.py b/lapis/utilities/cache_algorithm_implementations.py new file mode 100644 index 0000000..468ed0d --- /dev/null +++ b/lapis/utilities/cache_algorithm_implementations.py @@ -0,0 +1,5 @@ +def cache_all(*args, **kwargs): + return True + + +cache_algorithm = {"standard": cache_all} diff --git a/lapis/utilities/cache_cleanup_implementations.py b/lapis/utilities/cache_cleanup_implementations.py new file mode 100644 index 0000000..0ddf493 --- /dev/null +++ b/lapis/utilities/cache_cleanup_implementations.py @@ -0,0 +1,42 @@ +from typing import List + +from lapis.files import StoredFile + + +def sort_files_by_cachedsince(stored_files: set) -> List[StoredFile]: + return sorted(list(stored_files), key=lambda x: x.cachedsince) + + +# async def fifo(size, storage): +# print("hit fifo") +# print(storage.files.keys()) +# # FIFO, test different implementations +# sorted_content = sorted( +# list(storage.files.items()), key=lambda x: x.filespecs.cachedsince +# ) +# print("sorted", sorted_content) +# while size < 0: +# print("hit while") +# size += sorted_content[0][1]["cachedsizeMB"] +# storage.files.pop(sorted_content[0][0]) +# await sleep(storage.placement_duration) +# await storage._usedstorage.decrease( +# **{"usedsize": sorted_content[0][1]["cachedsizeMB"]}) +# print(storage.usedstorage) +# sorted_content.pop(0) +# print("after fifo ", storage.files.keys()) +# +# +# def last_accessed(size, storage): +# # FIFO, test different implementations +# sorted_content = sorted( +# list(storage.content.items()), key=lambda x: x[1]["lastaccessed"] +# ) +# while size < 0: +# size += sorted_content[0][1]["cachedsizeMB"] +# storage.content.pop(sorted_content[0][0]) +# storage.usedstorage -= sorted_content[0][1]["cachedsizeMB"] +# sorted_content.pop(0) +# +# +# cache_cleanup = {"fifo": fifo, "lastaccessed": last_accessed} diff --git a/lapis/utilities/walltime_models.py b/lapis/utilities/walltime_models.py new file mode 100644 index 0000000..1a68ed9 --- /dev/null +++ b/lapis/utilities/walltime_models.py @@ -0,0 +1,11 @@ +# walltime models for caching + + +def extrapolate_walltime_to_maximal_efficiency( + job, original_walltime, maximal_efficiency: float = 0.8 +): + return (job.used_resources["cores"] / maximal_efficiency) * original_walltime + + +# TODO: add models depending on fraction of cached files etc +walltime_models = {"maxeff": extrapolate_walltime_to_maximal_efficiency} diff --git a/lapis_tests/__init__.py b/lapis_tests/__init__.py index 722b3a2..562d4c0 100644 --- a/lapis_tests/__init__.py +++ b/lapis_tests/__init__.py @@ -1,9 +1,21 @@ -from typing import Callable, Coroutine +from typing import Callable, Coroutine, Optional from functools import wraps -from usim import run +from usim import run, Resources from lapis.drone import Drone +from lapis.job import Job +from lapis.connection import Connection + + +class UnfinishedTest(RuntimeError): + """A test did never finish""" + + def __init__(self, test_case): + self.test_case = test_case + super().__init__( + "Test case %r did not finish" % getattr(test_case, "__name__", test_case) + ) def via_usim(test_case: Callable[..., Coroutine]): @@ -22,16 +34,24 @@ async def test_sleep(): @wraps(test_case) def run_test(*args, **kwargs): - # pytest currently ignores __tracebackhide__ if we re-raise - # https://github.com/pytest-dev/pytest/issues/1904 - __tracebackhide__ = True - # >>> This is not the 
frame you are looking for. Do read on. <<< - return run(test_case(*args, **kwargs)) + test_completed = False + + async def complete_test_case(): + nonlocal test_completed + await test_case(*args, **kwargs) + test_completed = True + + run(complete_test_case()) + if not test_completed: + raise UnfinishedTest(test_case) return run_test class DummyScheduler: + def __init__(self): + self.statistics = Resources(job_succeeded=0, job_failed=0) + @staticmethod def register_drone(drone: Drone): pass @@ -44,6 +64,18 @@ def unregister_drone(drone: Drone): def update_drone(drone: Drone): pass + async def job_finished(self, job: Job): + if job.successful: + await self.statistics.increase(job_succeeded=1) + else: + await self.statistics.increase(job_failed=1) + class DummyDrone: - pass + sitename = None + + def __init__(self, throughput: Optional[float] = None): + if throughput: + self.connection = Connection(throughput) + else: + self.connection = None diff --git a/lapis_tests/data/htcondor_pools.csv b/lapis_tests/data/htcondor_pools.csv index 82ffa21..bb2b433 100644 --- a/lapis_tests/data/htcondor_pools.csv +++ b/lapis_tests/data/htcondor_pools.csv @@ -2,4 +2,4 @@ 2 2 224400.0 8000 2 2 223100.0 8000 1 8 196300.0 32200 - 1 4 29700.0 8000 \ No newline at end of file + 1 4 29700.0 8000 diff --git a/lapis_tests/storage_io/__init__.py b/lapis_tests/storage_io/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/lapis_tests/storage_io/test_storage.py b/lapis_tests/storage_io/test_storage.py new file mode 100644 index 0000000..68adb46 --- /dev/null +++ b/lapis_tests/storage_io/test_storage.py @@ -0,0 +1,58 @@ +from tempfile import NamedTemporaryFile + +from lapis.storageelement import StorageElement +from lapis.storage_io.storage import storage_reader + + +class TestStorageReader(object): + def _create_simple_config(self, to_string=False): + storage_config = NamedTemporaryFile(suffix=".csv") + with open(storage_config.name, "w") as write_stream: + write_stream.write( + f"name sitename cachesizeGB throughput_limit\n" + f"name sitename {str(10) if to_string else 10} " + f"{str(10.1) if to_string else 10.1} {str(1) if to_string else 1}" + ) + return storage_config + + def _create_simple_files(self, to_string=False): + file_config = NamedTemporaryFile(suffix=".csv") + with open(file_config.name, "w") as write_stream: + write_stream.write( + f"filename cachename filesize storedsize cachedsince lastaccessed " + f"numberofaccesses\n" + f"file name {str(10.1) if to_string else 10.1} " + f"{str(5.0) if to_string else 5.0} " + f"{str(0) if to_string else 0} {str(0) if to_string else 0} " + f"{str(1) if to_string else 1}" + ) + return file_config + + def test_empty_files(self): + simple_config = self._create_simple_config() + count = 0 + for storage in storage_reader( + open(simple_config.name, "r+"), None, StorageElement + ): + assert storage is not None + count += 1 + assert count == 1 + + def test_simple_read(self): + for variant in [False, True]: + print(f"starting with {variant}") + simple_config = self._create_simple_config(to_string=variant) + simple_files = self._create_simple_files(to_string=variant) + count = 0 + for storage in storage_reader( + open(simple_config.name, "r"), + open(simple_files.name, "r"), + StorageElement, + ): + assert storage is not None + assert type(storage.available) == int + assert storage.available == int(5.0 * 1024 * 1024 * 1024) + assert type(storage.size) == int + assert storage.size == int(10.0 * 1024 * 1024 * 1024) + count += 1 + assert count == 1 diff --git 
a/lapis_tests/test_caching_hitrate_based.py b/lapis_tests/test_caching_hitrate_based.py new file mode 100644 index 0000000..7fd7fc0 --- /dev/null +++ b/lapis_tests/test_caching_hitrate_based.py @@ -0,0 +1,211 @@ +from usim import time +from tempfile import NamedTemporaryFile +import json +from functools import partial + +from lapis_tests import via_usim, DummyDrone +from lapis.connection import Connection +from lapis.storageelement import HitrateStorage +from lapis.storage_io.storage import storage_reader +from lapis.files import RequestedFile +from lapis.simulator import Simulator +from lapis.job_io.htcondor import htcondor_job_reader +from lapis.pool import StaticPool +from lapis.pool_io.htcondor import htcondor_pool_reader +from lapis.scheduler import CondorJobScheduler + + +class TestHitrateCaching(object): + def test_hitratestorage(self): + size = 1000 + hitratestorage = HitrateStorage(hitrate=0.5, size=size, files={}) + requested_file = RequestedFile(filename="testfile", filesize=100) + looked_up_file = hitratestorage.find(requested_file, job_repr=None) + + assert size == hitratestorage.available + assert 0 == hitratestorage.used + assert 100 == looked_up_file.cached_filesize + assert hitratestorage == looked_up_file.storage + + @via_usim + async def test_add_storage_to_connection(self): + throughput = 10 + size = 1000 + hitratestorage = HitrateStorage(hitrate=0.5, size=size, files={}) + connection = Connection(throughput=throughput) + connection.add_storage_element(hitratestorage) + assert hitratestorage in connection.storages[hitratestorage.sitename] + + @via_usim + async def test_determine_inputfile_source(self): + throughput = 10 + size = 1000 + requested_file = RequestedFile(filename="testfile", filesize=100) + hitratestorage = HitrateStorage(hitrate=0.5, size=size, files={}) + connection = Connection(throughput=throughput) + connection.add_storage_element(hitratestorage) + cache = await connection._determine_inputfile_source( + requested_file=requested_file, dronesite=None + ) + assert cache is hitratestorage + + @via_usim + async def test_stream_file(self): + throughput = 10 + size = 1000 + requested_file = RequestedFile(filename="testfile", filesize=100) + hitratestorage = HitrateStorage(hitrate=0.5, size=size, files={}) + connection = Connection(throughput=throughput) + connection.add_storage_element(hitratestorage) + assert 0 == time.now + await connection.stream_file(requested_file=requested_file, dronesite=None) + assert 5 == time.now + + @via_usim + async def test_single_transfer_files(self): + throughput = 10 + size = 1000 + drone = DummyDrone(throughput) + requested_files = dict(test=dict(usedsize=100)) + hitratestorage = HitrateStorage(hitrate=0.5, size=size, files={}) + drone.connection.add_storage_element(hitratestorage) + stream_time = await drone.connection.transfer_files( + drone=drone, requested_files=requested_files, job_repr="test" + ) + + assert time.now == 5 + assert stream_time == 5 + + @via_usim + async def test_simultaneous_transfer(self): + throughput = 10 + size = 1000 + drone = DummyDrone(throughput) + requested_files = dict(test1=dict(usedsize=100), test2=dict(usedsize=200)) + hitratestorage = HitrateStorage(hitrate=0.5, size=size, files={}) + drone.connection.add_storage_element(hitratestorage) + stream_time = await drone.connection.transfer_files( + drone=drone, requested_files=requested_files + ) + assert time.now == 15 + assert stream_time == 15 + + @via_usim + async def test_caching_simulation_duration_short_jobs(self): + simulator = Simulator() 
+ with NamedTemporaryFile(suffix=".csv") as machine_config, NamedTemporaryFile( + suffix=".csv" + ) as storage_config, NamedTemporaryFile(suffix=".json") as job_config: + with open(machine_config.name, "w") as write_stream: + write_stream.write( + "TotalSlotCPUs TotalSlotDisk TotalSlotMemory Count sitename\n" + "1 44624348.0 8000 1 site1" + ) + with open(job_config.name, "w") as write_stream: + job_description = [ + { + "QDate": 0, + "RequestCpus": 1, + "RequestWalltime": 60, + "RequestMemory": 1024, + "RequestDisk": 1024, + "RemoteWallClockTime": 1.0, + "MemoryUsage": 1024, + "DiskUsage_RAW": 1024, + "RemoteSysCpu": 1.0, + "RemoteUserCpu": 0.0, + "Inputfiles": dict( + file1=dict(usedsize=10), file2=dict(usedsize=5) + ), + } + ] * 2 + json.dump(job_description, write_stream) + with open(storage_config.name, "w") as write_stream: + write_stream.write( + "name sitename cachesizeGB throughput_limit\n" + "cache1 site1 1000 1.0" + ) + + job_input = open(job_config.name, "r+") + machine_input = open(machine_config.name, "r+") + storage_input = open(storage_config.name, "r+") + storage_content_input = None + cache_hitrate = 0.5 + simulator.create_job_generator( + job_input=job_input, job_reader=htcondor_job_reader + ) + simulator.create_scheduler(scheduler_type=CondorJobScheduler) + simulator.create_connection_module(remote_throughput=1.0) + simulator.create_pools( + pool_input=machine_input, + pool_reader=htcondor_pool_reader, + pool_type=StaticPool, + ) + simulator.create_storage( + storage_input=storage_input, + storage_content_input=storage_content_input, + storage_reader=storage_reader, + storage_type=partial(HitrateStorage, cache_hitrate), + ) + simulator.run() + assert 180 == simulator.duration + + @via_usim + async def test_caching_simulation_duration_long_jobs(self): + simulator = Simulator() + with NamedTemporaryFile(suffix=".csv") as machine_config, NamedTemporaryFile( + suffix=".csv" + ) as storage_config, NamedTemporaryFile(suffix=".json") as job_config: + with open(machine_config.name, "w") as write_stream: + write_stream.write( + "TotalSlotCPUs TotalSlotDisk TotalSlotMemory Count sitename\n" + "1 44624348.0 8000 1 site1" + ) + with open(job_config.name, "w") as write_stream: + job_description = [ + { + "QDate": 0, + "RequestCpus": 1, + "RequestWalltime": 60, + "RequestMemory": 1024, + "RequestDisk": 1024, + "RemoteWallClockTime": 1.0, + "MemoryUsage": 1024, + "DiskUsage_RAW": 1024, + "RemoteSysCpu": 1.0, + "RemoteUserCpu": 0.0, + "Inputfiles": dict( + file1=dict(usedsize=60), file2=dict(usedsize=60) + ), + } + ] * 2 + json.dump(job_description, write_stream) + with open(storage_config.name, "w") as write_stream: + write_stream.write( + "name sitename cachesizeGB throughput_limit\n" + "cache1 site1 1000 1.0" + ) + + job_input = open(job_config.name, "r+") + machine_input = open(machine_config.name, "r+") + storage_input = open(storage_config.name, "r+") + storage_content_input = None + cache_hitrate = 0.5 + simulator.create_job_generator( + job_input=job_input, job_reader=htcondor_job_reader + ) + simulator.create_scheduler(scheduler_type=CondorJobScheduler) + simulator.create_connection_module(remote_throughput=1.0) + simulator.create_pools( + pool_input=machine_input, + pool_reader=htcondor_pool_reader, + pool_type=StaticPool, + ) + simulator.create_storage( + storage_input=storage_input, + storage_content_input=storage_content_input, + storage_reader=storage_reader, + storage_type=partial(HitrateStorage, cache_hitrate), + ) + simulator.run() + assert 300 == simulator.duration 
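
Note on usage: the two simulation tests above assemble the caching components in the same order as the new `static` CLI command in lapis/cli/simulate.py. As a reading aid, the following is a minimal sketch of that wiring outside the CLI; the input file names, the seed, the 0.9 calculation efficiency, the remote throughput and the 0.5 cache hit rate are illustrative assumptions, not values taken from this patch.

from functools import partial

from lapis.job_io.htcondor import htcondor_job_reader
from lapis.pool import StaticPool
from lapis.pool_io.htcondor import htcondor_pool_reader
from lapis.scheduler import CondorJobScheduler
from lapis.simulator import Simulator
from lapis.storage_io.storage import storage_reader
from lapis.storageelement import HitrateStorage

simulator = Simulator(seed=1234)
simulator.create_job_generator(
    job_input=open("jobs.json"),  # placeholder HTCondor job dump (JSON)
    job_reader=partial(htcondor_job_reader, calculation_efficiency=0.9),
)
simulator.create_scheduler(scheduler_type=CondorJobScheduler)
# The connection module has to exist before storages and pools are created;
# the CLI multiplies its --remote-throughput option by 1024 ** 3 the same way.
simulator.create_connection_module(10 * 1024 * 1024 * 1024)
simulator.create_storage(
    storage_input=open("storages.csv"),  # placeholder cache description
    storage_content_input=None,  # start with empty caches
    storage_reader=storage_reader,
    storage_type=partial(HitrateStorage, 0.5),  # example --cache-hitrate value
)
simulator.create_pools(
    pool_input=open("pools.csv"),  # placeholder pool description
    pool_reader=htcondor_pool_reader,
    pool_type=StaticPool,
)
simulator.enable_monitoring()
simulator.run(until=None)

When no --cache-hitrate is given, the static command uses the plain StorageElement instead of wrapping HitrateStorage with functools.partial as shown here.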
diff --git a/lapis_tests/test_job.py b/lapis_tests/test_job.py index 181bb1a..3d03070 100644 --- a/lapis_tests/test_job.py +++ b/lapis_tests/test_job.py @@ -4,6 +4,7 @@ from lapis.drone import Drone from lapis.job import Job from lapis_tests import via_usim, DummyScheduler, DummyDrone +from lapis.connection import Connection class TestJob(object): @@ -47,10 +48,15 @@ async def test_job_in_drone(self): scheduler=scheduler, pool_resources={"cores": 1, "memory": 1}, scheduling_duration=0, + connection=Connection(), ) - await drone.run() async with Scope() as scope: + scope.do(drone.run(), volatile=True) scope.do(drone.schedule_job(job=job)) + await ( + scheduler.statistics._available + == scheduler.statistics.resource_type(job_succeeded=1) + ) assert 10 == time.now assert 0 == job.waiting_time assert job.successful @@ -67,9 +73,13 @@ async def test_nonmatching_job_in_drone(self): pool_resources={"cores": 1, "memory": 1}, scheduling_duration=0, ) - await drone.run() async with Scope() as scope: + scope.do(drone.run(), volatile=True) scope.do(drone.schedule_job(job=job)) + await ( + scheduler.statistics._available + == scheduler.statistics.resource_type(job_failed=1) + ) assert 0 == time assert not job.successful assert 0 == job.waiting_time @@ -90,10 +100,14 @@ async def test_two_nonmatching_jobs(self): pool_resources={"cores": 1, "memory": 1}, scheduling_duration=0, ) - await drone.run() async with Scope() as scope: + scope.do(drone.run(), volatile=True) scope.do(drone.schedule_job(job=job_one)) scope.do(drone.schedule_job(job=job_two)) + await ( + scheduler.statistics._available + == scheduler.statistics.resource_type(job_succeeded=1, job_failed=1) + ) assert 10 == time assert job_one.successful assert not job_two.successful @@ -116,10 +130,14 @@ async def test_two_matching_jobs(self): pool_resources={"cores": 2, "memory": 2}, scheduling_duration=0, ) - await drone.run() async with Scope() as scope: + scope.do(drone.run(), volatile=True) scope.do(drone.schedule_job(job=job_one)) scope.do(drone.schedule_job(job=job_two)) + await ( + scheduler.statistics._available + == scheduler.statistics.resource_type(job_succeeded=2) + ) assert 10 == time assert job_one.successful assert job_two.successful diff --git a/pyproject.toml b/pyproject.toml index 3217107..e60c348 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -24,7 +24,7 @@ classifiers = [ "Programming Language :: Python :: 3.6", "Programming Language :: Python :: 3.7", ] -requires = ["cobald", "usim == 0.4", "click"] +requires = ["cobald", "usim >= 0.4.3", "click"] [tool.flit.metadata.requires-extra] test = [