Skip to content

Commit

Permalink
XrdApps::JCache: restructure source directories - make readV,read/pgR…
Browse files Browse the repository at this point in the history
…ead completely asynchronous functions using response handlers
  • Loading branch information
apeters1971 committed Jun 7, 2024
1 parent e1b5439 commit 013514d
Showing 11 changed files with 288 additions and 77 deletions.
12 changes: 7 additions & 5 deletions src/XrdApps.cmake
Original file line number Diff line number Diff line change
@@ -208,7 +208,7 @@ target_link_libraries(

add_executable(
xrdclcacheclean
XrdApps/XrdClJCachePlugin/XrdClCacheCleaner.cc )
XrdApps/XrdClJCachePlugin/app/XrdClCacheCleaner.cc )

target_link_libraries(
xrdclcacheclean
@@ -221,16 +221,18 @@ target_link_libraries(
add_library(
${LIB_XRDCL_JCACHE_PLUGIN}
MODULE
XrdApps/XrdClJCachePlugin/XrdClJCachePlugin.cc
XrdApps/XrdClJCachePlugin/XrdClJCacheFile.cc
XrdApps/XrdClJCachePlugin/XrdClVectorCache.cc
XrdApps/XrdClJCachePlugin/XrdClVectorCache.hh
XrdApps/XrdClJCachePlugin/plugin/XrdClJCachePlugin.cc
XrdApps/XrdClJCachePlugin/file/XrdClJCacheFile.cc
XrdApps/XrdClJCachePlugin/vector/XrdClVectorCache.cc
XrdApps/XrdClJCachePlugin/vector/XrdClVectorCache.hh
XrdApps/XrdClJCachePlugin/cache/Journal.cc
XrdApps/XrdClJCachePlugin/cache/Journal.hh
XrdApps/XrdClJCachePlugin/cache/IntervalTree.hh
XrdApps/XrdClJCachePlugin/cache/RbTree.hh
)

target_include_directories( ${LIB_XRDCL_JCACHE_PLUGIN} PRIVATE "${CMAKE_SOURCE_DIR}/src/XrdApps/XrdClJCachePlugin/" )

target_link_libraries(${LIB_XRDCL_JCACHE_PLUGIN} PRIVATE XrdCl stdc++fs)


Original file line number Diff line number Diff line change
@@ -27,10 +27,23 @@
#include "XrdCl/XrdClMessageUtils.hh"
/*----------------------------------------------------------------------------*/

std::string JCacheFile::sCachePath="";
bool JCacheFile::sEnableJournalCache = true;
bool JCacheFile::sEnableVectorCache = true;
std::string XrdCl::JCacheFile::sCachePath="";
bool XrdCl::JCacheFile::sEnableJournalCache = true;
bool XrdCl::JCacheFile::sEnableVectorCache = true;

namespace XrdCl
{

//------------------------------------------------------------------------------
// Constructor
//------------------------------------------------------------------------------
JCacheFile::JCacheFile(const std::string& url):
mIsOpen(false),
pFile(0)
{
mAttachedForRead = false;
mLog = DefaultEnv::GetLog();
}
//------------------------------------------------------------------------------
// Constructor
//------------------------------------------------------------------------------
@@ -39,6 +52,7 @@ JCacheFile::JCacheFile():
pFile(0)
{
mAttachedForRead = false;
mLog = DefaultEnv::GetLog();
}


@@ -47,7 +61,7 @@ JCacheFile::JCacheFile():
//------------------------------------------------------------------------------
JCacheFile::~JCacheFile()
{

LogStats();
if (pFile) {
delete pFile;
}
@@ -158,6 +172,8 @@ JCacheFile::Read(uint64_t offset,
if (sEnableJournalCache && AttachForRead()) {
auto rb = pJournal.pread(buffer, size, offset);
if (rb == size) {
pStats.bytesCached += rb;
pStats.readOps++;
// we can only serve success full reads from the cache for now
XRootDStatus* ret_st = new XRootDStatus(st);
ChunkInfo* chunkInfo = new ChunkInfo(offset, rb, buffer);
@@ -169,20 +185,9 @@ JCacheFile::Read(uint64_t offset,
}
}

// run a synchronous read
uint32_t bytesRead = 0;
st = pFile->Read(offset, size, buffer, bytesRead, timeout);
if (st.IsOK()) {
if (sEnableJournalCache) {
pJournal.pwrite(buffer, size, offset);
}
// emit a chunk
XRootDStatus* ret_st = new XRootDStatus(st);
ChunkInfo* chunkInfo = new ChunkInfo(offset, bytesRead, buffer);
AnyObject* obj = new AnyObject();
obj->Set(chunkInfo);
handler->HandleResponse(ret_st, obj);
}
auto jhandler = new JCacheReadHandler(handler, &pStats.bytesRead,sEnableJournalCache?&pJournal:nullptr);
pStats.readOps++;
st = pFile->PgRead(offset, size, buffer, jhandler, timeout);
} else {
st = XRootDStatus(stError, errInvalidOp);
}
@@ -227,6 +232,8 @@ JCacheFile::PgRead( uint64_t offset,
if (sEnableJournalCache && AttachForRead()) {
auto rb = pJournal.pread(buffer, size, offset);
if (rb == size) {
pStats.bytesCached += rb;
pStats.readOps++;
// we can only serve success full reads from the cache for now
XRootDStatus* ret_st = new XRootDStatus(st);
ChunkInfo* chunkInfo = new ChunkInfo(offset, rb, buffer);
@@ -238,25 +245,9 @@ JCacheFile::PgRead( uint64_t offset,
}
}

std::vector<uint32_t> cksums;
uint32_t bytesRead = 0;

// run a synchronous read
st = pFile->PgRead(offset, size, buffer, cksums, bytesRead, timeout);
if (st.IsOK()) {
if (sEnableJournalCache) {
if (bytesRead) {
// store into journal
pJournal.pwrite(buffer, size, offset);
}
}
// emit a chunk
XRootDStatus* ret_st = new XRootDStatus(st);
ChunkInfo* chunkInfo = new ChunkInfo(offset, bytesRead, buffer);
AnyObject* obj = new AnyObject();
obj->Set(chunkInfo);
handler->HandleResponse(ret_st, obj);
}
auto jhandler = new JCacheReadHandler(handler, &pStats.bytesRead,sEnableJournalCache?&pJournal:nullptr);
pStats.readOps++;
st = pFile->PgRead(offset, size, buffer, jhandler, timeout);
} else {
st = XRootDStatus(stError, errInvalidOp);
}
@@ -358,33 +349,12 @@ JCacheFile::VectorRead(const ChunkList& chunks,
}
}

// run a synchronous vector read
auto jhandler = new JCacheReadVHandler(handler, &pStats.bytesReadV,sEnableJournalCache?&pJournal:nullptr, buffer, sEnableVectorCache?sCachePath:"", pUrl);
pStats.readVOps++;
pStats.readVreadOps += chunks.size();

VectorReadInfo* vReadInfo;
st = pFile->VectorRead(chunks, buffer, vReadInfo, timeout);
st = pFile->VectorRead(chunks, buffer, jhandler, timeout);

if (st.IsOK()) {
if (sEnableVectorCache) {
// store into cache
cache.store();
}
// emit a chunk
XRootDStatus* ret_st = new XRootDStatus(st);
AnyObject* obj = new AnyObject();
ChunkList vResp = vReadInfo->GetChunks();
vResp = chunks;
obj->Set(vReadInfo);

if (sEnableJournalCache && !sEnableVectorCache) {
// if we run with journal cache but don't cache vectors, we need to
// copy the vector data into the journal cache
for (auto it = chunks.begin(); it != chunks.end(); ++it) {
pJournal.pwrite(it->buffer, it->GetLength(), it->GetOffset());
}
}
handler->HandleResponse(ret_st, obj);
return st;
}
} else {
st = XRootDStatus(stError, errInvalidOp);
}
@@ -485,14 +455,18 @@ JCacheFile::AttachForRead()
auto st = pFile->Stat(false, sinfo);
if (sinfo) {
if (pJournal.attach(pJournalPath,sinfo->GetSize(),sinfo->GetModTime(),0)) {
std::cerr << "error: unable to attach to journal: " << pJournalPath << std::endl;
mLog->Error(1, "JCache : failed to attach to cache directory: %s", pJournalPath.c_str());
mAttachedForRead = true;
return false;
}
} else {
mLog->Info(1, "JCache : attached to cache directory: %s", pJournalPath.c_str());
}
}
}
}
mAttachedForRead = true;
return true;
}

} // namespace XrdCl

