Skip to content

Commit

Permalink
curvefs/client: fix trash bugs
Browse files Browse the repository at this point in the history
  • Loading branch information
wuhongsong committed Nov 20, 2023
1 parent fea6c06 commit e4441a7
Show file tree
Hide file tree
Showing 11 changed files with 233 additions and 11 deletions.
2 changes: 1 addition & 1 deletion curvefs/src/metaserver/copyset/copyset_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ bool CopysetNode::Init(const CopysetNodeOptions& options) {
LOG(ERROR) << "Failed to create meta store";
return false;
}

InitRaftNodeOptions();

butil::ip_t ip;
Expand Down
18 changes: 11 additions & 7 deletions curvefs/src/metaserver/inode_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -312,7 +312,7 @@ MetaStatusCode InodeManager::DeleteInode(uint32_t fsId, uint64_t inodeId,
MetaStatusCode InodeManager::UpdateInode(const UpdateInodeRequest& request,
int64_t logIndex) {
CHECK_APPLIED();
VLOG(9) << "update inode, fsid: " << request.fsid()
VLOG(0) << "whs update inode, fsid: " << request.fsid()
<< ", inodeid: " << request.inodeid();
NameLockGuard lg(inodeLock_,
GetInodeLockName(request.fsid(), request.inodeid()));
Expand Down Expand Up @@ -388,11 +388,6 @@ MetaStatusCode InodeManager::UpdateInode(const UpdateInodeRequest& request,
}
}

if (s3NeedTrash) {
trash_->Add(old.fsid(), old.inodeid(), old.dtime());
--(*type2InodeNum_)[old.type()];
}

const S3ChunkInfoMap &map2add = request.s3chunkinfoadd();
const S3ChunkInfoList *list2add;
VLOG(9) << "UpdateInode inode " << old.inodeid() << " map2add size "
Expand Down Expand Up @@ -443,7 +438,16 @@ MetaStatusCode InodeManager::UpdateInode(const UpdateInodeRequest& request,
return MetaStatusCode::STORAGE_INTERNAL_ERROR;
}
}
VLOG(9) << "UpdateInode success, " << request.ShortDebugString();

if (s3NeedTrash) {
VLOG(0) << "whs add need trash, " << request.ShortDebugString();

inodeStorage_->UpdateDeletingKey(old, logIndex);
trash_->Add(old.fsid(), old.inodeid(), old.dtime());
--(*type2InodeNum_)[old.type()];
}

VLOG(0) << "whs UpdateInode success, " << request.ShortDebugString();
return MetaStatusCode::OK;
}

Expand Down
38 changes: 38 additions & 0 deletions curvefs/src/metaserver/inode_storage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,44 @@ MetaStatusCode InodeStorage::Insert(const Inode& inode, int64_t logIndex) {
return MetaStatusCode::STORAGE_INTERNAL_ERROR;
}

MetaStatusCode InodeStorage::UpdateDeletingKey(const Inode& inode, int64_t logIndex) {
WriteLockGuard lg(rwLock_);
Key4Inode key(inode.fsid(), inode.inodeid());
std::string skey = conv_.SerializeToString(key);
// std::string skeyDeleting = absl::StrCat(DELETING_PREFIX, skey);
VLOG(0) << "need UpdateDeletingKey, " << inode.inodeid();

const char* step = "Begin transaction";
std::shared_ptr<storage::StorageTransaction> txn;
txn = kvStorage_->BeginTransaction();
if (txn == nullptr) {
LOG(ERROR) << "Begin transaction failed";
return MetaStatusCode::STORAGE_INTERNAL_ERROR;
}
auto rc = txn->HSetDeleting(table4Inode_, skey , inode);
// auto rc = txn->HSet(table4Inode_, skeyDeleting, inode);
step = "insert inode ";
if (rc.ok()) {
rc = SetAppliedIndex(txn.get(), logIndex);
step = "Insert applied index to transaction";
}
if (rc.ok()) {
rc = DeleteInternal(txn.get(), key);;
step = "delete inode ";
}
if (rc.ok()) {
VLOG(0) << "whs UpdateDeletingKey ok, " << inode.inodeid();

return MetaStatusCode::OK;
}
LOG(ERROR) << step << "whs failed, status = " << rc.ToString();
if (txn != nullptr && !txn->Rollback().ok()) {
LOG(ERROR) << "Rollback delete inode transaction failed, status = "
<< rc.ToString();
}
return MetaStatusCode::STORAGE_INTERNAL_ERROR;
}

