From cdb0b9fcf66fc36fa0f4cb5959129c1ba4fb0681 Mon Sep 17 00:00:00 2001 From: Andreas Joachim Peters Date: Thu, 6 Jun 2024 16:50:46 +0200 Subject: [PATCH] XrdApps:JCache : add python and C++ cache cleaner execs - add journal cache implementation to cache simple reads - add option to enable journal and readv caching on demand --- src/XrdApps/XrdClJCachePlugin/CMakeLists.txt | 37 - src/XrdApps/XrdClJCachePlugin/CacheCleaner.py | 90 ++ .../XrdClJCachePlugin/XrdClCacheCleaner.cc | 129 +++ .../XrdClJCachePlugin/XrdClJCacheFile.cc | 168 +++- .../XrdClJCachePlugin/XrdClJCacheFile.hh | 42 +- .../XrdClJCachePlugin/XrdClJCachePlugin.hh | 12 +- .../XrdClJCachePlugin/XrdClVectorCache.cc | 226 +++++ .../XrdClJCachePlugin/XrdClVectorCache.hh | 64 ++ .../XrdClJCachePlugin/cache/IntervalTree.hh | 306 +++++++ .../XrdClJCachePlugin/cache/Journal.cc | 470 ++++++++++ .../XrdClJCachePlugin/cache/Journal.hh | 130 +++ src/XrdApps/XrdClJCachePlugin/cache/RbTree.hh | 809 ++++++++++++++++++ 12 files changed, 2430 insertions(+), 53 deletions(-) delete mode 100644 src/XrdApps/XrdClJCachePlugin/CMakeLists.txt create mode 100644 src/XrdApps/XrdClJCachePlugin/CacheCleaner.py create mode 100644 src/XrdApps/XrdClJCachePlugin/XrdClCacheCleaner.cc create mode 100644 src/XrdApps/XrdClJCachePlugin/XrdClVectorCache.cc create mode 100644 src/XrdApps/XrdClJCachePlugin/XrdClVectorCache.hh create mode 100644 src/XrdApps/XrdClJCachePlugin/cache/IntervalTree.hh create mode 100644 src/XrdApps/XrdClJCachePlugin/cache/Journal.cc create mode 100644 src/XrdApps/XrdClJCachePlugin/cache/Journal.hh create mode 100644 src/XrdApps/XrdClJCachePlugin/cache/RbTree.hh diff --git a/src/XrdApps/XrdClJCachePlugin/CMakeLists.txt b/src/XrdApps/XrdClJCachePlugin/CMakeLists.txt deleted file mode 100644 index 5c3a1e52302..00000000000 --- a/src/XrdApps/XrdClJCachePlugin/CMakeLists.txt +++ /dev/null @@ -1,37 +0,0 @@ -//------------------------------------------------------------------------------ -// Copyright (c) 2024 by European 
Organization for Nuclear Research (CERN) -// Author: Andreas-Joachim Peters -//------------------------------------------------------------------------------ -// This file is part of the XRootD software suite. -// -// XRootD is free software: you can redistribute it and/or modify -// it under the terms of the GNU Lesser General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. -// -// XRootD is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU General Public License for more details. -// -// You should have received a copy of the GNU Lesser General Public License -// along with XRootD. If not, see . -// -// In applying this licence, CERN does not waive the privileges and immunities -// granted to it by virtue of its status as an Intergovernmental Organization -// or submit itself to any jurisdiction. 
- -add_library(XrdClJCacheClient MODULE - XrdCLJCachePlugin.cc XrdClJCachePlugin.hh - XrdClJCacheFile.cc XrdClJCacheFile.hh) - -set_target_properties(XrdClJCacheClient PROPERTIES - VERSION ${VERSION} - SOVERSION ${VERSION_MAJOR} - MACOSX_RPATH TRUE) - -install(TARGETS XrdClJCacheClient - LIBRARY DESTINATION ${CMAKE_INSTALL_FULL_LIBDIR} - RUNTIME DESTINATION ${CMAKE_INSTALL_FULL_BINDIR} - ARCHIVE DESTINATION ${CMAKE_INSTALL_FULL_LIBDIR}) - diff --git a/src/XrdApps/XrdClJCachePlugin/CacheCleaner.py b/src/XrdApps/XrdClJCachePlugin/CacheCleaner.py new file mode 100644 index 00000000000..026bbc23f91 --- /dev/null +++ b/src/XrdApps/XrdClJCachePlugin/CacheCleaner.py @@ -0,0 +1,90 @@ +#!/usr/bin/python3 +##------------------------------------------------------------------------------ +## Copyright (c) 2024 by European Organization for Nuclear Research (CERN) +## Author: Andreas-Joachim Peters +##------------------------------------------------------------------------------ +## This file is part of the XRootD software suite. +## +## XRootD is free software: you can redistribute it and/or modify +## it under the terms of the GNU Lesser General Public License as published by +## the Free Software Foundation, either version 3 of the License, or +## (at your option) any later version. +## +## XRootD is distributed in the hope that it will be useful, +## but WITHOUT ANY WARRANTY; without even the implied warranty of +## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +## GNU General Public License for more details. +## +## You should have received a copy of the GNU Lesser General Public License +## along with XRootD. If not, see . +## +## In applying this licence, CERN does not waive the privileges and immunities +## granted to it by virtue of its status as an Intergovernmental Organization +## or submit itself to any jurisdiction. 
+ +import os +import time +import argparse + +def get_directory_size(directory): + """Calculate the total size of all files in the directory subtree.""" + total_size = 0 + for dirpath, dirnames, filenames in os.walk(directory): + for f in filenames: + fp = os.path.join(dirpath, f) + if os.path.isfile(fp): + total_size += os.path.getsize(fp) + return total_size + +def get_files_by_access_time(directory): + """Get a list of files sorted by their access time (oldest first).""" + file_list = [] + for dirpath, dirnames, filenames in os.walk(directory): + for f in filenames: + fp = os.path.join(dirpath, f) + if os.path.isfile(fp): + access_time = os.path.getatime(fp) + file_list.append((access_time, fp)) + file_list.sort() # Sort by access time (oldest first) + return file_list + +def clean_directory(directory, high_watermark, low_watermark): + """Clean the directory by deleting files until the size is below the low watermark.""" + current_size = get_directory_size(directory) + if current_size <= high_watermark: + print("Directory size is within the limit. 
No action needed.") + return + + files = get_files_by_access_time(directory) + + for access_time, file_path in files: + if current_size <= low_watermark: + break + file_size = os.path.getsize(file_path) + try: + os.remove(file_path) + current_size -= file_size + print(f"Deleted: {file_path} (Size: {file_size} bytes)") + except Exception as e: + print(f"Error deleting {file_path}: {e}") + +def main(): + parser = argparse.ArgumentParser(description="Directory size monitor and cleaner.") + parser.add_argument("directory", type=str, help="Directory to monitor and clean.") + parser.add_argument("highwatermark", type=int, help="High watermark in bytes.") + parser.add_argument("lowwatermark", type=int, help="Low watermark in bytes.") + parser.add_argument("interval", type=int, help="Interval time in seconds between checks.") + + args = parser.parse_args() + + directory = args.directory + high_watermark = args.highwatermark + low_watermark = args.lowwatermark + interval = args.interval + + while True: + clean_directory(directory, high_watermark, low_watermark) + time.sleep(interval) + +if __name__ == "__main__": + main() diff --git a/src/XrdApps/XrdClJCachePlugin/XrdClCacheCleaner.cc b/src/XrdApps/XrdClJCachePlugin/XrdClCacheCleaner.cc new file mode 100644 index 00000000000..f33c5a82fac --- /dev/null +++ b/src/XrdApps/XrdClJCachePlugin/XrdClCacheCleaner.cc @@ -0,0 +1,129 @@ +//------------------------------------------------------------------------------ +// Copyright (c) 2024 by European Organization for Nuclear Research (CERN) +// Author: Andreas-Joachim Peters +//------------------------------------------------------------------------------ +// This file is part of the XRootD software suite. +// +// XRootD is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. 
+// +// XRootD is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with XRootD. If not, see . +// +// In applying this licence, CERN does not waive the privileges and immunities +// granted to it by virtue of its status as an Intergovernmental Organization +// or submit itself to any jurisdiction. + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace fs = std::filesystem; + +void printCurrentTime() { + auto now = std::chrono::system_clock::now(); + auto now_ns = std::chrono::time_point_cast(now); + auto now_ns_since_epoch = now_ns.time_since_epoch(); + auto seconds = std::chrono::duration_cast(now_ns_since_epoch); + auto nanoseconds = std::chrono::duration_cast(now_ns_since_epoch - seconds); + + auto now_t = std::chrono::system_clock::to_time_t(now); + struct tm* tm = std::localtime(&now_t); + + int year = tm->tm_year - 100; // tm_year represents years since 1900 + + std::cout << std::setfill('0') << std::setw(2) << year << std::setfill('0') << std::setw(4) << (tm->tm_mon + 1) * 100 + tm->tm_mday << " "; + std::cout << std::setw(2) << std::setfill('0') << tm->tm_hour << ":" << std::setw(2) << tm->tm_min << ":" << std::setw(2) << tm->tm_sec << " "; + std::cout << "time=" << seconds.count() << "." 
<< std::setw(9) << std::setfill('0') << nanoseconds.count() << " "; +} + +time_t getLastAccessTime(const fs::path& filePath) { + struct stat fileInfo; + if (stat(filePath.c_str(), &fileInfo) != 0) { + return -1; // Error occurred + } + return fileInfo.st_atime; +} + +long long getDirectorySize(const fs::path& directory) { + long long totalSize = 0; + for (const auto& entry : fs::recursive_directory_iterator(directory)) { + if (fs::is_regular_file(entry)) { + totalSize += fs::file_size(entry); + } + } + return totalSize; +} + +std::vector> getFilesByAccessTime(const fs::path& directory) { + std::vector> fileList; + for (const auto& entry : fs::recursive_directory_iterator(directory)) { + if (fs::is_regular_file(entry)) { + auto accessTime = getLastAccessTime(entry.path()); + fileList.emplace_back(accessTime, entry.path()); + } + } + std::sort(fileList.begin(), fileList.end()); + return fileList; +} + +void cleanDirectory(const fs::path& directory, long long highWatermark, long long lowWatermark) { + long long currentSize = getDirectorySize(directory); + if (currentSize <= highWatermark) { + printCurrentTime(); + std::cout << "Directory size is within the limit. No action needed." 
<< std::endl; + return; + } + + auto files = getFilesByAccessTime(directory); + + for (const auto& [accessTime, filePath] : files) { + if (currentSize <= lowWatermark) { + break; + } + long long fileSize = fs::file_size(filePath); + try { + fs::remove(filePath); + currentSize -= fileSize; + printCurrentTime(); + std::cout << "Deleted: " << filePath << " (Size: " << fileSize << " bytes)" << std::endl; + } catch (const std::exception& e) { + printCurrentTime(); + std::cerr << "Error deleting " << filePath << ": " << e.what() << std::endl; + } + } +} + +int main(int argc, char* argv[]) { + if (argc != 5) { + std::cerr << "Usage: " << argv[0] << " " << std::endl; + return 1; + } + + fs::path directory = argv[1]; + long long highWatermark = std::stoll(argv[2]); + long long lowWatermark = std::stoll(argv[3]); + int interval = std::stoi(argv[4]); + + while (true) { + cleanDirectory(directory, highWatermark, lowWatermark); + std::this_thread::sleep_for(std::chrono::seconds(interval)); + } + + return 0; +} diff --git a/src/XrdApps/XrdClJCachePlugin/XrdClJCacheFile.cc b/src/XrdApps/XrdClJCachePlugin/XrdClJCacheFile.cc index 779a1c122b0..e7b5778136d 100644 --- a/src/XrdApps/XrdClJCachePlugin/XrdClJCacheFile.cc +++ b/src/XrdApps/XrdClJCachePlugin/XrdClJCacheFile.cc @@ -24,8 +24,12 @@ /*----------------------------------------------------------------------------*/ #include "XrdClJCacheFile.hh" /*----------------------------------------------------------------------------*/ +#include "XrdCl/XrdClMessageUtils.hh" +/*----------------------------------------------------------------------------*/ std::string JCacheFile::sCachePath=""; +bool JCacheFile::sEnableJournalCache = true; +bool JCacheFile::sEnableVectorCache = true; //------------------------------------------------------------------------------ // Constructor @@ -34,6 +38,7 @@ JCacheFile::JCacheFile(): mIsOpen(false), pFile(0) { + mAttachedForRead = false; } @@ -60,24 +65,31 @@ JCacheFile::Open(const std::string& url, 
uint16_t timeout) { XRootDStatus st; + mFlags = flags; if (mIsOpen) { st = XRootDStatus(stError, errInvalidOp); + std::cerr << "error: file is already opened: " << pUrl << std::endl; return st; } pFile = new XrdCl::File(false); + pUrl = url; st = pFile->Open(url, flags, mode, handler, timeout); if (st.IsOK()) { mIsOpen = true; + if ((flags & OpenFlags::Flags::Read) == OpenFlags::Flags::Read) { + std::string JournalDir = sCachePath + "/" + VectorCache::computeSHA256(pUrl); + pJournalPath = JournalDir + "/journal"; + // it can be that we cannot write the journal directory + if (!VectorCache::ensureLastSubdirectoryExists(JournalDir)) { + st = XRootDStatus(stError, errOSError); + std::cerr << "error: unable to create cache directory: " << JournalDir << std::endl; + return st; + } + } } - - - if ((flags & OpenFlags::Flags::Read) == OpenFlags::Flags::Read) { - // attach to a cache - } - return st; } @@ -93,15 +105,17 @@ JCacheFile::Close(ResponseHandler* handler, if (mIsOpen) { mIsOpen = false; - + pUrl = ""; if (pFile) { st = pFile->Close(handler, timeout); + } else { + st = XRootDStatus(stOK, 0); + } + if (sEnableJournalCache) { + pJournal.detach(); } } else { - // File already closed - st = XRootDStatus(stError, errInvalidOp); - XRootDStatus* ret_st = new XRootDStatus(st); - handler->HandleResponse(ret_st, 0); + st = XRootDStatus(stOK, 0); } return st; @@ -141,9 +155,27 @@ JCacheFile::Read(uint64_t offset, XRootDStatus st; if (pFile) { + if (sEnableJournalCache && AttachForRead()) { + auto rb = pJournal.pread(buffer, size, offset); + if (rb == size) { + // we can only serve successful reads from the cache for now + XRootDStatus* ret_st = new XRootDStatus(st); + ChunkInfo* chunkInfo = new ChunkInfo(offset, rb, buffer); + AnyObject* obj = new AnyObject(); + obj->Set(chunkInfo); + handler->HandleResponse(ret_st, obj); + st = XRootDStatus(stOK, 0); + return st; + } + } st = pFile->Read(offset, size, buffer, handler, timeout); + if (st.IsOK()) { + if 
(sEnableJournalCache) { + pJournal.pwrite(buffer, size, offset); + } + } } else { - st = XRootDStatus(stError, errInvalidOp); + st = XRootDStatus(stError, errInvalidOp); } return st; } @@ -170,6 +202,81 @@ JCacheFile::Write(uint64_t offset, return st; } +//------------------------------------------------------------------------ +//! PgRead +//------------------------------------------------------------------------ +XRootDStatus +JCacheFile::PgRead( uint64_t offset, + uint32_t size, + void *buffer, + ResponseHandler *handler, + uint16_t timeout ) +{ + XRootDStatus st; + + if (pFile) { + if (sEnableJournalCache && AttachForRead()) { + auto rb = pJournal.pread(buffer, size, offset); + if (rb == size) { + // we can only serve successful reads from the cache for now + XRootDStatus* ret_st = new XRootDStatus(st); + ChunkInfo* chunkInfo = new ChunkInfo(offset, rb, buffer); + AnyObject* obj = new AnyObject(); + obj->Set(chunkInfo); + handler->HandleResponse(ret_st, obj); + st = XRootDStatus(stOK, 0); + return st; + } + } + + std::vector cksums; + uint32_t bytesRead = 0; + + // run a synchronous read + st = pFile->PgRead(offset, size, buffer, cksums, bytesRead, timeout); + if (st.IsOK()) { + if (sEnableJournalCache) { + if (bytesRead) { + // store into journal + pJournal.pwrite(buffer, size, offset); + } + } + // emit a chunk + XRootDStatus* ret_st = new XRootDStatus(st); + ChunkInfo* chunkInfo = new ChunkInfo(offset, bytesRead, buffer); + AnyObject* obj = new AnyObject(); + obj->Set(chunkInfo); + handler->HandleResponse(ret_st, obj); + st = XRootDStatus(stOK, 0); + } + } else { + st = XRootDStatus(stError, errInvalidOp); + } + return st; +} + + +//------------------------------------------------------------------------ +//! 
PgWrite +//------------------------------------------------------------------------ +XRootDStatus +JCacheFile::PgWrite( uint64_t offset, + uint32_t nbpgs, + const void *buffer, + std::vector &cksums, + ResponseHandler *handler, + uint16_t timeout ) +{ + XRootDStatus st; + + if (pFile) { + st = pFile->PgWrite(offset, nbpgs, buffer, cksums, handler, timeout); + } else { + st = XRootDStatus(stError, errInvalidOp); + } + + return st; +} //------------------------------------------------------------------------------ // Sync @@ -222,7 +329,19 @@ JCacheFile::VectorRead(const ChunkList& chunks, XRootDStatus st; if (pFile) { + VectorCache cache(chunks, pUrl, (const char*)buffer, sCachePath); + if (sEnableVectorCache) { + if (cache.retrieve()) { + handler->HandleResponse(&st, 0); + return st; + } + } + st = pFile->VectorRead(chunks, buffer, handler, timeout); + + if (st.IsOK() && sEnableVectorCache) { + cache.store(); + } } else { st = XRootDStatus(stError, errInvalidOp); } @@ -309,3 +428,28 @@ JCacheFile::GetProperty(const std::string& name, } } +bool +JCacheFile::AttachForRead() +{ + std::lock_guard guard(mAttachMutex); + if (mAttachedForRead) { + return true; + } + if ((mFlags & OpenFlags::Flags::Read) == OpenFlags::Flags::Read) { + // attach to a cache + if (sEnableJournalCache && pFile) { + StatInfo* sinfo = 0; + auto st = pFile->Stat(false, sinfo); + if (sinfo) { + if (pJournal.attach(pJournalPath,sinfo->GetSize(),sinfo->GetModTime(),0)) { + std::cerr << "error: unable to attach to journal: " << pJournalPath << std::endl; + mAttachedForRead = true; + return false; + } + } + } + } + mAttachedForRead = true; + return true; +} + diff --git a/src/XrdApps/XrdClJCachePlugin/XrdClJCacheFile.hh b/src/XrdApps/XrdClJCachePlugin/XrdClJCacheFile.hh index ea4c7ef437c..c553e48768b 100644 --- a/src/XrdApps/XrdClJCachePlugin/XrdClJCacheFile.hh +++ b/src/XrdApps/XrdClJCachePlugin/XrdClJCacheFile.hh @@ -25,8 +25,11 @@ 
/*----------------------------------------------------------------------------*/ #include "XrdCl/XrdClPlugInInterface.hh" +#include "cache/Journal.hh" +#include "XrdClVectorCache.hh" +/*----------------------------------------------------------------------------*/ +#include /*----------------------------------------------------------------------------*/ - using namespace XrdCl; //---------------------------------------------------------------------------- @@ -116,6 +119,25 @@ public: ResponseHandler* handler, uint16_t timeout); + //------------------------------------------------------------------------ + //! PgRead + //------------------------------------------------------------------------ + virtual XRootDStatus PgRead( uint64_t offset, + uint32_t size, + void *buffer, + ResponseHandler *handler, + uint16_t timeout ) override; + + //------------------------------------------------------------------------ + //! PgWrite + //------------------------------------------------------------------------ + virtual XRootDStatus PgWrite( uint64_t offset, + uint32_t nbpgs, + const void *buffer, + std::vector &cksums, + ResponseHandler *handler, + uint16_t timeout ) override; + //------------------------------------------------------------------------ //! Fcntl @@ -165,10 +187,26 @@ public: //---------------------------------------------------------------------------- static void SetCache(const std::string& path) { sCachePath = path; } - + static void SetJournal(const bool& value) { sEnableJournalCache = value; } + static void SetVector(const bool& value) { sEnableVectorCache = value; } + + //---------------------------------------------------------------------------- + //! 
get the local cache path + //---------------------------------------------------------------------------- static std::string sCachePath; + static bool sEnableVectorCache; + static bool sEnableJournalCache; private: + + bool AttachForRead(); + + std::atomic mAttachedForRead; + std::mutex mAttachMutex; + OpenFlags::Flags mFlags; bool mIsOpen; XrdCl::File* pFile; + std::string pUrl; + Journal pJournal; + std::string pJournalPath; }; diff --git a/src/XrdApps/XrdClJCachePlugin/XrdClJCachePlugin.hh b/src/XrdApps/XrdClJCachePlugin/XrdClJCachePlugin.hh index aaf80e8a492..4bffbcdde8d 100644 --- a/src/XrdApps/XrdClJCachePlugin/XrdClJCachePlugin.hh +++ b/src/XrdApps/XrdClJCachePlugin/XrdClJCachePlugin.hh @@ -44,8 +44,16 @@ class JCacheFactory : public PlugInFactory { if( config ) { - auto itr = config->find( "cache" ); - JCacheFile::SetCache( itr != config->end() ? itr->second : "" ); + auto itc = config->find( "cache" ); + JCacheFile::SetCache( itc != config->end() ? itc->second : "" ); + auto itv = config->find( "vector" ); + JCacheFile::SetVector( itv != config->end() ? itv->second == "true": false ); + auto itj = config->find( "journal" ); + JCacheFile::SetJournal( itj != config->end() ? itj->second == "true": false ); + Log* log = DefaultEnv::GetLog(); + log->Info(1, "JCache : cache directory: %s", JCacheFile::sCachePath.c_str()); + log->Info(1, "JCache : caching readv in vector cache : %s", JCacheFile::sEnableVectorCache ? "true" : "false"); + log->Info(1, "JCache : caching reads in journal cache: %s", JCacheFile::sEnableJournalCache ? 
"true" : "false"); } } diff --git a/src/XrdApps/XrdClJCachePlugin/XrdClVectorCache.cc b/src/XrdApps/XrdClJCachePlugin/XrdClVectorCache.cc new file mode 100644 index 00000000000..9d26a138e56 --- /dev/null +++ b/src/XrdApps/XrdClJCachePlugin/XrdClVectorCache.cc @@ -0,0 +1,226 @@ +//------------------------------------------------------------------------------ +// Copyright (c) 2024 by European Organization for Nuclear Research (CERN) +// Author: Andreas-Joachim Peters +//------------------------------------------------------------------------------ +// This file is part of the XRootD software suite. +// +// XRootD is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// XRootD is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with XRootD. If not, see . +// +// In applying this licence, CERN does not waive the privileges and immunities +// granted to it by virtue of its status as an Intergovernmental Organization +// or submit itself to any jurisdiction. + +/*----------------------------------------------------------------------------*/ +#include "XrdClVectorCache.hh" +#include +/*----------------------------------------------------------------------------*/ + +namespace fs = std::filesystem; + +//---------------------------------------------------------------------------- +//! 
serialize a vector into a buffer +//---------------------------------------------------------------------------- +std::vector VectorCache::serializeVector() const { + std::vector serializedData; + for (const auto& i : chunks) { + uint64_t o = i.GetOffset(); + uint64_t n = i.GetLength(); + unsigned char buffer[sizeof(uint64_t) + sizeof(size_t)]; + std::memcpy(buffer, &o, sizeof(uint64_t)); + std::memcpy(buffer + sizeof(uint64_t), &n, sizeof(size_t)); + serializedData.insert(serializedData.end(), buffer, buffer + sizeof(buffer)); + } + return serializedData; +} + +//---------------------------------------------------------------------------- +//! compute SHA256 signature for a given vector read +//---------------------------------------------------------------------------- +std::string VectorCache::computeSHA256(const std::vector& data) { + unsigned char hash[SHA256_DIGEST_LENGTH]; + SHA256_CTX sha256; + SHA256_Init(&sha256); + SHA256_Update(&sha256, data.data(), data.size()); + SHA256_Final(hash, &sha256); + + std::stringstream ss; + for (int i = 0; i < SHA256_DIGEST_LENGTH; ++i) { + ss << std::hex << std::setw(2) << std::setfill('0') << (int)hash[i]; + } + return ss.str(); +} + +//---------------------------------------------------------------------------- +//! compute SHA256 signature for a string +//---------------------------------------------------------------------------- +std::string VectorCache::computeSHA256(const std::string& data) { + unsigned char hash[SHA256_DIGEST_LENGTH]; + SHA256_CTX sha256; + SHA256_Init(&sha256); + SHA256_Update(&sha256, data.c_str(), data.size()); + SHA256_Final(hash, &sha256); + + std::stringstream ss; + for (int i = 0; i < SHA256_DIGEST_LENGTH; ++i) { + ss << std::hex << std::setw(2) << std::setfill('0') << (int)hash[i]; + } + return ss.str(); +} + +//---------------------------------------------------------------------------- +//! 
compute SHA256 for vector read and name +//---------------------------------------------------------------------------- +std::pair VectorCache::computeHash() const { + std::vector serializedData = serializeVector(); + std::string vectorHash = computeSHA256(serializedData); + std::string nameHash = computeSHA256(name); + return {vectorHash, nameHash}; +} + +//---------------------------------------------------------------------------- +//! ensure that the last subdirectory directory exists +//---------------------------------------------------------------------------- +bool VectorCache::ensureLastSubdirectoryExists(const std::string& dirName) { + fs::path dirPath(dirName); + + if (fs::exists(dirPath) && fs::is_directory(dirPath)) { + return true; + } + + // Extract the parent path + fs::path parentPath = dirPath.parent_path(); + + if (!fs::exists(parentPath)) { + std::cerr << "error: parent directory does not exist. Cannot create subdirectory.\n"; + return false; + } + + if (fs::create_directory(dirPath)) { + return true; + } else { + std::cerr << "error: failed to create subdirectory.\n"; + return false; + } + + return false; +} + +//---------------------------------------------------------------------------- +//! 
store a vector read in the cache +//---------------------------------------------------------------------------- +bool VectorCache::store() const { + // Compute hashes + auto [vectorHash, nameHash] = computeHash(); + + // Compute the total expected length from the input vector + size_t expectedLen = 0; + for (const auto& chunk : chunks) { + expectedLen += chunk.GetLength(); + } + + // Try to have a cache toplevel directory + if (!ensureLastSubdirectoryExists(prefix)) { + return false; + } + + // Generate the dir name using the prefix and the hash of the name + std::string dirName = prefix + nameHash; + std::string fileName = dirName + "/" + vectorHash; + std::string tmpName = fileName + ".tmp"; + + // Try to have a cache subdirectory for this file + if (!ensureLastSubdirectoryExists(dirName)) { + return false; + } + + // Open the file for writing (binary mode) + // Write specified segments of data to the file + std::ofstream outFile(tmpName, std::ios::binary); + if (outFile.is_open()) { + std::error_code ec; + outFile.write(data, expectedLen); + if (outFile.fail()) { + std::cerr << "error: failed writing to file: " << tmpName << std::endl; + outFile.close(); + fs::remove(tmpName, ec); + if (ec) { + std::cerr << "error: failed cleanup of temporary file: " << tmpName << std::endl; + } + return false; + } + outFile.close(); + fs::rename(tmpName, fileName, ec); + if (ec) { + outFile.close(); + std::cerr << "error: failed atomic rename to file: " << fileName << std::endl; + fs::remove(tmpName, ec); + if (ec) { + std::cerr << "error: failed cleanup of temporary file: " << tmpName << std::endl; + } + return false; + } + return true; + } else { + std::cerr << "error: failed to open file: " << tmpName << std::endl; + return false; + } +} + +//---------------------------------------------------------------------------- +//! 
retrieve a vector read from the cache +//---------------------------------------------------------------------------- +bool VectorCache::retrieve() const { + // Compute the total expected length from the input vector + size_t expectedLen = 0; + for (const auto& chunk : chunks) { + expectedLen += chunk.GetLength(); + } + + // Compute hashes + auto [vectorHash, nameHash] = computeHash(); + + // Generate the dir name using the prefix and the hash of the name + std::string dirName = prefix + nameHash; + std::string fileName = dirName + "/" + vectorHash; + + // Check if the file exists + struct stat fileInfo; + if (stat(fileName.c_str(), &fileInfo) != 0) { + if (verbose) { + std::cerr << "error: file does not exist: " << fileName << std::endl; + } + return false; + } + + // Check if the file size matches the expected length + if ((size_t)fileInfo.st_size != expectedLen) { + if (verbose) { + std::cerr << "error: file size mismatch. Expected: " << expectedLen << ", Actual: " << fileInfo.st_size << std::endl; + } + return false; + } + + // Open the file for reading + std::ifstream inFile(fileName, std::ios::binary); + if (inFile.is_open()) { + inFile.read((char*)data, fileInfo.st_size); + inFile.close(); + return true; + } else { + if (verbose) { + std::cerr << "error: failed to open file: " << fileName << std::endl; + } + return false; + } +} diff --git a/src/XrdApps/XrdClJCachePlugin/XrdClVectorCache.hh b/src/XrdApps/XrdClJCachePlugin/XrdClVectorCache.hh new file mode 100644 index 00000000000..1c71e10f0c3 --- /dev/null +++ b/src/XrdApps/XrdClJCachePlugin/XrdClVectorCache.hh @@ -0,0 +1,64 @@ +//------------------------------------------------------------------------------ +// Copyright (c) 2024 by European Organization for Nuclear Research (CERN) +// Author: Andreas-Joachim Peters +//------------------------------------------------------------------------------ +// This file is part of the XRootD software suite. 
+// +// XRootD is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// XRootD is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with XRootD. If not, see . +// +// In applying this licence, CERN does not waive the privileges and immunities +// granted to it by virtue of its status as an Intergovernmental Organization +// or submit itself to any jurisdiction. + +#pragma once + +/*----------------------------------------------------------------------------*/ +#include +#include +#include +#include // For SHA-256 +#include +#include +#include // For std::memcpy +#include +#include // For file operations +#include // For checking if file exists +/*----------------------------------------------------------------------------*/ +#include "XrdCl/XrdClXRootDResponses.hh" +//---------------------------------------------------------------------------- +//! 
VectorCache class caching readv buffers on a filesystem +//---------------------------------------------------------------------------- + +class VectorCache { +public: + VectorCache(const XrdCl::ChunkList chunks, const std::string& name, const char* data, const std::string& prefix, bool verbose=false) + : chunks(chunks), name(name), data(data), prefix(prefix), verbose(verbose) {} + + std::pair computeHash() const; + bool store() const; + bool retrieve() const; + + static std::string computeSHA256(const std::vector& data); + static std::string computeSHA256(const std::string& data); + static bool ensureLastSubdirectoryExists(const std::string& dirName); + +private: + XrdCl::ChunkList chunks; + std::string name; + const char* data; + std::string prefix; + bool verbose; + + std::vector serializeVector() const; +}; diff --git a/src/XrdApps/XrdClJCachePlugin/cache/IntervalTree.hh b/src/XrdApps/XrdClJCachePlugin/cache/IntervalTree.hh new file mode 100644 index 00000000000..aa9bc81f904 --- /dev/null +++ b/src/XrdApps/XrdClJCachePlugin/cache/IntervalTree.hh @@ -0,0 +1,306 @@ +//------------------------------------------------------------------------------ +// Copyright (c) 2024 by European Organization for Nuclear Research (CERN) +// Author: Andreas-Joachim Peters +// Michal Simon +//------------------------------------------------------------------------------ +// This file is part of the XRootD software suite. +// +// XRootD is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// XRootD is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. 
+// +// You should have received a copy of the GNU Lesser General Public License +// along with XRootD. If not, see . +// +// In applying this licence, CERN does not waive the privileges and immunities +// granted to it by virtue of its status as an Intergovernmental Organization +// or submit itself to any jurisdiction. + +#pragma once + +/*----------------------------------------------------------------------------*/ +#include "RbTree.hh" +/*----------------------------------------------------------------------------*/ +#include +#include +#include +#include +#include +#include +/*----------------------------------------------------------------------------*/ + +template +class interval_node_t +{ +public: + + template friend class interval_tree; + template friend class rbtree; + friend class IntervalTreeTest; + +public: + + interval_node_t(I low, I high, const V& value) : + low(low), high(high), value(value), key(this->low), max(high), colour(RED), + parent(nullptr) { } + + const I low; + const I high; + V value; + +private: + const I& key; + I max; + colour_t colour; + interval_node_t* parent; + + std::unique_ptr left; + std::unique_ptr right; +}; + +template +class interval_tree : public rbtree< I, V, interval_node_t > +{ +private: + + typedef interval_node_t N; + + typedef typename rbtree::leaf_node_t leaf_node_t; + + std::unique_ptr make_node(I low, I high, const V& value) + { + return std::unique_ptr(new N(low, high, value)); + } + +public: + + typedef typename rbtree::iterator iterator; + + struct less { + + bool operator()(const iterator& x, const iterator& y) const + { + return x->low < y->low; + } + }; + + virtual ~interval_tree() { } + + void insert(I low, I high, const V& value) + { + insert_into(low, high, value, this->tree_root); + } + + void erase(I low, I high) + { + std::unique_ptr& node = this->find_in(low, this->tree_root); + + if (!node || node->low != low || node->high != high) { + return; + } + + this->erase_node(node); + } + + std::set 
query(I low, I high) + { + std::set result; + query(low, high, this->tree_root, result); + return result; + } + +private: + + using rbtree::insert; + using rbtree::erase; + using rbtree::find; + + static bool overlaps(I low, I high, const N* node) + { + int64_t s1 = low + high; + int64_t d1 = high - low; + int64_t s2 = node->low + node->high; + int64_t d2 = node->high - node->low; + return std::abs(s2 - s1) < d1 + d2; + } + + static void query(I low, I high, std::unique_ptr& node, + std::set& result) + { + // base case + if (!node) { + return; + } + + // the interval is to the right of the rightmost point of any interval + if (low > node->max) { + return; + } + + // check if the interval overlaps fully with current node + if (overlaps(low, high, node.get())) { + result.insert(iterator(node.get())); + } + + // check the left subtree + query(low, high, node->left, result); + + // Do we need to check the right subtree? + if (high > node->low) { + query(low, high, node->right, result); + } + } + + void insert_into(I low, I high, const V& value, std::unique_ptr& node, + N* parent = nullptr) + { + if (!node) { + node = make_node(low, high, value); + node->parent = parent; + ++this->tree_size; + update_max(node->parent, node->max); + this->rb_insert_case1(node.get()); + return; + } + + if (low == node->low) { + return; + } + + if (low < node->low) { + insert_into(low, high, value, node->left, node.get()); + } else { + insert_into(low, high, value, node->right, node.get()); + } + } + + void erase_node(std::unique_ptr& node) + { + if (!node) { + return; + } + + if (this->has_two(node.get())) { + // in this case: + // 1. look for the in-order successor + // 2. replace the node with the in-order successor + // 3. 
erase the in-order successor + N* n = node.get(); + std::unique_ptr& successor = this->find_successor(node); + this->swap_successor(node, successor); + + // we don't update max since in erase_node we + // will do it after removing respective node + // we swapped the node with successor and the + // 'successor' unique pointer holds now the node + if (successor.get() == n) { + erase_node(successor); + }// otherwise the successor was the right child of node, + // hence node should be now the right child of 'node' + // unoique pointer + else if (node->right.get() == n) { + erase_node(node->right); + }// there are no other cases so anything else is wrong + else { + throw std::logic_error("Bad rbtree swap."); + } + + return; + } + + // node has at most one child + // in this case simply replace the node with the + // single child or null if there are no children + N* parent = node->parent; + std::unique_ptr& child = node->left ? node->left : node->right; + colour_t old_colour = node->colour; + + if (child) { + child->parent = node->parent; + } + + node.reset(child.release()); + update_max(parent); + --this->tree_size; + + if (old_colour == BLACK) { + if (node && node->colour == RED) { + node->colour = BLACK; + } else { + // if we are here the node is null because a BLACK + // node that has at most one non-leaf child must + // have two null children (null children are BLACK) + if (node) { + throw rb_invariant_error(); + } + + this->rb_erase_case1(leaf_node_t(parent)); + } + } else if (node) + // if the node was red it has to have two BLACK children + // and since at most one of those children is a non-leaf + // child actually both have to be leafs (null) in order + // to satisfy the red-black tree invariant + { + throw rb_invariant_error(); + } + } + + void update_max(N* node, I new_high) + { + while (node) { + if (new_high > node->max) { + node->max = new_high; + node = node->parent; + } else { + break; + } + } + } + + void update_max(N* node) + { + while (node) { + 
set_max(node); + node = node->parent; + } + } + + void set_max(N* node) + { + if (!node->left && !node->right) { + node->max = node->high; + return; + } + + if (!node->left || !node->right) { + node->max = std::max(node->high, + (node->left ? node->left->max : node->right->max)); + return; + } + + node->max = std::max(node->high, std::max(node->left->max, node->right->max)); + } + + virtual void right_rotation(N* node) + { + N* pivot = node->left.get(); + rbtree::right_rotation(node); + set_max(node); // set first max for node since now it's lower in the tree + set_max(pivot); + } + + virtual void left_rotation(N* node) + { + N* pivot = node->right.get(); + rbtree::left_rotation(node); + set_max(node); // set first max for node since now it's lower in the tree + set_max(pivot); + } +}; diff --git a/src/XrdApps/XrdClJCachePlugin/cache/Journal.cc b/src/XrdApps/XrdClJCachePlugin/cache/Journal.cc new file mode 100644 index 00000000000..f3ebc6faaef --- /dev/null +++ b/src/XrdApps/XrdClJCachePlugin/cache/Journal.cc @@ -0,0 +1,470 @@ +//------------------------------------------------------------------------------ +// Copyright (c) 2024 by European Organization for Nuclear Research (CERN) +// Author: Andreas-Joachim Peters +// Michal Simon +//------------------------------------------------------------------------------ +// This file is part of the XRootD software suite. +// +// XRootD is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// XRootD is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with XRootD. 
If not, see . +// +// In applying this licence, CERN does not waive the privileges and immunities +// granted to it by virtue of its status as an Intergovernmental Organization +// or submit itself to any jurisdiction. + + +/*----------------------------------------------------------------------------*/ +#include "Journal.hh" +/*----------------------------------------------------------------------------*/ +#include +#include +#include +#include +#include +#include +/*----------------------------------------------------------------------------*/ + +//------------------------------------------------------------------------------ +//! Journal Constructor +//------------------------------------------------------------------------------ +Journal::Journal() : cachesize(0), + max_offset(0), fd(-1) +{ + std::lock_guard guard(mtx); + jheader.magic = JOURNAL_MAGIC; + jheader.mtime = 0; + jheader.mtime_nsec = 0; + jheader.filesize = 0; + jheader.placeholder1 = 0; + jheader.placeholder2 = 0; + jheader.placeholder3 = 0; + jheader.placeholder4 = 0; +} +//------------------------------------------------------------------------------ +//! Journal Destructor +//------------------------------------------------------------------------------ +Journal::~Journal() +{ + std::lock_guard guard(mtx); + if (fd > 0) { + int rc = close(fd); + + if (rc) { + std::abort(); + } + + fd = -1; + } +} + +//------------------------------------------------------------------------------ +//! 
Routine to read a journal header +//------------------------------------------------------------------------------ +void Journal::read_jheader() +{ + jheader_t fheader; + auto hr = ::pread64(fd, &fheader, sizeof(jheader), 0); + if ( (hr>0) && + ( (hr != sizeof(jheader)) || + (fheader.magic != JOURNAL_MAGIC)) ) { + std::cerr << "warning: inconsistent journal header found (I) - purging path:" << path << std::endl; + reset(); + return; + } + if ( (fheader.mtime != jheader.mtime) + || (fheader.mtime_nsec != jheader.mtime_nsec) + || (fheader.filesize != jheader.filesize) ) { + std::cerr << "warning: remote file change detected - purging path:" << path << std::endl; + reset(); + return; + } + } + +//------------------------------------------------------------------------------ +//! Routine to write a journal header +//------------------------------------------------------------------------------ +int Journal::write_jheader() +{ + auto hw = ::pwrite64(fd, &jheader, sizeof(jheader), 0); + if ( (hw != sizeof(jheader)) ) { + std::cerr << "warning: failed to write journal header - purging path:" << path << std::endl; + return -errno; + } + return 0; + } + +//------------------------------------------------------------------------------ +//! 
Routine to read a journal +//------------------------------------------------------------------------------ +int Journal::read_journal() +{ + journal.clear(); + const size_t bufsize = sizeof(header_t); + char buffer[bufsize]; + ssize_t bytesRead = 0, totalBytesRead = sizeof(jheader_t); + int64_t pos = 0; + + ssize_t journalsize = lseek(fd, 0, SEEK_END); + + do { + bytesRead = ::pread(fd, buffer, bufsize, totalBytesRead); + if (bytesRead < (ssize_t)bufsize) { + if (bytesRead == 0 && (totalBytesRead==journalsize)) { + break; + } else { + std::cerr << "warning: inconsistent journal found - purging path:" << path << std::endl; + reset(); + return 0; + } + } + header_t* header = reinterpret_cast(buffer); + journal.insert(header->offset, header->offset + header->size, + totalBytesRead); + totalBytesRead += header->size; // size of the fragment + totalBytesRead += bytesRead; // size of the header + } while (pos < bytesRead); + + return totalBytesRead; +} + +//------------------------------------------------------------------------------ +//! Journal attach +//------------------------------------------------------------------------------ +int Journal::attach(const std::string& lpath, uint64_t mtime, uint64_t mtime_nsec, uint64_t size) +{ + std::lock_guard guard(mtx); + path = lpath; + jheader.mtime = mtime; + jheader.mtime_nsec = mtime_nsec; + jheader.filesize = size; + + if ((fd == -1)) { + // need to open the file + size_t tries = 0; + + do { + fd = open(path.c_str(), O_CREAT | O_RDWR, S_IRWXU); + + if (fd < 0) { + if (errno == ENOENT) { + tries++; + if (tries < 10) { + continue; + } else { + return -errno; + } + } + + return -errno; + } + + break; + } while (1); + + read_jheader(); // this might fail, no problem + cachesize = read_journal(); + if (write_jheader()) { + return -errno; + } + } + + return 0; +} + +//------------------------------------------------------------------------------ +//! 
Journal detach +//------------------------------------------------------------------------------ +int Journal::detach() +{ + std::lock_guard guard(mtx); + return 0; +} + +//------------------------------------------------------------------------------ +//! Journal unlink +//------------------------------------------------------------------------------ +int Journal::unlink() +{ + std::lock_guard guard(mtx); + struct stat buf; + int rc = stat(path.c_str(), &buf); + if (!rc) { + rc = ::unlink(path.c_str()); + } + + return rc; +} + +//------------------------------------------------------------------------------ +//! Journal pread +//------------------------------------------------------------------------------ +ssize_t Journal::pread(void* buf, size_t count, off_t offset) +{ + if (fd<0) { + return 0; + } + + std::lock_guard guard(mtx); + auto result = journal.query(offset, offset + count); + + // there is not a single interval that overlaps + if (result.empty()) { + return 0; + } + + char* buffer = reinterpret_cast(buf); + uint64_t off = offset; + uint64_t bytesRead = 0; + + + for (auto& itr : result) { + if (itr->low <= off && off < itr->high) { + // read from cache + uint64_t cacheoff = itr->value + sizeof(header_t) + (off - itr->low); + int64_t intervalsize = itr->high - off; + int64_t bytesLeft = count - bytesRead; + int64_t bufsize = intervalsize < bytesLeft ? intervalsize : bytesLeft; + ssize_t ret = ::pread(fd, buffer, bufsize, cacheoff); + if (ret < 0) { + return -1; + } + + bytesRead += ret; + off += ret; + buffer += ret; + + if (bytesRead >= count) { + break; + } + } + } + return bytesRead; +} + +//------------------------------------------------------------------------------ +//! 
Journal process intersection +//------------------------------------------------------------------------------ +void Journal::process_intersection(interval_tree& + to_write, interval_tree::iterator itr, + std::vector& updates) +{ + auto result = to_write.query(itr->low, itr->high); + + if (result.empty()) { + return; + } + + if (result.size() > 1) { + throw std::logic_error("Journal: overlapping journal entries"); + } + + const interval_tree::iterator to_wrt = *result.begin(); + // the intersection + uint64_t low = std::max(to_wrt->low, itr->low); + uint64_t high = std::min(to_wrt->high, itr->high); + // update + chunk_t update; + update.offset = offset_for_update(itr->value, low - itr->low); + update.size = high - low; + update.buff = static_cast(to_wrt->value) + (low - to_wrt->low); + updates.push_back(std::move(update)); + // update the 'to write' intervals + uint64_t wrtlow = to_wrt->low; + uint64_t wrthigh = to_wrt->high; + const void* wrtbuff = to_wrt->value; + to_write.erase(wrtlow, wrthigh); + + // the intersection overlaps with the given + // interval so there is nothing more to do + if (low == wrtlow && high == wrthigh) { + return; + } + + if (high < wrthigh) { + // the remaining right-hand-side interval + const char* buff = static_cast(wrtbuff) + (high - wrtlow); + to_write.insert(high, wrthigh, buff); + } + + if (low > wrtlow) { + // the remaining left-hand-side interval + to_write.insert(wrtlow, low, wrtbuff); + } +} + +//------------------------------------------------------------------------------ +//! 
Journal update +//------------------------------------------------------------------------------ +int Journal::update_cache(std::vector& updates) +{ + // make sure we are updating the cache in ascending order + std::sort(updates.begin(), updates.end()); + int rc = 0; + + for (auto& u : updates) { + rc = ::pwrite(fd, u.buff, u.size, + u.offset); // TODO is it safe to assume it will write it all + + if (rc <= 0) { + return errno; + } + } + + return 0; +} + +//------------------------------------------------------------------------------ +//! Journal pwrite +//------------------------------------------------------------------------------ +ssize_t Journal::pwrite(const void* buf, size_t count, off_t offset) +{ + if (fd<0) { + return 0; + } + + std::lock_guard guard(mtx); + if (count <= 0) { + return 0; + } + + interval_tree to_write; + std::vector updates; + to_write.insert(offset, offset + count, buf); + auto res = journal.query(offset, offset + count); + + for (auto itr : res) { + process_intersection(to_write, itr, updates); + } + + int rc = update_cache(updates); + + if (rc) { + return -1; + } + + interval_tree::iterator itr; + + for (itr = to_write.begin(); itr != to_write.end(); ++itr) { + uint64_t size = itr->high - itr->low; + header_t header; + header.offset = itr->low; + header.size = size; + iovec iov[2]; + iov[0].iov_base = &header; + iov[0].iov_len = sizeof(header_t); + iov[1].iov_base = const_cast(itr->value); + iov[1].iov_len = size; + + rc = ::pwrite(fd, iov[0].iov_base, iov[0].iov_len, cachesize); + rc += ::pwrite(fd, iov[1].iov_base, iov[1].iov_len, + cachesize + iov[0].iov_len); + + if (rc <= 0) { + return -1; + } + + journal.insert(itr->low, itr->high, cachesize); + cachesize += sizeof(header_t) + size; + } + + if ((ssize_t)(offset + count) > max_offset) { + max_offset = offset + count; + } + + return count; +} + +//------------------------------------------------------------------------------ +//! 
Journal data sync +//------------------------------------------------------------------------------ +int Journal::sync() +{ + if (fd<0) { + return -1; + } + return ::fdatasync(fd); +} + +//------------------------------------------------------------------------------ +//! Journal get size +//------------------------------------------------------------------------------ +size_t Journal::size() +{ + std::lock_guard guard(mtx); + return cachesize; +} + +//------------------------------------------------------------------------------ +//! Journal get max offset in the journal +//------------------------------------------------------------------------------ +off_t Journal::get_max_offset() +{ + std::lock_guard guard(mtx); + return max_offset; +} + +//------------------------------------------------------------------------------ +//! Journal reset +//------------------------------------------------------------------------------ +int Journal::reset() +{ + journal.clear(); + int retc=0; + if (fd>=0) { + retc = ftruncate(fd,0); + retc |= write_jheader(); + } + cachesize = 0; + max_offset = 0; + return retc; +} + +std::string Journal::dump() +{ + std::lock_guard guard(mtx); + std::string out; + out += "fd="; + out += std::to_string(fd); + out += " cachesize="; + out += std::to_string(cachesize); + out += " maxoffset="; + out += std::to_string(max_offset); + return out; +} + +//------------------------------------------------------------------------------ +//! Journal get chunks +//------------------------------------------------------------------------------ +std::vector Journal::get_chunks(off_t offset, + size_t size) +{ + auto result = journal.query(offset, offset + size); + std::vector ret; + + for (auto& itr : result) { + uint64_t off = (off_t) itr->low < (off_t) offset ? offset : itr->low; + uint64_t count = itr->high < offset + size ? 
itr->high - off : offset + size - + off; + uint64_t cacheoff = itr->value + sizeof(header_t) + (off - itr->low); + std::unique_ptr buffer(new char[count]); + ssize_t rc = ::pread(fd, buffer.get(), count, cacheoff); + + if (rc < 0) { + return ret; + } + + ret.push_back(chunk_t(off, count, std::move(buffer))); + } + + return ret; +} diff --git a/src/XrdApps/XrdClJCachePlugin/cache/Journal.hh b/src/XrdApps/XrdClJCachePlugin/cache/Journal.hh new file mode 100644 index 00000000000..badc3225266 --- /dev/null +++ b/src/XrdApps/XrdClJCachePlugin/cache/Journal.hh @@ -0,0 +1,130 @@ +//------------------------------------------------------------------------------ +// Copyright (c) 2024 by European Organization for Nuclear Research (CERN) +// Author: Andreas-Joachim Peters +// Michal Simon +//------------------------------------------------------------------------------ +// This file is part of the XRootD software suite. +// +// XRootD is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// XRootD is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with XRootD. If not, see . +// +// In applying this licence, CERN does not waive the privileges and immunities +// granted to it by virtue of its status as an Intergovernmental Organization +// or submit itself to any jurisdiction. 
+ +#pragma once +/*----------------------------------------------------------------------------*/ +#include "IntervalTree.hh" +/*----------------------------------------------------------------------------*/ +#include +#include +#include +/*----------------------------------------------------------------------------*/ + +class Journal +{ + static constexpr uint64_t JOURNAL_MAGIC = 0xcafecafecafecafe; + + struct jheader_t { + uint64_t magic; + uint64_t mtime; + uint64_t mtime_nsec; // XRootD does not support this though + uint64_t filesize; + uint64_t placeholder1; + uint64_t placeholder2; + uint64_t placeholder3; + uint64_t placeholder4; + }; + + struct header_t { + uint64_t offset; + uint64_t size; + }; + +public: + + struct chunk_t { + + chunk_t() : offset(0), size(0), buff(0) { } + + /* constructor - no ownership of underlying buffer */ + chunk_t(off_t offset, size_t size, const void* buff) : offset(offset), + size(size), buff(buff) { } + + /* constructor - with ownership of underlying buffer */ + chunk_t(off_t offset, size_t size, std::unique_ptr buff) : + offset(offset), size(size), buffOwnership(std::move(buff)), + buff((const void*) buffOwnership.get()) {} + + off_t offset; + size_t size; + std::unique_ptr buffOwnership; + const void* buff; + + bool operator<(const chunk_t& u) const + { + return offset < u.offset; + } + }; + + Journal(); + virtual ~Journal(); + + // base class interface + int attach(const std::string& path, uint64_t mtime, uint64_t mtime_nsec, uint64_t size); + int detach(); + int unlink(); + + ssize_t pread(void* buf, size_t count, off_t offset); + ssize_t pwrite(const void* buf, size_t count, off_t offset); + + int sync(); + + size_t size(); + + off_t get_max_offset(); + + int reset(); + + std::vector get_chunks(off_t offset, size_t size); + + std::string dump(); +private: + + void process_intersection(interval_tree& write, + interval_tree::iterator acr, std::vector& updates); + + static uint64_t offset_for_update(uint64_t offset, uint64_t 
shift) + { + return offset + sizeof(header_t) + shift; + } + + int update_cache(std::vector& updates); + int read_journal(); + + jheader_t jheader; + int write_jheader(); + void read_jheader(); + + std::string path; + size_t cachesize; + off_t max_offset; + int fd; + + // the value is the offset in the cache file + interval_tree journal; + std::mutex mtx; + int flags; + +}; + diff --git a/src/XrdApps/XrdClJCachePlugin/cache/RbTree.hh b/src/XrdApps/XrdClJCachePlugin/cache/RbTree.hh new file mode 100644 index 00000000000..5d9514759b8 --- /dev/null +++ b/src/XrdApps/XrdClJCachePlugin/cache/RbTree.hh @@ -0,0 +1,809 @@ +/* + * rbtree.hh + * + * Created on: Mar 6, 2017 + * Author: Michal Simon + * + ************************************************************************ + * EOS - the CERN Disk Storage System * + * Copyright (C) 2016 CERN/Switzerland * + * * + * This program is free software: you can redistribute it and/or modify * + * it under the terms of the GNU General Public License as published by * + * the Free Software Foundation, either version 3 of the License, or * + * (at your option) any later version. * + * * + * This program is distributed in the hope that it will be useful, * + * but WITHOUT ANY WARRANTY; without even the implied warranty of * + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * + * GNU General Public License for more details. * + * * + * You should have received a copy of the GNU General Public License * + * along with this program. 
If not, see .* + ************************************************************************/ + +#pragma once + +#include +#include + +class rb_invariant_error : public std::exception +{ +public: + + rb_invariant_error() { } + + virtual const char* what() const throw() + { + return "Red-black tree invariant violation!"; + } + +}; + +enum colour_t { + RED = true, + BLACK = false +}; + +template +class node_t +{ + template friend class rbtree; + friend class RBTreeTest; + +public: + + node_t(const K& key, const V& value) : key(key), value(value), colour(RED), + parent(nullptr) { } + + const K key; + V value; + +private: + colour_t colour; + node_t* parent; + + std::unique_ptr left; + std::unique_ptr right; +}; + +template > +class rbtree +{ + friend class RBTreeTest; + friend class IntervalTreeTest; + +protected: + + static std::unique_ptr make_node(const K& key, const V& value) + { + return std::unique_ptr(new N(key, value)); + } + + static void swap_right_child(std::unique_ptr& node, + std::unique_ptr& successor) + { + std::swap(node->colour, successor->colour); + // first do the obvious + std::swap(node->left, successor->left); + + if (node->left) { + node->left->parent = node.get(); + } + + if (successor->left) { + successor->left->parent = successor.get(); + } + + // now gather remaining pointers + N* p = node->parent; + N* n = node.release(); + N* s = successor.release(); + N* s_right = s->right.release(); + // and finally reassign those pointers + s->parent = p; + node.reset(s); + s->right.reset(n); + n->parent = s; + n->right.reset(s_right); + + if (s_right) { + s_right->parent = n; + } + } + + static void swap_successor(std::unique_ptr& node, + std::unique_ptr& successor) + { + // first check if successor is a direct child of node, + // since it is the in-order successor it can be only + // the right child + if (node->right.get() == successor.get()) { + // it is the right child + swap_right_child(node, successor); + return; + } + + // swap colour + 
std::swap(node->colour, successor->colour); + // swap parents + std::swap(node, successor); + std::swap(node->parent, successor->parent); + // swap left + std::swap(node->left, successor->left); + + if (node->left) { + node->left->parent = node.get(); + } + + if (successor->left) { + successor->left->parent = successor.get(); + } + + // swap right + std::swap(node->right, successor->right); + + if (node->right) { + node->right->parent = node.get(); + } + + if (successor->right) { + successor->right->parent = successor.get(); + } + } + + static std::unique_ptr null_node; + + // this class is just used in rb_erase_case# methods as + // they need to accept a leaf (null) node as an argument + // that can return its parent and is BLACK + + struct leaf_node_t { + + leaf_node_t(N* parent) : colour(BLACK), parent(parent) { } + + leaf_node_t(const leaf_node_t& leaf) : colour(leaf.colour), + parent(leaf.parent) { } + + leaf_node_t& operator=(const leaf_node_t& leaf) + { + colour = leaf.colour; + parent = leaf.parent; + return *this; + } + + leaf_node_t* operator->() + { + return this; + } + + leaf_node_t& operator*() + { + return *this; + } + + operator bool() const + { + return true; + } + + bool operator==(N* node) const + { + return node == nullptr; + } + + colour_t colour; + N* parent; + }; + +public: + + class iterator + { + public: + + iterator(N* node = 0) : node(node) { } + + N* operator->() + { + return node; + } + + N& operator*() + { + return *node; + } + + const N* operator->() const + { + return node; + } + + const N& operator*() const + { + return *node; + } + + operator bool() const + { + return bool(node); + } + + iterator& operator++() + { + if (!node) { + return *this; + } + + if (node->right) { + node = node->right.get(); + + while (node->left) { + node = node->left.get(); + } + + return *this; + } + + N* parent = node->parent; + + while (parent && is_right(node)) { + node = parent; + parent = node->parent; + } + + node = parent; + return *this; + } + + 
bool operator!=(const iterator& itr) /* true when the two iterators point at different nodes */ + { + return node != itr.node; + } + + private: + + N* node; + }; + + /* start with an element count of zero */ rbtree() : tree_size(0) { } + + virtual ~rbtree() { } + + /* insert a key/value pair starting at the root; a key that is already present is left untouched (see insert_into) */ void insert(const K& key, const V& value) + { + insert_into(key, value, tree_root); + } + + /* erase the node stored under key; when the key is not found find_in hands back null_node and erase_node returns on an empty slot (assumes null_node is never populated - TODO confirm) */ void erase(const K& key) + { + /* NOTE(review): the element type of std::unique_ptr and the template parameter lists look stripped in this copy of the patch - confirm against the original header */ std::unique_ptr& node = find_in(key, tree_root); + erase_node(node); + } + + /* drop the root (and whatever it owns) and set the element count back to zero */ void clear() + { + tree_root.reset(); + tree_size = 0; + } + + /* look up key and wrap the raw node pointer of the resulting slot in an iterator (the slot is null_node when the key is absent) */ iterator find(const K& key) + { + const std::unique_ptr& n = find_in(key, tree_root); + return iterator(n.get()); + } + + /* const overload of find, same lookup */ const iterator find(const K& key) const + { + const std::unique_ptr& n = find_in(key, tree_root); + return iterator(n.get()); + } + + /* number of elements as tracked in tree_size */ size_t size() const + { + return tree_size; + } + + /* true when there is no root node */ bool empty() const + { + return !tree_root; + } + + /* iterator at the left-most node; smaller keys are inserted to the left, so this is the smallest key. A default-constructed iterator is returned for an empty tree */ iterator begin() + { + N* node = tree_root.get(); + + if (!node) { + return iterator(); + } + + while (node->left) { + node = node->left.get(); + } + + return iterator(node); + } + + /* the end marker is a default-constructed iterator */ iterator end() + { + return iterator(); + } + +protected: + + /* recursive insert: walk down by key comparison, create the node in the first empty slot, link it to parent, bump tree_size and rebalance via rb_insert_case1; an equal key returns without any change */ void insert_into(const K& key, const V& value, std::unique_ptr& node, + N* parent = nullptr) + { + if (!node) { + node = make_node(key, value); + node->parent = parent; + ++tree_size; + rb_insert_case1(node.get()); + return; + } + + if (key == node->key) { + return; + } + + if (key < node->key) { + insert_into(key, value, node->left, node.get()); + } else { + insert_into(key, value, node->right, node.get()); + } + } + + /* remove the node owned by the given slot and rebalance; an empty slot is ignored. Throws std::logic_error or rb_invariant_error when the tree is not in the expected shape */ void erase_node(std::unique_ptr& node) + { + if (!node) { + return; + } + + if (has_two(node.get())) { + // in this case: + // 1. look for the in-order successor + // 2. replace the node with the in-order successor + // 3. 
erase the in-order successor + /* remember the raw node so it can be recognised after the swap */ N* n = node.get(); + std::unique_ptr& successor = find_successor(node); + swap_successor(node, successor); + + // we swapped the node with successor and the + // 'successor' unique pointer holds now the node + if (successor.get() == n) { + erase_node(successor); + }// otherwise the successor was the right child of node, + // hence node should be now the right child of 'node' + // unique pointer + else if (node->right.get() == n) { + erase_node(node->right); + }// there are no other cases so anything else is wrong + else { + throw std::logic_error("Bad rbtree swap."); + } + + return; + } + + // node has at most one child + // in this case simply replace the node with the + // single child or null if there are no children + N* parent = node->parent; + std::unique_ptr& child = node->left ? node->left : node->right; + /* the colour must be read now, the node object is gone after the reset below */ colour_t old_colour = node->colour; + + if (child) { + child->parent = node->parent; + } + + /* the slot takes over the child (or becomes empty) and lets go of the removed node */ node.reset(child.release()); + --tree_size; + + if (old_colour == BLACK) { + if (node && node->colour == RED) { + node->colour = BLACK; + } else { + // if we are here the node is null because a BLACK + // node that has at most one non-leaf child must + // have two null children (null children are BLACK) + if (node) { + throw rb_invariant_error(); + } + + /* rebalance from a stand-in for the now empty slot below parent; leaf_node_t is declared outside this chunk */ rb_erase_case1(leaf_node_t(parent)); + } + } else if (node) + // if the node was red it has to have two BLACK children + // and since at most one of those children is a non-leaf + // child actually both have to be leafs (null) in order + // to satisfy the red-black tree invariant + { + throw rb_invariant_error(); + } + } + + template // make it a template so it works both for constant and mutable pointers + /* recursive search by key: returns the owning pointer slot of the matching node, or null_node when the search runs into an empty slot */ static PTR& find_in(const K& key, PTR& node) + { + if (!node) { + return null_node; + } + + if (key == node->key) { + return node; + } + + if (key < node->key) { + return find_in(key, node->left); + } else { + return find_in(key, node->right); + } + } + + template // make it a 
template so it works both for constant and mutable pointers + /* left-most slot of the subtree rooted at node; null_node for an empty subtree */ static PTR& find_min(PTR& node) + { + if (!node) { + return null_node; + } + + if (node->left) { + return find_min(node->left); + } + + return node; + } + + template // make it a template so it works both for constant and mutable pointers + /* left-most slot of the right subtree of node; null_node when node is empty or has no right child */ static PTR& find_successor(PTR& node) + { + if (!node) { + return null_node; + } + + return find_min(node->right); + } + + /* true when both children are present; node itself is dereferenced unchecked */ static bool has_two(const N* node) + { + return node->left && node->right; + } + + //////////////////////////////////////////////////////////////////////////////////////////////////////////// + + /* point ptr at node WITHOUT deleting what it held before: release drops ownership first. The rotations below call this after the old pointee has already been attached to another owner */ static void replace(std::unique_ptr& ptr, N* node) + { + ptr.release(); + ptr.reset(node); + } + + /* rotate right around node: the left child of node moves up into the place of node and node becomes the right child of that former left child. The left child is dereferenced unchecked, so callers must only rotate nodes that have one */ virtual void right_rotation(N* node) + { + if (!node) { + return; + } + + N* parent = node->parent; + N* left_child = node->left.release(); + bool is_left = (parent && parent->left.get() == node) ? true : false; + node->left.reset(left_child->right.release()); + + if (node->left) { + node->left->parent = node; + } + + left_child->right.reset(node); + + if (left_child->right) { + left_child->right->parent = left_child; + } + + left_child->parent = parent; + + if (!parent) { + replace(tree_root, left_child); + } else if (is_left) { + replace(parent->left, left_child); + } else { + replace(parent->right, left_child); + } + } + + /* mirror image of right_rotation: the right child of node moves up and node becomes its left child; the right child is dereferenced unchecked */ virtual void left_rotation(N* node) + { + if (!node) { + return; + } + + N* parent = node->parent; + N* right_child = node->right.release(); + bool is_left = (parent && parent->left.get() == node) ? 
true : false; + node->right.reset(right_child->left.release()); + + if (node->right) { + node->right->parent = node; + } + + right_child->left.reset(node); + + if (right_child->left) { + right_child->left->parent = right_child; + } + + right_child->parent = parent; + + if (!parent) { + replace(tree_root, right_child); + } else if (is_left) { + replace(parent->left, right_child); + } else { + replace(parent->right, right_child); + } + } + + //////////////////////////////////////////////////////////////////////////////////////////////////////////// + + /* parent of the parent, or nullptr when node or its parent is missing */ N* get_grandparent(N* node) + { + if (!node || !node->parent) { + return nullptr; + } + + return node->parent->parent; + } + + /* the other child of the grandparent (can itself be nullptr); nullptr when there is no grandparent */ N* get_uncle(N* node) + { + N* grandparent = get_grandparent(node); + + if (!grandparent) { + return nullptr; + } + + if (grandparent->left.get() == node->parent) { + return grandparent->right.get(); + } else { + return grandparent->left.get(); + } + } + + /* entry point of the insert fix-up: a node without parent is the root and is painted BLACK */ void rb_insert_case1(N* node) + { + if (node->parent == nullptr) { // it is the root + node->colour = BLACK; + } else { + rb_insert_case2(node); + } + } + + /* a BLACK parent needs no repair */ void rb_insert_case2(N* node) + { + if (node->parent->colour == BLACK) { + return; // the invariant is OK + } else { + rb_insert_case3(node); + } + } + + /* RED uncle: paint parent and uncle BLACK, the grandparent RED and restart the fix-up at the grandparent; otherwise continue with case 4 */ void rb_insert_case3(N* node) + { + N* uncle = get_uncle(node); + + if (uncle && uncle->colour == RED) { + node->parent->colour = BLACK; + uncle->colour = BLACK; + N* grandparent = get_grandparent(node); + grandparent->colour = RED; + rb_insert_case1(grandparent); + } else { + rb_insert_case4(node); + } + } + + /* node and its parent hang on opposite sides: rotate at the parent so both hang on the same side, then carry on with the former parent, which is now a child of node */ void rb_insert_case4(N* node) + { + N* grandparent = get_grandparent(node); + + if ((node == node->parent->right.get()) && + (node->parent == grandparent->left.get())) { + left_rotation(grandparent->left.get()); + node = node->left.get(); + } else if ((node == node->parent->left.get()) && + (node->parent == grandparent->right.get())) { + right_rotation(grandparent->right.get()); + node = node->right.get(); + } + + 
rb_insert_case5(node); + } + + /* last insert step: parent becomes BLACK, grandparent RED, and the grandparent is rotated away from the side node hangs on */ void rb_insert_case5(N* node) + { + N* grandparent = get_grandparent(node); + node->parent->colour = BLACK; + grandparent->colour = RED; + + if (node == node->parent->left.get()) { + right_rotation(grandparent); + } else { + left_rotation(grandparent); + } + } + + //////////////////////////////////////////////////////////////////////////////////////////////////////////// + + template + /* true when node is the left child of its parent; the parent is dereferenced unchecked */ static bool is_left(NODE node) + { + return node == node->parent->left.get(); + } + + template + /* true when node is the right child of its parent; the parent is dereferenced unchecked */ static bool is_right(NODE node) + { + return node == node->parent->right.get(); + } + + template + /* the other child of the parent, or nullptr when node or its parent is missing */ N* get_sibling(NODE node) + { + if (!node || !node->parent) { + return nullptr; + } + + if (is_left(node)) { + return node->parent->right.get(); + } else { + return node->parent->left.get(); + } + } + + template + /* entry point of the erase fix-up: nothing to do when node has no parent */ void rb_erase_case1(NODE node) + { + if (node->parent != nullptr) { + rb_erase_case2(node); + } + } + + template + /* RED sibling: paint the parent RED and the sibling BLACK, then rotate the parent towards the side of node; a missing sibling is reported as rb_invariant_error */ void rb_erase_case2(NODE node) + { + N* sibling = get_sibling(node); + + if (!sibling) { + throw rb_invariant_error(); + } + + if (sibling->colour == RED) { + node->parent->colour = RED; + sibling->colour = BLACK; + + if (is_left(node)) { + left_rotation(node->parent); + } else { + right_rotation(node->parent); + } + } + + rb_erase_case3(node); + } + + template + /* parent, sibling and both children of the sibling BLACK (missing children count as BLACK): paint the sibling RED and restart the fix-up at the parent; otherwise continue with case 4 */ void rb_erase_case3(NODE node) + { + N* sibling = get_sibling(node); + + if (!sibling) { + throw rb_invariant_error(); + } + + colour_t sibling_left_colour = sibling->left ? sibling->left->colour : BLACK; + colour_t sibling_right_colour = sibling->right ? 
sibling->right->colour : BLACK; + + if (node->parent->colour == BLACK && + sibling->colour == BLACK && + sibling_left_colour == BLACK && + sibling_right_colour == BLACK) { + sibling->colour = RED; + rb_erase_case1(node->parent); + } else { + rb_erase_case4(node); + } + } + + template + /* RED parent with a BLACK sibling whose children are both BLACK: paint the sibling RED and the parent BLACK and stop; otherwise continue with case 5 */ void rb_erase_case4(NODE node) + { + N* sibling = get_sibling(node); + + if (!sibling) { + throw rb_invariant_error(); + } + + colour_t sibling_left_colour = sibling->left ? sibling->left->colour : BLACK; + colour_t sibling_right_colour = sibling->right ? sibling->right->colour : BLACK; + + if (node->parent->colour == RED && + sibling->colour == BLACK && + sibling_left_colour == BLACK && + sibling_right_colour == BLACK) { + sibling->colour = RED; + node->parent->colour = BLACK; + } else { + rb_erase_case5(node); + } + } + + template + /* BLACK sibling whose only RED child is the one on the side of node: paint the sibling RED, that child BLACK and rotate the sibling away from node; case 6 always runs afterwards */ void rb_erase_case5(NODE node) + { + N* sibling = get_sibling(node); + + if (!sibling) { + throw rb_invariant_error(); + } + + colour_t sibling_left_colour = sibling->left ? sibling->left->colour : BLACK; + colour_t sibling_right_colour = sibling->right ? 
sibling->right->colour : BLACK; + + if (sibling->colour == BLACK) { + if (is_left(node) && + sibling_right_colour == BLACK && + sibling_left_colour == RED) { + sibling->colour = RED; + + if (sibling->left) { + sibling->left->colour = BLACK; + } + + right_rotation(sibling); + } else if (is_right(node) && + sibling_left_colour == BLACK && + sibling_right_colour == RED) { + sibling->colour = RED; + + if (sibling->right) { + sibling->right->colour = BLACK; + } + + left_rotation(sibling); + } + } + + rb_erase_case6(node); + } + + template + /* final erase step: the sibling takes the colour of the parent, the parent and the child of the sibling on the far side from node become BLACK, and the parent is rotated towards the side of node */ void rb_erase_case6(NODE node) + { + N* sibling = get_sibling(node); + + if (!sibling) { + throw rb_invariant_error(); + } + + sibling->colour = node->parent->colour; + node->parent->colour = BLACK; + + if (is_left(node)) { + if (sibling->right) { + sibling->right->colour = BLACK; + } + + left_rotation(node->parent); + } else { + if (sibling->left) { + sibling->left->colour = BLACK; + } + + right_rotation(node->parent); + } + } + + /* owning pointer to the root node; empty when the tree has no elements */ std::unique_ptr tree_root; + /* element count, changed by insert_into, erase_node and clear */ size_t tree_size; +}; + +template +std::unique_ptr rbtree::null_node; /* out-of-class definition of the static null_node member that find_in, find_min and find_successor return when they run into an empty slot */