Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 44 additions & 3 deletions FWCore/Concurrency/scripts/edmStreamStallGrapher.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ def printHelp():
kSourceDelayedRead ="sourceDelayedRead"

#----------------------------------------------
def processingStepsFromStallMonitorOutput(f,moduleNames):
def processingStepsFromStallMonitorOutput(f,moduleNames, esModuleNames):
for rawl in f:
l = rawl.strip()
if not l or l[0] == '#':
Expand Down Expand Up @@ -113,6 +113,19 @@ def processingStepsFromStallMonitorOutput(f,moduleNames):
isEvent = (int(payload[2]) == 0)
name = moduleNames[moduleID]

# 'q' = end of esmodule prefetching
# 'N' = begin of esmodule processing
# 'n' = end of esmodule processing
if step == 'q' or step == 'N' or step == 'n':
trans = kStarted
if step == 'q':
trans = kPrefetchEnd
elif step == 'n':
trans = kFinished
if step == 'n' or step == 'N':
isEvent = (int(payload[2]) == 0)
name = esModuleNames[moduleID]

# 'A' = begin of module acquire function
# 'a' = end of module acquire function
elif step == 'A' or step == 'a':
Expand Down Expand Up @@ -140,6 +153,7 @@ def __init__(self,f):
numStreams = 0
numStreamsFromSource = 0
moduleNames = {}
esModuleNames = {}
for rawl in f:
l = rawl.strip()
if l and l[0] == 'M':
Expand All @@ -156,22 +170,30 @@ def __init__(self,f):
(id,name)=tuple(l[2:].split())
moduleNames[id] = name
continue
if len(l) > 5 and l[0:2] == "#N":
(id,name)=tuple(l[2:].split())
esModuleNames[id] = name
continue

self._f = f
if numStreams == 0:
numStreams = numStreamsFromSource +1
self.numStreams =numStreams
self._moduleNames = moduleNames
self._esModuleNames = esModuleNames
self.maxNameSize =0
for n in moduleNames.items():
self.maxNameSize = max(self.maxNameSize,len(n))
for n in esModuleNames.items():
self.maxNameSize = max(self.maxNameSize,len(n))
self.maxNameSize = max(self.maxNameSize,len(kSourceDelayedRead))

def processingSteps(self):
"""Create a generator which can step through the file and return each processing step.
Using a generator reduces the memory overhead when parsing a large file.
"""
self._f.seek(0)
return processingStepsFromStallMonitorOutput(self._f,self._moduleNames)
return processingStepsFromStallMonitorOutput(self._f,self._moduleNames, self._esModuleNames)

#----------------------------------------------
# Utility to get time out of Tracer output text format
Expand Down Expand Up @@ -574,11 +596,30 @@ def plotPerStreamAboveFirstAndPrepareStack(points, allStackTimes, ax, stream, he
allStackTimes[color].extend(theTS*(nthreads-threadOffset))

#----------------------------------------------
# The same ES module can have multiple Proxies running concurrently
# so we need to reference count the names of the active modules
class RefCountSet(set):
def __init__(self):
super().__init__()
self.__itemsAndCount = dict()
def add(self, item):
v = self.__itemsAndCount.setdefault(item,0)
self.__itemsAndCount[item]=v+1
return super().add(item)
def remove(self, item):
v = self.__itemsAndCount[item]
if v == 1:
del self.__itemsAndCount[item]
super().remove(item)
else:
self.__itemsAndCount[item]=v-1


def createPDFImage(pdfFile, shownStacks, processingSteps, numStreams, stalledModuleInfo, displayExternalWork, checkOrder, setXAxis, xLower, xUpper):

stalledModuleNames = set([x for x in iter(stalledModuleInfo)])
streamLowestRow = [[] for x in range(numStreams)]
modulesActiveOnStreams = [set() for x in range(numStreams)]
modulesActiveOnStreams = [RefCountSet() for x in range(numStreams)]
acquireActiveOnStreams = [set() for x in range(numStreams)]
externalWorkOnStreams = [set() for x in range(numStreams)]
previousFinishTime = [None for x in range(numStreams)]
Expand Down
22 changes: 21 additions & 1 deletion FWCore/Concurrency/test/streamGrapher_stallMonitor_cfg.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,29 @@

process.p = cms.Path(process.strt, process.t)

#exercise ES monitoring
process.testESSource = cms.ESSource("TestESConcurrentSource",
firstValidLumis = cms.vuint32(1, 4, 6, 7, 8, 9),
iterations = cms.uint32(10*1000*1000),
checkIOVInitialization = cms.bool(True),
expectedNumberOfConcurrentIOVs = cms.uint32(1)
)
process.concurrentIOVESProducer = cms.ESProducer("ConcurrentIOVESProducer")
process.test = cms.EDAnalyzer("ConcurrentIOVAnalyzer",
checkExpectedValues = cms.untracked.bool(False)
)
process.testOther = cms.EDAnalyzer("ConcurrentIOVAnalyzer",
checkExpectedValues = cms.untracked.bool(False),
fromSource = cms.untracked.ESInputTag(":other")
)
process.busy1 = cms.EDProducer("BusyWaitIntProducer",ivalue = cms.int32(1), iterations = cms.uint32(10*1000*1000))
process.p1 = cms.Path(process.busy1 * process.test * process.testOther)

process.options = dict( numberOfStreams = 4,
numberOfThreads = 5,
numberOfConcurrentLuminosityBlocks = 1)
numberOfConcurrentLuminosityBlocks = 1,
numberOfConcurrentRuns = 1
)

