WIP: Adding config to nx-parallel #68

Closed
Changes from all commits · 24 commits
- a95ceb4 initial commit (Schefflera-Arboricola, Jun 5, 2024)
- 390078f bug_fix (Schefflera-Arboricola, Jun 5, 2024)
- 27205d1 Merge branch 'networkx:main' into config_1 (Schefflera-Arboricola, Jun 24, 2024)
- ead0927 added default_config to get_info (Schefflera-Arboricola, Jun 24, 2024)
- 273d53d added get_curr_configs (Schefflera-Arboricola, Jun 24, 2024)
- a7ca076 updated configs (Schefflera-Arboricola, Jun 24, 2024)
- 4982fc0 rm 3 configs (Schefflera-Arboricola, Jun 24, 2024)
- d4aaedb Merge branch 'networkx:main' into config_1 (Schefflera-Arboricola, Jul 22, 2024)
- 4465712 using joblib.parallel_config (Schefflera-Arboricola, Jul 24, 2024)
- 9ce30d7 indentation (Schefflera-Arboricola, Jul 24, 2024)
- d7ba93d updated cpu_count (Schefflera-Arboricola, Jul 26, 2024)
- 79e6881 integrating nx config (Schefflera-Arboricola, Jul 27, 2024)
- ced3898 rm get_default_configs (Schefflera-Arboricola, Jul 27, 2024)
- 61dcbb7 renamed get_n_jobs (Schefflera-Arboricola, Jul 27, 2024)
- 788dbf8 n_job=-1s (Schefflera-Arboricola, Jul 27, 2024)
- 9085093 changing n_jobs=None (Schefflera-Arboricola, Jul 27, 2024)
- 9870bad added backend=loky for testing (Schefflera-Arboricola, Jul 27, 2024)
- f08780c Merge branch 'networkx:main' into config_1 (Schefflera-Arboricola, Aug 1, 2024)
- 75604c8 rm inconsistency - using config(singular) (Schefflera-Arboricola, Aug 2, 2024)
- 3979f2e removed __all__ from config.py (Schefflera-Arboricola, Aug 2, 2024)
- 9c8b57b changing the default backend to loky (Schefflera-Arboricola, Aug 2, 2024)
- de99170 enhanced config documentation (Schefflera-Arboricola, Aug 2, 2024)
- d354351 rm fetching list of configs (Schefflera-Arboricola, Aug 2, 2024)
- e4bf668 made n_jobs=-1 as default (Schefflera-Arboricola, Aug 13, 2024)
83 changes: 83 additions & 0 deletions Config.md
@@ -0,0 +1,83 @@
# Using Configurations in nx-parallel

nx-parallel algorithms internally use the [`joblib.parallel_config`](https://joblib.readthedocs.io/en/latest/generated/joblib.parallel_config.html) context manager, and you can change its parameters through [NetworkX's config](https://networkx.org/documentation/latest/reference/backends.html#module-networkx.utils.configs) context manager. The internal `joblib.parallel_config` context manager wraps the [`joblib.Parallel`](https://joblib.readthedocs.io/en/latest/generated/joblib.Parallel.html) call, which is responsible for creating all the parallel processes.
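
Conceptually, each nx-parallel algorithm wraps its `joblib.Parallel` call as sketched below (a simplified sketch based on `nx_parallel/algorithms/cluster.py` in this PR, with the chunking logic omitted):

```python
import nx_parallel as nxp
from joblib import Parallel, delayed, parallel_config

config = nxp.get_config()  # flat dict built from nx.config.backends.parallel
with parallel_config(**config):  # forwards the NetworkX config to joblib
    result = Parallel()(delayed(sum)(chunk) for chunk in [[1, 2], [3, 4]])
```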

## Default Configuration

When you import NetworkX, the default configurations for all the installed backends are set as shown below:

```python
import networkx as nx

nx.config
```

Output:

```
NetworkXConfig(backend_priority=[], backends=Config(parallel=ParallelConfig(backend='loky', n_jobs=-1, verbose=0, temp_folder=None, max_nbytes='1M', mmap_mode='r', prefer=None, require=None, inner_max_num_threads=None, backend_params={})), cache_converted_graphs=True)
```

### Note

The default settings of `joblib.parallel_config` match the defaults of the `ParallelConfig` class, except for two parameters. The `backend` default is `None` in `joblib.parallel_config` but `"loky"` in `ParallelConfig`; since the internal `joblib.Parallel` falls back to `"loky"` when no backend is specified, the observed behavior is the same either way, and setting it explicitly avoids errors when the value is passed through the NetworkX config. Additionally, `n_jobs` defaults to `-1` (use all available cores) in `ParallelConfig` rather than `None`.

## Modifying Configuration

The `ParallelConfig` class inherits from NetworkX's `Config` class. You can use it either as a `dataclass` (setting attributes directly) or as a context manager to change the nx-parallel configuration.

### As a config variable

You can directly set the desired parameters:

```python
nxp_config = nx.config.backends.parallel
nxp_config.n_jobs = 6
nxp_config.verbose = 15

G = nx.complete_graph(20)

# backend -> loky, n_jobs -> 6, verbose -> 15
nx.square_clustering(G, backend="parallel")
```

### As a context manager

You can use the context manager to temporarily change configuration parameters for specific code blocks:

```python
# continuing from the code block above

with nxp_config(n_jobs=4):
# backend -> loky, n_jobs -> 4, verbose -> 15
nx.square_clustering(G, backend="parallel")
with nxp_config(backend="threading", verbose=0):
# backend -> threading, n_jobs -> 4, verbose -> 0
nx.betweenness_centrality(G, backend="parallel")
with nxp_config(n_jobs=8):
# backend -> threading, n_jobs -> 8, verbose -> 0
nx.number_of_isolates(G, backend="parallel")
```

As the comments indicate, a nested context manager inherits any parameters it does not set from the enclosing context manager, and the outermost context manager inherits them from the global configuration.

Note that using `joblib.parallel_config` directly may produce unexpected results, because nx-parallel re-applies the NetworkX configuration in its own internal `joblib.parallel_config` context. We therefore recommend NetworkX's config context manager; it exposes the same parameters and simply forwards them to the internal `joblib.parallel_config` call.
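
For instance, in the sketch below (continuing the setup above), the outer `joblib.parallel_config` is effectively overridden, because the algorithm opens its own internal `parallel_config` populated from the NetworkX config:

```python
import joblib

with joblib.parallel_config(n_jobs=2):
    # still uses n_jobs from nx.config.backends.parallel, not 2
    nx.square_clustering(G, backend="parallel")
```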

Also, assigning to the global config inside a context manager updates the configuration both inside and outside that context manager, and the change persists after the context exits, as shown below:

```python
nxp_config.n_jobs = 6

nx.square_clustering(G, backend="parallel") # n_jobs -> 6

with nxp_config(n_jobs=4):
nx.square_clustering(G, backend="parallel") # n_jobs -> 4
nxp_config.n_jobs = 8
nx.square_clustering(G, backend="parallel") # n_jobs -> 8

nx.square_clustering(G, backend="parallel") # n_jobs -> 8
```

Please feel free to create issues or PRs if you want to improve this documentation or if you have any feedback.

Thank you :)
8 changes: 6 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
@@ -107,6 +107,10 @@ Note that for all functions inside `nx_code.py` that do not have an nx-parallel
import networkx as nx
import nx_parallel as nxp

nxp_config = nx.config.backends.parallel
nxp_config.n_jobs = 4
nxp_config.verbose = 15

G = nx.path_graph(4)
H = nxp.ParallelGraph(G)

@@ -121,10 +125,10 @@ nxp.betweenness_centrality(G)

# method 4 : using nx-parallel implementation with ParallelGraph object
nxp.betweenness_centrality(H)

# output : {0: 0.0, 1: 0.6666666666666666, 2: 0.6666666666666666, 3: 0.0}
```

By default, nx-parallel uses the `loky` backend and all available CPU cores. For more on how to play with configurations in nx-parallel, refer to [Config.md](./Config.md)!

### Notes

1. Some functions in NetworkX share a name but have different implementations. To avoid name conflicts at dispatch time, NetworkX differentiates them via the `name` parameter of the `_dispatchable` decorator of such algorithms. For this reason, `method 3` and `method 4` are not recommended, though you can use them if you know the correct `name`.
16 changes: 12 additions & 4 deletions _nx_parallel/__init__.py
@@ -1,5 +1,7 @@
# This file was automatically generated by update_get_info.py

from .config import _config


def get_info():
"""Return a dictionary with information about the package."""
@@ -9,6 +11,7 @@ def get_info():
"package": "nx_parallel",
"url": "https://github.com/networkx/nx-parallel",
"short_summary": "Parallel backend for NetworkX algorithms",
"default_config": _config,
"functions": {
"number_of_isolates": {
"url": "https://github.com/networkx/nx-parallel/blob/main/nx_parallel/algorithms/isolate.py#L8",
@@ -151,22 +154,27 @@ def get_info():
},
},
"chunks": {
"url": "https://github.com/networkx/nx-parallel/blob/main/nx_parallel/utils/chunk.py#L8",
"url": "https://github.com/networkx/nx-parallel/blob/main/nx_parallel/utils/chunk.py#L15",
"additional_docs": "Divides an iterable into chunks of size n",
"additional_parameters": None,
},
"cpu_count": {
"url": "https://github.com/networkx/nx-parallel/blob/main/nx_parallel/utils/chunk.py#L18",
"additional_docs": "Returns the number of logical CPUs or cores",
"url": "https://github.com/networkx/nx-parallel/blob/main/nx_parallel/utils/chunk.py#L25",
"additional_docs": "Returns the positive value of `n_jobs` using `joblib.parallel.get_active_backend()`.",
"additional_parameters": None,
},
"create_iterables": {
"url": "https://github.com/networkx/nx-parallel/blob/main/nx_parallel/utils/chunk.py#L26",
"url": "https://github.com/networkx/nx-parallel/blob/main/nx_parallel/utils/chunk.py#L41",
"additional_docs": "Creates an iterable of function inputs for parallel computation based on the provided iterator type.",
"additional_parameters": {
"G : NetworkX graph": "iterator : str Type of iterator. Valid values are 'node', 'edge', 'isolate'",
"iterable : Iterable": "An iterable of function inputs.",
},
},
"get_config": {
"url": "https://github.com/networkx/nx-parallel/blob/main/nx_parallel/utils/chunk.py#L79",
"additional_docs": "Returns the current configuration settings for nx_parallel.",
"additional_parameters": None,
},
},
}
20 changes: 20 additions & 0 deletions _nx_parallel/config.py
@@ -0,0 +1,20 @@
from networkx.utils.configs import Config
from typing import Union
from dataclasses import dataclass, field


@dataclass
class ParallelConfig(Config):
backend: str = "loky"
n_jobs: int = -1
verbose: int = 0
temp_folder: str = None
max_nbytes: Union[int, str] = "1M"
mmap_mode: str = "r"
prefer: str = None
require: str = None
inner_max_num_threads: int = None
backend_params: dict = field(default_factory=dict)


_config = ParallelConfig()
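
A rough, illustrative sketch of how this dataclass behaves (attribute access and context-manager support come from NetworkX's `Config` base class, as described in Config.md above):

```python
cfg = ParallelConfig()
cfg.n_jobs  # -1, i.e. use all available cores (joblib convention)

with cfg(n_jobs=2, verbose=10):  # temporary overrides inside the block
    ...
```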
6 changes: 5 additions & 1 deletion _nx_parallel/update_get_info.py
@@ -58,7 +58,9 @@ def extract_docstrings_from_file(file_path):
and node.targets[0].id == "__all__"
):
all_list = [
expr.s for expr in node.value.elts if isinstance(expr, ast.Str)
expr.value
for expr in node.value.elts
if isinstance(expr, ast.Constant)
]
elif isinstance(node, ast.FunctionDef):
if all_list and node.name in all_list:
@@ -136,6 +138,7 @@ def get_url(file_path, function_name):

string = '''# This file was automatically generated by update_get_info.py

from .config import _config

def get_info():
"""Return a dictionary with information about the package."""
@@ -145,6 +148,7 @@ def get_info():
"package": "nx_parallel",
"url": "https://github.com/networkx/nx-parallel",
"short_summary": "Parallel backend for NetworkX algorithms",
"default_config": _config,
"functions": '''

with open("_nx_parallel/temp__init__.py", "w") as f:
16 changes: 9 additions & 7 deletions nx_parallel/algorithms/cluster.py
@@ -1,5 +1,5 @@
from itertools import combinations, chain
from joblib import Parallel, delayed
from joblib import Parallel, delayed, parallel_config
import nx_parallel as nxp

__all__ = [
@@ -47,18 +47,20 @@ def _compute_clustering_chunk(node_iter_chunk):
else:
node_iter = list(G.nbunch_iter(nodes))

total_cores = nxp.cpu_count()
config = nxp.get_config()
n_jobs = config["n_jobs"]

if get_chunks == "chunks":
num_in_chunk = max(len(node_iter) // total_cores, 1)
num_in_chunk = max(len(node_iter) // n_jobs, 1)
node_iter_chunks = nxp.chunks(node_iter, num_in_chunk)
else:
node_iter_chunks = get_chunks(node_iter)

result = Parallel(n_jobs=total_cores)(
delayed(_compute_clustering_chunk)(node_iter_chunk)
for node_iter_chunk in node_iter_chunks
)
with parallel_config(**config):
result = Parallel()(
delayed(_compute_clustering_chunk)(node_iter_chunk)
for node_iter_chunk in node_iter_chunks
)
clustering = dict(chain.from_iterable(result))

if nodes in G:
42 changes: 37 additions & 5 deletions nx_parallel/utils/chunk.py
@@ -1,8 +1,14 @@
import itertools
import os
import networkx as nx
from dataclasses import asdict

__all__ = ["chunks", "cpu_count", "create_iterables"]
__all__ = [
"chunks",
"cpu_count",
"create_iterables",
"get_config",
]


def chunks(iterable, n):
@@ -15,12 +21,19 @@ def chunks(iterable, n):
yield x


def cpu_count():
"""Returns the number of logical CPUs or cores"""
# Check if we are running under pytest
def cpu_count(n_jobs=None): # todo: rename to get_n_jobs
"""Returns the positive value of `n_jobs`."""
if "PYTEST_CURRENT_TEST" in os.environ:
return 2
return os.cpu_count()
else:
n_cpus = os.cpu_count()
if n_jobs is None:
return 1
if n_jobs < 0:
return n_cpus + n_jobs + 1
if n_jobs == 0:
raise ValueError("n_jobs == 0 in Parallel has no meaning")
return int(n_jobs)
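
# Illustrative resolution, mirroring joblib's `n_jobs` convention (assuming
# an 8-core machine, outside of pytest):
#   cpu_count(None) -> 1, cpu_count(4) -> 4,
#   cpu_count(-1) -> 8, cpu_count(-2) -> 7, cpu_count(0) -> ValueError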


def create_iterables(G, iterator, n_cores, list_of_iterator=None):
@@ -51,3 +64,22 @@ def create_iterables(G, iterator, n_cores, list_of_iterator=None):
return chunks(list_of_iterator, num_in_chunk)
else:
raise ValueError("Invalid iterator type.")


def _get_config(config):
config_dict = asdict(config)
config_dict.update(config_dict["backend_params"])
del config_dict["backend_params"]
config_dict["n_jobs"] = cpu_count(config_dict["n_jobs"])
return config_dict


def get_config(config=None):
"""Returns the current configuration settings for nx_parallel."""
config_dict = _get_config(nx.config.backends.parallel)
if config is None:
return config_dict
elif config in config_dict:
return config_dict[config]
else:
raise KeyError(f"Invalid config: {config}")
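
A brief, illustrative usage of `get_config` (assuming default settings on an 8-core machine, outside of pytest):

```python
import nx_parallel as nxp

nxp.get_config("n_jobs")  # 8 (the default n_jobs=-1 resolved to all cores)
nxp.get_config()["backend"]  # 'loky'
nxp.get_config("foo")  # raises KeyError: "Invalid config: foo"
```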