Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Fix](partial update) Fix incorrect result when partial update include delete sign columns #46194

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 9 additions & 11 deletions be/src/olap/base_tablet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -985,7 +985,7 @@ Status BaseTablet::generate_new_block_for_partial_update(
// read current rowset first, if a row in the current rowset has delete sign mark
// we don't need to read values from old block
RETURN_IF_ERROR(read_plan_update.read_columns_by_plan(
*rowset_schema, update_cids, rsid_to_rowset, update_block, &read_index_update));
*rowset_schema, update_cids, rsid_to_rowset, update_block, &read_index_update, false));
size_t update_rows = read_index_update.size();
for (auto i = 0; i < update_cids.size(); ++i) {
for (auto idx = 0; idx < update_rows; ++idx) {
Expand All @@ -1004,19 +1004,17 @@ Status BaseTablet::generate_new_block_for_partial_update(
// rowid in the final block (starts from 0 and increases; may not be continuous because we skip reading some rows) -> rowid to read in old_block
std::map<uint32_t, uint32_t> read_index_old;
RETURN_IF_ERROR(read_plan_ori.read_columns_by_plan(*rowset_schema, missing_cids, rsid_to_rowset,
old_block, &read_index_old,
old_block, &read_index_old, true,
new_block_delete_signs));
size_t old_rows = read_index_old.size();
const auto* __restrict old_block_delete_signs =
get_delete_sign_column_data(old_block, old_rows);

DCHECK(old_block_delete_signs != nullptr);
// build default value block
auto default_value_block = old_block.clone_empty();
if (old_block_delete_signs != nullptr || new_block_delete_signs != nullptr) {
RETURN_IF_ERROR(BaseTablet::generate_default_value_block(
*rowset_schema, missing_cids, partial_update_info->default_values, old_block,
default_value_block));
}
RETURN_IF_ERROR(BaseTablet::generate_default_value_block(*rowset_schema, missing_cids,
partial_update_info->default_values,
old_block, default_value_block));

CHECK(update_rows >= old_rows);

Expand Down Expand Up @@ -1078,7 +1076,7 @@ Status BaseTablet::generate_new_block_for_flexible_partial_update(
// 1. read the current rowset first, if a row in the current rowset has delete sign mark
// we don't need to read values from old block for that row
RETURN_IF_ERROR(read_plan_update.read_columns_by_plan(*rowset_schema, all_cids, rsid_to_rowset,
update_block, &read_index_update));
update_block, &read_index_update, true));
size_t update_rows = read_index_update.size();

// TODO(bobhan1): add the delete sign optimization here
Expand All @@ -1092,8 +1090,8 @@ Status BaseTablet::generate_new_block_for_flexible_partial_update(
// 2. read previous rowsets
// rowid in the final block (starts from 0 and increases; may not be continuous because we skip reading some rows) -> rowid to read in old_block
std::map<uint32_t, uint32_t> read_index_old;
RETURN_IF_ERROR(read_plan_ori.read_columns_by_plan(*rowset_schema, non_sort_key_cids,
rsid_to_rowset, old_block, &read_index_old));
RETURN_IF_ERROR(read_plan_ori.read_columns_by_plan(
*rowset_schema, non_sort_key_cids, rsid_to_rowset, old_block, &read_index_old, true));
size_t old_rows = read_index_old.size();
DCHECK(update_rows == old_rows);
const auto* __restrict old_block_delete_signs =
Expand Down
38 changes: 23 additions & 15 deletions be/src/olap/partial_update_info.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
#include "olap/olap_common.h"
#include "olap/rowset/rowset.h"
#include "olap/rowset/rowset_writer_context.h"
#include "olap/tablet_meta.h"
#include "olap/tablet_schema.h"
#include "olap/utils.h"
#include "util/bitmap_value.h"
Expand Down Expand Up @@ -188,6 +187,7 @@ Status PartialUpdateInfo::handle_not_found_error_for_fixed_partial_update(
}
return Status::OK();
}

Status PartialUpdateInfo::handle_not_found_error_for_flexible_partial_update(
const TabletSchema& tablet_schema, BitmapValue* skip_bitmap) const {
DCHECK(skip_bitmap != nullptr);
Expand Down Expand Up @@ -272,10 +272,20 @@ void FixedReadPlan::prepare_to_read(const RowLocation& row_location, size_t pos)
// read columns by read plan
// read_index: ori_pos-> block_idx
Status FixedReadPlan::read_columns_by_plan(
const TabletSchema& tablet_schema, const std::vector<uint32_t> cids_to_read,
const TabletSchema& tablet_schema, std::vector<uint32_t> cids_to_read,
const std::map<RowsetId, RowsetSharedPtr>& rsid_to_rowset, vectorized::Block& block,
std::map<uint32_t, uint32_t>* read_index,
const signed char* __restrict delete_signs) const {
std::map<uint32_t, uint32_t>* read_index, bool force_read_old_delete_signs,
const signed char* __restrict cur_delete_signs) const {
if (force_read_old_delete_signs) {
// always read delete sign column from historical data
if (const vectorized::ColumnWithTypeAndName* old_delete_sign_column =
block.try_get_by_name(DELETE_SIGN);
old_delete_sign_column == nullptr) {
auto del_col_cid = tablet_schema.field_index(DELETE_SIGN);
cids_to_read.emplace_back(del_col_cid);
block.swap(tablet_schema.create_block_by_cids(cids_to_read));
}
}
bool has_row_column = tablet_schema.has_row_store_for_all_columns();
auto mutable_columns = block.mutate_columns();
size_t read_idx = 0;
Expand All @@ -285,7 +295,7 @@ Status FixedReadPlan::read_columns_by_plan(
CHECK(rowset_iter != rsid_to_rowset.end());
std::vector<uint32_t> rids;
for (auto [rid, pos] : mappings) {
if (delete_signs && delete_signs[pos]) {
if (cur_delete_signs && cur_delete_signs[pos]) {
continue;
}
rids.emplace_back(rid);
Expand Down Expand Up @@ -330,17 +340,15 @@ Status FixedReadPlan::fill_missing_columns(
// segment pos to write -> rowid to read in old_value_block
std::map<uint32_t, uint32_t> read_index;
RETURN_IF_ERROR(read_columns_by_plan(tablet_schema, missing_cids, rsid_to_rowset,
old_value_block, &read_index, nullptr));

const auto* delete_sign_column_data = BaseTablet::get_delete_sign_column_data(old_value_block);
old_value_block, &read_index, true, nullptr));

const auto* old_delete_signs = BaseTablet::get_delete_sign_column_data(old_value_block);
DCHECK(old_delete_signs != nullptr);
// build default value columns
auto default_value_block = old_value_block.clone_empty();
if (has_default_or_nullable || delete_sign_column_data != nullptr) {
RETURN_IF_ERROR(BaseTablet::generate_default_value_block(
tablet_schema, missing_cids, rowset_ctx->partial_update_info->default_values,
old_value_block, default_value_block));
}
RETURN_IF_ERROR(BaseTablet::generate_default_value_block(
tablet_schema, missing_cids, rowset_ctx->partial_update_info->default_values,
old_value_block, default_value_block));
auto mutable_default_value_columns = default_value_block.mutate_columns();

// fill all missing value from mutable_old_columns, need to consider default value and null value
Expand All @@ -353,8 +361,8 @@ Status FixedReadPlan::fill_missing_columns(
// to check if a row REALLY exists in the table.
auto segment_pos = idx + segment_start_pos;
auto pos_in_old_block = read_index[segment_pos];
if (use_default_or_null_flag[idx] || (delete_sign_column_data != nullptr &&
delete_sign_column_data[pos_in_old_block] != 0)) {
if (use_default_or_null_flag[idx] ||
(old_delete_signs != nullptr && old_delete_signs[pos_in_old_block] != 0)) {
for (auto i = 0; i < missing_cids.size(); ++i) {
// if the column has default value, fill it with default value
// otherwise, if the column is nullable, fill it with null value
Expand Down
5 changes: 3 additions & 2 deletions be/src/olap/partial_update_info.h
Original file line number Diff line number Diff line change
Expand Up @@ -114,10 +114,11 @@ class FixedReadPlan {
public:
void prepare_to_read(const RowLocation& row_location, size_t pos);
Status read_columns_by_plan(const TabletSchema& tablet_schema,
const std::vector<uint32_t> cids_to_read,
std::vector<uint32_t> cids_to_read,
const std::map<RowsetId, RowsetSharedPtr>& rsid_to_rowset,
vectorized::Block& block, std::map<uint32_t, uint32_t>* read_index,
const signed char* __restrict delete_signs = nullptr) const;
bool force_read_old_delete_signs,
const signed char* __restrict cur_delete_signs = nullptr) const;
Status fill_missing_columns(RowsetWriterContext* rowset_ctx,
const std::map<RowsetId, RowsetSharedPtr>& rsid_to_rowset,
const TabletSchema& tablet_schema, vectorized::Block& full_block,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
-- This file is automatically generated. You should know what you did if you want to edit this
-- !sql --
1 1 1 1 1
2 2 2 2 2
3 3 3 3 3
4 4 4 4 4
5 5 5 5 5

-- !sql --
1 1 1 987 987
2 \N \N 987 987
3 3 3 3 3
4 -1 -1 987 987
5 \N \N 987 987

Binary file not shown.
Binary file not shown.
Original file line number Diff line number Diff line change
Expand Up @@ -28,15 +28,15 @@
7 7 7 7
8 8 8 8
10 \N 999 \N
11 \N 888 \N
11 \N \N \N

-- !sql_2_1 --
0 0 0 0
3 3 30 3
7 7 7 7
8 8 8 8
10 \N 999 \N
11 \N 888 \N
11 \N \N \N

-- !sql_2_2 --
0 0 0 0
Expand Down Expand Up @@ -90,7 +90,7 @@
7 7 7 7
8 8 8 8
10 \N 999 \N
11 \N 888 \N
11 \N \N \N

-- !inspect --
0 0 0 0 1 0
Expand All @@ -109,15 +109,15 @@
8 8 8 8 1 0
10 \N 999 \N 2 0
11 \N 888 \N 2 1
11 \N 888 \N 3 0
11 \N \N \N 3 0

-- !sql_4_1 --
0 0 0 0
3 3 30 3
7 7 7 7
8 8 8 8
10 \N 999 \N
11 \N 888 \N
11 \N \N \N

-- !inspect --
0 0 0 0 1 0
Expand All @@ -138,7 +138,7 @@
8 8 8 8 1 0
10 \N 999 \N 2 0
11 \N 888 \N 2 1
11 \N 888 \N 3 0
11 \N \N \N 3 0

-- !sql_4_2 --
0 0 0 0
Expand Down Expand Up @@ -166,8 +166,8 @@
8 8 8 8 1 0
10 \N 999 \N 2 0
11 \N 888 \N 2 1
11 \N 888 \N 3 0
11 \N 888 \N 5 1
11 \N \N \N 3 0
11 \N \N \N 5 1

-- !sql --
0 0 0 0
Expand Down Expand Up @@ -198,15 +198,15 @@
7 7 7 7
8 8 8 8
10 \N 999 \N
11 \N 888 \N
11 \N \N \N

-- !sql_2_1 --
0 0 0 0
3 3 30 3
7 7 7 7
8 8 8 8
10 \N 999 \N
11 \N 888 \N
11 \N \N \N

-- !sql_2_2 --
0 0 0 0
Expand Down Expand Up @@ -260,7 +260,7 @@
7 7 7 7
8 8 8 8
10 \N 999 \N
11 \N 888 \N
11 \N \N \N

-- !inspect --
0 0 0 0 1 0
Expand All @@ -279,15 +279,15 @@
8 8 8 8 1 0
10 \N 999 \N 2 0
11 \N 888 \N 2 1
11 \N 888 \N 3 0
11 \N \N \N 3 0

-- !sql_4_1 --
0 0 0 0
3 3 30 3
7 7 7 7
8 8 8 8
10 \N 999 \N
11 \N 888 \N
11 \N \N \N

-- !inspect --
0 0 0 0 1 0
Expand All @@ -308,7 +308,7 @@
8 8 8 8 1 0
10 \N 999 \N 2 0
11 \N 888 \N 2 1
11 \N 888 \N 3 0
11 \N \N \N 3 0

-- !sql_4_2 --
0 0 0 0
Expand Down Expand Up @@ -336,6 +336,6 @@
8 8 8 8 1 0
10 \N 999 \N 2 0
11 \N 888 \N 2 1
11 \N 888 \N 3 0
11 \N 888 \N 5 1
11 \N \N \N 3 0
11 \N \N \N 5 1

Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@

-- !partial_update_with_seq_hidden_columns --
1 doris 200 123 1 2023-01-01 1 3 2023-01-01
2 doris2 2600 223 1 2023-07-20 1 4 2023-07-20
2 unknown 2600 \N 4321 2023-07-20 1 4 2023-07-20
3 unknown 1500 \N 4321 2022-07-20 1 4 2022-07-20

-- !select_default --
Expand All @@ -30,6 +30,6 @@

-- !partial_update_with_seq_hidden_columns --
1 doris 200 123 1 2023-01-01 1 3 2023-01-01
2 doris2 2600 223 1 2023-07-20 1 4 2023-07-20
2 unknown 2600 \N 4321 2023-07-20 1 4 2023-07-20
3 unknown 1500 \N 4321 2022-07-20 1 4 2022-07-20

Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
-- !partial_update_with_seq_test --

-- !partial_update_with_seq_test_hidden --
1 doris 2300 2300 1 2021-05-19 1 4 2300
1 unknown 2300 2300 4321 2021-05-19 1 4 2300
2 doris2 3600 2400 1 2019-01-23 1 3 3600
3 unknown 1500 2500 4321 2022-03-31 1 4 2500

Expand Down Expand Up @@ -41,7 +41,7 @@
-- !partial_update_with_seq_test --

-- !partial_update_with_seq_test_hidden --
1 doris 2300 2300 1 2021-05-19 1 4 2300
1 unknown 2300 2300 4321 2021-05-19 1 4 2300
2 doris2 3600 2400 1 2019-01-23 1 3 3600
3 unknown 1500 2500 4321 2022-03-31 1 4 2500

Expand Down
Loading
Loading