Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: Refactor Parallel Graph Algorithms to Use a Centralized Parallel Configuration with Flexible Iterators #86

Open
wants to merge 19 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 39 additions & 37 deletions nx_parallel/algorithms/centrality/betweenness.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
from joblib import Parallel, delayed
from networkx.algorithms.centrality.betweenness import (
_accumulate_basic,
_accumulate_endpoints,
Expand Down Expand Up @@ -41,28 +40,31 @@ def betweenness_centrality(
if hasattr(G, "graph_object"):
G = G.graph_object

dPys marked this conversation as resolved.
Show resolved Hide resolved
if k is None:
nodes = G.nodes
else:
nodes = seed.sample(list(G.nodes), k)
def process_func(G, chunk, weight, endpoints):
return _betweenness_centrality_node_subset(
G, chunk, weight=weight, endpoints=endpoints
)

n_jobs = nxp.get_n_jobs()

if get_chunks == "chunks":
node_chunks = nxp.create_iterables(G, "node", n_jobs, nodes)
else:
node_chunks = get_chunks(nodes)

bt_cs = Parallel()(
delayed(_betweenness_centrality_node_subset)(G, chunk, weight, endpoints)
for chunk in node_chunks
def iterator_func(G):
if k is None:
return G.nodes
else:
return seed.sample(list(G.nodes), k)

# Fan the per-node computation out over node chunks.  NOTE: the utils module
# is `chunk` (singular) -- `nxp.utils.chunks` would raise AttributeError; the
# sibling call in edge_betweenness_centrality already uses the correct path.
bt_cs = nxp.utils.chunk.execute_parallel(
    G,
    process_func=process_func,
    iterator_func=iterator_func,
    get_chunks=get_chunks,
    weight=weight,
    endpoints=endpoints,
)

# Reducing partial solution
bt_c = bt_cs[0]
for bt in bt_cs[1:]:
for n in bt:
bt_c[n] += bt[n]
bt_c = {}
for bt in bt_cs:
for n, value in bt.items():
bt_c[n] = bt_c.get(n, 0.0) + value

betweenness = _rescale(
bt_c,
Expand Down Expand Up @@ -111,28 +113,28 @@ def edge_betweenness_centrality(
if hasattr(G, "graph_object"):
G = G.graph_object

if k is None:
nodes = G.nodes
else:
nodes = seed.sample(list(G.nodes), k)
def process_func(G, chunk, weight):
return _edge_betweenness_centrality_node_subset(G, chunk, weight=weight)

n_jobs = nxp.get_n_jobs()

if get_chunks == "chunks":
node_chunks = nxp.create_iterables(G, "node", n_jobs, nodes)
else:
node_chunks = get_chunks(nodes)

bt_cs = Parallel()(
delayed(_edge_betweenness_centrality_node_subset)(G, chunk, weight)
for chunk in node_chunks
def iterator_func(G):
if k is None:
return G.nodes
else:
return seed.sample(list(G.nodes), k)

bt_cs = nxp.utils.chunk.execute_parallel(
G,
process_func=process_func,
iterator_func=iterator_func,
get_chunks=get_chunks,
weight=weight,
)

# Reducing partial solution
bt_c = bt_cs[0]
for bt in bt_cs[1:]:
for e in bt:
bt_c[e] += bt[e]
bt_c = {}
for partial_bt in bt_cs:
for edge, value in partial_bt.items():
bt_c[edge] = bt_c.get(edge, 0.0) + value

for n in G: # remove nodes to only return edges
del bt_c[n]
Expand Down
107 changes: 72 additions & 35 deletions nx_parallel/utils/chunk.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,34 @@
import itertools
import os
import threading
from contextlib import contextmanager
import networkx as nx
import nx_parallel as nxp
from joblib import Parallel, delayed

__all__ = ["chunks", "get_n_jobs", "execute_parallel"]

__all__ = ["chunks", "get_n_jobs", "create_iterables"]
# Per-thread storage: each thread sees only the Parallel configuration it
# installed itself, so concurrent executions cannot interfere with each other.
_joblib_config = threading.local()


@contextmanager
def parallel_config(**kwargs):
    """Temporarily install keyword arguments for joblib's ``Parallel``.

    While the ``with`` block is active, ``execute_parallel`` (in this thread
    only) forwards *kwargs* to every ``Parallel(...)`` it constructs.  The
    previous configuration is restored on exit, even if the block raises,
    so nested uses compose correctly.

    Parameters
    ----------
    **kwargs : dict
        Keyword arguments accepted by joblib's ``Parallel``
        (e.g. ``backend``, ``verbose``).
    """
    previous = getattr(_joblib_config, "parallel_kwargs", {})
    _joblib_config.parallel_kwargs = kwargs
    try:
        yield
    finally:
        # Restore whatever was active before this context was entered.
        _joblib_config.parallel_kwargs = previous


def chunks(iterable, n_chunks):
Expand Down Expand Up @@ -52,44 +77,56 @@ def get_n_jobs(n_jobs=None):
return int(n_jobs)


def create_iterables(G, iterator, n_cores, list_of_iterator=None):
"""Create an iterable of function inputs for parallel computation
based on the provided iterator type.
def execute_parallel(
    G: nx.Graph,
    process_func,
    iterator_func,
    get_chunks="chunks",
    **kwargs,
):
    """Run ``process_func`` in parallel over chunks of data drawn from ``G``.

    Parameters
    ----------
    G : networkx.Graph
        The graph on which the algorithm operates.
    process_func : callable
        Function applied to each chunk; called as
        ``process_func(G, chunk, **kwargs)``.
    iterator_func : callable
        Function that takes ``G`` and returns an iterable of items to process.
    get_chunks : str or callable, optional (default="chunks")
        Determines how the data is chunked:
        - ``"chunks"``: split automatically into ``n_jobs`` roughly equal
          chunks, one per worker;
        - callable: called with the data iterable, must return an iterable
          of chunks.
    **kwargs : dict
        Additional keyword arguments forwarded to ``process_func``.

    Returns
    -------
    list
        One result per chunk, in chunk order.

    Raises
    ------
    ValueError
        If ``get_chunks`` is neither ``"chunks"`` nor a callable.
    """
    n_jobs = nxp.get_n_jobs()

    # Materialize once: the iterator may be a generator, and we both need to
    # test for emptiness and hand a re-iterable sequence to the chunker.
    data = list(iterator_func(G))
    if not data:
        # Nothing to do; mirrors the old create_iterables empty-input guard.
        return []

    if get_chunks == "chunks":
        # `chunks(iterable, n_chunks)` takes the *number* of chunks, so ask
        # for n_jobs chunks -- one per worker.  (Passing a value derived from
        # len(data) // n_jobs here would invert the parallelism, yielding
        # many tiny chunks instead of one chunk per job.)
        data_chunks = nxp.chunks(data, n_jobs)
    elif callable(get_chunks):
        data_chunks = get_chunks(data)
    else:
        raise ValueError(
            "get_chunks must be 'chunks' or a callable that returns an iterable of chunks."
        )

    # Pick up any per-thread joblib configuration installed via
    # `parallel_config`; defaults to no extra kwargs.
    parallel_kwargs = getattr(_joblib_config, "parallel_kwargs", {})

    return Parallel(n_jobs=n_jobs, **parallel_kwargs)(
        delayed(process_func)(G, chunk, **kwargs) for chunk in data_chunks
    )
Loading