Skip to content

Commit

Permalink
Asio+Beast-based endpoint server (faasm#274)
Browse files Browse the repository at this point in the history
* Update dependency versions

* boost::beast and asio-based asynchronous endpoint implementation

* Address review comments

* Remove Pistache as it is no longer used

* Fix TSan-detected data races in distributed tests

* Turn off MPI all-to-all disttests, see <faasm#275>

* Run clang-format-13
  • Loading branch information
eigenraven authored Oct 12, 2022
1 parent be18960 commit d686b8e
Show file tree
Hide file tree
Showing 18 changed files with 653 additions and 229 deletions.
46 changes: 18 additions & 28 deletions cmake/ExternalProjects.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,14 @@ list(PREPEND CMAKE_PREFIX_PATH ${CMAKE_CURRENT_BINARY_DIR})

if(NOT EXISTS "${CMAKE_CURRENT_BINARY_DIR}/conan.cmake")
message(STATUS "Downloading conan.cmake from https://github.com/conan-io/cmake-conan")
file(DOWNLOAD "https://raw.githubusercontent.com/conan-io/cmake-conan/v0.16.1/conan.cmake"
file(DOWNLOAD "https://raw.githubusercontent.com/conan-io/cmake-conan/0.18.1/conan.cmake"
"${CMAKE_CURRENT_BINARY_DIR}/conan.cmake"
EXPECTED_HASH SHA256=396e16d0f5eabdc6a14afddbcfff62a54a7ee75c6da23f32f7a31bc85db23484
TLS_VERIFY ON)
endif()

include(${CMAKE_CURRENT_BINARY_DIR}/conan.cmake)

conan_check(VERSION 1.43.0 REQUIRED)
conan_check(VERSION 1.52.0 REQUIRED)

# Enable revisions in the conan config
execute_process(COMMAND ${CONAN_CMD} config set general.revisions_enabled=1
Expand All @@ -29,18 +28,18 @@ endif()

conan_cmake_configure(
REQUIRES
"boost/1.77.0@#d0be0b4b04a551f5d49ac540e59f51bd"
"catch2/2.13.7@#31c8cd08e3c957a9eac8cb1377cf5863"
"boost/1.80.0@#db5db5bd811d23b95089d4a95259d147"
"catch2/2.13.9@#8793d3e6287d3684201418de556d98fe"
"cppcodec/0.2@#f6385611ce2f7cff954ac8b16e25c4fa"
"cpprestsdk/2.10.18@#36e30936126a3da485ce05d619fb1249"
"cppzmq/4.8.1@#e0f26b0614b3d812815edc102ce0d881"
"flatbuffers/2.0.0@#82f5d13594b370c3668bb8abccffc706"
"hiredis/1.0.2@#297f55bf1e66f8b9c1dc0e7d35e705ab"
"cpprestsdk/2.10.18@#ed9788e9d202d6eadd92581368ddfc2f"
"cppzmq/4.8.1@#010df8fa1c5ebbc615704e8c16693bac"
"flatbuffers/2.0.5@#c6a9508bd476da080f7aecbe7a094b68"
"hiredis/1.0.2@#370dad964286cadb1f15dc90252e8ef3"
"protobuf/3.20.0@#8e4de7081bea093469c9e6076149b2b4"
"rapidjson/cci.20200410@#abe3eeacf36801901f6f6d82d124781a"
"readerwriterqueue/1.0.5@#4232c2ff826eb41e33d8ad8efd3c4c4c"
"spdlog/1.9.2@#3724602b7b7e843c5e0a687c45e279c9"
"zeromq/4.3.4@#3b9b0de9c4509784dc92629f3aaf2fe4"
"rapidjson/cci.20211112@#65b4e5feb6f1edfc8cbac0f669acaf17"
"readerwriterqueue/1.0.6@#a95c8da3d68822dec4d4c13fff4b5c96"
"spdlog/1.10.0@#6406c337028e15e56cd6a070cbac54c4"
"zeromq/4.3.4@#d4fe4001f6c2e5960e58c251687c5b2f"
"zlib/1.2.12@#3b9e037ae1c615d045a06c67d88491ae"
GENERATORS
cmake_find_package
Expand Down Expand Up @@ -79,10 +78,10 @@ conan_cmake_install(PATH_OR_REFERENCE .

include(${CMAKE_CURRENT_BINARY_DIR}/conan_paths.cmake)

find_package(Boost 1.77.0 REQUIRED)
find_package(Boost 1.79.0 REQUIRED)
find_package(Catch2 REQUIRED)
find_package(Flatbuffers REQUIRED)
find_package(Protobuf REQUIRED)
find_package(FlatBuffers REQUIRED)
find_package(Protobuf 3.20.0 REQUIRED)
find_package(RapidJSON REQUIRED)
find_package(ZLIB REQUIRED)
find_package(ZeroMQ REQUIRED)
Expand All @@ -94,15 +93,6 @@ find_package(hiredis REQUIRED)
find_package(spdlog REQUIRED)
find_package(readerwriterqueue REQUIRED)

# Pistache - Conan version is out of date and doesn't support clang
FetchContent_Declare(pistache_ext
GIT_REPOSITORY "https://github.com/pistacheio/pistache.git"
GIT_TAG "ff9db0d9439a4411b24541d97a937968f384a4d3"
)

FetchContent_MakeAvailable(pistache_ext)
add_library(pistache::pistache ALIAS pistache_static)

# zstd (Conan version not customizable enough)
set(ZSTD_BUILD_CONTRIB OFF CACHE INTERNAL "")
set(ZSTD_BUILD_CONTRIB OFF CACHE INTERNAL "")
Expand All @@ -120,13 +110,13 @@ set(ZSTD_LZ4_SUPPORT OFF CACHE INTERNAL "")

FetchContent_Declare(zstd_ext
GIT_REPOSITORY "https://github.com/facebook/zstd"
GIT_TAG "v1.5.0"
GIT_TAG "v1.5.2"
SOURCE_SUBDIR "build/cmake"
)

FetchContent_MakeAvailable(zstd_ext)
# Work around zstd not declaring its targets properly
target_include_directories(libzstd_static INTERFACE $<BUILD_INTERFACE:${zstd_ext_SOURCE_DIR}/lib>)
target_include_directories(libzstd_static SYSTEM INTERFACE $<BUILD_INTERFACE:${zstd_ext_SOURCE_DIR}/lib>)
add_library(zstd::libzstd_static ALIAS libzstd_static)

# Group all external dependencies into a convenient virtual CMake library
Expand All @@ -143,7 +133,6 @@ target_link_libraries(faabric_common_dependencies INTERFACE
cppzmq::cppzmq
flatbuffers::flatbuffers
hiredis::hiredis
pistache::pistache
protobuf::libprotobuf
RapidJSON::RapidJSON
readerwriterqueue::readerwriterqueue
Expand All @@ -153,5 +142,6 @@ target_link_libraries(faabric_common_dependencies INTERFACE
)
target_compile_definitions(faabric_common_dependencies INTERFACE
FMT_DEPRECATED= # Suppress warnings about use of deprecated api by spdlog
BOOST_NO_TYPEID=1 # Prevent odd crashes within asio implementation
)
add_library(faabric::common_dependencies ALIAS faabric_common_dependencies)
53 changes: 41 additions & 12 deletions include/faabric/endpoint/FaabricEndpoint.h
Original file line number Diff line number Diff line change
@@ -1,37 +1,66 @@
#pragma once

#include <pistache/endpoint.h>
#include <pistache/http.h>
#include <functional>
#include <memory>

#include <faabric/proto/faabric.pb.h>
#include <faabric/util/asio.h>
#include <faabric/util/config.h>

namespace faabric::endpoint {

enum EndpointMode
enum class EndpointMode
{
SIGNAL,
BG_THREAD
};

namespace detail {
struct EndpointState;
}

struct HttpRequestContext
{
asio::io_context& ioc;
asio::any_io_executor executor;
std::function<void(faabric::util::BeastHttpResponse&&)> sendFunction;
};

class HttpRequestHandler
{
public:
virtual void onRequest(HttpRequestContext&& ctx,
faabric::util::BeastHttpRequest&& request) = 0;
};

class FaabricEndpoint
{
public:
FaabricEndpoint();

FaabricEndpoint(int portIn, int threadCountIn);
FaabricEndpoint(
int port,
int threadCount,
std::shared_ptr<HttpRequestHandler> requestHandlerIn = nullptr);

void start(EndpointMode mode);
FaabricEndpoint(const FaabricEndpoint&) = delete;

void stop();
FaabricEndpoint(FaabricEndpoint&&) = delete;

private:
int port = faabric::util::getSystemConfig().endpointPort;
int threadCount = faabric::util::getSystemConfig().endpointNumThreads;
FaabricEndpoint& operator=(const FaabricEndpoint&) = delete;

FaabricEndpoint& operator=(FaabricEndpoint&&) = delete;

Pistache::Http::Endpoint httpEndpoint;
virtual ~FaabricEndpoint();

std::mutex mx;
void start(EndpointMode mode = EndpointMode::SIGNAL);

void runEndpoint();
void stop();

private:
int port;
int threadCount;
std::unique_ptr<detail::EndpointState> state;
std::shared_ptr<HttpRequestHandler> requestHandler;
};
}
26 changes: 14 additions & 12 deletions include/faabric/endpoint/FaabricEndpointHandler.h
Original file line number Diff line number Diff line change
@@ -1,23 +1,25 @@
#pragma once

#include <faabric/endpoint/FaabricEndpoint.h>
#include <faabric/proto/faabric.pb.h>
#include <pistache/http.h>

namespace faabric::endpoint {
class FaabricEndpointHandler : public Pistache::Http::Handler
class FaabricEndpointHandler final
: public HttpRequestHandler
, public std::enable_shared_from_this<FaabricEndpointHandler>
{
public:
HTTP_PROTOTYPE(FaabricEndpointHandler)

void onTimeout(const Pistache::Http::Request& request,
Pistache::Http::ResponseWriter writer) override;

void onRequest(const Pistache::Http::Request& request,
Pistache::Http::ResponseWriter response) override;

std::pair<int, std::string> handleFunction(const std::string& requestStr);
void onRequest(HttpRequestContext&& ctx,
faabric::util::BeastHttpRequest&& request) override;

private:
std::pair<int, std::string> executeFunction(faabric::Message& msg);
void executeFunction(HttpRequestContext&& ctx,
faabric::util::BeastHttpResponse&& partialResponse,
std::shared_ptr<faabric::BatchExecuteRequest> ber,
size_t messageIndex);

void onFunctionResult(HttpRequestContext&& ctx,
faabric::util::BeastHttpResponse&& partialResponse,
faabric::Message& msg);
};
}
11 changes: 0 additions & 11 deletions include/faabric/endpoint/macros.h

This file was deleted.

44 changes: 42 additions & 2 deletions include/faabric/scheduler/Scheduler.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
#include <faabric/snapshot/SnapshotRegistry.h>
#include <faabric/transport/PointToPointBroker.h>
#include <faabric/util/PeriodicBackgroundThread.h>
#include <faabric/util/asio.h>
#include <faabric/util/clock.h>
#include <faabric/util/config.h>
#include <faabric/util/dirty.h>
Expand Down Expand Up @@ -152,6 +153,40 @@ class FunctionMigrationThread : public faabric::util::PeriodicBackgroundThread
void doWork() override;
};

/**
* A promise for a future message result with an associated eventfd for use with
* asio.
*/
class MessageLocalResult final
{
public:
std::promise<std::unique_ptr<faabric::Message>> promise;
int eventFd = -1;

MessageLocalResult();

MessageLocalResult(const MessageLocalResult&) = delete;

inline MessageLocalResult(MessageLocalResult&& other)
{
this->operator=(std::move(other));
}

MessageLocalResult& operator=(const MessageLocalResult&) = delete;

inline MessageLocalResult& operator=(MessageLocalResult&& other)
{
this->promise = std::move(other.promise);
this->eventFd = other.eventFd;
other.eventFd = -1;
return *this;
}

~MessageLocalResult();

void setValue(std::unique_ptr<faabric::Message>&& msg);
};

/**
* Background thread that periodically checks to see if any executors have
* become stale (i.e. not handled any requests in a given timeout). If any are
Expand Down Expand Up @@ -214,6 +249,12 @@ class Scheduler

faabric::Message getFunctionResult(unsigned int messageId, int timeout);

void getFunctionResultAsync(unsigned int messageId,
int timeoutMs,
asio::io_context& ioc,
asio::any_io_executor& executor,
std::function<void(faabric::Message&)> handler);

void setThreadResult(const faabric::Message& msg,
int32_t returnValue,
const std::string& key,
Expand Down Expand Up @@ -332,8 +373,7 @@ class Scheduler
std::unordered_map<uint32_t, faabric::transport::Message>
threadResultMessages;

std::unordered_map<uint32_t,
std::promise<std::unique_ptr<faabric::Message>>>
std::unordered_map<uint32_t, std::shared_ptr<MessageLocalResult>>
localResults;

std::unordered_map<std::string, std::set<std::string>> pushedSnapshotsMap;
Expand Down
14 changes: 14 additions & 0 deletions include/faabric/util/asio.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
#pragma once

#include <boost/asio.hpp>
#include <boost/beast/core.hpp>
#include <boost/beast/http.hpp>
#include <boost/beast/version.hpp>

namespace asio = boost::asio;
namespace beast = boost::beast;

namespace faabric::util {
using BeastHttpRequest = beast::http::request<beast::http::string_body>;
using BeastHttpResponse = beast::http::response<beast::http::string_body>;
}
Loading

0 comments on commit d686b8e

Please sign in to comment.