Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 36 additions & 21 deletions kagen/io/parhip.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ ParhipID BuildVersion(
const ParhipID vertex_weights_bit = (static_cast<SInt>(has_vertex_weights) ^ 1) << 1;
const ParhipID edge_id_width_bit = static_cast<SInt>(has_32bit_edge_ids) << 2;
const ParhipID vertex_id_width_bit = static_cast<SInt>(has_32bit_vertex_ids) << 3;
const ParhipID vertex_weight_width_bit = static_cast<SInt>(has_32bit_vertex_weights) << 3;
const ParhipID edge_weight_width_bit = static_cast<SInt>(has_32bit_edge_weights) << 4;
const ParhipID vertex_weight_width_bit = static_cast<SInt>(has_32bit_vertex_weights) << 4;
const ParhipID edge_weight_width_bit = static_cast<SInt>(has_32bit_edge_weights) << 5;

return vertex_weights_bit | edge_weights_bit | edge_id_width_bit | vertex_id_width_bit | vertex_weight_width_bit
| edge_weight_width_bit;
Expand Down Expand Up @@ -69,8 +69,9 @@ void ParhipWriter::WriteHeader(const std::string& filename) {
// Header
if (rank_ == ROOT && config_.header != OutputHeader::NEVER) {
std::ofstream out(filename, std::ios_base::binary | std::ios_base::out | std::ios_base::trunc);
const ParhipID version =
BuildVersion(info_.has_vertex_weights, info_.has_edge_weights, false, config_.width == 32, false, false);
const ParhipID version = BuildVersion(
info_.has_vertex_weights, info_.has_edge_weights, false, config_.vtx_width == 32, false,
config_.adjwgt_width == 32);
const ParhipID global_n = info_.global_n;
const ParhipID global_m = info_.global_m;
out.write(reinterpret_cast<const char*>(&version), sizeof(ParhipID));
Expand All @@ -79,6 +80,30 @@ void ParhipWriter::WriteHeader(const std::string& filename) {
}
}

void ParhipWriter::WriteOffsets(const std::string& filename, const XadjArray& xadj) {
const std::uint64_t vtx_width_in_byte = (config_.vtx_width == 32) ? 4 : 8;
std::vector<parhip::ParhipID> xadj_in_file(info_.local_n + 1);
const std::uint64_t header_size = 3 * sizeof(parhip::ParhipID);
const std::uint64_t xadj_offset_size = (info_.global_n + 1) * sizeof(parhip::ParhipID);
const std::uint64_t prev_adjncy_offset_size = (info_.offset_m) * vtx_width_in_byte;
std::uint64_t cur_offset = header_size + xadj_offset_size + prev_adjncy_offset_size;
for (std::uint64_t v = 0; v <= info_.local_n; ++v) {
xadj_in_file[v] = static_cast<parhip::ParhipID>(cur_offset + xadj[v] * vtx_width_in_byte);
}

std::ofstream out(filename, std::ios_base::binary | std::ios_base::out | std::ios_base::app);
if (!out)
throw IOError("Failed to open file " + filename + " for writing");

// Case distinction: the last offset acts as a guardian and should only be written by the last PE
if (rank_ + 1 == size_) {
out.write(reinterpret_cast<const char*>(xadj_in_file.data()), xadj_in_file.size() * sizeof(parhip::ParhipID));
} else {
out.write(
reinterpret_cast<const char*>(xadj_in_file.data()), (xadj_in_file.size() - 1) * sizeof(parhip::ParhipID));
}
}

void ParhipWriter::WriteOffsets(const std::string& filename) {
std::vector<ParhipID> offset(graph_.NumberOfLocalVertices() + 1);

Expand Down Expand Up @@ -138,30 +163,20 @@ void WriteEdgesImpl(std::ofstream& out, const Edgelist& edges) {

void ParhipWriter::WriteEdges(const std::string& filename) {
std::ofstream out(filename, std::ios_base::out | std::ios_base::binary | std::ios_base::app);
if (config_.width == 32) {
if (config_.vtx_width == 32) {
WriteEdgesImpl<std::uint32_t>(out, graph_.edges);
} else {
WriteEdgesImpl<ParhipID>(out, graph_.edges);
}
}

// Appends the local vertex weight array verbatim (as ParhipWeight values) to
// the output file.
void ParhipWriter::WriteVertexWeights(const std::string& filename) {
    const auto& weights   = graph_.vertex_weights;
    const auto  num_bytes = weights.size() * sizeof(ParhipWeight);

    std::ofstream out(filename, std::ios_base::out | std::ios_base::binary | std::ios_base::app);
    out.write(reinterpret_cast<const char*>(weights.data()), num_bytes);
}

// Appends the local edge weight array verbatim (as ParhipWeight values) to the
// output file.
void ParhipWriter::WriteEdgeWeights(const std::string& filename) {
    const auto& weights   = graph_.edge_weights;
    const auto  num_bytes = weights.size() * sizeof(ParhipWeight);

    std::ofstream out(filename, std::ios_base::out | std::ios_base::binary | std::ios_base::app);
    out.write(reinterpret_cast<const char*>(weights.data()), num_bytes);
}

bool ParhipWriter::Write(const int pass, const std::string& filename) {
if (config_.distributed) {
throw IOError("ParHiP format does not support distributed output");
}
if (graph_.representation == GraphRepresentation::CSR) {
return WriteFromCSR(pass, filename, graph_.xadj, graph_.adjncy, &graph_.vertex_weights, &graph_.edge_weights);
}

graph_.SortEdgelist();

Expand All @@ -181,14 +196,14 @@ bool ParhipWriter::Write(const int pass, const std::string& filename) {

case 2:
if (info_.has_vertex_weights) {
WriteVertexWeights(filename);
WriteWeights(filename, graph_.vertex_weights, sizeof(VertexWeights::value_type) == 4);
} else {
WriteEdgeWeights(filename);
WriteWeights(filename, graph_.edge_weights, sizeof(EdgeWeights::value_type) == 4);
}
return info_.has_vertex_weights && info_.has_edge_weights;

case 3:
WriteEdgeWeights(filename);
WriteWeights(filename, graph_.edge_weights, sizeof(EdgeWeights::value_type) == 4);
return false;
}

Expand Down
92 changes: 89 additions & 3 deletions kagen/io/parhip.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
#pragma once

#include "kagen/context.h"
#include "kagen/io.h"
#include "kagen/io/graph_format.h"

#include <fstream>
Expand Down Expand Up @@ -78,16 +79,29 @@ class ParhipWriter : public GraphWriter {

bool Write(const int pass, const std::string& filename) final;

template <typename VertexId, typename VertexWeight, typename EdgeWeight>
bool WriteFromCSR(
const int pass, const std::string& filename, const XadjArray& xadj, const std::vector<VertexId>& adjncy,
const std::vector<VertexWeight>* vertex_weights = nullptr,
const std::vector<EdgeWeight>* edge_weights = nullptr);

private:
void WriteHeader(const std::string& filename);
const std::size_t buffer_chunk_size_ = 1024 * 1024;
void WriteHeader(const std::string& filename);

void WriteOffsets(const std::string& filename);
void WriteOffsets(const std::string& filename, const XadjArray& xadj);

void WriteEdges(const std::string& filename);

void WriteVertexWeights(const std::string& filename);
template <typename T, typename OnDiskT>
void WriteInChunks(std::vector<T> const& data, std::ofstream& out) const;

template <typename T>
void WriteAdjncyArray(const std::string& filename, const std::vector<T>& adjncy_array) const;

void WriteEdgeWeights(const std::string& filename);
template <typename T>
void WriteWeights(const std::string& filename, const std::vector<T>& data, bool use_32_bits) const;
};

class ParhipReader : public GraphReader {
Expand Down Expand Up @@ -119,4 +133,76 @@ class ParhipFactory : public FileFormatFactory {
std::unique_ptr<GraphWriter>
CreateWriter(const OutputGraphConfig& config, Graph& graph, GraphInfo info, PEID rank, PEID size) const final;
};

/// Writes a CSR graph in ParHiP format using the multi-pass protocol driven by
/// Write(): each call performs exactly one pass and the return value tells the
/// driver whether another pass is required.
///
/// Pass 0: header (root only) + per-PE slice of the offset array.
/// Pass 1: adjacency array.
/// Pass 2: the first available weight array — vertex weights take precedence;
///         if only edge weights exist, they are written here instead.
/// Pass 3: edge weights, reached only when both weight kinds exist (otherwise
///         pass 2 already wrote the edge weights).
///
/// @param pass           current pass index (0..3); any other value is a no-op.
/// @param filename       output file; appended to on every pass.
/// @param xadj           CSR offset array — presumably local_n + 1 entries; see WriteOffsets.
/// @param adjncy         CSR adjacency array.
/// @param vertex_weights optional vertex weights; dereferenced when
///                       info_.has_vertex_weights is set — NOTE(review): caller must
///                       guarantee non-null in that case, confirm at call sites.
/// @param edge_weights   optional edge weights; dereferenced when
///                       info_.has_edge_weights is set — same caveat as above.
/// @return true if a further pass is needed, false when writing is complete.
template <typename VertexId, typename VertexWeight, typename EdgeWeight>
bool ParhipWriter::WriteFromCSR(
    const int pass, const std::string& filename, const XadjArray& xadj, const std::vector<VertexId>& adjncy,
    const std::vector<VertexWeight>* vertex_weights, const std::vector<EdgeWeight>* edge_weights) {
    switch (pass) {
        case 0:
            WriteHeader(filename);
            WriteOffsets(filename, xadj);
            return true; // the adjacency array always follows

        case 1:
            WriteAdjncyArray(filename, adjncy);
            // Another pass is only needed if there are weights to write.
            return info_.has_vertex_weights || info_.has_edge_weights;

        case 2:
            // Vertex weights precede edge weights in the file; write whichever
            // comes first.
            if (info_.has_vertex_weights) {
                WriteWeights<VertexWeight>(filename, *vertex_weights, config_.vwgt_width == 32);
            } else {
                WriteWeights<EdgeWeight>(filename, *edge_weights, config_.adjwgt_width == 32);
            }
            // A fourth pass is required only when both weight kinds exist.
            return info_.has_vertex_weights && info_.has_edge_weights;

        case 3:
            if (info_.has_edge_weights) {
                WriteWeights(filename, *edge_weights, config_.adjwgt_width == 32);
            }
            return false;
    }
    return false; // out-of-range pass index: nothing left to do
}

// Streams `data` to `out`, converting every element to the on-disk type
// OnDiskT. The conversion goes through a bounded staging buffer of at most
// buffer_chunk_size_ elements so that memory usage stays constant regardless
// of the input size.
template <typename T, typename OnDiskT>
void ParhipWriter::WriteInChunks(std::vector<T> const& data, std::ofstream& out) const {
    const std::size_t    total = data.size();
    std::vector<OnDiskT> chunk(std::min<std::size_t>(buffer_chunk_size_, total));

    std::size_t written = 0;
    while (written < total) {
        const std::size_t count = std::min<std::size_t>(buffer_chunk_size_, total - written);
        for (std::size_t i = 0; i < count; ++i) {
            chunk[i] = static_cast<OnDiskT>(data[written + i]);
        }
        out.write(reinterpret_cast<const char*>(chunk.data()), count * sizeof(OnDiskT));
        written += count;
    }
}

// Appends the adjacency array to the file, encoded with the configured
// on-disk vertex-ID width (32 or 64 bit, unsigned).
template <typename T>
void ParhipWriter::WriteAdjncyArray(const std::string& filename, const std::vector<T>& adjncy_array) const {
    std::ofstream out(filename, std::ios_base::binary | std::ios_base::out | std::ios_base::app);
    if (!out)
        throw IOError("Failed to open file " + filename + " for writing");

    if (config_.vtx_width != 32) {
        WriteInChunks<T, std::uint64_t>(adjncy_array, out);
    } else {
        WriteInChunks<T, std::uint32_t>(adjncy_array, out);
    }
}

// Appends a weight array to the file, encoded as 32- or 64-bit signed
// integers on disk depending on `use_32_bits`.
template <typename T>
void ParhipWriter::WriteWeights(const std::string& filename, const std::vector<T>& data, bool use_32_bits) const {
    static_assert(std::is_integral<T>::value, "On disk weight type must be integral");

    std::ofstream out(filename, std::ios_base::binary | std::ios_base::out | std::ios_base::app);
    if (!out)
        throw IOError("Failed to open file " + filename + " for writing");

    if (!use_32_bits) {
        WriteInChunks<T, std::int64_t>(data, out);
    } else {
        WriteInChunks<T, std::int32_t>(data, out);
    }
}
} // namespace kagen
5 changes: 5 additions & 0 deletions tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,11 @@ kagen_add_test(test_generic_file_generator
FILES file/generic_file_generator_test.cpp
CORES 1 2 3 4 8 16)

kagen_add_test(test_parhip_reader_writer
FILES file/parhip_reader_writer.cpp
CORES 1 2 3 4 8)


# Geometric Generator
kagen_add_test(test_rgg2d
FILES geometric/rgg2d_test.cpp
Expand Down
Loading