From d1ae2d7a6c1fe48bd6dc35526c8fc25de3f0d028 Mon Sep 17 00:00:00 2001 From: Brett Boston Date: Thu, 30 Jan 2025 16:45:20 -0800 Subject: [PATCH] Add function to allow checking thread type Closes #4613 This change adds a function `threadIsType` function to `Application` that allows a thread to check its type. This is intended to support more detailed assertions than the usual `releaseAssert(threadIsMain())` assertions we're currently using everywhere. The implementation differs a bit from the proposed solution in #4613 as it uses a mapping in `ApplicationImpl` to track thread types, rather than using static variables. I chose this approach because as far as I can tell, it's not possible to assign a thread an id. Given that, `ApplicationImpl` would need to set these variables in its constructor, and the variables would hold meaningless values prior to that. I figure it's safer to ensure that thread types can only be reasoned about after the creation of the threads themselves in `ApplicationImpl`'s constructor. However, if it's important that thread types be reasoned about without an `Application` or `AppConnector`, then I'm open to changing the design. --- src/main/AppConnector.cpp | 6 ++++++ src/main/AppConnector.h | 3 ++- src/main/Application.h | 13 +++++++++++++ src/main/ApplicationImpl.cpp | 14 ++++++++++++++ src/main/ApplicationImpl.h | 7 +++++++ src/overlay/Peer.cpp | 3 ++- 6 files changed, 44 insertions(+), 2 deletions(-) diff --git a/src/main/AppConnector.cpp b/src/main/AppConnector.cpp index a387154fa6..50b3048ddc 100644 --- a/src/main/AppConnector.cpp +++ b/src/main/AppConnector.cpp @@ -140,6 +140,12 @@ AppConnector::checkScheduledAndCache( return mApp.getOverlayManager().checkScheduledAndCache(msgTracker); } +bool +AppConnector::threadIsType(Application::ThreadType type) const +{ + return mApp.threadIsType(type); +} + SearchableHotArchiveSnapshotConstPtr AppConnector::copySearchableHotArchiveBucketListSnapshot() { diff --git a/src/main/AppConnector.h b/src/main/AppConnector.h index 756a768bba..b0c50e0545 100644 --- a/src/main/AppConnector.h +++ b/src/main/AppConnector.h @@ -1,12 +1,12 @@ #pragma once #include "bucket/BucketUtils.h" +#include "main/Application.h" #include "main/Config.h" #include "medida/metrics_registry.h" namespace stellar { -class Application; class OverlayManager; class LedgerManager; class Herder; @@ -57,6 +57,7 @@ class AppConnector checkScheduledAndCache(std::shared_ptr msgTracker); SorobanNetworkConfig const& getSorobanNetworkConfigReadOnly() const; SorobanNetworkConfig const& getSorobanNetworkConfigForApply() const; + bool threadIsType(Application::ThreadType type) const; medida::MetricsRegistry& getMetrics() const; SearchableHotArchiveSnapshotConstPtr diff --git a/src/main/Application.h b/src/main/Application.h index 3b1eaa67b4..f7fb181e72 100644 --- a/src/main/Application.h +++ b/src/main/Application.h @@ -164,6 +164,16 @@ class Application APP_NUM_STATE }; + // Types of threads that may be running + enum class ThreadType + { + MAIN, + WORKER, + EVICTION, + OVERLAY, + APPLY + }; + virtual ~Application(){}; virtual void initialize(bool createNewDB, bool forceRebuild) = 0; @@ -330,6 +340,9 @@ class Application return ret; } + // Returns true iff the calling thread has the same type as `type` + virtual bool threadIsType(ThreadType type) const = 0; + virtual AppConnector& getAppConnector() = 0; protected: diff --git a/src/main/ApplicationImpl.cpp b/src/main/ApplicationImpl.cpp index f9f64795f2..37032bdd8c 100644 --- a/src/main/ApplicationImpl.cpp +++ b/src/main/ApplicationImpl.cpp @@ -160,11 +160,14 @@ ApplicationImpl::ApplicationImpl(VirtualClock& clock, Config const& cfg) releaseAssert(mConfig.WORKER_THREADS > 0); releaseAssert(mEvictionIOContext); + mThreadTypes[std::this_thread::get_id()] = ThreadType::MAIN; + // Allocate one thread for Eviction scan mEvictionThread = std::thread{[this]() { runCurrentThreadWithMediumPriority(); mEvictionIOContext->run(); }}; + mThreadTypes[mEvictionThread->get_id()] = ThreadType::EVICTION; --t; @@ -174,6 +177,7 @@ ApplicationImpl::ApplicationImpl(VirtualClock& clock, Config const& cfg) runCurrentThreadWithLowPriority(); mWorkerIOContext.run(); }}; + mThreadTypes[thread.get_id()] = ThreadType::WORKER; mWorkerThreads.emplace_back(std::move(thread)); } @@ -181,12 +185,14 @@ ApplicationImpl::ApplicationImpl(VirtualClock& clock, Config const& cfg) { // Keep priority unchanged as overlay processes time-sensitive tasks mOverlayThread = std::thread{[this]() { mOverlayIOContext->run(); }}; + mThreadTypes[mOverlayThread->get_id()] = ThreadType::OVERLAY; } if (mConfig.parallelLedgerClose()) { mLedgerCloseThread = std::thread{[this]() { mLedgerCloseIOContext->run(); }}; + mThreadTypes[mLedgerCloseThread->get_id()] = ThreadType::APPLY; } } @@ -1197,6 +1203,14 @@ ApplicationImpl::getMetrics() return *mMetrics; } +bool +ApplicationImpl::threadIsType(ThreadType type) const +{ + auto it = mThreadTypes.find(std::this_thread::get_id()); + releaseAssert(it != mThreadTypes.end()); + return it->second == type; +} + void ApplicationImpl::syncOwnMetrics() { diff --git a/src/main/ApplicationImpl.h b/src/main/ApplicationImpl.h index 6780a719d1..98ad3d3cc4 100644 --- a/src/main/ApplicationImpl.h +++ b/src/main/ApplicationImpl.h @@ -115,6 +115,8 @@ class ApplicationImpl : public Application manualClose(std::optional const& manualLedgerSeq, std::optional const& manualCloseTime) override; + bool threadIsType(ThreadType type) const override; + #ifdef BUILD_TESTS virtual void generateLoad(GeneratedLoadConfig cfg) override; @@ -220,6 +222,11 @@ class ApplicationImpl : public Application // thread for eviction scans. std::optional mEvictionThread; + // NOTE: It is important that this map not be updated outside of the + // constructor. `unordered_map` is safe for multiple threads to read from, + // so long as there are no concurrent writers. + std::unordered_map mThreadTypes; + asio::signal_set mStopSignals; bool mStarted; diff --git a/src/overlay/Peer.cpp b/src/overlay/Peer.cpp index c331c8d170..dcb9fafa6a 100644 --- a/src/overlay/Peer.cpp +++ b/src/overlay/Peer.cpp @@ -460,7 +460,8 @@ void Peer::maybeExecuteInBackground(std::string const& jobName, std::function)> f) { - if (useBackgroundThread() && threadIsMain()) + if (useBackgroundThread() && + !mAppConnector.threadIsType(Application::ThreadType::OVERLAY)) { mAppConnector.postOnOverlayThread( [self = shared_from_this(), f]() { f(self); }, jobName);