Skip to content

Commit

Permalink
Fixing hang for scheduler!
Browse files Browse the repository at this point in the history
  • Loading branch information
Thomas Heller committed Oct 22, 2018
1 parent 5db7be2 commit 06dc117
Show file tree
Hide file tree
Showing 7 changed files with 58 additions and 21 deletions.
4 changes: 3 additions & 1 deletion allscale/components/monitor.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -338,6 +338,7 @@ namespace allscale { namespace components {


private:
typedef hpx::lcos::local::spinlock mutex_type;

// MONITOR MANAGEMENT
// Measuring total execution time
Expand All @@ -347,6 +348,8 @@ namespace allscale { namespace components {
std::uint64_t rank_, execution_init;

std::uint64_t num_localities_;
mutex_type init_mutex;
bool initialized = false;
bool enable_monitor;

// System parameters
Expand All @@ -361,7 +364,6 @@ namespace allscale { namespace components {
// hpx::id_type left_;
// hpx::id_type right_;

typedef hpx::lcos::local::spinlock mutex_type;
mutex_type work_map_mutex;

// Performance profiles per work item ID
Expand Down
2 changes: 1 addition & 1 deletion hpx
Submodule hpx updated 648 files
12 changes: 10 additions & 2 deletions src/components/monitor_component.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1852,6 +1852,10 @@ namespace allscale { namespace components {
}

void monitor::init() {
std::unique_lock<mutex_type> l(init_mutex);
if (initialized) return;
hpx::util::ignore_while_checking<std::unique_lock<mutex_type>> il(&l);

// bool enable_signals = true;
if(const char* env_p = std::getenv("ALLSCALE_MONITOR"))
{
Expand All @@ -1861,8 +1865,8 @@ namespace allscale { namespace components {

if(!enable_monitor)
{
std::cout << "Monitor component disabled!\n";
return;
std::cout << "Monitor component disabled!\n";
return;
}
num_localities_ = allscale::get_num_localities();

Expand Down Expand Up @@ -2155,6 +2159,10 @@ namespace allscale { namespace components {
// Init historical data
history = std::make_shared<allscale::historical_data>();

hpx::register_with_basename("allscale/monitor", this->get_id(), rank_).get();


initialized = true;

std::cerr
<< "Monitor component with rank "
Expand Down
33 changes: 28 additions & 5 deletions src/components/scheduler_component.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -848,18 +848,41 @@ std::pair<work_item, std::unique_ptr<data_item_manager::task_requirements_base>>
}
reqs->add_allowance(addr);

bool async = work.id().depth() < 2;//addr.getParent().getNumaNode() != numa_node;

hpx::future<void> acquired = reqs->acquire_split(addr);

typename hpx::traits::detail::shared_state_ptr_for<
hpx::future<void>>::type const &state =
hpx::traits::future_access<hpx::future<void>>::get_shared_state(
acquired);

state->set_on_completed(
[state, this, numa_node, work = std::move(work), reqs = std::move(reqs)]() mutable
auto f_split = [async, state, this, numa_node, work = std::move(work), reqs = std::move(reqs)]() mutable
{
auto &exec = executors_[numa_node];
work.split(exec, std::move(reqs));
});
if (async)
{
hpx::parallel::execution::post(
exec, hpx::util::annotated_function(
hpx::util::deferred_call([work = std::move(work), reqs = std::move(reqs), &exec]() mutable
{
work.split(exec, std::move(reqs));

}),
"allscale::work_item::split"));
}
else
{
work.split(exec, std::move(reqs));
}
};

if (acquired.is_ready())
{
f_split();
return std::make_pair(work_item(), std::unique_ptr<data_item_manager::task_requirements_base>());
}

state->set_on_completed(std::move(f_split));
return std::make_pair(work_item(), std::unique_ptr<data_item_manager::task_requirements_base>());
}
else
Expand Down
4 changes: 2 additions & 2 deletions src/dashboard.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ namespace allscale { namespace dashboard
node_state get_state()
{
node_state state;
static allscale::components::monitor *monitor_c = &allscale::monitor::get();

allscale::components::monitor *monitor_c = &allscale::monitor::get();
//
state.rank = hpx::get_locality_id();
state.online = true;
state.active = true;
Expand Down
19 changes: 9 additions & 10 deletions src/monitor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ namespace allscale {
hpx::util::detail::yield_k(k, "get component...");
res = m.component_.get();
}
res->init();
return res;
}

Expand All @@ -75,17 +76,15 @@ namespace allscale {
hpx::id_type gid =
hpx::new_<components::monitor>(hpx::find_here(), rank).get();


component_ = hpx::get_ptr<components::monitor>(gid).get();
component_->init();

char *env = std::getenv("ALLSCALE_MONITOR");
if(env && env[0] == '0')
{
return;
}

hpx::register_with_basename("allscale/monitor", gid, rank).get();
// component_->init();
//
// char *env = std::getenv("ALLSCALE_MONITOR");
// if(env && env[0] == '0')
// {
// return;
// }
//
}

}
5 changes: 5 additions & 0 deletions src/scheduler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -267,6 +267,11 @@ namespace allscale

void schedule(work_item work)
{
{
int i = 0;
unsigned j =0;
if (i < j) { return; }
}
if (policy_.value_ == replacable_policy::dynamic &&
work.id().is_root() && work.id().id > 0 && (work.id().id % 10 == 0))
{
Expand Down

0 comments on commit 06dc117

Please sign in to comment.