Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
149 changes: 143 additions & 6 deletions src/cuda/Framework/EDProducer.h
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
#ifndef EDProducerBase_h
#define EDProducerBase_h

#include <memory>

#include "Framework/EventRange.h"
#include "Framework/WaitingTaskWithArenaHolder.h"

namespace edm {
Expand All @@ -14,9 +17,13 @@ namespace edm {

bool hasAcquire() const { return false; }

void doAcquire(Event const& event, EventSetup const& eventSetup, WaitingTaskWithArenaHolder holder) {}
void doAcquire(ConstEventRange events, EventSetup const& eventSetup, WaitingTaskWithArenaHolder holder) {}

void doProduce(Event& event, EventSetup const& eventSetup) { produce(event, eventSetup); }
void doProduce(EventRange events, EventSetup const& eventSetup) {
for (Event& event : events) {
produce(event, eventSetup);
}
}

virtual void produce(Event& event, EventSetup const& eventSetup) = 0;

Expand All @@ -27,27 +34,157 @@ namespace edm {
private:
};

class EDBatchingProducer {
public:
EDBatchingProducer() = default;
virtual ~EDBatchingProducer() = default;

bool hasAcquire() const { return false; }

void doAcquire(ConstEventRange events, EventSetup const& eventSetup, WaitingTaskWithArenaHolder holder) {}

void doProduce(EventRange events, EventSetup const& eventSetup) { produce(events, eventSetup); }

virtual void produce(EventRange events, EventSetup const& eventSetup) = 0;

void doEndJob() { endJob(); }

virtual void endJob() {}

private:
};

template <typename T = void>
class EDProducerExternalWork {
public:
using AsyncState = T;

EDProducerExternalWork() = default;
virtual ~EDProducerExternalWork() = default;

bool hasAcquire() const { return true; }

void doAcquire(Event const& event, EventSetup const& eventSetup, WaitingTaskWithArenaHolder holder) {
acquire(event, eventSetup, std::move(holder));
void doAcquire(ConstEventRange events, EventSetup const& eventSetup, WaitingTaskWithArenaHolder holder) {
if (events.size() > statesSize_) {
statesSize_ = events.size();
states_ = std::make_unique<AsyncState[]>(statesSize_);
}
for (size_t i = 0; i < events.size(); ++i) {
acquire(events[i], eventSetup, holder, states_[i]);
}
}

void doProduce(Event& event, EventSetup const& eventSetup) { produce(event, eventSetup); }
virtual void acquire(Event const& event,
EventSetup const& eventSetup,
WaitingTaskWithArenaHolder holder,
AsyncState& state) const = 0;

void doProduce(EventRange events, EventSetup const& eventSetup) {
for (size_t i = 0; i < events.size(); ++i) {
produce(events[i], eventSetup, states_[i]);
}
}

virtual void produce(Event& event, EventSetup const& eventSetup, AsyncState& state) = 0;

void doEndJob() { endJob(); }

virtual void endJob() {}

private:
size_t statesSize_ = 0;
std::unique_ptr<AsyncState[]> states_;
};

template <>
class EDProducerExternalWork<void> {
public:
EDProducerExternalWork() = default;
virtual ~EDProducerExternalWork() = default;

bool hasAcquire() const { return true; }

void doAcquire(ConstEventRange events, EventSetup const& eventSetup, WaitingTaskWithArenaHolder holder) {
for (Event const& event : events) {
acquire(event, eventSetup, holder);
}
}

virtual void acquire(Event const& event, EventSetup const& eventSetup, WaitingTaskWithArenaHolder holder) const = 0;

void doProduce(EventRange events, EventSetup const& eventSetup) {
for (Event& event : events) {
produce(event, eventSetup);
}
}

virtual void acquire(Event const& event, EventSetup const& eventSetup, WaitingTaskWithArenaHolder holder) = 0;
virtual void produce(Event& event, EventSetup const& eventSetup) = 0;

void doEndJob() { endJob(); }

virtual void endJob() {}

private:
};

template <typename T = void>
class EDBatchingProducerExternalWork {
public:
using AsyncState = T;

EDBatchingProducerExternalWork() = default;
virtual ~EDBatchingProducerExternalWork() = default;

bool hasAcquire() const { return true; }

void doAcquire(ConstEventRange events, EventSetup const& eventSetup, WaitingTaskWithArenaHolder holder) {
acquire(events, eventSetup, holder, state_);
}

virtual void acquire(ConstEventRange events,
EventSetup const& eventSetup,
WaitingTaskWithArenaHolder holder,
AsyncState& state) const = 0;

void doProduce(EventRange events, EventSetup const& eventSetup) { produce(events, eventSetup, state_); }

virtual void produce(EventRange events, EventSetup const& eventSetup, AsyncState& states) = 0;

void doEndJob() { endJob(); }

virtual void endJob() {}

private:
AsyncState state_;
};

template <>
class EDBatchingProducerExternalWork<void> {
public:
EDBatchingProducerExternalWork() = default;
virtual ~EDBatchingProducerExternalWork() = default;

bool hasAcquire() const { return true; }

void doAcquire(ConstEventRange events, EventSetup const& eventSetup, WaitingTaskWithArenaHolder holder) {
acquire(events, eventSetup, holder);
}

virtual void acquire(ConstEventRange events,
EventSetup const& eventSetup,
WaitingTaskWithArenaHolder holder) const = 0;

void doProduce(EventRange events, EventSetup const& eventSetup) { produce(events, eventSetup); }

virtual void produce(EventRange events, EventSetup const& eventSetup) = 0;

void doEndJob() { endJob(); }

virtual void endJob() {}

private:
};

} // namespace edm

#endif
36 changes: 36 additions & 0 deletions src/cuda/Framework/EventBatch.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
#ifndef EventBatch_h
#define EventBatch_h

#include <vector>

#include "Framework/Event.h"
#include "Framework/EventRange.h"

namespace edm {

class EventBatch {
public:
Event& emplace(int streamId, int eventId, ProductRegistry const& reg) {
events_.emplace_back(streamId, eventId, reg);
return events_.back();
}

void reserve(size_t capacity) { events_.reserve(capacity); }

void clear() { events_.clear(); }

size_t size() const { return events_.size(); }

bool empty() const { return events_.empty(); }

EventRange range() { return EventRange(&events_.front(), &events_.back() + 1); }

ConstEventRange range() const { return ConstEventRange(&events_.front(), &events_.back() + 1); }

private:
std::vector<edm::Event> events_;
};

} // namespace edm

#endif // EventBatch_h
96 changes: 96 additions & 0 deletions src/cuda/Framework/EventRange.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
#ifndef EventRange_h
#define EventRange_h

#include <cassert>
#include <sstream>
#include <stdexcept>

#include "Framework/Event.h"

namespace edm {

class EventRange {
public:
EventRange(Event* begin, Event* end) : begin_(begin), end_(end) {
assert(begin_);
assert(end_);
assert(end_ >= begin_);
}

Event* begin() { return begin_; }

Event const* begin() const { return begin_; }

Event* end() { return end_; }

Event const* end() const { return end_; }

size_t size() const { return end_ - begin_; }

bool empty() const { return end_ == begin_; }

Event& at(size_t index) {
if (index >= size()) {
std::stringstream msg;
msg << "EventRange::at() range check failed: index " << index << " is outside the range [0.." << size() << ")";
throw std::out_of_range(msg.str());
}
return begin_[index];
}

Event const& at(size_t index) const {
if (index >= size()) {
std::stringstream msg;
msg << "EventRange::at() range check failed: index " << index << " is outside the range [0.." << size() << ")";
throw std::out_of_range(msg.str());
}
return begin_[index];
}

Event& operator[](size_t index) { return begin_[index]; }

Event const& operator[](size_t index) const { return begin_[index]; }

private:
Event* begin_;
Event* end_;
};

class ConstEventRange {
public:
ConstEventRange(Event const* begin, Event const* end) : begin_(begin), end_(end) {
assert(begin_);
assert(end_);
assert(end_ >= begin_);
}

ConstEventRange(EventRange range) : begin_(range.begin()), end_(range.end()) {}

Event const* begin() const { return begin_; }

Event const* end() const { return end_; }

size_t size() const { return end_ - begin_; }

bool empty() const { return end_ == begin_; }

Event const& at(size_t index) {
if (index >= size()) {
std::stringstream msg;
msg << "ConstEventRange::at() range check failed: index " << index << " is outside the range [0.." << size()
<< ")";
throw std::out_of_range(msg.str());
}
return begin_[index];
}

Event const& operator[](size_t index) const { return begin_[index]; }

private:
Event const* begin_;
Event const* end_;
};

} // namespace edm

#endif // EventRange_h
16 changes: 10 additions & 6 deletions src/cuda/Framework/WaitingTaskHolder.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ namespace edm {
}
~WaitingTaskHolder() {
if (m_task) {
doneWaiting(std::exception_ptr{});
doneWaiting();
}
}

Expand Down Expand Up @@ -89,11 +89,8 @@ namespace edm {
}
}

void doneWaiting(std::exception_ptr iExcept) {
if (iExcept) {
m_task->dependentTaskFailed(iExcept);
}
//task_group::run can run the task before we finish
void doneWaiting() {
//spawn can run the task before we finish
// doneWaiting and some other thread might
// try to reuse this object. Resetting
// before spawn avoids problems
Expand All @@ -107,6 +104,13 @@ namespace edm {
}
}

void doneWaiting(std::exception_ptr iExcept) {
if (iExcept) {
m_task->dependentTaskFailed(iExcept);
}
doneWaiting();
}

private:
WaitingTask* release_no_decrement() noexcept {
auto t = m_task;
Expand Down
18 changes: 13 additions & 5 deletions src/cuda/Framework/WaitingTaskWithArenaHolder.cc
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ namespace edm {

WaitingTaskWithArenaHolder::~WaitingTaskWithArenaHolder() {
if (m_task) {
doneWaiting(std::exception_ptr{});
doneWaiting();
}
}

Expand Down Expand Up @@ -79,10 +79,7 @@ namespace edm {
// into the correct arena of threads. Use of the arena allows doneWaiting
// to be called from a thread outside the arena of threads that will manage
// the task. doneWaiting can be called from a non-TBB thread.
void WaitingTaskWithArenaHolder::doneWaiting(std::exception_ptr iExcept) {
if (iExcept) {
m_task->dependentTaskFailed(iExcept);
}
void WaitingTaskWithArenaHolder::doneWaiting() {
//enqueue can run the task before we finish
// doneWaiting and some other thread might
// try to reuse this object. Resetting
Expand All @@ -96,6 +93,17 @@ namespace edm {
}
}

// This spawns the task. The arena is needed to get the task spawned
// into the correct arena of threads. Use of the arena allows doneWaiting
// to be called from a thread outside the arena of threads that will manage
// the task. doneWaiting can be called from a non-TBB thread.
void WaitingTaskWithArenaHolder::doneWaiting(std::exception_ptr iExcept) {
if (iExcept) {
m_task->dependentTaskFailed(iExcept);
}
doneWaiting();
}

// This next function is useful if you know from the context that
// m_arena (which is set when the constructor was executes) is the
// same arena in which you want to execute the doneWaiting function.
Expand Down
Loading