Caching integration #72
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Open: maxfischer2781 wants to merge 159 commits into master from feature/caching.
Commits (159 total; changes shown from 125 commits):
9215d64 added storage object to represent caches and associated readout funct… (tfesenbecker)
0a46da7 extended CLI to support storage files (tfesenbecker)
e727b69 extended simulator to support storage files (tfesenbecker)
d64954d added new drone attribute sitename connecting drones and storage elem… (tfesenbecker)
5597434 added file provider object connecting storage objects and jobs (tfesenbecker)
bb1dcbe added different caching/cache cleaning/walltime recalculation algorithms (tfesenbecker)
8d6db96 renamed storage readout (tfesenbecker)
fb150db fixed debug output (tfesenbecker)
69072ae renamed storage input reader (tfesenbecker)
53ebec5 updated Job class (tfesenbecker)
f997223 replaced function modifying walltime by function with property decorator (tfesenbecker)
2e2c06f Revert "replaced function modifying walltime by function with propert… (tfesenbecker)
110b3e9 replaced function modifying walltime by function with property decorator (tfesenbecker)
b032a0d resolving PEP8 issues (tfesenbecker)
7753d0d Merge branch 'master' of https://github.com/MatterMiners/lapis into c… (tfesenbecker)
5123034 fixed file provider bug (wrong inputfiles dictionary) (tfesenbecker)
1c2fe9f Update lapis/cli/simulate.py (tfesenbecker)
8739ce9 renamed function get_used_storage to _calculate_used_storage (tfesenbecker)
0b5a922 Merge branch 'cachingextension' of https://github.com/tfesenbecker/la… (tfesenbecker)
855242a attached fileprovider to drone instead of job and passed it via make_… (tfesenbecker)
bfadacb reworked file coverage function to return a score (tfesenbecker)
3f30c58 added proper __repr__ function (tfesenbecker)
2b214aa added file classes (tfesenbecker)
2bd91d7 moved caching algorithm and associated cache cleanup to it's own class (tfesenbecker)
29576eb Redesign of the storage class and associated changes (tfesenbecker)
146fbe3 put walltime getter and walltime recalculation back in seperate methods (tfesenbecker)
7ef8dd9 added parallel treatment of jobs input files in file provider (tfesenbecker)
b94ab82 fixed failed unit test that were caused by Drone without file provide… (tfesenbecker)
1e9e795 Merge branch 'master' of https://github.com/MatterMiners/lapis into c… (tfesenbecker)
191df2b changed scoring to take filesizes into consideration (tfesenbecker)
a635318 Merge branch 'cachingextension' into feature/storageimprovement (tfesenbecker)
6f7ace1 Merge pull request #1 from tfesenbecker/feature/storageimprovement (tfesenbecker)
75165ad fixed bug from merge (tfesenbecker)
d943ed6 Merge branch 'cachingextension' of https://github.com/tfesenbecker/la… (tfesenbecker)
9453632 Merge pull request #2 from tfesenbecker/feature/storageimprovement (tfesenbecker)
32faa38 removed debug output to fix unit test (tfesenbecker)
78a6f18 First steps towards including everything concerning caching into moni… (tfesenbecker)
c7c2e03 renamed method (tfesenbecker)
73ada77 split processing of job into file transfer and actual calculation (tfesenbecker)
02a79bb refactored storage and file provider objects in order to use Pipe (tfesenbecker)
63664e0 added monitoring for remote and storage connections (tfesenbecker)
2df3841 small fix in monitoring (tfesenbecker)
a6a9783 Merge branch 'master' of https://github.com/MatterMiners/lapis into f… (tfesenbecker)
6e5cdd7 Merge branch 'master' of https://github.com/MatterMiners/lapis into f… (tfesenbecker)
cf5b3ab adapted job walltime to new job processing in order to fix job event … (tfesenbecker)
3768f5b minor clean ups (tfesenbecker)
df17230 added cache modelation via cachehitrate (tfesenbecker)
128425f Update lapis/cachealgorithm.py (tfesenbecker)
04223e4 Update lapis/storage.py (tfesenbecker)
59830d7 Update lapis/storage.py (tfesenbecker)
a7b3323 Update lapis/storage.py (tfesenbecker)
eff97c4 Update lapis/storage.py (tfesenbecker)
ffca7a3 Update lapis/storage.py (tfesenbecker)
97e3f83 Update lapis/storage.py (tfesenbecker)
a4ceec4 resolved PEP8 issue (tfesenbecker)
b2cb120 Merge pull request #61 from tfesenbecker/feature/includepipes (tfesenbecker)
cdeeea6 minor fix (tfesenbecker)
edfc8f1 moved definition of remote throughput to CLI input, storage object th… (tfesenbecker)
25a2a23 Extended cache algorithm documentation (tfesenbecker)
b374cf6 implemented minor changes requested in PRs (tfesenbecker)
4487db6 Update lapis/storage.py (tfesenbecker)
521529f Merge branch 'master' of https://github.com/MatterMiners/lapis into f… (tfesenbecker)
5ebf272 updated usim version requirement to 0.4.2 (tfesenbecker)
1e814ea completed renaming of file provider to connection (tfesenbecker)
43361c9 fixed job and simulator unit tests (tfesenbecker)
d47789f fixed job and simulator unit tests (tfesenbecker)
a443618 replaced Storage.__repr__ to match the other classes (tfesenbecker)
cca1859 added missing default values for unit test compatibility (tfesenbecker)
36f5966 extended monitoring (tfesenbecker)
be5482f fixed PEP8 issue (tfesenbecker)
4a358cf updated via_usim decorator (eileen-kuehn)
d0051c2 added statistics about jobs in DummyScheduler (eileen-kuehn)
4cae9be made unit tests succeed again (eileen-kuehn)
19ff945 made enabling of monitoring explicit (eileen-kuehn)
719fc97 blackened file (eileen-kuehn)
b87064d changed cli to also start without any caching information (eileen-kuehn)
c1ae198 changed assignment of connections a bit (eileen-kuehn)
e57cd00 removed creation of connection module from test as it is not required (eileen-kuehn)
6df1b27 converted storage and file sizes to bytes (eileen-kuehn)
aced023 Merge branch 'master' into feature/caching (eileen-kuehn)
000be38 corrected access to numberofaccesses (eileen-kuehn)
08c6432 changed signature of StoredFile and adapted in IO operations (eileen-kuehn)
883b758 improved storage (eileen-kuehn)
df362a5 Merge branch 'master' into feature/caching (eileen-kuehn)
d5673b4 minimum required usim version set to 0.4.3 (eileen-kuehn)
65fd2be renamed remove_from_storage and add_to_storage to remove and add (eileen-kuehn)
325194d made free_space a property of storage (eileen-kuehn)
22e4ea7 removed method find_file from storage (eileen-kuehn)
7faf2ee added todo (eileen-kuehn)
e7a21e7 ignored B006 for flake8 (eileen-kuehn)
a06dc62 if file is available on storage, transfer now receives correct size (eileen-kuehn)
3e3752b fixed position of noqa (eileen-kuehn)
a570d50 made determine_inputfile_source private to connection (eileen-kuehn)
4ce52df renamed transfer_inputfiles to transfer_files (eileen-kuehn)
6eb4615 removed queue from file lookup in storage and improved determine inpu… (eileen-kuehn)
7a7492f improved stream file in connection (eileen-kuehn)
6a8801b introduced HitrateStorage that transfers data based on a cache hitrate (eileen-kuehn)
43577ac fixed position of noqa (eileen-kuehn)
19bde1f removed cachehitrate from connection (eileen-kuehn)
0c2bd3d connection now sets reference to remote_connection for storage (eileen-kuehn)
122a351 storage objects are now created based on specified cache hit rate (eileen-kuehn)
23a7d7b adapted usage of caching for jobs (eileen-kuehn)
e2e8f57 introduced calculation efficiency to job (eileen-kuehn)
b8e51fb introduced calculation efficiency for jobs to cli (eileen-kuehn)
2056b41 added more type hints for job (eileen-kuehn)
ddb5727 removed initialisation of connection (eileen-kuehn)
f730a51 moved caching related monitoring to extra file (eileen-kuehn)
1bb6b13 each simulation run now can be identified (eileen-kuehn)
4c771a0 added caching-specific monitoring information to documentation (eileen-kuehn)
250ddae added type hints for simulator (eileen-kuehn)
ddba83b changed sizes for storage to bytes (eileen-kuehn)
f49c4ef fixed bug leading to full RAM (tfesenbecker)
7d92a69 Merge branch 'feature/caching' of https://github.com/MatterMiners/lap… (tfesenbecker)
0b3b81e added RemoteStorage (eileen-kuehn)
e2650c0 renamed storagesize to size and ensured correct units (eileen-kuehn)
0adb6d0 ensured that size is always int (eileen-kuehn)
ac73c82 renamed method again (eileen-kuehn)
17b23da fixed semmle issue (eileen-kuehn)
09ba7a4 added type hints (eileen-kuehn)
eae5555 removed cachealgorithm from storage and moved to connection (eileen-kuehn)
2e6c1dd fixed bug leading to full RAM again (tfesenbecker)
0e3c573 fixed bug leading to full RAM again (tfesenbecker)
27c1ea7 Merge branch 'feature/caching' of https://github.com/MatterMiners/lap… (tfesenbecker)
2a61f00 fix hit rate based caching functionality (tfesenbecker)
4a95f3a added first test for storage io (eileen-kuehn)
0e89302 adapted access to connection for RemoteStorage (eileen-kuehn)
5fc8b37 added new test for storage input (eileen-kuehn)
d3ff0da fixed assignment of remote storage (eileen-kuehn)
cbc1507 reverted change of transfer signature and added typehints (eileen-kuehn)
d0124d1 introduced interface for storage (eileen-kuehn)
c7af203 added docstrings to storage interface (eileen-kuehn)
0019583 extended tests (eileen-kuehn)
01efb93 removed public update method from storage and made update private (eileen-kuehn)
a7177f7 added unit conversion for storageelement connection and remote connec… (tfesenbecker)
d684dc8 reformated debug output (tfesenbecker)
328e6dc added debug output and fixed wrong function call (tfesenbecker)
0e49c32 added debug output and fixed wrong function call (tfesenbecker)
98dbba1 Merge branch 'feature/caching' of https://github.com/MatterMiners/lap… (tfesenbecker)
93589ba Merge branch 'master' of https://github.com/MatterMiners/lapis into f… (tfesenbecker)
2351f51 reduced debug output, added default value for job_repr (tfesenbecker)
9e3a446 Added unittests to test functionality of Connection class with Hitrat… (tfesenbecker)
5cdcf35 extended type hints (tfesenbecker)
2747bce fixed error in _usedstorage calculation (tfesenbecker)
4f6132f storage.py now catches exception caused by not specified storage cont… (tfesenbecker)
fa0e98f fixed test_storage unit tests (tfesenbecker)
d01ba77 added new unit tests for hitrate based caching (tfesenbecker)
5477344 removed forgotten debug output (tfesenbecker)
2dc77ff storage_content_reader() handles empty files correctly now (tfesenbecker)
c573ac5 fixed line length (tfesenbecker)
f6fec85 fixed line length (tfesenbecker)
2fb8afe Monitor.run now properly closes the sampling aiter (maxfischer2781)
a5d13de made the linter happy (maxfischer2781)
d277354 added default value for calculation_efficiency (tfesenbecker)
a165249 remove calculation_efficiency default value (tfesenbecker)
691d523 corrected wrong attribute names (tfesenbecker)
b8ae864 fixed usage of transfer() interface and debug outputs (tfesenbecker)
73d6352 fixed debug output leading to failing unit test (tfesenbecker)
3c58797 fixed bug in filesize unit conversion (tfesenbecker)
616a7e1 Extended hitrate based caching to support different cache hitrates fo… (tfesenbecker)
New file lapis/cachealgorithm.py (diff: @@ -0,0 +1,57 @@). Two apparent bugs in the diff are corrected and noted in the comments below:

```python
from typing import Optional, Callable, Tuple

from lapis.files import RequestedFile, StoredFile
from lapis.storage import Storage
from lapis.utilities.cache_cleanup_implementations import sort_files_by_cachedsince


def check_size(file: RequestedFile, storage: Storage):
    # note: this checks the total storage size, not the currently free space
    return storage.size >= file.filesize


def check_relevance(file: RequestedFile, storage: Storage):
    return True


def delete_oldest(
    file: RequestedFile, storage: Storage
) -> Tuple[bool, Tuple[StoredFile]]:
    deletable_files = []
    currently_free = storage.free_space
    # the diff compares currently_free < storage.free_space here, which is
    # never true; comparing against the requested file size instead
    if currently_free < file.filesize:
        sorted_files = iter(sort_files_by_cachedsince(storage.files.items()))
        while currently_free < file.filesize:
            # raises StopIteration if the storage cannot be freed sufficiently
            deletable_files.append(next(sorted_files))
            currently_free += deletable_files[-1].filesize
    return True, tuple(deletable_files)


def delete_oldest_few_used(
    file: RequestedFile, storage: Storage
) -> Tuple[bool, Optional[Tuple[StoredFile]]]:
    deletable_files = []
    currently_free = storage.free_space
    if currently_free >= file.filesize:
        # enough space is available, nothing needs to be deleted
        return True, tuple(deletable_files)
    # same always-false comparison as in delete_oldest, corrected here as well
    sorted_files = sort_files_by_cachedsince(storage.files.items())
    for current_file in sorted_files:
        if current_file.numberofaccesses < 3:
            deletable_files.append(current_file)
            currently_free += current_file.filesize
            if currently_free >= file.filesize:
                return True, tuple(deletable_files)
    return False, None


class CacheAlgorithm(object):
    def __init__(self, caching_strategy: Callable, deletion_strategy: Callable):
        # the diff ignored both arguments in favour of hard-coded defaults;
        # storing the passed strategies instead
        self._caching_strategy = caching_strategy
        self._deletion_strategy = deletion_strategy

    def consider(
        self, file: RequestedFile, storage: Storage
    ) -> Tuple[bool, Optional[Tuple[StoredFile]]]:
        if self._caching_strategy(file, storage):
            return self._deletion_strategy(file, storage)
        return False, None
```
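For orientation, a minimal usage sketch given the corrections noted above; the SimpleNamespace stand-ins for Storage and RequestedFile are hypothetical and only carry the attributes the strategies read:

```python
from types import SimpleNamespace

# hypothetical stand-ins carrying only the attributes the strategies read
storage = SimpleNamespace(size=100, free_space=100, files={})
requested = SimpleNamespace(filesize=40)

algorithm = CacheAlgorithm(
    caching_strategy=lambda file, storage: check_size(file, storage)
    and check_relevance(file, storage),
    deletion_strategy=delete_oldest,
)
cache_file, files_for_deletion = algorithm.consider(file=requested, storage=storage)
# the storage has enough free space, so caching is accepted with no deletions
assert cache_file and files_for_deletion == ()
```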
New Connection module (diff: @@ -0,0 +1,137 @@):

```python
import random

from typing import Union
from usim import Scope, time, Pipe

from lapis.cachealgorithm import (
    CacheAlgorithm,
    check_size,
    check_relevance,
    delete_oldest_few_used,
)
from lapis.storage import Storage, RemoteStorage
from lapis.files import RequestedFile
from lapis.monitor import sampling_required


class Connection(object):

    __slots__ = ("storages", "remote_connection", "caching_algorithm")

    def __init__(self, throughput=100):
        self.storages = dict()
        self.remote_connection = RemoteStorage(Pipe(throughput=throughput))
        self.caching_algorithm = CacheAlgorithm(
            caching_strategy=lambda file, storage: check_size(file, storage)
            and check_relevance(file, storage),
            deletion_strategy=lambda file, storage: delete_oldest_few_used(
                file, storage
            ),
        )

    def add_storage_element(self, storage_element: Storage):
        """
        Registers a storage element with the Connection module, clustering
        storage elements by site name.

        :param storage_element: the storage element to register
        """
        storage_element.remote_connection = self.remote_connection
        try:
            self.storages[storage_element.sitename].append(storage_element)
        except KeyError:
            self.storages[storage_element.sitename] = [storage_element]

    async def _determine_inputfile_source(
        self, requested_file: RequestedFile, dronesite: str, job_repr: str
    ) -> Union[Storage, RemoteStorage]:
        """
        Collects, for all storage elements reachable from the drone's site,
        named tuples containing the amount of the requested file that is
        cached there together with the storage element itself. The tuples are
        sorted by the amount of cached data, and the storage element caching
        the largest part of the file is returned. If the file is not cached
        in any storage element, the connection module's remote connection is
        returned instead.

        :param requested_file: the file the job requests
        :param dronesite: site name of the drone running the job
        :param job_repr: job identifier used in debug output
        :return: the storage element to read from, or the remote connection
        """
        provided_storages = self.storages.get(dronesite, None)
        if provided_storages is not None:
            look_up_list = []
            for storage in provided_storages:
                look_up_list.append(storage.look_up_file(requested_file, job_repr))
            storage_list = sorted(look_up_list, key=lambda x: x[0], reverse=True)
            for entry in storage_list:
                # TODO: should check that the cached size covers the request
                if entry.cached_filesize > 0:
                    return entry.storage
        return self.remote_connection

    async def stream_file(self, requested_file: RequestedFile, dronesite, job_repr):
        """
        Determines which storage element provides the requested file and
        starts the file transfer. For files transferred via the remote
        connection, a potential cache decides whether to cache the file and
        handles the caching process.
        """
        used_connection = await self._determine_inputfile_source(
            requested_file, dronesite, job_repr
        )
        await sampling_required.put(used_connection)
        if used_connection == self.remote_connection and self.storages.get(
            dronesite, None
        ):
            try:
                potential_cache = random.choice(self.storages[dronesite])
                cache_file, files_for_deletion = self.caching_algorithm.consider(
                    file=requested_file, storage=potential_cache
                )
                if cache_file:
                    for file in files_for_deletion:
                        await potential_cache.remove(file, job_repr)
                    await potential_cache.add(requested_file, job_repr)
                else:
                    print(
                        f"APPLY CACHING DECISION: Job {job_repr}, "
                        f"File {requested_file.filename}: file wasn't cached "
                        f"@ {time.now}"
                    )
            except KeyError:
                pass
        print(f"now transferring {requested_file.filesize} from {used_connection}")
        await used_connection.transfer(requested_file, job_repr)
        print(
            "Job {}: finished transferring of file {}: {}GB @ {}".format(
                job_repr, requested_file.filename, requested_file.filesize, time.now
            )
        )

    async def transfer_files(self, drone, requested_files: dict, job_repr):
        """
        Converts the dict describing the requested files into RequestedFile
        objects and launches the streaming of all files in parallel.
        """
        print("registered caches", self.storages)
        start_time = time.now
        async with Scope() as scope:
            for inputfilename, inputfilespecs in requested_files.items():
                requested_file = RequestedFile(
                    inputfilename, inputfilespecs["usedsize"]
                )
                scope.do(self.stream_file(requested_file, drone.sitename, job_repr))
        stream_time = time.now - start_time
        print(
            "STREAMED files {} in {}".format(list(requested_files.keys()), stream_time)
        )
        return stream_time
```
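A hedged sketch of how a storage element is registered with this module. The import path is assumed for illustration, and the SimpleNamespace stand-in only mimics the two attributes that add_storage_element touches:

```python
from types import SimpleNamespace

from lapis.connection import Connection  # import path assumed for this sketch

# stand-in storage element: only the attributes used by add_storage_element
cache = SimpleNamespace(sitename="site-A", remote_connection=None)

connection = Connection(throughput=100)
connection.add_storage_element(cache)

# storage elements are clustered by site name ...
assert connection.storages["site-A"] == [cache]
# ... and receive a reference to the shared remote connection
assert cache.remote_connection is connection.remote_connection
```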
Review comment: Are you sure that it is better to look up the file in the different storages sequentially rather than in parallel? I thought XRootD's behaviour would be represented best if the lookups are started at about the same time.
Reply: This change is similar to my comment on the initial PR #53 (which I can't seem to find anymore...).
To get this out of the way first: performing the lookup concurrently is fine by itself, and something we should add back in later on. The issue is how this happened, combined with it not being needed yet.
At the moment, look_up_file succeeds immediately (both in real time and in simulated time). Running it concurrently adds no benefit at all; it only complicates the code. In the interest of maintainability, not using concurrency makes it easier to keep working with the code, and that includes adding concurrency back in later on.
The old approach is difficult to understand and thus to verify, maintain, and extend: look_up_file either returned directly to the caller or returned via a side effect to the queue. Consequently, the caller had to support two different call mechanisms as well. Changing anything about that, e.g. a different mechanism such as "get the first X successful results", would require changing multiple places at once.
As a roadmap for the future, it would be better to use a modular concurrency approach here. What you've implemented is the gather pattern (kudos for that, by the way), which could be split into a regular async function (look_up_file) and the concurrency pattern. Modularising it like this means that neither look_up_file nor Connection (nor any other usage) has to change significantly if we change the call style.
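A minimal sketch of the modularisation suggested above. It uses asyncio.gather purely as a stand-in for the simulation framework's concurrency primitives, and the Storage class and gather_lookups helper are hypothetical, not part of the PR:

```python
import asyncio
from typing import List, Tuple


class Storage:
    """Hypothetical stand-in for lapis' Storage; asyncio replaces usim here."""

    def __init__(self, name: str, cached: int):
        self.name = name
        self.cached = cached

    async def look_up_file(self, filename: str) -> Tuple[int, "Storage"]:
        # a regular async function: it returns its result directly to the
        # caller instead of pushing it onto a shared queue as a side effect
        return self.cached, self


async def gather_lookups(storages: List[Storage], filename: str):
    # the concurrency pattern is isolated in one helper: changing the call
    # style (e.g. "first X successful results") only touches this function,
    # not look_up_file or its callers
    return await asyncio.gather(
        *(storage.look_up_file(filename) for storage in storages)
    )


async def main():
    storages = [Storage("site-A", 100), Storage("site-B", 0)]
    results = await gather_lookups(storages, "input.root")
    # pick the storage caching the largest part of the file, as Connection does
    best = max(results, key=lambda entry: entry[0])
    print(best[1].name)


asyncio.run(main())
```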