Skip to content

Commit

Permalink
Reduce differences with upstream
Browse files Browse the repository at this point in the history
  • Loading branch information
eigenraven committed Oct 3, 2022
1 parent 0070d82 commit 578ca87
Show file tree
Hide file tree
Showing 10 changed files with 79 additions and 75 deletions.
24 changes: 11 additions & 13 deletions include/faabric/endpoint/FaabricEndpoint.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,6 @@
#include <faabric/proto/faabric.pb.h>
#include <faabric/util/asio.h>
#include <faabric/util/config.h>
#include <pistache/endpoint.h>
#include <pistache/http.h>

namespace faabric::endpoint {

Expand Down Expand Up @@ -35,19 +33,19 @@ class HttpRequestHandler
faabric::util::BeastHttpRequest&& request) = 0;
};

class Endpoint
class FaabricEndpoint
{
public:
Endpoint() = delete;
Endpoint(const Endpoint&) = delete;
Endpoint(Endpoint&&) = delete;
Endpoint& operator=(const Endpoint&) = delete;
Endpoint& operator=(Endpoint&&) = delete;
virtual ~Endpoint();

Endpoint(int port,
int threadCount,
std::shared_ptr<HttpRequestHandler> requestHandlerIn);
FaabricEndpoint() = delete;
FaabricEndpoint(const FaabricEndpoint&) = delete;
FaabricEndpoint(FaabricEndpoint&&) = delete;
FaabricEndpoint& operator=(const FaabricEndpoint&) = delete;
FaabricEndpoint& operator=(FaabricEndpoint&&) = delete;
virtual ~FaabricEndpoint();

FaabricEndpoint(int port,
int threadCount,
std::shared_ptr<HttpRequestHandler> requestHandlerIn);

void start(EndpointMode mode = EndpointMode::SIGNAL);

Expand Down
11 changes: 0 additions & 11 deletions include/faabric/endpoint/macros.h

This file was deleted.

2 changes: 1 addition & 1 deletion include/faabric/transport/context.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
// https://zguide.zeromq.org/docs/chapter2/#I-O-Threads

#define ZMQ_CONTEXT_IO_THREADS 1
#define FAASM_ZMQ_MAX_SOCKETS 1024 * 1024
#define FAASM_ZMQ_MAX_SOCKETS 32 * 1024 * 1024

namespace faabric::transport {

Expand Down
34 changes: 17 additions & 17 deletions src/endpoint/FaabricEndpoint.cpp
Original file line number Diff line number Diff line change
@@ -1,17 +1,11 @@
#include <faabric/endpoint/FaabricEndpoint.h>
#include <faabric/scheduler/Scheduler.h>
#include <faabric/util/logging.h>
#include <faabric/util/macros.h>
#include <faabric/util/timing.h>

#include <functional>
#include <optional>
#include <pistache/endpoint.h>
#include <pistache/listener.h>
#include <signal.h>
#include <stdexcept>
#include <thread>
#include <typeinfo>
#include <vector>

namespace faabric::endpoint {
Expand All @@ -21,7 +15,8 @@ struct EndpointState
{
EndpointState(int threadCountIn)
: ioc(threadCountIn)
{}
{
}
asio::io_context ioc;
std::vector<std::thread> ioThreads;
};
Expand All @@ -45,7 +40,8 @@ class HttpConnection : public std::enable_shared_from_this<HttpConnection>
, buffer()
, parser()
, handler(handlerIn)
{}
{
}

void run()
{
Expand Down Expand Up @@ -191,16 +187,18 @@ class EndpointListener : public std::enable_shared_from_this<EndpointListener>
};
}

Endpoint::Endpoint(int portIn,
int threadCountIn,
std::shared_ptr<HttpRequestHandler> requestHandlerIn)
FaabricEndpoint::FaabricEndpoint(
int portIn,
int threadCountIn,
std::shared_ptr<HttpRequestHandler> requestHandlerIn)
: port(portIn)
, threadCount(threadCountIn)
, state(nullptr)
, requestHandler(requestHandlerIn)
{}
{
}

Endpoint::~Endpoint() {}
FaabricEndpoint::~FaabricEndpoint() {}

struct SchedulerMonitoringTask
: public std::enable_shared_from_this<SchedulerMonitoringTask>
Expand All @@ -211,7 +209,8 @@ struct SchedulerMonitoringTask
SchedulerMonitoringTask(asio::io_context& ioc)
: ioc(ioc)
, timer(ioc, boost::posix_time::milliseconds(1))
{}
{
}

void run()
{
Expand All @@ -223,7 +222,7 @@ struct SchedulerMonitoringTask
}
};

void Endpoint::start(EndpointMode mode)
void FaabricEndpoint::start(EndpointMode mode)
{
SPDLOG_INFO("Starting HTTP endpoint on {}, {} threads", port, threadCount);
if (getpid() != gettid()) {
Expand Down Expand Up @@ -253,7 +252,8 @@ void Endpoint::start(EndpointMode mode)

std::make_shared<SchedulerMonitoringTask>(state->ioc)->run();

int extraThreads = std::max((mode == EndpointMode::SIGNAL) ? 0 : 1, this->threadCount - 1);
int extraThreads =
std::max((mode == EndpointMode::SIGNAL) ? 0 : 1, this->threadCount - 1);
state->ioThreads.reserve(extraThreads);
auto ioc_run = [&ioc{ state->ioc }]() { ioc.run(); };
for (int i = 0; i < extraThreads; i++) {
Expand All @@ -264,7 +264,7 @@ void Endpoint::start(EndpointMode mode)
}
}

void Endpoint::stop()
void FaabricEndpoint::stop()
{
SPDLOG_INFO("Shutting down endpoint on {}", port);
state->ioc.stop();
Expand Down
21 changes: 13 additions & 8 deletions src/scheduler/Scheduler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ Scheduler::Scheduler()

// Start the reaper thread
reaperThread.start(conf.reaperIntervalSeconds);

if (this->conf.isStorageNode) {
redis::Redis& redis = redis::Redis::getQueue();
redis.sadd(ALL_STORAGE_HOST_SET, this->thisHost);
Expand Down Expand Up @@ -414,7 +414,8 @@ faabric::util::SchedulingDecision Scheduler::callFunctions(
SchedulingDecision decision = doSchedulingDecision(req, topologyHint);

// Pass decision as hint
return doCallFunctions(std::move(req), decision, caller, lock, topologyHint);
return doCallFunctions(
std::move(req), decision, caller, lock, topologyHint);
}

faabric::util::SchedulingDecision Scheduler::makeSchedulingDecision(
Expand Down Expand Up @@ -584,7 +585,8 @@ faabric::util::SchedulingDecision Scheduler::doSchedulingDecision(
unregisteredHosts.push_back(std::move(h));
}
} else {
unregisteredHosts = getUnregisteredHosts(firstMsg.user(), firstMsg.function());
unregisteredHosts =
getUnregisteredHosts(firstMsg.user(), firstMsg.function());
}

for (const auto& h : unregisteredHosts) {
Expand Down Expand Up @@ -688,8 +690,11 @@ faabric::util::SchedulingDecision Scheduler::callFunctions(
const MessageRecord& caller)
{
faabric::util::FullLock lock(mx);
return doCallFunctions(
std::move(req), hint, caller, lock, faabric::util::SchedulingTopologyHint::NONE);
return doCallFunctions(std::move(req),
hint,
caller,
lock,
faabric::util::SchedulingTopologyHint::NONE);
}

faabric::util::SchedulingDecision Scheduler::doCallFunctions(
Expand Down Expand Up @@ -1407,8 +1412,7 @@ faabric::Message Scheduler::getFunctionResult(unsigned int messageId,
std::atomic_int* suspendedCtr = nullptr;
if (!caller.function.empty()) {
faabric::util::SharedLock _l(mx);
suspendedCtr =
&suspendedExecutors[caller.user + "/" + caller.function];
suspendedCtr = &suspendedExecutors[caller.user + "/" + caller.function];
_l.unlock();
suspendedCtr->fetch_add(1, std::memory_order_acq_rel);
monitorWaitingTasks.fetch_add(1, std::memory_order_acq_rel);
Expand Down Expand Up @@ -1544,7 +1548,8 @@ void Scheduler::getFunctionResultAsync(
, mlr(std::move(mlr))
, dsc(std::move(dsc))
, handler(handler)
{}
{
}
~MlrAwaiter() { dsc.release(); }
void await(const boost::system::error_code& ec)
{
Expand Down
39 changes: 25 additions & 14 deletions src/transport/MessageEndpoint.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -125,8 +125,9 @@ zmq::socket_t socketFactory(zmq::socket_type socketType,
break;
}
default: {
SPDLOG_ERROR(
"Invalid bind socket type {} ({})", (int) socketType, address);
SPDLOG_ERROR("Invalid bind socket type {} ({})",
(int)socketType,
address);
throw std::runtime_error(
"Binding with invalid socket type");
}
Expand Down Expand Up @@ -179,7 +180,7 @@ zmq::socket_t socketFactory(zmq::socket_type socketType,
}
default: {
SPDLOG_ERROR("Invalid connect socket type {} ({})",
(int) socketType,
(int)socketType,
address);
throw std::runtime_error(
"Connecting with unrecognized socket type");
Expand Down Expand Up @@ -215,7 +216,8 @@ MessageEndpoint::MessageEndpoint(const std::string& hostIn,
int timeoutMsIn)
: MessageEndpoint("tcp://" + hostIn + ":" + std::to_string(portIn),
timeoutMsIn)
{}
{
}

std::string MessageEndpoint::getAddress()
{
Expand Down Expand Up @@ -330,7 +332,7 @@ Message MessageEndpoint::recvBuffer(zmq::socket_t& socket, size_t size)
}
} catch (zmq::error_t& e) {
if (e.num() == ZMQ_ETERM) {
SPDLOG_WARN("Endpoint {} received ETERM on recv", address);
SPDLOG_WARN("FaabricEndpoint {} received ETERM on recv", address);
return Message(MessageResponseCode::TERM);
}

Expand Down Expand Up @@ -529,7 +531,8 @@ AsyncFanOutMessageEndpoint::AsyncFanOutMessageEndpoint(

AsyncFanInMessageEndpoint::AsyncFanInMessageEndpoint(int portIn, int timeoutMs)
: FanInMessageEndpoint(portIn, timeoutMs, zmq::socket_type::pull)
{}
{
}

// ----------------------------------------------
// SYNC FAN IN AND FAN OUT
Expand All @@ -542,11 +545,13 @@ SyncFanOutMessageEndpoint::SyncFanOutMessageEndpoint(
timeoutMs,
zmq::socket_type::dealer,
MessageEndpointConnectType::BIND)
{}
{
}

SyncFanInMessageEndpoint::SyncFanInMessageEndpoint(int portIn, int timeoutMs)
: FanInMessageEndpoint(portIn, timeoutMs, zmq::socket_type::router)
{}
{
}

// ----------------------------------------------
// ASYNC RECV ENDPOINT
Expand All @@ -559,11 +564,13 @@ AsyncRecvMessageEndpoint::AsyncRecvMessageEndpoint(
timeoutMs,
zmq::socket_type::pull,
MessageEndpointConnectType::CONNECT)
{}
{
}

AsyncRecvMessageEndpoint::AsyncRecvMessageEndpoint(int portIn, int timeoutMs)
: RecvMessageEndpoint(portIn, timeoutMs, zmq::socket_type::pull)
{}
{
}

Message AsyncRecvMessageEndpoint::recv()
{
Expand All @@ -578,7 +585,8 @@ AsyncInternalRecvMessageEndpoint::AsyncInternalRecvMessageEndpoint(
timeoutMs,
zmq::socket_type::pull,
MessageEndpointConnectType::BIND)
{}
{
}

Message AsyncInternalRecvMessageEndpoint::recv()
{
Expand All @@ -596,11 +604,13 @@ SyncRecvMessageEndpoint::SyncRecvMessageEndpoint(const std::string& inprocLabel,
timeoutMs,
zmq::socket_type::rep,
MessageEndpointConnectType::CONNECT)
{}
{
}

SyncRecvMessageEndpoint::SyncRecvMessageEndpoint(int portIn, int timeoutMs)
: RecvMessageEndpoint(portIn, timeoutMs, zmq::socket_type::rep)
{}
{
}

Message SyncRecvMessageEndpoint::recv()
{
Expand All @@ -626,7 +636,8 @@ AsyncDirectRecvEndpoint::AsyncDirectRecvEndpoint(const std::string& inprocLabel,
timeoutMs,
zmq::socket_type::pair,
MessageEndpointConnectType::BIND)
{}
{
}

Message AsyncDirectRecvEndpoint::recv()
{
Expand Down
2 changes: 1 addition & 1 deletion src/transport/context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ void initGlobalMessageContext()

SPDLOG_TRACE("Initialising global ZeroMQ context");
instance = std::make_shared<zmq::context_t>(ZMQ_CONTEXT_IO_THREADS,
32 * 1024 * 1024);
FAASM_ZMQ_MAX_SOCKETS);
}

std::shared_ptr<zmq::context_t> getGlobalMessageContext()
Expand Down
2 changes: 1 addition & 1 deletion tests/dist/server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ int main()
// Endpoint will block until killed
SPDLOG_INFO("Starting HTTP endpoint on worker");
const auto& config = faabric::util::getSystemConfig();
faabric::endpoint::Endpoint endpoint(
faabric::endpoint::FaabricEndpoint endpoint(
config.endpointPort,
config.endpointNumThreads,
std::make_shared<faabric::endpoint::FaabricEndpointHandler>());
Expand Down
Loading

0 comments on commit 578ca87

Please sign in to comment.