Skip to content

Commit

Permalink
fix review
Browse files Browse the repository at this point in the history
Signed-off-by: wanghai01 <[email protected]>
  • Loading branch information
SeanHai committed Dec 5, 2023
1 parent f763504 commit b7d6ff1
Show file tree
Hide file tree
Showing 42 changed files with 401 additions and 532 deletions.
2 changes: 1 addition & 1 deletion curvefs/conf/metaserver.conf
Original file line number Diff line number Diff line change
Expand Up @@ -293,7 +293,7 @@ storage.rocksdb.perf_sampling_ratio=0
# padding its into inode (default: 25000, about 25000 * 41 (byte) = 1MB)
storage.s3_meta_inside_inode.limit_size=25000
# TTL(millisecond) for tx lock
storage.rocksdb.tx_lock_ttl_ms=5000
storage.tx_lock_ttl_ms=5000

# recycle options
# metaserver scan recycle period, default 1h
Expand Down
6 changes: 4 additions & 2 deletions curvefs/proto/mds.proto
Original file line number Diff line number Diff line change
Expand Up @@ -234,11 +234,13 @@ message CommitTxResponse {
required FSStatusCode statusCode = 1;
}

message TsoRequest {}
message TsoRequest {
required uint32 fsId = 1;
}

message TsoResponse {
required FSStatusCode statusCode = 1;
optional uint64 sn = 2;
optional uint64 ts = 2; // transaction sequence number
optional uint64 timestamp = 3;
}

