diff --git a/EventFilter/Utilities/interface/EvFDaqDirector.h b/EventFilter/Utilities/interface/EvFDaqDirector.h index 74c99b1a3eede..ccc45166c7b94 100644 --- a/EventFilter/Utilities/interface/EvFDaqDirector.h +++ b/EventFilter/Utilities/interface/EvFDaqDirector.h @@ -139,8 +139,12 @@ namespace evf { bool requireHeader, bool retry, bool closeFile); + + uint16_t frdFileDataType(const void* buf) const; + int grabNextJsonFromRaw(std::string const& rawSourcePath, int& rawFd, + uint16_t& rawDataType, uint16_t& rawHeaderSize, int64_t& fileSizeFromHeader, bool& fileFound, @@ -157,6 +161,7 @@ namespace evf { unsigned int& ls, std::string& nextFile, int& rawFd, + uint16_t& rawDataType, uint16_t& rawHeaderSize, int32_t& serverEventsInNewFile_, int64_t& fileSize, diff --git a/EventFilter/Utilities/interface/SourceRawFile.h b/EventFilter/Utilities/interface/SourceRawFile.h index 5ec3840d603d6..f9d110b47c160 100644 --- a/EventFilter/Utilities/interface/SourceRawFile.h +++ b/EventFilter/Utilities/interface/SourceRawFile.h @@ -95,6 +95,7 @@ class InputFile { std::vector bufferEnds_; std::vector fileSizes_; std::vector fileOrder_; + std::vector fileDataType_; bool deleteFile_; int rawFd_; uint64_t fileSize_; @@ -115,6 +116,7 @@ class InputFile { std::string const& name = std::string(), bool deleteFile = true, int rawFd = -1, + uint16_t rawDataType = 0, uint64_t fileSize = 0, uint16_t rawHeaderSize = 0, uint16_t nChunks = 0, @@ -134,6 +136,7 @@ class InputFile { nProcessed_(0) { fileNames_.push_back(name); fileOrder_.push_back(fileOrder_.size()); + fileDataType_.push_back(rawDataType); diskFileSizes_.push_back(fileSize); fileSizes_.push_back(0); bufferOffsets_.push_back(0); @@ -159,6 +162,7 @@ class InputFile { numFiles_++; fileNames_.push_back(name); fileOrder_.push_back(fileOrder_.size()); + fileDataType_.push_back(0); diskFileSizes_.push_back(size); fileSizes_.push_back(0); bufferOffsets_.push_back(prevOffset + prevSize); @@ -205,6 +209,13 @@ class InputFile { throw cms::Exception("InputFile") << "buffers are inconsistent for input files with primary " << fileName_; return complete > 0; } + void setFileDataType(unsigned int j, uint16_t dataType) { fileDataType_[j] = dataType; } + int daqRunEndFlagIndex() const { + for (unsigned j = 0; j < fileDataType_.size(); j++) + if (fileDataType_[j] == 0xffff) + return (int)j; + return -1; + } }; class DAQSource; @@ -216,12 +227,14 @@ class RawInputFile : public InputFile { std::string const& name = std::string(), bool deleteFile = true, int rawFd = -1, + uint16_t rawDataType = 0, uint64_t fileSize = 0, uint16_t rawHeaderSize = 0, uint32_t nChunks = 0, int nEvents = 0, DAQSource* parent = nullptr) - : InputFile(status, lumi, name, deleteFile, rawFd, fileSize, rawHeaderSize, nChunks, nEvents, nullptr), + : InputFile( + status, lumi, name, deleteFile, rawFd, rawDataType, fileSize, rawHeaderSize, nChunks, nEvents, nullptr), sourceParent_(parent) {} bool advance(std::mutex& m, std::condition_variable& cv, unsigned char*& dataPosition, const size_t size); void advance(const size_t size) { diff --git a/EventFilter/Utilities/plugins/RawEventFileWriterForBU.cc b/EventFilter/Utilities/plugins/RawEventFileWriterForBU.cc index 5052af2ed8beb..4526c72335eb5 100644 --- a/EventFilter/Utilities/plugins/RawEventFileWriterForBU.cc +++ b/EventFilter/Utilities/plugins/RawEventFileWriterForBU.cc @@ -25,6 +25,7 @@ using namespace edm::streamer; RawEventFileWriterForBU::RawEventFileWriterForBU(edm::ParameterSet const& ps) : microSleep_(ps.getParameter("microSleep")), frdFileVersion_(ps.getParameter("frdFileVersion")), + dataType_(ps.getUntrackedParameter("dataType")), writeEoR_(ps.getUntrackedParameter("writeEoR")), writeToOpen_(ps.getUntrackedParameter("writeToOpen")) { if (edm::Service().isAvailable()) @@ -247,7 +248,8 @@ void RawEventFileWriterForBU::finishFileWrite(unsigned int ls) { << " and size " << perFileSize_.value(); } else if (frdFileVersion_ == 2) { lseek(outfd_, 0, SEEK_SET); - FRDFileHeader_v2 frdFileHeader(0, perFileEventCount_.value(), (uint32_t)run_, (uint32_t)ls, perFileSize_.value()); + FRDFileHeader_v2 frdFileHeader( + (uint16_t)(dataType_ & 0xffff), perFileEventCount_.value(), (uint32_t)run_, (uint32_t)ls, perFileSize_.value()); write(outfd_, (char*)&frdFileHeader, sizeof(FRDFileHeader_v2)); closefd(); //move raw file from open to run directory @@ -342,5 +344,6 @@ void RawEventFileWriterForBU::stop() { void RawEventFileWriterForBU::extendDescription(edm::ParameterSetDescription& desc) { desc.add("microSleep", 0); desc.add("frdFileVersion", 0); + desc.addUntracked("dataType", 0)->setComment("data typw field in FRD file header v2"); desc.addUntracked("writeEoR", true); } diff --git a/EventFilter/Utilities/plugins/RawEventFileWriterForBU.h b/EventFilter/Utilities/plugins/RawEventFileWriterForBU.h index e1bfc6cc49ba0..2919def148ba2 100644 --- a/EventFilter/Utilities/plugins/RawEventFileWriterForBU.h +++ b/EventFilter/Utilities/plugins/RawEventFileWriterForBU.h @@ -88,6 +88,7 @@ class RawEventFileWriterForBU { int microSleep_; unsigned int frdFileVersion_; + unsigned int dataType_; bool writeEoR_; bool writeToOpen_; diff --git a/EventFilter/Utilities/src/DAQSource.cc b/EventFilter/Utilities/src/DAQSource.cc index 5c6d2d84a4f25..53c1d4a232aad 100644 --- a/EventFilter/Utilities/src/DAQSource.cc +++ b/EventFilter/Utilities/src/DAQSource.cc @@ -464,7 +464,21 @@ evf::EvFDaqDirector::FileStatus DAQSource::getNextDataBlock() { heldFilesCount_--; //release last chunk (it is never released elsewhere) freeChunks_.push(currentFile_->chunks_[currentFile_->currentChunk_]); - if (currentFile_->nEvents_ >= 0 && currentFile_->nEvents_ != int(currentFile_->nProcessed_)) { + + bool filesIncomplete = currentFile_->nEvents_ >= 0 && currentFile_->nEvents_ != int(currentFile_->nProcessed_); + bool retRunEnd = false; + + int runEndFlagIndex = currentFile_->daqRunEndFlagIndex(); + if (runEndFlagIndex != -1) { + if (filesIncomplete) + edm::LogError("DAQSource::getNextDataBlock") + << "Detected DAQ Run End flag in RAW file " << currentFile_->fileNames_[runEndFlagIndex]; + else + edm::LogError("DAQSource::getNextDataBlock") + << "Detected DAQ Run End flag in RAW file " << currentFile_->fileNames_[runEndFlagIndex] + << " but files appear to be complete"; + retRunEnd = true; + } else if (filesIncomplete) { std::stringstream str; for (auto& s : currentFile_->fileNames_) { struct stat bufs; @@ -499,7 +513,10 @@ evf::EvFDaqDirector::FileStatus DAQSource::getNextDataBlock() { currentFile_.reset(); } setMonState(inProcessingFile); - return evf::EvFDaqDirector::noFile; + if (retRunEnd) + return evf::EvFDaqDirector::runEnded; + else + return evf::EvFDaqDirector::noFile; } //handle RAW file header in new file @@ -794,6 +811,7 @@ void DAQSource::readSupervisor() { uint32_t lsFromRaw = 0; int32_t serverEventsInNewFile = -1; int rawFd = -1; + uint16_t rawDataType = 0; int backoff_exp = 0; @@ -838,7 +856,6 @@ void DAQSource::readSupervisor() { //return LS if LS not set, otherwise return file status = getFile(ls, nextFile, thisLockWaitTimeUs); if (status == evf::EvFDaqDirector::newFile) { - uint16_t rawDataType; if (evf::EvFDaqDirector::parseFRDFileHeader(nextFile, rawFd, rawHeaderSize, ///possibility to use by new formats @@ -866,6 +883,7 @@ void DAQSource::readSupervisor() { ls, nextFile, rawFd, + rawDataType, rawHeaderSize, //which format? serverEventsInNewFile, fileSizeFromMetadata, @@ -1058,6 +1076,7 @@ void DAQSource::readSupervisor() { rawFile, !fileListMode_, rawFd, + rawDataType, fileSize, rawHeaderSize, //for which format 0, @@ -1347,6 +1366,7 @@ void DAQSource::readWorker(unsigned int tid) { size_t skipped = bufferLeft; auto start = std::chrono::high_resolution_clock::now(); + for (unsigned int i = 0; i < readBlocks; i++) { ssize_t last; edm::LogInfo("DAQSource") << "readWorker read -: " << (int64_t)(chunk->usedSize_ - bufferLeft) << " or " @@ -1408,8 +1428,7 @@ void DAQSource::readWorker(unsigned int tid) { LogDebug("DAQSource") << " finished reading block -: " << (bufferLeft >> 20) << " MB" << " in " << msec.count() << " ms (" << (bufferLeft >> 20) / double(msec.count()) << " GB/s)"; - }; - //END primary function + }; //END primary function //SECONDARY files function auto readSecondary = [&](uint64_t bufferLeft, unsigned int j) { @@ -1446,6 +1465,11 @@ void DAQSource::readWorker(unsigned int tid) { close(fileDescriptor); break; } + if (i == 0) { + uint16_t dataType = daqDirector_->frdFileDataType(chunk->buf_ + bufferLeft); + if (dataType) + file->setFileDataType(j, dataType); + } if (last > 0) { bufferLeft += last; fileLen += last; diff --git a/EventFilter/Utilities/src/EvFDaqDirector.cc b/EventFilter/Utilities/src/EvFDaqDirector.cc index b143702f830ac..e80703c6ba0e0 100644 --- a/EventFilter/Utilities/src/EvFDaqDirector.cc +++ b/EventFilter/Utilities/src/EvFDaqDirector.cc @@ -1226,8 +1226,22 @@ namespace evf { return false; } + uint16_t EvFDaqDirector::frdFileDataType(const void* buf) const { + //v2 is the largest possible read + const FRDFileHeader_v2* hdr = static_cast(buf); + + const FRDFileHeaderIdentifier* fileId = (const FRDFileHeaderIdentifier*)hdr; + uint16_t frd_version = getFRDFileHeaderVersion(fileId->id_, fileId->version_); + + if (frd_version == 2) { + return hdr->c_.dataType_; + } + return 0; + } + int EvFDaqDirector::grabNextJsonFromRaw(std::string const& rawSourcePath, int& rawFd, + uint16_t& rawDataType, uint16_t& rawHeaderSize, int64_t& fileSizeFromHeader, bool& fileFound, @@ -1255,7 +1269,6 @@ namespace evf { uint32_t lsFromRaw; int32_t nbEventsWrittenRaw; int64_t fileSizeFromRaw; - uint16_t rawDataType; auto ret = parseFRDFileHeader(rawSourcePath, rawFd, rawHeaderSize, @@ -2068,6 +2081,7 @@ namespace evf { unsigned int& ls, std::string& nextFileRaw, int& rawFd, + uint16_t& rawDataType, uint16_t& rawHeaderSize, int32_t& serverEventsInNewFile, int64_t& fileSizeFromMetadata, @@ -2166,8 +2180,15 @@ namespace evf { //error reading header, set to -1 and trigger error downstream serverEventsInNewFile = -1; } else if (rawHeader) { - serverEventsInNewFile = grabNextJsonFromRaw( - nextFileRaw, rawFd, rawHeaderSize, fileSizeFromMetadata, fileFound, serverLS, false, requireHeader); + serverEventsInNewFile = grabNextJsonFromRaw(nextFileRaw, + rawFd, + rawDataType, + rawHeaderSize, + fileSizeFromMetadata, + fileFound, + serverLS, + false, + requireHeader); } else if (eventCounter) { //there is no header: then try to use model to count events serverEventsInNewFile = eventCounter(nextFileRaw, rawFd, fileSizeFromMetadata, serverLS, fileFound); diff --git a/EventFilter/Utilities/src/FedRawDataInputSource.cc b/EventFilter/Utilities/src/FedRawDataInputSource.cc index be7754d531a1f..0dad45de16f9b 100644 --- a/EventFilter/Utilities/src/FedRawDataInputSource.cc +++ b/EventFilter/Utilities/src/FedRawDataInputSource.cc @@ -850,6 +850,7 @@ void FedRawDataInputSource::readSupervisor() { uint32_t lsFromRaw = 0; int32_t serverEventsInNewFile = -1; int rawFd = -1; + uint16_t rawDataType = 0; int backoff_exp = 0; @@ -895,7 +896,6 @@ void FedRawDataInputSource::readSupervisor() { //return LS if LS not set, otherwise return file status = getFile(ls, nextFile, fileSizeIndex, thisLockWaitTimeUs); if (status == evf::EvFDaqDirector::newFile) { - uint16_t rawDataType; if (evf::EvFDaqDirector::parseFRDFileHeader(nextFile, rawFd, rawHeaderSize, @@ -922,6 +922,7 @@ void FedRawDataInputSource::readSupervisor() { ls, nextFile, rawFd, + rawDataType, rawHeaderSize, serverEventsInNewFile, fileSizeFromMetadata, @@ -1102,7 +1103,7 @@ void FedRawDataInputSource::readSupervisor() { uint16_t rawHeaderCheck; bool fileFound; eventsInNewFile = daqDirector_->grabNextJsonFromRaw( - nextFile, rawFdEmpty, rawHeaderCheck, fileSizeFromMetadata, fileFound, 0, true); + nextFile, rawFdEmpty, rawDataType, rawHeaderCheck, fileSizeFromMetadata, fileFound, 0, true); assert(fileFound && rawHeaderCheck == rawHeaderSize); daqDirector_->unlockFULocal(); } else @@ -1125,6 +1126,7 @@ void FedRawDataInputSource::readSupervisor() { rawFile, !fileListMode_, rawFd, + rawDataType, fileSize, rawHeaderSize, neededChunks, diff --git a/EventFilter/Utilities/test/startBU.py b/EventFilter/Utilities/test/startBU.py index 3e14eff964617..b64c24e417936 100644 --- a/EventFilter/Utilities/test/startBU.py +++ b/EventFilter/Utilities/test/startBU.py @@ -82,7 +82,11 @@ VarParsing.VarParsing.varType.int, # string, int, or float "Write only to open directory") - +options.register ('eventDataType', + 0, + VarParsing.VarParsing.multiplicity.singleton, + VarParsing.VarParsing.varType.int, + "Event data type value in FRD file header v2") options.parseArguments() @@ -161,6 +165,7 @@ numEventsPerFile = cms.uint32(options.eventsPerFile), frdVersion = cms.uint32(6), frdFileVersion = cms.uint32(options.frdFileVersion), + dataType = cms.untracked.uint32(options.eventDataType), writeToOpen = cms.untracked.bool(True if options.writeToOpen else False) ) @@ -178,6 +183,7 @@ numEventsPerFile = cms.uint32(options.eventsPerFile), frdVersion = cms.uint32(0), frdFileVersion = cms.uint32(0), + dataType = cms.untracked.uint32(options.eventDataType), sourceIdList = cms.untracked.vuint32(66,1511), rawProductName = cms.untracked.string("RawDataBuffer") ) diff --git a/EventFilter/Utilities/test/testIncompleteSFB.sh b/EventFilter/Utilities/test/testIncompleteSFB.sh new file mode 100755 index 0000000000000..4d825e721243a --- /dev/null +++ b/EventFilter/Utilities/test/testIncompleteSFB.sh @@ -0,0 +1,96 @@ +#!/bin/bash +SCRIPTDIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )" + +function diebu { echo Failure $1: status $2 ; echo "" ; echo "----- Error -----"; echo ""; cat out_2_bu.log; rm -rf $3/{ramdisk,data,dqmdisk,ecalInDir,*.py}; exit $2 ; } +function diefu { echo Failure $1: status $2 ; echo "" ; echo "----- Error -----"; echo ""; cat out_2_fu.log; rm -rf $3/{ramdisk,data,dqmdisk,ecalInDir,*.py}; exit $2 ; } +function diedqm { echo Failure $1: status $2 ; echo "" ; echo "----- Error -----"; echo ""; cat out_2_dqm.log; rm -rf $3/{ramdisk,data,dqmdisk,ecalInDir,*.py}; exit $2 ; } +function dieecal { echo Failure $1: status $2 ; echo "" ; echo "----- Error -----"; echo ""; cat out_2_ecal.log; rm -rf $3/{ramdisk,data,dqmdisk,ecalInDir,*.py}; exit $2 ; } + +copy_index_files() { + directory=$1 + sourceid=$2 + del_orig=$3 + shopt -s nullglob + for file in "$directory"/*_index*.raw; do + filename=$(basename "$file") + if [[ "$filename" =~ ^(.*)_index([0-9]+)\.raw$ ]]; then + base="${BASH_REMATCH[1]}" + x="${BASH_REMATCH[2]}" + new_name="${base}_index${x}_source${sourceid}.raw" + cp -- "$file" "$directory/$new_name" + #echo "Copied: $filename -> $new_name" + if [[ $del_orig -eq 1 ]]; then + rm -rf $file + fi + fi + done + shopt -u nullglob +} + +copy_json_files() { + directory=$1 + sourceid=$2 + shopt -s nullglob + for file in "$directory"/*.jsn; do + filename=$(basename "$file") + if [[ "$filename" =~ ^(.*)_EoR.jsn$ ]]; then + base="${BASH_REMATCH[1]}" + x="${BASH_REMATCH[2]}" + new_name="${base}_EoR_source${sourceid}.jsn" + mv "$file" "$directory/$new_name" + fi + if [[ "$filename" =~ ^(.*)_EoLS.jsn$ ]]; then + base="${BASH_REMATCH[1]}" + x="${BASH_REMATCH[2]}" + new_name="${base}_EoLS_source${sourceid}.jsn" + mv "$file" "$directory/$new_name" + fi + done + shopt -u nullglob +} + +FUSCRIPT="startFU.py" + +if [ -z ${SCRAM_TEST_PATH} ]; then +SCRAM_TEST_PATH="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )" +fi +echo "SCRAM_TEST_PATH = ${SCRAM_TEST_PATH}" + +RC=0 +P=$$ +PREFIX=results_${USER}${P} +OUTDIR=${PWD}/${PREFIX} + +echo "OUT_TMP_DIR = $OUTDIR" + +mkdir ${OUTDIR} +cp ${SCRIPTDIR}/startBU.py ${OUTDIR} +cp ${SCRIPTDIR}/startFU.py ${OUTDIR} +cp ${SCRIPTDIR}/ufu2.py ${OUTDIR} +cp ${SCRIPTDIR}/startFU_daqsource.py ${OUTDIR} +cp ${SCRIPTDIR}/unittest_FU_daqsource.py ${OUTDIR} +cp ${SCRIPTDIR}/startFU_ds_multi.py ${OUTDIR} +cp ${SCRIPTDIR}/test_dqmstream.py ${OUTDIR} +cp ${SCRIPTDIR}/testECALCalib_cfg.py ${OUTDIR} +cd ${OUTDIR} + +rm -rf $OUTDIR/{ramdisk,data,dqmdisk,ecalInDir,*.log} + +runnumber="100101" + +################ +echo "Running fileListMode test" + +echo "running DAQSource test with striped event FRD (SFB)" +CMDLINE_STARTBU="cmsRun startBU.py runNumber=${runnumber} fffBaseDir=${OUTDIR} maxLS=2 fedMeanSize=128 eventsPerFile=20 eventsPerLS=35 frdFileVersion=2 buBaseDir=ramdisk1 subsystems=TCDS,SiPixel,ECAL,RPC" +#CMDLINE_STARTBU="cmsRun startBU.py runNumber=${runnumber} fffBaseDir=${OUTDIR} maxLS=2 fedMeanSize=128 eventsPerFile=20 eventsPerLS=35 frdFileVersion=2 buBaseDir=ramdisk1 subsystems=TCDS,SiPixel,ECAL,RPC eventDataType=65535" +${CMDLINE_STARTBU} +CMDLINE_STARTBU="cmsRun startBU.py runNumber=${runnumber} fffBaseDir=${OUTDIR} maxLS=2 fedMeanSize=128 eventsPerFile=20 eventsPerLS=35 frdFileVersion=2 buBaseDir=ramdisk2 subsystems=SiStrip,HCAL,DT,CSC eventDataType=65535" +#CMDLINE_STARTBU="cmsRun startBU.py runNumber=${runnumber} fffBaseDir=${OUTDIR} maxLS=2 fedMeanSize=128 eventsPerFile=20 eventsPerLS=35 frdFileVersion=2 buBaseDir=ramdisk2 subsystems=SiStrip,HCAL,DT,CSC" +${CMDLINE_STARTBU} +#run reader +CMDLINE_STARTFU="cmsRun startFU_daqsource.py daqSourceMode=FRDStriped runNumber=${runnumber} fffBaseDir=${OUTDIR} numRamdisks=2" +${CMDLINE_STARTFU} + +rm -rf $OUTDIR +exit ${RC}