MetaStatusCode InodeStorage::Get(const Key4Inode& key, Inode* inode) {
ReadLockGuard lg(rwLock_);
std::string skey = conv_.SerializeToString(key);
Expand Down
9 changes: 9 additions & 0 deletions curvefs/src/metaserver/inode_storage.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
#include "curvefs/src/metaserver/storage/utils.h"
#include "src/common/concurrent/rw_lock.h"

#define DELETING_PREFIX "deleting_"
namespace curvefs {
namespace metaserver {

Expand Down Expand Up @@ -76,6 +77,14 @@ class InodeStorage {
*/
MetaStatusCode Insert(const Inode& inode, int64_t logIndex);

/**
* @brief update deleting inode key in storage
* @param[in] inode: the inode want to update
* @param[in] logIndex: the index of raft log
* @return
*/
MetaStatusCode UpdateDeletingKey(const Inode& inode, int64_t logIndex);

/**
* @brief get inode from storage
* @param[in] key: the key of inode want to get
Expand Down
29 changes: 28 additions & 1 deletion curvefs/src/metaserver/metastore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ using KVStorage = ::curvefs::metaserver::storage::KVStorage;
using Key4S3ChunkInfoList = ::curvefs::metaserver::storage::Key4S3ChunkInfoList;

using ::curvefs::metaserver::storage::MemoryStorage;
using ::curvefs::metaserver::storage::NameGenerator;
using ::curvefs::metaserver::storage::RocksDBStorage;
using ::curvefs::metaserver::storage::StorageOptions;

Expand Down Expand Up @@ -87,6 +88,8 @@ MetaStoreImpl::MetaStoreImpl(copyset::CopysetNode *node,
storageOptions_(storageOptions) {}

bool MetaStoreImpl::Load(const std::string &pathname) {
LOG(ERROR) << "whs load start";

// Load from raft snap file to memory
WriteLockGuard writeLockGuard(rwLock_);
MetaStoreFStream fstream(&partitionMap_, kvStorage_,
Expand Down Expand Up @@ -147,6 +150,13 @@ bool MetaStoreImpl::Load(const std::string &pathname) {
}

startCompacts();

LOG(ERROR) << "InitStorage start";

BuildTrashList();
LOG(ERROR) << "InitStorage start02";

LOG(ERROR) << "whs load end";
return true;
}

Expand Down Expand Up @@ -859,7 +869,24 @@ bool MetaStoreImpl::InitStorage() {
return false;
}

return kvStorage_->Open();

if (!kvStorage_->Open()) {
return false;
}

return true;
}

void MetaStoreImpl::BuildTrashList() {

std::shared_ptr<NameGenerator> nameGen = std::make_shared<NameGenerator>(0);

std::shared_ptr<InodeStorage> inodeStorage =
std::make_shared<InodeStorage>(kvStorage_, nameGen, 1);

auto trash = std::make_shared<TrashImpl>(inodeStorage);

TrashManager::GetInstance().BuildAbortTrash(kvStorage_, trash);
}

} // namespace metaserver
Expand Down
2 changes: 2 additions & 0 deletions curvefs/src/metaserver/metastore.h
Original file line number Diff line number Diff line change
Expand Up @@ -351,6 +351,8 @@ class MetaStoreImpl : public MetaStore {
// REQUIRES: rwLock_ is held with write permission
bool ClearInternal();

void BuildTrashList();

private:
RWLock rwLock_; // protect partitionMap_
std::shared_ptr<KVStorage> kvStorage_;
Expand Down
71 changes: 70 additions & 1 deletion curvefs/src/metaserver/storage/rocksdb_storage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ std::string RocksDBStorage::ToInternalKey(const std::string& name,
std::ostringstream oss;
oss << iname << kDelimiter_ << key;
std::string ikey = oss.str();
VLOG(9) << "ikey = " << ikey << " (ordered = " << ordered
VLOG(0) << "whs ikey = " << ikey << " (ordered = " << ordered
<< ", name = " << name << ", key = " << key << ")"
<< ", size = " << ikey.size();
return ikey;
Expand Down Expand Up @@ -241,6 +241,32 @@ Status RocksDBStorage::Set(const std::string& name,
return ToStorageStatus(s);
}

Status RocksDBStorage::SetDeleting(const std::string& name,
const std::string& key,
const ValueType& value,
bool ordered) {
std::string svalue;
if (!inited_) {
return Status::DBClosed();
} else if (!value.SerializeToString(&svalue)) {
return Status::SerializedFailed();
}

auto handle = GetColumnFamilyHandle(ordered);


std::string ikey = ToInternalKey(name, key, ordered);
std::string deletingKey = "deleting_" + ikey;
VLOG(0) << "whs set deleting key = " << deletingKey << ", ikey " << ikey;
RocksDBPerfGuard guard(OP_PUT);
ROCKSDB_NAMESPACE::Status s = InTransaction_ ?
txn_->Put(handle, deletingKey, svalue) :
db_->Put(dbWriteOptions_, handle, deletingKey, svalue);
return ToStorageStatus(s);

}


Status RocksDBStorage::Del(const std::string& name,
const std::string& key,
bool ordered) {
Expand Down Expand Up @@ -273,6 +299,14 @@ std::shared_ptr<Iterator> RocksDBStorage::GetAll(const std::string& name,
this, std::move(ikey), 0, status, ordered);
}

std::shared_ptr<Iterator> RocksDBStorage::GetPrefix(const std::string& prefix,
bool ordered) {
int status = inited_ ? 0 : -1;
return std::make_shared<RocksDBStorageIterator>(
this, std::move(prefix), 0, status, ordered);
}


size_t RocksDBStorage::Size(const std::string& name, bool ordered) {
auto iterator = GetAll(name, ordered);
if (iterator->Status() != 0) {
Expand Down Expand Up @@ -501,6 +535,41 @@ bool RocksDBStorage::Recover(const std::string& dir) {
return false;
}



// 创建迭代器
rocksdb::Iterator* it1 = db_->NewIterator(rocksdb::ReadOptions());
// 遍历键
for (it1->SeekToFirst(); it1->Valid(); it1->Next()) {
std::string key = it1->key().ToString();
LOG(ERROR) << "whs recovery: " << key;
}

if (!it1->status().ok()) {
LOG(ERROR) << "whs recovery: ";

}
// 释放迭代器和数据库资源
delete it1;

/*
rocksdb::Iterator* it = db_->NewIterator(rocksdb::ReadOptions());
// 设置前缀
std::string prefix = "deleting_";
it->Seek(prefix);
// 遍历获取以指定前缀开头的键值对
while (it->Valid() && it->key().starts_with(prefix)) {
std::string key = it->key().ToString();
std::string value = it->value().ToString();
LOG(ERROR) << "whs recovery: " << key;
it->Next();
}
delete it;
*/


LOG(INFO) << "Recovered rocksdb from `" << dir << "`";
return true;
}
Expand Down
17 changes: 17 additions & 0 deletions curvefs/src/metaserver/storage/rocksdb_storage.h
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,10 @@ class RocksDBStorage : public KVStorage, public StorageTransaction {
const std::string& key,
const ValueType& value) override;

Status HSetDeleting(const std::string& name,
const std::string& key,
const ValueType& value) override;

Status HDel(const std::string& name, const std::string& key) override;

std::shared_ptr<Iterator> HGetAll(const std::string& name) override;
Expand All @@ -100,6 +104,8 @@ class RocksDBStorage : public KVStorage, public StorageTransaction {

Status HClear(const std::string& name) override;

std::shared_ptr<Iterator> GetPrefix(const std::string& prefix,
bool ordered) override;
// ordered
Status SGet(const std::string& name,
const std::string& key,
Expand Down Expand Up @@ -156,6 +162,11 @@ class RocksDBStorage : public KVStorage, public StorageTransaction {
const ValueType& value,
bool ordered);

Status SetDeleting(const std::string& name,
const std::string& key,
const ValueType& value,
bool ordered);

Status Del(const std::string& name,
const std::string& key,
bool ordered);
Expand Down Expand Up @@ -219,6 +230,12 @@ inline Status RocksDBStorage::HSet(const std::string& name,
return Set(name, key, value, false);
}

inline Status RocksDBStorage::HSetDeleting(const std::string& name,
const std::string& key,
const ValueType& value) {
return SetDeleting(name, key, value, false);
}

inline Status RocksDBStorage::HDel(const std::string& name,
const std::string& key) {
return Del(name, key, false);
Expand Down
9 changes: 9 additions & 0 deletions curvefs/src/metaserver/storage/storage.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,15 @@ class BaseStorage {
const std::string& key,
const ValueType& value) = 0;

virtual Status HSetDeleting(const std::string& name,
const std::string& key,
const ValueType& value){return Status::NotSupported();};

virtual std::shared_ptr<Iterator> GetPrefix(
const std::string& prefix, bool ordered) {
return nullptr;
}

virtual Status HDel(const std::string& name, const std::string& key) = 0;

virtual std::shared_ptr<Iterator> HGetAll(const std::string& name) = 0;
Expand Down
5 changes: 4 additions & 1 deletion curvefs/src/metaserver/trash.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ void TrashOption::InitTrashOptionFromConf(std::shared_ptr<Configuration> conf) {
}

void TrashImpl::Init(const TrashOption &option) {
VLOG(0) << "whs init trash, ";

options_ = option;
s3Adaptor_ = option.s3Adaptor;
mdsClient_ = option.mdsClient;
Expand All @@ -66,7 +68,7 @@ void TrashImpl::Add(uint32_t fsId, uint64_t inodeId, uint32_t dtime) {
return;
}
trashItems_.push_back(item);
VLOG(6) << "Add Trash Item success, item.fsId = " << item.fsId
VLOG(0) << "whs Add Trash Item success, item.fsId = " << item.fsId
<< ", item.inodeId = " << item.inodeId
<< ", item.dtime = " << item.dtime;
}
Expand All @@ -78,6 +80,7 @@ void TrashImpl::ScanTrash() {
LockGuard lgItems(itemsMutex_);
trashItems_.swap(temp);
}
VLOG(0) << "whs scan trash, ";

for (auto it = temp.begin(); it != temp.end();) {
if (isStop_) {
Expand Down
Loading

0 comments on commit e4441a7

Please sign in to comment.