[FEA] Biased Sampling in cuGraph-DGL #4595

Merged (106 commits, Sep 26, 2024)

Commits
b5915ff
merge
alexbarghi-nv Jun 3, 2024
4e3045f
c
alexbarghi-nv Jun 6, 2024
f243351
pull in dependency fixes
alexbarghi-nv Jun 6, 2024
b1adcd3
merge
alexbarghi-nv Jun 6, 2024
4c29329
w
alexbarghi-nv Jun 7, 2024
265f546
basic graph/fs
alexbarghi-nv Jun 7, 2024
b51eda4
dist sampling
alexbarghi-nv Jun 10, 2024
9943260
graph data views
alexbarghi-nv Jun 12, 2024
055db0a
remove unwanted file
alexbarghi-nv Jun 12, 2024
020bf67
Merge branch 'branch-24.08' of https://github.com/rapidsai/cugraph in…
alexbarghi-nv Jun 13, 2024
1f76898
revert devcontainer change
alexbarghi-nv Jun 13, 2024
927ee09
tests, bugfixes, resolve indexing problem (sort of)
alexbarghi-nv Jun 14, 2024
ffdb2fa
Merge branch 'branch-24.08' into dgl-dist-sampling
nv-rliu Jun 24, 2024
68129d9
add heteogeneous tests
alexbarghi-nv Jun 25, 2024
20450a3
testing, fixing graph API
alexbarghi-nv Jun 26, 2024
58ec793
Merge branch 'dgl-dist-sampling' of https://github.com/alexbarghi-nv/…
alexbarghi-nv Jun 26, 2024
557d9aa
Loaders
alexbarghi-nv Jun 27, 2024
a8c0848
add todo
alexbarghi-nv Jun 27, 2024
913b8cd
fix block issue, typing
alexbarghi-nv Jun 28, 2024
79c8f78
reorganize tests
alexbarghi-nv Jul 1, 2024
b25128b
Merge branch 'branch-24.08' of https://github.com/rapidsai/cugraph in…
alexbarghi-nv Jul 1, 2024
a56b56d
sampling
alexbarghi-nv Jul 2, 2024
8f14f88
revert dependencies.yaml
alexbarghi-nv Jul 2, 2024
5f74252
update tensordict dependency
alexbarghi-nv Jul 2, 2024
ad120ac
Merge branch 'branch-24.08' into dgl-dist-sampling
alexbarghi-nv Jul 2, 2024
b2fdef8
update dependencies
alexbarghi-nv Jul 2, 2024
5ce714d
Merge branch 'dgl-dist-sampling' of https://github.com/alexbarghi-nv/…
alexbarghi-nv Jul 2, 2024
92fd866
update meta files
alexbarghi-nv Jul 2, 2024
6107d82
fix csr/csc issue, wrap up tests
alexbarghi-nv Jul 3, 2024
f04700d
Merge branch 'branch-24.08' into dgl-dist-sampling
alexbarghi-nv Jul 8, 2024
6bc4b4a
m
alexbarghi-nv Jul 8, 2024
faeb4a5
style
alexbarghi-nv Jul 8, 2024
afb9452
revert ci script
alexbarghi-nv Jul 8, 2024
48ba6d4
fix meta.yaml issue
alexbarghi-nv Jul 9, 2024
786c1af
Merge branch 'branch-24.08' into dgl-dist-sampling
alexbarghi-nv Jul 9, 2024
801de87
add type hint
alexbarghi-nv Jul 10, 2024
5e511cc
add missing type hint
alexbarghi-nv Jul 10, 2024
035b69a
remove comment, add issue reference
alexbarghi-nv Jul 10, 2024
ebbc1db
Merge branch 'dgl-dist-sampling' of https://github.com/alexbarghi-nv/…
alexbarghi-nv Jul 10, 2024
b412776
Add type hint
alexbarghi-nv Jul 10, 2024
1c72bd6
add convert function, fix bugs
alexbarghi-nv Jul 10, 2024
18f6ac2
Merge branch 'dgl-dist-sampling' of https://github.com/alexbarghi-nv/…
alexbarghi-nv Jul 10, 2024
9bd0440
Merge branch 'branch-24.08' into dgl-dist-sampling
alexbarghi-nv Jul 10, 2024
2d522b1
move worker init to utility
alexbarghi-nv Jul 10, 2024
4b60d8d
Merge branch 'dgl-dist-sampling' of https://github.com/alexbarghi-nv/…
alexbarghi-nv Jul 10, 2024
e1fa6e0
revert none return, add check
alexbarghi-nv Jul 10, 2024
8529987
style
alexbarghi-nv Jul 10, 2024
89f4ef4
use global communicator
alexbarghi-nv Jul 22, 2024
4d82ee0
global
alexbarghi-nv Jul 22, 2024
b4ed827
Merge branch 'branch-24.08' into use-correct-communicator
alexbarghi-nv Jul 23, 2024
e144ad1
Merge branch 'branch-24.08' into use-correct-communicator
alexbarghi-nv Jul 24, 2024
2b160bf
use int64 to store # edges
alexbarghi-nv Jul 24, 2024
22b85d2
Merge branch 'use-correct-communicator' of https://github.com/alexbar…
alexbarghi-nv Jul 24, 2024
ae9133f
resolve merge conflict
alexbarghi-nv Jul 25, 2024
1cf01c7
Merge branch 'use-correct-communicator' into dgl-examples
alexbarghi-nv Jul 25, 2024
6db236c
example
alexbarghi-nv Jul 25, 2024
7a3d38f
reverse mfgs
alexbarghi-nv Jul 25, 2024
710741c
node classification
alexbarghi-nv Jul 30, 2024
ddb95d6
resolve merge conflict
alexbarghi-nv Jul 30, 2024
f943d91
mnmg
alexbarghi-nv Jul 30, 2024
7ba4d89
use global communicator
alexbarghi-nv Jul 30, 2024
d9e9b50
Merge branch 'dgl-dist-sampling' into dgl-examples
alexbarghi-nv Jul 30, 2024
e0f1a90
examples
alexbarghi-nv Jul 31, 2024
2d3a640
fix partition function
alexbarghi-nv Aug 1, 2024
994aca8
fix minor issues
alexbarghi-nv Aug 1, 2024
d1c8494
remove dask example
alexbarghi-nv Aug 1, 2024
e92aa28
Merge branch 'branch-24.08' into dgl-examples
alexbarghi-nv Aug 1, 2024
dc811f9
initial write of graph store changes
alexbarghi-nv Aug 1, 2024
9764f0e
Merge branch 'branch-24.10' into biased-pyg
alexbarghi-nv Aug 1, 2024
2ddfed2
updating pylibcugraph
alexbarghi-nv Aug 1, 2024
299ea45
Merge branch 'biased-pyg' of https://github.com/alexbarghi-nv/cugraph…
alexbarghi-nv Aug 1, 2024
28b5240
update improts
alexbarghi-nv Aug 1, 2024
75e2f58
cleaning up plc
alexbarghi-nv Aug 2, 2024
35897c6
testing
alexbarghi-nv Aug 2, 2024
555ccfc
style
alexbarghi-nv Aug 2, 2024
ef4962a
remove print
alexbarghi-nv Aug 2, 2024
bf32da0
commit
alexbarghi-nv Aug 2, 2024
1a2937d
remove comments
alexbarghi-nv Aug 2, 2024
05e1da4
use float64
alexbarghi-nv Aug 2, 2024
c5842b7
Merge branch 'dgl-examples' of https://github.com/alexbarghi-nv/cugra…
alexbarghi-nv Aug 2, 2024
5b46f43
set dtype
alexbarghi-nv Aug 2, 2024
5877c4e
resolving memory problem
alexbarghi-nv Aug 5, 2024
73429df
correction to fix
alexbarghi-nv Aug 5, 2024
2358256
change function name, fix docstring
alexbarghi-nv Aug 5, 2024
b0f65e3
fix another typo
alexbarghi-nv Aug 5, 2024
2083b26
Merge branch 'branch-24.10' of https://github.com/rapidsai/cugraph in…
alexbarghi-nv Aug 5, 2024
139b3d6
allow setting directories
alexbarghi-nv Aug 5, 2024
4b29eec
Merge branch 'branch-24.10' of https://github.com/rapidsai/cugraph in…
alexbarghi-nv Aug 6, 2024
3fa49af
Merge branch 'biased-pyg' into biased-dgl
alexbarghi-nv Aug 6, 2024
3dd4273
biased sampling
alexbarghi-nv Aug 6, 2024
41a4c39
testing
alexbarghi-nv Aug 7, 2024
29be85c
remove debug code
alexbarghi-nv Aug 7, 2024
270541a
support heterogeneous graph
alexbarghi-nv Aug 7, 2024
49470ff
add explanatory comment
alexbarghi-nv Aug 7, 2024
cfda062
bug fix
seunghwak Aug 8, 2024
6091f71
udpate transform_v_frontier_e with edge mask to return the transforme…
seunghwak Aug 9, 2024
c574043
add tests that include zero biases
seunghwak Aug 9, 2024
9d566e8
Merge branch 'branch-24.10' of https://github.com/rapidsai/cugraph in…
seunghwak Aug 9, 2024
d0c6920
Merge branch 'bug_biased_sampling' of https://github.com/seunghwak/cu…
alexbarghi-nv Aug 9, 2024
6fcb1f0
c
alexbarghi-nv Aug 9, 2024
5b39ffd
Merge branch 'branch-24.10' into biased-dgl
alexbarghi-nv Aug 9, 2024
da53c02
update branch
alexbarghi-nv Aug 19, 2024
b6ecc14
fix style
alexbarghi-nv Aug 21, 2024
40aded1
Merge branch 'branch-24.10' into biased-dgl
alexbarghi-nv Aug 21, 2024
46f2054
resolve conflict
alexbarghi-nv Sep 18, 2024
7e976c7
Merge branch 'branch-24.10' into biased-dgl
alexbarghi-nv Sep 20, 2024
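Before the per-file diffs, a minimal usage sketch of what this PR enables, modeled on the new `test_dataloader_biased_homogeneous` test below. The toy edge list, the `"wgt"` attribute name, and the DGL-style `(input_nodes, output_nodes, blocks)` iteration are illustrative assumptions, not part of the PR itself:

```python
import torch
import cugraph_dgl
import cugraph_dgl.dataloading

# Toy single-GPU graph with an edge-weight feature (mirrors the new test).
src = torch.tensor([1, 2, 3, 4, 5, 6, 7, 8])
dst = torch.tensor([0, 0, 0, 0, 1, 1, 1, 1])
wgt = torch.tensor([1, 1, 2, 0, 0, 0, 2, 1], dtype=torch.float32)

g = cugraph_dgl.Graph(is_multi_gpu=False)
g.add_nodes(9)
g.add_edges(u=src, v=dst, data={"wgt": wgt})

# With this PR, passing prob= selects BiasedNeighborSampler internally,
# so neighbors are drawn with probability proportional to "wgt".
sampler = cugraph_dgl.dataloading.NeighborSampler([4], prob="wgt")
loader = cugraph_dgl.dataloading.FutureDataLoader(
    g, torch.tensor([0, 1]), sampler, batch_size=2
)

for input_nodes, output_nodes, blocks in loader:
    print(output_nodes, blocks[0].num_edges())
```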
19 changes: 11 additions & 8 deletions python/cugraph-dgl/cugraph_dgl/dataloading/neighbor_sampler.py
@@ -18,7 +18,7 @@
 
 from typing import Sequence, Optional, Union, List, Tuple, Iterator
 
-from cugraph.gnn import UniformNeighborSampler, DistSampleWriter
+from cugraph.gnn import UniformNeighborSampler, BiasedNeighborSampler, DistSampleWriter
 from cugraph.utilities.utils import import_optional
 
 import cugraph_dgl
@@ -93,7 +93,6 @@ def __init__(
             If provided, the probability of each neighbor being
             sampled is proportional to the edge feature
             with the given name. Mutually exclusive with mask.
-            Currently unsupported.
         mask: str
             Optional.
             If provided, only neighbors where the edge mask
@@ -133,10 +132,6 @@ def __init__(
             raise NotImplementedError(
                 "Edge masking is currently unsupported by cuGraph-DGL"
             )
-        if prob:
-            raise NotImplementedError(
-                "Edge masking is currently unsupported by cuGraph-DGL"
-            )
         if prefetch_edge_feats:
             warnings.warn("'prefetch_edge_feats' is ignored by cuGraph-DGL")
         if prefetch_node_feats:
@@ -146,6 +141,8 @@ def __init__(
         if fused:
             warnings.warn("'fused' is ignored by cuGraph-DGL")
 
+        self.__prob_attr = prob
+
         self.fanouts = fanouts_per_layer
         reverse_fanouts = fanouts_per_layer.copy()
         reverse_fanouts.reverse()
Expand Down Expand Up @@ -180,8 +177,14 @@ def sample(
format=kwargs.pop("format", "parquet"),
)

ds = UniformNeighborSampler(
g._graph(self.edge_dir),
sampling_clx = (
UniformNeighborSampler
if self.__prob_attr is None
else BiasedNeighborSampler
)

ds = sampling_clx(
g._graph(self.edge_dir, prob_attr=self.__prob_attr),
writer,
compression="CSR",
fanout=self._reversed_fanout_vals,
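For reviewers unfamiliar with the feature: biased sampling draws each neighbor with probability proportional to the named edge feature, and zero-weight edges are never drawn. Below is a toy CPU sketch of those semantics, assuming sampling without replacement; it is illustrative only, not the GPU implementation behind `BiasedNeighborSampler`:

```python
import torch


def biased_sample_one_hop(neighbors, weights, fanout):
    """Illustrative biased neighbor sampling: per seed vertex, draw up to
    `fanout` neighbors with probability proportional to edge weight."""
    out = {}
    for seed, nbrs in neighbors.items():
        w = weights[seed]
        keep = w > 0  # zero-weight edges can never be sampled
        nbrs, w = nbrs[keep], w[keep]
        k = min(fanout, nbrs.numel())
        idx = torch.multinomial(w, k, replacement=False)
        out[seed] = nbrs[idx]
    return out


# Same toy graph as the new test: seed 0 has 3 nonzero-weight neighbors,
# seed 1 has 2, so fanout=4 yields 3 + 2 = 5 sampled edges in total.
neighbors = {0: torch.tensor([1, 2, 3, 4]), 1: torch.tensor([5, 6, 7, 8])}
weights = {
    0: torch.tensor([1.0, 1.0, 2.0, 0.0]),
    1: torch.tensor([0.0, 0.0, 2.0, 1.0]),
}
print(biased_sample_one_hop(neighbors, weights, fanout=4))
```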
148 changes: 82 additions & 66 deletions python/cugraph-dgl/cugraph_dgl/graph.py
@@ -312,7 +312,7 @@ def add_edges(
         self.__graph = None
         self.__vertex_offsets = None
 
-    def num_nodes(self, ntype: str = None) -> int:
+    def num_nodes(self, ntype: Optional[str] = None) -> int:
         """
         Returns the number of nodes of ntype, or if ntype is not provided,
         the total number of nodes in the graph.
@@ -322,7 +322,7 @@ def num_nodes(self, ntype: str = None) -> int:
 
         return self.__num_nodes_dict[ntype]
 
-    def number_of_nodes(self, ntype: str = None) -> int:
+    def number_of_nodes(self, ntype: Optional[str] = None) -> int:
         """
         Alias for num_nodes.
         """
@@ -381,7 +381,7 @@ def _vertex_offsets(self) -> Dict[str, int]:
 
         return dict(self.__vertex_offsets)
 
-    def __get_edgelist(self) -> Dict[str, "torch.Tensor"]:
+    def __get_edgelist(self, prob_attr=None) -> Dict[str, "torch.Tensor"]:
         """
         This function always returns src/dst labels with respect
         to the out direction.
@@ -431,63 +431,71 @@ def __get_edgelist(self) -> Dict[str, "torch.Tensor"]:
                 )
             )
 
+        num_edges_t = torch.tensor(
+            [self.__edge_indices[et].shape[1] for et in sorted_keys], device="cuda"
+        )
+
         if self.is_multi_gpu:
             rank = torch.distributed.get_rank()
             world_size = torch.distributed.get_world_size()
 
-            num_edges_t = torch.tensor(
-                [self.__edge_indices[et].shape[1] for et in sorted_keys], device="cuda"
-            )
             num_edges_all_t = torch.empty(
                 world_size, num_edges_t.numel(), dtype=torch.int64, device="cuda"
             )
             torch.distributed.all_gather_into_tensor(num_edges_all_t, num_edges_t)
 
-            if rank > 0:
-                start_offsets = num_edges_all_t[:rank].T.sum(axis=1)
-                edge_id_array = torch.concat(
-                    [
-                        torch.arange(
-                            start_offsets[i],
-                            start_offsets[i] + num_edges_all_t[rank][i],
-                            dtype=torch.int64,
-                            device="cuda",
-                        )
-                        for i in range(len(sorted_keys))
-                    ]
-                )
-            else:
-                edge_id_array = torch.concat(
-                    [
-                        torch.arange(
-                            self.__edge_indices[et].shape[1],
-                            dtype=torch.int64,
-                            device="cuda",
-                        )
-                        for et in sorted_keys
-                    ]
-                )
+            start_offsets = num_edges_all_t[:rank].T.sum(axis=1)
 
         else:
-            # single GPU
-            edge_id_array = torch.concat(
-                [
-                    torch.arange(
-                        self.__edge_indices[et].shape[1],
-                        dtype=torch.int64,
-                        device="cuda",
-                    )
-                    for et in sorted_keys
-                ]
-            )
+            rank = 0
+            start_offsets = torch.zeros(
+                (len(sorted_keys),), dtype=torch.int64, device="cuda"
+            )
+            num_edges_all_t = num_edges_t.reshape((1, num_edges_t.numel()))
+
+        # Use pinned memory here for fast access to CPU/WG storage
+        edge_id_array_per_type = [
+            torch.arange(
+                start_offsets[i],
+                start_offsets[i] + num_edges_all_t[rank][i],
+                dtype=torch.int64,
+                device="cpu",
+            ).pin_memory()
+            for i in range(len(sorted_keys))
+        ]
+
+        # Retrieve the weights from the appropriate feature(s)
+        # DGL implicitly requires all edge types use the same
+        # feature name.
+        if prob_attr is None:
+            weights = None
+        else:
+            if len(sorted_keys) > 1:
+                weights = torch.concat(
+                    [
+                        self.edata[prob_attr][sorted_keys[i]][ix]
+                        for i, ix in enumerate(edge_id_array_per_type)
+                    ]
+                )
+            else:
+                weights = self.edata[prob_attr][edge_id_array_per_type[0]]
+
+        # Safe to move this to cuda because the consumer will always
+        # move it to cuda if it isn't already there.
+        edge_id_array = torch.concat(edge_id_array_per_type).cuda()
 
-        return {
+        edgelist_dict = {
             "src": edge_index[0],
             "dst": edge_index[1],
             "etp": edge_type_array,
             "eid": edge_id_array,
         }
 
+        if weights is not None:
+            edgelist_dict["wgt"] = weights
+
+        return edgelist_dict
 
     @property
     def is_homogeneous(self):
         return len(self.__num_edges_dict) <= 1 and len(self.__num_nodes_dict) <= 1
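A note on the offset arithmetic above, since it is easy to misread: after the all-gather, row `r` of `num_edges_all_t` holds rank `r`'s per-edge-type counts, so a rank's starting global edge id for type `i` is the column-`i` sum over all earlier ranks. A standalone sketch with fabricated counts (no process group required):

```python
import torch

# Pretend torch.distributed.all_gather_into_tensor produced this:
# one row per rank, one column per (sorted) edge type.
num_edges_all_t = torch.tensor(
    [
        [10, 4],  # rank 0: 10 edges of type A, 4 of type B
        [7, 9],   # rank 1
        [5, 2],   # rank 2
    ]
)

rank = 2
# The same expression as in __get_edgelist: per edge type, sum the
# counts held by all earlier ranks to get this rank's start offsets.
start_offsets = num_edges_all_t[:rank].T.sum(axis=1)
print(start_offsets)  # tensor([17, 13])

# Rank 2 owns 5 edges of type A, so its global ids for type A are 17..21.
eids_a = torch.arange(start_offsets[0], start_offsets[0] + num_edges_all_t[rank][0])
print(eids_a)  # tensor([17, 18, 19, 20, 21])
```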
@@ -508,7 +516,9 @@ def _resource_handle(self):
         return self.__handle
 
     def _graph(
-        self, direction: str
+        self,
+        direction: str,
+        prob_attr: Optional[str] = None,
     ) -> Union[pylibcugraph.SGGraph, pylibcugraph.MGGraph]:
         """
         Gets the pylibcugraph Graph object with edges pointing in the given direction
@@ -522,12 +532,16 @@ def _graph(
             is_multigraph=True, is_symmetric=False
         )
 
-        if self.__graph is not None and self.__graph[1] != direction:
-            self.__graph = None
+        if self.__graph is not None:
+            if (
+                self.__graph["direction"] != direction
+                or self.__graph["prob_attr"] != prob_attr
+            ):
+                self.__graph = None
 
         if self.__graph is None:
             src_col, dst_col = ("src", "dst") if direction == "out" else ("dst", "src")
-            edgelist_dict = self.__get_edgelist()
+            edgelist_dict = self.__get_edgelist(prob_attr=prob_attr)
 
             if self.is_multi_gpu:
                 rank = torch.distributed.get_rank()
@@ -536,33 +550,35 @@ def _graph(
                 vertices_array = cupy.arange(self.num_nodes(), dtype="int64")
                 vertices_array = cupy.array_split(vertices_array, world_size)[rank]
 
-                self.__graph = (
-                    pylibcugraph.MGGraph(
-                        self._resource_handle,
-                        graph_properties,
-                        [cupy.asarray(edgelist_dict[src_col]).astype("int64")],
-                        [cupy.asarray(edgelist_dict[dst_col]).astype("int64")],
-                        vertices_array=[vertices_array],
-                        edge_id_array=[cupy.asarray(edgelist_dict["eid"])],
-                        edge_type_array=[cupy.asarray(edgelist_dict["etp"])],
-                    ),
-                    direction,
+                graph = pylibcugraph.MGGraph(
+                    self._resource_handle,
+                    graph_properties,
+                    [cupy.asarray(edgelist_dict[src_col]).astype("int64")],
+                    [cupy.asarray(edgelist_dict[dst_col]).astype("int64")],
+                    vertices_array=[vertices_array],
+                    edge_id_array=[cupy.asarray(edgelist_dict["eid"])],
+                    edge_type_array=[cupy.asarray(edgelist_dict["etp"])],
+                    weight_array=[cupy.asarray(edgelist_dict["wgt"])]
+                    if "wgt" in edgelist_dict
+                    else None,
                 )
             else:
-                self.__graph = (
-                    pylibcugraph.SGGraph(
-                        self._resource_handle,
-                        graph_properties,
-                        cupy.asarray(edgelist_dict[src_col]).astype("int64"),
-                        cupy.asarray(edgelist_dict[dst_col]).astype("int64"),
-                        vertices_array=cupy.arange(self.num_nodes(), dtype="int64"),
-                        edge_id_array=cupy.asarray(edgelist_dict["eid"]),
-                        edge_type_array=cupy.asarray(edgelist_dict["etp"]),
-                    ),
-                    direction,
+                graph = pylibcugraph.SGGraph(
+                    self._resource_handle,
+                    graph_properties,
+                    cupy.asarray(edgelist_dict[src_col]).astype("int64"),
+                    cupy.asarray(edgelist_dict[dst_col]).astype("int64"),
+                    vertices_array=cupy.arange(self.num_nodes(), dtype="int64"),
+                    edge_id_array=cupy.asarray(edgelist_dict["eid"]),
+                    edge_type_array=cupy.asarray(edgelist_dict["etp"]),
+                    weight_array=cupy.asarray(edgelist_dict["wgt"])
+                    if "wgt" in edgelist_dict
+                    else None,
                 )
 
-        return self.__graph[0]
+            self.__graph = {"graph": graph, "direction": direction, "prob_attr": prob_attr}
+
+        return self.__graph["graph"]
 
     def _has_n_emb(self, ntype: str, emb_name: str) -> bool:
         return (ntype, emb_name) in self.__ndata_storage
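One behavioral note on the `_graph` change above: the cache previously stored a `(graph, direction)` tuple, so a request with a different `prob_attr` could silently reuse a weightless graph; storing every construction parameter alongside the graph makes the cache self-invalidating. A minimal sketch of the pattern, with a hypothetical `build_graph` standing in for the SG/MG construction:

```python
from typing import Optional


def build_graph(direction: str, prob_attr: Optional[str]):
    # Hypothetical stand-in for the pylibcugraph SG/MG graph construction.
    return ("graph-object", direction, prob_attr)


class CachedGraph:
    def __init__(self):
        self.__graph = None

    def _graph(self, direction: str, prob_attr: Optional[str] = None):
        # Invalidate the cache if any construction parameter changed.
        if self.__graph is not None and (
            self.__graph["direction"] != direction
            or self.__graph["prob_attr"] != prob_attr
        ):
            self.__graph = None

        if self.__graph is None:
            self.__graph = {
                "graph": build_graph(direction, prob_attr),
                "direction": direction,
                "prob_attr": prob_attr,
            }

        return self.__graph["graph"]


g = CachedGraph()
a = g._graph("out")
b = g._graph("out", prob_attr="wgt")  # cache miss: rebuilt with weights
assert a is not b
```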
@@ -11,6 +11,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+
 import cugraph_dgl.dataloading
 import pytest
 
@@ -48,9 +49,12 @@ def test_dataloader_basic_homogeneous():
         assert len(out_t) <= 2
 
 
-def sample_dgl_graphs(g, train_nid, fanouts, batch_size=1):
+def sample_dgl_graphs(g, train_nid, fanouts, batch_size=1, prob_attr=None):
     # Single fanout to match cugraph
-    sampler = dgl.dataloading.NeighborSampler(fanouts)
+    sampler = dgl.dataloading.NeighborSampler(
+        fanouts,
+        prob=prob_attr,
+    )
     dataloader = dgl.dataloading.DataLoader(
         g,
         train_nid,
@@ -71,8 +75,13 @@ def sample_dgl_graphs(g, train_nid, fanouts, batch_size=1):
     return dgl_output
 
 
-def sample_cugraph_dgl_graphs(cugraph_g, train_nid, fanouts, batch_size=1):
-    sampler = cugraph_dgl.dataloading.NeighborSampler(fanouts)
+def sample_cugraph_dgl_graphs(
+    cugraph_g, train_nid, fanouts, batch_size=1, prob_attr=None
+):
+    sampler = cugraph_dgl.dataloading.NeighborSampler(
+        fanouts,
+        prob=prob_attr,
+    )
 
     dataloader = cugraph_dgl.dataloading.FutureDataLoader(
         cugraph_g,
@@ -126,3 +135,41 @@ def test_same_homogeneousgraph_results(ix, batch_size):
         dgl_output[0]["blocks"][0].num_edges()
         == cugraph_output[0]["blocks"][0].num_edges()
     )
+
+
+@pytest.mark.skipif(isinstance(torch, MissingModule), reason="torch not available")
+@pytest.mark.skipif(isinstance(dgl, MissingModule), reason="dgl not available")
+def test_dataloader_biased_homogeneous():
+    src = torch.tensor([1, 2, 3, 4, 5, 6, 7, 8])
+    dst = torch.tensor([0, 0, 0, 0, 1, 1, 1, 1])
+    wgt = torch.tensor([1, 1, 2, 0, 0, 0, 2, 1], dtype=torch.float32)
+
+    train_nid = torch.tensor([0, 1])
+    # Create a simple homogeneous graph with a single node and edge type.
+    dgl_g = dgl.graph((src, dst))
+    dgl_g.edata["wgt"] = wgt
+
+    cugraph_g = cugraph_dgl.Graph(is_multi_gpu=False)
+    cugraph_g.add_nodes(9)
+    cugraph_g.add_edges(u=src, v=dst, data={"wgt": wgt})
+
+    dgl_output = sample_dgl_graphs(dgl_g, train_nid, [4], batch_size=2, prob_attr="wgt")
+    cugraph_output = sample_cugraph_dgl_graphs(
+        cugraph_g, train_nid, [4], batch_size=2, prob_attr="wgt"
+    )
+
+    cugraph_output_nodes = cugraph_output[0]["output_nodes"].cpu().numpy()
+    dgl_output_nodes = dgl_output[0]["output_nodes"].cpu().numpy()
+
+    np.testing.assert_array_equal(
+        np.sort(cugraph_output_nodes), np.sort(dgl_output_nodes)
+    )
+    assert (
+        dgl_output[0]["blocks"][0].num_dst_nodes()
+        == cugraph_output[0]["blocks"][0].num_dst_nodes()
+    )
+    assert (
+        dgl_output[0]["blocks"][0].num_edges()
+        == cugraph_output[0]["blocks"][0].num_edges()
+    )
+    assert 5 == cugraph_output[0]["blocks"][0].num_edges()
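The final `assert 5 == ...` is worth decoding: with weights `[1, 1, 2, 0, 0, 0, 2, 1]`, the three zero-weight edges can never be sampled, leaving seed 0 with three candidate neighbors and seed 1 with two, so a fanout of 4 yields exactly 3 + 2 = 5 edges.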