Skip to content

Commit

Permalink
MGM: Reinforce the check of the various internal components of the MG…
Browse files Browse the repository at this point in the history
…M that

should not run when the daemon in running in slave mode.
  • Loading branch information
esindril committed Jan 31, 2025
1 parent 1ff12ac commit 9b59c4a
Show file tree
Hide file tree
Showing 5 changed files with 62 additions and 36 deletions.
18 changes: 12 additions & 6 deletions mgm/FileSystem.cc
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ FileSystem::SetConfigStatus(eos::common::ConfigStatus new_status)
eos::common::ConfigStatus old_status = GetConfigStatus();
int drain_tx = IsDrainTransition(old_status, new_status);

// Only master drains
// Only master drains and updates the configuration status
if (ShouldBroadCast()) {
std::string out_msg;

Expand All @@ -237,10 +237,12 @@ FileSystem::SetConfigStatus(eos::common::ConfigStatus new_status)
}
}
}

std::string val = eos::common::FileSystem::GetConfigStatusAsString(new_status);
return eos::common::FileSystem::SetString("configstatus", val.c_str());
}

std::string val = eos::common::FileSystem::GetConfigStatusAsString(new_status);
return eos::common::FileSystem::SetString("configstatus", val.c_str());
return true;
}

//------------------------------------------------------------------------------
Expand Down Expand Up @@ -297,11 +299,15 @@ bool
FileSystem::ShouldBroadCast()
{
if (mRealm) {
if (mRealm->getSom()) {
if (mRealm->getSom()) { // MQ backend
return mRealm->getSom()->ShouldBroadCast();
} else {
// @note (esindril) to review when active-passive is actually enabled
return true;
// QDB pub-sub
if (gOFS && gOFS->mMaster->IsMaster()) {
return true;
} else {
return false;
}
}
} else {
return false;
Expand Down
19 changes: 12 additions & 7 deletions mgm/LRU.cc
Original file line number Diff line number Diff line change
Expand Up @@ -195,15 +195,16 @@ void LRU::performCycleQDB(ThreadAssistant& assistant) noexcept
while (explorer.fetch(item)) {
eos_static_debug("lru-dir-qdb=\"%s\" attrs=%d", item.fullPath.c_str(),
item.attrs.size());
processDirectory(item.fullPath,item.attrs);
processDirectory(item.fullPath, item.attrs);
processed++;

if (processed % 1000 == 0) {
eos_static_info("msg=\"LRU scan in progress\" num_scanned_dirs=%lli",
processed);

if (assistant.terminationRequested()) {
eos_static_info("%s", "msg=\"termination requested, quit LRU\"");
if (assistant.terminationRequested() || !gOFS->mMaster->IsMaster()) {
eos_static_info("%s", "msg=\"quit LRU due termination request "
"or MGM running in slave mode\"");
break;
}
}
Expand Down Expand Up @@ -265,10 +266,11 @@ void LRU::processDirectory(const std::string& dir,
}

// Don't walk into the proc directory
if (dir.substr(0,gOFS->MgmProcPath.length()) == gOFS->MgmProcPath.c_str()) {
if (dir.substr(0, gOFS->MgmProcPath.length()) == gOFS->MgmProcPath.c_str()) {
eos_static_debug("skipping proc tree %s\n", dir.c_str());
return;
}

// Sort out the individual LRU policies
if (map.count("sys.lru.expire.empty")) {
// Remove empty directories older than <age>
Expand Down Expand Up @@ -358,18 +360,20 @@ LRU::AgeExpire(const char* dir, const std::string& policy)

// Loop through all file names
for (auto it = eos::FileMapIterator(cmd); it.valid(); it.next()) {

// no need to lock the cmd
fmd = cmd->findFile(it.key());
if(fmd == nullptr) {

if (fmd == nullptr) {
eos_static_err("msg=\"file is null\" fxid=%08llx", it.key().c_str());
continue;
}

{
eos::MDLocking::FileReadLock fmdLock(fmd);
fname = fmd->getName().c_str();
fmd->getCTime(fctime);
}

fullpath = dir;
fullpath += fname.c_str();
eos_static_debug("check_file=\"%s\"", fullpath.c_str());
Expand Down Expand Up @@ -795,7 +799,8 @@ LRU::ConvertMatch(const char* dir,
space = cenv.Get("eos.space");
}

std::string conv_tag = ConversionTag::Get(it->first, space.c_str(), conversion, plctplcy);
std::string conv_tag = ConversionTag::Get(it->first, space.c_str(), conversion,
plctplcy);

if (gOFS->mConverterDriver->ScheduleJob(fid, conv_tag)) {
eos_static_info("msg=\"LRU scheduled conversion job\" tag=\"%s\"",
Expand Down
5 changes: 2 additions & 3 deletions mgm/XrdMgmOfs/FsConfigListener.cc
Original file line number Diff line number Diff line change
Expand Up @@ -59,12 +59,11 @@ XrdMgmOfs::processIncomingMgmConfigurationChange(const std::string& key)
(key.substr(0, 6) != "quota:") &&
(key.substr(0, 4) != "vid:") &&
(key.substr(0, 7) != "policy:")) {
XrdOucString skey = key.c_str();
eos_info("msg=\"apply access config\" key=\"%s\" val=\"%s\"",
key.c_str(), value.c_str());
Access::ApplyAccessConfig(false);

if (skey.beginswith("iostat:")) {
if (key.find("iostat:") == 0) {
gOFS->IoStats->ApplyIostatConfig(&FsView::gFsView);
}
} else {
Expand Down Expand Up @@ -119,7 +118,7 @@ XrdMgmOfs::ProcessGeotagChange(const std::string& queue)
if (FsView::gFsView.mNodeView.count(fs->GetQueue())) {
// Check if the change notification is an actual change in the geotag
FsNode* node = FsView::gFsView.mNodeView[fs->GetQueue()];
static_cast<GeoTree*>(node)->getGeoTagInTree(fsid , oldgeotag);
static_cast<GeoTree*>(node)->getGeoTagInTree(fsid, oldgeotag);
oldgeotag.erase(0, 8); // to get rid of the "<ROOT>::" prefix
}

Expand Down
6 changes: 4 additions & 2 deletions mgm/balancer/FsBalancer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,7 @@ FsBalancer::Balance(ThreadAssistant& assistant) noexcept
const auto& src_fses = it_current->first;

for (const auto& src : src_fses) {
if (assistant.terminationRequested()) {
if (assistant.terminationRequested() || !gOFS->mMaster->IsMaster()) {
break;
}

Expand Down Expand Up @@ -276,7 +276,9 @@ FsBalancer::Balance(ThreadAssistant& assistant) noexcept
if (it_current == vect_tx.end()) {
it_current = vect_tx.begin();
}
} while ((it_current != it_start) && !assistant.terminationRequested());
} while ((it_current != it_start) &&
!assistant.terminationRequested() &&
gOFS->mMaster->IsMaster());

if (no_slots) {
eos_static_info("%s", "msg=\"sleep no slots\"");
Expand Down
50 changes: 32 additions & 18 deletions mgm/fsck/Fsck.cc
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,8 @@

EOSMGMNAMESPACE_BEGIN

const std::string Fsck::sFsckKey
{"fsck"
const std::string Fsck::sFsckKey {
"fsck"
};
const std::string Fsck::sCollectKey {"toggle-collect"};
const std::string Fsck::sCollectIntervalKey {"collect-interval-min"};
Expand Down Expand Up @@ -106,6 +106,7 @@ Fsck::ApplyFsckConfig()

if (kv_map.count(sCollectKey) && kv_map.count(sCollectIntervalKey)) {
bool enable_collect = (kv_map[sCollectKey] == "1");

// Make fsck config enforcement idempotent. Note the value in the Config
// call is overloaded with the "off" string marking the fact that fsck
// collection should be disabled.
Expand Down Expand Up @@ -317,13 +318,22 @@ Fsck::CollectErrs(ThreadAssistant& assistant) noexcept

gOFS->WaitUntilNamespaceIsBooted();

// Wait that current MGM becomes a master
do {
eos_debug("%s", "msg=\"fsck waiting for master MGM\"");
assistant.wait_for(std::chrono::seconds(10));
} while (!assistant.terminationRequested() && !gOFS->mMaster->IsMaster());

while (!assistant.terminationRequested()) {
// Wait for the current MGM to become a master
while (!gOFS->mMaster->IsMaster()) {
eos_debug("%s", "msg=\"fsck collect disabled for slave\"");
assistant.wait_for(std::chrono::seconds(5));

if (assistant.terminationRequested()) {
eos_info("%s", "msg=\"stopped fsck collect thread\"");
break;
}
}

if (assistant.terminationRequested()) {
break;
}

Log("Start error collection");
Log("Filesystems to check: %lu", FsView::gFsView.GetNumFileSystems());
decltype(eFsMap) tmp_err_map;
Expand Down Expand Up @@ -392,32 +402,35 @@ Fsck::RepairErrs(ThreadAssistant& assistant) noexcept
gOFS->WaitUntilNamespaceIsBooted();

while (!assistant.terminationRequested()) {
// Don't run if we are not a master
// Wait for the current MGM to become a master
while (!gOFS->mMaster->IsMaster()) {
assistant.wait_for(std::chrono::seconds(1));
eos_debug("%s", "msg=\"fsck repair disabled for slave\"");
assistant.wait_for(std::chrono::seconds(5));

if (assistant.terminationRequested()) {
eos_info("%s", "msg=\"stopped fsck repair thread\"");
mRepairRunning = false;
return;
break;
}
}

// Wait for the collector thread to signal us
while (!mStartProcessing) {
while (!mStartProcessing && !assistant.terminationRequested()) {
assistant.wait_for(std::chrono::seconds(1));

if (assistant.terminationRequested()) {
eos_info("%s", "msg=\"stopped fsck repair thread\"");
mRepairRunning = false;
return;
break;
}
}

if (assistant.terminationRequested()) {
break;
}

// Create local struct for errors so that we avoid the iterator invalidation
// and the long locks
std::map<std::string,
std::map<eos::common::FileId::fileid_t ,
std::map<eos::common::FileId::fileid_t,
std::set <eos::common::FileSystem::fsid_t> > > local_emap;
{
eos::common::RWMutexReadLock rd_lock(mErrMutex);
Expand Down Expand Up @@ -453,7 +466,8 @@ Fsck::RepairErrs(ThreadAssistant& assistant) noexcept
// Check regularly if we should exit and sleep if queue is full
while ((mThreadPool.GetQueueSize() > mMaxQueuedJobs) ||
(++count % 100 == 0)) {
if (assistant.terminationRequested()) {
if (assistant.terminationRequested() ||
!gOFS->mMaster->IsMaster()) {
// Wait that there are not more jobs in the queue - this can
// take a while depending on the queue size
while (mThreadPool.GetQueueSize()) {
Expand Down Expand Up @@ -1184,7 +1198,7 @@ Fsck::QueryQdb(ErrMapT& err_map)
if ((pos == std::string::npos) || (pos == data.length()))
{
eos_static_err("msg=\"failed to parse fsck element\" data=\"%s\"",
data.c_str());
data.c_str());
return {0ull, 0ul};
}

Expand Down

0 comments on commit 9b59c4a

Please sign in to comment.