From 2f23ac19139fb8c0723fddcff04be8c73a656bf3 Mon Sep 17 00:00:00 2001 From: Charles Hastings Date: Mon, 5 Aug 2024 13:50:57 -0700 Subject: [PATCH] Negative sampling now working for SG, MG with 1/2/4 GPUs --- cpp/src/sampling/negative_sampling_impl.cuh | 424 +++++++++++++++++--- cpp/tests/sampling/mg_negative_sampling.cu | 83 +--- 2 files changed, 382 insertions(+), 125 deletions(-) diff --git a/cpp/src/sampling/negative_sampling_impl.cuh b/cpp/src/sampling/negative_sampling_impl.cuh index dc174098b59..a0dc500d9e6 100644 --- a/cpp/src/sampling/negative_sampling_impl.cuh +++ b/cpp/src/sampling/negative_sampling_impl.cuh @@ -16,15 +16,376 @@ #pragma once +#include "prims/reduce_v.cuh" +#include "prims/update_edge_src_dst_property.cuh" +#include "utilities/collect_comm.cuh" + #include #include #include +#include +#include + +#include +#include +#include +#include +#include #include +#include +#include #include namespace cugraph { +namespace detail { + +template +class negative_sampling_impl_t { + private: + static const bool store_transposed = false; + + public: + negative_sampling_impl_t( + raft::handle_t const& handle, + graph_view_t const& graph_view, + std::optional> src_bias, + std::optional> dst_bias) + : gpu_bias_v_(0, handle.get_stream()), + src_bias_v_(0, handle.get_stream()), + dst_bias_v_(0, handle.get_stream()), + src_bias_cache_(std::nullopt), + dst_bias_cache_(std::nullopt) + { + // Need to normalize the src_bias + if (src_bias) { + // Normalize the src bias. + rmm::device_uvector normalized_bias(graph_view.local_vertex_partition_range_size(), + handle.get_stream()); + + weight_t sum = reduce_v(handle, graph_view, src_bias->begin()); + + if constexpr (multi_gpu) { + sum = host_scalar_allreduce( + handle.get_comms(), sum, raft::comms::op_t::SUM, handle.get_stream()); + } + + thrust::transform(handle.get_thrust_policy(), + src_bias->begin(), + src_bias->end(), + normalized_bias.begin(), + divider_t{sum}); + + // Distribute the src bias around the edge partitions + src_bias_cache_ = std::make_optional< + edge_src_property_t, weight_t>>( + handle, graph_view); + update_edge_src_property( + handle, graph_view, normalized_bias.begin(), src_bias_cache_->mutable_view()); + } + + if (dst_bias) { + // Normalize the dst bias. + rmm::device_uvector normalized_bias(graph_view.local_vertex_partition_range_size(), + handle.get_stream()); + + weight_t sum = reduce_v(handle, graph_view, dst_bias->begin()); + + if constexpr (multi_gpu) { + sum = host_scalar_allreduce( + handle.get_comms(), sum, raft::comms::op_t::SUM, handle.get_stream()); + } + + thrust::transform(handle.get_thrust_policy(), + dst_bias->begin(), + dst_bias->end(), + normalized_bias.begin(), + divider_t{sum}); + + dst_bias_cache_ = std::make_optional< + edge_dst_property_t, weight_t>>( + handle, graph_view); + update_edge_dst_property( + handle, graph_view, normalized_bias.begin(), dst_bias_cache_->mutable_view()); + } + + if constexpr (multi_gpu) { + weight_t dst_bias_sum{0}; + + if (dst_bias) { + // Compute the dst_bias sum for this partition and normalize cached values + dst_bias_sum = thrust::reduce( + handle.get_thrust_policy(), + dst_bias_cache_->view().value_first(), + dst_bias_cache_->view().value_first() + graph_view.local_edge_partition_dst_range_size(), + weight_t{0}); + + thrust::transform(handle.get_thrust_policy(), + dst_bias_cache_->mutable_view().value_first(), + dst_bias_cache_->mutable_view().value_first() + + graph_view.local_edge_partition_dst_range_size(), + dst_bias_cache_->mutable_view().value_first(), + divider_t{dst_bias_sum}); + + thrust::inclusive_scan(handle.get_thrust_policy(), + dst_bias_cache_->mutable_view().value_first(), + dst_bias_cache_->mutable_view().value_first() + + graph_view.local_edge_partition_dst_range_size(), + dst_bias_cache_->mutable_view().value_first()); + } else { + dst_bias_sum = static_cast(graph_view.local_edge_partition_dst_range_size()) / + static_cast(graph_view.number_of_vertices()); + } + + std::vector h_gpu_bias; + h_gpu_bias.reserve(graph_view.number_of_local_edge_partitions()); + + for (size_t partition_idx = 0; partition_idx < graph_view.number_of_local_edge_partitions(); + ++partition_idx) { + weight_t src_bias_sum{ + static_cast(graph_view.local_edge_partition_src_range_size(partition_idx)) / + static_cast(graph_view.number_of_vertices())}; + + if (src_bias) { + // Normalize each batch of biases and compute the inclusive prefix sum + src_bias_sum = + thrust::reduce(handle.get_thrust_policy(), + src_bias_cache_->view().value_firsts()[partition_idx], + src_bias_cache_->view().value_firsts()[partition_idx] + + graph_view.local_edge_partition_src_range_size(partition_idx), + weight_t{0}); + + thrust::transform(handle.get_thrust_policy(), + src_bias_cache_->mutable_view().value_firsts()[partition_idx], + src_bias_cache_->mutable_view().value_firsts()[partition_idx] + + graph_view.local_edge_partition_src_range_size(partition_idx), + src_bias_cache_->mutable_view().value_firsts()[partition_idx], + divider_t{src_bias_sum}); + + thrust::inclusive_scan(handle.get_thrust_policy(), + src_bias_cache_->mutable_view().value_firsts()[partition_idx], + src_bias_cache_->mutable_view().value_firsts()[partition_idx] + + graph_view.local_edge_partition_src_range_size(partition_idx), + src_bias_cache_->mutable_view().value_firsts()[partition_idx]); + } + + // Because src_bias and dst_bias are normalized, the probability of a random edge appearing + // on this partition is (src_bias_sum * dst_bias_sum) + h_gpu_bias.push_back(src_bias_sum * dst_bias_sum); + } + + rmm::device_uvector d_gpu_bias(h_gpu_bias.size(), handle.get_stream()); + raft::update_device( + d_gpu_bias.data(), h_gpu_bias.data(), h_gpu_bias.size(), handle.get_stream()); + + gpu_bias_v_ = cugraph::device_allgatherv( + handle, + handle.get_comms(), + raft::device_span{d_gpu_bias.data(), d_gpu_bias.size()}); + + thrust::inclusive_scan( + handle.get_thrust_policy(), gpu_bias_v_.begin(), gpu_bias_v_.end(), gpu_bias_v_.begin()); + } else { + if (dst_bias_cache_) + thrust::inclusive_scan(handle.get_thrust_policy(), + dst_bias_cache_->mutable_view().value_first(), + dst_bias_cache_->mutable_view().value_first() + + graph_view.local_edge_partition_dst_range_size(), + dst_bias_cache_->mutable_view().value_first()); + + if (src_bias_cache_) + thrust::inclusive_scan(handle.get_thrust_policy(), + src_bias_cache_->mutable_view().value_firsts()[0], + src_bias_cache_->mutable_view().value_firsts()[0] + + graph_view.local_edge_partition_src_range_size(0), + src_bias_cache_->mutable_view().value_firsts()[0]); + } + } + + std::tuple, rmm::device_uvector> create_local_samples( + raft::handle_t const& handle, + raft::random::RngState& rng_state, + graph_view_t const& graph_view, + size_t num_samples) + { + rmm::device_uvector src(0, handle.get_stream()); + rmm::device_uvector dst(0, handle.get_stream()); + + std::vector sample_counts; + + // Determine sample counts per GPU edge partition + if constexpr (multi_gpu) { + auto const comm_size = handle.get_comms().get_size(); + auto const rank = handle.get_comms().get_rank(); + auto& major_comm = handle.get_subcomm(cugraph::partition_manager::major_comm_name()); + auto const major_comm_size = major_comm.get_size(); + auto& minor_comm = handle.get_subcomm(cugraph::partition_manager::minor_comm_name()); + auto const minor_comm_size = minor_comm.get_size(); + + // First step is to count how many go on each edge_partition + rmm::device_uvector gpu_counts(gpu_bias_v_.size(), handle.get_stream()); + thrust::fill(handle.get_thrust_policy(), gpu_counts.begin(), gpu_counts.end(), int{0}); + + rmm::device_uvector random_values(num_samples, handle.get_stream()); + detail::uniform_random_fill(handle.get_stream(), + random_values.data(), + random_values.size(), + weight_t{0}, + weight_t{1}, + rng_state); + + thrust::sort(handle.get_thrust_policy(), random_values.begin(), random_values.end()); + + thrust::upper_bound(handle.get_thrust_policy(), + random_values.begin(), + random_values.end(), + gpu_bias_v_.begin(), + gpu_bias_v_.end(), + gpu_counts.begin()); + + thrust::adjacent_difference( + handle.get_thrust_policy(), gpu_counts.begin(), gpu_counts.end(), gpu_counts.begin()); + + device_allreduce(handle.get_comms(), + gpu_counts.begin(), + gpu_counts.begin(), + gpu_counts.size(), + raft::comms::op_t::SUM, + handle.get_stream()); + + num_samples = thrust::reduce(handle.get_thrust_policy(), + gpu_counts.begin() + rank * minor_comm_size, + gpu_counts.begin() + rank * minor_comm_size + minor_comm_size, + size_t{0}); + + sample_counts.resize(minor_comm_size); + raft::update_host(sample_counts.data(), + gpu_counts.data() + rank * minor_comm_size, + minor_comm_size, + handle.get_stream()); + + } else { + // SG is only one partition + sample_counts.push_back(num_samples); + } + + src.resize(num_samples, handle.get_stream()); + dst.resize(num_samples, handle.get_stream()); + + size_t current_pos{0}; + + for (size_t partition_idx = 0; partition_idx < graph_view.number_of_local_edge_partitions(); + ++partition_idx) { + if (sample_counts[partition_idx] > 0) { + if (src_bias_cache_) { + rmm::device_uvector random_values(sample_counts[partition_idx], + handle.get_stream()); + detail::uniform_random_fill(handle.get_stream(), + random_values.data(), + random_values.size(), + weight_t{0}, + weight_t{1}, + rng_state); + + thrust::transform( + handle.get_thrust_policy(), + random_values.begin(), + random_values.end(), + src.begin() + current_pos, + [biases = + raft::device_span{ + src_bias_cache_->view().value_firsts()[partition_idx], + static_cast( + graph_view.local_edge_partition_src_range_size(partition_idx))}, + offset = graph_view.local_edge_partition_src_range_first( + partition_idx)] __device__(weight_t r) { + size_t result = + offset + static_cast(thrust::distance( + biases.begin(), + thrust::lower_bound(thrust::seq, biases.begin(), biases.end(), r))); + + // FIXME: https://github.com/rapidsai/raft/issues/2400 + // results in the possibility that 1 can appear as a + // random floating point value, which results in the sampling + // algorithm below generating a value that's OOB. + if (result == (offset + biases.size())) --result; + + return result; + }); + } else { + detail::uniform_random_fill( + handle.get_stream(), + src.data() + current_pos, + sample_counts[partition_idx], + graph_view.local_edge_partition_src_range_first(partition_idx), + graph_view.local_edge_partition_src_range_last(partition_idx), + rng_state); + } + + if (dst_bias_cache_) { + rmm::device_uvector random_values(sample_counts[partition_idx], + handle.get_stream()); + detail::uniform_random_fill(handle.get_stream(), + random_values.data(), + random_values.size(), + weight_t{0}, + weight_t{1}, + rng_state); + + thrust::transform( + handle.get_thrust_policy(), + random_values.begin(), + random_values.end(), + dst.begin() + current_pos, + [biases = + raft::device_span{ + dst_bias_cache_->view().value_first(), + static_cast(graph_view.local_edge_partition_dst_range_size())}, + offset = graph_view.local_edge_partition_dst_range_first()] __device__(weight_t r) { + size_t result = + offset + static_cast(thrust::distance( + biases.begin(), + thrust::lower_bound(thrust::seq, biases.begin(), biases.end(), r))); + + // FIXME: https://github.com/rapidsai/raft/issues/2400 + // results in the possibility that 1 can appear as a + // random floating point value, which results in the sampling + // algorithm below generating a value that's OOB. + if (result == (offset + biases.size())) --result; + + return result; + }); + } else { + detail::uniform_random_fill(handle.get_stream(), + dst.data() + current_pos, + sample_counts[partition_idx], + graph_view.local_edge_partition_dst_range_first(), + graph_view.local_edge_partition_dst_range_last(), + rng_state); + } + + current_pos += sample_counts[partition_idx]; + } + } + + return std::make_tuple(std::move(src), std::move(dst)); + } + + private: + rmm::device_uvector gpu_bias_v_; + rmm::device_uvector src_bias_v_; + rmm::device_uvector dst_bias_v_; + std::optional< + edge_src_property_t, weight_t>> + src_bias_cache_; + std::optional< + edge_dst_property_t, weight_t>> + dst_bias_cache_; +}; + +} // namespace detail + template , rmm::device_uvector> negativ bool exact_number_of_samples, bool do_expensive_check) { + detail::negative_sampling_impl_t impl( + handle, graph_view, src_bias, dst_bias); + rmm::device_uvector src(0, handle.get_stream()); rmm::device_uvector dst(0, handle.get_stream()); @@ -57,60 +421,16 @@ std::tuple, rmm::device_uvector> negativ (samples_in_this_batch / num_gpus) + (rank < (samples_in_this_batch % num_gpus) ? 1 : 0); } - rmm::device_uvector batch_src(samples_in_this_batch, handle.get_stream()); - rmm::device_uvector batch_dst(samples_in_this_batch, handle.get_stream()); - - if (src_bias) { - detail::biased_random_fill(handle, - rng_state, - raft::device_span{batch_src.data(), batch_src.size()}, - *src_bias); - } else { - detail::uniform_random_fill(handle.get_stream(), - batch_src.data(), - batch_src.size(), - vertex_t{0}, - graph_view.number_of_vertices(), - rng_state); - } - - if (dst_bias) { - detail::biased_random_fill(handle, - rng_state, - raft::device_span{batch_dst.data(), batch_dst.size()}, - *dst_bias); - } else { - detail::uniform_random_fill(handle.get_stream(), - batch_dst.data(), - batch_dst.size(), - vertex_t{0}, - graph_view.number_of_vertices(), - rng_state); - } - - if constexpr (multi_gpu) { - auto vertex_partition_range_lasts = graph_view.vertex_partition_range_lasts(); - - std::tie(batch_src, batch_dst, std::ignore, std::ignore, std::ignore) = - detail::shuffle_int_vertex_pairs_with_values_to_local_gpu_by_edge_partitioning( - handle, - std::move(batch_src), - std::move(batch_dst), - std::nullopt, - std::nullopt, - std::nullopt, - vertex_partition_range_lasts); - } + auto [batch_src, batch_dst] = + impl.create_local_samples(handle, rng_state, graph_view, samples_in_this_batch); if (remove_false_negatives) { auto has_edge_flags = graph_view.has_edge(handle, raft::device_span{batch_src.data(), batch_src.size()}, raft::device_span{batch_dst.data(), batch_dst.size()}, - do_expensive_check); + // do_expensive_check); + true); auto begin_iter = thrust::make_zip_iterator(batch_src.begin(), batch_dst.begin(), has_edge_flags.begin()); @@ -177,16 +497,16 @@ std::tuple, rmm::device_uvector> negativ } if (exact_number_of_samples) { - size_t num_batch_samples = src.size(); + size_t current_sample_size = src.size(); if constexpr (multi_gpu) { - num_batch_samples = cugraph::host_scalar_allreduce( - handle.get_comms(), num_batch_samples, raft::comms::op_t::SUM, handle.get_stream()); + current_sample_size = cugraph::host_scalar_allreduce( + handle.get_comms(), current_sample_size, raft::comms::op_t::SUM, handle.get_stream()); } // FIXME: We could oversample and discard the unnecessary samples // to reduce the number of iterations in the outer loop, but it seems like // exact_number_of_samples is an edge case not worth optimizing for at this time. - samples_in_this_batch = num_samples - num_batch_samples; + samples_in_this_batch = num_samples - current_sample_size; } else { samples_in_this_batch = 0; } diff --git a/cpp/tests/sampling/mg_negative_sampling.cu b/cpp/tests/sampling/mg_negative_sampling.cu index e180594f87b..0bc6bc2e737 100644 --- a/cpp/tests/sampling/mg_negative_sampling.cu +++ b/cpp/tests/sampling/mg_negative_sampling.cu @@ -89,7 +89,7 @@ class Tests_MGNegative_Sampling : public ::testing::TestWithParam> dst_bias{std::nullopt}; if (negative_sampling_usecase.use_src_bias) { - src_bias_v.resize(graph_view.number_of_vertices(), handle_->get_stream()); + src_bias_v.resize(graph_view.local_vertex_partition_range_size(), handle_->get_stream()); cugraph::detail::uniform_random_fill(handle_->get_stream(), src_bias_v.data(), @@ -102,7 +102,7 @@ class Tests_MGNegative_Sampling : public ::testing::TestWithParamget_stream()); + dst_bias_v.resize(graph_view.local_vertex_partition_range_size(), handle_->get_stream()); cugraph::detail::uniform_random_fill(handle_->get_stream(), dst_bias_v.data(), @@ -160,12 +160,19 @@ class Tests_MGNegative_Sampling : public ::testing::TestWithParamget_subcomm(cugraph::partition_manager::major_comm_name()).get_size(), handle_->get_subcomm(cugraph::partition_manager::minor_comm_name()) .get_size()}] __device__(auto e) { + if (gpu_id_key_func(thrust::get<0>(e), thrust::get<1>(e)) != comm_rank) + printf(" gpu_id(%d,%d) = %d, expected %d\n", + (int)thrust::get<0>(e), + (int)thrust::get<1>(e), + gpu_id_key_func(thrust::get<0>(e), thrust::get<1>(e)), + comm_rank); + return (gpu_id_key_func(thrust::get<0>(e), thrust::get<1>(e)) != comm_rank); }); ASSERT_EQ(error_count, 0) << "generate edges out of range > 0"; - if (negative_sampling_usecase.remove_duplicates) { + if ((negative_sampling_usecase.remove_duplicates) && (src_out.size() > 0)) { error_count = thrust::count_if( handle_->get_thrust_policy(), thrust::make_counting_iterator(1), @@ -222,21 +229,9 @@ template Tests_MGNegative_Sampling::handle_ = nullptr; -using Tests_MGNegative_Sampling_File_i32_i32_float = - Tests_MGNegative_Sampling; - -using Tests_MGNegative_Sampling_File_i32_i64_float = - Tests_MGNegative_Sampling; - using Tests_MGNegative_Sampling_File_i64_i64_float = Tests_MGNegative_Sampling; -using Tests_MGNegative_Sampling_Rmat_i32_i32_float = - Tests_MGNegative_Sampling; - -using Tests_MGNegative_Sampling_Rmat_i32_i64_float = - Tests_MGNegative_Sampling; - using Tests_MGNegative_Sampling_Rmat_i64_i64_float = Tests_MGNegative_Sampling; @@ -312,71 +307,23 @@ void run_all_tests(CurrentTest* current_test) Negative_Sampling_Usecase{2, true, true, true, true, true, true}); } -TEST_P(Tests_MGNegative_Sampling_File_i32_i32_float, CheckInt32Int32Float) -{ - load_graph(override_File_Usecase_with_cmd_line_arguments(GetParam())); - run_all_tests(this); -} - -TEST_P(Tests_MGNegative_Sampling_File_i32_i64_float, CheckInt32Int64Float) -{ - load_graph(override_File_Usecase_with_cmd_line_arguments(GetParam())); - run_all_tests(this); -} - TEST_P(Tests_MGNegative_Sampling_File_i64_i64_float, CheckInt64Int64Float) { load_graph(override_File_Usecase_with_cmd_line_arguments(GetParam())); run_all_tests(this); } -TEST_P(Tests_MGNegative_Sampling_Rmat_i32_i32_float, CheckInt32Int32Float) -{ - load_graph(override_Rmat_Usecase_with_cmd_line_arguments(GetParam())); - run_all_tests(this); -} - -TEST_P(Tests_MGNegative_Sampling_Rmat_i32_i64_float, CheckInt32Int64Float) -{ - load_graph(override_Rmat_Usecase_with_cmd_line_arguments(GetParam())); - run_all_tests(this); -} - TEST_P(Tests_MGNegative_Sampling_Rmat_i64_i64_float, CheckInt64Int64Float) { load_graph(override_Rmat_Usecase_with_cmd_line_arguments(GetParam())); run_all_tests(this); } -INSTANTIATE_TEST_SUITE_P( - file_test, - Tests_MGNegative_Sampling_File_i32_i32_float, - ::testing::Values(cugraph::test::File_Usecase("test/datasets/karate.mtx"))); - -INSTANTIATE_TEST_SUITE_P( - file_test, - Tests_MGNegative_Sampling_File_i32_i64_float, - ::testing::Values(cugraph::test::File_Usecase("test/datasets/karate.mtx"))); - INSTANTIATE_TEST_SUITE_P( file_test, Tests_MGNegative_Sampling_File_i64_i64_float, ::testing::Values(cugraph::test::File_Usecase("test/datasets/karate.mtx"))); -INSTANTIATE_TEST_SUITE_P( - file_large_test, - Tests_MGNegative_Sampling_File_i32_i32_float, - ::testing::Values(cugraph::test::File_Usecase("test/datasets/web-Google.mtx"), - cugraph::test::File_Usecase("test/datasets/ljournal-2008.mtx"), - cugraph::test::File_Usecase("test/datasets/webbase-1M.mtx"))); - -INSTANTIATE_TEST_SUITE_P( - file_large_test, - Tests_MGNegative_Sampling_File_i32_i64_float, - ::testing::Values(cugraph::test::File_Usecase("test/datasets/web-Google.mtx"), - cugraph::test::File_Usecase("test/datasets/ljournal-2008.mtx"), - cugraph::test::File_Usecase("test/datasets/webbase-1M.mtx"))); - INSTANTIATE_TEST_SUITE_P( file_large_test, Tests_MGNegative_Sampling_File_i64_i64_float, @@ -384,16 +331,6 @@ INSTANTIATE_TEST_SUITE_P( cugraph::test::File_Usecase("test/datasets/ljournal-2008.mtx"), cugraph::test::File_Usecase("test/datasets/webbase-1M.mtx"))); -INSTANTIATE_TEST_SUITE_P( - rmat_small_test, - Tests_MGNegative_Sampling_Rmat_i32_i32_float, - ::testing::Values(cugraph::test::Rmat_Usecase(10, 16, 0.57, 0.19, 0.19, 0, false, false, 0))); - -INSTANTIATE_TEST_SUITE_P( - rmat_small_test, - Tests_MGNegative_Sampling_Rmat_i32_i64_float, - ::testing::Values(cugraph::test::Rmat_Usecase(10, 16, 0.57, 0.19, 0.19, 0, false, false, 0))); - INSTANTIATE_TEST_SUITE_P( rmat_small_test, Tests_MGNegative_Sampling_Rmat_i64_i64_float,