diff --git a/docs/en/snapshot/check.md b/docs/en/snapshot/check.md index 8ea00d43..81b61714 100644 --- a/docs/en/snapshot/check.md +++ b/docs/en/snapshot/check.md @@ -146,6 +146,44 @@ The summary log contains the overall result of the check, such as the number of {"start_time":"2023-09-01T12:00:00+08:00","end_time":"2023-09-01T12:00:01+08:00","is_consistent":false,"miss_count":1,"diff_count":2,"extra_count":1,"sql_count":3} ``` +## Output complete rows + +When the business needs the full row content for troubleshooting, enable full-row logging in the `[sinker]` section: + +``` +[sinker] +output_full_row=true +``` + +When set to `true`, the checker appends `src_row` and `dst_row` to every diff log, and `src_row` to every miss log (full rows are currently available for MySQL, PostgreSQL, and MongoDB; Redis is not supported yet). Example: + +``` +{ + "log_type": "Diff", + "schema": "test_db_1", + "tb": "one_pk_multi_uk", + "id_col_values": { + "f_0": "5" + }, + "diff_col_values": { + "f_1": { + "src": "5", + "dst": "5000" + } + }, + "src_row": { + "f_0": 5, + "f_1": 5, + "f_2": "ok" + }, + "dst_row": { + "f_0": 5, + "f_1": 5000, + "f_2": "after manual update" + } +} +``` + # Other configurations - For [filter] and [router], refer to [config details](../config.md). diff --git a/dt-common/src/config/sinker_config.rs b/dt-common/src/config/sinker_config.rs index fbc0502e..567816d3 100644 --- a/dt-common/src/config/sinker_config.rs +++ b/dt-common/src/config/sinker_config.rs @@ -37,6 +37,8 @@ pub enum SinkerConfig { output_full_row: bool, output_revise_sql: bool, revise_match_full_row: bool, + recheck_interval_secs: u64, + recheck_attempts: u32, }, PgCheck { @@ -47,6 +49,8 @@ pub enum SinkerConfig { output_full_row: bool, output_revise_sql: bool, revise_match_full_row: bool, + recheck_interval_secs: u64, + recheck_attempts: u32, }, MongoCheck { @@ -57,6 +61,8 @@ pub enum SinkerConfig { check_log_file_size: String, output_full_row: bool, output_revise_sql: bool, + recheck_interval_secs: u64, + recheck_attempts: u32, }, MysqlStruct { diff --git a/dt-common/src/config/task_config.rs b/dt-common/src/config/task_config.rs index eb02bd58..d49ae4d8 100644 --- a/dt-common/src/config/task_config.rs +++ b/dt-common/src/config/task_config.rs @@ -80,6 +80,8 @@ const CHECK_LOG_DIR: &str = "check_log_dir"; const OUTPUT_FULL_ROW: &str = "output_full_row"; const OUTPUT_REVISE_SQL: &str = "output_revise_sql"; const REVISE_MATCH_FULL_ROW: &str = "revise_match_full_row"; +const RECHECK_INTERVAL_SECS: &str = "recheck_interval_secs"; +const RECHECK_ATTEMPTS: &str = "recheck_attempts"; const DB_TYPE: &str = "db_type"; const URL: &str = "url"; const BATCH_SIZE: &str = "batch_size"; @@ -471,6 +473,12 @@ impl TaskConfig { REVISE_MATCH_FULL_ROW, false, ), + recheck_interval_secs: loader.get_with_default( + SINKER, + RECHECK_INTERVAL_SECS, + 0, + ), + recheck_attempts: loader.get_with_default(SINKER, RECHECK_ATTEMPTS, 1), }, SinkType::Struct => SinkerConfig::MysqlStruct { @@ -513,6 +521,12 @@ impl TaskConfig { REVISE_MATCH_FULL_ROW, false, ), + recheck_interval_secs: loader.get_with_default( + SINKER, + RECHECK_INTERVAL_SECS, + 0, + ), + recheck_attempts: loader.get_with_default(SINKER, RECHECK_ATTEMPTS, 1), }, SinkType::Struct => SinkerConfig::PgStruct { @@ -553,6 +567,12 @@ impl TaskConfig { "output_revise_sql", false, ), + recheck_interval_secs: loader.get_with_default( + SINKER, + RECHECK_INTERVAL_SECS, + 0, + ), + recheck_attempts: loader.get_with_default(SINKER, RECHECK_ATTEMPTS, 1), }, _ => bail! { not_supported_err }, diff --git a/dt-connector/src/check_log/check_log.rs b/dt-connector/src/check_log/check_log.rs index 8badc17b..f9750e45 100644 --- a/dt-connector/src/check_log/check_log.rs +++ b/dt-connector/src/check_log/check_log.rs @@ -5,8 +5,11 @@ use dt_common::{error::Error, meta::col_value::ColValue, utils::serialize_util:: use serde::{Deserialize, Serialize}; use serde_json::json; -#[derive(Serialize, Deserialize)] +use super::log_type::LogType; + +#[derive(Debug, Serialize, Deserialize)] pub struct CheckLog { + pub log_type: LogType, pub schema: String, pub tb: String, #[serde(default, skip_serializing_if = "Option::is_none")] @@ -34,9 +37,11 @@ pub struct CheckLog { serialize_with = "SerializeUtil::ordered_option_map" )] pub dst_row: Option>, + #[serde(default, skip_serializing_if = "Option::is_none")] + pub revise_sql: Option, } -#[derive(Clone, Serialize, Deserialize)] +#[derive(Debug, Clone, Serialize, Deserialize)] pub struct DiffColValue { pub src: Option, pub dst: Option, @@ -126,6 +131,7 @@ mod tests { let mut id_col_values = HashMap::new(); id_col_values.insert("id".to_string(), Some("1".to_string())); CheckLog { + log_type: LogType::Diff, schema: "s".into(), tb: "t".into(), target_schema: None, @@ -134,6 +140,7 @@ mod tests { diff_col_values: HashMap::new(), src_row: None, dst_row: None, + revise_sql: None, } } diff --git a/dt-connector/src/check_log/log_type.rs b/dt-connector/src/check_log/log_type.rs index 54aaf946..65a54bb3 100644 --- a/dt-connector/src/check_log/log_type.rs +++ b/dt-connector/src/check_log/log_type.rs @@ -1,7 +1,7 @@ use serde::{Deserialize, Serialize}; use strum::{EnumString, IntoStaticStr}; -#[derive(EnumString, IntoStaticStr, PartialEq, Serialize, Deserialize, Clone)] +#[derive(Debug, EnumString, IntoStaticStr, PartialEq, Serialize, Deserialize, Clone)] pub enum LogType { #[strum(serialize = "miss")] Miss, diff --git a/dt-connector/src/extractor/mysql/mysql_check_extractor.rs b/dt-connector/src/extractor/mysql/mysql_check_extractor.rs index 3ac5289a..dce23369 100644 --- a/dt-connector/src/extractor/mysql/mysql_check_extractor.rs +++ b/dt-connector/src/extractor/mysql/mysql_check_extractor.rs @@ -76,6 +76,7 @@ impl BatchCheckExtractor for MysqlCheckExtractor { row_data.row_type = RowType::Update; row_data.before = row_data.after.clone(); } + log_info!("extracted row_data = {}", row_data); self.base_extractor .push_row(row_data, Position::None) @@ -95,14 +96,13 @@ impl MysqlCheckExtractor { let mut after = HashMap::new(); for (col, value) in check_log.id_col_values.iter() { let col_type = tb_meta.get_col_type(col)?; - let col_value = if let Some(str) = value { - MysqlColValueConvertor::from_str(col_type, str)? - } else { - ColValue::None - }; + let col_value = value.as_deref().map_or(Ok(ColValue::None), |v| { + MysqlColValueConvertor::from_str(col_type, v) + })?; after.insert(col.to_string(), col_value); } let check_row_data = RowData::build_insert_row_data(after, &tb_meta.basic); + log_info!("check_row_data = {}", check_row_data); result.push(check_row_data); } Ok(result) diff --git a/dt-connector/src/extractor/pg/pg_check_extractor.rs b/dt-connector/src/extractor/pg/pg_check_extractor.rs index 50c9f752..20ba330d 100644 --- a/dt-connector/src/extractor/pg/pg_check_extractor.rs +++ b/dt-connector/src/extractor/pg/pg_check_extractor.rs @@ -103,11 +103,9 @@ impl PgCheckExtractor { let mut after = HashMap::new(); for (col, value) in check_log.id_col_values.iter() { let col_type = tb_meta.get_col_type(col)?; - let col_value = if let Some(str) = value { - PgColValueConvertor::from_str(col_type, str, &mut self.meta_manager)? - } else { - ColValue::None - }; + let col_value = value.as_deref().map_or(Ok(ColValue::None), |v| { + PgColValueConvertor::from_str(col_type, v, &mut self.meta_manager) + })?; after.insert(col.to_string(), col_value); } let check_row_data = RowData::build_insert_row_data(after, &tb_meta.basic); diff --git a/dt-connector/src/rdb_query_builder.rs b/dt-connector/src/rdb_query_builder.rs index 972a8414..970a99bb 100644 --- a/dt-connector/src/rdb_query_builder.rs +++ b/dt-connector/src/rdb_query_builder.rs @@ -519,6 +519,18 @@ impl RdbQueryBuilder<'_> { fn get_pg_sql_value(&self, col_value: &ColValue) -> String { match col_value { ColValue::Blob(v) => format!(r#"'\x{}'"#, hex::encode(v)), + ColValue::String(_) + | ColValue::RawString(_) + | ColValue::Time(_) + | ColValue::Date(_) + | ColValue::DateTime(_) + | ColValue::Timestamp(_) + | ColValue::Json(_) + | ColValue::Json2(_) + | ColValue::Json3(_) + | ColValue::Set2(_) + | ColValue::Enum2(_) + | ColValue::MongoDoc(_) => Self::quote_pg_string_literal(col_value), // For numeric types, we should not quote them in SQL ColValue::Tiny(_) | ColValue::UnsignedTiny(_) diff --git a/dt-connector/src/sinker/base_checker.rs b/dt-connector/src/sinker/base_checker.rs index 2f75dd15..0e64d42c 100644 --- a/dt-connector/src/sinker/base_checker.rs +++ b/dt-connector/src/sinker/base_checker.rs @@ -1,10 +1,15 @@ -use anyhow::{Context, Ok}; +use anyhow::Context; use mongodb::bson::Document; use std::borrow::Cow; use std::collections::{BTreeSet, HashMap}; +use std::future::Future; +use tokio::time::{sleep, Duration}; use crate::{ - check_log::check_log::{CheckLog, DiffColValue, StructCheckLog}, + check_log::{ + check_log::{CheckLog, DiffColValue, StructCheckLog}, + log_type::LogType, + }, rdb_query_builder::RdbQueryBuilder, rdb_router::RdbRouter, sinker::mongo::mongo_cmd, @@ -28,6 +33,24 @@ pub struct ReviseSqlContext<'a> { pub match_full_row: bool, } +#[derive(Clone, Copy)] +pub struct RecheckConfig { + pub delay_ms: u64, + pub times: u32, +} + +pub enum CheckResult { + Ok, + Miss, + Diff(HashMap), +} + +impl CheckResult { + pub fn is_inconsistent(&self) -> bool { + !matches!(self, CheckResult::Ok) + } +} + impl<'a> ReviseSqlContext<'a> { pub fn mysql(meta: &'a MysqlTbMeta, match_full_row: bool) -> Self { Self { @@ -182,12 +205,18 @@ pub struct BatchCompareContext<'ctx> { } impl BaseChecker { - pub async fn batch_compare_row_data_items( + pub async fn batch_compare_row_data_items( src_data: &[RowData], - dst_row_data_map: &HashMap, + mut dst_row_data_map: HashMap, range: BatchCompareRange, ctx: BatchCompareContext<'_>, - ) -> anyhow::Result<(Vec, Vec, usize)> { + recheck_config: RecheckConfig, + fetch_latest_row: F, + ) -> anyhow::Result<(Vec, Vec, usize)> + where + F: Fn(u128, &RowData) -> Fut, + Fut: Future>>, + { let BatchCompareContext { dst_tb_meta, extractor_meta_manager, @@ -210,61 +239,120 @@ impl BaseChecker { for src_row_data in target_slice { let hash_code = src_row_data.get_hash_code(dst_tb_meta)?; - // src_row_data is already routed, so here we call get_hash_code by dst_tb_meta - match dst_row_data_map.get(&hash_code) { - Some(dst_row_data) => { - let diff_col_values = Self::compare_row_data(src_row_data, dst_row_data)?; - if diff_col_values.is_empty() { - continue; - } + let dst_row_data = dst_row_data_map.remove(&hash_code); - if let Some(revise_sql) = revise_ctx + let (check_result, final_dst_row) = + Self::check_row_with_retry(src_row_data, dst_row_data, recheck_config, |row| { + fetch_latest_row(hash_code, row) + }) + .await?; + + match check_result { + CheckResult::Diff(diff_col_values) => { + let dst_row = final_dst_row + .as_ref() + .expect("diff result should have a dst row"); + + let revise_sql = revise_ctx .as_ref() - .map(|ctx| ctx.build_diff_sql(src_row_data, dst_row_data, &diff_col_values)) + .map(|ctx| ctx.build_diff_sql(src_row_data, dst_row, &diff_col_values)) .transpose()? - .flatten() - { + .flatten(); + + if let Some(revise_sql) = &revise_sql { log_sql!("{}", revise_sql); sql_count += 1; } - let diff_log = Self::build_diff_log( + let mut diff_log = Self::build_diff_log( src_row_data, - dst_row_data, + dst_row, diff_col_values, extractor_meta_manager, reverse_router, output_full_row, ) .await?; + diff_log.revise_sql = revise_sql; diff.push(diff_log); } - None => { - if let Some(revise_sql) = revise_ctx + CheckResult::Miss => { + let revise_sql = revise_ctx .as_ref() .map(|ctx| ctx.build_miss_sql(src_row_data)) .transpose()? - .flatten() - { + .flatten(); + + if let Some(revise_sql) = &revise_sql { log_sql!("{}", revise_sql); sql_count += 1; } - let miss_log = Self::build_miss_log( + let mut miss_log = Self::build_miss_log( src_row_data, extractor_meta_manager, reverse_router, output_full_row, ) .await?; + miss_log.revise_sql = revise_sql; miss.push(miss_log); } + CheckResult::Ok => {} } } Ok((miss, diff, sql_count)) } + pub async fn check_row_with_retry( + src_row: &RowData, + mut dst_row: Option, + recheck_config: RecheckConfig, + fetch_latest: F, + ) -> anyhow::Result<(CheckResult, Option)> + where + F: Fn(&RowData) -> Fut, + Fut: Future>>, + { + let mut check_result = Self::compare_src_dst(src_row, dst_row.as_ref())?; + + if check_result.is_inconsistent() && recheck_config.times > 0 { + for _ in 0..recheck_config.times { + if recheck_config.delay_ms > 0 { + sleep(Duration::from_millis(recheck_config.delay_ms)).await; + } + + if let Some(latest) = fetch_latest(src_row).await? { + dst_row = Some(latest); + } + + check_result = Self::compare_src_dst(src_row, dst_row.as_ref())?; + if !check_result.is_inconsistent() { + break; + } + } + } + + Ok((check_result, dst_row)) + } + + fn compare_src_dst( + src_row: &RowData, + dst_row: Option<&RowData>, + ) -> anyhow::Result { + if let Some(dst_row) = dst_row { + let diffs = Self::compare_row_data(src_row, dst_row)?; + if diffs.is_empty() { + Ok(CheckResult::Ok) + } else { + Ok(CheckResult::Diff(diffs)) + } + } else { + Ok(CheckResult::Miss) + } + } + pub fn compare_row_data( src_row_data: &RowData, dst_row_data: &RowData, @@ -455,6 +543,7 @@ impl BaseChecker { .flatten(); Ok(CheckLog { + log_type: LogType::Miss, schema: schema_for_meta, tb: tb_for_meta, target_schema, @@ -463,6 +552,7 @@ impl BaseChecker { diff_col_values: HashMap::new(), src_row, dst_row: None, + revise_sql: None, }) } @@ -496,6 +586,8 @@ impl BaseChecker { }; log.diff_col_values = mapped_diff_values; + log.log_type = LogType::Diff; + log.revise_sql = None; if output_full_row { let has_col_map = reverse_router @@ -547,6 +639,7 @@ impl BaseChecker { }; Ok(CheckLog { + log_type: LogType::Miss, schema: schema_for_log, tb: tb_for_log, target_schema: schema_changed.then(|| src_row_data.schema.clone()), @@ -555,6 +648,7 @@ impl BaseChecker { diff_col_values: HashMap::new(), src_row, dst_row: None, + revise_sql: None, }) } @@ -570,6 +664,8 @@ impl BaseChecker { Self::build_mongo_miss_log(src_row_data, tb_meta, reverse_router, output_full_row)?; diff_log.diff_col_values = diff_col_values; + diff_log.log_type = LogType::Diff; + diff_log.revise_sql = None; if output_full_row { let has_col_map = reverse_router diff --git a/dt-connector/src/sinker/mongo/mongo_checker.rs b/dt-connector/src/sinker/mongo/mongo_checker.rs index bee10afa..45be8702 100644 --- a/dt-connector/src/sinker/mongo/mongo_checker.rs +++ b/dt-connector/src/sinker/mongo/mongo_checker.rs @@ -15,7 +15,7 @@ use crate::{ check_log::check_log::CheckSummaryLog, rdb_router::RdbRouter, sinker::{ - base_checker::{BaseChecker, ReviseSqlContext}, + base_checker::{BaseChecker, CheckResult, RecheckConfig, ReviseSqlContext}, base_sinker::BaseSinker, }, Sinker, @@ -41,6 +41,8 @@ pub struct MongoChecker { pub monitor: Arc, pub output_full_row: bool, pub output_revise_sql: bool, + pub recheck_interval_secs: u64, + pub recheck_attempts: u32, pub summary: CheckSummaryLog, pub global_summary: Option>>, } @@ -154,51 +156,106 @@ impl MongoChecker { let mut diff = Vec::new(); let mut sql_count = 0; let revise_ctx = self.output_revise_sql.then(ReviseSqlContext::mongo); + let recheck_config = RecheckConfig { + delay_ms: self.recheck_interval_secs.saturating_mul(1000), + times: self.recheck_attempts, + }; + + let collection = collection.clone(); + let schema_clone = schema.to_string(); + let tb_clone = tb.to_string(); + let fetch_latest = |src_row: &RowData| { + let collection = collection.clone(); + let schema = schema_clone.clone(); + let tb = tb_clone.clone(); + let src_row = src_row.clone(); + async move { + let after = src_row.require_after()?; + let doc = after + .get(MongoConstants::DOC) + .and_then(|v| match v { + ColValue::MongoDoc(doc) => Some(doc), + _ => None, + }) + .context("missing mongo doc")?; + + let id = doc.get(MongoConstants::ID).context("missing _id")?; + let filter = doc! { MongoConstants::ID: id }; + + if let Some(doc) = collection.find_one(filter, None).await? { + if let Some(key) = MongoKey::from_doc(&doc) { + let row_data = Self::build_row_data(&schema, &tb, doc, &key); + Ok(Some(row_data)) + } else { + Ok(None) + } + } else { + Ok(None) + } + } + }; + for (key, src_row_data) in src_row_data_map { - if let Some(dst_row_data) = dst_row_data_map.remove(&key) { - let diff_col_values = BaseChecker::compare_row_data(src_row_data, &dst_row_data)?; - if !diff_col_values.is_empty() { + let dst_row_data = dst_row_data_map.remove(&key); + + let (check_result, final_dst_row) = BaseChecker::check_row_with_retry( + src_row_data, + dst_row_data, + recheck_config, + fetch_latest, + ) + .await?; + + match check_result { + CheckResult::Diff(diff_col_values) => { + let dst_row = final_dst_row + .as_ref() + .expect("diff result should have a dst row"); let revise_sql = revise_ctx .as_ref() - .map(|ctx| { - ctx.build_diff_sql(src_row_data, &dst_row_data, &diff_col_values) - }) + .map(|ctx| ctx.build_diff_sql(src_row_data, dst_row, &diff_col_values)) .transpose()? .flatten(); - let diff_log = BaseChecker::build_mongo_diff_log( + if let Some(revise_sql) = &revise_sql { + log_sql!("{}", revise_sql); + sql_count += 1; + } + + let mut diff_log = BaseChecker::build_mongo_diff_log( src_row_data, - &dst_row_data, + dst_row, diff_col_values, &tb_meta, &self.reverse_router, self.output_full_row, )?; - if let Some(revise_sql) = revise_sql { + diff_log.revise_sql = revise_sql; + diff.push(diff_log); + } + CheckResult::Miss => { + let revise_sql = revise_ctx + .as_ref() + .map(|ctx| ctx.build_miss_sql(src_row_data)) + .transpose()? + .flatten(); + + if let Some(revise_sql) = &revise_sql { log_sql!("{}", revise_sql); sql_count += 1; } - diff.push(diff_log); - } - } else { - let revise_sql = revise_ctx - .as_ref() - .map(|ctx| ctx.build_miss_sql(src_row_data)) - .transpose()? - .flatten(); - - let miss_log = BaseChecker::build_mongo_miss_log( - src_row_data, - &tb_meta, - &self.reverse_router, - self.output_full_row, - )?; - if let Some(revise_sql) = revise_sql { - log_sql!("{}", revise_sql); - sql_count += 1; + + let mut miss_log = BaseChecker::build_mongo_miss_log( + src_row_data, + &tb_meta, + &self.reverse_router, + self.output_full_row, + )?; + miss_log.revise_sql = revise_sql; + miss.push(miss_log); } - miss.push(miss_log); - }; + CheckResult::Ok => {} + } } BaseChecker::log_dml(&miss, &diff); diff --git a/dt-connector/src/sinker/mysql/mysql_checker.rs b/dt-connector/src/sinker/mysql/mysql_checker.rs index f196a648..a14e164e 100644 --- a/dt-connector/src/sinker/mysql/mysql_checker.rs +++ b/dt-connector/src/sinker/mysql/mysql_checker.rs @@ -17,7 +17,9 @@ use crate::{ rdb_query_builder::RdbQueryBuilder, rdb_router::RdbRouter, sinker::{ - base_checker::{BaseChecker, BatchCompareContext, BatchCompareRange, ReviseSqlContext}, + base_checker::{ + BaseChecker, BatchCompareContext, BatchCompareRange, RecheckConfig, ReviseSqlContext, + }, base_sinker::BaseSinker, }, Sinker, @@ -47,6 +49,8 @@ pub struct MysqlChecker { pub output_full_row: bool, pub output_revise_sql: bool, pub revise_match_full_row: bool, + pub recheck_interval_secs: u64, + pub recheck_attempts: u32, pub summary: CheckSummaryLog, pub global_summary: Option>>, } @@ -128,7 +132,7 @@ impl MysqlChecker { .transpose()? .flatten(); - let diff_log = BaseChecker::build_diff_log( + let mut diff_log = BaseChecker::build_diff_log( src_row_data, &dst_row_data, diff_col_values, @@ -137,6 +141,7 @@ impl MysqlChecker { self.output_full_row, ) .await?; + diff_log.revise_sql = revise_sql.clone(); if let Some(revise_sql) = revise_sql { log_sql!("{}", revise_sql); } @@ -149,13 +154,14 @@ impl MysqlChecker { .transpose()? .flatten(); - let miss_log = BaseChecker::build_miss_log( + let mut miss_log = BaseChecker::build_miss_log( src_row_data, &mut self.extractor_meta_manager, &self.reverse_router, self.output_full_row, ) .await?; + miss_log.revise_sql = revise_sql.clone(); if let Some(revise_sql) = revise_sql { log_sql!("{}", revise_sql); } @@ -174,7 +180,12 @@ impl MysqlChecker { start_index: usize, batch_size: usize, ) -> anyhow::Result<()> { - let tb_meta = self.meta_manager.get_tb_meta_by_row_data(&data[0]).await?; + let tb_meta_owned = self + .meta_manager + .get_tb_meta_by_row_data(&data[0]) + .await? + .clone(); + let tb_meta = &tb_meta_owned; let query_builder = RdbQueryBuilder::new_for_mysql(tb_meta, None); // build fetch dst sql @@ -202,6 +213,11 @@ impl MysqlChecker { .output_revise_sql .then(|| ReviseSqlContext::mysql(tb_meta, self.revise_match_full_row)); + let recheck_config = RecheckConfig { + delay_ms: self.recheck_interval_secs.saturating_mul(1000), + times: self.recheck_attempts, + }; + let ctx = BatchCompareContext { dst_tb_meta: &tb_meta.basic, extractor_meta_manager: &mut self.extractor_meta_manager, @@ -210,9 +226,35 @@ impl MysqlChecker { revise_ctx: revise_ctx.as_ref(), }; - let (miss, diff, sql_count) = - BaseChecker::batch_compare_row_data_items(data, &dst_row_data_map, compare_range, ctx) - .await?; + let pool = self.conn_pool.clone(); + let fetch_tb_meta = tb_meta_owned.clone(); + let fetch_latest = move |_, src_row: &RowData| { + let pool = pool.clone(); + let tb_meta = fetch_tb_meta.clone(); + let src_row = src_row.clone(); + async move { + let qb = RdbQueryBuilder::new_for_mysql(&tb_meta, None); + let q_info = qb.get_select_query(&src_row)?; + let query = qb.create_mysql_query(&q_info)?; + let mut rows = query.fetch(&pool); + if let Some(row) = rows.try_next().await? { + let row_data = RowData::from_mysql_row(&row, &tb_meta, &None); + Ok(Some(row_data)) + } else { + Ok(None) + } + } + }; + + let (miss, diff, sql_count) = BaseChecker::batch_compare_row_data_items( + data, + dst_row_data_map, + compare_range, + ctx, + recheck_config, + fetch_latest, + ) + .await?; BaseChecker::log_dml(&miss, &diff); diff --git a/dt-connector/src/sinker/pg/pg_checker.rs b/dt-connector/src/sinker/pg/pg_checker.rs index bab7e9cc..b84cfd3f 100644 --- a/dt-connector/src/sinker/pg/pg_checker.rs +++ b/dt-connector/src/sinker/pg/pg_checker.rs @@ -17,7 +17,9 @@ use crate::{ rdb_query_builder::RdbQueryBuilder, rdb_router::RdbRouter, sinker::{ - base_checker::{BaseChecker, BatchCompareContext, BatchCompareRange, ReviseSqlContext}, + base_checker::{ + BaseChecker, BatchCompareContext, BatchCompareRange, RecheckConfig, ReviseSqlContext, + }, base_sinker::BaseSinker, }, Sinker, @@ -47,6 +49,8 @@ pub struct PgChecker { pub output_full_row: bool, pub output_revise_sql: bool, pub revise_match_full_row: bool, + pub recheck_interval_secs: u64, + pub recheck_attempts: u32, pub summary: CheckSummaryLog, pub global_summary: Option>>, } @@ -128,7 +132,7 @@ impl PgChecker { .transpose()? .flatten(); - let diff_log = BaseChecker::build_diff_log( + let mut diff_log = BaseChecker::build_diff_log( src_row_data, &dst_row_data, diff_col_values, @@ -137,6 +141,7 @@ impl PgChecker { self.output_full_row, ) .await?; + diff_log.revise_sql = revise_sql.clone(); if let Some(revise_sql) = revise_sql { log_sql!("{}", revise_sql); } @@ -149,13 +154,14 @@ impl PgChecker { .transpose()? .flatten(); - let miss_log = BaseChecker::build_miss_log( + let mut miss_log = BaseChecker::build_miss_log( src_row_data, &mut self.extractor_meta_manager, &self.reverse_router, self.output_full_row, ) .await?; + miss_log.revise_sql = revise_sql.clone(); if let Some(revise_sql) = revise_sql { log_sql!("{}", revise_sql); } @@ -174,7 +180,12 @@ impl PgChecker { start_index: usize, batch_size: usize, ) -> anyhow::Result<()> { - let tb_meta = self.meta_manager.get_tb_meta_by_row_data(&data[0]).await?; + let tb_meta_owned = self + .meta_manager + .get_tb_meta_by_row_data(&data[0]) + .await? + .clone(); + let tb_meta = &tb_meta_owned; let query_builder = RdbQueryBuilder::new_for_pg(tb_meta, None); // build fetch dst sql @@ -202,6 +213,11 @@ impl PgChecker { .output_revise_sql .then(|| ReviseSqlContext::pg(tb_meta, self.revise_match_full_row)); + let recheck_config = RecheckConfig { + delay_ms: self.recheck_interval_secs.saturating_mul(1000), + times: self.recheck_attempts, + }; + let ctx = BatchCompareContext { dst_tb_meta: &tb_meta.basic, extractor_meta_manager: &mut self.extractor_meta_manager, @@ -210,9 +226,35 @@ impl PgChecker { revise_ctx: revise_ctx.as_ref(), }; - let (miss, diff, sql_count) = - BaseChecker::batch_compare_row_data_items(data, &dst_row_data_map, compare_range, ctx) - .await?; + let pool = self.conn_pool.clone(); + let fetch_tb_meta = tb_meta_owned.clone(); + let fetch_latest = move |_, src_row: &RowData| { + let pool = pool.clone(); + let tb_meta = fetch_tb_meta.clone(); + let src_row = src_row.clone(); + async move { + let qb = RdbQueryBuilder::new_for_pg(&tb_meta, None); + let q_info = qb.get_select_query(&src_row)?; + let query = qb.create_pg_query(&q_info)?; + let mut rows = query.fetch(&pool); + if let Some(row) = rows.try_next().await? { + let row_data = RowData::from_pg_row(&row, &tb_meta, &None); + Ok(Some(row_data)) + } else { + Ok(None) + } + } + }; + + let (miss, diff, sql_count) = BaseChecker::batch_compare_row_data_items( + data, + dst_row_data_map, + compare_range, + ctx, + recheck_config, + fetch_latest, + ) + .await?; BaseChecker::log_dml(&miss, &diff); diff --git a/dt-task/src/sinker_util.rs b/dt-task/src/sinker_util.rs index 9ea1d0f7..bd6da1a8 100644 --- a/dt-task/src/sinker_util.rs +++ b/dt-task/src/sinker_util.rs @@ -135,6 +135,8 @@ impl SinkerUtil { output_full_row, output_revise_sql, revise_match_full_row, + recheck_interval_secs, + recheck_attempts, .. } => { let reverse_router = create_router!(config, Mysql).reverse(); @@ -163,6 +165,8 @@ impl SinkerUtil { output_full_row, output_revise_sql, revise_match_full_row, + recheck_interval_secs, + recheck_attempts, summary: CheckSummaryLog { start_time: Local::now().to_rfc3339(), ..Default::default() @@ -209,6 +213,8 @@ impl SinkerUtil { output_full_row, output_revise_sql, revise_match_full_row, + recheck_interval_secs, + recheck_attempts, .. } => { let reverse_router = create_router!(config, Pg).reverse(); @@ -237,6 +243,8 @@ impl SinkerUtil { output_full_row, output_revise_sql, revise_match_full_row, + recheck_interval_secs, + recheck_attempts, summary: CheckSummaryLog { start_time: Local::now().to_rfc3339(), ..Default::default() @@ -271,6 +279,8 @@ impl SinkerUtil { batch_size, output_full_row, output_revise_sql, + recheck_interval_secs, + recheck_attempts, .. } => { let reverse_router = create_router!(config, Mongo).reverse(); @@ -288,6 +298,8 @@ impl SinkerUtil { monitor: monitor.clone(), output_full_row, output_revise_sql, + recheck_interval_secs, + recheck_attempts, summary: CheckSummaryLog { start_time: Local::now().to_rfc3339(), ..Default::default() diff --git a/dt-tests/tests/mongo_to_mongo/check/basic_test/expect_check_log/diff.log b/dt-tests/tests/mongo_to_mongo/check/basic_test/expect_check_log/diff.log index 9bba417a..9279fd5c 100644 --- a/dt-tests/tests/mongo_to_mongo/check/basic_test/expect_check_log/diff.log +++ b/dt-tests/tests/mongo_to_mongo/check/basic_test/expect_check_log/diff.log @@ -1,6 +1,6 @@ -{"schema":"test_db_1","tb":"tb_1","id_col_values":{"_id":"{\"ObjectId\":{\"$oid\":\"65733a82fb2ce9836745de4b\"}}"},"diff_col_values":{"age":{"src":"2","dst":"2000"}}} -{"schema":"test_db_1","tb":"tb_1","id_col_values":{"_id":"{\"ObjectId\":{\"$oid\":\"65733a82fb2ce9836745de4c\"}}"},"diff_col_values":{"age":{"src":"3","dst":"3000"}}} -{"schema":"test_db_1","tb":"tb_2","id_col_values":{"_id":"{\"ObjectId\":{\"$oid\":\"65733a82fb2ce9836745de42\"}}"},"diff_col_values":{"age":{"src":"4","dst":"4000"}}} -{"schema":"test_db_1","tb":"tb_2","id_col_values":{"_id":"{\"ObjectId\":{\"$oid\":\"65733a82fb2ce9836745de43\"}}"},"diff_col_values":{"age":{"src":"5","dst":"5000"}}} -{"schema":"test_db_2","tb":"tb_1","id_col_values":{"_id":"{\"ObjectId\":{\"$oid\":\"65733a82fb2ce9836745de44\"}}"},"diff_col_values":{"age":{"src":"1","dst":"1000"}}} -{"schema":"test_db_2","tb":"tb_1","id_col_values":{"_id":"{\"ObjectId\":{\"$oid\":\"65733a82fb2ce9836745de48\"}}"},"diff_col_values":{"age":{"src":"5","dst":"5000"}}} +{"log_type":"Diff","schema":"test_db_1","tb":"tb_1","id_col_values":{"_id":"{\"ObjectId\":{\"$oid\":\"65733a82fb2ce9836745de4b\"}}"},"diff_col_values":{"age":{"src":"2","dst":"2000"}}} +{"log_type":"Diff","schema":"test_db_1","tb":"tb_1","id_col_values":{"_id":"{\"ObjectId\":{\"$oid\":\"65733a82fb2ce9836745de4c\"}}"},"diff_col_values":{"age":{"src":"3","dst":"3000"}}} +{"log_type":"Diff","schema":"test_db_1","tb":"tb_2","id_col_values":{"_id":"{\"ObjectId\":{\"$oid\":\"65733a82fb2ce9836745de42\"}}"},"diff_col_values":{"age":{"src":"4","dst":"4000"}}} +{"log_type":"Diff","schema":"test_db_1","tb":"tb_2","id_col_values":{"_id":"{\"ObjectId\":{\"$oid\":\"65733a82fb2ce9836745de43\"}}"},"diff_col_values":{"age":{"src":"5","dst":"5000"}}} +{"log_type":"Diff","schema":"test_db_2","tb":"tb_1","id_col_values":{"_id":"{\"ObjectId\":{\"$oid\":\"65733a82fb2ce9836745de44\"}}"},"diff_col_values":{"age":{"src":"1","dst":"1000"}}} +{"log_type":"Diff","schema":"test_db_2","tb":"tb_1","id_col_values":{"_id":"{\"ObjectId\":{\"$oid\":\"65733a82fb2ce9836745de48\"}}"},"diff_col_values":{"age":{"src":"5","dst":"5000"}}} diff --git a/dt-tests/tests/mongo_to_mongo/check/basic_test/expect_check_log/miss.log b/dt-tests/tests/mongo_to_mongo/check/basic_test/expect_check_log/miss.log index a380a5ad..49b3afbe 100644 --- a/dt-tests/tests/mongo_to_mongo/check/basic_test/expect_check_log/miss.log +++ b/dt-tests/tests/mongo_to_mongo/check/basic_test/expect_check_log/miss.log @@ -1,5 +1,5 @@ -{"schema":"test_db_1","tb":"tb_1","id_col_values":{"_id":"{\"ObjectId\":{\"$oid\":\"65733a82fb2ce9836745de4d\"}}"}} -{"schema":"test_db_1","tb":"tb_1","id_col_values":{"_id":"{\"ObjectId\":{\"$oid\":\"65733a82fb2ce9836745de4e\"}}"}} -{"schema":"test_db_1","tb":"tb_2","id_col_values":{"_id":"{\"ObjectId\":{\"$oid\":\"65733a82fb2ce9836745de40\"}}"}} -{"schema":"test_db_1","tb":"tb_2","id_col_values":{"_id":"{\"ObjectId\":{\"$oid\":\"65733a82fb2ce9836745de4f\"}}"}} -{"schema":"test_db_2","tb":"tb_1","id_col_values":{"_id":"{\"ObjectId\":{\"$oid\":\"65733a82fb2ce9836745de46\"}}"}} +{"log_type":"Miss","schema":"test_db_1","tb":"tb_1","id_col_values":{"_id":"{\"ObjectId\":{\"$oid\":\"65733a82fb2ce9836745de4d\"}}"}} +{"log_type":"Miss","schema":"test_db_1","tb":"tb_1","id_col_values":{"_id":"{\"ObjectId\":{\"$oid\":\"65733a82fb2ce9836745de4e\"}}"}} +{"log_type":"Miss","schema":"test_db_1","tb":"tb_2","id_col_values":{"_id":"{\"ObjectId\":{\"$oid\":\"65733a82fb2ce9836745de4f\"}}"}} +{"log_type":"Miss","schema":"test_db_1","tb":"tb_2","id_col_values":{"_id":"{\"ObjectId\":{\"$oid\":\"65733a82fb2ce9836745de40\"}}"}} +{"log_type":"Miss","schema":"test_db_2","tb":"tb_1","id_col_values":{"_id":"{\"ObjectId\":{\"$oid\":\"65733a82fb2ce9836745de46\"}}"}} diff --git a/dt-tests/tests/mongo_to_mongo/check/output_full_row_test/expect_check_log/diff.log b/dt-tests/tests/mongo_to_mongo/check/output_full_row_test/expect_check_log/diff.log index 990db094..d72f1ca7 100644 --- a/dt-tests/tests/mongo_to_mongo/check/output_full_row_test/expect_check_log/diff.log +++ b/dt-tests/tests/mongo_to_mongo/check/output_full_row_test/expect_check_log/diff.log @@ -1 +1 @@ -{"schema":"test_db_1","tb":"users","id_col_values":{"_id":"{\"ObjectId\":{\"$oid\":\"507f1f77bcf86cd799439012\"}}"},"diff_col_values":{"age":{"src":"25","dst":"99"},"email":{"src":"bob@example.com","dst":"bob_updated@example.com"}},"src_row":{"_id":"{\"ObjectId\":{\"$oid\":\"507f1f77bcf86cd799439012\"}}","doc":{"_id":{"$oid":"507f1f77bcf86cd799439012"},"name":"Bob","age":25,"email":"bob@example.com"}},"dst_row":{"_id":"{\"ObjectId\":{\"$oid\":\"507f1f77bcf86cd799439012\"}}","doc":{"_id":{"$oid":"507f1f77bcf86cd799439012"},"name":"Bob","age":99,"email":"bob_updated@example.com"}}} +{"log_type":"Diff","schema":"test_db_1","tb":"users","id_col_values":{"_id":"{\"ObjectId\":{\"$oid\":\"507f1f77bcf86cd799439012\"}}"},"diff_col_values":{"age":{"src":"25","dst":"99"},"email":{"src":"bob@example.com","dst":"bob_updated@example.com"}},"src_row":{"_id":"{\"ObjectId\":{\"$oid\":\"507f1f77bcf86cd799439012\"}}","doc":{"_id":{"$oid":"507f1f77bcf86cd799439012"},"name":"Bob","age":25,"email":"bob@example.com"}},"dst_row":{"_id":"{\"ObjectId\":{\"$oid\":\"507f1f77bcf86cd799439012\"}}","doc":{"_id":{"$oid":"507f1f77bcf86cd799439012"},"name":"Bob","age":99,"email":"bob_updated@example.com"}}} diff --git a/dt-tests/tests/mongo_to_mongo/check/output_full_row_test/expect_check_log/miss.log b/dt-tests/tests/mongo_to_mongo/check/output_full_row_test/expect_check_log/miss.log index 8bba3a23..0f4c7e55 100644 --- a/dt-tests/tests/mongo_to_mongo/check/output_full_row_test/expect_check_log/miss.log +++ b/dt-tests/tests/mongo_to_mongo/check/output_full_row_test/expect_check_log/miss.log @@ -1 +1 @@ -{"schema":"test_db_1","tb":"users","id_col_values":{"_id":"{\"ObjectId\":{\"$oid\":\"507f1f77bcf86cd799439013\"}}"},"src_row":{"_id":"{\"ObjectId\":{\"$oid\":\"507f1f77bcf86cd799439013\"}}","doc":{"_id":{"$oid":"507f1f77bcf86cd799439013"},"name":"Charlie","age":35,"email":"charlie@example.com"}}} +{"log_type":"Miss","schema":"test_db_1","tb":"users","id_col_values":{"_id":"{\"ObjectId\":{\"$oid\":\"507f1f77bcf86cd799439013\"}}"},"src_row":{"_id":"{\"ObjectId\":{\"$oid\":\"507f1f77bcf86cd799439013\"}}","doc":{"_id":{"$oid":"507f1f77bcf86cd799439013"},"name":"Charlie","age":35,"email":"charlie@example.com"}}} diff --git a/dt-tests/tests/mongo_to_mongo/check/output_revise_sql_test/check_log/diff.log b/dt-tests/tests/mongo_to_mongo/check/output_revise_sql_test/check_log/diff.log new file mode 100644 index 00000000..77ec1001 --- /dev/null +++ b/dt-tests/tests/mongo_to_mongo/check/output_revise_sql_test/check_log/diff.log @@ -0,0 +1 @@ +{"log_type":"Diff","schema":"test_db_1","tb":"users","id_col_values":{"_id":"{\"ObjectId\":{\"$oid\":\"65733a82fb2ce9836745de02\"}}"},"diff_col_values":{"age":{"src":"25","dst":"99"},"email":{"src":"\"bob@example.com\"","dst":"\"bob_updated@example.com\""}},"revise_sql":"db.users.updateOne({ \"_id\": ObjectId(\"65733a82fb2ce9836745de02\") }, { \"$set\": { \"age\": 25, \"email\": \"bob@example.com\" } })"} diff --git a/dt-tests/tests/mongo_to_mongo/check/output_revise_sql_test/check_log/extra.log b/dt-tests/tests/mongo_to_mongo/check/output_revise_sql_test/check_log/extra.log new file mode 100644 index 00000000..e69de29b diff --git a/dt-tests/tests/mongo_to_mongo/check/output_revise_sql_test/check_log/miss.log b/dt-tests/tests/mongo_to_mongo/check/output_revise_sql_test/check_log/miss.log new file mode 100644 index 00000000..88e811a3 --- /dev/null +++ b/dt-tests/tests/mongo_to_mongo/check/output_revise_sql_test/check_log/miss.log @@ -0,0 +1 @@ +{"log_type":"Miss","schema":"test_db_1","tb":"users","id_col_values":{"_id":"{\"ObjectId\":{\"$oid\":\"65733a82fb2ce9836745de03\"}}"},"revise_sql":"db.users.insertOne({ \"_id\": ObjectId(\"65733a82fb2ce9836745de03\"), \"name\": \"Charlie\", \"age\": 35, \"email\": \"charlie@example.com\" })"} diff --git a/dt-tests/tests/mongo_to_mongo/check/output_revise_sql_test/expect_check_log/diff.log b/dt-tests/tests/mongo_to_mongo/check/output_revise_sql_test/expect_check_log/diff.log index c2f8796e..4b4c2e3f 100644 --- a/dt-tests/tests/mongo_to_mongo/check/output_revise_sql_test/expect_check_log/diff.log +++ b/dt-tests/tests/mongo_to_mongo/check/output_revise_sql_test/expect_check_log/diff.log @@ -1 +1 @@ -{"schema":"test_db_1","tb":"users","id_col_values":{"_id":"{\"ObjectId\":{\"$oid\":\"65733a82fb2ce9836745de02\"}}"},"diff_col_values":{"age":{"src":"25","dst":"99"},"email":{"src":"bob@example.com","dst":"bob_updated@example.com"}}} +{"log_type":"Diff","schema":"test_db_1","tb":"users","id_col_values":{"_id":"{\"ObjectId\":{\"$oid\":\"65733a82fb2ce9836745de02\"}}"},"diff_col_values":{"age":{"src":"25","dst":"99"},"email":{"src":"bob@example.com","dst":"bob_updated@example.com"}},"revise_sql":"db.users.updateOne({'_id': ObjectId('65733a82fb2ce9836745de02')}, {'$set': {'age': 25, 'email': 'bob@example.com'}})"} diff --git a/dt-tests/tests/mongo_to_mongo/check/output_revise_sql_test/expect_check_log/miss.log b/dt-tests/tests/mongo_to_mongo/check/output_revise_sql_test/expect_check_log/miss.log index 8be650b5..bb7ee754 100644 --- a/dt-tests/tests/mongo_to_mongo/check/output_revise_sql_test/expect_check_log/miss.log +++ b/dt-tests/tests/mongo_to_mongo/check/output_revise_sql_test/expect_check_log/miss.log @@ -1 +1 @@ -{"schema":"test_db_1","tb":"users","id_col_values":{"_id":"{\"ObjectId\":{\"$oid\":\"65733a82fb2ce9836745de03\"}}"}} +{"log_type":"Miss","schema":"test_db_1","tb":"users","id_col_values":{"_id":"{\"ObjectId\":{\"$oid\":\"65733a82fb2ce9836745de03\"}}"},"revise_sql":"db.users.insertOne({'_id': ObjectId('65733a82fb2ce9836745de03'), 'name': 'Charlie', 'age': 35, 'email': 'charlie@example.com'})"} diff --git a/dt-tests/tests/mongo_to_mongo/check/recheck_config/dst_prepare.sql b/dt-tests/tests/mongo_to_mongo/check/recheck_config/dst_prepare.sql new file mode 100644 index 00000000..a9d83298 --- /dev/null +++ b/dt-tests/tests/mongo_to_mongo/check/recheck_config/dst_prepare.sql @@ -0,0 +1,3 @@ +use sample_db +db.dropDatabase(); +db.createCollection("sample_tb"); diff --git a/dt-tests/tests/mongo_to_mongo/check/recheck_config/dst_test.sql b/dt-tests/tests/mongo_to_mongo/check/recheck_config/dst_test.sql new file mode 100644 index 00000000..1b85ee25 --- /dev/null +++ b/dt-tests/tests/mongo_to_mongo/check/recheck_config/dst_test.sql @@ -0,0 +1,3 @@ +use sample_db +db.sample_tb.insertOne({ "_id": 1, "name": "Alice" }); +db.sample_tb.insertOne({ "_id": 2, "name": "Eve" }); diff --git a/dt-tests/tests/mongo_to_mongo/check/recheck_config/expect_check_log/diff.log b/dt-tests/tests/mongo_to_mongo/check/recheck_config/expect_check_log/diff.log new file mode 100644 index 00000000..ca997833 --- /dev/null +++ b/dt-tests/tests/mongo_to_mongo/check/recheck_config/expect_check_log/diff.log @@ -0,0 +1 @@ +{"diff_col_values":{"name":{"src":"Bob","dst":"Eve"}},"id_col_values":{"_id":"{\"Int32\":2}"},"log_type":"Diff","schema":"sample_db","tb":"sample_tb"} diff --git a/dt-tests/tests/mongo_to_mongo/check/recheck_config/expect_check_log/summary.log b/dt-tests/tests/mongo_to_mongo/check/recheck_config/expect_check_log/summary.log new file mode 100644 index 00000000..daf2fed5 --- /dev/null +++ b/dt-tests/tests/mongo_to_mongo/check/recheck_config/expect_check_log/summary.log @@ -0,0 +1 @@ +{"start_time":"2023-01-01T00:00:00+00:00","end_time":"2023-01-01T00:00:00+00:00","is_consistent":false,"diff_count":1} diff --git a/dt-tests/tests/mongo_to_mongo/check/recheck_config/src_prepare.sql b/dt-tests/tests/mongo_to_mongo/check/recheck_config/src_prepare.sql new file mode 100644 index 00000000..a9d83298 --- /dev/null +++ b/dt-tests/tests/mongo_to_mongo/check/recheck_config/src_prepare.sql @@ -0,0 +1,3 @@ +use sample_db +db.dropDatabase(); +db.createCollection("sample_tb"); diff --git a/dt-tests/tests/mongo_to_mongo/check/recheck_config/src_test.sql b/dt-tests/tests/mongo_to_mongo/check/recheck_config/src_test.sql new file mode 100644 index 00000000..fb586070 --- /dev/null +++ b/dt-tests/tests/mongo_to_mongo/check/recheck_config/src_test.sql @@ -0,0 +1,3 @@ +use sample_db +db.sample_tb.insertOne({ "_id": 1, "name": "Alice" }); +db.sample_tb.insertOne({ "_id": 2, "name": "Bob" }); diff --git a/dt-tests/tests/mongo_to_mongo/check/recheck_config/task_config.ini b/dt-tests/tests/mongo_to_mongo/check/recheck_config/task_config.ini new file mode 100644 index 00000000..81ce48ad --- /dev/null +++ b/dt-tests/tests/mongo_to_mongo/check/recheck_config/task_config.ini @@ -0,0 +1,39 @@ +[extractor] +db_type=mongo +extract_type=snapshot +url={mongo_extractor_url} +app_name=APE_DTS + +[sinker] +db_type=mongo +sink_type=check +url={mongo_sinker_url} +app_name=recheck_test +recheck_interval_secs=7 +recheck_attempts=2 +batch_size=10 + +[filter] +do_dbs=sample_db +ignore_dbs= +do_tbs=sample_db.sample_tb +ignore_tbs= +do_events=insert + +[router] +db_map= +tb_map= +col_map= + +[parallelizer] +parallel_type=rdb_check +parallel_size=1 + +[pipeline] +buffer_size=2 +checkpoint_interval_secs=1 + +[runtime] +log_level=info +log4rs_file=./log4rs.yaml +log_dir=./logs diff --git a/dt-tests/tests/mongo_to_mongo/check/route_test/expect_check_log/diff.log b/dt-tests/tests/mongo_to_mongo/check/route_test/expect_check_log/diff.log index ad65106c..17ae44ef 100644 --- a/dt-tests/tests/mongo_to_mongo/check/route_test/expect_check_log/diff.log +++ b/dt-tests/tests/mongo_to_mongo/check/route_test/expect_check_log/diff.log @@ -1,8 +1,8 @@ -{"schema":"test_db_1","tb":"tb_1","target_schema":"dst_test_db_1","target_tb":"tb_1","id_col_values":{"_id":"{\"ObjectId\":{\"$oid\":\"65733a82fb2ce9836745de4b\"}}"},"diff_col_values":{"age":{"src":"2","dst":"2000"}}} -{"schema":"test_db_1","tb":"tb_1","target_schema":"dst_test_db_1","target_tb":"tb_1","id_col_values":{"_id":"{\"ObjectId\":{\"$oid\":\"65733a82fb2ce9836745de4c\"}}"},"diff_col_values":{"age":{"src":"3","dst":"3000"}}} -{"schema":"test_db_1","tb":"tb_2","target_schema":"dst_test_db_1","target_tb":"tb_2","id_col_values":{"_id":"{\"ObjectId\":{\"$oid\":\"65733a82fb2ce9836745de42\"}}"},"diff_col_values":{"age":{"src":"4","dst":"4000"}}} -{"schema":"test_db_1","tb":"tb_2","target_schema":"dst_test_db_1","target_tb":"tb_2","id_col_values":{"_id":"{\"ObjectId\":{\"$oid\":\"65733a82fb2ce9836745de43\"}}"},"diff_col_values":{"age":{"src":"5","dst":"5000"}}} -{"schema":"test_db_2","tb":"tb_1","target_schema":"dst_test_db_2","target_tb":"dst_tb_1","id_col_values":{"_id":"{\"ObjectId\":{\"$oid\":\"65733a82fb2ce9836745de44\"}}"},"diff_col_values":{"age":{"src":"1","dst":"1000"}}} -{"schema":"test_db_2","tb":"tb_1","target_schema":"dst_test_db_2","target_tb":"dst_tb_1","id_col_values":{"_id":"{\"ObjectId\":{\"$oid\":\"65733a82fb2ce9836745de48\"}}"},"diff_col_values":{"age":{"src":"5","dst":"5000"}}} -{"schema":"test_db_2","tb":"tb_2","id_col_values":{"_id":"{\"ObjectId\":{\"$oid\":\"65733a82fb2ce9836745de4c\"}}"},"diff_col_values":{"age":{"src":"4","dst":"4000"}}} -{"schema":"test_db_2","tb":"tb_2","id_col_values":{"_id":"{\"ObjectId\":{\"$oid\":\"65733a82fb2ce9836745de4d\"}}"},"diff_col_values":{"age":{"src":"5","dst":"5000"}}} +{"log_type":"Diff","schema":"test_db_1","tb":"tb_1","target_schema":"dst_test_db_1","target_tb":"tb_1","id_col_values":{"_id":"{\"ObjectId\":{\"$oid\":\"65733a82fb2ce9836745de4c\"}}"},"diff_col_values":{"age":{"src":"3","dst":"3000"}}} +{"log_type":"Diff","schema":"test_db_1","tb":"tb_1","target_schema":"dst_test_db_1","target_tb":"tb_1","id_col_values":{"_id":"{\"ObjectId\":{\"$oid\":\"65733a82fb2ce9836745de4b\"}}"},"diff_col_values":{"age":{"src":"2","dst":"2000"}}} +{"log_type":"Diff","schema":"test_db_1","tb":"tb_2","target_schema":"dst_test_db_1","target_tb":"tb_2","id_col_values":{"_id":"{\"ObjectId\":{\"$oid\":\"65733a82fb2ce9836745de43\"}}"},"diff_col_values":{"age":{"src":"5","dst":"5000"}}} +{"log_type":"Diff","schema":"test_db_1","tb":"tb_2","target_schema":"dst_test_db_1","target_tb":"tb_2","id_col_values":{"_id":"{\"ObjectId\":{\"$oid\":\"65733a82fb2ce9836745de42\"}}"},"diff_col_values":{"age":{"src":"4","dst":"4000"}}} +{"log_type":"Diff","schema":"test_db_2","tb":"tb_1","target_schema":"dst_test_db_2","target_tb":"dst_tb_1","id_col_values":{"_id":"{\"ObjectId\":{\"$oid\":\"65733a82fb2ce9836745de44\"}}"},"diff_col_values":{"age":{"src":"1","dst":"1000"}}} +{"log_type":"Diff","schema":"test_db_2","tb":"tb_1","target_schema":"dst_test_db_2","target_tb":"dst_tb_1","id_col_values":{"_id":"{\"ObjectId\":{\"$oid\":\"65733a82fb2ce9836745de48\"}}"},"diff_col_values":{"age":{"src":"5","dst":"5000"}}} +{"log_type":"Diff","schema":"test_db_2","tb":"tb_2","id_col_values":{"_id":"{\"ObjectId\":{\"$oid\":\"65733a82fb2ce9836745de4c\"}}"},"diff_col_values":{"age":{"src":"4","dst":"4000"}}} +{"log_type":"Diff","schema":"test_db_2","tb":"tb_2","id_col_values":{"_id":"{\"ObjectId\":{\"$oid\":\"65733a82fb2ce9836745de4d\"}}"},"diff_col_values":{"age":{"src":"5","dst":"5000"}}} diff --git a/dt-tests/tests/mongo_to_mongo/check/route_test/expect_check_log/miss.log b/dt-tests/tests/mongo_to_mongo/check/route_test/expect_check_log/miss.log index a3fda6d1..af18f674 100644 --- a/dt-tests/tests/mongo_to_mongo/check/route_test/expect_check_log/miss.log +++ b/dt-tests/tests/mongo_to_mongo/check/route_test/expect_check_log/miss.log @@ -1,7 +1,7 @@ -{"schema":"test_db_1","tb":"tb_1","target_schema":"dst_test_db_1","target_tb":"tb_1","id_col_values":{"_id":"{\"ObjectId\":{\"$oid\":\"65733a82fb2ce9836745de4d\"}}"}} -{"schema":"test_db_1","tb":"tb_1","target_schema":"dst_test_db_1","target_tb":"tb_1","id_col_values":{"_id":"{\"ObjectId\":{\"$oid\":\"65733a82fb2ce9836745de4e\"}}"}} -{"schema":"test_db_1","tb":"tb_2","target_schema":"dst_test_db_1","target_tb":"tb_2","id_col_values":{"_id":"{\"ObjectId\":{\"$oid\":\"65733a82fb2ce9836745de40\"}}"}} -{"schema":"test_db_1","tb":"tb_2","target_schema":"dst_test_db_1","target_tb":"tb_2","id_col_values":{"_id":"{\"ObjectId\":{\"$oid\":\"65733a82fb2ce9836745de4f\"}}"}} -{"schema":"test_db_2","tb":"tb_1","target_schema":"dst_test_db_2","target_tb":"dst_tb_1","id_col_values":{"_id":"{\"ObjectId\":{\"$oid\":\"65733a82fb2ce9836745de46\"}}"}} -{"schema":"test_db_2","tb":"tb_2","id_col_values":{"_id":"{\"ObjectId\":{\"$oid\":\"65733a82fb2ce9836745de49\"}}"}} -{"schema":"test_db_2","tb":"tb_2","id_col_values":{"_id":"{\"ObjectId\":{\"$oid\":\"65733a82fb2ce9836745de4a\"}}"}} +{"log_type":"Miss","schema":"test_db_1","tb":"tb_1","target_schema":"dst_test_db_1","target_tb":"tb_1","id_col_values":{"_id":"{\"ObjectId\":{\"$oid\":\"65733a82fb2ce9836745de4d\"}}"}} +{"log_type":"Miss","schema":"test_db_1","tb":"tb_1","target_schema":"dst_test_db_1","target_tb":"tb_1","id_col_values":{"_id":"{\"ObjectId\":{\"$oid\":\"65733a82fb2ce9836745de4e\"}}"}} +{"log_type":"Miss","schema":"test_db_1","tb":"tb_2","target_schema":"dst_test_db_1","target_tb":"tb_2","id_col_values":{"_id":"{\"ObjectId\":{\"$oid\":\"65733a82fb2ce9836745de4f\"}}"}} +{"log_type":"Miss","schema":"test_db_1","tb":"tb_2","target_schema":"dst_test_db_1","target_tb":"tb_2","id_col_values":{"_id":"{\"ObjectId\":{\"$oid\":\"65733a82fb2ce9836745de40\"}}"}} +{"log_type":"Miss","schema":"test_db_2","tb":"tb_1","target_schema":"dst_test_db_2","target_tb":"dst_tb_1","id_col_values":{"_id":"{\"ObjectId\":{\"$oid\":\"65733a82fb2ce9836745de46\"}}"}} +{"log_type":"Miss","schema":"test_db_2","tb":"tb_2","id_col_values":{"_id":"{\"ObjectId\":{\"$oid\":\"65733a82fb2ce9836745de4a\"}}"}} +{"log_type":"Miss","schema":"test_db_2","tb":"tb_2","id_col_values":{"_id":"{\"ObjectId\":{\"$oid\":\"65733a82fb2ce9836745de49\"}}"}} diff --git a/dt-tests/tests/mongo_to_mongo/check_tests.rs b/dt-tests/tests/mongo_to_mongo/check_tests.rs index 9cf4e30f..28708d60 100644 --- a/dt-tests/tests/mongo_to_mongo/check_tests.rs +++ b/dt-tests/tests/mongo_to_mongo/check_tests.rs @@ -27,4 +27,10 @@ mod test { async fn check_output_revise_sql_test() { TestBase::run_mongo_check_test("mongo_to_mongo/check/output_revise_sql_test").await; } + + #[tokio::test] + #[serial] + async fn check_recheck_test() { + TestBase::run_mongo_check_test("mongo_to_mongo/check/recheck_config").await; + } } diff --git a/dt-tests/tests/mongo_to_mongo/review/basic_test/check_log/diff.log b/dt-tests/tests/mongo_to_mongo/review/basic_test/check_log/diff.log index ed12e7b6..27b4e10f 100644 --- a/dt-tests/tests/mongo_to_mongo/review/basic_test/check_log/diff.log +++ b/dt-tests/tests/mongo_to_mongo/review/basic_test/check_log/diff.log @@ -1,3 +1,6 @@ -{"schema":"test_db_1","tb":"tb_1","id_col_values":{"_id":"{\"ObjectId\":{\"$oid\":\"65733a82fb2ce9836745de4a\"}}"}} -{"schema":"test_db_1","tb":"tb_1","id_col_values":{"_id":"{\"ObjectId\":{\"$oid\":\"65733a82fb2ce9836745de4b\"}}"}} -{"schema":"test_db_1","tb":"tb_1","id_col_values":{"_id":"{\"ObjectId\":{\"$oid\":\"65733a82fb2ce9836745de4c\"}}"}} +{"log_type":"Diff","schema":"test_db_1","tb":"tb_1","id_col_values":{"_id":"{\"String\":\"65733a82fb2ce9836745de4b\"}"},"diff_col_values":{}} +{"log_type":"Diff","schema":"test_db_1","tb":"tb_1","id_col_values":{"_id":"{\"String\":\"65733a82fb2ce9836745de4c\"}"},"diff_col_values":{}} +{"log_type":"Diff","schema":"test_db_1","tb":"tb_2","id_col_values":{"_id":"{\"String\":\"65733a82fb2ce9836745de4i\"}"},"diff_col_values":{}} +{"log_type":"Diff","schema":"test_db_1","tb":"tb_2","id_col_values":{"_id":"{\"String\":\"65733a82fb2ce9836745de4j\"}"},"diff_col_values":{}} +{"log_type":"Diff","schema":"test_db_2","tb":"tb_1","id_col_values":{"_id":"{\"String\":\"65733a82fb2ce9836745de4k\"}"},"diff_col_values":{}} +{"log_type":"Diff","schema":"test_db_2","tb":"tb_1","id_col_values":{"_id":"{\"String\":\"65733a82fb2ce9836745de4o\"}"},"diff_col_values":{}} diff --git a/dt-tests/tests/mongo_to_mongo/review/basic_test/check_log/miss.log b/dt-tests/tests/mongo_to_mongo/review/basic_test/check_log/miss.log index 32dae424..15a645d7 100644 --- a/dt-tests/tests/mongo_to_mongo/review/basic_test/check_log/miss.log +++ b/dt-tests/tests/mongo_to_mongo/review/basic_test/check_log/miss.log @@ -1,2 +1,5 @@ -{"schema":"test_db_1","tb":"tb_1","id_col_values":{"_id":"{\"ObjectId\":{\"$oid\":\"65733a82fb2ce9836745de4d\"}}"}} -{"schema":"test_db_1","tb":"tb_1","id_col_values":{"_id":"{\"ObjectId\":{\"$oid\":\"65733a82fb2ce9836745de4e\"}}"}} +{"log_type":"Miss","schema":"test_db_1","tb":"tb_1","id_col_values":{"_id":"{\"String\":\"65733a82fb2ce9836745de4d\"}"},"diff_col_values":{}} +{"log_type":"Miss","schema":"test_db_1","tb":"tb_1","id_col_values":{"_id":"{\"String\":\"65733a82fb2ce9836745de4e\"}"},"diff_col_values":{}} +{"log_type":"Miss","schema":"test_db_1","tb":"tb_2","id_col_values":{"_id":"{\"String\":\"65733a82fb2ce9836745de4g\"}"},"diff_col_values":{}} +{"log_type":"Miss","schema":"test_db_1","tb":"tb_2","id_col_values":{"_id":"{\"String\":\"65733a82fb2ce9836745de4f\"}"},"diff_col_values":{}} +{"log_type":"Miss","schema":"test_db_2","tb":"tb_1","id_col_values":{"_id":"{\"String\":\"65733a82fb2ce9836745de4m\"}"},"diff_col_values":{}} diff --git a/dt-tests/tests/mongo_to_mongo/review/basic_test/expect_check_log/diff.log b/dt-tests/tests/mongo_to_mongo/review/basic_test/expect_check_log/diff.log index ff504166..1c6f7e4d 100644 --- a/dt-tests/tests/mongo_to_mongo/review/basic_test/expect_check_log/diff.log +++ b/dt-tests/tests/mongo_to_mongo/review/basic_test/expect_check_log/diff.log @@ -1 +1,3 @@ -{"schema":"test_db_1","tb":"tb_1","id_col_values":{"_id":"{\"ObjectId\":{\"$oid\":\"65733a82fb2ce9836745de4c\"}}"},"diff_col_values":{"age":{"src":"3","dst":"3000"}}} +{"log_type":"Diff","schema":"test_db_1","tb":"tb_1","id_col_values":{"_id":"{\"ObjectId\":{\"$oid\":\"65733a82fb2ce9836745de4c\"}}"},"diff_col_values":{"age":{"src":"3","dst":"3000"}}} +{"log_type":"Diff","schema":"test_db_1","tb":"tb_2","id_col_values":{"_id":"{\"ObjectId\":{\"$oid\":\"65733a82fb2ce9836745de43\"}}"},"diff_col_values":{"age":{"src":"5","dst":"5000"}}} +{"log_type":"Diff","schema":"test_db_2","tb":"tb_1","id_col_values":{"_id":"{\"ObjectId\":{\"$oid\":\"65733a82fb2ce9836745de48\"}}"},"diff_col_values":{"age":{"src":"5","dst":"5000"}}} diff --git a/dt-tests/tests/mongo_to_mongo/review/basic_test/expect_check_log/miss.log b/dt-tests/tests/mongo_to_mongo/review/basic_test/expect_check_log/miss.log index 4a967da1..d733d7ed 100644 --- a/dt-tests/tests/mongo_to_mongo/review/basic_test/expect_check_log/miss.log +++ b/dt-tests/tests/mongo_to_mongo/review/basic_test/expect_check_log/miss.log @@ -1 +1,2 @@ -{"schema":"test_db_1","tb":"tb_1","id_col_values":{"_id":"{\"ObjectId\":{\"$oid\":\"65733a82fb2ce9836745de4e\"}}"}} +{"log_type":"Miss","schema":"test_db_1","tb":"tb_1","id_col_values":{"_id":"{\"ObjectId\":{\"$oid\":\"65733a82fb2ce9836745de4e\"}}"}} +{"log_type":"Miss","schema":"test_db_1","tb":"tb_2","id_col_values":{"_id":"{\"ObjectId\":{\"$oid\":\"65733a82fb2ce9836745de40\"}}"}} diff --git a/dt-tests/tests/mongo_to_mongo/review/route_test/check_log/diff.log b/dt-tests/tests/mongo_to_mongo/review/route_test/check_log/diff.log index ba0685e5..0e4b068e 100644 --- a/dt-tests/tests/mongo_to_mongo/review/route_test/check_log/diff.log +++ b/dt-tests/tests/mongo_to_mongo/review/route_test/check_log/diff.log @@ -1,8 +1,8 @@ -{"schema":"test_db_1","tb":"tb_1","id_col_values":{"_id":"{\"String\":\"65733a82fb2ce9836745de4b\"}"}} -{"schema":"test_db_1","tb":"tb_1","id_col_values":{"_id":"{\"String\":\"65733a82fb2ce9836745de4c\"}"}} -{"schema":"test_db_1","tb":"tb_2","id_col_values":{"_id":"{\"String\":\"65733a82fb2ce9836745de4i\"}"}} -{"schema":"test_db_1","tb":"tb_2","id_col_values":{"_id":"{\"String\":\"65733a82fb2ce9836745de4j\"}"}} -{"schema":"test_db_2","tb":"tb_1","id_col_values":{"_id":"{\"String\":\"65733a82fb2ce9836745de4k\"}"}} -{"schema":"test_db_2","tb":"tb_1","id_col_values":{"_id":"{\"String\":\"65733a82fb2ce9836745de4o\"}"}} -{"schema":"test_db_2","tb":"tb_2","id_col_values":{"_id":"{\"String\":\"65733a82fb2ce9836745de4s\"}"}} -{"schema":"test_db_2","tb":"tb_2","id_col_values":{"_id":"{\"String\":\"65733a82fb2ce9836745de4t\"}"}} +{"log_type":"Diff","schema":"test_db_1","tb":"tb_1","id_col_values":{"_id":"{\"String\":\"65733a82fb2ce9836745de4b\"}"},"diff_col_values":{}} +{"log_type":"Diff","schema":"test_db_1","tb":"tb_1","id_col_values":{"_id":"{\"String\":\"65733a82fb2ce9836745de4c\"}"},"diff_col_values":{}} +{"log_type":"Diff","schema":"test_db_1","tb":"tb_2","id_col_values":{"_id":"{\"String\":\"65733a82fb2ce9836745de4i\"}"},"diff_col_values":{}} +{"log_type":"Diff","schema":"test_db_1","tb":"tb_2","id_col_values":{"_id":"{\"String\":\"65733a82fb2ce9836745de4j\"}"},"diff_col_values":{}} +{"log_type":"Diff","schema":"test_db_2","tb":"tb_1","id_col_values":{"_id":"{\"String\":\"65733a82fb2ce9836745de4k\"}"},"diff_col_values":{}} +{"log_type":"Diff","schema":"test_db_2","tb":"tb_1","id_col_values":{"_id":"{\"String\":\"65733a82fb2ce9836745de4o\"}"},"diff_col_values":{}} +{"log_type":"Diff","schema":"test_db_2","tb":"tb_2","id_col_values":{"_id":"{\"String\":\"65733a82fb2ce9836745de4s\"}"},"diff_col_values":{}} +{"log_type":"Diff","schema":"test_db_2","tb":"tb_2","id_col_values":{"_id":"{\"String\":\"65733a82fb2ce9836745de4t\"}"},"diff_col_values":{}} diff --git a/dt-tests/tests/mongo_to_mongo/review/route_test/check_log/miss.log b/dt-tests/tests/mongo_to_mongo/review/route_test/check_log/miss.log index c74c04ad..976b4c1c 100644 --- a/dt-tests/tests/mongo_to_mongo/review/route_test/check_log/miss.log +++ b/dt-tests/tests/mongo_to_mongo/review/route_test/check_log/miss.log @@ -1,7 +1,7 @@ -{"schema":"test_db_1","tb":"tb_1","id_col_values":{"_id":"{\"String\":\"65733a82fb2ce9836745de4d\"}"}} -{"schema":"test_db_1","tb":"tb_1","id_col_values":{"_id":"{\"String\":\"65733a82fb2ce9836745de4e\"}"}} -{"schema":"test_db_1","tb":"tb_2","id_col_values":{"_id":"{\"String\":\"65733a82fb2ce9836745de4g\"}"}} -{"schema":"test_db_1","tb":"tb_2","id_col_values":{"_id":"{\"String\":\"65733a82fb2ce9836745de4f\"}"}} -{"schema":"test_db_2","tb":"tb_1","id_col_values":{"_id":"{\"String\":\"65733a82fb2ce9836745de4m\"}"}} -{"schema":"test_db_2","tb":"tb_2","id_col_values":{"_id":"{\"String\":\"65733a82fb2ce9836745de4p\"}"}} -{"schema":"test_db_2","tb":"tb_2","id_col_values":{"_id":"{\"String\":\"65733a82fb2ce9836745de4q\"}"}} +{"log_type":"Miss","schema":"test_db_1","tb":"tb_1","id_col_values":{"_id":"{\"String\":\"65733a82fb2ce9836745de4d\"}"},"diff_col_values":{}} +{"log_type":"Miss","schema":"test_db_1","tb":"tb_1","id_col_values":{"_id":"{\"String\":\"65733a82fb2ce9836745de4e\"}"},"diff_col_values":{}} +{"log_type":"Miss","schema":"test_db_1","tb":"tb_2","id_col_values":{"_id":"{\"String\":\"65733a82fb2ce9836745de4g\"}"},"diff_col_values":{}} +{"log_type":"Miss","schema":"test_db_1","tb":"tb_2","id_col_values":{"_id":"{\"String\":\"65733a82fb2ce9836745de4f\"}"},"diff_col_values":{}} +{"log_type":"Miss","schema":"test_db_2","tb":"tb_1","id_col_values":{"_id":"{\"String\":\"65733a82fb2ce9836745de4m\"}"},"diff_col_values":{}} +{"log_type":"Miss","schema":"test_db_2","tb":"tb_2","id_col_values":{"_id":"{\"String\":\"65733a82fb2ce9836745de4p\"}"},"diff_col_values":{}} +{"log_type":"Miss","schema":"test_db_2","tb":"tb_2","id_col_values":{"_id":"{\"String\":\"65733a82fb2ce9836745de4q\"}"},"diff_col_values":{}} diff --git a/dt-tests/tests/mongo_to_mongo/review/route_test/expect_check_log/diff.log b/dt-tests/tests/mongo_to_mongo/review/route_test/expect_check_log/diff.log index a9b6d5bd..8c1c6dea 100644 --- a/dt-tests/tests/mongo_to_mongo/review/route_test/expect_check_log/diff.log +++ b/dt-tests/tests/mongo_to_mongo/review/route_test/expect_check_log/diff.log @@ -1,5 +1,5 @@ -{"schema":"test_db_1","tb":"tb_1","target_schema":"dst_test_db_1","target_tb":"tb_1","id_col_values":{"_id":"{\"ObjectId\":{\"$oid\":\"65733a82fb2ce9836745de4c\"}}"},"diff_col_values":{"age":{"src":"3","dst":"3000"}}} -{"schema":"test_db_2","tb":"tb_1","target_schema":"dst_test_db_2","target_tb":"dst_tb_1","id_col_values":{"_id":"{\"ObjectId\":{\"$oid\":\"65733a82fb2ce9836745de48\"}}"},"diff_col_values":{"age":{"src":"5","dst":"5000"}}} -{"schema":"test_db_1","tb":"tb_2","target_schema":"dst_test_db_1","target_tb":"tb_2","id_col_values":{"_id":"{\"ObjectId\":{\"$oid\":\"65733a82fb2ce9836745de43\"}}"},"diff_col_values":{"age":{"src":"5","dst":"5000"}}} -{"schema":"test_db_1","tb":"tb_2","target_schema":"dst_test_db_1","target_tb":"tb_2","id_col_values":{"_id":"{\"ObjectId\":{\"$oid\":\"65733a82fb2ce9836745de42\"}}"},"diff_col_values":{"age":{"src":"4","dst":"4000"}}} -{"schema":"test_db_2","tb":"tb_2","id_col_values":{"_id":"{\"ObjectId\":{\"$oid\":\"65733a82fb2ce9836745de4d\"}}"},"diff_col_values":{"age":{"src":"5","dst":"5000"}}} +{"log_type":"Diff","schema":"test_db_1","tb":"tb_1","target_schema":"dst_test_db_1","target_tb":"tb_1","id_col_values":{"_id":"{\"ObjectId\":{\"$oid\":\"65733a82fb2ce9836745de4c\"}}"},"diff_col_values":{"age":{"src":"3","dst":"3000"}}} +{"log_type":"Diff","schema":"test_db_2","tb":"tb_1","target_schema":"dst_test_db_2","target_tb":"dst_tb_1","id_col_values":{"_id":"{\"ObjectId\":{\"$oid\":\"65733a82fb2ce9836745de48\"}}"},"diff_col_values":{"age":{"src":"5","dst":"5000"}}} +{"log_type":"Diff","schema":"test_db_1","tb":"tb_2","target_schema":"dst_test_db_1","target_tb":"tb_2","id_col_values":{"_id":"{\"ObjectId\":{\"$oid\":\"65733a82fb2ce9836745de43\"}}"},"diff_col_values":{"age":{"src":"5","dst":"5000"}}} +{"log_type":"Diff","schema":"test_db_1","tb":"tb_2","target_schema":"dst_test_db_1","target_tb":"tb_2","id_col_values":{"_id":"{\"ObjectId\":{\"$oid\":\"65733a82fb2ce9836745de42\"}}"},"diff_col_values":{"age":{"src":"4","dst":"4000"}}} +{"log_type":"Diff","schema":"test_db_2","tb":"tb_2","id_col_values":{"_id":"{\"ObjectId\":{\"$oid\":\"65733a82fb2ce9836745de4d\"}}"},"diff_col_values":{"age":{"src":"5","dst":"5000"}}} diff --git a/dt-tests/tests/mongo_to_mongo/review/route_test/expect_check_log/miss.log b/dt-tests/tests/mongo_to_mongo/review/route_test/expect_check_log/miss.log index 10cdbf3a..cc6bae36 100644 --- a/dt-tests/tests/mongo_to_mongo/review/route_test/expect_check_log/miss.log +++ b/dt-tests/tests/mongo_to_mongo/review/route_test/expect_check_log/miss.log @@ -1,3 +1,3 @@ -{"schema":"test_db_1","tb":"tb_1","target_schema":"dst_test_db_1","target_tb":"tb_1","id_col_values":{"_id":"{\"ObjectId\":{\"$oid\":\"65733a82fb2ce9836745de4e\"}}"}} -{"schema":"test_db_1","tb":"tb_2","target_schema":"dst_test_db_1","target_tb":"tb_2","id_col_values":{"_id":"{\"ObjectId\":{\"$oid\":\"65733a82fb2ce9836745de40\"}}"}} -{"schema":"test_db_2","tb":"tb_2","id_col_values":{"_id":"{\"ObjectId\":{\"$oid\":\"65733a82fb2ce9836745de4a\"}}"}} +{"log_type":"Miss","schema":"test_db_1","tb":"tb_1","target_schema":"dst_test_db_1","target_tb":"tb_1","id_col_values":{"_id":"{\"ObjectId\":{\"$oid\":\"65733a82fb2ce9836745de4e\"}}"}} +{"log_type":"Miss","schema":"test_db_1","tb":"tb_2","target_schema":"dst_test_db_1","target_tb":"tb_2","id_col_values":{"_id":"{\"ObjectId\":{\"$oid\":\"65733a82fb2ce9836745de40\"}}"}} +{"log_type":"Miss","schema":"test_db_2","tb":"tb_2","id_col_values":{"_id":"{\"ObjectId\":{\"$oid\":\"65733a82fb2ce9836745de4a\"}}"}} diff --git a/dt-tests/tests/mongo_to_mongo/revise/basic_test/check_log/diff.log b/dt-tests/tests/mongo_to_mongo/revise/basic_test/check_log/diff.log index 9bba417a..9279fd5c 100644 --- a/dt-tests/tests/mongo_to_mongo/revise/basic_test/check_log/diff.log +++ b/dt-tests/tests/mongo_to_mongo/revise/basic_test/check_log/diff.log @@ -1,6 +1,6 @@ -{"schema":"test_db_1","tb":"tb_1","id_col_values":{"_id":"{\"ObjectId\":{\"$oid\":\"65733a82fb2ce9836745de4b\"}}"},"diff_col_values":{"age":{"src":"2","dst":"2000"}}} -{"schema":"test_db_1","tb":"tb_1","id_col_values":{"_id":"{\"ObjectId\":{\"$oid\":\"65733a82fb2ce9836745de4c\"}}"},"diff_col_values":{"age":{"src":"3","dst":"3000"}}} -{"schema":"test_db_1","tb":"tb_2","id_col_values":{"_id":"{\"ObjectId\":{\"$oid\":\"65733a82fb2ce9836745de42\"}}"},"diff_col_values":{"age":{"src":"4","dst":"4000"}}} -{"schema":"test_db_1","tb":"tb_2","id_col_values":{"_id":"{\"ObjectId\":{\"$oid\":\"65733a82fb2ce9836745de43\"}}"},"diff_col_values":{"age":{"src":"5","dst":"5000"}}} -{"schema":"test_db_2","tb":"tb_1","id_col_values":{"_id":"{\"ObjectId\":{\"$oid\":\"65733a82fb2ce9836745de44\"}}"},"diff_col_values":{"age":{"src":"1","dst":"1000"}}} -{"schema":"test_db_2","tb":"tb_1","id_col_values":{"_id":"{\"ObjectId\":{\"$oid\":\"65733a82fb2ce9836745de48\"}}"},"diff_col_values":{"age":{"src":"5","dst":"5000"}}} +{"log_type":"Diff","schema":"test_db_1","tb":"tb_1","id_col_values":{"_id":"{\"ObjectId\":{\"$oid\":\"65733a82fb2ce9836745de4b\"}}"},"diff_col_values":{"age":{"src":"2","dst":"2000"}}} +{"log_type":"Diff","schema":"test_db_1","tb":"tb_1","id_col_values":{"_id":"{\"ObjectId\":{\"$oid\":\"65733a82fb2ce9836745de4c\"}}"},"diff_col_values":{"age":{"src":"3","dst":"3000"}}} +{"log_type":"Diff","schema":"test_db_1","tb":"tb_2","id_col_values":{"_id":"{\"ObjectId\":{\"$oid\":\"65733a82fb2ce9836745de42\"}}"},"diff_col_values":{"age":{"src":"4","dst":"4000"}}} +{"log_type":"Diff","schema":"test_db_1","tb":"tb_2","id_col_values":{"_id":"{\"ObjectId\":{\"$oid\":\"65733a82fb2ce9836745de43\"}}"},"diff_col_values":{"age":{"src":"5","dst":"5000"}}} +{"log_type":"Diff","schema":"test_db_2","tb":"tb_1","id_col_values":{"_id":"{\"ObjectId\":{\"$oid\":\"65733a82fb2ce9836745de44\"}}"},"diff_col_values":{"age":{"src":"1","dst":"1000"}}} +{"log_type":"Diff","schema":"test_db_2","tb":"tb_1","id_col_values":{"_id":"{\"ObjectId\":{\"$oid\":\"65733a82fb2ce9836745de48\"}}"},"diff_col_values":{"age":{"src":"5","dst":"5000"}}} diff --git a/dt-tests/tests/mongo_to_mongo/revise/basic_test/check_log/miss.log b/dt-tests/tests/mongo_to_mongo/revise/basic_test/check_log/miss.log index d213dacc..49b3afbe 100644 --- a/dt-tests/tests/mongo_to_mongo/revise/basic_test/check_log/miss.log +++ b/dt-tests/tests/mongo_to_mongo/revise/basic_test/check_log/miss.log @@ -1,5 +1,5 @@ -{"schema":"test_db_1","tb":"tb_1","id_col_values":{"_id":"{\"ObjectId\":{\"$oid\":\"65733a82fb2ce9836745de4d\"}}"}} -{"schema":"test_db_1","tb":"tb_1","id_col_values":{"_id":"{\"ObjectId\":{\"$oid\":\"65733a82fb2ce9836745de4e\"}}"}} -{"schema":"test_db_1","tb":"tb_2","id_col_values":{"_id":"{\"ObjectId\":{\"$oid\":\"65733a82fb2ce9836745de4f\"}}"}} -{"schema":"test_db_1","tb":"tb_2","id_col_values":{"_id":"{\"ObjectId\":{\"$oid\":\"65733a82fb2ce9836745de40\"}}"}} -{"schema":"test_db_2","tb":"tb_1","id_col_values":{"_id":"{\"ObjectId\":{\"$oid\":\"65733a82fb2ce9836745de46\"}}"}} +{"log_type":"Miss","schema":"test_db_1","tb":"tb_1","id_col_values":{"_id":"{\"ObjectId\":{\"$oid\":\"65733a82fb2ce9836745de4d\"}}"}} +{"log_type":"Miss","schema":"test_db_1","tb":"tb_1","id_col_values":{"_id":"{\"ObjectId\":{\"$oid\":\"65733a82fb2ce9836745de4e\"}}"}} +{"log_type":"Miss","schema":"test_db_1","tb":"tb_2","id_col_values":{"_id":"{\"ObjectId\":{\"$oid\":\"65733a82fb2ce9836745de4f\"}}"}} +{"log_type":"Miss","schema":"test_db_1","tb":"tb_2","id_col_values":{"_id":"{\"ObjectId\":{\"$oid\":\"65733a82fb2ce9836745de40\"}}"}} +{"log_type":"Miss","schema":"test_db_2","tb":"tb_1","id_col_values":{"_id":"{\"ObjectId\":{\"$oid\":\"65733a82fb2ce9836745de46\"}}"}} diff --git a/dt-tests/tests/mongo_to_mongo/revise/route_test/check_log/diff.log b/dt-tests/tests/mongo_to_mongo/revise/route_test/check_log/diff.log index ba0685e5..0e4b068e 100644 --- a/dt-tests/tests/mongo_to_mongo/revise/route_test/check_log/diff.log +++ b/dt-tests/tests/mongo_to_mongo/revise/route_test/check_log/diff.log @@ -1,8 +1,8 @@ -{"schema":"test_db_1","tb":"tb_1","id_col_values":{"_id":"{\"String\":\"65733a82fb2ce9836745de4b\"}"}} -{"schema":"test_db_1","tb":"tb_1","id_col_values":{"_id":"{\"String\":\"65733a82fb2ce9836745de4c\"}"}} -{"schema":"test_db_1","tb":"tb_2","id_col_values":{"_id":"{\"String\":\"65733a82fb2ce9836745de4i\"}"}} -{"schema":"test_db_1","tb":"tb_2","id_col_values":{"_id":"{\"String\":\"65733a82fb2ce9836745de4j\"}"}} -{"schema":"test_db_2","tb":"tb_1","id_col_values":{"_id":"{\"String\":\"65733a82fb2ce9836745de4k\"}"}} -{"schema":"test_db_2","tb":"tb_1","id_col_values":{"_id":"{\"String\":\"65733a82fb2ce9836745de4o\"}"}} -{"schema":"test_db_2","tb":"tb_2","id_col_values":{"_id":"{\"String\":\"65733a82fb2ce9836745de4s\"}"}} -{"schema":"test_db_2","tb":"tb_2","id_col_values":{"_id":"{\"String\":\"65733a82fb2ce9836745de4t\"}"}} +{"log_type":"Diff","schema":"test_db_1","tb":"tb_1","id_col_values":{"_id":"{\"String\":\"65733a82fb2ce9836745de4b\"}"},"diff_col_values":{}} +{"log_type":"Diff","schema":"test_db_1","tb":"tb_1","id_col_values":{"_id":"{\"String\":\"65733a82fb2ce9836745de4c\"}"},"diff_col_values":{}} +{"log_type":"Diff","schema":"test_db_1","tb":"tb_2","id_col_values":{"_id":"{\"String\":\"65733a82fb2ce9836745de4i\"}"},"diff_col_values":{}} +{"log_type":"Diff","schema":"test_db_1","tb":"tb_2","id_col_values":{"_id":"{\"String\":\"65733a82fb2ce9836745de4j\"}"},"diff_col_values":{}} +{"log_type":"Diff","schema":"test_db_2","tb":"tb_1","id_col_values":{"_id":"{\"String\":\"65733a82fb2ce9836745de4k\"}"},"diff_col_values":{}} +{"log_type":"Diff","schema":"test_db_2","tb":"tb_1","id_col_values":{"_id":"{\"String\":\"65733a82fb2ce9836745de4o\"}"},"diff_col_values":{}} +{"log_type":"Diff","schema":"test_db_2","tb":"tb_2","id_col_values":{"_id":"{\"String\":\"65733a82fb2ce9836745de4s\"}"},"diff_col_values":{}} +{"log_type":"Diff","schema":"test_db_2","tb":"tb_2","id_col_values":{"_id":"{\"String\":\"65733a82fb2ce9836745de4t\"}"},"diff_col_values":{}} diff --git a/dt-tests/tests/mongo_to_mongo/revise/route_test/check_log/miss.log b/dt-tests/tests/mongo_to_mongo/revise/route_test/check_log/miss.log index c74c04ad..976b4c1c 100644 --- a/dt-tests/tests/mongo_to_mongo/revise/route_test/check_log/miss.log +++ b/dt-tests/tests/mongo_to_mongo/revise/route_test/check_log/miss.log @@ -1,7 +1,7 @@ -{"schema":"test_db_1","tb":"tb_1","id_col_values":{"_id":"{\"String\":\"65733a82fb2ce9836745de4d\"}"}} -{"schema":"test_db_1","tb":"tb_1","id_col_values":{"_id":"{\"String\":\"65733a82fb2ce9836745de4e\"}"}} -{"schema":"test_db_1","tb":"tb_2","id_col_values":{"_id":"{\"String\":\"65733a82fb2ce9836745de4g\"}"}} -{"schema":"test_db_1","tb":"tb_2","id_col_values":{"_id":"{\"String\":\"65733a82fb2ce9836745de4f\"}"}} -{"schema":"test_db_2","tb":"tb_1","id_col_values":{"_id":"{\"String\":\"65733a82fb2ce9836745de4m\"}"}} -{"schema":"test_db_2","tb":"tb_2","id_col_values":{"_id":"{\"String\":\"65733a82fb2ce9836745de4p\"}"}} -{"schema":"test_db_2","tb":"tb_2","id_col_values":{"_id":"{\"String\":\"65733a82fb2ce9836745de4q\"}"}} +{"log_type":"Miss","schema":"test_db_1","tb":"tb_1","id_col_values":{"_id":"{\"String\":\"65733a82fb2ce9836745de4d\"}"},"diff_col_values":{}} +{"log_type":"Miss","schema":"test_db_1","tb":"tb_1","id_col_values":{"_id":"{\"String\":\"65733a82fb2ce9836745de4e\"}"},"diff_col_values":{}} +{"log_type":"Miss","schema":"test_db_1","tb":"tb_2","id_col_values":{"_id":"{\"String\":\"65733a82fb2ce9836745de4g\"}"},"diff_col_values":{}} +{"log_type":"Miss","schema":"test_db_1","tb":"tb_2","id_col_values":{"_id":"{\"String\":\"65733a82fb2ce9836745de4f\"}"},"diff_col_values":{}} +{"log_type":"Miss","schema":"test_db_2","tb":"tb_1","id_col_values":{"_id":"{\"String\":\"65733a82fb2ce9836745de4m\"}"},"diff_col_values":{}} +{"log_type":"Miss","schema":"test_db_2","tb":"tb_2","id_col_values":{"_id":"{\"String\":\"65733a82fb2ce9836745de4p\"}"},"diff_col_values":{}} +{"log_type":"Miss","schema":"test_db_2","tb":"tb_2","id_col_values":{"_id":"{\"String\":\"65733a82fb2ce9836745de4q\"}"},"diff_col_values":{}} diff --git a/dt-tests/tests/mysql_to_mysql/check/basic_struct_test/dst_prepare.sql b/dt-tests/tests/mysql_to_mysql/check/basic_struct_test/dst_prepare.sql index c079512d..5ab5f91b 100644 --- a/dt-tests/tests/mysql_to_mysql/check/basic_struct_test/dst_prepare.sql +++ b/dt-tests/tests/mysql_to_mysql/check/basic_struct_test/dst_prepare.sql @@ -101,19 +101,18 @@ CREATE TABLE struct_check_test_1.not_match_column ( ); -- not match: index -CREATE TABLE IF NOT EXISTS struct_check_test_1.not_match_index ( - `id` int(11) NOT NULL, - `index_col` int(11) DEFAULT NULL, - `unique_col` int(11) DEFAULT NULL, - `fulltext_col` text DEFAULT NULL, - `composite_index_col1` int(11) DEFAULT NULL, - `composite_index_col2` int(11) DEFAULT NULL, - `composite_index_col3` int(11) DEFAULT NULL, - PRIMARY KEY (`id`) -) ENGINE=InnoDB DEFAULT CHARSET=utf8; +CREATE TABLE struct_check_test_1.not_match_index ( + id INT UNSIGNED AUTO_INCREMENT PRIMARY KEY, + unique_col VARCHAR(255) NOT NULL, + index_col VARCHAR(255), + fulltext_col TEXT, + spatial_col POINT NOT NULL, + simple_index_col VARCHAR(255), + composite_index_col1 VARCHAR(255), + composite_index_col2 VARCHAR(255), + composite_index_col3 VARCHAR(255) +); CREATE INDEX i4_diff_order ON struct_check_test_1.not_match_index (composite_index_col3, composite_index_col2, composite_index_col1); CREATE INDEX i5_diff_name_dst ON struct_check_test_1.not_match_index (index_col); -CREATE UNIQUE INDEX u_index ON struct_check_test_1.not_match_index (unique_col); -CREATE FULLTEXT INDEX f_index ON struct_check_test_1.not_match_index (fulltext_col); -- CREATE INDEX i6_miss ON struct_check_test_1.not_match_index (index_col); \ No newline at end of file diff --git a/dt-tests/tests/mysql_to_mysql/check/basic_struct_test/src_prepare.sql b/dt-tests/tests/mysql_to_mysql/check/basic_struct_test/src_prepare.sql index 4996038d..cc0bc2e1 100644 --- a/dt-tests/tests/mysql_to_mysql/check/basic_struct_test/src_prepare.sql +++ b/dt-tests/tests/mysql_to_mysql/check/basic_struct_test/src_prepare.sql @@ -107,19 +107,18 @@ CREATE TABLE struct_check_test_1.not_match_column ( ); -- not match: index -CREATE TABLE IF NOT EXISTS struct_check_test_1.not_match_index ( - `id` int(11) NOT NULL, - `index_col` int(11) DEFAULT NULL, - `unique_col` int(11) DEFAULT NULL, - `fulltext_col` text DEFAULT NULL, - `composite_index_col1` int(11) DEFAULT NULL, - `composite_index_col2` int(11) DEFAULT NULL, - `composite_index_col3` int(11) DEFAULT NULL, - PRIMARY KEY (`id`) -) ENGINE=InnoDB DEFAULT CHARSET=utf8; +CREATE TABLE struct_check_test_1.not_match_index ( + id INT UNSIGNED AUTO_INCREMENT PRIMARY KEY, + unique_col VARCHAR(255) NOT NULL, + index_col VARCHAR(255), + fulltext_col TEXT, + spatial_col POINT NOT NULL, + simple_index_col VARCHAR(255), + composite_index_col1 VARCHAR(255), + composite_index_col2 VARCHAR(255), + composite_index_col3 VARCHAR(255) +); +CREATE INDEX i4_diff_order ON struct_check_test_1.not_match_index (composite_index_col2, composite_index_col1 , composite_index_col3); CREATE INDEX i5_diff_name_src ON struct_check_test_1.not_match_index (index_col); -CREATE UNIQUE INDEX u_index ON struct_check_test_1.not_match_index (unique_col); -CREATE FULLTEXT INDEX f_index ON struct_check_test_1.not_match_index (fulltext_col); -CREATE INDEX i4_diff_order ON struct_check_test_1.not_match_index (composite_index_col2, composite_index_col1, composite_index_col3); CREATE INDEX i6_miss ON struct_check_test_1.not_match_index (index_col); \ No newline at end of file diff --git a/dt-tests/tests/mysql_to_mysql/check/recheck_config/dst_prepare.sql b/dt-tests/tests/mysql_to_mysql/check/recheck_config/dst_prepare.sql new file mode 100644 index 00000000..9096e982 --- /dev/null +++ b/dt-tests/tests/mysql_to_mysql/check/recheck_config/dst_prepare.sql @@ -0,0 +1,3 @@ +DROP DATABASE IF EXISTS test_db_1; +CREATE DATABASE test_db_1; +CREATE TABLE test_db_1.recheck_table (id INT PRIMARY KEY, name VARCHAR(100)); diff --git a/dt-tests/tests/mysql_to_mysql/check/recheck_config/dst_test.sql b/dt-tests/tests/mysql_to_mysql/check/recheck_config/dst_test.sql new file mode 100644 index 00000000..d42a0bd3 --- /dev/null +++ b/dt-tests/tests/mysql_to_mysql/check/recheck_config/dst_test.sql @@ -0,0 +1,2 @@ +INSERT INTO test_db_1.recheck_table (id, name) VALUES (1, 'Alice'); +INSERT INTO test_db_1.recheck_table (id, name) VALUES (2, 'Eve'); -- Diff diff --git a/dt-tests/tests/mysql_to_mysql/check/recheck_config/expect_check_log/diff.log b/dt-tests/tests/mysql_to_mysql/check/recheck_config/expect_check_log/diff.log new file mode 100644 index 00000000..8a43d073 --- /dev/null +++ b/dt-tests/tests/mysql_to_mysql/check/recheck_config/expect_check_log/diff.log @@ -0,0 +1 @@ +{"diff_col_values":{"name":{"src":"Bob","dst":"Eve"}},"id_col_values":{"id":"2"},"log_type":"Diff","schema":"test_db_1","tb":"recheck_table"} diff --git a/dt-tests/tests/mysql_to_mysql/check/recheck_config/expect_check_log/summary.log b/dt-tests/tests/mysql_to_mysql/check/recheck_config/expect_check_log/summary.log new file mode 100644 index 00000000..daf2fed5 --- /dev/null +++ b/dt-tests/tests/mysql_to_mysql/check/recheck_config/expect_check_log/summary.log @@ -0,0 +1 @@ +{"start_time":"2023-01-01T00:00:00+00:00","end_time":"2023-01-01T00:00:00+00:00","is_consistent":false,"diff_count":1} diff --git a/dt-tests/tests/mysql_to_mysql/check/recheck_config/src_prepare.sql b/dt-tests/tests/mysql_to_mysql/check/recheck_config/src_prepare.sql new file mode 100644 index 00000000..9096e982 --- /dev/null +++ b/dt-tests/tests/mysql_to_mysql/check/recheck_config/src_prepare.sql @@ -0,0 +1,3 @@ +DROP DATABASE IF EXISTS test_db_1; +CREATE DATABASE test_db_1; +CREATE TABLE test_db_1.recheck_table (id INT PRIMARY KEY, name VARCHAR(100)); diff --git a/dt-tests/tests/mysql_to_mysql/check/recheck_config/src_test.sql b/dt-tests/tests/mysql_to_mysql/check/recheck_config/src_test.sql new file mode 100644 index 00000000..cd9478af --- /dev/null +++ b/dt-tests/tests/mysql_to_mysql/check/recheck_config/src_test.sql @@ -0,0 +1,2 @@ +INSERT INTO test_db_1.recheck_table (id, name) VALUES (1, 'Alice'); +INSERT INTO test_db_1.recheck_table (id, name) VALUES (2, 'Bob'); diff --git a/dt-tests/tests/mysql_to_mysql/check/recheck_config/task_config.ini b/dt-tests/tests/mysql_to_mysql/check/recheck_config/task_config.ini new file mode 100644 index 00000000..206c1d99 --- /dev/null +++ b/dt-tests/tests/mysql_to_mysql/check/recheck_config/task_config.ini @@ -0,0 +1,37 @@ +[extractor] +db_type=mysql +extract_type=snapshot +url={mysql_extractor_url} + +[sinker] +db_type=mysql +sink_type=check +url={mysql_sinker_url} +recheck_interval_secs=5 +recheck_attempts=4 +batch_size=2 + +[filter] +do_dbs= +ignore_dbs= +do_tbs=test_db_1.recheck_table +ignore_tbs= +do_events=insert + +[router] +db_map= +tb_map= +col_map= + +[parallelizer] +parallel_type=rdb_check +parallel_size=1 + +[pipeline] +buffer_size=2 +checkpoint_interval_secs=1 + +[runtime] +log_level=info +log4rs_file=./log4rs.yaml +log_dir=./logs diff --git a/dt-tests/tests/mysql_to_mysql/check_tests.rs b/dt-tests/tests/mysql_to_mysql/check_tests.rs index 6e95d98c..32422626 100644 --- a/dt-tests/tests/mysql_to_mysql/check_tests.rs +++ b/dt-tests/tests/mysql_to_mysql/check_tests.rs @@ -3,7 +3,8 @@ mod test { use serial_test::serial; - use crate::test_runner::test_base::TestBase; + use crate::{test_config_util::TestConfigUtil, test_runner::test_base::TestBase}; + use dt_common::config::{sinker_config::SinkerConfig, task_config::TaskConfig}; #[tokio::test] #[serial] @@ -72,4 +73,49 @@ mod test { // gen log, and verify log size limit TestBase::run_check_test("mysql_to_mysql/check/log_size_limit_test").await; } + + #[test] + fn check_recheck_default_config_loaded() { + let config_path = + TestConfigUtil::get_absolute_path("mysql_to_mysql/check/basic_test/task_config.ini"); + let config = TaskConfig::new(&config_path).expect("load default check config"); + + match config.sinker { + SinkerConfig::MysqlCheck { + recheck_interval_secs, + recheck_attempts, + .. + } => { + assert_eq!(recheck_interval_secs, 0); + assert_eq!(recheck_attempts, 1); + } + _ => panic!("unexpected sinker config variant"), + } + } + + #[test] + fn check_recheck_config_loaded() { + let config_path = TestConfigUtil::get_absolute_path( + "mysql_to_mysql/check/recheck_config/task_config.ini", + ); + let config = TaskConfig::new(&config_path).expect("load recheck config"); + + match config.sinker { + SinkerConfig::MysqlCheck { + recheck_interval_secs, + recheck_attempts, + .. + } => { + assert_eq!(recheck_interval_secs, 5); + assert_eq!(recheck_attempts, 4); + } + _ => panic!("unexpected sinker config variant"), + } + } + + #[tokio::test] + #[serial] + async fn check_recheck_test() { + TestBase::run_check_test("mysql_to_mysql/check/recheck_config").await; + } } diff --git a/dt-tests/tests/pg_to_pg/check/recheck_config/dst_prepare.sql b/dt-tests/tests/pg_to_pg/check/recheck_config/dst_prepare.sql new file mode 100644 index 00000000..d05b8b82 --- /dev/null +++ b/dt-tests/tests/pg_to_pg/check/recheck_config/dst_prepare.sql @@ -0,0 +1,5 @@ +DROP TABLE IF EXISTS recheck_table; +CREATE TABLE recheck_table ( + id serial PRIMARY KEY, + name text +); diff --git a/dt-tests/tests/pg_to_pg/check/recheck_config/dst_test.sql b/dt-tests/tests/pg_to_pg/check/recheck_config/dst_test.sql new file mode 100644 index 00000000..b383e420 --- /dev/null +++ b/dt-tests/tests/pg_to_pg/check/recheck_config/dst_test.sql @@ -0,0 +1,2 @@ +INSERT INTO recheck_table (id, name) VALUES (1, 'Alice'); +INSERT INTO recheck_table (id, name) VALUES (2, 'Eve'); -- Diff diff --git a/dt-tests/tests/pg_to_pg/check/recheck_config/expect_check_log/diff.log b/dt-tests/tests/pg_to_pg/check/recheck_config/expect_check_log/diff.log new file mode 100644 index 00000000..4303e772 --- /dev/null +++ b/dt-tests/tests/pg_to_pg/check/recheck_config/expect_check_log/diff.log @@ -0,0 +1 @@ +{"diff_col_values":{"name":{"src":"Bob","dst":"Eve"}},"id_col_values":{"id":"2"},"log_type":"Diff","schema":"public","tb":"recheck_table"} diff --git a/dt-tests/tests/pg_to_pg/check/recheck_config/expect_check_log/summary.log b/dt-tests/tests/pg_to_pg/check/recheck_config/expect_check_log/summary.log new file mode 100644 index 00000000..daf2fed5 --- /dev/null +++ b/dt-tests/tests/pg_to_pg/check/recheck_config/expect_check_log/summary.log @@ -0,0 +1 @@ +{"start_time":"2023-01-01T00:00:00+00:00","end_time":"2023-01-01T00:00:00+00:00","is_consistent":false,"diff_count":1} diff --git a/dt-tests/tests/pg_to_pg/check/recheck_config/src_prepare.sql b/dt-tests/tests/pg_to_pg/check/recheck_config/src_prepare.sql new file mode 100644 index 00000000..d05b8b82 --- /dev/null +++ b/dt-tests/tests/pg_to_pg/check/recheck_config/src_prepare.sql @@ -0,0 +1,5 @@ +DROP TABLE IF EXISTS recheck_table; +CREATE TABLE recheck_table ( + id serial PRIMARY KEY, + name text +); diff --git a/dt-tests/tests/pg_to_pg/check/recheck_config/src_test.sql b/dt-tests/tests/pg_to_pg/check/recheck_config/src_test.sql new file mode 100644 index 00000000..4ee600d5 --- /dev/null +++ b/dt-tests/tests/pg_to_pg/check/recheck_config/src_test.sql @@ -0,0 +1,2 @@ +INSERT INTO recheck_table (id, name) VALUES (1, 'Alice'); +INSERT INTO recheck_table (id, name) VALUES (2, 'Bob'); diff --git a/dt-tests/tests/pg_to_pg/check/recheck_config/task_config.ini b/dt-tests/tests/pg_to_pg/check/recheck_config/task_config.ini new file mode 100644 index 00000000..ffc1c2bc --- /dev/null +++ b/dt-tests/tests/pg_to_pg/check/recheck_config/task_config.ini @@ -0,0 +1,38 @@ +[extractor] +db_type=pg +extract_type=snapshot +url={pg_extractor_url} + +[sinker] +db_type=pg +sink_type=check +url={pg_sinker_url} +recheck_interval_secs=2 +recheck_attempts=3 +batch_size=10 + +[filter] +do_dbs= +ignore_dbs= +do_tbs=public.recheck_table +ignore_tbs= +do_events=insert,update,delete,ddl,dcl + + +[router] +db_map= +tb_map= +col_map= + +[parallelizer] +parallel_type=rdb_check +parallel_size=1 + +[pipeline] +buffer_size=2 +checkpoint_interval_secs=1 + +[runtime] +log_level=debug +log4rs_file=./log4rs.yaml +log_dir=./logs diff --git a/dt-tests/tests/pg_to_pg/check_tests.rs b/dt-tests/tests/pg_to_pg/check_tests.rs index 0aaa3bd7..33178007 100644 --- a/dt-tests/tests/pg_to_pg/check_tests.rs +++ b/dt-tests/tests/pg_to_pg/check_tests.rs @@ -3,7 +3,8 @@ mod test { use serial_test::serial; - use crate::test_runner::test_base::TestBase; + use crate::{test_config_util::TestConfigUtil, test_runner::test_base::TestBase}; + use dt_common::config::{sinker_config::SinkerConfig, task_config::TaskConfig}; #[tokio::test] #[serial] @@ -52,4 +53,48 @@ mod test { async fn check_revise_struct_test() { TestBase::run_check_test("pg_to_pg/check/revise_struct_test").await; } + + #[test] + fn check_recheck_default_config_loaded() { + let config_path = + TestConfigUtil::get_absolute_path("pg_to_pg/check/basic_test/task_config.ini"); + let config = TaskConfig::new(&config_path).expect("load default check config"); + + match config.sinker { + SinkerConfig::PgCheck { + recheck_interval_secs, + recheck_attempts, + .. + } => { + assert_eq!(recheck_interval_secs, 0); + assert_eq!(recheck_attempts, 1); + } + _ => panic!("unexpected sinker config variant"), + } + } + + #[test] + fn check_recheck_config_loaded() { + let config_path = + TestConfigUtil::get_absolute_path("pg_to_pg/check/recheck_config/task_config.ini"); + let config = TaskConfig::new(&config_path).expect("load recheck config"); + + match config.sinker { + SinkerConfig::PgCheck { + recheck_interval_secs, + recheck_attempts, + .. + } => { + assert_eq!(recheck_interval_secs, 2); + assert_eq!(recheck_attempts, 3); + } + _ => panic!("unexpected sinker config variant"), + } + } + + #[tokio::test] + #[serial] + async fn check_recheck_test() { + TestBase::run_check_test("pg_to_pg/check/recheck_config").await; + } } diff --git a/dt-tests/tests/pg_to_pg/snapshot/resume_db_test/dst_prepare.sql b/dt-tests/tests/pg_to_pg/snapshot/resume_db_test/dst_prepare.sql index 9f675e46..18b5b6b1 100644 --- a/dt-tests/tests/pg_to_pg/snapshot/resume_db_test/dst_prepare.sql +++ b/dt-tests/tests/pg_to_pg/snapshot/resume_db_test/dst_prepare.sql @@ -11,10 +11,6 @@ CREATE TABLE resume_table_2("p.k" serial, val numeric(20,8), PRIMARY KEY("p.k")) DROP TABLE IF EXISTS resume_table_3; CREATE TABLE resume_table_3(f_0 integer, f_1 integer, PRIMARY KEY(f_0, f_1)); --- test nullable composite unique key -DROP TABLE IF EXISTS nullable_composite_unique_key_table; -CREATE TABLE nullable_composite_unique_key_table (uk1 int, uk2 varchar(10), val int, UNIQUE(uk1, uk2)); - DROP TABLE IF EXISTS "resume_table_*$4"; CREATE TABLE "resume_table_*$4"("p.k" serial, val numeric(20,8), PRIMARY KEY("p.k")); @@ -30,15 +26,6 @@ CREATE TABLE "test_db_*.*"."finished_table_*$2"("p.k" serial, val numeric(20,8), DROP SCHEMA IF EXISTS apecloud_resumer_test CASCADE; CREATE SCHEMA apecloud_resumer_test; -DROP TABLE IF EXISTS bytea_pk_test; -CREATE TABLE bytea_pk_test ( - category_id VARCHAR(50), - binary_id BYTEA, - description TEXT, - created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, - CONSTRAINT pk_bytea_pk_test PRIMARY KEY (category_id, binary_id) -); - CREATE TABLE IF NOT EXISTS apecloud_resumer_test.ape_task_position ( id bigserial PRIMARY KEY, task_id varchar(255) NOT NULL, @@ -48,14 +35,12 @@ CREATE TABLE IF NOT EXISTS apecloud_resumer_test.ape_task_position ( created_at timestamp DEFAULT CURRENT_TIMESTAMP, updated_at timestamp DEFAULT CURRENT_TIMESTAMP, CONSTRAINT uk_task_id_task_type_position_key UNIQUE (task_id, resumer_type, position_key) -); +) insert into apecloud_resumer_test.ape_task_position (task_id, resumer_type, position_key, position_data) values ('resume_db_test_1', 'SnapshotFinished', 'test_db_*.*-finished_table_*$1', '{"type":"RdbSnapshotFinished","db_type":"pg","schema":"test_db_*.*","tb":"finished_table_*$1"}'); insert into apecloud_resumer_test.ape_task_position (task_id, resumer_type, position_key, position_data) values ('resume_db_test_1', 'SnapshotFinished', 'test_db_*.*-finished_table_*$2', '{"type":"RdbSnapshotFinished","db_type":"pg","schema":"test_db_*.*","tb":"finished_table_*$2"}'); -insert into apecloud_resumer_test.ape_task_position (task_id, resumer_type, position_key, position_data) values ('resume_db_test_1', 'SnapshotDoing', 'public-resume_table_1', '{"type":"RdbSnapshot","db_type":"pg","schema":"public","tb":"resume_table_1","order_key":{"single":["pk","1"]}}'); -insert into apecloud_resumer_test.ape_task_position (task_id, resumer_type, position_key, position_data) values ('resume_db_test_1', 'SnapshotDoing', 'public-resume_table_2', '{"type":"RdbSnapshot","db_type":"pg","schema":"public","tb":"resume_table_2","order_key":{"single":["p.k","1"]}}'); -insert into apecloud_resumer_test.ape_task_position (task_id, resumer_type, position_key, position_data) values ('resume_db_test_1', 'SnapshotDoing', 'public-resume_table_3', '{"type":"RdbSnapshot","db_type":"pg","schema":"public","tb":"resume_table_3","order_key":{"composite":[["f_0","1"],["f_1","30"]]}}'); -insert into apecloud_resumer_test.ape_task_position (task_id, resumer_type, position_key, position_data) values ('resume_db_test_1', 'SnapshotDoing', 'public-nullable_composite_unique_key_table', '{"type":"RdbSnapshot","db_type":"pg","schema":"public","tb":"nullable_composite_unique_key_table","order_key":{"composite":[["uk1","6"],["uk2","6"]]}}'); -insert into apecloud_resumer_test.ape_task_position (task_id, resumer_type, position_key, position_data) values ('resume_db_test_1', 'SnapshotDoing', 'public-resume_table_*$4', '{"type":"RdbSnapshot","db_type":"pg","schema":"public","tb":"resume_table_*$4","order_key":{"single":["p.k","1"]}}'); -insert into apecloud_resumer_test.ape_task_position (task_id, resumer_type, position_key, position_data) values ('resume_db_test_1', 'SnapshotDoing', 'test_db_*.*-resume_table_*$5', '{"type":"RdbSnapshot","db_type":"pg","schema":"test_db_*.*","tb":"resume_table_*$5","order_key":{"single":["p.k","1"]}}'); -insert into apecloud_resumer_test.ape_task_position (task_id, resumer_type, position_key, position_data) values ('resume_db_test_1', 'SnapshotDoing', 'public-bytea_pk_test', '{"type":"RdbSnapshot","db_type":"pg","schema":"public","tb":"bytea_pk_test","order_key":{"composite":[["category_id","cat1"],["binary_id","e4bda0e5a5bde4b896e7958c30"]]}}'); +insert into apecloud_resumer_test.ape_task_position (task_id, resumer_type, position_key, position_data) values ('resume_db_test_1', 'SnapshotDoing', 'public-resume_table_1-pk', '{"type":"RdbSnapshot","db_type":"pg","schema":"public","tb":"resume_table_1","order_col":"pk","value":"1"}'); +insert into apecloud_resumer_test.ape_task_position (task_id, resumer_type, position_key, position_data) values ('resume_db_test_1', 'SnapshotDoing', 'public-resume_table_2-p.k', '{"type":"RdbSnapshot","db_type":"pg","schema":"public","tb":"resume_table_2","order_col":"p.k","value":"1"}'); +insert into apecloud_resumer_test.ape_task_position (task_id, resumer_type, position_key, position_data) values ('resume_db_test_1', 'SnapshotDoing', 'public-resume_table_3-f_0', '{"type":"RdbSnapshot","db_type":"pg","schema":"public","tb":"resume_table_3","order_col":"f_0","value":"1"}'); +insert into apecloud_resumer_test.ape_task_position (task_id, resumer_type, position_key, position_data) values ('resume_db_test_1', 'SnapshotDoing', 'public-resume_table_*$4-p.k', '{"type":"RdbSnapshot","db_type":"pg","schema":"public","tb":"resume_table_*$4","order_col":"p.k","value":"1"}'); +insert into apecloud_resumer_test.ape_task_position (task_id, resumer_type, position_key, position_data) values ('resume_db_test_1', 'SnapshotDoing', 'test_db_*.*-resume_table_*$5-p.k', '{"type":"RdbSnapshot","db_type":"pg","schema":"test_db_*.*","tb":"resume_table_*$5","order_col":"p.k","value":"1"}'); \ No newline at end of file