Skip to content

Commit

Permalink
New compactor compiles
Browse files Browse the repository at this point in the history
  • Loading branch information
m09526 committed Nov 14, 2024
1 parent 2f03492 commit d3add2b
Show file tree
Hide file tree
Showing 6 changed files with 73 additions and 42 deletions.
6 changes: 4 additions & 2 deletions cpp/include/cmdline/lub.hpp
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
#pragma once
#include <cudf/io/types.hpp>

#include <cudf/table/table.hpp>

#include <cstddef>
#include <memory>
#include <vector>

::size_t findLeastUpperBound(std::vector<cudf::io::table_with_metadata> const &tables, ::size_t const colNo = 0);
std::size_t findLeastUpperBound(std::vector<std::unique_ptr<cudf::table>> const &tables, std::size_t const colNo = 0);
5 changes: 3 additions & 2 deletions cpp/include/cmdline/slice.hpp
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
#pragma once

#include <cudf/io/types.hpp>
#include <cudf/scalar/scalar.hpp>
#include <cudf/table/table.hpp>
#include <cudf/table/table_view.hpp>

#include <memory>
#include <utility>
#include <vector>

int convertInteger(cudf::scalar const &scalar);

std::pair<std::vector<cudf::table_view>, std::vector<cudf::table_view>> splitAtNeedle(cudf::table_view const &needle,
std::vector<cudf::io::table_with_metadata> const &haystacks);
std::vector<std::unique_ptr<cudf::table>> const &haystacks);
2 changes: 1 addition & 1 deletion cpp/src/cmdline/check_sort.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ int main(int argc, char **argv) {
std::unique_ptr<cudf::table> checkTable;
cudf::table_view checkView = currentTable.tbl->view();
if (prevTable.tbl) {
checkTable = cudf::concatenate(std::vector<cudf::table_view>{ *prevTable.tbl, *currentTable.tbl });
checkTable = cudf::concatenate(std::vector{ prevTable.tbl->view(), currentTable.tbl->view() });
checkView = std::move(checkTable->view());
}
bool chunkSorted = cudf::is_sorted(checkView, { cudf::order::ASCENDING }, { cudf::null_order::AFTER });
Expand Down
77 changes: 53 additions & 24 deletions cpp/src/cmdline/chunk_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,13 @@

#include <CLI/CLI.hpp>// NOLINT

#include <cudf/concatenate.hpp>
#include <cudf/copying.hpp>
#include <cudf/io/datasource.hpp>
#include <cudf/io/parquet.hpp>
#include <cudf/io/types.hpp>
#include <cudf/merge.hpp>
#include <cudf/table/table.hpp>
#include <cudf/table/table_view.hpp>
#include <cudf/types.hpp>
#include <cudf/utilities/error.hpp>
Expand All @@ -33,6 +35,7 @@
#include <memory>
#include <string>
#include <tuple>
#include <utility>
#include <vector>

::size_t calcRowsWritten(auto const &readers) noexcept {
Expand Down Expand Up @@ -65,6 +68,8 @@ int main(int argc, char **argv) {
app.add_option("-c,--chunk-read-limit", chunkReadLimit, "cuDF Parquet reader chunk read limit in MiB");
std::size_t passReadLimit{ 1024 };
app.add_option("-p,--pass-read-limit", passReadLimit, "cuDF Parquet reader pass read limit in MiB");
std::size_t epsilon{ 10'000 };
app.add_option("-e,--epsilon", epsilon, "Lower bound for rows remaining in a table before loading next chunk");
CLI11_PARSE(app, argc, argv);// NOLINT

// force gpu initialization so it's not included in the time
Expand All @@ -81,14 +86,14 @@ int main(int argc, char **argv) {
// Container for all data sources and Parquet readers
std::vector<std::tuple<std::unique_ptr<cudf::io::datasource>,
std::unique_ptr<cudf::io::chunked_parquet_reader>,
::size_t,
::size_t>>
std::size_t,
std::size_t>>
readers;
readers.reserve(inputFiles.size());

// Make readers and find total row count
// We create the data source and disable prefetching while we read the footer
::size_t totalRows = 0;
std::size_t totalRows = 0;
for (auto const &f : inputFiles) {
auto source =
std::make_unique<gpu_compact::io::PrefetchingSource>(f, cudf::io::datasource::create(f), false);
Expand All @@ -110,49 +115,73 @@ int main(int argc, char **argv) {
auto &writer = *sinkDetails.writer;

SPDLOG_INFO("Start reading files");
// Loop doing reads
::size_t lastTotalRowCount = std::numeric_limits<::size_t>::max();
// Remaining parts initially empty
std::vector<std::unique_ptr<cudf::table>> remainingParts{ readers.size() };
std::size_t lastTotalRowCount = std::numeric_limits<std::size_t>::max();
auto const startTime = timestamp();
// Loop doing reads
while (lastTotalRowCount) {
lastTotalRowCount = 0;
// Loop through each reader
std::vector<cudf::io::table_with_metadata> tables;

for (::size_t rc = 0; auto &[src, reader, chunkNo, rowCount] : readers) {
// If reader has data, read a chunk and write, otherwise flag and ignore
for (std::size_t rc = 0; auto &[src, reader, chunkNo, rowCount] : readers) {
// If reader has data and we need some, perform a read
SPDLOG_INFO("Reader {:d}", rc);
if (reader->has_next()) {
SPDLOG_INFO(" Chunk: {:d}", chunkNo);
tables.push_back(reader->read_chunk());
auto const rowsInChunk = tables.back().metadata.num_rows_per_source.at(0);
SPDLOG_INFO(" Read chunk of {:d} rows", rowsInChunk);

// Increment chunk number in reader and add to row count
chunkNo++;
rowCount += rowsInChunk;

// Update overall count
lastTotalRowCount += rowsInChunk;
SPDLOG_INFO(" Reader has rows");
if (!remainingParts[rc] || remainingParts[rc]->num_rows() < epsilon) {
SPDLOG_INFO(
" No previous table or we only have {:d} in memory", remainingParts[rc]->num_rows());

// Read a chunk
SPDLOG_INFO(" Read chunk: {:d}", chunkNo);
auto table = reader->read_chunk();
auto const rowsInChunk = table.metadata.num_rows_per_source.at(0);
SPDLOG_INFO(" Read chunk of {:d} rows", rowsInChunk);
// Increment chunk number in reader and add to row count
chunkNo++;
rowCount += rowsInChunk;

// Now concat the old part to the new chunk
std::unique_ptr<cudf::table> concat =
cudf::concatenate(std::vector{ remainingParts[rc]->view(), table.tbl->view() });
remainingParts[rc] = std::move(concat);
SPDLOG_INFO(" New table has {:d} rows", remainingParts[rc]->num_rows());
}
} else {
SPDLOG_INFO(" Reader {:d} has no more rows", rc);
}

// Update overall count
lastTotalRowCount += remainingParts[rc]->num_rows();
rc++;
}

// Merge and write tables
if (lastTotalRowCount > 0) {
// Find the least upper bound in sort column across these tables
auto const leastUpperBound = findLeastUpperBound(tables, 0);
auto const leastUpperBound = findLeastUpperBound(remainingParts, 0);

// Now take the search "needle" from the last row of the table with the LUB
auto const lubTable = tables[leastUpperBound].tbl->select({ 0 });
auto const lubTable = remainingParts[leastUpperBound]->select({ 0 });
auto const needle = cudf::split(lubTable, { lubTable.num_rows() - 1 })[1];
auto const tableVectors = splitAtNeedle(needle, tables);

// Split all tables at the needle
std::pair<std::vector<cudf::table_view>, std::vector<cudf::table_view>> const tableVectors =
splitAtNeedle(needle, remainingParts);

// Merge all the upper parts of the tables
SPDLOG_INFO("Merging {:d} rows", lastTotalRowCount);
auto merged = cudf::merge(tableVectors.first, { 0 }, { cudf::order::ASCENDING });
tables.clear();

// Duplicate the unmerged parts of the tables, so we can opportunistically clear the original
// tables we no longer need
for (std::size_t idx = 0; auto &&table : remainingParts) {
table = std::make_unique<cudf::table>(tableVectors.second[idx]);
idx++;
}

writer.write(*merged);

auto const elapsedTime = std::chrono::duration_cast<std::chrono::seconds>(timestamp() - startTime);
auto const rowsWritten = calcRowsWritten(readers);
auto const fracRowsWritten = (static_cast<double>(rowsWritten) / totalRows);
Expand Down
11 changes: 5 additions & 6 deletions cpp/src/cmdline/lub.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,22 +7,21 @@
#include <cudf/utilities/type_dispatcher.hpp>
#include <spdlog/spdlog.h>

#include <memory>
#include <type_traits>
#include <utility>

::size_t findLeastUpperBound(std::vector<cudf::io::table_with_metadata> const &tables, ::size_t const colNo) {
::size_t findLeastUpperBound(std::vector<std::unique_ptr<cudf::table>> const &tables, ::size_t const colNo) {

auto action = [&tables, &colNo]<typename T>() {
using CudfScalarType = cudf::scalar_type_t<T>;
::size_t lubTableIndex = 0;
std::unique_ptr<cudf::scalar> currentLub =
cudf::get_element(tables.front().tbl->get_column(colNo), tables.front().tbl->get_column(colNo).size() - 1);
cudf::get_element(tables.front()->get_column(colNo), tables.front()->get_column(colNo).size() - 1);
// Loop over each table view, grab the last element in the sort column and find the lowest
for (::size_t idx = 0; cudf::io::table_with_metadata const &table : tables) {
for (::size_t idx = 0; std::unique_ptr<cudf::table> const &table : tables) {

std::unique_ptr<cudf::scalar> lastElement =
cudf::get_element(table.tbl->view().column(colNo), table.tbl->view().column(colNo).size() - 1);
cudf::get_element(table->view().column(colNo), table->view().column(colNo).size() - 1);
auto const lub_ptr = static_cast<CudfScalarType *>(currentLub.get());
auto const lastElement_ptr = static_cast<CudfScalarType *>(lastElement.get());

Expand Down Expand Up @@ -59,5 +58,5 @@ ::size_t findLeastUpperBound(std::vector<cudf::io::table_with_metadata> const &t
};

CUDF_EXPECTS(!tables.empty(), "vector of tables cannot be empty");
return cudf::type_dispatcher(tables.front().tbl->get_column(colNo).type(), action);
return cudf::type_dispatcher(tables.front()->get_column(colNo).type(), action);
}
14 changes: 7 additions & 7 deletions cpp/src/cmdline/slice.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@ int convertInteger(cudf::scalar const &scalar) {
}

std::pair<std::vector<cudf::table_view>, std::vector<cudf::table_view>> splitAtNeedle(cudf::table_view const &needle,
std::vector<cudf::io::table_with_metadata> const &haystacks) {
std::vector<std::unique_ptr<cudf::table>> const &haystacks) {
std::pair<std::vector<cudf::table_view>, std::vector<cudf::table_view>> lists;
std::vector<cudf::table_view> tablesToMerge;
std::vector<cudf::table_view> remainingFragments;
tablesToMerge.reserve(haystacks.size());
Expand All @@ -40,20 +41,19 @@ std::pair<std::vector<cudf::table_view>, std::vector<cudf::table_view>> splitAtN
for (::size_t idx = 0; auto const &table : haystacks) {
// Find needle in each table view, table is "haystack"
std::unique_ptr<cudf::column> splitPoint =
cudf::upper_bound(table.tbl->select({ 0 }), needle, { cudf::order::ASCENDING }, { cudf::null_order::AFTER });
cudf::upper_bound(table->select({ 0 }), needle, { cudf::order::ASCENDING }, { cudf::null_order::AFTER });
CUDF_EXPECTS(splitPoint->size() == 1, "Split result should be single row");
// Get this index back to host
std::unique_ptr<cudf::scalar> splitIndex = cudf::get_element(*splitPoint, 0);
int const splitPos = convertInteger(*splitIndex);
// Now split this table at that index
std::vector<cudf::table_view> splitTables = cudf::split(*table.tbl, { splitPos });
std::vector<cudf::table_view> splitTables = cudf::split(*table, { splitPos });
CUDF_EXPECTS(splitTables.size() == 2, "Should be two tables from split");
SPDLOG_INFO(
"File {:d} Table size after split {:d} and {:d}", idx, splitTables[0].num_rows(), splitTables[1].num_rows());
tablesToMerge.push_back(std::move(splitTables[0]));
remainingFragments.push_back(std::move(splitTables[1]));

lists.first.push_back(std::move(splitTables[0]));
lists.second.push_back(std::move(splitTables[1]));
idx++;
}
return { std::move(tablesToMerge), std::move(remainingFragments) };
return lists;
}

0 comments on commit d3add2b

Please sign in to comment.