Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 33 additions & 35 deletions src/alpaka/AlpakaCore/AtomicPairCounter.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,51 +3,49 @@

#include <cstdint>

namespace cms {
namespace alpakatools {
namespace cms::alpakatools {

class AtomicPairCounter {
public:
using c_type = unsigned long long int;
class AtomicPairCounter {
public:
using c_type = unsigned long long int;

ALPAKA_FN_HOST_ACC AtomicPairCounter() {}
ALPAKA_FN_HOST_ACC AtomicPairCounter(c_type i) { counter.ac = i; }
ALPAKA_FN_HOST_ACC AtomicPairCounter() {}
ALPAKA_FN_HOST_ACC AtomicPairCounter(c_type i) { counter.ac = i; }

ALPAKA_FN_HOST_ACC AtomicPairCounter& operator=(c_type i) {
counter.ac = i;
return *this;
}
ALPAKA_FN_HOST_ACC AtomicPairCounter& operator=(c_type i) {
counter.ac = i;
return *this;
}

struct Counters {
uint32_t n; // in a "One to Many" association is the number of "One"
uint32_t m; // in a "One to Many" association is the total number of associations
};
struct Counters {
uint32_t n; // in a "One to Many" association is the number of "One"
uint32_t m; // in a "One to Many" association is the total number of associations
};

union Atomic2 {
Counters counters;
c_type ac;
};
union Atomic2 {
Counters counters;
c_type ac;
};

static constexpr c_type incr = 1UL << 32;
static constexpr c_type incr = 1UL << 32;

ALPAKA_FN_HOST_ACC Counters get() const { return counter.counters; }
ALPAKA_FN_HOST_ACC Counters get() const { return counter.counters; }

// increment n by 1 and m by i. return previous value
template <typename T_Acc>
ALPAKA_FN_ACC ALPAKA_FN_INLINE Counters add(const T_Acc& acc, uint32_t i) {
c_type c = i;
c += incr;
// increment n by 1 and m by i. return previous value
template <typename T_Acc>
ALPAKA_FN_ACC ALPAKA_FN_INLINE Counters add(const T_Acc& acc, uint32_t i) {
c_type c = i;
c += incr;

Atomic2 ret;
ret.ac = alpaka::atomicAdd(acc, &counter.ac, c, alpaka::hierarchy::Blocks{});
return ret.counters;
}
Atomic2 ret;
ret.ac = alpaka::atomicAdd(acc, &counter.ac, c, alpaka::hierarchy::Blocks{});
return ret.counters;
}

private:
Atomic2 counter;
};
private:
Atomic2 counter;
};

} // namespace alpakatools
} // namespace cms
} // namespace cms::alpakatools

#endif // HeterogeneousCore_CUDAUtilities_interface_AtomicPairCounter_h
62 changes: 62 additions & 0 deletions src/alpaka/AlpakaCore/ContextState.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
#ifndef HeterogeneousCore_AlpakaCore_ContextState_h
#define HeterogeneousCore_AlpakaCore_ContextState_h

#include <memory>

#include "AlpakaCore/alpakaConfig.h"
#include "AlpakaCore/SharedStreamPtr.h"

namespace cms::alpakatools::ALPAKA_ACCELERATOR_NAMESPACE {

/**
* The purpose of this class is to deliver the device and CUDA stream
* information from ExternalWork's acquire() to producer() via a
* member/StreamCache variable.
*/
class ContextState {
public:
ContextState() = default;
~ContextState() = default;

ContextState(const ContextState&) = delete;
ContextState& operator=(const ContextState&) = delete;
ContextState(ContextState&&) = delete;
ContextState& operator=(ContextState&& other) = delete;

private:
friend class ScopedContextAcquire;
friend class ScopedContextProduce;
friend class ScopedContextTask;

void set(int device, SharedStreamPtr stream) {
throwIfStream();
device_ = device;
stream_ = std::move(stream);
}

int device() const { return device_; }

const SharedStreamPtr& streamPtr() const {
throwIfNoStream();
return stream_;
}

SharedStreamPtr releaseStreamPtr() {
throwIfNoStream();
// This function needs to effectively reset stream_ (i.e. stream_
// must be empty after this function). This behavior ensures that
// the SharedStreamPtr is not hold for inadvertedly long (i.e. to
// the next event), and is checked at run time.
return std::move(stream_);
}

void throwIfStream() const;
void throwIfNoStream() const;

SharedStreamPtr stream_;
int device_;
};

} // namespace cms::alpakatools::ALPAKA_ACCELERATOR_NAMESPACE

