From 49574f36a9f22bb95dc9df5b1cb4018b2bd7b66d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Henrik=20S=2E=20Ga=C3=9Fmann?= Date: Sun, 18 Feb 2024 14:06:23 +0100 Subject: [PATCH] fix: Allow flushing of legacy chunked_output_stream --- src/dplx/dp/legacy/chunked_output_stream.hpp | 163 ++++++++++++++---- .../dp/legacy/chunked_output_stream.test.cpp | 68 +++++++- 2 files changed, 200 insertions(+), 31 deletions(-) diff --git a/src/dplx/dp/legacy/chunked_output_stream.hpp b/src/dplx/dp/legacy/chunked_output_stream.hpp index e565b43..eee2c6f 100644 --- a/src/dplx/dp/legacy/chunked_output_stream.hpp +++ b/src/dplx/dp/legacy/chunked_output_stream.hpp @@ -81,12 +81,12 @@ class chunked_output_stream_base : public output_buffer reset(mCurrentChunk.data(), mCurrentChunk.size()); return outcome::success(); } + mDecomissionThreshold = static_cast(size()); + reset(static_cast(mSmallBuffer), small_buffer_size); if (requestedSize > small_buffer_size) { return errc::buffer_size_exceeded; } - mDecomissionThreshold = static_cast(size()); - reset(static_cast(mSmallBuffer), small_buffer_size); return outcome::success(); } @@ -96,34 +96,38 @@ class chunked_output_stream_base : public output_buffer data() - static_cast(mSmallBuffer)); if (consumedSize < static_cast(mDecomissionThreshold)) { - if (requestedSize > small_buffer_size) - { - return errc::buffer_size_exceeded; - } std::memcpy(chunkPart.data(), static_cast(mSmallBuffer), consumedSize); reset(static_cast(mSmallBuffer), small_buffer_size); mDecomissionThreshold -= static_cast(consumedSize); + if (requestedSize > small_buffer_size) + { + return errc::buffer_size_exceeded; + } return outcome::success(); } - std::memcpy(chunkPart.data(), static_cast(mSmallBuffer), - chunkPart.size()); - - DPLX_TRY(acquire_next_chunk()); + if (!chunkPart.empty()) [[likely]] + { + std::memcpy(chunkPart.data(), + static_cast(mSmallBuffer), + chunkPart.size()); + } auto const overlap = consumedSize - static_cast(mDecomissionThreshold); - std::memcpy( - mCurrentChunk.data(), - static_cast(mSmallBuffer) - // NOLINTNEXTLINE(cppcoreguidelines-pro-bounds-pointer-arithmetic) - + mDecomissionThreshold, - overlap); + if (auto acquireRx = acquire_next_chunk(); acquireRx.has_error()) + [[unlikely]] + { + move_remaining_small_buffer_to_front(overlap); + return std::move(acquireRx).assume_error(); + } - mCurrentChunk = mCurrentChunk.subspan(overlap); - mDecomissionThreshold = -1; + if (!try_move_small_buffer_to_next_chunk(overlap) && mRemaining == 0) + { + return errc::end_of_stream; + } return ensure_size(requestedSize); } auto do_bulk_write(std::byte const *src, std::size_t writeAmount) noexcept @@ -137,18 +141,25 @@ class chunked_output_stream_base : public output_buffer static_cast(mSmallBuffer), chunkPart.size()); - DPLX_TRY(acquire_next_chunk()); + // we can use small_buffer_size here as bulk_write is guaranteed to + // fill the buffer completely + // => small_buffer_size == data() - mSmallBuffer + auto const overlap + = small_buffer_size + - static_cast(mDecomissionThreshold); - auto const overlap = small_buffer_size - - static_cast(mDecomissionThreshold); - std::memcpy( - mCurrentChunk.data(), - static_cast(mSmallBuffer) - // NOLINTNEXTLINE(cppcoreguidelines-pro-bounds-pointer-arithmetic) - + chunkPart.size(), - overlap); - mCurrentChunk = mCurrentChunk.subspan(overlap); - mDecomissionThreshold = -1; + if (auto acquireRx = acquire_next_chunk(); acquireRx.has_error()) + [[unlikely]] + { + move_remaining_small_buffer_to_front(overlap); + return std::move(acquireRx).assume_error(); + } + + if (!try_move_small_buffer_to_next_chunk(overlap)) [[unlikely]] + { + DPLX_TRY(acquire_next_chunk()); + } + reset(); } else { @@ -177,6 +188,100 @@ class chunked_output_stream_base : public output_buffer reset(mCurrentChunk.data(), mCurrentChunk.size()); return outcome::success(); } + + auto do_sync_output() noexcept -> result override + { + if (mDecomissionThreshold < 0) + { + // small buffer is not in use => nothing to do + return outcome::success(); + } + + auto const chunkPart = mCurrentChunk.last( + static_cast(mDecomissionThreshold)); + auto const consumedSize = static_cast( + data() - static_cast(mSmallBuffer)); + if (consumedSize <= static_cast(mDecomissionThreshold)) + { + // written data still does not exceed current chunk + std::memcpy(chunkPart.data(), + static_cast(mSmallBuffer), consumedSize); + if (consumedSize == static_cast(mDecomissionThreshold)) + { + // avoid acquiring a new chunk as sync_output is usually called + // as a cleanup operation and a new chunk would go to waste + // and/or an illegal operation in case of a pre-sized stream + // NOLINTNEXTLINE(cppcoreguidelines-pro-bounds-pointer-arithmetic) + reset(mCurrentChunk.data() + mCurrentChunk.size(), 0U); + mDecomissionThreshold = -1; + } + else + { + reset(static_cast(mSmallBuffer), + small_buffer_size); + mDecomissionThreshold -= static_cast(consumedSize); + } + return outcome::success(); + } + + std::memcpy(chunkPart.data(), static_cast(mSmallBuffer), + chunkPart.size()); + auto const overlap = consumedSize + - static_cast(mDecomissionThreshold); + if (auto acquireRx = acquire_next_chunk(); acquireRx.has_error()) + [[unlikely]] + { + move_remaining_small_buffer_to_front(overlap); + return std::move(acquireRx).assume_error(); + } + + if (!try_move_small_buffer_to_next_chunk(overlap)) + { + return errc::end_of_stream; + } + return outcome::success(); + } + + auto + try_move_small_buffer_to_next_chunk(std::size_t const remaining) noexcept + -> bool + { + auto const copyAmount = std::min(remaining, mCurrentChunk.size()); + if (mCurrentChunk.data() != nullptr) [[likely]] + { + std::memcpy( + mCurrentChunk.data(), + static_cast(mSmallBuffer) + // NOLINTNEXTLINE(cppcoreguidelines-pro-bounds-pointer-arithmetic) + + mDecomissionThreshold, + remaining); + mCurrentChunk = mCurrentChunk.subspan(copyAmount); + } + if (remaining != copyAmount) [[unlikely]] + { + mDecomissionThreshold += static_cast(copyAmount); + move_remaining_small_buffer_to_front(remaining - copyAmount); + return false; + } + reset(mCurrentChunk.data(), mCurrentChunk.size()); + mDecomissionThreshold = -1; + return true; + } + + void + move_remaining_small_buffer_to_front(std::size_t const remaining) noexcept + { + std::memmove( + static_cast(mSmallBuffer), + static_cast(mSmallBuffer) + // NOLINTNEXTLINE(cppcoreguidelines-pro-bounds-pointer-arithmetic) + + mDecomissionThreshold, + remaining); + // NOLINTNEXTLINE(cppcoreguidelines-pro-bounds-pointer-arithmetic) + reset(static_cast(mSmallBuffer) + remaining, + small_buffer_size - remaining); + mDecomissionThreshold = 0; + } }; } // namespace dplx::dp::legacy diff --git a/src/dplx/dp/legacy/chunked_output_stream.test.cpp b/src/dplx/dp/legacy/chunked_output_stream.test.cpp index 1b5785a..1b47241 100644 --- a/src/dplx/dp/legacy/chunked_output_stream.test.cpp +++ b/src/dplx/dp/legacy/chunked_output_stream.test.cpp @@ -12,6 +12,7 @@ #include +#include "blob_matcher.hpp" #include "test_utils.hpp" namespace dp_tests @@ -37,13 +38,13 @@ class test_legacy_chunked_output_stream final static constexpr auto partition = dp::minimum_guaranteed_write_size * 2U - 1U; - explicit test_legacy_chunked_output_stream(unsigned int streamSize) + explicit test_legacy_chunked_output_stream(unsigned streamSize) : base_type({}, streamSize) , mChunks() , mNext(0U) { - constexpr auto invalidItem = std::byte{0xFEU}; + constexpr auto invalidItem = std::byte{0xfeU}; assert(streamSize > partition); mChunks[0].resize(partition); @@ -53,6 +54,15 @@ class test_legacy_chunked_output_stream final std::fill(mChunks[1].begin(), mChunks[1].end(), invalidItem); } + [[nodiscard]] auto content() const -> std::vector + { + std::vector result; + result.reserve(mChunks[0].size() + mChunks[1].size()); + result.insert(result.end(), mChunks[0].begin(), mChunks[0].end()); + result.insert(result.end(), mChunks[1].begin(), mChunks[1].end()); + return result; + } + private: auto acquire_next_chunk_impl() -> result> { @@ -79,4 +89,58 @@ TEST_CASE("legacy_chunked_output_stream smoke tests") CHECK(subject.size() == subject.partition - 2); } +// NOLINTBEGIN(cppcoreguidelines-pro-bounds-pointer-arithmetic) +TEST_CASE("legacy_chunked_output_stream wraps correctly with do_grow") +{ + constexpr unsigned streamSize = dp::minimum_guaranteed_write_size * 4 - 1; + test_legacy_chunked_output_stream subject(streamSize); + + REQUIRE(subject.ensure_size(1U)); + + std::byte buffer[streamSize] = {}; + constexpr std::size_t offset + = test_legacy_chunked_output_stream::partition - 1; + REQUIRE(subject.bulk_write(std::span(buffer).first(offset))); + + REQUIRE(subject.ensure_size(3U)); + auto *out = subject.data(); + buffer[offset] = out[0] = std::byte{'a'}; + buffer[offset + 1] = out[1] = std::byte{'b'}; + buffer[offset + 2] = out[2] = std::byte{'c'}; + subject.commit_written(3U); + + CHECK(subject.mChunks[0].back() == std::byte{0xfeU}); + REQUIRE(subject.sync_output()); + CHECK(subject.mChunks[0].back() == std::byte{'a'}); + + constexpr auto invalidItem = std::byte{0xfeU}; + std::ranges::fill_n(static_cast(buffer) + offset + 3, + streamSize - offset - 3, invalidItem); + + REQUIRE_BLOB_EQ(subject.content(), buffer); +} + +TEST_CASE("legacy_chunked_output_stream wraps correctly with bulk_write") +{ + constexpr unsigned streamSize = dp::minimum_guaranteed_write_size * 4; + test_legacy_chunked_output_stream subject(streamSize); + + std::byte buffer[streamSize] = {}; + constexpr std::size_t offset1 = 2U; + REQUIRE(subject.bulk_write(std::span(buffer).first(offset1))); + REQUIRE(subject.ensure_size(test_legacy_chunked_output_stream::partition + - 1)); + constexpr auto offset2 = dp::minimum_guaranteed_write_size * 3; + + REQUIRE(subject.bulk_write(std::span(buffer).first(offset2))); + + constexpr auto invalidItem = std::byte{0xfeU}; + std::ranges::fill_n(static_cast(buffer) + offset1 + offset2, + streamSize - (offset1 + offset2), invalidItem); + + REQUIRE_BLOB_EQ(subject.content(), buffer); +} + +// NOLINTEND(cppcoreguidelines-pro-bounds-pointer-arithmetic) + } // namespace dp_tests