Original file line number Diff line number Diff line change
@@ -25,13 +25,19 @@

/*----------------------------------------------------------------------------*/
#include "XrdCl/XrdClPlugInInterface.hh"
#include "XrdCl/XrdClDefaultEnv.hh"
#include "XrdCl/XrdClLog.hh"
/*----------------------------------------------------------------------------*/
#include "cache/Journal.hh"
#include "XrdClVectorCache.hh"
#include "vector/XrdClVectorCache.hh"
#include "handler/XrdClJCacheReadHandler.hh"
#include "handler/XrdClJCacheReadVHandler.hh"
/*----------------------------------------------------------------------------*/
#include <atomic>
/*----------------------------------------------------------------------------*/
using namespace XrdCl;

namespace XrdCl
{
//----------------------------------------------------------------------------
//! RAIN file plugin
//----------------------------------------------------------------------------
@@ -43,7 +49,7 @@ public:
//! Constructor
//----------------------------------------------------------------------------
JCacheFile();

JCacheFile(const std::string& url);

//----------------------------------------------------------------------------
//! Destructor
@@ -196,10 +202,54 @@ public:
static std::string sCachePath;
static bool sEnableVectorCache;
static bool sEnableJournalCache;

void LogStats() {
mLog->Info(1, "JCache : read:readv-ops:readv-read-ops: %lu:%lu:%lus hit-rate: total [read/readv]=%.02f%% [%.02f%%/%.02f%%] remote-bytes-read/readv: %lu / %lu cached-bytes-read/readv: %lu / %lu",
pStats.readOps.load(),
pStats.readVOps.load(),
pStats.readVreadOps.load(),
pStats.CombinedHitRate(),
pStats.HitRate(),
pStats.HitRateV(),
pStats.bytesRead.load(),
pStats.bytesReadV.load(),
pStats.bytesCached.load(),
pStats.bytesCachedV.load());
}
//! structure about cache hit statistics
struct CacheStats {
CacheStats() :
bytesRead(0),
bytesReadV(0),
bytesCached(0),
bytesCachedV(0),
readOps(0),
readVOps(0),
readVreadOps(0)
{}

double HitRate() {
return 100.0*(this->bytesCached.load()+1) /(this->bytesCached.load()+this->bytesRead.load()+1);
}
double HitRateV() {
return 100.0*(this->bytesCachedV.load()+1) /(this->bytesCachedV.load()+this->bytesReadV.load()+1);
}
double CombinedHitRate() {
return 100.0*(this->bytesCached.load()+this->bytesCachedV.load()+1) /(this->bytesCached.load()+this->bytesRead.load()+this->bytesCachedV.load()+this->bytesReadV.load()+1);
}

std::atomic<uint64_t> bytesRead;
std::atomic<uint64_t> bytesReadV;
std::atomic<uint64_t> bytesCached;
std::atomic<uint64_t> bytesCachedV;
std::atomic<uint64_t> readOps;
std::atomic<uint64_t> readVOps;
std::atomic<uint64_t> readVreadOps;
};
private:

bool AttachForRead();

std::atomic<bool> mAttachedForRead;
std::mutex mAttachMutex;
OpenFlags::Flags mFlags;
@@ -208,5 +258,11 @@ private:
std::string pUrl;
Journal pJournal;
std::string pJournalPath;
Log* mLog;

CacheStats pStats;

std::vector<XrdCl::JCacheReadHandler> mReadHandlers;
};

} // namespace XrdCl
72 changes: 72 additions & 0 deletions src/XrdApps/XrdClJCachePlugin/handler/XrdClJCacheReadHandler.hh
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
//------------------------------------------------------------------------------
// Copyright (c) 2024 by European Organization for Nuclear Research (CERN)
// Author: Andreas-Joachim Peters <andreas.joachim.peters@cern.ch>
//------------------------------------------------------------------------------
// This file is part of the XRootD software suite.
//
// XRootD is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// XRootD is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with XRootD. If not, see <http://www.gnu.org/licenses/>.
//
// In applying this licence, CERN does not waive the privileges and immunities
// granted to it by virtue of its status as an Intergovernmental Organization
// or submit itself to any jurisdiction.

