Skip to content

Commit

Permalink
Merge remote-tracking branch 'upstream/main'
Browse files Browse the repository at this point in the history
  • Loading branch information
eigenraven committed Nov 28, 2022
2 parents 3bbf3f2 + 2c976f3 commit ccdaf51
Show file tree
Hide file tree
Showing 9 changed files with 42 additions and 43 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ jobs:
strategy:
fail-fast: false
matrix:
sanitiser: [None, Address, Thread, Undefined, Leak]
sanitiser: [None, Address, Thread, Undefined]
env:
HOST_TYPE: ci
REDIS_QUEUE_HOST: redis
Expand Down
4 changes: 1 addition & 3 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ else()
endif()

# Top-level CMake config
set(CMAKE_CXX_FLAGS "-fwrapv -Wall -Wthread-safety -Werror=thread-safety")
set(CMAKE_CXX_FLAGS "-fwrapv -Wall -Werror=vla -Wthread-safety -Werror=thread-safety")
set(CMAKE_CXX_FLAGS_DEBUG "-g")
set(CMAKE_CXX_STANDARD 20)
set(CMAKE_CXX_STANDARD_REQUIRED ON)
Expand Down Expand Up @@ -55,8 +55,6 @@ elseif (FAABRIC_USE_SANITISER STREQUAL "Thread")
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -fsanitize=thread -fsanitize-ignorelist=${CMAKE_CURRENT_LIST_DIR}/thread-sanitizer-ignorelist.txt")
elseif (FAABRIC_USE_SANITISER STREQUAL "Undefined")
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -fsanitize=undefined")
elseif (FAABRIC_USE_SANITISER STREQUAL "Leak")
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -fsanitize=leak")
elseif (FAABRIC_USE_SANITISER STREQUAL "Memory")
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -fsanitize=memory")
elseif (NOT ((FAABRIC_USE_SANITISER STREQUAL "None") OR (NOT FAABRIC_USE_SANITISER)))
Expand Down
14 changes: 0 additions & 14 deletions include/faabric/transport/macros.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,20 +6,6 @@
throw std::runtime_error("Error deserialising message"); \
}

#define SERIALISE_MSG(_msg) \
size_t serialisedSize = _msg.ByteSizeLong(); \
uint8_t serialisedBuffer[serialisedSize]; \
if (!_msg.SerializeToArray(serialisedBuffer, serialisedSize)) { \
throw std::runtime_error("Error serialising message"); \
}

#define SERIALISE_MSG_PTR(_msg) \
size_t serialisedSize = _msg->ByteSizeLong(); \
uint8_t serialisedBuffer[serialisedSize]; \
if (!_msg->SerializeToArray(serialisedBuffer, serialisedSize)) { \
throw std::runtime_error("Error serialising message"); \
}

#define SEND_FB_MSG(T, _mb) \
{ \
const uint8_t* _buffer = _mb.GetBufferPointer(); \
Expand Down
13 changes: 11 additions & 2 deletions src/scheduler/MpiWorld.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,18 @@ void MpiWorld::sendRemoteMpiMessage(
int recvRank,
const std::shared_ptr<faabric::MPIMessage>& msg)
{
SERIALISE_MSG_PTR(msg);
std::string serialisedBuffer;
if (!msg->SerializeToString(&serialisedBuffer)) {
throw std::runtime_error("Error serialising message");
}
broker.sendMessage(
id, sendRank, recvRank, serialisedBuffer, serialisedSize, dstHost, true);
id,
sendRank,
recvRank,
reinterpret_cast<const uint8_t*>(serialisedBuffer.data()),
serialisedBuffer.size(),
dstHost,
true);
}

