Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
120 changes: 75 additions & 45 deletions src/realm/transfer/transfer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3534,6 +3534,19 @@ namespace Realm {
}
}

// Test-only constructor (selected by the TestTag argument): wraps a
// caller-supplied TransferDomain instead of building one, and does not call
// check_analysis_preconditions(), so no analysis is started here.
TransferDesc::TransferDesc(TestTag, TransferDomain *_domain)
: refcount(1)
, deferred_analysis(this)
, prs()
, analysis_complete(false)
, analysis_successful(false)
, fill_data(0)
, fill_size(0)
// false => the first perform_analysis() call runs its one-time setup, which
// also assigns analysis_field_idx, analysis_fld_start, analysis_fill_ofs and
// analysis_field_done (none of which are initialized here)
, analysis_init_done(false)
{
// NOTE(review): the pointer is stored as-is; who owns/frees _domain is not
// visible from here - confirm against the test that uses this constructor
domain = _domain;
}

void TransferDesc::check_analysis_preconditions()
{
log_xplan.info() << "created: plan=" << (void *)this << " domain=" << *domain
Expand Down Expand Up @@ -3609,7 +3622,7 @@ namespace Realm {
}

// no (untriggered) preconditions, so we fall through to immediate analysis
perform_analysis();
perform_analysis(TimeLimit());
}

