Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ option(BUILD_SHARED_LIBS "build shared library" ON)
option(PISTACHE_BUILD_TESTS "build tests alongside the project" OFF)
option(PISTACHE_ENABLE_FLAKY_TESTS "if tests are built, also run ones that are known to be flaky" ON)
option(PISTACHE_ENABLE_NETWORK_TESTS "if tests are built, run ones needing network access" ON)
option(PISTACHE_USE_SSL "add support for SSL server" OFF)
option(PISTACHE_USE_SSL "add support for SSL server" ON)
option(PISTACHE_PIC "Enable pistache PIC" ON)
option(PISTACHE_BUILD_FUZZ "Build fuzzer for oss-fuzz" OFF)

Expand Down
8 changes: 7 additions & 1 deletion include/pistache/http.h
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,10 @@ namespace Pistache
} // namespace Uri

// Remove when RequestBuilder will be out of namespace Experimental
namespace Experimental {class RequestBuilder; }
namespace Experimental
{
class RequestBuilder;
}

// 5. Request
class Request : public Message
Expand Down Expand Up @@ -318,6 +321,9 @@ namespace Pistache

std::streamsize write(const char* data, std::streamsize sz);

bool isOpen();
bool isClosed();

void flush();
void ends();

Expand Down
15 changes: 15 additions & 0 deletions src/common/http.cc
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
#include <unordered_map>

#include <fcntl.h>
#include <sys/ioctl.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <unistd.h>
Expand Down Expand Up @@ -737,6 +738,20 @@ namespace Pistache::Http
return peer_.lock();
}

bool ResponseStream::isOpen()
{
int error = 0;
socklen_t len = sizeof(error);
int ret = getsockopt(peer()->fd(), SOL_SOCKET, SO_ERROR, &error, &len);

return ret == 0 && error == 0;
}

bool ResponseStream::isClosed()
{
return !isOpen();
}

void ResponseStream::flush()
{
timeout_.disarm();
Expand Down
5 changes: 4 additions & 1 deletion src/common/transport.cc
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,10 @@ namespace Pistache::Tcp
Async::Deferred<ssize_t> deferred = std::move(entry.deferred);

auto cleanUp = [&]() {
wq.pop_front();
if (!wq.empty())
{
wq.pop_front();
}
if (wq.empty())
{
toWrite.erase(fd);
Expand Down
60 changes: 21 additions & 39 deletions tests/streaming_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,8 @@ namespace

// from
// https://stackoverflow.com/questions/6624667/can-i-use-libcurls-curlopt-writefunction-with-a-c11-lambda-expression#14720398
auto curl_callback = +[](void* ptr, size_t size, size_t nmemb,
typedef size_t (*CURL_WRITEFUNCTION_PTR)(void*, size_t, size_t, void*);
auto curl_callback = [](void* ptr, size_t size, size_t nmemb,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is this change needed?

void* userdata) -> size_t {
auto* chunks = static_cast<Chunks*>(userdata);
chunks->emplace_back(static_cast<char*>(ptr), size * nmemb);
Expand Down Expand Up @@ -166,7 +167,8 @@ class StreamingTests : public testing::Test
url = "http://localhost:" + std::to_string(endpoint.getPort()) + "/";

curl_easy_setopt(curl, CURLOPT_URL, url.c_str());
curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, curl_callback);
curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION,
static_cast<CURL_WRITEFUNCTION_PTR>(curl_callback));
curl_easy_setopt(curl, CURLOPT_WRITEDATA, &chunks);
curl_easy_setopt(curl, CURLOPT_VERBOSE, 1L);
}
Expand Down Expand Up @@ -266,44 +268,17 @@ TEST_F(StreamingTests, ChunkedStream)
EXPECT_EQ(chunks[2], "!");
}

class ClientDisconnectHandler : public Http::Handler {
public:
HTTP_PROTOTYPE(ClientDisconnectHandler)

void onRequest(const Http::Request&, Http::ResponseWriter response) override
{
auto stream = response.stream(Http::Code::Ok);

stream << "Hello ";
stream.flush();

std::this_thread::sleep_for(std::chrono::seconds(1));

stream << "world";
stream.flush();

std::this_thread::sleep_for(std::chrono::seconds(1));

stream << "!";
stream.ends();
}
};

TEST(StreamingTest, ClientDisconnect)
TEST_F(StreamingTests, ChunkedStreamDisconnect)
{
Http::Endpoint endpoint(Address(IP::loopback(), Port(0)));
endpoint.init(Http::Endpoint::options().flags(Tcp::Options::ReuseAddr));
endpoint.setHandler(Http::make_handler<ClientDisconnectHandler>());
endpoint.serveThreaded();
SyncContext ctx;

const std::string url = "http://localhost:" + std::to_string(endpoint.getPort());
// force unbuffered
curl_easy_setopt(curl, CURLOPT_BUFFERSIZE, 1);

std::thread thread([&url]() {
CURL* curl = curl_easy_init();
curl_easy_setopt(curl, CURLOPT_URL, url.c_str());
curl_easy_setopt(curl, CURLOPT_VERBOSE, 1L);
Init(std::make_shared<HelloHandler>(ctx));

CURLM* curlm = curl_multi_init();
std::thread thread([&]() {
CURLM *multi_handle;
int still_running = 1;
curl_multi_add_handle(curlm, curl);

Expand All @@ -315,15 +290,22 @@ TEST(StreamingTest, ClientDisconnect)
curl_multi_perform(curlm, &still_running);
}

std::this_thread::sleep_for(std::chrono::milliseconds(200));

// Hard-close the client request & socket before server is done responding
curl_multi_cleanup(curlm);
curl_easy_cleanup(curl);
curl_multi_cleanup(multi_handle);
});

std::unique_lock<std::mutex> lk { ctx.m };
ctx.cv.wait(lk, [&ctx] { return ctx.flag; });

//Bad behavior might take a few seconds...
std::this_thread::sleep_for(std::chrono::milliseconds(2000));

if (thread.joinable())
{
thread.join();
}

// Don't care about response content, this test will fail if SIGPIPE is raised
// Don't care about response content, this test will fail if SIGINT is raised
}