Skip to content

Commit 98c3ad8

Browse files
committed
[feat](spill) spill and reserve
Origin commit: 0e7e42d
1 parent 2254956 commit 98c3ad8

File tree

428 files changed

+6529
-2113
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

428 files changed

+6529
-2113
lines changed

be/src/agent/workload_group_listener.cpp

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717

1818
#include "agent/workload_group_listener.h"
1919

20+
#include <thrift/protocol/TDebugProtocol.h>
21+
2022
#include "runtime/exec_env.h"
2123
#include "runtime/workload_group/workload_group.h"
2224
#include "runtime/workload_group/workload_group_manager.h"
@@ -33,6 +35,8 @@ void WorkloadGroupListener::handle_topic_info(const std::vector<TopicInfo>& topi
3335
if (!topic_info.__isset.workload_group_info) {
3436
continue;
3537
}
38+
LOG(INFO) << "Received publish workload group info request: "
39+
<< apache::thrift::ThriftDebugString(topic_info).c_str();
3640
is_set_workload_group_info = true;
3741

3842
// 1 parse topic info to group info

be/src/common/config.cpp

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
// under the License.
1717

1818
#include <fmt/core.h>
19+
#include <gflags/gflags.h>
1920
#include <stdint.h>
2021

2122
#include <algorithm>
@@ -118,7 +119,7 @@ DEFINE_String(mem_limit, "90%");
118119
DEFINE_Double(soft_mem_limit_frac, "0.9");
119120

120121
// Cache capacity reduce mem limit as a fraction of soft mem limit.
121-
DEFINE_mDouble(cache_capacity_reduce_mem_limit_frac, "0.6");
122+
DEFINE_mDouble(cache_capacity_reduce_mem_limit_frac, "0.7");
122123

123124
// Schema change memory limit as a fraction of soft memory limit.
124125
DEFINE_Double(schema_change_mem_limit_frac, "0.6");
@@ -1271,6 +1272,9 @@ DEFINE_Validator(spill_io_thread_pool_thread_num, [](const int config) -> bool {
12711272
});
12721273
DEFINE_Int32(spill_io_thread_pool_queue_size, "102400");
12731274

1275+
// paused query in queue timeout(ms) will be resumed or canceled
1276+
DEFINE_Int64(spill_in_paused_queue_timeout_ms, "60000");
1277+
12741278
DEFINE_mBool(check_segment_when_build_rowset_meta, "false");
12751279

12761280
DEFINE_mInt32(max_s3_client_retry, "10");

be/src/common/config.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1350,6 +1350,7 @@ DECLARE_mInt32(spill_gc_interval_ms);
13501350
DECLARE_mInt32(spill_gc_work_time_ms);
13511351
DECLARE_Int32(spill_io_thread_pool_thread_num);
13521352
DECLARE_Int32(spill_io_thread_pool_queue_size);
1353+
DECLARE_Int64(spill_in_paused_queue_timeout_ms);
13531354

13541355
DECLARE_mBool(check_segment_when_build_rowset_meta);
13551356

be/src/common/daemon.cpp

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -320,15 +320,18 @@ void Daemon::memory_maintenance_thread() {
320320
doris::ExecEnv::GetInstance()->workload_group_mgr()->do_sweep();
321321
doris::ExecEnv::GetInstance()->workload_group_mgr()->refresh_wg_weighted_memory_limit();
322322

323-
// step 7. Analyze blocking queries.
323+
// step 7: handle paused queries(caused by memory insufficient)
324+
doris::ExecEnv::GetInstance()->workload_group_mgr()->handle_paused_queries();
325+
326+
// step 8. Analyze blocking queries.
324327
// TODO sort the operators that can spill, wake up the pipeline task spill
325328
// or continue execution according to certain rules or cancel query.
326329

327-
// step 8. Flush memtable
330+
// step 9. Flush memtable
328331
doris::GlobalMemoryArbitrator::notify_memtable_memory_refresh();
329332
// TODO notify flush memtable
330333

331-
// step 9. Reset Jemalloc dirty page decay.
334+
// step 10. Reset Jemalloc dirty page decay.
332335
je_reset_dirty_decay();
333336
}
334337
}
@@ -542,7 +545,9 @@ void Daemon::cache_adjust_capacity_thread() {
542545
doris::GlobalMemoryArbitrator::cache_adjust_capacity_cv.wait_for(
543546
l, std::chrono::milliseconds(100));
544547
}
545-
double adjust_weighted = GlobalMemoryArbitrator::last_cache_capacity_adjust_weighted;
548+
double adjust_weighted = std::min<double>(
549+
GlobalMemoryArbitrator::last_cache_capacity_adjust_weighted,
550+
GlobalMemoryArbitrator::last_wg_trigger_cache_capacity_adjust_weighted);
546551
if (_stop_background_threads_latch.count() == 0) {
547552
break;
548553
}
@@ -562,6 +567,7 @@ void Daemon::cache_adjust_capacity_thread() {
562567
LOG(INFO) << fmt::format(
563568
"[MemoryGC] refresh cache capacity end, free memory {}, details: {}",
564569
PrettyPrinter::print(freed_mem, TUnit::BYTES), ss.str());
570+
GlobalMemoryArbitrator::last_affected_cache_capacity_adjust_weighted = adjust_weighted;
565571
} while (true);
566572
}
567573

