diff --git a/src/XrdApps/XrdClJCachePlugin/cleaner/Cleaner.cc b/src/XrdApps/XrdClJCachePlugin/cleaner/Cleaner.cc new file mode 100644 index 00000000000..dfb625c1a08 --- /dev/null +++ b/src/XrdApps/XrdClJCachePlugin/cleaner/Cleaner.cc @@ -0,0 +1,136 @@ +//------------------------------------------------------------------------------ +// Copyright (c) 2024 by European Organization for Nuclear Research (CERN) +// Author: Andreas-Joachim Peters +//------------------------------------------------------------------------------ +// This file is part of the XRootD software suite. +// +// XRootD is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// XRootD is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with XRootD. If not, see . +// +// In applying this licence, CERN does not waive the privileges and immunities +// granted to it by virtue of its status as an Intergovernmental Organization +// or submit itself to any jurisdiction. + +/*----------------------------------------------------------------------------*/ +#include "cleaner/Cleaner.hh" +#include +/*----------------------------------------------------------------------------*/ + +namespace fs = std::filesystem; +namespace JCache { +/*----------------------------------------------------------------------------*/ + +//---------------------------------------------------------------------------- +// @brief getLastAccessTime() returns the last access time of a file. +// +// @param filePath - path to the file +// @return last access time of the file in seconds +//---------------------------------------------------------------------------- +time_t Cleaner::getLastAccessTime(const fs::path &filePath) { + struct stat fileInfo; + if (stat(filePath.c_str(), &fileInfo) != 0) { + return -1; // Error occurred + } + return fileInfo.st_atime; +} + +//---------------------------------------------------------------------------- +// @brief getDirectorySize() returns the total size of all files in a +// directory. +// +// @param directory - path to the directory +// @return total size of all files in the directory in bytes +//---------------------------------------------------------------------------- +long long Cleaner::getDirectorySize(const fs::path &directory, bool scan) { + + long long totalSize = 0; + if (scan) { + for (const auto &entry : fs::recursive_directory_iterator(directory)) { + if (stopFlag.load()) { + return 0; + } + if (fs::is_regular_file(entry)) { + totalSize += fs::file_size(entry); + } + } + } else { + struct statfs stat; + + if (statfs(directory.c_str(), &stat) != 0) { + mLog->Error(1,"JCache:Cleaner: failed to get directory size using statfs."); + return 0; + } + } + return totalSize; +} + +//---------------------------------------------------------------------------- +// @brief getFilesByAccessTime() returns a list of files sorted by their +// last access time. +// +// @param directory - path to the directory +// @return list of files sorted by their last access time +//---------------------------------------------------------------------------- +std::vector> +Cleaner::getFilesByAccessTime(const fs::path &directory) { + std::vector> fileList; + for (const auto &entry : fs::recursive_directory_iterator(directory)) { + if (fs::is_regular_file(entry)) { + auto accessTime = getLastAccessTime(entry.path()); + fileList.emplace_back(accessTime, entry.path()); + } + } + std::sort(fileList.begin(), fileList.end()); + return fileList; +} + +//---------------------------------------------------------------------------- +// @brief cleanDirectory() deletes files from a directory that are older than +// a given threshold. +// +// @param directory - path to the directory +// @param highWatermark - threshold for high watermark in bytes +// @param lowWatermark - threshold for low watermark in bytes +// @return none +//---------------------------------------------------------------------------- +void Cleaner::cleanDirectory(const fs::path &directory, long long highWatermark, + long long lowWatermark) { + long long currentSize = getDirectorySize(directory); + if (currentSize <= highWatermark) { + /*----------------------------------------------------------------------------*/ + mLog->Info(1,"JCache:Cleaner: Directory size is within the limit (%lu/%lu). No action needed.", + currentSize, highWatermark); + return; + } + + auto files = getFilesByAccessTime(directory); + + for (const auto &[accessTime, filePath] : files) { + if (stopFlag.load()) { + return; + } + if (currentSize <= lowWatermark) { + break; + } + long long fileSize = fs::file_size(filePath); + try { + fs::remove(filePath); + currentSize -= fileSize; + mLog->Info(1, "JCache:Cleaner : deleted '%s' (Size: %lu bytes)", filePath.c_str(), + fileSize); + } catch (const std::exception &e) { + mLog->Error(1,"JCache::Cleaner error deleting '%'", filePath.c_str()); + } + } +} +} // namespace JCache \ No newline at end of file diff --git a/src/XrdApps/XrdClJCachePlugin/cleaner/Cleaner.hh b/src/XrdApps/XrdClJCachePlugin/cleaner/Cleaner.hh new file mode 100644 index 00000000000..f8677a29067 --- /dev/null +++ b/src/XrdApps/XrdClJCachePlugin/cleaner/Cleaner.hh @@ -0,0 +1,145 @@ +//------------------------------------------------------------------------------ +// Copyright (c) 2024 by European Organization for Nuclear Research (CERN) +// Author: Andreas-Joachim Peters +//------------------------------------------------------------------------------ +// This file is part of the XRootD software suite. +// +// XRootD is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// XRootD is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with XRootD. If not, see . +// +// In applying this licence, CERN does not waive the privileges and immunities +// granted to it by virtue of its status as an Intergovernmental Organization +// or submit itself to any jurisdiction. + +#pragma once +/*----------------------------------------------------------------------------*/ +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +/*----------------------------------------------------------------------------*/ +#include "XrdCl/XrdClDefaultEnv.hh" +#include "XrdCl/XrdClLog.hh" +/*----------------------------------------------------------------------------*/ +namespace fs = std::filesystem; + +namespace JCache {} +namespace JCache { +class Cleaner { +public: + // Constructor to initialize the configuration variables + Cleaner(double lowWatermark, double highWatermark, const std::string &path, + bool scan, size_t interval) + : lowWatermark(lowWatermark), highWatermark(highWatermark), subtree(path), + scan(scan), interval(interval), stopFlag(false) { + mLog = XrdCl::DefaultEnv::GetLog(); + } + + Cleaner() + : lowWatermark(0), highWatermark(0), subtree(""), + scan(true), interval(60), stopFlag(false) { + mLog = XrdCl::DefaultEnv::GetLog(); + } + + // Method to start the cleaning process in a separate thread + void run() { + stopFlag = false; + cleanerThread = std::thread(&Cleaner::cleanLoop, this); + cleanerThread.detach(); + } + + // Method to stop the cleaning process and join the thread + void stop() { + stopFlag = true; + if (cleanerThread.joinable()) { + cleanerThread.join(); + } + } + + // Destructor to ensure the thread is properly joined + ~Cleaner() { stop(); } + + // Method to Define Cleaner size + void SetSize(uint64_t size, const std::string& path) { + stop(); + if (size > 1024ll*1024ll*1024ll) { + subtree = path; + highWatermark = size; + lowWatermark = size * 0.9; + run(); + } else { + mLog->Error(1, "JCache:Cleaner : the size given to the cleaner is less than 1GB - cleaning is disabled!"); + } + } + + // Method to set the scan option (true means scan, false means don't scan but use statfs!) + void SetScan(bool sc) { + scan = sc; + } + +private: + // Private methods + time_t getLastAccessTime(const fs::path &filePath); + long long getDirectorySize(const fs::path &directory, bool scan = true); + std::vector> + getFilesByAccessTime(const fs::path &directory); + void cleanDirectory(const fs::path &directory, long long highWatermark, + long long lowWatermark); + + // Configuration variables + double lowWatermark; + double highWatermark; + fs::path subtree; + std::atomic scan; + size_t interval; + + // Thread-related variables + std::thread cleanerThread; + std::atomic stopFlag; + + // XRootD logger + XrdCl::Log *mLog; + + // The method that runs in a loop, calling the clean method every interval + // seconds + void cleanLoop() { + + while (!stopFlag.load()) { + auto start = std::chrono::steady_clock::now(); + + cleanDirectory(subtree, highWatermark, lowWatermark); + + auto end = std::chrono::steady_clock::now(); + auto duration = + std::chrono::duration_cast(end - start); + + if ( (size_t)duration.count() < interval) { + auto s = std::chrono::seconds(interval) - duration; + std::this_thread::sleep_for(s); + } + } + } + + // The clean method to be called periodically + void clean() {} +}; +} // namespace JCache diff --git a/src/XrdApps/XrdClJCachePlugin/file/CacheStats.hh b/src/XrdApps/XrdClJCachePlugin/file/CacheStats.hh index b1d9164890a..49279ed3a28 100644 --- a/src/XrdApps/XrdClJCachePlugin/file/CacheStats.hh +++ b/src/XrdApps/XrdClJCachePlugin/file/CacheStats.hh @@ -83,6 +83,7 @@ struct CacheStats { JCache::Art art; if (XrdCl::JCacheFile::sEnableSummary) { std::cerr << "# IO Timeprofile " << std::endl; + std::cerr << "# --------------" << std::endl; art.drawCurve(bins, XrdCl::JCacheFile::sStats.bench.GetTimePerBin().count() / 1000000.0, diff --git a/src/XrdApps/XrdClJCachePlugin/file/XrdClJCacheFile.cc b/src/XrdApps/XrdClJCachePlugin/file/XrdClJCacheFile.cc index 0d7250cd9bf..d11f5725187 100644 --- a/src/XrdApps/XrdClJCachePlugin/file/XrdClJCacheFile.cc +++ b/src/XrdApps/XrdClJCachePlugin/file/XrdClJCacheFile.cc @@ -34,7 +34,7 @@ bool XrdCl::JCacheFile::sEnableJournalCache = true; bool XrdCl::JCacheFile::sEnableVectorCache = false; bool XrdCl::JCacheFile::sEnableSummary = true; JCache::CacheStats XrdCl::JCacheFile::sStats(true); - +JCache::Cleaner XrdCl::JCacheFile::sCleaner; JournalManager XrdCl::JCacheFile::sJournalManager; namespace XrdCl { diff --git a/src/XrdApps/XrdClJCachePlugin/file/XrdClJCacheFile.hh b/src/XrdApps/XrdClJCachePlugin/file/XrdClJCacheFile.hh index 343a1e89063..c807cde6c88 100644 --- a/src/XrdApps/XrdClJCachePlugin/file/XrdClJCacheFile.hh +++ b/src/XrdApps/XrdClJCachePlugin/file/XrdClJCacheFile.hh @@ -29,6 +29,7 @@ #include "XrdCl/XrdClPlugInInterface.hh" /*----------------------------------------------------------------------------*/ #include "cache/Journal.hh" +#include "cleaner/Cleaner.hh" #include "file/Art.hh" #include "file/TimeBench.hh" #include "handler/XrdClJCachePgReadHandler.hh" @@ -220,6 +221,7 @@ public: static void SetVector(const bool &value) { sEnableVectorCache = value; } static void SetJsonPath(const std::string &path) { sJsonPath = path; } static void SetSummary(const bool &value) { sEnableSummary = value; } + static void SetSize(uint64_t size) { sCleaner.SetSize(size,sCachePath);} //---------------------------------------------------------------------------- //! @brief static members pointing to cache settings @@ -239,6 +241,9 @@ public: //! @brief global plugin cache hit statistics static JCache::CacheStats sStats; + //! @brief cleaner instance + static JCache::Cleaner sCleaner; + private: //! @brief attach for read bool AttachForRead(); diff --git a/src/XrdApps/XrdClJCachePlugin/handler/XrdClJCachePgReadHandler.hh b/src/XrdApps/XrdClJCachePlugin/handler/XrdClJCachePgReadHandler.hh index dd8060e7cf9..7c4fb3bedda 100644 --- a/src/XrdApps/XrdClJCachePlugin/handler/XrdClJCachePgReadHandler.hh +++ b/src/XrdApps/XrdClJCachePlugin/handler/XrdClJCachePgReadHandler.hh @@ -37,7 +37,7 @@ class JCachePgReadHandler : public XrdCl::ResponseHandler public: JCachePgReadHandler() {} - JCachePgReadHandler(JCacheReadHandler *other) { + JCachePgReadHandler(JCachePgReadHandler *other) { rbytes = other->rbytes; journal = other->journal; } diff --git a/src/XrdApps/XrdClJCachePlugin/plugin/XrdClJCachePlugin.hh b/src/XrdApps/XrdClJCachePlugin/plugin/XrdClJCachePlugin.hh index 24775690f95..d0ce85a0ce6 100644 --- a/src/XrdApps/XrdClJCachePlugin/plugin/XrdClJCachePlugin.hh +++ b/src/XrdApps/XrdClJCachePlugin/plugin/XrdClJCachePlugin.hh @@ -49,14 +49,19 @@ public: if (config) { auto itc = config->find("cache"); JCacheFile::SetCache(itc != config->end() ? itc->second : ""); + + auto itsz = config->find("size"); + JCacheFile::SetSize(itsz != config->end() ? std::stoll(std::string(itsz->second),0,10) : 0); + auto itv = config->find("vector"); JCacheFile::SetVector(itv != config->end() ? itv->second == "true" : false); auto itj = config->find("journal"); - JCacheFile::SetJournal(itj != config->end() ? itj->second == "true" - : false); + JCacheFile::SetJournal(itj != config->end() ? itj->second == "false" + : true); auto itjson = config->find("json"); JCacheFile::SetJsonPath(itjson != config->end() ? itjson->second : "./"); + auto its = config->find("summary"); JCacheFile::SetSummary(its != config->end() ? its->second != "false" : true); @@ -65,6 +70,10 @@ public: JCacheFile::SetCache((std::string(v).length()) ? std::string(v) : ""); } + if (const char *v = getenv("XRD_JCACHE_SIZE")) { + JCacheFile::SetSize((std::string(v).length()) ? std::stoll(std::string(v),0,10) : 0); + } + if (const char *v = getenv("XRD_JCACHE_SUMMARY")) { JCacheFile::SetSummary((std::string(v) == "true") ? true : false); }