process.add_(cms.Service("Tracer", printTimestamps = cms.untracked.bool(True)))
process.add_(cms.Service("StallMonitor", fileName = cms.untracked.string("stallMonitor.log")))
4 changes: 4 additions & 0 deletions FWCore/Framework/src/EventSetupProvider.cc
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
#include "FWCore/Framework/interface/EventSetupsController.h"
#include "FWCore/Framework/interface/NumberOfConcurrentIOVs.h"
#include "FWCore/MessageLogger/interface/MessageLogger.h"
#include "FWCore/ServiceRegistry/interface/ActivityRegistry.h"
#include "FWCore/ParameterSet/interface/ParameterSet.h"
#include "FWCore/Utilities/interface/Algorithms.h"
#include "FWCore/Utilities/interface/Exception.h"
Expand Down Expand Up @@ -89,6 +90,9 @@ namespace edm {
void EventSetupProvider::add(std::shared_ptr<DataProxyProvider> iProvider) {
assert(iProvider.get() != nullptr);
dataProviders_->push_back(iProvider);
if (activityRegistry_) {
activityRegistry_->postESModuleRegistrationSignal_(iProvider->description());
}
}

void EventSetupProvider::replaceExisting(std::shared_ptr<DataProxyProvider> dataProxyProvider) {
Expand Down
8 changes: 8 additions & 0 deletions FWCore/ServiceRegistry/interface/ActivityRegistry.h
Original file line number Diff line number Diff line change
Expand Up @@ -518,6 +518,14 @@ namespace edm {
}
AR_WATCH_USING_METHOD_1(watchPreSourceEarlyTermination)

/// signal is emitted after the ESModule is registered with EventSetupProvider
using PostESModuleRegistration = signalslot::Signal<void(eventsetup::ComponentDescription const&)>;
PostESModuleRegistration postESModuleRegistrationSignal_;
void watchPostESModuleRegistration(PostESModuleRegistration::slot_type const& iSlot) {
postESModuleRegistrationSignal_.connect(iSlot);
}
AR_WATCH_USING_METHOD_1(watchPostESModuleRegistration)

/// signal is emitted before the esmodule starts processing and before prefetching has started
typedef signalslot::Signal<void(eventsetup::EventSetupRecordKey const&, ESModuleCallingContext const&)>
PreESModulePrefetching;
Expand Down
4 changes: 4 additions & 0 deletions FWCore/ServiceRegistry/src/ActivityRegistry.cc
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,8 @@ namespace edm {
preESModuleSignal_.connect(std::cref(iOther.preESModuleSignal_));
postESModuleSignal_.connect(std::cref(iOther.postESModuleSignal_));

postESModuleRegistrationSignal_.connect(std::cref(iOther.postESModuleRegistrationSignal_));

//preModuleSignal_.connect(std::cref(iOther.preModuleSignal_));
//postModuleSignal_.connect(std::cref(iOther.postModuleSignal_));

Expand Down Expand Up @@ -500,6 +502,8 @@ namespace edm {

copySlotsToFrom(preESModuleSignal_, iOther.preESModuleSignal_);
copySlotsToFromReverse(postESModuleSignal_, iOther.postESModuleSignal_);

copySlotsToFromReverse(postESModuleRegistrationSignal_, iOther.postESModuleRegistrationSignal_);
/*
copySlotsToFrom(preModuleSignal_, iOther.preModuleSignal_);
copySlotsToFromReverse(postModuleSignal_, iOther.postModuleSignal_);
Expand Down
102 changes: 81 additions & 21 deletions FWCore/Services/plugins/StallMonitor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

#include "DataFormats/Provenance/interface/ModuleDescription.h"
#include "FWCore/Concurrency/interface/ThreadSafeOutputFileStream.h"
#include "FWCore/Framework/interface/ComponentDescription.h"
#include "FWCore/MessageLogger/interface/MessageLogger.h"
#include "FWCore/ParameterSet/interface/ConfigurationDescriptions.h"
#include "FWCore/ParameterSet/interface/ParameterSet.h"
Expand All @@ -20,6 +21,7 @@
#include "FWCore/ServiceRegistry/interface/StreamContext.h"
#include "FWCore/ServiceRegistry/interface/GlobalContext.h"
#include "FWCore/ServiceRegistry/interface/ModuleCallingContext.h"
#include "FWCore/ServiceRegistry/interface/ESModuleCallingContext.h"
#include "FWCore/ServiceRegistry/interface/SystemBounds.h"
#include "FWCore/Utilities/interface/Algorithms.h"
#include "FWCore/Utilities/interface/OStreamColumn.h"
Expand All @@ -43,6 +45,8 @@ namespace {

inline auto module_id(edm::ModuleCallingContext const& mcc) { return mcc.moduleDescription()->id(); }

inline auto module_id(edm::ESModuleCallingContext const& mcc) { return mcc.componentDescription()->id_; }

//===============================================================
class StallStatistics {
public:
Expand Down Expand Up @@ -102,7 +106,10 @@ namespace {
preEventReadFromSource = 'R',
postEventReadFromSource = 'r',
postModuleEvent = 'm',
postEvent = 'e'
postEvent = 'e',
postESModulePrefetching = 'q',
preESModule = 'N',
postESModule = 'n'
};

enum class Phase : short {
Expand All @@ -114,7 +121,8 @@ namespace {
streamBeginLumi = 1,
globalBeginLumi = 2,
streamBeginRun = 3,
globalBeginRun = 4
globalBeginRun = 4,
eventSetupCall = 5
};

std::ostream& operator<<(std::ostream& os, step const s) {
Expand Down Expand Up @@ -234,6 +242,7 @@ namespace edm {

std::vector<std::string> moduleLabels_{};
std::vector<StallStatistics> moduleStats_{};
std::vector<std::string> esModuleLabels_{};
unsigned int numStreams_;
};

Expand Down Expand Up @@ -298,13 +307,38 @@ StallMonitor::StallMonitor(ParameterSet const& iPS, ActivityRegistry& iRegistry)
iRegistry.watchPreModuleWriteLumi(this, &StallMonitor::preModuleGlobalTransition);
iRegistry.watchPostModuleWriteLumi(this, &StallMonitor::postModuleGlobalTransition);

iRegistry.postESModuleRegistrationSignal_.connect([this](auto const& iDescription) {
if (esModuleLabels_.size() <= iDescription.id_) {
esModuleLabels_.resize(iDescription.id_ + 1);
}
if (not iDescription.label_.empty()) {
esModuleLabels_[iDescription.id_] = iDescription.label_;
} else {
esModuleLabels_[iDescription.id_] = iDescription.type_;
}
});

iRegistry.preESModuleSignal_.connect([this](auto const&, auto const& context) {
auto const t = duration_cast<duration_t>(now() - beginTime_).count();
auto msg = assembleMessage<step::preESModule>(
numStreams_, module_id(context), std::underlying_type_t<Phase>(Phase::eventSetupCall), t);
file_.write(std::move(msg));
});
iRegistry.postESModuleSignal_.connect([this](auto const&, auto const& context) {
auto const t = duration_cast<duration_t>(now() - beginTime_).count();
auto msg = assembleMessage<step::postESModule>(
numStreams_, module_id(context), std::underlying_type_t<Phase>(Phase::eventSetupCall), t);
file_.write(std::move(msg));
});

iRegistry.preallocateSignal_.connect(
[this](service::SystemBounds const& iBounds) { numStreams_ = iBounds.maxNumberOfStreams(); });

std::ostringstream oss;
oss << "# Transition Symbol\n";
oss << "#----------------- ------\n";
oss << "# globalBeginRun " << Phase::globalBeginRun << "\n"
oss << "# eventSetupCall " << Phase::eventSetupCall << "\n"
<< "# globalBeginRun " << Phase::globalBeginRun << "\n"
<< "# streamBeginRun " << Phase::streamBeginRun << "\n"
<< "# globalBeginLumi " << Phase::globalBeginLumi << "\n"
<< "# streamBeginLumi " << Phase::streamBeginLumi << "\n"
Expand Down Expand Up @@ -334,7 +368,13 @@ StallMonitor::StallMonitor(ParameterSet const& iPS, ActivityRegistry& iRegistry)
<< "# postModuleTransition " << step::postModuleEvent
<< " <Stream ID> <Module ID> <Transition type> <Time since beginJob (ms)>\n"
<< "# postEvent " << step::postEvent
<< " <Stream ID> <Run#> <LumiBlock#> <Event#> <Time since beginJob (ms)>\n";
<< " <Stream ID> <Run#> <LumiBlock#> <Event#> <Time since beginJob (ms)>\n"
<< "# postESModulePrefetching " << step::postESModulePrefetching
<< " <Stream ID> <ESModule ID> <Transition type> <Time since beginJob (ms)>\n"
<< "# preESModuleTransition " << step::preESModule
<< " <StreamID> <ESModule ID> <TransitionType> <Time since beginJob (ms)>\n"
<< "# postESModuleTransition " << step::postESModule
<< " <StreamID> <ESModule ID> <TransitionType> <Time since beginJob (ms)>\n";
file_.write(oss.str());
}
}
Expand Down Expand Up @@ -394,27 +434,47 @@ void StallMonitor::postBeginJob() {
}

if (validFile_) {
std::size_t const width{std::to_string(moduleLabels_.size()).size()};

OStreamColumn col0{"Module ID", width};
std::string const lastCol{"Module label"};

std::ostringstream oss;
oss << "\n# " << col0 << space << lastCol << '\n';
oss << "# " << std::string(col0.width() + space.size() + lastCol.size(), '-') << '\n';

for (std::size_t i{}; i < moduleLabels_.size(); ++i) {
auto const& label = moduleLabels_[i];
if (label.empty())
continue; // See comment in filling of moduleLabels_;
oss << "#M " << std::setw(width) << std::left << col0(i) << space << std::left << moduleLabels_[i] << '\n';
{
std::size_t const width{std::to_string(moduleLabels_.size()).size()};
OStreamColumn col0{"Module ID", width};
std::string const lastCol{"Module label"};

std::ostringstream oss;
oss << "\n# " << col0 << space << lastCol << '\n';
oss << "# " << std::string(col0.width() + space.size() + lastCol.size(), '-') << '\n';

for (std::size_t i{}; i < moduleLabels_.size(); ++i) {
auto const& label = moduleLabels_[i];
if (label.empty())
continue; // See comment in filling of moduleLabels_;
oss << "#M " << std::setw(width) << std::left << col0(i) << space << std::left << moduleLabels_[i] << '\n';
}
oss << '\n';
file_.write(oss.str());
}
{
std::size_t const width{std::to_string(esModuleLabels_.size()).size()};
OStreamColumn col0{"ESModule ID", width};
std::string const lastCol{"ESModule label"};

std::ostringstream oss;
oss << "\n# " << col0 << space << lastCol << '\n';
oss << "# " << std::string(col0.width() + space.size() + lastCol.size(), '-') << '\n';

for (std::size_t i{}; i < esModuleLabels_.size(); ++i) {
auto const& label = esModuleLabels_[i];
if (label.empty())
continue; // See comment in filling of moduleLabels_;
oss << "#N " << std::setw(width) << std::left << col0(i) << space << std::left << esModuleLabels_[i] << '\n';
}
oss << '\n';
file_.write(oss.str());
}
oss << '\n';
file_.write(oss.str());
}
// Don't need the labels anymore--info. is now part of the
// module-statistics objects.
moduleLabels_.clear();
decltype(moduleLabels_)().swap(moduleLabels_);
decltype(esModuleLabels_)().swap(esModuleLabels_);

beginTime_ = now();
}
Expand Down