Skip to content
Draft
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 55 additions & 0 deletions include/boost/http_io/body_read_stream.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
//
// Copyright (c) 2016-2019 Vinnie Falco (vinnie dot falco at gmail dot com)
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
// Official repository: https://github.com/vinniefalco/http_io
//

#ifndef BOOST_HTTP_IO_BODY_READ_STREAM_HPP
#define BOOST_HTTP_IO_BODY_READ_STREAM_HPP

#include <boost/http_io/detail/config.hpp>
#include <boost/http_proto/request_parser.hpp>
#include <boost/http_proto/response_parser.hpp>
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you should be able to get away with just parser.hpp instead of these other two

#include <boost/asio/async_result.hpp>
#include <boost/system/error_code.hpp>
#include <boost/system/result.hpp>
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think result.hpp is going to help?

#include <cstdint>

namespace boost {
namespace http_io {

template<class UnderlyingAsyncReadStream>
class body_read_stream {
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note to self: add get_executor()


public:
explicit
body_read_stream(
const rts::context& rts_ctx,
UnderlyingAsyncReadStream& und_stream,
http_proto::parser& pr);

template<
class MutableBufferSequence,
BOOST_ASIO_COMPLETION_TOKEN_FOR(
void(system::error_code, std::size_t)) CompletionToken>
BOOST_ASIO_INITFN_AUTO_RESULT_TYPE(CompletionToken,
void(system::error_code, std::size_t))
async_read_some(
const MutableBufferSequence& mb,
CompletionToken&& token);

private:
const rts::context& rts_ctx_;
UnderlyingAsyncReadStream& und_stream_;
http_proto::parser& pr_;
};

} // http_io
} // boost

#include <boost/http_io/impl/body_read_stream.hpp>

#endif
187 changes: 187 additions & 0 deletions include/boost/http_io/impl/body_read_stream.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,187 @@
//
// Copyright (c) 2016-2019 Vinnie Falco (vinnie dot falco at gmail dot com)
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
// Official repository: https://github.com/vinniefalco/http_io
//

#ifndef BOOST_HTTP_IO_IMPL_BODY_READ_STREAM_HPP
#define BOOST_HTTP_IO_IMPL_BODY_READ_STREAM_HPP

#include <boost/http_io/detail/except.hpp>
#include <boost/http_proto/error.hpp>
#include <boost/http_proto/parser.hpp>
#include <boost/asio/append.hpp>
#include <boost/asio/buffer.hpp>
#include <boost/asio/compose.hpp>
#include <boost/asio/coroutine.hpp>
#include <boost/asio/immediate.hpp>
#include <boost/asio/prepend.hpp>
#include <boost/assert.hpp>

