diff --git a/src/XrdApps/XrdClJCachePlugin/file/Art.hh b/src/XrdApps/XrdClJCachePlugin/file/Art.hh index dbedb748ce2..783f848b494 100644 --- a/src/XrdApps/XrdClJCachePlugin/file/Art.hh +++ b/src/XrdApps/XrdClJCachePlugin/file/Art.hh @@ -31,66 +31,66 @@ #include /*----------------------------------------------------------------------------*/ +namespace JCache { + class Art { + public: + Art() {} + virtual ~Art() {} -class Art { -public: - Art() {} - virtual ~Art() {} - - void drawCurve(const std::vector& dataPoints, double runtime) { - if (dataPoints.size() != 40) { - std::cerr << "Error: Exactly 40 data points are required." << std::endl; - return; - } - - double maxValue = *std::max_element(dataPoints.begin(), dataPoints.end()); - double minValue = *std::min_element(dataPoints.begin(), dataPoints.end()); - - const int plotHeight = 10; // Number of lines in the plot - const int plotWidth = 40; // Width of the plot in characters - const int yLegendWidth = 8; // Width of the Y legend in characters - - std::vector plot(plotHeight, std::string(plotWidth, ' ')); - - // Normalize data points to the plot height - std::vector normalizedDataPoints; - for (double point : dataPoints) { - int normalizedValue = static_cast((point - minValue) / (maxValue - minValue) * (plotHeight - 1)); - normalizedDataPoints.push_back(normalizedValue); - } - - // Draw the curve - for (size_t i = 0; i < normalizedDataPoints.size(); ++i) { - int y = plotHeight - 1 - normalizedDataPoints[i]; - plot[y][i * (plotWidth / (dataPoints.size() - 1))] = '*'; - } - - // Print the plot with Y legend - for (int i = 0; i < plotHeight; ++i) { - double yValue = minValue + (maxValue - minValue) * (plotHeight - 1 - i) / (plotHeight - 1); - if (i==0) { - std::cerr << std::setw(yLegendWidth) << std::fixed << std::setprecision(2) << yValue << " MB/s | "; - } else { - std::cerr << std::setw(yLegendWidth) << std::fixed << std::setprecision(2) << yValue << " | "; + void drawCurve(const std::vector& dataPoints, double runtime) { + if (dataPoints.size() != 40) { + std::cerr << "Error: Exactly 40 data points are required." << std::endl; + return; + } + + double maxValue = *std::max_element(dataPoints.begin(), dataPoints.end()); + double minValue = *std::min_element(dataPoints.begin(), dataPoints.end()); + + const int plotHeight = 10; // Number of lines in the plot + const int plotWidth = 40; // Width of the plot in characters + const int yLegendWidth = 8; // Width of the Y legend in characters + + std::vector plot(plotHeight, std::string(plotWidth, ' ')); + + // Normalize data points to the plot height + std::vector normalizedDataPoints; + for (double point : dataPoints) { + int normalizedValue = static_cast((point - minValue) / (maxValue - minValue) * (plotHeight - 1)); + normalizedDataPoints.push_back(normalizedValue); + } + + // Draw the curve + for (size_t i = 0; i < normalizedDataPoints.size(); ++i) { + int y = plotHeight - 1 - normalizedDataPoints[i]; + plot[y][i * (plotWidth / (dataPoints.size() - 1))] = '*'; + } + + // Print the plot with Y legend + for (int i = 0; i < plotHeight; ++i) { + double yValue = minValue + (maxValue - minValue) * (plotHeight - 1 - i) / (plotHeight - 1); + if (i==0) { + std::cerr << std::setw(yLegendWidth) << std::fixed << std::setprecision(2) << yValue << " MB/s | "; + } else { + std::cerr << std::setw(yLegendWidth) << std::fixed << std::setprecision(2) << yValue << " | "; + } + std::cerr << plot[i] << std::endl; } - std::cerr << plot[i] << std::endl; - } - // Print the X axis - std::cerr << std::string(yLegendWidth + 7, ' ') << std::string(plotWidth, '-') << std::endl; - std::cerr << std::string(yLegendWidth + 7, ' '); - for (size_t i = 0 ; i < dataPoints.size()/4; ++i) { - std::cerr << std::fixed << std::setw(4) << std::left << (i*10); + // Print the X axis + std::cerr << std::string(yLegendWidth + 7, ' ') << std::string(plotWidth, '-') << std::endl; + std::cerr << std::string(yLegendWidth + 7, ' '); + for (size_t i = 0 ; i < dataPoints.size()/4; ++i) { + std::cerr << std::fixed << std::setw(4) << std::left << (i*10); + } + std::cerr << "[ " << 100 << " % = " << std::fixed << std::setprecision(2) << runtime << "s ]"<< std::endl; } - std::cerr << "[ " << 100 << " % = " << std::fixed << std::setprecision(2) << runtime << "s ]"<< std::endl; - } - void drawCurve(const std::vector& data, double interval, double runtime) { - std::vector newdata; - for ( auto i:data ) { - newdata.push_back(i/1000000.0 / interval); + void drawCurve(const std::vector& data, double interval, double runtime) { + std::vector newdata; + for ( auto i:data ) { + newdata.push_back(i/1000000.0 / interval); + } + return drawCurve(newdata, runtime); } - return drawCurve(newdata, runtime); - } -}; - + }; +} // namespace JCache diff --git a/src/XrdApps/XrdClJCachePlugin/file/CacheStats.hh b/src/XrdApps/XrdClJCachePlugin/file/CacheStats.hh new file mode 100644 index 00000000000..7bccbeee8f3 --- /dev/null +++ b/src/XrdApps/XrdClJCachePlugin/file/CacheStats.hh @@ -0,0 +1,274 @@ +//------------------------------------------------------------------------------ +// Copyright (c) 2024 by European Organization for Nuclear Research (CERN) +// Author: Andreas-Joachim Peters +//------------------------------------------------------------------------------ +// This file is part of the XRootD software suite. +// +// XRootD is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// XRootD is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with XRootD. If not, see . +// +// In applying this licence, CERN does not waive the privileges and immunities +// granted to it by virtue of its status as an Intergovernmental Organization +// or submit itself to any jurisdiction. + +#pragma once + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include "file/XrdClJCacheFile.hh" + + + +namespace JCache +{ + //! structure for cache hit statistics + struct CacheStats { + CacheStats(bool doe=false) : + bytesRead(0), + bytesReadV(0), + bytesCached(0), + bytesCachedV(0), + readOps(0), + readVOps(0), + readVreadOps(0), + nreadfiles(0), + dumponexit(doe), + peakrate(0) + { + // Get the current real time + struct timeval now; + gettimeofday(&now, nullptr); + startTime = now.tv_sec + now.tv_usec / 1000000.0; + } + + ~CacheStats() { + if (dumponexit.load()) { + using namespace std::chrono; + std::string jsonpath = XrdCl::JCacheFile::sJsonPath + "jcache."; + std::string name = getenv("XRD_APPNAME")?getenv("XRD_APPNAME"):"none"+std::string(".")+std::to_string(getpid()); + jsonpath += name; + jsonpath += ".json"; + XrdCl::JCacheFile::sStats.GetTimes(); + + XrdCl::JCacheFile::sStats.bytes_per_second = XrdCl::JCacheFile::sStats.bench.GetBins((int)(realTime)); + XrdCl::JCacheFile::sStats.peakrate = *(std::max_element(XrdCl::JCacheFile::sStats.bytes_per_second.begin(), XrdCl::JCacheFile::sStats.bytes_per_second.end())); + if (XrdCl::JCacheFile::sJsonPath.length()) { + XrdCl::JCacheFile::sStats.persistToJson(jsonpath, name); + } + if (XrdCl::JCacheFile::sEnableSummary) { + std::cerr << CacheStats::GlobalStats(XrdCl::JCacheFile::sStats); + } + std::vector bins = XrdCl::JCacheFile::sStats.bench.GetBins(40); + JCache::Art art; + if (XrdCl::JCacheFile::sEnableSummary) { + art.drawCurve(bins, XrdCl::JCacheFile::sStats.bench.GetTimePerBin().count() / 1000000.0, realTime); + } + } + } + + static std::string bytesToHumanReadable(double bytes) { + const char* suffixes[] = {"B", "KB", "MB", "GB", "TB", "PB", "EB", "ZB", "YB"}; + const int numSuffixes = sizeof(suffixes) / sizeof(suffixes[0]); + + if (bytes == 0) return "0 B"; + + int exp = std::min((int)(std::log(bytes) / std::log(1000)), numSuffixes - 1); + double val = bytes / std::pow(1000, exp); + std::ostringstream oss; + oss << std::fixed << std::setprecision(2) << val << " " << suffixes[exp]; + return oss.str(); + } + + double HitRate() { + auto n = this->bytesCached.load()+this->bytesRead.load(); + if (!n) return 100.0; + return 100.0*(this->bytesCached.load()) / n; + } + double HitRateV() { + auto n = this->bytesCachedV.load()+this->bytesReadV.load(); + if (!n) return 100.0; + return 100.0*(this->bytesCachedV.load()) / n; + } + double CombinedHitRate() { + auto n = (this->bytesCached.load()+this->bytesRead.load()+this->bytesCachedV.load()+this->bytesReadV.load()); + if (!n) return 100.0; + return 100.0*(this->bytesCached.load()+this->bytesCachedV.load()) / n; + } + void AddUrl(const std::string& url) { + std::lock_guard guard(urlMutex); + urls.insert(url); + } + bool HasUrl(const std::string& url) { + std::lock_guard guard(urlMutex); + return urls.count(url); + } + double ReadBytes() { + return (bytesRead.load()+bytesReadV.load() + bytesCached.load() + bytesCachedV.load()); + } + + double Used() { + if (totaldatasize) { + return 100.0*(bytesRead.load()+bytesReadV.load() + bytesCached.load() + bytesCachedV.load()) / totaldatasize; + } else { + return 100.0; + } + } + + size_t UniqueUrls() { + std::lock_guard guard(urlMutex); + return urls.size(); + } + + void GetTimes() { + struct rusage usage; + struct timeval now; + + // Get the current real time + gettimeofday(&now, nullptr); + realTime = now.tv_sec + now.tv_usec / 1000000.0 - startTime; + + // Get resource usage + getrusage(RUSAGE_SELF, &usage); + + // Get user and system time + userTime = usage.ru_utime.tv_sec + usage.ru_utime.tv_usec / 1000000.0; + sysTime = usage.ru_stime.tv_sec + usage.ru_stime.tv_usec / 1000000.0; + } + + void persistToJson(const std::string& path, const std::string& name) { + std::ofstream outFile(path); + if (!outFile.is_open()) { + std::cerr << "error: failed to open JSON statistics file: " << path << std::endl; + return; + } + + outFile << "{\n"; + outFile << " \"appname\": \"" << name << "\",\n"; + outFile << " \"pid\": \"" << getpid() << "\",\n"; + outFile << " \"bytesRead\": " << bytesRead.load() << ",\n"; + outFile << " \"bytesReadV\": " << bytesReadV.load() << ",\n"; + outFile << " \"bytesCached\": " << bytesCached.load() << ",\n"; + outFile << " \"bytesCachedV\": " << bytesCachedV.load() << ",\n"; + outFile << " \"readOps\": " << readOps.load() << ",\n"; + outFile << " \"readVOps\": " << readVOps.load() << ",\n"; + outFile << " \"readVreadOps\": " << readVreadOps.load() << ",\n"; + outFile << " \"nreadfiles\": " << nreadfiles.load() << ",\n"; + outFile << " \"totaldatasize\": " << totaldatasize.load() << ",\n"; + + std::lock_guard lock(urlMutex); + outFile << " \"urls\": ["; + for (auto it = urls.begin(); it != urls.end(); ++it) { + if (it != urls.begin()) { + outFile << ", "; + } + outFile << "\"" << *it << "\""; + } + outFile << "],\n"; + + outFile << " \"bytes_per_second\": ["; + for (size_t i = 0; i < bytes_per_second.size(); ++i) { + if (i != 0) { + outFile << ", "; + } + outFile << bytes_per_second[i]; + } + outFile << "],\n"; + + outFile << std::fixed << std::setprecision(6); // Set precision for double values + + outFile << " \"userTime\": " << userTime.load() << ",\n"; + outFile << " \"realTime\": " << realTime.load() << ",\n"; + outFile << " \"sysTime\": " << sysTime.load() << ",\n"; + outFile << " \"startTime\": " << startTime.load() << "\n"; + outFile << "}\n"; + + outFile.close(); + } + + void AddToStats(CacheStats& gStats) { + gStats.readOps += readOps.load(); + gStats.readVOps += readVOps.load(); + gStats.readVreadOps += readVreadOps.load(); + gStats.bytesRead += bytesRead.load(); + gStats.bytesReadV += bytesReadV.load(); + gStats.bytesCached += bytesCached.load(); + gStats.bytesCachedV += bytesCachedV.load(); + gStats.nreadfiles += 1; + } + + static std::string GlobalStats(CacheStats& sStats) { + std::ostringstream oss; + oss << "# ----------------------------------------------------------- #" << std::endl; + oss << "# JCache : cache combined hit rate : " << std::fixed << std::setprecision(2) << sStats.CombinedHitRate() << " %" << std::endl; + oss << "# JCache : cache read hit rate : " << std::fixed << std::setprecision(2) << sStats.HitRate() << " %" << std::endl; + oss << "# JCache : cache readv hit rate : " << std::fixed << std::setprecision(2) << sStats.HitRateV() << " %" << std::endl; + oss << "# ----------------------------------------------------------- #" << std::endl; + oss << "# JCache : total bytes read : " << sStats.bytesRead.load()+sStats.bytesCached.load() << std::endl; + oss << "# JCache : total bytes readv : " << sStats.bytesReadV.load()+sStats.bytesCachedV.load() << std::endl; + oss << "# ----------------------------------------------------------- #" << std::endl; + oss << "# JCache : total iops read : " << sStats.readOps.load() << std::endl; + oss << "# JCache : total iops readv : " << sStats.readVOps.load() << std::endl; + oss << "# JCache : total iops readvread : " << sStats.readVreadOps.load() << std::endl; + oss << "# ----------------------------------------------------------- #" << std::endl; + oss << "# JCache : open files read : " << sStats.nreadfiles.load() << std::endl; + oss << "# JCache : open unique f. read : " << sStats.UniqueUrls() << std::endl; + oss << "# ----------------------------------------------------------- #" << std::endl; + oss << "# JCache : total unique files bytes : " << sStats.totaldatasize << std::endl; + oss << "# JCache : total unique files size : " << sStats.bytesToHumanReadable((double)sStats.totaldatasize) << std::endl; + oss << "# JCache : percentage dataset read : " << std::fixed << std::setprecision(2) << sStats.Used() << " %" << std::endl; + oss << "# ----------------------------------------------------------- #" << std::endl; + oss << "# JCache : app user time : " << std::fixed << std::setprecision(2) << sStats.userTime << " s" << std::endl; + oss << "# JCache : app real time : " << std::fixed << std::setprecision(2) << sStats.realTime << " s" << std::endl; + oss << "# JCache : app sys time : " << std::fixed << std::setprecision(2) << sStats.sysTime << " s" << std::endl; + oss << "# JCache : app acceleration : " << std::fixed << std::setprecision(2) << sStats.userTime / sStats.realTime << "x" << std::endl; + oss << "# JCache : app readrate : " << std::fixed << std::setprecision(2) << sStats.bytesToHumanReadable((sStats.ReadBytes()/sStats.realTime)) << "/s" << " [ peak " << sStats.bytesToHumanReadable(sStats.peakrate) << "/s ]" << std::endl; + oss << "# ----------------------------------------------------------- #" << std::endl; + + return oss.str(); + } + + std::atomic bytesRead; + std::atomic bytesReadV; + std::atomic bytesCached; + std::atomic bytesCachedV; + std::atomic readOps; + std::atomic readVOps; + std::atomic readVreadOps; + std::atomic nreadfiles; + std::atomic totaldatasize; + std::atomic dumponexit; + std::set urls; + std::mutex urlMutex; + std::atomic userTime; + std::atomic realTime; + std::atomic sysTime; + std::atomic startTime; + JCache::TimeBench bench; + std::vector bytes_per_second; + std::atomic peakrate; + }; // class CacheStats +} // namespace JCache \ No newline at end of file diff --git a/src/XrdApps/XrdClJCachePlugin/file/TimeBench.hh b/src/XrdApps/XrdClJCachePlugin/file/TimeBench.hh index b110d000073..b3fc236ebe8 100644 --- a/src/XrdApps/XrdClJCachePlugin/file/TimeBench.hh +++ b/src/XrdApps/XrdClJCachePlugin/file/TimeBench.hh @@ -28,59 +28,62 @@ #include #include -class TimeBench { -private: - using Clock = std::chrono::high_resolution_clock; - using TimePoint = std::chrono::time_point; - using Duration = std::chrono::microseconds; +namespace JCache +{ + class TimeBench { + private: + using Clock = std::chrono::high_resolution_clock; + using TimePoint = std::chrono::time_point; + using Duration = std::chrono::microseconds; - std::vector> measurements; - std::vector bins; - TimePoint start; - TimePoint end; - uint64_t totalBytes; - size_t nbins; - std::mutex mtx; - -public: - TimeBench() : totalBytes(0), nbins(10) {} + std::vector> measurements; + std::vector bins; + TimePoint start; + TimePoint end; + uint64_t totalBytes; + size_t nbins; + std::mutex mtx; + + public: + TimeBench() : totalBytes(0), nbins(10) {} - void AddMeasurement(uint64_t bytes) { - std::lock_guard guard(mtx); - auto now = Clock::now(); - if (measurements.empty()) { - start = now; + void AddMeasurement(uint64_t bytes) { + std::lock_guard guard(mtx); + auto now = Clock::now(); + if (measurements.empty()) { + start = now; + } + measurements.push_back(std::make_pair(now,bytes)); + totalBytes += bytes; + end = now; } - measurements.push_back(std::make_pair(now,bytes)); - totalBytes += bytes; - end = now; - } - std::vector GetBins(size_t bin = 10) { - std::lock_guard guard(mtx); - nbins = bin?bin:1; - Duration totalTime = std::chrono::duration_cast(end - start); - Duration binSize = totalTime / nbins; - bins.clear(); - bins.resize(nbins, 0); + std::vector GetBins(size_t bin = 10) { + std::lock_guard guard(mtx); + nbins = bin?bin:1; + Duration totalTime = std::chrono::duration_cast(end - start); + Duration binSize = totalTime / nbins; + bins.clear(); + bins.resize(nbins, 0); - size_t binIndex = 0; + size_t binIndex = 0; - for (auto i : measurements) { - binIndex = (i.first - start)/ binSize; - if (binIndex < nbins) { - bins[binIndex] += i.second; - } else { - break; // Don't process future measurements + for (auto i : measurements) { + binIndex = (i.first - start)/ binSize; + if (binIndex < nbins) { + bins[binIndex] += i.second; + } else { + break; // Don't process future measurements + } } - } - return bins; - } + return bins; + } - Duration GetTimePerBin() { - Duration totalTime = std::chrono::duration_cast(end - start); - Duration binSize = totalTime / nbins; - return binSize; - } -}; + Duration GetTimePerBin() { + Duration totalTime = std::chrono::duration_cast(end - start); + Duration binSize = totalTime / nbins; + return binSize; + } + }; +} // namespace JCache \ No newline at end of file diff --git a/src/XrdApps/XrdClJCachePlugin/file/XrdClJCacheFile.cc b/src/XrdApps/XrdClJCachePlugin/file/XrdClJCacheFile.cc index 26e344e0dc8..9d95a9c6429 100644 --- a/src/XrdApps/XrdClJCachePlugin/file/XrdClJCacheFile.cc +++ b/src/XrdApps/XrdClJCachePlugin/file/XrdClJCacheFile.cc @@ -23,6 +23,7 @@ /*----------------------------------------------------------------------------*/ #include "XrdClJCacheFile.hh" +#include "file/CacheStats.hh" /*----------------------------------------------------------------------------*/ #include "XrdCl/XrdClMessageUtils.hh" /*----------------------------------------------------------------------------*/ @@ -32,7 +33,7 @@ std::string XrdCl::JCacheFile::sJsonPath="./"; bool XrdCl::JCacheFile::sEnableJournalCache = true; bool XrdCl::JCacheFile::sEnableVectorCache = false; bool XrdCl::JCacheFile::sEnableSummary = true; -XrdCl::JCacheFile::CacheStats XrdCl::JCacheFile::sStats(true); +JCache::CacheStats XrdCl::JCacheFile::sStats(true); JournalManager XrdCl::JCacheFile::sJournalManager; @@ -58,6 +59,7 @@ JCacheFile::JCacheFile(): { mAttachedForRead = false; mLog = DefaultEnv::GetLog(); + pStats = new JCache::CacheStats(); } @@ -67,10 +69,13 @@ JCacheFile::JCacheFile(): JCacheFile::~JCacheFile() { LogStats(); - AddToGlobalStats(); + pStats->AddToStats(sStats); if (pFile) { delete pFile; } + if (pStats) { + delete pStats; + } } @@ -189,8 +194,8 @@ JCacheFile::Read(uint64_t offset, bool eof = false; auto rb = pJournal->pread(buffer, size, offset, eof); if ((rb == size) || (eof && rb)) { - pStats.bytesCached += rb; - pStats.readOps++; + pStats->bytesCached += rb; + pStats->readOps++; // we can only serve success full reads from the cache for now XRootDStatus* ret_st = new XRootDStatus(st); ChunkInfo* chunkInfo = new ChunkInfo(offset, rb, buffer); @@ -202,8 +207,8 @@ JCacheFile::Read(uint64_t offset, } } - auto jhandler = new JCacheReadHandler(handler, &pStats.bytesRead,sEnableJournalCache?pJournal.get():nullptr); - pStats.readOps++; + auto jhandler = new JCacheReadHandler(handler, &pStats->bytesRead,sEnableJournalCache?pJournal.get():nullptr); + pStats->readOps++; st = pFile->Read(offset, size, buffer, jhandler, timeout); } else { st = XRootDStatus(stError, errInvalidOp); @@ -250,8 +255,8 @@ JCacheFile::PgRead( uint64_t offset, bool eof = false; auto rb = pJournal->pread(buffer, size, offset, eof); if ((rb == size) || (eof && rb)) { - pStats.bytesCached += rb; - pStats.readOps++; + pStats->bytesCached += rb; + pStats->readOps++; // we can only serve success full reads from the cache for now XRootDStatus* ret_st = new XRootDStatus(st); ChunkInfo* chunkInfo = new ChunkInfo(offset, rb, buffer); @@ -263,8 +268,8 @@ JCacheFile::PgRead( uint64_t offset, } } - auto jhandler = new JCachePgReadHandler(handler, &pStats.bytesRead,sEnableJournalCache?pJournal.get():nullptr); - pStats.readOps++; + auto jhandler = new JCachePgReadHandler(handler, &pStats->bytesRead,sEnableJournalCache?pJournal.get():nullptr); + pStats->readOps++; st = pFile->PgRead(offset, size, buffer, jhandler, timeout); } else { st = XRootDStatus(stError, errInvalidOp); @@ -366,9 +371,9 @@ JCacheFile::VectorRead(const ChunkList& chunks, vResp = chunks; obj->Set(vReadInfo); handler->HandleResponse(ret_st, obj); - pStats.readVOps++; - pStats.readVreadOps += chunks.size(); - pStats.bytesCachedV += len; + pStats->readVOps++; + pStats->readVreadOps += chunks.size(); + pStats->bytesCachedV += len; return st; } } else { @@ -389,9 +394,9 @@ JCacheFile::VectorRead(const ChunkList& chunks, } if (inJournal) { // we found everything in the journal - pStats.readVOps++; - pStats.readVreadOps += chunks.size(); - pStats.bytesCachedV += len; + pStats->readVOps++; + pStats->readVreadOps += chunks.size(); + pStats->bytesCachedV += len; XRootDStatus* ret_st = new XRootDStatus(st); *ret_st = XRootDStatus(stOK, 0); AnyObject* obj = new AnyObject(); @@ -407,9 +412,9 @@ JCacheFile::VectorRead(const ChunkList& chunks, } - auto jhandler = new JCacheReadVHandler(handler, &pStats.bytesReadV,sEnableJournalCache?pJournal.get():nullptr, buffer?(char*)buffer:(char*)(chunks.begin()->buffer), sEnableVectorCache?sCachePath:"", pUrl); - pStats.readVOps++; - pStats.readVreadOps += chunks.size(); + auto jhandler = new JCacheReadVHandler(handler, &pStats->bytesReadV,sEnableJournalCache?pJournal.get():nullptr, buffer?(char*)buffer:(char*)(chunks.begin()->buffer), sEnableVectorCache?sCachePath:"", pUrl); + pStats->readVOps++; + pStats->readVreadOps += chunks.size(); st = pFile->VectorRead(chunks, buffer, jhandler, timeout); @@ -514,10 +519,10 @@ JCacheFile::AttachForRead() StatInfo* sinfo = 0; auto st = pFile->Stat(false, sinfo); if (sinfo) { - // only add a file if it wasn't yet added - if (!sStats.HasUrl(pUrl)) { - sStats.totaldatasize+=sinfo->GetSize(); - } + // only add a file if it wasn't yet added + if (!sStats.HasUrl(pUrl)) { + sStats.totaldatasize+=sinfo->GetSize(); + } if (pJournal->attach(pJournalPath, sinfo->GetModTime(),0, sinfo->GetSize())) { mLog->Error(1, "JCache : failed to attach to cache directory: %s", pJournalPath.c_str()); mAttachedForRead = true; @@ -533,5 +538,22 @@ JCacheFile::AttachForRead() return true; } + +//---------------------------------------------------------------------------- +//! @brief log cache hit statistics +//---------------------------------------------------------------------------- +void JCacheFile::LogStats() { + mLog->Info(1, "JCache : read:readv-ops:readv-read-ops: %lu:%lu:%lus hit-rate: total [read/readv]=%.02f%% [%.02f%%/%.02f%%] remote-bytes-read/readv: %lu / %lu cached-bytes-read/readv: %lu / %lu", + pStats->readOps.load(), + pStats->readVOps.load(), + pStats->readVreadOps.load(), + pStats->CombinedHitRate(), + pStats->HitRate(), + pStats->HitRateV(), + pStats->bytesRead.load(), + pStats->bytesReadV.load(), + pStats->bytesCached.load(), + pStats->bytesCachedV.load()); + } } // namespace XrdCl diff --git a/src/XrdApps/XrdClJCachePlugin/file/XrdClJCacheFile.hh b/src/XrdApps/XrdClJCachePlugin/file/XrdClJCacheFile.hh index 992b59df157..3eeac533553 100644 --- a/src/XrdApps/XrdClJCachePlugin/file/XrdClJCacheFile.hh +++ b/src/XrdApps/XrdClJCachePlugin/file/XrdClJCacheFile.hh @@ -37,7 +37,6 @@ #include "handler/XrdClJCacheReadVHandler.hh" /*----------------------------------------------------------------------------*/ #include -#include #include #include #include @@ -46,6 +45,10 @@ #include /*----------------------------------------------------------------------------*/ +namespace JCache { + class CacheStats; +} + namespace XrdCl { //---------------------------------------------------------------------------- @@ -272,244 +275,11 @@ public: //---------------------------------------------------------------------------- //! @brief log cache hit statistics //---------------------------------------------------------------------------- - void LogStats() { - mLog->Info(1, "JCache : read:readv-ops:readv-read-ops: %lu:%lu:%lus hit-rate: total [read/readv]=%.02f%% [%.02f%%/%.02f%%] remote-bytes-read/readv: %lu / %lu cached-bytes-read/readv: %lu / %lu", - pStats.readOps.load(), - pStats.readVOps.load(), - pStats.readVreadOps.load(), - pStats.CombinedHitRate(), - pStats.HitRate(), - pStats.HitRateV(), - pStats.bytesRead.load(), - pStats.bytesReadV.load(), - pStats.bytesCached.load(), - pStats.bytesCachedV.load()); - } - - void AddToGlobalStats() { - sStats.readOps += pStats.readOps.load(); - sStats.readVOps += pStats.readVOps.load(); - sStats.readVreadOps += pStats.readVreadOps.load(); - sStats.bytesRead += pStats.bytesRead.load(); - sStats.bytesReadV += pStats.bytesReadV.load(); - sStats.bytesCached += pStats.bytesCached.load(); - sStats.bytesCachedV += pStats.bytesCachedV.load(); - sStats.nreadfiles += 1; - } - - static std::string GlobalStats() { - std::ostringstream oss; - oss << "# ----------------------------------------------------------- #" << std::endl; - oss << "# JCache : cache combined hit rate : " << std::fixed << std::setprecision(2) << sStats.CombinedHitRate() << " %" << std::endl; - oss << "# JCache : cache read hit rate : " << std::fixed << std::setprecision(2) << sStats.HitRate() << " %" << std::endl; - oss << "# JCache : cache readv hit rate : " << std::fixed << std::setprecision(2) << sStats.HitRateV() << " %" << std::endl; - oss << "# ----------------------------------------------------------- #" << std::endl; - oss << "# JCache : total bytes read : " << sStats.bytesRead.load()+sStats.bytesCached.load() << std::endl; - oss << "# JCache : total bytes readv : " << sStats.bytesReadV.load()+sStats.bytesCachedV.load() << std::endl; - oss << "# ----------------------------------------------------------- #" << std::endl; - oss << "# JCache : total iops read : " << sStats.readOps.load() << std::endl; - oss << "# JCache : total iops readv : " << sStats.readVOps.load() << std::endl; - oss << "# JCache : total iops readvread : " << sStats.readVreadOps.load() << std::endl; - oss << "# ----------------------------------------------------------- #" << std::endl; - oss << "# JCache : open files read : " << sStats.nreadfiles.load() << std::endl; - oss << "# JCache : open unique f. read : " << sStats.UniqueUrls() << std::endl; - oss << "# ----------------------------------------------------------- #" << std::endl; - oss << "# JCache : total unique files bytes : " << sStats.totaldatasize << std::endl; - oss << "# JCache : total unique files size : " << sStats.bytesToHumanReadable((double)sStats.totaldatasize) << std::endl; - oss << "# JCache : percentage dataset read : " << std::fixed << std::setprecision(2) << sStats.Used() << " %" << std::endl; - oss << "# ----------------------------------------------------------- #" << std::endl; - oss << "# JCache : app user time : " << std::fixed << std::setprecision(2) << sStats.userTime << " s" << std::endl; - oss << "# JCache : app real time : " << std::fixed << std::setprecision(2) << sStats.realTime << " s" << std::endl; - oss << "# JCache : app sys time : " << std::fixed << std::setprecision(2) << sStats.sysTime << " s" << std::endl; - oss << "# JCache : app acceleration : " << std::fixed << std::setprecision(2) << sStats.userTime / sStats.realTime << "x" << std::endl; - oss << "# JCache : app readrate : " << std::fixed << std::setprecision(2) << sStats.bytesToHumanReadable((sStats.ReadBytes()/sStats.realTime)) << "/s" << " [ peak " << sStats.bytesToHumanReadable(sStats.peakrate) << "/s ]" << std::endl; - oss << "# ----------------------------------------------------------- #" << std::endl; - - return oss.str(); - } - //! structure about cache hit statistics - struct CacheStats { - CacheStats(bool doe=false) : - bytesRead(0), - bytesReadV(0), - bytesCached(0), - bytesCachedV(0), - readOps(0), - readVOps(0), - readVreadOps(0), - nreadfiles(0), - dumponexit(doe), - peakrate(0) - { - // Get the current real time - struct timeval now; - gettimeofday(&now, nullptr); - startTime = now.tv_sec + now.tv_usec / 1000000.0; - } - - ~CacheStats() { - if (dumponexit.load()) { - using namespace std::chrono; - std::string jsonpath = sJsonPath + "jcache."; - std::string name = getenv("XRD_APPNAME")?getenv("XRD_APPNAME"):"none"+std::string(".")+std::to_string(getpid()); - jsonpath += name; - jsonpath += ".json"; - sStats.GetTimes(); - - sStats.bytes_per_second = sStats.bench.GetBins((int)(realTime)); - sStats.peakrate = *(std::max_element(sStats.bytes_per_second.begin(), sStats.bytes_per_second.end())); - if (sJsonPath.length()) { - sStats.persistToJson(jsonpath, name); - } - if (sEnableSummary) { - std::cerr << GlobalStats(); - } - std::vector bins = sStats.bench.GetBins(40); - Art art; - if (sEnableSummary) { - art.drawCurve(bins, sStats.bench.GetTimePerBin().count() / 1000000.0, realTime); - } - } - } - - static std::string bytesToHumanReadable(double bytes) { - const char* suffixes[] = {"B", "KB", "MB", "GB", "TB", "PB", "EB", "ZB", "YB"}; - const int numSuffixes = sizeof(suffixes) / sizeof(suffixes[0]); - - if (bytes == 0) return "0 B"; - - int exp = std::min((int)(std::log(bytes) / std::log(1000)), numSuffixes - 1); - double val = bytes / std::pow(1000, exp); - std::ostringstream oss; - oss << std::fixed << std::setprecision(2) << val << " " << suffixes[exp]; - return oss.str(); - } - - double HitRate() { - auto n = this->bytesCached.load()+this->bytesRead.load(); - if (!n) return 100.0; - return 100.0*(this->bytesCached.load()) / n; - } - double HitRateV() { - auto n = this->bytesCachedV.load()+this->bytesReadV.load(); - if (!n) return 100.0; - return 100.0*(this->bytesCachedV.load()) / n; - } - double CombinedHitRate() { - auto n = (this->bytesCached.load()+this->bytesRead.load()+this->bytesCachedV.load()+this->bytesReadV.load()); - if (!n) return 100.0; - return 100.0*(this->bytesCached.load()+this->bytesCachedV.load()) / n; - } - void AddUrl(const std::string& url) { - std::lock_guard guard(urlMutex); - urls.insert(url); - } - bool HasUrl(const std::string& url) { - std::lock_guard guard(urlMutex); - return urls.count(url); - } - double ReadBytes() { - return (sStats.bytesRead.load()+sStats.bytesReadV.load() + sStats.bytesCached.load() + sStats.bytesCachedV.load()); - } - - double Used() { - if (sStats.totaldatasize) { - return 100.0*(sStats.bytesRead.load()+sStats.bytesReadV.load() + sStats.bytesCached.load() + sStats.bytesCachedV.load()) / sStats.totaldatasize; - } else { - return 100.0; - } - } - - size_t UniqueUrls() { - std::lock_guard guard(urlMutex); - return urls.size(); - } - - void GetTimes() { - struct rusage usage; - struct timeval now; - - // Get the current real time - gettimeofday(&now, nullptr); - realTime = now.tv_sec + now.tv_usec / 1000000.0 - startTime; - - // Get resource usage - getrusage(RUSAGE_SELF, &usage); - - // Get user and system time - userTime = usage.ru_utime.tv_sec + usage.ru_utime.tv_usec / 1000000.0; - sysTime = usage.ru_stime.tv_sec + usage.ru_stime.tv_usec / 1000000.0; - } - - void persistToJson(const std::string& path, const std::string& name) { - std::ofstream outFile(path); - if (!outFile.is_open()) { - std::cerr << "error: failed to open JSON statistics file: " << path << std::endl; - return; - } - - outFile << "{\n"; - outFile << " \"appname\": \"" << name << "\",\n"; - outFile << " \"bytesRead\": " << bytesRead.load() << ",\n"; - outFile << " \"bytesReadV\": " << bytesReadV.load() << ",\n"; - outFile << " \"bytesCached\": " << bytesCached.load() << ",\n"; - outFile << " \"bytesCachedV\": " << bytesCachedV.load() << ",\n"; - outFile << " \"readOps\": " << readOps.load() << ",\n"; - outFile << " \"readVOps\": " << readVOps.load() << ",\n"; - outFile << " \"readVreadOps\": " << readVreadOps.load() << ",\n"; - outFile << " \"nreadfiles\": " << nreadfiles.load() << ",\n"; - outFile << " \"totaldatasize\": " << totaldatasize.load() << ",\n"; - - std::lock_guard lock(urlMutex); - outFile << " \"urls\": ["; - for (auto it = urls.begin(); it != urls.end(); ++it) { - if (it != urls.begin()) { - outFile << ", "; - } - outFile << "\"" << *it << "\""; - } - outFile << "],\n"; - - outFile << " \"bytes_per_second\": ["; - for (size_t i = 0; i < bytes_per_second.size(); ++i) { - if (i != 0) { - outFile << ", "; - } - outFile << bytes_per_second[i]; - } - outFile << "],\n"; - - outFile << std::fixed << std::setprecision(6); // Set precision for double values - - outFile << " \"userTime\": " << userTime.load() << ",\n"; - outFile << " \"realTime\": " << realTime.load() << ",\n"; - outFile << " \"sysTime\": " << sysTime.load() << ",\n"; - outFile << " \"startTime\": " << startTime.load() << "\n"; - outFile << "}\n"; - - outFile.close(); - } - - std::atomic bytesRead; - std::atomic bytesReadV; - std::atomic bytesCached; - std::atomic bytesCachedV; - std::atomic readOps; - std::atomic readVOps; - std::atomic readVreadOps; - std::atomic nreadfiles; - std::atomic totaldatasize; - std::atomic dumponexit; - std::set urls; - std::mutex urlMutex; - std::atomic userTime; - std::atomic realTime; - std::atomic sysTime; - std::atomic startTime; - TimeBench bench; - std::vector bytes_per_second; - std::atomic peakrate; - }; + void LogStats(); + + //! @brief global plugin cache hit statistics + static JCache::CacheStats sStats; + private: //! @brief attach for read @@ -535,10 +305,7 @@ private: Log* mLog; //! @brief cache hit statistics - CacheStats pStats; - - //! @brief global plugin cache hit statistics - static CacheStats sStats; + JCache::CacheStats* pStats; }; } // namespace XrdCl