Skip to content

Commit

Permalink
Fixing problems uncovered by amdados pilot
Browse files Browse the repository at this point in the history
 - Data Item Manager:
    - First touch now follows the location of other existing requirements
 - Scheduling:
    - Work items spawned as "first" don't "split" the tree execution config
  • Loading branch information
Thomas Heller committed Jul 11, 2018
1 parent 870f40d commit 542f7d7
Show file tree
Hide file tree
Showing 16 changed files with 146 additions and 54 deletions.
36 changes: 22 additions & 14 deletions allscale/data_item_manager/acquire.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,27 +54,35 @@ namespace allscale { namespace data_item_manager {

HPX_ASSERT(!req.region.empty());

// Resize data to the requested size...
auto& item = data_item_store<data_item_type>::lookup(req.ref);
region_type req_region;
if (req.mode == access_mode::ReadOnly)
{
req_region = std::move(req.region);
}
else
region_type remainder = req.region;
for (auto& r: info.regions)
{
boost::shared_lock<mutex_type> l(item.region_mtx);
HPX_ASSERT(req.mode == access_mode::ReadWrite);
// clip region to registered region...
req_region = region_type::intersect(item.owned_region, req.region);
remainder = region_type::difference(remainder, r.second);
}

// Resize data to the requested size...
auto& item = data_item_store<data_item_type>::lookup(req.ref);
// region_type req_region;
// if (req.mode == access_mode::ReadOnly)
// {
// req_region = std::move(req.region);
// }
// else
// {
// boost::shared_lock<mutex_type> l(item.region_mtx);
// HPX_ASSERT(req.mode == access_mode::ReadWrite);
// // clip region to registered region...
// req_region = region_type::intersect(item.owned_region, req.region);
// }
{
std::unique_lock<mutex_type> ll(item.fragment_mtx);
item.owned_region =
region_type::merge(item.owned_region, remainder);
auto& frag = fragment(req.ref, item, ll);
if (!allscale::api::core::isSubRegion(req_region, frag.getCoveredRegion()))
if (!allscale::api::core::isSubRegion(req.region, frag.getCoveredRegion()))
{
frag.resize(
region_type::merge(req_region, frag.getCoveredRegion())
region_type::merge(req.region, frag.getCoveredRegion())
);
}
}
Expand Down
58 changes: 49 additions & 9 deletions allscale/data_item_manager/acquire_rank.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,14 +34,48 @@ namespace allscale { namespace data_item_manager {
return std::size_t(-2);
}

HPX_ASSERT(!info.regions.empty());
// If we have more than one part, we need to split
if (info.regions.size() > 1)
using data_item_type = typename Requirement::data_item_type;
using lease_type = allscale::lease<data_item_type>;
using transfer_action_type = transfer_action<data_item_type>;
using region_type = typename data_item_type::region_type;
using mutex_type = typename data_item_store<data_item_type>::data_item_type::mutex_type;

// If we couldn't resolve a remainder, we need to allocate the requested
// fragment. This allocation happens either on this rank or the location
// of the other requirements. As such, we return -2.
if (info.regions.empty())
{
return std::size_t(-1);
return std::size_t(-2);
}

return info.regions.begin()->first;
region_type remainder = req.region;
std::size_t dest_rank(-2);
for (auto& r: info.regions)
{
remainder = region_type::difference(remainder, r.second);

if (r.first != dest_rank)
{
if (dest_rank != std::size_t(-2))
{
return std::size_t(-1);
}
else
{
dest_rank = r.first;
}
}
}

return dest_rank;
// HPX_ASSERT(!info.regions.empty());
// // If we have more than one part, we need to split
// if (info.regions.size() > 1)
// {
// return std::size_t(-1);
// }
//
// return info.regions.begin()->first;
}

template <typename Requirement, typename RequirementAllocator,
Expand Down Expand Up @@ -70,7 +104,10 @@ namespace allscale { namespace data_item_manager {
{
std::size_t cur_rank = detail::acquire_rank(reqs[i], infos[i], write_count);
if (cur_rank == std::size_t(-2)) continue;
if (cur_rank == std::size_t(-1)) return cur_rank;
if (cur_rank == std::size_t(-1))
{
return cur_rank;
}

if (cur_rank != rank)
{
Expand Down Expand Up @@ -118,7 +155,10 @@ namespace allscale { namespace data_item_manager {
for (std::size_t cur_rank: ranks)
{
if (cur_rank == std::size_t(-2)) continue;
if (cur_rank == std::size_t(-1)) return cur_rank;
if (cur_rank == std::size_t(-1))
{
return cur_rank;
}

if (cur_rank != rank)
{
Expand Down Expand Up @@ -163,9 +203,9 @@ namespace allscale { namespace data_item_manager {
template <typename Requirement, typename LocationInfo>
void print_location_info(Requirement const& req, LocationInfo const& info)
{
if (info.regions.size() > 1)
// if (info.regions.size() > 1)
{
std::cerr << "Conflicting requirement " << typeid(req).name() << " with region: " << req.region << '\n';
std::cerr << "Conflicting requirement " << typeid(req).name() << "(" << (req.mode == access_mode::ReadOnly? "ro" : "rw") << ", "<< std::hex << req.ref.id().get_lsb() << std::dec << ") with region: " << req.region << '\n';
std::cerr << "Cannot resolve locations on " << hpx::get_locality_id() << ":\n";
for (auto const& parts: info.regions)
{
Expand Down
12 changes: 6 additions & 6 deletions allscale/data_item_manager/locate.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -263,12 +263,12 @@ namespace allscale { namespace data_item_manager {
if (src_id == this_id && !remainder.empty())
{
HPX_ASSERT(locate_state::init == state);
auto & part = info.regions[this_id];
part = region_type::merge(part, remainder);
std::unique_lock<mutex_type> l(item.region_mtx);
// merge with our own region
item.owned_region =
region_type::merge(item.owned_region, remainder);
// auto & part = info.regions[this_id];
// part = region_type::merge(part, remainder);
// std::unique_lock<mutex_type> l(item.region_mtx);
// // merge with our own region
// item.owned_region =
// region_type::merge(item.owned_region, remainder);
#if defined(ALLSCALE_DEBUG_DIM)
std::stringstream filename;
filename << "data_item." << this_id << ".log";
Expand Down
7 changes: 7 additions & 0 deletions allscale/data_item_manager/location_info.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,13 @@ namespace allscale { namespace data_item_manager {
{
location_info() = default;

location_info(location_info const& other) = default;

// Move constructor: takes over the `regions` map of `other` by moving it.
// Declared noexcept.
// NOTE(review): a user-declared move constructor causes the implicit copy
// assignment operator to be defined as deleted and suppresses the implicit
// move assignment operator; confirm assignment operators are declared
// elsewhere in this class if location_info needs to be assignable.
location_info(location_info&& other) noexcept
: regions(std::move(other.regions))
{
}

std::unordered_map<std::size_t, Region> regions;

template <typename Archive>
Expand Down
4 changes: 2 additions & 2 deletions allscale/data_item_reference.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ namespace allscale {
, cache(nullptr)
{}

data_item_reference(data_item_reference&& other)
data_item_reference(data_item_reference&& other) noexcept
: id_(std::move(other.id_))
, cache(nullptr)
{
Expand All @@ -85,7 +85,7 @@ namespace allscale {
return *this;
}

data_item_reference& operator=(data_item_reference&& other)
data_item_reference& operator=(data_item_reference&& other) noexcept
{
id_ = other.id_;
cache.store(other.cache.load(std::memory_order_acquire), std::memory_order_release);
Expand Down
8 changes: 8 additions & 0 deletions allscale/data_item_requirement.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,14 @@ namespace allscale{

data_item_requirement() = default;

data_item_requirement(data_item_requirement const&) = default;
// Move constructor: moves `ref` and `region` out of `other` and copies
// `mode`. Declared noexcept.
// NOTE(review): a user-declared move constructor causes the implicit copy
// assignment operator to be defined as deleted and suppresses the implicit
// move assignment operator; confirm assignment operators are declared
// elsewhere in this class if requirements need to be assignable.
data_item_requirement(data_item_requirement && other) noexcept
: ref(std::move(other.ref))
, region(std::move(other.region))
, mode(other.mode)
{
}

data_item_requirement(
data_item_reference<DataItemType> pref,
typename DataItemType::region_type pregion,
Expand Down
2 changes: 1 addition & 1 deletion allscale/detail/work_item_impl_base.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ namespace allscale { namespace detail {
id_.update_rank(rank);
}

void set_this_id(machine_config const& config);
void set_this_id(machine_config const& config, bool is_first);
this_work_item::id const& id() const;
virtual const char* name() const=0;

Expand Down
4 changes: 4 additions & 0 deletions allscale/spawn.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,10 @@ namespace allscale
typedef typename WorkItemDescription::result_type result_type;
allscale::treeture<result_type> tres(treeture_init);

// auto id = this_work_item::get_id_ptr();
// if (id)
// id->reset_distribution();

scheduler::schedule(
work_item(true, WorkItemDescription(), deps.dep_, tres, std::forward<Ts>(vs)...)
);
Expand Down
4 changes: 3 additions & 1 deletion allscale/this_work_item.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,12 +43,14 @@ namespace allscale {

id();

void set(detail::work_item_impl_base*, machine_config const& mconfig);
void set(detail::work_item_impl_base*, machine_config const& mconfig, bool);

std::string name() const;
std::size_t last() const;
std::size_t depth() const;

void reset_distribution();

std::size_t hash() const;

id parent() const;
Expand Down
2 changes: 1 addition & 1 deletion allscale/work_item.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ namespace allscale {

void set_this_id(machine_config const& mconfig)
{
impl_->set_this_id(mconfig);
impl_->set_this_id(mconfig, is_first_);
}

bool is_first()
Expand Down
5 changes: 5 additions & 0 deletions examples/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,11 @@ add_hpx_executable(
SOURCES ipic3d_allscalecc_data.cpp
COMPONENT_DEPENDENCIES allscale
)
add_hpx_executable(
amdados_allscalecc
SOURCES amdados/amdados_generated.cpp
COMPONENT_DEPENDENCIES allscale
)

add_subdirectory(pfor)
add_subdirectory(uts)
3 changes: 3 additions & 0 deletions src/components/scheduler_component.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -505,6 +505,9 @@ namespace allscale { namespace components {


std::uint64_t schedule_rank = work.id().rank();
if (schedule_rank == std::uint64_t(-1))
schedule_rank = rank_;

if ((schedule_rank != rank_ && !work.enqueue_remote()) ||
!allscale::resilience::rank_running(schedule_rank))
{
Expand Down
4 changes: 2 additions & 2 deletions src/detail/work_item_impl_base.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@
#include <allscale/detail/work_item_impl_base.hpp>

namespace allscale { namespace detail {
void work_item_impl_base::set_this_id(machine_config const& mconfig)
void work_item_impl_base::set_this_id(machine_config const& mconfig, bool is_first)
{
id_.set(this, mconfig);
id_.set(this, mconfig, this->can_split() || !is_first);
}

this_work_item::id const& work_item_impl_base::id() const
Expand Down
19 changes: 16 additions & 3 deletions src/this_work_item.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,11 @@ namespace allscale { namespace this_work_item {
next_id_(0)
{
HPX_ASSERT(i == 0);
reset_distribution();
}

void id::reset_distribution()
{
config_.rank_ = -1;
config_.numa_domain_ = 0;
config_.locality_depth_ = -1;//hpx::get_num_localities(hpx::launch::sync);
Expand Down Expand Up @@ -183,13 +188,13 @@ namespace allscale { namespace this_work_item {

void id::update_rank(std::size_t rank)
{
HPX_ASSERT(config_.rank_ != std::uint64_t(-1));
// HPX_ASSERT(config_.rank_ != std::uint64_t(-1));
config_.rank_ = rank;
}

std::size_t id::rank() const
{
HPX_ASSERT(config_.rank_ != std::uint64_t(-1));
// HPX_ASSERT(config_.rank_ != std::uint64_t(-1));
return config_.rank_;
}

Expand All @@ -204,13 +209,19 @@ namespace allscale { namespace this_work_item {
}

// void id::set(std::shared_ptr<detail::work_item_impl_base> wi)
void id::set(detail::work_item_impl_base* wi, machine_config const& mconfig)
void id::set(detail::work_item_impl_base* wi, machine_config const& mconfig, bool can_split)
{
id& parent = get_id();
next_id_ = 0;
id_ = parent.id_;
id_.push_back(parent.next_id_++);

if (!can_split)
{
config_ = parent.config_;
return;
}

// HPX_ASSERT(parent.config_.rank_ != std::uint64_t(-1));
// HPX_ASSERT(parent.config_.locality_depth_ != std::uint64_t(-1));
// HPX_ASSERT(parent.config_.numa_depth_ != std::uint8_t(-1));
Expand All @@ -222,12 +233,14 @@ namespace allscale { namespace this_work_item {
}
else
{
// if ((id_.back() & 1) == 0)
if (id_.back() == 0)
{
setup_left(mconfig, parent.config_);
}
else
{
// HPX_ASSERT((id_.back() & 1) == 1);
setup_right(mconfig, parent.config_);
}
}
Expand Down
30 changes: 15 additions & 15 deletions tests/performance/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -10,21 +10,21 @@ set(tests
data_item_get
data_item_get_invariant
stream
tpc.allscalecc.latency_n29
tpc.allscalecc.latency_n30
tpc.allscalecc.latency_n32
tpc.allscalecc.latency_n34
tpc.allscalecc.latency_n36
tpc.allscalecc.throughput_2_n29
tpc.allscalecc.throughput_2_n30
tpc.allscalecc.throughput_2_n32
tpc.allscalecc.throughput_2_n34
tpc.allscalecc.throughput_2_n36
tpc.allscalecc.throughput_n29
tpc.allscalecc.throughput_n30
tpc.allscalecc.throughput_n32
tpc.allscalecc.throughput_n34
tpc.allscalecc.throughput_n36
#tpc.allscalecc.latency_n29
#tpc.allscalecc.latency_n30
#tpc.allscalecc.latency_n32
#tpc.allscalecc.latency_n34
#tpc.allscalecc.latency_n36
#tpc.allscalecc.throughput_2_n29
#tpc.allscalecc.throughput_2_n30
#tpc.allscalecc.throughput_2_n32
#tpc.allscalecc.throughput_2_n34
#tpc.allscalecc.throughput_2_n36
#tpc.allscalecc.throughput_n29
#tpc.allscalecc.throughput_n30
#tpc.allscalecc.throughput_n32
#tpc.allscalecc.throughput_n34
#tpc.allscalecc.throughput_n36
transpose
stencil
)
Expand Down
2 changes: 2 additions & 0 deletions tools/fixup_allscalecc.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ def fixup(pattern, replace):
fixup(r'std::vector<(.*), std::allocator<\1 > >', r'std::vector<\1>')
fixup(r'__gnu_cxx::__normal_iterator<const ([^\*]*)\*, std::vector<\1> >', r'std::vector<\1>::const_iterator')
fixup(r'__gnu_cxx::__normal_iterator<([^\*]*)\*, std::vector<\1> >', r'std::vector<\1>::iterator')
fixup(r'std::_Rb_tree_iterator<std::pair<const ([^>]*)> >', r'std::map<\1>::iterator')
fixup(r'std::_Rb_tree_const_iterator<std::pair<const ([^>]*)> >', r'std::map<\1>::const_iterator')
fixup(r'__gnu_cxx::__normal_iterator<const char\*, std::string >', r'std::string::const_iterator')
fixup(r'__gnu_cxx::__normal_iterator<char\*, std::string >', r'std::string::iterator')
fixup(r'(var_[0-9]+)\.operator\*\(\).operator string_type\(\)', r'*\1')

0 comments on commit 542f7d7

Please sign in to comment.