Skip to content

Commit

Permalink
Add load balancing policies and handle exceptions in FaabricEndpointH…
Browse files Browse the repository at this point in the history
…andler
  • Loading branch information
DonaldJennings committed Feb 14, 2024
2 parents 609a47c + f2f0184 commit fefeaa8
Show file tree
Hide file tree
Showing 7 changed files with 92 additions and 8 deletions.
5 changes: 5 additions & 0 deletions .vscode/settings.json
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
{
<<<<<<< HEAD
"editor.tokenColorCustomizations": {
"[*Light*]": {
"textMateRules": [
Expand Down Expand Up @@ -40,5 +41,9 @@
}
}
]
=======
"files.associations": {
"array": "cpp"
>>>>>>> f2f01849eae98603aa3f1223f9af265743b91e5d
}
}
28 changes: 28 additions & 0 deletions include/faabric/loadbalance/LoadBalancePolicy.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
#pragma once

#include <set>
#include <string>

/**
 * Abstract interface implemented by every load-balancing policy.
 *
 * Concrete policies override dispatch(), which receives a set of strings
 * describing the warm faaslets and returns a single string.
 * NOTE(review): presumably the returned string identifies the selected
 * faaslet/host - confirm against the callers of dispatch().
 */
class LoadBalancePolicy
{
  public:
    // Policies are meant to be used through this base type, so the destructor
    // must be virtual: without it, deleting a concrete policy through a
    // LoadBalancePolicy pointer is undefined behaviour.
    virtual ~LoadBalancePolicy() = default;

    /**
     * Make a dispatch decision for the given warm faaslets.
     *
     * @param warm_faaslets the set of warm faaslets to choose from
     * @return the policy's selection, as a string
     */
    virtual std::string dispatch(
      const std::set<std::string>& warm_faaslets) = 0;
};

/**
 * Concrete LoadBalancePolicy named "FaasmDefault".
 *
 * Only declares the dispatch() override; the definition is provided out of
 * line.
 * NOTE(review): the name suggests this mirrors Faasm's stock scheduling
 * behaviour - confirm once the implementation exists.
 */
class FaasmDefaultPolicy : public LoadBalancePolicy
{
  public:
    /**
     * @param warm_faaslets the set of warm faaslets to choose from
     * @return the policy's selection, as a string
     */
    std::string dispatch(
      const std::set<std::string>& warm_faaslets) override;
};

/**
 * Concrete LoadBalancePolicy named "LeastLoadAverage".
 *
 * Only declares the dispatch() override; the definition is provided out of
 * line.
 * NOTE(review): the name suggests selection by lowest load average - confirm
 * once the implementation exists.
 */
class LeastLoadAveragePolicy : public LoadBalancePolicy
{
  public:
    /**
     * @param warm_faaslets the set of warm faaslets to choose from
     * @return the policy's selection, as a string
     */
    std::string dispatch(
      const std::set<std::string>& warm_faaslets) override;
};

/**
 * Concrete LoadBalancePolicy named "MostSlots".
 *
 * Only declares the dispatch() override; the definition is provided out of
 * line.
 * NOTE(review): the name suggests selection by most free slots - confirm
 * once the implementation exists.
 */
