Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
38 commits
Select commit Hold shift + click to select a range
af69964
fix test sql parser
loomts Nov 19, 2025
4683734
cleanup sql
loomts Nov 19, 2025
faea9cc
fix test
loomts Nov 19, 2025
f7e9bc8
output full row
loomts Nov 11, 2025
7f2c11d
revise sql
loomts Nov 11, 2025
a1573a6
add test and try to revise mongodb
loomts Nov 18, 2025
fc39ced
fix revise
loomts Nov 19, 2025
72645f7
fix test sql parser
loomts Nov 19, 2025
fb7db7e
cleanup sql
loomts Nov 19, 2025
7dee62c
fix test
loomts Nov 19, 2025
db12b91
cleanup unwrap
loomts Nov 19, 2025
00d56b1
fix test order
loomts Nov 19, 2025
0a7e27a
docs
loomts Nov 19, 2025
5e67742
add `revise_match_full_row` test for pg& improve checker router
loomts Nov 20, 2025
c029f67
fix docs
loomts Nov 20, 2025
9335642
tmp
loomts Nov 21, 2025
0c79635
fix unwrap row_data
loomts Nov 22, 2025
c6a6335
simplify
loomts Nov 24, 2025
07f2ed7
fix revise sql without " & fix test & add MongoDB command
loomts Nov 25, 2025
98c083c
upd check.md
loomts Nov 25, 2025
0d8e5e9
fix review checker
loomts Nov 25, 2025
e2fbef1
add test
loomts Dec 4, 2025
0025845
fix comments, enhance data checking logic, expected test outputs and …
loomts Dec 4, 2025
432e880
add log4rs file size limiter
loomts Dec 5, 2025
4f662cc
fix tests
loomts Dec 5, 2025
4f75c69
cleanup
loomts Dec 5, 2025
4291ec5
fix test
loomts Dec 5, 2025
0a5b559
revert recheck logics
loomts Dec 6, 2025
f211f10
add extra log for snapshot checker
loomts Dec 7, 2025
29a7e43
revise structure check tests & fix review and revise tests
loomts Dec 8, 2025
0f84df8
fix copy
loomts Dec 8, 2025
d68aabd
cleanup
loomts Dec 8, 2025
0ebe3c5
cleanup and fix test
loomts Dec 8, 2025
6efb3b4
output full row
loomts Nov 11, 2025
4b155a3
cleanup sql
loomts Nov 19, 2025
40b1596
tmp
loomts Nov 21, 2025
09ce81a
stash
loomts Dec 3, 2025
6cfe493
add tests
loomts Dec 9, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 38 additions & 0 deletions docs/en/snapshot/check.md
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,44 @@ The summary log contains the overall result of the check, such as the number of
{"start_time":"2023-09-01T12:00:00+08:00","end_time":"2023-09-01T12:00:01+08:00","is_consistent":false,"miss_count":1,"diff_count":2,"extra_count":1,"sql_count":3}
```

## Output full rows

When the full row content is needed for troubleshooting, enable full-row logging in the `[sinker]` section:

```
[sinker]
output_full_row=true
```

When set to `true`, the checker appends `src_row` and `dst_row` to every diff log, and `src_row` to every miss log (full rows are currently available for MySQL, PostgreSQL, and MongoDB; Redis is not supported yet). Example:

```
{
"log_type": "Diff",
"schema": "test_db_1",
"tb": "one_pk_multi_uk",
"id_col_values": {
"f_0": "5"
},
"diff_col_values": {
"f_1": {
"src": "5",
"dst": "5000"
}
},
"src_row": {
"f_0": 5,
"f_1": 5,
"f_2": "ok"
},
"dst_row": {
"f_0": 5,
"f_1": 5000,
"f_2": "after manual update"
}
}
```

# Other configurations

- For [filter] and [router], refer to [config details](../config.md).
Expand Down
6 changes: 6 additions & 0 deletions dt-common/src/config/sinker_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ pub enum SinkerConfig {
output_full_row: bool,
output_revise_sql: bool,
revise_match_full_row: bool,
recheck_interval_secs: u64,
recheck_attempts: u32,
},

PgCheck {
Expand All @@ -47,6 +49,8 @@ pub enum SinkerConfig {
output_full_row: bool,
output_revise_sql: bool,
revise_match_full_row: bool,
recheck_interval_secs: u64,
recheck_attempts: u32,
},

MongoCheck {
Expand All @@ -57,6 +61,8 @@ pub enum SinkerConfig {
check_log_file_size: String,
output_full_row: bool,
output_revise_sql: bool,
recheck_interval_secs: u64,
recheck_attempts: u32,
},

MysqlStruct {
Expand Down
20 changes: 20 additions & 0 deletions dt-common/src/config/task_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,8 @@ const CHECK_LOG_DIR: &str = "check_log_dir";
const OUTPUT_FULL_ROW: &str = "output_full_row";
const OUTPUT_REVISE_SQL: &str = "output_revise_sql";
const REVISE_MATCH_FULL_ROW: &str = "revise_match_full_row";
const RECHECK_INTERVAL_SECS: &str = "recheck_interval_secs";
const RECHECK_ATTEMPTS: &str = "recheck_attempts";
const DB_TYPE: &str = "db_type";
const URL: &str = "url";
const BATCH_SIZE: &str = "batch_size";
Expand Down Expand Up @@ -471,6 +473,12 @@ impl TaskConfig {
REVISE_MATCH_FULL_ROW,
false,
),
recheck_interval_secs: loader.get_with_default(
SINKER,
RECHECK_INTERVAL_SECS,
0,
),
recheck_attempts: loader.get_with_default(SINKER, RECHECK_ATTEMPTS, 1),
},

SinkType::Struct => SinkerConfig::MysqlStruct {
Expand Down Expand Up @@ -513,6 +521,12 @@ impl TaskConfig {
REVISE_MATCH_FULL_ROW,
false,
),
recheck_interval_secs: loader.get_with_default(
SINKER,
RECHECK_INTERVAL_SECS,
0,
),
recheck_attempts: loader.get_with_default(SINKER, RECHECK_ATTEMPTS, 1),
},

SinkType::Struct => SinkerConfig::PgStruct {
Expand Down Expand Up @@ -553,6 +567,12 @@ impl TaskConfig {
"output_revise_sql",
false,
),
recheck_interval_secs: loader.get_with_default(
SINKER,
RECHECK_INTERVAL_SECS,
0,
),
recheck_attempts: loader.get_with_default(SINKER, RECHECK_ATTEMPTS, 1),
},

_ => bail! { not_supported_err },
Expand Down
11 changes: 9 additions & 2 deletions dt-connector/src/check_log/check_log.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,11 @@ use dt_common::{error::Error, meta::col_value::ColValue, utils::serialize_util::
use serde::{Deserialize, Serialize};
use serde_json::json;

#[derive(Serialize, Deserialize)]
use super::log_type::LogType;

#[derive(Debug, Serialize, Deserialize)]
pub struct CheckLog {
pub log_type: LogType,
pub schema: String,
pub tb: String,
#[serde(default, skip_serializing_if = "Option::is_none")]
Expand Down Expand Up @@ -34,9 +37,11 @@ pub struct CheckLog {
serialize_with = "SerializeUtil::ordered_option_map"
)]
pub dst_row: Option<HashMap<String, ColValue>>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub revise_sql: Option<String>,
}

#[derive(Clone, Serialize, Deserialize)]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DiffColValue {
pub src: Option<String>,
pub dst: Option<String>,
Expand Down Expand Up @@ -126,6 +131,7 @@ mod tests {
let mut id_col_values = HashMap::new();
id_col_values.insert("id".to_string(), Some("1".to_string()));
CheckLog {
log_type: LogType::Diff,
schema: "s".into(),
tb: "t".into(),
target_schema: None,
Expand All @@ -134,6 +140,7 @@ mod tests {
diff_col_values: HashMap::new(),
src_row: None,
dst_row: None,
revise_sql: None,
}
}

Expand Down
2 changes: 1 addition & 1 deletion dt-connector/src/check_log/log_type.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use serde::{Deserialize, Serialize};
use strum::{EnumString, IntoStaticStr};

#[derive(EnumString, IntoStaticStr, PartialEq, Serialize, Deserialize, Clone)]
#[derive(Debug, EnumString, IntoStaticStr, PartialEq, Serialize, Deserialize, Clone)]
pub enum LogType {
#[strum(serialize = "miss")]
Miss,
Expand Down
10 changes: 5 additions & 5 deletions dt-connector/src/extractor/mysql/mysql_check_extractor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ impl BatchCheckExtractor for MysqlCheckExtractor {
row_data.row_type = RowType::Update;
row_data.before = row_data.after.clone();
}
log_info!("extracted row_data = {}", row_data);

self.base_extractor
.push_row(row_data, Position::None)
Expand All @@ -95,14 +96,13 @@ impl MysqlCheckExtractor {
let mut after = HashMap::new();
for (col, value) in check_log.id_col_values.iter() {
let col_type = tb_meta.get_col_type(col)?;
let col_value = if let Some(str) = value {
MysqlColValueConvertor::from_str(col_type, str)?
} else {
ColValue::None
};
let col_value = value.as_deref().map_or(Ok(ColValue::None), |v| {
MysqlColValueConvertor::from_str(col_type, v)
})?;
after.insert(col.to_string(), col_value);
}
let check_row_data = RowData::build_insert_row_data(after, &tb_meta.basic);
log_info!("check_row_data = {}", check_row_data);
result.push(check_row_data);
}
Ok(result)
Expand Down
8 changes: 3 additions & 5 deletions dt-connector/src/extractor/pg/pg_check_extractor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,11 +103,9 @@ impl PgCheckExtractor {
let mut after = HashMap::new();
for (col, value) in check_log.id_col_values.iter() {
let col_type = tb_meta.get_col_type(col)?;
let col_value = if let Some(str) = value {
PgColValueConvertor::from_str(col_type, str, &mut self.meta_manager)?
} else {
ColValue::None
};
let col_value = value.as_deref().map_or(Ok(ColValue::None), |v| {
PgColValueConvertor::from_str(col_type, v, &mut self.meta_manager)
})?;
after.insert(col.to_string(), col_value);
}
let check_row_data = RowData::build_insert_row_data(after, &tb_meta.basic);
Expand Down
12 changes: 12 additions & 0 deletions dt-connector/src/rdb_query_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -519,6 +519,18 @@ impl RdbQueryBuilder<'_> {
fn get_pg_sql_value(&self, col_value: &ColValue) -> String {
match col_value {
ColValue::Blob(v) => format!(r#"'\x{}'"#, hex::encode(v)),
ColValue::String(_)
| ColValue::RawString(_)
| ColValue::Time(_)
| ColValue::Date(_)
| ColValue::DateTime(_)
| ColValue::Timestamp(_)
| ColValue::Json(_)
| ColValue::Json2(_)
| ColValue::Json3(_)
| ColValue::Set2(_)
| ColValue::Enum2(_)
| ColValue::MongoDoc(_) => Self::quote_pg_string_literal(col_value),
// For numeric types, we should not quote them in SQL
ColValue::Tiny(_)
| ColValue::UnsignedTiny(_)
Expand Down
Loading
Loading