Expand Down
8 changes: 4 additions & 4 deletions curvefs/src/client/client_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -306,7 +306,7 @@ CURVEFS_ERROR RenameOperator::PrewriteRenameTx(

CURVEFS_ERROR RenameOperator::PrewriteTx() {
uint64_t timestamp;
auto rc = mdsClient_->Tso(&startTs_, &timestamp);
auto rc = mdsClient_->Tso(srcDentry_.fsid(), &startTs_, &timestamp);
if (rc != FSStatusCode::OK) {
LOG_ERROR("start Tso", rc);
return CURVEFS_ERROR::INTERNAL;
Expand All @@ -333,8 +333,8 @@ CURVEFS_ERROR RenameOperator::PrewriteTx() {
txLockIn.set_timestamp(timestamp);

if (!metaClient_->GetPartitionId(dentry_.fsid(), dentry_.parentinodeid(),
&srcPartitionId_) || !metaClient_->GetPartitionId(newDentry_.fsid(),
newDentry_.parentinodeid(), &dstPartitionId_)) {
&srcPartitionId_) || !metaClient_->GetPartitionId(newDentry_.fsid(),
newDentry_.parentinodeid(), &dstPartitionId_)) {
LOG_ERROR("GetPartitionId", rc);
return CURVEFS_ERROR::INTERNAL;
}
Expand Down Expand Up @@ -362,7 +362,7 @@ CURVEFS_ERROR RenameOperator::PrewriteTx() {
CURVEFS_ERROR RenameOperator::CommitTxV2() {
uint64_t commitTs;
uint64_t timestamp;
auto rc = mdsClient_->Tso(&commitTs, &timestamp);
auto rc = mdsClient_->Tso(srcDentry_.fsid(), &commitTs, &timestamp);
if (rc != FSStatusCode::OK) {
LOG_ERROR("CommitTxV2 Tso", rc);
return CURVEFS_ERROR::INTERNAL;
Expand Down
8 changes: 4 additions & 4 deletions curvefs/src/client/dentry_cache_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ CURVEFS_ERROR DentryCacheManagerImpl::GetDentry(uint64_t parent,
while (ret == MetaStatusCode::TX_KEY_LOCKED) {
uint64_t ts = 0;
uint64_t timestamp = 0;
if (mdsClient_->Tso(&ts, &timestamp) != FSStatusCode::OK) {
if (mdsClient_->Tso(fsId_, &ts, &timestamp) != FSStatusCode::OK) {
LOG(ERROR) << "GetDentry Tso failed, parent = " << parent
<< ", name = " << name;
return CURVEFS_ERROR::INTERNAL;
Expand Down Expand Up @@ -130,7 +130,7 @@ CURVEFS_ERROR DentryCacheManagerImpl::CreateDentry(const Dentry &dentry) {
while (ret == MetaStatusCode::TX_KEY_LOCKED) {
uint64_t ts = 0;
uint64_t timestamp = 0;
if (mdsClient_->Tso(&ts, &timestamp) != FSStatusCode::OK) {
if (mdsClient_->Tso(fsId_, &ts, &timestamp) != FSStatusCode::OK) {
LOG(ERROR) << "CreateDentry Tso failed, dentry = "
<< dentry.ShortDebugString();
return CURVEFS_ERROR::INTERNAL;
Expand Down Expand Up @@ -166,7 +166,7 @@ CURVEFS_ERROR DentryCacheManagerImpl::DeleteDentry(uint64_t parent,
while (ret == MetaStatusCode::TX_KEY_LOCKED) {
uint64_t ts = 0;
uint64_t timestamp = 0;
if (mdsClient_->Tso(&ts, &timestamp) != FSStatusCode::OK) {
if (mdsClient_->Tso(fsId_, &ts, &timestamp) != FSStatusCode::OK) {
LOG(ERROR) << "DeleteDentry Tso failed, parent = " << parent
<< ", name = " << name;
return CURVEFS_ERROR::INTERNAL;
Expand Down Expand Up @@ -222,7 +222,7 @@ CURVEFS_ERROR DentryCacheManagerImpl::ListDentry(uint64_t parent,
if (ret == MetaStatusCode::TX_KEY_LOCKED) {
uint64_t ts = 0;
uint64_t timestamp = 0;
if (mdsClient_->Tso(&ts, &timestamp) != FSStatusCode::OK) {
if (mdsClient_->Tso(fsId_, &ts, &timestamp) != FSStatusCode::OK) {
LOG(ERROR) << "ListDentry Tso failed, parent = " << parent;
return CURVEFS_ERROR::INTERNAL;
}
Expand Down
10 changes: 7 additions & 3 deletions curvefs/src/client/rpcclient/mds_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -682,30 +682,34 @@ FSStatusCode MdsClientImpl::CommitTxWithLock(
return CommitTx(request);
}

FSStatusCode MdsClientImpl::Tso(uint64_t* sn, uint64_t* timestamp) {
FSStatusCode MdsClientImpl::Tso(
uint32_t fsId, uint64_t* ts, uint64_t* timestamp) {
auto task = RPCTask {
(void)addrindex;
(void)rpctimeoutMS;
mdsClientMetric_.tso.qps.count << 1;
LatencyUpdater updater(&mdsClientMetric_.tso.latency);
TsoRequest request;
request.set_fsid(fsId);
TsoResponse response;
mdsbasecli_->Tso(request, &response, cntl, channel);
if (cntl->Failed()) {
mdsClientMetric_.tso.eps.count << 1;
LOG(WARNING) << "Tso Failed, errorcode = " << cntl->ErrorCode()
<< ", error content:" << cntl->ErrorText()
<< ", fsId = " << fsId
<< ", log id = " << cntl->log_id();
return -cntl->ErrorCode();
}

FSStatusCode ret = response.statuscode();
if (ret != FSStatusCode::OK) {
LOG(ERROR) << "Tso: errcode = " << ret
<< ", errmsg = " << FSStatusCode_Name(ret);
<< ", errmsg = " << FSStatusCode_Name(ret)
<< ", fsId = " << fsId;
return ret;
} else {
*sn = response.sn();
*ts = response.ts();
*timestamp = response.timestamp();
}
return ret;
Expand Down
5 changes: 3 additions & 2 deletions curvefs/src/client/rpcclient/mds_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,8 @@ class MdsClient {
const std::string& uuid,
uint64_t sequence) = 0;

virtual FSStatusCode Tso(uint64_t* sn, uint64_t* timestamp) = 0;
virtual FSStatusCode Tso(uint32_t fsId, uint64_t* ts,
uint64_t* timestamp) = 0;

// allocate block group
virtual SpaceErrCode AllocateVolumeBlockGroup(
Expand Down Expand Up @@ -226,7 +227,7 @@ class MdsClientImpl : public MdsClient {
const std::string& uuid,
uint64_t sequence) override;

FSStatusCode Tso(uint64_t* sn, uint64_t* timestamp) override;
FSStatusCode Tso(uint32_t fsId, uint64_t* ts, uint64_t* timestamp) override;

// allocate block group
SpaceErrCode AllocateVolumeBlockGroup(
Expand Down
18 changes: 18 additions & 0 deletions curvefs/src/mds/codec/codec.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ namespace codec {

using ::curve::common::EncodeBigEndian;
using ::curve::common::EncodeBigEndian_uint32;
using ::curve::common::DecodeBigEndian_uint32;
using ::curvefs::mds::BLOCKGROUP_KEY_END;
using ::curvefs::mds::BLOCKGROUP_KEY_PREFIX;
using ::curvefs::mds::COMMON_PREFIX_LENGTH;
Expand Down Expand Up @@ -78,6 +79,23 @@ std::string EncodeFsUsageKey(const std::string& fsName) {
return key;
}

std::string EncodeTsKey(uint32_t fsId) {
static const size_t size = COMMON_PREFIX_LENGTH + sizeof(fsId);

std::string key;
key.resize(size);

memcpy(&key[0], TS_INFO_KEY_PREFIX, COMMON_PREFIX_LENGTH);

EncodeBigEndian_uint32(&key[COMMON_PREFIX_LENGTH], fsId);

return key;
}

uint32_t DecodeTsKey(const std::string& key) {
return DecodeBigEndian_uint32(&key[COMMON_PREFIX_LENGTH]);
}

} // namespace codec
} // namespace mds
} // namespace curvefs
4 changes: 4 additions & 0 deletions curvefs/src/mds/codec/codec.h
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,10 @@ std::string EncodeBlockGroupKey(uint32_t fsId, uint64_t offset);

std::string EncodeFsUsageKey(const std::string& fsName);

std::string EncodeTsKey(uint32_t fsId);

uint32_t DecodeTsKey(const std::string& key);

} // namespace codec
} // namespace mds
} // namespace curvefs
Expand Down
3 changes: 2 additions & 1 deletion curvefs/src/mds/common/storage_key.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ const char BLOCKGROUP_KEY_PREFIX[] = "fs_04";
const char BLOCKGROUP_KEY_END[] = "fs_05";
const char FS_USAGE_KEY_PREFIX[] = "fs_05";
const char FS_USAGE_KEY_END[] = "fs_06";
const char TS_INFO_KEY_PREFIX[] = "fs_06";
const char TS_INFO_KEY_END[] = "fs_07";

constexpr uint32_t COMMON_PREFIX_LENGTH = 5;

Expand All @@ -65,7 +67,6 @@ const char MEMCACHE_CLUSTER_KEY_PREFIX[] = "fs_1009";
const char MEMCACHE_CLUSTER_KEY_END[] = "fs_1010";
const char FS_2_MEMCACHE_CLUSTER_KEY_PREFIX[] = "fs_1010";
const char FS_2_MEMCACHE_CLUSTER_KEY_END[] = "fs_1011";
const char TS_INFO_KEY[] = "fs_1012";

constexpr uint32_t TOPOLOGY_PREFIX_LENGTH = 7;

Expand Down
15 changes: 11 additions & 4 deletions curvefs/src/mds/fs_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1297,11 +1297,18 @@ bool FsManager::FillVolumeInfo(common::Volume* volume) {
}

void FsManager::Tso(const TsoRequest* request, TsoResponse* response) {
if (topoManager_->Tso(request, response)) {
response->set_statuscode(FSStatusCode::OK);
} else {
response->set_statuscode(FSStatusCode::INTERNAL_ERROR);
uint64_t ts;
uint64_t timestamp;
auto ret = fsStorage_->Tso(request->fsid(), &ts, &timestamp);
if (ret != FSStatusCode::OK) {
LOG(ERROR) << "Tso fail, fsid = " << request->fsid()
<< ", ret = " << FSStatusCode_Name(ret);
response->set_statuscode(ret);
return;
}
response->set_ts(ts);
response->set_timestamp(timestamp);
response->set_statuscode(ret);
}

} // namespace mds
Expand Down
82 changes: 81 additions & 1 deletion curvefs/src/mds/fs_storage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@

#include "curvefs/src/mds/codec/codec.h"
#include "curvefs/src/mds/metric/fs_metric.h"
#include "src/common/timeutility.h"

namespace curvefs {
namespace mds {
Expand Down Expand Up @@ -186,6 +187,22 @@ FSStatusCode MemoryFsStorage::DeleteFsUsage(const std::string& fsName) {
return FSStatusCode::OK;
}

FSStatusCode MemoryFsStorage::Tso(uint32_t fsId, uint64_t* ts,
uint64_t* timestamp) {
WriteLockGuard lock(tsLock_);
*timestamp = curve::common::TimeUtility::GetTimeofDayMs();
auto it = tsMap_.find(fsId);
if (it == tsMap_.end()) {
*ts = 1;
} else {
*ts = it->second.ts() + 1;
}
TS tsInfo;
tsInfo.set_ts(*ts);
tsMap_[fsId] = tsInfo;
return FSStatusCode::OK;
}

PersisKVStorage::PersisKVStorage(
const std::shared_ptr<curve::kvstorage::KVStorageClient>& storage)
: storage_(storage),
Expand All @@ -208,7 +225,16 @@ FSStatusCode PersisKVStorage::Get(uint64_t fsId, FsInfoWrapper* fsInfo) {

bool PersisKVStorage::Init() {
bool ret = LoadAllFs();
return ret;
if (!ret) {
LOG(ERROR) << "Load all fs failed";
return false;
}
ret = LoadAllTsInfo();
if (!ret) {
LOG(ERROR) << "Load all tsInfo failed";
return false;
}
return true;
}

void PersisKVStorage::Uninit() {}
Expand Down Expand Up @@ -419,6 +445,30 @@ bool PersisKVStorage::LoadAllFs() {
return true;
}

bool PersisKVStorage::LoadAllTsInfo() {
WriteLockGuard lock(tsLock_);
tsMap_.clear();
std::vector<std::pair<std::string, std::string>> out;
int err = storage_->List(TS_INFO_KEY_PREFIX,
TS_INFO_KEY_END, &out);

if (err != EtcdErrCode::EtcdOK) {
LOG(ERROR) << "List all tsInfo from etcd failed, error: " << err;
return false;
}

for (const auto& kv : out) {
TS tsInfo;
uint32_t fsId = codec::DecodeTsKey(kv.first);
if (!codec::DecodeProtobufMessage(kv.second, &tsInfo)) {
LOG(ERROR) << "Decode ts info failed, fsId: " << fsId;
return false;
}
tsMap_[fsId] = tsInfo;
}
return true;
}

bool PersisKVStorage::FsIDToName(uint64_t fsId, std::string* name) const {
ReadLockGuard lock(idToNameLock_);
auto iter = idToName_.find(fsId);
Expand Down Expand Up @@ -570,5 +620,35 @@ FSStatusCode PersisKVStorage::DeleteFsUsage(const std::string& fsName) {
return FSStatusCode::OK;
}

FSStatusCode PersisKVStorage::Tso(uint32_t fsId, uint64_t* ts,
uint64_t* timestamp) {
WriteLockGuard lock(tsLock_);
*timestamp = curve::common::TimeUtility::GetTimeofDayMs();
auto it = tsMap_.find(fsId);
if (it == tsMap_.end()) {
*ts = 1;
} else {
*ts = it->second.ts() + 1;
}
TS tsInfo;
tsInfo.set_ts(*ts);
// persist to storage
std::string key = codec::EncodeTsKey(fsId);
std::string value;
if (!codec::EncodeProtobufMessage(tsInfo, &value)) {
LOG(ERROR) << "Encode tsInfo failed, " << tsInfo.ShortDebugString();
return FSStatusCode::INTERNAL_ERROR;
}

int ret = storage_->Put(key, value);
if (ret != EtcdErrCode::EtcdOK) {
LOG(ERROR) << "Put key-value to storage failed, tsInfo, "
<< tsInfo.ShortDebugString();
return FSStatusCode::INTERNAL_ERROR;
}
tsMap_[fsId] = tsInfo;
return FSStatusCode::OK;
}

} // namespace mds
} // namespace curvefs
Loading

0 comments on commit b7d6ff1

Please sign in to comment.