be/src/common/status.h

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -131,6 +131,9 @@ namespace ErrorCode {
131131
E(BAD_CAST, -254, true); \
132132
E(ARITHMETIC_OVERFLOW_ERRROR, -255, false); \
133133
E(PERMISSION_DENIED, -256, false); \
134+
E(QUERY_MEMORY_EXCEEDED, -257, false); \
135+
E(WORKLOAD_GROUP_MEMORY_EXCEEDED, -258, false); \
136+
E(PROCESS_MEMORY_EXCEEDED, -259, false); \
134137
E(CE_CMD_PARAMS_ERROR, -300, true); \
135138
E(CE_BUFFER_TOO_SMALL, -301, true); \
136139
E(CE_CMD_NOT_VALID, -302, true); \
@@ -381,6 +384,11 @@ class [[nodiscard]] Status {
381384
_code = rhs._code;
382385
if (rhs._err_msg) {
383386
_err_msg = std::make_unique<ErrMsg>(*rhs._err_msg);
387+
} else {
388+
// If rhs error msg is empty, then should also clear current error msg
389+
// For example, if rhs is OK and current status is error, then copy to current
390+
// status, should clear current error message.
391+
_err_msg.reset();
384392
}
385393
return *this;
386394
}
@@ -390,6 +398,8 @@ class [[nodiscard]] Status {
390398
_code = rhs._code;
391399
if (rhs._err_msg) {
392400
_err_msg = std::move(rhs._err_msg);
401+
} else {
402+
_err_msg.reset();
393403
}
394404
return *this;
395405
}

be/src/exec/schema_scanner/schema_backend_active_tasks.cpp

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ std::vector<SchemaScanner::ColumnDesc> SchemaBackendActiveTasksScanner::_s_tbls_
3131
// name, type, size
3232
{"BE_ID", TYPE_BIGINT, sizeof(int64_t), false},
3333
{"FE_HOST", TYPE_VARCHAR, sizeof(StringRef), false},
34+
{"WORKLOAD_GROUP_ID", TYPE_BIGINT, sizeof(int64_t), false},
3435
{"QUERY_ID", TYPE_VARCHAR, sizeof(StringRef), false},
3536
{"TASK_TIME_MS", TYPE_BIGINT, sizeof(int64_t), false},
3637
{"TASK_CPU_TIME_MS", TYPE_BIGINT, sizeof(int64_t), false},
@@ -41,6 +42,8 @@ std::vector<SchemaScanner::ColumnDesc> SchemaBackendActiveTasksScanner::_s_tbls_
4142
{"SHUFFLE_SEND_BYTES", TYPE_BIGINT, sizeof(int64_t), false},
4243
{"SHUFFLE_SEND_ROWS", TYPE_BIGINT, sizeof(int64_t), false},
4344
{"QUERY_TYPE", TYPE_VARCHAR, sizeof(StringRef), false},
45+
{"SPILL_WRITE_BYTES_TO_LOCAL_STORAGE", TYPE_BIGINT, sizeof(int64_t), false},
46+
{"SPILL_READ_BYTES_FROM_LOCAL_STORAGE", TYPE_BIGINT, sizeof(int64_t), false},
4447
};
4548

4649
SchemaBackendActiveTasksScanner::SchemaBackendActiveTasksScanner()