namespace boost {
namespace http_io {

namespace detail {

template <class MutableBufferSequence, class UnderlyingAsyncReadStream>
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe just Stream would suffice instead of UnderlyingAsyncReadStreamToUseForTheNextLayer?

class body_read_stream_op : public asio::coroutine {

UnderlyingAsyncReadStream& underlying_stream_;
const MutableBufferSequence& mb_;
http_proto::parser& pr_;
bool already_have_header_ = false;
bool some_ = false;

public:
body_read_stream_op(
UnderlyingAsyncReadStream& s,
const MutableBufferSequence& mb,
http_proto::parser& pr,
bool some) noexcept
: underlying_stream_(s)
, mb_(mb)
, pr_(pr)
, some_(some)
{
}


template<class Self>
void
operator()(
Self& self,
system::error_code ec = {},
std::size_t bytes_transferred = 0)
{
BOOST_ASIO_CORO_REENTER(*this)
{
if (!pr_.is_complete())
{
for (;;)
{
BOOST_ASIO_CORO_YIELD
{
BOOST_ASIO_HANDLER_LOCATION((
__FILE__, __LINE__,
"async_read_some"));
underlying_stream_.async_read_some(
pr_.prepare(),
std::move(self));
}
pr_.commit(bytes_transferred);
if (ec == asio::error::eof)
{
BOOST_ASSERT(
bytes_transferred == 0);
pr_.commit_eof();
ec = {};
}
else if (ec.failed())
{
break; // genuine error
}
pr_.parse(ec);
if (ec.failed() && ec != http_proto::condition::need_more_input)
{
break; // genuine error.
}
if (already_have_header_) {
if (!ec.failed())
{
BOOST_ASSERT(
pr_.is_complete());
break;
}
if (some_)
{
ec = {};
break;
}
}
if (!already_have_header_ && pr_.got_header())
{
already_have_header_ = true;
ec = {}; // override possible need_more_input
pr_.parse(ec); // having parsed the header, callle parse again for the start of the body.
if (ec.failed() && ec != http_proto::condition::need_more_input)
{
break; // genuine error.
}
if (!ec.failed())
{
BOOST_ASSERT(
pr_.is_complete());
break;
}
}
}
}

auto source_buf = pr_.pull_body();

std::size_t n = boost::asio::buffer_copy(mb_, source_buf);

pr_.consume_body(n);

ec = (n != 0) ? system::error_code{} : asio::stream_errc::eof;

// for some reason this crashes - not sure yet why:
//asio::dispatch(
// asio::get_associated_executor(underlying_stream_),
// asio::prepend(std::move(self), ec, n));

self.complete(ec, n); // TODO - work out the byte count
}
}
};

} // detail

//------------------------------------------------

// TODO: copy in Beast's stream traits to check if UnderlyingAsyncReadStream
// is an AsyncReadStream, and also static_assert that body_read_stream is too.



template<class UnderlyingAsyncReadStream>
body_read_stream<UnderlyingAsyncReadStream>::body_read_stream(
const rts::context& rts_ctx
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think the context is needed

, UnderlyingAsyncReadStream& und_stream
, http_proto::parser& pr)
:
rts_ctx_(rts_ctx)
, und_stream_(und_stream)
, pr_(pr)
{
}


template<class UnderlyingAsyncReadStream>
template<
class MutableBufferSequence,
BOOST_ASIO_COMPLETION_TOKEN_FOR(
void(system::error_code, std::size_t)) CompletionToken>
BOOST_ASIO_INITFN_AUTO_RESULT_TYPE(CompletionToken,
void(system::error_code, std::size_t))
body_read_stream<UnderlyingAsyncReadStream>::async_read_some(
const MutableBufferSequence& mb
, CompletionToken&& token)
{
return asio::async_compose<
CompletionToken,
void(system::error_code, std::size_t)>(
detail::body_read_stream_op<
MutableBufferSequence, UnderlyingAsyncReadStream>{und_stream_, mb, pr_, true},
token,
asio::get_associated_executor(und_stream_)
);
}

} // http_io
} // boost

#endif
1 change: 1 addition & 0 deletions test/unit/Jamfile
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ project
;

local SOURCES =
body_read_stream.cpp
buffer.cpp
read.cpp
sandbox.cpp
Expand Down
140 changes: 140 additions & 0 deletions test/unit/body_read_stream.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
//
// Copyright (c) 2016-2019 Vinnie Falco (vinnie dot falco at gmail dot com)
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
// Official repository: https://github.com/vinniefalco/http_io
//

#include <boost/http_io/body_read_stream.hpp>

#include <boost/asio/async_result.hpp>
#include <boost/asio/io_context.hpp>
#include <boost/buffers/buffer.hpp>
#include <boost/asio/buffer.hpp>
#include <boost/asio/prepend.hpp>
#include <boost/asio/post.hpp>
#include <boost/static_assert.hpp>
#include <boost/rts/context.hpp>
#include <boost/utility/string_view.hpp>

#include "test_suite.hpp"

#include <iostream>

