diff --git a/CMakeLists.txt b/CMakeLists.txt index 27e0c6a1a..8938c3ce2 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -19,7 +19,7 @@ option(BUILD_SHARED_LIBS "build shared library" ON) option(PISTACHE_BUILD_TESTS "build tests alongside the project" OFF) option(PISTACHE_ENABLE_FLAKY_TESTS "if tests are built, also run ones that are known to be flaky" ON) option(PISTACHE_ENABLE_NETWORK_TESTS "if tests are built, run ones needing network access" ON) -option(PISTACHE_USE_SSL "add support for SSL server" OFF) +option(PISTACHE_USE_SSL "add support for SSL server" ON) option(PISTACHE_PIC "Enable pistache PIC" ON) option(PISTACHE_BUILD_FUZZ "Build fuzzer for oss-fuzz" OFF) diff --git a/include/pistache/http.h b/include/pistache/http.h index 8e4313a0c..047e62990 100644 --- a/include/pistache/http.h +++ b/include/pistache/http.h @@ -169,7 +169,10 @@ namespace Pistache } // namespace Uri // Remove when RequestBuilder will be out of namespace Experimental - namespace Experimental {class RequestBuilder; } + namespace Experimental + { + class RequestBuilder; + } // 5. Request class Request : public Message @@ -318,6 +321,9 @@ namespace Pistache std::streamsize write(const char* data, std::streamsize sz); + bool isOpen(); + bool isClosed(); + void flush(); void ends(); diff --git a/src/common/http.cc b/src/common/http.cc index d691e4789..4958c48f3 100644 --- a/src/common/http.cc +++ b/src/common/http.cc @@ -25,6 +25,7 @@ #include #include +#include #include #include #include @@ -737,6 +738,20 @@ namespace Pistache::Http return peer_.lock(); } + bool ResponseStream::isOpen() + { + int error = 0; + socklen_t len = sizeof(error); + int ret = getsockopt(peer()->fd(), SOL_SOCKET, SO_ERROR, &error, &len); + + return ret == 0 && error == 0; + } + + bool ResponseStream::isClosed() + { + return !isOpen(); + } + void ResponseStream::flush() { timeout_.disarm(); diff --git a/src/common/transport.cc b/src/common/transport.cc index 17d6ad738..02ed7e561 100644 --- a/src/common/transport.cc +++ b/src/common/transport.cc @@ -256,7 +256,10 @@ namespace Pistache::Tcp Async::Deferred deferred = std::move(entry.deferred); auto cleanUp = [&]() { - wq.pop_front(); + if (!wq.empty()) + { + wq.pop_front(); + } if (wq.empty()) { toWrite.erase(fd); diff --git a/tests/streaming_test.cc b/tests/streaming_test.cc index 2d2cbcf04..2a4c1dd7a 100644 --- a/tests/streaming_test.cc +++ b/tests/streaming_test.cc @@ -126,7 +126,8 @@ namespace // from // https://stackoverflow.com/questions/6624667/can-i-use-libcurls-curlopt-writefunction-with-a-c11-lambda-expression#14720398 -auto curl_callback = +[](void* ptr, size_t size, size_t nmemb, +typedef size_t (*CURL_WRITEFUNCTION_PTR)(void*, size_t, size_t, void*); +auto curl_callback = [](void* ptr, size_t size, size_t nmemb, void* userdata) -> size_t { auto* chunks = static_cast(userdata); chunks->emplace_back(static_cast(ptr), size * nmemb); @@ -166,7 +167,8 @@ class StreamingTests : public testing::Test url = "http://localhost:" + std::to_string(endpoint.getPort()) + "/"; curl_easy_setopt(curl, CURLOPT_URL, url.c_str()); - curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, curl_callback); + curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, + static_cast(curl_callback)); curl_easy_setopt(curl, CURLOPT_WRITEDATA, &chunks); curl_easy_setopt(curl, CURLOPT_VERBOSE, 1L); } @@ -266,44 +268,17 @@ TEST_F(StreamingTests, ChunkedStream) EXPECT_EQ(chunks[2], "!"); } -class ClientDisconnectHandler : public Http::Handler { -public: - HTTP_PROTOTYPE(ClientDisconnectHandler) - - void onRequest(const Http::Request&, Http::ResponseWriter response) override - { - auto stream = response.stream(Http::Code::Ok); - - stream << "Hello "; - stream.flush(); - - std::this_thread::sleep_for(std::chrono::seconds(1)); - - stream << "world"; - stream.flush(); - - std::this_thread::sleep_for(std::chrono::seconds(1)); - - stream << "!"; - stream.ends(); - } -}; - -TEST(StreamingTest, ClientDisconnect) +TEST_F(StreamingTests, ChunkedStreamDisconnect) { - Http::Endpoint endpoint(Address(IP::loopback(), Port(0))); - endpoint.init(Http::Endpoint::options().flags(Tcp::Options::ReuseAddr)); - endpoint.setHandler(Http::make_handler()); - endpoint.serveThreaded(); + SyncContext ctx; - const std::string url = "http://localhost:" + std::to_string(endpoint.getPort()); + // force unbuffered + curl_easy_setopt(curl, CURLOPT_BUFFERSIZE, 1); - std::thread thread([&url]() { - CURL* curl = curl_easy_init(); - curl_easy_setopt(curl, CURLOPT_URL, url.c_str()); - curl_easy_setopt(curl, CURLOPT_VERBOSE, 1L); + Init(std::make_shared(ctx)); - CURLM* curlm = curl_multi_init(); + std::thread thread([&]() { + CURLM *multi_handle; int still_running = 1; curl_multi_add_handle(curlm, curl); @@ -315,15 +290,22 @@ TEST(StreamingTest, ClientDisconnect) curl_multi_perform(curlm, &still_running); } + std::this_thread::sleep_for(std::chrono::milliseconds(200)); + // Hard-close the client request & socket before server is done responding - curl_multi_cleanup(curlm); - curl_easy_cleanup(curl); + curl_multi_cleanup(multi_handle); }); + std::unique_lock lk { ctx.m }; + ctx.cv.wait(lk, [&ctx] { return ctx.flag; }); + + //Bad behavior might take a few seconds... + std::this_thread::sleep_for(std::chrono::milliseconds(2000)); + if (thread.joinable()) { thread.join(); } - // Don't care about response content, this test will fail if SIGPIPE is raised + // Don't care about response content, this test will fail if SIGINT is raised }