From 64f574c8d25f013eba887d0862f894ecd89d041c Mon Sep 17 00:00:00 2001 From: Andrea Bocci Date: Thu, 3 Dec 2020 10:11:27 +0100 Subject: [PATCH 1/9] Add a doneWaiting overload without the exception argument --- src/fwtest/Framework/WaitingTaskHolder.h | 14 +++++++++----- .../Framework/WaitingTaskWithArenaHolder.cc | 18 +++++++++++++----- .../Framework/WaitingTaskWithArenaHolder.h | 6 ++++++ src/fwtest/bin/StreamSchedule.cc | 2 +- src/fwtest/plugin-Test2/TestProducer2.cc | 2 +- 5 files changed, 30 insertions(+), 12 deletions(-) diff --git a/src/fwtest/Framework/WaitingTaskHolder.h b/src/fwtest/Framework/WaitingTaskHolder.h index d74c5d8ed..9cc1b5c66 100644 --- a/src/fwtest/Framework/WaitingTaskHolder.h +++ b/src/fwtest/Framework/WaitingTaskHolder.h @@ -34,7 +34,7 @@ namespace edm { explicit WaitingTaskHolder(edm::WaitingTask* iTask) : m_task(iTask) { m_task->increment_ref_count(); } ~WaitingTaskHolder() { if (m_task) { - doneWaiting(std::exception_ptr{}); + doneWaiting(); } } @@ -66,10 +66,7 @@ namespace edm { } } - void doneWaiting(std::exception_ptr iExcept) { - if (iExcept) { - m_task->dependentTaskFailed(iExcept); - } + void doneWaiting() { //spawn can run the task before we finish // doneWaiting and some other thread might // try to reuse this object. 
Resetting @@ -81,6 +78,13 @@ namespace edm { } } + void doneWaiting(std::exception_ptr iExcept) { + if (iExcept) { + m_task->dependentTaskFailed(iExcept); + } + doneWaiting(); + } + private: // ---------- member data -------------------------------- WaitingTask* m_task; diff --git a/src/fwtest/Framework/WaitingTaskWithArenaHolder.cc b/src/fwtest/Framework/WaitingTaskWithArenaHolder.cc index 7c852d041..d7c88001e 100644 --- a/src/fwtest/Framework/WaitingTaskWithArenaHolder.cc +++ b/src/fwtest/Framework/WaitingTaskWithArenaHolder.cc @@ -24,7 +24,7 @@ namespace edm { WaitingTaskWithArenaHolder::~WaitingTaskWithArenaHolder() { if (m_task) { - doneWaiting(std::exception_ptr{}); + doneWaiting(); } } @@ -58,10 +58,7 @@ namespace edm { // into the correct arena of threads. Use of the arena allows doneWaiting // to be called from a thread outside the arena of threads that will manage // the task. doneWaiting can be called from a non-TBB thread. - void WaitingTaskWithArenaHolder::doneWaiting(std::exception_ptr iExcept) { - if (iExcept) { - m_task->dependentTaskFailed(iExcept); - } + void WaitingTaskWithArenaHolder::doneWaiting() { //enqueue can run the task before we finish // doneWaiting and some other thread might // try to reuse this object. Resetting @@ -75,6 +72,17 @@ namespace edm { } } + // This spawns the task. The arena is needed to get the task spawned + // into the correct arena of threads. Use of the arena allows doneWaiting + // to be called from a thread outside the arena of threads that will manage + // the task. doneWaiting can be called from a non-TBB thread. + void WaitingTaskWithArenaHolder::doneWaiting(std::exception_ptr iExcept) { + if (iExcept) { + m_task->dependentTaskFailed(iExcept); + } + doneWaiting(); + } + // This next function is useful if you know from the context that // m_arena (which is set when the constructor was executes) is the // same arena in which you want to execute the doneWaiting function. 
diff --git a/src/fwtest/Framework/WaitingTaskWithArenaHolder.h b/src/fwtest/Framework/WaitingTaskWithArenaHolder.h index 4b14febfb..bab615686 100644 --- a/src/fwtest/Framework/WaitingTaskWithArenaHolder.h +++ b/src/fwtest/Framework/WaitingTaskWithArenaHolder.h @@ -48,6 +48,12 @@ namespace edm { WaitingTaskWithArenaHolder& operator=(WaitingTaskWithArenaHolder&& iRHS); + // This spawns the task. The arena is needed to get the task spawned + // into the correct arena of threads. Use of the arena allows doneWaiting + // to be called from a thread outside the arena of threads that will manage + // the task. doneWaiting can be called from a non-TBB thread. + void doneWaiting(); + // This spawns the task. The arena is needed to get the task spawned // into the correct arena of threads. Use of the arena allows doneWaiting // to be called from a thread outside the arena of threads that will manage diff --git a/src/fwtest/bin/StreamSchedule.cc b/src/fwtest/bin/StreamSchedule.cc index 8636bce24..2f1360fef 100644 --- a/src/fwtest/bin/StreamSchedule.cc +++ b/src/fwtest/bin/StreamSchedule.cc @@ -82,7 +82,7 @@ namespace edm { (*iWorker)->doWorkAsync(*eventPtr, *eventSetup_, nextEventTask); } } else { - h.doneWaiting(std::exception_ptr{}); + h.doneWaiting(); } } diff --git a/src/fwtest/plugin-Test2/TestProducer2.cc b/src/fwtest/plugin-Test2/TestProducer2.cc index 6df7bdb0d..6f88839ae 100644 --- a/src/fwtest/plugin-Test2/TestProducer2.cc +++ b/src/fwtest/plugin-Test2/TestProducer2.cc @@ -39,7 +39,7 @@ void TestProducer2::acquire(edm::Event const& event, future_ = std::async([holder = std::move(holder)]() mutable { using namespace std::chrono_literals; std::this_thread::sleep_for(1s); - holder.doneWaiting(std::exception_ptr()); + holder.doneWaiting(); return 42; }); From 277830df3a2c4d0a8aabee7b0548c662666e4246 Mon Sep 17 00:00:00 2001 From: Andrea Bocci Date: Tue, 1 Dec 2020 15:43:39 +0100 Subject: [PATCH 2/9] Add an AsyncState to communicate between acquire() and produce() Add a 
template type parameter to the edm::EDProducerExternalWork class. The acquire() method is now const, and it communicates its asynchronous state to the produce() method via an object of this type, rather than using data members. Implementations that do not need to pass any state between acquire() and produce() can use "void" or leave the template parameter empty. --- src/fwtest/Framework/EDProducer.h | 35 +++++++++++++++++++++++- src/fwtest/plugin-Test2/TestProducer2.cc | 27 +++++++++--------- 2 files changed, 48 insertions(+), 14 deletions(-) diff --git a/src/fwtest/Framework/EDProducer.h b/src/fwtest/Framework/EDProducer.h index 8160b8c96..1dea5fa28 100644 --- a/src/fwtest/Framework/EDProducer.h +++ b/src/fwtest/Framework/EDProducer.h @@ -27,7 +27,40 @@ namespace edm { private: }; + template class EDProducerExternalWork { + public: + using AsyncState = T; + + EDProducerExternalWork() = default; + virtual ~EDProducerExternalWork() = default; + + bool hasAcquire() const { return true; } + + void doAcquire(Event const& event, EventSetup const& eventSetup, WaitingTaskWithArenaHolder holder) { + acquire(event, eventSetup, std::move(holder), state_); + } + + virtual void acquire(Event const& event, + EventSetup const& eventSetup, + WaitingTaskWithArenaHolder holder, + AsyncState& state) const = 0; + + void doProduce(Event& event, EventSetup const& eventSetup) { + produce(event, eventSetup, state_); + } + + virtual void produce(Event& event, EventSetup const& eventSetup, AsyncState& state) = 0; + + void doEndJob() { endJob(); } + virtual void endJob() {} + + private: + AsyncState state_; + }; + + template <> + class EDProducerExternalWork { public: EDProducerExternalWork() = default; virtual ~EDProducerExternalWork() = default; @@ -40,7 +73,7 @@ namespace edm { void doProduce(Event& event, EventSetup const& eventSetup) { produce(event, eventSetup); } - virtual void acquire(Event const& event, EventSetup const& eventSetup, WaitingTaskWithArenaHolder holder) = 0; + 
virtual void acquire(Event const& event, EventSetup const& eventSetup, WaitingTaskWithArenaHolder holder) const = 0; virtual void produce(Event& event, EventSetup const& eventSetup) = 0; void doEndJob() { endJob(); } diff --git a/src/fwtest/plugin-Test2/TestProducer2.cc b/src/fwtest/plugin-Test2/TestProducer2.cc index 6f88839ae..5981218fe 100644 --- a/src/fwtest/plugin-Test2/TestProducer2.cc +++ b/src/fwtest/plugin-Test2/TestProducer2.cc @@ -12,31 +12,30 @@ namespace { std::atomic nevents = 0; } -class TestProducer2 : public edm::EDProducerExternalWork { +class TestProducer2 : public edm::EDProducerExternalWork> { public: explicit TestProducer2(edm::ProductRegistry& reg); private: - void acquire(edm::Event const& event, - edm::EventSetup const& eventSetup, - edm::WaitingTaskWithArenaHolder holder) override; - void produce(edm::Event& event, edm::EventSetup const& eventSetup) override; + AsyncState acquire(edm::Event const& event, + edm::EventSetup const& eventSetup, + edm::WaitingTaskWithArenaHolder holder) const override; + void produce(edm::Event& event, edm::EventSetup const& eventSetup, AsyncState&& state) override; void endJob() override; - edm::EDGetTokenT getToken_; - std::future future_; + const edm::EDGetTokenT getToken_; }; TestProducer2::TestProducer2(edm::ProductRegistry& reg) : getToken_(reg.consumes()) {} -void TestProducer2::acquire(edm::Event const& event, - edm::EventSetup const& eventSetup, - edm::WaitingTaskWithArenaHolder holder) { +TestProducer2::AsyncState TestProducer2::acquire(edm::Event const& event, + edm::EventSetup const& eventSetup, + edm::WaitingTaskWithArenaHolder holder) const { auto const value = event.get(getToken_); assert(value == static_cast(event.eventID() + 10 * event.streamID() + 100)); - future_ = std::async([holder = std::move(holder)]() mutable { + auto state = std::async([holder = std::move(holder)]() mutable { using namespace std::chrono_literals; std::this_thread::sleep_for(1s); holder.doneWaiting(); @@ -47,12 +46,14 @@ 
void TestProducer2::acquire(edm::Event const& event, std::cout << "TestProducer2::acquire Event " << event.eventID() << " stream " << event.streamID() << " value " << value << std::endl; #endif + + return state; } -void TestProducer2::produce(edm::Event& event, edm::EventSetup const& eventSetup) { +void TestProducer2::produce(edm::Event& event, edm::EventSetup const& eventSetup, AsyncState&& state) { #ifndef FWTEST_SILENT std::cout << "TestProducer2::produce Event " << event.eventID() << " stream " << event.streamID() << " from future " - << future_.get() << std::endl; + << state.get() << std::endl; #endif ++nevents; } From 5adaa3da58a96e93194a5e399bf58995dd4341ea Mon Sep 17 00:00:00 2001 From: Andrea Bocci Date: Mon, 7 Dec 2020 22:32:53 +0100 Subject: [PATCH 3/9] Implement simple support for event batching --- src/fwtest/Framework/EDProducer.h | 60 +++++++++++++++++++----- src/fwtest/Framework/Worker.cc | 5 +- src/fwtest/Framework/Worker.h | 20 ++++---- src/fwtest/bin/EventProcessor.cc | 5 +- src/fwtest/bin/EventProcessor.h | 3 +- src/fwtest/bin/PluginManager.cc | 2 +- src/fwtest/bin/Source.cc | 46 ++++++++++++------ src/fwtest/bin/Source.h | 7 ++- src/fwtest/bin/StreamSchedule.cc | 48 ++++++++++--------- src/fwtest/bin/StreamSchedule.h | 2 +- src/fwtest/bin/main.cc | 9 +++- src/fwtest/plugin-Test2/TestProducer2.cc | 22 ++++----- 12 files changed, 149 insertions(+), 80 deletions(-) diff --git a/src/fwtest/Framework/EDProducer.h b/src/fwtest/Framework/EDProducer.h index 1dea5fa28..3e639203c 100644 --- a/src/fwtest/Framework/EDProducer.h +++ b/src/fwtest/Framework/EDProducer.h @@ -1,6 +1,10 @@ #ifndef EDProducerBase_h #define EDProducerBase_h +#include +#include +#include + #include "Framework/WaitingTaskWithArenaHolder.h" namespace edm { @@ -14,9 +18,16 @@ namespace edm { bool hasAcquire() const { return false; } - void doAcquire(Event const& event, EventSetup const& eventSetup, WaitingTaskWithArenaHolder holder) {} + void doAcquire(std::vector const& events, + 
EventSetup const& eventSetup, + WaitingTaskWithArenaHolder holder) {} - void doProduce(Event& event, EventSetup const& eventSetup) { produce(event, eventSetup); } + void doProduce(std::vector const& events, EventSetup const& eventSetup) { + for (Event* event : events) { + assert(event); + produce(*event, eventSetup); + } + } virtual void produce(Event& event, EventSetup const& eventSetup) = 0; @@ -37,8 +48,17 @@ namespace edm { bool hasAcquire() const { return true; } - void doAcquire(Event const& event, EventSetup const& eventSetup, WaitingTaskWithArenaHolder holder) { - acquire(event, eventSetup, std::move(holder), state_); + void doAcquire(std::vector const& events, + EventSetup const& eventSetup, + WaitingTaskWithArenaHolder holder) { + if (events.size() > statesSize_) { + statesSize_ = events.size(); + states_ = std::make_unique(statesSize_); + } + for (size_t i = 0; i < events.size(); ++i) { + assert(events[i]); + acquire(*events[i], eventSetup, holder, states_[i]); + } } virtual void acquire(Event const& event, @@ -46,17 +66,22 @@ namespace edm { WaitingTaskWithArenaHolder holder, AsyncState& state) const = 0; - void doProduce(Event& event, EventSetup const& eventSetup) { - produce(event, eventSetup, state_); + void doProduce(std::vector const& events, EventSetup const& eventSetup) { + for (size_t i = 0; i < events.size(); ++i) { + assert(events[i]); + produce(*events[i], eventSetup, states_[i]); + } } virtual void produce(Event& event, EventSetup const& eventSetup, AsyncState& state) = 0; void doEndJob() { endJob(); } + virtual void endJob() {} private: - AsyncState state_; + size_t statesSize_ = 0; + std::unique_ptr states_; }; template <> @@ -67,20 +92,33 @@ namespace edm { bool hasAcquire() const { return true; } - void doAcquire(Event const& event, EventSetup const& eventSetup, WaitingTaskWithArenaHolder holder) { - acquire(event, eventSetup, std::move(holder)); + void doAcquire(std::vector const& events, + EventSetup const& eventSetup, + 
WaitingTaskWithArenaHolder holder) { + for (Event const* event : events) { + assert(event); + acquire(*event, eventSetup, holder); + } } - void doProduce(Event& event, EventSetup const& eventSetup) { produce(event, eventSetup); } - virtual void acquire(Event const& event, EventSetup const& eventSetup, WaitingTaskWithArenaHolder holder) const = 0; + + void doProduce(std::vector const& events, EventSetup const& eventSetup) { + for (Event* event : events) { + assert(event); + produce(*event, eventSetup); + } + } + virtual void produce(Event& event, EventSetup const& eventSetup) = 0; void doEndJob() { endJob(); } + virtual void endJob() {} private: }; + } // namespace edm #endif diff --git a/src/fwtest/Framework/Worker.cc b/src/fwtest/Framework/Worker.cc index b3b3df74c..d7b3d4b01 100644 --- a/src/fwtest/Framework/Worker.cc +++ b/src/fwtest/Framework/Worker.cc @@ -1,7 +1,8 @@ +//#include #include "Framework/Worker.h" namespace edm { - void Worker::prefetchAsync(Event& event, EventSetup const& eventSetup, WaitingTask* iTask) { + void Worker::prefetchAsync(std::vector const& events, EventSetup const& eventSetup, WaitingTask* iTask) { //std::cout << "prefetchAsync for " << this << " iTask " << iTask << std::endl; bool expected = false; if (prefetchRequested_.compare_exchange_strong(expected, true)) { @@ -10,7 +11,7 @@ namespace edm { iTask->increment_ref_count(); for (Worker* dep : itemsToGet_) { //std::cout << "calling doWorkAsync for " << dep << " with " << iTask << std::endl; - dep->doWorkAsync(event, eventSetup, iTask); + dep->doWorkAsync(events, eventSetup, iTask); } auto count = iTask->decrement_ref_count(); diff --git a/src/fwtest/Framework/Worker.h b/src/fwtest/Framework/Worker.h index 0a03670a4..04489287e 100644 --- a/src/fwtest/Framework/Worker.h +++ b/src/fwtest/Framework/Worker.h @@ -2,8 +2,8 @@ #define Worker_h #include -#include //#include +#include #include "Framework/WaitingTask.h" #include "Framework/WaitingTaskHolder.h" @@ -23,10 +23,10 @@ namespace edm 
{ void setItemsToGet(std::vector workers) { itemsToGet_ = std::move(workers); } // thread safe - void prefetchAsync(Event& event, EventSetup const& eventSetup, WaitingTask* iTask); + void prefetchAsync(std::vector const& events, EventSetup const& eventSetup, WaitingTask* iTask); // not thread safe - virtual void doWorkAsync(Event& event, EventSetup const& eventSetup, WaitingTask* iTask) = 0; + virtual void doWorkAsync(std::vector const& events, EventSetup const& eventSetup, WaitingTask* iTask) = 0; // not thread safe virtual void doEndJob() = 0; @@ -50,7 +50,7 @@ namespace edm { public: explicit WorkerT(ProductRegistry& reg) : producer_(reg) {} - void doWorkAsync(Event& event, EventSetup const& eventSetup, WaitingTask* iTask) override { + void doWorkAsync(std::vector const& events, EventSetup const& eventSetup, WaitingTask* iTask) override { waitingTasksWork_.add(iTask); //std::cout << "doWorkAsync for " << this << " with iTask " << iTask << std::endl; bool expected = false; @@ -58,14 +58,14 @@ namespace edm { //std::cout << "first doWorkAsync call" << std::endl; WaitingTask* moduleTask = make_waiting_task( - tbb::task::allocate_root(), [this, &event, &eventSetup](std::exception_ptr const* iPtr) mutable { + tbb::task::allocate_root(), [this, events, &eventSetup](std::exception_ptr const* iPtr) mutable { if (iPtr) { waitingTasksWork_.doneWaiting(*iPtr); } else { std::exception_ptr exceptionPtr; try { //std::cout << "calling doProduce " << this << std::endl; - producer_.doProduce(event, eventSetup); + producer_.doProduce(events, eventSetup); } catch (...) 
{ exceptionPtr = std::current_exception(); } @@ -76,14 +76,16 @@ namespace edm { if (producer_.hasAcquire()) { WaitingTaskWithArenaHolder runProduceHolder{moduleTask}; moduleTask = make_waiting_task(tbb::task::allocate_root(), - [this, &event, &eventSetup, runProduceHolder = std::move(runProduceHolder)]( + [this, events, &eventSetup, runProduceHolder = std::move(runProduceHolder)]( std::exception_ptr const* iPtr) mutable { if (iPtr) { runProduceHolder.doneWaiting(*iPtr); } else { std::exception_ptr exceptionPtr; try { - producer_.doAcquire(event, eventSetup, runProduceHolder); + //auto const & const_events = reinterpret_cast const&>(events); + std::vector const_events(events.begin(), events.end()); + producer_.doAcquire(const_events, eventSetup, runProduceHolder); } catch (...) { exceptionPtr = std::current_exception(); } @@ -92,7 +94,7 @@ namespace edm { }); } //std::cout << "calling prefetchAsync " << this << " with moduleTask " << moduleTask << std::endl; - prefetchAsync(event, eventSetup, moduleTask); + prefetchAsync(events, eventSetup, moduleTask); } } diff --git a/src/fwtest/bin/EventProcessor.cc b/src/fwtest/bin/EventProcessor.cc index a50cc6570..e1157472c 100644 --- a/src/fwtest/bin/EventProcessor.cc +++ b/src/fwtest/bin/EventProcessor.cc @@ -6,13 +6,14 @@ #include "EventProcessor.h" namespace edm { - EventProcessor::EventProcessor(int maxEvents, + EventProcessor::EventProcessor(int batchEvents, + int maxEvents, int numberOfStreams, std::vector const& path, std::vector const& esproducers, std::filesystem::path const& datadir, bool validation) - : source_(maxEvents, registry_, datadir, validation) { + : source_(batchEvents, maxEvents, registry_, datadir, validation) { for (auto const& name : esproducers) { pluginManager_.load(name); auto esp = ESPluginFactory::create(name, datadir); diff --git a/src/fwtest/bin/EventProcessor.h b/src/fwtest/bin/EventProcessor.h index 614e60c38..bb5cbfc3e 100644 --- a/src/fwtest/bin/EventProcessor.h +++ 
b/src/fwtest/bin/EventProcessor.h @@ -14,7 +14,8 @@ namespace edm { class EventProcessor { public: - explicit EventProcessor(int maxEvents, + explicit EventProcessor(int batchEvents, + int maxEvents, int numberOfStreams, std::vector const& path, std::vector const& esproducers, diff --git a/src/fwtest/bin/PluginManager.cc b/src/fwtest/bin/PluginManager.cc index 7977cdbc2..ecb28032b 100644 --- a/src/fwtest/bin/PluginManager.cc +++ b/src/fwtest/bin/PluginManager.cc @@ -1,5 +1,5 @@ -#include #include +//#include #include "PluginManager.h" diff --git a/src/fwtest/bin/Source.cc b/src/fwtest/bin/Source.cc index 64d1a3b5c..3e98ba998 100644 --- a/src/fwtest/bin/Source.cc +++ b/src/fwtest/bin/Source.cc @@ -23,8 +23,13 @@ namespace { } // namespace namespace edm { - Source::Source(int maxEvents, ProductRegistry ®, std::filesystem::path const &datadir, bool validation) - : maxEvents_(maxEvents), numEvents_(0), rawToken_(reg.produces()), validation_(validation) { + Source::Source( + int batchEvents, int maxEvents, ProductRegistry ®, std::filesystem::path const &datadir, bool validation) + : batchEvents_(batchEvents), + maxEvents_(maxEvents), + numEvents_(0), + rawToken_(reg.produces()), + validation_(validation) { std::ifstream in_raw(datadir / "raw.bin", std::ios::binary); std::ifstream in_digiclusters; std::ifstream in_tracks; @@ -74,27 +79,38 @@ namespace edm { assert(raw_.size() == vertices_.size()); } + if (batchEvents_ < 1) { + batchEvents_ = 1; + } + if (maxEvents_ < 0) { maxEvents_ = raw_.size(); } } - std::unique_ptr Source::produce(int streamId, ProductRegistry const ®) { - const int old = numEvents_.fetch_add(1); - const int iev = old + 1; - if (old >= maxEvents_) { - return nullptr; + std::vector Source::produce(int streamId, ProductRegistry const ®) { + const int old = numEvents_.fetch_add(batchEvents_); + const int size = std::min(batchEvents_, maxEvents_ - old); + std::vector events; + if (size <= 0) { + return events; } - auto ev = std::make_unique(streamId, 
iev, reg); - const int index = old % raw_.size(); - ev->emplace(rawToken_, raw_[index]); - if (validation_) { - ev->emplace(digiClusterToken_, digiclusters_[index]); - ev->emplace(trackToken_, tracks_[index]); - ev->emplace(vertexToken_, vertices_[index]); + events.reserve(size); + for (int i = 1; i <= size; ++i) { + const int iev = old + i; + events.emplace_back(streamId, iev, reg); + auto ev = &events.back(); + const int index = (iev - 1) % raw_.size(); + + ev->emplace(rawToken_, raw_[index]); + if (validation_) { + ev->emplace(digiClusterToken_, digiclusters_[index]); + ev->emplace(trackToken_, tracks_[index]); + ev->emplace(vertexToken_, vertices_[index]); + } } - return ev; + return events; } } // namespace edm diff --git a/src/fwtest/bin/Source.h b/src/fwtest/bin/Source.h index c13534f33..21264b4db 100644 --- a/src/fwtest/bin/Source.h +++ b/src/fwtest/bin/Source.h @@ -5,6 +5,7 @@ #include #include #include +#include #include "Framework/Event.h" #include "DataFormats/FEDRawDataCollection.h" @@ -15,14 +16,16 @@ namespace edm { class Source { public: - explicit Source(int maxEvents, ProductRegistry& reg, std::filesystem::path const& datadir, bool validation); + explicit Source( + int batchEvents, int maxEvents, ProductRegistry& reg, std::filesystem::path const& datadir, bool validation); int maxEvents() const { return maxEvents_; } // thread safe - std::unique_ptr produce(int streamId, ProductRegistry const& reg); + std::vector produce(int streamId, ProductRegistry const& reg); private: + int batchEvents_; int maxEvents_; std::atomic numEvents_; EDPutTokenT const rawToken_; diff --git a/src/fwtest/bin/StreamSchedule.cc b/src/fwtest/bin/StreamSchedule.cc index 2f1360fef..3e3a0913d 100644 --- a/src/fwtest/bin/StreamSchedule.cc +++ b/src/fwtest/bin/StreamSchedule.cc @@ -1,4 +1,5 @@ -//#include +#include +#include #include @@ -44,7 +45,7 @@ namespace edm { void StreamSchedule::runToCompletionAsync(WaitingTaskHolder h) { auto task = - 
make_functor_task(tbb::task::allocate_root(), [this, h]() mutable { processOneEventAsync(std::move(h)); }); + make_functor_task(tbb::task::allocate_root(), [this, h]() mutable { processEventBatchAsync(std::move(h)); }); if (streamId_ == 0) { tbb::task::spawn(*task); } else { @@ -52,26 +53,27 @@ namespace edm { } } - void StreamSchedule::processOneEventAsync(WaitingTaskHolder h) { - auto event = source_->produce(streamId_, registry_); - if (event) { - // Pass the event object ownership to the "end-of-event" task - // Pass a non-owning pointer to the event to preceding tasks - //std::cout << "Begin processing event " << event->eventID() << std::endl; - auto eventPtr = event.get(); - auto nextEventTask = - make_waiting_task(tbb::task::allocate_root(), - [this, h = std::move(h), ev = std::move(event)](std::exception_ptr const* iPtr) mutable { - ev.reset(); - if (iPtr) { - h.doneWaiting(*iPtr); - } else { - for (auto const& worker : path_) { - worker->reset(); - } - processOneEventAsync(std::move(h)); - } - }); + void StreamSchedule::processEventBatchAsync(WaitingTaskHolder h) { + auto events = source_->produce(streamId_, registry_); + if (not events.empty()) { + // Pass the event batch ownership to the "end-of-event" task + // Pass non-owning pointers to the event to preceding tasks + //std::cout << "Begin processing a batch of " << events.size() << " events starting from " << events.front().eventID() << std::endl; + auto eventsPtr = std::vector(events.size(), nullptr); + std::transform(events.begin(), events.end(), eventsPtr.begin(), [](auto& event) { return &event; }); + auto nextEventTask = make_waiting_task( + tbb::task::allocate_root(), + [this, h = std::move(h), events = std::move(events)](std::exception_ptr const* iPtr) mutable { + events.clear(); + if (iPtr) { + h.doneWaiting(*iPtr); + } else { + for (auto const& worker : path_) { + worker->reset(); + } + processEventBatchAsync(std::move(h)); + } + }); // To guarantee that the nextEventTask is spawned also in 
// absence of Workers, and also to prevent spawning it before // all workers have been processed (should not happen though) @@ -79,7 +81,7 @@ namespace edm { for (auto iWorker = path_.rbegin(); iWorker != path_.rend(); ++iWorker) { //std::cout << "calling doWorkAsync for " << iWorker->get() << " with nextEventTask " << nextEventTask << std::endl; - (*iWorker)->doWorkAsync(*eventPtr, *eventSetup_, nextEventTask); + (*iWorker)->doWorkAsync(eventsPtr, *eventSetup_, nextEventTask); } } else { h.doneWaiting(); diff --git a/src/fwtest/bin/StreamSchedule.h b/src/fwtest/bin/StreamSchedule.h index 1bd364c70..a62dc3961 100644 --- a/src/fwtest/bin/StreamSchedule.h +++ b/src/fwtest/bin/StreamSchedule.h @@ -38,7 +38,7 @@ namespace edm { void endJob(); private: - void processOneEventAsync(WaitingTaskHolder h); + void processEventBatchAsync(WaitingTaskHolder h); ProductRegistry registry_; Source* source_; diff --git a/src/fwtest/bin/main.cc b/src/fwtest/bin/main.cc index b91b11d67..3a42cec38 100644 --- a/src/fwtest/bin/main.cc +++ b/src/fwtest/bin/main.cc @@ -18,7 +18,8 @@ namespace { "[--empty]\n\n" << "Options\n" << " --numberOfThreads Number of threads to use (default 1)\n" - << " --numberOfStreams Number of concurrent events (default 0=numberOfThreads)\n" + << " --numberOfStreams Number of concurrent batch of events (default 0=numberOfThreads)\n" + << " --batchEvents Number of events to process in a batch (default 1 for individual events)\n" << " --maxEvents Number of events to process (default -1 for all events in the input file)\n" << " --data Path to the 'data' directory (default 'data' in the directory of the executable)\n" << " --transfer Transfer results from GPU to CPU (default is to leave them on GPU)\n" @@ -33,6 +34,7 @@ int main(int argc, char** argv) { std::vector args(argv, argv + argc); int numberOfThreads = 1; int numberOfStreams = 0; + int batchEvents = 1; int maxEvents = -1; std::filesystem::path datadir; bool transfer = false; @@ -48,6 +50,9 @@ int main(int 
argc, char** argv) { } else if (*i == "--numberOfStreams") { ++i; numberOfStreams = std::stoi(*i); + } else if (*i == "--batchEvents") { + ++i; + batchEvents = std::stoul(*i); } else if (*i == "--maxEvents") { ++i; maxEvents = std::stoi(*i); @@ -89,7 +94,7 @@ int main(int argc, char** argv) { } } edm::EventProcessor processor( - maxEvents, numberOfStreams, std::move(edmodules), std::move(esmodules), datadir, validation); + batchEvents, maxEvents, numberOfStreams, std::move(edmodules), std::move(esmodules), datadir, validation); maxEvents = processor.maxEvents(); std::cout << "Processing " << maxEvents << " events, of which " << numberOfStreams << " concurrently, with " diff --git a/src/fwtest/plugin-Test2/TestProducer2.cc b/src/fwtest/plugin-Test2/TestProducer2.cc index 5981218fe..d0ca7fb4b 100644 --- a/src/fwtest/plugin-Test2/TestProducer2.cc +++ b/src/fwtest/plugin-Test2/TestProducer2.cc @@ -17,10 +17,11 @@ class TestProducer2 : public edm::EDProducerExternalWork> { explicit TestProducer2(edm::ProductRegistry& reg); private: - AsyncState acquire(edm::Event const& event, - edm::EventSetup const& eventSetup, - edm::WaitingTaskWithArenaHolder holder) const override; - void produce(edm::Event& event, edm::EventSetup const& eventSetup, AsyncState&& state) override; + void acquire(edm::Event const& event, + edm::EventSetup const& eventSetup, + edm::WaitingTaskWithArenaHolder holder, + AsyncState& state) const override; + void produce(edm::Event& event, edm::EventSetup const& eventSetup, AsyncState& state) override; void endJob() override; @@ -29,13 +30,14 @@ class TestProducer2 : public edm::EDProducerExternalWork> { TestProducer2::TestProducer2(edm::ProductRegistry& reg) : getToken_(reg.consumes()) {} -TestProducer2::AsyncState TestProducer2::acquire(edm::Event const& event, - edm::EventSetup const& eventSetup, - edm::WaitingTaskWithArenaHolder holder) const { +void TestProducer2::acquire(edm::Event const& event, + edm::EventSetup const& eventSetup, + 
edm::WaitingTaskWithArenaHolder holder, + AsyncState& state) const { auto const value = event.get(getToken_); assert(value == static_cast(event.eventID() + 10 * event.streamID() + 100)); - auto state = std::async([holder = std::move(holder)]() mutable { + state = std::async([holder = std::move(holder)]() mutable { using namespace std::chrono_literals; std::this_thread::sleep_for(1s); holder.doneWaiting(); @@ -46,11 +48,9 @@ TestProducer2::AsyncState TestProducer2::acquire(edm::Event const& event, std::cout << "TestProducer2::acquire Event " << event.eventID() << " stream " << event.streamID() << " value " << value << std::endl; #endif - - return state; } -void TestProducer2::produce(edm::Event& event, edm::EventSetup const& eventSetup, AsyncState&& state) { +void TestProducer2::produce(edm::Event& event, edm::EventSetup const& eventSetup, AsyncState& state) { #ifndef FWTEST_SILENT std::cout << "TestProducer2::produce Event " << event.eventID() << " stream " << event.streamID() << " from future " << state.get() << std::endl; From a8ce51877a09f9df3aee3d1c08429bcd4db5af78 Mon Sep 17 00:00:00 2001 From: Andrea Bocci Date: Mon, 7 Dec 2020 22:36:22 +0100 Subject: [PATCH 4/9] Add EDBatchingProducer and EDBatchingProducerExternalWork base classes --- src/fwtest/Framework/EDProducer.h | 86 +++++++++++++++++++++++++++++++ 1 file changed, 86 insertions(+) diff --git a/src/fwtest/Framework/EDProducer.h b/src/fwtest/Framework/EDProducer.h index 3e639203c..e1b172bd4 100644 --- a/src/fwtest/Framework/EDProducer.h +++ b/src/fwtest/Framework/EDProducer.h @@ -38,6 +38,28 @@ namespace edm { private: }; + class EDBatchingProducer { + public: + EDBatchingProducer() = default; + virtual ~EDBatchingProducer() = default; + + bool hasAcquire() const { return false; } + + void doAcquire(std::vector const& events, + EventSetup const& eventSetup, + WaitingTaskWithArenaHolder holder) {} + + void doProduce(std::vector const& events, EventSetup const& eventSetup) { produce(events, eventSetup); } 
+ + virtual void produce(std::vector const& events, EventSetup const& eventSetup) = 0; + + void doEndJob() { endJob(); } + + virtual void endJob() {} + + private: + }; + template class EDProducerExternalWork { public: @@ -119,6 +141,70 @@ namespace edm { private: }; + template + class EDBatchingProducerExternalWork { + public: + using AsyncState = T; + + EDBatchingProducerExternalWork() = default; + virtual ~EDBatchingProducerExternalWork() = default; + + bool hasAcquire() const { return true; } + + void doAcquire(std::vector const& events, + EventSetup const& eventSetup, + WaitingTaskWithArenaHolder holder) { + acquire(events, eventSetup, holder, state_); + } + + virtual void acquire(std::vector const& events, + EventSetup const& eventSetup, + WaitingTaskWithArenaHolder holder, + AsyncState& state) const = 0; + + void doProduce(std::vector const& events, EventSetup const& eventSetup) { + produce(events, eventSetup, state_); + } + + virtual void produce(std::vector const& events, EventSetup const& eventSetup, AsyncState& state) = 0; + + void doEndJob() { endJob(); } + + virtual void endJob() {} + + private: + AsyncState state_; + }; + + template <> + class EDBatchingProducerExternalWork { + public: + EDBatchingProducerExternalWork() = default; + virtual ~EDBatchingProducerExternalWork() = default; + + bool hasAcquire() const { return true; } + + void doAcquire(std::vector const& events, + EventSetup const& eventSetup, + WaitingTaskWithArenaHolder holder) { + acquire(events, eventSetup, holder); + } + + virtual void acquire(std::vector const& events, + EventSetup const& eventSetup, + WaitingTaskWithArenaHolder holder) const = 0; + + void doProduce(std::vector const& events, EventSetup const& eventSetup) { produce(events, eventSetup); } + + virtual void produce(std::vector const& events, EventSetup const& eventSetup) = 0; + + void doEndJob() { endJob(); } + + virtual void endJob() {} + + private: + }; + } // namespace edm #endif From 
5fc49aa5a55e41dde94a39a0cdf6f9b54cdc59a6 Mon Sep 17 00:00:00 2001 From: Andrea Bocci Date: Mon, 7 Dec 2020 22:40:31 +0100 Subject: [PATCH 5/9] Implement tests for all kinds of EDProducer Reorganise the existing EDProducer tests, and add tests for - EDBatchingProducer - EDBatchingProducerExternalWork --- src/fwtest/Makefile.deps | 4 +- src/fwtest/bin/main.cc | 3 +- .../IntESProducer.cc | 0 .../TestProducer.cc | 0 .../plugin-Test/TestProducerExternalWork.cc | 68 ++++++++++++++++ src/fwtest/plugin-Test2/TestProducer2.cc | 65 --------------- src/fwtest/plugin-Test2/TestProducer3.cc | 29 ------- .../TestBatchingProducer.cc | 31 +++++++ .../TestBatchingProducerExternalWork.cc | 80 +++++++++++++++++++ src/fwtest/plugins.txt | 9 ++- 10 files changed, 188 insertions(+), 101 deletions(-) rename src/fwtest/{plugin-Test1 => plugin-Test}/IntESProducer.cc (100%) rename src/fwtest/{plugin-Test1 => plugin-Test}/TestProducer.cc (100%) create mode 100644 src/fwtest/plugin-Test/TestProducerExternalWork.cc delete mode 100644 src/fwtest/plugin-Test2/TestProducer2.cc delete mode 100644 src/fwtest/plugin-Test2/TestProducer3.cc create mode 100644 src/fwtest/plugin-TestBatching/TestBatchingProducer.cc create mode 100644 src/fwtest/plugin-TestBatching/TestBatchingProducerExternalWork.cc diff --git a/src/fwtest/Makefile.deps b/src/fwtest/Makefile.deps index b531eb578..3c07a802b 100644 --- a/src/fwtest/Makefile.deps +++ b/src/fwtest/Makefile.deps @@ -1,3 +1,3 @@ fwtest_EXTERNAL_DEPENDS := TBB -Test1_DEPENDS := Framework DataFormats -Test2_DEPENDS := Framework +Test_DEPENDS := Framework DataFormats +TestBatching_DEPENDS := Framework diff --git a/src/fwtest/bin/main.cc b/src/fwtest/bin/main.cc index 3a42cec38..32572df99 100644 --- a/src/fwtest/bin/main.cc +++ b/src/fwtest/bin/main.cc @@ -87,7 +87,8 @@ int main(int argc, char** argv) { std::vector edmodules; std::vector esmodules; if (not empty) { - edmodules = {"TestProducer", "TestProducer3", "TestProducer2"}; + edmodules = { + "TestProducer", 
"TestProducerExternalWork", "TestBatchingProducer", "TestBatchingProducerExternalWork"}; esmodules = {"IntESProducer"}; if (transfer) { // add modules for transfer diff --git a/src/fwtest/plugin-Test1/IntESProducer.cc b/src/fwtest/plugin-Test/IntESProducer.cc similarity index 100% rename from src/fwtest/plugin-Test1/IntESProducer.cc rename to src/fwtest/plugin-Test/IntESProducer.cc diff --git a/src/fwtest/plugin-Test1/TestProducer.cc b/src/fwtest/plugin-Test/TestProducer.cc similarity index 100% rename from src/fwtest/plugin-Test1/TestProducer.cc rename to src/fwtest/plugin-Test/TestProducer.cc diff --git a/src/fwtest/plugin-Test/TestProducerExternalWork.cc b/src/fwtest/plugin-Test/TestProducerExternalWork.cc new file mode 100644 index 000000000..37ce351d3 --- /dev/null +++ b/src/fwtest/plugin-Test/TestProducerExternalWork.cc @@ -0,0 +1,68 @@ +#include +#include +#include +#include +#include + +#include "Framework/EDProducer.h" +#include "Framework/Event.h" +#include "Framework/PluginFactory.h" + +namespace { + std::atomic nevents = 0; +} + +using TestProducerExternalWorkAsyncState = std::future; + +class TestProducerExternalWork : public edm::EDProducerExternalWork { +public: + explicit TestProducerExternalWork(edm::ProductRegistry& reg); + +private: + void acquire(edm::Event const& event, + edm::EventSetup const& eventSetup, + edm::WaitingTaskWithArenaHolder holder, + AsyncState& state) const override; + void produce(edm::Event& event, edm::EventSetup const& eventSetup, AsyncState& state) override; + + void endJob() override; + + const edm::EDGetTokenT getToken_; +}; + +TestProducerExternalWork::TestProducerExternalWork(edm::ProductRegistry& reg) + : getToken_(reg.consumes()) {} + +void TestProducerExternalWork::acquire(edm::Event const& event, + edm::EventSetup const& eventSetup, + edm::WaitingTaskWithArenaHolder holder, + AsyncState& state) const { + auto const value = event.get(getToken_); + assert(value == static_cast(event.eventID() + 10 * event.streamID() + 
100)); + + state = std::async([holder = std::move(holder)]() mutable { + using namespace std::chrono_literals; + std::this_thread::sleep_for(1s); + holder.doneWaiting(); + return 42; + }); + +#ifndef FWTEST_SILENT + std::cout << "TestProducerExternalWork::acquire Event " << event.eventID() << " stream " << event.streamID() + << " value " << value << std::endl; +#endif +} + +void TestProducerExternalWork::produce(edm::Event& event, edm::EventSetup const& eventSetup, AsyncState& state) { +#ifndef FWTEST_SILENT + std::cout << "TestProducerExternalWork::produce Event " << event.eventID() << " stream " << event.streamID() + << " from future " << state.get() << std::endl; +#endif + ++nevents; +} + +void TestProducerExternalWork::endJob() { + std::cout << "TestProducerExternalWork::endJob processed " << nevents.load() << " events" << std::endl; +} + +DEFINE_FWK_MODULE(TestProducerExternalWork); diff --git a/src/fwtest/plugin-Test2/TestProducer2.cc b/src/fwtest/plugin-Test2/TestProducer2.cc deleted file mode 100644 index d0ca7fb4b..000000000 --- a/src/fwtest/plugin-Test2/TestProducer2.cc +++ /dev/null @@ -1,65 +0,0 @@ -#include -#include -#include -#include -#include - -#include "Framework/EDProducer.h" -#include "Framework/Event.h" -#include "Framework/PluginFactory.h" - -namespace { - std::atomic nevents = 0; -} - -class TestProducer2 : public edm::EDProducerExternalWork> { -public: - explicit TestProducer2(edm::ProductRegistry& reg); - -private: - void acquire(edm::Event const& event, - edm::EventSetup const& eventSetup, - edm::WaitingTaskWithArenaHolder holder, - AsyncState& state) const override; - void produce(edm::Event& event, edm::EventSetup const& eventSetup, AsyncState& state) override; - - void endJob() override; - - const edm::EDGetTokenT getToken_; -}; - -TestProducer2::TestProducer2(edm::ProductRegistry& reg) : getToken_(reg.consumes()) {} - -void TestProducer2::acquire(edm::Event const& event, - edm::EventSetup const& eventSetup, - 
edm::WaitingTaskWithArenaHolder holder, - AsyncState& state) const { - auto const value = event.get(getToken_); - assert(value == static_cast(event.eventID() + 10 * event.streamID() + 100)); - - state = std::async([holder = std::move(holder)]() mutable { - using namespace std::chrono_literals; - std::this_thread::sleep_for(1s); - holder.doneWaiting(); - return 42; - }); - -#ifndef FWTEST_SILENT - std::cout << "TestProducer2::acquire Event " << event.eventID() << " stream " << event.streamID() << " value " - << value << std::endl; -#endif -} - -void TestProducer2::produce(edm::Event& event, edm::EventSetup const& eventSetup, AsyncState& state) { -#ifndef FWTEST_SILENT - std::cout << "TestProducer2::produce Event " << event.eventID() << " stream " << event.streamID() << " from future " - << state.get() << std::endl; -#endif - ++nevents; -} - -void TestProducer2::endJob() { - std::cout << "TestProducer2::endJob processed " << nevents.load() << " events" << std::endl; -} - -DEFINE_FWK_MODULE(TestProducer2); diff --git a/src/fwtest/plugin-Test2/TestProducer3.cc b/src/fwtest/plugin-Test2/TestProducer3.cc deleted file mode 100644 index ef8354058..000000000 --- a/src/fwtest/plugin-Test2/TestProducer3.cc +++ /dev/null @@ -1,29 +0,0 @@ -#include -#include -#include - -#include "Framework/EDProducer.h" -#include "Framework/Event.h" -#include "Framework/PluginFactory.h" - -class TestProducer3 : public edm::EDProducer { -public: - explicit TestProducer3(edm::ProductRegistry& reg); - -private: - void produce(edm::Event& event, edm::EventSetup const& eventSetup) override; - - edm::EDGetTokenT getToken_; -}; - -TestProducer3::TestProducer3(edm::ProductRegistry& reg) : getToken_(reg.consumes()) {} - -void TestProducer3::produce(edm::Event& event, edm::EventSetup const& eventSetup) { - auto const value = event.get(getToken_); -#ifndef FWTEST_SILENT - std::cout << "TestProducer3 Event " << event.eventID() << " stream " << event.streamID() << " value " << value - << std::endl; -#endif 
-} - -DEFINE_FWK_MODULE(TestProducer3); diff --git a/src/fwtest/plugin-TestBatching/TestBatchingProducer.cc b/src/fwtest/plugin-TestBatching/TestBatchingProducer.cc new file mode 100644 index 000000000..380319d1b --- /dev/null +++ b/src/fwtest/plugin-TestBatching/TestBatchingProducer.cc @@ -0,0 +1,31 @@ +#include +#include +#include + +#include "Framework/EDProducer.h" +#include "Framework/Event.h" +#include "Framework/PluginFactory.h" + +class TestBatchingProducer : public edm::EDBatchingProducer { +public: + explicit TestBatchingProducer(edm::ProductRegistry& reg); + +private: + void produce(std::vector const& events, edm::EventSetup const& eventSetup) override; + + edm::EDGetTokenT getToken_; +}; + +TestBatchingProducer::TestBatchingProducer(edm::ProductRegistry& reg) : getToken_(reg.consumes()) {} + +void TestBatchingProducer::produce(std::vector const& events, edm::EventSetup const& eventSetup) { + for (edm::Event* event : events) { + auto const value = event->get(getToken_); +#ifndef FWTEST_SILENT + std::cout << "TestBatchingProducer Event " << event->eventID() << " stream " << event->streamID() << " value " + << value << std::endl; +#endif + } +} + +DEFINE_FWK_MODULE(TestBatchingProducer); diff --git a/src/fwtest/plugin-TestBatching/TestBatchingProducerExternalWork.cc b/src/fwtest/plugin-TestBatching/TestBatchingProducerExternalWork.cc new file mode 100644 index 000000000..964ed0993 --- /dev/null +++ b/src/fwtest/plugin-TestBatching/TestBatchingProducerExternalWork.cc @@ -0,0 +1,80 @@ +#include +#include +#include +#include +#include +#include + +#include "Framework/EDProducer.h" +#include "Framework/Event.h" +#include "Framework/PluginFactory.h" + +namespace { + std::atomic nevents = 0; +} + +// test using an std::map instead of a simpler std::vector +using TestBatchingProducerExternalWorkAsyncState = std::map>; + +class TestBatchingProducerExternalWork + : public edm::EDBatchingProducerExternalWork { +public: + explicit 
TestBatchingProducerExternalWork(edm::ProductRegistry& reg); + +private: + void acquire(std::vector const& events, + edm::EventSetup const& eventSetup, + edm::WaitingTaskWithArenaHolder holder, + AsyncState& state) const override; + void produce(std::vector const& events, edm::EventSetup const& eventSetup, AsyncState& state) override; + + void endJob() override; + + const edm::EDGetTokenT getToken_; +}; + +TestBatchingProducerExternalWork::TestBatchingProducerExternalWork(edm::ProductRegistry& reg) + : getToken_(reg.consumes()) {} + +void TestBatchingProducerExternalWork::acquire(std::vector const& events, + edm::EventSetup const& eventSetup, + edm::WaitingTaskWithArenaHolder holder, + AsyncState& state) const { + for (edm::Event const* event : events) { + assert(event); + auto const value = event->get(getToken_); + assert(value == static_cast(event->eventID() + 10 * event->streamID() + 100)); + + // cannot move form the holder as it is used more than once + state[event->eventID()] = std::async([holder]() mutable { + using namespace std::chrono_literals; + std::this_thread::sleep_for(1s); + holder.doneWaiting(); + return 42; + }); + +#ifndef FWTEST_SILENT + std::cout << "TestBatchingProducerExternalWork::acquire Event " << event->eventID() << " stream " + << event->streamID() << " value " << value << std::endl; +#endif + } +} + +void TestBatchingProducerExternalWork::produce(std::vector const& events, + edm::EventSetup const& eventSetup, + AsyncState& state) { +#ifndef FWTEST_SILENT + for (edm::Event* event : events) { + assert(event); + std::cout << "TestBatchingProducerExternalWork::produce Event " << event->eventID() << " stream " + << event->streamID() << " from future " << state[event->eventID()].get() << std::endl; + } +#endif + ++nevents; +} + +void TestBatchingProducerExternalWork::endJob() { + std::cout << "TestBatchingProducerExternalWork::endJob processed " << nevents.load() << " events" << std::endl; +} + 
+DEFINE_FWK_MODULE(TestBatchingProducerExternalWork); diff --git a/src/fwtest/plugins.txt b/src/fwtest/plugins.txt index 42f92a9a4..f01f94ac3 100644 --- a/src/fwtest/plugins.txt +++ b/src/fwtest/plugins.txt @@ -1,4 +1,5 @@ -IntESProducer pluginTest1.so -TestProducer pluginTest1.so -TestProducer2 pluginTest2.so -TestProducer3 pluginTest2.so +IntESProducer pluginTest.so +TestProducer pluginTest.so +TestProducerExternalWork pluginTest.so +TestBatchingProducer pluginTestBatching.so +TestBatchingProducerExternalWork pluginTestBatching.so From 424f889a05d77ae5b91661c6d5cd64efcc15a3eb Mon Sep 17 00:00:00 2001 From: Andrea Bocci Date: Mon, 7 Dec 2020 22:52:34 +0100 Subject: [PATCH 6/9] Wrap ownership of and access to event batches Introduce three new edm classes: - edm::EventBatch implements the ownership of a batch of events, implemented as an std::vector; used by the Source to provide events to the StreamSchedule. - edm::EventRange implements non-owning, non-const access to a batch of events; wraps a pair of Event* to the begin and end of an event batch; passed (by value) to doProduce() and produce(). - edm::ConstEventRange implements non-owning const access to a batch of events; wraps a pair of Event* to the begin and end of an event batch; passed (by value) to doAcquire() and acquire(). 
--- src/fwtest/Framework/EDProducer.h | 72 +++++--------- src/fwtest/Framework/EventBatch.h | 36 +++++++ src/fwtest/Framework/EventRange.h | 96 +++++++++++++++++++ src/fwtest/Framework/Worker.cc | 2 +- src/fwtest/Framework/Worker.h | 40 ++++---- src/fwtest/bin/Source.cc | 15 ++- src/fwtest/bin/Source.h | 3 +- src/fwtest/bin/StreamSchedule.cc | 9 +- .../TestBatchingProducer.cc | 11 ++- .../TestBatchingProducerExternalWork.cc | 29 +++--- 10 files changed, 212 insertions(+), 101 deletions(-) create mode 100644 src/fwtest/Framework/EventBatch.h create mode 100644 src/fwtest/Framework/EventRange.h diff --git a/src/fwtest/Framework/EDProducer.h b/src/fwtest/Framework/EDProducer.h index e1b172bd4..1cc63f58d 100644 --- a/src/fwtest/Framework/EDProducer.h +++ b/src/fwtest/Framework/EDProducer.h @@ -1,10 +1,9 @@ #ifndef EDProducerBase_h #define EDProducerBase_h -#include #include -#include +#include "Framework/EventRange.h" #include "Framework/WaitingTaskWithArenaHolder.h" namespace edm { @@ -18,14 +17,11 @@ namespace edm { bool hasAcquire() const { return false; } - void doAcquire(std::vector const& events, - EventSetup const& eventSetup, - WaitingTaskWithArenaHolder holder) {} + void doAcquire(ConstEventRange events, EventSetup const& eventSetup, WaitingTaskWithArenaHolder holder) {} - void doProduce(std::vector const& events, EventSetup const& eventSetup) { - for (Event* event : events) { - assert(event); - produce(*event, eventSetup); + void doProduce(EventRange events, EventSetup const& eventSetup) { + for (Event& event : events) { + produce(event, eventSetup); } } @@ -45,13 +41,11 @@ namespace edm { bool hasAcquire() const { return false; } - void doAcquire(std::vector const& events, - EventSetup const& eventSetup, - WaitingTaskWithArenaHolder holder) {} + void doAcquire(ConstEventRange events, EventSetup const& eventSetup, WaitingTaskWithArenaHolder holder) {} - void doProduce(std::vector const& events, EventSetup const& eventSetup) { produce(events, eventSetup); } + 
void doProduce(EventRange events, EventSetup const& eventSetup) { produce(events, eventSetup); } - virtual void produce(std::vector const& events, EventSetup const& eventSetup) = 0; + virtual void produce(EventRange events, EventSetup const& eventSetup) = 0; void doEndJob() { endJob(); } @@ -70,16 +64,13 @@ namespace edm { bool hasAcquire() const { return true; } - void doAcquire(std::vector const& events, - EventSetup const& eventSetup, - WaitingTaskWithArenaHolder holder) { + void doAcquire(ConstEventRange events, EventSetup const& eventSetup, WaitingTaskWithArenaHolder holder) { if (events.size() > statesSize_) { statesSize_ = events.size(); states_ = std::make_unique(statesSize_); } for (size_t i = 0; i < events.size(); ++i) { - assert(events[i]); - acquire(*events[i], eventSetup, holder, states_[i]); + acquire(events[i], eventSetup, holder, states_[i]); } } @@ -88,10 +79,9 @@ namespace edm { WaitingTaskWithArenaHolder holder, AsyncState& state) const = 0; - void doProduce(std::vector const& events, EventSetup const& eventSetup) { + void doProduce(EventRange events, EventSetup const& eventSetup) { for (size_t i = 0; i < events.size(); ++i) { - assert(events[i]); - produce(*events[i], eventSetup, states_[i]); + produce(events[i], eventSetup, states_[i]); } } @@ -114,21 +104,17 @@ namespace edm { bool hasAcquire() const { return true; } - void doAcquire(std::vector const& events, - EventSetup const& eventSetup, - WaitingTaskWithArenaHolder holder) { - for (Event const* event : events) { - assert(event); - acquire(*event, eventSetup, holder); + void doAcquire(ConstEventRange events, EventSetup const& eventSetup, WaitingTaskWithArenaHolder holder) { + for (Event const& event : events) { + acquire(event, eventSetup, holder); } } virtual void acquire(Event const& event, EventSetup const& eventSetup, WaitingTaskWithArenaHolder holder) const = 0; - void doProduce(std::vector const& events, EventSetup const& eventSetup) { - for (Event* event : events) { - assert(event); 
- produce(*event, eventSetup); + void doProduce(EventRange events, EventSetup const& eventSetup) { + for (Event& event : events) { + produce(event, eventSetup); } } @@ -151,22 +137,18 @@ namespace edm { bool hasAcquire() const { return true; } - void doAcquire(std::vector const& events, - EventSetup const& eventSetup, - WaitingTaskWithArenaHolder holder) { + void doAcquire(ConstEventRange events, EventSetup const& eventSetup, WaitingTaskWithArenaHolder holder) { acquire(events, eventSetup, holder, state_); } - virtual void acquire(std::vector const& events, + virtual void acquire(ConstEventRange events, EventSetup const& eventSetup, WaitingTaskWithArenaHolder holder, AsyncState& state) const = 0; - void doProduce(std::vector const& events, EventSetup const& eventSetup) { - produce(events, eventSetup, state_); - } + void doProduce(EventRange events, EventSetup const& eventSetup) { produce(events, eventSetup, state_); } - virtual void produce(std::vector const& events, EventSetup const& eventSetup, AsyncState& state) = 0; + virtual void produce(EventRange events, EventSetup const& eventSetup, AsyncState& states) = 0; void doEndJob() { endJob(); } @@ -184,19 +166,17 @@ namespace edm { bool hasAcquire() const { return true; } - void doAcquire(std::vector const& events, - EventSetup const& eventSetup, - WaitingTaskWithArenaHolder holder) { + void doAcquire(ConstEventRange events, EventSetup const& eventSetup, WaitingTaskWithArenaHolder holder) { acquire(events, eventSetup, holder); } - virtual void acquire(std::vector const& events, + virtual void acquire(ConstEventRange events, EventSetup const& eventSetup, WaitingTaskWithArenaHolder holder) const = 0; - void doProduce(std::vector const& events, EventSetup const& eventSetup) { produce(events, eventSetup); } + void doProduce(EventRange events, EventSetup const& eventSetup) { produce(events, eventSetup); } - virtual void produce(std::vector const& events, EventSetup const& eventSetup) = 0; + virtual void 
produce(EventRange events, EventSetup const& eventSetup) = 0; void doEndJob() { endJob(); } diff --git a/src/fwtest/Framework/EventBatch.h b/src/fwtest/Framework/EventBatch.h new file mode 100644 index 000000000..3632c41ea --- /dev/null +++ b/src/fwtest/Framework/EventBatch.h @@ -0,0 +1,36 @@ +#ifndef EventBatch_h +#define EventBatch_h + +#include + +#include "Framework/Event.h" +#include "Framework/EventRange.h" + +namespace edm { + + class EventBatch { + public: + Event& emplace(int streamId, int eventId, ProductRegistry const& reg) { + events_.emplace_back(streamId, eventId, reg); + return events_.back(); + } + + void reserve(size_t capacity) { events_.reserve(capacity); } + + void clear() { events_.clear(); } + + size_t size() const { return events_.size(); } + + bool empty() const { return events_.empty(); } + + EventRange range() { return EventRange(&events_.front(), &events_.back() + 1); } + + ConstEventRange range() const { return ConstEventRange(&events_.front(), &events_.back() + 1); } + + private: + std::vector events_; + }; + +} // namespace edm + +#endif // EventBatch_h diff --git a/src/fwtest/Framework/EventRange.h b/src/fwtest/Framework/EventRange.h new file mode 100644 index 000000000..9a9cca645 --- /dev/null +++ b/src/fwtest/Framework/EventRange.h @@ -0,0 +1,96 @@ +#ifndef EventRange_h +#define EventRange_h + +#include +#include +#include + +#include "Framework/Event.h" + +namespace edm { + + class EventRange { + public: + EventRange(Event* begin, Event* end) : begin_(begin), end_(end) { + assert(begin_); + assert(end_); + assert(end_ >= begin_); + } + + Event* begin() { return begin_; } + + Event const* begin() const { return begin_; } + + Event* end() { return end_; } + + Event const* end() const { return end_; } + + size_t size() const { return end_ - begin_; } + + bool empty() const { return end_ == begin_; } + + Event& at(size_t index) { + if (index >= size()) { + std::stringstream msg; + msg << "EventRange::at() range check failed: index " << 
index << " is outside the range [0.." << size() << ")"; + throw std::out_of_range(msg.str()); + } + return begin_[index]; + } + + Event const& at(size_t index) const { + if (index >= size()) { + std::stringstream msg; + msg << "EventRange::at() range check failed: index " << index << " is outside the range [0.." << size() << ")"; + throw std::out_of_range(msg.str()); + } + return begin_[index]; + } + + Event& operator[](size_t index) { return begin_[index]; } + + Event const& operator[](size_t index) const { return begin_[index]; } + + private: + Event* begin_; + Event* end_; + }; + + class ConstEventRange { + public: + ConstEventRange(Event const* begin, Event const* end) : begin_(begin), end_(end) { + assert(begin_); + assert(end_); + assert(end_ >= begin_); + } + + ConstEventRange(EventRange range) : begin_(range.begin()), end_(range.end()) {} + + Event const* begin() const { return begin_; } + + Event const* end() const { return end_; } + + size_t size() const { return end_ - begin_; } + + bool empty() const { return end_ == begin_; } + + Event const& at(size_t index) { + if (index >= size()) { + std::stringstream msg; + msg << "ConstEventRange::at() range check failed: index " << index << " is outside the range [0.." 
<< size() + << ")"; + throw std::out_of_range(msg.str()); + } + return begin_[index]; + } + + Event const& operator[](size_t index) const { return begin_[index]; } + + private: + Event const* begin_; + Event const* end_; + }; + +} // namespace edm + +#endif // EventRange_h diff --git a/src/fwtest/Framework/Worker.cc b/src/fwtest/Framework/Worker.cc index d7b3d4b01..9c1c5c3c4 100644 --- a/src/fwtest/Framework/Worker.cc +++ b/src/fwtest/Framework/Worker.cc @@ -2,7 +2,7 @@ #include "Framework/Worker.h" namespace edm { - void Worker::prefetchAsync(std::vector const& events, EventSetup const& eventSetup, WaitingTask* iTask) { + void Worker::prefetchAsync(EventRange events, EventSetup const& eventSetup, WaitingTask* iTask) { //std::cout << "prefetchAsync for " << this << " iTask " << iTask << std::endl; bool expected = false; if (prefetchRequested_.compare_exchange_strong(expected, true)) { diff --git a/src/fwtest/Framework/Worker.h b/src/fwtest/Framework/Worker.h index 04489287e..26bdfed0b 100644 --- a/src/fwtest/Framework/Worker.h +++ b/src/fwtest/Framework/Worker.h @@ -5,6 +5,7 @@ //#include #include +#include "Framework/EventRange.h" #include "Framework/WaitingTask.h" #include "Framework/WaitingTaskHolder.h" #include "Framework/WaitingTaskList.h" @@ -23,10 +24,10 @@ namespace edm { void setItemsToGet(std::vector workers) { itemsToGet_ = std::move(workers); } // thread safe - void prefetchAsync(std::vector const& events, EventSetup const& eventSetup, WaitingTask* iTask); + void prefetchAsync(EventRange events, EventSetup const& eventSetup, WaitingTask* iTask); // not thread safe - virtual void doWorkAsync(std::vector const& events, EventSetup const& eventSetup, WaitingTask* iTask) = 0; + virtual void doWorkAsync(EventRange events, EventSetup const& eventSetup, WaitingTask* iTask) = 0; // not thread safe virtual void doEndJob() = 0; @@ -50,7 +51,7 @@ namespace edm { public: explicit WorkerT(ProductRegistry& reg) : producer_(reg) {} - void doWorkAsync(std::vector const& 
events, EventSetup const& eventSetup, WaitingTask* iTask) override { + void doWorkAsync(EventRange events, EventSetup const& eventSetup, WaitingTask* iTask) override { waitingTasksWork_.add(iTask); //std::cout << "doWorkAsync for " << this << " with iTask " << iTask << std::endl; bool expected = false; @@ -75,23 +76,22 @@ namespace edm { }); if (producer_.hasAcquire()) { WaitingTaskWithArenaHolder runProduceHolder{moduleTask}; - moduleTask = make_waiting_task(tbb::task::allocate_root(), - [this, events, &eventSetup, runProduceHolder = std::move(runProduceHolder)]( - std::exception_ptr const* iPtr) mutable { - if (iPtr) { - runProduceHolder.doneWaiting(*iPtr); - } else { - std::exception_ptr exceptionPtr; - try { - //auto const & const_events = reinterpret_cast const&>(events); - std::vector const_events(events.begin(), events.end()); - producer_.doAcquire(const_events, eventSetup, runProduceHolder); - } catch (...) { - exceptionPtr = std::current_exception(); - } - runProduceHolder.doneWaiting(exceptionPtr); - } - }); + moduleTask = make_waiting_task( + tbb::task::allocate_root(), + [this, events = ConstEventRange(events), &eventSetup, runProduceHolder = std::move(runProduceHolder)]( + std::exception_ptr const* iPtr) mutable { + if (iPtr) { + runProduceHolder.doneWaiting(*iPtr); + } else { + std::exception_ptr exceptionPtr; + try { + producer_.doAcquire(events, eventSetup, runProduceHolder); + } catch (...) 
{ + exceptionPtr = std::current_exception(); + } + runProduceHolder.doneWaiting(exceptionPtr); + } + }); } //std::cout << "calling prefetchAsync " << this << " with moduleTask " << moduleTask << std::endl; prefetchAsync(events, eventSetup, moduleTask); diff --git a/src/fwtest/bin/Source.cc b/src/fwtest/bin/Source.cc index 3e98ba998..860f1dbc1 100644 --- a/src/fwtest/bin/Source.cc +++ b/src/fwtest/bin/Source.cc @@ -88,10 +88,10 @@ namespace edm { } } - std::vector Source::produce(int streamId, ProductRegistry const ®) { + EventBatch Source::produce(int streamId, ProductRegistry const ®) { const int old = numEvents_.fetch_add(batchEvents_); const int size = std::min(batchEvents_, maxEvents_ - old); - std::vector events; + EventBatch events; if (size <= 0) { return events; } @@ -99,15 +99,14 @@ namespace edm { events.reserve(size); for (int i = 1; i <= size; ++i) { const int iev = old + i; - events.emplace_back(streamId, iev, reg); - auto ev = &events.back(); + Event &event = events.emplace(streamId, iev, reg); const int index = (iev - 1) % raw_.size(); - ev->emplace(rawToken_, raw_[index]); + event.emplace(rawToken_, raw_[index]); if (validation_) { - ev->emplace(digiClusterToken_, digiclusters_[index]); - ev->emplace(trackToken_, tracks_[index]); - ev->emplace(vertexToken_, vertices_[index]); + event.emplace(digiClusterToken_, digiclusters_[index]); + event.emplace(trackToken_, tracks_[index]); + event.emplace(vertexToken_, vertices_[index]); } } diff --git a/src/fwtest/bin/Source.h b/src/fwtest/bin/Source.h index 21264b4db..a3c99798d 100644 --- a/src/fwtest/bin/Source.h +++ b/src/fwtest/bin/Source.h @@ -8,6 +8,7 @@ #include #include "Framework/Event.h" +#include "Framework/EventBatch.h" #include "DataFormats/FEDRawDataCollection.h" #include "DataFormats/DigiClusterCount.h" #include "DataFormats/TrackCount.h" @@ -22,7 +23,7 @@ namespace edm { int maxEvents() const { return maxEvents_; } // thread safe - std::vector produce(int streamId, ProductRegistry const& reg); 
+ EventBatch produce(int streamId, ProductRegistry const& reg); private: int batchEvents_; diff --git a/src/fwtest/bin/StreamSchedule.cc b/src/fwtest/bin/StreamSchedule.cc index 3e3a0913d..1f34f0f58 100644 --- a/src/fwtest/bin/StreamSchedule.cc +++ b/src/fwtest/bin/StreamSchedule.cc @@ -57,10 +57,9 @@ namespace edm { auto events = source_->produce(streamId_, registry_); if (not events.empty()) { // Pass the event batch ownership to the "end-of-event" task - // Pass non-owning pointers to the event to preceding tasks - //std::cout << "Begin processing a batch of " << events.size() << " events starting from " << events.front().eventID() << std::endl; - auto eventsPtr = std::vector(events.size(), nullptr); - std::transform(events.begin(), events.end(), eventsPtr.begin(), [](auto& event) { return &event; }); + // Pass a non-owning event range to the preceding tasks + //std::cout << "Begin processing a batch of " << events.size() << " events starting from " << events.range().at(0).eventID() << std::endl; + auto eventsRange = events.range(); auto nextEventTask = make_waiting_task( tbb::task::allocate_root(), [this, h = std::move(h), events = std::move(events)](std::exception_ptr const* iPtr) mutable { @@ -81,7 +80,7 @@ namespace edm { for (auto iWorker = path_.rbegin(); iWorker != path_.rend(); ++iWorker) { //std::cout << "calling doWorkAsync for " << iWorker->get() << " with nextEventTask " << nextEventTask << std::endl; - (*iWorker)->doWorkAsync(eventsPtr, *eventSetup_, nextEventTask); + (*iWorker)->doWorkAsync(eventsRange, *eventSetup_, nextEventTask); } } else { h.doneWaiting(); diff --git a/src/fwtest/plugin-TestBatching/TestBatchingProducer.cc b/src/fwtest/plugin-TestBatching/TestBatchingProducer.cc index 380319d1b..485929c0f 100644 --- a/src/fwtest/plugin-TestBatching/TestBatchingProducer.cc +++ b/src/fwtest/plugin-TestBatching/TestBatchingProducer.cc @@ -4,6 +4,7 @@ #include "Framework/EDProducer.h" #include "Framework/Event.h" +#include "Framework/EventRange.h" 
#include "Framework/PluginFactory.h" class TestBatchingProducer : public edm::EDBatchingProducer { @@ -11,18 +12,18 @@ class TestBatchingProducer : public edm::EDBatchingProducer { explicit TestBatchingProducer(edm::ProductRegistry& reg); private: - void produce(std::vector const& events, edm::EventSetup const& eventSetup) override; + void produce(edm::EventRange events, edm::EventSetup const& eventSetup) override; edm::EDGetTokenT getToken_; }; TestBatchingProducer::TestBatchingProducer(edm::ProductRegistry& reg) : getToken_(reg.consumes()) {} -void TestBatchingProducer::produce(std::vector const& events, edm::EventSetup const& eventSetup) { - for (edm::Event* event : events) { - auto const value = event->get(getToken_); +void TestBatchingProducer::produce(edm::EventRange events, edm::EventSetup const& eventSetup) { + for (edm::Event& event : events) { + auto const value = event.get(getToken_); #ifndef FWTEST_SILENT - std::cout << "TestBatchingProducer Event " << event->eventID() << " stream " << event->streamID() << " value " + std::cout << "TestBatchingProducer Event " << event.eventID() << " stream " << event.streamID() << " value " << value << std::endl; #endif } diff --git a/src/fwtest/plugin-TestBatching/TestBatchingProducerExternalWork.cc b/src/fwtest/plugin-TestBatching/TestBatchingProducerExternalWork.cc index 964ed0993..f76d87417 100644 --- a/src/fwtest/plugin-TestBatching/TestBatchingProducerExternalWork.cc +++ b/src/fwtest/plugin-TestBatching/TestBatchingProducerExternalWork.cc @@ -7,6 +7,7 @@ #include "Framework/EDProducer.h" #include "Framework/Event.h" +#include "Framework/EventRange.h" #include "Framework/PluginFactory.h" namespace { @@ -22,11 +23,11 @@ class TestBatchingProducerExternalWork explicit TestBatchingProducerExternalWork(edm::ProductRegistry& reg); private: - void acquire(std::vector const& events, + void acquire(edm::ConstEventRange events, edm::EventSetup const& eventSetup, edm::WaitingTaskWithArenaHolder holder, AsyncState& state) 
const override; - void produce(std::vector const& events, edm::EventSetup const& eventSetup, AsyncState& state) override; + void produce(edm::EventRange events, edm::EventSetup const& eventSetup, AsyncState& state) override; void endJob() override; @@ -36,17 +37,16 @@ class TestBatchingProducerExternalWork TestBatchingProducerExternalWork::TestBatchingProducerExternalWork(edm::ProductRegistry& reg) : getToken_(reg.consumes()) {} -void TestBatchingProducerExternalWork::acquire(std::vector const& events, +void TestBatchingProducerExternalWork::acquire(edm::ConstEventRange events, edm::EventSetup const& eventSetup, edm::WaitingTaskWithArenaHolder holder, AsyncState& state) const { - for (edm::Event const* event : events) { - assert(event); - auto const value = event->get(getToken_); - assert(value == static_cast(event->eventID() + 10 * event->streamID() + 100)); + for (edm::Event const& event : events) { + auto const value = event.get(getToken_); + assert(value == static_cast(event.eventID() + 10 * event.streamID() + 100)); // cannot move form the holder as it is used more than once - state[event->eventID()] = std::async([holder]() mutable { + state[event.eventID()] = std::async([holder]() mutable { using namespace std::chrono_literals; std::this_thread::sleep_for(1s); holder.doneWaiting(); @@ -54,20 +54,19 @@ void TestBatchingProducerExternalWork::acquire(std::vector co }); #ifndef FWTEST_SILENT - std::cout << "TestBatchingProducerExternalWork::acquire Event " << event->eventID() << " stream " - << event->streamID() << " value " << value << std::endl; + std::cout << "TestBatchingProducerExternalWork::acquire Event " << event.eventID() << " stream " << event.streamID() + << " value " << value << std::endl; #endif } } -void TestBatchingProducerExternalWork::produce(std::vector const& events, +void TestBatchingProducerExternalWork::produce(edm::EventRange events, edm::EventSetup const& eventSetup, AsyncState& state) { #ifndef FWTEST_SILENT - for (edm::Event* event : 
events) { - assert(event); - std::cout << "TestBatchingProducerExternalWork::produce Event " << event->eventID() << " stream " - << event->streamID() << " from future " << state[event->eventID()].get() << std::endl; + for (edm::Event& event : events) { + std::cout << "TestBatchingProducerExternalWork::produce Event " << event.eventID() << " stream " << event.streamID() + << " from future " << state[event.eventID()].get() << std::endl; } #endif ++nevents; From 8594f860494b733bfd4e2787a9ad52ad2174abf2 Mon Sep 17 00:00:00 2001 From: Andrea Bocci Date: Fri, 4 Dec 2020 01:36:28 +0100 Subject: [PATCH 7/9] Avoid potential overrun while reading multiple events per batch with multiple threads --- src/fwtest/bin/Source.cc | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) diff --git a/src/fwtest/bin/Source.cc b/src/fwtest/bin/Source.cc index 860f1dbc1..276652b4f 100644 --- a/src/fwtest/bin/Source.cc +++ b/src/fwtest/bin/Source.cc @@ -89,8 +89,16 @@ namespace edm { } EventBatch Source::produce(int streamId, ProductRegistry const ®) { - const int old = numEvents_.fetch_add(batchEvents_); - const int size = std::min(batchEvents_, maxEvents_ - old); + + // atomically increase the event counter, without overflowing over maxEvents_ + int old_value = numEvents_; + int new_value; + do { + new_value = std::min(old_value + batchEvents_, maxEvents_); + } while (not numEvents_.compare_exchange_weak(old_value, new_value)); + + // check how many events should be read + const int size = new_value - old_value; EventBatch events; if (size <= 0) { return events; @@ -98,7 +106,7 @@ namespace edm { events.reserve(size); for (int i = 1; i <= size; ++i) { - const int iev = old + i; + const int iev = old_value + i; Event &event = events.emplace(streamId, iev, reg); const int index = (iev - 1) % raw_.size(); From 2b0de2082aaccdd30e39d1a7730f38ce24d555cb Mon Sep 17 00:00:00 2001 From: Andrea Bocci Date: Sat, 5 Dec 2020 00:19:34 +0100 Subject: [PATCH 8/9] Copy event batching support 
from fwtest to cuda --- src/cuda/Framework/EDProducer.h | 149 +++++++++++++++++- src/cuda/Framework/EventBatch.h | 36 +++++ src/cuda/Framework/EventRange.h | 96 +++++++++++ src/cuda/Framework/WaitingTaskHolder.h | 14 +- .../Framework/WaitingTaskWithArenaHolder.cc | 18 ++- .../Framework/WaitingTaskWithArenaHolder.h | 6 + src/cuda/Framework/Worker.cc | 5 +- src/cuda/Framework/Worker.h | 46 +++--- src/cuda/bin/EventProcessor.cc | 5 +- src/cuda/bin/EventProcessor.h | 3 +- src/cuda/bin/PluginManager.cc | 2 +- src/cuda/bin/Source.cc | 53 +++++-- src/cuda/bin/Source.h | 8 +- src/cuda/bin/StreamSchedule.cc | 49 +++--- src/cuda/bin/StreamSchedule.h | 2 +- src/cuda/bin/main.cc | 9 +- 16 files changed, 413 insertions(+), 88 deletions(-) create mode 100644 src/cuda/Framework/EventBatch.h create mode 100644 src/cuda/Framework/EventRange.h diff --git a/src/cuda/Framework/EDProducer.h b/src/cuda/Framework/EDProducer.h index 8160b8c96..1cc63f58d 100644 --- a/src/cuda/Framework/EDProducer.h +++ b/src/cuda/Framework/EDProducer.h @@ -1,6 +1,9 @@ #ifndef EDProducerBase_h #define EDProducerBase_h +#include + +#include "Framework/EventRange.h" #include "Framework/WaitingTaskWithArenaHolder.h" namespace edm { @@ -14,9 +17,13 @@ namespace edm { bool hasAcquire() const { return false; } - void doAcquire(Event const& event, EventSetup const& eventSetup, WaitingTaskWithArenaHolder holder) {} + void doAcquire(ConstEventRange events, EventSetup const& eventSetup, WaitingTaskWithArenaHolder holder) {} - void doProduce(Event& event, EventSetup const& eventSetup) { produce(event, eventSetup); } + void doProduce(EventRange events, EventSetup const& eventSetup) { + for (Event& event : events) { + produce(event, eventSetup); + } + } virtual void produce(Event& event, EventSetup const& eventSetup) = 0; @@ -27,27 +34,157 @@ namespace edm { private: }; + class EDBatchingProducer { + public: + EDBatchingProducer() = default; + virtual ~EDBatchingProducer() = default; + + bool hasAcquire() const { return 
false; } + + void doAcquire(ConstEventRange events, EventSetup const& eventSetup, WaitingTaskWithArenaHolder holder) {} + + void doProduce(EventRange events, EventSetup const& eventSetup) { produce(events, eventSetup); } + + virtual void produce(EventRange events, EventSetup const& eventSetup) = 0; + + void doEndJob() { endJob(); } + + virtual void endJob() {} + + private: + }; + + template class EDProducerExternalWork { public: + using AsyncState = T; + EDProducerExternalWork() = default; virtual ~EDProducerExternalWork() = default; bool hasAcquire() const { return true; } - void doAcquire(Event const& event, EventSetup const& eventSetup, WaitingTaskWithArenaHolder holder) { - acquire(event, eventSetup, std::move(holder)); + void doAcquire(ConstEventRange events, EventSetup const& eventSetup, WaitingTaskWithArenaHolder holder) { + if (events.size() > statesSize_) { + statesSize_ = events.size(); + states_ = std::make_unique(statesSize_); + } + for (size_t i = 0; i < events.size(); ++i) { + acquire(events[i], eventSetup, holder, states_[i]); + } } - void doProduce(Event& event, EventSetup const& eventSetup) { produce(event, eventSetup); } + virtual void acquire(Event const& event, + EventSetup const& eventSetup, + WaitingTaskWithArenaHolder holder, + AsyncState& state) const = 0; + + void doProduce(EventRange events, EventSetup const& eventSetup) { + for (size_t i = 0; i < events.size(); ++i) { + produce(events[i], eventSetup, states_[i]); + } + } + + virtual void produce(Event& event, EventSetup const& eventSetup, AsyncState& state) = 0; + + void doEndJob() { endJob(); } + + virtual void endJob() {} + + private: + size_t statesSize_ = 0; + std::unique_ptr states_; + }; + + template <> + class EDProducerExternalWork { + public: + EDProducerExternalWork() = default; + virtual ~EDProducerExternalWork() = default; + + bool hasAcquire() const { return true; } + + void doAcquire(ConstEventRange events, EventSetup const& eventSetup, WaitingTaskWithArenaHolder holder) { + 
for (Event const& event : events) { + acquire(event, eventSetup, holder); + } + } + + virtual void acquire(Event const& event, EventSetup const& eventSetup, WaitingTaskWithArenaHolder holder) const = 0; + + void doProduce(EventRange events, EventSetup const& eventSetup) { + for (Event& event : events) { + produce(event, eventSetup); + } + } - virtual void acquire(Event const& event, EventSetup const& eventSetup, WaitingTaskWithArenaHolder holder) = 0; virtual void produce(Event& event, EventSetup const& eventSetup) = 0; void doEndJob() { endJob(); } + + virtual void endJob() {} + + private: + }; + + template + class EDBatchingProducerExternalWork { + public: + using AsyncState = T; + + EDBatchingProducerExternalWork() = default; + virtual ~EDBatchingProducerExternalWork() = default; + + bool hasAcquire() const { return true; } + + void doAcquire(ConstEventRange events, EventSetup const& eventSetup, WaitingTaskWithArenaHolder holder) { + acquire(events, eventSetup, holder, state_); + } + + virtual void acquire(ConstEventRange events, + EventSetup const& eventSetup, + WaitingTaskWithArenaHolder holder, + AsyncState& state) const = 0; + + void doProduce(EventRange events, EventSetup const& eventSetup) { produce(events, eventSetup, state_); } + + virtual void produce(EventRange events, EventSetup const& eventSetup, AsyncState& states) = 0; + + void doEndJob() { endJob(); } + + virtual void endJob() {} + + private: + AsyncState state_; + }; + + template <> + class EDBatchingProducerExternalWork { + public: + EDBatchingProducerExternalWork() = default; + virtual ~EDBatchingProducerExternalWork() = default; + + bool hasAcquire() const { return true; } + + void doAcquire(ConstEventRange events, EventSetup const& eventSetup, WaitingTaskWithArenaHolder holder) { + acquire(events, eventSetup, holder); + } + + virtual void acquire(ConstEventRange events, + EventSetup const& eventSetup, + WaitingTaskWithArenaHolder holder) const = 0; + + void doProduce(EventRange events, 
EventSetup const& eventSetup) { produce(events, eventSetup); } + + virtual void produce(EventRange events, EventSetup const& eventSetup) = 0; + + void doEndJob() { endJob(); } + virtual void endJob() {} private: }; + } // namespace edm #endif diff --git a/src/cuda/Framework/EventBatch.h b/src/cuda/Framework/EventBatch.h new file mode 100644 index 000000000..3632c41ea --- /dev/null +++ b/src/cuda/Framework/EventBatch.h @@ -0,0 +1,36 @@ +#ifndef EventBatch_h +#define EventBatch_h + +#include + +#include "Framework/Event.h" +#include "Framework/EventRange.h" + +namespace edm { + + class EventBatch { + public: + Event& emplace(int streamId, int eventId, ProductRegistry const& reg) { + events_.emplace_back(streamId, eventId, reg); + return events_.back(); + } + + void reserve(size_t capacity) { events_.reserve(capacity); } + + void clear() { events_.clear(); } + + size_t size() const { return events_.size(); } + + bool empty() const { return events_.empty(); } + + EventRange range() { return EventRange(&events_.front(), &events_.back() + 1); } + + ConstEventRange range() const { return ConstEventRange(&events_.front(), &events_.back() + 1); } + + private: + std::vector events_; + }; + +} // namespace edm + +#endif // EventBatch_h diff --git a/src/cuda/Framework/EventRange.h b/src/cuda/Framework/EventRange.h new file mode 100644 index 000000000..9a9cca645 --- /dev/null +++ b/src/cuda/Framework/EventRange.h @@ -0,0 +1,96 @@ +#ifndef EventRange_h +#define EventRange_h + +#include +#include +#include + +#include "Framework/Event.h" + +namespace edm { + + class EventRange { + public: + EventRange(Event* begin, Event* end) : begin_(begin), end_(end) { + assert(begin_); + assert(end_); + assert(end_ >= begin_); + } + + Event* begin() { return begin_; } + + Event const* begin() const { return begin_; } + + Event* end() { return end_; } + + Event const* end() const { return end_; } + + size_t size() const { return end_ - begin_; } + + bool empty() const { return end_ == begin_; 
} + + Event& at(size_t index) { + if (index >= size()) { + std::stringstream msg; + msg << "EventRange::at() range check failed: index " << index << " is outside the range [0.." << size() << ")"; + throw std::out_of_range(msg.str()); + } + return begin_[index]; + } + + Event const& at(size_t index) const { + if (index >= size()) { + std::stringstream msg; + msg << "EventRange::at() range check failed: index " << index << " is outside the range [0.." << size() << ")"; + throw std::out_of_range(msg.str()); + } + return begin_[index]; + } + + Event& operator[](size_t index) { return begin_[index]; } + + Event const& operator[](size_t index) const { return begin_[index]; } + + private: + Event* begin_; + Event* end_; + }; + + class ConstEventRange { + public: + ConstEventRange(Event const* begin, Event const* end) : begin_(begin), end_(end) { + assert(begin_); + assert(end_); + assert(end_ >= begin_); + } + + ConstEventRange(EventRange range) : begin_(range.begin()), end_(range.end()) {} + + Event const* begin() const { return begin_; } + + Event const* end() const { return end_; } + + size_t size() const { return end_ - begin_; } + + bool empty() const { return end_ == begin_; } + + Event const& at(size_t index) { + if (index >= size()) { + std::stringstream msg; + msg << "ConstEventRange::at() range check failed: index " << index << " is outside the range [0.." 
<< size() + << ")"; + throw std::out_of_range(msg.str()); + } + return begin_[index]; + } + + Event const& operator[](size_t index) const { return begin_[index]; } + + private: + Event const* begin_; + Event const* end_; + }; + +} // namespace edm + +#endif // EventRange_h diff --git a/src/cuda/Framework/WaitingTaskHolder.h b/src/cuda/Framework/WaitingTaskHolder.h index d74c5d8ed..9cc1b5c66 100644 --- a/src/cuda/Framework/WaitingTaskHolder.h +++ b/src/cuda/Framework/WaitingTaskHolder.h @@ -34,7 +34,7 @@ namespace edm { explicit WaitingTaskHolder(edm::WaitingTask* iTask) : m_task(iTask) { m_task->increment_ref_count(); } ~WaitingTaskHolder() { if (m_task) { - doneWaiting(std::exception_ptr{}); + doneWaiting(); } } @@ -66,10 +66,7 @@ namespace edm { } } - void doneWaiting(std::exception_ptr iExcept) { - if (iExcept) { - m_task->dependentTaskFailed(iExcept); - } + void doneWaiting() { //spawn can run the task before we finish // doneWaiting and some other thread might // try to reuse this object. Resetting @@ -81,6 +78,13 @@ namespace edm { } } + void doneWaiting(std::exception_ptr iExcept) { + if (iExcept) { + m_task->dependentTaskFailed(iExcept); + } + doneWaiting(); + } + private: // ---------- member data -------------------------------- WaitingTask* m_task; diff --git a/src/cuda/Framework/WaitingTaskWithArenaHolder.cc b/src/cuda/Framework/WaitingTaskWithArenaHolder.cc index 7c852d041..d7c88001e 100644 --- a/src/cuda/Framework/WaitingTaskWithArenaHolder.cc +++ b/src/cuda/Framework/WaitingTaskWithArenaHolder.cc @@ -24,7 +24,7 @@ namespace edm { WaitingTaskWithArenaHolder::~WaitingTaskWithArenaHolder() { if (m_task) { - doneWaiting(std::exception_ptr{}); + doneWaiting(); } } @@ -58,10 +58,7 @@ namespace edm { // into the correct arena of threads. Use of the arena allows doneWaiting // to be called from a thread outside the arena of threads that will manage // the task. doneWaiting can be called from a non-TBB thread. 
- void WaitingTaskWithArenaHolder::doneWaiting(std::exception_ptr iExcept) { - if (iExcept) { - m_task->dependentTaskFailed(iExcept); - } + void WaitingTaskWithArenaHolder::doneWaiting() { //enqueue can run the task before we finish // doneWaiting and some other thread might // try to reuse this object. Resetting @@ -75,6 +72,17 @@ namespace edm { } } + // This spawns the task. The arena is needed to get the task spawned + // into the correct arena of threads. Use of the arena allows doneWaiting + // to be called from a thread outside the arena of threads that will manage + // the task. doneWaiting can be called from a non-TBB thread. + void WaitingTaskWithArenaHolder::doneWaiting(std::exception_ptr iExcept) { + if (iExcept) { + m_task->dependentTaskFailed(iExcept); + } + doneWaiting(); + } + // This next function is useful if you know from the context that // m_arena (which is set when the constructor was executes) is the // same arena in which you want to execute the doneWaiting function. diff --git a/src/cuda/Framework/WaitingTaskWithArenaHolder.h b/src/cuda/Framework/WaitingTaskWithArenaHolder.h index 4b14febfb..bab615686 100644 --- a/src/cuda/Framework/WaitingTaskWithArenaHolder.h +++ b/src/cuda/Framework/WaitingTaskWithArenaHolder.h @@ -48,6 +48,12 @@ namespace edm { WaitingTaskWithArenaHolder& operator=(WaitingTaskWithArenaHolder&& iRHS); + // This spawns the task. The arena is needed to get the task spawned + // into the correct arena of threads. Use of the arena allows doneWaiting + // to be called from a thread outside the arena of threads that will manage + // the task. doneWaiting can be called from a non-TBB thread. + void doneWaiting(); + // This spawns the task. The arena is needed to get the task spawned // into the correct arena of threads. 
Use of the arena allows doneWaiting // to be called from a thread outside the arena of threads that will manage diff --git a/src/cuda/Framework/Worker.cc b/src/cuda/Framework/Worker.cc index b3b3df74c..9c1c5c3c4 100644 --- a/src/cuda/Framework/Worker.cc +++ b/src/cuda/Framework/Worker.cc @@ -1,7 +1,8 @@ +//#include #include "Framework/Worker.h" namespace edm { - void Worker::prefetchAsync(Event& event, EventSetup const& eventSetup, WaitingTask* iTask) { + void Worker::prefetchAsync(EventRange events, EventSetup const& eventSetup, WaitingTask* iTask) { //std::cout << "prefetchAsync for " << this << " iTask " << iTask << std::endl; bool expected = false; if (prefetchRequested_.compare_exchange_strong(expected, true)) { @@ -10,7 +11,7 @@ namespace edm { iTask->increment_ref_count(); for (Worker* dep : itemsToGet_) { //std::cout << "calling doWorkAsync for " << dep << " with " << iTask << std::endl; - dep->doWorkAsync(event, eventSetup, iTask); + dep->doWorkAsync(events, eventSetup, iTask); } auto count = iTask->decrement_ref_count(); diff --git a/src/cuda/Framework/Worker.h b/src/cuda/Framework/Worker.h index 0a03670a4..26bdfed0b 100644 --- a/src/cuda/Framework/Worker.h +++ b/src/cuda/Framework/Worker.h @@ -2,9 +2,10 @@ #define Worker_h #include -#include //#include +#include +#include "Framework/EventRange.h" #include "Framework/WaitingTask.h" #include "Framework/WaitingTaskHolder.h" #include "Framework/WaitingTaskList.h" @@ -23,10 +24,10 @@ namespace edm { void setItemsToGet(std::vector workers) { itemsToGet_ = std::move(workers); } // thread safe - void prefetchAsync(Event& event, EventSetup const& eventSetup, WaitingTask* iTask); + void prefetchAsync(EventRange events, EventSetup const& eventSetup, WaitingTask* iTask); // not thread safe - virtual void doWorkAsync(Event& event, EventSetup const& eventSetup, WaitingTask* iTask) = 0; + virtual void doWorkAsync(EventRange events, EventSetup const& eventSetup, WaitingTask* iTask) = 0; // not thread safe virtual void 
doEndJob() = 0; @@ -50,7 +51,7 @@ namespace edm { public: explicit WorkerT(ProductRegistry& reg) : producer_(reg) {} - void doWorkAsync(Event& event, EventSetup const& eventSetup, WaitingTask* iTask) override { + void doWorkAsync(EventRange events, EventSetup const& eventSetup, WaitingTask* iTask) override { waitingTasksWork_.add(iTask); //std::cout << "doWorkAsync for " << this << " with iTask " << iTask << std::endl; bool expected = false; @@ -58,14 +59,14 @@ namespace edm { //std::cout << "first doWorkAsync call" << std::endl; WaitingTask* moduleTask = make_waiting_task( - tbb::task::allocate_root(), [this, &event, &eventSetup](std::exception_ptr const* iPtr) mutable { + tbb::task::allocate_root(), [this, events, &eventSetup](std::exception_ptr const* iPtr) mutable { if (iPtr) { waitingTasksWork_.doneWaiting(*iPtr); } else { std::exception_ptr exceptionPtr; try { //std::cout << "calling doProduce " << this << std::endl; - producer_.doProduce(event, eventSetup); + producer_.doProduce(events, eventSetup); } catch (...) { exceptionPtr = std::current_exception(); } @@ -75,24 +76,25 @@ namespace edm { }); if (producer_.hasAcquire()) { WaitingTaskWithArenaHolder runProduceHolder{moduleTask}; - moduleTask = make_waiting_task(tbb::task::allocate_root(), - [this, &event, &eventSetup, runProduceHolder = std::move(runProduceHolder)]( - std::exception_ptr const* iPtr) mutable { - if (iPtr) { - runProduceHolder.doneWaiting(*iPtr); - } else { - std::exception_ptr exceptionPtr; - try { - producer_.doAcquire(event, eventSetup, runProduceHolder); - } catch (...) 
{ - exceptionPtr = std::current_exception(); - } - runProduceHolder.doneWaiting(exceptionPtr); - } - }); + moduleTask = make_waiting_task( + tbb::task::allocate_root(), + [this, events = ConstEventRange(events), &eventSetup, runProduceHolder = std::move(runProduceHolder)]( + std::exception_ptr const* iPtr) mutable { + if (iPtr) { + runProduceHolder.doneWaiting(*iPtr); + } else { + std::exception_ptr exceptionPtr; + try { + producer_.doAcquire(events, eventSetup, runProduceHolder); + } catch (...) { + exceptionPtr = std::current_exception(); + } + runProduceHolder.doneWaiting(exceptionPtr); + } + }); } //std::cout << "calling prefetchAsync " << this << " with moduleTask " << moduleTask << std::endl; - prefetchAsync(event, eventSetup, moduleTask); + prefetchAsync(events, eventSetup, moduleTask); } } diff --git a/src/cuda/bin/EventProcessor.cc b/src/cuda/bin/EventProcessor.cc index a50cc6570..e1157472c 100644 --- a/src/cuda/bin/EventProcessor.cc +++ b/src/cuda/bin/EventProcessor.cc @@ -6,13 +6,14 @@ #include "EventProcessor.h" namespace edm { - EventProcessor::EventProcessor(int maxEvents, + EventProcessor::EventProcessor(int batchEvents, + int maxEvents, int numberOfStreams, std::vector const& path, std::vector const& esproducers, std::filesystem::path const& datadir, bool validation) - : source_(maxEvents, registry_, datadir, validation) { + : source_(batchEvents, maxEvents, registry_, datadir, validation) { for (auto const& name : esproducers) { pluginManager_.load(name); auto esp = ESPluginFactory::create(name, datadir); diff --git a/src/cuda/bin/EventProcessor.h b/src/cuda/bin/EventProcessor.h index 614e60c38..bb5cbfc3e 100644 --- a/src/cuda/bin/EventProcessor.h +++ b/src/cuda/bin/EventProcessor.h @@ -14,7 +14,8 @@ namespace edm { class EventProcessor { public: - explicit EventProcessor(int maxEvents, + explicit EventProcessor(int batchEvents, + int maxEvents, int numberOfStreams, std::vector const& path, std::vector const& esproducers, diff --git 
a/src/cuda/bin/PluginManager.cc b/src/cuda/bin/PluginManager.cc index 7977cdbc2..ecb28032b 100644 --- a/src/cuda/bin/PluginManager.cc +++ b/src/cuda/bin/PluginManager.cc @@ -1,5 +1,5 @@ -#include #include +//#include #include "PluginManager.h" diff --git a/src/cuda/bin/Source.cc b/src/cuda/bin/Source.cc index 64d1a3b5c..276652b4f 100644 --- a/src/cuda/bin/Source.cc +++ b/src/cuda/bin/Source.cc @@ -23,8 +23,13 @@ namespace { } // namespace namespace edm { - Source::Source(int maxEvents, ProductRegistry ®, std::filesystem::path const &datadir, bool validation) - : maxEvents_(maxEvents), numEvents_(0), rawToken_(reg.produces()), validation_(validation) { + Source::Source( + int batchEvents, int maxEvents, ProductRegistry ®, std::filesystem::path const &datadir, bool validation) + : batchEvents_(batchEvents), + maxEvents_(maxEvents), + numEvents_(0), + rawToken_(reg.produces()), + validation_(validation) { std::ifstream in_raw(datadir / "raw.bin", std::ios::binary); std::ifstream in_digiclusters; std::ifstream in_tracks; @@ -74,27 +79,45 @@ namespace edm { assert(raw_.size() == vertices_.size()); } + if (batchEvents_ < 1) { + batchEvents_ = 1; + } + if (maxEvents_ < 0) { maxEvents_ = raw_.size(); } } - std::unique_ptr Source::produce(int streamId, ProductRegistry const ®) { - const int old = numEvents_.fetch_add(1); - const int iev = old + 1; - if (old >= maxEvents_) { - return nullptr; + EventBatch Source::produce(int streamId, ProductRegistry const ®) { + + // atomically increase the event counter, without overflowing over maxEvents_ + int old_value = numEvents_; + int new_value; + do { + new_value = std::min(old_value + batchEvents_, maxEvents_); + } while (not numEvents_.compare_exchange_weak(old_value, new_value)); + + // check how many events should be read + const int size = new_value - old_value; + EventBatch events; + if (size <= 0) { + return events; } - auto ev = std::make_unique(streamId, iev, reg); - const int index = old % raw_.size(); - 
ev->emplace(rawToken_, raw_[index]); - if (validation_) { - ev->emplace(digiClusterToken_, digiclusters_[index]); - ev->emplace(trackToken_, tracks_[index]); - ev->emplace(vertexToken_, vertices_[index]); + events.reserve(size); + for (int i = 1; i <= size; ++i) { + const int iev = old_value + i; + Event &event = events.emplace(streamId, iev, reg); + const int index = (iev - 1) % raw_.size(); + + event.emplace(rawToken_, raw_[index]); + if (validation_) { + event.emplace(digiClusterToken_, digiclusters_[index]); + event.emplace(trackToken_, tracks_[index]); + event.emplace(vertexToken_, vertices_[index]); + } } - return ev; + return events; } } // namespace edm diff --git a/src/cuda/bin/Source.h b/src/cuda/bin/Source.h index c13534f33..a3c99798d 100644 --- a/src/cuda/bin/Source.h +++ b/src/cuda/bin/Source.h @@ -5,8 +5,10 @@ #include #include #include +#include #include "Framework/Event.h" +#include "Framework/EventBatch.h" #include "DataFormats/FEDRawDataCollection.h" #include "DataFormats/DigiClusterCount.h" #include "DataFormats/TrackCount.h" @@ -15,14 +17,16 @@ namespace edm { class Source { public: - explicit Source(int maxEvents, ProductRegistry& reg, std::filesystem::path const& datadir, bool validation); + explicit Source( + int batchEvents, int maxEvents, ProductRegistry& reg, std::filesystem::path const& datadir, bool validation); int maxEvents() const { return maxEvents_; } // thread safe - std::unique_ptr produce(int streamId, ProductRegistry const& reg); + EventBatch produce(int streamId, ProductRegistry const& reg); private: + int batchEvents_; int maxEvents_; std::atomic numEvents_; EDPutTokenT const rawToken_; diff --git a/src/cuda/bin/StreamSchedule.cc b/src/cuda/bin/StreamSchedule.cc index 8636bce24..1f34f0f58 100644 --- a/src/cuda/bin/StreamSchedule.cc +++ b/src/cuda/bin/StreamSchedule.cc @@ -1,4 +1,5 @@ -//#include +#include +#include #include @@ -44,7 +45,7 @@ namespace edm { void StreamSchedule::runToCompletionAsync(WaitingTaskHolder h) { auto 
task = - make_functor_task(tbb::task::allocate_root(), [this, h]() mutable { processOneEventAsync(std::move(h)); }); + make_functor_task(tbb::task::allocate_root(), [this, h]() mutable { processEventBatchAsync(std::move(h)); }); if (streamId_ == 0) { tbb::task::spawn(*task); } else { @@ -52,26 +53,26 @@ namespace edm { } } - void StreamSchedule::processOneEventAsync(WaitingTaskHolder h) { - auto event = source_->produce(streamId_, registry_); - if (event) { - // Pass the event object ownership to the "end-of-event" task - // Pass a non-owning pointer to the event to preceding tasks - //std::cout << "Begin processing event " << event->eventID() << std::endl; - auto eventPtr = event.get(); - auto nextEventTask = - make_waiting_task(tbb::task::allocate_root(), - [this, h = std::move(h), ev = std::move(event)](std::exception_ptr const* iPtr) mutable { - ev.reset(); - if (iPtr) { - h.doneWaiting(*iPtr); - } else { - for (auto const& worker : path_) { - worker->reset(); - } - processOneEventAsync(std::move(h)); - } - }); + void StreamSchedule::processEventBatchAsync(WaitingTaskHolder h) { + auto events = source_->produce(streamId_, registry_); + if (not events.empty()) { + // Pass the event batch ownership to the "end-of-event" task + // Pass a non-owning event range to the preceding tasks + //std::cout << "Begin processing a batch of " << events.size() << " events starting from " << events.range().at(0).eventID() << std::endl; + auto eventsRange = events.range(); + auto nextEventTask = make_waiting_task( + tbb::task::allocate_root(), + [this, h = std::move(h), events = std::move(events)](std::exception_ptr const* iPtr) mutable { + events.clear(); + if (iPtr) { + h.doneWaiting(*iPtr); + } else { + for (auto const& worker : path_) { + worker->reset(); + } + processEventBatchAsync(std::move(h)); + } + }); // To guarantee that the nextEventTask is spawned also in // absence of Workers, and also to prevent spawning it before // all workers have been processed (should not 
happen though) @@ -79,10 +80,10 @@ namespace edm { for (auto iWorker = path_.rbegin(); iWorker != path_.rend(); ++iWorker) { //std::cout << "calling doWorkAsync for " << iWorker->get() << " with nextEventTask " << nextEventTask << std::endl; - (*iWorker)->doWorkAsync(*eventPtr, *eventSetup_, nextEventTask); + (*iWorker)->doWorkAsync(eventsRange, *eventSetup_, nextEventTask); } } else { - h.doneWaiting(std::exception_ptr{}); + h.doneWaiting(); } } diff --git a/src/cuda/bin/StreamSchedule.h b/src/cuda/bin/StreamSchedule.h index 1bd364c70..a62dc3961 100644 --- a/src/cuda/bin/StreamSchedule.h +++ b/src/cuda/bin/StreamSchedule.h @@ -38,7 +38,7 @@ namespace edm { void endJob(); private: - void processOneEventAsync(WaitingTaskHolder h); + void processEventBatchAsync(WaitingTaskHolder h); ProductRegistry registry_; Source* source_; diff --git a/src/cuda/bin/main.cc b/src/cuda/bin/main.cc index c8a76eee5..61a7eb05c 100644 --- a/src/cuda/bin/main.cc +++ b/src/cuda/bin/main.cc @@ -21,7 +21,8 @@ namespace { "[--histogram] [--empty]\n\n" << "Options\n" << " --numberOfThreads Number of threads to use (default 1)\n" - << " --numberOfStreams Number of concurrent events (default 0=numberOfThreads)\n" + << " --numberOfStreams Number of concurrent batch of events (default 0=numberOfThreads)\n" + << " --batchEvents Number of events to process in a batch (default 1 for individual events)\n" << " --maxEvents Number of events to process (default -1 for all events in the input file)\n" << " --data Path to the 'data' directory (default 'data' in the directory of the executable)\n" << " --transfer Transfer results from GPU to CPU (default is to leave them on GPU)\n" @@ -37,6 +38,7 @@ int main(int argc, char** argv) { std::vector args(argv, argv + argc); int numberOfThreads = 1; int numberOfStreams = 0; + int batchEvents = 1; int maxEvents = -1; std::filesystem::path datadir; bool transfer = false; @@ -53,6 +55,9 @@ int main(int argc, char** argv) { } else if (*i == "--numberOfStreams") { 
++i; numberOfStreams = std::stoi(*i); + } else if (*i == "--batchEvents") { + ++i; + batchEvents = std::stoul(*i); } else if (*i == "--maxEvents") { ++i; maxEvents = std::stoi(*i); @@ -119,7 +124,7 @@ int main(int argc, char** argv) { } } edm::EventProcessor processor( - maxEvents, numberOfStreams, std::move(edmodules), std::move(esmodules), datadir, validation); + batchEvents, maxEvents, numberOfStreams, std::move(edmodules), std::move(esmodules), datadir, validation); maxEvents = processor.maxEvents(); std::cout << "Processing " << maxEvents << " events, of which " << numberOfStreams << " concurrently, with " From 8070c2c020287861837b08746ee7d65363f12e29 Mon Sep 17 00:00:00 2001 From: Andrea Bocci Date: Sat, 5 Dec 2020 01:53:20 +0100 Subject: [PATCH 9/9] Adapt the EDProducerExternalWork modules to the new interface --- .../PixelTrackSoAFromCUDA.cc | 24 ++-- .../PixelVertexSoAFromCUDA.cc | 20 +-- .../SiPixelRawToClusterCUDA.cc | 74 +++++------ .../SiPixelDigisSoAFromCUDA.cc | 50 ++++---- src/cuda/plugin-Validation/HistoValidator.cc | 121 +++++++++--------- 5 files changed, 149 insertions(+), 140 deletions(-) diff --git a/src/cuda/plugin-PixelTrackFitting/PixelTrackSoAFromCUDA.cc b/src/cuda/plugin-PixelTrackFitting/PixelTrackSoAFromCUDA.cc index 66e93f818..943dd44ce 100644 --- a/src/cuda/plugin-PixelTrackFitting/PixelTrackSoAFromCUDA.cc +++ b/src/cuda/plugin-PixelTrackFitting/PixelTrackSoAFromCUDA.cc @@ -9,7 +9,9 @@ #include "Framework/EDProducer.h" #include "CUDACore/ScopedContext.h" -class PixelTrackSoAFromCUDA : public edm::EDProducerExternalWork { +using PixelTrackSoAFromCUDA_AsyncState = cms::cuda::host::unique_ptr; + +class PixelTrackSoAFromCUDA : public edm::EDProducerExternalWork { public: explicit PixelTrackSoAFromCUDA(edm::ProductRegistry& reg); ~PixelTrackSoAFromCUDA() override = default; @@ -17,13 +19,12 @@ class PixelTrackSoAFromCUDA : public edm::EDProducerExternalWork { private: void acquire(edm::Event const& iEvent, edm::EventSetup const& iSetup, - 
edm::WaitingTaskWithArenaHolder waitingTaskHolder) override; - void produce(edm::Event& iEvent, edm::EventSetup const& iSetup) override; + edm::WaitingTaskWithArenaHolder waitingTaskHolder, + AsyncState& state) const override; + void produce(edm::Event& iEvent, edm::EventSetup const& iSetup, AsyncState& state) override; edm::EDGetTokenT> tokenCUDA_; edm::EDPutTokenT tokenSOA_; - - cms::cuda::host::unique_ptr m_soa; }; PixelTrackSoAFromCUDA::PixelTrackSoAFromCUDA(edm::ProductRegistry& reg) @@ -32,17 +33,18 @@ PixelTrackSoAFromCUDA::PixelTrackSoAFromCUDA(edm::ProductRegistry& reg) void PixelTrackSoAFromCUDA::acquire(edm::Event const& iEvent, edm::EventSetup const& iSetup, - edm::WaitingTaskWithArenaHolder waitingTaskHolder) { + edm::WaitingTaskWithArenaHolder waitingTaskHolder, + AsyncState& state) const { cms::cuda::Product const& inputDataWrapped = iEvent.get(tokenCUDA_); cms::cuda::ScopedContextAcquire ctx{inputDataWrapped, std::move(waitingTaskHolder)}; auto const& inputData = ctx.get(inputDataWrapped); - m_soa = inputData.toHostAsync(ctx.stream()); + state = inputData.toHostAsync(ctx.stream()); } -void PixelTrackSoAFromCUDA::produce(edm::Event& iEvent, edm::EventSetup const& iSetup) { +void PixelTrackSoAFromCUDA::produce(edm::Event& iEvent, edm::EventSetup const& iSetup, AsyncState& state) { /* - auto const & tsoa = *m_soa; + auto const & tsoa = *state; auto maxTracks = tsoa.stride(); std::cout << "size of SoA" << sizeof(tsoa) << " stride " << maxTracks << std::endl; @@ -57,9 +59,9 @@ void PixelTrackSoAFromCUDA::produce(edm::Event& iEvent, edm::EventSetup const& i */ // DO NOT make a copy (actually TWO....) 
- iEvent.emplace(tokenSOA_, PixelTrackHeterogeneous(std::move(m_soa))); + iEvent.emplace(tokenSOA_, PixelTrackHeterogeneous(std::move(state))); - assert(!m_soa); + assert(!state); } DEFINE_FWK_MODULE(PixelTrackSoAFromCUDA); diff --git a/src/cuda/plugin-PixelVertexFinding/PixelVertexSoAFromCUDA.cc b/src/cuda/plugin-PixelVertexFinding/PixelVertexSoAFromCUDA.cc index d709f0c5e..cbb4da507 100644 --- a/src/cuda/plugin-PixelVertexFinding/PixelVertexSoAFromCUDA.cc +++ b/src/cuda/plugin-PixelVertexFinding/PixelVertexSoAFromCUDA.cc @@ -10,7 +10,9 @@ #include "Framework/RunningAverage.h" #include "CUDACore/ScopedContext.h" -class PixelVertexSoAFromCUDA : public edm::EDProducerExternalWork { +using PixelVertexSoAFromCUDA_AsyncState = cms::cuda::host::unique_ptr; + +class PixelVertexSoAFromCUDA : public edm::EDProducerExternalWork { public: explicit PixelVertexSoAFromCUDA(edm::ProductRegistry& reg); ~PixelVertexSoAFromCUDA() override = default; @@ -18,13 +20,12 @@ class PixelVertexSoAFromCUDA : public edm::EDProducerExternalWork { private: void acquire(edm::Event const& iEvent, edm::EventSetup const& iSetup, - edm::WaitingTaskWithArenaHolder waitingTaskHolder) override; - void produce(edm::Event& iEvent, edm::EventSetup const& iSetup) override; + edm::WaitingTaskWithArenaHolder waitingTaskHolder, + AsyncState& state) const override; + void produce(edm::Event& iEvent, edm::EventSetup const& iSetup, AsyncState& state) override; edm::EDGetTokenT> tokenCUDA_; edm::EDPutTokenT tokenSOA_; - - cms::cuda::host::unique_ptr m_soa; }; PixelVertexSoAFromCUDA::PixelVertexSoAFromCUDA(edm::ProductRegistry& reg) @@ -33,17 +34,18 @@ PixelVertexSoAFromCUDA::PixelVertexSoAFromCUDA(edm::ProductRegistry& reg) void PixelVertexSoAFromCUDA::acquire(edm::Event const& iEvent, edm::EventSetup const& iSetup, - edm::WaitingTaskWithArenaHolder waitingTaskHolder) { + edm::WaitingTaskWithArenaHolder waitingTaskHolder, + AsyncState& state) const { auto const& inputDataWrapped = iEvent.get(tokenCUDA_); 
cms::cuda::ScopedContextAcquire ctx{inputDataWrapped, std::move(waitingTaskHolder)}; auto const& inputData = ctx.get(inputDataWrapped); - m_soa = inputData.toHostAsync(ctx.stream()); + state = inputData.toHostAsync(ctx.stream()); } -void PixelVertexSoAFromCUDA::produce(edm::Event& iEvent, edm::EventSetup const& iSetup) { +void PixelVertexSoAFromCUDA::produce(edm::Event& iEvent, edm::EventSetup const& iSetup, AsyncState& state) { // No copies.... - iEvent.emplace(tokenSOA_, ZVertexHeterogeneous(std::move(m_soa))); + iEvent.emplace(tokenSOA_, ZVertexHeterogeneous(std::move(state))); } DEFINE_FWK_MODULE(PixelVertexSoAFromCUDA); diff --git a/src/cuda/plugin-SiPixelClusterizer/SiPixelRawToClusterCUDA.cc b/src/cuda/plugin-SiPixelClusterizer/SiPixelRawToClusterCUDA.cc index 06624744e..ff387e1b4 100644 --- a/src/cuda/plugin-SiPixelClusterizer/SiPixelRawToClusterCUDA.cc +++ b/src/cuda/plugin-SiPixelClusterizer/SiPixelRawToClusterCUDA.cc @@ -22,7 +22,13 @@ #include #include -class SiPixelRawToClusterCUDA : public edm::EDProducerExternalWork { +struct SiPixelRawToClusterCUDA_AsyncState { + cms::cuda::ContextState ctx; + pixelgpudetails::SiPixelRawToClusterGPUKernel gpuAlgo; + pixelgpudetails::SiPixelRawToClusterGPUKernel::WordFedAppender wordFedAppender; +}; + +class SiPixelRawToClusterCUDA : public edm::EDProducerExternalWork { public: explicit SiPixelRawToClusterCUDA(edm::ProductRegistry& reg); ~SiPixelRawToClusterCUDA() override = default; @@ -30,19 +36,14 @@ class SiPixelRawToClusterCUDA : public edm::EDProducerExternalWork { private: void acquire(const edm::Event& iEvent, const edm::EventSetup& iSetup, - edm::WaitingTaskWithArenaHolder waitingTaskHolder) override; - void produce(edm::Event& iEvent, const edm::EventSetup& iSetup) override; - - cms::cuda::ContextState ctxState_; + edm::WaitingTaskWithArenaHolder waitingTaskHolder, + AsyncState& state) const override; + void produce(edm::Event& iEvent, const edm::EventSetup& iSetup, AsyncState& state) override; - 
edm::EDGetTokenT rawGetToken_; - edm::EDPutTokenT> digiPutToken_; + const edm::EDGetTokenT rawGetToken_; + const edm::EDPutTokenT> digiPutToken_; edm::EDPutTokenT> digiErrorPutToken_; - edm::EDPutTokenT> clusterPutToken_; - - pixelgpudetails::SiPixelRawToClusterGPUKernel gpuAlgo_; - std::unique_ptr wordFedAppender_; - PixelFormatterErrors errors_; + const edm::EDPutTokenT> clusterPutToken_; const bool isRun2_; const bool includeErrors_; @@ -59,14 +60,13 @@ SiPixelRawToClusterCUDA::SiPixelRawToClusterCUDA(edm::ProductRegistry& reg) if (includeErrors_) { digiErrorPutToken_ = reg.produces>(); } - - wordFedAppender_ = std::make_unique(); } void SiPixelRawToClusterCUDA::acquire(const edm::Event& iEvent, const edm::EventSetup& iSetup, - edm::WaitingTaskWithArenaHolder waitingTaskHolder) { - cms::cuda::ScopedContextAcquire ctx{iEvent.streamID(), std::move(waitingTaskHolder), ctxState_}; + edm::WaitingTaskWithArenaHolder waitingTaskHolder, + AsyncState& state) const { + cms::cuda::ScopedContextAcquire ctx{iEvent.streamID(), std::move(waitingTaskHolder), state.ctx}; auto const& hgpuMap = iSetup.get(); if (hgpuMap.hasQuality() != useQuality_) { @@ -85,7 +85,7 @@ void SiPixelRawToClusterCUDA::acquire(const edm::Event& iEvent, const auto& buffers = iEvent.get(rawGetToken_); - errors_.clear(); + PixelFormatterErrors errors; // GPU specific: Data extraction for RawToDigi GPU unsigned int wordCounterGPU = 0; @@ -115,7 +115,7 @@ void SiPixelRawToClusterCUDA::acquire(const edm::Event& iEvent, // check CRC bit const uint64_t* trailer = reinterpret_cast(rawData.data()) + (nWords - 1); - if (not errorcheck.checkCRC(errorsInEvent, fedId, trailer, errors_)) { + if (not errorcheck.checkCRC(errorsInEvent, fedId, trailer, errors)) { continue; } @@ -125,7 +125,7 @@ void SiPixelRawToClusterCUDA::acquire(const edm::Event& iEvent, bool moreHeaders = true; while (moreHeaders) { header++; - bool headerStatus = errorcheck.checkHeader(errorsInEvent, fedId, header, errors_); + bool headerStatus = 
errorcheck.checkHeader(errorsInEvent, fedId, header, errors); moreHeaders = headerStatus; } @@ -134,7 +134,7 @@ void SiPixelRawToClusterCUDA::acquire(const edm::Event& iEvent, trailer++; while (moreTrailers) { trailer--; - bool trailerStatus = errorcheck.checkTrailer(errorsInEvent, fedId, nWords, trailer, errors_); + bool trailerStatus = errorcheck.checkTrailer(errorsInEvent, fedId, nWords, trailer, errors); moreTrailers = trailerStatus; } @@ -142,33 +142,33 @@ void SiPixelRawToClusterCUDA::acquire(const edm::Event& iEvent, const uint32_t* ew = (const uint32_t*)(trailer); assert(0 == (ew - bw) % 2); - wordFedAppender_->initializeWordFed(fedId, wordCounterGPU, bw, (ew - bw)); + state.wordFedAppender.initializeWordFed(fedId, wordCounterGPU, bw, (ew - bw)); wordCounterGPU += (ew - bw); } // end of for loop - gpuAlgo_.makeClustersAsync(isRun2_, - gpuMap, - gpuModulesToUnpack, - gpuGains, - *wordFedAppender_, - std::move(errors_), - wordCounterGPU, - fedCounter, - useQuality_, - includeErrors_, - false, // debug - ctx.stream()); + state.gpuAlgo.makeClustersAsync(isRun2_, + gpuMap, + gpuModulesToUnpack, + gpuGains, + state.wordFedAppender, + std::move(errors), + wordCounterGPU, + fedCounter, + useQuality_, + includeErrors_, + false, // debug + ctx.stream()); } -void SiPixelRawToClusterCUDA::produce(edm::Event& iEvent, const edm::EventSetup& iSetup) { - cms::cuda::ScopedContextProduce ctx{ctxState_}; +void SiPixelRawToClusterCUDA::produce(edm::Event& iEvent, const edm::EventSetup& iSetup, AsyncState& state) { + cms::cuda::ScopedContextProduce ctx{state.ctx}; - auto tmp = gpuAlgo_.getResults(); + auto tmp = state.gpuAlgo.getResults(); ctx.emplace(iEvent, digiPutToken_, std::move(tmp.first)); ctx.emplace(iEvent, clusterPutToken_, std::move(tmp.second)); if (includeErrors_) { - ctx.emplace(iEvent, digiErrorPutToken_, gpuAlgo_.getErrors()); + ctx.emplace(iEvent, digiErrorPutToken_, state.gpuAlgo.getErrors()); } } diff --git 
a/src/cuda/plugin-SiPixelRawToDigi/SiPixelDigisSoAFromCUDA.cc b/src/cuda/plugin-SiPixelRawToDigi/SiPixelDigisSoAFromCUDA.cc index 448f4b797..d3e806a49 100644 --- a/src/cuda/plugin-SiPixelRawToDigi/SiPixelDigisSoAFromCUDA.cc +++ b/src/cuda/plugin-SiPixelRawToDigi/SiPixelDigisSoAFromCUDA.cc @@ -8,7 +8,15 @@ #include "CUDACore/ScopedContext.h" #include "CUDACore/host_unique_ptr.h" -class SiPixelDigisSoAFromCUDA : public edm::EDProducerExternalWork { +struct SiPixelDigisSoAFromCUDA_AsyncState { + cms::cuda::host::unique_ptr pdigi; + cms::cuda::host::unique_ptr rawIdArr; + cms::cuda::host::unique_ptr adc; + cms::cuda::host::unique_ptr clus; + size_t nDigis; +}; + +class SiPixelDigisSoAFromCUDA : public edm::EDProducerExternalWork { public: explicit SiPixelDigisSoAFromCUDA(edm::ProductRegistry& reg); ~SiPixelDigisSoAFromCUDA() override = default; @@ -16,18 +24,12 @@ class SiPixelDigisSoAFromCUDA : public edm::EDProducerExternalWork { private: void acquire(const edm::Event& iEvent, const edm::EventSetup& iSetup, - edm::WaitingTaskWithArenaHolder waitingTaskHolder) override; - void produce(edm::Event& iEvent, const edm::EventSetup& iSetup) override; - - edm::EDGetTokenT> digiGetToken_; - edm::EDPutTokenT digiPutToken_; + edm::WaitingTaskWithArenaHolder waitingTaskHolder, + AsyncState& state) const override; + void produce(edm::Event& iEvent, const edm::EventSetup& iSetup, AsyncState& state) override; - cms::cuda::host::unique_ptr pdigi_; - cms::cuda::host::unique_ptr rawIdArr_; - cms::cuda::host::unique_ptr adc_; - cms::cuda::host::unique_ptr clus_; - - size_t nDigis_; + const edm::EDGetTokenT> digiGetToken_; + const edm::EDPutTokenT digiPutToken_; }; SiPixelDigisSoAFromCUDA::SiPixelDigisSoAFromCUDA(edm::ProductRegistry& reg) @@ -36,20 +38,20 @@ SiPixelDigisSoAFromCUDA::SiPixelDigisSoAFromCUDA(edm::ProductRegistry& reg) void SiPixelDigisSoAFromCUDA::acquire(const edm::Event& iEvent, const edm::EventSetup& iSetup, - edm::WaitingTaskWithArenaHolder waitingTaskHolder) { + 
edm::WaitingTaskWithArenaHolder waitingTaskHolder, + AsyncState& state) const { // Do the transfer in a CUDA stream parallel to the computation CUDA stream cms::cuda::ScopedContextAcquire ctx{iEvent.streamID(), std::move(waitingTaskHolder)}; const auto& gpuDigis = ctx.get(iEvent, digiGetToken_); - - nDigis_ = gpuDigis.nDigis(); - pdigi_ = gpuDigis.pdigiToHostAsync(ctx.stream()); - rawIdArr_ = gpuDigis.rawIdArrToHostAsync(ctx.stream()); - adc_ = gpuDigis.adcToHostAsync(ctx.stream()); - clus_ = gpuDigis.clusToHostAsync(ctx.stream()); + state.pdigi = gpuDigis.pdigiToHostAsync(ctx.stream()); + state.rawIdArr = gpuDigis.rawIdArrToHostAsync(ctx.stream()); + state.adc = gpuDigis.adcToHostAsync(ctx.stream()); + state.clus = gpuDigis.clusToHostAsync(ctx.stream()); + state.nDigis = gpuDigis.nDigis(); } -void SiPixelDigisSoAFromCUDA::produce(edm::Event& iEvent, const edm::EventSetup& iSetup) { +void SiPixelDigisSoAFromCUDA::produce(edm::Event& iEvent, const edm::EventSetup& iSetup, AsyncState& state) { // The following line copies the data from the pinned host memory to // regular host memory. In principle that feels unnecessary (why not // just use the pinned host memory?). There are a few arguments for @@ -60,12 +62,8 @@ void SiPixelDigisSoAFromCUDA::produce(edm::Event& iEvent, const edm::EventSetup& // host memory to be allocated without a CUDA stream // - What if a CPU algorithm would produce the same SoA? We can't // use cudaMallocHost without a GPU... 
- iEvent.emplace(digiPutToken_, nDigis_, pdigi_.get(), rawIdArr_.get(), adc_.get(), clus_.get()); - - pdigi_.reset(); - rawIdArr_.reset(); - adc_.reset(); - clus_.reset(); + iEvent.emplace( + digiPutToken_, state.nDigis, state.pdigi.get(), state.rawIdArr.get(), state.adc.get(), state.clus.get()); } // define as framework plugin diff --git a/src/cuda/plugin-Validation/HistoValidator.cc b/src/cuda/plugin-Validation/HistoValidator.cc index d7b11d4b2..90b18f2dc 100644 --- a/src/cuda/plugin-Validation/HistoValidator.cc +++ b/src/cuda/plugin-Validation/HistoValidator.cc @@ -15,15 +15,29 @@ #include #include -class HistoValidator : public edm::EDProducerExternalWork { +struct HistoValidator_AsyncState { + uint32_t nDigis; + uint32_t nModules; + uint32_t nClusters; + uint32_t nHits; + cms::cuda::host::unique_ptr adc; + cms::cuda::host::unique_ptr clusInModule; + cms::cuda::host::unique_ptr localCoord; + cms::cuda::host::unique_ptr globalCoord; + cms::cuda::host::unique_ptr charge; + cms::cuda::host::unique_ptr size; +}; + +class HistoValidator : public edm::EDProducerExternalWork { public: explicit HistoValidator(edm::ProductRegistry& reg); private: void acquire(const edm::Event& iEvent, const edm::EventSetup& iSetup, - edm::WaitingTaskWithArenaHolder waitingTaskHolder) override; - void produce(edm::Event& iEvent, const edm::EventSetup& iSetup) override; + edm::WaitingTaskWithArenaHolder waitingTaskHolder, + AsyncState& state) const override; + void produce(edm::Event& iEvent, const edm::EventSetup& iSetup, AsyncState& state) override; void endJob() override; edm::EDGetTokenT> digiToken_; @@ -32,17 +46,6 @@ class HistoValidator : public edm::EDProducerExternalWork { edm::EDGetTokenT trackToken_; edm::EDGetTokenT vertexToken_; - uint32_t nDigis; - uint32_t nModules; - uint32_t nClusters; - uint32_t nHits; - cms::cuda::host::unique_ptr h_adc; - cms::cuda::host::unique_ptr h_clusInModule; - cms::cuda::host::unique_ptr h_localCoord; - cms::cuda::host::unique_ptr h_globalCoord; 
- cms::cuda::host::unique_ptr h_charge; - cms::cuda::host::unique_ptr h_size; - static std::map histos; }; @@ -90,61 +93,65 @@ HistoValidator::HistoValidator(edm::ProductRegistry& reg) void HistoValidator::acquire(const edm::Event& iEvent, const edm::EventSetup& iSetup, - edm::WaitingTaskWithArenaHolder waitingTaskHolder) { + edm::WaitingTaskWithArenaHolder waitingTaskHolder, + AsyncState& state) const { auto const& pdigis = iEvent.get(digiToken_); cms::cuda::ScopedContextAcquire ctx{pdigis, std::move(waitingTaskHolder)}; auto const& digis = ctx.get(iEvent, digiToken_); auto const& clusters = ctx.get(iEvent, clusterToken_); auto const& hits = ctx.get(iEvent, hitToken_); - nDigis = digis.nDigis(); - nModules = digis.nModules(); - h_adc = digis.adcToHostAsync(ctx.stream()); - - nClusters = clusters.nClusters(); - h_clusInModule = cms::cuda::make_host_unique(nModules, ctx.stream()); - cudaCheck(cudaMemcpyAsync( - h_clusInModule.get(), clusters.clusInModule(), sizeof(uint32_t) * nModules, cudaMemcpyDefault, ctx.stream())); - - nHits = hits.nHits(); - h_localCoord = hits.localCoordToHostAsync(ctx.stream()); - h_globalCoord = hits.globalCoordToHostAsync(ctx.stream()); - h_charge = hits.chargeToHostAsync(ctx.stream()); - h_size = hits.sizeToHostAsync(ctx.stream()); + state.nDigis = digis.nDigis(); + state.nModules = digis.nModules(); + state.adc = digis.adcToHostAsync(ctx.stream()); + + state.nClusters = clusters.nClusters(); + state.clusInModule = cms::cuda::make_host_unique(state.nModules, ctx.stream()); + cudaCheck(cudaMemcpyAsync(state.clusInModule.get(), + clusters.clusInModule(), + sizeof(uint32_t) * state.nModules, + cudaMemcpyDefault, + ctx.stream())); + + state.nHits = hits.nHits(); + state.localCoord = hits.localCoordToHostAsync(ctx.stream()); + state.globalCoord = hits.globalCoordToHostAsync(ctx.stream()); + state.charge = hits.chargeToHostAsync(ctx.stream()); + state.size = hits.sizeToHostAsync(ctx.stream()); } -void HistoValidator::produce(edm::Event& iEvent, 
const edm::EventSetup& iSetup) { - histos["digi_n"].fill(nDigis); - for (uint32_t i = 0; i < nDigis; ++i) { - histos["digi_adc"].fill(h_adc[i]); +void HistoValidator::produce(edm::Event& iEvent, const edm::EventSetup& iSetup, AsyncState& state) { + histos["digi_n"].fill(state.nDigis); + for (uint32_t i = 0; i < state.nDigis; ++i) { + histos["digi_adc"].fill(state.adc[i]); } - h_adc.reset(); - histos["module_n"].fill(nModules); + //adc.reset(); + histos["module_n"].fill(state.nModules); - histos["cluster_n"].fill(nClusters); - for (uint32_t i = 0; i < nModules; ++i) { - histos["cluster_per_module_n"].fill(h_clusInModule[i]); + histos["cluster_n"].fill(state.nClusters); + for (uint32_t i = 0; i < state.nModules; ++i) { + histos["cluster_per_module_n"].fill(state.clusInModule[i]); } - h_clusInModule.reset(); - - histos["hit_n"].fill(nHits); - for (uint32_t i = 0; i < nHits; ++i) { - histos["hit_lx"].fill(h_localCoord[i]); - histos["hit_ly"].fill(h_localCoord[i + nHits]); - histos["hit_lex"].fill(h_localCoord[i + 2 * nHits]); - histos["hit_ley"].fill(h_localCoord[i + 3 * nHits]); - histos["hit_gx"].fill(h_globalCoord[i]); - histos["hit_gy"].fill(h_globalCoord[i + nHits]); - histos["hit_gz"].fill(h_globalCoord[i + 2 * nHits]); - histos["hit_gr"].fill(h_globalCoord[i + 3 * nHits]); - histos["hit_charge"].fill(h_charge[i]); - histos["hit_sizex"].fill(h_size[i]); - histos["hit_sizey"].fill(h_size[i + nHits]); + //clusInModule.reset(); + + histos["hit_n"].fill(state.nHits); + for (uint32_t i = 0; i < state.nHits; ++i) { + histos["hit_lx"].fill(state.localCoord[i]); + histos["hit_ly"].fill(state.localCoord[i + state.nHits]); + histos["hit_lex"].fill(state.localCoord[i + 2 * state.nHits]); + histos["hit_ley"].fill(state.localCoord[i + 3 * state.nHits]); + histos["hit_gx"].fill(state.globalCoord[i]); + histos["hit_gy"].fill(state.globalCoord[i + state.nHits]); + histos["hit_gz"].fill(state.globalCoord[i + 2 * state.nHits]); + histos["hit_gr"].fill(state.globalCoord[i + 3 * 
state.nHits]); + histos["hit_charge"].fill(state.charge[i]); + histos["hit_sizex"].fill(state.size[i]); + histos["hit_sizey"].fill(state.size[i + state.nHits]); } - h_localCoord.reset(); - h_globalCoord.reset(); - h_charge.reset(); - h_size.reset(); + //state.localCoord.reset(); + //state.globalCoord.reset(); + //state.charge.reset(); + //state.size.reset(); { auto const& tracks = iEvent.get(trackToken_);