feat(cdc): persist the backfill state for table-on-source #13276

Merged: 22 commits, Nov 14, 2023
10 changes: 9 additions & 1 deletion ci/scripts/e2e-source-test.sh
@@ -71,10 +71,18 @@ export MYSQL_HOST=mysql MYSQL_TCP_PORT=3306 MYSQL_PWD=123456
sqllogictest -p 4566 -d dev './e2e_test/source/cdc/cdc.share_stream.slt'


# kill cluster and the connector node
# kill cluster
cargo make kill
echo "cluster killed "

# insert into mytest database (cdc.share_stream.slt)
mysql --protocol=tcp -u root mytest -e "INSERT INTO products
VALUES (default,'RisingWave','Next generation Streaming Database'),
(default,'Materialize','The Streaming Database You Already Know How to Use');
UPDATE products SET name = 'RW' WHERE id <= 103;
INSERT INTO orders VALUES (default, '2022-12-01 15:08:22', 'Sam', 1000.52, 110, false);"


# insert new rows
mysql --host=mysql --port=3306 -u root -p123456 < ./e2e_test/source/cdc/mysql_cdc_insert.sql
psql < ./e2e_test/source/cdc/postgres_cdc_insert.sql
18 changes: 18 additions & 0 deletions e2e_test/source/cdc/cdc.check_new_rows.slt
@@ -47,3 +47,21 @@ select v1, v2, v3 from mytable order by v1;
2 2 yes
3 3 no
4 4 no

# shared cdc source
query I
SELECT * from products_test_cnt
----
11

query I
SELECT * from orders_test_cnt
----
4

query ITT
SELECT * FROM products_test order by id limit 3
----
101 RW Small 2-wheel scooter
102 RW 12V car battery
103 RW 12-pack of drill bits with sizes ranging from #40 to #3
9 changes: 4 additions & 5 deletions e2e_test/source/cdc/cdc.share_stream.slt
@@ -7,6 +7,10 @@ mysql --protocol=tcp -u root -e "DROP DATABASE IF EXISTS mytest; CREATE DATABASE
system ok
mysql --protocol=tcp -u root mytest < e2e_test/source/cdc/mysql_create.sql

# generate data to mysql
system ok
mysql --protocol=tcp -u root mytest < e2e_test/source/cdc/mysql_init_data.sql

# enable cdc backfill in ci
statement ok
set cdc_backfill='true';
@@ -47,11 +51,6 @@ create materialized view products_test_cnt as select count(*) as cnt from produc
statement ok
create materialized view orders_test_cnt as select count(*) as cnt from orders_test;


# generate data to mysql
system ok
mysql --protocol=tcp -u root mytest < e2e_test/source/cdc/mysql_init_data.sql

sleep 5s

# check ingestion results
@@ -62,8 +62,8 @@ public static ConnectorServiceProto.ValidateSourceResponse validateResponse(Stri
.build();
}

public static void ensurePropNotNull(Map<String, String> props, String name) {
if (!props.containsKey(name)) {
public static void ensurePropNotBlank(Map<String, String> props, String name) {
if (StringUtils.isBlank(props.get(name))) {
throw ValidatorUtils.invalidArgument(
String.format("'%s' not found, please check the WITH properties", name));
}
@@ -73,39 +73,39 @@ public static void validateSource(ConnectorServiceProto.ValidateSourceRequest re
throws Exception {
var props = request.getPropertiesMap();

ensurePropNotNull(props, DbzConnectorConfig.HOST);
ensurePropNotNull(props, DbzConnectorConfig.PORT);
ensurePropNotNull(props, DbzConnectorConfig.DB_NAME);
ensurePropNotNull(props, DbzConnectorConfig.USER);
ensurePropNotNull(props, DbzConnectorConfig.PASSWORD);
ensurePropNotBlank(props, DbzConnectorConfig.HOST);
ensurePropNotBlank(props, DbzConnectorConfig.PORT);
ensurePropNotBlank(props, DbzConnectorConfig.DB_NAME);
ensurePropNotBlank(props, DbzConnectorConfig.USER);
ensurePropNotBlank(props, DbzConnectorConfig.PASSWORD);

// ensure table name is passed by user in single mode
if (Utils.getCdcSourceMode(props) == CdcSourceMode.SINGLE_MODE) {
ensurePropNotNull(props, DbzConnectorConfig.TABLE_NAME);
ensurePropNotBlank(props, DbzConnectorConfig.TABLE_NAME);
}

TableSchema tableSchema = TableSchema.fromProto(request.getTableSchema());
switch (request.getSourceType()) {
case POSTGRES:
ensurePropNotNull(props, DbzConnectorConfig.TABLE_NAME);
ensurePropNotNull(props, DbzConnectorConfig.PG_SCHEMA_NAME);
ensurePropNotNull(props, DbzConnectorConfig.PG_SLOT_NAME);
ensurePropNotNull(props, DbzConnectorConfig.PG_PUB_NAME);
ensurePropNotNull(props, DbzConnectorConfig.PG_PUB_CREATE);
ensurePropNotBlank(props, DbzConnectorConfig.TABLE_NAME);
ensurePropNotBlank(props, DbzConnectorConfig.PG_SCHEMA_NAME);
ensurePropNotBlank(props, DbzConnectorConfig.PG_SLOT_NAME);
ensurePropNotBlank(props, DbzConnectorConfig.PG_PUB_NAME);
ensurePropNotBlank(props, DbzConnectorConfig.PG_PUB_CREATE);
try (var validator = new PostgresValidator(props, tableSchema)) {
validator.validateAll();
}
break;

case CITUS:
ensurePropNotNull(props, DbzConnectorConfig.TABLE_NAME);
ensurePropNotNull(props, DbzConnectorConfig.PG_SCHEMA_NAME);
ensurePropNotBlank(props, DbzConnectorConfig.TABLE_NAME);
ensurePropNotBlank(props, DbzConnectorConfig.PG_SCHEMA_NAME);
try (var coordinatorValidator = new CitusValidator(props, tableSchema)) {
coordinatorValidator.validateDistributedTable();
coordinatorValidator.validateTable();
}

ensurePropNotNull(props, DbzConnectorConfig.DB_SERVERS);
ensurePropNotBlank(props, DbzConnectorConfig.DB_SERVERS);
var workerServers =
StringUtils.split(props.get(DbzConnectorConfig.DB_SERVERS), ',');
// props extracted from grpc request, clone it to modify
@@ -126,6 +126,7 @@ public static void validateSource(ConnectorServiceProto.ValidateSourceRequest re

break;
case MYSQL:
ensurePropNotBlank(props, DbzConnectorConfig.MYSQL_SERVER_ID);
try (var validator = new MySqlValidator(props, tableSchema)) {
validator.validateAll();
}
@@ -63,6 +63,7 @@ public class DbzConnectorConfig {
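Switching from `ensurePropNotNull` to `ensurePropNotBlank` means a property that is present but empty or whitespace-only is now rejected the same way as a missing one, and MySQL shared CDC sources must additionally supply a server id. The following is a minimal Rust sketch of the equivalent check, not the connector node's Java implementation; the property key, error type, and message are illustrative only.

```rust
use std::collections::HashMap;

/// Reject a WITH property that is missing, empty, or whitespace-only,
/// mirroring the `ensurePropNotBlank` behaviour in the diff above.
/// The error type and message format here are illustrative.
fn ensure_prop_not_blank(props: &HashMap<String, String>, name: &str) -> Result<(), String> {
    match props.get(name) {
        Some(v) if !v.trim().is_empty() => Ok(()),
        // A `containsKey`-style check would accept an empty string; the blank check does not.
        _ => Err(format!("'{}' not found, please check the WITH properties", name)),
    }
}

fn main() {
    let mut props = HashMap::new();
    // "server.id" is used here only as an example key.
    props.insert("server.id".to_string(), "   ".to_string());
    // A blank value is now rejected just like a missing key.
    assert!(ensure_prop_not_blank(&props, "server.id").is_err());

    props.insert("server.id".to_string(), "5601".to_string());
    assert!(ensure_prop_not_blank(&props, "server.id").is_ok());
}
```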

private static final String DBZ_PROPERTY_PREFIX = "debezium.";

private static final String SNAPSHOT_MODE_KEY = "debezium.snapshot.mode";
private static final String SNAPSHOT_MODE_BACKFILL = "rw_cdc_backfill";

private static Map<String, String> extractDebeziumProperties(
@@ -103,31 +104,48 @@ public DbzConnectorConfig(
String startOffset,
Map<String, String> userProps,
boolean snapshotDone) {

StringSubstitutor substitutor = new StringSubstitutor(userProps);
var dbzProps = initiateDbConfig(DBZ_CONFIG_FILE, substitutor);
var isCdcBackfill =
null != userProps.get(SNAPSHOT_MODE_KEY)
&& userProps.get(SNAPSHOT_MODE_KEY).equals(SNAPSHOT_MODE_BACKFILL);

LOG.info(
"DbzConnectorConfig: source={}, sourceId={}, startOffset={}, snapshotDone={}",
"DbzConnectorConfig: source={}, sourceId={}, startOffset={}, snapshotDone={}, isCdcBackfill={}",
source,
sourceId,
startOffset,
snapshotDone);
snapshotDone,
isCdcBackfill);

StringSubstitutor substitutor = new StringSubstitutor(userProps);
var dbzProps = initiateDbConfig(DBZ_CONFIG_FILE, substitutor);
if (source == SourceTypeE.MYSQL) {
var mysqlProps = initiateDbConfig(MYSQL_CONFIG_FILE, substitutor);
// if snapshot phase is finished and offset is specified, we will continue binlog
// reading from the given offset
if (snapshotDone && null != startOffset && !startOffset.isBlank()) {
// 'snapshot.mode=schema_only_recovery' must be configured if binlog offset is
// specified. It only snapshots the schemas, not the data, and continue binlog
// reading from the specified offset
mysqlProps.setProperty("snapshot.mode", "schema_only_recovery");
mysqlProps.setProperty(
ConfigurableOffsetBackingStore.OFFSET_STATE_VALUE, startOffset);
} else if (mysqlProps.getProperty("snapshot.mode").equals(SNAPSHOT_MODE_BACKFILL)) {
// only snapshot table schemas which is not required by the source parser
mysqlProps.setProperty("snapshot.mode", "schema_only");
if (isCdcBackfill) {
// disable snapshot locking at all
mysqlProps.setProperty("snapshot.locking.mode", "none");

// If cdc backfill enabled, the source only emit incremental changes, so we must
// rewind to the given offset and continue binlog reading from there
if (null != startOffset && !startOffset.isBlank()) {
mysqlProps.setProperty("snapshot.mode", "schema_only_recovery");
mysqlProps.setProperty(
ConfigurableOffsetBackingStore.OFFSET_STATE_VALUE, startOffset);
} else {
// read upstream table schemas and emit incremental changes only
mysqlProps.setProperty("snapshot.mode", "schema_only");
}
} else {
// if snapshot phase is finished and offset is specified, we will continue binlog
// reading from the given offset
if (snapshotDone && null != startOffset && !startOffset.isBlank()) {
// 'snapshot.mode=schema_only_recovery' must be configured if binlog offset is
// specified. It only snapshots the schemas, not the data, and continue binlog
// reading from the specified offset
mysqlProps.setProperty("snapshot.mode", "schema_only_recovery");
mysqlProps.setProperty(
ConfigurableOffsetBackingStore.OFFSET_STATE_VALUE, startOffset);
}
}

dbzProps.putAll(mysqlProps);
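The branching above reduces to a small decision table for Debezium's `snapshot.mode`: with CDC backfill enabled the connector never snapshots data itself (snapshot locking is also disabled), only schemas, and it rewinds to a persisted binlog offset when one exists; without backfill, the pre-existing behaviour of resuming after a completed snapshot is kept. Below is a hedged Rust sketch that only restates those branches; the real implementation is the Java code above.

```rust
/// Pick the Debezium `snapshot.mode` the MySQL connector would end up with.
/// `start_offset` is the persisted binlog offset, if any. This restates the
/// Java branches in the diff above; it is not the actual implementation.
fn select_snapshot_mode(
    cdc_backfill: bool,
    snapshot_done: bool,
    start_offset: Option<&str>,
) -> Option<&'static str> {
    let has_offset = start_offset.map_or(false, |s| !s.trim().is_empty());
    if cdc_backfill {
        // Backfill is handled by the backfill executor, so Debezium only reads
        // schemas; with a persisted offset it additionally rewinds the binlog
        // reader to that position.
        if has_offset {
            Some("schema_only_recovery")
        } else {
            Some("schema_only")
        }
    } else if snapshot_done && has_offset {
        // Legacy path: resume binlog reading after a completed snapshot.
        Some("schema_only_recovery")
    } else {
        // Otherwise keep whatever mode the config file specified.
        None
    }
}

fn main() {
    assert_eq!(select_snapshot_mode(true, false, None), Some("schema_only"));
    assert_eq!(
        select_snapshot_mode(true, false, Some(r#"{"file":"binlog.000003","pos":4}"#)),
        Some("schema_only_recovery")
    );
    assert_eq!(select_snapshot_mode(false, true, Some("offset")), Some("schema_only_recovery"));
    assert_eq!(select_snapshot_mode(false, false, None), None);
}
```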
2 changes: 2 additions & 0 deletions proto/plan_common.proto
@@ -84,6 +84,8 @@ message ExternalTableDesc {
string table_name = 4;
repeated uint32 stream_key = 5;
map<string, string> connect_properties = 6;
// upstream cdc source job id
uint32 source_id = 7;
}

enum JoinType {
@@ -23,9 +23,12 @@ use crate::util::sort_util::ColumnOrder;
/// Compute node will use this information to connect to the external database and scan the table.
#[derive(Debug, Clone, Default, PartialEq, Eq, Hash)]
pub struct CdcTableDesc {
/// Id of the upstream source in sharing cdc mode
/// Id of the table in RW
pub table_id: TableId,

/// Id of the upstream source in sharing cdc mode
pub source_id: TableId,

/// The full name of the table in external database, e.g. `database_name.table.name` in MySQL
/// and `schema_name.table_name` in the Postgres.
pub external_table_name: String,
@@ -58,6 +61,7 @@ impl CdcTableDesc {
pub fn to_protobuf(&self) -> ExternalTableDesc {
ExternalTableDesc {
table_id: self.table_id.into(),
source_id: self.source_id.into(),
columns: self.columns.iter().map(Into::into).collect(),
pk: self.pk.iter().map(|v| v.to_protobuf()).collect(),
table_name: self.external_table_name.clone(),
@@ -216,7 +216,8 @@ async fn test_cdc_backfill() -> StreamResult<()> {
vec![0, 1, 2],
None,
Arc::new(StreamingMetrics::unused()),
source_state_handler,
None,
Some(source_state_handler),
false,
4, // 4 rows in a snapshot chunk
);
@@ -543,14 +543,12 @@ async fn into_chunk_stream<P: ByteStreamSourceParser>(mut parser: P, data_stream

for (i, msg) in batch.into_iter().enumerate() {
if msg.key.is_none() && msg.payload.is_none() {
if parser.parser_format() == ParserFormat::Debezium {
tracing::debug!(offset = msg.offset, "skip parsing of heartbeat message");
// empty payload means a heartbeat in cdc source
// heartbeat message offset should not overwrite data messages offset
split_offset_mapping
.entry(msg.split_id)
.or_insert(msg.offset.clone());
}
tracing::debug!(offset = msg.offset, "skip parsing of heartbeat message");
// assumes an empty message as a heartbeat
// heartbeat message offset should not overwrite data messages offset
split_offset_mapping
.entry(msg.split_id)
.or_insert(msg.offset.clone());

continue;
}
@@ -121,7 +121,7 @@ impl SchemaTableName {
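After this change, any message with neither key nor payload is treated as a heartbeat regardless of parser format; its offset is recorded with `or_insert` so it cannot overwrite an offset already written by a real data message. A small self-contained Rust sketch of that `or_insert` behaviour follows; split ids and offsets are simplified placeholders, not the parser's actual types.

```rust
use std::collections::HashMap;

fn main() {
    let mut split_offset_mapping: HashMap<String, String> = HashMap::new();

    // A data message for split "0" records its offset first.
    split_offset_mapping.insert("0".to_string(), "binlog.000003:1000".to_string());

    // A heartbeat for the same split uses `or_insert`, so it cannot overwrite
    // the offset that the data message already recorded.
    split_offset_mapping
        .entry("0".to_string())
        .or_insert("binlog.000003:2000".to_string());
    assert_eq!(split_offset_mapping["0"], "binlog.000003:1000");

    // For a split with no data message yet, the heartbeat offset is still
    // recorded so the source can report progress.
    split_offset_mapping
        .entry("1".to_string())
        .or_insert("binlog.000003:2000".to_string());
    assert_eq!(split_offset_mapping["1"], "binlog.000003:2000");
}
```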
}
}

#[derive(Debug, Clone, Default, PartialEq, PartialOrd)]
#[derive(Debug, Clone, Default, PartialEq, PartialOrd, Serialize, Deserialize)]
pub struct MySqlOffset {
pub filename: String,
pub position: u64,
@@ -133,14 +133,14 @@ impl MySqlOffset {
}
}

#[derive(Debug, Clone, Default, PartialEq, PartialOrd)]
#[derive(Debug, Clone, Default, PartialEq, PartialOrd, Serialize, Deserialize)]
pub struct PostgresOffset {
pub txid: u64,
pub lsn: u64,
pub tx_usec: u64,
}

#[derive(Debug, Clone, PartialEq, PartialOrd)]
#[derive(Debug, Clone, PartialEq, PartialOrd, Serialize, Deserialize)]
pub enum CdcOffset {
MySql(MySqlOffset),
Postgres(PostgresOffset),
@@ -562,6 +562,7 @@ pub(crate) async fn gen_create_table_plan_with_source(
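The new `Serialize`/`Deserialize` derives are what allow the CDC offset to be persisted as part of the backfill state and restored on recovery, which is the point of this PR. Below is a minimal round-trip sketch assuming serde (with the derive feature) and serde_json are available and that the state is stored as JSON text; the structs are simplified stand-ins and the field values are made up.

```rust
use serde::{Deserialize, Serialize};

// Simplified stand-ins for the types in src/connector/src/source/external.rs.
#[derive(Debug, Clone, Default, PartialEq, Serialize, Deserialize)]
struct MySqlOffset {
    filename: String,
    position: u64,
}

#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
enum CdcOffset {
    MySql(MySqlOffset),
}

fn main() -> serde_json::Result<()> {
    // Offset reached when the backfill state is checkpointed (values made up).
    let offset = CdcOffset::MySql(MySqlOffset {
        filename: "mysql-bin.000003".to_string(),
        position: 7354,
    });

    // Persist as JSON text, e.g. in a column of the executor's state table.
    let persisted = serde_json::to_string(&offset)?;

    // On recovery, deserialize and resume binlog reading from this position.
    let restored: CdcOffset = serde_json::from_str(&persisted)?;
    assert_eq!(restored, offset);
    Ok(())
}
```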

let cdc_table_desc = CdcTableDesc {
table_id: TableId::placeholder(),
source_id: TableId::placeholder(),
external_table_name: "".to_string(),
pk: table_pk,
columns: columns.iter().map(|c| c.column_desc.clone()).collect(),
@@ -847,7 +848,8 @@ pub(crate) fn gen_create_table_plan_for_cdc_source(
derive_connect_properties(source.as_ref(), external_table_name.clone())?;

let cdc_table_desc = CdcTableDesc {
table_id: source.id.into(), // source can be considered as an external table
table_id: TableId::placeholder(), // will be filled in meta node
source_id: source.id.into(), // id of cdc source streaming job
external_table_name: external_table_name.clone(),
pk: table_pk,
columns: columns.iter().map(|c| c.column_desc.clone()).collect(),