diff --git a/kagen/io/parhip.cpp b/kagen/io/parhip.cpp index a459a76a..e16c10d1 100644 --- a/kagen/io/parhip.cpp +++ b/kagen/io/parhip.cpp @@ -29,8 +29,8 @@ ParhipID BuildVersion( const ParhipID vertex_weights_bit = (static_cast(has_vertex_weights) ^ 1) << 1; const ParhipID edge_id_width_bit = static_cast(has_32bit_edge_ids) << 2; const ParhipID vertex_id_width_bit = static_cast(has_32bit_vertex_ids) << 3; - const ParhipID vertex_weight_width_bit = static_cast(has_32bit_vertex_weights) << 3; - const ParhipID edge_weight_width_bit = static_cast(has_32bit_edge_weights) << 4; + const ParhipID vertex_weight_width_bit = static_cast(has_32bit_vertex_weights) << 4; + const ParhipID edge_weight_width_bit = static_cast(has_32bit_edge_weights) << 5; return vertex_weights_bit | edge_weights_bit | edge_id_width_bit | vertex_id_width_bit | vertex_weight_width_bit | edge_weight_width_bit; @@ -69,8 +69,9 @@ void ParhipWriter::WriteHeader(const std::string& filename) { // Header if (rank_ == ROOT && config_.header != OutputHeader::NEVER) { std::ofstream out(filename, std::ios_base::binary | std::ios_base::out | std::ios_base::trunc); - const ParhipID version = - BuildVersion(info_.has_vertex_weights, info_.has_edge_weights, false, config_.width == 32, false, false); + const ParhipID version = BuildVersion( + info_.has_vertex_weights, info_.has_edge_weights, false, config_.vtx_width == 32, false, + config_.adjwgt_width == 32); const ParhipID global_n = info_.global_n; const ParhipID global_m = info_.global_m; out.write(reinterpret_cast(&version), sizeof(ParhipID)); @@ -79,6 +80,30 @@ void ParhipWriter::WriteHeader(const std::string& filename) { } } +void ParhipWriter::WriteOffsets(const std::string& filename, const XadjArray& xadj) { + const std::uint64_t vtx_width_in_byte = (config_.vtx_width == 32) ? 4 : 8; + std::vector xadj_in_file(info_.local_n + 1); + const std::uint64_t header_size = 3 * sizeof(parhip::ParhipID); + const std::uint64_t xadj_offset_size = (info_.global_n + 1) * sizeof(parhip::ParhipID); + const std::uint64_t prev_adjncy_offset_size = (info_.offset_m) * vtx_width_in_byte; + std::uint64_t cur_offset = header_size + xadj_offset_size + prev_adjncy_offset_size; + for (std::uint64_t v = 0; v <= info_.local_n; ++v) { + xadj_in_file[v] = static_cast(cur_offset + xadj[v] * vtx_width_in_byte); + } + + std::ofstream out(filename, std::ios_base::binary | std::ios_base::out | std::ios_base::app); + if (!out) + throw IOError("Failed to open file " + filename + " for writing"); + + // Case distinction: the last offset acts as a guardian and should only be written by the last PE + if (rank_ + 1 == size_) { + out.write(reinterpret_cast(xadj_in_file.data()), xadj_in_file.size() * sizeof(parhip::ParhipID)); + } else { + out.write( + reinterpret_cast(xadj_in_file.data()), (xadj_in_file.size() - 1) * sizeof(parhip::ParhipID)); + } +} + void ParhipWriter::WriteOffsets(const std::string& filename) { std::vector offset(graph_.NumberOfLocalVertices() + 1); @@ -138,30 +163,20 @@ void WriteEdgesImpl(std::ofstream& out, const Edgelist& edges) { void ParhipWriter::WriteEdges(const std::string& filename) { std::ofstream out(filename, std::ios_base::out | std::ios_base::binary | std::ios_base::app); - if (config_.width == 32) { + if (config_.vtx_width == 32) { WriteEdgesImpl(out, graph_.edges); } else { WriteEdgesImpl(out, graph_.edges); } } -void ParhipWriter::WriteVertexWeights(const std::string& filename) { - std::ofstream out(filename, std::ios_base::out | std::ios_base::binary | std::ios_base::app); - out.write( - reinterpret_cast(graph_.vertex_weights.data()), - graph_.vertex_weights.size() * sizeof(ParhipWeight)); -} - -void ParhipWriter::WriteEdgeWeights(const std::string& filename) { - std::ofstream out(filename, std::ios_base::out | std::ios_base::binary | std::ios_base::app); - out.write( - reinterpret_cast(graph_.edge_weights.data()), graph_.edge_weights.size() * sizeof(ParhipWeight)); -} - bool ParhipWriter::Write(const int pass, const std::string& filename) { if (config_.distributed) { throw IOError("ParHiP format does not support distributed output"); } + if (graph_.representation == GraphRepresentation::CSR) { + return WriteFromCSR(pass, filename, graph_.xadj, graph_.adjncy, &graph_.vertex_weights, &graph_.edge_weights); + } graph_.SortEdgelist(); @@ -181,14 +196,14 @@ bool ParhipWriter::Write(const int pass, const std::string& filename) { case 2: if (info_.has_vertex_weights) { - WriteVertexWeights(filename); + WriteWeights(filename, graph_.vertex_weights, sizeof(VertexWeights::value_type) == 4); } else { - WriteEdgeWeights(filename); + WriteWeights(filename, graph_.edge_weights, sizeof(EdgeWeights::value_type) == 4); } return info_.has_vertex_weights && info_.has_edge_weights; case 3: - WriteEdgeWeights(filename); + WriteWeights(filename, graph_.edge_weights, sizeof(EdgeWeights::value_type) == 4); return false; } diff --git a/kagen/io/parhip.h b/kagen/io/parhip.h index bd53a70c..df6210f5 100644 --- a/kagen/io/parhip.h +++ b/kagen/io/parhip.h @@ -45,6 +45,7 @@ #pragma once #include "kagen/context.h" +#include "kagen/io.h" #include "kagen/io/graph_format.h" #include @@ -78,16 +79,29 @@ class ParhipWriter : public GraphWriter { bool Write(const int pass, const std::string& filename) final; + template + bool WriteFromCSR( + const int pass, const std::string& filename, const XadjArray& xadj, const std::vector& adjncy, + const std::vector* vertex_weights = nullptr, + const std::vector* edge_weights = nullptr); + private: - void WriteHeader(const std::string& filename); + const std::size_t buffer_chunk_size_ = 1024 * 1024; + void WriteHeader(const std::string& filename); void WriteOffsets(const std::string& filename); + void WriteOffsets(const std::string& filename, const XadjArray& xadj); void WriteEdges(const std::string& filename); - void WriteVertexWeights(const std::string& filename); + template + void WriteInChunks(std::vector const& data, std::ofstream& out) const; + + template + void WriteAdjncyArray(const std::string& filename, const std::vector& adjncy_array) const; - void WriteEdgeWeights(const std::string& filename); + template + void WriteWeights(const std::string& filename, const std::vector& data, bool use_32_bits) const; }; class ParhipReader : public GraphReader { @@ -119,4 +133,76 @@ class ParhipFactory : public FileFormatFactory { std::unique_ptr CreateWriter(const OutputGraphConfig& config, Graph& graph, GraphInfo info, PEID rank, PEID size) const final; }; + +template +bool ParhipWriter::WriteFromCSR( + const int pass, const std::string& filename, const XadjArray& xadj, const std::vector& adjncy, + const std::vector* vertex_weights, const std::vector* edge_weights) { + switch (pass) { + case 0: + WriteHeader(filename); + WriteOffsets(filename, xadj); + return true; + + case 1: + WriteAdjncyArray(filename, adjncy); + return info_.has_vertex_weights || info_.has_edge_weights; + + case 2: + if (info_.has_vertex_weights) { + WriteWeights(filename, *vertex_weights, config_.vwgt_width == 32); + } else { + WriteWeights(filename, *edge_weights, config_.adjwgt_width == 32); + } + return info_.has_vertex_weights && info_.has_edge_weights; + + case 3: + if (info_.has_edge_weights) { + WriteWeights(filename, *edge_weights, config_.adjwgt_width == 32); + } + return false; + } + return false; +} + +template +void ParhipWriter::WriteInChunks(std::vector const& data, std::ofstream& out) const { + std::vector buf; + buf.reserve(std::min(buffer_chunk_size_, data.size())); + for (std::size_t pos = 0, N = data.size(); pos < N; pos += buffer_chunk_size_) { + const std::size_t cnt = std::min(buffer_chunk_size_, N - pos); + buf.clear(); + buf.resize(cnt); + for (std::size_t i = 0; i < cnt; ++i) { + buf[i] = static_cast(data[pos + i]); + } + out.write(reinterpret_cast(buf.data()), buf.size() * sizeof(OnDiskT)); + } +} + +template +void ParhipWriter::WriteAdjncyArray(const std::string& filename, const std::vector& adjncy_array) const { + std::ofstream out(filename, std::ios_base::binary | std::ios_base::out | std::ios_base::app); + if (!out) + throw IOError("Failed to open file " + filename + " for writing"); + if (config_.vtx_width == 32) { + WriteInChunks(adjncy_array, out); + } else { + WriteInChunks(adjncy_array, out); + } +} + +template +void ParhipWriter::WriteWeights(const std::string& filename, const std::vector& data, bool use_32_bits) const { + static_assert(std::is_integral::value, "On disk weight type must be integral"); + + std::ofstream out(filename, std::ios_base::binary | std::ios_base::out | std::ios_base::app); + if (!out) + throw IOError("Failed to open file " + filename + " for writing"); + if (use_32_bits) { + WriteInChunks(data, out); + } else { + WriteInChunks(data, out); + } +} } // namespace kagen diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index baa40c49..3fc82758 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -53,6 +53,11 @@ kagen_add_test(test_generic_file_generator FILES file/generic_file_generator_test.cpp CORES 1 2 3 4 8 16) +kagen_add_test(test_parhip_reader_writer + FILES file/parhip_reader_writer.cpp + CORES 1 2 3 4 8) + + # Geometric Generator kagen_add_test(test_rgg2d FILES geometric/rgg2d_test.cpp diff --git a/tests/file/parhip_reader_writer.cpp b/tests/file/parhip_reader_writer.cpp new file mode 100644 index 00000000..6a2be55c --- /dev/null +++ b/tests/file/parhip_reader_writer.cpp @@ -0,0 +1,223 @@ +#include "kagen/context.h" +#include "kagen/generators/file/file_graph.h" + +#include +#include +#include + +#include "io/parhip.h" +#include "tests/gather.h" +#include +#include +#include +#include + +using namespace kagen; + +using WeightRange = std::pair; +using GeneratorFunc = std::function; +MATCHER_P(EqualAdjacenyStructure, graph, "") { + bool same_edge_list = graph.edges == arg.edges; + bool same_xadj = graph.xadj == arg.xadj; + bool same_adjcny = graph.adjncy == arg.adjncy; + return same_edge_list && same_xadj && same_adjcny; +} + +MATCHER_P(EqualWeights, graph, "") { + bool same_edge_weights = graph.edge_weights == arg.edge_weights; + bool same_vertex_weights = graph.vertex_weights == arg.vertex_weights; + return same_edge_weights && same_vertex_weights; +} + +struct ParhipReadWriteTestFixture + : public ::testing::TestWithParam> { +}; + +INSTANTIATE_TEST_SUITE_P( + GenericGeneratorTest, ParhipReadWriteTestFixture, + ::testing::Values( + std::make_tuple( + "GNM", GeneratorFunc([](KaGen& gen, SInt n, SInt m) { return gen.GenerateUndirectedGNM(n, m); }), + GraphDistribution::BALANCE_VERTICES, GraphRepresentation::CSR), + std::make_tuple( + "RMAT", GeneratorFunc([](KaGen& gen, SInt n, SInt m) { return gen.GenerateRMAT(n, m, 0.56, 0.19, 0.19); }), + GraphDistribution::BALANCE_VERTICES, GraphRepresentation::CSR), + std::make_tuple( + "GNM", GeneratorFunc([](KaGen& gen, SInt n, SInt m) { return gen.GenerateUndirectedGNM(n, m); }), + GraphDistribution::BALANCE_EDGES, GraphRepresentation::CSR), + std::make_tuple( + "RMAT", GeneratorFunc([](KaGen& gen, SInt n, SInt m) { return gen.GenerateRMAT(n, m, 0.56, 0.19, 0.19); }), + GraphDistribution::BALANCE_EDGES, GraphRepresentation::CSR), + std::make_tuple( + "GNM", GeneratorFunc([](KaGen& gen, SInt n, SInt m) { return gen.GenerateUndirectedGNM(n, m); }), + GraphDistribution::ROOT, GraphRepresentation::CSR), + std::make_tuple( + "RMAT", GeneratorFunc([](KaGen& gen, SInt n, SInt m) { return gen.GenerateRMAT(n, m, 0.56, 0.19, 0.19); }), + GraphDistribution::ROOT, GraphRepresentation::CSR)), + [](const ::testing::TestParamInfo& info) -> std::string { + const std::string gen = std::get<0>(info.param); + const int dist = static_cast>(std::get<2>(info.param)); + const int rep = static_cast>(std::get<3>(info.param)); + return gen + "_" + std::to_string(dist) + "_" + std::to_string(rep); + }); + +namespace { +template +void write_graph(Write&& write, int rank, int size) { + bool continue_write = false; + int round = 0; + do { + for (int i = 0; i < size; ++i) { + if (i == rank) { + continue_write = write(round); + } + MPI_Barrier(MPI_COMM_WORLD); + } + MPI_Barrier(MPI_COMM_WORLD); + ++round; + } while (continue_write); +} +std::filesystem::path get_temp_dir() { + if (const char* runner_env = std::getenv("RUNNER_TEMP"); runner_env && *runner_env) { + return std::filesystem::path(runner_env); + } + return std::filesystem::path(::testing::TempDir()); +} + +std::filesystem::path get_file_path(const std::string& instance_id) { + return get_temp_dir() / (instance_id + ".parhip"); +} + +std::string get_instance_id(std::string const& test_name) { + auto pos = test_name.rfind('/'); + return (pos == std::string::npos) ? test_name : test_name.substr(pos + 1); +} +} // namespace +// + +TEST_P(ParhipReadWriteTestFixture, default_write_read_in_parhip_format) { + const ::testing::TestInfo* test_info = ::testing::UnitTest::GetInstance()->current_test_info(); + const std::string instance_id = get_instance_id(test_info->name()); + GeneratorFunc generate = std::get<1>(GetParam()); + const SInt n = 1000; + const SInt m = 16 * n; + const WeightRange weight_range{1, 100}; + MPI_Comm comm = MPI_COMM_WORLD; + int rank, size; + MPI_Comm_rank(comm, &rank); + MPI_Comm_size(comm, &size); + + // setup + kagen::KaGen generator(comm); + generator.UseCSRRepresentation(); + generator.ConfigureEdgeWeightGeneration( + kagen::EdgeWeightGeneratorType::UNIFORM_RANDOM, weight_range.first, weight_range.second); + + Graph generated_graph = generate(generator, n, m); + + // writer setup + GraphInfo info(generated_graph, comm); + OutputGraphConfig config; + config.filename = get_file_path(instance_id); + kagen::ParhipWriter writer(config, generated_graph, info, rank, size); + auto write = [&](int round) { + return writer.Write(round, config.filename); + }; + write_graph(write, rank, size); + + // read graph + const auto read_graph = + generator.GenerateFromOptionString("type=file;filename=" + config.filename + ";edgeweights_generator=default"); + + const auto total_generated_graph = kagen::testing::GatherGraph(generated_graph); + const auto total_read_graph = kagen::testing::GatherGraph(read_graph); + EXPECT_THAT(total_read_graph, EqualAdjacenyStructure(total_generated_graph)); + EXPECT_THAT(total_read_graph, EqualWeights(total_generated_graph)); +} + +TEST_P(ParhipReadWriteTestFixture, write_from_csr_read_in_parhip_format) { + const ::testing::TestInfo* test_info = ::testing::UnitTest::GetInstance()->current_test_info(); + const std::string instance_id = get_instance_id(test_info->name()); + GeneratorFunc generate = std::get<1>(GetParam()); + const SInt n = 1000; + const SInt m = 16 * n; + const WeightRange weight_range{1, 100}; + MPI_Comm comm = MPI_COMM_WORLD; + int rank, size; + MPI_Comm_rank(comm, &rank); + MPI_Comm_size(comm, &size); + + // setup + kagen::KaGen generator(comm); + generator.UseCSRRepresentation(); + generator.ConfigureEdgeWeightGeneration( + kagen::EdgeWeightGeneratorType::UNIFORM_RANDOM, weight_range.first, weight_range.second); + + Graph generated_graph = generate(generator, n, m); + + // writer setup + GraphInfo info(generated_graph, comm); + OutputGraphConfig config; + config.filename = get_file_path(instance_id); + kagen::ParhipWriter writer(config, generated_graph, info, rank, size); + auto write = [&](int round) { + return writer.WriteFromCSR( + round, config.filename, generated_graph.xadj, generated_graph.adjncy, nullptr, + &generated_graph.edge_weights); + }; + write_graph(write, rank, size); + + // read graph + const auto read_graph = + generator.GenerateFromOptionString("type=file;filename=" + config.filename + ";edgeweights_generator=default"); + + const auto total_generated_graph = kagen::testing::GatherGraph(generated_graph); + const auto total_read_graph = kagen::testing::GatherGraph(read_graph); + EXPECT_THAT(total_read_graph, EqualAdjacenyStructure(total_generated_graph)); + EXPECT_THAT(total_read_graph, EqualWeights(total_generated_graph)); +} + +TEST_P(ParhipReadWriteTestFixture, write_from_csr_read_in_parhip_format_32bit_edges) { + const ::testing::TestInfo* test_info = ::testing::UnitTest::GetInstance()->current_test_info(); + const std::string instance_id = get_instance_id(test_info->name()); + GeneratorFunc generate = std::get<1>(GetParam()); + const SInt n = 1000; + const SInt m = 16 * n; + const WeightRange weight_range{1, 100}; + MPI_Comm comm = MPI_COMM_WORLD; + int rank, size; + MPI_Comm_rank(comm, &rank); + MPI_Comm_size(comm, &size); + + // setup + kagen::KaGen generator(comm); + generator.UseCSRRepresentation(); + generator.ConfigureEdgeWeightGeneration( + kagen::EdgeWeightGeneratorType::UNIFORM_RANDOM, weight_range.first, weight_range.second); + Graph generated_graph = generate(generator, n, m); + + // writer setup + GraphInfo info(generated_graph, comm); + OutputGraphConfig config; + config.filename = get_file_path(instance_id); + config.adjwgt_width = 32; + kagen::ParhipWriter writer(config, generated_graph, info, rank, size); + std::vector edge_weight_32_bit(generated_graph.edge_weights.size()); + std::fill(edge_weight_32_bit.begin(), edge_weight_32_bit.end(), static_cast(size)); + + auto write = [&](int round) { + return writer.WriteFromCSR( + round, config.filename, generated_graph.xadj, generated_graph.adjncy, nullptr, &edge_weight_32_bit); + }; + write_graph(write, rank, size); + + // read graph + generator.UseCSRRepresentation(); + const auto read_graph = + generator.GenerateFromOptionString("type=file;filename=" + config.filename + ";edgeweights_generator=default"); + const auto total_generated_graph = kagen::testing::GatherGraph(generated_graph); + const auto total_read_graph = kagen::testing::GatherGraph(read_graph); + EXPECT_THAT(total_read_graph, EqualAdjacenyStructure(total_generated_graph)); + EXPECT_THAT(total_read_graph.edge_weights, ::testing::Each(size)); + EXPECT_TRUE(total_read_graph.vertex_weights.empty()); +}