Skip to content

Commit

Permalink
Switch from zeromq to nng-based messaging.
Browse files Browse the repository at this point in the history
Nng is mostly thread-safe so can avoid the need to create as many sockets/connections as zeromq needed.
  • Loading branch information
eigenraven committed Nov 25, 2022
1 parent a955f70 commit 3eceb6d
Show file tree
Hide file tree
Showing 27 changed files with 656 additions and 718 deletions.
4 changes: 0 additions & 4 deletions examples/server.cpp
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
#include <faabric/endpoint/FaabricEndpoint.h>
#include <faabric/runner/FaabricMain.h>
#include <faabric/scheduler/ExecutorFactory.h>
#include <faabric/transport/context.h>
#include <faabric/util/logging.h>

using namespace faabric::scheduler;
Expand Down Expand Up @@ -39,7 +38,6 @@ class ExampleExecutorFactory : public ExecutorFactory
int main()
{
faabric::util::initLogging();
faabric::transport::initGlobalMessageContext();

// Start the worker pool
SPDLOG_INFO("Starting executor pool in the background");
Expand All @@ -55,7 +53,5 @@ int main()

SPDLOG_INFO("Shutting down endpoint");
m.shutdown();
faabric::transport::closeGlobalMessageContext();

return EXIT_SUCCESS;
}
14 changes: 6 additions & 8 deletions include/faabric/scheduler/FunctionCallServer.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,19 +20,17 @@ class FunctionCallServer final
std::unique_ptr<google::protobuf::Message> doSyncRecv(
transport::Message& message) override;

std::unique_ptr<google::protobuf::Message> recvFlush(const uint8_t* buffer,
size_t bufferSize);
std::unique_ptr<google::protobuf::Message> recvFlush(
std::span<const uint8_t> buffer);

std::unique_ptr<google::protobuf::Message> recvGetResources(
const uint8_t* buffer,
size_t bufferSize);
std::span<const uint8_t> buffer);

std::unique_ptr<google::protobuf::Message> recvPendingMigrations(
const uint8_t* buffer,
size_t bufferSize);
std::span<const uint8_t> buffer);

void recvExecuteFunctions(const uint8_t* buffer, size_t bufferSize);
void recvExecuteFunctions(std::span<const uint8_t> buffer);

void recvUnregister(const uint8_t* buffer, size_t bufferSize);
void recvUnregister(std::span<const uint8_t> buffer);
};
}
8 changes: 3 additions & 5 deletions include/faabric/snapshot/SnapshotServer.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,12 @@ class SnapshotServer final : public faabric::transport::MessageEndpointServer
transport::Message& message) override;

std::unique_ptr<google::protobuf::Message> recvPushSnapshot(
const uint8_t* buffer,
size_t bufferSize);
std::span<const uint8_t> buffer);

std::unique_ptr<google::protobuf::Message> recvPushSnapshotUpdate(
const uint8_t* buffer,
size_t bufferSize);
std::span<const uint8_t> buffer);

void recvDeleteSnapshot(const uint8_t* buffer, size_t bufferSize);
void recvDeleteSnapshot(std::span<const uint8_t> buffer);

std::unique_ptr<google::protobuf::Message> recvThreadResult(
faabric::transport::Message& message);
Expand Down
26 changes: 12 additions & 14 deletions include/faabric/state/StateServer.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,27 +22,25 @@ class StateServer final : public faabric::transport::MessageEndpointServer

// Sync methods

std::unique_ptr<google::protobuf::Message> recvSize(const uint8_t* buffer,
size_t bufferSize);
std::unique_ptr<google::protobuf::Message> recvSize(
std::span<const uint8_t> buffer);

std::unique_ptr<google::protobuf::Message> recvPull(const uint8_t* buffer,
size_t bufferSize);
std::unique_ptr<google::protobuf::Message> recvPull(
std::span<const uint8_t> buffer);

std::unique_ptr<google::protobuf::Message> recvPush(const uint8_t* buffer,
size_t bufferSize);
std::unique_ptr<google::protobuf::Message> recvPush(
std::span<const uint8_t> buffer);

std::unique_ptr<google::protobuf::Message> recvAppend(const uint8_t* buffer,
size_t bufferSize);
std::unique_ptr<google::protobuf::Message> recvAppend(
std::span<const uint8_t> buffer);

std::unique_ptr<google::protobuf::Message> recvPullAppended(
const uint8_t* buffer,
size_t bufferSize);
std::span<const uint8_t> buffer);

