Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: Adding config to nx-parallel #68

Closed
Closed
Show file tree
Hide file tree
Changes from 7 commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
a95ceb4
initial commit
Schefflera-Arboricola Jun 5, 2024
390078f
bug_fix
Schefflera-Arboricola Jun 5, 2024
27205d1
Merge branch 'networkx:main' into config_1
Schefflera-Arboricola Jun 24, 2024
ead0927
added default_config to get_info
Schefflera-Arboricola Jun 24, 2024
273d53d
added get_curr_configs
Schefflera-Arboricola Jun 24, 2024
a7ca076
updated configs
Schefflera-Arboricola Jun 24, 2024
4982fc0
rm 3 configs
Schefflera-Arboricola Jun 24, 2024
d4aaedb
Merge branch 'networkx:main' into config_1
Schefflera-Arboricola Jul 22, 2024
4465712
using joblib.parallel_config
Schefflera-Arboricola Jul 24, 2024
9ce30d7
indentation
Schefflera-Arboricola Jul 24, 2024
d7ba93d
updated cpu_count
Schefflera-Arboricola Jul 26, 2024
79e6881
integrating nx config
Schefflera-Arboricola Jul 27, 2024
ced3898
rm get_default_configs
Schefflera-Arboricola Jul 27, 2024
61dcbb7
renamed get_n_jobs
Schefflera-Arboricola Jul 27, 2024
788dbf8
n_job=-1s
Schefflera-Arboricola Jul 27, 2024
9085093
changing n_jobs=None
Schefflera-Arboricola Jul 27, 2024
9870bad
added backend=loky for testing
Schefflera-Arboricola Jul 27, 2024
f08780c
Merge branch 'networkx:main' into config_1
Schefflera-Arboricola Aug 1, 2024
75604c8
rm inconsistency - using config(singular)
Schefflera-Arboricola Aug 2, 2024
3979f2e
removed __all__ from config.py
Schefflera-Arboricola Aug 2, 2024
9c8b57b
changing the default backend to loky
Schefflera-Arboricola Aug 2, 2024
de99170
enhanced config documentation
Schefflera-Arboricola Aug 2, 2024
d354351
rm fetching list of configs
Schefflera-Arboricola Aug 2, 2024
e4bf668
made n_jobs=-1 as default
Schefflera-Arboricola Aug 13, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 22 additions & 3 deletions _nx_parallel/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,20 @@ def get_info():
"package": "nx_parallel",
"url": "https://github.com/networkx/nx-parallel",
"short_summary": "Parallel backend for NetworkX algorithms",
"default_config": {
"backend": None,
"n_jobs": None,
"verbose": 0,
"temp_folder": None,
"max_nbytes": "1M",
"mmap_mode": "r",
"prefer": None,
"require": None,
"return_as": "list",
"timeout": None,
"pre_dispatch": "2 * n_jobs",
"batch_size": "auto",
},
"functions": {
"number_of_isolates": {
"url": "https://github.com/networkx/nx-parallel/blob/main/nx_parallel/algorithms/isolate.py#L8",
Expand Down Expand Up @@ -150,18 +164,23 @@ def get_info():
'get_chunks : str, function (default = "chunks")': "A function that takes in an iterable of all the nodes as input and returns an iterable `node_chunks`. The default chunking is done by slicing the `G.nodes` into `n` chunks, where `n` is the number of CPU cores."
},
},
"get_curr_configs": {
"url": "https://github.com/networkx/nx-parallel/blob/main/nx_parallel/utils/config.py#L44",
"additional_docs": "Returns the current configuration settings for nx_parallel.",
"additional_parameters": None,
},
"chunks": {
"url": "https://github.com/networkx/nx-parallel/blob/main/nx_parallel/utils/chunk.py#L8",
"url": "https://github.com/networkx/nx-parallel/blob/main/nx_parallel/utils/chunk.py#L9",
"additional_docs": "Divides an iterable into chunks of size n",
"additional_parameters": None,
},
"cpu_count": {
"url": "https://github.com/networkx/nx-parallel/blob/main/nx_parallel/utils/chunk.py#L18",
"url": "https://github.com/networkx/nx-parallel/blob/main/nx_parallel/utils/chunk.py#L19",
"additional_docs": "Returns the number of logical CPUs or cores",
"additional_parameters": None,
},
"create_iterables": {
"url": "https://github.com/networkx/nx-parallel/blob/main/nx_parallel/utils/chunk.py#L26",
"url": "https://github.com/networkx/nx-parallel/blob/main/nx_parallel/utils/chunk.py#L28",
"additional_docs": "Creates an iterable of function inputs for parallel computation based on the provided iterator type.",
"additional_parameters": {
"G : NetworkX graph": "iterator : str Type of iterator. Valid values are 'node', 'edge', 'isolate'",
Expand Down
10 changes: 8 additions & 2 deletions _nx_parallel/update_get_info.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import os
import ast
from nx_parallel.utils.config import _configs

__all__ = [
"get_funcs_info",
Expand Down Expand Up @@ -134,7 +135,8 @@ def get_url(file_path, function_name):

# Creating a temp__init__.py file

string = '''# This file was automatically generated by update_get_info.py
string = (
'''# This file was automatically generated by update_get_info.py


def get_info():
Expand All @@ -145,7 +147,11 @@ def get_info():
"package": "nx_parallel",
"url": "https://github.com/networkx/nx-parallel",
"short_summary": "Parallel backend for NetworkX algorithms",
"functions": '''
"default_config": '''
+ str(_configs.get_config_dict())
+ """,
"functions": """
)

with open("_nx_parallel/temp__init__.py", "w") as f:
f.write(string + str(get_funcs_info()) + "}\n")
7 changes: 5 additions & 2 deletions nx_parallel/algorithms/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,15 +47,18 @@ def _compute_clustering_chunk(node_iter_chunk):
else:
node_iter = list(G.nbunch_iter(nodes))

total_cores = nxp.cpu_count()
configs = nxp.get_curr_configs()
cores = nxp.cpu_count()
total_cores = cores if cores is not None else -1
configs["n_jobs"] = total_cores

if get_chunks == "chunks":
num_in_chunk = max(len(node_iter) // total_cores, 1)
node_iter_chunks = nxp.chunks(node_iter, num_in_chunk)
else:
node_iter_chunks = get_chunks(node_iter)

result = Parallel(n_jobs=total_cores)(
result = Parallel(**configs)(
Schefflera-Arboricola marked this conversation as resolved.
Show resolved Hide resolved
delayed(_compute_clustering_chunk)(node_iter_chunk)
for node_iter_chunk in node_iter_chunks
)
Expand Down
1 change: 1 addition & 0 deletions nx_parallel/utils/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
from .chunk import *
from .config import *
4 changes: 3 additions & 1 deletion nx_parallel/utils/chunk.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import itertools
import os
import networkx as nx
import nx_parallel as nxp

__all__ = ["chunks", "cpu_count", "create_iterables"]

Expand All @@ -20,7 +21,8 @@ def cpu_count():
# Check if we are running under pytest
Schefflera-Arboricola marked this conversation as resolved.
Show resolved Hide resolved
if "PYTEST_CURRENT_TEST" in os.environ:
return 2
return os.cpu_count()
else:
return nxp.get_curr_configs("n_jobs")


def create_iterables(G, iterator, n_cores, list_of_iterator=None):
Expand Down
55 changes: 55 additions & 0 deletions nx_parallel/utils/config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
from networkx.utils.configs import Config
from typing import Union
from dataclasses import asdict
import networkx as nx

__all__ = [
"NxpConfig",
"_configs",
"get_curr_configs",
]


class NxpConfig(Config):
backend: str = None
n_jobs: int = None
verbose: int = 0
temp_folder: str = None
max_nbytes: Union[int, str] = "1M"
mmap_mode: str = "r"
prefer: str = None
require: str = None
return_as: str = "list"
timeout: float = None
pre_dispatch: str = "2 * n_jobs"
batch_size: int = "auto"

def get_config_dict(self, config=None):
"""Return the default configuration as a dictionary."""
config_dict = asdict(self)
if config is None:
return config_dict
elif isinstance(config, list):
new_config = {k: config_dict[k] for k in config if k in config_dict}
return new_config
elif config in config_dict:
return config_dict[config]
else:
raise KeyError(f"Invalid config: {config}")


_configs = NxpConfig()


def get_curr_configs(config=None):
"""Returns the current configuration settings for nx_parallel."""
config_dict = dict(nx.config.backends.parallel)
if config is None:
return config_dict
elif isinstance(config, list):
new_config = {k: config_dict[k] for k in config if k in config_dict}
return new_config
elif config in config_dict:
return config_dict[config]
else:
raise KeyError(f"Invalid config: {config}")
Loading