static size_t compute_ib_size(size_t combined_field_size, size_t domain_size,
Expand Down Expand Up @@ -3678,65 +3691,81 @@ namespace Realm {
const std::vector<TransferGraph::IBInfo> &edges;
};

void TransferDesc::perform_analysis()
bool TransferDesc::perform_analysis(TimeLimit work_until)
{
// initialize profiling data
prof_usage.source = Memory::NO_MEMORY;
prof_usage.target = Memory::NO_MEMORY;
prof_usage.size = 0;
if(!analysis_init_done) {
// initialize profiling data
prof_usage.source = Memory::NO_MEMORY;
prof_usage.target = Memory::NO_MEMORY;
prof_usage.size = 0;

// quick check - if the domain is empty, there's nothing to actually do
if(domain->empty()) {
log_xplan.debug() << "analysis: plan=" << (void *)this << " empty";

// well, we still have to poke pending ops
std::vector<TransferOperation *> to_alloc;
{
AutoLock<> al(mutex);
to_alloc.swap(pending_ops);
// release before the mutex is released so to_alloc is visible before the
// analysis_complete flag is set
analysis_complete.store_release(true);
}

// quick check - if the domain is empty, there's nothing to actually do
if(domain->empty()) {
log_xplan.debug() << "analysis: plan=" << (void *)this << " empty";
for(size_t i = 0; i < to_alloc.size(); i++) {
to_alloc[i]->allocate_ibs();
}
return true;
}

// well, we still have to poke pending ops
std::vector<TransferOperation *> to_alloc;
{
AutoLock<> al(mutex);
to_alloc.swap(pending_ops);
// release before the mutex is released so to_alloc is visible before the
// analysis_complete flag is set
analysis_complete.store_release(true);
// first, scan over the sources and figure out how much space we need
// for fill data - don't need to know field order yet
for(size_t i = 0; i < srcs.size(); i++) {
if(srcs[i].field_id == FieldID(-1)) {
fill_size += srcs[i].size;
}
}

for(size_t i = 0; i < to_alloc.size(); i++) {
to_alloc[i]->allocate_ibs();
if(fill_size > 0) {
fill_data = malloc(fill_size);
assert(fill_data);
}
return;
}

size_t domain_size = domain->volume();
// for now, pick a global dimension ordering
// TODO: allow this to vary for independent subgraphs (or dependent ones
// with transposes in line)
domain->choose_dim_order(dim_order, srcs, dsts, indirects,
false /*!force_fortran_order*/, 65536 /*max_stride*/);

// first, scan over the sources and figure out how much space we need
// for fill data - don't need to know field order yet
for(size_t i = 0; i < srcs.size(); i++) {
if(srcs[i].field_id == FieldID(-1)) {
fill_size += srcs[i].size;
}
}
src_fields.resize(srcs.size());
dst_fields.resize(dsts.size());

size_t fill_ofs = 0;
if(fill_size > 0) {
fill_data = malloc(fill_size);
assert(fill_data);
analysis_init_done = true;
analysis_field_idx = 0;
analysis_fld_start = 0;
analysis_fill_ofs = 0;
analysis_field_done.assign(srcs.size(), false);
}

// for now, pick a global dimension ordering
// TODO: allow this to vary for independent subgraphs (or dependent ones
// with transposes in line)
domain->choose_dim_order(dim_order, srcs, dsts, indirects,
false /*!force_fortran_order*/, 65536 /*max_stride*/);

src_fields.resize(srcs.size());
dst_fields.resize(dsts.size());
size_t domain_size = domain->volume();

// TODO: look at layouts and decide if fields should be grouped into
// a smaller number of copies
assert(srcs.size() == dsts.size());
std::vector<bool> field_done(srcs.size(), false);
size_t fill_ofs = analysis_fill_ofs;
size_t fld_start = analysis_fld_start;
auto &field_done = analysis_field_done;
// fields will get reordered to be contiguous per xd subgraph
size_t fld_start = 0;
for(size_t i = 0; i < srcs.size(); i++) {
for(size_t i = analysis_field_idx; i < srcs.size(); i++) {
// check time limit between field iterations
if(work_until.is_expired()) {
analysis_fill_ofs = fill_ofs;
analysis_fld_start = fld_start;
analysis_field_idx = i;
return false;
}

// did this field already get grouped into a previous path?
if(field_done[i]) {
continue;
Expand Down Expand Up @@ -4257,6 +4286,7 @@ namespace Realm {
for(size_t i = 0; i < to_alloc.size(); i++) {
to_alloc[i]->allocate_ibs();
}
return true;
}

void TransferDesc::cancel_analysis(Event failed_precondition)
Expand Down Expand Up @@ -4294,7 +4324,7 @@ namespace Realm {
if(poisoned) {
desc->cancel_analysis(precondition);
} else {
desc->perform_analysis();
desc->perform_analysis(TimeLimit());
}
}

Expand Down
15 changes: 14 additions & 1 deletion src/realm/transfer/transfer.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
#include "realm/transfer/channel.h"
#include "realm/profiling.h"

class PerformAnalysisTest;

namespace Realm {

// the data transfer engine has too much code to have it all be templated on the
Expand Down Expand Up @@ -424,8 +426,13 @@ namespace Realm {
protected:
atomic<int> refcount;

// test-only constructor: creates a TransferDesc with the given domain,
// bypassing check_analysis_preconditions and TransferDomain::construct
struct TestTag {};
TransferDesc(TestTag, TransferDomain *_domain);

void check_analysis_preconditions();
void perform_analysis();
bool perform_analysis(TimeLimit work_until);
void cancel_analysis(Event failed_precondition);

class DeferredAnalysis : public EventWaiter {
Expand All @@ -441,6 +448,7 @@ namespace Realm {
DeferredAnalysis deferred_analysis;

friend class TransferOperation;
friend class ::PerformAnalysisTest;

TransferDomain *domain;
std::vector<CopySrcDstField> srcs, dsts;
Expand All @@ -456,6 +464,11 @@ namespace Realm {
std::vector<FieldInfo> src_fields, dst_fields;
void *fill_data;
size_t fill_size;
bool analysis_init_done;
size_t analysis_field_idx;
size_t analysis_fld_start;
size_t analysis_fill_ofs;
std::vector<bool> analysis_field_done;
ProfilingMeasurements::OperationMemoryUsage prof_usage;
ProfilingMeasurements::OperationCopyInfo prof_cpinfo;
};
Expand Down
1 change: 1 addition & 0 deletions src/realm/transfer/transfer.inl
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,7 @@ namespace Realm {
, analysis_successful(false)
, fill_data(0)
, fill_size(0)
, analysis_init_done(false)
{
domain = TransferDomain::construct(_is);

Expand Down
1 change: 1 addition & 0 deletions tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ list(
nodeset_test.cc
transfer_iterator_test.cc
lowlevel_dma_test.cc
perform_analysis_test.cc
circ_queue_test.cc
gather_scatter_test.cc
sparsity_map_test.cc
Expand Down
Loading
Loading