Skip to content

Commit

Permalink
update argument list in function call
Browse files Browse the repository at this point in the history
  • Loading branch information
jnke2016 committed Dec 28, 2024
1 parent 4c8744f commit 4da1c7e
Showing 1 changed file with 14 additions and 34 deletions.
48 changes: 14 additions & 34 deletions python/cugraph/cugraph/dask/sampling/node2vec_random_walks.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,30 +48,31 @@ def convert_to_cudf(cp_paths, number_map=None, is_vertex_paths=False):
return cudf.Series(cp_paths)


def _call_plc_node2vec_random_walks(sID, mg_graph_x, st_x, max_depth, compress_result, p, q):
def _call_plc_node2vec_random_walks(sID, mg_graph_x, st_x, max_depth, p, q, random_state):

return pylibcugraph_node2vec_random_walks(
resource_handle=ResourceHandle(Comms.get_handle(sID).getHandle()),
graph=mg_graph_x,
seed_array=st_x,
max_depth=max_depth,
compress_result=compress_result,
p=p,
q=q
q=q,
random_state=random_state
)


# FIXME: Add type anotation
def node2vec_random_walks(
input_graph,
start_vertices=None,
max_depth=None,
compress_result=True,
p=1.0,
q=1.0
q=1.0,
random_state=None
):
"""
Computes random walks for each node in 'start_vertices', under the
node2vec_random_walks sampling framework.
node2vec sampling framework.
parameters
----------
Expand All @@ -87,10 +88,6 @@ def node2vec_random_walks(
The maximum depth of the random walks. If not specified, the maximum
depth is set to 1.
compress_result: bool, optional (default=True)
If True, coalesced paths are returned with a sizes array with offsets.
Otherwise padded paths are returned with an empty sizes array.
p: float, optional (default=1.0, [0 < p])
Return factor, which represents the likelihood of backtracking to
a previous node in the walk. A higher value makes it less likely to
Expand All @@ -103,6 +100,9 @@ def node2vec_random_walks(
is likelier to visit nodes closer to the outgoing node. If q < 1, the
random walk is likelier to visit nodes further from the outgoing node.
A positive float.
random_state: int, optional
Random seed to use when making sampling calls.
Returns
-------
Expand All @@ -112,20 +112,13 @@ def node2vec_random_walks(
edge_weight_paths: dask_cudf.Series
Series containing the edge weights of edges represented by the
returned vertex_paths
sizes : dask_cudf.Series
The path size or sizes in case of coalesced paths.
"""
client = default_client()

if (not isinstance(max_depth, int)) or (max_depth < 1):
raise ValueError(
f"'max_depth' must be a positive integer, " f"got: {max_depth}"
)
if not isinstance(compress_result, bool):
raise ValueError(
f"'compress_result' must be a bool, " f"got: {compress_result}"
)
if (not isinstance(p, float)) or (p <= 0.0):
raise ValueError(f"'p' must be a positive float, got: {p}")
if (not isinstance(q, float)) or (q <= 0.0):
Expand Down Expand Up @@ -158,19 +151,16 @@ def node2vec_random_walks(
start_vertices, client, return_type="dict"
)

#print("start vertex_type = ", start_vertices_type)
#print("edgelist type = ", input_graph.edgelist.edgelist_df)

result = [
client.submit(
_call_plc_node2vec_random_walks,
Comms.get_session_id(),
input_graph._plc_graph[w],
start_v[0] if start_v else cudf.Series(dtype=start_vertices_type),
max_depth,
compress_result=compress_result,
p=p,
q=q,
random_state=random_state,
workers=[w],
allow_other_workers=False,
)
Expand All @@ -181,7 +171,6 @@ def node2vec_random_walks(

result_vertex_paths = [client.submit(op.getitem, f, 0) for f in result]
result_edge_wgt_paths = [client.submit(op.getitem, f, 1) for f in result]
result_sizes = [client.submit(op.getitem, f, 2) for f in result]

cudf_vertex_paths = [
client.submit(convert_to_cudf, cp_vertex_paths, input_graph.renumber_map, True)
Expand All @@ -193,26 +182,17 @@ def node2vec_random_walks(
for cp_edge_wgt_paths in result_edge_wgt_paths
]

cudf_sizes = [
client.submit(convert_to_cudf, cp_sizes)
for cp_sizes in result_sizes
]

wait([cudf_vertex_paths, cudf_edge_wgt_paths, cudf_sizes])
wait([cudf_vertex_paths, cudf_edge_wgt_paths])


ddf_vertex_paths = dask_cudf.from_delayed(cudf_vertex_paths).persist()
ddf_edge_wgt_paths = dask_cudf.from_delayed(cudf_edge_wgt_paths).persist()
ddf_sizes = dask_cudf.from_delayed(cudf_sizes).persist()
#wait([ddf_vertex_paths, ddf_edge_wgt_paths])

# Wait until the inactive futures are released
wait(
[
(r.release(), c_v.release(), c_e.release())
for r, c_v, c_e, c_s in zip(result, cudf_vertex_paths, cudf_edge_wgt_paths, cudf_sizes)
for r, c_v, c_e in zip(result, cudf_vertex_paths, cudf_edge_wgt_paths)
]
)

return ddf_vertex_paths, ddf_edge_wgt_paths, ddf_sizes
return ddf_vertex_paths, ddf_edge_wgt_paths

0 comments on commit 4da1c7e

Please sign in to comment.