Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 5 additions & 4 deletions FWCore/Framework/bin/cmsRun.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -267,26 +267,27 @@ int main(int argc, const char* argv[]) {
TaskCleanupSentry sentry{proc.get()};

alwaysAddContext = false;

proc.on();
context = "Calling beginJob";
proc->beginJob();

// EventSetupsController uses pointers to the ParameterSet
// owned by ProcessDesc while it is dealing with sharing of
// ESProducers among the top-level process and the
// SubProcesses. Therefore the ProcessDesc needs to be kept
// alive until the beginJob transition has finished.
processDesc.reset();

alwaysAddContext = false;
context =
"Calling EventProcessor::runToCompletion (which does almost everything after beginJob and before endJob)";
proc.on();
auto status = proc->runToCompletion();
if (status == edm::EventProcessor::epSignal) {
returnCode = edm::errors::CaughtSignal;
}
proc.off();

context = "Calling endJob";
proc.off();
context = "Calling endJob and endStream";
proc->endJob();
});
return returnCode;
Expand Down
9 changes: 9 additions & 0 deletions FWCore/Framework/interface/EventProcessor.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ configured in the user's main() function, and is set running.
#include "FWCore/Utilities/interface/get_underlying_safe.h"
#include "FWCore/Utilities/interface/propagate_const.h"

#include "oneapi/tbb/task_group.h"

#include <atomic>
#include <map>
#include <memory>
Expand All @@ -46,6 +48,7 @@ configured in the user's main() function, and is set running.

