Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
105 changes: 100 additions & 5 deletions PhysicsTools/NanoAOD/plugins/rntuple/NanoAODRNTupleOutputModule.cc
Original file line number Diff line number Diff line change
Expand Up @@ -91,8 +91,28 @@ class NanoAODRNTupleOutputModule : public edm::one::OutputModule<> {
RunNTuple m_run;

std::vector<std::pair<std::string, edm::EDGetToken>> m_nanoMetadata;

std::vector<std::string> m_noSplitFields;
ROOT::RNTupleWriteOptions m_writeOptions;
};

namespace {
ROOT::RNTupleWriteOptions writeOptions(edm::ParameterSet const& iConfig) {
ROOT::RNTupleWriteOptions options;

options.SetApproxZippedClusterSize(iConfig.getUntrackedParameter<unsigned long long>("approxZippedClusterSize"));

options.SetMaxUnzippedClusterSize(iConfig.getUntrackedParameter<unsigned long long>("maxUnzippedClusterSize"));

options.SetInitialUnzippedPageSize(iConfig.getUntrackedParameter<unsigned long long>("initialUnzippedPageSize"));
options.SetMaxUnzippedPageSize(iConfig.getUntrackedParameter<unsigned long long>("maxUnzippedPageSize"));
options.SetPageBufferBudget(iConfig.getUntrackedParameter<unsigned long long>("pageBufferBudget"));
options.SetUseBufferedWrite(iConfig.getUntrackedParameter<bool>("useBufferedWrite"));
options.SetUseDirectIO(iConfig.getUntrackedParameter<bool>("useDirectIO"));
return options;
}
} // namespace