class MostSlotsPolicy : public LoadBalancePolicy
{
  public:
    /**
     * @param warm_faaslets the set of warm faaslets to choose from
     * @return the policy's selection, as a string
     */
    std::string dispatch(
      const std::set<std::string>& warm_faaslets) override;
};
20 changes: 17 additions & 3 deletions src/endpoint/FaabricEndpointHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -89,8 +89,22 @@ void FaabricEndpointHandler::onRequest(
response.result(beast::http::status::ok);
response.body() = std::string("Flush sent");
} else {
executeFunction(
std::move(ctx), std::move(response), std::move(msg));
try
{
executeFunction(std::move(ctx), std::move(response), std::move(msg));
}
catch (const std::exception& e)
{
SPDLOG_ERROR("Caught exception in FaabricEndpointHandler::onRequest: {}", e.what());
response.result(beast::http::status::internal_server_error);
response.body() = std::string("Caught exception: ") + e.what();
ctx.sendFunction(std::move(response));
} catch (faabric::util::FaabricException& e) {
SPDLOG_ERROR("Caught FaabricException in FaabricEndpointHandler::onRequest: {}", e.what());
response.result(beast::http::status::internal_server_error);
response.body() = std::string("Caught exception: ") + e.what();
ctx.sendFunction(std::move(response));
}
return;
}
}
Expand Down Expand Up @@ -175,7 +189,7 @@ void FaabricEndpointHandler::onFunctionResult(

response.body() = result.outputdata();
SPDLOG_DEBUG("Worker thread {} sending response", gettid());
ctx.sendFunction(std::move(response));
return ctx.sendFunction(std::move(response));
SPDLOG_DEBUG("Worker thread {} response sent", gettid());
// We're done with this request
}
Expand Down
7 changes: 7 additions & 0 deletions src/loadbalance/FaasmDefaultPolicy.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
#include <faabric/loadbalance/LoadBalancePolicy.h>
#include <stdexcept>

/**
 * Placeholder implementation: no selection logic exists yet, so every call
 * throws.
 *
 * @throws std::runtime_error always
 */
std::string FaasmDefaultPolicy::dispatch(
  const std::set<std::string>& /* warm_faaslets */)
{
    // The argument is unused until the policy is actually implemented
    throw std::runtime_error("FaasmDefaultPolicy::dispatch not implemented");
}
7 changes: 7 additions & 0 deletions src/loadbalance/LeastLoadAveragePolicy.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
#include <faabric/loadbalance/LoadBalancePolicy.h>
#include <stdexcept>

/**
 * Placeholder implementation: no selection logic exists yet, so every call
 * throws.
 *
 * @throws std::runtime_error always
 */
std::string LeastLoadAveragePolicy::dispatch(
  const std::set<std::string>& /* warm_faaslets */)
{
    // The argument is unused until the policy is actually implemented
    throw std::runtime_error(
      "LeastLoadAveragePolicy::dispatch not implemented");
}
7 changes: 7 additions & 0 deletions src/loadbalance/MostSlotsPolicy.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
#include <faabric/loadbalance/LoadBalancePolicy.h>
#include <stdexcept>

/**
 * Placeholder implementation: no selection logic exists yet, so every call
 * throws.
 *
 * @throws std::runtime_error always
 */
std::string MostSlotsPolicy::dispatch(
  const std::set<std::string>& /* warm_faaslets */)
{
    // The argument is unused until the policy is actually implemented
    throw std::runtime_error("MostSlotsPolicy::dispatch not implemented");
}
26 changes: 21 additions & 5 deletions src/scheduler/Scheduler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -309,6 +309,7 @@ int Scheduler::reapStaleExecutors()
req.set_user(user);
req.set_function(function);

SPDLOG_DEBUG("Unregistering {} from {}", key, thisHost);
getFunctionCallClient(masterHost)->unregister(req);
}

