Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions EventFilter/Utilities/interface/EvFDaqDirector.h
Original file line number Diff line number Diff line change
Expand Up @@ -139,8 +139,12 @@ namespace evf {
bool requireHeader,
bool retry,
bool closeFile);

uint16_t frdFileDataType(const void* buf) const;

int grabNextJsonFromRaw(std::string const& rawSourcePath,
int& rawFd,
uint16_t& rawDataType,
uint16_t& rawHeaderSize,
int64_t& fileSizeFromHeader,
bool& fileFound,
Expand All @@ -157,6 +161,7 @@ namespace evf {
unsigned int& ls,
std::string& nextFile,
int& rawFd,
uint16_t& rawDataType,
uint16_t& rawHeaderSize,
int32_t& serverEventsInNewFile_,
int64_t& fileSize,
Expand Down
15 changes: 14 additions & 1 deletion EventFilter/Utilities/interface/SourceRawFile.h
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ class InputFile {
std::vector<uint64_t> bufferEnds_;
std::vector<uint64_t> fileSizes_;
std::vector<unsigned int> fileOrder_;
std::vector<uint16_t> fileDataType_;
bool deleteFile_;
int rawFd_;
uint64_t fileSize_;
Expand All @@ -115,6 +116,7 @@ class InputFile {
std::string const& name = std::string(),
bool deleteFile = true,
int rawFd = -1,
uint16_t rawDataType = 0,
uint64_t fileSize = 0,
uint16_t rawHeaderSize = 0,
uint16_t nChunks = 0,
Expand All @@ -134,6 +136,7 @@ class InputFile {
nProcessed_(0) {
fileNames_.push_back(name);
fileOrder_.push_back(fileOrder_.size());
fileDataType_.push_back(rawDataType);
diskFileSizes_.push_back(fileSize);
fileSizes_.push_back(0);
bufferOffsets_.push_back(0);
Expand All @@ -159,6 +162,7 @@ class InputFile {
numFiles_++;
fileNames_.push_back(name);
fileOrder_.push_back(fileOrder_.size());
fileDataType_.push_back(0);
diskFileSizes_.push_back(size);
fileSizes_.push_back(0);
bufferOffsets_.push_back(prevOffset + prevSize);
Expand Down Expand Up @@ -205,6 +209,13 @@ class InputFile {
throw cms::Exception("InputFile") << "buffers are inconsistent for input files with primary " << fileName_;
return complete > 0;
}
void setFileDataType(unsigned int j, uint16_t dataType) { fileDataType_[j] = dataType; }
int daqRunEndFlagIndex() const {
for (unsigned j = 0; j < fileDataType_.size(); j++)
if (fileDataType_[j] == 0xffff)
return (int)j;
return -1;
}
};

class DAQSource;
Expand All @@ -216,12 +227,14 @@ class RawInputFile : public InputFile {
std::string const& name = std::string(),
bool deleteFile = true,
int rawFd = -1,
uint16_t rawDataType = 0,
uint64_t fileSize = 0,
uint16_t rawHeaderSize = 0,
uint32_t nChunks = 0,
int nEvents = 0,
DAQSource* parent = nullptr)
: InputFile(status, lumi, name, deleteFile, rawFd, fileSize, rawHeaderSize, nChunks, nEvents, nullptr),
: InputFile(
status, lumi, name, deleteFile, rawFd, rawDataType, fileSize, rawHeaderSize, nChunks, nEvents, nullptr),
sourceParent_(parent) {}
bool advance(std::mutex& m, std::condition_variable& cv, unsigned char*& dataPosition, const size_t size);
void advance(const size_t size) {
Expand Down
5 changes: 4 additions & 1 deletion EventFilter/Utilities/plugins/RawEventFileWriterForBU.cc
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ using namespace edm::streamer;
RawEventFileWriterForBU::RawEventFileWriterForBU(edm::ParameterSet const& ps)
: microSleep_(ps.getParameter<int>("microSleep")),
frdFileVersion_(ps.getParameter<unsigned int>("frdFileVersion")),
dataType_(ps.getUntrackedParameter<unsigned int>("dataType")),
writeEoR_(ps.getUntrackedParameter<bool>("writeEoR")),
writeToOpen_(ps.getUntrackedParameter<bool>("writeToOpen")) {
if (edm::Service<evf::FastMonitoringService>().isAvailable())
Expand Down Expand Up @@ -247,7 +248,8 @@ void RawEventFileWriterForBU::finishFileWrite(unsigned int ls) {
<< " and size " << perFileSize_.value();
} else if (frdFileVersion_ == 2) {
lseek(outfd_, 0, SEEK_SET);
FRDFileHeader_v2 frdFileHeader(0, perFileEventCount_.value(), (uint32_t)run_, (uint32_t)ls, perFileSize_.value());
FRDFileHeader_v2 frdFileHeader(
(uint16_t)(dataType_ & 0xffff), perFileEventCount_.value(), (uint32_t)run_, (uint32_t)ls, perFileSize_.value());
write(outfd_, (char*)&frdFileHeader, sizeof(FRDFileHeader_v2));
closefd();
//move raw file from open to run directory
Expand Down Expand Up @@ -342,5 +344,6 @@ void RawEventFileWriterForBU::stop() {
void RawEventFileWriterForBU::extendDescription(edm::ParameterSetDescription& desc) {
desc.add<int>("microSleep", 0);
desc.add<unsigned int>("frdFileVersion", 0);
desc.addUntracked<unsigned int>("dataType", 0)->setComment("data typw field in FRD file header v2");
desc.addUntracked<bool>("writeEoR", true);
}
1 change: 1 addition & 0 deletions EventFilter/Utilities/plugins/RawEventFileWriterForBU.h
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ class RawEventFileWriterForBU {

int microSleep_;
unsigned int frdFileVersion_;
unsigned int dataType_;
bool writeEoR_;
bool writeToOpen_;

Expand Down
34 changes: 29 additions & 5 deletions EventFilter/Utilities/src/DAQSource.cc
Original file line number Diff line number Diff line change
Expand Up @@ -464,7 +464,21 @@ evf::EvFDaqDirector::FileStatus DAQSource::getNextDataBlock() {
heldFilesCount_--;
//release last chunk (it is never released elsewhere)
freeChunks_.push(currentFile_->chunks_[currentFile_->currentChunk_]);
if (currentFile_->nEvents_ >= 0 && currentFile_->nEvents_ != int(currentFile_->nProcessed_)) {

bool filesIncomplete = currentFile_->nEvents_ >= 0 && currentFile_->nEvents_ != int(currentFile_->nProcessed_);
bool retRunEnd = false;

int runEndFlagIndex = currentFile_->daqRunEndFlagIndex();
if (runEndFlagIndex != -1) {
if (filesIncomplete)
edm::LogError("DAQSource::getNextDataBlock")
<< "Detected DAQ Run End flag in RAW file " << currentFile_->fileNames_[runEndFlagIndex];
else
edm::LogError("DAQSource::getNextDataBlock")
<< "Detected DAQ Run End flag in RAW file " << currentFile_->fileNames_[runEndFlagIndex]
<< " but files appear to be complete";
retRunEnd = true;
} else if (filesIncomplete) {
std::stringstream str;
for (auto& s : currentFile_->fileNames_) {
struct stat bufs;
Expand Down Expand Up @@ -499,7 +513,10 @@ evf::EvFDaqDirector::FileStatus DAQSource::getNextDataBlock() {
currentFile_.reset();
}
setMonState(inProcessingFile);
return evf::EvFDaqDirector::noFile;
if (retRunEnd)
return evf::EvFDaqDirector::runEnded;
else
return evf::EvFDaqDirector::noFile;
}

//handle RAW file header in new file
Expand Down Expand Up @@ -794,6 +811,7 @@ void DAQSource::readSupervisor() {
uint32_t lsFromRaw = 0;
int32_t serverEventsInNewFile = -1;
int rawFd = -1;
uint16_t rawDataType = 0;

int backoff_exp = 0;

Expand Down Expand Up @@ -838,7 +856,6 @@ void DAQSource::readSupervisor() {
//return LS if LS not set, otherwise return file
status = getFile(ls, nextFile, thisLockWaitTimeUs);
if (status == evf::EvFDaqDirector::newFile) {
uint16_t rawDataType;
if (evf::EvFDaqDirector::parseFRDFileHeader(nextFile,
rawFd,
rawHeaderSize, ///possibility to use by new formats
Expand Down Expand Up @@ -866,6 +883,7 @@ void DAQSource::readSupervisor() {
ls,
nextFile,
rawFd,
rawDataType,
rawHeaderSize, //which format?
serverEventsInNewFile,
fileSizeFromMetadata,
Expand Down Expand Up @@ -1058,6 +1076,7 @@ void DAQSource::readSupervisor() {
rawFile,
!fileListMode_,
rawFd,
rawDataType,
fileSize,
rawHeaderSize, //for which format
0,
Expand Down Expand Up @@ -1347,6 +1366,7 @@ void DAQSource::readWorker(unsigned int tid) {

size_t skipped = bufferLeft;
auto start = std::chrono::high_resolution_clock::now();

for (unsigned int i = 0; i < readBlocks; i++) {
ssize_t last;
edm::LogInfo("DAQSource") << "readWorker read -: " << (int64_t)(chunk->usedSize_ - bufferLeft) << " or "
Expand Down Expand Up @@ -1408,8 +1428,7 @@ void DAQSource::readWorker(unsigned int tid) {
LogDebug("DAQSource") << " finished reading block -: " << (bufferLeft >> 20) << " MB"
<< " in " << msec.count() << " ms (" << (bufferLeft >> 20) / double(msec.count())
<< " GB/s)";
};
//END primary function
}; //END primary function

//SECONDARY files function
auto readSecondary = [&](uint64_t bufferLeft, unsigned int j) {
Expand Down Expand Up @@ -1446,6 +1465,11 @@ void DAQSource::readWorker(unsigned int tid) {
close(fileDescriptor);
break;
}
if (i == 0) {
uint16_t dataType = daqDirector_->frdFileDataType(chunk->buf_ + bufferLeft);
if (dataType)
file->setFileDataType(j, dataType);
}
if (last > 0) {
bufferLeft += last;
fileLen += last;
Expand Down
27 changes: 24 additions & 3 deletions EventFilter/Utilities/src/EvFDaqDirector.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1226,8 +1226,22 @@ namespace evf {
return false;
}

uint16_t EvFDaqDirector::frdFileDataType(const void* buf) const {
//v2 is the largest possible read
const FRDFileHeader_v2* hdr = static_cast<const FRDFileHeader_v2*>(buf);

const FRDFileHeaderIdentifier* fileId = (const FRDFileHeaderIdentifier*)hdr;
uint16_t frd_version = getFRDFileHeaderVersion(fileId->id_, fileId->version_);

if (frd_version == 2) {
return hdr->c_.dataType_;
}
return 0;
}

int EvFDaqDirector::grabNextJsonFromRaw(std::string const& rawSourcePath,
int& rawFd,
uint16_t& rawDataType,
uint16_t& rawHeaderSize,
int64_t& fileSizeFromHeader,
bool& fileFound,
Expand Down Expand Up @@ -1255,7 +1269,6 @@ namespace evf {
uint32_t lsFromRaw;
int32_t nbEventsWrittenRaw;
int64_t fileSizeFromRaw;
uint16_t rawDataType;
auto ret = parseFRDFileHeader(rawSourcePath,
rawFd,
rawHeaderSize,
Expand Down Expand Up @@ -2068,6 +2081,7 @@ namespace evf {
unsigned int& ls,
std::string& nextFileRaw,
int& rawFd,
uint16_t& rawDataType,
uint16_t& rawHeaderSize,
int32_t& serverEventsInNewFile,
int64_t& fileSizeFromMetadata,
Expand Down Expand Up @@ -2166,8 +2180,15 @@ namespace evf {
//error reading header, set to -1 and trigger error downstream
serverEventsInNewFile = -1;
} else if (rawHeader) {
serverEventsInNewFile = grabNextJsonFromRaw(
nextFileRaw, rawFd, rawHeaderSize, fileSizeFromMetadata, fileFound, serverLS, false, requireHeader);
serverEventsInNewFile = grabNextJsonFromRaw(nextFileRaw,
rawFd,
rawDataType,
rawHeaderSize,
fileSizeFromMetadata,
fileFound,
serverLS,
false,
requireHeader);
} else if (eventCounter) {
//there is no header: then try to use model to count events
serverEventsInNewFile = eventCounter(nextFileRaw, rawFd, fileSizeFromMetadata, serverLS, fileFound);
Expand Down
6 changes: 4 additions & 2 deletions EventFilter/Utilities/src/FedRawDataInputSource.cc
Original file line number Diff line number Diff line change
Expand Up @@ -850,6 +850,7 @@ void FedRawDataInputSource::readSupervisor() {
uint32_t lsFromRaw = 0;
int32_t serverEventsInNewFile = -1;
int rawFd = -1;
uint16_t rawDataType = 0;

int backoff_exp = 0;

Expand Down Expand Up @@ -895,7 +896,6 @@ void FedRawDataInputSource::readSupervisor() {
//return LS if LS not set, otherwise return file
status = getFile(ls, nextFile, fileSizeIndex, thisLockWaitTimeUs);
if (status == evf::EvFDaqDirector::newFile) {
uint16_t rawDataType;
if (evf::EvFDaqDirector::parseFRDFileHeader(nextFile,
rawFd,
rawHeaderSize,
Expand All @@ -922,6 +922,7 @@ void FedRawDataInputSource::readSupervisor() {
ls,
nextFile,
rawFd,
rawDataType,
rawHeaderSize,
serverEventsInNewFile,
fileSizeFromMetadata,
Expand Down Expand Up @@ -1102,7 +1103,7 @@ void FedRawDataInputSource::readSupervisor() {
uint16_t rawHeaderCheck;
bool fileFound;
eventsInNewFile = daqDirector_->grabNextJsonFromRaw(
nextFile, rawFdEmpty, rawHeaderCheck, fileSizeFromMetadata, fileFound, 0, true);
nextFile, rawFdEmpty, rawDataType, rawHeaderCheck, fileSizeFromMetadata, fileFound, 0, true);
assert(fileFound && rawHeaderCheck == rawHeaderSize);
daqDirector_->unlockFULocal();
} else
Expand All @@ -1125,6 +1126,7 @@ void FedRawDataInputSource::readSupervisor() {
rawFile,
!fileListMode_,
rawFd,
rawDataType,
fileSize,
rawHeaderSize,
neededChunks,
Expand Down
8 changes: 7 additions & 1 deletion EventFilter/Utilities/test/startBU.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,11 @@
VarParsing.VarParsing.varType.int, # string, int, or float
"Write only to open directory")


options.register ('eventDataType',
0,
VarParsing.VarParsing.multiplicity.singleton,
VarParsing.VarParsing.varType.int,
"Event data type value in FRD file header v2")

options.parseArguments()

Expand Down Expand Up @@ -161,6 +165,7 @@
numEventsPerFile = cms.uint32(options.eventsPerFile),
frdVersion = cms.uint32(6),
frdFileVersion = cms.uint32(options.frdFileVersion),
dataType = cms.untracked.uint32(options.eventDataType),
writeToOpen = cms.untracked.bool(True if options.writeToOpen else False)
)

Expand All @@ -178,6 +183,7 @@
numEventsPerFile = cms.uint32(options.eventsPerFile),
frdVersion = cms.uint32(0),
frdFileVersion = cms.uint32(0),
dataType = cms.untracked.uint32(options.eventDataType),
sourceIdList = cms.untracked.vuint32(66,1511),
rawProductName = cms.untracked.string("RawDataBuffer")
)
Expand Down
Loading