Skip to content

Commit

Permalink
XrdApps::JCache: add time vs rate benchmarking and rate output
Browse files Browse the repository at this point in the history
  • Loading branch information
Andreas Joachim Peters committed Jun 10, 2024
1 parent 8b65278 commit 1a73711
Show file tree
Hide file tree
Showing 4 changed files with 228 additions and 14 deletions.
88 changes: 88 additions & 0 deletions src/XrdApps/XrdClJCachePlugin/file/Art.hh
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
//------------------------------------------------------------------------------
// Copyright (c) 2024 by European Organization for Nuclear Research (CERN)
// Author: Andreas-Joachim Peters <[email protected]>
//------------------------------------------------------------------------------
// This file is part of the XRootD software suite.
//
// XRootD is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// XRootD is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with XRootD. If not, see <http://www.gnu.org/licenses/>.
//
// In applying this licence, CERN does not waive the privileges and immunities
// granted to it by virtue of its status as an Intergovernmental Organization
// or submit itself to any jurisdiction.

#pragma once

/*----------------------------------------------------------------------------*/
#include <iostream>
#include <iomanip>
#include <vector>
#include <cmath>
#include <numeric>
/*----------------------------------------------------------------------------*/


class Art {
public:
Art() {}
virtual ~Art() {}

void drawCurve(const std::vector<double>& dataPoints) {
if (dataPoints.size() != 10) {
std::cerr << "Error: Exactly 10 data points are required." << std::endl;
return;
}

double maxValue = *std::max_element(dataPoints.begin(), dataPoints.end());
double minValue = *std::min_element(dataPoints.begin(), dataPoints.end());

const int plotHeight = 10; // Number of lines in the plot
const int plotWidth = 40; // Width of the plot in characters
const int yLegendWidth = 8; // Width of the Y legend in characters

std::vector<std::string> plot(plotHeight, std::string(plotWidth, ' '));

// Normalize data points to the plot height
std::vector<int> normalizedDataPoints;
for (double point : dataPoints) {
int normalizedValue = static_cast<int>((point - minValue) / (maxValue - minValue) * (plotHeight - 1));
normalizedDataPoints.push_back(normalizedValue);
}

// Draw the curve
for (size_t i = 0; i < normalizedDataPoints.size(); ++i) {
int y = plotHeight - 1 - normalizedDataPoints[i];
plot[y][i * (plotWidth / (dataPoints.size() - 1))] = '*';
}

// Print the plot with Y legend
for (int i = 0; i < plotHeight; ++i) {
double yValue = minValue + (maxValue - minValue) * (plotHeight - 1 - i) / (plotHeight - 1);
std::cout << std::setw(yLegendWidth) << std::fixed << std::setprecision(2) << yValue << " | ";
std::cout << plot[i] << std::endl;
}

// Print the X axis
std::cout << std::string(yLegendWidth + 3, ' ') << std::string(plotWidth, '-') << std::endl;
std::cout << std::string(yLegendWidth + 3, ' ') << "0 1 2 3 4 5 6 7 8 9" << std::endl;
}

void drawCurve(const std::vector<long unsigned int>& data, double interval) {
std::vector<double> newdata;
for ( auto i:data ) {
newdata.push_back(i/1000000.0 / interval);
}
return drawCurve(newdata);
}
};

87 changes: 87 additions & 0 deletions src/XrdApps/XrdClJCachePlugin/file/TimeBench.hh
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
//------------------------------------------------------------------------------
// Copyright (c) 2024 by European Organization for Nuclear Research (CERN)
// Author: Andreas-Joachim Peters <[email protected]>
//------------------------------------------------------------------------------
// This file is part of the XRootD software suite.
//
// XRootD is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// XRootD is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with XRootD. If not, see <http://www.gnu.org/licenses/>.
//
// In applying this licence, CERN does not waive the privileges and immunities
// granted to it by virtue of its status as an Intergovernmental Organization
// or submit itself to any jurisdiction.

#pragma once

#include <iostream>
#include <vector>
#include <chrono>
#include <mutex>

