Skip to content

Commit

Permalink
Towards a working load balancer...
Browse files: browse the repository at this point in the history
- Hierarchy Services
    - Switching to addressing scheme not containing NUMA domains

- Data Item Manager
    - Fixing acquisition/removal of regions

- Dashboard
    - Integrating backchannel
  • Loading branch information
Thomas Heller authored and sithhell committed Oct 29, 2018
1 parent 06dc117 commit 1b5f87b
Show file tree
Hide file tree
Showing 33 changed files with 1,252 additions and 838 deletions.
17 changes: 14 additions & 3 deletions allscale/components/monitor.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include <allscale/work_item_stats.hpp>
#include <allscale/util/graph_colouring.hpp>
#include <allscale/historical_data.hpp>
#include <allscale/task_times.hpp>


#include <hpx/include/components.hpp>
Expand All @@ -45,6 +46,7 @@ namespace allscale { namespace components {
struct HPX_COMPONENT_EXPORT monitor
: hpx::components::component_base<monitor>
{
typedef hpx::lcos::local::spinlock mutex_type;

monitor()
{
Expand All @@ -63,6 +65,14 @@ namespace allscale { namespace components {
// hpx::id_type get_left_neighbour() { return left_; }
// hpx::id_type get_right_neighbour() { return right_; }

mutex_type task_times_mtx_;
task_times task_times_;
task_times last_task_times_;
std::chrono::high_resolution_clock::time_point last_task_times_sample_;

void add_task_time(task_id::task_path const& path, task_times::time_t const& time);

task_times get_task_times();

/////////////////////////////////////////////////////////////////////////////////////
/// Performance Data Introspection
Expand Down Expand Up @@ -336,9 +346,9 @@ namespace allscale { namespace components {
// // /// \returns Cpu load
float get_cpu_load();

double get_avg_task_duration();

private:
typedef hpx::lcos::local::spinlock mutex_type;

// MONITOR MANAGEMENT
// Measuring total execution time
Expand All @@ -350,7 +360,9 @@ namespace allscale { namespace components {
std::uint64_t num_localities_;
mutex_type init_mutex;
bool initialized = false;
public:
bool enable_monitor;
private:

// System parameters
unsigned long long total_memory_;
Expand Down Expand Up @@ -453,12 +465,12 @@ namespace allscale { namespace components {
// hpx::id_type idle_rate_avg_counter_;
// double idle_rate_avg_;

#ifdef REALTIME_VIZ
// REALTIME VIZ
std::mutex counter_mutex_;
std::uint64_t num_active_tasks_;
std::uint64_t total_tasks_;
double total_task_duration_;
#ifdef REALTIME_VIZ

hpx::id_type idle_rate_counter_;
double idle_rate_;
Expand All @@ -470,7 +482,6 @@ namespace allscale { namespace components {
unsigned long long int sample_id_;

bool sample_task_stats();
double get_avg_task_duration();
#endif

// HISTORICAL DATA
Expand Down
3 changes: 2 additions & 1 deletion allscale/components/scheduler.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,8 @@ namespace allscale { namespace components {
std::vector<hpx::threads::mask_type> suspending_masks_;
std::vector<hpx::threads::mask_type> resuming_masks_;
std::vector<executor_type> executors_;
std::atomic<std::size_t> current_;
std::size_t numa_depth = 0;
std::size_t numa_start = 0;

// This is the depth where we don't want to split anymore...
std::size_t depth_cut_off_;
Expand Down
7 changes: 6 additions & 1 deletion allscale/dashboard.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -98,8 +98,10 @@ namespace allscale { namespace dashboard

struct system_state
{
HPX_EXPORT system_state();

// the time this state was recorded
std::uint64_t time;
std::uint64_t time = 0;

// -- multi-objective metrics --

Expand All @@ -112,6 +114,8 @@ namespace allscale { namespace dashboard
// current power usage / max power usage on all nodes \in [0..1]
float power = 0;

std::string policy;

// the overall system-wide score of the objective function
float score = 0;

Expand All @@ -124,6 +128,7 @@ namespace allscale { namespace dashboard
};

void update();
void get_commands();
void shutdown();
}}

Expand Down
13 changes: 8 additions & 5 deletions allscale/data_item_manager/acquire.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,8 @@ namespace allscale { namespace data_item_manager {
if (req.mode == access_mode::ReadWrite)
{
auto& entry =
runtime::HierarchicalOverlayNetwork::getLocalService<index_service<data_item_type>>(addr).get(req.ref);
entry.resize_fragment(req, req.region, true);
runtime::HierarchicalOverlayNetwork::getLocalService<index_service<data_item_type>>(addr.getLayer()).get(req.ref);
// entry.resize_fragment(req, req.region, true);
region_type missing;
{
std::lock_guard<mutex_type> l(entry.mtx_);
Expand Down Expand Up @@ -145,9 +145,9 @@ namespace allscale { namespace data_item_manager {
auto info = infof.get();
if (info.regions.empty()) return hpx::make_ready_future();

auto& entry =
runtime::HierarchicalOverlayNetwork::getLocalService<index_service<data_item_type>>(addr).get(req.ref);
entry.resize_fragment(req, req.region, false);
// auto& entry =
// runtime::HierarchicalOverlayNetwork::getLocalService<index_service<data_item_type>>(addr.getLayer()).get(req.ref);
// entry.resize_fragment(req, req.region, false);

std::vector<hpx::future<void>> transfers;

Expand All @@ -160,6 +160,9 @@ namespace allscale { namespace data_item_manager {
for (auto const& part: info.regions)
{
if (part.first == addr.getRank()) continue;
#if defined(HPX_DEBUG)
if (part.first == std::size_t(-1)) continue;
#endif

hpx::id_type target(
hpx::naming::get_id_from_locality_id(part.first));
Expand Down
25 changes: 16 additions & 9 deletions allscale/data_item_manager/add_allowance.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,22 @@ namespace allscale { namespace data_item_manager {
using lease_type = allscale::lease<data_item_type>;
using region_type = typename data_item_type::region_type;

HPX_ASSERT(!req.region.empty());

if (req.mode == access_mode::ReadOnly)
return;

auto& entry =
runtime::HierarchicalOverlayNetwork::getLocalService<index_service<data_item_type>>(addr).get(req.ref);
runtime::HierarchicalOverlayNetwork::getLocalService<index_service<data_item_type>>(addr.getLayer()).get(req.ref);
HPX_ASSERT(!req.region.empty());

entry.add_full(req.allowance);
bool isLeaf = addr.isLeaf();
if (req.mode == access_mode::ReadWrite)
{
entry.add_full(req.allowance);
if (isLeaf)
entry.resize_fragment(req, req.region, true);
}
else
{
if (isLeaf)
entry.resize_fragment(req, req.region, false);
}
}

template <typename Requirement>
Expand All @@ -48,7 +55,7 @@ namespace allscale { namespace data_item_manager {
return;

auto& entry =
runtime::HierarchicalOverlayNetwork::getLocalService<index_service<data_item_type>>(addr).get(req.ref);
runtime::HierarchicalOverlayNetwork::getLocalService<index_service<data_item_type>>(addr.getLayer()).get(req.ref);

req.allowance = entry.add_left(req.allowance, req.region);
}
Expand All @@ -66,7 +73,7 @@ namespace allscale { namespace data_item_manager {
return;

auto& entry =
runtime::HierarchicalOverlayNetwork::getLocalService<index_service<data_item_type>>(addr).get(req.ref);
runtime::HierarchicalOverlayNetwork::getLocalService<index_service<data_item_type>>(addr.getLayer()).get(req.ref);

req.allowance = entry.add_right(req.allowance, req.region);
}
Expand Down
2 changes: 1 addition & 1 deletion allscale/data_item_manager/check_write_requirements.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ namespace allscale { namespace data_item_manager {


auto& entry =
runtime::HierarchicalOverlayNetwork::getLocalService<index_service<data_item_type>>(addr).get(req.ref);
runtime::HierarchicalOverlayNetwork::getLocalService<index_service<data_item_type>>(addr.getLayer()).get(req.ref);

return entry.check_write_requirement(req.region);
}
Expand Down
31 changes: 24 additions & 7 deletions allscale/data_item_manager/data_item_view.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
#include <allscale/data_item_manager/fragment.hpp>
#include <hpx/util/annotated_function.hpp>

#include <hpx/util/debugging.hpp>

#include <memory>

namespace allscale { namespace data_item_manager {
Expand Down Expand Up @@ -56,7 +58,12 @@ namespace allscale { namespace data_item_manager {

auto& frag = *item.fragment;

frag.insert(reader);
bool insert = reader.read<bool>();

if (insert)
{
frag.insert(reader);
}
}
}

Expand All @@ -72,16 +79,26 @@ namespace allscale { namespace data_item_manager {
HPX_ASSERT(item.fragment);

auto& frag = *item.fragment;

if (!allscale::api::core::isSubRegion(region_, frag.getCoveredRegion()))
{
writer.write(false);
return;
// std::cout << "not subregion...\n";
// hpx::util::attach_debugger();
}

HPX_ASSERT(allscale::api::core::isSubRegion(region_, frag.getCoveredRegion()));

writer.write(true);
frag.extract(writer, region_);

// if (migrate_)
// {
// // Remove exclusive ownership
// region_type reserved = region_type::difference(frag.getCoveredRegion(), region_);
// frag.resize(reserved);
// }
if (migrate_)
{
// Remove exclusive ownership
region_type reserved = region_type::difference(frag.getCoveredRegion(), region_);
frag.resize(reserved);
}
}
}

Expand Down
29 changes: 18 additions & 11 deletions allscale/data_item_manager/get_missing_regions.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,44 +18,51 @@ namespace allscale { namespace data_item_manager {
namespace detail
{
template <typename Requirement>
void get_missing_regions(runtime::HierarchyAddress const& addr, Requirement& req)
bool get_missing_regions(runtime::HierarchyAddress const& addr, Requirement& req)
{
using data_item_type = typename Requirement::data_item_type;

HPX_ASSERT(!req.region.empty());

// Read-only requirements never have unallocated allowances.
if (req.mode == access_mode::ReadOnly)
return;
return false;

auto& entry =
runtime::HierarchicalOverlayNetwork::getLocalService<index_service<data_item_type>>(addr).get(req.ref);
runtime::HierarchicalOverlayNetwork::getLocalService<index_service<data_item_type>>(addr.getLayer()).get(req.ref);

req.allowance = entry.get_missing_region(req.region);
bool unallocated = false;
req.allowance = entry.get_missing_region(req.region, unallocated);
return unallocated;
}

template <typename Requirement, typename RequirementAllocator>
void get_missing_regions(runtime::HierarchyAddress const& addr, std::vector<Requirement, RequirementAllocator>& reqs)
bool get_missing_regions(runtime::HierarchyAddress const& addr, std::vector<Requirement, RequirementAllocator>& reqs)
{
bool res = false;
for (auto& req: reqs)
{
get_missing_regions(addr, req);
if (get_missing_regions(addr, req))
res = true;
}
return res;
}

template <typename Requirements, std::size_t...Is>
void get_missing_regions(runtime::HierarchyAddress const& addr, Requirements& reqs,
bool get_missing_regions(runtime::HierarchyAddress const& addr, Requirements& reqs,
hpx::util::detail::pack_c<std::size_t, Is...>)
{
int sequencer[] = {0, (detail::get_missing_regions(addr, hpx::util::get<Is>(reqs)), 0)...};
(void)sequencer;
bool res[] = {detail::get_missing_regions(addr, hpx::util::get<Is>(reqs))...};
for (bool r: res) if(r) return true;
return false;
}
}

template <typename Requirements>
void
bool
get_missing_regions(runtime::HierarchyAddress const& addr, Requirements& reqs)
{
detail::get_missing_regions(addr, reqs,
return detail::get_missing_regions(addr, reqs,
typename hpx::util::detail::make_index_pack<
hpx::util::tuple_size<Requirements>::type::value>::type{});
}
Expand Down
Loading

0 comments on commit 1b5f87b

Please sign in to comment.