Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 9 additions & 9 deletions src/alpaka/AlpakaCore/ContextState.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
#include <memory>

#include "AlpakaCore/alpakaConfig.h"
#include "AlpakaCore/SharedStreamPtr.h"

namespace cms::alpakatools::ALPAKA_ACCELERATOR_NAMESPACE {

Expand All @@ -15,6 +14,9 @@ namespace cms::alpakatools::ALPAKA_ACCELERATOR_NAMESPACE {
*/
class ContextState {
public:
using Queue = ::ALPAKA_ACCELERATOR_NAMESPACE::Queue;
using Device = alpaka::Dev<Queue>;

ContextState() = default;
~ContextState() = default;

Expand All @@ -28,33 +30,31 @@ namespace cms::alpakatools::ALPAKA_ACCELERATOR_NAMESPACE {
friend class ScopedContextProduce;
friend class ScopedContextTask;

void set(int device, SharedStreamPtr stream) {
void set(std::shared_ptr<Queue> stream) {
throwIfStream();
device_ = device;
stream_ = std::move(stream);
}

int device() const { return device_; }
Device device() const { return alpaka::getDev(*stream_); }

const SharedStreamPtr& streamPtr() const {
const std::shared_ptr<Queue>& streamPtr() const {
throwIfNoStream();
return stream_;
}

SharedStreamPtr releaseStreamPtr() {
std::shared_ptr<Queue> releaseStreamPtr() {
throwIfNoStream();
// This function needs to effectively reset stream_ (i.e. stream_
// must be empty after this function). This behavior ensures that
// the SharedStreamPtr is not hold for inadvertedly long (i.e. to
// the std::shared_ptr<Queue> is not hold for inadvertedly long (i.e. to
// the next event), and is checked at run time.
return std::move(stream_);
}

void throwIfStream() const;
void throwIfNoStream() const;

SharedStreamPtr stream_;
int device_;
std::shared_ptr<Queue> stream_;
};

} // namespace cms::alpakatools::ALPAKA_ACCELERATOR_NAMESPACE
Expand Down
8 changes: 4 additions & 4 deletions src/alpaka/AlpakaCore/ESProduct.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@

#include "AlpakaCore/alpakaConfig.h"
#include "AlpakaCore/EventCache.h"
#include "AlpakaCore/SharedEventPtr.h"
#include "AlpakaCore/currentDevice.h"
#include "AlpakaCore/deviceCount.h"
#include "AlpakaCore/eventWorkHasCompleted.h"
Expand All @@ -18,11 +17,12 @@ namespace cms::alpakatools::ALPAKA_ACCELERATOR_NAMESPACE {
template <typename T>
class ESProduct {
public:
using Event = ::ALPAKA_ACCELERATOR_NAMESPACE::Event;

template <typename T_Acc>
ESProduct(T_Acc acc) : gpuDataPerDevice_(::cms::alpakatools::ALPAKA_ACCELERATOR_NAMESPACE::deviceCount()) {
for (size_t i = 0; i < gpuDataPerDevice_.size(); ++i) {
gpuDataPerDevice_[i].m_event =
::cms::alpakatools::getEventCache<::ALPAKA_ACCELERATOR_NAMESPACE::Event>().get(acc);
gpuDataPerDevice_[i].m_event = ::cms::alpakatools::getEventCache<Event>().get(acc);
}
}

Expand Down Expand Up @@ -92,7 +92,7 @@ namespace cms::alpakatools::ALPAKA_ACCELERATOR_NAMESPACE {
private:
struct Item {
mutable std::mutex m_mutex;
mutable SharedEventPtr m_event; // guarded by m_mutex
mutable std::shared_ptr<Event> m_event; // guarded by m_mutex
// non-null if some thread is already filling (cudaStream_t is just a pointer)
mutable ::ALPAKA_ACCELERATOR_NAMESPACE::Queue* m_fillingStream = nullptr; // guarded by m_mutex
mutable std::atomic<bool> m_filled = false; // easy check if data has been filled already or not
Expand Down
8 changes: 4 additions & 4 deletions src/alpaka/AlpakaCore/Product.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,12 +46,12 @@ namespace cms::alpakatools::ALPAKA_ACCELERATOR_NAMESPACE {
friend class ScopedContextProduce;
friend class edm::Wrapper<Product<T>>;

explicit Product(int device, SharedStreamPtr stream, SharedEventPtr event, T data)
: ProductBase(device, std::move(stream), std::move(event)), data_(std::move(data)) {}
explicit Product(std::shared_ptr<Queue> stream, std::shared_ptr<Event> event, T data)
: ProductBase(std::move(stream), std::move(event)), data_(std::move(data)) {}

template <typename... Args>
explicit Product(int device, SharedStreamPtr stream, SharedEventPtr event, Args&&... args)
: ProductBase(device, std::move(stream), std::move(event)), data_(std::forward<Args>(args)...) {}
explicit Product(std::shared_ptr<Queue> stream, std::shared_ptr<Event> event, Args&&... args)
: ProductBase(std::move(stream), std::move(event)), data_(std::forward<Args>(args)...) {}

T data_; //!
};
Expand Down
31 changes: 15 additions & 16 deletions src/alpaka/AlpakaCore/ProductBase.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,9 @@
#include <atomic>
#include <memory>

#include "AlpakaCore/SharedEventPtr.h"
#include "AlpakaCore/SharedStreamPtr.h"
#include <alpaka/alpaka.hpp>

#include "AlpakaCore/alpakaConfigAcc.h"

namespace cms::alpakatools::ALPAKA_ACCELERATOR_NAMESPACE {

Expand All @@ -19,6 +20,9 @@ namespace cms::alpakatools::ALPAKA_ACCELERATOR_NAMESPACE {
*/
class ProductBase {
public:
using Queue = ::ALPAKA_ACCELERATOR_NAMESPACE::Queue;
using Event = alpaka::Event<Queue>;

ProductBase() = default; // Needed only for ROOT dictionary generation
~ProductBase();

Expand All @@ -27,43 +31,41 @@ namespace cms::alpakatools::ALPAKA_ACCELERATOR_NAMESPACE {
ProductBase(ProductBase&& other)
: stream_{std::move(other.stream_)},
event_{std::move(other.event_)},
mayReuseStream_{other.mayReuseStream_.load()},
device_{other.device_} {}
mayReuseStream_{other.mayReuseStream_.load()} {}
ProductBase& operator=(ProductBase&& other) {
stream_ = std::move(other.stream_);
event_ = std::move(other.event_);
mayReuseStream_ = other.mayReuseStream_.load();
device_ = other.device_;
return *this;
}

bool isValid() const { return stream_.get() != nullptr; }
bool isAvailable() const;

int device() const { return device_; }
alpaka::Dev<Queue> device() const { return alpaka::getDev(stream()); }

// cudaStream_t is a pointer to a thread-safe object, for which a
// mutable access is needed even if the ::cms::alpakatools::ScopedContext itself
// would be const. Therefore it is ok to return a non-const
// pointer from a const method here.
::ALPAKA_ACCELERATOR_NAMESPACE::Queue& stream() const { return *(stream_.get()); }
Queue& stream() const { return *(stream_.get()); }

// cudaEvent_t is a pointer to a thread-safe object, for which a
// mutable access is needed even if the ::cms::alpakatools::ScopedContext itself
// would be const. Therefore it is ok to return a non-const
// pointer from a const method here.
alpaka::Event<::ALPAKA_ACCELERATOR_NAMESPACE::Queue>& event() const { return *(event_.get()); }
Event& event() const { return *(event_.get()); }

protected:
explicit ProductBase(int device, SharedStreamPtr stream, SharedEventPtr event)
: stream_{std::move(stream)}, event_{std::move(event)}, device_{device} {}
explicit ProductBase(std::shared_ptr<Queue> stream, std::shared_ptr<Event> event)
: stream_{std::move(stream)}, event_{std::move(event)} {}

private:
friend class impl::ScopedContextBase;
friend class ScopedContextProduce;

// The following function is intended to be used only from ScopedContext
const SharedStreamPtr& streamPtr() const { return stream_; }
const std::shared_ptr<Queue>& streamPtr() const { return stream_; }

bool mayReuseStream() const {
bool expected = true;
Expand All @@ -75,17 +77,14 @@ namespace cms::alpakatools::ALPAKA_ACCELERATOR_NAMESPACE {

// The cudaStream_t is really shared among edm::Event products, so
// using shared_ptr also here
SharedStreamPtr stream_; //!
std::shared_ptr<Queue> stream_; //!
// shared_ptr because of caching in ::cms::alpakatools::EventCache
SharedEventPtr event_; //!
std::shared_ptr<Event> event_; //!

// This flag tells whether the CUDA stream may be reused by a
// consumer or not. The goal is to have a "chain" of modules to
// queue their work to the same stream.
mutable std::atomic<bool> mayReuseStream_ = true; //!

// The CUDA device associated with this product
int device_ = -1; //!
};

} // namespace cms::alpakatools::ALPAKA_ACCELERATOR_NAMESPACE
Expand Down
Loading