namespace boost {

template<class Executor>
struct MockReadStream {
MockReadStream(Executor& ex, const std::string &data) : ex_(ex), mock_data_(data), sent_(0)
{
}

Executor get_executor() const { return ex_; }

//template <typename Token> auto async_write_some(asio::const_buffer buf, Token&& token) {
// return asio::async_initiate<Token, void(system::error_code, size_t)>( //
// [&ex_](auto h, auto buf) {
// asio::dispatch(ex_, [=, h = std::move(h)]() mutable {
// std::move(h)({}, asio::buffer_size(buf));
// });
// },
// token, buf);
//}

template<
class MutableBufferSequence,
BOOST_ASIO_COMPLETION_TOKEN_FOR(
void(system::error_code, std::size_t)) CompletionToken>
BOOST_ASIO_INITFN_AUTO_RESULT_TYPE(CompletionToken,
void(system::error_code, std::size_t))
async_read_some(
const MutableBufferSequence& buf
, CompletionToken&& token)
{
return asio::async_initiate<CompletionToken, void(system::error_code, size_t)>(
[this](
CompletionToken&& token
, const MutableBufferSequence& buf)
{
boost::string_view source_str{
mock_data_.data() + sent_,
mock_data_.size() - sent_ };
auto source_buf = asio::buffer(source_str);

std::size_t chunk_size = rand() % mock_data_.size() + 1 + 100;

std::size_t n = asio::buffer_copy(buf, source_buf, chunk_size);

system::error_code ec = (n != 0) ? system::error_code{} : asio::stream_errc::eof;

sent_ += n;

std::move(token)(ec, n);
//asio::post(ex_, asio::prepend(std::move(token), ec, n));
},
token, buf);
}

Executor& ex_;
std::string mock_data_;
std::size_t sent_;
};

namespace http_io {

struct body_read_stream_test
{
void
run()
{
std::string data = "HTTP/1.1 200 OK\r\n"
"Content-Type: text/html\r\n"
"Last-Modified: Thu, 09 Oct 2025 16:42:02 GMT\r\n"
"Cache-Control: max-age=86000\r\n"
"Date: Thu, 16 Oct 2025 15:09:10 GMT\r\n"
"Content-Length: 60\r\n"
"Connection: keep-alive\r\n"
"\r\n"
"<!doctype html><html><head><title>Hello World</title></html>\r\n";

asio::io_context ioc;
MockReadStream ms(ioc, data);

std::array<char, 40> arr;
auto buf = asio::buffer(arr);

rts::context rts_ctx;
http_proto::response_parser::config cfg;
cfg.body_limit = 1024 * 1024;
cfg.min_buffer = 1024 * 1024;
http_proto::install_parser_service(rts_ctx, cfg);
http_proto::response_parser pr(rts_ctx);
pr.reset();
pr.start();
body_read_stream brs(rts_ctx, ms, pr);

brs.async_read_some(buf,
[this, &brs, &arr, &buf](system::error_code ec, std::size_t bytes_transferred)
{
std::cout << "Error: " << ec.failed() << " bytes " << bytes_transferred << std::endl;
std::cout << std::string(arr.data(), bytes_transferred) << std::endl;

brs.async_read_some(buf,
[this, &brs, &arr, &buf](system::error_code ec, std::size_t bytes_transferred)
{
std::cout << "2nd Error: " << ec.failed() << " bytes " << bytes_transferred << std::endl;
std::cout << std::string(arr.data(), bytes_transferred) << std::endl;
});
});
ioc.run();
}
};

TEST_SUITE(
body_read_stream_test,
"boost.http_io.body_read_stream");

Copy link
Collaborator

@ashtum ashtum Oct 22, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@MungoG, now that we have test::stream available, we could simplify the unit test to something like this. I'm not sure how much of the existing source code it covers, but it should be a good starting point.

struct body_read_stream_test
{
    core::string_view const msg =
        "HTTP/1.1 200 OK\r\n"
        "Content-Length: 3\r\n"
        "\r\n"
        "abc";

public:
    void
    testAsyncReadSome()
    {
        boost::asio::io_context ioc;
        rts::context rts_ctx;
        http_proto::install_parser_service(rts_ctx, {});

        test::stream ts(ioc, msg);
        http_proto::response_parser pr(rts_ctx);
        pr.reset();
        pr.start();

        // limit async_read_some for better coverage
        ts.read_size(1);

        body_read_stream<test::stream> brs(ts, pr);

        char storage[8];
        buffers::flat_buffer buf(storage, sizeof(storage));

        // read 1 octet of body
        brs.async_read_some(
            buf.prepare(buf.capacity()),
            [&](system::error_code ec, std::size_t n)
            {
                BOOST_TEST(! ec.failed());
                BOOST_TEST_EQ(n, 1);
                buf.commit(n);
            });
        test::run(ioc);
        BOOST_TEST(pr.got_header());
        BOOST_TEST(! pr.is_complete());
        BOOST_TEST_EQ(
            buffers::size(pr.pull_body()), 0);

        // read the remaining body
        // asio::async_read repeatedly calls async_read_some
        // until all supplied buffers are filled, or an error occurs.
        asio::async_read(
            brs,
            buf.prepare(buf.capacity()),
            [&](system::error_code ec, std::size_t n)
            {
                BOOST_TEST_EQ(ec, asio::error::eof);
                BOOST_TEST_EQ(n, 2);
                buf.commit(n);
            });
        test::run(ioc);
        BOOST_TEST(pr.got_header());
        BOOST_TEST(pr.is_complete());
        // no body octets should remain
        BOOST_TEST_EQ(
            buffers::size(pr.pull_body()), 0);
        BOOST_TEST(
            core::string_view(
                static_cast<const char*>(buf.data().data()),
                buf.data().size()) == "abc");
    }

    void
    run()
    {
        testAsyncReadSome();
    }
};

} // http_io
} // boost
Loading