Skip to content

Commit

Permalink
pfor using data_items but no real synchronization yet
Browse files Browse the repository at this point in the history
  • Loading branch information
hendrx committed Mar 3, 2017
1 parent e9097a1 commit 4c1facd
Show file tree
Hide file tree
Showing 7 changed files with 205 additions and 117 deletions.
1 change: 1 addition & 0 deletions allscale/data_item.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ namespace allscale
region_(other.region_),
fragment_(other.fragment_)
{
std::cout<<"copy called" << std::endl;
}

data_item( data_item && other)
Expand Down
105 changes: 51 additions & 54 deletions allscale/fragment.hpp
Original file line number Diff line number Diff line change
@@ -1,62 +1,59 @@
#ifndef ALLSCALE_FRAGMENT_HPP
#define ALLSCALE_FRAGMENT_HPP


#include <hpx/include/serialization.hpp>
#include <memory>

namespace allscale
{
template<typename Region, typename T>
struct fragment
{
public:
using value_type = T;
using region_type = Region;

fragment ()
{
}

fragment(Region const& r,T t) : region_(r)
{
T *tmp = new T(t);
ptr_ = std::shared_ptr<T> (tmp);
}

void resize(fragment const& fragment, Region const& region)
{
}

fragment mask(fragment const& fragment, Region const& region)
{
}

void insert(fragment & destination, fragment const& source, Region const& region)
{
}

template<typename Archive>
void save(Archive& ar, fragment const& fragment)
{
}


template<typename Archive>
void serialize(Archive & ar, unsigned)
{
ar & region_;
ar & ptr_;
}

template<typename Archive>
void load(Archive& ar, fragment & fragment)
{
}

Region region_;
std::shared_ptr<T> ptr_;
};
#include <iostream>
namespace allscale {
template<typename Region, typename T>
struct fragment {
public:
using value_type = T;
using region_type = Region;

fragment() {
}

fragment(Region const& r, T t) :
region_(r) {
T *tmp = new T(t);
ptr_ = std::shared_ptr < T > (tmp);
}

fragment(Region const& r, std::shared_ptr<T> ptr ) :
region_(r),
ptr_(ptr)
{

}

void resize(fragment const& fragment, Region const& region) {
}

fragment mask(fragment const& fragment, Region const& region) {
}

void insert(fragment & destination, fragment const& source,
Region const& region) {
}

template<typename Archive>
void save(Archive& ar, fragment const& fragment) {
}

template<typename Archive>
void serialize(Archive & ar, unsigned) {
ar & region_;
ar & ptr_;
}

template<typename Archive>
void load(Archive& ar, fragment & fragment) {
}

Region region_;
std::shared_ptr<T> ptr_;
};
}

#endif
Binary file not shown.
3 changes: 3 additions & 0 deletions examples/pfor/fine_grained.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,9 @@ int hpx_main(int argc, char **argv)
allscale::scheduler::stop();
}




return hpx::finalize();
}

Expand Down
177 changes: 120 additions & 57 deletions examples/pfor/fine_grained_with_data_items.cpp
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@

#include <allscale/no_split.hpp>
#include <allscale/no_serialization.hpp>
#include <allscale/do_serialization.hpp>
Expand All @@ -16,80 +15,144 @@

#include "pfor_with_data_items.hpp"

#include <allscale/data_item.hpp>
#include <allscale/treeture.hpp>
#include <allscale/data_item_description.hpp>
#include <allscale/region.hpp>
#include <allscale/fragment.hpp>

static const int DEFAULT_SIZE = 128 * 1024 * 1024;

static std::vector<int> dataA;
static std::vector<int> dataB;

