Skip to content

Commit

Permalink
Add asynchronous operations and access to physical resources
Browse files Browse the repository at this point in the history
  • Loading branch information
MCKZX-llx committed May 6, 2024
1 parent 667fca7 commit eff9872
Show file tree
Hide file tree
Showing 13 changed files with 403 additions and 163 deletions.
2 changes: 2 additions & 0 deletions protos/Crane.proto
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,8 @@ message TaskStatusChangeReply {

message CranedRegisterRequest {
string craned_id = 1;
double cpu = 2;
uint64 memory = 3;
}

message CranedRegisterReply {
Expand Down
3 changes: 2 additions & 1 deletion protos/PublicDefs.proto
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ enum CranedState{
CRANE_MIX = 1;
CRANE_ALLOC = 2;
CRANE_DOWN = 3;
CRANE_DRAIN = 4;
}

enum TaskStatus {
Expand Down Expand Up @@ -218,7 +219,7 @@ message PartitionInfo {
uint64 avail_mem = 10;
uint64 alloc_mem = 11;

uint32 priority = 12;
int64 priority = 12;
repeated string allow_list = 13;
repeated string deny_list = 14;
}
Expand Down
77 changes: 49 additions & 28 deletions src/CraneCtld/CranedKeeper.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -303,15 +303,26 @@ CranedKeeper::CranedKeeper(uint32_t node_num) : m_cq_closed_(false) {
i);
}

m_period_connect_thread_ =
std::thread(&CranedKeeper::PeriodConnectCranedThreadFunc_, this);
std::shared_ptr<uvw::loop> uvw_keeper_loop = uvw::loop::create();
m_period_connect_handle_ = uvw_keeper_loop->resource<uvw::timer_handle>();
m_period_connect_handle_->on<uvw::timer_event>(
[this](const uvw::timer_event &, uvw::timer_handle &) {
PeriodConnectCranedTimerCb_();
});
m_period_connect_handle_->start(std::chrono::milliseconds(300 * 3),
std::chrono::milliseconds(300));

m_craned_keeper_thread_ =
std::thread([this, loop = std::move(uvw_keeper_loop)]() {
PeriodConnectCranedThread_(loop);
});
}

CranedKeeper::~CranedKeeper() {
Shutdown();

for (auto &cq_thread : m_cq_thread_vec_) cq_thread.join();
m_period_connect_thread_.join();
m_craned_keeper_thread_.join();

CRANE_TRACE("CranedKeeper has been closed.");
}
Expand Down Expand Up @@ -622,7 +633,7 @@ void CranedKeeper::PutNodeIntoUnavailList(const CranedId &crane_id) {
m_unavail_craned_set_.emplace(crane_id);
}

void CranedKeeper::ResetConnection(const CranedId&craned_id) {
void CranedKeeper::ResetConnection(const CranedId &craned_id) {
WriterLock lock(&m_connected_craned_mtx_);
m_connected_craned_id_stub_map_[craned_id].reset();
m_connected_craned_id_stub_map_.erase(craned_id);
Expand Down Expand Up @@ -720,35 +731,45 @@ void CranedKeeper::CranedChannelConnectFail_(CranedStub *stub) {
craned_keeper->m_connecting_craned_set_.erase(stub->m_craned_id_);
}

void CranedKeeper::PeriodConnectCranedThreadFunc_() {
void CranedKeeper::PeriodConnectCranedTimerCb_() {
absl::ReaderMutexLock connected_reader_lock(&m_connected_craned_mtx_);
util::lock_guard guard(m_unavail_craned_set_mtx_);

uint32_t fetch_num = kConcurrentStreamQuota - m_connecting_craned_set_.size();

auto it = m_unavail_craned_set_.begin();
while (it != m_unavail_craned_set_.end() && fetch_num > 0) {
if (!m_connecting_craned_set_.contains(*it) &&
!m_connected_craned_id_stub_map_.contains(*it)) {
m_connecting_craned_set_.emplace(*it);
g_thread_pool->detach_task(
[this, craned_id = *it]() { ConnectCranedNode_(craned_id); });
fetch_num--;
}
it = m_unavail_craned_set_.erase(it);
}
}

void CranedKeeper::PeriodConnectCranedThread_(
const std::shared_ptr<uvw::loop> &uvw_loop) {
util::SetCurrentThreadName("PeriConnCraned");

while (true) {
if (m_cq_closed_) break;

// Use a window to limit the maximum number of connecting craned nodes.
{
absl::ReaderMutexLock connected_reader_lock(&m_connected_craned_mtx_);
util::lock_guard guard(m_unavail_craned_set_mtx_);

uint32_t fetch_num =
kConcurrentStreamQuota - m_connecting_craned_set_.size();

auto it = m_unavail_craned_set_.begin();
while (it != m_unavail_craned_set_.end() && fetch_num > 0) {
if (!m_connecting_craned_set_.contains(*it) &&
!m_connected_craned_id_stub_map_.contains(*it)) {
m_connecting_craned_set_.emplace(*it);
g_thread_pool->detach_task(
[this, craned_id = *it]() { ConnectCranedNode_(craned_id); });
fetch_num--;
std::shared_ptr<uvw::idle_handle> idle_handle =
uvw_loop->resource<uvw::idle_handle>();
idle_handle->on<uvw::idle_event>(
[this](const uvw::idle_event &, uvw::idle_handle &h) {
if (m_cq_closed_) {
h.parent().walk([](auto &&h) { h.close(); });
h.parent().stop();
}
it = m_unavail_craned_set_.erase(it);
}
}
std::this_thread::sleep_for(std::chrono::milliseconds(50));
});

std::this_thread::sleep_for(std::chrono::milliseconds(300));
if (idle_handle->start() != 0) {
CRANE_ERROR("Failed to start the idle event in craned keeper loop.");
}

uvw_loop->run();
}

} // namespace Ctld
11 changes: 8 additions & 3 deletions src/CraneCtld/CranedKeeper.h
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ class CranedKeeper {

void PutNodeIntoUnavailList(const CranedId &crane_id);

void ResetConnection(const CranedId&craned_id);
void ResetConnection(const CranedId &craned_id);

private:
struct CqTag {
Expand All @@ -141,7 +141,9 @@ class CranedKeeper {

void StateMonitorThreadFunc_(int thread_id);

void PeriodConnectCranedThreadFunc_();
void PeriodConnectCranedTimerCb_();

void PeriodConnectCranedThread_(const std::shared_ptr<uvw::loop> &uvw_loop);

std::function<void(CranedId)> m_craned_is_up_cb_;

Expand Down Expand Up @@ -173,7 +175,10 @@ class CranedKeeper {

std::vector<std::thread> m_cq_thread_vec_;

std::thread m_period_connect_thread_;
std::shared_ptr<uvw::timer_handle> m_period_connect_handle_;
std::shared_ptr<uvw::async_handle> m_remove_craned_handle_;

std::thread m_craned_keeper_thread_;

std::atomic_uint64_t m_channel_count_{0};
};
Expand Down
Loading

0 comments on commit eff9872

Please sign in to comment.