Skip to content

Commit c65cc0f

Browse files
iblanco11981870, b-butler, bdice, klywang, pre-commit-ci[bot]
authored
Added the no-progress flag for project.py status to hide the progress… (#685)
* Added the no-progress flag for project.py status to hide the progress bars * Implement unit test for appearance of progress bar * Update changelog.txt * partial change progress to no progress label * added documentation for variables * Update flow/util/misc.py Co-authored-by: Brandon Butler <[email protected]> * change format of if statements * added thread parallelization and serial tests * Changed from no_progress to hide_progress. * Update flow/project.py Co-authored-by: Bradley Dice <[email protected]> * Update tests/test_status.py Co-authored-by: Bradley Dice <[email protected]> * [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci * Update changelog.txt * changed all the progress variables to hide-progress * [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci * fix: no-progress process parallelization pickling. We need to use cloudpickle to serialize/deserialize functions for process parallelization. * fix: thread parallelization for _get_parallel_executor Converts the generator to a list which is what print_status expects. * test: Fix no progress bar tests for signac 2.0 * Remove newline. * fix: Revert breaking changes to execution progress bar * doc: fix formatting Co-authored-by: Bradley Dice <[email protected]> * Simplify parallel_executor logic. * fix: Changes to _get_parallel_executor Still results in shorter clearer code. * fix: Conditional string in _get_parallel_executor * fix: ordering in _get_parallel_executor --------- Co-authored-by: Brandon Butler <[email protected]> Co-authored-by: Bradley Dice <[email protected]> Co-authored-by: Kelly Wang <[email protected]> Co-authored-by: Kelly Wang <[email protected]> Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
1 parent 5093591 commit c65cc0f

File tree

4 files changed

+88
-34
lines changed

4 files changed

+88
-34
lines changed

changelog.txt

+1
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ Version 0.26
1414
Added
1515
+++++
1616

17+
- Added ``--hide-progress`` flag to hide the progress bar when printing project status (#685).
1718
- ``test-workflow`` CLI option for testing template environments/submission scripts (#747).
1819
- Frontier environment and template (#743).
1920
- Added ``-o`` / ``--operation`` flag to report project status information for specific operations (#725).

flow/project.py

+24-8
Original file line numberDiff line numberDiff line change
@@ -2602,6 +2602,7 @@ def _fetch_status(
26022602
err,
26032603
ignore_errors,
26042604
status_parallelization="none",
2605+
hide_progress=False,
26052606
names=None,
26062607
):
26072608
"""Fetch status for the provided aggregates / jobs.
@@ -2617,6 +2618,8 @@ def _fetch_status(
26172618
status_parallelization : str
26182619
Parallelization mode for fetching the status. Allowed values are
26192620
"thread", "process", or "none". (Default value = "none")
2621+
hide_progress : bool
2622+
Hide the progress bar when printing status output (Default value = False).
26202623
names : iterable of :class:`str`
26212624
Only show status for operations that match the provided set of names
26222625
(interpreted as regular expressions), or all if the argument is
@@ -2653,7 +2656,9 @@ def _fetch_status(
26532656
"Valid choices are 'thread', 'process', or 'none'."
26542657
)
26552658

2656-
parallel_executor = _get_parallel_executor(status_parallelization)
2659+
parallel_executor = _get_parallel_executor(
2660+
status_parallelization, hide_progress
2661+
)
26572662

26582663
# Update the project's status cache
26592664
scheduler_info = self._query_scheduler_status(
@@ -2750,11 +2755,13 @@ def compute_status(data):
27502755
self._get_job_labels,
27512756
ignore_errors=ignore_errors,
27522757
)
2753-
job_labels = parallel_executor(
2754-
compute_labels,
2755-
individual_jobs,
2756-
desc="Fetching labels",
2757-
file=err,
2758+
job_labels = list(
2759+
parallel_executor(
2760+
compute_labels,
2761+
individual_jobs,
2762+
desc="Fetching labels",
2763+
file=err,
2764+
)
27582765
)
27592766

27602767
def combine_group_and_operation_status(aggregate_status_results):
@@ -2795,7 +2802,6 @@ def combine_group_and_operation_status(aggregate_status_results):
27952802
"_error": error_message,
27962803
}
27972804
)
2798-
27992805
return status_results_combined, job_labels, individual_jobs
28002806

28012807
PRINT_STATUS_ALL_VARYING_PARAMETERS = True
@@ -2824,6 +2830,7 @@ def print_status(
28242830
profile=False,
28252831
eligible_jobs_max_lines=None,
28262832
output_format="terminal",
2833+
hide_progress=False,
28272834
operation=None,
28282835
):
28292836
"""Print the status of the project.
@@ -2875,6 +2882,8 @@ def print_status(
28752882
output_format : str
28762883
Status output format, supports:
28772884
'terminal' (default), 'markdown' or 'html'.
2885+
hide_progress : bool
2886+
Hide the progress bar from the status output. (Default value = False)
28782887
operation : iterable of :class:`str`
28792888
Show status of operations that match the provided set of names
28802889
(interpreted as regular expressions), or all if the argument is
@@ -2923,6 +2932,7 @@ def print_status(
29232932
err=err,
29242933
ignore_errors=ignore_errors,
29252934
status_parallelization=status_parallelization,
2935+
hide_progress=hide_progress,
29262936
names=operation,
29272937
)
29282938

@@ -3003,6 +3013,7 @@ def print_status(
30033013
err=err,
30043014
ignore_errors=ignore_errors,
30053015
status_parallelization=status_parallelization,
3016+
hide_progress=hide_progress,
30063017
names=operation,
30073018
)
30083019
profiling_results = None
@@ -3557,7 +3568,7 @@ def run(
35573568
will not exceed this argument. The default is 1, there is no limit
35583569
if this argument is None.
35593570
progress : bool
3560-
Show a progress bar during execution. (Default value = False)
3571+
Show a progress bar during execution (Default value = False).
35613572
order : str, callable, or None
35623573
Specify the order of operations. Possible values are:
35633574
@@ -5001,6 +5012,11 @@ class MyProject(FlowProject):
50015012
"to show result for. Defaults to the main module. "
50025013
"(requires pprofile)",
50035014
)
5015+
parser_status.add_argument(
5016+
"--hide-progress",
5017+
action="store_true",
5018+
help="Hide the progress bar",
5019+
)
50045020
parser_status.add_argument(
50055021
"-o",
50065022
"--operation",

flow/util/misc.py

+37-26
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
import os
88
import warnings
99
from collections.abc import MutableMapping
10+
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
1011
from contextlib import contextmanager
1112
from functools import lru_cache, partial
1213
from itertools import cycle, islice
@@ -335,7 +336,7 @@ def _run_cloudpickled_func(func, *args):
335336
return unpickled_func(*args)
336337

337338

338-
def _get_parallel_executor(parallelization="none"):
339+
def _get_parallel_executor(parallelization="none", hide_progress=False):
339340
"""Get an executor for the desired parallelization strategy.
340341
341342
This executor shows a progress bar while executing a function over an
@@ -346,50 +347,60 @@ def _get_parallel_executor(parallelization="none"):
346347
(see :meth:`concurrent.futures.Executor.map`). All other ``**kwargs`` are
347348
passed to the tqdm progress bar.
348349
350+
Warning
351+
-------
352+
Keyword arguments are ignored when ``hide_progress`` is ``True``.
353+
349354
Parameters
350355
----------
351356
parallelization : str
352357
Parallelization mode. Allowed values are "thread", "process", or
353358
"none". (Default value = "none")
359+
hide_progress : bool
360+
Hide the progress bar when printing status output (Default value = False).
354361
355362
Returns
356363
-------
357364
callable
358-
A callable with signature ``func, iterable, **kwargs``.
365+
A callable with signature ``func, iterable, **kwargs`` which returns an iterator.
359366
360367
"""
361-
if parallelization == "thread":
368+
if parallelization == "process":
369+
executor = ProcessPoolExecutor().map
370+
if not hide_progress:
371+
executor = partial(process_map, tqdm_class=tqdm)
362372

363373
def parallel_executor(func, iterable, **kwargs):
364-
return thread_map(func, iterable, tqdm_class=tqdm, **kwargs)
365-
366-
elif parallelization == "process":
374+
# The top-level function called on each process cannot be a local function, it must be a
375+
# module-level function. Creating a partial here allows us to use the passed function
376+
# "func" regardless of whether it is a local function.
377+
func = partial(_run_cloudpickled_func, cloudpickle.dumps(func))
378+
# The tqdm progress bar requires a total. We compute the total in advance because a map
379+
# iterable (which has no total) is passed to process_map.
380+
kwargs.setdefault("total", len(iterable))
381+
iterable = map(cloudpickle.dumps, iterable)
382+
if hide_progress:
383+
return executor(func, iterable)
384+
return executor(func, iterable, **kwargs)
385+
386+
elif parallelization == "thread":
387+
executor = ThreadPoolExecutor().map
388+
if not hide_progress:
389+
executor = partial(thread_map, tqdm_class=tqdm)
367390

368391
def parallel_executor(func, iterable, **kwargs):
369-
# The tqdm progress bar requires a total. We compute the total in
370-
# advance because a map iterable (which has no total) is passed to
371-
# process_map.
372-
if "total" not in kwargs:
373-
kwargs["total"] = len(iterable)
374-
375-
return process_map(
376-
# The top-level function called on each process cannot be a
377-
# local function, it must be a module-level function. Creating
378-
# a partial here allows us to use the passed function "func"
379-
# regardless of whether it is a local function.
380-
partial(_run_cloudpickled_func, cloudpickle.dumps(func)),
381-
map(cloudpickle.dumps, iterable),
382-
tqdm_class=tqdm,
383-
**kwargs,
384-
)
392+
if hide_progress:
393+
return executor(func, iterable)
394+
return executor(func, iterable, **kwargs)
385395

386396
else:
397+
executor = map if hide_progress else partial(tmap, tqdm_class=tqdm)
387398

388399
def parallel_executor(func, iterable, **kwargs):
389-
if "chunksize" in kwargs:
390-
# Chunk size only applies to thread/process parallel executors
391-
del kwargs["chunksize"]
392-
return list(tmap(func, iterable, tqdm_class=tqdm, **kwargs))
400+
if hide_progress:
401+
return executor(func, iterable)
402+
kwargs.pop("chunksize", None)
403+
return executor(func, iterable, **kwargs)
393404

394405
return parallel_executor
395406

tests/test_status.py

+26
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,35 @@
66
import sys
77

88
import generate_status_reference_data as gen
9+
import pytest
910
import signac
1011

1112

13+
@pytest.fixture(params=[True, False])
14+
def hide_progress_bar(request):
15+
return request.param
16+
17+
18+
@pytest.fixture(params=["thread", "process", "none"])
19+
def parallelization(request):
20+
return request.param
21+
22+
23+
def test_hide_progress_bar(hide_progress_bar, parallelization):
24+
with signac.TemporaryProject() as p, signac.TemporaryProject() as status_pr:
25+
gen.init(p)
26+
fp = gen._TestProject.get_project(path=p.path)
27+
fp._flow_config["status_parallelization"] = parallelization
28+
status_pr.import_from(origin=gen.ARCHIVE_PATH)
29+
for job in status_pr:
30+
kwargs = job.statepoint()
31+
tmp_err = io.TextIOWrapper(io.BytesIO(), sys.stderr.encoding)
32+
fp.print_status(**kwargs, err=tmp_err, hide_progress=hide_progress_bar)
33+
tmp_err.seek(0)
34+
generated_tqdm = tmp_err.read()
35+
assert ("Fetching status" not in generated_tqdm) == hide_progress_bar
36+
37+
1238
def test_print_status():
1339
# Must import the data into the project.
1440
with signac.TemporaryProject() as p, signac.TemporaryProject() as status_pr:

0 commit comments

Comments
 (0)