Skip to content

Commit

Permalink
XrdApps::JCache implement bypass functionality which does not create …
Browse files Browse the repository at this point in the history
…any cache folders or uses cached file contents, but is keesp all the IO statistics. This can be enabled using XRD_JCACHE_BYPASS=true|1 in the environment or 'bypass=true', 'bypass=1' in the plug-in configuration file.
  • Loading branch information
apeters1971 committed Jun 17, 2024
1 parent 70dd73b commit 803e350
Show file tree
Hide file tree
Showing 6 changed files with 68 additions and 35 deletions.
27 changes: 17 additions & 10 deletions src/XrdApps/XrdClJCachePlugin/cache/Journal.cc
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ Journal::~Journal() {
//------------------------------------------------------------------------------
void Journal::read_jheader() {
jheader_t fheader;
bool exists=false;
auto hr = ::pread64(fd, &fheader, sizeof(jheader), 0);
if ((hr > 0) &&
((hr != sizeof(jheader)) || (fheader.magic != JOURNAL_MAGIC))) {
Expand All @@ -88,18 +89,24 @@ void Journal::read_jheader() {
reset();
return;
}

exists = (hr==sizeof(jheader));

// TODO: understand why the mtime is +-1s
if (jheader.mtime) {
if ((abs(fheader.mtime - jheader.mtime) > 1) ||
(fheader.mtime_nsec != jheader.mtime_nsec) ||
(jheader.filesize && (fheader.filesize != jheader.filesize))) {
std::cerr << "warning: remote file change detected - purging path:"
<< path << std::endl;
std::cerr << fheader.mtime << ":" << jheader.mtime << " "
<< fheader.mtime_nsec << ":" << jheader.mtime_nsec << " "
<< fheader.filesize << ":" << jheader.filesize << std::endl;
reset();
return;
if (exists) {
// we only compare if there was a header in the journal
if ((abs(fheader.mtime - jheader.mtime) > 1) ||
(fheader.mtime_nsec != jheader.mtime_nsec) ||
(jheader.filesize && (fheader.filesize != jheader.filesize))) {
std::cerr << "warning: remote file change detected - purging path:"
<< path << std::endl;
std::cerr << fheader.mtime << ":" << jheader.mtime << " "
<< fheader.mtime_nsec << ":" << jheader.mtime_nsec << " "
<< fheader.filesize << ":" << jheader.filesize << std::endl;
reset();
return;
}
}
} else {
// we assume the contents referenced in the header is ok to allow disconnected ops
Expand Down
6 changes: 6 additions & 0 deletions src/XrdApps/XrdClJCachePlugin/file/Art.hh
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,12 @@ public:
double maxValue = *std::max_element(dataPoints.begin(), dataPoints.end());
double minValue = *std::min_element(dataPoints.begin(), dataPoints.end());

// we want to see the full range starting at 0 on the Y axis for now
minValue = 0;

// we round the axis to clean 100
maxValue = (int)(maxValue+9) / 10 * 10;

const int plotHeight = 10; // Number of lines in the plot
const int plotWidth = 40; // Width of the plot in characters
const int yLegendWidth = 8; // Width of the Y legend in characters
Expand Down
38 changes: 21 additions & 17 deletions src/XrdApps/XrdClJCachePlugin/file/CacheStats.hh
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ struct CacheStats {
XrdCl::JCacheFile::sStats.persistToJson(jsonpath, name);
}
if (XrdCl::JCacheFile::sEnableSummary) {
std::cerr << CacheStats::GlobalStats(XrdCl::JCacheFile::sStats);
std::cerr << CacheStats::GlobalStats(XrdCl::JCacheFile::sStats, XrdCl::JCacheFile::sEnableBypass);
}
std::vector<uint64_t> bins = XrdCl::JCacheFile::sStats.bench.GetBins(40);
JCache::Art art;
Expand Down Expand Up @@ -264,7 +264,7 @@ struct CacheStats {
gStats.nreadfiles += 1;
}

static std::string GlobalStats(CacheStats &sStats) {
static std::string GlobalStats(CacheStats &sStats, bool bypass) {
std::ostringstream oss;

oss << "# "
Expand All @@ -279,22 +279,26 @@ struct CacheStats {
"---- #"
<< std::endl;


oss << "# JCache : cache combined hit rate : " << std::fixed
<< std::setprecision(2) << sStats.CombinedHitRate() << " %"
<< std::endl;
oss << "# JCache : cache read hit rate : " << std::fixed
<< std::setprecision(2) << (!sStats.ReadOpBytes() ? "\033[9m" : "")
<< sStats.HitRate() << " %" << (!sStats.ReadOpBytes() ? "\033[0m" : "")
<< std::endl;
oss << "# JCache : cache readv hit rate : " << std::fixed
<< std::setprecision(2) << (!sStats.ReadVOpBytes() ? "\033[9m" : "")
<< sStats.HitRateV() << " %" << (!sStats.ReadOpBytes() ? "\033[0m" : "")
<< std::endl;
if (bypass) {
oss << "# JCache : cache runs in bypass mode (hitrate=0%) " << std::endl;
} else {
oss << "# JCache : cache combined hit rate : " << std::fixed
<< std::setprecision(2) << sStats.CombinedHitRate() << " %"
<< std::endl;
oss << "# JCache : cache read hit rate : " << std::fixed
<< std::setprecision(2) << (!sStats.ReadOpBytes() ? "\033[9m" : "")
<< sStats.HitRate() << " %" << (!sStats.ReadOpBytes() ? "\033[0m" : "")
<< std::endl;
oss << "# JCache : cache readv hit rate : " << std::fixed
<< std::setprecision(2) << (!sStats.ReadVOpBytes() ? "\033[9m" : "")
<< sStats.HitRateV() << " %" << (!sStats.ReadOpBytes() ? "\033[0m" : "")
<< std::endl;
}
oss << "# "
"-------------------------------------------------------------------"
"---- #"
"-------------------------------------------------------------------"
"---- #"
<< std::endl;

oss << "# JCache : total bytes read : "
<< sStats.bytesRead.load() + sStats.bytesCached.load() << std::endl;
oss << "# JCache : total bytes readv : "
Expand Down Expand Up @@ -401,7 +405,7 @@ struct CacheStats {
if (XrdCl::JCacheFile::sStats.realTime < 1) {
XrdCl::JCacheFile::sStats.peakrate = XrdCl::JCacheFile::sStats.ReadBytes() / XrdCl::JCacheFile::sStats.realTime;
}
std::string st = CacheStats::GlobalStats(XrdCl::JCacheFile::sStats);
std::string st = CacheStats::GlobalStats(XrdCl::JCacheFile::sStats, XrdCl::JCacheFile::sEnableBypass);
std::cerr << st << std::endl;
}
}
Expand Down
21 changes: 13 additions & 8 deletions src/XrdApps/XrdClJCachePlugin/file/XrdClJCacheFile.cc
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ std::string XrdCl::JCacheFile::sJsonPath = "";
bool XrdCl::JCacheFile::sEnableJournalCache = true;
bool XrdCl::JCacheFile::sEnableVectorCache = false;
bool XrdCl::JCacheFile::sEnableSummary = true;
bool XrdCl::JCacheFile::sEnableBypass = false;
bool XrdCl::JCacheFile::sOpenAsync = false;
JCache::CacheStats XrdCl::JCacheFile::sStats(true);
JCache::Cleaner XrdCl::JCacheFile::sCleaner;
Expand Down Expand Up @@ -127,7 +128,7 @@ XRootDStatus JCacheFile::Open(const std::string &url, OpenFlags::Flags flags,
if (st.IsOK()) {
mIsOpen = true;
mOpenState = OPENING;
if (sEnableVectorCache || sEnableJournalCache) {
if (sEnableVectorCache || (sEnableJournalCache && !sEnableBypass)) {
if ((flags & OpenFlags::Flags::Read) == OpenFlags::Flags::Read) {
std::string JournalDir =
sCachePath + "/" + VectorCache::computeSHA256(pUrl);
Expand Down Expand Up @@ -228,7 +229,7 @@ XRootDStatus JCacheFile::Read(uint64_t offset, uint32_t size, void *buffer,
if (pFile) {
sStats.bench.AddMeasurement(size);

if (sEnableJournalCache && AttachForRead()) {
if (!sEnableBypass && sEnableJournalCache && AttachForRead()) {
mLog->Info(1, "JCache : Read: offset=%llu size=%llu buffer=%x path='%s'",
offset, size, buffer, pUrl.c_str());
bool eof = false;
Expand All @@ -255,7 +256,7 @@ XRootDStatus JCacheFile::Read(uint64_t offset, uint32_t size, void *buffer,

auto jhandler =
new JCacheReadHandler(handler, &pStats->bytesRead,
sEnableJournalCache ? pJournal.get() : nullptr);
sEnableJournalCache &&! sEnableBypass? pJournal.get() : nullptr);
pStats->readOps++;
st = pFile->Read(offset, size, buffer, jhandler, timeout);
} else {
Expand Down Expand Up @@ -289,7 +290,7 @@ XRootDStatus JCacheFile::PgRead(uint64_t offset, uint32_t size, void *buffer,
if (pFile) {
sStats.bench.AddMeasurement(size);

if (sEnableJournalCache && AttachForRead()) {
if (sEnableJournalCache && AttachForRead() && !sEnableBypass) {
mLog->Info(1, "JCache : PgRead: offset=%llu size=%llu buffer=%x path='%s'",
offset, size, buffer, pUrl.c_str());
bool eof = false;
Expand Down Expand Up @@ -318,7 +319,7 @@ XRootDStatus JCacheFile::PgRead(uint64_t offset, uint32_t size, void *buffer,
}
auto jhandler =
new JCachePgReadHandler(handler, &pStats->bytesRead,
sEnableJournalCache ? pJournal.get() : nullptr);
sEnableJournalCache && !sEnableBypass ? pJournal.get() : nullptr);
pStats->readOps++;
st = pFile->PgRead(offset, size, buffer, jhandler, timeout);
} else {
Expand Down Expand Up @@ -414,7 +415,7 @@ XRootDStatus JCacheFile::VectorRead(const ChunkList &chunks, void *buffer,
return st;
}
} else {
if (sEnableJournalCache) {
if (!sEnableBypass && sEnableJournalCache) {
bool inJournal = true;
size_t len = 0;
// try to get chunks from journal cache
Expand Down Expand Up @@ -456,7 +457,7 @@ XRootDStatus JCacheFile::VectorRead(const ChunkList &chunks, void *buffer,

auto jhandler = new JCacheReadVHandler(
handler, &pStats->bytesReadV,
sEnableJournalCache ? pJournal.get() : nullptr,
sEnableJournalCache && !sEnableBypass ? pJournal.get() : nullptr,
buffer ? (char *)buffer : (char *)(chunks.begin()->buffer),
sEnableVectorCache ? sCachePath : "", pUrl);
pStats->readVOps++;
Expand Down Expand Up @@ -586,8 +587,12 @@ bool JCacheFile::AttachForRead() {
}
if (pJournal->attach(pJournalPath, sinfo->GetModTime(), 0,
sinfo->GetSize())) {
mLog->Error(1, "JCache : failed to attach to cache file: %s",
if (!sEnableBypass) {
// when bypass=true this might throw an error because we don't create the
// journal directory - we just don't want to see this
mLog->Error(1, "JCache : failed to attach to cache file: %s",
pJournalPath.c_str());
}
mAttachedForRead = true;
delete sinfo;
return false;
Expand Down
2 changes: 2 additions & 0 deletions src/XrdApps/XrdClJCachePlugin/file/XrdClJCacheFile.hh
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,7 @@ public:
static void SetVector(const bool &value) { sEnableVectorCache = value; }
static void SetJsonPath(const std::string &path) { sJsonPath = path; }
static void SetSummary(const bool &value) { sEnableSummary = value; }
static void SetBypass(const bool &value) { sEnableBypass = value; }
static void SetSize(uint64_t size) { sCleaner.SetSize(size,sCachePath); }
static void SetAsync(bool async) { sOpenAsync = async; }

Expand All @@ -232,6 +233,7 @@ public:
static std::string sJsonPath;
static bool sEnableVectorCache;
static bool sEnableJournalCache;
static bool sEnableBypass;
static bool sEnableSummary;
static bool sOpenAsync;
static JournalManager sJournalManager;
Expand Down
9 changes: 9 additions & 0 deletions src/XrdApps/XrdClJCachePlugin/plugin/XrdClJCachePlugin.hh
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,9 @@ public:
auto ita = config->find("async");
JCacheFile::SetAsync(ita != config->end() ? (ita->second == "true") || (ita->second == "1")
: false);
auto itb = config->find("bypass");
JCacheFile::SetBypass(itb != config->end() ? (itb->second == "true") || (itb->second == "1")
: false);
auto itjson = config->find("json");
JCacheFile::SetJsonPath(itjson != config->end() ? itjson->second : "");

Expand Down Expand Up @@ -96,6 +99,10 @@ public:
JCacheFile::SetAsync(((std::string(v) == "true") || (std::string(v) == "1")) ? true : false);
}

if (const char *v = getenv("XRD_JCACHE_BYPASS")) {
JCacheFile::SetBypass(((std::string(v) == "true") || (std::string(v) == "1")) ? true : false);
}

if (const char *v = getenv("XRD_JCACHE_JSON")) {
JCacheFile::SetJsonPath((std::string(v).length()) ? std::string(v)
: "");
Expand All @@ -116,6 +123,8 @@ public:
JCacheFile::sEnableSummary ? "true" : "false");
log->Info(1, "JCache : asynchrous/disconnected operation: %s",
JCacheFile::sOpenAsync ? "true" : "false");
log->Info(1, "JCache : bypass operation: %s",
JCacheFile::sEnableBypass ? "true" : "false");
if (JCacheFile::sJsonPath.length()) {
log->Info(1, "JCache : json output to prefix: %s",
JCacheFile::sJsonPath.c_str());
Expand Down

0 comments on commit 803e350

Please sign in to comment.