From d0e22c7dac5222314544bc326d7bc4f7d9d91f66 Mon Sep 17 00:00:00 2001
From: Andreas Joachim Peters <andreas.joachim.peters@cern.ch>
Date: Tue, 11 Jun 2024 15:49:11 +0200
Subject: [PATCH] XrdApps::JCache: add cleaner thread functionality which is
 enabled in the plug-in using 'size=<bytes>'.

---
 .../XrdClJCachePlugin/cleaner/Cleaner.cc      | 136 ++++++++++++++++
 .../XrdClJCachePlugin/cleaner/Cleaner.hh      | 145 ++++++++++++++++++
 .../XrdClJCachePlugin/file/CacheStats.hh      |   1 +
 .../XrdClJCachePlugin/file/XrdClJCacheFile.cc |   2 +-
 .../XrdClJCachePlugin/file/XrdClJCacheFile.hh |   5 +
 .../handler/XrdClJCachePgReadHandler.hh       |   2 +-
 .../plugin/XrdClJCachePlugin.hh               |  13 +-
 7 files changed, 300 insertions(+), 4 deletions(-)
 create mode 100644 src/XrdApps/XrdClJCachePlugin/cleaner/Cleaner.cc
 create mode 100644 src/XrdApps/XrdClJCachePlugin/cleaner/Cleaner.hh

