Skip to content

Commit

Permalink
1. The heartbeat reports a disk-full error, and the MDS sets the copyset availflag to false.
Browse files Browse the repository at this point in the history
2. The copyset node leader becomes read-only when it receives a false copyset availflag from the heartbeat.
3. When a disk error occurs, the copyset node leader calls set_error_and_rollback and a new leader is selected.
4. Added a new RPC method to delete copyset nodes that are in an error state.

Signed-off-by: liuminjian <[email protected]>
  • Loading branch information
liuminjian committed Nov 6, 2023
1 parent e1aa430 commit 45cf27d
Show file tree
Hide file tree
Showing 34 changed files with 849 additions and 128 deletions.
2 changes: 2 additions & 0 deletions conf/chunkserver.conf
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,8 @@ chunkfilepool.allocate_percent=80
chunkfilepool.chunk_file_pool_size=1GB
# The thread num for format chunks
chunkfilepool.thread_num=1
# When chunkserver disk usage exceeds this percentage, the heartbeat reports the disk state as full (DISKFULL)
chunkfilepool.diskUsagePercentLimit=95

#
# WAL file pool
Expand Down
1 change: 1 addition & 0 deletions proto/chunk.proto
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ enum CHUNK_OP_STATUS {
CHUNK_OP_STATUS_BACKWARD = 10; // 请求的版本落后当前chunk的版本
CHUNK_OP_STATUS_CHUNK_EXIST = 11; // chunk已存在
CHUNK_OP_STATUS_EPOCH_TOO_OLD = 12; // request epoch too old
CHUNK_OP_STATUS_READONLY = 13; // copyset其他节点故障,设为只读
};