struct simple_stencil_body {
void operator()(std::int64_t i, const hpx::util::tuple<std::int64_t,std::int64_t>& params) const {
// extract parameters
auto t = hpx::util::get<0>(params);
auto n = hpx::util::get<1>(params);

HPX_ASSERT(i < n);
HPX_ASSERT(i < dataA.size());
HPX_ASSERT(i < dataB.size());

// figure out in which way to move the data
int* A = (t%2) ? dataA.data() : dataB.data();
int* B = (t%2) ? dataB.data() : dataA.data();

// check current state
if ((i > 0 && A[i-1] != A[i]) || (i < n-1 && A[i] != A[i+1])) {
std::cout << "Error in synchronization!\n";
std::cout << " for i=" << i << "\n";
std::cout << " A[i-1]=" << A[i-1] << "\n";
std::cout << " A[ i ]=" << A[ i ] << "\n";
std::cout << " A[i+1]=" << A[i+1] << "\n";
exit(42);
}

// update B
B[i] = A[i] + 1;
}
void operator()(std::int64_t i,
const hpx::util::tuple<std::int64_t, std::int64_t, test_data_item*,
test_data_item*>& params) const {
// extract parameters
auto t = hpx::util::get<0>(params);
auto n = hpx::util::get<1>(params);

test_data_item *td_one = (test_data_item*) hpx::util::get<2>(params);
test_data_item *td_two = (test_data_item*) hpx::util::get<3>(params);

auto a = ((*td_one).fragment_.ptr_);
auto b = ((*td_one).fragment_.ptr_);

//auto b = (std::vector<int>*) ((*td_two).fragment_.ptr_);


//auto td_two = hpx::util::get<3>(params);
//auto b = (std::vector<int>) * (td_two.fragment_.ptr_);
//auto res = (std::int64_t) *(td_one.fragment_.ptr_);
//res++;
//( * ((*td_one).fragment_.ptr_))++;
//std::cout<< ( * ((*td_one).fragment_.ptr_)) <<std::endl;
//(*a)[i]++;
//std::cout<< (*a)[i] << std::endl;
HPX_ASSERT(i < n);

HPX_ASSERT(i < (*a).size());
HPX_ASSERT(i < (*b).size());

//HPX_ASSERT(i < a.size());
//HPX_ASSERT(i < b.size());

//a[0] = a[i]+1;
//std::cout<<a[i];
// figure out in which way to move the data
/*
int* A = (t % 2) ? dataA.data() : dataB.data();
int* B = (t % 2) ? dataB.data() : dataA.data();
*/


int* A = (t % 2) ? (*a).data() : (*b).data();
int* B = (t % 2) ? (*b).data() : (*a).data();


// check current state
/*
if ((i > 0 && A[i - 1] != A[i]) || (i < n - 1 && A[i] != A[i + 1])) {
std::cout << "Error in synchronization!\n";
std::cout << " for i=" << i << "\n";
std::cout << " A[i-1]=" << A[i - 1] << "\n";
std::cout << " A[ i ]=" << A[i] << "\n";
std::cout << " A[i+1]=" << A[i + 1] << "\n";
exit(42);
}
*/
// update B
B[i] = A[i] + 1;

}
};

int hpx_main(int argc, char **argv) {

// start allscale scheduler ...
allscale::scheduler::run(hpx::get_locality_id());

std::int64_t n = argc >= 2 ? std::stoi(std::string(argv[1])) : DEFAULT_SIZE;
std::int64_t steps = argc >= 3 ? std::stoi(std::string(argv[2])) : 5;
std::int64_t iters = argc >= 4 ? std::stoi(std::string(argv[3])) : 1;

// initialize the data array
dataA.resize(n, 0);
dataB.resize(n, 0);

std::vector<int> a(n, 0);
std::vector<int> b(n, 0);


std::vector<hpx::naming::id_type> localities = hpx::find_all_localities();

descr test_descr;
my_region test_region(2);
// my_fragment frag(test_region,7);

my_fragment frag(test_region, std::make_shared<std::vector<int>>(a));
test_data_item td(localities[0], test_descr, frag);

int hpx_main(int argc, char **argv)
{
descr test_descr2;
my_region test_region2(2);
my_fragment frag2(test_region, std::make_shared<std::vector<int>>(b));
//my_fragment frag2(test_region,7);

// start allscale scheduler ...
allscale::scheduler::run(hpx::get_locality_id());
test_data_item td2(localities[0], test_descr2, frag2);

std::int64_t n = argc >= 2 ? std::stoi(std::string(argv[1])) : DEFAULT_SIZE;
std::int64_t steps = argc >= 3 ? std::stoi(std::string(argv[2])) : 1000;
std::int64_t iters = argc >= 4 ? std::stoi(std::string(argv[2])) : 1;
if (hpx::get_locality_id() == 0) {
for (int i = 0; i < iters; i++) {
std::cout << "Starting " << steps << "x pfor(0.." << n << "), "
<< "Iter: " << i << "\n";
hpx::util::high_resolution_timer t;

// initialize the data array
dataA.resize(n, 0);
dataB.resize(n, 0);
{
pfor_loop_handle last;
for (int t = 0; t < steps; t++) {
last = pfor_neighbor_sync<simple_stencil_body>(last, 0, n,
t, n, &td, &td2);
}
}

if(hpx::get_locality_id() == 0)
{
for(int i=0; i<iters; i++) {
std::cout << "Starting " << steps << "x pfor(0.." << n << "), " << "Iter: " << i << "\n";
hpx::util::high_resolution_timer t;
auto elapsed = t.elapsed_microseconds();
std::cout << "pfor(0.." << n << ") taking " << elapsed
<< " microseconds. Iter: " << i << "\n";
}
allscale::scheduler::stop();
}

{
pfor_loop_handle last;
for(int t=0; t<steps; t++) {
last = pfor_neighbor_sync<simple_stencil_body>(last,0,n,t,n);
}
}
auto k = (std::vector<int>) * (td.fragment_.ptr_);
for(auto el : k){
std::cout<<el<<" "<<std::endl;
}

auto elapsed = t.elapsed_microseconds();
std::cout << "pfor(0.." << n << ") taking " << elapsed << " microseconds. Iter: " << i << "\n";
}
allscale::scheduler::stop();
}

return hpx::finalize();
return hpx::finalize();
}

int main(int argc, char **argv)
{
return hpx::init(argc, argv);
int main(int argc, char **argv) {
return hpx::init(argc, argv);
}

Loading

0 comments on commit 4c1facd

Please sign in to comment.