From baeccba99329876120f6526f78e475f23a6ff021 Mon Sep 17 00:00:00 2001
From: lashemilt
Date: Thu, 17 Mar 2022 10:07:41 +0000
Subject: [PATCH 1/8] rewriting the search_existing_files functions so that we
 can use a decorator more easily

---
 .../engines/file_watchdog_engine.py | 53 ++++++++-----------
 1 file changed, 23 insertions(+), 30 deletions(-)

diff --git a/rfi_file_monitor/engines/file_watchdog_engine.py b/rfi_file_monitor/engines/file_watchdog_engine.py
index 73a826d..6194b3c 100644
--- a/rfi_file_monitor/engines/file_watchdog_engine.py
+++ b/rfi_file_monitor/engines/file_watchdog_engine.py
@@ -125,37 +125,30 @@ def should_exit(self, value: bool):
 
     def _search_for_existing_files(self, directory: Path) -> List[RegularFile]:
         rv: List[RegularFile] = list()
-        for child in directory.iterdir():
-            if (
-                child.is_file()
-                and not child.is_symlink()
-                and match_path(
-                    child,
-                    included_patterns=self._included_patterns,
-                    excluded_patterns=self._excluded_patterns,
-                    case_sensitive=False,
-                )
-            ):
+        path_tree = os.walk(directory)
+        for root, dirs, files in path_tree:
+            for fname in files:
+                if (not Path(fname).is_symlink() and
+                    match_path(
+                        Path(fname),
+                        included_patterns=self._included_patterns,
+                        excluded_patterns=self._excluded_patterns,
+                        case_sensitive=False,
+                    )
+
+                ):
+                    file_path = Path(os.path.join(root,fname))
+                    relative_file_path = file_path.relative_to(
+                        self.params.monitored_directory
+                    )
+                    _file = RegularFile(
+                        str(file_path),
+                        relative_file_path,
+                        get_file_creation_timestamp(file_path),
+                        FileStatus.SAVED,
+                    )
+                    rv.append(_file)
-                file_path = directory.joinpath(child)
-                relative_file_path = file_path.relative_to(
-                    self.params.monitored_directory
-                )
-                _file = RegularFile(
-                    str(file_path),
-                    relative_file_path,
-                    get_file_creation_timestamp(file_path),
-                    FileStatus.SAVED,
-                )
-                rv.append(_file)
-            elif (
-                self.params.monitor_recursively
-                and child.is_dir()
-                and not child.is_symlink()
-            ):
-                rv.extend(
-                    self._search_for_existing_files(directory.joinpath(child))
-                )
         return rv
 
     def run(self):
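Note on PATCH 1/8: the rewrite trades the explicit `iterdir()` recursion (and the `monitor_recursively` branch) for a single `os.walk` pass. One caveat in the committed version: `Path(fname).is_symlink()` and `match_path(Path(fname), ...)` are handed the bare filename, so the symlink check resolves against the process working directory rather than the file being walked. A minimal standalone sketch of the same walk with both checks applied to the joined path, using `fnmatch` in place of watchdog's `match_path` and with illustrative names:

    import os
    from fnmatch import fnmatch
    from pathlib import Path
    from typing import List, Optional

    def find_existing_files(
        directory: Path,
        included_patterns: Optional[List[str]] = None,
        excluded_patterns: Optional[List[str]] = None,
    ) -> List[Path]:
        """Recursively collect regular files under `directory`, skipping
        symlinks and applying case-insensitive include/exclude patterns."""
        rv: List[Path] = []
        for root, _dirs, files in os.walk(directory):
            for fname in files:
                file_path = Path(root, fname)  # full path, not the bare name
                if file_path.is_symlink():
                    continue
                name = str(file_path).lower()  # case-insensitive matching
                if excluded_patterns and any(
                    fnmatch(name, p.lower()) for p in excluded_patterns
                ):
                    continue
                if included_patterns and not any(
                    fnmatch(name, p.lower()) for p in included_patterns
                ):
                    continue
                rv.append(file_path)
        return rv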
files...", ) - try: - existing_files = self._search_for_existing_files( - Path(self.params.monitored_directory) - ) - GLib.idle_add( - self._engine._appwindow._queue_manager.add, - existing_files, - priority=GLib.PRIORITY_DEFAULT_IDLE, - ) - except Exception as e: - self._engine.cleanup() - GLib.idle_add( - self._engine.abort, - self._task_window, - e, - priority=GLib.PRIORITY_HIGH, - ) - return + + existing_files = self._search_for_existing_files( + Path(self.params.monitored_directory) + ) + self.process_existing_files(existing_files) + return # if we get here, things should be working. # close task_window diff --git a/rfi_file_monitor/utils/decorators.py b/rfi_file_monitor/utils/decorators.py index ad6451b..819c357 100644 --- a/rfi_file_monitor/utils/decorators.py +++ b/rfi_file_monitor/utils/decorators.py @@ -18,6 +18,7 @@ import collections.abc import functools import threading +from time import sleep logger = logging.getLogger(__name__) @@ -187,3 +188,14 @@ def wrapper(self: Operation, file: File): raise NotImplementedError(f"{type(file)} is currently unsupported") return wrapper + +def do_bulk_upload( process_existing_files: Callable[List]): + @functools.wraps(process_existing_files) + def wrapper(self: Engine, existing_files: List): + no_files = sum(len(fs) for _, _, fs, in os.walk(*args)) + if no_files > 5000: # do not like this hard coded value but will put this in for performance testing + chunked_input = [existing_files[i:i + 5000] for i in range(0, len(existing_files), 5000)] + for rv in chunked_input: + process_existing_files(self, rv) + sleep(60) # calculate this based on the file weight? + return wrapper From 1e707a2362ea5d646940d4d4f2d0d5b84f0bf9f2 Mon Sep 17 00:00:00 2001 From: lashemilt Date: Fri, 18 Mar 2022 11:25:59 +0000 Subject: [PATCH 3/8] first pass at checking weights --- rfi_file_monitor/utils/decorators.py | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/rfi_file_monitor/utils/decorators.py b/rfi_file_monitor/utils/decorators.py index 819c357..4213ede 100644 --- a/rfi_file_monitor/utils/decorators.py +++ b/rfi_file_monitor/utils/decorators.py @@ -20,6 +20,7 @@ import threading from time import sleep + logger = logging.getLogger(__name__) _app = Gio.Application.get_default() @@ -192,10 +193,15 @@ def wrapper(self: Operation, file: File): def do_bulk_upload( process_existing_files: Callable[List]): @functools.wraps(process_existing_files) def wrapper(self: Engine, existing_files: List): - no_files = sum(len(fs) for _, _, fs, in os.walk(*args)) - if no_files > 5000: # do not like this hard coded value but will put this in for performance testing - chunked_input = [existing_files[i:i + 5000] for i in range(0, len(existing_files), 5000)] + + if len(existing_files) > 2000: # do not like this hard coded value but it is emperically derived - this is the max number of files that a queue can take without a long wait for users. + chunked_input = [existing_files[i:i + 2000] for i in range(0, len(existing_files), 2000)] for rv in chunked_input: + chunk_weight = sum([Path(file.filename).stat().st_size for file in rv]) process_existing_files(self, rv) - sleep(60) # calculate this based on the file weight? 
From 10e2ca98ac5eef8d7e091abb9e5390c6228d1312 Mon Sep 17 00:00:00 2001
From: lashemilt
Date: Fri, 18 Mar 2022 13:41:22 +0000
Subject: [PATCH 4/8] updating how long to sleep

---
 .../engines/file_watchdog_engine.py  | 25 ++++++------
 rfi_file_monitor/utils/decorators.py | 25 +++++++++++++------
 2 files changed, 28 insertions(+), 22 deletions(-)

diff --git a/rfi_file_monitor/engines/file_watchdog_engine.py b/rfi_file_monitor/engines/file_watchdog_engine.py
index 2aaf383..9871ee7 100644
--- a/rfi_file_monitor/engines/file_watchdog_engine.py
+++ b/rfi_file_monitor/engines/file_watchdog_engine.py
@@ -20,7 +20,7 @@
     exported_filetype,
     with_advanced_settings,
     with_pango_docs,
-    do_bulk_upload
+    do_bulk_upload,
 )
 from .file_watchdog_engine_advanced_settings import (
     FileWatchdogEngineAdvancedSettings,
@@ -129,16 +129,13 @@ def _search_for_existing_files(self, directory: Path) -> List[RegularFile]:
         path_tree = os.walk(directory)
         for root, dirs, files in path_tree:
             for fname in files:
-                if (not Path(fname).is_symlink() and
-                    match_path(
-                        Path(fname),
-                        included_patterns=self._included_patterns,
-                        excluded_patterns=self._excluded_patterns,
-                        case_sensitive=False,
-                    )
-
-                ):
+                if not Path(fname).is_symlink() and match_path(
+                    Path(fname),
+                    included_patterns=self._included_patterns,
+                    excluded_patterns=self._excluded_patterns,
+                    case_sensitive=False,
+                ):
-                    file_path = Path(os.path.join(root,fname))
+                    file_path = Path(os.path.join(root, fname))
                     relative_file_path = file_path.relative_to(
                         self.params.monitored_directory
                     )
@@ -170,10 +167,10 @@ def process_existing_files(self, existing_files):
                 priority=GLib.PRIORITY_HIGH,
             )
             GLib.idle_add(
-                    self._engine.kill_task_window,
-                    self._task_window,
-                    priority=GLib.PRIORITY_HIGH,
-                )
+                self._engine.kill_task_window,
+                self._task_window,
+                priority=GLib.PRIORITY_HIGH,
+            )
             return
 
     def run(self):

diff --git a/rfi_file_monitor/utils/decorators.py b/rfi_file_monitor/utils/decorators.py
index 4213ede..14ae21d 100644
--- a/rfi_file_monitor/utils/decorators.py
+++ b/rfi_file_monitor/utils/decorators.py
@@ -190,18 +190,27 @@ def wrapper(self: Operation, file: File):
 
     return wrapper
 
-def do_bulk_upload( process_existing_files: Callable[List]):
+
+def do_bulk_upload(process_existing_files: Callable[List]):
     @functools.wraps(process_existing_files)
     def wrapper(self: Engine, existing_files: List):
 
-        if len(existing_files) > 2000: # do not like this hard coded value but it is emperically derived - this is the max number of files that a queue can take without a long wait for users.
-            chunked_input = [existing_files[i:i + 2000] for i in range(0, len(existing_files), 2000)]
+        if (
+            len(existing_files) > 2000
+        ):  # do not like this hard coded value but it is emperically derived -
+        # this is the max number of files that a queue can take without a long wait for users.
+            chunked_input = [
+                existing_files[i : i + 2000]
+                for i in range(0, len(existing_files), 2000)
+            ]
             for rv in chunked_input:
-                chunk_weight = sum([Path(file.filename).stat().st_size for file in rv])
+                chunk_weight = sum(
+                    [Path(file.filename).stat().st_size for file in rv]
+                )
                 process_existing_files(self, rv)
-                sleep_time = chunk_weight/(150e6)
-                logger.info(msg=f'Chunk Weight: {chunk_weight} *********************************************')
-                logger.info(msg=f'Sleep time: {sleep_time} *********************************************')
-                sleep(sleep_time) # assuming good network speed as we don't want to block upload with unnecessary waiting 150 MiB/s
+                sleep_time = (chunk_weight/50e6)*0.75
+                sleep(
+                    sleep_time
+                )  # assuming 50 Mb/s network speed as we don't want to block upload with unnecessary waiting. Start queuing up files when we are 75% done.
 
     return wrapper
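Note on PATCH 4/8: besides the black-style reformatting, the pacing formula changes from `chunk_weight/(150e6)` to `(chunk_weight/50e6)*0.75`: estimate the chunk's transfer time at an assumed 50 MB/s and sleep for only 75% of it, so the next chunk is queued before the current one finishes draining (the comment says "Mb/s", but `50e6` bytes/second is 50 megabytes, not megabits, per second). The arithmetic, as a sketch with illustrative names:

    def pacing_sleep_seconds(
        chunk_weight_bytes: float,
        throughput_bytes_per_s: float = 50e6,  # assumed network speed
        overlap_fraction: float = 0.75,        # wake before the chunk drains
    ) -> float:
        """Seconds to sleep after submitting a chunk: a fraction of the
        estimated transfer time, so chunks overlap rather than serialize."""
        return (chunk_weight_bytes / throughput_bytes_per_s) * overlap_fraction

    # e.g. a 2000-file chunk averaging 5 MB per file is 10 GB:
    # 10e9 / 50e6 = 200 s estimated transfer, so sleep 0.75 * 200 = 150 s
    print(pacing_sleep_seconds(10e9))  # 150.0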
From 4c8528f24e0d4fb3534f01f7427e1da1a5ee84b4 Mon Sep 17 00:00:00 2001
From: lashemilt
Date: Fri, 18 Mar 2022 17:16:43 +0000
Subject: [PATCH 5/8] have a better way of refreshing the queue that looks at
 the queue manager itself and refreshes when there are only 10% of the files
 to go

---
 rfi_file_monitor/utils/decorators.py | 29 +++++++++++++++++++---------
 1 file changed, 20 insertions(+), 9 deletions(-)

diff --git a/rfi_file_monitor/utils/decorators.py b/rfi_file_monitor/utils/decorators.py
index 14ae21d..c6a1c90 100644
--- a/rfi_file_monitor/utils/decorators.py
+++ b/rfi_file_monitor/utils/decorators.py
@@ -19,7 +19,7 @@
 import functools
 import threading
 from time import sleep
-
+from ..file import FileStatus
 
 logger = logging.getLogger(__name__)
 
@@ -195,22 +195,33 @@ def do_bulk_upload(process_existing_files: Callable[List]):
     @functools.wraps(process_existing_files)
     def wrapper(self: Engine, existing_files: List):
 
+        chunk_size = 2000
         if (
-            len(existing_files) > 2000
-        ):  # do not like this hard coded value but it is emperically derived -
+            len(existing_files) > chunk_size
+        ):
+
+        # do not like this hard coded value but it is emperically derived -
         # this is the max number of files that a queue can take without a long wait for users.
             chunked_input = [
-                existing_files[i : i + 2000]
-                for i in range(0, len(existing_files), 2000)
+                existing_files[i : i + chunk_size]
+                for i in range(0, len(existing_files), chunk_size)
             ]
+            n= 1
+            processed_files = 0
             for rv in chunked_input:
                 chunk_weight = sum(
                     [Path(file.filename).stat().st_size for file in rv]
                 )
                 process_existing_files(self, rv)
-                sleep_time = (chunk_weight/50e6)*0.75
-                sleep(
-                    sleep_time
-                )  # assuming 50 Mb/s network speed as we don't want to block upload with unnecessary waiting. Start queuing up files when we are 75% done.
+
+                while processed_files < chunk_size*n*0.9: # refresh the list when we are at 90% of the size
+                    j = 0
+                    processed_files = sum([
+                        item
+                        for item in self._engine._appwindow._queue_manager._files_dict.values()
+                        if item.status == FileStatus.SUCCESS
+                    ])
+                    sleep(1)
+                n = n + 1
 
     return wrapper
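Note on PATCH 5/8: the open-loop sleep is replaced with feedback from the queue itself. After submitting chunk n, the wrapper polls the queue manager once a second and releases the next chunk once 90% of the files submitted so far have reached `FileStatus.SUCCESS`. Two things to flag: `j = 0` is assigned and never used, and the comprehension sums the `File` objects themselves rather than counting them, which would raise a `TypeError` at runtime; a count is presumably what was intended. A corrected polling step might look like this (hypothetical helper; assumes `FileStatus` is imported as in the patch):

    from time import sleep

    # assumes: from rfi_file_monitor.file import FileStatus

    def wait_until_processed(queue_manager, threshold: float,
                             poll_seconds: float = 1.0) -> None:
        """Block until at least `threshold` files in the queue manager's
        dict have succeeded, polling once per `poll_seconds`."""
        while True:
            done = sum(  # count successes instead of summing File objects
                1
                for item in queue_manager._files_dict.values()
                if item.status == FileStatus.SUCCESS
            )
            if done >= threshold:
                return
            sleep(poll_seconds)

The wrapper would then call `wait_until_processed(self._engine._appwindow._queue_manager, chunk_size * n * 0.9)` between chunks.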
From 1433d0a6809cfeb4084c5198771a5cc02dcfe568 Mon Sep 17 00:00:00 2001
From: lashemilt
Date: Fri, 8 Apr 2022 09:11:39 +0100
Subject: [PATCH 6/8] Pep 8 changes

---
 rfi_file_monitor/utils/decorators.py | 9 +++++----
 1 file changed, 5 insertions(+), 4 deletions(-)

diff --git a/rfi_file_monitor/utils/decorators.py b/rfi_file_monitor/utils/decorators.py
index c6a1c90..beed00c 100644
--- a/rfi_file_monitor/utils/decorators.py
+++ b/rfi_file_monitor/utils/decorators.py
@@ -200,13 +200,14 @@ def wrapper(self: Engine, existing_files: List):
             len(existing_files) > chunk_size
         ):
 
-        # do not like this hard coded value but it is emperically derived -
-        # this is the max number of files that a queue can take without a long wait for users.
+        # do not like this hard coded value but it is empirically derived -
+        # this is the max number of files that a queue can take without a long wait for users working on a standard
+        # size machine with 8CPU 8G RAM
             chunked_input = [
-                existing_files[i : i + chunk_size]
+                existing_files[i: i + chunk_size]
                 for i in range(0, len(existing_files), chunk_size)
             ]
-            n= 1
+            n = 1
             processed_files = 0
             for rv in chunked_input:

From 04ca8b3455063e9d67001e5ebaf34e9c225d668b Mon Sep 17 00:00:00 2001
From: lashemilt
Date: Tue, 26 Apr 2022 13:47:58 +0100
Subject: [PATCH 7/8] adding behaviour so total number of existing files is
 retained and does not change with the decorator

---
 rfi_file_monitor/engines/file_watchdog_engine.py | 6 +++++-
 rfi_file_monitor/queue_manager.py                | 6 +++++-
 rfi_file_monitor/utils/decorators.py             | 9 ++++-----
 setup.cfg                                        | 2 +-
 4 files changed, 15 insertions(+), 8 deletions(-)

diff --git a/rfi_file_monitor/engines/file_watchdog_engine.py b/rfi_file_monitor/engines/file_watchdog_engine.py
index 9871ee7..361c34b 100644
--- a/rfi_file_monitor/engines/file_watchdog_engine.py
+++ b/rfi_file_monitor/engines/file_watchdog_engine.py
@@ -146,13 +146,17 @@ def _search_for_existing_files(self, directory: Path) -> List[RegularFile]:
                         FileStatus.SAVED,
                     )
                     rv.append(_file)
+        GLib.idle_add(
+            self._engine._appwindow._queue_manager.get_total_files_in_path,
+            len(rv),
+            priority=GLib.PRIORITY_DEFAULT_IDLE,
+        )
 
         return rv
 
     @do_bulk_upload
     def process_existing_files(self, existing_files):
         try:
-
             GLib.idle_add(
                 self._engine._appwindow._queue_manager.add,
                 existing_files,

diff --git a/rfi_file_monitor/queue_manager.py b/rfi_file_monitor/queue_manager.py
index 91bfdb1..9848fb6 100644
--- a/rfi_file_monitor/queue_manager.py
+++ b/rfi_file_monitor/queue_manager.py
@@ -38,6 +38,7 @@ def __init__(self, appwindow):
         self._files_dict: OrderedDictType[str, File] = OrderedDict()
         self._jobs_list: Final[List[Job]] = list()
         self._njobs_running: int = 0
+        self._total_files_in_path: int = 0
 
         kwargs = dict(
             halign=Gtk.Align.FILL,
@@ -453,6 +454,9 @@ def stop(self):
         self._running = False
         self.notify("running")
 
+    def get_total_files_in_path(self, number_of_files: int):
+        self._total_files_in_path = number_of_files
+
     def _files_dict_timeout_cb(self, *user_data):
         """
         This function runs every second, and will take action based on the status of all files in the dict
@@ -563,7 +567,7 @@ def _files_dict_timeout_cb(self, *user_data):
         # update status bar
         self._appwindow._status_grid.get_child_at(
             0, 0
-        ).props.label = f"Total: {len(self._files_dict)}"
+        ).props.label = f"Total: {self._total_files_in_path }"
         for _status, _counter in status_counters.items():
             self._appwindow._status_grid.get_child_at(
                 int(_status), 0

diff --git a/rfi_file_monitor/utils/decorators.py b/rfi_file_monitor/utils/decorators.py
index beed00c..74a96f9 100644
--- a/rfi_file_monitor/utils/decorators.py
+++ b/rfi_file_monitor/utils/decorators.py
@@ -210,13 +210,12 @@ def wrapper(self: Engine, existing_files: List):
             n = 1
             processed_files = 0
             for rv in chunked_input:
-                chunk_weight = sum(
-                    [Path(file.filename).stat().st_size for file in rv]
-                )
+                # chunk_weight = sum(
+                #     [Path(file.filename).stat().st_size for file in rv]
+                # )
                 process_existing_files(self, rv)
 
-                while processed_files < chunk_size*n*0.9: # refresh the list when we are at 90% of the size
-                    j = 0
+                while processed_files < chunk_size*n*0.9:  # refresh the list when we are at 90% of the size
                     processed_files = sum([
                         item
                         for item in self._engine._appwindow._queue_manager._files_dict.values()

diff --git a/setup.cfg b/setup.cfg
index 5d7fe42..428cd96 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -65,7 +65,7 @@ rfi_file_monitor.files =
     WeightedRegularFile = rfi_file_monitor.files.regular_file:WeightedRegularFile
     S3Object = rfi_file_monitor.files.s3_object:S3Object
     Directory = rfi_file_monitor.files.directory:Directory
-gui_scripts =
+console_scripts =
     rfi-file-monitor = rfi_file_monitor:main
 
 [bdist_wheel]
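Note on PATCH 6/8 and 7/8: once the decorator feeds the queue in slices, `len(self._files_dict)` only reflects the files queued so far, so the status bar's total would creep up chunk by chunk. Patch 7 therefore has the search routine push the full count into a new queue-manager field via `GLib.idle_add`, so the assignment happens on the GTK main loop rather than the worker thread (a naming nit: `get_total_files_in_path` is a setter despite its `get_` prefix). The marshalling pattern, reduced to its core with an illustrative wrapper name:

    from gi.repository import GLib

    def publish_total(queue_manager, total: int) -> None:
        """Hand a value to an object owned by the GTK main loop.
        idle_add schedules the call on the main thread; a callback that
        returns None (falsy) runs once and is then removed."""
        GLib.idle_add(
            queue_manager.get_total_files_in_path,  # setter, despite the name
            total,
            priority=GLib.PRIORITY_DEFAULT_IDLE,
        )

The `setup.cfg` hunk is unrelated housekeeping: moving the entry point from `gui_scripts` to `console_scripts` so that, on Windows, `rfi-file-monitor` launches with a console attached and its stdout/stderr are visible.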
).props.label = f"Total: {len(self._files_dict)}" + ).props.label = f"Total: {self._total_files_in_path }" for _status, _counter in status_counters.items(): self._appwindow._status_grid.get_child_at( int(_status), 0 diff --git a/rfi_file_monitor/utils/decorators.py b/rfi_file_monitor/utils/decorators.py index beed00c..74a96f9 100644 --- a/rfi_file_monitor/utils/decorators.py +++ b/rfi_file_monitor/utils/decorators.py @@ -210,13 +210,12 @@ def wrapper(self: Engine, existing_files: List): n = 1 processed_files = 0 for rv in chunked_input: - chunk_weight = sum( - [Path(file.filename).stat().st_size for file in rv] - ) + # chunk_weight = sum( + # [Path(file.filename).stat().st_size for file in rv] + # ) process_existing_files(self, rv) - while processed_files < chunk_size*n*0.9: # refresh the list when we are at 90% of the size - j = 0 + while processed_files < chunk_size*n*0.9: # refresh the list when we are at 90% of the size processed_files = sum([ item for item in self._engine._appwindow._queue_manager._files_dict.values() diff --git a/setup.cfg b/setup.cfg index 5d7fe42..428cd96 100644 --- a/setup.cfg +++ b/setup.cfg @@ -65,7 +65,7 @@ rfi_file_monitor.files = WeightedRegularFile = rfi_file_monitor.files.regular_file:WeightedRegularFile S3Object = rfi_file_monitor.files.s3_object:S3Object Directory = rfi_file_monitor.files.directory:Directory -gui_scripts = +console_scripts = rfi-file-monitor = rfi_file_monitor:main [bdist_wheel] From d8b352fe9d3ad6933f7e6bac512d20fa139037e5 Mon Sep 17 00:00:00 2001 From: lashemilt Date: Thu, 28 Apr 2022 09:43:36 +0100 Subject: [PATCH 8/8] black check --- rfi_file_monitor/utils/decorators.py | 22 ++++++++++++---------- 1 file changed, 12 insertions(+), 10 deletions(-) diff --git a/rfi_file_monitor/utils/decorators.py b/rfi_file_monitor/utils/decorators.py index 74a96f9..f8b5415 100644 --- a/rfi_file_monitor/utils/decorators.py +++ b/rfi_file_monitor/utils/decorators.py @@ -196,15 +196,13 @@ def do_bulk_upload(process_existing_files: Callable[List]): def wrapper(self: Engine, existing_files: List): chunk_size = 2000 - if ( - len(existing_files) > chunk_size - ): + if len(existing_files) > chunk_size: # do not like this hard coded value but it is empirically derived - # this is the max number of files that a queue can take without a long wait for users working on a standard # size machine with 8CPU 8G RAM chunked_input = [ - existing_files[i: i + chunk_size] + existing_files[i : i + chunk_size] for i in range(0, len(existing_files), chunk_size) ] n = 1 @@ -215,12 +213,16 @@ def wrapper(self: Engine, existing_files: List): # ) process_existing_files(self, rv) - while processed_files < chunk_size*n*0.9: # refresh the list when we are at 90% of the size - processed_files = sum([ - item - for item in self._engine._appwindow._queue_manager._files_dict.values() - if item.status == FileStatus.SUCCESS - ]) + while ( + processed_files < chunk_size * n * 0.9 + ): # refresh the list when we are at 90% of the size + processed_files = sum( + [ + item + for item in self._engine._appwindow._queue_manager._files_dict.values() + if item.status == FileStatus.SUCCESS + ] + ) sleep(1) n = n + 1