Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

curvefs/client: fix the delayed inode not being retrieved in nocto scenario. #2909

Merged
merged 2 commits into from
Nov 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion curvefs/devops/util/tmpl.sh
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
# Usage:
# tmpl.sh DSV SOURCE DESTINATION
# Example:
# tmpl.sh = /usr/local/metaserver.conf /tmp/metaserver.con
# tmpl.sh = /usr/local/metaserver.conf /tmp/metaserver.conf

g_dsv=$1
g_src=$2
Expand Down
124 changes: 105 additions & 19 deletions curvefs/src/client/filesystem/defer_sync.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,94 @@ namespace curvefs {
namespace client {
namespace filesystem {

DeferSync::DeferSync(DeferSyncOption option)
: option_(option),
using ::curve::common::LockGuard;
using ::curve::common::ReadLockGuard;
using ::curve::common::WriteLockGuard;
using ::curvefs::client::filesystem::AttrCtime;

#define RETURN_FALSE_IF_CTO_ON() \
do { \
if (cto_) { \
return false; \
} \
} while (0)

DeferInodes::DeferInodes(bool cto)
: cto_(cto),
rwlock_(),
inodes_() {}

bool DeferInodes::Add(const std::shared_ptr<InodeWrapper>& inode) {
RETURN_FALSE_IF_CTO_ON();
WriteLockGuard lk(rwlock_);
Ino ino = inode->GetInodeId();
auto ret = inodes_.emplace(ino, inode);
auto iter = ret.first;
bool yes = ret.second;
if (!yes) { // already exists
iter->second = inode;
}
return true;
}

bool DeferInodes::Get(Ino ino, std::shared_ptr<InodeWrapper>* inode) {
RETURN_FALSE_IF_CTO_ON();
ReadLockGuard lk(rwlock_);
auto iter = inodes_.find(ino);
if (iter == inodes_.end()) {
return false;
}
*inode = iter->second;
return true;
}

bool DeferInodes::Remove(const std::shared_ptr<InodeWrapper>& inode) {
RETURN_FALSE_IF_CTO_ON();
WriteLockGuard lk(rwlock_);
InodeAttr attr;
inode->GetInodeAttrLocked(&attr);
auto iter = inodes_.find(attr.inodeid());
if (iter == inodes_.end()) {
return false;
}

InodeAttr defered;
iter->second->GetInodeAttrLocked(&defered);
if (AttrCtime(attr) < AttrCtime(defered)) {
// it means the old defered inode already replaced by the lastest one,
// so we can't remove it before it synced yet.
return false;
}
inodes_.erase(iter);
return true;
}

size_t DeferInodes::Size() {
ReadLockGuard lk(rwlock_);
return inodes_.size();
}

SyncInodeClosure::SyncInodeClosure(const std::shared_ptr<DeferInodes>& inodes,
const std::shared_ptr<InodeWrapper>& inode)
: inodes_(inodes), inode_(inode) {}

void SyncInodeClosure::Run() {
std::unique_ptr<SyncInodeClosure> self_guard(this);
MetaStatusCode rc = GetStatusCode();
if (rc == MetaStatusCode::OK || rc == MetaStatusCode::NOT_FOUND) {
inodes_->Remove(inode_);
}
}

DeferSync::DeferSync(bool cto, DeferSyncOption option)
: cto_(cto),
option_(option),
mutex_(),
running_(false),
thread_(),
sleeper_(),
inodes_() {
}
pending_(),
inodes_(std::make_shared<DeferInodes>(cto)) {}

void DeferSync::Start() {
if (!running_.exchange(true)) {
Expand All @@ -55,20 +135,32 @@ void DeferSync::Stop() {
}
}

SyncInodeClosure* DeferSync::NewSyncInodeClosure(
const std::shared_ptr<InodeWrapper>& inode) {
// NOTE: we only store the defer inodes in nocto scenario,
// which means we don't need to remove the inode from defer inodes
// even if the inode already synced done in cto scenario.
if (cto_) {
return nullptr;
}
return new SyncInodeClosure(inodes_, inode);
}

void DeferSync::SyncTask() {
std::vector<std::shared_ptr<InodeWrapper>> inodes;
std::vector<std::shared_ptr<InodeWrapper>> syncing;
for ( ;; ) {
bool running = sleeper_.wait_for(std::chrono::seconds(option_.delay));

{
LockGuard lk(mutex_);
inodes.swap(inodes_);
syncing.swap(pending_);
}
for (const auto& inode : inodes) {
for (const auto& inode : syncing) {
auto closure = NewSyncInodeClosure(inode);
UniqueLock lk(inode->GetUniqueLock());
inode->Async(nullptr, true);
inode->Async(closure, true);
}
inodes.clear();
syncing.clear();

if (!running) {
break;
Expand All @@ -78,18 +170,12 @@ void DeferSync::SyncTask() {

void DeferSync::Push(const std::shared_ptr<InodeWrapper>& inode) {
LockGuard lk(mutex_);
inodes_.emplace_back(inode);
pending_.emplace_back(inode);
inodes_->Add(inode);
}

bool DeferSync::IsDefered(Ino ino, InodeAttr* attr) {
LockGuard lk(mutex_);
for (const auto& inode : inodes_) {
if (inode->GetInodeId() == ino) {
inode->GetInodeAttr(attr);
return true;
}
}
return false;
bool DeferSync::IsDefered(Ino ino, std::shared_ptr<InodeWrapper>* inode) {
return inodes_->Get(ino, inode);
}

} // namespace filesystem
Expand Down
53 changes: 47 additions & 6 deletions curvefs/src/client/filesystem/defer_sync.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,42 +27,83 @@
#include <vector>
#include <memory>

#include "absl/container/btree_map.h"
#include "src/common/interruptible_sleeper.h"
#include "curvefs/src/client/common/config.h"
#include "curvefs/src/client/rpcclient/task_excutor.h"
#include "curvefs/src/client/filesystem/meta.h"

namespace curvefs {
namespace client {
namespace filesystem {

using ::curvefs::client::common::DeferSyncOption;

using ::curve::common::RWLock;
using ::curve::common::Mutex;
using ::curve::common::LockGuard;
using ::curve::common::InterruptibleSleeper;
using ::curvefs::client::common::DeferSyncOption;
using ::curvefs::client::rpcclient::MetaServerClientDone;

// NOTE: we only store the defer inodes in nocto scenario.
class DeferInodes {
public:
explicit DeferInodes(bool cto);

bool Add(const std::shared_ptr<InodeWrapper>& inode);

bool Get(Ino ino, std::shared_ptr<InodeWrapper>* inode);

bool Remove(const std::shared_ptr<InodeWrapper>& inode);

size_t Size();

private:
bool cto_;
RWLock rwlock_;
absl::btree_map<Ino, std::shared_ptr<InodeWrapper>> inodes_;
};

class SyncInodeClosure : public MetaServerClientDone {
public:
explicit SyncInodeClosure(const std::shared_ptr<DeferInodes>& inodes,
const std::shared_ptr<InodeWrapper>& inode);

void Run() override;

private:
std::shared_ptr<DeferInodes> inodes_;
std::shared_ptr<InodeWrapper> inode_;
};

class DeferSync {
public:
explicit DeferSync(DeferSyncOption option);
explicit DeferSync(bool cto, DeferSyncOption option);

void Start();

void Stop();

void Push(const std::shared_ptr<InodeWrapper>& inode);

bool IsDefered(Ino ino, InodeAttr* attr);
bool IsDefered(Ino ino, std::shared_ptr<InodeWrapper>* inode);

private:
SyncInodeClosure* NewSyncInodeClosure(
const std::shared_ptr<InodeWrapper>& inode);

void SyncTask();

private:
friend class SyncInodeClosure;

private:
bool cto_;
DeferSyncOption option_;
Mutex mutex_;
std::atomic<bool> running_;
std::thread thread_;
InterruptibleSleeper sleeper_;
std::vector<std::shared_ptr<InodeWrapper>> inodes_;
std::vector<std::shared_ptr<InodeWrapper>> pending_;
std::shared_ptr<DeferInodes> inodes_;
};

} // namespace filesystem
Expand Down
10 changes: 3 additions & 7 deletions curvefs/src/client/filesystem/filesystem.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@ namespace filesystem {

FileSystem::FileSystem(FileSystemOption option, ExternalMember member)
: option_(option), member(member) {
deferSync_ = std::make_shared<DeferSync>(option.deferSyncOption);
deferSync_ = std::make_shared<DeferSync>(option.cto,
option.deferSyncOption);
negative_ = std::make_shared<LookupCache>(option.lookupCacheOption);
dirCache_ = std::make_shared<DirCache>(option.dirCacheOption);
openFiles_ = std::make_shared<OpenFiles>(option_.openFilesOption,
Expand Down Expand Up @@ -257,11 +258,6 @@ CURVEFS_ERROR FileSystem::Lookup(Ino parent,

CURVEFS_ERROR FileSystem::GetAttr(Ino ino, AttrOut* attrOut) {
InodeAttr attr;
if (!option_.cto && deferSync_->IsDefered(ino, &attr)) {
*attrOut = AttrOut(attr);
return CURVEFS_ERROR::OK;
}

auto rc = rpc_->GetAttr(ino, &attr);
if (rc == CURVEFS_ERROR::OK) {
*attrOut = AttrOut(attr);
Expand Down Expand Up @@ -319,7 +315,7 @@ CURVEFS_ERROR FileSystem::Open(Ino ino, FileInfo* fi) {
bool yes = openFiles_->IsOpened(ino, &inode);
if (yes) {
openFiles_->Open(ino, inode);
// fi->keep_cache = 1;
// fi->keep_cache = 1; // FIXME(Wine93): let it works.
return CURVEFS_ERROR::OK;
}

Expand Down
Loading