std::shared_ptr<faabric::MPIMessage> MpiWorld::recvRemoteMpiMessage(
Expand Down
24 changes: 12 additions & 12 deletions src/transport/MessageEndpointClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,16 +24,16 @@ void MessageEndpointClient::asyncSend(int header,
int sequenceNum)
{
ZoneScopedNS("MessageEndpointClient::asyncSend@2", 6);
size_t msgSize = msg->ByteSizeLong();
msgBuffer.resize(msgSize);
ZoneValue(msgSize);
std::string buffer;

TracyMessageL("Serialized");
if (!msg->SerializeToArray(msgBuffer.data(), msgBuffer.size())) {
if (!msg->SerializeToString(&buffer)) {
throw std::runtime_error("Error serialising message");
}

asyncSend(header, msgBuffer.data(), msgBuffer.size(), sequenceNum);
asyncSend(header,
reinterpret_cast<uint8_t*>(buffer.data()),
buffer.size(),
sequenceNum);
}

void MessageEndpointClient::asyncSend(int header,
Expand All @@ -51,15 +51,15 @@ void MessageEndpointClient::syncSend(int header,
google::protobuf::Message* response)
{
ZoneScopedNS("MessageEndpointClient::syncSend@3", 6);
size_t msgSize = msg->ByteSizeLong();
ZoneValue(msgSize);
msgBuffer.resize(msgSize);
if (!msg->SerializeToArray(msgBuffer.data(), msgBuffer.size())) {
std::string buffer;
if (!msg->SerializeToString(&buffer)) {
throw std::runtime_error("Error serialising message");
}
TracyMessageL("Serialized");

syncSend(header, msgBuffer.data(), msgBuffer.size(), response);
syncSend(header,
reinterpret_cast<uint8_t*>(buffer.data()),
buffer.size(),
response);
}

void MessageEndpointClient::syncSend(int header,
Expand Down
11 changes: 6 additions & 5 deletions src/transport/MessageEndpointServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -135,17 +135,18 @@ void MessageEndpointServerHandler::start(int timeoutMs)
std::unique_ptr<google::protobuf::Message> resp =
server->doSyncRecv(body);

size_t respSize = resp->ByteSizeLong();

uint8_t buffer[respSize];
if (!resp->SerializeToArray(buffer, respSize)) {
std::string buffer;
if (!resp->SerializeToString(&buffer)) {
throw std::runtime_error(
"Error serialising message");
}

// Return the response
static_cast<SyncRecvMessageEndpoint*>(endpoint.get())
->sendResponse(NO_HEADER, buffer, respSize);
->sendResponse(
NO_HEADER,
reinterpret_cast<uint8_t*>(buffer.data()),
buffer.size());
}

// Wait on the request latch if necessary
Expand Down
6 changes: 3 additions & 3 deletions src/util/dirty.cpp
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
#include <array>
#include <cstdint>
#include <cstring>
#include <fcntl.h>
Expand Down Expand Up @@ -590,8 +591,7 @@ void UffdDirtyTracker::eventThreadEntrypoint()
{
SPDLOG_TRACE(
"Starting uffd event thread (uffd={}, closeFd={})", uffd, closeFd);
int nFds = 2;
struct pollfd pollfds[nFds];
std::array<struct pollfd, 2> pollfds;

pollfds[0].fd = uffd;
pollfds[0].events = POLLIN;
Expand All @@ -600,7 +600,7 @@ void UffdDirtyTracker::eventThreadEntrypoint()
pollfds[1].events = POLLIN;

for (;;) {
int nReady = poll(pollfds, nFds, -1);
int nReady = poll(pollfds.data(), pollfds.size(), -1);
if (nReady == -1) {
SPDLOG_ERROR("Poll failed: {} ({})", errno, strerror(errno));
throw std::runtime_error("Poll failed");
Expand Down
4 changes: 2 additions & 2 deletions src/util/random.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
namespace faabric::util {
std::string randomString(int len)
{
char result[len];
std::string result(len, '\0');

static const char alphanum[] = "123456789"
"ABCDEFGHIJKLMNOPQRSTUVWXYZ"
Expand All @@ -23,7 +23,7 @@ std::string randomString(int len)
result[i] = alphanum[r];
}

return std::string(result, result + len);
return result;
}

std::string randomStringFromSet(const std::unordered_set<std::string>& s)
Expand Down
7 changes: 6 additions & 1 deletion tasks/format_code.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,12 @@ def format(ctx, check=False):
.split("\n")[:-1]
)

clang_cmd = "clang-format-10 -i {}".format(" ".join(files_to_check))
clang_cmd = [
"clang-format-10",
"--dry-run --Werror" if check else "-i",
" ".join(files_to_check),
]
clang_cmd = " ".join(clang_cmd)
run(clang_cmd, shell=True, check=True, cwd=PROJ_ROOT)

# ---- Append newlines to C/C++ files if not there ----
Expand Down

0 comments on commit ccdaf51

Please sign in to comment.