message ChunkResponse {
Expand Down
2 changes: 2 additions & 0 deletions proto/copyset.proto
Original file line number Diff line number Diff line change
Expand Up @@ -100,4 +100,6 @@ service CopysetService {
rpc DeleteBrokenCopyset(CopysetRequest) returns (CopysetResponse);

rpc GetCopysetStatus (CopysetStatusRequest) returns (CopysetStatusResponse);

rpc DeleteBrokenCopysetNode (CopysetRequest2) returns (CopysetResponse2);
};
9 changes: 8 additions & 1 deletion proto/heartbeat.proto
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,13 @@ message CopysetStatistics {
required uint32 writeIOPS = 4;
}

// Disk error categories reported by the chunkserver in DiskState.errType.
enum ErrorType {
// No disk error; the chunkserver heartbeat sets this by default.
NORMAL = 0;
// Disk usage exceeded the chunkserver's configured usage-percent limit.
DISKFULL = 1;
}

message DiskState {
required uint32 errType = 1;
required ErrorType errType = 1;
required string errMsg = 2;
}

Expand Down Expand Up @@ -145,6 +150,8 @@ message CopySetConf {
// 表示待删除节点。
// chunkserver收到CHANGE_PEER,根据peers,configchangeItem,oldPeer拼出新的conf
optional common.Peer oldPeer = 7;
// copyset availflag
optional bool availflag = 8;
};

enum HeartbeatStatusCode {
Expand Down
10 changes: 10 additions & 0 deletions proto/topology.proto
Original file line number Diff line number Diff line change
Expand Up @@ -564,6 +564,14 @@ message ListUnAvailCopySetsResponse {
repeated common.CopysetInfo copysets = 2;
}

// Request message of TopologyService.DeleteBrokenCopysetInChunkServer.
message DeleteBrokenCopysetInChunkServerRequest {
// presumably the ID of the chunkserver whose broken copysets should be
// deleted - TODO confirm against the MDS handler
required uint32 chunkServerID = 1;
}

// Response message of TopologyService.DeleteBrokenCopysetInChunkServer.
message DeleteBrokenCopysetInChunkServerResponse {
// Result code of the operation.
// NOTE(review): the set of valid values is not visible here - confirm
required sint32 statusCode = 1;
}

//TODO(hzsunjianliang): update userPolicy and so on
service TopologyService {
rpc RegistChunkServer(ChunkServerRegistRequest) returns (ChunkServerRegistResponse);
Expand Down Expand Up @@ -609,4 +617,6 @@ service TopologyService {
rpc SetCopysetsAvailFlag(SetCopysetsAvailFlagRequest) returns (SetCopysetsAvailFlagResponse);
rpc ListUnAvailCopySets(ListUnAvailCopySetsRequest) returns (ListUnAvailCopySetsResponse);
rpc ListChunkFormatStatus(ListChunkFormatStatusRequest) returns (ListChunkFormatStatusResponse);
rpc DeleteBrokenCopysetInChunkServer(DeleteBrokenCopysetInChunkServerRequest) returns (DeleteBrokenCopysetInChunkServerResponse);

}
2 changes: 2 additions & 0 deletions src/chunkserver/chunkserver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -780,6 +780,8 @@ void ChunkServer::InitHeartbeatOptions(
&heartbeatOptions->intervalSec));
LOG_IF(FATAL, !conf->GetUInt32Value("mds.heartbeat_timeout",
&heartbeatOptions->timeout));
LOG_IF(FATAL, !conf->GetUInt32Value("chunkfilepool.diskUsagePercentLimit",
&heartbeatOptions->chunkserverDiskLimit));
}

void ChunkServer::InitRegisterOptions(
Expand Down
26 changes: 23 additions & 3 deletions src/chunkserver/copyset_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,8 @@ void CopysetNode::on_apply(::braft::Iterator &iter) {
*/
braft::Closure *closure = iter.done();

braft::Iterator* iterPtr = &iter;
std::shared_ptr<IteratorWrapper> wrapperPtr = std::make_shared<IteratorWrapper>(iterPtr);
if (nullptr != closure) {
/**
* 1.closure不是null,那么说明当前节点正常,直接从内存中拿到Op
Expand All @@ -305,7 +307,7 @@ void CopysetNode::on_apply(::braft::Iterator &iter) {
std::shared_ptr<ChunkOpRequest>& opRequest = chunkClosure->request_;
concurrentapply_->Push(opRequest->ChunkId(), ChunkOpRequest::Schedule(opRequest->OpType()), // NOLINT
&ChunkOpRequest::OnApply, opRequest,
iter.index(), doneGuard.release());
iter.index(), doneGuard.release(), wrapperPtr);
} else {
// 获取log entry
butil::IOBuf log = iter.data();
Expand All @@ -322,7 +324,7 @@ void CopysetNode::on_apply(::braft::Iterator &iter) {
auto chunkId = request.chunkid();
concurrentapply_->Push(chunkId, ChunkOpRequest::Schedule(request.optype()), // NOLINT
&ChunkOpRequest::OnApplyFromLog, opReq,
dataStore_, std::move(request), data);
dataStore_, std::move(request), data, wrapperPtr);
}
}
}
Expand All @@ -345,6 +347,14 @@ void CopysetNode::WaitSnapshotDone() {
}
}

/**
 * @brief Report whether this copyset node is currently flagged read-only.
 * @return the value of readOnly_, loaded with acquire ordering
 */
bool CopysetNode::ReadOnly() {
    const bool isReadOnly = readOnly_.load(std::memory_order_acquire);
    return isReadOnly;
}

/**
 * @brief Set or clear the read-only flag of this copyset node.
 * @param readOnly true to mark the copyset read-only, false to clear the flag
 */
void CopysetNode::SetReadOnly(bool readOnly) {
    // Stored with release ordering; ReadOnly() loads with acquire ordering.
    const bool newFlag = readOnly;
    readOnly_.store(newFlag, std::memory_order_release);
}

void CopysetNode::save_snapshot_background(::braft::SnapshotWriter *writer,
::braft::Closure *done) {
brpc::ClosureGuard doneGuard(done);
Expand Down Expand Up @@ -545,7 +555,7 @@ void CopysetNode::on_leader_stop(const butil::Status &status) {
}

void CopysetNode::on_error(const ::braft::Error &e) {
LOG(FATAL) << "Copyset: " << GroupIdString()
LOG(ERROR) << "Copyset: " << GroupIdString()
<< ", peer id: " << peerId_.to_string()
<< " meet raft error: " << e;
}
Expand Down Expand Up @@ -978,6 +988,12 @@ void CopysetNode::GetStatus(NodeStatus *status) {
raftNode_->get_status(status);
}

bool CopysetNode::StatusOK() {
NodeStatus status;
GetStatus(&status);
return status.state != braft::STATE_ERROR;
}

// Fills *status by forwarding to raftNode_->get_leader_lease_status().
void CopysetNode::GetLeaderLeaseStatus(braft::LeaderLeaseStatus *status) {
raftNode_->get_leader_lease_status(status);
}
Expand Down Expand Up @@ -1115,5 +1131,9 @@ SyncChunkThread::~SyncChunkThread() {
Stop();
}

/**
 * @brief Forward set_error_and_rollback to the wrapped braft::Iterator.
 * @param ntail passed through unchanged to the wrapped iterator
 * @param st    passed through unchanged to the wrapped iterator
 *
 * A wrapper built with the default constructor has no iterator attached;
 * in that case the call is logged and ignored instead of dereferencing a
 * null pointer.
 */
void IteratorWrapper::set_error_and_rollback(size_t ntail, const butil::Status* st) {
    if (nullptr == iter_) {
        LOG(ERROR) << "IteratorWrapper::set_error_and_rollback called "
                   << "without a wrapped iterator";
        return;
    }
    iter_->set_error_and_rollback(ntail, st);
}

} // namespace chunkserver
} // namespace curve
22 changes: 22 additions & 0 deletions src/chunkserver/copyset_node.h
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,17 @@ class SyncChunkThread : public curve::common::Uncopyable {
CopysetNode* node_;
};

// Thin wrapper around braft::Iterator whose set_error_and_rollback is
// virtual, so that unit tests can mock the iterator.
// The wrapper is non-owning: it stores the raw pointer and never deletes it.
// NOTE(review): presumably the wrapped iterator must still be alive whenever
// set_error_and_rollback is invoked - confirm against the callers.
class IteratorWrapper {
 public:
    // Creates a wrapper with no iterator attached (iter_ is nullptr).
    IteratorWrapper() {}
    // Wraps an existing iterator without taking ownership of it.
    IteratorWrapper(braft::Iterator *iter): iter_(iter) {}
    // Virtual because the class is meant to be subclassed (mocked); deleting
    // a derived object through an IteratorWrapper* must be well defined.
    virtual ~IteratorWrapper() {}
    virtual void set_error_and_rollback(size_t ntail = 1, const butil::Status* st = nullptr);
 private:
    // Wrapped iterator; nullptr when default-constructed.
    braft::Iterator *iter_ = nullptr;
};

/**
* 一个Copyset Node就是一个复制组的副本
*/
Expand Down Expand Up @@ -285,6 +296,11 @@ class CopysetNode : public braft::StateMachine,
*/
virtual void GetStatus(NodeStatus *status);

/**
 * @brief: Check whether the copyset node is in a non-error state
 * @return true if the raft node state is not braft::STATE_ERROR,
 *         false if it is
 */
virtual bool StatusOK() ;

/**
* @brief: get raft node leader lease status
* @param status[out]: raft node leader lease status
Expand Down Expand Up @@ -469,6 +485,10 @@ class CopysetNode : public braft::StateMachine,

void WaitSnapshotDone();

// Returns the current value of the copyset read-only flag.
bool ReadOnly();

// Sets the copyset read-only flag; the chunkserver heartbeat calls this with
// the negation of the copyset availflag it receives.
void SetReadOnly(bool readOnly);

private:
inline std::string GroupId() {
return ToGroupId(logicPoolId_, copysetId_);
Expand Down Expand Up @@ -545,6 +565,8 @@ class CopysetNode : public braft::StateMachine,
uint32_t checkSyncingIntervalMs_;
// async snapshot future object
std::future<void> snapshotFuture_;
// copyset readonly flag; starts out writable (false) so that ReadOnly()
// never observes an indeterminate value before SetReadOnly() is first called
std::atomic<bool> readOnly_{false};
};

} // namespace chunkserver
Expand Down
40 changes: 40 additions & 0 deletions src/chunkserver/copyset_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -232,5 +232,45 @@ void CopysetServiceImpl::GetCopysetStatus(RpcController *controller,
request->copysetid());
}

/**
 * Handle the DeleteBrokenCopysetNode RPC.
 *
 * For every copyset listed in the request, look the node up in the copyset
 * node manager and delete it only if its raft state is STATE_ERROR. Copysets
 * that are not found, or whose state is anything else, are skipped.
 * The response status is always COPYSET_OP_STATUS_SUCCESS.
 *
 * @param controller unused
 * @param request    list of copysets to examine
 * @param response   receives the operation status
 * @param done       RPC closure, run when this function returns
 */
void CopysetServiceImpl::DeleteBrokenCopysetNode(RpcController *controller,
                                                 const CopysetRequest2 *request,
                                                 CopysetResponse2 *response,
                                                 Closure *done) {
    (void)controller;
    brpc::ClosureGuard doneGuard(done);

    LOG(INFO) << "Received DeleteBrokenCopysetNode request";

    for (const auto& target : request->copysets()) {
        // Only copysets that exist on this chunkserver are considered.
        auto node = copysetNodeManager_->GetCopysetNode(target.logicpoolid(),
                                                        target.copysetid());
        if (node != nullptr) {
            NodeStatus nodeStatus;
            node->GetStatus(&nodeStatus);
            // Only nodes whose raft state is ERROR are deleted.
            if (braft::State::STATE_ERROR == nodeStatus.state) {
                // NOTE(review): the outcome of DeleteCopysetNode is not
                // inspected here, so "success" is logged regardless - confirm
                // whether that is intended.
                copysetNodeManager_->DeleteCopysetNode(target.logicpoolid(), target.copysetid());

                LOG(INFO) << "Delete copyset node"
                          << ToGroupIdString(target.logicpoolid(),
                                             target.copysetid())
                          << " success.";
            }
        }
    }

    response->set_status(COPYSET_OP_STATUS::COPYSET_OP_STATUS_SUCCESS);
    LOG(INFO) << "DeleteBrokenCopysetNode " << request->copysets().size() << " copysets success";
}

} // namespace chunkserver
} // namespace curve
8 changes: 8 additions & 0 deletions src/chunkserver/copyset_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,14 @@ class CopysetServiceImpl : public CopysetService {
CopysetStatusResponse *response,
Closure *done);

/**
 * Delete copyset nodes whose raft state is ERROR.
 * Copysets in the request that do not exist on this chunkserver, or whose
 * state is not STATE_ERROR, are skipped.
 */
void DeleteBrokenCopysetNode(RpcController *controller,
const CopysetRequest2 *request,
CopysetResponse2 *response,
Closure *done);

private:
// 复制组管理者
CopysetNodeManager* copysetNodeManager_;
Expand Down
10 changes: 9 additions & 1 deletion src/chunkserver/heartbeat.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,7 @@ int Heartbeat::BuildRequest(HeartbeatRequest* req) {
*/
curve::mds::heartbeat::DiskState* diskState =
new curve::mds::heartbeat::DiskState();
diskState->set_errtype(0);
diskState->set_errtype(curve::mds::heartbeat::NORMAL);
diskState->set_errmsg("");
req->set_allocated_diskstate(diskState);

Expand Down Expand Up @@ -295,6 +295,11 @@ int Heartbeat::BuildRequest(HeartbeatRequest* req) {
req->set_diskcapacity(cap);
req->set_diskused(cap - avail);

if (options_.chunkserverDiskLimit > 0 && req->diskused() * 100 / cap > options_.chunkserverDiskLimit) {
diskState->set_errtype(curve::mds::heartbeat::DISKFULL);
diskState->set_errmsg("Disk near full");
}

std::vector<CopysetNodePtr> copysets;
copysetMan_->GetAllCopysetNodes(&copysets);

Expand Down Expand Up @@ -434,6 +439,9 @@ int Heartbeat::ExecTask(const HeartbeatResponse& response) {
continue;
}

// 判断copyset是否avail,否则设置readonly
copyset->SetReadOnly(!conf.availflag());

// 解析该chunkserver上的copyset是否需要删除
// 需要删除则清理copyset
if (HeartbeatHelper::NeedPurge(csEp_, conf, copyset)) {
Expand Down
1 change: 1 addition & 0 deletions src/chunkserver/heartbeat.h
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ struct HeartbeatOptions {
uint32_t timeout;
CopysetNodeManager* copysetNodeManager;
ScanManager* scanManager;
// Disk usage percentage above which the heartbeat reports DISKFULL; loaded
// from "chunkfilepool.diskUsagePercentLimit". A value of 0 disables the check.
uint32_t chunkserverDiskLimit;

std::shared_ptr<LocalFileSystem> fs;
std::shared_ptr<FilePool> chunkFilePool;
Expand Down
Loading

0 comments on commit 45cf27d

Please sign in to comment.