Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[feat](spill) spill and reserve #46230

Closed
wants to merge 10 commits into from
  •  
  •  
  •  
4 changes: 4 additions & 0 deletions be/src/agent/workload_group_listener.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

#include "agent/workload_group_listener.h"

#include <thrift/protocol/TDebugProtocol.h>

#include "runtime/exec_env.h"
#include "runtime/workload_group/workload_group.h"
#include "runtime/workload_group/workload_group_manager.h"
Expand All @@ -33,6 +35,8 @@ void WorkloadGroupListener::handle_topic_info(const std::vector<TopicInfo>& topi
if (!topic_info.__isset.workload_group_info) {
continue;
}
LOG(INFO) << "Received publish workload group info request: "
<< apache::thrift::ThriftDebugString(topic_info).c_str();
is_set_workload_group_info = true;

// 1 parse topic info to group info
Expand Down
2 changes: 1 addition & 1 deletion be/src/cloud/cloud_delta_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ Status CloudDeltaWriter::batch_init(std::vector<CloudDeltaWriter*> writers) {
}

Status CloudDeltaWriter::write(const vectorized::Block* block,
const std::vector<uint32_t>& row_idxs) {
const DorisVector<uint32_t>& row_idxs) {
if (row_idxs.empty()) [[unlikely]] {
return Status::OK();
}
Expand Down
2 changes: 1 addition & 1 deletion be/src/cloud/cloud_delta_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ class CloudDeltaWriter final : public BaseDeltaWriter {
const UniqueId& load_id);
~CloudDeltaWriter() override;

Status write(const vectorized::Block* block, const std::vector<uint32_t>& row_idxs) override;
Status write(const vectorized::Block* block, const DorisVector<uint32_t>& row_idxs) override;

Status close() override;

Expand Down
2 changes: 1 addition & 1 deletion be/src/cloud/cloud_tablets_channel.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ Status CloudTabletsChannel::add_batch(const PTabletWriterAddBlockRequest& reques
return Status::OK();
}

std::unordered_map<int64_t, std::vector<uint32_t>> tablet_to_rowidxs;
std::unordered_map<int64_t, DorisVector<uint32_t>> tablet_to_rowidxs;
_build_tablet_to_rowidxs(request, &tablet_to_rowidxs);

std::unordered_set<int64_t> partition_ids;
Expand Down
6 changes: 5 additions & 1 deletion be/src/common/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
// under the License.

#include <fmt/core.h>
#include <gflags/gflags.h>
#include <stdint.h>

#include <algorithm>
Expand Down Expand Up @@ -118,7 +119,7 @@ DEFINE_String(mem_limit, "90%");
DEFINE_Double(soft_mem_limit_frac, "0.9");

// Cache capacity reduce mem limit as a fraction of soft mem limit.
DEFINE_mDouble(cache_capacity_reduce_mem_limit_frac, "0.6");
DEFINE_mDouble(cache_capacity_reduce_mem_limit_frac, "0.7");

// Schema change memory limit as a fraction of soft memory limit.
DEFINE_Double(schema_change_mem_limit_frac, "0.6");
Expand Down Expand Up @@ -1271,6 +1272,9 @@ DEFINE_Validator(spill_io_thread_pool_thread_num, [](const int config) -> bool {
});
DEFINE_Int32(spill_io_thread_pool_queue_size, "102400");

// Timeout (ms) for a query waiting in the paused queue; once it expires the query is resumed or canceled.
DEFINE_Int64(spill_in_paused_queue_timeout_ms, "60000");

DEFINE_mBool(check_segment_when_build_rowset_meta, "false");

DEFINE_mInt32(max_s3_client_retry, "10");
Expand Down
1 change: 1 addition & 0 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -1350,6 +1350,7 @@ DECLARE_mInt32(spill_gc_interval_ms);
DECLARE_mInt32(spill_gc_work_time_ms);
DECLARE_Int32(spill_io_thread_pool_thread_num);
DECLARE_Int32(spill_io_thread_pool_queue_size);
DECLARE_Int64(spill_in_paused_queue_timeout_ms);

DECLARE_mBool(check_segment_when_build_rowset_meta);

Expand Down
14 changes: 10 additions & 4 deletions be/src/common/daemon.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -320,15 +320,18 @@ void Daemon::memory_maintenance_thread() {
doris::ExecEnv::GetInstance()->workload_group_mgr()->do_sweep();
doris::ExecEnv::GetInstance()->workload_group_mgr()->refresh_wg_weighted_memory_limit();

// step 7. Analyze blocking queries.
// step 7. Handle paused queries (paused because of insufficient memory).
doris::ExecEnv::GetInstance()->workload_group_mgr()->handle_paused_queries();

// step 8. Analyze blocking queries.
// TODO sort the operators that can spill, wake up the pipeline task spill
// or continue execution according to certain rules or cancel query.

// step 8. Flush memtable
// step 9. Flush memtable
doris::GlobalMemoryArbitrator::notify_memtable_memory_refresh();
// TODO notify flush memtable

// step 9. Reset Jemalloc dirty page decay.
// step 10. Reset Jemalloc dirty page decay.
je_reset_dirty_decay();
}
}
Expand Down Expand Up @@ -542,7 +545,9 @@ void Daemon::cache_adjust_capacity_thread() {
doris::GlobalMemoryArbitrator::cache_adjust_capacity_cv.wait_for(
l, std::chrono::milliseconds(100));
}
double adjust_weighted = GlobalMemoryArbitrator::last_cache_capacity_adjust_weighted;
double adjust_weighted = std::min<double>(
GlobalMemoryArbitrator::last_cache_capacity_adjust_weighted,
GlobalMemoryArbitrator::last_wg_trigger_cache_capacity_adjust_weighted);
if (_stop_background_threads_latch.count() == 0) {
break;
}
Expand All @@ -562,6 +567,7 @@ void Daemon::cache_adjust_capacity_thread() {
LOG(INFO) << fmt::format(
"[MemoryGC] refresh cache capacity end, free memory {}, details: {}",
PrettyPrinter::print(freed_mem, TUnit::BYTES), ss.str());
GlobalMemoryArbitrator::last_affected_cache_capacity_adjust_weighted = adjust_weighted;
} while (true);
}

