From 803e350c62556e86382effaf9a5b75939fa8946d Mon Sep 17 00:00:00 2001 From: Andreas Joachim Peters Date: Mon, 17 Jun 2024 15:05:31 +0200 Subject: [PATCH] XrdApps::JCache implement bypass functionality which does not create any cache folders or uses cached file contents, but is keesp all the IO statistics. This can be enabled using XRD_JCACHE_BYPASS=true|1 in the environment or 'bypass=true', 'bypass=1' in the plug-in configuration file. --- .../XrdClJCachePlugin/cache/Journal.cc | 27 ++++++++----- src/XrdApps/XrdClJCachePlugin/file/Art.hh | 6 +++ .../XrdClJCachePlugin/file/CacheStats.hh | 38 ++++++++++--------- .../XrdClJCachePlugin/file/XrdClJCacheFile.cc | 21 ++++++---- .../XrdClJCachePlugin/file/XrdClJCacheFile.hh | 2 + .../plugin/XrdClJCachePlugin.hh | 9 +++++ 6 files changed, 68 insertions(+), 35 deletions(-) diff --git a/src/XrdApps/XrdClJCachePlugin/cache/Journal.cc b/src/XrdApps/XrdClJCachePlugin/cache/Journal.cc index 4344fb30383..6b92ca6d2e6 100644 --- a/src/XrdApps/XrdClJCachePlugin/cache/Journal.cc +++ b/src/XrdApps/XrdClJCachePlugin/cache/Journal.cc @@ -79,6 +79,7 @@ Journal::~Journal() { //------------------------------------------------------------------------------ void Journal::read_jheader() { jheader_t fheader; + bool exists=false; auto hr = ::pread64(fd, &fheader, sizeof(jheader), 0); if ((hr > 0) && ((hr != sizeof(jheader)) || (fheader.magic != JOURNAL_MAGIC))) { @@ -88,18 +89,24 @@ void Journal::read_jheader() { reset(); return; } + + exists = (hr==sizeof(jheader)); + // TODO: understand why the mtime is +-1s if (jheader.mtime) { - if ((abs(fheader.mtime - jheader.mtime) > 1) || - (fheader.mtime_nsec != jheader.mtime_nsec) || - (jheader.filesize && (fheader.filesize != jheader.filesize))) { - std::cerr << "warning: remote file change detected - purging path:" - << path << std::endl; - std::cerr << fheader.mtime << ":" << jheader.mtime << " " - << fheader.mtime_nsec << ":" << jheader.mtime_nsec << " " - << fheader.filesize << ":" << jheader.filesize << std::endl; - reset(); - return; + if (exists) { + // we only compare if there was a header in the journal + if ((abs(fheader.mtime - jheader.mtime) > 1) || + (fheader.mtime_nsec != jheader.mtime_nsec) || + (jheader.filesize && (fheader.filesize != jheader.filesize))) { + std::cerr << "warning: remote file change detected - purging path:" + << path << std::endl; + std::cerr << fheader.mtime << ":" << jheader.mtime << " " + << fheader.mtime_nsec << ":" << jheader.mtime_nsec << " " + << fheader.filesize << ":" << jheader.filesize << std::endl; + reset(); + return; + } } } else { // we assume the contents referenced in the header is ok to allow disconnected ops diff --git a/src/XrdApps/XrdClJCachePlugin/file/Art.hh b/src/XrdApps/XrdClJCachePlugin/file/Art.hh index 342f08f21b5..98d80026dd2 100644 --- a/src/XrdApps/XrdClJCachePlugin/file/Art.hh +++ b/src/XrdApps/XrdClJCachePlugin/file/Art.hh @@ -46,6 +46,12 @@ public: double maxValue = *std::max_element(dataPoints.begin(), dataPoints.end()); double minValue = *std::min_element(dataPoints.begin(), dataPoints.end()); + // we want to see the full range starting at 0 on the Y axis for now + minValue = 0; + + // we round the axis to clean 100 + maxValue = (int)(maxValue+9) / 10 * 10; + const int plotHeight = 10; // Number of lines in the plot const int plotWidth = 40; // Width of the plot in characters const int yLegendWidth = 8; // Width of the Y legend in characters diff --git a/src/XrdApps/XrdClJCachePlugin/file/CacheStats.hh b/src/XrdApps/XrdClJCachePlugin/file/CacheStats.hh index bbbe4c5e25b..cffd527ee99 100644 --- a/src/XrdApps/XrdClJCachePlugin/file/CacheStats.hh +++ b/src/XrdApps/XrdClJCachePlugin/file/CacheStats.hh @@ -82,7 +82,7 @@ struct CacheStats { XrdCl::JCacheFile::sStats.persistToJson(jsonpath, name); } if (XrdCl::JCacheFile::sEnableSummary) { - std::cerr << CacheStats::GlobalStats(XrdCl::JCacheFile::sStats); + std::cerr << CacheStats::GlobalStats(XrdCl::JCacheFile::sStats, XrdCl::JCacheFile::sEnableBypass); } std::vector bins = XrdCl::JCacheFile::sStats.bench.GetBins(40); JCache::Art art; @@ -264,7 +264,7 @@ struct CacheStats { gStats.nreadfiles += 1; } - static std::string GlobalStats(CacheStats &sStats) { + static std::string GlobalStats(CacheStats &sStats, bool bypass) { std::ostringstream oss; oss << "# " @@ -279,22 +279,26 @@ struct CacheStats { "---- #" << std::endl; - - oss << "# JCache : cache combined hit rate : " << std::fixed - << std::setprecision(2) << sStats.CombinedHitRate() << " %" - << std::endl; - oss << "# JCache : cache read hit rate : " << std::fixed - << std::setprecision(2) << (!sStats.ReadOpBytes() ? "\033[9m" : "") - << sStats.HitRate() << " %" << (!sStats.ReadOpBytes() ? "\033[0m" : "") - << std::endl; - oss << "# JCache : cache readv hit rate : " << std::fixed - << std::setprecision(2) << (!sStats.ReadVOpBytes() ? "\033[9m" : "") - << sStats.HitRateV() << " %" << (!sStats.ReadOpBytes() ? "\033[0m" : "") - << std::endl; + if (bypass) { + oss << "# JCache : cache runs in bypass mode (hitrate=0%) " << std::endl; + } else { + oss << "# JCache : cache combined hit rate : " << std::fixed + << std::setprecision(2) << sStats.CombinedHitRate() << " %" + << std::endl; + oss << "# JCache : cache read hit rate : " << std::fixed + << std::setprecision(2) << (!sStats.ReadOpBytes() ? "\033[9m" : "") + << sStats.HitRate() << " %" << (!sStats.ReadOpBytes() ? "\033[0m" : "") + << std::endl; + oss << "# JCache : cache readv hit rate : " << std::fixed + << std::setprecision(2) << (!sStats.ReadVOpBytes() ? "\033[9m" : "") + << sStats.HitRateV() << " %" << (!sStats.ReadOpBytes() ? "\033[0m" : "") + << std::endl; + } oss << "# " - "-------------------------------------------------------------------" - "---- #" + "-------------------------------------------------------------------" + "---- #" << std::endl; + oss << "# JCache : total bytes read : " << sStats.bytesRead.load() + sStats.bytesCached.load() << std::endl; oss << "# JCache : total bytes readv : " @@ -401,7 +405,7 @@ struct CacheStats { if (XrdCl::JCacheFile::sStats.realTime < 1) { XrdCl::JCacheFile::sStats.peakrate = XrdCl::JCacheFile::sStats.ReadBytes() / XrdCl::JCacheFile::sStats.realTime; } - std::string st = CacheStats::GlobalStats(XrdCl::JCacheFile::sStats); + std::string st = CacheStats::GlobalStats(XrdCl::JCacheFile::sStats, XrdCl::JCacheFile::sEnableBypass); std::cerr << st << std::endl; } } diff --git a/src/XrdApps/XrdClJCachePlugin/file/XrdClJCacheFile.cc b/src/XrdApps/XrdClJCachePlugin/file/XrdClJCacheFile.cc index 99658d7cd73..fd86d59d3d5 100644 --- a/src/XrdApps/XrdClJCachePlugin/file/XrdClJCacheFile.cc +++ b/src/XrdApps/XrdClJCachePlugin/file/XrdClJCacheFile.cc @@ -33,6 +33,7 @@ std::string XrdCl::JCacheFile::sJsonPath = ""; bool XrdCl::JCacheFile::sEnableJournalCache = true; bool XrdCl::JCacheFile::sEnableVectorCache = false; bool XrdCl::JCacheFile::sEnableSummary = true; +bool XrdCl::JCacheFile::sEnableBypass = false; bool XrdCl::JCacheFile::sOpenAsync = false; JCache::CacheStats XrdCl::JCacheFile::sStats(true); JCache::Cleaner XrdCl::JCacheFile::sCleaner; @@ -127,7 +128,7 @@ XRootDStatus JCacheFile::Open(const std::string &url, OpenFlags::Flags flags, if (st.IsOK()) { mIsOpen = true; mOpenState = OPENING; - if (sEnableVectorCache || sEnableJournalCache) { + if (sEnableVectorCache || (sEnableJournalCache && !sEnableBypass)) { if ((flags & OpenFlags::Flags::Read) == OpenFlags::Flags::Read) { std::string JournalDir = sCachePath + "/" + VectorCache::computeSHA256(pUrl); @@ -228,7 +229,7 @@ XRootDStatus JCacheFile::Read(uint64_t offset, uint32_t size, void *buffer, if (pFile) { sStats.bench.AddMeasurement(size); - if (sEnableJournalCache && AttachForRead()) { + if (!sEnableBypass && sEnableJournalCache && AttachForRead()) { mLog->Info(1, "JCache : Read: offset=%llu size=%llu buffer=%x path='%s'", offset, size, buffer, pUrl.c_str()); bool eof = false; @@ -255,7 +256,7 @@ XRootDStatus JCacheFile::Read(uint64_t offset, uint32_t size, void *buffer, auto jhandler = new JCacheReadHandler(handler, &pStats->bytesRead, - sEnableJournalCache ? pJournal.get() : nullptr); + sEnableJournalCache &&! sEnableBypass? pJournal.get() : nullptr); pStats->readOps++; st = pFile->Read(offset, size, buffer, jhandler, timeout); } else { @@ -289,7 +290,7 @@ XRootDStatus JCacheFile::PgRead(uint64_t offset, uint32_t size, void *buffer, if (pFile) { sStats.bench.AddMeasurement(size); - if (sEnableJournalCache && AttachForRead()) { + if (sEnableJournalCache && AttachForRead() && !sEnableBypass) { mLog->Info(1, "JCache : PgRead: offset=%llu size=%llu buffer=%x path='%s'", offset, size, buffer, pUrl.c_str()); bool eof = false; @@ -318,7 +319,7 @@ XRootDStatus JCacheFile::PgRead(uint64_t offset, uint32_t size, void *buffer, } auto jhandler = new JCachePgReadHandler(handler, &pStats->bytesRead, - sEnableJournalCache ? pJournal.get() : nullptr); + sEnableJournalCache && !sEnableBypass ? pJournal.get() : nullptr); pStats->readOps++; st = pFile->PgRead(offset, size, buffer, jhandler, timeout); } else { @@ -414,7 +415,7 @@ XRootDStatus JCacheFile::VectorRead(const ChunkList &chunks, void *buffer, return st; } } else { - if (sEnableJournalCache) { + if (!sEnableBypass && sEnableJournalCache) { bool inJournal = true; size_t len = 0; // try to get chunks from journal cache @@ -456,7 +457,7 @@ XRootDStatus JCacheFile::VectorRead(const ChunkList &chunks, void *buffer, auto jhandler = new JCacheReadVHandler( handler, &pStats->bytesReadV, - sEnableJournalCache ? pJournal.get() : nullptr, + sEnableJournalCache && !sEnableBypass ? pJournal.get() : nullptr, buffer ? (char *)buffer : (char *)(chunks.begin()->buffer), sEnableVectorCache ? sCachePath : "", pUrl); pStats->readVOps++; @@ -586,8 +587,12 @@ bool JCacheFile::AttachForRead() { } if (pJournal->attach(pJournalPath, sinfo->GetModTime(), 0, sinfo->GetSize())) { - mLog->Error(1, "JCache : failed to attach to cache file: %s", + if (!sEnableBypass) { + // when bypass=true this might throw an error because we don't create the + // journal directory - we just don't want to see this + mLog->Error(1, "JCache : failed to attach to cache file: %s", pJournalPath.c_str()); + } mAttachedForRead = true; delete sinfo; return false; diff --git a/src/XrdApps/XrdClJCachePlugin/file/XrdClJCacheFile.hh b/src/XrdApps/XrdClJCachePlugin/file/XrdClJCacheFile.hh index 444cb30e5d3..625243a01c1 100644 --- a/src/XrdApps/XrdClJCachePlugin/file/XrdClJCacheFile.hh +++ b/src/XrdApps/XrdClJCachePlugin/file/XrdClJCacheFile.hh @@ -222,6 +222,7 @@ public: static void SetVector(const bool &value) { sEnableVectorCache = value; } static void SetJsonPath(const std::string &path) { sJsonPath = path; } static void SetSummary(const bool &value) { sEnableSummary = value; } + static void SetBypass(const bool &value) { sEnableBypass = value; } static void SetSize(uint64_t size) { sCleaner.SetSize(size,sCachePath); } static void SetAsync(bool async) { sOpenAsync = async; } @@ -232,6 +233,7 @@ public: static std::string sJsonPath; static bool sEnableVectorCache; static bool sEnableJournalCache; + static bool sEnableBypass; static bool sEnableSummary; static bool sOpenAsync; static JournalManager sJournalManager; diff --git a/src/XrdApps/XrdClJCachePlugin/plugin/XrdClJCachePlugin.hh b/src/XrdApps/XrdClJCachePlugin/plugin/XrdClJCachePlugin.hh index 1eae6d590b6..4fe8f2ab899 100644 --- a/src/XrdApps/XrdClJCachePlugin/plugin/XrdClJCachePlugin.hh +++ b/src/XrdApps/XrdClJCachePlugin/plugin/XrdClJCachePlugin.hh @@ -62,6 +62,9 @@ public: auto ita = config->find("async"); JCacheFile::SetAsync(ita != config->end() ? (ita->second == "true") || (ita->second == "1") : false); + auto itb = config->find("bypass"); + JCacheFile::SetBypass(itb != config->end() ? (itb->second == "true") || (itb->second == "1") + : false); auto itjson = config->find("json"); JCacheFile::SetJsonPath(itjson != config->end() ? itjson->second : ""); @@ -96,6 +99,10 @@ public: JCacheFile::SetAsync(((std::string(v) == "true") || (std::string(v) == "1")) ? true : false); } + if (const char *v = getenv("XRD_JCACHE_BYPASS")) { + JCacheFile::SetBypass(((std::string(v) == "true") || (std::string(v) == "1")) ? true : false); + } + if (const char *v = getenv("XRD_JCACHE_JSON")) { JCacheFile::SetJsonPath((std::string(v).length()) ? std::string(v) : ""); @@ -116,6 +123,8 @@ public: JCacheFile::sEnableSummary ? "true" : "false"); log->Info(1, "JCache : asynchrous/disconnected operation: %s", JCacheFile::sOpenAsync ? "true" : "false"); + log->Info(1, "JCache : bypass operation: %s", + JCacheFile::sEnableBypass ? "true" : "false"); if (JCacheFile::sJsonPath.length()) { log->Info(1, "JCache : json output to prefix: %s", JCacheFile::sJsonPath.c_str());