diff --git a/src/XrdApps/XrdClJCachePlugin/cleaner/Cleaner.cc b/src/XrdApps/XrdClJCachePlugin/cleaner/Cleaner.cc
new file mode 100644
index 00000000000..dfb625c1a08
--- /dev/null
+++ b/src/XrdApps/XrdClJCachePlugin/cleaner/Cleaner.cc
@@ -0,0 +1,136 @@
+//------------------------------------------------------------------------------
+// Copyright (c) 2024 by European Organization for Nuclear Research (CERN)
+// Author: Andreas-Joachim Peters <andreas.joachim.peters@cern.ch>
+//------------------------------------------------------------------------------
+// This file is part of the XRootD software suite.
+//
+// XRootD is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// XRootD is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+// GNU General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with XRootD.  If not, see <http://www.gnu.org/licenses/>.
+//
+// In applying this licence, CERN does not waive the privileges and immunities
+// granted to it by virtue of its status as an Intergovernmental Organization
+// or submit itself to any jurisdiction.
+
+/*----------------------------------------------------------------------------*/
+#include "cleaner/Cleaner.hh"
+#include <sys/statfs.h>
+/*----------------------------------------------------------------------------*/
+
+namespace fs = std::filesystem;
+namespace JCache {
+/*----------------------------------------------------------------------------*/
+
+//----------------------------------------------------------------------------
+// @brief getLastAccessTime() returns the last access time of a file.
+//
+// @param  filePath - path to the file
+// @return last access time of the file in seconds
+//----------------------------------------------------------------------------
+time_t Cleaner::getLastAccessTime(const fs::path &filePath) {
+  struct stat fileInfo;
+  if (stat(filePath.c_str(), &fileInfo) != 0) {
+    return -1; // Error occurred
+  }
+  return fileInfo.st_atime;
+}
+
+//----------------------------------------------------------------------------
+// @brief getDirectorySize() returns the total size of all files in a
+//        directory.
+//
+// @param  directory - path to the directory
+// @return total size of all files in the directory in bytes
+//----------------------------------------------------------------------------
+long long Cleaner::getDirectorySize(const fs::path &directory, bool scan) {
+
+  long long totalSize = 0;
+  if (scan) {
+    for (const auto &entry : fs::recursive_directory_iterator(directory)) {
+      if (stopFlag.load()) {
+        return 0;
+      }
+      if (fs::is_regular_file(entry)) {
+        totalSize += fs::file_size(entry);
+      }
+    }
+  } else {
+    struct statfs stat;
+
+    if (statfs(directory.c_str(), &stat) != 0) {
+      mLog->Error(1,"JCache:Cleaner: failed to get directory size using statfs.");
+      return 0; 
+    }
+  }
+  return totalSize;
+}
+
+//----------------------------------------------------------------------------
+// @brief getFilesByAccessTime() returns a list of files sorted by their
+//        last access time.
+//
+// @param  directory - path to the directory
+// @return list of files sorted by their last access time
+//----------------------------------------------------------------------------
+std::vector<std::pair<long long, fs::path>>
+Cleaner::getFilesByAccessTime(const fs::path &directory) {
+  std::vector<std::pair<long long, fs::path>> fileList;
+  for (const auto &entry : fs::recursive_directory_iterator(directory)) {
+    if (fs::is_regular_file(entry)) {
+      auto accessTime = getLastAccessTime(entry.path());
+      fileList.emplace_back(accessTime, entry.path());
+    }
+  }
+  std::sort(fileList.begin(), fileList.end());
+  return fileList;
+}
+
+//----------------------------------------------------------------------------
+// @brief cleanDirectory() deletes files from a directory that are older than
+//        a given threshold.
+//
+// @param  directory - path to the directory
+// @param  highWatermark - threshold for high watermark in bytes
+// @param  lowWatermark - threshold for low watermark in bytes
+// @return none
+//----------------------------------------------------------------------------
+void Cleaner::cleanDirectory(const fs::path &directory, long long highWatermark,
+                             long long lowWatermark) {
+  long long currentSize = getDirectorySize(directory);
+  if (currentSize <= highWatermark) {
+    /*----------------------------------------------------------------------------*/
+    mLog->Info(1,"JCache:Cleaner: Directory size is within the limit (%lu/%lu). No action needed.",
+    currentSize, highWatermark);
+    return;
+  }
+
+  auto files = getFilesByAccessTime(directory);
+
+  for (const auto &[accessTime, filePath] : files) {
+    if (stopFlag.load()) {
+      return;
+    }
+    if (currentSize <= lowWatermark) {
+      break;
+    }
+    long long fileSize = fs::file_size(filePath);
+    try {
+      fs::remove(filePath);
+      currentSize -= fileSize;
+      mLog->Info(1, "JCache:Cleaner : deleted '%s' (Size: %lu bytes)", filePath.c_str(),
+      fileSize);
+    } catch (const std::exception &e) {
+      mLog->Error(1,"JCache::Cleaner error deleting '%'", filePath.c_str());
+    }
+  }
+}
+} // namespace JCache
\ No newline at end of file
diff --git a/src/XrdApps/XrdClJCachePlugin/cleaner/Cleaner.hh b/src/XrdApps/XrdClJCachePlugin/cleaner/Cleaner.hh
new file mode 100644
index 00000000000..f8677a29067
--- /dev/null
+++ b/src/XrdApps/XrdClJCachePlugin/cleaner/Cleaner.hh
@@ -0,0 +1,145 @@
+//------------------------------------------------------------------------------
+// Copyright (c) 2024 by European Organization for Nuclear Research (CERN)
+// Author: Andreas-Joachim Peters <andreas.joachim.peters@cern.ch>
+//------------------------------------------------------------------------------
+// This file is part of the XRootD software suite.
+//
+// XRootD is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// XRootD is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+// GNU General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with XRootD.  If not, see <http://www.gnu.org/licenses/>.
+//
+// In applying this licence, CERN does not waive the privileges and immunities
+// granted to it by virtue of its status as an Intergovernmental Organization
+// or submit itself to any jurisdiction.
+
+#pragma once
+/*----------------------------------------------------------------------------*/
+#include <algorithm>
+#include <atomic>
+#include <chrono>
+#include <ctime>
+#include <filesystem>
+#include <iomanip>
+#include <iostream>
+#include <string>
+#include <sys/stat.h>
+#include <sys/types.h>
+#include <thread>
+#include <unistd.h>
+#include <vector>
+/*----------------------------------------------------------------------------*/
+#include "XrdCl/XrdClDefaultEnv.hh"
+#include "XrdCl/XrdClLog.hh"
+/*----------------------------------------------------------------------------*/
+namespace fs = std::filesystem;
+
+namespace JCache {}
+namespace JCache {
+class Cleaner {
+public:
+  // Constructor to initialize the configuration variables
+  Cleaner(double lowWatermark, double highWatermark, const std::string &path,
+          bool scan, size_t interval)
+      : lowWatermark(lowWatermark), highWatermark(highWatermark), subtree(path),
+        scan(scan), interval(interval), stopFlag(false) {
+          mLog = XrdCl::DefaultEnv::GetLog();
+        }
+
+   Cleaner()
+      : lowWatermark(0), highWatermark(0), subtree(""),
+        scan(true), interval(60), stopFlag(false) {
+          mLog = XrdCl::DefaultEnv::GetLog();
+        }
+
+  // Method to start the cleaning process in a separate thread
+  void run() {
+    stopFlag = false;
+    cleanerThread = std::thread(&Cleaner::cleanLoop, this);
+    cleanerThread.detach();
+  }
+
+  // Method to stop the cleaning process and join the thread
+  void stop() {
+    stopFlag = true;
+    if (cleanerThread.joinable()) {
+      cleanerThread.join();
+    }
+  }
+
+  // Destructor to ensure the thread is properly joined
+  ~Cleaner() { stop(); }
+
+  // Method to Define Cleaner size
+  void SetSize(uint64_t size, const std::string& path) {
+    stop();
+    if (size > 1024ll*1024ll*1024ll) {
+      subtree = path;
+      highWatermark = size;
+      lowWatermark = size * 0.9;
+      run();
+    } else {
+      mLog->Error(1, "JCache:Cleaner : the size given to the cleaner is less than 1GB - cleaning is disabled!");
+    }
+  }
+
+  // Method to set the scan option (true means scan, false means don't scan but use statfs!)
+  void SetScan(bool sc) {
+    scan = sc;
+  }
+
+private:
+  // Private methods
+  time_t getLastAccessTime(const fs::path &filePath);
+  long long getDirectorySize(const fs::path &directory, bool scan = true);
+  std::vector<std::pair<long long, fs::path>>
+  getFilesByAccessTime(const fs::path &directory);
+  void cleanDirectory(const fs::path &directory, long long highWatermark,
+                      long long lowWatermark);
+
+  // Configuration variables
+  double lowWatermark;
+  double highWatermark;
+  fs::path subtree;
+  std::atomic<bool> scan;
+  size_t interval;
+
+  // Thread-related variables
+  std::thread cleanerThread;
+  std::atomic<bool> stopFlag;
+
+  // XRootD logger
+  XrdCl::Log *mLog;
+
+  // The method that runs in a loop, calling the clean method every interval
+  // seconds
+  void cleanLoop() {
+
+    while (!stopFlag.load()) {
+      auto start = std::chrono::steady_clock::now();
+
+      cleanDirectory(subtree, highWatermark, lowWatermark);
+
+      auto end = std::chrono::steady_clock::now();
+      auto duration =
+          std::chrono::duration_cast<std::chrono::seconds>(end - start);
+
+      if ( (size_t)duration.count() < interval) {
+        auto s = std::chrono::seconds(interval) - duration;
+        std::this_thread::sleep_for(s);
+      }
+    }
+  }
+
+  // The clean method to be called periodically
+  void clean() {}
+};
+} // namespace JCache
diff --git a/src/XrdApps/XrdClJCachePlugin/file/CacheStats.hh b/src/XrdApps/XrdClJCachePlugin/file/CacheStats.hh
index b1d9164890a..49279ed3a28 100644
--- a/src/XrdApps/XrdClJCachePlugin/file/CacheStats.hh
+++ b/src/XrdApps/XrdClJCachePlugin/file/CacheStats.hh
@@ -83,6 +83,7 @@ struct CacheStats {
       JCache::Art art;
       if (XrdCl::JCacheFile::sEnableSummary) {
         std::cerr << "# IO Timeprofile " << std::endl;
+        std::cerr << "# --------------" << std::endl;
         art.drawCurve(bins,
                       XrdCl::JCacheFile::sStats.bench.GetTimePerBin().count() /
                           1000000.0,
diff --git a/src/XrdApps/XrdClJCachePlugin/file/XrdClJCacheFile.cc b/src/XrdApps/XrdClJCachePlugin/file/XrdClJCacheFile.cc
index 0d7250cd9bf..d11f5725187 100644
--- a/src/XrdApps/XrdClJCachePlugin/file/XrdClJCacheFile.cc
+++ b/src/XrdApps/XrdClJCachePlugin/file/XrdClJCacheFile.cc
@@ -34,7 +34,7 @@ bool XrdCl::JCacheFile::sEnableJournalCache = true;
 bool XrdCl::JCacheFile::sEnableVectorCache = false;
 bool XrdCl::JCacheFile::sEnableSummary = true;
 JCache::CacheStats XrdCl::JCacheFile::sStats(true);
-
+JCache::Cleaner XrdCl::JCacheFile::sCleaner;
 JournalManager XrdCl::JCacheFile::sJournalManager;
 
 namespace XrdCl {
diff --git a/src/XrdApps/XrdClJCachePlugin/file/XrdClJCacheFile.hh b/src/XrdApps/XrdClJCachePlugin/file/XrdClJCacheFile.hh
index 343a1e89063..c807cde6c88 100644
--- a/src/XrdApps/XrdClJCachePlugin/file/XrdClJCacheFile.hh
+++ b/src/XrdApps/XrdClJCachePlugin/file/XrdClJCacheFile.hh
@@ -29,6 +29,7 @@
 #include "XrdCl/XrdClPlugInInterface.hh"
 /*----------------------------------------------------------------------------*/
 #include "cache/Journal.hh"
+#include "cleaner/Cleaner.hh"
 #include "file/Art.hh"
 #include "file/TimeBench.hh"
 #include "handler/XrdClJCachePgReadHandler.hh"
@@ -220,6 +221,7 @@ public:
   static void SetVector(const bool &value) { sEnableVectorCache = value; }
   static void SetJsonPath(const std::string &path) { sJsonPath = path; }
   static void SetSummary(const bool &value) { sEnableSummary = value; }
+  static void SetSize(uint64_t size) { sCleaner.SetSize(size,sCachePath);}
 
   //----------------------------------------------------------------------------
   //! @brief static members pointing to cache settings
@@ -239,6 +241,9 @@ public:
   //! @brief global plugin cache hit statistics
   static JCache::CacheStats sStats;
 
+  //! @brief cleaner instance 
+  static JCache::Cleaner sCleaner;
+
 private:
   //! @brief attach for read
   bool AttachForRead();
diff --git a/src/XrdApps/XrdClJCachePlugin/handler/XrdClJCachePgReadHandler.hh b/src/XrdApps/XrdClJCachePlugin/handler/XrdClJCachePgReadHandler.hh
index dd8060e7cf9..7c4fb3bedda 100644
--- a/src/XrdApps/XrdClJCachePlugin/handler/XrdClJCachePgReadHandler.hh
+++ b/src/XrdApps/XrdClJCachePlugin/handler/XrdClJCachePgReadHandler.hh
@@ -37,7 +37,7 @@ class JCachePgReadHandler : public XrdCl::ResponseHandler
 public:
   JCachePgReadHandler() {}
 
-  JCachePgReadHandler(JCacheReadHandler *other) {
+  JCachePgReadHandler(JCachePgReadHandler *other) {
     rbytes = other->rbytes;
     journal = other->journal;
   }
diff --git a/src/XrdApps/XrdClJCachePlugin/plugin/XrdClJCachePlugin.hh b/src/XrdApps/XrdClJCachePlugin/plugin/XrdClJCachePlugin.hh
index 24775690f95..d0ce85a0ce6 100644
--- a/src/XrdApps/XrdClJCachePlugin/plugin/XrdClJCachePlugin.hh
+++ b/src/XrdApps/XrdClJCachePlugin/plugin/XrdClJCachePlugin.hh
@@ -49,14 +49,19 @@ public:
     if (config) {
       auto itc = config->find("cache");
       JCacheFile::SetCache(itc != config->end() ? itc->second : "");
+
+      auto itsz = config->find("size");
+      JCacheFile::SetSize(itsz != config->end() ? std::stoll(std::string(itsz->second),0,10) : 0);
+
       auto itv = config->find("vector");
       JCacheFile::SetVector(itv != config->end() ? itv->second == "true"
                                                  : false);
       auto itj = config->find("journal");
-      JCacheFile::SetJournal(itj != config->end() ? itj->second == "true"
-                                                  : false);
+      JCacheFile::SetJournal(itj != config->end() ? itj->second == "false"
+                                                  : true);
       auto itjson = config->find("json");
       JCacheFile::SetJsonPath(itjson != config->end() ? itjson->second : "./");
+
       auto its = config->find("summary");
       JCacheFile::SetSummary(its != config->end() ? its->second != "false"
                                                   : true);
@@ -65,6 +70,10 @@ public:
         JCacheFile::SetCache((std::string(v).length()) ? std::string(v) : "");
       }
 
+      if (const char *v = getenv("XRD_JCACHE_SIZE")) {
+        JCacheFile::SetSize((std::string(v).length()) ? std::stoll(std::string(v),0,10) : 0);
+      }
+
       if (const char *v = getenv("XRD_JCACHE_SUMMARY")) {
         JCacheFile::SetSummary((std::string(v) == "true") ? true : false);
       }