From 1b5f87b97fc299bf2f0f7caf0586e683c6011456 Mon Sep 17 00:00:00 2001 From: Thomas Heller Date: Mon, 29 Oct 2018 08:41:16 +0100 Subject: [PATCH] Towards a working load balancer... - Hierarchy Services - Switching to addressing scheme not containing NUMA domains - Data Item Manager - Fixing acquisition/removal of regions - Dashboard - Integrating backchannel --- allscale/components/monitor.hpp | 17 +- allscale/components/scheduler.hpp | 3 +- allscale/dashboard.hpp | 7 +- allscale/data_item_manager/acquire.hpp | 13 +- allscale/data_item_manager/add_allowance.hpp | 25 +- .../check_write_requirements.hpp | 2 +- allscale/data_item_manager/data_item_view.hpp | 31 +- .../data_item_manager/get_missing_regions.hpp | 29 +- allscale/data_item_manager/index_service.hpp | 47 ++- .../data_item_manager/task_requirements.hpp | 6 +- allscale/detail/work_item_impl.hpp | 9 + allscale/hierarchy.hpp | 90 ++--- allscale/monitor.hpp | 3 + allscale/optimizer.hpp | 42 +-- allscale/schedule_policy.hpp | 19 +- allscale/scheduler.hpp | 13 +- allscale/task_id.hpp | 29 +- allscale/task_times.hpp | 50 +++ src/CMakeLists.txt | 1 + src/components/monitor_component.cpp | 59 +++- src/components/resilience_component.cpp | 1 + src/components/scheduler_component.cpp | 117 +++---- src/dashboard.cpp | 98 +++++- src/hierarchy.cpp | 38 +-- src/monitor.cpp | 9 + src/optimizer.cpp | 69 ++-- src/schedule_policy.cpp | 268 ++++++++------- src/scheduler.cpp | 277 ++++++++++----- src/task_id.cpp | 4 +- src/task_times.cpp | 87 +++++ tests/unit/core/grid_init.cpp | 2 +- tests/unit/core/hierarchy.cpp | 316 +++++------------- tests/unit/core/schedule_policy.cpp | 309 ++++++++++++----- 33 files changed, 1252 insertions(+), 838 deletions(-) create mode 100644 allscale/task_times.hpp create mode 100644 src/task_times.cpp diff --git a/allscale/components/monitor.hpp b/allscale/components/monitor.hpp index df21ffe..813c4e9 100644 --- a/allscale/components/monitor.hpp +++ b/allscale/components/monitor.hpp @@ -22,6 +22,7 @@ #include #include #include +#include #include @@ -45,6 +46,7 @@ namespace allscale { namespace components { struct HPX_COMPONENT_EXPORT monitor : hpx::components::component_base { + typedef hpx::lcos::local::spinlock mutex_type; monitor() { @@ -63,6 +65,14 @@ namespace allscale { namespace components { // hpx::id_type get_left_neighbour() { return left_; } // hpx::id_type get_right_neighbour() { return right_; } + mutex_type task_times_mtx_; + task_times task_times_; + task_times last_task_times_; + std::chrono::high_resolution_clock::time_point last_task_times_sample_; + + void add_task_time(task_id::task_path const& path, task_times::time_t const& time); + + task_times get_task_times(); ///////////////////////////////////////////////////////////////////////////////////// /// Performance Data Introspection @@ -336,9 +346,9 @@ namespace allscale { namespace components { // // /// \returns Cpu load float get_cpu_load(); + double get_avg_task_duration(); private: - typedef hpx::lcos::local::spinlock mutex_type; // MONITOR MANAGEMENT // Measuring total execution time @@ -350,7 +360,9 @@ namespace allscale { namespace components { std::uint64_t num_localities_; mutex_type init_mutex; bool initialized = false; + public: bool enable_monitor; + private: // System parameters unsigned long long total_memory_; @@ -453,12 +465,12 @@ namespace allscale { namespace components { // hpx::id_type idle_rate_avg_counter_; // double idle_rate_avg_; -#ifdef REALTIME_VIZ // REALTIME VIZ std::mutex counter_mutex_; std::uint64_t num_active_tasks_; std::uint64_t total_tasks_; double total_task_duration_; +#ifdef REALTIME_VIZ hpx::id_type idle_rate_counter_; double idle_rate_; @@ -470,7 +482,6 @@ namespace allscale { namespace components { unsigned long long int sample_id_; bool sample_task_stats(); - double get_avg_task_duration(); #endif // HISTORICAL DATA diff --git a/allscale/components/scheduler.hpp b/allscale/components/scheduler.hpp index 5d5d165..7eed6e5 100644 --- a/allscale/components/scheduler.hpp +++ b/allscale/components/scheduler.hpp @@ -125,7 +125,8 @@ namespace allscale { namespace components { std::vector suspending_masks_; std::vector resuming_masks_; std::vector executors_; - std::atomic current_; + std::size_t numa_depth = 0; + std::size_t numa_start = 0; // This is the depth where we don't want to split anymore... std::size_t depth_cut_off_; diff --git a/allscale/dashboard.hpp b/allscale/dashboard.hpp index a28b530..a8b32a3 100644 --- a/allscale/dashboard.hpp +++ b/allscale/dashboard.hpp @@ -98,8 +98,10 @@ namespace allscale { namespace dashboard struct system_state { + HPX_EXPORT system_state(); + // the time this state was recorded - std::uint64_t time; + std::uint64_t time = 0; // -- multi-objective metrics -- @@ -112,6 +114,8 @@ namespace allscale { namespace dashboard // current power usage / max power usage on all nodes \in [0..1] float power = 0; + std::string policy; + // the overall system-wide score of the objective function float score = 0; @@ -124,6 +128,7 @@ namespace allscale { namespace dashboard }; void update(); + void get_commands(); void shutdown(); }} diff --git a/allscale/data_item_manager/acquire.hpp b/allscale/data_item_manager/acquire.hpp index 699a397..35d7570 100644 --- a/allscale/data_item_manager/acquire.hpp +++ b/allscale/data_item_manager/acquire.hpp @@ -79,8 +79,8 @@ namespace allscale { namespace data_item_manager { if (req.mode == access_mode::ReadWrite) { auto& entry = - runtime::HierarchicalOverlayNetwork::getLocalService>(addr).get(req.ref); - entry.resize_fragment(req, req.region, true); + runtime::HierarchicalOverlayNetwork::getLocalService>(addr.getLayer()).get(req.ref); +// entry.resize_fragment(req, req.region, true); region_type missing; { std::lock_guard l(entry.mtx_); @@ -145,9 +145,9 @@ namespace allscale { namespace data_item_manager { auto info = infof.get(); if (info.regions.empty()) return hpx::make_ready_future(); - auto& entry = - runtime::HierarchicalOverlayNetwork::getLocalService>(addr).get(req.ref); - entry.resize_fragment(req, req.region, false); +// auto& entry = +// runtime::HierarchicalOverlayNetwork::getLocalService>(addr.getLayer()).get(req.ref); +// entry.resize_fragment(req, req.region, false); std::vector> transfers; @@ -160,6 +160,9 @@ namespace allscale { namespace data_item_manager { for (auto const& part: info.regions) { if (part.first == addr.getRank()) continue; +#if defined(HPX_DEBUG) + if (part.first == std::size_t(-1)) continue; +#endif hpx::id_type target( hpx::naming::get_id_from_locality_id(part.first)); diff --git a/allscale/data_item_manager/add_allowance.hpp b/allscale/data_item_manager/add_allowance.hpp index 9f8e2f4..2e4e2ce 100644 --- a/allscale/data_item_manager/add_allowance.hpp +++ b/allscale/data_item_manager/add_allowance.hpp @@ -24,15 +24,22 @@ namespace allscale { namespace data_item_manager { using lease_type = allscale::lease; using region_type = typename data_item_type::region_type; - HPX_ASSERT(!req.region.empty()); - - if (req.mode == access_mode::ReadOnly) - return; - auto& entry = - runtime::HierarchicalOverlayNetwork::getLocalService>(addr).get(req.ref); + runtime::HierarchicalOverlayNetwork::getLocalService>(addr.getLayer()).get(req.ref); + HPX_ASSERT(!req.region.empty()); - entry.add_full(req.allowance); + bool isLeaf = addr.isLeaf(); + if (req.mode == access_mode::ReadWrite) + { + entry.add_full(req.allowance); + if (isLeaf) + entry.resize_fragment(req, req.region, true); + } + else + { + if (isLeaf) + entry.resize_fragment(req, req.region, false); + } } template @@ -48,7 +55,7 @@ namespace allscale { namespace data_item_manager { return; auto& entry = - runtime::HierarchicalOverlayNetwork::getLocalService>(addr).get(req.ref); + runtime::HierarchicalOverlayNetwork::getLocalService>(addr.getLayer()).get(req.ref); req.allowance = entry.add_left(req.allowance, req.region); } @@ -66,7 +73,7 @@ namespace allscale { namespace data_item_manager { return; auto& entry = - runtime::HierarchicalOverlayNetwork::getLocalService>(addr).get(req.ref); + runtime::HierarchicalOverlayNetwork::getLocalService>(addr.getLayer()).get(req.ref); req.allowance = entry.add_right(req.allowance, req.region); } diff --git a/allscale/data_item_manager/check_write_requirements.hpp b/allscale/data_item_manager/check_write_requirements.hpp index c58abb1..886cf63 100644 --- a/allscale/data_item_manager/check_write_requirements.hpp +++ b/allscale/data_item_manager/check_write_requirements.hpp @@ -33,7 +33,7 @@ namespace allscale { namespace data_item_manager { auto& entry = - runtime::HierarchicalOverlayNetwork::getLocalService>(addr).get(req.ref); + runtime::HierarchicalOverlayNetwork::getLocalService>(addr.getLayer()).get(req.ref); return entry.check_write_requirement(req.region); } diff --git a/allscale/data_item_manager/data_item_view.hpp b/allscale/data_item_manager/data_item_view.hpp index 9f3429f..60924ad 100644 --- a/allscale/data_item_manager/data_item_view.hpp +++ b/allscale/data_item_manager/data_item_view.hpp @@ -6,6 +6,8 @@ #include #include +#include + #include namespace allscale { namespace data_item_manager { @@ -56,7 +58,12 @@ namespace allscale { namespace data_item_manager { auto& frag = *item.fragment; - frag.insert(reader); + bool insert = reader.read(); + + if (insert) + { + frag.insert(reader); + } } } @@ -72,16 +79,26 @@ namespace allscale { namespace data_item_manager { HPX_ASSERT(item.fragment); auto& frag = *item.fragment; + + if (!allscale::api::core::isSubRegion(region_, frag.getCoveredRegion())) + { + writer.write(false); + return; +// std::cout << "not subregion...\n"; +// hpx::util::attach_debugger(); + } + HPX_ASSERT(allscale::api::core::isSubRegion(region_, frag.getCoveredRegion())); + writer.write(true); frag.extract(writer, region_); -// if (migrate_) -// { -// // Remove exclusive ownership -// region_type reserved = region_type::difference(frag.getCoveredRegion(), region_); -// frag.resize(reserved); -// } + if (migrate_) + { + // Remove exclusive ownership + region_type reserved = region_type::difference(frag.getCoveredRegion(), region_); + frag.resize(reserved); + } } } diff --git a/allscale/data_item_manager/get_missing_regions.hpp b/allscale/data_item_manager/get_missing_regions.hpp index 4cd2f65..b0c1c5c 100644 --- a/allscale/data_item_manager/get_missing_regions.hpp +++ b/allscale/data_item_manager/get_missing_regions.hpp @@ -18,44 +18,51 @@ namespace allscale { namespace data_item_manager { namespace detail { template - void get_missing_regions(runtime::HierarchyAddress const& addr, Requirement& req) + bool get_missing_regions(runtime::HierarchyAddress const& addr, Requirement& req) { using data_item_type = typename Requirement::data_item_type; HPX_ASSERT(!req.region.empty()); + // read only never has unallocated allowances. if (req.mode == access_mode::ReadOnly) - return; + return false; auto& entry = - runtime::HierarchicalOverlayNetwork::getLocalService>(addr).get(req.ref); + runtime::HierarchicalOverlayNetwork::getLocalService>(addr.getLayer()).get(req.ref); - req.allowance = entry.get_missing_region(req.region); + bool unallocated = false; + req.allowance = entry.get_missing_region(req.region, unallocated); + return unallocated; } template - void get_missing_regions(runtime::HierarchyAddress const& addr, std::vector& reqs) + bool get_missing_regions(runtime::HierarchyAddress const& addr, std::vector& reqs) { + bool res = false; for (auto& req: reqs) { - get_missing_regions(addr, req); + if (get_missing_regions(addr, req)) + res = true; } + return res; } template - void get_missing_regions(runtime::HierarchyAddress const& addr, Requirements& reqs, + bool get_missing_regions(runtime::HierarchyAddress const& addr, Requirements& reqs, hpx::util::detail::pack_c) { - int sequencer[] = {0, (detail::get_missing_regions(addr, hpx::util::get(reqs)), 0)...}; - (void)sequencer; + bool res[] = {detail::get_missing_regions(addr, hpx::util::get(reqs))...}; + for (bool r: res) if(r) return true; + return false; } } template - void + bool get_missing_regions(runtime::HierarchyAddress const& addr, Requirements& reqs) { - detail::get_missing_regions(addr, reqs, + return detail::get_missing_regions(addr, reqs, typename hpx::util::detail::make_index_pack< hpx::util::tuple_size::type::value>::type{}); } diff --git a/allscale/data_item_manager/index_service.hpp b/allscale/data_item_manager/index_service.hpp index f5aab0e..e2e3bfd 100644 --- a/allscale/data_item_manager/index_service.hpp +++ b/allscale/data_item_manager/index_service.hpp @@ -96,20 +96,27 @@ namespace allscale { namespace data_item_manager { return region_type::intersect(region, unallocated); } - region_type get_missing_region(region_type const& region) const + region_type get_missing_region(region_type const& region, bool& unallocated) const { if (region.empty()) return {}; std::lock_guard l(mtx_); region_type managed_unallocated = get_managed_unallocated(region); - if (!managed_unallocated.empty()) return managed_unallocated; + if (!managed_unallocated.empty()) + { + unallocated = true; + return managed_unallocated; + } + + unallocated = false; return region_type::difference(region, full_); } bool check_write_requirement(region_type const& region) const { - return get_missing_region(region).empty(); + bool unallocated = false; + return get_missing_region(region, unallocated).empty(); } template @@ -127,12 +134,10 @@ namespace allscale { namespace data_item_manager { region_type reserved = region_type::merge(frag.getCoveredRegion(), region); // resize fragment... frag.resize(reserved); - // update ownership - if (exclusive) - item.exclusive = region_type::merge( - item.exclusive, - region_type::intersect(frag.getCoveredRegion(), region)); } + // update ownership + if (exclusive) + item.exclusive = region_type::merge(item.exclusive, region); } } @@ -517,7 +522,6 @@ namespace allscale { namespace data_item_manager { #endif } - return hpx::make_ready_future(std::move(info)); } @@ -571,18 +575,10 @@ namespace allscale { namespace data_item_manager { // First, recurse to right child for better overlap. if (!remote_regions[1].empty()) { - if (service_->right_id_) - { - remote_infos.push_back( - hpx::async>( - service_->right_id_, service_->right_, ref, std::move(remote_regions[1]))); - } - else - { - remote_infos.push_back( - data_item_manager::collect_child_ownerships( - service_->right_, ref, std::move(remote_regions[1]))); - } + HPX_ASSERT(service_->right_id_); + remote_infos.push_back( + hpx::async>( + service_->right_id_, service_->right_, ref, std::move(remote_regions[1]))); } if (!remote_regions[0].empty()) { @@ -833,8 +829,7 @@ namespace allscale { namespace data_item_manager { : here_(here) , parent_(here_.getParent()) , is_root_(here_ == runtime::HierarchyAddress::getRootOfNetworkSize( - allscale::get_num_numa_nodes(), allscale::get_num_localities() - )) + allscale::get_num_localities())) { if (parent_.getRank() != scheduler::rank()) { @@ -903,7 +898,7 @@ namespace allscale { namespace data_item_manager { using data_item_type = typename Requirement::data_item_type; return runtime::HierarchicalOverlayNetwork::getLocalService< - index_service>(addr).get(req.ref).locate(req); + index_service>(addr.getLayer()).get(req.ref).locate(req); } @@ -915,7 +910,7 @@ namespace allscale { namespace data_item_manager { { using data_item_type = typename DataItemReference::data_item_type; return runtime::HierarchicalOverlayNetwork::getLocalService< - index_service>(addr).get(ref).acquire_ownership_for( + index_service>(addr.getLayer()).get(ref).acquire_ownership_for( child, ref, std::move(missing)); } @@ -926,7 +921,7 @@ namespace allscale { namespace data_item_manager { { using data_item_type = typename DataItemReference::data_item_type; return runtime::HierarchicalOverlayNetwork::getLocalService< - index_service>(addr).get(ref).collect_child_ownerships(ref, std::move(region)); + index_service>(addr.getLayer()).get(ref).collect_child_ownerships(ref, std::move(region)); } template diff --git a/allscale/data_item_manager/task_requirements.hpp b/allscale/data_item_manager/task_requirements.hpp index 6fa86d3..174dc90 100644 --- a/allscale/data_item_manager/task_requirements.hpp +++ b/allscale/data_item_manager/task_requirements.hpp @@ -44,7 +44,7 @@ namespace allscale { namespace data_item_manager { virtual void show() const = 0; virtual bool check_write_requirements(hierarchy_address const&) const = 0; - virtual void get_missing_regions(hierarchy_address const&) = 0; + virtual bool get_missing_regions(hierarchy_address const&) = 0; virtual void add_allowance(hierarchy_address const&) const= 0; virtual void add_allowance_left(hierarchy_address const&)= 0; virtual void add_allowance_right(hierarchy_address const&)= 0; @@ -90,9 +90,9 @@ namespace allscale { namespace data_item_manager { return data_item_manager::check_write_requirements(addr, process_requirements_); } - void get_missing_regions(hierarchy_address const& addr) final + bool get_missing_regions(hierarchy_address const& addr) final { - data_item_manager::get_missing_regions(addr, process_requirements_); + return data_item_manager::get_missing_regions(addr, process_requirements_); } void add_allowance(hierarchy_address const& addr) const final diff --git a/allscale/detail/work_item_impl.hpp b/allscale/detail/work_item_impl.hpp index cfd4376..9f9fe2c 100644 --- a/allscale/detail/work_item_impl.hpp +++ b/allscale/detail/work_item_impl.hpp @@ -340,9 +340,14 @@ namespace allscale { namespace detail { work_item(this_)); this_work_item::set s(*this_); + auto begin = std::chrono::high_resolution_clock::now(); + auto work_res = WorkItemDescription::process_variant::execute(closure_); + auto end = std::chrono::high_resolution_clock::now(); + monitor::add_task_time(this->id().path, std::chrono::duration_cast(end - begin)); + finalize(std::move(this_), std::move(work_res), std::move(reqs), false); } @@ -360,8 +365,12 @@ namespace allscale { namespace detail { work_item(this_)); this_work_item::set s(*this_); + auto begin = std::chrono::high_resolution_clock::now(); WorkItemDescription::process_variant::execute(closure_); + auto end = std::chrono::high_resolution_clock::now(); + monitor::add_task_time(this->id().path, std::chrono::duration_cast(end - begin)); + finalize(std::move(this_), hpx::util::unused_type(), std::move(reqs), false); } diff --git a/allscale/hierarchy.hpp b/allscale/hierarchy.hpp index 628c0dc..2790cf5 100644 --- a/allscale/hierarchy.hpp +++ b/allscale/hierarchy.hpp @@ -29,17 +29,14 @@ namespace runtime { // the addressed rank std::size_t rank; - std::size_t numa_node; // the addressed layer layer_t layer; public: - static HPX_EXPORT std::size_t numaCutOff; - // creates a new address targeting the given rank on the given node - HierarchyAddress(std::size_t rank = 0, std::size_t numa_node = 0, layer_t layer = 0) - : rank(rank), numa_node(numa_node), layer(layer) { + HierarchyAddress(std::size_t rank = 0, layer_t layer = 0) + : rank(rank), layer(layer) { // assert that the provided combination is valid assert_true(check()); } @@ -51,10 +48,6 @@ namespace runtime { return rank; } - std::size_t getNumaNode() const { - return numa_node; - } - // obtains the layer on the hosting node this address is referencing layer_t getLayer() const { return layer; @@ -77,8 +70,7 @@ namespace runtime { // determines whether this node is the right child of its parent bool isRightChild() const { - return getParent().getRightChild() == *this; -// return rank & (1<(const HierarchyAddress& other) const { - return std::tie(rank, numa_node, layer) > std::tie(other.rank, other.numa_node, other.layer); + return std::tie(rank, layer) > std::tie(other.rank, other.layer); } bool operator>=(const HierarchyAddress& other) const { - return std::tie(rank, numa_node, layer) >= std::tie(other.rank, other.numa_node, other.layer); + return std::tie(rank, layer) >= std::tie(other.rank, other.layer); } bool operator<=(const HierarchyAddress& other) const { - return std::tie(rank, numa_node, layer) <= std::tie(other.rank, other.numa_node, other.layer); + return std::tie(rank, layer) <= std::tie(other.rank, other.layer); } // add printer support @@ -170,7 +150,7 @@ namespace runtime { // --- utilities --- // computes the number of layers present on the given rank within a network of the given size - static HPX_EXPORT layer_t getLayersOn(std::size_t rank, std::size_t numa_node, std::size_t numa_size, std::size_t size); + static HPX_EXPORT layer_t getLayersOn(std::size_t rank, std::size_t size); private: @@ -188,7 +168,7 @@ namespace runtime { class HierarchyService { // the locally running services (one instance for each layer) - std::vector> services; + std::vector services; public: @@ -196,43 +176,36 @@ namespace runtime { template void init(const Args& ... args) { // start up services - std::size_t num_numa_nodes = allscale::get_num_numa_nodes(); std::size_t num_localities = allscale::get_num_localities(); std::size_t locality_id = hpx::get_locality_id(); + auto numServices = HierarchyAddress::getLayersOn(locality_id,num_localities); + // Calculate the total number of local services - services.resize(num_numa_nodes); - for (std::size_t numa_node = 0; numa_node != num_numa_nodes; ++numa_node) + services.reserve(num_localities); + for(layer_t i=0; i< numServices; i++) { - auto numServices = HierarchyAddress::getLayersOn( - locality_id, numa_node, num_numa_nodes, num_localities); - - services[numa_node].reserve(numServices); - for(layer_t i=0; i< numServices; i++) - { - services[numa_node].emplace_back( - HierarchyAddress(locality_id, numa_node, i), args...); - } + services.emplace_back( + HierarchyAddress(locality_id, i), args...); } hpx::lcos::barrier::synchronize(); } // retrieves a service instance - Service& get(std::size_t numa_node, layer_t layer) { - assert_lt(numa_node,services.size()); - assert_lt(layer,services[numa_node].size()); - return services[numa_node][layer]; + Service& get(layer_t layer) { + assert_lt(layer,services.size()); + return services[layer]; } // applies an operation on all local services template - void forAll(const Op& op) { - for(auto& numa_services : services) { - for(auto& cur : numa_services) { - op(cur); - } + void forAll(const Op& op) + { + for(auto& cur : services) + { + op(cur); } } @@ -262,7 +235,7 @@ namespace runtime { */ HierarchyAddress getRootAddress() const { return HierarchyAddress::getRootOfNetworkSize( - allscale::get_num_numa_nodes(), allscale::get_num_localities()); + allscale::get_num_localities()); } /** @@ -277,11 +250,10 @@ namespace runtime { * Obtains a reference to a locally running service instance. */ template - static S& getLocalService(HierarchyAddress const& addr) + static S& getLocalService(layer_t layer = 0) { - assert_eq(addr.getRank(), hpx::get_locality_id()); auto& s = getService(); - return s.get(addr.getNumaNode(), addr.getLayer()); + return s.get(layer); } /** diff --git a/allscale/monitor.hpp b/allscale/monitor.hpp index 1251921..285ed9e 100644 --- a/allscale/monitor.hpp +++ b/allscale/monitor.hpp @@ -3,6 +3,7 @@ #define ALLSCALE_MONITOR_HPP #include +#include #include #include @@ -38,9 +39,11 @@ namespace allscale { static void connect(event e, event_function f); static void disconnect(event e, event_function f); static HPX_EXPORT void signal(event e, work_item const& w); + static HPX_EXPORT void add_task_time(task_id::task_path const& path, task_times::time_t const& time); static components::monitor & get(); static components::monitor *get_ptr(); static HPX_EXPORT void stop(); + static bool enabled; private: // static std::shared_ptr & get_ptr(); diff --git a/allscale/optimizer.hpp b/allscale/optimizer.hpp index 9200c22..d044e70 100644 --- a/allscale/optimizer.hpp +++ b/allscale/optimizer.hpp @@ -3,6 +3,7 @@ #define ALLSCALE_OPTIMIZER_HPP #include +#include #include #include @@ -11,6 +12,19 @@ #include namespace allscale { + struct optimizer_state + { + float load_; + task_times task_times_; + + template + void serialize(Archive& ar, unsigned) + { + ar & load_; + ar & task_times_; + } + }; + /** * A class to model user-defined tuning objectives. * @@ -63,30 +77,6 @@ namespace allscale { tuning_objective get_default_objective(); - struct optimizer_state - { - optimizer_state() : load(1.f), active_frequency(1000.f), cores_per_node(1) - {} - - optimizer_state(float l, float freq, std::size_t cores) - : load(l) - , active_frequency(freq) - , cores_per_node(cores) - {} - - float load; - float active_frequency; - std::size_t cores_per_node; - - template - void serialize(Archive& ar, unsigned) - { - ar & load; - ar & active_frequency; - ar & cores_per_node; - } - }; - struct global_optimizer { global_optimizer(); @@ -104,7 +94,7 @@ namespace allscale { hpx::future balance(bool); private: - void tune(std::vector const& state); + void tune(std::vector const& state); std::size_t num_active_nodes_; float active_frequency_; @@ -123,6 +113,4 @@ namespace allscale { }; } -HPX_IS_BITWISE_SERIALIZABLE(allscale::optimizer_state); - #endif diff --git a/allscale/schedule_policy.hpp b/allscale/schedule_policy.hpp index 5f67cfb..dde3850 100644 --- a/allscale/schedule_policy.hpp +++ b/allscale/schedule_policy.hpp @@ -99,7 +99,10 @@ namespace allscale { * @param N the number of nodes to distribute work on * @param granularity the negative exponent of the acceptable load imbalance; e.g. 0 => 2^0 = 100%, 5 => 2^-5 = 3.125% */ - static HPX_EXPORT std::unique_ptr create_uniform(int N, int M, int granularity); + static HPX_EXPORT std::unique_ptr create_uniform(std::vector const& mask, int granularity); + static HPX_EXPORT std::unique_ptr create_uniform(std::vector const& mask); + + static HPX_EXPORT std::unique_ptr create_uniform(int N, int granularity); /** * Creates a scheduling policy distributing work uniformly among the given number of nodes. The @@ -107,7 +110,7 @@ namespace allscale { * * @param N the number of nodes to distribute work on */ - static HPX_EXPORT std::unique_ptr create_uniform(int N, int M); + static HPX_EXPORT std::unique_ptr create_uniform(int N); /** * Creates an updated load balancing policy based on a given policy and a measured load distribution. @@ -117,7 +120,10 @@ namespace allscale { * @param loadDistribution the load distribution measured, utilized for weighting tasks. Ther must be one entry per node, * no entry must be negative. */ - static HPX_EXPORT std::unique_ptr create_rebalanced(const scheduling_policy& old, const std::vector& load, std::vector const& mask); + static HPX_EXPORT std::unique_ptr create_rebalanced(const scheduling_policy& old, const std::vector& load); + + static HPX_EXPORT std::unique_ptr create_rebalanced(const scheduling_policy& old, const std::vector& load, std::vector const& mask); + static HPX_EXPORT std::unique_ptr create_rebalanced(const scheduling_policy& old, task_times const& times, std::vector const& mask); // --- observer --- @@ -131,6 +137,11 @@ namespace allscale { return tree_; } + int get_granularity() const + { + return granularity_; + } + // retrieves the task distribution pattern this tree is realizing HPX_EXPORT std::vector task_distribution_mapping() const; @@ -166,13 +177,13 @@ namespace allscale { // ar & granularity_; // ar & tree_; // } - private: tree_scheduling_policy(runtime::HierarchyAddress root, int granularity, decision_tree&& tree) : root_(root) , granularity_(granularity) , tree_(std::move(tree)) {} + private: const runtime::HierarchyAddress& getPresumedRootAddress() const { return root_; } diff --git a/allscale/scheduler.hpp b/allscale/scheduler.hpp index 50fb86f..fd819dc 100644 --- a/allscale/scheduler.hpp +++ b/allscale/scheduler.hpp @@ -38,8 +38,11 @@ namespace allscale scheduler(std::size_t rank); static HPX_EXPORT std::size_t rank(); + static HPX_EXPORT hpx::future toggle_node(std::size_t locality_id); + static HPX_EXPORT hpx::future set_policy(std::string policy); + static HPX_EXPORT std::string policy(); - static HPX_EXPORT void update_policy(std::vector const& state, std::vector mask); + static HPX_EXPORT void update_policy(task_times const& times, std::vector mask); static HPX_EXPORT void schedule(work_item&& work); static HPX_EXPORT components::scheduler* run(std::size_t rank); @@ -47,11 +50,19 @@ namespace allscale static components::scheduler* get_ptr(); static components::scheduler & get(); + + static bool active(); + + static void toggle_active(bool toggle = true); + private: typedef hpx::lcos::local::spinlock mutex_type; static HPX_EXPORT void partition_resources(hpx::resource::partitioner& rp); + static mutex_type active_mtx_; + static bool active_; + std::shared_ptr component_; }; diff --git a/allscale/task_id.hpp b/allscale/task_id.hpp index 9fb11ea..f9a5cd2 100644 --- a/allscale/task_id.hpp +++ b/allscale/task_id.hpp @@ -18,10 +18,33 @@ namespace allscale { std::uint64_t locality_id; std::uint64_t id; task_path path; + mutable std::uint8_t numa_depth = 0; + mutable std::uint8_t numa_node_ = std::uint8_t(-1); // This is used for monitoring... std::shared_ptr profile; + std::size_t numa_node(std::size_t numa_d, std::size_t cut_off) const + { + if (depth() < cut_off) return 0; + + if (numa_node_ == std::uint8_t(-1)) + { + numa_depth = numa_d; + numa_node_ = 0; + } + else + { + if (numa_depth > 0) + { + --numa_depth; + numa_node_ = numa_node_ * 2 + (path.getPath() & 0x1); + } + } + + return numa_node_; + } + template void serialize(Archive& ar, unsigned) { @@ -49,12 +72,12 @@ namespace allscale { task_id left_child() const { - return {locality_id, id, path.getLeftChildPath(), nullptr}; + return {locality_id, id, path.getLeftChildPath(), numa_depth, numa_node_, nullptr}; } task_id right_child() const { - return {locality_id, id, path.getRightChildPath(), nullptr}; + return {locality_id, id, path.getRightChildPath(), numa_depth, numa_node_, nullptr}; } std::size_t depth() const @@ -105,7 +128,7 @@ namespace allscale { friend std::ostream& operator<<(std::ostream& os, task_id const& id) { - return os << "T-" << id.locality_id << "." << id.id << id.path; + return os << "T-" << id.locality_id << "." << id.id << id.path << '(' << id.path.getPath() << ',' << +id.path.getLength() << ')'; } friend std::string to_string(task_id const& id); diff --git a/allscale/task_times.hpp b/allscale/task_times.hpp new file mode 100644 index 0000000..8c1f881 --- /dev/null +++ b/allscale/task_times.hpp @@ -0,0 +1,50 @@ + +#ifndef ALLSCALE_TASK_TIMES_HPP +#define ALLSCALE_TASK_TIMES_HPP + +#include + +#include +#include +#include + +namespace allscale { + struct task_times + { + using time_t = std::chrono::nanoseconds; + + std::vector times; + public: + + task_times(); + + void add(task_id const& id, time_t const& time); + void add(task_id::task_path const& path, time_t const& time); + + float get_time(task_id::task_path const& path) const; + + task_times& operator+=(task_times const& other); + task_times operator-(task_times const& other); + task_times operator/(float f); + + template + void serialize(Archive& ar, unsigned) + { + ar & times; + } + + friend std::ostream& operator<<(std::ostream& os, task_times const& times) + { + os << '['; + for (std::size_t i = 0; i != times.times.size()-1; ++i) + { + os << times.times[i] << ", "; + } + os << times.times.back() << ']'; + + return os; + } + }; +} + +#endif diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index df4cb3e..398cae9 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -11,6 +11,7 @@ set(_srcs scheduler.cpp schedule_policy.cpp task_id.cpp + task_times.cpp this_work_item.cpp treeture.cpp optimizer.cpp diff --git a/src/components/monitor_component.cpp b/src/components/monitor_component.cpp index 69745e6..9b9de4a 100644 --- a/src/components/monitor_component.cpp +++ b/src/components/monitor_component.cpp @@ -51,7 +51,8 @@ namespace allscale { namespace components { monitor::monitor(std::uint64_t rank) - : rank_(rank) + : last_task_times_sample_(std::chrono::high_resolution_clock::now()) + , rank_(rank) , num_localities_(0) , enable_monitor(true) , total_memory_(0) @@ -86,9 +87,10 @@ namespace allscale { namespace components { , bytes_recv_(0) , cpu_load_(0.0) //#endif -#ifdef REALTIME_VIZ , num_active_tasks_(0) , total_tasks_(0) + , total_task_duration_(0.0) +#ifdef REALTIME_VIZ , realtime_viz(0) , sample_id_(0) , timer_( @@ -112,6 +114,34 @@ namespace allscale { namespace components { } + void monitor::add_task_time(task_id::task_path const& path, task_times::time_t const& time) + { + if (!enable_monitor) return; + + std::lock_guard l(task_times_mtx_); +// if (hpx::get_locality_id() == 0) +// { +// std::cout << path.getPath() << " " << time.count() << "\n"; +// } + task_times_.add(path, time); + } + + task_times monitor::get_task_times() + { + if (!enable_monitor) return task_times{}; + + std::lock_guard l(task_times_mtx_); + auto now = std::chrono::high_resolution_clock::now(); + + // normalize to one second + auto interval = std::chrono::duration_cast(now - last_task_times_sample_); + auto res = (task_times_ - last_task_times_) / (interval.count() * 1e-9f); + + last_task_times_sample_ = now; + last_task_times_ = task_times_; + return res; + } + #ifdef REALTIME_VIZ bool monitor::sample_task_stats() { @@ -136,7 +166,7 @@ namespace allscale { namespace components { // << "Average time per task: " << get_avg_task_duration() << "IDLE RATE: " << idle_rate_ << std::endl; return true; } - +#endif double monitor::get_avg_task_duration() { @@ -144,7 +174,6 @@ namespace allscale { namespace components { else return total_task_duration_/(double)total_tasks_; } -#endif std::uint64_t monitor::get_timestamp( void ) { @@ -554,15 +583,11 @@ namespace allscale { namespace components { std::shared_ptr stats; auto my_wid = w.id(); -#ifdef REALTIME_VIZ - if(realtime_viz) { - // Global task stats + + { std::unique_lock lock2(counter_mutex_); - total_tasks_++; num_active_tasks_--; total_task_duration_ += p->get_exclusive_time(); - lock2.unlock(); } -#endif #ifdef HAVE_PAPI @@ -1259,6 +1284,13 @@ namespace allscale { namespace components { double monitor::get_avg_idle_rate() { + auto now = std::chrono::steady_clock::now(); + std::chrono::duration time_elapsed = + std::chrono::duration_cast>(now - execution_start); + + return get_avg_task_duration() / time_elapsed.count(); + + /* hpx::performance_counters::counter_value idle_avg_value; idle_avg_value = hpx::performance_counters::stubs::performance_counter::get_value( @@ -1266,8 +1298,8 @@ namespace allscale { namespace components { return idle_avg_value.get_value() * 0.01; -*/ return 0.0; +*/ } double monitor::get_avg_idle_rate_remote(hpx::id_type locality) @@ -2171,13 +2203,10 @@ namespace allscale { namespace components { if (rank_ == 0) { dashboard::update(); + dashboard::get_commands(); } } - - - - }} //HPX_REGISTER_ACTION(allscale::components::monitor::get_my_rank_action, get_my_rank_action); diff --git a/src/components/resilience_component.cpp b/src/components/resilience_component.cpp index e30a295..d9a4ebf 100644 --- a/src/components/resilience_component.cpp +++ b/src/components/resilience_component.cpp @@ -102,6 +102,7 @@ namespace allscale { namespace components { rank_running_.resize(num_localities, true); } + localities.reserve(num_localities); for(std::size_t i = 0; i < num_localities; i++) { hpx::shared_future locality_future = diff --git a/src/components/scheduler_component.cpp b/src/components/scheduler_component.cpp index 8c534a7..b6db47c 100644 --- a/src/components/scheduler_component.cpp +++ b/src/components/scheduler_component.cpp @@ -90,7 +90,7 @@ scheduler::scheduler(std::uint64_t rank) uselopt(false) { allscale_monitor = &allscale::monitor::get(); - thread_times.resize(hpx::get_os_thread_count()); + thread_times.resize(os_thread_count); #ifdef DEBUG_ std::cout << "DEBUG_ is defined" << std::endl << std::flush; @@ -120,7 +120,7 @@ std::size_t scheduler::get_num_numa_nodes() { numa_nodes = topo_->get_number_of_sockets(); std::vector node_masks(numa_nodes); - std::size_t num_os_threads = hpx::get_os_thread_count(); + std::size_t num_os_threads = os_thread_count; for (std::size_t num_thread = 0; num_thread != num_os_threads; ++num_thread) { std::size_t pu_num = rp_->get_pu_num(num_thread); std::size_t numa_node = topo_->get_numa_node_number(pu_num); @@ -164,7 +164,7 @@ std::size_t scheduler::get_num_numa_cores(std::size_t domain) { std::vector node_masks(numa_nodes); std::size_t res = 0; - std::size_t num_os_threads = hpx::get_os_thread_count(); + std::size_t num_os_threads = os_thread_count; for (std::size_t num_thread = 0; num_thread != num_os_threads; num_thread++) { std::size_t pu_num = rp_->get_pu_num(num_thread); std::size_t numa_node = topo_->get_numa_node_number(pu_num); @@ -197,6 +197,8 @@ void scheduler::init() { std::vector objectives_priorities; int objectives_priority_idx=0; + std::size_t num_localities = allscale::get_num_localities(); + std::unique_lock l(resize_mtx_); hpx::util::ignore_while_checking> il(&l); if (initialized_) @@ -212,21 +214,16 @@ void scheduler::init() { rp_ = &hpx::resource::get_partitioner(); topo_ = &hpx::threads::get_topology(); - std::size_t num_cores = 0; - for(std::size_t i = 0; i < allscale::get_num_numa_nodes(); ++i) - { - num_cores += get_num_numa_cores(i) + 1; - } - if (num_cores == 1) depth_cut_off_ = 1; - else - { - depth_cut_off_ = - std::ceil( - std::log2( - num_cores * allscale::get_num_localities() - ) - ); - } + std::size_t num_cores = os_thread_count; + + depth_cut_off_ = + std::ceil( + std::log2( + (num_cores + 2) * allscale::get_num_localities() + ) + ); + +// std::cout << "init: " << num_cores << " " << allscale::get_num_localities() << " " << depth_cut_off_ << '\n'; // Reading user provided options in terms of desired optimization objectives std::string input_objective_str = @@ -375,6 +372,9 @@ void scheduler::init() { auto const &numa_domains = rp_->numa_domains(); executors_.reserve(numa_domains.size()); thread_pools_.reserve(numa_domains.size()); + numa_start = runtime::HierarchyAddress::getLayersOn(0, num_localities); + numa_depth = std::floor(std::log2(numa_domains.size())); +// std::cout << "got numa depth: " << numa_depth << '\n'; for (std::size_t domain = 0; domain < numa_domains.size(); ++domain) { std::string pool_name; @@ -837,7 +837,8 @@ std::pair> nr_tasks_scheduled++; - std::size_t numa_node = addr.getNumaNode(); + // FIXME!!!! + std::size_t numa_node = work.id().numa_node(numa_depth, numa_start); if (do_split(work, numa_node)) { @@ -876,12 +877,6 @@ std::pair> } }; - if (acquired.is_ready()) - { - f_split(); - return std::make_pair(work_item(), std::unique_ptr()); - } - state->set_on_completed(std::move(f_split)); return std::make_pair(work_item(), std::unique_ptr()); } @@ -890,6 +885,17 @@ std::pair> if (!addr.isLeaf()) return std::make_pair(std::move(work), std::move(reqs)); reqs->add_allowance(addr); + +// { +// static hpx::lcos::local::spinlock mtx; +// mtx.lock(); +// std::cout << work.name() << " " << work.id() << " " << addr << " " +// << runtime::HierarchyAddress::getLayersOn(0, num) +// << " " << numa_node << " " << depth_cut_off_ << '\n'; +// mtx.unlock(); +// } + + hpx::future acquired = reqs->acquire_process(addr); typename hpx::traits::detail::shared_state_ptr_for< hpx::future>::type const &state = @@ -927,7 +933,8 @@ bool scheduler::do_split(work_item const &w, std::size_t numa_node) { // Check if we reached the required depth // FIXME: make the cut off runtime configurable... // FIXME:!!!!!!! - if (w.id().depth() < depth_cut_off_) { + if (w.id().depth() < depth_cut_off_) + { // std::cout << " // FIXME: add more elaborate splitting criterions return true; @@ -1104,28 +1111,13 @@ unsigned int scheduler::suspend_threads(std::size_t suspendthreads) { // Setting the default thread depths of the NUMA domain { - std::size_t num_cores = 0; - for(std::size_t i = 0; i < allscale::get_num_numa_nodes(); ++i) - { - if (i == pool_idx) - { - num_cores = get_num_numa_cores(pool_idx) - suspend_threads.size() + 1; - } - else - { - num_cores += get_num_numa_cores(i) + 1; - } - } - if (num_cores == 1) depth_cut_off_ = 1; - else - { - depth_cut_off_ = - std::ceil( - std::log2( - num_cores * allscale::get_num_localities() - ) - ); - } + std::size_t num_cores = hpx::get_os_thread_count() - suspend_threads.size(); + depth_cut_off_ = + std::ceil( + std::log2( + (num_cores + 2) * allscale::get_num_localities() + ) + ); } #ifdef MEASURE_ update_active_osthreads(-1 * suspend_threads.size()); @@ -1299,28 +1291,13 @@ unsigned int scheduler::resume_threads(std::size_t resumethreads) { } // Setting the default thread depths of the NUMA domain { - std::size_t num_cores = 0; - for(std::size_t i = 0; i < allscale::get_num_numa_nodes(); ++i) - { - if (i == pool_idx) - { - num_cores = get_num_numa_cores(pool_idx) + resume_threads.size() + 1; - } - else - { - num_cores += get_num_numa_cores(i) + 1; - } - } - if (num_cores == 1) depth_cut_off_ = 1; - else - { - depth_cut_off_ = - std::ceil( - std::log2( - num_cores * allscale::get_num_localities() - ) - ); - } + std::size_t num_cores = hpx::get_os_thread_count() + resume_threads.size(); + depth_cut_off_ = + std::ceil( + std::log2( + (num_cores + 2) * allscale::get_num_localities() + ) + ); } #ifdef MEASURE_ update_active_osthreads(resume_threads.size()); diff --git a/src/dashboard.cpp b/src/dashboard.cpp index 02f830d..eae2071 100644 --- a/src/dashboard.cpp +++ b/src/dashboard.cpp @@ -4,6 +4,7 @@ #include #include #include +#include #ifdef ALLSCALE_HAVE_CPUFREQ #include #endif @@ -26,12 +27,12 @@ namespace allscale { namespace dashboard node_state get_state() { node_state state; - allscale::components::monitor *monitor_c = &allscale::monitor::get(); // state.rank = hpx::get_locality_id(); state.online = true; - state.active = true; + state.active = scheduler::active(); + allscale::components::monitor *monitor_c = &allscale::monitor::get(); // FIXME: add proper metrics here... state.num_cores = monitor_c->get_num_cpus(); @@ -44,6 +45,7 @@ namespace allscale { namespace dashboard state.task_throughput = monitor_c->get_throughput(); state.weighted_task_throughput = monitor_c->get_weighted_throughput(); state.idle_rate = (monitor_c->get_idle_rate())/100; + state.network_in = monitor_c->get_network_in(); state.network_out = monitor_c->get_network_out(); @@ -103,7 +105,7 @@ namespace allscale { namespace dashboard std::string res; res += "{\"id\":" + std::to_string(rank) + ','; res += "\"time\":" + std::to_string(time) + ','; - res += "\"state\":\"" + (online ? (active ? std::string("active\",") : "stand-by\",") : "offline\""); + res += "\"state\":\"" + (online ? (active ? std::string("active\",") : "standby\",") : "offline\""); if (!online) { res += '}'; return res; @@ -130,6 +132,10 @@ namespace allscale { namespace dashboard return res; } + system_state::system_state() + : policy(scheduler::policy()) + {} + std::string system_state::to_json() const { std::string res; @@ -138,7 +144,13 @@ namespace allscale { namespace dashboard res += "\"speed\":" + std::to_string(speed) + ','; res += "\"efficiency\":" + std::to_string(efficiency) + ','; res += "\"power\":" + std::to_string(power) + ','; + res += "\"objective_exponent\": {"; + res += "\"speed\": 1.0,"; + res += "\"efficiency\": 1.0,"; + res += "\"power\": 1.0"; + res += "},"; res += "\"score\":" + std::to_string(score) + ','; + res += "\"scheduler\": \"" + policy + "\","; res += "\"nodes\":["; for (auto & node: nodes) { @@ -247,6 +259,14 @@ namespace allscale { namespace dashboard std::size_t msg_size; }; + struct command + { + command() : size_(0) {} + + std::uint64_t size_; + std::vector command_; + }; + template void write(system_state& state, F f) { @@ -281,6 +301,70 @@ namespace allscale { namespace dashboard ); } + void get_commands() + { + std::shared_ptr cmd = std::make_shared(); + + boost::asio::async_read(socket_, boost::asio::buffer(&cmd->size_, sizeof(std::uint64_t)), + [cmd, this](boost::system::error_code ec, std::size_t /*length*/) mutable + { + if (ec) + { + std::cerr << "Read failed...\n"; + return; + } + + std::cout << "got message: " << cmd->size_ << " " << be64toh(cmd->size_) << '\n'; + + cmd->command_.resize(be64toh(cmd->size_)); + auto buffer = boost::asio::buffer(cmd->command_); + boost::asio::async_read(socket_, buffer, + [cmd, this](boost::system::error_code ec, std::size_t /*length*/) mutable + { + auto cmd_it = std::find(cmd->command_.begin(), cmd->command_.end(), ' '); + + std::string command(cmd->command_.begin(), cmd_it); + std::string payload(cmd_it + 1, cmd->command_.end()); + + hpx::future cmd_fut; + + if (command == "set_scheduler") + { + std::cerr << "Setting scheduler policy to " << payload << '\n'; + cmd_fut = scheduler::set_policy(payload); + } + else if (command == "toggle_node") + { + std::size_t locality_id = std::stol(payload); + std::cerr << "Toggling locality " << locality_id << '\n'; + cmd_fut = scheduler::toggle_node(locality_id); + } + else if (command == "set_speed") + { + cmd_fut = hpx::make_ready_future(); + } + else if (command == "set_efficiency") + { + cmd_fut = hpx::make_ready_future(); + } + else if (command == "set_power") + { + cmd_fut = hpx::make_ready_future(); + } + else + { + cmd_fut = hpx::make_ready_future(); + std::cout << "got unknown command: " << command << ':' << payload << '\n'; + } + + // Read next... + cmd_fut.then([this](hpx::future){ get_commands(); }); + } + ); + } + ); + } + void shutdown() { if (!enabled_) return; @@ -374,4 +458,12 @@ namespace allscale { namespace dashboard }); }); } + + void get_commands() + { + dashboard_client& client = dashboard_client::get(); + + if (!client) return; + client.get_commands(); + } }} diff --git a/src/hierarchy.cpp b/src/hierarchy.cpp index 035fb38..f60eb7b 100644 --- a/src/hierarchy.cpp +++ b/src/hierarchy.cpp @@ -3,50 +3,36 @@ namespace allscale { namespace runtime { - HierarchyAddress HierarchyAddress::getRootOfNetworkSize(std::size_t numa_size, std::size_t size) { + HierarchyAddress HierarchyAddress::getRootOfNetworkSize(std::size_t size) { // do not support empty networks assert_ne(0,size); // obtain the result - return { 0, 0, getLayersOn(0, 0, numa_size, size)-1 }; + return { 0, getLayersOn(0, size)-1 }; } - std::size_t HierarchyAddress::numaCutOff = 0; - bool HierarchyAddress::check() const { -// // to be valid, the last 'layer' digits in the rank must be 0 -// for(layer_t i=0; i>i) % 2 != 0) return false; -// } -// // all fine + // to be valid, the last 'layer' digits in the rank must be 0 + for(layer_t i=0; i>i) % 2 != 0) return false; + } + // all fine return true; } - layer_t HierarchyAddress::getLayersOn(std::size_t rank, std::size_t numa_node, std::size_t numa_size, std::size_t size) { + layer_t HierarchyAddress::getLayersOn(std::size_t rank, std::size_t size) { assert_lt(rank,size); - // Calculate numa height - std::size_t numa_height = 0; - if (numa_node == 0) - { - std::size_t pos = 1; - while (pos < numa_size) - { - numa_height++; - pos <<=1; - } - } - // if it is the root, we have to compute the ceil(log_2(size)) if (rank == 0) { // compute the height of the hierarchy layer_t height = 0; - std::size_t pos = 1; + std::size_t pos = 1; while(pos < size) { height++; pos <<=1; } - return height+1 + numa_height; + return height+1; } // otherwise: compute the number of trailing 0s + 1 @@ -55,11 +41,11 @@ namespace runtime { rank >>= 1; res++; } - return res + numa_height; + return res; } std::ostream& operator<<(std::ostream& out, const HierarchyAddress& addr) { - return out << addr.rank << ":" << addr.numa_node << ":" << addr.layer; + return out << addr.rank << ":" << addr.layer; } diff --git a/src/monitor.cpp b/src/monitor.cpp index b5b4e2a..dc34369 100644 --- a/src/monitor.cpp +++ b/src/monitor.cpp @@ -12,6 +12,8 @@ typedef hpx::components::component monitor_compon HPX_REGISTER_COMPONENT(monitor_component) namespace allscale { + bool monitor::enabled = false; + components::monitor* monitor::run(std::size_t rank) { return get_ptr(); @@ -52,6 +54,12 @@ namespace allscale { } } + void monitor::add_task_time(task_id::task_path const& path, task_times::time_t const& time) + { + if (enabled) + get().add_task_time(path, time); + } + components::monitor &monitor::get() { HPX_ASSERT(get_ptr()); @@ -68,6 +76,7 @@ namespace allscale { res = m.component_.get(); } res->init(); + enabled = res->enable_monitor; return res; } diff --git a/src/optimizer.cpp b/src/optimizer.cpp index e1b8f15..0201981 100644 --- a/src/optimizer.cpp +++ b/src/optimizer.cpp @@ -24,17 +24,13 @@ namespace allscale optimizer_state get_optimizer_state() { float load = 1.f - float(monitor::get().get_idle_rate() / 100.); -#ifdef ALLSCALE_HAVE_CPUFREQ - float frequency = components::util::hardware_reconf::get_kernel_freq(0); -#else - float frequency = 1.f; -#endif - return optimizer_state(load, frequency, scheduler::get().get_active_threads()); +// return load; + return {load, monitor::get().get_task_times()}; } - void optimizer_update_policy(std::vector const& state, std::vector mask) + void optimizer_update_policy(task_times const& times, std::vector mask) { - scheduler::update_policy(state, mask); + scheduler::update_policy(times, mask); } } @@ -123,7 +119,7 @@ namespace allscale , localities_(hpx::find_all_localities()) {} - void global_optimizer::tune(std::vector const& state) + void global_optimizer::tune(std::vector const& state) { #ifdef ALLSCALE_HAVE_CPUFREQ float max_frequency = components::util::hardware_reconf::get_frequencies(0).back(); @@ -148,14 +144,14 @@ namespace allscale for (std::size_t i = 0; i < allscale::get_num_localities(); ++i) { - if (i < num_active_nodes_) - { - used_cycles += state[i].load * state[i].active_frequency * state[i].cores_per_node; - avail_cycles += state[i].active_frequency * state[i].cores_per_node; - used_power += estimate_power(state[i].active_frequency); - active_frequency += state[i].active_frequency / num_active_nodes_; - } - max_cycles += max_frequency * state[i].cores_per_node; +// if (i < num_active_nodes_) +// { +// used_cycles += state[i] * state[i].active_frequency * state[i].cores_per_node; +// avail_cycles += state[i].active_frequency * state[i].cores_per_node; +// used_power += estimate_power(state[i].active_frequency); +// active_frequency += state[i].active_frequency / num_active_nodes_; +// } +// max_cycles += max_frequency * state[i].cores_per_node; max_power = estimate_power(max_frequency); } @@ -251,34 +247,37 @@ namespace allscale [this, tuned](hpx::future> future_state) { auto state = future_state.get(); +// std::vector load; +// load.reserve(state.size()); // compute the load variance - optimizer_state avg_state = std::accumulate( - state.begin(), state.end(), optimizer_state(0.0f, 0.f, 0), - [this](optimizer_state const& lhs, optimizer_state const& rhs) + float avg_load = 0.0f; + task_times times; + + std::for_each(state.begin(), state.end(), + [×, &avg_load](optimizer_state const& s) { - return optimizer_state( - (lhs.load + rhs.load), - (lhs.active_frequency + rhs.active_frequency) / num_active_nodes_, - (lhs.cores_per_node + rhs.cores_per_node) / num_active_nodes_); - } - ); - avg_state.load = avg_state.load / num_active_nodes_; +// load.push_back(s.load_); + avg_load += s.load_; + times += s.task_times_; + }); + avg_load = avg_load / num_active_nodes_; + float sum_dist = 0.f; for(std::size_t i=0; i 1) ? sum_dist / (num_active_nodes_ - 1) : 0.0f; std::cerr - << "Average load " << std::setprecision(2) << avg_state.load +// << times << '\n' + << "Average load " << std::setprecision(2) << avg_load << ", load variance " << std::setprecision(2) << var - << ", average frequency " << std::setprecision(2) << avg_state.active_frequency - << ", total progress " << std::setprecision(2) << (avg_state.load*num_active_nodes_) +// << ", average frequency " << std::setprecision(2) << avg_state.active_frequency + << ", total progress " << std::setprecision(2) << (avg_load*num_active_nodes_) << " on " << num_active_nodes_ << " nodes\n"; // -- Step 2: if stable, adjust number of nodes and clock speed @@ -286,7 +285,7 @@ namespace allscale if (tuned && var < 0.01f) { // adjust number of nodes and CPU frequency - tune(state); +// tune(state); } // -- Step 3: enforce selected number of nodes and clock speed, keep system balanced @@ -297,7 +296,7 @@ namespace allscale } hpx::lcos::broadcast_apply( - localities_, state, mask); + localities_, times, mask); // // get the local scheduler // auto& scheduleService = node.getService>().get(0); diff --git a/src/schedule_policy.cpp b/src/schedule_policy.cpp index fa135c1..dc30566 100644 --- a/src/schedule_policy.cpp +++ b/src/schedule_policy.cpp @@ -91,19 +91,24 @@ namespace allscale { } namespace { - std::vector getEqualDistribution(int numNodes, int numTasks) { + std::vector getEqualDistribution(std::vector const& mask, int numTasks) { std::vector res(numTasks); // fill it with an even load task distribution + std::size_t numNodes = std::count(mask.begin(), mask.end(), true); int share = numTasks / numNodes; int remainder = numTasks % numNodes; int c = 0; + int l = 0; for(int i=0; i 2^0 = 100%, 5 => 2^-5 = 3.125% */ - std::unique_ptr tree_scheduling_policy::create_uniform(int N, int M, int granularity) + std::unique_ptr tree_scheduling_policy::create_uniform(std::vector const& mask, int granularity) { // some sanity checks - assert_lt(0,N * M); + assert_lt(0, std::count(mask.begin(), mask.end(), true)); // compute number of levels to be scheduled - auto log2 = ceilLog2(N * M); + auto log2 = ceilLog2(mask.size()); auto levels = std::max(log2,granularity); // create initial task to node mapping int numTasks = (1< mapping = getEqualDistribution(N * M,numTasks); + std::vector mapping = getEqualDistribution(mask,numTasks); // std::cerr << "create policy: " << (1<(new tree_scheduling_policy( - runtime::HierarchyAddress::getRootOfNetworkSize(N, M), levels, + runtime::HierarchyAddress::getRootOfNetworkSize(mask.size()), levels, toDecisionTree((1< tree_scheduling_policy::create_uniform(std::vector const& mask) + { + return create_uniform(mask, std::max(ceilLog2(mask.size())+3,5)); + } + + std::unique_ptr tree_scheduling_policy::create_uniform(int N, int granularity) + { + std::vector mask(N, true); + return create_uniform(mask, granularity); + } + /** * Creates a scheduling policy distributing work uniformly among the given number of nodes. The * granulartiy will be adjusted accordingly, such that ~8 tasks per node are created. * * @param N the number of nodes to distribute work on */ - std::unique_ptr tree_scheduling_policy::create_uniform(int N, int M) + std::unique_ptr tree_scheduling_policy::create_uniform(int N) { - return create_uniform(N, M, std::max(ceilLog2(N * M)+3,5)); + std::vector mask(N, true); + return create_uniform(mask, std::max(ceilLog2(N)+3,5)); + } + + namespace { + + std::unique_ptr rebalance_tasks(tree_scheduling_policy const& old, const std::vector& task_costs, std::vector const& mask) + { + // check input... + std::size_t mapping_size = 1 << old.get_granularity(); + HPX_ASSERT(mapping_size == task_costs.size()); + + std::size_t num_nodes = mask.size(); + + // --- redistributing costs --- + + // get number of available nodes + std::size_t available_nodes = 0; + for (bool x : mask) if (x) available_nodes++; + + // if there is really non available, force it to at least one + if (available_nodes == 0) available_nodes = 1; + + float total_costs = std::accumulate(task_costs.begin(), task_costs.end(), 0.0f); + float share = total_costs / available_nodes; + + float cur_costs = 0; + float next_goal = share; + + std::size_t cur_node = 0; + while (!mask[cur_node]) cur_node++; + + std::vector new_mapping(mapping_size); + for (std::size_t i = 0; i < mapping_size; ++i) + { + // compute next costs + auto next_costs = cur_costs + task_costs[i]; + + // if we cross a boundary + if (cur_node < num_nodes - 1) + { + if (std::abs(cur_costs - next_goal) < std::abs(next_goal - next_costs)) + { + cur_node++; + while (!mask[cur_node]) cur_node++; + next_goal += share; + } + } + + new_mapping[i] = cur_node; + cur_costs = next_costs; + } + + // create new scheduling policy + auto log2 = old.root().getLayer(); + + return std::unique_ptr(new tree_scheduling_policy( + old.root(), old.get_granularity(), + toDecisionTree((1< tree_scheduling_policy::create_rebalanced(const scheduling_policy& old_base, const std::vector& state, std::vector const& mask) + std::unique_ptr tree_scheduling_policy::create_rebalanced(const scheduling_policy& old_base, const std::vector& state) + { + std::vector mask(state.size(), true); + return create_rebalanced(old_base, state, mask); + } + + std::unique_ptr tree_scheduling_policy::create_rebalanced(const scheduling_policy& old_base, const std::vector& state, std::vector const& mask) { tree_scheduling_policy const& old = static_cast(old_base); @@ -234,7 +316,7 @@ namespace allscale { std::size_t num_nodes = state.size(); // test that all load values are positive - HPX_ASSERT(std::all_of(state.begin(), state.end(), [](optimizer_state const& state) { return state.load >= 0.0f; })); + HPX_ASSERT(std::all_of(state.begin(), state.end(), [](float load) { return load >= 0.0f; })); // count number of tasks per node std::vector old_share(num_nodes,0); @@ -247,7 +329,7 @@ namespace allscale { // compute average costs for tasks (default value) float sum = 0.f; - for(const auto& cur : state) sum += cur.load; + for(const auto& cur : state) sum += cur; float avg = sum / mapping.size(); // average costs per task on node @@ -260,7 +342,7 @@ namespace allscale { } else { - costs[i] = state[i].load/old_share[i]; + costs[i] = state[i]/old_share[i]; } } @@ -270,114 +352,40 @@ namespace allscale { for(std::size_t i=0; i < mapping.size(); i++) { task_costs[i] = costs[mapping[i]]; - total_costs += task_costs[i]; } - // --- redistributing costs --- - - // get number of now available nodes - std::size_t available_nodes = 0; - for(bool x : mask) if (x) available_nodes++; - - // if there is really none, make it at least one - if (available_nodes < 1) available_nodes = 1; - - float share = total_costs / available_nodes; + return rebalance_tasks(old, task_costs, mask); + } - float cur_costs = 0; - float next_goal = share; - std::size_t cur_node = 0; - while(!mask[cur_node]) cur_node++; - std::vector new_mapping(mapping.size()); - for(std::size_t i=0; i& res) { - // compute next costs - auto next_costs = cur_costs + task_costs[i]; - - // if we cross a boundary - if (cur_node < (num_nodes-1)) + // if we are deep enough... + if (cur.getLength() == depth) { - // decide whether to switch to next node - if (std::abs(cur_costs-next_goal) < std::abs(next_goal-next_costs)) - { - // stopping here is closer to the target - cur_node++; - while(!mask[cur_node]) cur_node++; - next_goal += share; - } + res[cur.getPath()] = times.get_time(cur); + return; } - // else, just add current task to current node - new_mapping[i] = cur_node; - cur_costs = next_costs; + // otherwise, process left and right + sample_task_costs(times, cur.getLeftChildPath(), depth, res); + sample_task_costs(times, cur.getRightChildPath(), depth, res); } - // for development, to estimate quality: - - const bool DEBUG_FLAG = false; - if (DEBUG_FLAG) { - // --- compute new load distribution --- - std::vector newEstCosts(num_nodes,0); - for(std::size_t i=0; i& v)->float { - float s = 0; - for(const auto& cur : v) { - s += cur; - } - return s / v.size(); - }; - auto variance = [&](const std::vector& v)->float { - float m = mean(v); - float s = 0; - for(const auto& cur : v) { - auto d = cur - m; - s += d*d; - } - return s / (v.size()-1); - }; + std::unique_ptr tree_scheduling_policy::create_rebalanced(const scheduling_policy& old_base, task_times const& times, std::vector const& mask) + { + tree_scheduling_policy const& old = static_cast(old_base); - auto mean_s = [](const std::vector& v)->float { - float s = 0; - for(const auto& cur : v) { - s += cur.load; - } - return s / v.size(); - }; - auto variance_s = [&](const std::vector& v)->float { - float m = mean_s(v); - float s = 0; - for(const auto& cur : v) { - auto d = cur.load - m; - s += d*d; - } - return s / (v.size()-1); - }; - - std::vector load(state.size()); - std::transform(state.begin(), state.end(), load.begin(), [](optimizer_state const& s) { return s.load; }); - - std::cerr << "Load vector: " << load << " - " << mean_s(state) << " / " << variance_s(state) << "\n"; - std::cerr << "Est. vector: " << newEstCosts << " - " << mean(newEstCosts) << " / " << variance(newEstCosts) << "\n"; - std::cerr << "Target Load: " << share << "\n"; - std::cerr << "Task shared: " << old_share << "\n"; - std::cerr << "Node costs: " << costs << "\n"; - std::cerr << "Task costs: " << task_costs << "\n"; - std::cerr << "In-distribution: " << mapping << "\n"; - std::cerr << "Out-distribution: " << new_mapping << "\n"; - std::cerr << toDecisionTree((1< task_costs(mapping_size); - // create new scheduling policy - auto log2 = old.root_.getLayer(); -// std::cerr << "create rebalance policy: " << (1<(new tree_scheduling_policy( - old.root_, old.granularity_, - toDecisionTree((1<= int(root_.getLayer()) - int(addr.getLayer()); + return int(path.getLength()) - 3 >= int(root_.getLayer()) - int(addr.getLayer()); } schedule_decision random_scheduling_policy::decide(runtime::HierarchyAddress const& addr, const task_id::task_path& path) const @@ -518,12 +528,14 @@ namespace allscale { if (path.getLength() < 3) return schedule_decision::stay; - if (int(path.getLength()) - 3 >= int(root_.getLayer()) - int(addr.getLayer())) + if (int(path.getLength() - 3 == int(root_.getLayer()) - int(addr.getLayer()))) { +// std::cout << "wtf??\n"; return schedule_decision::stay; } auto r = policy(generator); +// std::cout << r << "\n"; if (r < 0.5) return schedule_decision::left; diff --git a/src/scheduler.cpp b/src/scheduler.cpp index a526aaf..254590a 100644 --- a/src/scheduler.cpp +++ b/src/scheduler.cpp @@ -28,13 +28,35 @@ namespace allscale void schedule_global(runtime::HierarchyAddress addr, work_item work); void schedule_down_global(runtime::HierarchyAddress addr, work_item work, std::unique_ptr reqs); + + void toggle_node_broadcast(std::size_t locality_id); + void set_policy_broadcast(std::string policy); } HPX_PLAIN_DIRECT_ACTION(allscale::schedule_global, schedule_global_action); HPX_PLAIN_DIRECT_ACTION(allscale::schedule_down_global, schedule_down_global_action); +HPX_PLAIN_DIRECT_ACTION(allscale::toggle_node_broadcast, toggle_node_action); +HPX_PLAIN_DIRECT_ACTION(allscale::set_policy_broadcast, set_policy_action); namespace allscale { + scheduler::mutex_type scheduler::active_mtx_; + bool scheduler::active_ = true; + bool scheduler::active() + { + std::lock_guard l(active_mtx_); + return active_; + } + + void scheduler::toggle_active(bool toggle) + { + std::lock_guard l(active_mtx_); + if (toggle) + active_ = !active_; + else + active_ = true; + } + void scheduler::partition_resources(hpx::resource::partitioner& rp) { auto const& numa_domains = rp.numa_domains(); @@ -143,54 +165,73 @@ namespace allscale policy_value value_; std::unique_ptr policy_; - }; - replacable_policy create_policy() - { - char * env = std::getenv("ALLSCALE_SCHEDULING_POLICY"); - if (env) + std::string policy() { - if (env == std::string("uniform")) + switch (value_) { - return { - replacable_policy::static_, - tree_scheduling_policy::create_uniform( - allscale::get_num_numa_nodes(), allscale::get_num_localities()) - }; - } - if (env == std::string("dynamic")) - { - return { - replacable_policy::dynamic, - tree_scheduling_policy::create_uniform( - allscale::get_num_numa_nodes(), allscale::get_num_localities()) - }; - } - if (env == std::string("tuned")) - { - return { - replacable_policy::tuned, - tree_scheduling_policy::create_uniform( - allscale::get_num_numa_nodes(), allscale::get_num_localities()) - }; - } - if (env == std::string("random")) - { - return { - replacable_policy::random, - std::unique_ptr(new random_scheduling_policy( - runtime::HierarchyAddress::getRootOfNetworkSize(allscale::get_num_numa_nodes(), allscale::get_num_localities())) - ) - }; + case static_: + return "uniform"; + case dynamic: + return "balanced"; + case tuned: + return "tuned"; + case random: + return "random"; + default: + return "unknown"; } } + }; + + replacable_policy create_policy(std::string const& policy) + { + if (policy == "uniform") + { + return { + replacable_policy::static_, + tree_scheduling_policy::create_uniform(allscale::get_num_localities()) + }; + } + if (policy == "balanced") + { + return { + replacable_policy::dynamic, + tree_scheduling_policy::create_uniform(allscale::get_num_localities()) + }; + } + if (policy == "tuned") + { + return { + replacable_policy::tuned, + tree_scheduling_policy::create_uniform(allscale::get_num_localities()) + }; + } + if (policy == "random") + { + return { + replacable_policy::random, + std::unique_ptr(new random_scheduling_policy( + runtime::HierarchyAddress::getRootOfNetworkSize(allscale::get_num_localities())) + ) + }; + } return { replacable_policy::static_, - tree_scheduling_policy::create_uniform( - allscale::get_num_numa_nodes(), allscale::get_num_localities()) + tree_scheduling_policy::create_uniform(allscale::get_num_localities()) }; } + + replacable_policy create_policy() + { + char * env = std::getenv("ALLSCALE_SCHEDULING_POLICY"); + if (env) + { + return create_policy(env); + } + return create_policy("uniform"); + } } struct scheduler_service @@ -199,6 +240,7 @@ namespace allscale using mutex_type = hpx::lcos::local::spinlock; mutex_type mtx_; replacable_policy policy_; + std::vector mask_; runtime::HierarchyAddress here_; runtime::HierarchyAddress root_; runtime::HierarchyAddress parent_; @@ -211,6 +253,7 @@ namespace allscale scheduler_service(scheduler_service&& other) : policy_(std::move(other.policy_)) + , mask_(std::move(other.mask_)) , here_(std::move(other.here_)) , root_(std::move(other.root_)) , parent_(std::move(other.parent_)) @@ -224,10 +267,10 @@ namespace allscale scheduler_service(runtime::HierarchyAddress here) : policy_(create_policy()) + , mask_(allscale::get_num_localities(), true) , here_(here) - , root_(runtime::HierarchyAddress::getRootOfNetworkSize( - allscale::get_num_numa_nodes(), allscale::get_num_localities() - )) + , root_( + runtime::HierarchyAddress::getRootOfNetworkSize(allscale::get_num_localities())) , parent_(here_.getParent()) , is_root_(here_ == root_) { @@ -257,22 +300,49 @@ namespace allscale } } - void update_policy(std::vector const& state, std::vector mask) + std::string policy() + { + return policy_.policy(); + } + + void toggle_node(std::size_t locality_id) { std::lock_guard l(mtx_); - if (!(policy_.value_ == replacable_policy::dynamic || policy_.value_ == replacable_policy::tuned)) return; + mask_[locality_id] = !mask_[locality_id]; - policy_.policy_ = tree_scheduling_policy::create_rebalanced(*policy_.policy_, state, mask); + if (policy_.value_ == replacable_policy::random) + { + policy_.value_ = replacable_policy::static_; + policy_.policy_ = tree_scheduling_policy::create_uniform(mask_); + } + else + { + policy_.policy_ = tree_scheduling_policy::create_uniform(mask_); + } } - void schedule(work_item work) + void set_policy(std::string policy) { + auto pol = create_policy(policy); { - int i = 0; - unsigned j =0; - if (i < j) { return; } + std::lock_guard l(mtx_); + mask_ = std::vector(mask_.size(), true); + policy_ = std::move(pol); } - if (policy_.value_ == replacable_policy::dynamic && + } + + void update_policy(task_times const& times, std::vector const& mask) + { + std::lock_guard l(mtx_); + if (!(policy_.value_ == replacable_policy::dynamic || policy_.value_ == replacable_policy::tuned)) return; + + mask_ = mask; + policy_.policy_ = tree_scheduling_policy::create_rebalanced(*policy_.policy_, times, mask_); + } + + void schedule(work_item work) + { + if (is_root_ && policy_.value_ == replacable_policy::dynamic && work.id().is_root() && work.id().id > 0 && (work.id().id % 10 == 0)) { optimizer_.balance(false); @@ -301,25 +371,34 @@ namespace allscale // test that this virtual node is allowed to interfere with the scheduling // of this task std::lock_guard l(mtx_); -// is_involved = policy_.policy_->is_involved(here_, (path.isRoot() ? path : path.getParentPath())); - is_involved = policy_.policy_->is_involved(here_, path); + is_involved = (path.isRoot()) ? is_root_ : policy_.policy_->is_involved(here_, path.getParentPath()); +// is_involved = policy_.policy_->is_involved(here_, path); } if (is_involved // test that this virtual node has control over all required data && reqs->check_write_requirements(here_)) { -// std::cout << here_ << ' ' << work.name() << "." << work.id() << ": shortcut " << '\n'; +// std::cout << here_ << ' ' << work.name() << "." << work.id() << ": shortcut " << '\n'; schedule_down(std::move(work), std::move(reqs)); return; } } + bool unallocated = reqs->get_missing_regions(here_); + // If there are unallocated allowances still to process, do so + if (unallocated) + { +// std::cout << "unallocated...\n"; + schedule_down(std::move(work), std::move(reqs)); + return; + } + // if we are not the root, we need to propagate to our parent... if (!is_root_) { if (!parent_id_) { - runtime::HierarchicalOverlayNetwork::getLocalService(parent_). + runtime::HierarchicalOverlayNetwork::getLocalService(parent_.getLayer()). schedule(std::move(work)); } else @@ -329,7 +408,6 @@ namespace allscale return; } - reqs->get_missing_regions(here_); schedule_down(std::move(work), std::move(reqs)); return; } @@ -358,11 +436,10 @@ namespace allscale // if this is not involved, send task to parent if (!is_involved) { - reqs->add_allowance(here_); HPX_ASSERT(!is_root_); if (!parent_id_) { - runtime::HierarchicalOverlayNetwork::getLocalService(parent_). + runtime::HierarchicalOverlayNetwork::getLocalService(parent_.getLayer()). schedule(std::move(work)); } else @@ -372,15 +449,10 @@ namespace allscale return; } - if (here_.isLeaf()) - { - d = schedule_decision::stay; - } - HPX_ASSERT(d != schedule_decision::done); // if it should stay, process it here - if (d == schedule_decision::stay) + if (d == schedule_decision::stay || here_.isLeaf()) { schedule_local(std::move(work), std::move(reqs)); return; @@ -393,7 +465,7 @@ namespace allscale // std::cout << here_ << ' ' << work.name() << "." << id << ": left: " << '\n'; // reqs->show(); reqs->add_allowance_left(here_); - runtime::HierarchicalOverlayNetwork::getLocalService(left_). + runtime::HierarchicalOverlayNetwork::getLocalService(left_.getLayer()). schedule_down(std::move(work), std::move(reqs)); } else @@ -402,16 +474,10 @@ namespace allscale // reqs->show(); reqs->add_allowance_right(here_); - if (!right_id_ && allscale::resilience::rank_running(right_.getRank())) - { - runtime::HierarchicalOverlayNetwork::getLocalService(right_). - schedule_down(std::move(work), std::move(reqs)); - } - else - { - allscale::resilience::global_wi_dispatched(work, right_.getRank()); - hpx::apply(right_id_, right_, std::move(work), std::move(reqs)); - } + HPX_ASSERT(right_id_); + + allscale::resilience::global_wi_dispatched(work, right_.getRank()); + hpx::apply(right_id_, right_, std::move(work), std::move(reqs)); } return; } @@ -435,7 +501,8 @@ namespace allscale // std::cout << here_ << ' ' << work.name() << "." << id << ": left: " << '\n'; // reqs->show(); res.second->add_allowance_left(here_); - runtime::HierarchicalOverlayNetwork::getLocalService(left_). + HPX_ASSERT(left_.getLayer() == here_.getLayer()-1); + runtime::HierarchicalOverlayNetwork::getLocalService(left_.getLayer()). schedule_local(std::move(res.first), std::move(res.second)); } }; @@ -443,12 +510,8 @@ namespace allscale void scheduler::schedule(work_item&& work) { // Calculate our local root address ... - static auto rank = get().rank_; - static auto local_layers = runtime::HierarchyAddress::getLayersOn( - rank, 0, allscale::get_num_numa_nodes(), allscale::get_num_localities()); - static auto addr = runtime::HierarchyAddress(rank, 0, local_layers - 1); - static auto &local_scheduler_service = - runtime::HierarchicalOverlayNetwork::getLocalService(addr); + auto &local_scheduler_service = + runtime::HierarchicalOverlayNetwork::getLocalService(); allscale::monitor::signal(allscale::monitor::work_item_enqueued, work); @@ -456,25 +519,70 @@ namespace allscale // get().enqueue(work); } - void scheduler::update_policy(std::vector const& state, std::vector mask) + void toggle_node_broadcast(std::size_t locality_id) + { + runtime::HierarchicalOverlayNetwork::forAllLocal( + [&](scheduler_service& sched) + { + sched.toggle_node(locality_id); + } + ); + + if (hpx::get_locality_id() == locality_id) + { + scheduler::toggle_active(); + } + } + + hpx::future scheduler::toggle_node(std::size_t locality_id) + { + return hpx::lcos::broadcast(hpx::find_all_localities(), + locality_id); + } + + void set_policy_broadcast(std::string policy) + { + runtime::HierarchicalOverlayNetwork::forAllLocal( + [&](scheduler_service& sched) + { + sched.set_policy(policy); + } + ); + // This sets the state to active again... + scheduler::toggle_active(false); + } + + hpx::future scheduler::set_policy(std::string policy) + { + return hpx::lcos::broadcast(hpx::find_all_localities(), + policy); + } + + std::string scheduler::policy() + { + return runtime::HierarchicalOverlayNetwork::getLocalService(). + policy(); + } + + void scheduler::update_policy(task_times const& times, std::vector mask) { runtime::HierarchicalOverlayNetwork::forAllLocal( [&](scheduler_service& sched) { - sched.update_policy(state, mask); + sched.update_policy(times, mask); } ); } void schedule_global(runtime::HierarchyAddress addr, work_item work) { - runtime::HierarchicalOverlayNetwork::getLocalService(addr). + runtime::HierarchicalOverlayNetwork::getLocalService(addr.getLayer()). schedule(std::move(work)); } void schedule_down_global(runtime::HierarchyAddress addr, work_item work, std::unique_ptr reqs) { - runtime::HierarchicalOverlayNetwork::getLocalService(addr). + runtime::HierarchicalOverlayNetwork::getLocalService(addr.getLayer()). schedule_down(std::move(work), std::move(reqs)); } @@ -485,7 +593,6 @@ namespace allscale components::scheduler* scheduler::run(std::size_t rank) { - runtime::HierarchyAddress::numaCutOff = std::ceil(std::log2(allscale::get_num_numa_nodes())); runtime::HierarchicalOverlayNetwork hierarchy; hierarchy.installService(); diff --git a/src/task_id.cpp b/src/task_id.cpp index 751d973..d9427a1 100644 --- a/src/task_id.cpp +++ b/src/task_id.cpp @@ -20,7 +20,7 @@ namespace allscale { } return this_wi->id(); } - return {locality_id, id, path.getParentPath(), nullptr}; + return {locality_id, id, path.getParentPath(), 0, std::uint8_t(-1), nullptr}; } task_id task_id::create_root() @@ -59,7 +59,7 @@ namespace allscale { { std::stringstream ss; - ss << id; + ss << id << '(' << id.path.getPath() << ',' << id.path.getLength() << ')'; return ss.str(); } diff --git a/src/task_times.cpp b/src/task_times.cpp new file mode 100644 index 0000000..772ba49 --- /dev/null +++ b/src/task_times.cpp @@ -0,0 +1,87 @@ +#include + +namespace allscale +{ + constexpr std::size_t TRACKING_LEVEL = 14; + + task_times::task_times() + : times(1< getAllAddresses(std::size_t numa, std::size_t size) { + std::set getAllAddresses(std::size_t size) { std::set all; - collectAll(HierarchyAddress::getRootOfNetworkSize(numa, size),all); + collectAll(HierarchyAddress::getRootOfNetworkSize(size),all); // filter all beyond size std::set res; for(const auto& cur : all) { - if (cur.getRank() < size && cur.getNumaNode() < numa) res.insert(cur); + if (cur.getRank() < size) res.insert(cur); } // done @@ -248,61 +118,55 @@ namespace { } -void navigation(std::size_t M) +void navigation() { using namespace allscale::runtime; - { - std::size_t N = 8; - HierarchyAddress::numaCutOff = std::ceil(std::log2(M)); - - std::size_t depth = std::ceil(std::log2(N * M)); - std::size_t num_nodes = std::pow(2, depth + 1) - 1; - - // test the full navigation in an 8-wide tree - auto root = HierarchyAddress::getRootOfNetworkSize(M, N); - - // collect all nodes - std::set all; - collectAll(root,all); - - // check the number of nodes - HPX_TEST_EQ(num_nodes,all.size()); - - int numLeafs = 0; - int numInner = 0; - int numLeftChildren = 0; - int numRightChildren = 0; - for(const auto& cur : all) { - - if (cur.isLeaf()) { - numLeafs++; - } else { - HPX_TEST(cur.isVirtualNode()); - numInner++; - } - - // check family relations - if (cur.isLeftChild()) { - HPX_TEST_EQ(cur,cur.getParent().getLeftChild()); - numLeftChildren++; - } else { - HPX_TEST(cur.isRightChild()); - HPX_TEST_EQ(cur,cur.getParent().getRightChild()); - numRightChildren++; - } - - // check height - HPX_TEST_LT(cur.getLayer(), HierarchyAddress::getLayersOn(cur.getRank(),0, M, N)); - } - - // check the correct number of leafs and inner nodes - HPX_TEST_EQ((num_nodes + 1)/2,numLeafs); - HPX_TEST_EQ(num_nodes - ((num_nodes + 1)/2),numInner); - - // also: the correct number of left and right children - HPX_TEST_EQ(num_nodes/2 + 1,numLeftChildren); // the root is a left child - HPX_TEST_EQ(num_nodes/2,numRightChildren); + std::size_t N = 8; + + // test the full navigation in an 8-wide tree + auto root = HierarchyAddress::getRootOfNetworkSize(N); + + // collect all nodes + std::set all; + collectAll(root,all); + + // check the number of nodes + HPX_TEST_EQ(15,all.size()); + + int numLeafs = 0; + int numInner = 0; + int numLeftChildren = 0; + int numRightChildren = 0; + for(const auto& cur : all) { + + if (cur.isLeaf()) { + numLeafs++; + } else { + HPX_TEST(cur.isVirtualNode()); + numInner++; + } + + // check family relations + if (cur.isLeftChild()) { + HPX_TEST_EQ(cur,cur.getParent().getLeftChild()); + numLeftChildren++; + } else { + HPX_TEST(cur.isRightChild()); + HPX_TEST_EQ(cur,cur.getParent().getRightChild()); + numRightChildren++; + } + + // check height + HPX_TEST_LT(cur.getLayer(), HierarchyAddress::getLayersOn(cur.getRank(), N)); } + + // check the correct number of leafs and inner nodes + HPX_TEST_EQ(8,numLeafs); + HPX_TEST_EQ(7,numInner); + + // also: the correct number of left and right children + HPX_TEST_EQ(8,numLeftChildren); // the root is a left child + HPX_TEST_EQ(7,numRightChildren); } using namespace allscale::runtime; @@ -319,13 +183,14 @@ namespace { } HierarchyAddress where_are_you(HierarchyAddress const& addr) { - return HierarchicalOverlayNetwork::getLocalService(addr).whereAreYou(); + return HierarchicalOverlayNetwork::getLocalService(addr.getLayer()).whereAreYou(); } + HPX_PLAIN_ACTION(where_are_you) void comm_test_impl() { - auto all = getAllAddresses(allscale::get_num_numa_nodes(), allscale::get_num_localities()); + auto all = getAllAddresses(allscale::get_num_localities()); for(const auto& cur : all) { hpx::id_type dest = hpx::naming::get_id_from_locality_id(cur.getRank()); @@ -361,12 +226,7 @@ int hpx_main(int argc, char **argv) print(); root(); layers_on_node(); - navigation(1); - navigation(2); - navigation(4); - - HierarchyAddress::numaCutOff = std::ceil(std::log2(allscale::get_num_numa_nodes())); - + navigation(); comm_test(); return hpx::finalize(); diff --git a/tests/unit/core/schedule_policy.cpp b/tests/unit/core/schedule_policy.cpp index 4504637..bc868a2 100644 --- a/tests/unit/core/schedule_policy.cpp +++ b/tests/unit/core/schedule_policy.cpp @@ -86,12 +86,12 @@ namespace { } - allscale::runtime::HierarchyAddress traceTarget(int netSize, int numa, const allscale::scheduling_policy& policy, const allscale::task_id::task_path& path) { + allscale::runtime::HierarchyAddress traceTarget(int netSize, const allscale::scheduling_policy& policy, const allscale::task_id::task_path& path) { // for roots it is easy - if (path.isRoot()) return allscale::runtime::HierarchyAddress::getRootOfNetworkSize(netSize, numa); + if (path.isRoot()) return allscale::runtime::HierarchyAddress::getRootOfNetworkSize(netSize); // for everything else, we walk recursive - auto res = traceTarget(netSize, numa, policy,path.getParentPath()); + auto res = traceTarget(netSize, policy,path.getParentPath()); // simulate scheduling switch(policy.decide(res,path)) { @@ -104,10 +104,10 @@ namespace { return res; } - allscale::runtime::HierarchyAddress getTarget(int netSize, int numa, const allscale::scheduling_policy& policy, const allscale::task_id::task_path& path) { + allscale::runtime::HierarchyAddress getTarget(int netSize, const allscale::scheduling_policy& policy, const allscale::task_id::task_path& path) { // trace current path - auto res = traceTarget(netSize,numa,policy,path); + auto res = traceTarget(netSize,policy,path); // check if this is a leaf-level node if (res.isLeaf()) return res.getRank(); @@ -121,7 +121,7 @@ namespace { std::size_t pos = initial; for(const auto& cur : children) { if (cur.getLength() != path.getLength()*2) continue; - auto childTarget = traceTarget(netSize,numa, policy,cur); + auto childTarget = traceTarget(netSize, policy,cur); HPX_TEST(childTarget.isLeaf()); if (pos == initial) pos = childTarget.getRank(); else @@ -150,12 +150,11 @@ namespace { void scheduling_policy_uniform_fixed() { constexpr int NUM_NODES = 3; - constexpr int NUM_NUMA = 1; constexpr int CEIL_LOG_2_NUM_NODES = ceilLog2(NUM_NODES); constexpr int GRANULARITY = 3; // get uniform distributed policy - auto up = allscale::tree_scheduling_policy::create_uniform(NUM_NODES, NUM_NUMA, GRANULARITY); + auto up = allscale::tree_scheduling_policy::create_uniform(NUM_NODES, GRANULARITY); auto& u = static_cast(*up); // std::cout << u << "\n"; @@ -167,9 +166,9 @@ void scheduling_policy_uniform_fixed() // collect scheduling target on lowest level std::vector targets; for(const auto& cur : paths) { - HPX_TEST_EQ(traceTarget(NUM_NODES, NUM_NUMA,u,cur),u.get_target(cur)); + HPX_TEST_EQ(traceTarget(NUM_NODES, u,cur),u.get_target(cur)); if (cur.getLength() != max_length) continue; - auto target = getTarget(NUM_NODES, NUM_NUMA,u,cur); + auto target = getTarget(NUM_NODES,u,cur); HPX_TEST_EQ(0u,target.getLayer()); targets.push_back(target.getRank()); } @@ -180,12 +179,11 @@ void scheduling_policy_uniform_fixed() void scheduling_policy_uniform_fixed_coarse() { constexpr int NUM_NODES = 3; - constexpr int NUM_NUMA = 1; constexpr int CEIL_LOG_2_NUM_NODES = ceilLog2(NUM_NODES); constexpr int GRANULARITY = 2; // get uniform distributed policy - auto up = allscale::tree_scheduling_policy::create_uniform(NUM_NODES,NUM_NUMA, GRANULARITY); + auto up = allscale::tree_scheduling_policy::create_uniform(NUM_NODES, GRANULARITY); auto& u = static_cast(*up); // std::cout << u << "\n"; @@ -197,9 +195,9 @@ void scheduling_policy_uniform_fixed_coarse() // collect scheduling target on lowest level std::vector targets; for(const auto& cur : paths) { - HPX_TEST_EQ(traceTarget(NUM_NODES,NUM_NUMA, u,cur),u.get_target(cur)); + HPX_TEST_EQ(traceTarget(NUM_NODES, u,cur),u.get_target(cur)); if (cur.getLength() != max_length) continue; - auto target = getTarget(NUM_NODES,NUM_NUMA, u,cur); + auto target = getTarget(NUM_NODES,u,cur); HPX_TEST_EQ(0u,target.getLayer()); targets.push_back(target.getRank()); } @@ -211,12 +209,11 @@ void scheduling_policy_uniform_fixed_coarse() void scheduling_policy_uniform_fixed_fine() { constexpr int NUM_NODES = 3; - constexpr int NUM_NUMA = 1; constexpr int CEIL_LOG_2_NUM_NODES = ceilLog2(NUM_NODES); constexpr int GRANULARITY = 5; // get uniform distributed policy - auto up = allscale::tree_scheduling_policy::create_uniform(NUM_NODES,NUM_NUMA, GRANULARITY); + auto up = allscale::tree_scheduling_policy::create_uniform(NUM_NODES, GRANULARITY); auto& u = static_cast(*up); // std::cout << u << "\n"; @@ -228,9 +225,9 @@ void scheduling_policy_uniform_fixed_fine() // collect scheduling target on lowest level std::vector targets; for(const auto& cur : paths) { - HPX_TEST_EQ(traceTarget(NUM_NODES, NUM_NUMA,u,cur),u.get_target(cur)); + HPX_TEST_EQ(traceTarget(NUM_NODES,u,cur),u.get_target(cur)); if (cur.getLength() != max_length) continue; - auto target = getTarget(NUM_NODES, NUM_NUMA, u,cur); + auto target = getTarget(NUM_NODES,u,cur); HPX_TEST_EQ(0u,target.getLayer()); targets.push_back(target.getRank()); } @@ -248,12 +245,11 @@ void scheduling_policy_uniform_n3_deeper() // SCOPED_TRACE("n=" + toString(n) + ",e=" + toString(e)); int NUM_NODES = n; - constexpr int NUM_NUMA = 1; int CEIL_LOG_2_NUM_NODES = ceilLog2(n); int GRANULARITY = CEIL_LOG_2_NUM_NODES + e; // get uniform distributed policy - auto up = allscale::tree_scheduling_policy::create_uniform(NUM_NODES,NUM_NUMA, GRANULARITY); + auto up = allscale::tree_scheduling_policy::create_uniform(NUM_NODES, GRANULARITY); auto& u = static_cast(*up); // get the list of all paths down to the given level @@ -263,9 +259,9 @@ void scheduling_policy_uniform_n3_deeper() // collect scheduling target on lowest level std::vector targets; for(const auto& cur : paths) { - HPX_TEST_EQ(traceTarget(NUM_NODES, NUM_NUMA, u,cur),u.get_target(cur)); + HPX_TEST_EQ(traceTarget(NUM_NODES, u,cur),u.get_target(cur)); if (cur.getLength() != max_length) continue; - auto target = getTarget(NUM_NODES, NUM_NUMA, u,cur); + auto target = getTarget(NUM_NODES, u,cur); HPX_TEST_EQ(0u,target.getLayer()); targets.push_back(target.getRank()); } @@ -366,7 +362,7 @@ void scheduling_policy_redirect() for(int num_nodes=1; num_nodes<=10; num_nodes++) { // get a uniform distribution - auto up = allscale::tree_scheduling_policy::create_uniform(num_nodes, 1); + auto up = allscale::tree_scheduling_policy::create_uniform(num_nodes); auto& policy = static_cast(*up); // std::cout << "N=" << num_nodes << "\n" << policy << "\n"; @@ -379,64 +375,136 @@ void scheduling_policy_redirect() } } -void scheduling_policy_balancing() +void scheduling_policy_rebalancing() { -// auto up = allscale::tree_scheduling_policy::create_uniform(4,1,5); -// auto& u = static_cast(*up); -// -// // providing a nicely balanced load should not cause any changes -// auto loadDist = std::vector(4,1.0); -// auto bp1 = allscale::tree_scheduling_policy::create_rebalanced(u,loadDist); -// auto& b1 = static_cast(*bp1); -// HPX_TEST_EQ(u.task_distribution_mapping(),b1.task_distribution_mapping()); -// -// // alter the distribution -// loadDist[1] = 3; // node 1 has 3x more load -// loadDist[3] = 2; // node 3 has 2x more load -// auto bp2 = allscale::tree_scheduling_policy::create_rebalanced(u,loadDist); -// auto& b2 = static_cast(*bp2); -// HPX_TEST_NEQ(u.task_distribution_mapping(),b2.task_distribution_mapping()); -// -// -// // something more homogeneous -// loadDist[0] = 1.25; -// loadDist[1] = 1.5; -// loadDist[2] = 1.25; -// loadDist[3] = 2; -// auto bp3 = allscale::tree_scheduling_policy::create_rebalanced(u,loadDist); -// auto& b3 = static_cast(*bp3); -// HPX_TEST_NEQ(u.task_distribution_mapping(),b3.task_distribution_mapping()); -// -// -// // something pretty even -// loadDist[0] = 1.05; -// loadDist[1] = 0.98; -// loadDist[2] = 0.99; -// loadDist[3] = 1.04; -// auto bp4 = allscale::tree_scheduling_policy::create_rebalanced(u,loadDist); -// auto& b4 = static_cast(*bp4); -// HPX_TEST_EQ(u.task_distribution_mapping(),b4.task_distribution_mapping()); -// -// -// -// // test zero-load value -// loadDist[0] = 1.05; -// loadDist[1] = 0; -// loadDist[2] = 0.99; -// loadDist[3] = 1.04; -// auto bp5 = allscale::tree_scheduling_policy::create_rebalanced(u,loadDist); -// auto& b5 = static_cast(*bp5); -// HPX_TEST_NEQ(u.task_distribution_mapping(),b5.task_distribution_mapping()); + auto up = allscale::tree_scheduling_policy::create_uniform(4,5); + auto& u = static_cast(*up); + + // providing a nicely balanced load should not cause any changes + auto loadDist = std::vector(4,1.0); + auto bp1 = allscale::tree_scheduling_policy::create_rebalanced(u,loadDist); + auto& b1 = static_cast(*bp1); + HPX_TEST_EQ(u.task_distribution_mapping(),b1.task_distribution_mapping()); + + // alter the distribution + loadDist[1] = 3; // node 1 has 3x more load + loadDist[3] = 2; // node 3 has 2x more load + auto bp2 = allscale::tree_scheduling_policy::create_rebalanced(u,loadDist); + auto& b2 = static_cast(*bp2); + HPX_TEST_NEQ(u.task_distribution_mapping(),b2.task_distribution_mapping()); + + + // something more homogeneous + loadDist[0] = 1.25; + loadDist[1] = 1.5; + loadDist[2] = 1.25; + loadDist[3] = 2; + auto bp3 = allscale::tree_scheduling_policy::create_rebalanced(u,loadDist); + auto& b3 = static_cast(*bp3); + HPX_TEST_NEQ(u.task_distribution_mapping(),b3.task_distribution_mapping()); + + + // something pretty even + loadDist[0] = 1.05; + loadDist[1] = 0.98; + loadDist[2] = 0.99; + loadDist[3] = 1.04; + auto bp4 = allscale::tree_scheduling_policy::create_rebalanced(u,loadDist); + auto& b4 = static_cast(*bp4); + HPX_TEST_EQ(u.task_distribution_mapping(),b4.task_distribution_mapping()); + + + + // test zero-load value + loadDist[0] = 1.05; + loadDist[1] = 0; + loadDist[2] = 0.99; + loadDist[3] = 1.04; + auto bp5 = allscale::tree_scheduling_policy::create_rebalanced(u,loadDist); + auto& b5 = static_cast(*bp5); + HPX_TEST_NEQ(u.task_distribution_mapping(),b5.task_distribution_mapping()); } -void scheduling_policy_scaling() +void scheduling_policy_resizing() +{ + auto up = allscale::tree_scheduling_policy::create_uniform(4,5); + auto& u = static_cast(*up); + + auto loadDist = std::vector(4, 1.0); + auto mask = std::vector(4, true); + auto bp1 = allscale::tree_scheduling_policy::create_rebalanced(u,loadDist, mask); + auto& b1 = static_cast(*bp1); + + HPX_TEST_EQ(u.task_distribution_mapping(), b1.task_distribution_mapping()); + + // remove node 2 + mask[2] = false; + auto bp2 = allscale::tree_scheduling_policy::create_rebalanced(u,loadDist, mask); + auto& b2 = static_cast(*bp2); + HPX_TEST_EQ("[0,0,0,0,0,0,0,0,0,0,0,1,1,1,1,1,1,1,1,1,1,3,3,3,3,3,3,3,3,3,3,3]", toString(b2.task_distribution_mapping())); + + // re-enable node 2 + loadDist[2] = 0; + mask[2] = true; + auto bp3 = allscale::tree_scheduling_policy::create_rebalanced(b2,loadDist, mask); + auto& b3 = static_cast(*bp3); + + HPX_TEST_EQ(u.task_distribution_mapping(), b3.task_distribution_mapping()); +} + +void scheduling_policy_resizing_to_zero() { + auto up = allscale::tree_scheduling_policy::create_uniform(6,5); + auto& u = static_cast(*up); + // providing a nicely balanced load should not cause any changes + auto loadDist = std::vector(6, 1.0); + auto mask = std::vector(6, true); + auto bp1 = allscale::tree_scheduling_policy::create_rebalanced(u,loadDist, mask); + auto& b1 = static_cast(*bp1); + + // remove node 2 + mask[2] = false; + auto bp2 = allscale::tree_scheduling_policy::create_rebalanced(u,loadDist, mask); + auto& b2 = static_cast(*bp2); + HPX_TEST_EQ("[0,0,0,0,0,0,0,1,1,1,1,1,1,1,3,3,3,3,3,3,4,4,4,4,4,4,5,5,5,5,5,5]",toString(b2.task_distribution_mapping())); + + // remove node 4 + loadDist[2] = 0; + mask[4] = false; + auto bp3 = allscale::tree_scheduling_policy::create_rebalanced(b2,loadDist, mask); + auto& b3 = static_cast(*bp3); + HPX_TEST_EQ("[0,0,0,0,0,0,0,0,0,1,1,1,1,1,1,1,1,3,3,3,3,3,3,3,5,5,5,5,5,5,5,5]",toString(b3.task_distribution_mapping())); + + // remove node 1 + loadDist[4] = 0; + mask[1] = false; + auto bp4 = allscale::tree_scheduling_policy::create_rebalanced(b3,loadDist, mask); + auto& b4 = static_cast(*bp4); + HPX_TEST_EQ("[0,0,0,0,0,0,0,0,0,0,0,0,3,3,3,3,3,3,3,3,3,3,5,5,5,5,5,5,5,5,5,5]",toString(b4.task_distribution_mapping())); + + // remove node 5 + loadDist[1] = 0; + mask[5] = false; + auto bp5 = allscale::tree_scheduling_policy::create_rebalanced(b4,loadDist, mask); + auto& b5 = static_cast(*bp5); + HPX_TEST_EQ("[0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3]",toString(b5.task_distribution_mapping())); + + // remove node 0 + loadDist[5] = 0; + mask[0] = false; + auto bp6 = allscale::tree_scheduling_policy::create_rebalanced(b5,loadDist, mask); + auto& b6 = static_cast(*bp6); + HPX_TEST_EQ("[3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3]",toString(b6.task_distribution_mapping())); +} + +void scheduling_policy_scaling() +{ int N = 200; // create a policy for N nodes - auto policy = allscale::tree_scheduling_policy::create_uniform(N, 1); + auto policy = allscale::tree_scheduling_policy::create_uniform(N); auto& u = static_cast(*policy); std::cout << u.task_distribution_mapping() << "\n"; @@ -445,19 +513,89 @@ void scheduling_policy_scaling() void scheduling_policy_scaling_rebalancing() { + int N = 200; -// int N = 200; -// -// // create a policy for N nodes -// auto up = allscale::tree_scheduling_policy::create_uniform(N, 1); -// auto& u = static_cast(*up); -// -// std::vector load(N,1.0); -// auto bp = allscale::tree_scheduling_policy::create_rebalanced(u,load); -// auto& b = static_cast(*bp); -// -// HPX_TEST_EQ(u.task_distribution_mapping(),b.task_distribution_mapping()); + // create a policy for N nodes + auto up = allscale::tree_scheduling_policy::create_uniform(N, 1); + auto& u = static_cast(*up); + + std::vector load(N,1.0); + auto bp = allscale::tree_scheduling_policy::create_rebalanced(u,load); + auto& b = static_cast(*bp); + + HPX_TEST_EQ(u.task_distribution_mapping(),b.task_distribution_mapping()); + +} + +void root_handling() +{ + // create a setup of 4 nodes, only using first two nodes + std::vector mask(4, true); + mask[3] = false; + mask[2] = false; + + // create a uniform work distribution among those + auto policy = allscale::tree_scheduling_policy::create_uniform(mask); + auto& u = static_cast(*policy); + auto n = allscale::runtime::HierarchyAddress::getRootOfNetworkSize(4); + auto nl = n.getLeftChild(); + auto nr = n.getRightChild(); + auto n0 = nl.getLeftChild(); + auto n1 = nl.getRightChild(); + auto n2 = nr.getLeftChild(); + auto n3 = nr.getRightChild(); + + // check the root task + auto p = allscale::task_id::task_path::root(); + + // check involved nodes + HPX_TEST(u.is_involved(n, p)); + HPX_TEST(u.is_involved(nl, p)); + HPX_TEST(!u.is_involved(nr, p)); + HPX_TEST(!u.is_involved(n0, p)); + HPX_TEST(!u.is_involved(n1, p)); + HPX_TEST(!u.is_involved(n2, p)); + HPX_TEST(!u.is_involved(n3, p)); + + // check decision on involved nodes + HPX_TEST_EQ(allscale::schedule_decision::left, u.decide(n, p)); + HPX_TEST_EQ(allscale::schedule_decision::stay, u.decide(nl, p)); + HPX_TEST_EQ(allscale::schedule_decision::stay, u.decide(nr, p)); + + // check task .0 + auto pl = p.getLeftChildPath(); + + // check involved nodes + HPX_TEST(u.is_involved(n, pl)); + HPX_TEST(u.is_involved(nl, pl)); + HPX_TEST(!u.is_involved(nr, pl)); + HPX_TEST(u.is_involved(n0, pl)); + HPX_TEST(!u.is_involved(n1, pl)); + HPX_TEST(!u.is_involved(n2, pl)); + HPX_TEST(!u.is_involved(n3, pl)); + + // check deciision on involved nodes + HPX_TEST_EQ(allscale::schedule_decision::left, u.decide(n, pl)); + HPX_TEST_EQ(allscale::schedule_decision::left, u.decide(nl, pl)); + HPX_TEST_EQ(allscale::schedule_decision::stay, u.decide(n0, pl)); + + // check task .1 + auto pr = p.getRightChildPath(); + + // check involved nodes + HPX_TEST(u.is_involved(n, pr)); + HPX_TEST(u.is_involved(nl, pr)); + HPX_TEST(!u.is_involved(nr, pr)); + HPX_TEST(!u.is_involved(n0, pr)); + HPX_TEST(u.is_involved(n1, pr)); + HPX_TEST(!u.is_involved(n2, pr)); + HPX_TEST(!u.is_involved(n3, pr)); + + // check deciision on involved nodes + HPX_TEST_EQ(allscale::schedule_decision::left, u.decide(n, pr)); + HPX_TEST_EQ(allscale::schedule_decision::right, u.decide(nl, pr)); + HPX_TEST_EQ(allscale::schedule_decision::stay, u.decide(n1, pr)); } int hpx_main(int argc, char **argv) @@ -469,9 +607,12 @@ int hpx_main(int argc, char **argv) scheduling_policy_uniform_fixed_fine(); scheduling_policy_uniform_n3_deeper(); scheduling_policy_redirect(); - scheduling_policy_balancing(); + scheduling_policy_rebalancing(); + scheduling_policy_resizing(); + scheduling_policy_resizing_to_zero(); scheduling_policy_scaling(); scheduling_policy_scaling_rebalancing(); + root_handling(); return hpx::finalize(); }