From 0a3161d5f342ccdcc781b6fab939ee6839fc1fc6 Mon Sep 17 00:00:00 2001 From: Andrea Bocci Date: Thu, 3 Dec 2020 10:11:27 +0100 Subject: [PATCH 1/3] Add a doneWaiting overload without the exception argument --- src/fwtest/Framework/WaitingTaskHolder.h | 17 +++++++++++------ .../Framework/WaitingTaskWithArenaHolder.cc | 18 +++++++++++++----- .../Framework/WaitingTaskWithArenaHolder.h | 6 ++++++ src/fwtest/bin/StreamSchedule.cc | 2 +- src/fwtest/plugin-Test2/TestProducer2.cc | 2 +- 5 files changed, 32 insertions(+), 13 deletions(-) diff --git a/src/fwtest/Framework/WaitingTaskHolder.h b/src/fwtest/Framework/WaitingTaskHolder.h index 89ed970c0..00bdd3a0d 100644 --- a/src/fwtest/Framework/WaitingTaskHolder.h +++ b/src/fwtest/Framework/WaitingTaskHolder.h @@ -40,7 +40,7 @@ namespace edm { } ~WaitingTaskHolder() { if (m_task) { - doneWaiting(std::exception_ptr{}); + doneWaiting(); } } @@ -89,11 +89,9 @@ namespace edm { } } - void doneWaiting(std::exception_ptr iExcept) { - if (iExcept) { - m_task->dependentTaskFailed(iExcept); - } - //task_group::run can run the task before we finish + void doneWaiting() { + // spawn can run the task before we finish + // task_group::run can run the task before we finish // doneWaiting and some other thread might // try to reuse this object. Resetting // before spawn avoids problems @@ -107,6 +105,13 @@ namespace edm { } } + void doneWaiting(std::exception_ptr iExcept) { + if (iExcept) { + m_task->dependentTaskFailed(iExcept); + } + doneWaiting(); + } + private: WaitingTask* release_no_decrement() noexcept { auto t = m_task; diff --git a/src/fwtest/Framework/WaitingTaskWithArenaHolder.cc b/src/fwtest/Framework/WaitingTaskWithArenaHolder.cc index abcb920ec..a6b76be0d 100644 --- a/src/fwtest/Framework/WaitingTaskWithArenaHolder.cc +++ b/src/fwtest/Framework/WaitingTaskWithArenaHolder.cc @@ -38,7 +38,7 @@ namespace edm { WaitingTaskWithArenaHolder::~WaitingTaskWithArenaHolder() { if (m_task) { - doneWaiting(std::exception_ptr{}); + doneWaiting(); } } @@ -79,10 +79,7 @@ namespace edm { // into the correct arena of threads. Use of the arena allows doneWaiting // to be called from a thread outside the arena of threads that will manage // the task. doneWaiting can be called from a non-TBB thread. - void WaitingTaskWithArenaHolder::doneWaiting(std::exception_ptr iExcept) { - if (iExcept) { - m_task->dependentTaskFailed(iExcept); - } + void WaitingTaskWithArenaHolder::doneWaiting() { //enqueue can run the task before we finish // doneWaiting and some other thread might // try to reuse this object. Resetting @@ -96,6 +93,17 @@ namespace edm { } } + // This spawns the task. The arena is needed to get the task spawned + // into the correct arena of threads. Use of the arena allows doneWaiting + // to be called from a thread outside the arena of threads that will manage + // the task. doneWaiting can be called from a non-TBB thread. + void WaitingTaskWithArenaHolder::doneWaiting(std::exception_ptr iExcept) { + if (iExcept) { + m_task->dependentTaskFailed(iExcept); + } + doneWaiting(); + } + // This next function is useful if you know from the context that // m_arena (which is set when the constructor was executes) is the // same arena in which you want to execute the doneWaiting function. diff --git a/src/fwtest/Framework/WaitingTaskWithArenaHolder.h b/src/fwtest/Framework/WaitingTaskWithArenaHolder.h index d0c27743c..f6763544d 100644 --- a/src/fwtest/Framework/WaitingTaskWithArenaHolder.h +++ b/src/fwtest/Framework/WaitingTaskWithArenaHolder.h @@ -53,6 +53,12 @@ namespace edm { WaitingTaskWithArenaHolder& operator=(WaitingTaskWithArenaHolder&& iRHS); + // This spawns the task. The arena is needed to get the task spawned + // into the correct arena of threads. Use of the arena allows doneWaiting + // to be called from a thread outside the arena of threads that will manage + // the task. doneWaiting can be called from a non-TBB thread. + void doneWaiting(); + // This spawns the task. The arena is needed to get the task spawned // into the correct arena of threads. Use of the arena allows doneWaiting // to be called from a thread outside the arena of threads that will manage diff --git a/src/fwtest/bin/StreamSchedule.cc b/src/fwtest/bin/StreamSchedule.cc index 4a18c39a3..c0676e341 100644 --- a/src/fwtest/bin/StreamSchedule.cc +++ b/src/fwtest/bin/StreamSchedule.cc @@ -88,7 +88,7 @@ namespace edm { (*iWorker)->doWorkAsync(*eventPtr, *eventSetup_, nextEventTaskHolder); } } else { - h.doneWaiting(std::exception_ptr{}); + h.doneWaiting(); } } diff --git a/src/fwtest/plugin-Test2/TestProducer2.cc b/src/fwtest/plugin-Test2/TestProducer2.cc index 6df7bdb0d..6f88839ae 100644 --- a/src/fwtest/plugin-Test2/TestProducer2.cc +++ b/src/fwtest/plugin-Test2/TestProducer2.cc @@ -39,7 +39,7 @@ void TestProducer2::acquire(edm::Event const& event, future_ = std::async([holder = std::move(holder)]() mutable { using namespace std::chrono_literals; std::this_thread::sleep_for(1s); - holder.doneWaiting(std::exception_ptr()); + holder.doneWaiting(); return 42; }); From be7dad156c0edcb2981a680744c9b61877a282ee Mon Sep 17 00:00:00 2001 From: Andrea Bocci Date: Tue, 1 Dec 2020 15:43:39 +0100 Subject: [PATCH 2/3] Add event batching support for fwtest Add an AsyncState to communicate between acquire() and produce() Add a template type parameter to the edm::EDProducerExternalWork class. The acquire() method is now const, and it communicates its asynchronous state to the the produce() method via an object of this type, rather than using data members. Implementations that do not need to pass any state between acquire() and produce() can use "void" or leave the template parameter empty. Implement simple support for event batching Add EDBatchingProducer and EDBatchingProducerExternalWork base classes Implement tests for all kind of EDProducer Reorganise the existing EDProducer tests, and add tests for - EDBatchingProducer - EDBatchingProducerExternalWork Wrap ownership of and access to event batches Introduce three new edm classes: - edm::EventBatch implements the ownership of a batch of events, implemented as an std::vector; used by the Source to provide events to the StreamSchedule. - edm::EventRange implements non-owning, non-const access to a batch of events; wraps a pair of Event* to the begin and end of an event batch; passed (by value) to doProduce() and produce(). - edm::ConstEventRange implements non-owning const access to a batch of events; wraps a pair of Event* to the begin and end of an event batch; passed (by value) to doAcquire() and acquire(). Avoid potential overrun while reading multiple events per batch with multiple threads --- src/fwtest/Framework/EDProducer.h | 149 +++++++++++++++++- src/fwtest/Framework/EventBatch.h | 36 +++++ src/fwtest/Framework/EventRange.h | 96 +++++++++++ src/fwtest/Framework/Worker.cc | 5 +- src/fwtest/Framework/Worker.h | 45 +++--- src/fwtest/Makefile.deps | 4 +- src/fwtest/bin/EventProcessor.cc | 5 +- src/fwtest/bin/EventProcessor.h | 3 +- src/fwtest/bin/PluginManager.cc | 2 +- src/fwtest/bin/Source.cc | 60 ++++--- src/fwtest/bin/Source.h | 7 +- src/fwtest/bin/StreamSchedule.cc | 29 ++-- src/fwtest/bin/StreamSchedule.h | 2 +- src/fwtest/bin/main.cc | 13 +- .../IntESProducer.cc | 0 .../TestProducer.cc | 0 .../plugin-Test/TestProducerExternalWork.cc | 68 ++++++++ src/fwtest/plugin-Test2/TestProducer2.cc | 64 -------- src/fwtest/plugin-Test2/TestProducer3.cc | 29 ---- .../TestBatchingProducer.cc | 32 ++++ .../TestBatchingProducerExternalWork.cc | 79 ++++++++++ 21 files changed, 560 insertions(+), 168 deletions(-) create mode 100644 src/fwtest/Framework/EventBatch.h create mode 100644 src/fwtest/Framework/EventRange.h rename src/fwtest/{plugin-Test1 => plugin-Test}/IntESProducer.cc (100%) rename src/fwtest/{plugin-Test1 => plugin-Test}/TestProducer.cc (100%) create mode 100644 src/fwtest/plugin-Test/TestProducerExternalWork.cc delete mode 100644 src/fwtest/plugin-Test2/TestProducer2.cc delete mode 100644 src/fwtest/plugin-Test2/TestProducer3.cc create mode 100644 src/fwtest/plugin-TestBatching/TestBatchingProducer.cc create mode 100644 src/fwtest/plugin-TestBatching/TestBatchingProducerExternalWork.cc diff --git a/src/fwtest/Framework/EDProducer.h b/src/fwtest/Framework/EDProducer.h index 8160b8c96..1cc63f58d 100644 --- a/src/fwtest/Framework/EDProducer.h +++ b/src/fwtest/Framework/EDProducer.h @@ -1,6 +1,9 @@ #ifndef EDProducerBase_h #define EDProducerBase_h +#include + +#include "Framework/EventRange.h" #include "Framework/WaitingTaskWithArenaHolder.h" namespace edm { @@ -14,9 +17,13 @@ namespace edm { bool hasAcquire() const { return false; } - void doAcquire(Event const& event, EventSetup const& eventSetup, WaitingTaskWithArenaHolder holder) {} + void doAcquire(ConstEventRange events, EventSetup const& eventSetup, WaitingTaskWithArenaHolder holder) {} - void doProduce(Event& event, EventSetup const& eventSetup) { produce(event, eventSetup); } + void doProduce(EventRange events, EventSetup const& eventSetup) { + for (Event& event : events) { + produce(event, eventSetup); + } + } virtual void produce(Event& event, EventSetup const& eventSetup) = 0; @@ -27,27 +34,157 @@ namespace edm { private: }; + class EDBatchingProducer { + public: + EDBatchingProducer() = default; + virtual ~EDBatchingProducer() = default; + + bool hasAcquire() const { return false; } + + void doAcquire(ConstEventRange events, EventSetup const& eventSetup, WaitingTaskWithArenaHolder holder) {} + + void doProduce(EventRange events, EventSetup const& eventSetup) { produce(events, eventSetup); } + + virtual void produce(EventRange events, EventSetup const& eventSetup) = 0; + + void doEndJob() { endJob(); } + + virtual void endJob() {} + + private: + }; + + template class EDProducerExternalWork { public: + using AsyncState = T; + EDProducerExternalWork() = default; virtual ~EDProducerExternalWork() = default; bool hasAcquire() const { return true; } - void doAcquire(Event const& event, EventSetup const& eventSetup, WaitingTaskWithArenaHolder holder) { - acquire(event, eventSetup, std::move(holder)); + void doAcquire(ConstEventRange events, EventSetup const& eventSetup, WaitingTaskWithArenaHolder holder) { + if (events.size() > statesSize_) { + statesSize_ = events.size(); + states_ = std::make_unique(statesSize_); + } + for (size_t i = 0; i < events.size(); ++i) { + acquire(events[i], eventSetup, holder, states_[i]); + } } - void doProduce(Event& event, EventSetup const& eventSetup) { produce(event, eventSetup); } + virtual void acquire(Event const& event, + EventSetup const& eventSetup, + WaitingTaskWithArenaHolder holder, + AsyncState& state) const = 0; + + void doProduce(EventRange events, EventSetup const& eventSetup) { + for (size_t i = 0; i < events.size(); ++i) { + produce(events[i], eventSetup, states_[i]); + } + } + + virtual void produce(Event& event, EventSetup const& eventSetup, AsyncState& state) = 0; + + void doEndJob() { endJob(); } + + virtual void endJob() {} + + private: + size_t statesSize_ = 0; + std::unique_ptr states_; + }; + + template <> + class EDProducerExternalWork { + public: + EDProducerExternalWork() = default; + virtual ~EDProducerExternalWork() = default; + + bool hasAcquire() const { return true; } + + void doAcquire(ConstEventRange events, EventSetup const& eventSetup, WaitingTaskWithArenaHolder holder) { + for (Event const& event : events) { + acquire(event, eventSetup, holder); + } + } + + virtual void acquire(Event const& event, EventSetup const& eventSetup, WaitingTaskWithArenaHolder holder) const = 0; + + void doProduce(EventRange events, EventSetup const& eventSetup) { + for (Event& event : events) { + produce(event, eventSetup); + } + } - virtual void acquire(Event const& event, EventSetup const& eventSetup, WaitingTaskWithArenaHolder holder) = 0; virtual void produce(Event& event, EventSetup const& eventSetup) = 0; void doEndJob() { endJob(); } + + virtual void endJob() {} + + private: + }; + + template + class EDBatchingProducerExternalWork { + public: + using AsyncState = T; + + EDBatchingProducerExternalWork() = default; + virtual ~EDBatchingProducerExternalWork() = default; + + bool hasAcquire() const { return true; } + + void doAcquire(ConstEventRange events, EventSetup const& eventSetup, WaitingTaskWithArenaHolder holder) { + acquire(events, eventSetup, holder, state_); + } + + virtual void acquire(ConstEventRange events, + EventSetup const& eventSetup, + WaitingTaskWithArenaHolder holder, + AsyncState& state) const = 0; + + void doProduce(EventRange events, EventSetup const& eventSetup) { produce(events, eventSetup, state_); } + + virtual void produce(EventRange events, EventSetup const& eventSetup, AsyncState& states) = 0; + + void doEndJob() { endJob(); } + + virtual void endJob() {} + + private: + AsyncState state_; + }; + + template <> + class EDBatchingProducerExternalWork { + public: + EDBatchingProducerExternalWork() = default; + virtual ~EDBatchingProducerExternalWork() = default; + + bool hasAcquire() const { return true; } + + void doAcquire(ConstEventRange events, EventSetup const& eventSetup, WaitingTaskWithArenaHolder holder) { + acquire(events, eventSetup, holder); + } + + virtual void acquire(ConstEventRange events, + EventSetup const& eventSetup, + WaitingTaskWithArenaHolder holder) const = 0; + + void doProduce(EventRange events, EventSetup const& eventSetup) { produce(events, eventSetup); } + + virtual void produce(EventRange events, EventSetup const& eventSetup) = 0; + + void doEndJob() { endJob(); } + virtual void endJob() {} private: }; + } // namespace edm #endif diff --git a/src/fwtest/Framework/EventBatch.h b/src/fwtest/Framework/EventBatch.h new file mode 100644 index 000000000..3632c41ea --- /dev/null +++ b/src/fwtest/Framework/EventBatch.h @@ -0,0 +1,36 @@ +#ifndef EventBatch_h +#define EventBatch_h + +#include + +#include "Framework/Event.h" +#include "Framework/EventRange.h" + +namespace edm { + + class EventBatch { + public: + Event& emplace(int streamId, int eventId, ProductRegistry const& reg) { + events_.emplace_back(streamId, eventId, reg); + return events_.back(); + } + + void reserve(size_t capacity) { events_.reserve(capacity); } + + void clear() { events_.clear(); } + + size_t size() const { return events_.size(); } + + bool empty() const { return events_.empty(); } + + EventRange range() { return EventRange(&events_.front(), &events_.back() + 1); } + + ConstEventRange range() const { return ConstEventRange(&events_.front(), &events_.back() + 1); } + + private: + std::vector events_; + }; + +} // namespace edm + +#endif // EventBatch_h diff --git a/src/fwtest/Framework/EventRange.h b/src/fwtest/Framework/EventRange.h new file mode 100644 index 000000000..9a9cca645 --- /dev/null +++ b/src/fwtest/Framework/EventRange.h @@ -0,0 +1,96 @@ +#ifndef EventRange_h +#define EventRange_h + +#include +#include +#include + +#include "Framework/Event.h" + +namespace edm { + + class EventRange { + public: + EventRange(Event* begin, Event* end) : begin_(begin), end_(end) { + assert(begin_); + assert(end_); + assert(end_ >= begin_); + } + + Event* begin() { return begin_; } + + Event const* begin() const { return begin_; } + + Event* end() { return end_; } + + Event const* end() const { return end_; } + + size_t size() const { return end_ - begin_; } + + bool empty() const { return end_ == begin_; } + + Event& at(size_t index) { + if (index >= size()) { + std::stringstream msg; + msg << "EventRange::at() range check failed: index " << index << " is outside the range [0.." << size() << ")"; + throw std::out_of_range(msg.str()); + } + return begin_[index]; + } + + Event const& at(size_t index) const { + if (index >= size()) { + std::stringstream msg; + msg << "EventRange::at() range check failed: index " << index << " is outside the range [0.." << size() << ")"; + throw std::out_of_range(msg.str()); + } + return begin_[index]; + } + + Event& operator[](size_t index) { return begin_[index]; } + + Event const& operator[](size_t index) const { return begin_[index]; } + + private: + Event* begin_; + Event* end_; + }; + + class ConstEventRange { + public: + ConstEventRange(Event const* begin, Event const* end) : begin_(begin), end_(end) { + assert(begin_); + assert(end_); + assert(end_ >= begin_); + } + + ConstEventRange(EventRange range) : begin_(range.begin()), end_(range.end()) {} + + Event const* begin() const { return begin_; } + + Event const* end() const { return end_; } + + size_t size() const { return end_ - begin_; } + + bool empty() const { return end_ == begin_; } + + Event const& at(size_t index) { + if (index >= size()) { + std::stringstream msg; + msg << "ConstEventRange::at() range check failed: index " << index << " is outside the range [0.." << size() + << ")"; + throw std::out_of_range(msg.str()); + } + return begin_[index]; + } + + Event const& operator[](size_t index) const { return begin_[index]; } + + private: + Event const* begin_; + Event const* end_; + }; + +} // namespace edm + +#endif // EventRange_h diff --git a/src/fwtest/Framework/Worker.cc b/src/fwtest/Framework/Worker.cc index 6c1a5a57d..1b7d6ffb1 100644 --- a/src/fwtest/Framework/Worker.cc +++ b/src/fwtest/Framework/Worker.cc @@ -1,14 +1,15 @@ +//#include #include "Framework/Worker.h" namespace edm { - void Worker::prefetchAsync(Event& event, EventSetup const& eventSetup, WaitingTaskHolder iTask) { + void Worker::prefetchAsync(EventRange events, EventSetup const& eventSetup, WaitingTaskHolder iTask) { //std::cout << "prefetchAsync for " << this << " iTask " << iTask << std::endl; bool expected = false; if (prefetchRequested_.compare_exchange_strong(expected, true)) { //std::cout << "first prefetch call" << std::endl; for (Worker* dep : itemsToGet_) { //std::cout << "calling doWorkAsync for " << dep << " with " << iTask << std::endl; - dep->doWorkAsync(event, eventSetup, iTask); + dep->doWorkAsync(events, eventSetup, iTask); } } } diff --git a/src/fwtest/Framework/Worker.h b/src/fwtest/Framework/Worker.h index cd5631c69..897e4772a 100644 --- a/src/fwtest/Framework/Worker.h +++ b/src/fwtest/Framework/Worker.h @@ -2,9 +2,10 @@ #define Worker_h #include -#include //#include +#include +#include "Framework/EventRange.h" #include "Framework/WaitingTask.h" #include "Framework/WaitingTaskHolder.h" #include "Framework/WaitingTaskList.h" @@ -23,10 +24,10 @@ namespace edm { void setItemsToGet(std::vector workers) { itemsToGet_ = std::move(workers); } // thread safe - void prefetchAsync(Event& event, EventSetup const& eventSetup, WaitingTaskHolder iTask); + void prefetchAsync(EventRange events, EventSetup const& eventSetup, WaitingTaskHolder iTask); // not thread safe - virtual void doWorkAsync(Event& event, EventSetup const& eventSetup, WaitingTaskHolder iTask) = 0; + virtual void doWorkAsync(EventRange events, EventSetup const& eventSetup, WaitingTaskHolder iTask) = 0; // not thread safe virtual void doEndJob() = 0; @@ -50,22 +51,22 @@ namespace edm { public: explicit WorkerT(ProductRegistry& reg) : producer_(reg) {} - void doWorkAsync(Event& event, EventSetup const& eventSetup, WaitingTaskHolder task) override { + void doWorkAsync(EventRange events, EventSetup const& eventSetup, WaitingTaskHolder task) override { waitingTasksWork_.add(task); //std::cout << "doWorkAsync for " << this << " with iTask " << iTask << std::endl; bool expected = false; if (workStarted_.compare_exchange_strong(expected, true)) { //std::cout << "first doWorkAsync call" << std::endl; - WaitingTask* moduleTask = - make_waiting_task([this, &event, &eventSetup](std::exception_ptr const* iPtr) mutable { + WaitingTask* moduleTask = + make_waiting_task([this, events, &eventSetup](std::exception_ptr const* iPtr) mutable { if (iPtr) { waitingTasksWork_.doneWaiting(*iPtr); } else { std::exception_ptr exceptionPtr; try { //std::cout << "calling doProduce " << this << std::endl; - producer_.doProduce(event, eventSetup); + producer_.doProduce(events, eventSetup); } catch (...) { exceptionPtr = std::current_exception(); } @@ -76,23 +77,23 @@ namespace edm { auto* group = task.group(); if (producer_.hasAcquire()) { WaitingTaskWithArenaHolder runProduceHolder{*group, moduleTask}; - moduleTask = make_waiting_task([this, &event, &eventSetup, runProduceHolder = std::move(runProduceHolder)]( - std::exception_ptr const* iPtr) mutable { - if (iPtr) { - runProduceHolder.doneWaiting(*iPtr); - } else { - std::exception_ptr exceptionPtr; - try { - producer_.doAcquire(event, eventSetup, runProduceHolder); - } catch (...) { - exceptionPtr = std::current_exception(); - } - runProduceHolder.doneWaiting(exceptionPtr); - } - }); + moduleTask = make_waiting_task([this, events = ConstEventRange(events), &eventSetup, runProduceHolder = std::move(runProduceHolder)]( + std::exception_ptr const* iPtr) mutable { + if (iPtr) { + runProduceHolder.doneWaiting(*iPtr); + } else { + std::exception_ptr exceptionPtr; + try { + producer_.doAcquire(events, eventSetup, runProduceHolder); + } catch (...) { + exceptionPtr = std::current_exception(); + } + runProduceHolder.doneWaiting(exceptionPtr); + } + }); } //std::cout << "calling prefetchAsync " << this << " with moduleTask " << moduleTask << std::endl; - prefetchAsync(event, eventSetup, WaitingTaskHolder(*group, moduleTask)); + prefetchAsync(events, eventSetup, WaitingTaskHolder(*group, moduleTask)); } } diff --git a/src/fwtest/Makefile.deps b/src/fwtest/Makefile.deps index b531eb578..3c07a802b 100644 --- a/src/fwtest/Makefile.deps +++ b/src/fwtest/Makefile.deps @@ -1,3 +1,3 @@ fwtest_EXTERNAL_DEPENDS := TBB -Test1_DEPENDS := Framework DataFormats -Test2_DEPENDS := Framework +Test_DEPENDS := Framework DataFormats +TestBatching_DEPENDS := Framework diff --git a/src/fwtest/bin/EventProcessor.cc b/src/fwtest/bin/EventProcessor.cc index 7e4a4e3a6..011f0b33c 100644 --- a/src/fwtest/bin/EventProcessor.cc +++ b/src/fwtest/bin/EventProcessor.cc @@ -5,14 +5,15 @@ #include "EventProcessor.h" namespace edm { - EventProcessor::EventProcessor(int maxEvents, + EventProcessor::EventProcessor(int batchEvents, + int maxEvents, int runForMinutes, int numberOfStreams, std::vector const& path, std::vector const& esproducers, std::filesystem::path const& datadir, bool validation) - : source_(maxEvents, runForMinutes, registry_, datadir, validation) { + : source_(batchEvents, maxEvents, runForMinutes, registry_, datadir, validation) { for (auto const& name : esproducers) { pluginManager_.load(name); auto esp = ESPluginFactory::create(name, datadir); diff --git a/src/fwtest/bin/EventProcessor.h b/src/fwtest/bin/EventProcessor.h index dfaa15e1b..1185c5073 100644 --- a/src/fwtest/bin/EventProcessor.h +++ b/src/fwtest/bin/EventProcessor.h @@ -14,7 +14,8 @@ namespace edm { class EventProcessor { public: - explicit EventProcessor(int maxEvents, + explicit EventProcessor(int batchEvents, + int maxEvents, int runForMinutes, int numberOfStreams, std::vector const& path, diff --git a/src/fwtest/bin/PluginManager.cc b/src/fwtest/bin/PluginManager.cc index 7a854f8ce..7de2de2f5 100644 --- a/src/fwtest/bin/PluginManager.cc +++ b/src/fwtest/bin/PluginManager.cc @@ -1,5 +1,5 @@ -#include #include +//#include #include "PluginManager.h" diff --git a/src/fwtest/bin/Source.cc b/src/fwtest/bin/Source.cc index b1e46b044..0fb5eeb7d 100644 --- a/src/fwtest/bin/Source.cc +++ b/src/fwtest/bin/Source.cc @@ -24,9 +24,11 @@ namespace { namespace edm { Source::Source( - int maxEvents, int runForMinutes, ProductRegistry ®, std::filesystem::path const &datadir, bool validation) - : maxEvents_(maxEvents), + int batchEvents, int maxEvents, int runForMinutes, ProductRegistry ®, std::filesystem::path const &datadir, bool validation) + : batchEvents_(batchEvents), + maxEvents_(maxEvents), runForMinutes_(runForMinutes), + numEvents_(0), rawToken_(reg.produces()), validation_(validation) { std::ifstream in_raw(datadir / "raw.bin", std::ios::binary); @@ -78,6 +80,10 @@ namespace edm { assert(raw_.size() == vertices_.size()); } + if (batchEvents_ < 1) { + batchEvents_ = 1; + } + if (runForMinutes_ < 0 and maxEvents_ < 0) { maxEvents_ = raw_.size(); } @@ -89,20 +95,29 @@ namespace edm { } } - std::unique_ptr Source::produce(int streamId, ProductRegistry const ®) { + EventBatch Source::produce(int streamId, ProductRegistry const ®) { if (shouldStop_) { - return nullptr; + return {}; } - const int old = numEvents_.fetch_add(1); - const int iev = old + 1; + // atomically increase the event counter, without overflowing over maxEvents_ + int old_value, new_value; + if (runForMinutes_ < 0) { - if (old >= maxEvents_) { + // atomically increase the event counter, without overflowing over maxEvents_ + old_value = numEvents_; + do { + new_value = std::min(old_value + batchEvents_, maxEvents_); + } + while (not numEvents_.compare_exchange_weak(old_value, new_value)); + if (old_value >= maxEvents_) { shouldStop_ = true; - --numEvents_; - return nullptr; + return {}; } } else { + // atomically increase the event counter, and periodically check if runForMinutes_ have passed + old_value = numEvents_.fetch_add(batchEvents_); + new_value = old_value + batchEvents_; if (numEvents_ - numEventsTimeLastCheck_ > static_cast(raw_.size())) { std::scoped_lock lock(timeMutex_); // if some other thread beat us, no need to do anything @@ -114,21 +129,28 @@ namespace edm { numEventsTimeLastCheck_ = (numEvents_ / raw_.size()) * raw_.size(); } if (shouldStop_) { - --numEvents_; - return nullptr; + numEvents_ -= batchEvents_; + return {}; } } } - auto ev = std::make_unique(streamId, iev, reg); - const int index = old % raw_.size(); - ev->emplace(rawToken_, raw_[index]); - if (validation_) { - ev->emplace(digiClusterToken_, digiclusters_[index]); - ev->emplace(trackToken_, tracks_[index]); - ev->emplace(vertexToken_, vertices_[index]); + // check how many events should be read + const int size = new_value - old_value; + EventBatch events; + events.reserve(size); + for (int iev = old_value + 1; iev <= new_value; ++iev) { + Event &event = events.emplace(streamId, iev, reg); + const int index = (iev - 1) % raw_.size(); + + event.emplace(rawToken_, raw_[index]); + if (validation_) { + event.emplace(digiClusterToken_, digiclusters_[index]); + event.emplace(trackToken_, tracks_[index]); + event.emplace(vertexToken_, vertices_[index]); + } } - return ev; + return events; } } // namespace edm diff --git a/src/fwtest/bin/Source.h b/src/fwtest/bin/Source.h index c29685c07..a7ea6713d 100644 --- a/src/fwtest/bin/Source.h +++ b/src/fwtest/bin/Source.h @@ -7,8 +7,10 @@ #include #include #include +#include #include "Framework/Event.h" +#include "Framework/EventBatch.h" #include "DataFormats/FEDRawDataCollection.h" #include "DataFormats/DigiClusterCount.h" #include "DataFormats/TrackCount.h" @@ -18,7 +20,7 @@ namespace edm { class Source { public: explicit Source( - int maxEvents, int runForMinutes, ProductRegistry& reg, std::filesystem::path const& datadir, bool validation); + int batchEvents, int maxEvents, int runForMinutes, ProductRegistry& reg, std::filesystem::path const& datadir, bool validation); void startProcessing(); @@ -26,9 +28,10 @@ namespace edm { int processedEvents() const { return numEvents_; } // thread safe - std::unique_ptr produce(int streamId, ProductRegistry const& reg); + EventBatch produce(int streamId, ProductRegistry const& reg); private: + int batchEvents_; int maxEvents_; // these are all for the mode where the processing length is limited by time diff --git a/src/fwtest/bin/StreamSchedule.cc b/src/fwtest/bin/StreamSchedule.cc index c0676e341..d510f94aa 100644 --- a/src/fwtest/bin/StreamSchedule.cc +++ b/src/fwtest/bin/StreamSchedule.cc @@ -1,4 +1,5 @@ -//#include +#include +#include #include @@ -43,7 +44,7 @@ namespace edm { StreamSchedule& StreamSchedule::operator=(StreamSchedule&&) = default; void StreamSchedule::runToCompletionAsync(WaitingTaskHolder h) { - auto task = make_functor_task([this, h]() mutable { processOneEventAsync(std::move(h)); }); + auto task = make_functor_task([this, h]() mutable { processEventBatchAsync(std::move(h)); }); if (streamId_ == 0) { h.group()->run([task]() { TaskSentry s{task}; @@ -58,24 +59,24 @@ namespace edm { } } - void StreamSchedule::processOneEventAsync(WaitingTaskHolder h) { - auto event = source_->produce(streamId_, registry_); - if (event) { - // Pass the event object ownership to the "end-of-event" task - // Pass a non-owning pointer to the event to preceding tasks - //std::cout << "Begin processing event " << event->eventID() << std::endl; - auto eventPtr = event.get(); + void StreamSchedule::processEventBatchAsync(WaitingTaskHolder h) { + auto events = source_->produce(streamId_, registry_); + if (not events.empty()) { + // Pass the event batch ownership to the "end-of-event" task + // Pass a non-owning event range to the preceding tasks + //std::cout << "Begin processing a batch of " << events.size() << " events starting from " << events.range().at(0).eventID() << std::endl; + auto eventsRange = events.range(); auto* group = h.group(); - auto nextEventTask = - make_waiting_task([this, h = std::move(h), ev = std::move(event)](std::exception_ptr const* iPtr) mutable { - ev.reset(); + auto nextEventTask = + make_waiting_task([this, h = std::move(h), events = std::move(events)](std::exception_ptr const* iPtr) mutable { + events.clear(); if (iPtr) { h.doneWaiting(*iPtr); } else { for (auto const& worker : path_) { worker->reset(); } - processOneEventAsync(std::move(h)); + processEventBatchAsync(std::move(h)); } }); // To guarantee that the nextEventTask is spawned also in @@ -85,7 +86,7 @@ namespace edm { for (auto iWorker = path_.rbegin(); iWorker != path_.rend(); ++iWorker) { //std::cout << "calling doWorkAsync for " << iWorker->get() << " with nextEventTask " << nextEventTask << std::endl; - (*iWorker)->doWorkAsync(*eventPtr, *eventSetup_, nextEventTaskHolder); + (*iWorker)->doWorkAsync(eventsRange, *eventSetup_, nextEventTaskHolder); } } else { h.doneWaiting(); diff --git a/src/fwtest/bin/StreamSchedule.h b/src/fwtest/bin/StreamSchedule.h index 1bd364c70..a62dc3961 100644 --- a/src/fwtest/bin/StreamSchedule.h +++ b/src/fwtest/bin/StreamSchedule.h @@ -38,7 +38,7 @@ namespace edm { void endJob(); private: - void processOneEventAsync(WaitingTaskHolder h); + void processEventBatchAsync(WaitingTaskHolder h); ProductRegistry registry_; Source* source_; diff --git a/src/fwtest/bin/main.cc b/src/fwtest/bin/main.cc index f8196a9ca..131f49f4e 100644 --- a/src/fwtest/bin/main.cc +++ b/src/fwtest/bin/main.cc @@ -21,7 +21,8 @@ namespace { "[--empty]\n\n" << "Options\n" << " --numberOfThreads Number of threads to use (default 1, use 0 to use all CPU cores)\n" - << " --numberOfStreams Number of concurrent events (default 0 = numberOfThreads)\n" + << " --numberOfStreams Number of concurrent batch of events (default 0=numberOfThreads)\n" + << " --batchEvents Number of events to process in a batch (default 1 for individual events)\n" << " --maxEvents Number of events to process (default -1 for all events in the input file)\n" << " --runForMinutes Continue processing the set of 1000 events until this many minutes have passed " "(default -1 for disabled; conflicts with --maxEvents)\n" @@ -38,6 +39,7 @@ int main(int argc, char** argv) { std::vector args(argv, argv + argc); int numberOfThreads = 1; int numberOfStreams = 0; + int batchEvents = 1; int maxEvents = -1; int runForMinutes = -1; std::filesystem::path datadir; @@ -54,6 +56,9 @@ int main(int argc, char** argv) { } else if (*i == "--numberOfStreams") { ++i; numberOfStreams = std::stoi(*i); + } else if (*i == "--batchEvents") { + ++i; + batchEvents = std::stoul(*i); } else if (*i == "--maxEvents") { ++i; maxEvents = std::stoi(*i); @@ -98,14 +103,16 @@ int main(int argc, char** argv) { std::vector edmodules; std::vector esmodules; if (not empty) { - edmodules = {"TestProducer", "TestProducer3", "TestProducer2"}; + edmodules = { + "TestProducer", "TestProducerExternalWork", "TestBatchingProducer", "TestBatchingProducerExternalWork"}; esmodules = {"IntESProducer"}; if (transfer) { // add modules for transfer } } edm::EventProcessor processor( - maxEvents, runForMinutes, numberOfStreams, std::move(edmodules), std::move(esmodules), datadir, validation); + batchEvents, maxEvents, runForMinutes, numberOfStreams, std::move(edmodules), std::move(esmodules), datadir, validation); + maxEvents = processor.maxEvents(); if (runForMinutes < 0) { std::cout << "Processing " << processor.maxEvents() << " events, of which " << numberOfStreams diff --git a/src/fwtest/plugin-Test1/IntESProducer.cc b/src/fwtest/plugin-Test/IntESProducer.cc similarity index 100% rename from src/fwtest/plugin-Test1/IntESProducer.cc rename to src/fwtest/plugin-Test/IntESProducer.cc diff --git a/src/fwtest/plugin-Test1/TestProducer.cc b/src/fwtest/plugin-Test/TestProducer.cc similarity index 100% rename from src/fwtest/plugin-Test1/TestProducer.cc rename to src/fwtest/plugin-Test/TestProducer.cc diff --git a/src/fwtest/plugin-Test/TestProducerExternalWork.cc b/src/fwtest/plugin-Test/TestProducerExternalWork.cc new file mode 100644 index 000000000..37ce351d3 --- /dev/null +++ b/src/fwtest/plugin-Test/TestProducerExternalWork.cc @@ -0,0 +1,68 @@ +#include +#include +#include +#include +#include + +#include "Framework/EDProducer.h" +#include "Framework/Event.h" +#include "Framework/PluginFactory.h" + +namespace { + std::atomic nevents = 0; +} + +using TestProducerExternalWorkAsyncState = std::future; + +class TestProducerExternalWork : public edm::EDProducerExternalWork { +public: + explicit TestProducerExternalWork(edm::ProductRegistry& reg); + +private: + void acquire(edm::Event const& event, + edm::EventSetup const& eventSetup, + edm::WaitingTaskWithArenaHolder holder, + AsyncState& state) const override; + void produce(edm::Event& event, edm::EventSetup const& eventSetup, AsyncState& state) override; + + void endJob() override; + + const edm::EDGetTokenT getToken_; +}; + +TestProducerExternalWork::TestProducerExternalWork(edm::ProductRegistry& reg) + : getToken_(reg.consumes()) {} + +void TestProducerExternalWork::acquire(edm::Event const& event, + edm::EventSetup const& eventSetup, + edm::WaitingTaskWithArenaHolder holder, + AsyncState& state) const { + auto const value = event.get(getToken_); + assert(value == static_cast(event.eventID() + 10 * event.streamID() + 100)); + + state = std::async([holder = std::move(holder)]() mutable { + using namespace std::chrono_literals; + std::this_thread::sleep_for(1s); + holder.doneWaiting(); + return 42; + }); + +#ifndef FWTEST_SILENT + std::cout << "TestProducerExternalWork::acquire Event " << event.eventID() << " stream " << event.streamID() + << " value " << value << std::endl; +#endif +} + +void TestProducerExternalWork::produce(edm::Event& event, edm::EventSetup const& eventSetup, AsyncState& state) { +#ifndef FWTEST_SILENT + std::cout << "TestProducerExternalWork::produce Event " << event.eventID() << " stream " << event.streamID() + << " from future " << state.get() << std::endl; +#endif + ++nevents; +} + +void TestProducerExternalWork::endJob() { + std::cout << "TestProducerExternalWork::endJob processed " << nevents.load() << " events" << std::endl; +} + +DEFINE_FWK_MODULE(TestProducerExternalWork); diff --git a/src/fwtest/plugin-Test2/TestProducer2.cc b/src/fwtest/plugin-Test2/TestProducer2.cc deleted file mode 100644 index 6f88839ae..000000000 --- a/src/fwtest/plugin-Test2/TestProducer2.cc +++ /dev/null @@ -1,64 +0,0 @@ -#include -#include -#include -#include -#include - -#include "Framework/EDProducer.h" -#include "Framework/Event.h" -#include "Framework/PluginFactory.h" - -namespace { - std::atomic nevents = 0; -} - -class TestProducer2 : public edm::EDProducerExternalWork { -public: - explicit TestProducer2(edm::ProductRegistry& reg); - -private: - void acquire(edm::Event const& event, - edm::EventSetup const& eventSetup, - edm::WaitingTaskWithArenaHolder holder) override; - void produce(edm::Event& event, edm::EventSetup const& eventSetup) override; - - void endJob() override; - - edm::EDGetTokenT getToken_; - std::future future_; -}; - -TestProducer2::TestProducer2(edm::ProductRegistry& reg) : getToken_(reg.consumes()) {} - -void TestProducer2::acquire(edm::Event const& event, - edm::EventSetup const& eventSetup, - edm::WaitingTaskWithArenaHolder holder) { - auto const value = event.get(getToken_); - assert(value == static_cast(event.eventID() + 10 * event.streamID() + 100)); - - future_ = std::async([holder = std::move(holder)]() mutable { - using namespace std::chrono_literals; - std::this_thread::sleep_for(1s); - holder.doneWaiting(); - return 42; - }); - -#ifndef FWTEST_SILENT - std::cout << "TestProducer2::acquire Event " << event.eventID() << " stream " << event.streamID() << " value " - << value << std::endl; -#endif -} - -void TestProducer2::produce(edm::Event& event, edm::EventSetup const& eventSetup) { -#ifndef FWTEST_SILENT - std::cout << "TestProducer2::produce Event " << event.eventID() << " stream " << event.streamID() << " from future " - << future_.get() << std::endl; -#endif - ++nevents; -} - -void TestProducer2::endJob() { - std::cout << "TestProducer2::endJob processed " << nevents.load() << " events" << std::endl; -} - -DEFINE_FWK_MODULE(TestProducer2); diff --git a/src/fwtest/plugin-Test2/TestProducer3.cc b/src/fwtest/plugin-Test2/TestProducer3.cc deleted file mode 100644 index ef8354058..000000000 --- a/src/fwtest/plugin-Test2/TestProducer3.cc +++ /dev/null @@ -1,29 +0,0 @@ -#include -#include -#include - -#include "Framework/EDProducer.h" -#include "Framework/Event.h" -#include "Framework/PluginFactory.h" - -class TestProducer3 : public edm::EDProducer { -public: - explicit TestProducer3(edm::ProductRegistry& reg); - -private: - void produce(edm::Event& event, edm::EventSetup const& eventSetup) override; - - edm::EDGetTokenT getToken_; -}; - -TestProducer3::TestProducer3(edm::ProductRegistry& reg) : getToken_(reg.consumes()) {} - -void TestProducer3::produce(edm::Event& event, edm::EventSetup const& eventSetup) { - auto const value = event.get(getToken_); -#ifndef FWTEST_SILENT - std::cout << "TestProducer3 Event " << event.eventID() << " stream " << event.streamID() << " value " << value - << std::endl; -#endif -} - -DEFINE_FWK_MODULE(TestProducer3); diff --git a/src/fwtest/plugin-TestBatching/TestBatchingProducer.cc b/src/fwtest/plugin-TestBatching/TestBatchingProducer.cc new file mode 100644 index 000000000..485929c0f --- /dev/null +++ b/src/fwtest/plugin-TestBatching/TestBatchingProducer.cc @@ -0,0 +1,32 @@ +#include +#include +#include + +#include "Framework/EDProducer.h" +#include "Framework/Event.h" +#include "Framework/EventRange.h" +#include "Framework/PluginFactory.h" + +class TestBatchingProducer : public edm::EDBatchingProducer { +public: + explicit TestBatchingProducer(edm::ProductRegistry& reg); + +private: + void produce(edm::EventRange events, edm::EventSetup const& eventSetup) override; + + edm::EDGetTokenT getToken_; +}; + +TestBatchingProducer::TestBatchingProducer(edm::ProductRegistry& reg) : getToken_(reg.consumes()) {} + +void TestBatchingProducer::produce(edm::EventRange events, edm::EventSetup const& eventSetup) { + for (edm::Event& event : events) { + auto const value = event.get(getToken_); +#ifndef FWTEST_SILENT + std::cout << "TestBatchingProducer Event " << event.eventID() << " stream " << event.streamID() << " value " + << value << std::endl; +#endif + } +} + +DEFINE_FWK_MODULE(TestBatchingProducer); diff --git a/src/fwtest/plugin-TestBatching/TestBatchingProducerExternalWork.cc b/src/fwtest/plugin-TestBatching/TestBatchingProducerExternalWork.cc new file mode 100644 index 000000000..f76d87417 --- /dev/null +++ b/src/fwtest/plugin-TestBatching/TestBatchingProducerExternalWork.cc @@ -0,0 +1,79 @@ +#include +#include +#include +#include +#include +#include + +#include "Framework/EDProducer.h" +#include "Framework/Event.h" +#include "Framework/EventRange.h" +#include "Framework/PluginFactory.h" + +namespace { + std::atomic nevents = 0; +} + +// test using an std::map instead of a simpler std::vector +using TestBatchingProducerExternalWorkAsyncState = std::map>; + +class TestBatchingProducerExternalWork + : public edm::EDBatchingProducerExternalWork { +public: + explicit TestBatchingProducerExternalWork(edm::ProductRegistry& reg); + +private: + void acquire(edm::ConstEventRange events, + edm::EventSetup const& eventSetup, + edm::WaitingTaskWithArenaHolder holder, + AsyncState& state) const override; + void produce(edm::EventRange events, edm::EventSetup const& eventSetup, AsyncState& state) override; + + void endJob() override; + + const edm::EDGetTokenT getToken_; +}; + +TestBatchingProducerExternalWork::TestBatchingProducerExternalWork(edm::ProductRegistry& reg) + : getToken_(reg.consumes()) {} + +void TestBatchingProducerExternalWork::acquire(edm::ConstEventRange events, + edm::EventSetup const& eventSetup, + edm::WaitingTaskWithArenaHolder holder, + AsyncState& state) const { + for (edm::Event const& event : events) { + auto const value = event.get(getToken_); + assert(value == static_cast(event.eventID() + 10 * event.streamID() + 100)); + + // cannot move form the holder as it is used more than once + state[event.eventID()] = std::async([holder]() mutable { + using namespace std::chrono_literals; + std::this_thread::sleep_for(1s); + holder.doneWaiting(); + return 42; + }); + +#ifndef FWTEST_SILENT + std::cout << "TestBatchingProducerExternalWork::acquire Event " << event.eventID() << " stream " << event.streamID() + << " value " << value << std::endl; +#endif + } +} + +void TestBatchingProducerExternalWork::produce(edm::EventRange events, + edm::EventSetup const& eventSetup, + AsyncState& state) { +#ifndef FWTEST_SILENT + for (edm::Event& event : events) { + std::cout << "TestBatchingProducerExternalWork::produce Event " << event.eventID() << " stream " << event.streamID() + << " from future " << state[event.eventID()].get() << std::endl; + } +#endif + ++nevents; +} + +void TestBatchingProducerExternalWork::endJob() { + std::cout << "TestBatchingProducerExternalWork::endJob processed " << nevents.load() << " events" << std::endl; +} + +DEFINE_FWK_MODULE(TestBatchingProducerExternalWork); From 0320980bd5706fb2f3b97dafe22d069dddacd587 Mon Sep 17 00:00:00 2001 From: Andrea Bocci Date: Sat, 5 Dec 2020 00:19:34 +0100 Subject: [PATCH 3/3] Copy event batching support from fwtest to cuda Adapt the EDProducerExternalWork modules to the new interface --- src/cuda/Framework/EDProducer.h | 149 +++++++++++++++++- src/cuda/Framework/EventBatch.h | 36 +++++ src/cuda/Framework/EventRange.h | 96 +++++++++++ src/cuda/Framework/WaitingTaskHolder.h | 16 +- .../Framework/WaitingTaskWithArenaHolder.cc | 18 ++- .../Framework/WaitingTaskWithArenaHolder.h | 6 + src/cuda/Framework/Worker.cc | 5 +- src/cuda/Framework/Worker.h | 46 +++--- src/cuda/bin/EventProcessor.cc | 5 +- src/cuda/bin/EventProcessor.h | 3 +- src/cuda/bin/PluginManager.cc | 2 +- src/cuda/bin/Source.cc | 60 ++++--- src/cuda/bin/Source.h | 7 +- src/cuda/bin/StreamSchedule.cc | 31 ++-- src/cuda/bin/StreamSchedule.h | 2 +- src/cuda/bin/main.cc | 10 +- .../PixelTrackSoAFromCUDA.cc | 24 +-- .../PixelVertexSoAFromCUDA.cc | 20 +-- .../SiPixelRawToClusterCUDA.cc | 74 ++++----- .../SiPixelDigisSoAFromCUDA.cc | 50 +++--- src/cuda/plugin-Validation/HistoValidator.cc | 121 +++++++------- 21 files changed, 557 insertions(+), 224 deletions(-) create mode 100644 src/cuda/Framework/EventBatch.h create mode 100644 src/cuda/Framework/EventRange.h diff --git a/src/cuda/Framework/EDProducer.h b/src/cuda/Framework/EDProducer.h index 8160b8c96..1cc63f58d 100644 --- a/src/cuda/Framework/EDProducer.h +++ b/src/cuda/Framework/EDProducer.h @@ -1,6 +1,9 @@ #ifndef EDProducerBase_h #define EDProducerBase_h +#include + +#include "Framework/EventRange.h" #include "Framework/WaitingTaskWithArenaHolder.h" namespace edm { @@ -14,9 +17,13 @@ namespace edm { bool hasAcquire() const { return false; } - void doAcquire(Event const& event, EventSetup const& eventSetup, WaitingTaskWithArenaHolder holder) {} + void doAcquire(ConstEventRange events, EventSetup const& eventSetup, WaitingTaskWithArenaHolder holder) {} - void doProduce(Event& event, EventSetup const& eventSetup) { produce(event, eventSetup); } + void doProduce(EventRange events, EventSetup const& eventSetup) { + for (Event& event : events) { + produce(event, eventSetup); + } + } virtual void produce(Event& event, EventSetup const& eventSetup) = 0; @@ -27,27 +34,157 @@ namespace edm { private: }; + class EDBatchingProducer { + public: + EDBatchingProducer() = default; + virtual ~EDBatchingProducer() = default; + + bool hasAcquire() const { return false; } + + void doAcquire(ConstEventRange events, EventSetup const& eventSetup, WaitingTaskWithArenaHolder holder) {} + + void doProduce(EventRange events, EventSetup const& eventSetup) { produce(events, eventSetup); } + + virtual void produce(EventRange events, EventSetup const& eventSetup) = 0; + + void doEndJob() { endJob(); } + + virtual void endJob() {} + + private: + }; + + template class EDProducerExternalWork { public: + using AsyncState = T; + EDProducerExternalWork() = default; virtual ~EDProducerExternalWork() = default; bool hasAcquire() const { return true; } - void doAcquire(Event const& event, EventSetup const& eventSetup, WaitingTaskWithArenaHolder holder) { - acquire(event, eventSetup, std::move(holder)); + void doAcquire(ConstEventRange events, EventSetup const& eventSetup, WaitingTaskWithArenaHolder holder) { + if (events.size() > statesSize_) { + statesSize_ = events.size(); + states_ = std::make_unique(statesSize_); + } + for (size_t i = 0; i < events.size(); ++i) { + acquire(events[i], eventSetup, holder, states_[i]); + } } - void doProduce(Event& event, EventSetup const& eventSetup) { produce(event, eventSetup); } + virtual void acquire(Event const& event, + EventSetup const& eventSetup, + WaitingTaskWithArenaHolder holder, + AsyncState& state) const = 0; + + void doProduce(EventRange events, EventSetup const& eventSetup) { + for (size_t i = 0; i < events.size(); ++i) { + produce(events[i], eventSetup, states_[i]); + } + } + + virtual void produce(Event& event, EventSetup const& eventSetup, AsyncState& state) = 0; + + void doEndJob() { endJob(); } + + virtual void endJob() {} + + private: + size_t statesSize_ = 0; + std::unique_ptr states_; + }; + + template <> + class EDProducerExternalWork { + public: + EDProducerExternalWork() = default; + virtual ~EDProducerExternalWork() = default; + + bool hasAcquire() const { return true; } + + void doAcquire(ConstEventRange events, EventSetup const& eventSetup, WaitingTaskWithArenaHolder holder) { + for (Event const& event : events) { + acquire(event, eventSetup, holder); + } + } + + virtual void acquire(Event const& event, EventSetup const& eventSetup, WaitingTaskWithArenaHolder holder) const = 0; + + void doProduce(EventRange events, EventSetup const& eventSetup) { + for (Event& event : events) { + produce(event, eventSetup); + } + } - virtual void acquire(Event const& event, EventSetup const& eventSetup, WaitingTaskWithArenaHolder holder) = 0; virtual void produce(Event& event, EventSetup const& eventSetup) = 0; void doEndJob() { endJob(); } + + virtual void endJob() {} + + private: + }; + + template + class EDBatchingProducerExternalWork { + public: + using AsyncState = T; + + EDBatchingProducerExternalWork() = default; + virtual ~EDBatchingProducerExternalWork() = default; + + bool hasAcquire() const { return true; } + + void doAcquire(ConstEventRange events, EventSetup const& eventSetup, WaitingTaskWithArenaHolder holder) { + acquire(events, eventSetup, holder, state_); + } + + virtual void acquire(ConstEventRange events, + EventSetup const& eventSetup, + WaitingTaskWithArenaHolder holder, + AsyncState& state) const = 0; + + void doProduce(EventRange events, EventSetup const& eventSetup) { produce(events, eventSetup, state_); } + + virtual void produce(EventRange events, EventSetup const& eventSetup, AsyncState& states) = 0; + + void doEndJob() { endJob(); } + + virtual void endJob() {} + + private: + AsyncState state_; + }; + + template <> + class EDBatchingProducerExternalWork { + public: + EDBatchingProducerExternalWork() = default; + virtual ~EDBatchingProducerExternalWork() = default; + + bool hasAcquire() const { return true; } + + void doAcquire(ConstEventRange events, EventSetup const& eventSetup, WaitingTaskWithArenaHolder holder) { + acquire(events, eventSetup, holder); + } + + virtual void acquire(ConstEventRange events, + EventSetup const& eventSetup, + WaitingTaskWithArenaHolder holder) const = 0; + + void doProduce(EventRange events, EventSetup const& eventSetup) { produce(events, eventSetup); } + + virtual void produce(EventRange events, EventSetup const& eventSetup) = 0; + + void doEndJob() { endJob(); } + virtual void endJob() {} private: }; + } // namespace edm #endif diff --git a/src/cuda/Framework/EventBatch.h b/src/cuda/Framework/EventBatch.h new file mode 100644 index 000000000..3632c41ea --- /dev/null +++ b/src/cuda/Framework/EventBatch.h @@ -0,0 +1,36 @@ +#ifndef EventBatch_h +#define EventBatch_h + +#include + +#include "Framework/Event.h" +#include "Framework/EventRange.h" + +namespace edm { + + class EventBatch { + public: + Event& emplace(int streamId, int eventId, ProductRegistry const& reg) { + events_.emplace_back(streamId, eventId, reg); + return events_.back(); + } + + void reserve(size_t capacity) { events_.reserve(capacity); } + + void clear() { events_.clear(); } + + size_t size() const { return events_.size(); } + + bool empty() const { return events_.empty(); } + + EventRange range() { return EventRange(&events_.front(), &events_.back() + 1); } + + ConstEventRange range() const { return ConstEventRange(&events_.front(), &events_.back() + 1); } + + private: + std::vector events_; + }; + +} // namespace edm + +#endif // EventBatch_h diff --git a/src/cuda/Framework/EventRange.h b/src/cuda/Framework/EventRange.h new file mode 100644 index 000000000..9a9cca645 --- /dev/null +++ b/src/cuda/Framework/EventRange.h @@ -0,0 +1,96 @@ +#ifndef EventRange_h +#define EventRange_h + +#include +#include +#include + +#include "Framework/Event.h" + +namespace edm { + + class EventRange { + public: + EventRange(Event* begin, Event* end) : begin_(begin), end_(end) { + assert(begin_); + assert(end_); + assert(end_ >= begin_); + } + + Event* begin() { return begin_; } + + Event const* begin() const { return begin_; } + + Event* end() { return end_; } + + Event const* end() const { return end_; } + + size_t size() const { return end_ - begin_; } + + bool empty() const { return end_ == begin_; } + + Event& at(size_t index) { + if (index >= size()) { + std::stringstream msg; + msg << "EventRange::at() range check failed: index " << index << " is outside the range [0.." << size() << ")"; + throw std::out_of_range(msg.str()); + } + return begin_[index]; + } + + Event const& at(size_t index) const { + if (index >= size()) { + std::stringstream msg; + msg << "EventRange::at() range check failed: index " << index << " is outside the range [0.." << size() << ")"; + throw std::out_of_range(msg.str()); + } + return begin_[index]; + } + + Event& operator[](size_t index) { return begin_[index]; } + + Event const& operator[](size_t index) const { return begin_[index]; } + + private: + Event* begin_; + Event* end_; + }; + + class ConstEventRange { + public: + ConstEventRange(Event const* begin, Event const* end) : begin_(begin), end_(end) { + assert(begin_); + assert(end_); + assert(end_ >= begin_); + } + + ConstEventRange(EventRange range) : begin_(range.begin()), end_(range.end()) {} + + Event const* begin() const { return begin_; } + + Event const* end() const { return end_; } + + size_t size() const { return end_ - begin_; } + + bool empty() const { return end_ == begin_; } + + Event const& at(size_t index) { + if (index >= size()) { + std::stringstream msg; + msg << "ConstEventRange::at() range check failed: index " << index << " is outside the range [0.." << size() + << ")"; + throw std::out_of_range(msg.str()); + } + return begin_[index]; + } + + Event const& operator[](size_t index) const { return begin_[index]; } + + private: + Event const* begin_; + Event const* end_; + }; + +} // namespace edm + +#endif // EventRange_h diff --git a/src/cuda/Framework/WaitingTaskHolder.h b/src/cuda/Framework/WaitingTaskHolder.h index 89ed970c0..592fbb99c 100644 --- a/src/cuda/Framework/WaitingTaskHolder.h +++ b/src/cuda/Framework/WaitingTaskHolder.h @@ -40,7 +40,7 @@ namespace edm { } ~WaitingTaskHolder() { if (m_task) { - doneWaiting(std::exception_ptr{}); + doneWaiting(); } } @@ -89,11 +89,8 @@ namespace edm { } } - void doneWaiting(std::exception_ptr iExcept) { - if (iExcept) { - m_task->dependentTaskFailed(iExcept); - } - //task_group::run can run the task before we finish + void doneWaiting() { + //spawn can run the task before we finish // doneWaiting and some other thread might // try to reuse this object. Resetting // before spawn avoids problems @@ -107,6 +104,13 @@ namespace edm { } } + void doneWaiting(std::exception_ptr iExcept) { + if (iExcept) { + m_task->dependentTaskFailed(iExcept); + } + doneWaiting(); + } + private: WaitingTask* release_no_decrement() noexcept { auto t = m_task; diff --git a/src/cuda/Framework/WaitingTaskWithArenaHolder.cc b/src/cuda/Framework/WaitingTaskWithArenaHolder.cc index abcb920ec..a6b76be0d 100644 --- a/src/cuda/Framework/WaitingTaskWithArenaHolder.cc +++ b/src/cuda/Framework/WaitingTaskWithArenaHolder.cc @@ -38,7 +38,7 @@ namespace edm { WaitingTaskWithArenaHolder::~WaitingTaskWithArenaHolder() { if (m_task) { - doneWaiting(std::exception_ptr{}); + doneWaiting(); } } @@ -79,10 +79,7 @@ namespace edm { // into the correct arena of threads. Use of the arena allows doneWaiting // to be called from a thread outside the arena of threads that will manage // the task. doneWaiting can be called from a non-TBB thread. - void WaitingTaskWithArenaHolder::doneWaiting(std::exception_ptr iExcept) { - if (iExcept) { - m_task->dependentTaskFailed(iExcept); - } + void WaitingTaskWithArenaHolder::doneWaiting() { //enqueue can run the task before we finish // doneWaiting and some other thread might // try to reuse this object. Resetting @@ -96,6 +93,17 @@ namespace edm { } } + // This spawns the task. The arena is needed to get the task spawned + // into the correct arena of threads. Use of the arena allows doneWaiting + // to be called from a thread outside the arena of threads that will manage + // the task. doneWaiting can be called from a non-TBB thread. + void WaitingTaskWithArenaHolder::doneWaiting(std::exception_ptr iExcept) { + if (iExcept) { + m_task->dependentTaskFailed(iExcept); + } + doneWaiting(); + } + // This next function is useful if you know from the context that // m_arena (which is set when the constructor was executes) is the // same arena in which you want to execute the doneWaiting function. diff --git a/src/cuda/Framework/WaitingTaskWithArenaHolder.h b/src/cuda/Framework/WaitingTaskWithArenaHolder.h index d0c27743c..f6763544d 100644 --- a/src/cuda/Framework/WaitingTaskWithArenaHolder.h +++ b/src/cuda/Framework/WaitingTaskWithArenaHolder.h @@ -53,6 +53,12 @@ namespace edm { WaitingTaskWithArenaHolder& operator=(WaitingTaskWithArenaHolder&& iRHS); + // This spawns the task. The arena is needed to get the task spawned + // into the correct arena of threads. Use of the arena allows doneWaiting + // to be called from a thread outside the arena of threads that will manage + // the task. doneWaiting can be called from a non-TBB thread. + void doneWaiting(); + // This spawns the task. The arena is needed to get the task spawned // into the correct arena of threads. Use of the arena allows doneWaiting // to be called from a thread outside the arena of threads that will manage diff --git a/src/cuda/Framework/Worker.cc b/src/cuda/Framework/Worker.cc index 6c1a5a57d..1b7d6ffb1 100644 --- a/src/cuda/Framework/Worker.cc +++ b/src/cuda/Framework/Worker.cc @@ -1,14 +1,15 @@ +//#include #include "Framework/Worker.h" namespace edm { - void Worker::prefetchAsync(Event& event, EventSetup const& eventSetup, WaitingTaskHolder iTask) { + void Worker::prefetchAsync(EventRange events, EventSetup const& eventSetup, WaitingTaskHolder iTask) { //std::cout << "prefetchAsync for " << this << " iTask " << iTask << std::endl; bool expected = false; if (prefetchRequested_.compare_exchange_strong(expected, true)) { //std::cout << "first prefetch call" << std::endl; for (Worker* dep : itemsToGet_) { //std::cout << "calling doWorkAsync for " << dep << " with " << iTask << std::endl; - dep->doWorkAsync(event, eventSetup, iTask); + dep->doWorkAsync(events, eventSetup, iTask); } } } diff --git a/src/cuda/Framework/Worker.h b/src/cuda/Framework/Worker.h index cd5631c69..c248fc562 100644 --- a/src/cuda/Framework/Worker.h +++ b/src/cuda/Framework/Worker.h @@ -2,9 +2,10 @@ #define Worker_h #include -#include //#include +#include +#include "Framework/EventRange.h" #include "Framework/WaitingTask.h" #include "Framework/WaitingTaskHolder.h" #include "Framework/WaitingTaskList.h" @@ -23,10 +24,10 @@ namespace edm { void setItemsToGet(std::vector workers) { itemsToGet_ = std::move(workers); } // thread safe - void prefetchAsync(Event& event, EventSetup const& eventSetup, WaitingTaskHolder iTask); + void prefetchAsync(EventRange events, EventSetup const& eventSetup, WaitingTaskHolder iTask); // not thread safe - virtual void doWorkAsync(Event& event, EventSetup const& eventSetup, WaitingTaskHolder iTask) = 0; + virtual void doWorkAsync(EventRange events, EventSetup const& eventSetup, WaitingTaskHolder iTask) = 0; // not thread safe virtual void doEndJob() = 0; @@ -50,22 +51,22 @@ namespace edm { public: explicit WorkerT(ProductRegistry& reg) : producer_(reg) {} - void doWorkAsync(Event& event, EventSetup const& eventSetup, WaitingTaskHolder task) override { + void doWorkAsync(EventRange events, EventSetup const& eventSetup, WaitingTaskHolder task) override { waitingTasksWork_.add(task); //std::cout << "doWorkAsync for " << this << " with iTask " << iTask << std::endl; bool expected = false; if (workStarted_.compare_exchange_strong(expected, true)) { //std::cout << "first doWorkAsync call" << std::endl; - WaitingTask* moduleTask = - make_waiting_task([this, &event, &eventSetup](std::exception_ptr const* iPtr) mutable { + WaitingTask* moduleTask = + make_waiting_task([this, events, &eventSetup](std::exception_ptr const* iPtr) mutable { if (iPtr) { waitingTasksWork_.doneWaiting(*iPtr); } else { std::exception_ptr exceptionPtr; try { //std::cout << "calling doProduce " << this << std::endl; - producer_.doProduce(event, eventSetup); + producer_.doProduce(events, eventSetup); } catch (...) { exceptionPtr = std::current_exception(); } @@ -76,23 +77,24 @@ namespace edm { auto* group = task.group(); if (producer_.hasAcquire()) { WaitingTaskWithArenaHolder runProduceHolder{*group, moduleTask}; - moduleTask = make_waiting_task([this, &event, &eventSetup, runProduceHolder = std::move(runProduceHolder)]( - std::exception_ptr const* iPtr) mutable { - if (iPtr) { - runProduceHolder.doneWaiting(*iPtr); - } else { - std::exception_ptr exceptionPtr; - try { - producer_.doAcquire(event, eventSetup, runProduceHolder); - } catch (...) { - exceptionPtr = std::current_exception(); - } - runProduceHolder.doneWaiting(exceptionPtr); - } - }); + moduleTask = make_waiting_task([this, events = ConstEventRange(events), &eventSetup, runProduceHolder = + std::move(runProduceHolder)]( + std::exception_ptr const* iPtr) mutable { + if (iPtr) { + runProduceHolder.doneWaiting(*iPtr); + } else { + std::exception_ptr exceptionPtr; + try { + producer_.doAcquire(events, eventSetup, runProduceHolder); + } catch (...) { + exceptionPtr = std::current_exception(); + } + runProduceHolder.doneWaiting(exceptionPtr); + } + }); } //std::cout << "calling prefetchAsync " << this << " with moduleTask " << moduleTask << std::endl; - prefetchAsync(event, eventSetup, WaitingTaskHolder(*group, moduleTask)); + prefetchAsync(events, eventSetup, WaitingTaskHolder(*group, moduleTask)); } } diff --git a/src/cuda/bin/EventProcessor.cc b/src/cuda/bin/EventProcessor.cc index 7e4a4e3a6..011f0b33c 100644 --- a/src/cuda/bin/EventProcessor.cc +++ b/src/cuda/bin/EventProcessor.cc @@ -5,14 +5,15 @@ #include "EventProcessor.h" namespace edm { - EventProcessor::EventProcessor(int maxEvents, + EventProcessor::EventProcessor(int batchEvents, + int maxEvents, int runForMinutes, int numberOfStreams, std::vector const& path, std::vector const& esproducers, std::filesystem::path const& datadir, bool validation) - : source_(maxEvents, runForMinutes, registry_, datadir, validation) { + : source_(batchEvents, maxEvents, runForMinutes, registry_, datadir, validation) { for (auto const& name : esproducers) { pluginManager_.load(name); auto esp = ESPluginFactory::create(name, datadir); diff --git a/src/cuda/bin/EventProcessor.h b/src/cuda/bin/EventProcessor.h index dfaa15e1b..1185c5073 100644 --- a/src/cuda/bin/EventProcessor.h +++ b/src/cuda/bin/EventProcessor.h @@ -14,7 +14,8 @@ namespace edm { class EventProcessor { public: - explicit EventProcessor(int maxEvents, + explicit EventProcessor(int batchEvents, + int maxEvents, int runForMinutes, int numberOfStreams, std::vector const& path, diff --git a/src/cuda/bin/PluginManager.cc b/src/cuda/bin/PluginManager.cc index 7a854f8ce..7de2de2f5 100644 --- a/src/cuda/bin/PluginManager.cc +++ b/src/cuda/bin/PluginManager.cc @@ -1,5 +1,5 @@ -#include #include +//#include #include "PluginManager.h" diff --git a/src/cuda/bin/Source.cc b/src/cuda/bin/Source.cc index b1e46b044..fe5360be3 100644 --- a/src/cuda/bin/Source.cc +++ b/src/cuda/bin/Source.cc @@ -24,9 +24,11 @@ namespace { namespace edm { Source::Source( - int maxEvents, int runForMinutes, ProductRegistry ®, std::filesystem::path const &datadir, bool validation) - : maxEvents_(maxEvents), + int batchEvents, int maxEvents, int runForMinutes, ProductRegistry ®, std::filesystem::path const &datadir, bool validation) + : batchEvents_(batchEvents), + maxEvents_(maxEvents), runForMinutes_(runForMinutes), + numEvents_(0), rawToken_(reg.produces()), validation_(validation) { std::ifstream in_raw(datadir / "raw.bin", std::ios::binary); @@ -78,6 +80,10 @@ namespace edm { assert(raw_.size() == vertices_.size()); } + if (batchEvents_ < 1) { + batchEvents_ = 1; + } + if (runForMinutes_ < 0 and maxEvents_ < 0) { maxEvents_ = raw_.size(); } @@ -89,20 +95,29 @@ namespace edm { } } - std::unique_ptr Source::produce(int streamId, ProductRegistry const ®) { + EventBatch Source::produce(int streamId, ProductRegistry const ®) { if (shouldStop_) { - return nullptr; + return {}; } - const int old = numEvents_.fetch_add(1); - const int iev = old + 1; + // atomically increase the event counter, without overflowing over maxEvents_ + int old_value, new_value; + if (runForMinutes_ < 0) { - if (old >= maxEvents_) { + // atomically increase the event counter, without overflowing over maxEvents_ + old_value = numEvents_; + do { + new_value = std::min(old_value + batchEvents_, maxEvents_); + } + while (not numEvents_.compare_exchange_weak(old_value, new_value)); + if (old_value >= maxEvents_) { shouldStop_ = true; - --numEvents_; - return nullptr; + return {}; } } else { + // atomically increase the event counter, and periodically check if runForMinutes_ have passed + old_value = numEvents_.fetch_add(batchEvents_); + new_value = old_value + batchEvents_; if (numEvents_ - numEventsTimeLastCheck_ > static_cast(raw_.size())) { std::scoped_lock lock(timeMutex_); // if some other thread beat us, no need to do anything @@ -114,21 +129,28 @@ namespace edm { numEventsTimeLastCheck_ = (numEvents_ / raw_.size()) * raw_.size(); } if (shouldStop_) { - --numEvents_; - return nullptr; + numEvents_ -= batchEvents_; + return {}; } } } - auto ev = std::make_unique(streamId, iev, reg); - const int index = old % raw_.size(); - ev->emplace(rawToken_, raw_[index]); - if (validation_) { - ev->emplace(digiClusterToken_, digiclusters_[index]); - ev->emplace(trackToken_, tracks_[index]); - ev->emplace(vertexToken_, vertices_[index]); + // check how many events should be read + const int size = new_value - old_value; + EventBatch events; + events.reserve(size); + for (int iev = old_value + 1; iev <= new_value; ++iev) { + Event &event = events.emplace(streamId, iev, reg); + const int index = (iev - 1) % raw_.size(); + + event.emplace(rawToken_, raw_[index]); + if (validation_) { + event.emplace(digiClusterToken_, digiclusters_[index]); + event.emplace(trackToken_, tracks_[index]); + event.emplace(vertexToken_, vertices_[index]); + } } - return ev; + return events; } } // namespace edm diff --git a/src/cuda/bin/Source.h b/src/cuda/bin/Source.h index c29685c07..a7ea6713d 100644 --- a/src/cuda/bin/Source.h +++ b/src/cuda/bin/Source.h @@ -7,8 +7,10 @@ #include #include #include +#include #include "Framework/Event.h" +#include "Framework/EventBatch.h" #include "DataFormats/FEDRawDataCollection.h" #include "DataFormats/DigiClusterCount.h" #include "DataFormats/TrackCount.h" @@ -18,7 +20,7 @@ namespace edm { class Source { public: explicit Source( - int maxEvents, int runForMinutes, ProductRegistry& reg, std::filesystem::path const& datadir, bool validation); + int batchEvents, int maxEvents, int runForMinutes, ProductRegistry& reg, std::filesystem::path const& datadir, bool validation); void startProcessing(); @@ -26,9 +28,10 @@ namespace edm { int processedEvents() const { return numEvents_; } // thread safe - std::unique_ptr produce(int streamId, ProductRegistry const& reg); + EventBatch produce(int streamId, ProductRegistry const& reg); private: + int batchEvents_; int maxEvents_; // these are all for the mode where the processing length is limited by time diff --git a/src/cuda/bin/StreamSchedule.cc b/src/cuda/bin/StreamSchedule.cc index 4a18c39a3..d510f94aa 100644 --- a/src/cuda/bin/StreamSchedule.cc +++ b/src/cuda/bin/StreamSchedule.cc @@ -1,4 +1,5 @@ -//#include +#include +#include #include @@ -43,7 +44,7 @@ namespace edm { StreamSchedule& StreamSchedule::operator=(StreamSchedule&&) = default; void StreamSchedule::runToCompletionAsync(WaitingTaskHolder h) { - auto task = make_functor_task([this, h]() mutable { processOneEventAsync(std::move(h)); }); + auto task = make_functor_task([this, h]() mutable { processEventBatchAsync(std::move(h)); }); if (streamId_ == 0) { h.group()->run([task]() { TaskSentry s{task}; @@ -58,24 +59,24 @@ namespace edm { } } - void StreamSchedule::processOneEventAsync(WaitingTaskHolder h) { - auto event = source_->produce(streamId_, registry_); - if (event) { - // Pass the event object ownership to the "end-of-event" task - // Pass a non-owning pointer to the event to preceding tasks - //std::cout << "Begin processing event " << event->eventID() << std::endl; - auto eventPtr = event.get(); + void StreamSchedule::processEventBatchAsync(WaitingTaskHolder h) { + auto events = source_->produce(streamId_, registry_); + if (not events.empty()) { + // Pass the event batch ownership to the "end-of-event" task + // Pass a non-owning event range to the preceding tasks + //std::cout << "Begin processing a batch of " << events.size() << " events starting from " << events.range().at(0).eventID() << std::endl; + auto eventsRange = events.range(); auto* group = h.group(); - auto nextEventTask = - make_waiting_task([this, h = std::move(h), ev = std::move(event)](std::exception_ptr const* iPtr) mutable { - ev.reset(); + auto nextEventTask = + make_waiting_task([this, h = std::move(h), events = std::move(events)](std::exception_ptr const* iPtr) mutable { + events.clear(); if (iPtr) { h.doneWaiting(*iPtr); } else { for (auto const& worker : path_) { worker->reset(); } - processOneEventAsync(std::move(h)); + processEventBatchAsync(std::move(h)); } }); // To guarantee that the nextEventTask is spawned also in @@ -85,10 +86,10 @@ namespace edm { for (auto iWorker = path_.rbegin(); iWorker != path_.rend(); ++iWorker) { //std::cout << "calling doWorkAsync for " << iWorker->get() << " with nextEventTask " << nextEventTask << std::endl; - (*iWorker)->doWorkAsync(*eventPtr, *eventSetup_, nextEventTaskHolder); + (*iWorker)->doWorkAsync(eventsRange, *eventSetup_, nextEventTaskHolder); } } else { - h.doneWaiting(std::exception_ptr{}); + h.doneWaiting(); } } diff --git a/src/cuda/bin/StreamSchedule.h b/src/cuda/bin/StreamSchedule.h index 1bd364c70..a62dc3961 100644 --- a/src/cuda/bin/StreamSchedule.h +++ b/src/cuda/bin/StreamSchedule.h @@ -38,7 +38,7 @@ namespace edm { void endJob(); private: - void processOneEventAsync(WaitingTaskHolder h); + void processEventBatchAsync(WaitingTaskHolder h); ProductRegistry registry_; Source* source_; diff --git a/src/cuda/bin/main.cc b/src/cuda/bin/main.cc index 38a5c24d6..5a403c50c 100644 --- a/src/cuda/bin/main.cc +++ b/src/cuda/bin/main.cc @@ -25,7 +25,8 @@ namespace { "[--histogram] [--empty]\n\n" << "Options\n" << " --numberOfThreads Number of threads to use (default 1, use 0 to use all CPU cores)\n" - << " --numberOfStreams Number of concurrent events (default 0 = numberOfThreads)\n" + << " --numberOfStreams Number of concurrent batch of events (default 0=numberOfThreads)\n" + << " --batchEvents Number of events to process in a batch (default 1 for individual events)\n" << " --maxEvents Number of events to process (default -1 for all events in the input file)\n" << " --runForMinutes Continue processing the set of 1000 events until this many minutes have passed " "(default -1 for disabled; conflicts with --maxEvents)\n" @@ -43,6 +44,7 @@ int main(int argc, char** argv) { std::vector args(argv, argv + argc); int numberOfThreads = 1; int numberOfStreams = 0; + int batchEvents = 1; int maxEvents = -1; int runForMinutes = -1; std::filesystem::path datadir; @@ -60,6 +62,9 @@ int main(int argc, char** argv) { } else if (*i == "--numberOfStreams") { ++i; numberOfStreams = std::stoi(*i); + } else if (*i == "--batchEvents") { + ++i; + batchEvents = std::stoul(*i); } else if (*i == "--maxEvents") { ++i; maxEvents = std::stoi(*i); @@ -146,7 +151,8 @@ int main(int argc, char** argv) { } } edm::EventProcessor processor( - maxEvents, runForMinutes, numberOfStreams, std::move(edmodules), std::move(esmodules), datadir, validation); + batchEvents, maxEvents, runForMinutes, numberOfStreams, std::move(edmodules), std::move(esmodules), datadir, validation); + maxEvents = processor.maxEvents(); if (runForMinutes < 0) { std::cout << "Processing " << processor.maxEvents() << " events, of which " << numberOfStreams diff --git a/src/cuda/plugin-PixelTrackFitting/PixelTrackSoAFromCUDA.cc b/src/cuda/plugin-PixelTrackFitting/PixelTrackSoAFromCUDA.cc index 66e93f818..943dd44ce 100644 --- a/src/cuda/plugin-PixelTrackFitting/PixelTrackSoAFromCUDA.cc +++ b/src/cuda/plugin-PixelTrackFitting/PixelTrackSoAFromCUDA.cc @@ -9,7 +9,9 @@ #include "Framework/EDProducer.h" #include "CUDACore/ScopedContext.h" -class PixelTrackSoAFromCUDA : public edm::EDProducerExternalWork { +using PixelTrackSoAFromCUDA_AsyncState = cms::cuda::host::unique_ptr; + +class PixelTrackSoAFromCUDA : public edm::EDProducerExternalWork { public: explicit PixelTrackSoAFromCUDA(edm::ProductRegistry& reg); ~PixelTrackSoAFromCUDA() override = default; @@ -17,13 +19,12 @@ class PixelTrackSoAFromCUDA : public edm::EDProducerExternalWork { private: void acquire(edm::Event const& iEvent, edm::EventSetup const& iSetup, - edm::WaitingTaskWithArenaHolder waitingTaskHolder) override; - void produce(edm::Event& iEvent, edm::EventSetup const& iSetup) override; + edm::WaitingTaskWithArenaHolder waitingTaskHolder, + AsyncState& state) const override; + void produce(edm::Event& iEvent, edm::EventSetup const& iSetup, AsyncState& state) override; edm::EDGetTokenT> tokenCUDA_; edm::EDPutTokenT tokenSOA_; - - cms::cuda::host::unique_ptr m_soa; }; PixelTrackSoAFromCUDA::PixelTrackSoAFromCUDA(edm::ProductRegistry& reg) @@ -32,17 +33,18 @@ PixelTrackSoAFromCUDA::PixelTrackSoAFromCUDA(edm::ProductRegistry& reg) void PixelTrackSoAFromCUDA::acquire(edm::Event const& iEvent, edm::EventSetup const& iSetup, - edm::WaitingTaskWithArenaHolder waitingTaskHolder) { + edm::WaitingTaskWithArenaHolder waitingTaskHolder, + AsyncState& state) const { cms::cuda::Product const& inputDataWrapped = iEvent.get(tokenCUDA_); cms::cuda::ScopedContextAcquire ctx{inputDataWrapped, std::move(waitingTaskHolder)}; auto const& inputData = ctx.get(inputDataWrapped); - m_soa = inputData.toHostAsync(ctx.stream()); + state = inputData.toHostAsync(ctx.stream()); } -void PixelTrackSoAFromCUDA::produce(edm::Event& iEvent, edm::EventSetup const& iSetup) { +void PixelTrackSoAFromCUDA::produce(edm::Event& iEvent, edm::EventSetup const& iSetup, AsyncState& state) { /* - auto const & tsoa = *m_soa; + auto const & tsoa = *state; auto maxTracks = tsoa.stride(); std::cout << "size of SoA" << sizeof(tsoa) << " stride " << maxTracks << std::endl; @@ -57,9 +59,9 @@ void PixelTrackSoAFromCUDA::produce(edm::Event& iEvent, edm::EventSetup const& i */ // DO NOT make a copy (actually TWO....) - iEvent.emplace(tokenSOA_, PixelTrackHeterogeneous(std::move(m_soa))); + iEvent.emplace(tokenSOA_, PixelTrackHeterogeneous(std::move(state))); - assert(!m_soa); + assert(!state); } DEFINE_FWK_MODULE(PixelTrackSoAFromCUDA); diff --git a/src/cuda/plugin-PixelVertexFinding/PixelVertexSoAFromCUDA.cc b/src/cuda/plugin-PixelVertexFinding/PixelVertexSoAFromCUDA.cc index d709f0c5e..cbb4da507 100644 --- a/src/cuda/plugin-PixelVertexFinding/PixelVertexSoAFromCUDA.cc +++ b/src/cuda/plugin-PixelVertexFinding/PixelVertexSoAFromCUDA.cc @@ -10,7 +10,9 @@ #include "Framework/RunningAverage.h" #include "CUDACore/ScopedContext.h" -class PixelVertexSoAFromCUDA : public edm::EDProducerExternalWork { +using PixelVertexSoAFromCUDA_AsyncState = cms::cuda::host::unique_ptr; + +class PixelVertexSoAFromCUDA : public edm::EDProducerExternalWork { public: explicit PixelVertexSoAFromCUDA(edm::ProductRegistry& reg); ~PixelVertexSoAFromCUDA() override = default; @@ -18,13 +20,12 @@ class PixelVertexSoAFromCUDA : public edm::EDProducerExternalWork { private: void acquire(edm::Event const& iEvent, edm::EventSetup const& iSetup, - edm::WaitingTaskWithArenaHolder waitingTaskHolder) override; - void produce(edm::Event& iEvent, edm::EventSetup const& iSetup) override; + edm::WaitingTaskWithArenaHolder waitingTaskHolder, + AsyncState& state) const override; + void produce(edm::Event& iEvent, edm::EventSetup const& iSetup, AsyncState& state) override; edm::EDGetTokenT> tokenCUDA_; edm::EDPutTokenT tokenSOA_; - - cms::cuda::host::unique_ptr m_soa; }; PixelVertexSoAFromCUDA::PixelVertexSoAFromCUDA(edm::ProductRegistry& reg) @@ -33,17 +34,18 @@ PixelVertexSoAFromCUDA::PixelVertexSoAFromCUDA(edm::ProductRegistry& reg) void PixelVertexSoAFromCUDA::acquire(edm::Event const& iEvent, edm::EventSetup const& iSetup, - edm::WaitingTaskWithArenaHolder waitingTaskHolder) { + edm::WaitingTaskWithArenaHolder waitingTaskHolder, + AsyncState& state) const { auto const& inputDataWrapped = iEvent.get(tokenCUDA_); cms::cuda::ScopedContextAcquire ctx{inputDataWrapped, std::move(waitingTaskHolder)}; auto const& inputData = ctx.get(inputDataWrapped); - m_soa = inputData.toHostAsync(ctx.stream()); + state = inputData.toHostAsync(ctx.stream()); } -void PixelVertexSoAFromCUDA::produce(edm::Event& iEvent, edm::EventSetup const& iSetup) { +void PixelVertexSoAFromCUDA::produce(edm::Event& iEvent, edm::EventSetup const& iSetup, AsyncState& state) { // No copies.... - iEvent.emplace(tokenSOA_, ZVertexHeterogeneous(std::move(m_soa))); + iEvent.emplace(tokenSOA_, ZVertexHeterogeneous(std::move(state))); } DEFINE_FWK_MODULE(PixelVertexSoAFromCUDA); diff --git a/src/cuda/plugin-SiPixelClusterizer/SiPixelRawToClusterCUDA.cc b/src/cuda/plugin-SiPixelClusterizer/SiPixelRawToClusterCUDA.cc index 06624744e..ff387e1b4 100644 --- a/src/cuda/plugin-SiPixelClusterizer/SiPixelRawToClusterCUDA.cc +++ b/src/cuda/plugin-SiPixelClusterizer/SiPixelRawToClusterCUDA.cc @@ -22,7 +22,13 @@ #include #include -class SiPixelRawToClusterCUDA : public edm::EDProducerExternalWork { +struct SiPixelRawToClusterCUDA_AsyncState { + cms::cuda::ContextState ctx; + pixelgpudetails::SiPixelRawToClusterGPUKernel gpuAlgo; + pixelgpudetails::SiPixelRawToClusterGPUKernel::WordFedAppender wordFedAppender; +}; + +class SiPixelRawToClusterCUDA : public edm::EDProducerExternalWork { public: explicit SiPixelRawToClusterCUDA(edm::ProductRegistry& reg); ~SiPixelRawToClusterCUDA() override = default; @@ -30,19 +36,14 @@ class SiPixelRawToClusterCUDA : public edm::EDProducerExternalWork { private: void acquire(const edm::Event& iEvent, const edm::EventSetup& iSetup, - edm::WaitingTaskWithArenaHolder waitingTaskHolder) override; - void produce(edm::Event& iEvent, const edm::EventSetup& iSetup) override; - - cms::cuda::ContextState ctxState_; + edm::WaitingTaskWithArenaHolder waitingTaskHolder, + AsyncState& state) const override; + void produce(edm::Event& iEvent, const edm::EventSetup& iSetup, AsyncState& state) override; - edm::EDGetTokenT rawGetToken_; - edm::EDPutTokenT> digiPutToken_; + const edm::EDGetTokenT rawGetToken_; + const edm::EDPutTokenT> digiPutToken_; edm::EDPutTokenT> digiErrorPutToken_; - edm::EDPutTokenT> clusterPutToken_; - - pixelgpudetails::SiPixelRawToClusterGPUKernel gpuAlgo_; - std::unique_ptr wordFedAppender_; - PixelFormatterErrors errors_; + const edm::EDPutTokenT> clusterPutToken_; const bool isRun2_; const bool includeErrors_; @@ -59,14 +60,13 @@ SiPixelRawToClusterCUDA::SiPixelRawToClusterCUDA(edm::ProductRegistry& reg) if (includeErrors_) { digiErrorPutToken_ = reg.produces>(); } - - wordFedAppender_ = std::make_unique(); } void SiPixelRawToClusterCUDA::acquire(const edm::Event& iEvent, const edm::EventSetup& iSetup, - edm::WaitingTaskWithArenaHolder waitingTaskHolder) { - cms::cuda::ScopedContextAcquire ctx{iEvent.streamID(), std::move(waitingTaskHolder), ctxState_}; + edm::WaitingTaskWithArenaHolder waitingTaskHolder, + AsyncState& state) const { + cms::cuda::ScopedContextAcquire ctx{iEvent.streamID(), std::move(waitingTaskHolder), state.ctx}; auto const& hgpuMap = iSetup.get(); if (hgpuMap.hasQuality() != useQuality_) { @@ -85,7 +85,7 @@ void SiPixelRawToClusterCUDA::acquire(const edm::Event& iEvent, const auto& buffers = iEvent.get(rawGetToken_); - errors_.clear(); + PixelFormatterErrors errors; // GPU specific: Data extraction for RawToDigi GPU unsigned int wordCounterGPU = 0; @@ -115,7 +115,7 @@ void SiPixelRawToClusterCUDA::acquire(const edm::Event& iEvent, // check CRC bit const uint64_t* trailer = reinterpret_cast(rawData.data()) + (nWords - 1); - if (not errorcheck.checkCRC(errorsInEvent, fedId, trailer, errors_)) { + if (not errorcheck.checkCRC(errorsInEvent, fedId, trailer, errors)) { continue; } @@ -125,7 +125,7 @@ void SiPixelRawToClusterCUDA::acquire(const edm::Event& iEvent, bool moreHeaders = true; while (moreHeaders) { header++; - bool headerStatus = errorcheck.checkHeader(errorsInEvent, fedId, header, errors_); + bool headerStatus = errorcheck.checkHeader(errorsInEvent, fedId, header, errors); moreHeaders = headerStatus; } @@ -134,7 +134,7 @@ void SiPixelRawToClusterCUDA::acquire(const edm::Event& iEvent, trailer++; while (moreTrailers) { trailer--; - bool trailerStatus = errorcheck.checkTrailer(errorsInEvent, fedId, nWords, trailer, errors_); + bool trailerStatus = errorcheck.checkTrailer(errorsInEvent, fedId, nWords, trailer, errors); moreTrailers = trailerStatus; } @@ -142,33 +142,33 @@ void SiPixelRawToClusterCUDA::acquire(const edm::Event& iEvent, const uint32_t* ew = (const uint32_t*)(trailer); assert(0 == (ew - bw) % 2); - wordFedAppender_->initializeWordFed(fedId, wordCounterGPU, bw, (ew - bw)); + state.wordFedAppender.initializeWordFed(fedId, wordCounterGPU, bw, (ew - bw)); wordCounterGPU += (ew - bw); } // end of for loop - gpuAlgo_.makeClustersAsync(isRun2_, - gpuMap, - gpuModulesToUnpack, - gpuGains, - *wordFedAppender_, - std::move(errors_), - wordCounterGPU, - fedCounter, - useQuality_, - includeErrors_, - false, // debug - ctx.stream()); + state.gpuAlgo.makeClustersAsync(isRun2_, + gpuMap, + gpuModulesToUnpack, + gpuGains, + state.wordFedAppender, + std::move(errors), + wordCounterGPU, + fedCounter, + useQuality_, + includeErrors_, + false, // debug + ctx.stream()); } -void SiPixelRawToClusterCUDA::produce(edm::Event& iEvent, const edm::EventSetup& iSetup) { - cms::cuda::ScopedContextProduce ctx{ctxState_}; +void SiPixelRawToClusterCUDA::produce(edm::Event& iEvent, const edm::EventSetup& iSetup, AsyncState& state) { + cms::cuda::ScopedContextProduce ctx{state.ctx}; - auto tmp = gpuAlgo_.getResults(); + auto tmp = state.gpuAlgo.getResults(); ctx.emplace(iEvent, digiPutToken_, std::move(tmp.first)); ctx.emplace(iEvent, clusterPutToken_, std::move(tmp.second)); if (includeErrors_) { - ctx.emplace(iEvent, digiErrorPutToken_, gpuAlgo_.getErrors()); + ctx.emplace(iEvent, digiErrorPutToken_, state.gpuAlgo.getErrors()); } } diff --git a/src/cuda/plugin-SiPixelRawToDigi/SiPixelDigisSoAFromCUDA.cc b/src/cuda/plugin-SiPixelRawToDigi/SiPixelDigisSoAFromCUDA.cc index 448f4b797..d3e806a49 100644 --- a/src/cuda/plugin-SiPixelRawToDigi/SiPixelDigisSoAFromCUDA.cc +++ b/src/cuda/plugin-SiPixelRawToDigi/SiPixelDigisSoAFromCUDA.cc @@ -8,7 +8,15 @@ #include "CUDACore/ScopedContext.h" #include "CUDACore/host_unique_ptr.h" -class SiPixelDigisSoAFromCUDA : public edm::EDProducerExternalWork { +struct SiPixelDigisSoAFromCUDA_AsyncState { + cms::cuda::host::unique_ptr pdigi; + cms::cuda::host::unique_ptr rawIdArr; + cms::cuda::host::unique_ptr adc; + cms::cuda::host::unique_ptr clus; + size_t nDigis; +}; + +class SiPixelDigisSoAFromCUDA : public edm::EDProducerExternalWork { public: explicit SiPixelDigisSoAFromCUDA(edm::ProductRegistry& reg); ~SiPixelDigisSoAFromCUDA() override = default; @@ -16,18 +24,12 @@ class SiPixelDigisSoAFromCUDA : public edm::EDProducerExternalWork { private: void acquire(const edm::Event& iEvent, const edm::EventSetup& iSetup, - edm::WaitingTaskWithArenaHolder waitingTaskHolder) override; - void produce(edm::Event& iEvent, const edm::EventSetup& iSetup) override; - - edm::EDGetTokenT> digiGetToken_; - edm::EDPutTokenT digiPutToken_; + edm::WaitingTaskWithArenaHolder waitingTaskHolder, + AsyncState& state) const override; + void produce(edm::Event& iEvent, const edm::EventSetup& iSetup, AsyncState& state) override; - cms::cuda::host::unique_ptr pdigi_; - cms::cuda::host::unique_ptr rawIdArr_; - cms::cuda::host::unique_ptr adc_; - cms::cuda::host::unique_ptr clus_; - - size_t nDigis_; + const edm::EDGetTokenT> digiGetToken_; + const edm::EDPutTokenT digiPutToken_; }; SiPixelDigisSoAFromCUDA::SiPixelDigisSoAFromCUDA(edm::ProductRegistry& reg) @@ -36,20 +38,20 @@ SiPixelDigisSoAFromCUDA::SiPixelDigisSoAFromCUDA(edm::ProductRegistry& reg) void SiPixelDigisSoAFromCUDA::acquire(const edm::Event& iEvent, const edm::EventSetup& iSetup, - edm::WaitingTaskWithArenaHolder waitingTaskHolder) { + edm::WaitingTaskWithArenaHolder waitingTaskHolder, + AsyncState& state) const { // Do the transfer in a CUDA stream parallel to the computation CUDA stream cms::cuda::ScopedContextAcquire ctx{iEvent.streamID(), std::move(waitingTaskHolder)}; const auto& gpuDigis = ctx.get(iEvent, digiGetToken_); - - nDigis_ = gpuDigis.nDigis(); - pdigi_ = gpuDigis.pdigiToHostAsync(ctx.stream()); - rawIdArr_ = gpuDigis.rawIdArrToHostAsync(ctx.stream()); - adc_ = gpuDigis.adcToHostAsync(ctx.stream()); - clus_ = gpuDigis.clusToHostAsync(ctx.stream()); + state.pdigi = gpuDigis.pdigiToHostAsync(ctx.stream()); + state.rawIdArr = gpuDigis.rawIdArrToHostAsync(ctx.stream()); + state.adc = gpuDigis.adcToHostAsync(ctx.stream()); + state.clus = gpuDigis.clusToHostAsync(ctx.stream()); + state.nDigis = gpuDigis.nDigis(); } -void SiPixelDigisSoAFromCUDA::produce(edm::Event& iEvent, const edm::EventSetup& iSetup) { +void SiPixelDigisSoAFromCUDA::produce(edm::Event& iEvent, const edm::EventSetup& iSetup, AsyncState& state) { // The following line copies the data from the pinned host memory to // regular host memory. In principle that feels unnecessary (why not // just use the pinned host memory?). There are a few arguments for @@ -60,12 +62,8 @@ void SiPixelDigisSoAFromCUDA::produce(edm::Event& iEvent, const edm::EventSetup& // host memory to be allocated without a CUDA stream // - What if a CPU algorithm would produce the same SoA? We can't // use cudaMallocHost without a GPU... - iEvent.emplace(digiPutToken_, nDigis_, pdigi_.get(), rawIdArr_.get(), adc_.get(), clus_.get()); - - pdigi_.reset(); - rawIdArr_.reset(); - adc_.reset(); - clus_.reset(); + iEvent.emplace( + digiPutToken_, state.nDigis, state.pdigi.get(), state.rawIdArr.get(), state.adc.get(), state.clus.get()); } // define as framework plugin diff --git a/src/cuda/plugin-Validation/HistoValidator.cc b/src/cuda/plugin-Validation/HistoValidator.cc index d7b11d4b2..90b18f2dc 100644 --- a/src/cuda/plugin-Validation/HistoValidator.cc +++ b/src/cuda/plugin-Validation/HistoValidator.cc @@ -15,15 +15,29 @@ #include #include -class HistoValidator : public edm::EDProducerExternalWork { +struct HistoValidator_AsyncState { + uint32_t nDigis; + uint32_t nModules; + uint32_t nClusters; + uint32_t nHits; + cms::cuda::host::unique_ptr adc; + cms::cuda::host::unique_ptr clusInModule; + cms::cuda::host::unique_ptr localCoord; + cms::cuda::host::unique_ptr globalCoord; + cms::cuda::host::unique_ptr charge; + cms::cuda::host::unique_ptr size; +}; + +class HistoValidator : public edm::EDProducerExternalWork { public: explicit HistoValidator(edm::ProductRegistry& reg); private: void acquire(const edm::Event& iEvent, const edm::EventSetup& iSetup, - edm::WaitingTaskWithArenaHolder waitingTaskHolder) override; - void produce(edm::Event& iEvent, const edm::EventSetup& iSetup) override; + edm::WaitingTaskWithArenaHolder waitingTaskHolder, + AsyncState& state) const override; + void produce(edm::Event& iEvent, const edm::EventSetup& iSetup, AsyncState& state) override; void endJob() override; edm::EDGetTokenT> digiToken_; @@ -32,17 +46,6 @@ class HistoValidator : public edm::EDProducerExternalWork { edm::EDGetTokenT trackToken_; edm::EDGetTokenT vertexToken_; - uint32_t nDigis; - uint32_t nModules; - uint32_t nClusters; - uint32_t nHits; - cms::cuda::host::unique_ptr h_adc; - cms::cuda::host::unique_ptr h_clusInModule; - cms::cuda::host::unique_ptr h_localCoord; - cms::cuda::host::unique_ptr h_globalCoord; - cms::cuda::host::unique_ptr h_charge; - cms::cuda::host::unique_ptr h_size; - static std::map histos; }; @@ -90,61 +93,65 @@ HistoValidator::HistoValidator(edm::ProductRegistry& reg) void HistoValidator::acquire(const edm::Event& iEvent, const edm::EventSetup& iSetup, - edm::WaitingTaskWithArenaHolder waitingTaskHolder) { + edm::WaitingTaskWithArenaHolder waitingTaskHolder, + AsyncState& state) const { auto const& pdigis = iEvent.get(digiToken_); cms::cuda::ScopedContextAcquire ctx{pdigis, std::move(waitingTaskHolder)}; auto const& digis = ctx.get(iEvent, digiToken_); auto const& clusters = ctx.get(iEvent, clusterToken_); auto const& hits = ctx.get(iEvent, hitToken_); - nDigis = digis.nDigis(); - nModules = digis.nModules(); - h_adc = digis.adcToHostAsync(ctx.stream()); - - nClusters = clusters.nClusters(); - h_clusInModule = cms::cuda::make_host_unique(nModules, ctx.stream()); - cudaCheck(cudaMemcpyAsync( - h_clusInModule.get(), clusters.clusInModule(), sizeof(uint32_t) * nModules, cudaMemcpyDefault, ctx.stream())); - - nHits = hits.nHits(); - h_localCoord = hits.localCoordToHostAsync(ctx.stream()); - h_globalCoord = hits.globalCoordToHostAsync(ctx.stream()); - h_charge = hits.chargeToHostAsync(ctx.stream()); - h_size = hits.sizeToHostAsync(ctx.stream()); + state.nDigis = digis.nDigis(); + state.nModules = digis.nModules(); + state.adc = digis.adcToHostAsync(ctx.stream()); + + state.nClusters = clusters.nClusters(); + state.clusInModule = cms::cuda::make_host_unique(state.nModules, ctx.stream()); + cudaCheck(cudaMemcpyAsync(state.clusInModule.get(), + clusters.clusInModule(), + sizeof(uint32_t) * state.nModules, + cudaMemcpyDefault, + ctx.stream())); + + state.nHits = hits.nHits(); + state.localCoord = hits.localCoordToHostAsync(ctx.stream()); + state.globalCoord = hits.globalCoordToHostAsync(ctx.stream()); + state.charge = hits.chargeToHostAsync(ctx.stream()); + state.size = hits.sizeToHostAsync(ctx.stream()); } -void HistoValidator::produce(edm::Event& iEvent, const edm::EventSetup& iSetup) { - histos["digi_n"].fill(nDigis); - for (uint32_t i = 0; i < nDigis; ++i) { - histos["digi_adc"].fill(h_adc[i]); +void HistoValidator::produce(edm::Event& iEvent, const edm::EventSetup& iSetup, AsyncState& state) { + histos["digi_n"].fill(state.nDigis); + for (uint32_t i = 0; i < state.nDigis; ++i) { + histos["digi_adc"].fill(state.adc[i]); } - h_adc.reset(); - histos["module_n"].fill(nModules); + //adc.reset(); + histos["module_n"].fill(state.nModules); - histos["cluster_n"].fill(nClusters); - for (uint32_t i = 0; i < nModules; ++i) { - histos["cluster_per_module_n"].fill(h_clusInModule[i]); + histos["cluster_n"].fill(state.nClusters); + for (uint32_t i = 0; i < state.nModules; ++i) { + histos["cluster_per_module_n"].fill(state.clusInModule[i]); } - h_clusInModule.reset(); - - histos["hit_n"].fill(nHits); - for (uint32_t i = 0; i < nHits; ++i) { - histos["hit_lx"].fill(h_localCoord[i]); - histos["hit_ly"].fill(h_localCoord[i + nHits]); - histos["hit_lex"].fill(h_localCoord[i + 2 * nHits]); - histos["hit_ley"].fill(h_localCoord[i + 3 * nHits]); - histos["hit_gx"].fill(h_globalCoord[i]); - histos["hit_gy"].fill(h_globalCoord[i + nHits]); - histos["hit_gz"].fill(h_globalCoord[i + 2 * nHits]); - histos["hit_gr"].fill(h_globalCoord[i + 3 * nHits]); - histos["hit_charge"].fill(h_charge[i]); - histos["hit_sizex"].fill(h_size[i]); - histos["hit_sizey"].fill(h_size[i + nHits]); + //clusInModule.reset(); + + histos["hit_n"].fill(state.nHits); + for (uint32_t i = 0; i < state.nHits; ++i) { + histos["hit_lx"].fill(state.localCoord[i]); + histos["hit_ly"].fill(state.localCoord[i + state.nHits]); + histos["hit_lex"].fill(state.localCoord[i + 2 * state.nHits]); + histos["hit_ley"].fill(state.localCoord[i + 3 * state.nHits]); + histos["hit_gx"].fill(state.globalCoord[i]); + histos["hit_gy"].fill(state.globalCoord[i + state.nHits]); + histos["hit_gz"].fill(state.globalCoord[i + 2 * state.nHits]); + histos["hit_gr"].fill(state.globalCoord[i + 3 * state.nHits]); + histos["hit_charge"].fill(state.charge[i]); + histos["hit_sizex"].fill(state.size[i]); + histos["hit_sizey"].fill(state.size[i + state.nHits]); } - h_localCoord.reset(); - h_globalCoord.reset(); - h_charge.reset(); - h_size.reset(); + //state.localCoord.reset(); + //state.globalCoord.reset(); + //state.charge.reset(); + //state.size.reset(); { auto const& tracks = iEvent.get(trackToken_);