Expand Down
10 changes: 10 additions & 0 deletions be/src/common/status.h
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,9 @@ namespace ErrorCode {
E(BAD_CAST, -254, true); \
E(ARITHMETIC_OVERFLOW_ERRROR, -255, false); \
E(PERMISSION_DENIED, -256, false); \
E(QUERY_MEMORY_EXCEEDED, -257, false); \
E(WORKLOAD_GROUP_MEMORY_EXCEEDED, -258, false); \
E(PROCESS_MEMORY_EXCEEDED, -259, false); \
E(CE_CMD_PARAMS_ERROR, -300, true); \
E(CE_BUFFER_TOO_SMALL, -301, true); \
E(CE_CMD_NOT_VALID, -302, true); \
Expand Down Expand Up @@ -381,6 +384,11 @@ class [[nodiscard]] Status {
_code = rhs._code;
if (rhs._err_msg) {
_err_msg = std::make_unique<ErrMsg>(*rhs._err_msg);
} else {
// If rhs has no error message, the current error message must be cleared too.
// For example, when rhs is OK and the current status is an error, copying rhs
// into the current status must not leave the stale error message behind.
_err_msg.reset();
}
return *this;
}
Expand All @@ -390,6 +398,8 @@ class [[nodiscard]] Status {
_code = rhs._code;
if (rhs._err_msg) {
_err_msg = std::move(rhs._err_msg);
} else {
_err_msg.reset();
}
return *this;
}
Expand Down
3 changes: 3 additions & 0 deletions be/src/exec/schema_scanner/schema_backend_active_tasks.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ std::vector<SchemaScanner::ColumnDesc> SchemaBackendActiveTasksScanner::_s_tbls_
// name, type, size
{"BE_ID", TYPE_BIGINT, sizeof(int64_t), false},
{"FE_HOST", TYPE_VARCHAR, sizeof(StringRef), false},
{"WORKLOAD_GROUP_ID", TYPE_BIGINT, sizeof(int64_t), false},
{"QUERY_ID", TYPE_VARCHAR, sizeof(StringRef), false},
{"TASK_TIME_MS", TYPE_BIGINT, sizeof(int64_t), false},
{"TASK_CPU_TIME_MS", TYPE_BIGINT, sizeof(int64_t), false},
Expand All @@ -41,6 +42,8 @@ std::vector<SchemaScanner::ColumnDesc> SchemaBackendActiveTasksScanner::_s_tbls_
{"SHUFFLE_SEND_BYTES", TYPE_BIGINT, sizeof(int64_t), false},
{"SHUFFLE_SEND_ROWS", TYPE_BIGINT, sizeof(int64_t), false},
{"QUERY_TYPE", TYPE_VARCHAR, sizeof(StringRef), false},
{"SPILL_WRITE_BYTES_TO_LOCAL_STORAGE", TYPE_BIGINT, sizeof(int64_t), false},
{"SPILL_READ_BYTES_FROM_LOCAL_STORAGE", TYPE_BIGINT, sizeof(int64_t), false},
};

