Skip to content

Commit

Permalink
[feat] warmup report disk is full or not
Browse files Browse the repository at this point in the history
Signed-off-by: Cyber-SiKu <[email protected]>
  • Loading branch information
Cyber-SiKu committed Nov 29, 2023
1 parent b134e88 commit bc95eae
Show file tree
Hide file tree
Showing 8 changed files with 88 additions and 12 deletions.
7 changes: 5 additions & 2 deletions curvefs/src/client/curve_fuse_op.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@

#include "curvefs/src/client/curve_fuse_op.h"

#include <fmt/format.h>

#include <cstring>
#include <memory>
#include <string>
Expand Down Expand Up @@ -285,8 +287,9 @@ void QueryWarmupTask(fuse_ino_t key, std::string *data) {
if (!ret) {
*data = "finished";
} else {
*data = std::to_string(progress.GetFinished()) + "/" +
std::to_string(progress.GetTotal());
*data =
fmt::format("{}/{}/{}", progress.GetFinished(), progress.GetTotal(),
progress.GetWarmupStorageErr());
}
VLOG(9) << "Warmup [" << key << "]" << *data;
}
Expand Down
2 changes: 1 addition & 1 deletion curvefs/src/client/s3/disk_cache_manager_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ int DiskCacheManagerImpl::WriteReadDirect(const std::string fileName,
if (!diskCacheManager_->IsDiskUsedInited() ||
diskCacheManager_->IsDiskCacheFull()) {
VLOG(6) << "write disk file fail, disk full.";
return -1;
return 0;
}
int ret = diskCacheManager_->WriteReadDirect(fileName, buf, length);
if (ret < 0) {
Expand Down
11 changes: 11 additions & 0 deletions curvefs/src/client/s3/disk_cache_manager_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,17 @@ class DiskCacheManagerImpl {
virtual int UmountDiskCache();

bool IsDiskCacheFull();
/**
* @brief
*
* @param fileName
* @param buf
* @param length
* @return int
* -2: disk full
* -1: write fail
* >=0: write success
*/
virtual int WriteReadDirect(const std::string fileName, const char* buf,
uint64_t length);
void InitMetrics(std::string fsName, std::shared_ptr<S3Metric> s3Metric);
Expand Down
2 changes: 1 addition & 1 deletion curvefs/src/client/s3/disk_cache_read.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ int DiskCacheRead::WriteDiskFile(const std::string fileName, const char *buf,
if (fd < 0) {
LOG(ERROR) << "open disk file error. errno = " << errno
<< ", file = " << fileName;
return fd;
return -1;
}
ssize_t writeLen = posixWrapper_->write(fd, buf, length);
if (writeLen < static_cast<ssize_t>(length)) {
Expand Down
12 changes: 11 additions & 1 deletion curvefs/src/client/s3/disk_cache_read.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,18 @@ class DiskCacheRead : public DiskCacheBase {
virtual ~DiskCacheRead() {}
virtual void Init(std::shared_ptr<PosixWrapper> posixWrapper,
const std::string cacheDir, uint32_t objectPrefix);
virtual int ReadDiskFile(const std::string name, char *buf, uint64_t offset,
virtual int ReadDiskFile(const std::string name, char* buf, uint64_t offset,
uint64_t length);
/**
* @brief
*
* @param fileName
* @param buf
* @param length
* @return int
-1: write fail
>=0: write success
*/
virtual int WriteDiskFile(const std::string fileName, const char *buf,
uint64_t length);
virtual int LinkWriteToRead(const std::string fileName,
Expand Down
21 changes: 19 additions & 2 deletions curvefs/src/client/warmup/warmup_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -777,9 +777,14 @@ void WarmupManagerS3Impl::PutObjectToCache(
case curvefs::client::common::WarmupStorageType::kWarmupStorageTypeDisk:
ret = s3Adaptor_->GetDiskCacheManager()->WriteReadDirect(
context->key, context->buf, context->len);
if (ret < 0) {
if (ret == -1) {
iter->second.SetWarmupStorageErrorType(
WarmupStorageErrorType::WriteFail);
LOG_EVERY_SECOND(INFO)
<< "write read directly failed, key: " << context->key;
} else if (ret == -2) {
iter->second.SetWarmupStorageErrorType(
WarmupStorageErrorType::Full);
}
delete[] context->buf;
break;
Expand All @@ -788,7 +793,19 @@ void WarmupManagerS3Impl::PutObjectToCache(
if (kvClientManager_ != nullptr) {
kvClientManager_->Set(std::make_shared<SetKVCacheTask>(
context->key, context->buf, context->len,
[context](const std::shared_ptr<SetKVCacheTask>&) {
[context, this,
key](const std::shared_ptr<SetKVCacheTask>& task) {
{
ReadLockGuard lock(inode2ProgressMutex_);
auto iter = FindWarmupProgressByKeyLocked(key);
if (iter->second.GetStorageType() ==
curvefs::client::common::WarmupStorageType::
kWarmupStorageTypeKvClient &&
!task->res) {
iter->second.SetWarmupStorageErrorType(
WarmupStorageErrorType::WriteFail);
}
}
delete[] context->buf;
}));
}
Expand Down
41 changes: 37 additions & 4 deletions curvefs/src/client/warmup/warmup_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
#ifndef CURVEFS_SRC_CLIENT_WARMUP_WARMUP_MANAGER_H_
#define CURVEFS_SRC_CLIENT_WARMUP_WARMUP_MANAGER_H_

#include <fmt/format.h>

#include <algorithm>
#include <atomic>
#include <cstdint>
Expand Down Expand Up @@ -64,6 +66,12 @@ using curve::common::BthreadRWLock;

using curvefs::client::common::WarmupStorageType;

enum WarmupStorageErrorType {
Ok = 0,
WriteFail = 1,
Full = 2,
};

class WarmupFile {
public:
explicit WarmupFile(fuse_ino_t key = 0, uint64_t fileLen = 0)
Expand Down Expand Up @@ -121,13 +129,15 @@ class WarmupProgress {
: total_(0),
finished_(0),
storageType_(type),
filePathInClient_(filePath) {}
filePathInClient_(filePath),
storageErr_(WarmupStorageErrorType::Ok) {}

WarmupProgress(const WarmupProgress& wp)
: total_(wp.total_),
finished_(wp.finished_),
storageType_(wp.storageType_),
filePathInClient_(wp.filePathInClient_) {}
filePathInClient_(wp.filePathInClient_),
storageErr_(wp.storageErr_) {}

void AddTotal(uint64_t add) {
std::lock_guard<std::mutex> lock(totalMutex_);
Expand All @@ -137,6 +147,7 @@ class WarmupProgress {
WarmupProgress& operator=(const WarmupProgress& wp) {
total_ = wp.total_;
finished_ = wp.finished_;
storageErr_ = wp.storageErr_;
return *this;
}

Expand All @@ -158,21 +169,43 @@ class WarmupProgress {
std::string ToString() {
std::lock_guard<std::mutex> lockT(totalMutex_);
std::lock_guard<std::mutex> lockF(finishedMutex_);
return "total:" + std::to_string(total_) +
",finished:" + std::to_string(finished_);
std::lock_guard<std::mutex> lockS(storageErrMutex_);
return fmt::format("total:{},finished:{},err:{}", total_, finished_,
storageErr_);
}

std::string GetFilePathInClient() { return filePathInClient_; }

WarmupStorageType GetStorageType() { return storageType_; }

void SetWarmupStorageErrorType(WarmupStorageErrorType err) {
std::lock_guard<std::mutex> lockS(storageErrMutex_);
storageErr_ = std::max(err, storageErr_);
}

std::string GetWarmupStorageErr() {
std::lock_guard<std::mutex> lockS(storageErrMutex_);
switch (storageErr_) {
case WarmupStorageErrorType::Ok:
return "Ok";
case WarmupStorageErrorType::WriteFail:
return "write fail";
case WarmupStorageErrorType::Full:
return "full";
default:
return "unkown";
}
}

private:
uint64_t total_;
std::mutex totalMutex_;
uint64_t finished_;
std::mutex finishedMutex_;
WarmupStorageType storageType_;
std::string filePathInClient_;
std::mutex storageErrMutex_;
WarmupStorageErrorType storageErr_;
};

using FuseOpReadFunctionType =
Expand Down
4 changes: 3 additions & 1 deletion tools-v2/pkg/cli/command/curvefs/warmup/query/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ func (qCmd *QueryCommand) RunCommand(cmd *cobra.Command, args []string) error {
break
}
strs := strings.Split(resultStr, "/")
if len(strs) != 2 {
if len(strs) < 3 {
break
}
finished, err := strconv.ParseUint(strs[0], 10, 64)
Expand All @@ -129,6 +129,8 @@ func (qCmd *QueryCommand) RunCommand(cmd *cobra.Command, args []string) error {
if err != nil {
break
}
status := strs[2]
bar.Describe(status)
bar.ChangeMax64(int64(total))
bar.Set64(int64(finished))
}
Expand Down

0 comments on commit bc95eae

Please sign in to comment.