#endif // HeterogeneousCore_AlpakaCore_ContextState_h
104 changes: 104 additions & 0 deletions src/alpaka/AlpakaCore/ESProduct.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
#ifndef HeterogeneousCore_AlpakaCore_ESProduct_h
#define HeterogeneousCore_AlpakaCore_ESProduct_h

#include <atomic>
#include <cassert>
#include <mutex>
#include <vector>

#include "AlpakaCore/alpakaConfig.h"
#include "AlpakaCore/EventCache.h"
#include "AlpakaCore/currentDevice.h"
#include "AlpakaCore/eventWorkHasCompleted.h"

namespace cms::alpakatools::ALPAKA_ACCELERATOR_NAMESPACE {

  // Caches one copy of an EventSetup product T per device, filling it
  // lazily and at most once per device via a user-supplied transfer
  // function. Concurrent callers coordinate through a per-device mutex,
  // an atomic "filled" flag and a readiness event (double-checked
  // locking: atomic fast path, mutex-protected slow path).
  template <typename T>
  class ESProduct {
  public:
    // Allocates one cache slot per device and pre-populates each slot's
    // readiness event from the global event cache.
    template <typename T_Acc>
    ESProduct(T_Acc acc) : gpuDataPerDevice_(::cms::alpakatools::ALPAKA_ACCELERATOR_NAMESPACE::deviceCount()) {
      for (size_t i = 0; i < gpuDataPerDevice_.size(); ++i) {
        gpuDataPerDevice_[i].m_event = ::cms::alpakatools::ALPAKA_ACCELERATOR_NAMESPACE::getEventCache().get(acc);
      }
    }

    ~ESProduct() = default;

    // transferAsync should be a function of (T&, cudaStream_t)
    // which enqueues asynchronous transfers (possibly kernels as well)
    // to the CUDA stream
    // NOTE(review): `queue` is taken by value — confirm Queue is cheap
    // to copy, otherwise this should be a (const) reference.
    template <typename F>
    const T& dataForCurrentDeviceAsync(::ALPAKA_ACCELERATOR_NAMESPACE::Queue queue, F transferAsync) const {
      auto device = currentDevice();
      auto& data = gpuDataPerDevice_[device];

      // If GPU data has already been filled, we can return it
      // immediately
      if (not data.m_filled.load()) {
        // It wasn't, so need to fill it
        std::scoped_lock<std::mutex> lk{data.m_mutex};

        if (data.m_filled.load()) {
          // Other thread marked it filled while we were locking the mutex, so we're free to return it
          return data.m_data;
        }

        if (data.m_fillingStream != nullptr) {
          // Someone else is filling

          // Check first if the recorded event has occurred
          if (eventWorkHasCompleted(data.m_event.get())) {
            // It was, so data is accessible from all CUDA streams on
            // the device. Set the 'filled' for all subsequent calls and
            // return the value
            auto should_be_false = data.m_filled.exchange(true);
            assert(not should_be_false);
            data.m_fillingStream = nullptr;
          } else if (data.m_fillingStream != queue) {
            // NOTE(review): m_fillingStream is declared as Queue* but
            // `queue` is a Queue value — this comparison (and the
            // assignment below) mixes a pointer with an object. Confirm
            // that Queue is a pointer-like alias (as cudaStream_t was in
            // the CUDA original); otherwise the types need fixing.
            // Filling is still going on. For other CUDA stream, add
            // wait on the CUDA stream and return the value. Subsequent
            // work queued on the stream will wait for the event to
            // occur (i.e. transfer to finish).
            alpaka::wait(queue, data.m_event.get());
          }
          // else: filling is still going on. But for the same CUDA
          // stream (which would be a bit strange but fine), we can just
          // return as all subsequent work should be enqueued to the
          // same CUDA stream (or stream to be explicitly synchronized
          // by the caller)
        } else {
          // Now we can be sure that the data is not yet on the GPU, and
          // this thread is the first to try that.
          transferAsync(data.m_data, queue);
          assert(data.m_fillingStream == nullptr);
          // NOTE(review): if Queue is NOT pointer-like, this would store
          // (a copy/conversion of) a by-value parameter that dies when
          // this function returns — verify against the Queue typedef.
          data.m_fillingStream = queue;
          // Record in the cudaStream an event to mark the readiness of the
          // EventSetup data on the GPU, so other streams can check for it
          alpaka::enqueue(queue, data.m_event.get());
          // Now the filling has been enqueued to the cudaStream, so we
          // can return the GPU data immediately, since all subsequent
          // work must be either enqueued to the cudaStream, or the cudaStream
          // must be synchronized by the caller
        }
      }

      return data.m_data;
    }