SchemaBackendActiveTasksScanner::SchemaBackendActiveTasksScanner()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ std::vector<SchemaScanner::ColumnDesc> SchemaBackendWorkloadGroupResourceUsage::
{"CPU_USAGE_PERCENT", TYPE_DOUBLE, sizeof(double), false},
{"LOCAL_SCAN_BYTES_PER_SECOND", TYPE_BIGINT, sizeof(int64_t), false},
{"REMOTE_SCAN_BYTES_PER_SECOND", TYPE_BIGINT, sizeof(int64_t), false},
{"WRITE_BUFFER_USAGE_BYTES", TYPE_BIGINT, sizeof(int64_t), false},
};

SchemaBackendWorkloadGroupResourceUsage::SchemaBackendWorkloadGroupResourceUsage()
Expand Down
2 changes: 1 addition & 1 deletion be/src/olap/delta_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ Status BaseDeltaWriter::init() {
return Status::OK();
}

Status DeltaWriter::write(const vectorized::Block* block, const std::vector<uint32_t>& row_idxs) {
Status DeltaWriter::write(const vectorized::Block* block, const DorisVector<uint32_t>& row_idxs) {
if (UNLIKELY(row_idxs.empty())) {
return Status::OK();
}
Expand Down
4 changes: 2 additions & 2 deletions be/src/olap/delta_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ class BaseDeltaWriter {

virtual ~BaseDeltaWriter();

virtual Status write(const vectorized::Block* block, const std::vector<uint32_t>& row_idxs) = 0;
virtual Status write(const vectorized::Block* block, const DorisVector<uint32_t>& row_idxs) = 0;

// flush the last memtable to flush queue, must call it before build_rowset()
virtual Status close() = 0;
Expand Down Expand Up @@ -123,7 +123,7 @@ class DeltaWriter final : public BaseDeltaWriter {

~DeltaWriter() override;

Status write(const vectorized::Block* block, const std::vector<uint32_t>& row_idxs) override;
Status write(const vectorized::Block* block, const DorisVector<uint32_t>& row_idxs) override;

Status close() override;

Expand Down
2 changes: 1 addition & 1 deletion be/src/olap/delta_writer_v2.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ Status DeltaWriterV2::init() {
return Status::OK();
}

Status DeltaWriterV2::write(const vectorized::Block* block, const std::vector<uint32_t>& row_idxs) {
Status DeltaWriterV2::write(const vectorized::Block* block, const DorisVector<uint32_t>& row_idxs) {
if (UNLIKELY(row_idxs.empty())) {
return Status::OK();
}
Expand Down
2 changes: 1 addition & 1 deletion be/src/olap/delta_writer_v2.h
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ class DeltaWriterV2 {

Status init();

Status write(const vectorized::Block* block, const std::vector<uint32_t>& row_idxs);
Status write(const vectorized::Block* block, const DorisVector<uint32_t>& row_idxs);

// flush the last memtable to flush queue, must call it before close_wait()
Status close();
Expand Down
33 changes: 26 additions & 7 deletions be/src/olap/memtable.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,8 @@ void MemTable::_init_agg_functions(const vectorized::Block* block) {
}

MemTable::~MemTable() {
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_query_thread_context.query_mem_tracker);
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
_query_thread_context.query_mem_tracker->write_tracker());
if (_is_flush_success) {
// If the memtable is flush success, then its memtracker's consumption should be 0
if (_mem_tracker->consumption() != 0 && config::crash_in_memory_tracker_inaccurate) {
Expand Down Expand Up @@ -181,7 +182,9 @@ int RowInBlockComparator::operator()(const RowInBlock* left, const RowInBlock* r
}

Status MemTable::insert(const vectorized::Block* input_block,
const std::vector<uint32_t>& row_idxs) {
const DorisVector<uint32_t>& row_idxs) {
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
_query_thread_context.query_mem_tracker->write_tracker());
SCOPED_CONSUME_MEM_TRACKER(_mem_tracker);

if (_is_first_insertion) {
Expand Down Expand Up @@ -279,7 +282,7 @@ void MemTable::_aggregate_two_row_in_block(vectorized::MutableBlock& mutable_blo
}
Status MemTable::_put_into_output(vectorized::Block& in_block) {
SCOPED_RAW_TIMER(&_stat.put_into_output_ns);
std::vector<uint32_t> row_pos_vec;
DorisVector<uint32_t> row_pos_vec;
DCHECK(in_block.rows() <= std::numeric_limits<int>::max());
row_pos_vec.reserve(in_block.rows());
for (int i = 0; i < _row_in_blocks.size(); i++) {
Expand Down Expand Up @@ -340,7 +343,7 @@ Status MemTable::_sort_by_cluster_keys() {
auto clone_block = in_block.clone_without_columns();
_output_mutable_block = vectorized::MutableBlock::build_mutable_block(&clone_block);

std::vector<RowInBlock*> row_in_blocks;
DorisVector<RowInBlock*> row_in_blocks;
std::unique_ptr<int, std::function<void(int*)>> row_in_blocks_deleter((int*)0x01, [&](int*) {
std::for_each(row_in_blocks.begin(), row_in_blocks.end(),
std::default_delete<RowInBlock>());
Expand Down Expand Up @@ -375,7 +378,7 @@ Status MemTable::_sort_by_cluster_keys() {

in_block = mutable_block.to_block();
SCOPED_RAW_TIMER(&_stat.put_into_output_ns);
std::vector<uint32_t> row_pos_vec;
DorisVector<uint32_t> row_pos_vec;
DCHECK(in_block.rows() <= std::numeric_limits<int>::max());
row_pos_vec.reserve(in_block.rows());
for (int i = 0; i < row_in_blocks.size(); i++) {
Expand All @@ -389,7 +392,7 @@ Status MemTable::_sort_by_cluster_keys() {
row_pos_vec.data() + in_block.rows(), &column_offset);
}

void MemTable::_sort_one_column(std::vector<RowInBlock*>& row_in_blocks, Tie& tie,
void MemTable::_sort_one_column(DorisVector<RowInBlock*>& row_in_blocks, Tie& tie,
std::function<int(const RowInBlock*, const RowInBlock*)> cmp) {
auto iter = tie.iter();
while (iter.next()) {
Expand Down Expand Up @@ -461,7 +464,7 @@ void MemTable::_aggregate() {
vectorized::MutableBlock::build_mutable_block(&in_block);
_vec_row_comparator->set_block(&mutable_block);
auto& block_data = in_block.get_columns_with_type_and_name();
std::vector<RowInBlock*> temp_row_in_blocks;
DorisVector<RowInBlock*> temp_row_in_blocks;
temp_row_in_blocks.reserve(_last_sorted_pos);
RowInBlock* prev_row = nullptr;
int row_pos = -1;
Expand Down Expand Up @@ -579,6 +582,8 @@ void MemTable::_aggregate() {
}

void MemTable::shrink_memtable_by_agg() {
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
_query_thread_context.query_mem_tracker->write_tracker());
SCOPED_CONSUME_MEM_TRACKER(_mem_tracker);
if (_keys_type == KeysType::DUP_KEYS) {
return;
Expand Down Expand Up @@ -608,6 +613,20 @@ bool MemTable::need_agg() const {
return false;
}

// Computes the flush reserve size, in bytes, for this memtable.
// A DUP_KEYS memtable whose tablet schema has zero key columns reserves
// nothing; every other case reserves the bytes currently allocated by the
// input mutable block.
size_t MemTable::get_flush_reserve_memory_size() const {
    // Short-circuit keeps the schema lookup limited to the DUP_KEYS case.
    const bool is_keyless_dup =
            _keys_type == KeysType::DUP_KEYS && _tablet_schema->num_key_columns() == 0;
    if (is_keyless_dup) {
        // no need to reserve
        return 0;
    }
    return _input_mutable_block.allocated_bytes();
}

Status MemTable::_to_block(std::unique_ptr<vectorized::Block>* res) {
size_t same_keys_num = _sort();
if (_keys_type == KeysType::DUP_KEYS || same_keys_num == 0) {
Expand Down
11 changes: 6 additions & 5 deletions be/src/olap/memtable.h
Original file line number Diff line number Diff line change
Expand Up @@ -181,8 +181,9 @@ class MemTable {

int64_t tablet_id() const { return _tablet_id; }
size_t memory_usage() const { return _mem_tracker->consumption(); }
size_t get_flush_reserve_memory_size() const;
// insert tuple from (row_pos) to (row_pos+num_rows)
Status insert(const vectorized::Block* block, const std::vector<uint32_t>& row_idxs);
Status insert(const vectorized::Block* block, const DorisVector<uint32_t>& row_idxs);

void shrink_memtable_by_agg();

Expand Down Expand Up @@ -251,7 +252,7 @@ class MemTable {
//return number of same keys
size_t _sort();
Status _sort_by_cluster_keys();
void _sort_one_column(std::vector<RowInBlock*>& row_in_blocks, Tie& tie,
void _sort_one_column(DorisVector<RowInBlock*>& row_in_blocks, Tie& tie,
std::function<int(const RowInBlock*, const RowInBlock*)> cmp);
template <bool is_final>
void _finalize_one_row(RowInBlock* row, const vectorized::ColumnsWithTypeAndName& block_data,
Expand All @@ -262,10 +263,10 @@ class MemTable {
bool _is_first_insertion;

void _init_agg_functions(const vectorized::Block* block);
std::vector<vectorized::AggregateFunctionPtr> _agg_functions;
std::vector<size_t> _offsets_of_aggregate_states;
DorisVector<vectorized::AggregateFunctionPtr> _agg_functions;
DorisVector<size_t> _offsets_of_aggregate_states;
size_t _total_size_of_aggregate_states;
std::vector<RowInBlock*> _row_in_blocks;
DorisVector<RowInBlock*> _row_in_blocks;

size_t _num_columns;
int32_t _seq_col_idx_in_block = -1;
Expand Down
Loading
Loading