Merge pull request #23 from deeplex/dev/fix-legacy-chunk_output_stream-buffer-sync

fix: Allow flushing of legacy chunked_output_stream
BurningEnlightenment authored Feb 19, 2024
2 parents 2d35385 + 49574f3 commit b61aafb
Showing 2 changed files with 200 additions and 31 deletions.
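
For orientation, here is a minimal caller-side sketch of the flush path this change repairs (illustration only, not part of the diff). Stream stands in for any concrete chunked_output_stream_base derivative, such as the test helper added below; the error handling mirrors the result/has_error() style used throughout the patch.

#include <cstddef>
#include <span>

// Hedged sketch: write a payload that may straddle a chunk boundary, then
// flush. Before this fix, sync_output() could leave bytes parked in the
// stream's small transition buffer instead of copying them back into the
// underlying chunks.
template <typename Stream>
auto write_and_flush(Stream &out, std::span<std::byte> payload) -> bool
{
    if (auto writeRx = out.bulk_write(payload); writeRx.has_error())
    {
        return false;
    }
    if (auto syncRx = out.sync_output(); syncRx.has_error())
    {
        return false;
    }
    return true;
}
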
163 changes: 134 additions & 29 deletions src/dplx/dp/legacy/chunked_output_stream.hpp
@@ -81,12 +81,12 @@ class chunked_output_stream_base : public output_buffer
reset(mCurrentChunk.data(), mCurrentChunk.size());
return outcome::success();
}
mDecomissionThreshold = static_cast<std::int8_t>(size());
reset(static_cast<std::byte *>(mSmallBuffer), small_buffer_size);
if (requestedSize > small_buffer_size)
{
return errc::buffer_size_exceeded;
}
mDecomissionThreshold = static_cast<std::int8_t>(size());
reset(static_cast<std::byte *>(mSmallBuffer), small_buffer_size);
return outcome::success();
}

@@ -96,34 +96,38 @@ class chunked_output_stream_base : public output_buffer
data() - static_cast<std::byte *>(mSmallBuffer));
if (consumedSize < static_cast<std::size_t>(mDecomissionThreshold))
{
if (requestedSize > small_buffer_size)
{
return errc::buffer_size_exceeded;
}
std::memcpy(chunkPart.data(),
static_cast<std::byte *>(mSmallBuffer), consumedSize);

reset(static_cast<std::byte *>(mSmallBuffer), small_buffer_size);
mDecomissionThreshold -= static_cast<std::int8_t>(consumedSize);
if (requestedSize > small_buffer_size)
{
return errc::buffer_size_exceeded;
}
return outcome::success();
}

std::memcpy(chunkPart.data(), static_cast<std::byte *>(mSmallBuffer),
chunkPart.size());

DPLX_TRY(acquire_next_chunk());
if (!chunkPart.empty()) [[likely]]
{
std::memcpy(chunkPart.data(),
static_cast<std::byte *>(mSmallBuffer),
chunkPart.size());
}

auto const overlap = consumedSize
- static_cast<std::size_t>(mDecomissionThreshold);
std::memcpy(
mCurrentChunk.data(),
static_cast<std::byte *>(mSmallBuffer)
// NOLINTNEXTLINE(cppcoreguidelines-pro-bounds-pointer-arithmetic)
+ mDecomissionThreshold,
overlap);
if (auto acquireRx = acquire_next_chunk(); acquireRx.has_error())
[[unlikely]]
{
move_remaining_small_buffer_to_front(overlap);
return std::move(acquireRx).assume_error();
}

mCurrentChunk = mCurrentChunk.subspan(overlap);
mDecomissionThreshold = -1;
if (!try_move_small_buffer_to_next_chunk(overlap) && mRemaining == 0)
{
return errc::end_of_stream;
}
return ensure_size(requestedSize);
}
auto do_bulk_write(std::byte const *src, std::size_t writeAmount) noexcept
@@ -137,18 +141,25 @@ class chunked_output_stream_base : public output_buffer
static_cast<std::byte *>(mSmallBuffer),
chunkPart.size());

DPLX_TRY(acquire_next_chunk());
// we can use small_buffer_size here as bulk_write is guaranteed to
// fill the buffer completely
// => small_buffer_size == data() - mSmallBuffer
auto const overlap
= small_buffer_size
- static_cast<std::size_t>(mDecomissionThreshold);

auto const overlap = small_buffer_size
- static_cast<unsigned>(mDecomissionThreshold);
std::memcpy(
mCurrentChunk.data(),
static_cast<std::byte *>(mSmallBuffer)
// NOLINTNEXTLINE(cppcoreguidelines-pro-bounds-pointer-arithmetic)
+ chunkPart.size(),
overlap);
mCurrentChunk = mCurrentChunk.subspan(overlap);
mDecomissionThreshold = -1;
if (auto acquireRx = acquire_next_chunk(); acquireRx.has_error())
[[unlikely]]
{
move_remaining_small_buffer_to_front(overlap);
return std::move(acquireRx).assume_error();
}

if (!try_move_small_buffer_to_next_chunk(overlap)) [[unlikely]]
{
DPLX_TRY(acquire_next_chunk());
}
reset();
}
else
{
@@ -177,6 +188,100 @@ class chunked_output_stream_base : public output_buffer
reset(mCurrentChunk.data(), mCurrentChunk.size());
return outcome::success();
}

auto do_sync_output() noexcept -> result<void> override
{
if (mDecomissionThreshold < 0)
{
// small buffer is not in use => nothing to do
return outcome::success();
}

auto const chunkPart = mCurrentChunk.last(
static_cast<std::size_t>(mDecomissionThreshold));
auto const consumedSize = static_cast<std::size_t>(
data() - static_cast<std::byte *>(mSmallBuffer));
if (consumedSize <= static_cast<std::size_t>(mDecomissionThreshold))
{
// written data still does not exceed current chunk
std::memcpy(chunkPart.data(),
static_cast<std::byte *>(mSmallBuffer), consumedSize);
if (consumedSize == static_cast<std::size_t>(mDecomissionThreshold))
{
                // avoid acquiring a new chunk: sync_output is usually called
                // as a cleanup operation, so a fresh chunk would go to waste
                // and/or acquiring it would be an illegal operation for a
                // pre-sized stream
// NOLINTNEXTLINE(cppcoreguidelines-pro-bounds-pointer-arithmetic)
reset(mCurrentChunk.data() + mCurrentChunk.size(), 0U);
mDecomissionThreshold = -1;
}
else
{
reset(static_cast<std::byte *>(mSmallBuffer),
small_buffer_size);
mDecomissionThreshold -= static_cast<std::int8_t>(consumedSize);
}
return outcome::success();
}

std::memcpy(chunkPart.data(), static_cast<std::byte *>(mSmallBuffer),
chunkPart.size());
auto const overlap = consumedSize
- static_cast<std::size_t>(mDecomissionThreshold);
if (auto acquireRx = acquire_next_chunk(); acquireRx.has_error())
[[unlikely]]
{
move_remaining_small_buffer_to_front(overlap);
return std::move(acquireRx).assume_error();
}

if (!try_move_small_buffer_to_next_chunk(overlap))
{
return errc::end_of_stream;
}
return outcome::success();
}

auto
try_move_small_buffer_to_next_chunk(std::size_t const remaining) noexcept
-> bool
{
auto const copyAmount = std::min(remaining, mCurrentChunk.size());
if (mCurrentChunk.data() != nullptr) [[likely]]
{
std::memcpy(
mCurrentChunk.data(),
static_cast<std::byte *>(mSmallBuffer)
// NOLINTNEXTLINE(cppcoreguidelines-pro-bounds-pointer-arithmetic)
+ mDecomissionThreshold,
remaining);
mCurrentChunk = mCurrentChunk.subspan(copyAmount);
}
if (remaining != copyAmount) [[unlikely]]
{
mDecomissionThreshold += static_cast<std::int8_t>(copyAmount);
move_remaining_small_buffer_to_front(remaining - copyAmount);
return false;
}
reset(mCurrentChunk.data(), mCurrentChunk.size());
mDecomissionThreshold = -1;
return true;
}

void
move_remaining_small_buffer_to_front(std::size_t const remaining) noexcept
{
std::memmove(
static_cast<std::byte *>(mSmallBuffer),
static_cast<std::byte *>(mSmallBuffer)
// NOLINTNEXTLINE(cppcoreguidelines-pro-bounds-pointer-arithmetic)
+ mDecomissionThreshold,
remaining);
// NOLINTNEXTLINE(cppcoreguidelines-pro-bounds-pointer-arithmetic)
reset(static_cast<std::byte *>(mSmallBuffer) + remaining,
small_buffer_size - remaining);
mDecomissionThreshold = 0;
}
};

} // namespace dplx::dp::legacy
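
To recap the bookkeeping the header changes rely on: the small transition buffer overlays the boundary between the current chunk and the next one, and mDecomissionThreshold records how many of its leading bytes still belong to the tail of the current chunk (-1 meaning the small buffer is not in use). The following self-contained model is an illustration under those assumptions, not library code; the type and function names are hypothetical.

#include <algorithm>
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <span>

// Simplified stand-in for the small-buffer state of chunked_output_stream_base.
struct small_buffer_model
{
    static constexpr std::size_t small_buffer_size = 16;

    std::byte mSmallBuffer[small_buffer_size] = {};
    std::int8_t mDecomissionThreshold = -1;

    // Flush helper: copy the consumed prefix of the small buffer back into
    // the tail of the current chunk and report how many bytes spill over
    // into the next chunk (the "overlap" computed in do_sync_output()).
    auto flush_into(std::span<std::byte> chunkTail,
                    std::size_t consumedSize) noexcept -> std::size_t
    {
        if (mDecomissionThreshold < 0)
        {
            // small buffer is not in use => nothing to copy back
            return 0U;
        }
        auto const tailBytes
                = static_cast<std::size_t>(mDecomissionThreshold);
        auto const copyAmount
                = std::min({consumedSize, tailBytes, chunkTail.size()});
        std::memcpy(chunkTail.data(), mSmallBuffer, copyAmount);
        // bytes written past the threshold belong to the next chunk; the
        // real implementation forwards them via
        // try_move_small_buffer_to_next_chunk().
        return consumedSize > tailBytes ? consumedSize - tailBytes : 0U;
    }
};
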
68 changes: 66 additions & 2 deletions src/dplx/dp/legacy/chunked_output_stream.test.cpp
@@ -12,6 +12,7 @@

#include <catch2/catch_test_macros.hpp>

#include "blob_matcher.hpp"
#include "test_utils.hpp"

namespace dp_tests
@@ -37,13 +38,13 @@ class test_legacy_chunked_output_stream final
static constexpr auto partition
= dp::minimum_guaranteed_write_size * 2U - 1U;

explicit test_legacy_chunked_output_stream(unsigned int streamSize)
explicit test_legacy_chunked_output_stream(unsigned streamSize)
: base_type({}, streamSize)
, mChunks()
, mNext(0U)

{
constexpr auto invalidItem = std::byte{0xFEU};
constexpr auto invalidItem = std::byte{0xfeU};
assert(streamSize > partition);

mChunks[0].resize(partition);
@@ -53,6 +54,15 @@ class test_legacy_chunked_output_stream final
std::fill(mChunks[1].begin(), mChunks[1].end(), invalidItem);
}

[[nodiscard]] auto content() const -> std::vector<std::byte>
{
std::vector<std::byte> result;
result.reserve(mChunks[0].size() + mChunks[1].size());
result.insert(result.end(), mChunks[0].begin(), mChunks[0].end());
result.insert(result.end(), mChunks[1].begin(), mChunks[1].end());
return result;
}

private:
auto acquire_next_chunk_impl() -> result<std::span<std::byte>>
{
@@ -79,4 +89,58 @@ TEST_CASE("legacy_chunked_output_stream smoke tests")
CHECK(subject.size() == subject.partition - 2);
}

// NOLINTBEGIN(cppcoreguidelines-pro-bounds-pointer-arithmetic)
TEST_CASE("legacy_chunked_output_stream wraps correctly with do_grow")
{
constexpr unsigned streamSize = dp::minimum_guaranteed_write_size * 4 - 1;
test_legacy_chunked_output_stream subject(streamSize);

REQUIRE(subject.ensure_size(1U));

std::byte buffer[streamSize] = {};
constexpr std::size_t offset
= test_legacy_chunked_output_stream::partition - 1;
REQUIRE(subject.bulk_write(std::span(buffer).first(offset)));

REQUIRE(subject.ensure_size(3U));
auto *out = subject.data();
buffer[offset] = out[0] = std::byte{'a'};
buffer[offset + 1] = out[1] = std::byte{'b'};
buffer[offset + 2] = out[2] = std::byte{'c'};
subject.commit_written(3U);

CHECK(subject.mChunks[0].back() == std::byte{0xfeU});
REQUIRE(subject.sync_output());
CHECK(subject.mChunks[0].back() == std::byte{'a'});

constexpr auto invalidItem = std::byte{0xfeU};
std::ranges::fill_n(static_cast<std::byte *>(buffer) + offset + 3,
streamSize - offset - 3, invalidItem);

REQUIRE_BLOB_EQ(subject.content(), buffer);
}

TEST_CASE("legacy_chunked_output_stream wraps correctly with bulk_write")
{
constexpr unsigned streamSize = dp::minimum_guaranteed_write_size * 4;
test_legacy_chunked_output_stream subject(streamSize);

std::byte buffer[streamSize] = {};
constexpr std::size_t offset1 = 2U;
REQUIRE(subject.bulk_write(std::span(buffer).first(offset1)));
REQUIRE(subject.ensure_size(test_legacy_chunked_output_stream::partition
- 1));
constexpr auto offset2 = dp::minimum_guaranteed_write_size * 3;

REQUIRE(subject.bulk_write(std::span(buffer).first(offset2)));

constexpr auto invalidItem = std::byte{0xfeU};
std::ranges::fill_n(static_cast<std::byte *>(buffer) + offset1 + offset2,
streamSize - (offset1 + offset2), invalidItem);

REQUIRE_BLOB_EQ(subject.content(), buffer);
}

// NOLINTEND(cppcoreguidelines-pro-bounds-pointer-arithmetic)

} // namespace dp_tests