Skip to content

Commit

Permalink
[fix](partial update) mishandling of exceptions in the publish phase …
Browse files Browse the repository at this point in the history
…may result in data loss (apache#30366)
  • Loading branch information
zhannngchen authored Jan 26, 2024
1 parent a35671b commit 6936b9b
Show file tree
Hide file tree
Showing 9 changed files with 362 additions and 72 deletions.
44 changes: 38 additions & 6 deletions be/src/olap/tablet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3110,16 +3110,28 @@ Status Tablet::commit_phase_update_delete_bitmap(
return Status::OK();
}

Status Tablet::update_delete_bitmap(const RowsetSharedPtr& rowset,
const RowsetIdUnorderedSet& pre_rowset_ids,
DeleteBitmapPtr delete_bitmap, int64_t txn_id,
RowsetWriter* rowset_writer) {
Status Tablet::update_delete_bitmap(const TabletTxnInfo* txn_info, int64_t txn_id) {
SCOPED_BVAR_LATENCY(g_tablet_update_delete_bitmap_latency);
RowsetIdUnorderedSet cur_rowset_ids;
RowsetIdUnorderedSet rowset_ids_to_add;
RowsetIdUnorderedSet rowset_ids_to_del;
RowsetSharedPtr rowset = txn_info->rowset;
int64_t cur_version = rowset->start_version();

std::unique_ptr<RowsetWriter> rowset_writer;
RETURN_IF_ERROR(
create_transient_rowset_writer(rowset, &rowset_writer, txn_info->partial_update_info));

DeleteBitmapPtr delete_bitmap = txn_info->delete_bitmap;
// Partial update might generate new segments when there is conflicts while publish, and mark
// the same key in original segments as delete.
// When the new segment flush fails or the rowset build fails, the deletion marker for the
// duplicate key of the original segment should not remain in `txn_info->delete_bitmap`,
// so we need to make a copy of `txn_info->delete_bitmap` and make changes on it.
if (txn_info->partial_update_info && txn_info->partial_update_info->is_partial_update) {
delete_bitmap = std::make_shared<DeleteBitmap>(*(txn_info->delete_bitmap));
}

OlapStopWatch watch;
std::vector<segment_v2::SegmentSharedPtr> segments;
RETURN_IF_ERROR(_load_rowset_segments(rowset, &segments));
Expand All @@ -3137,7 +3149,8 @@ Status Tablet::update_delete_bitmap(const RowsetSharedPtr& rowset,
}
auto t2 = watch.get_elapse_time_us();

_rowset_ids_difference(cur_rowset_ids, pre_rowset_ids, &rowset_ids_to_add, &rowset_ids_to_del);
_rowset_ids_difference(cur_rowset_ids, txn_info->rowset_ids, &rowset_ids_to_add,
&rowset_ids_to_del);
for (const auto& to_del : rowset_ids_to_del) {
delete_bitmap->remove({to_del, 0, 0}, {to_del, UINT32_MAX, INT64_MAX});
}
Expand All @@ -3151,7 +3164,7 @@ Status Tablet::update_delete_bitmap(const RowsetSharedPtr& rowset,

auto token = _engine.calc_delete_bitmap_executor()->create_token();
RETURN_IF_ERROR(calc_delete_bitmap(rowset, segments, specified_rowsets, delete_bitmap,
cur_version - 1, token.get(), rowset_writer));
cur_version - 1, token.get(), rowset_writer.get()));
RETURN_IF_ERROR(token->wait());

std::stringstream ss;
Expand Down Expand Up @@ -3183,6 +3196,25 @@ Status Tablet::update_delete_bitmap(const RowsetSharedPtr& rowset,
_remove_sentinel_mark_from_delete_bitmap(delete_bitmap);
}

if (txn_info->partial_update_info && txn_info->partial_update_info->is_partial_update) {
DBUG_EXECUTE_IF("Tablet.update_delete_bitmap.partial_update_write_rowset_fail", {
if (rand() % 100 < (100 * dp->param("percent", 0.5))) {
LOG_WARNING("Tablet.update_delete_bitmap.partial_update_write_rowset random failed")
.tag("txn_id", txn_id);
return Status::InternalError(
"debug update_delete_bitmap partial update write rowset random failed");
}
});
// build rowset writer and merge transient rowset
RETURN_IF_ERROR(rowset_writer->flush());
RowsetSharedPtr transient_rowset;
RETURN_IF_ERROR(rowset_writer->build(transient_rowset));
rowset->merge_rowset_meta(transient_rowset->rowset_meta());

// erase segment cache cause we will add a segment to rowset
SegmentLoader::instance()->erase_segments(rowset->rowset_id(), rowset->num_segments());
}

// update version without write lock, compaction and publish_txn
// will update delete bitmap, handle compaction with _rowset_update_lock
// and publish_txn runs sequential so no need to lock here
Expand Down
6 changes: 2 additions & 4 deletions be/src/olap/tablet.h
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ class TupleDescriptor;
class CalcDeleteBitmapToken;
enum CompressKind : int;
class RowsetBinlogMetasPB;
struct TabletTxnInfo;

namespace io {
class RemoteFileSystem;
Expand Down Expand Up @@ -451,10 +452,7 @@ class Tablet final : public BaseTablet {
const std::vector<segment_v2::SegmentSharedPtr>& segments, int64_t txn_id,
CalcDeleteBitmapToken* token, RowsetWriter* rowset_writer = nullptr);

Status update_delete_bitmap(const RowsetSharedPtr& rowset,
const RowsetIdUnorderedSet& pre_rowset_ids,
DeleteBitmapPtr delete_bitmap, int64_t txn_id,
RowsetWriter* rowset_writer = nullptr);
Status update_delete_bitmap(const TabletTxnInfo* txn_info, int64_t txn_id);
void calc_compaction_output_rowset_delete_bitmap(
const std::vector<RowsetSharedPtr>& input_rowsets,
const RowIdConversion& rowid_conversion, uint64_t start_version, uint64_t end_version,
Expand Down
63 changes: 2 additions & 61 deletions be/src/olap/txn_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -63,46 +63,6 @@ using std::vector;
namespace doris {
using namespace ErrorCode;

// State tracked for a single (transaction, tablet) pair.
//
// Holds the rowset produced for the transaction, the optional merge-on-write
// delete bitmap together with the rowset ids it was computed against, and the
// current TxnState.
struct TabletTxnInfo {
    PUniqueId load_id;
    RowsetSharedPtr rowset;
    PendingRowsetGuard pending_rs_guard;
    // Set only by the merge-on-write constructor below; false otherwise.
    bool unique_key_merge_on_write {false};
    DeleteBitmapPtr delete_bitmap;
    // records rowsets calc in commit txn
    RowsetIdUnorderedSet rowset_ids;
    // Every non-default constructor sets this to UnixSeconds(). The explicit
    // zero keeps a default-constructed instance from carrying an indeterminate
    // value (reading it would be undefined behavior).
    int64_t creation_time {0};
    bool ingest {false};
    std::shared_ptr<PartialUpdateInfo> partial_update_info;
    TxnState state {TxnState::PREPARED};

    TabletTxnInfo() = default;

    TabletTxnInfo(PUniqueId load_id, RowsetSharedPtr rowset)
            : load_id(load_id), rowset(rowset), creation_time(UnixSeconds()) {}

    TabletTxnInfo(PUniqueId load_id, RowsetSharedPtr rowset, bool ingest_arg)
            : load_id(load_id), rowset(rowset), creation_time(UnixSeconds()), ingest(ingest_arg) {}

    // Constructor that also records the delete bitmap and the rowset ids
    // passed in by the caller.
    TabletTxnInfo(PUniqueId load_id, RowsetSharedPtr rowset, bool merge_on_write,
                  DeleteBitmapPtr delete_bitmap, const RowsetIdUnorderedSet& ids)
            : load_id(load_id),
              rowset(rowset),
              unique_key_merge_on_write(merge_on_write),
              delete_bitmap(delete_bitmap),
              rowset_ids(ids),
              creation_time(UnixSeconds()) {}

    // Unconditional state transitions.
    void prepare() { state = TxnState::PREPARED; }
    void commit() { state = TxnState::COMMITTED; }
    void rollback() { state = TxnState::ROLLEDBACK; }

    // Only a transaction that is still PREPARED can be aborted; in any other
    // state this call is a no-op.
    void abort() {
        if (state == TxnState::PREPARED) {
            state = TxnState::ABORTED;
        }
    }
};

TxnManager::TxnManager(StorageEngine& engine, int32_t txn_map_shard_size, int32_t txn_shard_size)
: _engine(engine),
_txn_map_shard_size(txn_map_shard_size),
Expand Down Expand Up @@ -521,33 +481,14 @@ Status TxnManager::publish_txn(OlapMeta* meta, TPartitionId partition_id,
});
// update delete_bitmap
if (tablet_txn_info->unique_key_merge_on_write) {
std::unique_ptr<RowsetWriter> rowset_writer;
RETURN_IF_ERROR(tablet->create_transient_rowset_writer(
rowset, &rowset_writer, tablet_txn_info->partial_update_info));

int64_t t2 = MonotonicMicros();
RETURN_IF_ERROR(tablet->update_delete_bitmap(rowset, tablet_txn_info->rowset_ids,
tablet_txn_info->delete_bitmap, transaction_id,
rowset_writer.get()));
RETURN_IF_ERROR(tablet->update_delete_bitmap(tablet_txn_info.get(), transaction_id));
int64_t t3 = MonotonicMicros();
stats->calc_delete_bitmap_time_us = t3 - t2;
if (tablet_txn_info->partial_update_info &&
tablet_txn_info->partial_update_info->is_partial_update) {
// build rowset writer and merge transient rowset
RETURN_IF_ERROR(rowset_writer->flush());
RowsetSharedPtr transient_rowset;
RETURN_IF_ERROR(rowset_writer->build(transient_rowset));
rowset->merge_rowset_meta(transient_rowset->rowset_meta());

// erase segment cache cause we will add a segment to rowset
SegmentLoader::instance()->erase_segments(rowset->rowset_id(), rowset->num_segments());
}
stats->partial_update_write_segment_us = MonotonicMicros() - t3;
int64_t t4 = MonotonicMicros();
RETURN_IF_ERROR(TabletMetaManager::save_delete_bitmap(
tablet->data_dir(), tablet->tablet_id(), tablet_txn_info->delete_bitmap,
version.second));
stats->save_meta_time_us = MonotonicMicros() - t4;
stats->save_meta_time_us = MonotonicMicros() - t3;
}

/// Step 3: add to binlog
Expand Down
40 changes: 39 additions & 1 deletion be/src/olap/txn_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,45 @@ enum class TxnState {
DELETED = 5,
};

struct TabletTxnInfo;
// State tracked for a single (transaction, tablet) pair.
//
// Holds the rowset produced for the transaction, the optional merge-on-write
// delete bitmap together with the rowset ids it was computed against, and the
// current TxnState.
struct TabletTxnInfo {
    PUniqueId load_id;
    RowsetSharedPtr rowset;
    PendingRowsetGuard pending_rs_guard;
    // Set only by the merge-on-write constructor below; false otherwise.
    bool unique_key_merge_on_write {false};
    DeleteBitmapPtr delete_bitmap;
    // records rowsets calc in commit txn
    RowsetIdUnorderedSet rowset_ids;
    // Every non-default constructor sets this to UnixSeconds(). The explicit
    // zero keeps a default-constructed instance from carrying an indeterminate
    // value (reading it would be undefined behavior).
    int64_t creation_time {0};
    bool ingest {false};
    std::shared_ptr<PartialUpdateInfo> partial_update_info;
    TxnState state {TxnState::PREPARED};

    TabletTxnInfo() = default;

    TabletTxnInfo(PUniqueId load_id, RowsetSharedPtr rowset)
            : load_id(load_id), rowset(rowset), creation_time(UnixSeconds()) {}

    TabletTxnInfo(PUniqueId load_id, RowsetSharedPtr rowset, bool ingest_arg)
            : load_id(load_id), rowset(rowset), creation_time(UnixSeconds()), ingest(ingest_arg) {}

    // Constructor that also records the delete bitmap and the rowset ids
    // passed in by the caller.
    TabletTxnInfo(PUniqueId load_id, RowsetSharedPtr rowset, bool merge_on_write,
                  DeleteBitmapPtr delete_bitmap, const RowsetIdUnorderedSet& ids)
            : load_id(load_id),
              rowset(rowset),
              unique_key_merge_on_write(merge_on_write),
              delete_bitmap(delete_bitmap),
              rowset_ids(ids),
              creation_time(UnixSeconds()) {}

    // Unconditional state transitions.
    void prepare() { state = TxnState::PREPARED; }
    void commit() { state = TxnState::COMMITTED; }
    void rollback() { state = TxnState::ROLLEDBACK; }

    // Only a transaction that is still PREPARED can be aborted; in any other
    // state this call is a no-op.
    void abort() {
        if (state == TxnState::PREPARED) {
            state = TxnState::ABORTED;
        }
    }
};

struct CommitTabletTxnInfo {
TTransactionId transaction_id {0};
Expand Down
21 changes: 21 additions & 0 deletions regression-test/data/fault_injection_p0/concurrency_update1.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
0
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21 changes: 21 additions & 0 deletions regression-test/data/fault_injection_p0/concurrency_update2.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
0,aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa
1,aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa
2,aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa
3,aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa
4,aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa
5,aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa
6,aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa
7,aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa
8,aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa
9,aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa
10,aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa
11,aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa
12,aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa
13,aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa
14,aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa
15,aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa
16,aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa
17,aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa
18,aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa
19,aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa
20,aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa
21 changes: 21 additions & 0 deletions regression-test/data/fault_injection_p0/concurrency_update3.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
0,b0
1,b1
2,b2
3,b3
4,b4
5,b5
6,b6
7,b7
8,b8
9,b9
10,b10
11,b11
12,b12
13,b13
14,b14
15,b15
16,b16
17,b17
18,b18
19,b19
20,b20
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
-- This file is automatically generated. You should know what you did if you want to edit this
-- !sql --
0 \N \N \N \N \N
1 \N \N \N \N \N
10 \N \N \N \N \N
11 \N \N \N \N \N
12 \N \N \N \N \N
13 \N \N \N \N \N
14 \N \N \N \N \N
15 \N \N \N \N \N
16 \N \N \N \N \N
17 \N \N \N \N \N
18 \N \N \N \N \N
19 \N \N \N \N \N
2 \N \N \N \N \N
20 \N \N \N \N \N
3 \N \N \N \N \N
4 \N \N \N \N \N
5 \N \N \N \N \N
6 \N \N \N \N \N
7 \N \N \N \N \N
8 \N \N \N \N \N
9 \N \N \N \N \N

-- !sql --
0 aaaaaaaaaa b0 \N \N \N
1 aaaaaaaaaa b1 \N \N \N
10 aaaaaaaaaa b10 \N \N \N
11 aaaaaaaaaa b11 \N \N \N
12 aaaaaaaaaa b12 \N \N \N
13 aaaaaaaaaa b13 \N \N \N
14 aaaaaaaaaa b14 \N \N \N
15 aaaaaaaaaa b15 \N \N \N
16 aaaaaaaaaa b16 \N \N \N
17 aaaaaaaaaa b17 \N \N \N
18 aaaaaaaaaa b18 \N \N \N
19 aaaaaaaaaa b19 \N \N \N
2 aaaaaaaaaa b2 \N \N \N
20 aaaaaaaaaa b20 \N \N \N
3 aaaaaaaaaa b3 \N \N \N
4 aaaaaaaaaa b4 \N \N \N
5 aaaaaaaaaa b5 \N \N \N
6 aaaaaaaaaa b6 \N \N \N
7 aaaaaaaaaa b7 \N \N \N
8 aaaaaaaaaa b8 \N \N \N
9 aaaaaaaaaa b9 \N \N \N

Loading

0 comments on commit 6936b9b

Please sign in to comment.