namespace edm {

class ExceptionCollector;
class ExceptionToActionTable;
class BranchIDListHelper;
class MergeableRunProductMetadata;
Expand Down Expand Up @@ -120,6 +123,10 @@ namespace edm {
*/
void beginJob();

void beginStreams();

void endStreams(ExceptionCollector&) noexcept;

/**This should be called before the EventProcessor is destroyed
throws if any module's endJob throws an exception.
*/
Expand Down Expand Up @@ -351,6 +358,8 @@ namespace edm {
std::shared_ptr<std::recursive_mutex> sourceMutex_;
PrincipalCache principalCache_;
bool beginJobCalled_;
bool beginJobStartedModules_ = false;
bool beginJobSucceeded_ = false;
bool shouldWeStop_;
bool fileModeNoMerge_;
std::string exceptionMessageFiles_;
Expand Down
26 changes: 15 additions & 11 deletions FWCore/Framework/interface/GlobalSchedule.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#include "FWCore/MessageLogger/interface/ExceptionMessages.h"
#include "FWCore/ServiceRegistry/interface/GlobalContext.h"
#include "FWCore/ServiceRegistry/interface/ServiceRegistry.h"
#include "FWCore/ServiceRegistry/interface/ServiceRegistryfwd.h"
#include "FWCore/ServiceRegistry/interface/ServiceToken.h"
#include "FWCore/Utilities/interface/Algorithms.h"
#include "FWCore/Utilities/interface/BranchType.h"
Expand All @@ -38,9 +39,7 @@

namespace edm {

class ActivityRegistry;
class ExceptionCollector;
class ProcessContext;
class PreallocationConfiguration;
class ModuleRegistry;
class TriggerResultInserter;
Expand Down Expand Up @@ -76,7 +75,9 @@ namespace edm {

void beginJob(ProductRegistry const&,
eventsetup::ESRecordsToProductResolverIndices const&,
ProcessBlockHelperBase const&);
ProcessBlockHelperBase const&,
PathsAndConsumesOfModulesBase const&,
ProcessContext const&);
void endJob(ExceptionCollector& collector);

/// Return a vector allowing const access to all the
Expand Down Expand Up @@ -118,7 +119,14 @@ namespace edm {
std::shared_ptr<ActivityRegistry> actReg_; // We do not use propagate_const because the registry itself is mutable.
std::vector<edm::propagate_const<WorkerPtr>> extraWorkers_;
ProcessContext const* processContext_;

// The next 4 variables use the same naming convention, even though we have no intention
// to ever have concurrent ProcessBlocks or Jobs. They are all related to the number of
// WorkerManagers needed for global transitions.
unsigned int numberOfConcurrentLumis_;
unsigned int numberOfConcurrentRuns_;
static constexpr unsigned int numberOfConcurrentProcessBlocks_ = 1;
static constexpr unsigned int numberOfConcurrentJobs_ = 1;
};

template <typename T>
Expand Down Expand Up @@ -155,6 +163,8 @@ namespace edm {
unsigned int managerIndex = principal.index();
if constexpr (T::branchType_ == InRun) {
managerIndex += numberOfConcurrentLumis_;
} else if constexpr (T::branchType_ == InProcess) {
managerIndex += (numberOfConcurrentLumis_ + numberOfConcurrentRuns_);
}
WorkerManager& workerManager = workerManagers_[managerIndex];
workerManager.resetAll();
Expand Down Expand Up @@ -184,10 +194,7 @@ namespace edm {
ServiceRegistry::Operate op(token);
convertException::wrap([this, globalContext]() { T::preScheduleSignal(actReg_.get(), globalContext); });
} catch (cms::Exception& ex) {
std::ostringstream ost;
ex.addContext("Handling pre signal, likely in a service function");
exceptionContext(ost, *globalContext);
ex.addContext(ost.str());
exceptionContext(ex, *globalContext, "Handling pre signal, likely in a service function");
throw;
}
}
Expand All @@ -205,10 +212,7 @@ namespace edm {
});
} catch (cms::Exception& ex) {
if (not excpt) {
std::ostringstream ost;
ex.addContext("Handling post signal, likely in a service function");
exceptionContext(ost, *globalContext);
ex.addContext(ost.str());
exceptionContext(ex, *globalContext, "Handling post signal, likely in a service function");
excpt = std::current_exception();
}
}
Expand Down
14 changes: 8 additions & 6 deletions FWCore/Framework/interface/Schedule.h
Original file line number Diff line number Diff line change
Expand Up @@ -74,9 +74,9 @@
#include "FWCore/MessageLogger/interface/JobReport.h"
#include "FWCore/MessageLogger/interface/MessageLogger.h"
#include "FWCore/ServiceRegistry/interface/Service.h"
#include "FWCore/ServiceRegistry/interface/ServiceRegistryfwd.h"
#include "FWCore/Utilities/interface/Algorithms.h"
#include "FWCore/Utilities/interface/BranchType.h"
#include "FWCore/Utilities/interface/ConvertException.h"
#include "FWCore/Utilities/interface/Exception.h"
#include "FWCore/Utilities/interface/StreamID.h"
#include "FWCore/Utilities/interface/get_underlying_safe.h"
Expand All @@ -85,6 +85,7 @@
#include <array>
#include <map>
#include <memory>
#include <mutex>
#include <set>
#include <string>
#include <vector>
Expand All @@ -100,13 +101,11 @@ namespace edm {
class ESRecordsToProductResolverIndices;
}

class ActivityRegistry;
class BranchIDListHelper;
class EventTransitionInfo;
class ExceptionCollector;
class MergeableRunProductMetadata;
class OutputModuleCommunicator;
class ProcessContext;
class ProductRegistry;
class PreallocationConfiguration;
class StreamSchedule;
Expand Down Expand Up @@ -171,11 +170,14 @@ namespace edm {

void beginJob(ProductRegistry const&,
eventsetup::ESRecordsToProductResolverIndices const&,
ProcessBlockHelperBase const&);
ProcessBlockHelperBase const&,
PathsAndConsumesOfModulesBase const&,
ProcessContext const&);
void endJob(ExceptionCollector& collector);
void sendFwkSummaryToMessageLogger() const;

void beginStream(unsigned int);
void endStream(unsigned int);
void beginStream(unsigned int streamID);
void endStream(unsigned int streamID, ExceptionCollector& collector, std::mutex& collectorMutex) noexcept;

// Write the luminosity block
void writeLumiAsync(WaitingTaskHolder iTask,
Expand Down
41 changes: 21 additions & 20 deletions FWCore/Framework/interface/StreamSchedule.h
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,9 @@
#include "FWCore/MessageLogger/interface/JobReport.h"
#include "FWCore/MessageLogger/interface/MessageLogger.h"
#include "FWCore/ServiceRegistry/interface/Service.h"
#include "FWCore/ServiceRegistry/interface/ServiceRegistry.h"
#include "FWCore/ServiceRegistry/interface/ServiceRegistryfwd.h"
#include "FWCore/ServiceRegistry/interface/ServiceToken.h"
#include "FWCore/ServiceRegistry/interface/StreamContext.h"
#include "FWCore/Concurrency/interface/FunctorTask.h"
#include "FWCore/Concurrency/interface/WaitingTaskHolder.h"
Expand All @@ -88,8 +90,10 @@
#include "FWCore/Utilities/interface/propagate_const.h"
#include "FWCore/Utilities/interface/thread_safety_macros.h"

#include <exception>
#include <map>
#include <memory>
#include <mutex>
#include <set>
#include <string>
#include <vector>
Expand All @@ -111,8 +115,6 @@ namespace edm {
class PathStatusInserter;
class EndPathStatusInserter;
class PreallocationConfiguration;
class WaitingTaskHolder;

class ConditionalTaskHelper;

namespace service {
Expand All @@ -123,7 +125,6 @@ namespace edm {
public:
typedef std::vector<std::string> vstring;
typedef std::vector<Path> TrigPaths;
typedef std::vector<Path> NonTrigPaths;
typedef std::shared_ptr<HLTGlobalStatus> TrigResPtr;
typedef std::shared_ptr<HLTGlobalStatus const> TrigResConstPtr;
typedef std::shared_ptr<Worker> WorkerPtr;
Expand Down Expand Up @@ -162,7 +163,7 @@ namespace edm {
bool cleaningUpAfterException = false);

void beginStream();
void endStream();
void endStream(ExceptionCollector& collector, std::mutex& collectorMutex) noexcept;

StreamID streamID() const { return streamID_; }

Expand Down Expand Up @@ -306,12 +307,9 @@ namespace edm {
void preScheduleSignal(StreamContext const*) const;

template <typename T>
void postScheduleSignal(StreamContext const*, ServiceWeakToken const&, std::exception_ptr&) const noexcept;
void postScheduleSignal(StreamContext const*, std::exception_ptr&) const noexcept;

void handleException(StreamContext const&,
ServiceWeakToken const&,
bool cleaningUpAfterException,
std::exception_ptr&) const noexcept;
void handleException(StreamContext const&, bool cleaningUpAfterException, std::exception_ptr&) const noexcept;

WorkerManager workerManagerBeginEnd_;
WorkerManager workerManagerRuns_;
Expand Down Expand Up @@ -370,11 +368,15 @@ namespace edm {
auto doneTask = make_waiting_task([this, iHolder = std::move(iHolder), cleaningUpAfterException, weakToken](
std::exception_ptr const* iPtr) mutable {
std::exception_ptr excpt;
if (iPtr) {
excpt = *iPtr;
handleException(streamContext_, weakToken, cleaningUpAfterException, excpt);
}
postScheduleSignal<T>(&streamContext_, weakToken, excpt);
{
ServiceRegistry::Operate op(weakToken.lock());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is the activation of Services here, instead of separately in handleException() and postScheduleSignal(), a simplification, or is there a deeper reason?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In this PR, beginStream and endStream are now using those functions. They were not using them before. As currently implemented the service token is not available in beginStream and endStream and the services are turned on at a higher level for those. I wanted to share that code for those transitions also. Partially, it is also a simplification.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, thanks.


if (iPtr) {
excpt = *iPtr;
handleException(streamContext_, cleaningUpAfterException, excpt);
}
postScheduleSignal<T>(&streamContext_, excpt);
} // release service token before calling doneWaiting
iHolder.doneWaiting(excpt);
});

Expand All @@ -391,7 +393,10 @@ namespace edm {
preScheduleSignal<T>(&streamContext_);
workerManager->resetAll();
} catch (...) {
h.doneWaiting(std::current_exception());
// Just remember the exception at this point,
// let the destructor of h call doneWaiting() so the
// ServiceRegistry::Operator object is destroyed first
h.presetTaskAsFailed(std::current_exception());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you explain what is achieved by this change? AFAIU it moves the signaling of completion from here to wherever the lambda object is destructed.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is good to let the ServiceRegistry::Operate object go out of scope and be destroyed before WaitingTaskHolder::doneWaiting is called. I think there is a very small chance of major problems if the ServiceToken stays around until the near the end of the job when things are being shutdown and destroyed.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you add a comment in the code along "doneWaiting() signaling is left to the the destructor of h in order to ServiceRegistry::Operator object to be destructed first"?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done. Slightly reworded the comment.

return;
}

Expand Down Expand Up @@ -430,13 +435,9 @@ namespace edm {

template <typename T>
void StreamSchedule::postScheduleSignal(StreamContext const* streamContext,
ServiceWeakToken const& weakToken,
std::exception_ptr& excpt) const noexcept {
try {
convertException::wrap([this, &weakToken, streamContext]() {
ServiceRegistry::Operate op(weakToken.lock());
T::postScheduleSignal(actReg_.get(), streamContext);
});
convertException::wrap([this, streamContext]() { T::postScheduleSignal(actReg_.get(), streamContext); });
} catch (cms::Exception& ex) {
if (not excpt) {
std::ostringstream ost;
Expand Down
11 changes: 7 additions & 4 deletions FWCore/Framework/interface/SubProcess.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
#include "FWCore/Framework/interface/ProductSelector.h"
#include "FWCore/ServiceRegistry/interface/ProcessContext.h"
#include "FWCore/ServiceRegistry/interface/ServiceLegacy.h"
#include "FWCore/ServiceRegistry/interface/ServiceRegistry.h"
#include "FWCore/ServiceRegistry/interface/ServiceToken.h"
#include "FWCore/Utilities/interface/Algorithms.h"
#include "FWCore/Utilities/interface/BranchType.h"
Expand All @@ -23,6 +24,7 @@

#include <map>
#include <memory>
#include <mutex>
#include <set>
#include <vector>

Expand All @@ -32,6 +34,7 @@ namespace edm {
class BranchIDListHelper;
class EventPrincipal;
class EventSetupImpl;
class ExceptionCollector;
class HistoryAppender;
class LuminosityBlockPrincipal;
class LumiTransitionInfo;
Expand Down Expand Up @@ -86,7 +89,7 @@ namespace edm {
std::vector<ModuleProcessName> keepOnlyConsumedUnscheduledModules(bool deleteModules);

void doBeginJob();
void doEndJob();
void doEndJob(ExceptionCollector&);

void doEventAsync(WaitingTaskHolder iHolder,
EventPrincipal const& principal,
Expand All @@ -113,8 +116,8 @@ namespace edm {
LumiTransitionInfo const& iTransitionInfo,
bool cleaningUpAfterException);

void doBeginStream(unsigned int);
void doEndStream(unsigned int);
void doBeginStream(unsigned int streamID);
void doEndStream(unsigned int streamID, ExceptionCollector& collector, std::mutex& collectorMutex) noexcept;
void doStreamBeginRunAsync(WaitingTaskHolder iHolder, unsigned int iID, RunTransitionInfo const&);

void doStreamEndRunAsync(WaitingTaskHolder iHolder,
Expand Down Expand Up @@ -238,7 +241,7 @@ namespace edm {

private:
void beginJob();
void endJob();
void endJob(ExceptionCollector&);
void processAsync(WaitingTaskHolder iHolder,
EventPrincipal const& e,
std::vector<std::shared_ptr<const EventSetupImpl>> const*);
Expand Down
12 changes: 7 additions & 5 deletions FWCore/Framework/interface/WorkerManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,13 @@
#include "FWCore/Framework/interface/Frameworkfwd.h"
#include "FWCore/Framework/interface/UnscheduledCallProducer.h"
#include "FWCore/Framework/interface/WorkerRegistry.h"
#include "FWCore/ServiceRegistry/interface/ParentContext.h"
#include "FWCore/ServiceRegistry/interface/ServiceRegistryfwd.h"
#include "FWCore/Concurrency/interface/WaitingTaskHolder.h"
#include "FWCore/Utilities/interface/StreamID.h"

#include <memory>
#include <mutex>
#include <set>
#include <string>
#include <utility>
Expand Down Expand Up @@ -70,12 +72,12 @@ namespace edm {

void beginJob(ProductRegistry const& iRegistry,
eventsetup::ESRecordsToProductResolverIndices const&,
ProcessBlockHelperBase const&);
void endJob();
void endJob(ExceptionCollector& collector);
ProcessBlockHelperBase const&,
GlobalContext const&);
void endJob(ExceptionCollector&, GlobalContext const&);

void beginStream(StreamID iID, StreamContext& streamContext);
void endStream(StreamID iID, StreamContext& streamContext);
void beginStream(StreamID, StreamContext const&);
void endStream(StreamID, StreamContext const&, ExceptionCollector&, std::mutex& collectorMutex) noexcept;

AllWorkers const& allWorkers() const { return allWorkers_; }
AllWorkers const& unscheduledWorkers() const { return unscheduled_.workers(); }
Expand Down
Loading