#pragma once
/*----------------------------------------------------------------------------*/
#include "XrdCl/XrdClFile.hh"
#include "XrdCl/XrdClXRootDResponses.hh"
/*----------------------------------------------------------------------------*/
#include "cache/Journal.hh"
/*----------------------------------------------------------------------------*/

namespace XrdCl {

class JCacheReadHandler : public XrdCl::ResponseHandler
// ---------------------------------------------------------------------- //
{
public:
JCacheReadHandler() { }

JCacheReadHandler(JCacheReadHandler* other) {
rbytes = other->rbytes;
journal = other->journal;
}

JCacheReadHandler(XrdCl::ResponseHandler* handler,
std::atomic<uint64_t>* rbytes,
Journal* journal) : handler(handler), rbytes(rbytes), journal(journal) {}

virtual ~JCacheReadHandler() {}

virtual void HandleResponse(XrdCl::XRootDStatus* pStatus,
XrdCl::AnyObject* pResponse) {

XrdCl::PageInfo* pageInfo;
if (pStatus->IsOK()) {
if (pResponse) {
pResponse->Get(pageInfo);
// store successfull reads in the journal
if (journal) journal->pwrite(pageInfo->GetBuffer(), pageInfo->GetLength(), pageInfo->GetOffset());
*rbytes+= pageInfo->GetLength();
}
}
handler->HandleResponse(pStatus, pResponse);
}

XrdCl::ResponseHandler* handler;
std::atomic<uint64_t>* rbytes;
Journal* journal;

};

} // namespace XrdCl
Loading

0 comments on commit 013514d

Please sign in to comment.