Skip to content

Commit

Permalink
Executor decision caching and single-host optimisations (faasm#222)
Browse files Browse the repository at this point in the history
* Add flag to message and set in sch

* Porting over executor changes

* Start fixing restore checks in tests

* Fix up restore count test

* Add test for passing in single host flag

* Start on refactor of executor threads

* Refactoring executor thread handling

* Tidy-up

* Compilation errors

* Adding tests for decision caching

* Hooking up tests for executor threading

* Move decision caching into scheduler

* Add tests for cached topology hint

* Revert changes to dist tests

* Fixing up tests for single host/ thread exec

* Send out ptp mappings

* Fix failing threading test
  • Loading branch information
Shillaker authored Feb 11, 2022
1 parent 8f617bb commit 55714be
Show file tree
Hide file tree
Showing 11 changed files with 765 additions and 313 deletions.
19 changes: 13 additions & 6 deletions include/faabric/scheduler/Scheduler.h
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,8 @@ class Executor
faabric::Message& msg,
bool createIfNotExists = false);

void updateMainThreadSnapshot();

virtual std::span<uint8_t> getMemoryView();

protected:
Expand All @@ -106,11 +108,7 @@ class Executor

// ---- Application threads ----
std::shared_mutex threadExecutionMutex;
std::unordered_map<std::string, int> cachedGroupIds;
std::unordered_map<std::string, std::vector<std::string>>
cachedDecisionHosts;
std::vector<char> dirtyRegions;

void deleteMainThreadSnapshot(const faabric::Message& msg);

// ---- Function execution thread pool ----
Expand All @@ -133,6 +131,11 @@ class Scheduler
public:
Scheduler();

faabric::util::SchedulingDecision makeSchedulingDecision(
std::shared_ptr<faabric::BatchExecuteRequest> req,
faabric::util::SchedulingTopologyHint topologyHint =
faabric::util::SchedulingTopologyHint::NONE);

void callFunction(faabric::Message& msg, bool forceLocal = false);

faabric::util::SchedulingDecision callFunctions(
Expand Down Expand Up @@ -176,6 +179,9 @@ class Scheduler

void setThreadResultLocally(uint32_t msgId, int32_t returnValue);

std::vector<std::pair<uint32_t, int32_t>> awaitThreadResults(
std::shared_ptr<faabric::BatchExecuteRequest> req);

int32_t awaitThreadResult(uint32_t messageId);

void registerThread(uint32_t msgId);
Expand Down Expand Up @@ -281,14 +287,15 @@ class Scheduler

std::unordered_map<std::string, std::set<std::string>> registeredHosts;

faabric::util::SchedulingDecision makeSchedulingDecision(
faabric::util::SchedulingDecision doSchedulingDecision(
std::shared_ptr<faabric::BatchExecuteRequest> req,
faabric::util::SchedulingTopologyHint topologyHint);

faabric::util::SchedulingDecision doCallFunctions(
std::shared_ptr<faabric::BatchExecuteRequest> req,
faabric::util::SchedulingDecision& decision,
faabric::util::FullLock& lock);
faabric::util::FullLock& lock,
faabric::util::SchedulingTopologyHint topologyHint);

std::shared_ptr<Executor> claimExecutor(
faabric::Message& msg,
Expand Down
60 changes: 55 additions & 5 deletions include/faabric/util/scheduling.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
#include <vector>

#include <faabric/proto/faabric.pb.h>
#include <faabric/util/locks.h>

namespace faabric::util {

Expand Down Expand Up @@ -33,6 +34,8 @@ class SchedulingDecision

std::string returnHost;

bool isSingleHost();

void addMessage(const std::string& host, const faabric::Message& msg);

void addMessage(const std::string& host, int32_t messageId, int32_t appIdx);
Expand All @@ -45,8 +48,8 @@ class SchedulingDecision

// Scheduling topology hints help the scheduler decide which host to assign new
// requests in a batch.
// - NORMAL: bin-packs requests to slots in hosts starting from the master
// host, and overloadds the master if it runs out of resources.
// - NONE: bin-packs requests to slots in hosts starting from the master
// host, and overloadds the master if it runs out of resources.
// - FORCE_LOCAL: force local execution irrespective of the available
// resources.
// - NEVER_ALONE: never allocates a single (non-master) request to a host
Expand All @@ -55,7 +58,8 @@ class SchedulingDecision
// migration opportunities to appear.
enum SchedulingTopologyHint
{
NORMAL,
NONE,
CACHED,
FORCE_LOCAL,
NEVER_ALONE,
UNDERFULL,
Expand All @@ -65,20 +69,66 @@ enum SchedulingTopologyHint
// around
const std::unordered_map<std::string, SchedulingTopologyHint>
strToTopologyHint = {
{ "NORMAL", SchedulingTopologyHint::NORMAL },
{ "NONE", SchedulingTopologyHint::NONE },
{ "CACHED", SchedulingTopologyHint::CACHED },
{ "FORCE_LOCAL", SchedulingTopologyHint::FORCE_LOCAL },
{ "NEVER_ALONE", SchedulingTopologyHint::NEVER_ALONE },
{ "UNDERFULL", SchedulingTopologyHint::UNDERFULL },
};

const std::unordered_map<SchedulingTopologyHint, std::string>
topologyHintToStr = {
{ SchedulingTopologyHint::NORMAL, "NORMAL" },
{ SchedulingTopologyHint::NONE, "NONE" },
{ SchedulingTopologyHint::CACHED, "CACHED" },
{ SchedulingTopologyHint::FORCE_LOCAL, "FORCE_LOCAL" },
{ SchedulingTopologyHint::NEVER_ALONE, "NEVER_ALONE" },
{ SchedulingTopologyHint::UNDERFULL, "UNDERFULL" },
};

/**
* A record of a decision already taken for the given size of batch request
* for the given function. This doesn't contain the messages themselves,
* just the hosts and group ID that was used.
*/
class CachedDecision
{
public:
CachedDecision(const std::vector<std::string>& hostsIn, int groupIdIn);

std::vector<std::string> getHosts() { return hosts; }

int getGroupId() const { return groupId; }

private:
std::vector<std::string> hosts;
int groupId = 0;
};

/**
* Repository for cached scheduling decisions. Object is not thread safe as we
* assume only a single executor will be caching decisions for a given function
* and size of batch request on one host at a time.
*/
class DecisionCache
{
public:
std::shared_ptr<CachedDecision> getCachedDecision(
std::shared_ptr<faabric::BatchExecuteRequest> req);

void addCachedDecision(std::shared_ptr<faabric::BatchExecuteRequest> req,
faabric::util::SchedulingDecision& decision);

void clear();

private:
std::string getCacheKey(std::shared_ptr<faabric::BatchExecuteRequest> req);

std::unordered_map<std::string, std::shared_ptr<CachedDecision>>
cachedDecisions;
};

DecisionCache& getSchedulingDecisionCache();

// Migration strategies help the scheduler decide wether the scheduling decision
// for a batch request could be changed with the new set of available resources.
// - BIN_PACK: sort hosts by the number of functions from the batch they are
Expand Down
4 changes: 4 additions & 0 deletions src/proto/faabric.proto
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,10 @@ message BatchExecuteRequest {
// Arbitrary context for this batch
int32 subType = 6;
bytes contextData = 7;

// Flag set by the scheduler when this batch is all executing on a single
// host
bool singleHost = 8;
}

message HostResources {
Expand Down
Loading

0 comments on commit 55714be

Please sign in to comment.