Skip to content

Commit

Permalink
[Fix](partial update) Fix incorrect result when partial update includ…
Browse files Browse the repository at this point in the history
…e delete sign columns (apache#46194)

Currently, when a partial update load includes the delete sign column, it
does not read the delete sign column from historical data. This may produce
incorrect results, because values may be read from historical rows whose
delete sign is 1.
  • Loading branch information
bobhan1 committed Jan 3, 2025
1 parent b4c36d3 commit e9afb8d
Show file tree
Hide file tree
Showing 13 changed files with 408 additions and 44 deletions.
14 changes: 6 additions & 8 deletions be/src/olap/base_tablet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -931,7 +931,7 @@ Status BaseTablet::generate_new_block_for_partial_update(
// read current rowset first, if a row in the current rowset has delete sign mark
// we don't need to read values from old block
RETURN_IF_ERROR(read_plan_update.read_columns_by_plan(
*rowset_schema, update_cids, rsid_to_rowset, update_block, &read_index_update));
*rowset_schema, update_cids, rsid_to_rowset, update_block, &read_index_update, false));
size_t update_rows = read_index_update.size();
for (auto i = 0; i < update_cids.size(); ++i) {
for (auto idx = 0; idx < update_rows; ++idx) {
Expand All @@ -951,19 +951,17 @@ Status BaseTablet::generate_new_block_for_partial_update(
// rowid in the final block (starts from 0 and increases, but may not be continuous because we skip reading some rows) -> rowid to read in old_block
std::map<uint32_t, uint32_t> read_index_old;
RETURN_IF_ERROR(read_plan_ori.read_columns_by_plan(*rowset_schema, missing_cids, rsid_to_rowset,
old_block, &read_index_old,
old_block, &read_index_old, true,
new_block_delete_signs));
size_t old_rows = read_index_old.size();
const auto* __restrict old_block_delete_signs =
get_delete_sign_column_data(old_block, old_rows);

DCHECK(old_block_delete_signs != nullptr);
// build default value block
auto default_value_block = old_block.clone_empty();
if (old_block_delete_signs != nullptr || new_block_delete_signs != nullptr) {
RETURN_IF_ERROR(BaseTablet::generate_default_value_block(
*rowset_schema, missing_cids, partial_update_info->default_values, old_block,
default_value_block));
}
RETURN_IF_ERROR(BaseTablet::generate_default_value_block(*rowset_schema, missing_cids,
partial_update_info->default_values,
old_block, default_value_block));
auto mutable_default_value_columns = default_value_block.mutate_columns();

CHECK(update_rows >= old_rows);
Expand Down
37 changes: 23 additions & 14 deletions be/src/olap/partial_update_info.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
#include "olap/olap_common.h"
#include "olap/rowset/rowset.h"
#include "olap/rowset/rowset_writer_context.h"
#include "olap/tablet_meta.h"
#include "olap/tablet_schema.h"
#include "olap/utils.h"
#include "util/bitmap_value.h"
Expand Down Expand Up @@ -206,9 +205,21 @@ void PartialUpdateReadPlan::prepare_to_read(const RowLocation& row_location, siz
// read columns by read plan
// read_index: ori_pos-> block_idx
Status PartialUpdateReadPlan::read_columns_by_plan(
const TabletSchema& tablet_schema, const std::vector<uint32_t> cids_to_read,
const TabletSchema& tablet_schema, std::vector<uint32_t> cids_to_read,
const std::map<RowsetId, RowsetSharedPtr>& rsid_to_rowset, vectorized::Block& block,
std::map<uint32_t, uint32_t>* read_index, const signed char* __restrict skip_map) const {
std::map<uint32_t, uint32_t>* read_index, bool force_read_old_delete_signs,
const signed char* __restrict cur_delete_signs) const {
if (force_read_old_delete_signs) {
// always read delete sign column from historical data
if (const vectorized::ColumnWithTypeAndName* old_delete_sign_column =
block.try_get_by_name(DELETE_SIGN);
old_delete_sign_column == nullptr) {
auto del_col_cid = tablet_schema.field_index(DELETE_SIGN);
cids_to_read.emplace_back(del_col_cid);
block.swap(tablet_schema.create_block_by_cids(cids_to_read));
}
}

bool has_row_column = tablet_schema.has_row_store_for_all_columns();
auto mutable_columns = block.mutate_columns();
size_t read_idx = 0;
Expand All @@ -218,7 +229,7 @@ Status PartialUpdateReadPlan::read_columns_by_plan(
CHECK(rowset_iter != rsid_to_rowset.end());
std::vector<uint32_t> rids;
for (auto [rid, pos] : mappings) {
if (skip_map && skip_map[pos]) {
if (cur_delete_signs && cur_delete_signs[pos]) {
continue;
}
rids.emplace_back(rid);
Expand Down Expand Up @@ -263,17 +274,15 @@ Status PartialUpdateReadPlan::fill_missing_columns(
// record real pos, key is input line num, value is old_block line num
std::map<uint32_t, uint32_t> read_index;
RETURN_IF_ERROR(read_columns_by_plan(tablet_schema, missing_cids, rsid_to_rowset,
old_value_block, &read_index, nullptr));

const auto* delete_sign_column_data = BaseTablet::get_delete_sign_column_data(old_value_block);
old_value_block, &read_index, true, nullptr));

const auto* old_delete_signs = BaseTablet::get_delete_sign_column_data(old_value_block);
DCHECK(old_delete_signs != nullptr);
// build default value columns
auto default_value_block = old_value_block.clone_empty();
if (has_default_or_nullable || delete_sign_column_data != nullptr) {
RETURN_IF_ERROR(BaseTablet::generate_default_value_block(
tablet_schema, missing_cids, rowset_ctx->partial_update_info->default_values,
old_value_block, default_value_block));
}
RETURN_IF_ERROR(BaseTablet::generate_default_value_block(
tablet_schema, missing_cids, rowset_ctx->partial_update_info->default_values,
old_value_block, default_value_block));
auto mutable_default_value_columns = default_value_block.mutate_columns();

// fill all missing value from mutable_old_columns, need to consider default value and null value
Expand All @@ -285,8 +294,8 @@ Status PartialUpdateReadPlan::fill_missing_columns(
// read values from old rows for missing values in this occasion. So we should read the DELETE_SIGN column
// to check if a row REALLY exists in the table.
auto pos_in_old_block = read_index[idx + segment_start_pos];
if (use_default_or_null_flag[idx] || (delete_sign_column_data != nullptr &&
delete_sign_column_data[pos_in_old_block] != 0)) {
if (use_default_or_null_flag[idx] ||
(old_delete_signs != nullptr && old_delete_signs[pos_in_old_block] != 0)) {
for (auto i = 0; i < missing_cids.size(); ++i) {
// if the column has default value, fill it with default value
// otherwise, if the column is nullable, fill it with null value
Expand Down
5 changes: 3 additions & 2 deletions be/src/olap/partial_update_info.h
Original file line number Diff line number Diff line change
Expand Up @@ -80,10 +80,11 @@ class PartialUpdateReadPlan {
public:
void prepare_to_read(const RowLocation& row_location, size_t pos);
Status read_columns_by_plan(const TabletSchema& tablet_schema,
const std::vector<uint32_t> cids_to_read,
std::vector<uint32_t> cids_to_read,
const std::map<RowsetId, RowsetSharedPtr>& rsid_to_rowset,
vectorized::Block& block, std::map<uint32_t, uint32_t>* read_index,
const signed char* __restrict skip_map = nullptr) const;
bool force_read_old_delete_signs,
const signed char* __restrict cur_delete_signs = nullptr) const;
Status fill_missing_columns(RowsetWriterContext* rowset_ctx,
const std::map<RowsetId, RowsetSharedPtr>& rsid_to_rowset,
const TabletSchema& tablet_schema, vectorized::Block& full_block,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
-- This file is automatically generated. You should know what you did if you want to edit this
-- !sql --
1 1 1 1 1
2 2 2 2 2
3 3 3 3 3
4 4 4 4 4
5 5 5 5 5

-- !sql --
1 1 1 987 987
2 \N \N 987 987
3 3 3 3 3
4 -1 -1 987 987
5 \N \N 987 987

Binary file not shown.
Binary file not shown.
Original file line number Diff line number Diff line change
Expand Up @@ -28,15 +28,15 @@
7 7 7 7
8 8 8 8
10 \N 999 \N
11 \N 888 \N
11 \N \N \N

-- !sql_2_1 --
0 0 0 0
3 3 30 3
7 7 7 7
8 8 8 8
10 \N 999 \N
11 \N 888 \N
11 \N \N \N

-- !sql_2_2 --
0 0 0 0
Expand Down Expand Up @@ -90,7 +90,7 @@
7 7 7 7
8 8 8 8
10 \N 999 \N
11 \N 888 \N
11 \N \N \N

-- !inspect --
0 0 0 0 1 0
Expand All @@ -109,15 +109,15 @@
8 8 8 8 1 0
10 \N 999 \N 2 0
11 \N 888 \N 2 1
11 \N 888 \N 3 0
11 \N \N \N 3 0

-- !sql_4_1 --
0 0 0 0
3 3 30 3
7 7 7 7
8 8 8 8
10 \N 999 \N
11 \N 888 \N
11 \N \N \N

-- !inspect --
0 0 0 0 1 0
Expand All @@ -138,7 +138,7 @@
8 8 8 8 1 0
10 \N 999 \N 2 0
11 \N 888 \N 2 1
11 \N 888 \N 3 0
11 \N \N \N 3 0

-- !sql_4_2 --
0 0 0 0
Expand Down Expand Up @@ -166,8 +166,8 @@
8 8 8 8 1 0
10 \N 999 \N 2 0
11 \N 888 \N 2 1
11 \N 888 \N 3 0
11 \N 888 \N 5 1
11 \N \N \N 3 0
11 \N \N \N 5 1

-- !sql --
0 0 0 0
Expand Down Expand Up @@ -198,15 +198,15 @@
7 7 7 7
8 8 8 8
10 \N 999 \N
11 \N 888 \N
11 \N \N \N

-- !sql_2_1 --
0 0 0 0
3 3 30 3
7 7 7 7
8 8 8 8
10 \N 999 \N
11 \N 888 \N
11 \N \N \N

-- !sql_2_2 --
0 0 0 0
Expand Down Expand Up @@ -260,7 +260,7 @@
7 7 7 7
8 8 8 8
10 \N 999 \N
11 \N 888 \N
11 \N \N \N

-- !inspect --
0 0 0 0 1 0
Expand All @@ -279,15 +279,15 @@
8 8 8 8 1 0
10 \N 999 \N 2 0
11 \N 888 \N 2 1
11 \N 888 \N 3 0
11 \N \N \N 3 0

-- !sql_4_1 --
0 0 0 0
3 3 30 3
7 7 7 7
8 8 8 8
10 \N 999 \N
11 \N 888 \N
11 \N \N \N

-- !inspect --
0 0 0 0 1 0
Expand All @@ -308,7 +308,7 @@
8 8 8 8 1 0
10 \N 999 \N 2 0
11 \N 888 \N 2 1
11 \N 888 \N 3 0
11 \N \N \N 3 0

-- !sql_4_2 --
0 0 0 0
Expand Down Expand Up @@ -336,6 +336,6 @@
8 8 8 8 1 0
10 \N 999 \N 2 0
11 \N 888 \N 2 1
11 \N 888 \N 3 0
11 \N 888 \N 5 1
11 \N \N \N 3 0
11 \N \N \N 5 1

Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@

-- !partial_update_with_seq_hidden_columns --
1 doris 200 123 1 2023-01-01 1 3 2023-01-01
2 doris2 2600 223 1 2023-07-20 1 4 2023-07-20
2 unknown 2600 \N 4321 2023-07-20 1 4 2023-07-20
3 unknown 1500 \N 4321 2022-07-20 1 4 2022-07-20

-- !select_default --
Expand All @@ -30,6 +30,6 @@

-- !partial_update_with_seq_hidden_columns --
1 doris 200 123 1 2023-01-01 1 3 2023-01-01
2 doris2 2600 223 1 2023-07-20 1 4 2023-07-20
2 unknown 2600 \N 4321 2023-07-20 1 4 2023-07-20
3 unknown 1500 \N 4321 2022-07-20 1 4 2022-07-20

Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
-- !partial_update_with_seq_test --

-- !partial_update_with_seq_test_hidden --
1 doris 2300 2300 1 2021-05-19 1 4 2300
1 unknown 2300 2300 4321 2021-05-19 1 4 2300
2 doris2 3600 2400 1 2019-01-23 1 3 3600
3 unknown 1500 2500 4321 2022-03-31 1 4 2500

Expand Down Expand Up @@ -41,7 +41,7 @@
-- !partial_update_with_seq_test --

-- !partial_update_with_seq_test_hidden --
1 doris 2300 2300 1 2021-05-19 1 4 2300
1 unknown 2300 2300 4321 2021-05-19 1 4 2300
2 doris2 3600 2400 1 2019-01-23 1 3 3600
3 unknown 1500 2500 4321 2022-03-31 1 4 2500

Expand Down
Loading

0 comments on commit e9afb8d

Please sign in to comment.