diff --git a/src/XrdApps.cmake b/src/XrdApps.cmake index e5c208ae768..df2134ce3d2 100644 --- a/src/XrdApps.cmake +++ b/src/XrdApps.cmake @@ -225,6 +225,10 @@ add_library( XrdApps/XrdClJCachePlugin/file/XrdClJCacheFile.cc XrdApps/XrdClJCachePlugin/vector/XrdClVectorCache.cc XrdApps/XrdClJCachePlugin/vector/XrdClVectorCache.hh + XrdApps/XrdClJCachePlugin/handler/XrdClJCacheOpenHandler.cc + XrdApps/XrdClJCachePlugin/handler/XrdClJCacheOpenHandler.hh + XrdApps/XrdClJCachePlugin/handler/XrdClJCacheReadHandler.hh + XrdApps/XrdClJCachePlugin/handler/XrdClJCacheReadVHandler.hh XrdApps/XrdClJCachePlugin/cache/Journal.cc XrdApps/XrdClJCachePlugin/cache/Journal.hh XrdApps/XrdClJCachePlugin/cache/IntervalTree.hh diff --git a/src/XrdApps/XrdClJCachePlugin/cache/Journal.cc b/src/XrdApps/XrdClJCachePlugin/cache/Journal.cc index 5bcec2ed07c..4344fb30383 100644 --- a/src/XrdApps/XrdClJCachePlugin/cache/Journal.cc +++ b/src/XrdApps/XrdClJCachePlugin/cache/Journal.cc @@ -89,18 +89,22 @@ void Journal::read_jheader() { return; } // TODO: understand why the mtime is +-1s - if ((abs(fheader.mtime - jheader.mtime) > 1) || - (fheader.mtime_nsec != jheader.mtime_nsec) || - (fheader.filesize != jheader.filesize)) { - if (fheader.mtime) { + if (jheader.mtime) { + if ((abs(fheader.mtime - jheader.mtime) > 1) || + (fheader.mtime_nsec != jheader.mtime_nsec) || + (jheader.filesize && (fheader.filesize != jheader.filesize))) { std::cerr << "warning: remote file change detected - purging path:" << path << std::endl; std::cerr << fheader.mtime << ":" << jheader.mtime << " " << fheader.mtime_nsec << ":" << jheader.mtime_nsec << " " << fheader.filesize << ":" << jheader.filesize << std::endl; + reset(); + return; } - reset(); - return; + } else { + // we assume the contents referenced in the header is ok to allow disconnected ops + jheader.mtime = fheader.mtime; + jheader.filesize = fheader.filesize; } } @@ -155,13 +159,27 @@ int Journal::read_journal() { //! Journal attach //------------------------------------------------------------------------------ int Journal::attach(const std::string &lpath, uint64_t mtime, - uint64_t mtime_nsec, uint64_t size) { + uint64_t mtime_nsec, uint64_t size, bool ifexists) { std::lock_guard guard(mtx); path = lpath; - jheader.mtime = mtime; - jheader.mtime_nsec = mtime_nsec; - jheader.filesize = size; + if (!ifexists) { + jheader.mtime = mtime; + jheader.mtime_nsec = mtime_nsec; + jheader.filesize = size; + } + + if (ifexists) { + struct stat buf; + // check if there is already a journal for this file known + if (::stat(path.c_str(), &buf)) { + return -ENOENT; + } else { + if ((size_t)buf.st_size < sizeof(jheader_t)) { + return -EINVAL; + } + } + } if ((fd == -1)) { // need to open the file size_t tries = 0; diff --git a/src/XrdApps/XrdClJCachePlugin/cache/Journal.hh b/src/XrdApps/XrdClJCachePlugin/cache/Journal.hh index 405bc9a20cb..85c79dc8fef 100644 --- a/src/XrdApps/XrdClJCachePlugin/cache/Journal.hh +++ b/src/XrdApps/XrdClJCachePlugin/cache/Journal.hh @@ -81,7 +81,7 @@ public: // base class interface int attach(const std::string &path, uint64_t mtime, uint64_t mtime_nsec, - uint64_t size); + uint64_t size, bool ifexists=false); int detach(); int unlink(); @@ -100,6 +100,16 @@ public: std::string dump(); + off_t getHeaderFileSize() { + std::lock_guard guard(mtx); + return jheader.filesize; + } + + off_t getHeaderMtime() { + std::lock_guard guard(mtx); + return jheader.mtime; + } + private: void process_intersection(interval_tree &write, interval_tree::iterator acr, diff --git a/src/XrdApps/XrdClJCachePlugin/file/CacheStats.hh b/src/XrdApps/XrdClJCachePlugin/file/CacheStats.hh index f35435dc81d..8989f4c9136 100644 --- a/src/XrdApps/XrdClJCachePlugin/file/CacheStats.hh +++ b/src/XrdApps/XrdClJCachePlugin/file/CacheStats.hh @@ -47,7 +47,7 @@ namespace JCache { struct CacheStats { CacheStats(bool doe = false) : bytesRead(0), bytesReadV(0), bytesCached(0), bytesCachedV(0), - readOps(0), readVOps(0), readVreadOps(0), nreadfiles(0), + readOps(0), readVOps(0), readVreadOps(0), nreadfiles(0), totaldatasize(0), dumponexit(doe), peakrate(0) { // Get the current real time struct timeval now; @@ -219,6 +219,7 @@ struct CacheStats { outFile << " \"readVreadOps\": " << readVreadOps.load() << ",\n"; outFile << " \"nreadfiles\": " << nreadfiles.load() << ",\n"; outFile << " \"totaldatasize\": " << totaldatasize.load() << ",\n"; + outFile << " \"opentime_ms\": " << opentime.load() << ",\n"; std::lock_guard lock(urlMutex); outFile << " \"urls\": ["; @@ -315,6 +316,8 @@ struct CacheStats { << std::endl; oss << "# JCache : open unique f. read : " << sStats.UniqueUrls() << std::endl; + oss << "# JCache : time to open files (s) : " << std::setprecision(3) << sStats.opentime.load() + << std::endl; oss << "# " "-------------------------------------------------------------------" "---- #" @@ -361,6 +364,8 @@ struct CacheStats { std::atomic readVreadOps; std::atomic nreadfiles; std::atomic totaldatasize; + std::atomic opentime; + std::atomic dumponexit; std::set urls; std::mutex urlMutex; diff --git a/src/XrdApps/XrdClJCachePlugin/file/XrdClJCacheFile.cc b/src/XrdApps/XrdClJCachePlugin/file/XrdClJCacheFile.cc index 952827377c3..96c2e633648 100644 --- a/src/XrdApps/XrdClJCachePlugin/file/XrdClJCacheFile.cc +++ b/src/XrdApps/XrdClJCachePlugin/file/XrdClJCacheFile.cc @@ -33,7 +33,7 @@ std::string XrdCl::JCacheFile::sJsonPath = ""; bool XrdCl::JCacheFile::sEnableJournalCache = true; bool XrdCl::JCacheFile::sEnableVectorCache = false; bool XrdCl::JCacheFile::sEnableSummary = true; - +bool XrdCl::JCacheFile::sOpenAsync = false; JCache::CacheStats XrdCl::JCacheFile::sStats(true); JCache::Cleaner XrdCl::JCacheFile::sCleaner; JournalManager XrdCl::JCacheFile::sJournalManager; @@ -43,17 +43,21 @@ namespace XrdCl { //------------------------------------------------------------------------------ // Constructor //------------------------------------------------------------------------------ -JCacheFile::JCacheFile(const std::string &url) : mIsOpen(false), pFile(0) { +JCacheFile::JCacheFile(const std::string &url) : mIsOpen(false), pFile(0), mOpenAsync(false) { mAttachedForRead = false; + mOpenState=JCacheFile::CLOSED; mLog = DefaultEnv::GetLog(); + pOpenHandler = nullptr; } //------------------------------------------------------------------------------ // Constructor //------------------------------------------------------------------------------ -JCacheFile::JCacheFile() : mIsOpen(false), pFile(0) { +JCacheFile::JCacheFile() : mIsOpen(false), pFile(0), mOpenAsync(false) { mAttachedForRead = false; - mLog = DefaultEnv::GetLog(); + mOpenState=JCacheFile::CLOSED; pStats = new JCache::CacheStats(); + mLog = DefaultEnv::GetLog(); + pOpenHandler = nullptr; } //------------------------------------------------------------------------------ @@ -68,6 +72,9 @@ JCacheFile::~JCacheFile() { if (pStats) { delete pStats; } + if (pOpenHandler) { + delete pOpenHandler; + } } //------------------------------------------------------------------------------ @@ -81,7 +88,7 @@ XRootDStatus JCacheFile::Open(const std::string &url, OpenFlags::Flags flags, if (mIsOpen) { st = XRootDStatus(stError, errInvalidOp); - std::cerr << "error: file is already opened: " << pUrl << std::endl; + mLog->Error(1, "File is already opened: %s", pUrl.c_str()); return st; } @@ -95,23 +102,55 @@ XRootDStatus JCacheFile::Open(const std::string &url, OpenFlags::Flags flags, cleanUrl.SetPort(origUrl.GetPort()); cleanUrl.SetPath(origUrl.GetPath()); pUrl = cleanUrl.GetURL(); - st = pFile->Open(url, flags, mode, handler, timeout); - if (st.IsOK()) { - mIsOpen = true; - if (sEnableVectorCache || sEnableJournalCache) { - if ((flags & OpenFlags::Flags::Read) == OpenFlags::Flags::Read) { - std::string JournalDir = - sCachePath + "/" + VectorCache::computeSHA256(pUrl); - pJournalPath = JournalDir + "/journal"; - // it can be that we cannot write the journal directory - if (!VectorCache::ensureLastSubdirectoryExists(JournalDir)) { - st = XRootDStatus(stError, errOSError); - std::cerr << "error: unable to create cache directory: " << JournalDir - << std::endl; - return st; + + + // allow to enable asynchronous operation globally + if (sOpenAsync) { + mOpenAsync = true; + } + + // allow to enable asynchronous operation by CGI per file + if (origUrl.GetParams().count("xrd.jcache.async") && + origUrl.GetParams().at("xrd.jcache.async")=="1") { + mLog->Info(1, "JCache : user allowed async/detached mode"); + mOpenAsync =true; + } + + if ((flags & OpenFlags::Flags::Read) == OpenFlags::Flags::Read) { + pOpenHandler = new JCacheOpenHandler(this); + st = pFile->Open(url, flags, mode, pOpenHandler, timeout); + + if (!mOpenAsync) { + // we have to be sure the file is opened + st = pOpenHandler->Wait(); + } + if (st.IsOK()) { + mIsOpen = true; + mOpenState = OPENING; + if (sEnableVectorCache || sEnableJournalCache) { + if ((flags & OpenFlags::Flags::Read) == OpenFlags::Flags::Read) { + std::string JournalDir = + sCachePath + "/" + VectorCache::computeSHA256(pUrl); + pJournalPath = JournalDir + "/journal"; + // it can be that we cannot write the journal directory + if (!VectorCache::ensureLastSubdirectoryExists(JournalDir)) { + st = XRootDStatus(stError, errOSError); + std::cerr << "error: unable to create cache directory: " << JournalDir + << std::endl; + return st; + } + } } - } + mOpenState = OPENING; + // call the external handler to pretend all is already good! + handler->HandleResponseWithHosts(new XRootDStatus(st),0,0); + } else { + mOpenState = FAILED; } + } else { + // run with the user handler + st = pFile->Open(url, flags, mode, handler, timeout); + mOpenState = OPEN; } return st; } @@ -123,7 +162,11 @@ XRootDStatus JCacheFile::Close(ResponseHandler *handler, uint16_t timeout) { XRootDStatus st; if (mIsOpen) { + if (mOpenState == OPENING) { + pOpenHandler->Wait(); + } mIsOpen = false; + mOpenState = CLOSED; pUrl = ""; if (pFile) { st = pFile->Close(handler, timeout); @@ -148,6 +191,22 @@ XRootDStatus JCacheFile::Stat(bool force, ResponseHandler *handler, XRootDStatus st; if (pFile) { + if (!force && mOpenAsync) { + // let's create a stat response using the cache + AnyObject *obj = new AnyObject(); + std::string id = pUrl; + if (sEnableJournalCache && AttachForRead() && mOpenAsync) { + auto statInfo = new StatInfo(id, pJournal->getHeaderFileSize(), 0, pJournal->getHeaderMtime()); + obj->Set(statInfo); + XRootDStatus *ret_st = new XRootDStatus(XRootDStatus(stOK, 0)); + handler->HandleResponse(ret_st,obj); + st = XRootDStatus(stOK, 0); + return st; + } + } + // we have to be sure the file is opened + st = pOpenHandler->Wait(); + if (!st.IsOK()) {return st;} st = pFile->Stat(force, handler, timeout); } else { st = XRootDStatus(stError, errInvalidOp); @@ -165,6 +224,7 @@ XRootDStatus JCacheFile::Read(uint64_t offset, uint32_t size, void *buffer, if (pFile) { sStats.bench.AddMeasurement(size); + if (sEnableJournalCache && AttachForRead()) { mLog->Info(1, "JCache : Read: offset=%llu size=%llu buffer=%x path='%s'", offset, size, buffer, pUrl.c_str()); @@ -184,6 +244,10 @@ XRootDStatus JCacheFile::Read(uint64_t offset, uint32_t size, void *buffer, } } + // we have to be sure the file is opened + st = pOpenHandler->Wait(); + if (!st.IsOK()) {return st;} + auto jhandler = new JCacheReadHandler(handler, &pStats->bytesRead, sEnableJournalCache ? pJournal.get() : nullptr); @@ -220,6 +284,7 @@ XRootDStatus JCacheFile::PgRead(uint64_t offset, uint32_t size, void *buffer, XRootDStatus st; if (pFile) { sStats.bench.AddMeasurement(size); + if (sEnableJournalCache && AttachForRead()) { mLog->Info(1, "JCache : PgRead: offset=%llu size=%llu buffer=%x path='%s'", offset, size, buffer, pUrl.c_str()); @@ -242,6 +307,10 @@ XRootDStatus JCacheFile::PgRead(uint64_t offset, uint32_t size, void *buffer, } } + // we have to be sure the file is opened + st = pOpenHandler->Wait(); + if (!st.IsOK()) {return st;} + auto jhandler = new JCachePgReadHandler(handler, &pStats->bytesRead, sEnableJournalCache ? pJournal.get() : nullptr); @@ -374,6 +443,10 @@ XRootDStatus JCacheFile::VectorRead(const ChunkList &chunks, void *buffer, } } + // we have to be sure the file is opened + st = pOpenHandler->Wait(); + if (!st.IsOK()) {return st;} + auto jhandler = new JCacheReadVHandler( handler, &pStats->bytesReadV, sEnableJournalCache ? pJournal.get() : nullptr, @@ -398,6 +471,10 @@ XRootDStatus JCacheFile::Fcntl(const XrdCl::Buffer &arg, ResponseHandler *handler, uint16_t timeout) { XRootDStatus st; + // we have to be sure the file is opened + st = pOpenHandler->Wait(); + if (!st.IsOK()) {return st;} + if (pFile) { st = pFile->Fcntl(arg, handler, timeout); } else { @@ -413,6 +490,10 @@ XRootDStatus JCacheFile::Fcntl(const XrdCl::Buffer &arg, XRootDStatus JCacheFile::Visa(ResponseHandler *handler, uint16_t timeout) { XRootDStatus st; + // we have to be sure the file is opened + st = pOpenHandler->Wait(); + if (!st.IsOK()) {return st;} + if (pFile) { st = pFile->Visa(handler, timeout); } else { @@ -444,6 +525,7 @@ bool JCacheFile::SetProperty(const std::string &name, //------------------------------------------------------------------------------ bool JCacheFile::GetProperty(const std::string &name, std::string &value) const { + if (!pOpenHandler->Wait().IsOK()) { return false;} if (pFile) { return pFile->GetProperty(name, value); } else { @@ -462,12 +544,33 @@ bool JCacheFile::AttachForRead() { mLog->Info(1, "JCache : attaching via journalmanager to '%s'", pUrl.c_str()); pJournal = sJournalManager.attach(pUrl); + + // try to attach to an existing journal (disconnected mode) + if (mOpenAsync) { + if (!pJournal->attach(pJournalPath, 0, 0, 0, true)) { + if (!sStats.HasUrl(pUrl)) { + sStats.totaldatasize += pJournal->getHeaderFileSize(); + } + mLog->Info(1, "JCache : attached (async) to cache file: %s", + pJournalPath.c_str()); + sStats.AddUrl(pUrl); + mAttachedForRead = true; + return true; + } else { + mOpenAsync = false; + } + } + + // We need an open file here to proceed + pOpenHandler->Wait(); + StatInfo *sinfo = 0; auto st = pFile->Stat(false, sinfo); if (sinfo) { // only add a file if it wasn't yet added if (!sStats.HasUrl(pUrl)) { sStats.totaldatasize += sinfo->GetSize(); + sStats.opentime = sStats.opentime.load() + pOpenHandler->GetTimeToOpen(); } if (pJournal->attach(pJournalPath, sinfo->GetModTime(), 0, sinfo->GetSize())) { diff --git a/src/XrdApps/XrdClJCachePlugin/file/XrdClJCacheFile.hh b/src/XrdApps/XrdClJCachePlugin/file/XrdClJCacheFile.hh index b1e90b006e2..444cb30e5d3 100644 --- a/src/XrdApps/XrdClJCachePlugin/file/XrdClJCacheFile.hh +++ b/src/XrdApps/XrdClJCachePlugin/file/XrdClJCacheFile.hh @@ -35,6 +35,7 @@ #include "handler/XrdClJCachePgReadHandler.hh" #include "handler/XrdClJCacheReadHandler.hh" #include "handler/XrdClJCacheReadVHandler.hh" +#include "handler/XrdClJCacheOpenHandler.hh" #include "vector/XrdClVectorCache.hh" /*----------------------------------------------------------------------------*/ #include @@ -221,7 +222,8 @@ public: static void SetVector(const bool &value) { sEnableVectorCache = value; } static void SetJsonPath(const std::string &path) { sJsonPath = path; } static void SetSummary(const bool &value) { sEnableSummary = value; } - static void SetSize(uint64_t size) { sCleaner.SetSize(size,sCachePath);} + static void SetSize(uint64_t size) { sCleaner.SetSize(size,sCachePath); } + static void SetAsync(bool async) { sOpenAsync = async; } //---------------------------------------------------------------------------- //! @brief static members pointing to cache settings @@ -231,6 +233,7 @@ public: static bool sEnableVectorCache; static bool sEnableJournalCache; static bool sEnableSummary; + static bool sOpenAsync; static JournalManager sJournalManager; //! @brief set stats interval in seconds @@ -247,6 +250,15 @@ public: //! @brief cleaner instance static JCache::Cleaner sCleaner; + enum State { + CLOSED = 0, + OPENING, + OPEN, + FAILED + }; + //! @brief openstate + std::atomic mOpenState; + private: //! @brief attach for read bool AttachForRead(); @@ -259,8 +271,12 @@ private: OpenFlags::Flags mFlags; //! @brief boolean to track if file is open bool mIsOpen; + //! @brief async open handler + JCacheOpenHandler* pOpenHandler; //! @brief pointer to the remote file XrdCl::File *pFile; + //! @brief boolean if file open is async + bool mOpenAsync; //! @brief URL of the remote file std::string pUrl; //! @brief instance of a local journal for this file diff --git a/src/XrdApps/XrdClJCachePlugin/plugin/XrdClJCachePlugin.hh b/src/XrdApps/XrdClJCachePlugin/plugin/XrdClJCachePlugin.hh index 7a910f94897..1eae6d590b6 100644 --- a/src/XrdApps/XrdClJCachePlugin/plugin/XrdClJCachePlugin.hh +++ b/src/XrdApps/XrdClJCachePlugin/plugin/XrdClJCachePlugin.hh @@ -54,16 +54,19 @@ public: JCacheFile::SetSize(itsz != config->end() ? std::stoll(std::string(itsz->second),0,10) : 0); auto itv = config->find("vector"); - JCacheFile::SetVector(itv != config->end() ? itv->second == "true" + JCacheFile::SetVector(itv != config->end() ? (itv->second == "true") || (itv->second == "1") : false); auto itj = config->find("journal"); - JCacheFile::SetJournal(itj != config->end() ? itj->second == "true" + JCacheFile::SetJournal(itj != config->end() ? (itj->second == "true") || (itj->second == "1") : true); + auto ita = config->find("async"); + JCacheFile::SetAsync(ita != config->end() ? (ita->second == "true") || (ita->second == "1") + : false); auto itjson = config->find("json"); JCacheFile::SetJsonPath(itjson != config->end() ? itjson->second : ""); auto its = config->find("summary"); - JCacheFile::SetSummary(its != config->end() ? its->second == "true" + JCacheFile::SetSummary(its != config->end() ? (its->second == "true") || (its->second == "1") : true); auto itsi = config->find("stats"); JCacheFile::SetStatsInterval(itsi != config->end() ? std::stoll(std::string(itsi->second),0,10) : 0); @@ -78,15 +81,19 @@ public: } if (const char *v = getenv("XRD_JCACHE_SUMMARY")) { - JCacheFile::SetSummary((std::string(v) == "true") ? true : false); + JCacheFile::SetSummary(((std::string(v) == "true") || (std::string(v) == "1")) ? true : false); } if (const char *v = getenv("XRD_JCACHE_JOURNAL")) { - JCacheFile::SetJournal((std::string(v) == "true") ? true : false); + JCacheFile::SetJournal(((std::string(v) == "true") || (std::string(v) == "1")) ? true : false); } if (const char *v = getenv("XRD_JCACHE_VECTOR")) { - JCacheFile::SetVector((std::string(v) == "true") ? true : false); + JCacheFile::SetVector(((std::string(v) == "true") || (std::string(v) == "1")) ? true : false); + } + + if (const char *v = getenv("XRD_JCACHE_ASYNC")) { + JCacheFile::SetAsync(((std::string(v) == "true") || (std::string(v) == "1")) ? true : false); } if (const char *v = getenv("XRD_JCACHE_JSON")) { @@ -107,6 +114,8 @@ public: JCacheFile::sEnableJournalCache ? "true" : "false"); log->Info(1, "JCache : summary output is: %s", JCacheFile::sEnableSummary ? "true" : "false"); + log->Info(1, "JCache : asynchrous/disconnected operation: %s", + JCacheFile::sOpenAsync ? "true" : "false"); if (JCacheFile::sJsonPath.length()) { log->Info(1, "JCache : json output to prefix: %s", JCacheFile::sJsonPath.c_str());