feat(cdc): persist the backfill state for table-on-source #13276
Conversation
Force-pushed from 96991d1 to 7b20a95 (compare)
Codecov Report
Attention: project coverage changed by -0.01%. Additional details and impacted files:
@@ Coverage Diff @@
##             main   #13276      +/-   ##
==========================================
- Coverage   67.87%   67.86%   -0.01%
==========================================
  Files        1526     1526
  Lines      259952   260043      +91
==========================================
+ Hits       176432   176479      +47
- Misses      83520    83564      +44
Flags with carried forward coverage won't be shown. ☔ View full report in Codecov by Sentry.
Is there some test for cdc backfill over recovery?
Right now it only has an e2e test. I insert new data into the MV created in …
}

/// Mark the backfill has done and save the last cdc offset
pub async fn mutate_state(&mut self, state_item: CdcStateItem) -> StreamExecutorResult<()> {
Suggested change:
-pub async fn mutate_state(&mut self, state_item: CdcStateItem) -> StreamExecutorResult<()> {
+pub async fn finish_state(&mut self, state_item: CdcStateItem) -> StreamExecutorResult<()> {
The comment is misleading; the function is not called only after the backfill is complete. We use this function to update the backfill state.
Ok(CdcStateItem {
    is_finished: finished,
    ..Default::default()
Hmm, should we only recover is_finished? What about the last cdc offset, row count, etc.?
The cdc offset and row count are only for observability; the executor doesn't rely on these fields.
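A minimal sketch of the state item being discussed, assuming field names taken from this PR's state columns (the actual CdcStateItem definition in the codebase may differ):

// Sketch only: field names follow the PR description; the real
// CdcStateItem may be defined differently.
#[derive(Default)]
pub struct CdcStateItem {
    /// The only field the executor consults when restoring backfill state.
    pub is_finished: bool,
    /// Observability: number of rows backfilled so far.
    pub row_count: u64,
    /// Observability: last cdc offset seen, serialized as a string.
    pub cdc_offset: Option<String>,
}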
Existing comments addressed. PTAL @hzxa21 @fuyufjh @BugenZhao
Mostly LGTM.
This PR only records a boolean flag (completed or not) in the persisted state, instead of real progress (marked by PK, perhaps). I think we had better complete this before releasing to users, because we can't just tell users the progress is persisted but is either 0% or 100%.
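For illustration, recording real progress could look roughly like the sketch below; the type and field names are hypothetical, not this PR's actual state:

// Hypothetical pk-marked progress, as suggested in the comment above.
pub enum BackfillProgress {
    /// No snapshot rows read yet.
    NotStarted,
    /// Snapshot scan in flight; recovery can resume right after this pk.
    InProgress { last_seen_pk: Vec<u8> },
    /// Snapshot phase done; only upstream cdc events remain.
    Finished,
}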
 pub enum CdcBackfillStateImpl<S: StateStore> {
     Undefined,
-    SingleTable(SingleTableState<S>),
+    SingleTable(SingleBackfillState<S>),
+    MultiTable(MultiBackfillState<S>),
 }
In the future, shall we deprecate the SingleTable option (and remove the enum)? It seems not worth maintaining it because MultiTable can completely cover its use case.
I also want to deprecate it. But we need to change the original Table job plan from source -> mview to source -> stream_scan -> mview so that it can have its own state; technically it is feasible.
+1. Currently we have 3 ways to use mysql-cdc:
1. create table xxx with (....)
2. set cdc_backfill='true' + create table xxx with (...)
3. set cdc_backfill=true + create source with (...) + create table from ...
It is confusing and not easy to explain to users which one to choose. Ideally we should only have 2 ways:
1. create table xxx with (....)
2. create source with (...) + create table from ...
1 and 2 can share the same implementation, and 1 can be syntax sugar for 2 if the user only needs to cdc one table from the db.
The question is whether we need to ensure compatibility for existing jobs created via set cdc_backfill='true' + create table xxx with (...). IMO, we can keep it simple and avoid compatibility code here since cdc_backfill is announced as an experimental feature only.
I updated the PR to support recoverable backfill, that is, to record the backfill progress and continue from it upon cluster recovery.
Force-pushed from bec0206 to 26a6113 (compare)
 let mut key = self.split_id.to_string();
 key.push_str(BACKFILL_STATE_KEY_SUFFIX);
 // write backfill finished flag
 self.source_state_handler
-    .set(key.into(), JsonbVal::from(Value::Bool(true)))
+    .set(
+        key.into(),
I just realized that we are treating the source state table as a key-value store and setting two keys (<split_id>_backfill here and <split_id> in L298). I feel that this can make the source state table contain different primary key schemas. Will this break the read path of the source state table?
Since for the single-table backfill scenario we didn't introduce a new operator into the query plan, we just wrap the source executor in the cdc backfill executor and reuse the state table of Source. We access the source state table only via point query with the key column, so it won't break.
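A self-contained illustration of the access pattern described here, with the state table reduced to a plain key-value map (the handler API and value encoding are simplifications, not the actual RisingWave code):

// Illustration only: the real source state table is not a HashMap, but the
// point-get pattern on the two key shapes is the same idea.
use std::collections::HashMap;

const BACKFILL_STATE_KEY_SUFFIX: &str = "_backfill";

fn backfill_state_key(split_id: &str) -> String {
    format!("{}{}", split_id, BACKFILL_STATE_KEY_SUFFIX)
}

fn main() {
    let mut state: HashMap<String, String> = HashMap::new();
    // The split offset state lives under "<split_id>" ...
    state.insert("1002".into(), r#"{"snapshot_done": false, ...}"#.into());
    // ... and the backfill-finished flag under "<split_id>_backfill".
    state.insert(backfill_state_key("1002"), "true".into());

    // The executor only issues point gets on a known key, so the two key
    // shapes never collide and no scan over mixed keys is required.
    let finished = state.get(&backfill_state_key("1002")).map(String::as_str) == Some("true");
    assert!(finished);
}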
Box::new(source_exec),
True. If only get is used in the streaming job, there won't be unexpected results. But when querying the source state via SQL, we can see something like this:
partition_id | offset_info
---------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
1002 | {"split_info": {"mysql_split": {"inner": {"snapshot_done": false, "split_id": 1002, "start_offset": "{\"sourcePartition\":{\"server\":\"RW_CDC_1002\"},\"sourceOffset\":{\"transaction_id\":null,\"ts_sec\":1699866101,\"file\":\"binlog.000012\",\"pos\":157,\"server_id\":1},\"isHeartbeat\":true}"}}, "pg_split": null}, "split_type": "mysql-cdc"}
1002_backfill | true
(2 rows)
This looks very strange to me. How about storing the backfill finished flag inside the json as a new field under "mysql_split"? This issue was introduced in #12535 and is not related to the changes in this PR. I am okay with handling it separately.
Yeah, it is a bit dirty. As mentioned in #13276 (comment), I plan to refactor the plan of single-table cdc backfill to make cdc backfill a separate operator instead of just wrapping the source inside it, so that we don't need to write state to the source state table.
LGTM
Could you update the PR description and add a test for this as well (a separate PR is fine, as long as there's an issue to track it)? Previously the test only covers the case where cdc backfill is done.
Also, do we need both the pk and the cdc offset?
Thanks for the reminder. I will add more tests for the recoverable backfill part, tracked by #13400.
The persisted cdc_offset acts as a low watermark for upstream events: events with an offset less than the cdc_offset are ignored. It may also help us debug the backfill process.
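A minimal sketch of that low-watermark check, with the offset simplified to an integer (real cdc offsets are structured binlog/LSN positions, so this is an illustration only):

// Illustration: offsets reduced to integers for comparison.
fn should_apply(event_offset: u64, persisted_cdc_offset: Option<u64>) -> bool {
    match persisted_cdc_offset {
        // No watermark persisted yet: apply every upstream event.
        None => true,
        // Ignore events whose offset is less than the persisted cdc_offset,
        // as described above; they are already covered by the snapshot.
        Some(watermark) => event_offset >= watermark,
    }
}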
LGTM
I hereby agree to the terms of the RisingWave Labs, Inc. Contributor License Agreement.
What's changed and what's your intention?
Add a state table (table_id | pk columns... | backfill_finished | row_count | cdc_offset) to the CdcBackfill executor to persist the backfill progress during backfill. The pk columns would be multiple columns if the table has composite primary keys.
close #13204
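For illustration, one row of that state table could be modeled roughly as below; the Rust types are assumptions, and only the column names come from the description above:

// Sketch of one persisted backfill-state row; concrete types are assumed.
struct BackfillStateRow {
    /// Upstream table being backfilled.
    table_id: u32,
    /// Current backfill position; several values when the upstream table
    /// has a composite primary key.
    pk_values: Vec<String>,
    /// Whether the snapshot phase for this table has completed.
    backfill_finished: bool,
    /// Snapshot rows consumed so far.
    row_count: u64,
    /// Last cdc offset seen, usable as a low watermark on recovery.
    cdc_offset: Option<String>,
}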
Checklist
./risedev check (or alias, ./risedev c)
Documentation
Release note
If this PR includes changes that directly affect users or other significant modifications relevant to the community, kindly draft a release note to provide a concise summary of these changes. Please prioritize highlighting the impact these changes will have on users.