be/src/exec/schema_scanner/schema_workload_group_resource_usage_scanner.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ std::vector<SchemaScanner::ColumnDesc> SchemaBackendWorkloadGroupResourceUsage::
3838
{"CPU_USAGE_PERCENT", TYPE_DOUBLE, sizeof(double), false},
3939
{"LOCAL_SCAN_BYTES_PER_SECOND", TYPE_BIGINT, sizeof(int64_t), false},
4040
{"REMOTE_SCAN_BYTES_PER_SECOND", TYPE_BIGINT, sizeof(int64_t), false},
41+
{"WRITE_BUFFER_USAGE_BYTES", TYPE_BIGINT, sizeof(int64_t), false},
4142
};
4243

4344
SchemaBackendWorkloadGroupResourceUsage::SchemaBackendWorkloadGroupResourceUsage()

be/src/olap/memtable.cpp

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -143,7 +143,8 @@ void MemTable::_init_agg_functions(const vectorized::Block* block) {
143143
}
144144

145145
MemTable::~MemTable() {
146-
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_query_thread_context.query_mem_tracker);
146+
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
147+
_query_thread_context.query_mem_tracker->write_tracker());
147148
if (_is_flush_success) {
148149
// If the memtable is flush success, then its memtracker's consumption should be 0
149150
if (_mem_tracker->consumption() != 0 && config::crash_in_memory_tracker_inaccurate) {
@@ -182,6 +183,8 @@ int RowInBlockComparator::operator()(const RowInBlock* left, const RowInBlock* r
182183

183184
Status MemTable::insert(const vectorized::Block* input_block,
184185
const std::vector<uint32_t>& row_idxs) {
186+
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
187+
_query_thread_context.query_mem_tracker->write_tracker());
185188
SCOPED_CONSUME_MEM_TRACKER(_mem_tracker);
186189

187190
if (_is_first_insertion) {
@@ -579,6 +582,8 @@ void MemTable::_aggregate() {
579582
}
580583

581584
void MemTable::shrink_memtable_by_agg() {
585+
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
586+
_query_thread_context.query_mem_tracker->write_tracker());
582587
SCOPED_CONSUME_MEM_TRACKER(_mem_tracker);
583588
if (_keys_type == KeysType::DUP_KEYS) {
584589
return;
@@ -608,6 +613,20 @@ bool MemTable::need_agg() const {
608613
return false;
609614
}
610615

616+
size_t MemTable::get_flush_reserve_memory_size() const {
617+
size_t reserve_size = 0;
618+
if (_keys_type == KeysType::DUP_KEYS) {
619+
if (_tablet_schema->num_key_columns() == 0) {
620+
// no need to reserve
621+
} else {
622+
reserve_size = _input_mutable_block.allocated_bytes();
623+
}
624+
} else {
625+
reserve_size = _input_mutable_block.allocated_bytes();
626+
}
627+
return reserve_size;
628+
}
629+
611630
Status MemTable::_to_block(std::unique_ptr<vectorized::Block>* res) {
612631
size_t same_keys_num = _sort();
613632
if (_keys_type == KeysType::DUP_KEYS || same_keys_num == 0) {

be/src/olap/memtable.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -181,6 +181,7 @@ class MemTable {
181181

182182
int64_t tablet_id() const { return _tablet_id; }
183183
size_t memory_usage() const { return _mem_tracker->consumption(); }
184+
size_t get_flush_reserve_memory_size() const;
184185
// insert tuple from (row_pos) to (row_pos+num_rows)
185186
Status insert(const vectorized::Block* block, const std::vector<uint32_t>& row_idxs);
186187

be/src/olap/memtable_flush_executor.cpp

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
#include "common/signal_handler.h"
2929
#include "olap/memtable.h"
3030
#include "olap/rowset/rowset_writer.h"
31+
#include "olap/storage_engine.h"
3132
#include "util/debug_points.h"
3233
#include "util/doris_metrics.h"
3334
#include "util/metrics.h"
@@ -137,6 +138,37 @@ Status FlushToken::wait() {
137138
return Status::OK();
138139
}
139140

141+
Status FlushToken::_try_reserve_memory(QueryThreadContext query_thread_context, int64_t size) {
142+
auto* thread_context = doris::thread_context();
143+
auto* memtable_flush_executor =
144+
ExecEnv::GetInstance()->storage_engine().memtable_flush_executor();
145+
Status st;
146+
do {
147+
// only try to reserve process memory
148+
st = thread_context->try_reserve_process_memory(size);
149+
if (st.ok()) {
150+
memtable_flush_executor->inc_flushing_task();
151+
break;
152+
}
153+
if (_is_shutdown() || query_thread_context.get_memory_tracker()->is_query_cancelled()) {
154+
st = Status::Cancelled("flush memtable already cancelled");
155+
break;
156+
}
157+
// Make sure at least one memtable is flushing even reserve memory failed.
158+
if (memtable_flush_executor->check_and_inc_has_any_flushing_task()) {
159+
// If there are already any flushing task, Wait for some time and retry.
160+
LOG_EVERY_T(INFO, 60) << fmt::format(
161+
"Failed to reserve memory {} for flush memtable, retry after 100ms",
162+
PrettyPrinter::print_bytes(size));
163+
std::this_thread::sleep_for(std::chrono::milliseconds(100));
164+
} else {
165+
st = Status::OK();
166+
break;
167+
}
168+
} while (true);
169+
return st;
170+
}
171+
140172
Status FlushToken::_do_flush_memtable(MemTable* memtable, int32_t segment_id, int64_t* flush_size) {
141173
VLOG_CRITICAL << "begin to flush memtable for tablet: " << memtable->tablet_id()
142174
<< ", memsize: " << memtable->memory_usage()
@@ -147,8 +179,19 @@ Status FlushToken::_do_flush_memtable(MemTable* memtable, int32_t segment_id, in
147179
SCOPED_ATTACH_TASK(memtable->query_thread_context());
148180
signal::set_signal_task_id(_rowset_writer->load_id());
149181
signal::tablet_id = memtable->tablet_id();
182+
183+
DEFER_RELEASE_RESERVED();
184+
185+
auto reserve_size = memtable->get_flush_reserve_memory_size();
186+
RETURN_IF_ERROR(_try_reserve_memory(memtable->query_thread_context(), reserve_size));
150187
{
188+
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
189+
memtable->query_thread_context().query_mem_tracker->write_tracker());
151190
SCOPED_CONSUME_MEM_TRACKER(memtable->mem_tracker());
191+
192+
Defer defer {[&]() {
193+
ExecEnv::GetInstance()->storage_engine().memtable_flush_executor()->dec_flushing_task();
194+
}};
152195
std::unique_ptr<vectorized::Block> block;
153196
RETURN_IF_ERROR(memtable->to_block(&block));
154197
RETURN_IF_ERROR(_rowset_writer->flush_memtable(block.get(), segment_id, flush_size));

be/src/olap/memtable_flush_executor.h

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,8 @@ class FlushToken : public std::enable_shared_from_this<FlushToken> {
9494

9595
Status _do_flush_memtable(MemTable* memtable, int32_t segment_id, int64_t* flush_size);
9696

97+
Status _try_reserve_memory(QueryThreadContext query_thread_context, int64_t size);
98+
9799
// Records the current flush status of the tablet.
98100
// Note: Once its value is set to Failed, it cannot return to SUCCESS.
99101
std::shared_mutex _flush_status_lock;
@@ -139,9 +141,26 @@ class MemTableFlushExecutor {
139141
std::shared_ptr<RowsetWriter> rowset_writer, bool is_high_priority,
140142
std::shared_ptr<WorkloadGroup> wg_sptr);
141143

144+
// return true if it already has any flushing task
145+
bool check_and_inc_has_any_flushing_task() {
146+
// need to use CAS instead of only `if (0 == _flushing_task_count)` statement,
147+
// to avoid concurrent entries both pass the if statement
148+
int expected_count = 0;
149+
if (!_flushing_task_count.compare_exchange_strong(expected_count, 1)) {
150+
return true;
151+
}
152+
DCHECK(expected_count == 0 && _flushing_task_count == 1);
153+
return false;
154+
}
155+
156+
void inc_flushing_task() { _flushing_task_count++; }
157+
158+
void dec_flushing_task() { _flushing_task_count--; }
159+
142160
private:
143161
std::unique_ptr<ThreadPool> _flush_pool;
144162
std::unique_ptr<ThreadPool> _high_prio_flush_pool;
163+
std::atomic<int> _flushing_task_count = 0;
145164
};
146165

147166
} // namespace doris

0 commit comments

Comments
 (0)