std::unique_ptr<google::protobuf::Message> recvClearAppended(
const uint8_t* buffer,
size_t bufferSize);
std::span<const uint8_t> buffer);

std::unique_ptr<google::protobuf::Message> recvDelete(const uint8_t* buffer,
size_t bufferSize);
std::unique_ptr<google::protobuf::Message> recvDelete(
std::span<const uint8_t> buffer);
};
}
109 changes: 84 additions & 25 deletions include/faabric/transport/Message.h
Original file line number Diff line number Diff line change
@@ -1,7 +1,21 @@
#pragma once

#include <span>
#include <string>
#include <zmq.hpp>
#include <vector>

#include <faabric/util/bytes.h>
#include <nng/nng.h>

// The header structure is:
// - Message code (uint8_t)
// - Message body size (uint64_t)
// - Message sequence number of in-order message delivery default -1 (int32_t)
#define NO_HEADER 0
#define HEADER_MSG_SIZE (sizeof(uint8_t) + sizeof(uint64_t) + sizeof(int32_t))

#define SHUTDOWN_HEADER 220
static constexpr std::array<uint8_t, 4> shutdownPayload = { 0, 0, 1, 1 };

#define NO_SEQUENCE_NUM -1

Expand All @@ -10,7 +24,7 @@ namespace faabric::transport {
/**
* Types of message send/ receive outcomes.
*/
enum MessageResponseCode
enum class MessageResponseCode
{
SUCCESS,
TERM,
Expand All @@ -26,42 +40,87 @@ enum MessageResponseCode
* Messages are not copyable, only movable, as they will regularly contain large
* amounts of data.
*/
class Message
class Message final
{
public:
// Delete everything copy-related, default everything move-related
Message(const Message& other) = delete;

Message& operator=(const Message& other) = delete;

Message(Message&& other) = default;

Message& operator=(Message&& other) = default;
Message(size_t bufferSize);

Message(size_t size);
Message(nng_msg* nngMsg);

Message(MessageResponseCode responseCodeIn);

MessageResponseCode getResponseCode() { return responseCode; }

char* data();

uint8_t* udata();

std::vector<uint8_t> dataCopy();
~Message();

int size();
// Delete everything copy-related, custom move constructors to reset the
// original object on move.
Message(const Message& other) = delete;

void setHeader(uint8_t header) { _header = header; };
Message& operator=(const Message& other) = delete;

uint8_t getHeader() const { return _header; };
// Inline for better codegen
Message(Message&& other) { this->operator=(std::move(other)); }

Message& operator=(Message&& other)
{
other.nngMsg = nngMsg;
other.responseCode = responseCode;
other._header = _header;
other._sequenceNum = _sequenceNum;
nngMsg = nullptr;
responseCode = MessageResponseCode::SUCCESS;
_header = 0;
_sequenceNum = NO_SEQUENCE_NUM;
return *this;
}

void setSequenceNum(int sequenceNum) { _sequenceNum = sequenceNum; };
MessageResponseCode getResponseCode() { return responseCode; }

int getSequenceNum() const { return _sequenceNum; };
// Includes the header
std::span<uint8_t> allData()
{
return nngMsg == nullptr
? std::span<uint8_t>()
: std::span<uint8_t>(
reinterpret_cast<uint8_t*>(nng_msg_body(nngMsg)),
nng_msg_len(nngMsg));
}

std::span<const uint8_t> allData() const
{
return nngMsg == nullptr
? std::span<const uint8_t>()
: std::span<const uint8_t>(
reinterpret_cast<const uint8_t*>(nng_msg_body(nngMsg)),
nng_msg_len(nngMsg));
}

std::span<char> data();

std::span<uint8_t> udata();

std::vector<uint8_t> dataCopy() const;

uint8_t getHeader() const
{
return nngMsg == nullptr ? 0 : allData().data()[0];
}

uint64_t getDeclaredDataSize() const
{
return faabric::util::unalignedRead<uint64_t>(allData().data() +
sizeof(uint8_t));
}

int getSequenceNum() const
{
return allData().size() < HEADER_MSG_SIZE
? NO_SEQUENCE_NUM
: faabric::util::unalignedRead<int32_t>(
allData().data() + sizeof(uint8_t) + sizeof(uint64_t));
}

private:
std::vector<uint8_t> buffer;
nng_msg* nngMsg = nullptr;

MessageResponseCode responseCode = MessageResponseCode::SUCCESS;

Expand Down
Loading

0 comments on commit 3eceb6d

Please sign in to comment.