NanoAODRNTupleOutputModule::NanoAODRNTupleOutputModule(edm::ParameterSet const& pset)
: edm::one::OutputModuleBase::OutputModuleBase(pset),
edm::one::OutputModule<>(pset),
Expand All @@ -101,7 +121,9 @@ NanoAODRNTupleOutputModule::NanoAODRNTupleOutputModule(edm::ParameterSet const&
m_compressionAlgorithm(pset.getUntrackedParameter<std::string>("compressionAlgorithm")),
m_compressionLevel(pset.getUntrackedParameter<int>("compressionLevel")),
m_writeProvenance(pset.getUntrackedParameter<bool>("saveProvenance", true)),
m_processHistoryRegistry() {}
m_processHistoryRegistry(),
m_noSplitFields{pset.getUntrackedParameter<std::vector<std::string>>("noSplitFields")},
m_writeOptions(writeOptions(pset.getUntrackedParameterSet("rntupleWriteOptions"))) {}

NanoAODRNTupleOutputModule::~NanoAODRNTupleOutputModule() {}

Expand Down Expand Up @@ -159,6 +181,7 @@ void NanoAODRNTupleOutputModule::openFile(edm::FileBlock const&) {
<< "NanoAODOutputModule configured with unknown compression algorithm '" << m_compressionAlgorithm << "'\n"
<< "Allowed compression algorithms are ZLIB and LZMA\n";
}
m_writeOptions.SetCompression(m_file->GetCompressionSettings());

const auto& keeps = keptProducts();
for (const auto& keep : keeps[edm::InRun]) {
Expand All @@ -176,6 +199,35 @@ void NanoAODRNTupleOutputModule::openFile(edm::FileBlock const&) {
}
}

namespace {
void noSplitField(ROOT::RFieldBase& iField) {
auto const& typeName = iField.GetTypeName();
if (typeName == "std::uint16_t") {
iField.SetColumnRepresentatives({{ROOT::ENTupleColumnType::kUInt16}});
} else if (typeName == "std::uint32_t") {
iField.SetColumnRepresentatives({{ROOT::ENTupleColumnType::kUInt32}});
} else if (typeName == "std::uint64_t") {
iField.SetColumnRepresentatives({{ROOT::ENTupleColumnType::kUInt64}});
} else if (typeName == "std::int16_t") {
iField.SetColumnRepresentatives({{ROOT::ENTupleColumnType::kInt16}});
} else if (typeName == "std::int32_t") {
iField.SetColumnRepresentatives({{ROOT::ENTupleColumnType::kInt32}});
} else if (typeName == "std::int64_t") {
iField.SetColumnRepresentatives({{ROOT::ENTupleColumnType::kInt64}});
} else if (typeName == "float") {
iField.SetColumnRepresentatives({{ROOT::ENTupleColumnType::kReal32}});
} else if (typeName == "double") {
iField.SetColumnRepresentatives({{ROOT::ENTupleColumnType::kReal64}});
}
}
void applyNoSplitToSubFields(ROOT::RFieldBase& iField) {
for (auto& subfield : iField) {
noSplitField(subfield);
applyNoSplitToSubFields(subfield);
}
}
} // namespace

void NanoAODRNTupleOutputModule::initializeNTuple(edm::EventForOutput const& iEvent) {
// set up RNTuple schema
auto model = RNTupleModel::Create();
Expand Down Expand Up @@ -203,14 +255,24 @@ void NanoAODRNTupleOutputModule::initializeNTuple(edm::EventForOutput const& iEv
}
m_evstrings.createFields(*model);

if (m_noSplitFields.size() == 1 and m_noSplitFields[0] == "all") {
for (auto const& topName : model->GetFieldNames()) {
auto& field = model->GetMutableField(topName);
noSplitField(field);
applyNoSplitToSubFields(field);
}
} else {
for (auto const& name : m_noSplitFields) {
auto& field = model->GetMutableField(name);
noSplitField(field);
}
}

// Model needs to be frozen before we bind buffers
model->Freeze();

m_tables.bindBuffers(*model);

RNTupleWriteOptions options;
options.SetCompression(m_file->GetCompressionSettings());
m_ntuple = RNTupleWriter::Append(std::move(model), "Events", *m_file, options);
m_ntuple = RNTupleWriter::Append(std::move(model), "Events", *m_file, m_writeOptions);
}

void NanoAODRNTupleOutputModule::write(edm::EventForOutput const& iEvent) {
Expand Down Expand Up @@ -266,6 +328,39 @@ void NanoAODRNTupleOutputModule::fillDescriptions(edm::ConfigurationDescriptions
->setComment(
"Algorithm used to "
"compress data in the ROOT output file, allowed values are ZLIB and LZMA");
desc.addUntracked<std::vector<std::string>>("noSplitFields", {})
->setComment("Name of fields to avoid the standard ROOT split optimization.");
{
edm::ParameterSetDescription optimizations;

ROOT::RNTupleWriteOptions ops;
optimizations.addUntracked<unsigned long long>("approxZippedClusterSize", ops.GetApproxZippedClusterSize())
->setComment("Approximation of the target compressed cluster size");
optimizations.addUntracked<unsigned long long>("maxUnzippedClusterSize", ops.GetMaxUnzippedClusterSize())
->setComment("Memory limit for committing a cluster. High compression leads to high IO buffer size.");

optimizations.addUntracked<unsigned long long>("initialUnzippedPageSize", ops.GetInitialUnzippedPageSize())
->setComment("Initially, columns start with a page of this size (bytes).");
optimizations.addUntracked<unsigned long long>("maxUnzippedPageSize", ops.GetMaxUnzippedPageSize())
->setComment("Pages can grow only to the given limit (bytes).");
optimizations.addUntracked<unsigned long long>("pageBufferBudget", 0)
->setComment(
"The maximum size that the sum of all page buffers used for writing into a persistent sink are allowed "
"to use."
" If set to zero, RNTuple will auto-adjust the budget based on the value of 'approxZippedClusterSize'."
" If set manually, the size needs to be large enough to hold all initial page buffers.");

optimizations.addUntracked<bool>("useBufferedWrite", ops.GetUseBufferedWrite())
->setComment(
"Turn on use of buffered writing. This buffers compressed pages in memory, reorders them to keep pages "
"of the same column adjacent, and coalesces the writes when committing a cluster.");
optimizations.addUntracked<bool>("useDirectIO", ops.GetUseDirectIO())
->setComment(
"Set use of direct IO. this introduces alignment requirements that may vary between filesystems and "
"platforms");
desc.addUntracked("rntupleWriteOptions", optimizations)
->setComment("Options to control RNTuple specific output features.");
}
desc.addUntracked<bool>("saveProvenance", true)
->setComment("Save process provenance information, e.g. for edmProvDump");
const std::vector<std::string> keep = {"drop *",
Expand Down