diff --git a/presto-native-execution/presto_cpp/main/QueryContextManager.cpp b/presto-native-execution/presto_cpp/main/QueryContextManager.cpp index 3598603c300eb..95908e6903995 100644 --- a/presto-native-execution/presto_cpp/main/QueryContextManager.cpp +++ b/presto-native-execution/presto_cpp/main/QueryContextManager.cpp @@ -11,15 +11,16 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - #include "presto_cpp/main/QueryContextManager.h" -#include #include "presto_cpp/main/PrestoToVeloxQueryConfig.h" #include "presto_cpp/main/SessionProperties.h" #include "presto_cpp/main/common/Configs.h" + #include "velox/connectors/hive/HiveConfig.h" #include "velox/core/QueryConfig.h" +#include + using namespace facebook::velox; using facebook::presto::protocol::QueryId; @@ -60,8 +61,11 @@ std::shared_ptr QueryContextCache::insert( evict(); } queryIds_.push_front(queryId); - queryCtxs_[queryId] = { - folly::to_weak_ptr(queryCtx), queryIds_.begin(), false}; + queryCtxs_.insert( + {queryId, + {.queryCtx = folly::to_weak_ptr(queryCtx), + .idListIterator = queryIds_.begin(), + .hasStartedTasks = false}}); return queryCtx; } @@ -83,11 +87,11 @@ void QueryContextCache::setTasksStarted(const protocol::QueryId& queryId) { void QueryContextCache::evict() { // Evict least recently used queryCtx if it is not referenced elsewhere. - for (auto victim = queryIds_.end(); victim != queryIds_.begin();) { - --victim; - if (!queryCtxs_[*victim].queryCtx.lock()) { - queryCtxs_.erase(*victim); - queryIds_.erase(victim); + for (auto victim = queryIds_.rbegin(); victim != queryIds_.rend(); ++victim) { + auto iter = queryCtxs_.find(*victim); + if (iter != queryCtxs_.end() && !iter->second.queryCtx.lock()) { + queryCtxs_.erase(iter); + queryIds_.erase(std::next(victim).base()); return; } } diff --git a/presto-native-execution/presto_cpp/main/QueryContextManager.h b/presto-native-execution/presto_cpp/main/QueryContextManager.h index 3a07b73c36335..70faa88b86381 100644 --- a/presto-native-execution/presto_cpp/main/QueryContextManager.h +++ b/presto-native-execution/presto_cpp/main/QueryContextManager.h @@ -35,7 +35,7 @@ class QueryContextCache { }; using QueryCtxMap = std::unordered_map; - QueryContextCache(size_t initial_capacity = kInitialCapacity) + explicit QueryContextCache(size_t initial_capacity = kInitialCapacity) : capacity_(initial_capacity) {} size_t capacity() const { diff --git a/presto-native-execution/presto_cpp/main/TaskManager.cpp b/presto-native-execution/presto_cpp/main/TaskManager.cpp index 257841d115dae..82a50ceb11194 100644 --- a/presto-native-execution/presto_cpp/main/TaskManager.cpp +++ b/presto-native-execution/presto_cpp/main/TaskManager.cpp @@ -354,11 +354,14 @@ void enqueueTask( TaskManager::TaskManager( folly::Executor* driverExecutor, folly::Executor* httpSrvCpuExecutor, - folly::Executor* spillerExecutor) + folly::Executor* spillerExecutor, + std::unique_ptr queryContextManager) : queryContextManager_( - std::make_unique( - driverExecutor, - spillerExecutor)), + queryContextManager == nullptr + ? std::make_unique( + driverExecutor, + spillerExecutor) + : std::move(queryContextManager)), bufferManager_(velox::exec::OutputBufferManager::getInstanceRef()), httpSrvCpuExecutor_(httpSrvCpuExecutor), lastNotOverloadedTimeInSecs_(velox::getCurrentTimeSec()) { diff --git a/presto-native-execution/presto_cpp/main/TaskManager.h b/presto-native-execution/presto_cpp/main/TaskManager.h index 6d0fac7af8396..8b0926b044d46 100644 --- a/presto-native-execution/presto_cpp/main/TaskManager.h +++ b/presto-native-execution/presto_cpp/main/TaskManager.h @@ -32,10 +32,23 @@ class TaskManager { TaskManager( folly::Executor* driverExecutor, folly::Executor* httpSrvExecutor, - folly::Executor* spillerExecutor); + folly::Executor* spillerExecutor, + std::unique_ptr queryContextManager = nullptr); virtual ~TaskManager() = default; + /// Always returns tuple of non-empty string containing the spill directory + /// and the date string directory, which is parent directory of task spill + /// directory. + static std::tuple buildTaskSpillDirectoryPath( + const std::string& baseSpillPath, + const std::string& nodeIp, + const std::string& nodeId, + const std::string& queryId, + const protocol::TaskId& taskId, + bool includeNodeInSpillPath); + + protected: /// Invoked by Presto server shutdown to wait for all the tasks to complete /// and cleanup the completed tasks. void shutdown(); @@ -166,17 +179,6 @@ class TaskManager { std::vector& deadlockTasks, std::vector& stuckOpCalls) const; - /// Always returns tuple of non-empty string containing the spill directory - /// and the date string directory, which is parent directory of task spill - /// directory. - static std::tuple buildTaskSpillDirectoryPath( - const std::string& baseSpillPath, - const std::string& nodeIp, - const std::string& nodeId, - const std::string& queryId, - const protocol::TaskId& taskId, - bool includeNodeInSpillPath); - /// Presto Server can notify the Task Manager that the former is overloaded, /// so the Task Manager can optionally change Task admission algorithm. void setServerOverloaded(bool serverOverloaded);