class TimeBench {
private:
using Clock = std::chrono::high_resolution_clock;
using TimePoint = std::chrono::time_point<Clock>;
using Duration = std::chrono::microseconds;

std::vector<std::pair<TimePoint,uint64_t>> measurements;
std::vector<uint64_t> bins;
TimePoint start;
TimePoint end;
uint64_t totalBytes;
std::mutex mtx;

public:
TimeBench() : totalBytes(0) {}

void AddMeasurement(uint64_t bytes) {
std::lock_guard<std::mutex> guard(mtx);
auto now = Clock::now();
if (measurements.empty()) {
start = now;
}
measurements.push_back(std::make_pair(now,bytes));
totalBytes += bytes;
end = now;
}

std::vector<uint64_t> GetBins() {
std::lock_guard<std::mutex> guard(mtx);
Duration totalTime = std::chrono::duration_cast<Duration>(end - start);
Duration binSize = totalTime / 10;
bins.clear();
bins.resize(10, 0);

TimePoint binStart = start;
size_t binIndex = 0;

for (auto i : measurements) {
TimePoint measurementTime = binStart + binSize;

binIndex = (i.first - start)/ binSize;
if (Clock::now() >= measurementTime) {
bins[binIndex] += i.second;
} else {
break; // Don't process future measurements
}
}

return bins;
}

Duration GetTimePerBin() {
Duration totalTime = std::chrono::duration_cast<Duration>(end - start);
Duration binSize = totalTime / 10;
return binSize;
}
};
19 changes: 11 additions & 8 deletions src/XrdApps/XrdClJCachePlugin/file/XrdClJCacheFile.cc
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,6 @@ JCacheFile::Open(const std::string& url,
cleanUrl.SetPath(origUrl.GetPath());
pUrl = cleanUrl.GetURL();
st = pFile->Open(url, flags, mode, handler, timeout);

if (st.IsOK()) {
mIsOpen = true;
if (sEnableVectorCache || sEnableJournalCache) {
Expand Down Expand Up @@ -133,8 +132,7 @@ JCacheFile::Close(ResponseHandler* handler,

if (mIsOpen) {
mIsOpen = false;
sStats.AddUrl(pUrl);
pUrl = "";
pUrl = "";
if (pFile) {
st = pFile->Close(handler, timeout);
} else {
Expand Down Expand Up @@ -184,6 +182,7 @@ JCacheFile::Read(uint64_t offset,
XRootDStatus st;

if (pFile) {
sStats.bench.AddMeasurement(size);
if (sEnableJournalCache && AttachForRead()) {
bool eof = false;
auto rb = pJournal->pread(buffer, size, offset, eof);
Expand Down Expand Up @@ -244,6 +243,7 @@ JCacheFile::PgRead( uint64_t offset,
{
XRootDStatus st;
if (pFile) {
sStats.bench.AddMeasurement(size);
if (sEnableJournalCache && AttachForRead()) {
bool eof = false;
auto rb = pJournal->pread(buffer, size, offset, eof);
Expand Down Expand Up @@ -344,12 +344,14 @@ JCacheFile::VectorRead(const ChunkList& chunks,
XRootDStatus st;

if (pFile) {
if (sEnableVectorCache) {
uint32_t len = 0;
for (auto it = chunks.begin(); it != chunks.end(); ++it) {
len += it->length;
}
uint32_t len = 0;
for (auto it = chunks.begin(); it != chunks.end(); ++it) {
len += it->length;
}

sStats.bench.AddMeasurement(len);

if (sEnableVectorCache) {
VectorCache cache(chunks, pUrl, buffer?(char*)buffer:(char*)(chunks.begin()->buffer), sCachePath);

if (cache.retrieve()) {
Expand Down Expand Up @@ -524,6 +526,7 @@ JCacheFile::AttachForRead()
}
}
}
sStats.AddUrl(pUrl);
mAttachedForRead = true;
return true;
}
Expand Down
48 changes: 42 additions & 6 deletions src/XrdApps/XrdClJCachePlugin/file/XrdClJCacheFile.hh
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@
#include "XrdCl/XrdClDefaultEnv.hh"
#include "XrdCl/XrdClLog.hh"
/*----------------------------------------------------------------------------*/
#include "file/Art.hh"
#include "file/TimeBench.hh"
#include "cache/Journal.hh"
#include "vector/XrdClVectorCache.hh"
#include "handler/XrdClJCacheReadHandler.hh"
Expand All @@ -38,6 +40,10 @@
#include <sys/resource.h>
#include <sys/time.h>
#include <chrono>
#include <iostream>
#include <string>
#include <iomanip>
#include <cmath>
/*----------------------------------------------------------------------------*/

namespace XrdCl
Expand Down Expand Up @@ -291,9 +297,9 @@ public:
sStats.GetTimes();
std::ostringstream oss;
oss << "# ----------------------------------------------------------- #" << std::endl;
oss << "# JCache : cache combined hit rate : " << std::fixed << std::setprecision(2) << sStats.CombinedHitRate() << " %%" << std::endl;
oss << "# JCache : cache read hit rate : " << std::fixed << std::setprecision(2) << sStats.HitRate() << " %%" << std::endl;
oss << "# JCache : cache readv hit rate : " << std::fixed << std::setprecision(2) << sStats.HitRateV() << " %%" << std::endl;
oss << "# JCache : cache combined hit rate : " << std::fixed << std::setprecision(2) << sStats.CombinedHitRate() << " %" << std::endl;
oss << "# JCache : cache read hit rate : " << std::fixed << std::setprecision(2) << sStats.HitRate() << " %" << std::endl;
oss << "# JCache : cache readv hit rate : " << std::fixed << std::setprecision(2) << sStats.HitRateV() << " %" << std::endl;
oss << "# ----------------------------------------------------------- #" << std::endl;
oss << "# JCache : total bytes read : " << sStats.bytesRead.load()+sStats.bytesCached.load() << std::endl;
oss << "# JCache : total bytes readv : " << sStats.bytesReadV.load()+sStats.bytesCachedV.load() << std::endl;
Expand All @@ -305,13 +311,26 @@ public:
oss << "# JCache : open files read : " << sStats.nreadfiles.load() << std::endl;
oss << "# JCache : open unique f. read : " << sStats.UniqueUrls() << std::endl;
oss << "# ----------------------------------------------------------- #" << std::endl;
oss << "# JCache : total dataset size : " << sStats.totaldatasize << std::endl;
oss << "# JCache : percentage dataset read : " << std::fixed << std::setprecision(2) << sStats.Used() << " %%" << std::endl;
oss << "# JCache : total unique files bytes : " << sStats.totaldatasize << std::endl;
oss << "# JCache : total unique files size : " << sStats.bytesToHumanReadable((double)sStats.totaldatasize) << std::endl;
oss << "# JCache : percentage dataset read : " << std::fixed << std::setprecision(2) << sStats.Used() << " %" << std::endl;
oss << "# ----------------------------------------------------------- #" << std::endl;
oss << "# JCache : app user time : " << std::fixed << std::setprecision(2) << sStats.userTime << " s" << std::endl;
oss << "# JCache : app real time : " << std::fixed << std::setprecision(2) << sStats.realTime << " s" << std::endl;
oss << "# JCache : app sys time : " << std::fixed << std::setprecision(2) << sStats.sysTime << " s" << std::endl;
oss << "# JCache : app acceleration : " << std::fixed << std::setprecision(2) << sStats.userTime / sStats.realTime << "x" << std::endl;
oss << "# JCache : app readrate : " << std::fixed << std::setprecision(2) << sStats.bytesToHumanReadable((sStats.ReadBytes()/sStats.realTime)) << "/s" << std::endl;
oss << "# ----------------------------------------------------------- #" << std::endl;

using namespace std::chrono;

std::vector<uint64_t> bins = sStats.bench.GetBins();

for (size_t i = 0; i < bins.size(); ++i) {
std::cout << "Bin " << i + 1 << ": " << bins[i] << " bytes" << std::endl;
}
Art art;
art.drawCurve(bins, sStats.bench.GetTimePerBin().count() / 1000000.0);
return oss.str();
}
//! structure about cache hit statistics
Expand Down Expand Up @@ -339,6 +358,19 @@ public:
}
}

static std::string bytesToHumanReadable(double bytes) {
const char* suffixes[] = {"B", "KB", "MB", "GB", "TB", "PB", "EB", "ZB", "YB"};
const int numSuffixes = sizeof(suffixes) / sizeof(suffixes[0]);

if (bytes == 0) return "0 B";

int exp = std::min((int)(std::log(bytes) / std::log(1000)), numSuffixes - 1);
double val = bytes / std::pow(1000, exp);
std::ostringstream oss;
oss << std::fixed << std::setprecision(2) << val << " " << suffixes[exp];
return oss.str();
}

double HitRate() {
auto n = this->bytesCached.load()+this->bytesRead.load();
if (!n) return 100.0;
Expand All @@ -362,7 +394,10 @@ public:
std::lock_guard<std::mutex> guard(urlMutex);
return urls.count(url);
}

double ReadBytes() {
return (sStats.bytesRead.load()+sStats.bytesReadV.load() + sStats.bytesCached.load() + sStats.bytesCachedV.load());
}

double Used() {
if (sStats.totaldatasize) {
return 100.0*(sStats.bytesRead.load()+sStats.bytesReadV.load() + sStats.bytesCached.load() + sStats.bytesCachedV.load()) / sStats.totaldatasize;
Expand Down Expand Up @@ -408,6 +443,7 @@ public:
std::atomic<double> realTime;
std::atomic<double> sysTime;
std::atomic<double> startTime;
TimeBench bench;
};
private:

Expand Down

0 comments on commit 1a73711

Please sign in to comment.