Skip to content

Refactor get_all_functions to use inspect.signature instead of inspect.getfullargspe #104

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 8 commits into from
May 17, 2025
76 changes: 53 additions & 23 deletions nx_parallel/tests/test_get_chunks.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,34 +10,64 @@
import nx_parallel as nxp


def get_all_functions(package_name="nx_parallel"):
"""Returns a dict keyed by function names to its arguments.

This function constructs a dictionary keyed by the function
names in the package `package_name` to dictionaries containing
the function's keyword arguments and positional arguments.
"""
package = importlib.import_module(package_name)
functions = {}

for name, obj in inspect.getmembers(package, inspect.isfunction):
if not name.startswith("_"):
args, kwargs = inspect.getfullargspec(obj)[:2]
functions[name] = {"args": args, "kwargs": kwargs}
def get_functions_with_get_chunks():
"""Returns a list of function names with the `get_chunks` kwarg."""

return functions
def get_all_funcs_with_args(package_name="nx_parallel"):
"""Returns a dict keyed by function names to a list of
the function's args names, for all the functions in
the package `package_name`.
"""
package = importlib.import_module(package_name)
funcs_with_args = {}

for name, obj in inspect.getmembers(package, inspect.isfunction):
if not name.startswith("_"):
signature = inspect.signature(obj)
arguments = [
param.name
for param in signature.parameters.values()
if param.kind == inspect.Parameter.POSITIONAL_OR_KEYWORD
]
funcs_with_args[name] = arguments
return funcs_with_args

def get_functions_with_get_chunks():
"""Returns a list of function names with the `get_chunks` kwarg."""
all_funcs = get_all_functions()
all_funcs = get_all_funcs_with_args()
get_chunks_funcs = []
for func in all_funcs:
if "get_chunks" in all_funcs[func]["args"]:
if "get_chunks" in all_funcs[func]:
get_chunks_funcs.append(func)
return get_chunks_funcs


def test_get_functions_with_get_chunks():
# TODO: Instead of `expected` use ALGORTHMS from interface.py
# take care of functions like `connectivity.all_pairs_node_connectivity`
expected = {
"all_pairs_all_shortest_paths",
"all_pairs_bellman_ford_path",
"all_pairs_bellman_ford_path_length",
"all_pairs_dijkstra",
"all_pairs_dijkstra_path",
"all_pairs_dijkstra_path_length",
"all_pairs_node_connectivity",
"all_pairs_shortest_path",
"all_pairs_shortest_path_length",
"approximate_all_pairs_node_connectivity",
"betweenness_centrality",
"closeness_vitality",
"edge_betweenness_centrality",
"is_reachable",
"johnson",
"local_efficiency",
"node_redundancy",
"number_of_isolates",
"square_clustering",
"tournament_is_strongly_connected",
}
assert set(get_functions_with_get_chunks()) == expected


def test_get_chunks():
def random_chunking(nodes):
_nodes = list(nodes).copy()
Expand Down Expand Up @@ -76,14 +106,14 @@ def random_chunking(nodes):
if isinstance(c1, types.GeneratorType):
c1, c2 = dict(c1), dict(c2)
if func in chk_dict_vals:
for i in range(len(G.nodes)):
assert math.isclose(c1[i], c2[i], abs_tol=1e-16)
for key in c1:
assert math.isclose(c1[key], c2[key], abs_tol=1e-16)
else:
assert c1 == c2
else:
if func in chk_dict_vals:
for i in range(len(G.nodes)):
assert math.isclose(c1[i], c2[i], abs_tol=1e-16)
for key in c1:
assert math.isclose(c1[key], c2[key], abs_tol=1e-16)
else:
if isinstance(c1, float):
assert math.isclose(c1, c2, abs_tol=1e-16)
Expand Down
Loading