Expand Down Expand Up @@ -409,6 +410,7 @@ faabric::util::SchedulingDecision Scheduler::callFunctions(
SPDLOG_DEBUG("Forwarding {} back to master {}", funcStr, masterHost);

ZoneScopedN("Scheduler::callFunctions forward to master");
SPDLOG_DEBUG("Forwarding {} to master {}", funcStr, masterHost);
getFunctionCallClient(masterHost)->executeFunctions(req);
SchedulingDecision decision(firstMsg.appid(), firstMsg.groupid());
decision.returnHost = masterHost;
Expand Down Expand Up @@ -530,7 +532,9 @@ faabric::util::SchedulingDecision Scheduler::doSchedulingDecision(

// Make sure we don't execute the wrong kind (storage/compute) of
// call locally
if (hostKindDifferent) {
if (hostKindDifferent) {
SPDLOG_DEBUG("Host kind different, not scheduling {} locally",
funcStr);
nLocally = 0;
}

Expand All @@ -544,14 +548,18 @@ faabric::util::SchedulingDecision Scheduler::doSchedulingDecision(
// If some are left, we need to distribute.
// First try and do so on already registered hosts.
int remainder = nMessages - nLocally;

if (!hostKindDifferent && remainder > 0) {
SPDLOG_DEBUG("Scheduling {}/{} of {} on registered hosts",
remainder,
nMessages,
funcStr);
const std::set<std::string>& thisRegisteredHosts =
getFunctionRegisteredHosts(
firstMsg.user(), firstMsg.function(), false);

for (const auto& h : thisRegisteredHosts) {
// Work out resources on the remote host
SPDLOG_DEBUG("Checking {} for resources", h);
faabric::HostResources r = getHostResources(h);
int available = r.slots() - r.usedslots();

Expand Down Expand Up @@ -607,22 +615,26 @@ faabric::util::SchedulingDecision Scheduler::doSchedulingDecision(

lastHost = h;
// Work out resources on the remote host
SPDLOG_DEBUG("Checkig unregeistered {} for resources", h);
SPDLOG_DEBUG("Remaining: {}", remainder);

faabric::HostResources r = getHostResources(h);
int available = r.slots() - r.usedslots();

// We need to floor at zero here in case the remote host is
// overloaded, in which case its used slots will be greater than
// its available slots.
available = std::max<int>(0, available);
int nOnThisHost = std::min(available, remainder);

SPDLOG_DEBUG("Unregisted Host Available Slots: {}, nOnThisHost: {}", available, nOnThisHost);

if (topologyHint ==
faabric::util::SchedulingTopologyHint::NEVER_ALONE &&
nOnThisHost < 2) {
continue;
}

SPDLOG_TRACE("Scheduling {}/{} of {} on {} (unregistered)",
SPDLOG_DEBUG("Scheduling {}/{} of {} on {} (unregistered)",
nOnThisHost,
nMessages,
funcStr,
Expand Down Expand Up @@ -991,6 +1003,7 @@ faabric::util::SchedulingDecision Scheduler::doCallFunctions(
}

// Dispatch the calls
SPDLOG_DEBUG("Dispatching {} to {}", funcStr, host);
getFunctionCallClient(host)->executeFunctions(hostRequest);
}
}
Expand Down Expand Up @@ -1209,6 +1222,7 @@ void Scheduler::broadcastFlush()
allHosts.erase(thisHost);

// Dispatch flush message to all other hosts
SPDLOG_DEBUG("Broadcasting flush to {} hosts", allHosts.size());
for (auto& otherHost : allHosts) {
getFunctionCallClient(otherHost)->sendFlush();
}
Expand Down Expand Up @@ -1265,6 +1279,7 @@ void Scheduler::setFunctionResult(std::unique_ptr<faabric::Message> msg)
if (!directResultHost.empty()) {
ZoneScopedN("Direct result send");
faabric::util::FullLock lock(mx);
SPDLOG_DEBUG("Sending direct result for {} to {}", msg->id(), directResultHost);
auto fc = getFunctionCallClient(directResultHost);
lock.unlock();
{
Expand Down Expand Up @@ -1652,7 +1667,7 @@ void Scheduler::setThisHostResources(faabric::HostResources& res)

/**
 * Fetch the resources reported by another host.
 *
 * Looks up the function call client for `host` and returns whatever its
 * getResources() call yields.
 *
 * @param host the host to query
 * @return the HostResources returned by that host's function call client
 */
faabric::HostResources Scheduler::getHostResources(const std::string& host)
{
    // Log the request exactly once; emitting the same message at both TRACE
    // and DEBUG only duplicates output when trace logging is enabled.
    SPDLOG_DEBUG("Requesting resources from {}", host);
    return getFunctionCallClient(host)->getResources();
}

Expand Down Expand Up @@ -1833,6 +1848,7 @@ void Scheduler::broadcastPendingMigrations(
registeredHosts.erase(thisHost);

// Send pending migrations to all involved hosts
SPDLOG_DEBUG("Broadcasting pending migrations for app {}", msg.appid());
for (auto& otherHost : thisRegisteredHosts) {
getFunctionCallClient(otherHost)->sendPendingMigrations(
pendingMigrations);
Expand Down

0 comments on commit fefeaa8

Please sign in to comment.