From e5339605cf09fd4a13a030c8f0fe2680544a6059 Mon Sep 17 00:00:00 2001 From: Chris Jones Date: Wed, 12 Mar 2025 13:51:06 -0500 Subject: [PATCH 1/2] Test provenance of dropped products For dropped products, ROOT files store provenance only for those that are ancestors of a kept product. Using an untracked parameter for the InputTags keeps the ProcessHistory the same so there is only one Run in the workflow. --- .../Framework/interface/OccurrenceForOutput.h | 1 + FWCore/Framework/src/OccurrenceForOutput.cc | 4 ++ .../Framework/test/stubs/ToyIntProducers.cc | 10 +++++ FWCore/Integration/test/BuildFile.xml | 1 + .../Integration/test/provenance_check_cfg.py | 13 ++++++ .../Integration/test/provenance_prod_cfg.py | 41 +++++++++++++++++++ FWCore/Integration/test/provenance_test.sh | 12 ++++++ FWCore/Modules/src/AsciiOutputModule.cc | 29 ++++++++++++- 8 files changed, 110 insertions(+), 1 deletion(-) create mode 100644 FWCore/Integration/test/provenance_check_cfg.py create mode 100644 FWCore/Integration/test/provenance_prod_cfg.py create mode 100755 FWCore/Integration/test/provenance_test.sh diff --git a/FWCore/Framework/interface/OccurrenceForOutput.h b/FWCore/Framework/interface/OccurrenceForOutput.h index aa2065ca85710..ab136fcf731cd 100644 --- a/FWCore/Framework/interface/OccurrenceForOutput.h +++ b/FWCore/Framework/interface/OccurrenceForOutput.h @@ -64,6 +64,7 @@ namespace edm { Handle getHandle(EDGetTokenT token) const; Provenance getProvenance(BranchID const& theID) const; + StableProvenance const& getStableProvenance(BranchID const& ithID) const; void getAllProvenance(std::vector& provenances) const; diff --git a/FWCore/Framework/src/OccurrenceForOutput.cc b/FWCore/Framework/src/OccurrenceForOutput.cc index 46453619c6150..e36f9770b771a 100644 --- a/FWCore/Framework/src/OccurrenceForOutput.cc +++ b/FWCore/Framework/src/OccurrenceForOutput.cc @@ -29,6 +29,10 @@ namespace edm { return provRecorder_.principal().getProvenance(bid); } + StableProvenance const& OccurrenceForOutput::getStableProvenance(BranchID const& 
bid) const { + return provRecorder_.principal().getStableProvenance(bid); + } + void OccurrenceForOutput::getAllProvenance(std::vector& provenances) const { provRecorder_.principal().getAllProvenance(provenances); } diff --git a/FWCore/Framework/test/stubs/ToyIntProducers.cc b/FWCore/Framework/test/stubs/ToyIntProducers.cc index 4e1433e25d80e..bcad657d3765a 100644 --- a/FWCore/Framework/test/stubs/ToyIntProducers.cc +++ b/FWCore/Framework/test/stubs/ToyIntProducers.cc @@ -458,6 +458,12 @@ namespace edmtest { for (auto const& label : labels) { tokens_.emplace_back(consumes(label)); } + { + auto const& labels = p.getUntrackedParameter>("untrackedLabels"); + for (auto const& label : labels) { + tokens_.emplace_back(consumes(label)); + } + } } void produce(edm::StreamID, edm::Event& e, edm::EventSetup const& c) const override; @@ -465,6 +471,10 @@ namespace edmtest { edm::ParameterSetDescription desc; desc.addUntracked("onlyGetOnEvent", 0u); desc.add>("labels"); + desc.addUntracked>("untrackedLabels", {}) + ->setComment( + "Using this can change the stored ProductRegistry for the same ProcessHistory if this is the only module " + "that depends on these labels."); descriptions.addDefault(desc); } diff --git a/FWCore/Integration/test/BuildFile.xml b/FWCore/Integration/test/BuildFile.xml index 7abb1a2c2d89a..008dd1ca98312 100644 --- a/FWCore/Integration/test/BuildFile.xml +++ b/FWCore/Integration/test/BuildFile.xml @@ -43,6 +43,7 @@ + diff --git a/FWCore/Integration/test/provenance_check_cfg.py b/FWCore/Integration/test/provenance_check_cfg.py new file mode 100644 index 0000000000000..d1d316d4c309c --- /dev/null +++ b/FWCore/Integration/test/provenance_check_cfg.py @@ -0,0 +1,13 @@ +import FWCore.ParameterSet.Config as cms + +process = cms.Process("READ") + +from IOPool.Input.modules import PoolSource + +process.source = PoolSource(fileNames = ["file:prov.root", "file:prov_extra.root"]) + +from FWCore.Modules.modules import ProvenanceCheckerOutputModule, AsciiOutputModule 
+process.out = ProvenanceCheckerOutputModule() +process.prnt = AsciiOutputModule(verbosity = 2, allProvenance=True) + +process.e = cms.EndPath(process.out+process.prnt) diff --git a/FWCore/Integration/test/provenance_prod_cfg.py b/FWCore/Integration/test/provenance_prod_cfg.py new file mode 100644 index 0000000000000..e1cd6390317a5 --- /dev/null +++ b/FWCore/Integration/test/provenance_prod_cfg.py @@ -0,0 +1,41 @@ +import FWCore.ParameterSet.Config as cms +from argparse import ArgumentParser + +parser = ArgumentParser(description='Write streamer output file for provenance read test') +parser.add_argument("--consumeProd2", help="add an extra producer to the job and drop on output", action="store_true") +args = parser.parse_args() + + +process = cms.Process("OUTPUT") + +from FWCore.Modules.modules import EmptySource + +runNumber = 1 +eventNumber = 1 +if args.consumeProd2: + eventNumber = 2 + +process.source = EmptySource(firstRun = runNumber, firstEvent = eventNumber ) + +from FWCore.Framework.modules import AddIntsProducer, IntProducer + +process.one = IntProducer(ivalue=1) +process.two = IntProducer(ivalue=2) +process.sum = AddIntsProducer(labels=['one']) +process.t = cms.Task(process.one, process.two, process.sum) + +baseOutFileName = "prov" +if args.consumeProd2 : + process.sum.untrackedLabels = ['two'] + baseOutFileName += "_extra" + + +from IOPool.Output.modules import PoolOutputModule + +process.out = PoolOutputModule(fileName = baseOutFileName+".root", + outputCommands = ["drop *", "keep *_sum_*_*"]) + +from FWCore.Modules.modules import AsciiOutputModule +process.prnt = AsciiOutputModule(verbosity = 2, allProvenance = True) +process.e = cms.EndPath(process.out+process.prnt, process.t) +process.maxEvents.input = 1 diff --git a/FWCore/Integration/test/provenance_test.sh b/FWCore/Integration/test/provenance_test.sh new file mode 100755 index 0000000000000..5b506006ff4ba --- /dev/null +++ b/FWCore/Integration/test/provenance_test.sh @@ -0,0 +1,12 @@ +#!/bin/sh + 
+ +function die { echo $1: status $2 ; exit $2; } + +#The two jobs will have different ProductRegistries in their output files but have the same ProcessHistory. +# The ProductRegistries differ only because the internal dependencies between the data products are different +# and PoolOutputModule only stores provenance of 'dropped' data products IFF they are parents of a kept product. +# The check makes sure the provenance in the ProductRegistry is properly updated when the new file is read. +cmsRun ${SCRAM_TEST_PATH}/provenance_prod_cfg.py || die 'Failed in provenance_prod_cfg.py' $? +cmsRun ${SCRAM_TEST_PATH}/provenance_prod_cfg.py --consumeProd2 || die 'Failed in provenance_prod_cfg.py --consumeProd2' $? +cmsRun ${SCRAM_TEST_PATH}/provenance_check_cfg.py || die 'Failed test of provenance' $? \ No newline at end of file diff --git a/FWCore/Modules/src/AsciiOutputModule.cc b/FWCore/Modules/src/AsciiOutputModule.cc index e9d9fed462149..19897c68b0b5d 100644 --- a/FWCore/Modules/src/AsciiOutputModule.cc +++ b/FWCore/Modules/src/AsciiOutputModule.cc @@ -34,6 +34,7 @@ namespace edm { int prescale_; int verbosity_; int counter_; + bool allProvenance_; }; AsciiOutputModule::AsciiOutputModule(ParameterSet const& pset) @@ -41,7 +42,8 @@ namespace edm { global::OutputModule<>(pset), prescale_(pset.getUntrackedParameter("prescale")), verbosity_(pset.getUntrackedParameter("verbosity")), - counter_(0) { + counter_(0), + allProvenance_(pset.getUntrackedParameter("allProvenance")) { if (prescale_ == 0) prescale_ = 1; } @@ -81,6 +83,29 @@ namespace edm { auto const& prov = e.getProvenance(desc.originalBranchID()); LogAbsolute("AsciiOut") << prov; + if (verbosity_ > 2) { + ProductDescription const& desc2 = prov.productDescription(); + std::string const& process = desc2.processName(); + std::string const& label = desc2.moduleLabel(); + ProcessHistory const& processHistory = e.processHistory(); + + for (ProcessConfiguration const& pc : processHistory) { + if (pc.processName() == process) 
{ + ParameterSetID const& psetID = pc.parameterSetID(); + pset::Registry const* psetRegistry = pset::Registry::instance(); + ParameterSet const* processPset = psetRegistry->getMapped(psetID); + if (processPset) { + if (desc.isAlias()) { + LogAbsolute("AsciiOut") << "Alias PSet\n" << processPset->getParameterSet(desc.moduleLabel()); + } + LogAbsolute("AsciiOut") << processPset->getParameterSet(label) << "\n"; + } + } + } + } + } else if (allProvenance_) { + auto const& prov = e.getStableProvenance(desc.originalBranchID()); + LogAbsolute("AsciiOut") << prov; if (verbosity_ > 2) { ProductDescription const& desc2 = prov.productDescription(); std::string const& process = desc2.processName(); @@ -115,6 +140,8 @@ namespace edm { "1: event ID and timestamp only\n" "2: provenance for each kept product\n" ">2: PSet and provenance for each kept product"); + desc.addUntracked("allProvenance", false) + ->setComment("when printing provenance info, also print stable provenance of non-kept data products."); OutputModule::fillDescription(desc); descriptions.add("asciiOutput", desc); } From af6456ca1edfaf8ff61fae97f2a162a346d19a39 Mon Sep 17 00:00:00 2001 From: Chris Jones Date: Wed, 12 Mar 2025 14:05:53 -0500 Subject: [PATCH 2/2] Make new main ProductRegistry when input updates If the ProductRegistry of the source changes, we now create a new main ProductRegistry. The main Registry is then passed to the Principals which update their data structures. 
--- FWCore/Framework/interface/Principal.h | 5 +++- FWCore/Framework/interface/PrincipalCache.h | 2 +- FWCore/Framework/src/EventProcessor.cc | 27 ++++++++++++------- FWCore/Framework/src/Principal.cc | 8 +++++- FWCore/Framework/src/PrincipalCache.cc | 8 +++--- .../TestProcessor/src/TestSourceProcessor.cc | 2 +- 6 files changed, 35 insertions(+), 17 deletions(-) diff --git a/FWCore/Framework/interface/Principal.h b/FWCore/Framework/interface/Principal.h index 954526fea7515..bf8a9bc95dbe1 100644 --- a/FWCore/Framework/interface/Principal.h +++ b/FWCore/Framework/interface/Principal.h @@ -74,7 +74,8 @@ namespace edm { ~Principal() override; - void adjustIndexesAfterProductRegistryAddition(); + //This should only be called when this Principal is not being actively used + void possiblyUpdateAfterAddition(std::shared_ptr); void fillPrincipal(DelayedReader* reader); void fillPrincipal(ProcessHistoryID const& hist, ProcessHistory const* phr, DelayedReader* reader); @@ -213,6 +214,8 @@ namespace edm { } private: + void adjustIndexesAfterProductRegistryAddition(); + //called by adjustIndexesAfterProductRegistryAddition only if an index actually changed virtual void changedIndexes_() {} diff --git a/FWCore/Framework/interface/PrincipalCache.h b/FWCore/Framework/interface/PrincipalCache.h index de66679f581bf..06f1ad7908d94 100644 --- a/FWCore/Framework/interface/PrincipalCache.h +++ b/FWCore/Framework/interface/PrincipalCache.h @@ -53,7 +53,7 @@ namespace edm { void adjustEventsToNewProductRegistry(std::shared_ptr); - void adjustIndexesAfterProductRegistryAddition(); + void adjustIndexesAfterProductRegistryAddition(std::shared_ptr); private: std::unique_ptr processBlockPrincipal_; diff --git a/FWCore/Framework/src/EventProcessor.cc b/FWCore/Framework/src/EventProcessor.cc index 479047f8a2d9b..caed4bfd4ce1c 100644 --- a/FWCore/Framework/src/EventProcessor.cc +++ b/FWCore/Framework/src/EventProcessor.cc @@ -1023,20 +1023,27 @@ namespace edm { 
SendSourceTerminationSignalIfException sentry(actReg_.get()); if (streamRunActive_ > 0) { + //deals with data structures that allow merged Run products to be split on Lumi boundaries and then + // reintegrated in later processes. streamRunStatus_[0]->runPrincipal()->preReadFile(); - streamRunStatus_[0]->runPrincipal()->adjustIndexesAfterProductRegistryAddition(); - } - - if (streamLumiActive_ > 0) { - streamLumiStatus_[0]->lumiPrincipal()->adjustIndexesAfterProductRegistryAddition(); } + auto sizeBefore = input_->productRegistry().size(); fb_ = input_->readFile(); //incase the input's registry changed - const size_t size = preg_->size(); - preg_->merge(input_->productRegistry(), fb_ ? fb_->fileName() : std::string()); - if (size < preg_->size()) { - principalCache_.adjustIndexesAfterProductRegistryAddition(); + if (input_->productRegistry().size() != sizeBefore) { + auto temp = std::make_shared(*preg_); + temp->merge(input_->productRegistry(), fb_ ? fb_->fileName() : std::string()); + preg_ = std::move(temp); + //This handles the presently unused Run/Lumis + principalCache_.adjustIndexesAfterProductRegistryAddition(edm::get_underlying_safe(preg_)); + if (streamLumiActive_ > 0) { + //Can update the active ones now, even before an `end` transition is called because no OutputModule + // supports storing ProductDescriptions for Run/LuminosityBlock products which were dropped. Since only + // dropped products can change the ProductRegistry, only changes in Event can cause that. 
+ streamRunStatus_[0]->runPrincipal()->possiblyUpdateAfterAddition(edm::get_underlying_safe(preg_)); + streamLumiStatus_[0]->lumiPrincipal()->possiblyUpdateAfterAddition(edm::get_underlying_safe(preg_)); + } } principalCache_.adjustEventsToNewProductRegistry(preg()); if (preallocations_.numberOfStreams() > 1 and preallocations_.numberOfThreads() > 1) { @@ -2022,6 +2029,7 @@ namespace edm { std::shared_ptr EventProcessor::readRun() { auto rp = principalCache_.getAvailableRunPrincipalPtr(); assert(rp); + rp->possiblyUpdateAfterAddition(preg()); rp->setAux(*input_->runAuxiliary()); { @@ -2046,6 +2054,7 @@ namespace edm { std::shared_ptr EventProcessor::readLuminosityBlock(std::shared_ptr rp) { auto lbp = principalCache_.getAvailableLumiPrincipalPtr(); assert(lbp); + lbp->possiblyUpdateAfterAddition(preg()); lbp->setAux(*input_->luminosityBlockAuxiliary()); { diff --git a/FWCore/Framework/src/Principal.cc b/FWCore/Framework/src/Principal.cc index 14c431b7087d4..4154960351651 100644 --- a/FWCore/Framework/src/Principal.cc +++ b/FWCore/Framework/src/Principal.cc @@ -31,7 +31,6 @@ #include #include #include - namespace edm { static ProcessHistory const s_emptyProcessHistory; @@ -149,6 +148,13 @@ namespace edm { return size; } + void Principal::possiblyUpdateAfterAddition(std::shared_ptr iProd) { + if (iProd.get() != preg_.get()) { + preg_ = iProd; + adjustIndexesAfterProductRegistryAddition(); + } + } + void Principal::addDroppedProduct(ProductDescription const& bd) { addProductOrThrow(std::make_unique(std::make_shared(bd))); } diff --git a/FWCore/Framework/src/PrincipalCache.cc b/FWCore/Framework/src/PrincipalCache.cc index 52c8dee0d7e46..8d039d9d3e721 100644 --- a/FWCore/Framework/src/PrincipalCache.cc +++ b/FWCore/Framework/src/PrincipalCache.cc @@ -43,22 +43,22 @@ namespace edm { void PrincipalCache::adjustEventsToNewProductRegistry(std::shared_ptr reg) { for (auto& eventPrincipal : eventPrincipals_) { if (eventPrincipal) { - 
eventPrincipal->adjustIndexesAfterProductRegistryAddition(); + eventPrincipal->possiblyUpdateAfterAddition(reg); } } } - void PrincipalCache::adjustIndexesAfterProductRegistryAddition() { + void PrincipalCache::adjustIndexesAfterProductRegistryAddition(std::shared_ptr iReg) { //Need to temporarily hold all the runs to clear out the runHolder_ std::vector> tempRunPrincipals; while (auto p = runHolder_.tryToGet()) { - p->adjustIndexesAfterProductRegistryAddition(); + p->possiblyUpdateAfterAddition(iReg); tempRunPrincipals.emplace_back(std::move(p)); } //Need to temporarily hold all the lumis to clear out the lumiHolder_ std::vector> tempLumiPrincipals; while (auto p = lumiHolder_.tryToGet()) { - p->adjustIndexesAfterProductRegistryAddition(); + p->possiblyUpdateAfterAddition(iReg); tempLumiPrincipals.emplace_back(std::move(p)); } } diff --git a/FWCore/TestProcessor/src/TestSourceProcessor.cc b/FWCore/TestProcessor/src/TestSourceProcessor.cc index eb59d7ce1ff1a..386b77f2871dd 100644 --- a/FWCore/TestProcessor/src/TestSourceProcessor.cc +++ b/FWCore/TestProcessor/src/TestSourceProcessor.cc @@ -229,7 +229,7 @@ namespace edm::test { const size_t size = preg_->size(); preg_->merge(source_->productRegistry(), fb_ ? fb_->fileName() : std::string()); if (size < preg_->size()) { - principalCache_.adjustIndexesAfterProductRegistryAddition(); + principalCache_.adjustIndexesAfterProductRegistryAddition(preg_); } principalCache_.adjustEventsToNewProductRegistry(preg_);