  private:
    // Per-device cache slot. All members are `mutable` because the
    // fill happens inside a const member function.
    struct Item {
      mutable std::mutex m_mutex;
      mutable SharedEventPtr m_event;  // guarded by m_mutex
      // non-null if some thread is already filling (cudaStream_t is just a pointer)
      mutable ::ALPAKA_ACCELERATOR_NAMESPACE::Queue* m_fillingStream = nullptr;  // guarded by m_mutex
      mutable std::atomic<bool> m_filled = false;  // easy check if data has been filled already or not
      mutable T m_data;  // guarded by m_mutex
    };

    std::vector<Item> gpuDataPerDevice_;
  };

} // namespace cms::alpakatools::ALPAKA_ACCELERATOR_NAMESPACE

#endif // HeterogeneousCore_AlpakaCore_ESProduct_h
76 changes: 76 additions & 0 deletions src/alpaka/AlpakaCore/EventCache.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
#ifndef HeterogeneousCore_AlpakaUtilities_EventCache_h
#define HeterogeneousCore_AlpakaUtilities_EventCache_h

#include <vector>

#include <cuda_runtime.h>

#include "AlpakaCore/ScopedSetDevice.h"
#include "AlpakaCore/SharedEventPtr.h"
#include "AlpakaCore/alpakaConfig.h"
#include "AlpakaCore/currentDevice.h"
#include "AlpakaCore/deviceCount.h"
#include "AlpakaCore/eventWorkHasCompleted.h"
#include "Framework/ReusableObjectHolder.h"

class CUDAService;

namespace cms::alpakatools::ALPAKA_ACCELERATOR_NAMESPACE {

class EventCache {
public:
EventCache();

// Gets a (cached) CUDA event for the current device. The event
// will be returned to the cache by the shared_ptr destructor. The
// returned event is guaranteed to be in the state where all
// captured work has completed, i.e. cudaEventQuery() == cudaSuccess.
//
// This function is thread safe
template <typename T_Acc>
SharedEventPtr get(T_Acc acc) {
const auto dev = currentDevice();
auto event = makeOrGet(dev, acc);
// captured work has completed, or a just-created event
if (eventWorkHasCompleted(*(event.get()))) {
return event;
}

// Got an event with incomplete captured work. Try again until we
// get a completed (or a just-created) event. Need to keep all
// incomplete events until a completed event is found in order to
// avoid ping-pong with an incomplete event.
std::vector<SharedEventPtr> ptrs{std::move(event)};
bool completed;
do {
event = makeOrGet(dev, acc);
completed = eventWorkHasCompleted(*(event.get()));
if (not completed) {
ptrs.emplace_back(std::move(event));
}
} while (not completed);
return event;
}

private:
friend class ::CUDAService;

template <typename T_Acc>
SharedEventPtr makeOrGet(int dev, T_Acc acc) {
return cache_[dev].makeOrGet(
[dev, acc]() { return std::make_unique<::ALPAKA_ACCELERATOR_NAMESPACE::Event>(acc); });
}

// not thread safe, intended to be called only from CUDAService destructor
void clear();

std::vector<edm::ReusableObjectHolder<::ALPAKA_ACCELERATOR_NAMESPACE::Event>> cache_;
};

// Gets the global instance of a EventCache
// This function is thread safe
EventCache& getEventCache();

} // namespace cms::alpakatools::ALPAKA_ACCELERATOR_NAMESPACE

#endif // HeterogeneousCore_AlpakaUtilities_EventCache_h
Loading