diff --git a/src/main/AppConnector.cpp b/src/main/AppConnector.cpp index a387154fa6..50b3048ddc 100644 --- a/src/main/AppConnector.cpp +++ b/src/main/AppConnector.cpp @@ -140,6 +140,12 @@ AppConnector::checkScheduledAndCache( return mApp.getOverlayManager().checkScheduledAndCache(msgTracker); } +bool +AppConnector::threadIsType(Application::ThreadType type) const +{ + return mApp.threadIsType(type); +} + SearchableHotArchiveSnapshotConstPtr AppConnector::copySearchableHotArchiveBucketListSnapshot() { diff --git a/src/main/AppConnector.h b/src/main/AppConnector.h index 756a768bba..b0c50e0545 100644 --- a/src/main/AppConnector.h +++ b/src/main/AppConnector.h @@ -1,12 +1,12 @@ #pragma once #include "bucket/BucketUtils.h" +#include "main/Application.h" #include "main/Config.h" #include "medida/metrics_registry.h" namespace stellar { -class Application; class OverlayManager; class LedgerManager; class Herder; @@ -57,6 +57,7 @@ class AppConnector checkScheduledAndCache(std::shared_ptr msgTracker); SorobanNetworkConfig const& getSorobanNetworkConfigReadOnly() const; SorobanNetworkConfig const& getSorobanNetworkConfigForApply() const; + bool threadIsType(Application::ThreadType type) const; medida::MetricsRegistry& getMetrics() const; SearchableHotArchiveSnapshotConstPtr diff --git a/src/main/Application.h b/src/main/Application.h index 3b1eaa67b4..f7fb181e72 100644 --- a/src/main/Application.h +++ b/src/main/Application.h @@ -164,6 +164,16 @@ class Application APP_NUM_STATE }; + // Types of threads that may be running + enum class ThreadType + { + MAIN, + WORKER, + EVICTION, + OVERLAY, + APPLY + }; + virtual ~Application(){}; virtual void initialize(bool createNewDB, bool forceRebuild) = 0; @@ -330,6 +340,9 @@ class Application return ret; } + // Returns true iff the calling thread has the same type as `type` + virtual bool threadIsType(ThreadType type) const = 0; + virtual AppConnector& getAppConnector() = 0; protected: diff --git a/src/main/ApplicationImpl.cpp b/src/main/ApplicationImpl.cpp index f9f64795f2..37032bdd8c 100644 --- a/src/main/ApplicationImpl.cpp +++ b/src/main/ApplicationImpl.cpp @@ -160,11 +160,14 @@ ApplicationImpl::ApplicationImpl(VirtualClock& clock, Config const& cfg) releaseAssert(mConfig.WORKER_THREADS > 0); releaseAssert(mEvictionIOContext); + mThreadTypes[std::this_thread::get_id()] = ThreadType::MAIN; + // Allocate one thread for Eviction scan mEvictionThread = std::thread{[this]() { runCurrentThreadWithMediumPriority(); mEvictionIOContext->run(); }}; + mThreadTypes[mEvictionThread->get_id()] = ThreadType::EVICTION; --t; @@ -174,6 +177,7 @@ ApplicationImpl::ApplicationImpl(VirtualClock& clock, Config const& cfg) runCurrentThreadWithLowPriority(); mWorkerIOContext.run(); }}; + mThreadTypes[thread.get_id()] = ThreadType::WORKER; mWorkerThreads.emplace_back(std::move(thread)); } @@ -181,12 +185,14 @@ ApplicationImpl::ApplicationImpl(VirtualClock& clock, Config const& cfg) { // Keep priority unchanged as overlay processes time-sensitive tasks mOverlayThread = std::thread{[this]() { mOverlayIOContext->run(); }}; + mThreadTypes[mOverlayThread->get_id()] = ThreadType::OVERLAY; } if (mConfig.parallelLedgerClose()) { mLedgerCloseThread = std::thread{[this]() { mLedgerCloseIOContext->run(); }}; + mThreadTypes[mLedgerCloseThread->get_id()] = ThreadType::APPLY; } } @@ -1197,6 +1203,14 @@ ApplicationImpl::getMetrics() return *mMetrics; } +bool +ApplicationImpl::threadIsType(ThreadType type) const +{ + auto it = mThreadTypes.find(std::this_thread::get_id()); + releaseAssert(it != mThreadTypes.end()); + return it->second == type; +} + void ApplicationImpl::syncOwnMetrics() { diff --git a/src/main/ApplicationImpl.h b/src/main/ApplicationImpl.h index 6780a719d1..98ad3d3cc4 100644 --- a/src/main/ApplicationImpl.h +++ b/src/main/ApplicationImpl.h @@ -115,6 +115,8 @@ class ApplicationImpl : public Application manualClose(std::optional const& manualLedgerSeq, std::optional const& manualCloseTime) override; + bool threadIsType(ThreadType type) const override; + #ifdef BUILD_TESTS virtual void generateLoad(GeneratedLoadConfig cfg) override; @@ -220,6 +222,11 @@ class ApplicationImpl : public Application // thread for eviction scans. std::optional mEvictionThread; + // NOTE: It is important that this map not be updated outside of the + // constructor. `unordered_map` is safe for multiple threads to read from, + // so long as there are no concurrent writers. + std::unordered_map mThreadTypes; + asio::signal_set mStopSignals; bool mStarted; diff --git a/src/overlay/Peer.cpp b/src/overlay/Peer.cpp index c331c8d170..dcb9fafa6a 100644 --- a/src/overlay/Peer.cpp +++ b/src/overlay/Peer.cpp @@ -460,7 +460,8 @@ void Peer::maybeExecuteInBackground(std::string const& jobName, std::function)> f) { - if (useBackgroundThread() && threadIsMain()) + if (useBackgroundThread() && + !mAppConnector.threadIsType(Application::ThreadType::OVERLAY)) { mAppConnector.postOnOverlayThread( [self = shared_from_this(), f]() { f(self); }, jobName);