From 0419826bddc0c0f274dff678a37a167b109336db Mon Sep 17 00:00:00 2001 From: Sebastian Bach Date: Mon, 25 Aug 2025 17:03:10 +0200 Subject: [PATCH 01/13] chore: improve DA codebase --- .../src/da/stream_blocks/mod.rs | 2 +- protocol-units/da-sequencer/client/src/lib.rs | 19 +++++++++---------- 2 files changed, 10 insertions(+), 11 deletions(-) diff --git a/networks/movement/movement-full-node/src/da/stream_blocks/mod.rs b/networks/movement/movement-full-node/src/da/stream_blocks/mod.rs index 571ea849d..b292d30a3 100644 --- a/networks/movement/movement-full-node/src/da/stream_blocks/mod.rs +++ b/networks/movement/movement-full-node/src/da/stream_blocks/mod.rs @@ -26,7 +26,7 @@ impl StreamBlocks { .await .expect("gRPC client connection failed."); - let (mut blocks_from_da, _aleert_channel) = client + let (mut blocks_from_da, _alert_channel) = client .stream_read_from_height(StreamReadFromHeightRequest { height: self.from_height }) .await .context("Failed to stream blocks from DA")?; diff --git a/protocol-units/da-sequencer/client/src/lib.rs b/protocol-units/da-sequencer/client/src/lib.rs index d932313e3..49ff1596a 100644 --- a/protocol-units/da-sequencer/client/src/lib.rs +++ b/protocol-units/da-sequencer/client/src/lib.rs @@ -28,7 +28,7 @@ use url::Url; /// Errors thrown by `DaSequencer`. #[derive(Debug, thiserror::Error)] pub enum ClientDaSequencerError { - #[error("Fail to open block stream: {0}")] + #[error("Failed to open block stream: {0}")] FailToOpenBlockStream(String), } @@ -64,12 +64,12 @@ pub trait DaSequencerClient: Clone + Send { /// Grpc implementation of the DA Sequencer client #[derive(Debug, Clone)] pub struct GrpcDaSequencerClient { - client: DaSequencerNodeServiceClient, + client: DaSequencerNodeServiceClient, pub stream_heartbeat_interval_sec: u64, } impl GrpcDaSequencerClient { - /// Creates an http2 connection to the Da Sequencer node service. + /// Creates a http2 connection to the Da Sequencer node service. pub async fn try_connect( connection_url: &Url, stream_heartbeat_interval_sec: u64, @@ -97,8 +97,8 @@ impl GrpcDaSequencerClient { /// Connects to a da sequencer node service using the given connection string. async fn connect( connection_url: Url, - ) -> Result, anyhow::Error> { - tracing::info!("Grpc client connect using :{connection_url}"); + ) -> Result, anyhow::Error> { + tracing::info!("Grpc client connect using: {connection_url}"); let endpoint = Channel::from_shared(connection_url.as_str().to_string())?; // Dynamically configure TLS based on the scheme (http or https) @@ -118,7 +118,7 @@ impl GrpcDaSequencerClient { } impl DaSequencerClient for GrpcDaSequencerClient { - /// Stream reads from a given hestreamight. + /// Stream reads from a given height. async fn stream_read_from_height( &mut self, request: StreamReadFromHeightRequest, @@ -169,7 +169,7 @@ impl DaSequencerClient for GrpcDaSequencerClient { Some(block_response::BlockType::BlockV1(block)) => { // Detect non consecutive height. if block.height != expected_height { - tracing::error!("Not an expected block height from DA: expected:{expected_height} received:{}", block.height); + tracing::error!("Unexpected block height from DA: expected {expected_height} received {}", block.height); // only break because we don't report error in the stream. // The client re connection will detect end of heartbeat and reconnect. break; @@ -254,8 +254,7 @@ pub async fn sign_and_encode_batch( signer: &LoadedSigner, ) -> Result, anyhow::Error> { let signature = signer.sign(&batch_data).await?; - let verifying_key = - ed25519_dalek::VerifyingKey::from_bytes(&signer.public_key().await?.to_bytes())?; + let verifying_key = VerifyingKey::from_bytes(&signer.public_key().await?.to_bytes())?; Ok(serialize_full_node_batch(verifying_key, signature, batch_data)) } @@ -310,7 +309,7 @@ impl DaSequencerClient for EmptyDaSequencerClient { /// Stream reads from a given height. async fn stream_read_from_height( &mut self, - _request: movement_da_sequencer_proto::StreamReadFromHeightRequest, + _request: StreamReadFromHeightRequest, ) -> Result<(StreamReadBlockFromHeight, UnboundedReceiver<()>), ClientDaSequencerError> { let never_ending_stream = stream::pending::>(); let (_alert_tx, alert_rx) = unbounded_channel(); From 19b50046a9606d62209b7d77097ff389f423937a Mon Sep 17 00:00:00 2001 From: Sebastian Bach Date: Mon, 25 Aug 2025 17:04:40 +0200 Subject: [PATCH 02/13] feat(l1-migration): add replay blocks from DA-sequencer utility --- Cargo.lock | 21 +++++++ Cargo.toml | 84 ++++++++++++------------- l1-migration/replay/Cargo.toml | 30 +++++++++ l1-migration/replay/src/lib.rs | 27 ++++++++ l1-migration/replay/src/main.rs | 13 ++++ l1-migration/replay/src/replay.rs | 92 ++++++++++++++++++++++++++++ l1-migration/replay/src/types.rs | 2 + l1-migration/replay/src/types/api.rs | 22 +++++++ l1-migration/replay/src/types/da.rs | 69 +++++++++++++++++++++ 9 files changed, 319 insertions(+), 41 deletions(-) create mode 100644 l1-migration/replay/Cargo.toml create mode 100644 l1-migration/replay/src/lib.rs create mode 100644 l1-migration/replay/src/main.rs create mode 100644 l1-migration/replay/src/replay.rs create mode 100644 l1-migration/replay/src/types.rs create mode 100644 l1-migration/replay/src/types/api.rs create mode 100644 l1-migration/replay/src/types/da.rs diff --git a/Cargo.lock b/Cargo.lock index 53b2d35e6..ae8a16970 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -17057,6 +17057,27 @@ version = "0.8.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2b15c43186be67a4fd63bee50d0303afffcef381492ebe2c5d87f324e1b8815c" +[[package]] +name = "replay-blocks" +version = "0.3.4" +dependencies = [ + "anyhow", + "aptos-rest-client 0.0.0 (git+https://github.com/movementlabsxyz/aptos-core?rev=1d1cdbbd7fabb80dcb95ba5e23213faa072fab67)", + "aptos-types 0.0.3 (git+https://github.com/movementlabsxyz/aptos-core?rev=1d1cdbbd7fabb80dcb95ba5e23213faa072fab67)", + "bcs 0.1.6 (git+https://github.com/movementlabsxyz/bcs.git?rev=bc16d2d39cabafaabd76173dd1b04b2aa170cf0c)", + "clap 4.5.21", + "hex", + "movement-da-sequencer-client", + "movement-da-sequencer-proto", + "movement-types", + "rocksdb", + "serde_json", + "tokio", + "tokio-stream", + "tracing", + "tracing-subscriber 0.3.18", +] + [[package]] name = "reqwest" version = "0.11.27" diff --git a/Cargo.toml b/Cargo.toml index 786d11b60..6b8b749a0 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -3,47 +3,48 @@ resolver = "2" members = [ - "protocol-units/execution/maptos/dof", - "protocol-units/execution/maptos/opt-executor", - "protocol-units/execution/maptos/fin-view", - "protocol-units/execution/maptos/util", - "protocol-units/da/movement/protocol/*", - "protocol-units/sequencing/memseq/*", - "protocol-units/mempool/*", - "protocol-units/syncing/*", - "protocol-units/settlement/mcr/client", - "protocol-units/settlement/mcr/config", - "protocol-units/settlement/mcr/manager", - "protocol-units/settlement/mcr/setup", - "protocol-units/settlement/mcr/runner", - "protocol-units/movement-rest", - "protocol-units/access-control/aptos/account-whitelist", - "util/buildtime", - "util/buildtime/buildtime-helpers", - "util/buildtime/buildtime-macros", - "util/commander", - "util/dot-movement", - "util/flocks", - "util/godfig", - "util/movement-algs", - "util/movement-types", - "util/tracing", - "util/syncador", - "util/collections", - "util/whitelist", - "networks/movement/*", - "benches/*", - "util/signing/interface", - "util/signing/integrations/aptos", - "util/signing/providers/aws-kms", - "util/signing/providers/hashicorp-vault", - "util/signing/testing", - "demo/hsm", - "protocol-units/execution/maptos/framework/releases/*", - "protocol-units/execution/maptos/framework/migrations/*", - "protocol-units/da-sequencer/config", - "protocol-units/da-sequencer/client", - "protocol-units/da-sequencer/node" + "protocol-units/execution/maptos/dof", + "protocol-units/execution/maptos/opt-executor", + "protocol-units/execution/maptos/fin-view", + "protocol-units/execution/maptos/util", + "protocol-units/da/movement/protocol/*", + "protocol-units/sequencing/memseq/*", + "protocol-units/mempool/*", + "protocol-units/syncing/*", + "protocol-units/settlement/mcr/client", + "protocol-units/settlement/mcr/config", + "protocol-units/settlement/mcr/manager", + "protocol-units/settlement/mcr/setup", + "protocol-units/settlement/mcr/runner", + "protocol-units/movement-rest", + "protocol-units/access-control/aptos/account-whitelist", + "util/buildtime", + "util/buildtime/buildtime-helpers", + "util/buildtime/buildtime-macros", + "util/commander", + "util/dot-movement", + "util/flocks", + "util/godfig", + "util/movement-algs", + "util/movement-types", + "util/tracing", + "util/syncador", + "util/collections", + "util/whitelist", + "networks/movement/*", + "benches/*", + "util/signing/interface", + "util/signing/integrations/aptos", + "util/signing/providers/aws-kms", + "util/signing/providers/hashicorp-vault", + "util/signing/testing", + "demo/hsm", + "protocol-units/execution/maptos/framework/releases/*", + "protocol-units/execution/maptos/framework/migrations/*", + "protocol-units/da-sequencer/config", + "protocol-units/da-sequencer/client", + "protocol-units/da-sequencer/node", + "l1-migration/replay" ] [workspace.package] @@ -197,6 +198,7 @@ aptos-indexer-grpc-fullnode = { git = "https://github.com/movementlabsxyz/aptos- aptos-indexer-grpc-table-info = { git = "https://github.com/movementlabsxyz/aptos-core", rev = "1d1cdbbd7fabb80dcb95ba5e23213faa072fab67" } aptos-protos = { git = "https://github.com/movementlabsxyz/aptos-core", rev = "1d1cdbbd7fabb80dcb95ba5e23213faa072fab67" } aptos-release-builder = { git = "https://github.com/movementlabsxyz/aptos-core", rev = "1d1cdbbd7fabb80dcb95ba5e23213faa072fab67" } +aptos-rest-client = { git = "https://github.com/movementlabsxyz/aptos-core", rev = "1d1cdbbd7fabb80dcb95ba5e23213faa072fab67" } aptos-gas-schedule = { git = "https://github.com/movementlabsxyz/aptos-core", rev = "1d1cdbbd7fabb80dcb95ba5e23213faa072fab67" } move-package = { git = "https://github.com/movementlabsxyz/aptos-core", rev = "1d1cdbbd7fabb80dcb95ba5e23213faa072fab67" } movement = { git = "https://github.com/movementlabsxyz/aptos-core", rev = "1d1cdbbd7fabb80dcb95ba5e23213faa072fab67" } diff --git a/l1-migration/replay/Cargo.toml b/l1-migration/replay/Cargo.toml new file mode 100644 index 000000000..c82c07978 --- /dev/null +++ b/l1-migration/replay/Cargo.toml @@ -0,0 +1,30 @@ +[package] +name = "replay-blocks" +version.workspace = true +edition.workspace = true +license.workspace = true +authors.workspace = true +repository.workspace = true +homepage.workspace = true +publish.workspace = true +rust-version.workspace = true + +[dependencies] +aptos-types = { workspace = true } +anyhow = { workspace = true } +bcs = { workspace = true } +clap = { workspace = true } +hex = { workspace = true } +movement-da-sequencer-client = { workspace = true } +movement-da-sequencer-proto = { workspace = true } +movement-types = { workspace = true } +rocksdb = { workspace = true } +serde_json = { workspace = true } +tokio = { workspace = true } +tokio-stream = { workspace = true } +tracing = { workspace = true } +tracing-subscriber = { workspace = true } +aptos-rest-client = { workspace = true } + +[lints] +workspace = true diff --git a/l1-migration/replay/src/lib.rs b/l1-migration/replay/src/lib.rs new file mode 100644 index 000000000..12b0eae23 --- /dev/null +++ b/l1-migration/replay/src/lib.rs @@ -0,0 +1,27 @@ +use crate::replay::DaReplayTransactions; +use clap::Parser; + +mod replay; +mod types; + +#[derive(Parser)] +#[clap(name = "Movement Da-Sequencer replay tool", author, disable_version_flag = true)] +pub enum ApiReplayTool { + Replay(DaReplayTransactions), + // Validate, +} + +impl ApiReplayTool { + pub async fn run(self) -> anyhow::Result<()> { + match self { + ApiReplayTool::Replay(cmd) => cmd.run().await, + // ApiReplayTool::Validate => Err(anyhow::anyhow!("Validation tool is unimplemented")), + } + } +} + +#[test] +fn verify_tool() { + use clap::CommandFactory; + ApiReplayTool::command().debug_assert() +} diff --git a/l1-migration/replay/src/main.rs b/l1-migration/replay/src/main.rs new file mode 100644 index 000000000..870fe21f4 --- /dev/null +++ b/l1-migration/replay/src/main.rs @@ -0,0 +1,13 @@ +#[tokio::main] +async fn main() -> anyhow::Result<()> { + use clap::Parser; + use tracing_subscriber::EnvFilter; + + tracing_subscriber::fmt() + .with_env_filter( + EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new("info")), + ) + .init(); + + replay_blocks::ApiReplayTool::parse().run().await +} diff --git a/l1-migration/replay/src/replay.rs b/l1-migration/replay/src/replay.rs new file mode 100644 index 000000000..9f46f5292 --- /dev/null +++ b/l1-migration/replay/src/replay.rs @@ -0,0 +1,92 @@ +use crate::types::api::AptosRestClient; +use crate::types::da::{DaSequencerClient, DaSequencerDb}; +use anyhow::Context; +use aptos_types::transaction::SignedTransaction; +use clap::{Args, Parser}; +use movement_types::block::Block; +use std::path::PathBuf; +use tokio_stream::StreamExt; +use tracing::{error, info}; + +#[derive(Parser)] +#[clap(name = "replay", about = "Stream transactions from DA-sequencer blocks")] +pub struct DaReplayTransactions { + #[clap(value_parser)] + #[clap(long = "api", help = "The url of the Aptos validator node api endpoint")] + pub aptos_api_url: String, + #[clap(long = "da", help = "The url of the DA-Sequencer")] + pub da_sequencer_url: String, + #[command(flatten)] + da_sequencer_db: DaBlockHeight, +} + +#[derive(Args)] +#[group(required = true, multiple = false)] +pub struct DaBlockHeight { + #[arg(long = "da-db", help = "Path to the DA-Sequencer database")] + pub path: Option, + #[arg(long = "da-height", help = "Synced DA-Sequencer block height")] + pub height: Option, +} + +impl DaReplayTransactions { + pub async fn run(self) -> anyhow::Result<()> { + let block_height = match (self.da_sequencer_db.height, self.da_sequencer_db.path) { + (Some(height), _) => height, + (_, Some(ref path)) => get_da_block_height(path)?, + _ => unreachable!(), + }; + let da_sequencer_client = DaSequencerClient::try_connect(&self.da_sequencer_url).await?; + let rest_client = AptosRestClient::new(&self.aptos_api_url)?; + stream_transactions(&rest_client, &da_sequencer_client, block_height + 1).await + } +} + +#[test] +fn verify_tool() { + use clap::CommandFactory; + DaReplayTransactions::command().debug_assert() +} + +fn get_da_block_height(path_buf: &PathBuf) -> Result { + let db = DaSequencerDb::open(path_buf)?; + db.get_synced_height() +} + +async fn stream_transactions( + rest_client: &AptosRestClient, + da_sequencer_client: &DaSequencerClient, + block_height: u64, +) -> anyhow::Result<()> { + let mut blocks = da_sequencer_client + .stream_blocks_from_height(0) + .await + .context("Failed to stream blocks from DA")?; + + info!("streaming blocks from DA, starting at block_height: {}", block_height); + + if let Some(block_res) = blocks.next().await { + let block = block_res.context("Failed to get next block from DA")?; + info!("block at DA height {}: 0x{}", block.height, hex::encode(block.block_id)); + let block = bcs::from_bytes::<'_, Block>(block.data.as_ref()) + .context("Failed to deserialize Movement block")?; + + for transaction in block.transactions() { + info!("processing transaction 0x{}", transaction.id()); + let aptos_transaction = bcs::from_bytes::<'_, SignedTransaction>(transaction.data()) + .context("Failed to deserialize Aptos transaction")?; + + info!( + "Submitting Aptos transaction {}", + aptos_transaction.committed_hash().to_hex_literal() + ); + rest_client + .submit_bcs(&aptos_transaction) + .await + .context("Failed to submit the Aptos transaction")?; + } + } + + error!("Broken DA stream"); + Err(anyhow::anyhow!("Broken DA stream")) +} diff --git a/l1-migration/replay/src/types.rs b/l1-migration/replay/src/types.rs new file mode 100644 index 000000000..f317d6974 --- /dev/null +++ b/l1-migration/replay/src/types.rs @@ -0,0 +1,2 @@ +pub mod api; +pub mod da; diff --git a/l1-migration/replay/src/types/api.rs b/l1-migration/replay/src/types/api.rs new file mode 100644 index 000000000..f31b0ad93 --- /dev/null +++ b/l1-migration/replay/src/types/api.rs @@ -0,0 +1,22 @@ +use aptos_rest_client::Client; +use std::ops::Deref; + +pub struct AptosRestClient(Client); + +impl AptosRestClient { + pub fn new(url: &str) -> Result { + let client = Client::new( + url.parse() + .map_err(|e| anyhow::anyhow!("Failed to parse Aptos rest api url: {}", e))?, + ); + Ok(Self(client)) + } +} + +impl Deref for AptosRestClient { + type Target = Client; + + fn deref(&self) -> &Self::Target { + &self.0 + } +} diff --git a/l1-migration/replay/src/types/da.rs b/l1-migration/replay/src/types/da.rs new file mode 100644 index 000000000..880edbade --- /dev/null +++ b/l1-migration/replay/src/types/da.rs @@ -0,0 +1,69 @@ +use movement_da_sequencer_client::DaSequencerClient as _; +use movement_da_sequencer_client::{GrpcDaSequencerClient, StreamReadBlockFromHeight}; +use movement_da_sequencer_proto::StreamReadFromHeightRequest; +use rocksdb::{ColumnFamilyDescriptor, DB}; +use std::cell::Cell; +use std::path::Path; + +pub struct DaSequencerClient(Cell>); + +impl DaSequencerClient { + pub async fn try_connect(url: &str) -> Result { + let client = GrpcDaSequencerClient::try_connect( + &url.parse() + .map_err(|e| anyhow::anyhow!("Failed to parse DA-Sequencer url: {}", e))?, + 10, + ) + .await?; + Ok(Self(Cell::new(Some(client)))) + } + + pub async fn stream_blocks_from_height( + &self, + block_height: u64, + ) -> Result { + let mut client = self.0.take().expect("Da-sequencer client should be always avaialable"); + let request = StreamReadFromHeightRequest { height: block_height }; + let result = client.stream_read_from_height(request).await; + self.0.set(Some(client)); + let (blocks, _) = result?; + Ok(blocks) + } +} + +const SYNCED_HEIGHT: &str = "synced_height"; + +pub struct DaSequencerDb(DB); + +impl DaSequencerDb { + pub fn open(path: impl AsRef) -> anyhow::Result { + let options = rocksdb::Options::default(); + let synced_height = ColumnFamilyDescriptor::new(SYNCED_HEIGHT, rocksdb::Options::default()); + let db = DB::open_cf_descriptors(&options, path, vec![synced_height]) + .map_err(|e| anyhow::anyhow!("Failed to open DA-Sequencer DB: {:?}", e))?; + + Ok(Self(db)) + } + + /// Get the synced height marker stored in the database. + pub fn get_synced_height(&self) -> Result { + // This is heavy for this purpose, but progressively the contents of the DA DB will be used for more things + let height = { + let cf = self + .0 + .cf_handle(SYNCED_HEIGHT) + .ok_or(anyhow::anyhow!("No synced_height column family"))?; + let height = self + .0 + .get_cf(&cf, "synced_height") + .map_err(|e| anyhow::anyhow!("Failed to get synced height: {:?}", e))?; + let height = match height { + Some(height) => serde_json::from_slice(&height) + .map_err(|e| anyhow::anyhow!("Failed to deserialize synced height: {:?}", e))?, + None => 0, + }; + Ok::(height) + }?; + Ok(height) + } +} From 550d98b6f48f5a1715c9ab72983eb3943680c78b Mon Sep 17 00:00:00 2001 From: Sebastian Bach Date: Mon, 25 Aug 2025 17:35:57 +0200 Subject: [PATCH 03/13] feat(l1-migration): add extract block height from DA-sequencer utility --- l1-migration/replay/src/da_height.rs | 25 +++++++++++++++++++++++++ l1-migration/replay/src/lib.rs | 6 ++++-- 2 files changed, 29 insertions(+), 2 deletions(-) create mode 100644 l1-migration/replay/src/da_height.rs diff --git a/l1-migration/replay/src/da_height.rs b/l1-migration/replay/src/da_height.rs new file mode 100644 index 000000000..d50df9b98 --- /dev/null +++ b/l1-migration/replay/src/da_height.rs @@ -0,0 +1,25 @@ +use crate::types::da::DaSequencerDb; +use clap::Parser; +use std::path::PathBuf; + +#[derive(Parser)] +#[clap(name = "da-height", about = "Extract synced block height from the DA-sequencer database")] +pub struct DaHeight { + #[arg(long, help = "Path to the DA-Sequencer database")] + path: PathBuf, +} + +impl DaHeight { + pub fn run(self) -> anyhow::Result<()> { + let db = DaSequencerDb::open(self.path)?; + let height = db.get_synced_height()?; + println!("{}", height); + Ok(()) + } +} + +#[test] +fn verify_tool() { + use clap::CommandFactory; + DaHeight::command().debug_assert() +} diff --git a/l1-migration/replay/src/lib.rs b/l1-migration/replay/src/lib.rs index 12b0eae23..f52c99095 100644 --- a/l1-migration/replay/src/lib.rs +++ b/l1-migration/replay/src/lib.rs @@ -1,6 +1,8 @@ +use crate::da_height::DaHeight; use crate::replay::DaReplayTransactions; use clap::Parser; +mod da_height; mod replay; mod types; @@ -8,14 +10,14 @@ mod types; #[clap(name = "Movement Da-Sequencer replay tool", author, disable_version_flag = true)] pub enum ApiReplayTool { Replay(DaReplayTransactions), - // Validate, + ExtractDaHeight(DaHeight), } impl ApiReplayTool { pub async fn run(self) -> anyhow::Result<()> { match self { ApiReplayTool::Replay(cmd) => cmd.run().await, - // ApiReplayTool::Validate => Err(anyhow::anyhow!("Validation tool is unimplemented")), + ApiReplayTool::ExtractDaHeight(cmd) => cmd.run(), } } } From 63c81cac215a1982c8bbf3023ef18e51d3b5d94f Mon Sep 17 00:00:00 2001 From: Sebastian Bach Date: Mon, 25 Aug 2025 17:41:47 +0200 Subject: [PATCH 04/13] feat(l1-migration): add extract block height from DA-sequencer utility --- l1-migration/replay/src/da_height.rs | 2 +- l1-migration/replay/src/types/da.rs | 5 ++++- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/l1-migration/replay/src/da_height.rs b/l1-migration/replay/src/da_height.rs index d50df9b98..edf50f36d 100644 --- a/l1-migration/replay/src/da_height.rs +++ b/l1-migration/replay/src/da_height.rs @@ -5,7 +5,7 @@ use std::path::PathBuf; #[derive(Parser)] #[clap(name = "da-height", about = "Extract synced block height from the DA-sequencer database")] pub struct DaHeight { - #[arg(long, help = "Path to the DA-Sequencer database")] + #[arg(help = "Path to the DA-Sequencer database")] path: PathBuf, } diff --git a/l1-migration/replay/src/types/da.rs b/l1-migration/replay/src/types/da.rs index 880edbade..47c3aeb63 100644 --- a/l1-migration/replay/src/types/da.rs +++ b/l1-migration/replay/src/types/da.rs @@ -32,6 +32,7 @@ impl DaSequencerClient { } const SYNCED_HEIGHT: &str = "synced_height"; +pub const EXECUTED_BLOCKS: &str = "executed_blocks"; pub struct DaSequencerDb(DB); @@ -39,7 +40,9 @@ impl DaSequencerDb { pub fn open(path: impl AsRef) -> anyhow::Result { let options = rocksdb::Options::default(); let synced_height = ColumnFamilyDescriptor::new(SYNCED_HEIGHT, rocksdb::Options::default()); - let db = DB::open_cf_descriptors(&options, path, vec![synced_height]) + let executed_blocks = + ColumnFamilyDescriptor::new(EXECUTED_BLOCKS, rocksdb::Options::default()); + let db = DB::open_cf_descriptors(&options, path, vec![synced_height, executed_blocks]) .map_err(|e| anyhow::anyhow!("Failed to open DA-Sequencer DB: {:?}", e))?; Ok(Self(db)) From dbdad35c8ccd47b6f097fa4b7b7f92a1020be9ae Mon Sep 17 00:00:00 2001 From: Sebastian Bach Date: Tue, 26 Aug 2025 14:23:36 +0200 Subject: [PATCH 05/13] feat(l1-migration): read and replay batches of transactions from DA-sequencer in separate tasks --- Cargo.lock | 3 +- l1-migration/replay/Cargo.toml | 3 +- l1-migration/replay/src/replay.rs | 85 +++++++++++++++++++---------- l1-migration/replay/src/types/da.rs | 36 +++++++++++- 4 files changed, 96 insertions(+), 31 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index ae8a16970..79f22c9b3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -17064,9 +17064,10 @@ dependencies = [ "anyhow", "aptos-rest-client 0.0.0 (git+https://github.com/movementlabsxyz/aptos-core?rev=1d1cdbbd7fabb80dcb95ba5e23213faa072fab67)", "aptos-types 0.0.3 (git+https://github.com/movementlabsxyz/aptos-core?rev=1d1cdbbd7fabb80dcb95ba5e23213faa072fab67)", + "async-stream", "bcs 0.1.6 (git+https://github.com/movementlabsxyz/bcs.git?rev=bc16d2d39cabafaabd76173dd1b04b2aa170cf0c)", "clap 4.5.21", - "hex", + "futures", "movement-da-sequencer-client", "movement-da-sequencer-proto", "movement-types", diff --git a/l1-migration/replay/Cargo.toml b/l1-migration/replay/Cargo.toml index c82c07978..3353f9556 100644 --- a/l1-migration/replay/Cargo.toml +++ b/l1-migration/replay/Cargo.toml @@ -11,10 +11,11 @@ rust-version.workspace = true [dependencies] aptos-types = { workspace = true } +async-stream = { workspace = true } anyhow = { workspace = true } bcs = { workspace = true } clap = { workspace = true } -hex = { workspace = true } +futures = { workspace = true } movement-da-sequencer-client = { workspace = true } movement-da-sequencer-proto = { workspace = true } movement-types = { workspace = true } diff --git a/l1-migration/replay/src/replay.rs b/l1-migration/replay/src/replay.rs index 9f46f5292..8d754248d 100644 --- a/l1-migration/replay/src/replay.rs +++ b/l1-migration/replay/src/replay.rs @@ -3,8 +3,11 @@ use crate::types::da::{DaSequencerClient, DaSequencerDb}; use anyhow::Context; use aptos_types::transaction::SignedTransaction; use clap::{Args, Parser}; -use movement_types::block::Block; +use futures::pin_mut; use std::path::PathBuf; +use std::time::Duration; +use tokio::sync::mpsc; +use tokio::task::JoinSet; use tokio_stream::StreamExt; use tracing::{error, info}; @@ -38,7 +41,7 @@ impl DaReplayTransactions { }; let da_sequencer_client = DaSequencerClient::try_connect(&self.da_sequencer_url).await?; let rest_client = AptosRestClient::new(&self.aptos_api_url)?; - stream_transactions(&rest_client, &da_sequencer_client, block_height + 1).await + stream_transactions(rest_client, da_sequencer_client, block_height + 1).await } } @@ -54,38 +57,64 @@ fn get_da_block_height(path_buf: &PathBuf) -> Result { } async fn stream_transactions( - rest_client: &AptosRestClient, - da_sequencer_client: &DaSequencerClient, + rest_client: AptosRestClient, + da_sequencer_client: DaSequencerClient, block_height: u64, ) -> anyhow::Result<()> { - let mut blocks = da_sequencer_client - .stream_blocks_from_height(0) - .await - .context("Failed to stream blocks from DA")?; + let stream = da_sequencer_client + .stream_transactions_from_height(block_height) + .await? + .chunks_timeout(10, Duration::from_secs(1)); + let (tx, mut rx) = mpsc::channel::>(10); + let mut tasks = JoinSet::new(); - info!("streaming blocks from DA, starting at block_height: {}", block_height); + // Spawn a task which fetches transaction batches ahead + tasks.spawn(async move { + pin_mut!(stream); + while let Some(txns) = stream.next().await { + let txns = txns + .into_iter() + .collect::, _>>() + .context("Failed to get the next batch of Aptos transactions"); - if let Some(block_res) = blocks.next().await { - let block = block_res.context("Failed to get next block from DA")?; - info!("block at DA height {}: 0x{}", block.height, hex::encode(block.block_id)); - let block = bcs::from_bytes::<'_, Block>(block.data.as_ref()) - .context("Failed to deserialize Movement block")?; - - for transaction in block.transactions() { - info!("processing transaction 0x{}", transaction.id()); - let aptos_transaction = bcs::from_bytes::<'_, SignedTransaction>(transaction.data()) - .context("Failed to deserialize Aptos transaction")?; + match txns { + Ok(txns) => { + if let Err(_) = tx.send(txns).await { + // channel is closed + break; + } + } + Err(err) => { + error!("{err}"); + break; + } + } + } + }); - info!( - "Submitting Aptos transaction {}", - aptos_transaction.committed_hash().to_hex_literal() - ); - rest_client - .submit_bcs(&aptos_transaction) - .await - .context("Failed to submit the Aptos transaction")?; + // Spawn a task which submits transaction batches to the validator node + tasks.spawn(async move { + while let Some(txns) = rx.recv().await { + match rest_client.submit_batch_bcs(&txns).await { + Ok(result) => { + info!("Submitted {} Aptos transaction(s)", txns.len()); + for failure in result.into_inner().transaction_failures { + let txn = &txns[failure.transaction_index]; + let hash = txn.committed_hash().to_hex_literal(); + error!("Failed to submit Aptos transaction {}: {}", hash, failure.error); + } + } + Err(e) => { + error!("Failed to submit {} transaction(s): {}", txns.len(), e); + break; + } + } } - } + }); + + // If one of the tasks has finished then something went wrong + tasks.join_next().await; + tasks.shutdown().await; error!("Broken DA stream"); Err(anyhow::anyhow!("Broken DA stream")) diff --git a/l1-migration/replay/src/types/da.rs b/l1-migration/replay/src/types/da.rs index 47c3aeb63..8767fb922 100644 --- a/l1-migration/replay/src/types/da.rs +++ b/l1-migration/replay/src/types/da.rs @@ -1,9 +1,14 @@ +use anyhow::Context; +use aptos_types::transaction::SignedTransaction; +use futures::Stream; +use futures::TryStreamExt; use movement_da_sequencer_client::DaSequencerClient as _; use movement_da_sequencer_client::{GrpcDaSequencerClient, StreamReadBlockFromHeight}; use movement_da_sequencer_proto::StreamReadFromHeightRequest; use rocksdb::{ColumnFamilyDescriptor, DB}; use std::cell::Cell; use std::path::Path; +use tracing::info; pub struct DaSequencerClient(Cell>); @@ -22,13 +27,42 @@ impl DaSequencerClient { &self, block_height: u64, ) -> Result { - let mut client = self.0.take().expect("Da-sequencer client should be always avaialable"); + let Some(mut client) = self.0.take() else { unreachable!() }; let request = StreamReadFromHeightRequest { height: block_height }; let result = client.stream_read_from_height(request).await; self.0.set(Some(client)); let (blocks, _) = result?; Ok(blocks) } + + pub async fn stream_transactions_from_height( + &self, + block_height: u64, + ) -> Result> + Send, anyhow::Error> + { + let mut blocks = self + .stream_blocks_from_height(block_height) + .await + .context("Failed to stream blocks from DA")?; + + let stream = async_stream::try_stream! { + while let Some(da_block) = blocks.try_next().await.context("Failed to get next block from DA")? { + let block = bcs::from_bytes::<'_, movement_types::block::Block>(da_block.data.as_ref()) + .context("Failed to deserialize Movement block")?; + let txns = block.transactions(); + + info!("processing block at DA height {} with {} transaction(s)", da_block.height, txns.len()); + + for txn in txns { + let aptos_transaction = bcs::from_bytes::<'_, SignedTransaction>(txn.data()) + .context("Failed to deserialize Aptos transaction")?; + yield aptos_transaction; + } + } + }; + + Ok(stream) + } } const SYNCED_HEIGHT: &str = "synced_height"; From 705bd2a52725d8e5533d17824ceba0e2579418a7 Mon Sep 17 00:00:00 2001 From: Sebastian Bach Date: Wed, 27 Aug 2025 17:39:32 +0200 Subject: [PATCH 06/13] feat(l1-migration): move replay tool into movement-full-node --- Cargo.lock | 25 ++----------- Cargo.toml | 1 - l1-migration/replay/Cargo.toml | 31 ---------------- l1-migration/replay/src/main.rs | 13 ------- .../movement/movement-full-node/Cargo.toml | 3 ++ .../src/admin/l1_migration/mod.rs | 19 ++++++++++ .../admin/l1_migration/replay}/da_height.rs | 8 ++-- .../src/admin/l1_migration/replay/mod.rs | 8 ++-- .../src/admin/l1_migration/replay}/replay.rs | 14 +++---- .../src/admin/l1_migration/replay}/types.rs | 0 .../admin/l1_migration/replay}/types/api.rs | 0 .../admin/l1_migration/replay}/types/da.rs | 37 +++---------------- .../movement-full-node/src/admin/mod.rs | 4 ++ .../movement-full-node/src/node/mod.rs | 2 +- 14 files changed, 50 insertions(+), 115 deletions(-) delete mode 100644 l1-migration/replay/Cargo.toml delete mode 100644 l1-migration/replay/src/main.rs create mode 100644 networks/movement/movement-full-node/src/admin/l1_migration/mod.rs rename {l1-migration/replay/src => networks/movement/movement-full-node/src/admin/l1_migration/replay}/da_height.rs (70%) rename l1-migration/replay/src/lib.rs => networks/movement/movement-full-node/src/admin/l1_migration/replay/mod.rs (69%) rename {l1-migration/replay/src => networks/movement/movement-full-node/src/admin/l1_migration/replay}/replay.rs (89%) rename {l1-migration/replay/src => networks/movement/movement-full-node/src/admin/l1_migration/replay}/types.rs (100%) rename {l1-migration/replay/src => networks/movement/movement-full-node/src/admin/l1_migration/replay}/types/api.rs (100%) rename {l1-migration/replay/src => networks/movement/movement-full-node/src/admin/l1_migration/replay}/types/da.rs (63%) diff --git a/Cargo.lock b/Cargo.lock index 79f22c9b3..5f7e15654 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -14421,7 +14421,10 @@ dependencies = [ "aptos-crypto 0.0.3 (git+https://github.com/movementlabsxyz/aptos-core?rev=1d1cdbbd7fabb80dcb95ba5e23213faa072fab67)", "aptos-framework-biarritz-rc1-to-pre-l1-merge-migration", "aptos-framework-elsa-to-biarritz-rc1-migration", + "aptos-rest-client 0.0.0 (git+https://github.com/movementlabsxyz/aptos-core?rev=1d1cdbbd7fabb80dcb95ba5e23213faa072fab67)", "aptos-sdk 0.0.3 (git+https://github.com/movementlabsxyz/aptos-core?rev=1d1cdbbd7fabb80dcb95ba5e23213faa072fab67)", + "aptos-types 0.0.3 (git+https://github.com/movementlabsxyz/aptos-core?rev=1d1cdbbd7fabb80dcb95ba5e23213faa072fab67)", + "async-stream", "bcs 0.1.6 (git+https://github.com/movementlabsxyz/bcs.git?rev=bc16d2d39cabafaabd76173dd1b04b2aa170cf0c)", "clap 4.5.21", "console-subscriber", @@ -17057,28 +17060,6 @@ version = "0.8.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2b15c43186be67a4fd63bee50d0303afffcef381492ebe2c5d87f324e1b8815c" -[[package]] -name = "replay-blocks" -version = "0.3.4" -dependencies = [ - "anyhow", - "aptos-rest-client 0.0.0 (git+https://github.com/movementlabsxyz/aptos-core?rev=1d1cdbbd7fabb80dcb95ba5e23213faa072fab67)", - "aptos-types 0.0.3 (git+https://github.com/movementlabsxyz/aptos-core?rev=1d1cdbbd7fabb80dcb95ba5e23213faa072fab67)", - "async-stream", - "bcs 0.1.6 (git+https://github.com/movementlabsxyz/bcs.git?rev=bc16d2d39cabafaabd76173dd1b04b2aa170cf0c)", - "clap 4.5.21", - "futures", - "movement-da-sequencer-client", - "movement-da-sequencer-proto", - "movement-types", - "rocksdb", - "serde_json", - "tokio", - "tokio-stream", - "tracing", - "tracing-subscriber 0.3.18", -] - [[package]] name = "reqwest" version = "0.11.27" diff --git a/Cargo.toml b/Cargo.toml index 6b8b749a0..927ce3e70 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -44,7 +44,6 @@ members = [ "protocol-units/da-sequencer/config", "protocol-units/da-sequencer/client", "protocol-units/da-sequencer/node", - "l1-migration/replay" ] [workspace.package] diff --git a/l1-migration/replay/Cargo.toml b/l1-migration/replay/Cargo.toml deleted file mode 100644 index 3353f9556..000000000 --- a/l1-migration/replay/Cargo.toml +++ /dev/null @@ -1,31 +0,0 @@ -[package] -name = "replay-blocks" -version.workspace = true -edition.workspace = true -license.workspace = true -authors.workspace = true -repository.workspace = true -homepage.workspace = true -publish.workspace = true -rust-version.workspace = true - -[dependencies] -aptos-types = { workspace = true } -async-stream = { workspace = true } -anyhow = { workspace = true } -bcs = { workspace = true } -clap = { workspace = true } -futures = { workspace = true } -movement-da-sequencer-client = { workspace = true } -movement-da-sequencer-proto = { workspace = true } -movement-types = { workspace = true } -rocksdb = { workspace = true } -serde_json = { workspace = true } -tokio = { workspace = true } -tokio-stream = { workspace = true } -tracing = { workspace = true } -tracing-subscriber = { workspace = true } -aptos-rest-client = { workspace = true } - -[lints] -workspace = true diff --git a/l1-migration/replay/src/main.rs b/l1-migration/replay/src/main.rs deleted file mode 100644 index 870fe21f4..000000000 --- a/l1-migration/replay/src/main.rs +++ /dev/null @@ -1,13 +0,0 @@ -#[tokio::main] -async fn main() -> anyhow::Result<()> { - use clap::Parser; - use tracing_subscriber::EnvFilter; - - tracing_subscriber::fmt() - .with_env_filter( - EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new("info")), - ) - .init(); - - replay_blocks::ApiReplayTool::parse().run().await -} diff --git a/networks/movement/movement-full-node/Cargo.toml b/networks/movement/movement-full-node/Cargo.toml index a86b421ad..90585bed6 100644 --- a/networks/movement/movement-full-node/Cargo.toml +++ b/networks/movement/movement-full-node/Cargo.toml @@ -48,6 +48,8 @@ movement-signer = { workspace = true } movement-signer-loader = { workspace = true } syncador = { workspace = true } syncup = { workspace = true } +aptos-rest-client = { workspace = true } +aptos-types = { workspace = true } aptos-crypto = { workspace = true } aptos-sdk = { workspace = true } aptos-api = { workspace = true } @@ -62,6 +64,7 @@ movement-da-light-node-setup = { workspace = true } movement-da-sequencer-config = { workspace = true } ed25519-dalek = { workspace = true } mcr-settlement-setup = { workspace = true } +async-stream = { workspace = true } url = { workspace = true } diff --git a/networks/movement/movement-full-node/src/admin/l1_migration/mod.rs b/networks/movement/movement-full-node/src/admin/l1_migration/mod.rs new file mode 100644 index 000000000..e8f9cbc25 --- /dev/null +++ b/networks/movement/movement-full-node/src/admin/l1_migration/mod.rs @@ -0,0 +1,19 @@ +use crate::admin::l1_migration::replay::ApiReplayTool; +use clap::Subcommand; + +mod replay; + +#[derive(Subcommand, Debug)] +#[clap(rename_all = "kebab-case", about = "Commands for rotating keys")] +pub enum L1Migration { + #[clap(subcommand)] + Replay(ApiReplayTool), +} + +impl L1Migration { + pub async fn execute(&self) -> Result<(), anyhow::Error> { + match self { + L1Migration::Replay(tool) => tool.execute().await, + } + } +} diff --git a/l1-migration/replay/src/da_height.rs b/networks/movement/movement-full-node/src/admin/l1_migration/replay/da_height.rs similarity index 70% rename from l1-migration/replay/src/da_height.rs rename to networks/movement/movement-full-node/src/admin/l1_migration/replay/da_height.rs index edf50f36d..09c84cd20 100644 --- a/l1-migration/replay/src/da_height.rs +++ b/networks/movement/movement-full-node/src/admin/l1_migration/replay/da_height.rs @@ -1,8 +1,8 @@ -use crate::types::da::DaSequencerDb; +use crate::admin::l1_migration::replay::types::da::DaSequencerDb; use clap::Parser; use std::path::PathBuf; -#[derive(Parser)] +#[derive(Parser, Debug)] #[clap(name = "da-height", about = "Extract synced block height from the DA-sequencer database")] pub struct DaHeight { #[arg(help = "Path to the DA-Sequencer database")] @@ -10,8 +10,8 @@ pub struct DaHeight { } impl DaHeight { - pub fn run(self) -> anyhow::Result<()> { - let db = DaSequencerDb::open(self.path)?; + pub fn run(&self) -> anyhow::Result<()> { + let db = DaSequencerDb::open(&self.path)?; let height = db.get_synced_height()?; println!("{}", height); Ok(()) diff --git a/l1-migration/replay/src/lib.rs b/networks/movement/movement-full-node/src/admin/l1_migration/replay/mod.rs similarity index 69% rename from l1-migration/replay/src/lib.rs rename to networks/movement/movement-full-node/src/admin/l1_migration/replay/mod.rs index f52c99095..6e913b0f9 100644 --- a/l1-migration/replay/src/lib.rs +++ b/networks/movement/movement-full-node/src/admin/l1_migration/replay/mod.rs @@ -1,12 +1,12 @@ -use crate::da_height::DaHeight; -use crate::replay::DaReplayTransactions; +use crate::admin::l1_migration::replay::da_height::DaHeight; +use crate::admin::l1_migration::replay::replay::DaReplayTransactions; use clap::Parser; mod da_height; mod replay; mod types; -#[derive(Parser)] +#[derive(Parser, Debug)] #[clap(name = "Movement Da-Sequencer replay tool", author, disable_version_flag = true)] pub enum ApiReplayTool { Replay(DaReplayTransactions), @@ -14,7 +14,7 @@ pub enum ApiReplayTool { } impl ApiReplayTool { - pub async fn run(self) -> anyhow::Result<()> { + pub async fn execute(&self) -> anyhow::Result<()> { match self { ApiReplayTool::Replay(cmd) => cmd.run().await, ApiReplayTool::ExtractDaHeight(cmd) => cmd.run(), diff --git a/l1-migration/replay/src/replay.rs b/networks/movement/movement-full-node/src/admin/l1_migration/replay/replay.rs similarity index 89% rename from l1-migration/replay/src/replay.rs rename to networks/movement/movement-full-node/src/admin/l1_migration/replay/replay.rs index 8d754248d..1409714a5 100644 --- a/l1-migration/replay/src/replay.rs +++ b/networks/movement/movement-full-node/src/admin/l1_migration/replay/replay.rs @@ -1,5 +1,5 @@ -use crate::types::api::AptosRestClient; -use crate::types::da::{DaSequencerClient, DaSequencerDb}; +use crate::admin::l1_migration::replay::types::api::AptosRestClient; +use crate::admin::l1_migration::replay::types::da::{DaSequencerClient, DaSequencerDb}; use anyhow::Context; use aptos_types::transaction::SignedTransaction; use clap::{Args, Parser}; @@ -11,7 +11,7 @@ use tokio::task::JoinSet; use tokio_stream::StreamExt; use tracing::{error, info}; -#[derive(Parser)] +#[derive(Parser, Debug)] #[clap(name = "replay", about = "Stream transactions from DA-sequencer blocks")] pub struct DaReplayTransactions { #[clap(value_parser)] @@ -23,7 +23,7 @@ pub struct DaReplayTransactions { da_sequencer_db: DaBlockHeight, } -#[derive(Args)] +#[derive(Args, Debug)] #[group(required = true, multiple = false)] pub struct DaBlockHeight { #[arg(long = "da-db", help = "Path to the DA-Sequencer database")] @@ -33,10 +33,10 @@ pub struct DaBlockHeight { } impl DaReplayTransactions { - pub async fn run(self) -> anyhow::Result<()> { - let block_height = match (self.da_sequencer_db.height, self.da_sequencer_db.path) { + pub async fn run(&self) -> anyhow::Result<()> { + let block_height = match (self.da_sequencer_db.height, &self.da_sequencer_db.path) { (Some(height), _) => height, - (_, Some(ref path)) => get_da_block_height(path)?, + (_, Some(path)) => get_da_block_height(path)?, _ => unreachable!(), }; let da_sequencer_client = DaSequencerClient::try_connect(&self.da_sequencer_url).await?; diff --git a/l1-migration/replay/src/types.rs b/networks/movement/movement-full-node/src/admin/l1_migration/replay/types.rs similarity index 100% rename from l1-migration/replay/src/types.rs rename to networks/movement/movement-full-node/src/admin/l1_migration/replay/types.rs diff --git a/l1-migration/replay/src/types/api.rs b/networks/movement/movement-full-node/src/admin/l1_migration/replay/types/api.rs similarity index 100% rename from l1-migration/replay/src/types/api.rs rename to networks/movement/movement-full-node/src/admin/l1_migration/replay/types/api.rs diff --git a/l1-migration/replay/src/types/da.rs b/networks/movement/movement-full-node/src/admin/l1_migration/replay/types/da.rs similarity index 63% rename from l1-migration/replay/src/types/da.rs rename to networks/movement/movement-full-node/src/admin/l1_migration/replay/types/da.rs index 8767fb922..391fa2fa6 100644 --- a/l1-migration/replay/src/types/da.rs +++ b/networks/movement/movement-full-node/src/admin/l1_migration/replay/types/da.rs @@ -1,3 +1,4 @@ +use crate::node::da_db::DaDB; use anyhow::Context; use aptos_types::transaction::SignedTransaction; use futures::Stream; @@ -5,7 +6,6 @@ use futures::TryStreamExt; use movement_da_sequencer_client::DaSequencerClient as _; use movement_da_sequencer_client::{GrpcDaSequencerClient, StreamReadBlockFromHeight}; use movement_da_sequencer_proto::StreamReadFromHeightRequest; -use rocksdb::{ColumnFamilyDescriptor, DB}; use std::cell::Cell; use std::path::Path; use tracing::info; @@ -65,42 +65,15 @@ impl DaSequencerClient { } } -const SYNCED_HEIGHT: &str = "synced_height"; -pub const EXECUTED_BLOCKS: &str = "executed_blocks"; - -pub struct DaSequencerDb(DB); +pub struct DaSequencerDb(DaDB); impl DaSequencerDb { pub fn open(path: impl AsRef) -> anyhow::Result { - let options = rocksdb::Options::default(); - let synced_height = ColumnFamilyDescriptor::new(SYNCED_HEIGHT, rocksdb::Options::default()); - let executed_blocks = - ColumnFamilyDescriptor::new(EXECUTED_BLOCKS, rocksdb::Options::default()); - let db = DB::open_cf_descriptors(&options, path, vec![synced_height, executed_blocks]) - .map_err(|e| anyhow::anyhow!("Failed to open DA-Sequencer DB: {:?}", e))?; - - Ok(Self(db)) + let da_db = DaDB::open(path)?; + Ok(DaSequencerDb(da_db)) } - /// Get the synced height marker stored in the database. pub fn get_synced_height(&self) -> Result { - // This is heavy for this purpose, but progressively the contents of the DA DB will be used for more things - let height = { - let cf = self - .0 - .cf_handle(SYNCED_HEIGHT) - .ok_or(anyhow::anyhow!("No synced_height column family"))?; - let height = self - .0 - .get_cf(&cf, "synced_height") - .map_err(|e| anyhow::anyhow!("Failed to get synced height: {:?}", e))?; - let height = match height { - Some(height) => serde_json::from_slice(&height) - .map_err(|e| anyhow::anyhow!("Failed to deserialize synced height: {:?}", e))?, - None => 0, - }; - Ok::(height) - }?; - Ok(height) + self.0.get_synced_height() } } diff --git a/networks/movement/movement-full-node/src/admin/mod.rs b/networks/movement/movement-full-node/src/admin/mod.rs index 590457527..38b4d6840 100644 --- a/networks/movement/movement-full-node/src/admin/mod.rs +++ b/networks/movement/movement-full-node/src/admin/mod.rs @@ -2,6 +2,7 @@ pub mod bring_up; pub mod config; pub mod framework; pub mod governed_gas_pool; +pub mod l1_migration; pub mod mcr; pub mod ops; pub mod rotate_key; @@ -28,6 +29,8 @@ pub enum Admin { Config(config::Config), #[clap(subcommand)] TestKey(testkey::TestKey), + #[clap(subcommand)] + L1Migration(l1_migration::L1Migration), } impl Admin { @@ -41,6 +44,7 @@ impl Admin { Admin::Framework(framework) => framework.execute().await, Admin::Config(config) => config.execute().await, Admin::TestKey(key) => key.execute().await, + Admin::L1Migration(l1_migration) => l1_migration.execute().await, } } } diff --git a/networks/movement/movement-full-node/src/node/mod.rs b/networks/movement/movement-full-node/src/node/mod.rs index 9f3c2c9f6..cb2e72b0f 100644 --- a/networks/movement/movement-full-node/src/node/mod.rs +++ b/networks/movement/movement-full-node/src/node/mod.rs @@ -1,4 +1,4 @@ -mod da_db; +pub mod da_db; pub mod manager; pub mod partial; mod tasks; From 0ea88d1b6f28899d9b5d37f25d3d24b172a438d3 Mon Sep 17 00:00:00 2001 From: Sebastian Bach Date: Fri, 29 Aug 2025 13:42:51 +0200 Subject: [PATCH 07/13] feat(l1-migration): add validation of transactions to the replay tool --- .../src/admin/l1_migration/mod.rs | 6 +- .../src/admin/l1_migration/replay/compare.rs | 82 +++++++++ .../admin/l1_migration/replay/da_height.rs | 5 +- .../src/admin/l1_migration/replay/mod.rs | 11 +- .../src/admin/l1_migration/replay/replay.rs | 167 +++++++++++++----- .../admin/l1_migration/replay/types/api.rs | 44 ++++- .../src/admin/l1_migration/replay/types/da.rs | 18 +- protocol-units/da-sequencer/client/src/lib.rs | 2 +- 8 files changed, 263 insertions(+), 72 deletions(-) create mode 100644 networks/movement/movement-full-node/src/admin/l1_migration/replay/compare.rs diff --git a/networks/movement/movement-full-node/src/admin/l1_migration/mod.rs b/networks/movement/movement-full-node/src/admin/l1_migration/mod.rs index e8f9cbc25..a490670cd 100644 --- a/networks/movement/movement-full-node/src/admin/l1_migration/mod.rs +++ b/networks/movement/movement-full-node/src/admin/l1_migration/mod.rs @@ -1,4 +1,4 @@ -use crate::admin::l1_migration::replay::ApiReplayTool; +use crate::admin::l1_migration::replay::ValidationTool; use clap::Subcommand; mod replay; @@ -7,13 +7,13 @@ mod replay; #[clap(rename_all = "kebab-case", about = "Commands for rotating keys")] pub enum L1Migration { #[clap(subcommand)] - Replay(ApiReplayTool), + Validate(ValidationTool), } impl L1Migration { pub async fn execute(&self) -> Result<(), anyhow::Error> { match self { - L1Migration::Replay(tool) => tool.execute().await, + L1Migration::Validate(tool) => tool.execute().await, } } } diff --git a/networks/movement/movement-full-node/src/admin/l1_migration/replay/compare.rs b/networks/movement/movement-full-node/src/admin/l1_migration/replay/compare.rs new file mode 100644 index 000000000..aba5e03f4 --- /dev/null +++ b/networks/movement/movement-full-node/src/admin/l1_migration/replay/compare.rs @@ -0,0 +1,82 @@ +use aptos_api_types::TransactionOnChainData; +use aptos_types::contract_event::{ContractEvent, ContractEventV1, ContractEventV2}; +use tracing::error; + +pub fn compare_transaction_outputs( + movement_txn: TransactionOnChainData, + aptos_txn: TransactionOnChainData, +) -> bool { + let txn_hash = movement_txn.info.transaction_hash().to_hex_literal(); + + if movement_txn.info.transaction_hash() != aptos_txn.info.transaction_hash() { + error!( + "Transaction hash mismatch:\nMovement transaction hash:{}\nAptos transaction hash:{}", + txn_hash, + aptos_txn.info.transaction_hash().to_hex_literal() + ); + return false; + } + + let movement_events = movement_txn.events.iter().map(Into::::into).collect::>(); + let aptos_events = movement_txn.events.iter().map(Into::::into).collect::>(); + if movement_events != aptos_events { + error!( + "Transaction events mismatch ({})\nMovement events:\n{}\nAptos events:\n{}", + txn_hash, + display_events(&movement_txn.events), + display_events(&aptos_txn.events) + ); + return false; + } + + if movement_txn.changes != aptos_txn.changes { + error!("Transaction write-set mismatch ({})", txn_hash); + return false; + } + + true +} + +fn display_events(events: &[ContractEvent]) -> String { + format!("[\n {}\n]", events.iter().map(|e| e.to_string()).collect::>().join(",\n ")) +} + +#[derive(PartialEq)] +enum Event<'a> { + V1(EventV1<'a>), + V2(EventV2<'a>), +} + +impl<'a> From<&'a ContractEvent> for Event<'a> { + fn from(value: &'a ContractEvent) -> Self { + match value { + ContractEvent::V1(e) => Event::V1(EventV1::from(e)), + ContractEvent::V2(e) => Event::V2(EventV2::from(e)), + } + } +} + +struct EventV1<'a>(&'a ContractEventV1); + +impl<'a> From<&'a ContractEventV1> for EventV1<'a> { + fn from(value: &'a ContractEventV1) -> Self { + EventV1(value) + } +} + +impl<'a> PartialEq for EventV1<'a> { + fn eq(&self, other: &Self) -> bool { + self.0.key() == other.0.key() + && self.0.type_tag() == other.0.type_tag() + && self.0.event_data() == other.0.event_data() + } +} + +#[derive(PartialEq)] +struct EventV2<'a>(&'a ContractEventV2); + +impl<'a> From<&'a ContractEventV2> for EventV2<'a> { + fn from(value: &'a ContractEventV2) -> Self { + EventV2(value) + } +} diff --git a/networks/movement/movement-full-node/src/admin/l1_migration/replay/da_height.rs b/networks/movement/movement-full-node/src/admin/l1_migration/replay/da_height.rs index 09c84cd20..a9c62f055 100644 --- a/networks/movement/movement-full-node/src/admin/l1_migration/replay/da_height.rs +++ b/networks/movement/movement-full-node/src/admin/l1_migration/replay/da_height.rs @@ -1,4 +1,4 @@ -use crate::admin::l1_migration::replay::types::da::DaSequencerDb; +use crate::admin::l1_migration::replay::types::da::get_da_block_height; use clap::Parser; use std::path::PathBuf; @@ -11,8 +11,7 @@ pub struct DaHeight { impl DaHeight { pub fn run(&self) -> anyhow::Result<()> { - let db = DaSequencerDb::open(&self.path)?; - let height = db.get_synced_height()?; + let height = get_da_block_height(&self.path)?; println!("{}", height); Ok(()) } diff --git a/networks/movement/movement-full-node/src/admin/l1_migration/replay/mod.rs b/networks/movement/movement-full-node/src/admin/l1_migration/replay/mod.rs index 6e913b0f9..a85bf19ec 100644 --- a/networks/movement/movement-full-node/src/admin/l1_migration/replay/mod.rs +++ b/networks/movement/movement-full-node/src/admin/l1_migration/replay/mod.rs @@ -2,22 +2,23 @@ use crate::admin::l1_migration::replay::da_height::DaHeight; use crate::admin::l1_migration::replay::replay::DaReplayTransactions; use clap::Parser; +mod compare; mod da_height; mod replay; mod types; #[derive(Parser, Debug)] #[clap(name = "Movement Da-Sequencer replay tool", author, disable_version_flag = true)] -pub enum ApiReplayTool { +pub enum ValidationTool { Replay(DaReplayTransactions), ExtractDaHeight(DaHeight), } -impl ApiReplayTool { +impl ValidationTool { pub async fn execute(&self) -> anyhow::Result<()> { match self { - ApiReplayTool::Replay(cmd) => cmd.run().await, - ApiReplayTool::ExtractDaHeight(cmd) => cmd.run(), + ValidationTool::Replay(cmd) => cmd.run().await, + ValidationTool::ExtractDaHeight(cmd) => cmd.run(), } } } @@ -25,5 +26,5 @@ impl ApiReplayTool { #[test] fn verify_tool() { use clap::CommandFactory; - ApiReplayTool::command().debug_assert() + ValidationTool::command().debug_assert() } diff --git a/networks/movement/movement-full-node/src/admin/l1_migration/replay/replay.rs b/networks/movement/movement-full-node/src/admin/l1_migration/replay/replay.rs index 1409714a5..b7019b57b 100644 --- a/networks/movement/movement-full-node/src/admin/l1_migration/replay/replay.rs +++ b/networks/movement/movement-full-node/src/admin/l1_migration/replay/replay.rs @@ -1,21 +1,25 @@ -use crate::admin::l1_migration::replay::types::api::AptosRestClient; -use crate::admin::l1_migration::replay::types::da::{DaSequencerClient, DaSequencerDb}; +use crate::admin::l1_migration::replay::compare::compare_transaction_outputs; +use crate::admin::l1_migration::replay::types::api::{AptosRestClient, MovementRestClient}; +use crate::admin::l1_migration::replay::types::da::{get_da_block_height, DaSequencerClient}; use anyhow::Context; +use aptos_crypto::HashValue; use aptos_types::transaction::SignedTransaction; use clap::{Args, Parser}; -use futures::pin_mut; +use std::collections::HashSet; use std::path::PathBuf; -use std::time::Duration; +use std::time::{Duration, SystemTime, UNIX_EPOCH}; use tokio::sync::mpsc; use tokio::task::JoinSet; use tokio_stream::StreamExt; -use tracing::{error, info}; +use tracing::{debug, error, info, warn}; #[derive(Parser, Debug)] #[clap(name = "replay", about = "Stream transactions from DA-sequencer blocks")] pub struct DaReplayTransactions { #[clap(value_parser)] - #[clap(long = "api", help = "The url of the Aptos validator node api endpoint")] + #[clap(long = "movement-api", help = "The url of the Movement full node endpoint")] + pub movement_api_url: Option, + #[clap(long = "aptos-api", help = "The url of the Aptos validator node api endpoint")] pub aptos_api_url: String, #[clap(long = "da", help = "The url of the DA-Sequencer")] pub da_sequencer_url: String, @@ -39,9 +43,36 @@ impl DaReplayTransactions { (_, Some(path)) => get_da_block_height(path)?, _ => unreachable!(), }; + let (tx_batches, rx_batches) = mpsc::channel::>(10); + let mut tasks = JoinSet::new(); let da_sequencer_client = DaSequencerClient::try_connect(&self.da_sequencer_url).await?; - let rest_client = AptosRestClient::new(&self.aptos_api_url)?; - stream_transactions(rest_client, da_sequencer_client, block_height + 1).await + let aptos_rest_client = AptosRestClient::try_connect(&self.aptos_api_url).await?; + + // Spawn a task which compares transaction outputs from the Movement node and Aptos node + let tx_hashes = if let Some(ref movement_api_url) = self.movement_api_url { + let movement_rest_client = MovementRestClient::try_connect(movement_api_url).await?; + let (tx_hashes, rx_hashes) = mpsc::unbounded_channel::(); + tasks.spawn(validate_transactions( + aptos_rest_client.clone(), + movement_rest_client, + rx_hashes, + )); + Some(tx_hashes) + } else { + None + }; + + // Spawn a task which submits transaction batches to the validator node + tasks.spawn(submit_transactions(aptos_rest_client, rx_batches, tx_hashes)); + // Spawn a task which fetches transaction batches ahead + tasks.spawn(stream_transactions(da_sequencer_client, tx_batches, block_height)); + + // If one of the tasks has finished then something went wrong + tasks.join_next().await; + tasks.shutdown().await; + + error!("Broken stream"); + Err(anyhow::anyhow!("Broken stream")) } } @@ -51,26 +82,15 @@ fn verify_tool() { DaReplayTransactions::command().debug_assert() } -fn get_da_block_height(path_buf: &PathBuf) -> Result { - let db = DaSequencerDb::open(path_buf)?; - db.get_synced_height() -} - async fn stream_transactions( - rest_client: AptosRestClient, da_sequencer_client: DaSequencerClient, + tx_batches: mpsc::Sender>, block_height: u64, -) -> anyhow::Result<()> { - let stream = da_sequencer_client - .stream_transactions_from_height(block_height) - .await? - .chunks_timeout(10, Duration::from_secs(1)); - let (tx, mut rx) = mpsc::channel::>(10); - let mut tasks = JoinSet::new(); +) { + if let Ok(stream) = da_sequencer_client.stream_transactions_from_height(block_height).await { + let stream = stream.chunks_timeout(10, Duration::from_secs(1)); - // Spawn a task which fetches transaction batches ahead - tasks.spawn(async move { - pin_mut!(stream); + futures::pin_mut!(stream); while let Some(txns) = stream.next().await { let txns = txns .into_iter() @@ -79,7 +99,7 @@ async fn stream_transactions( match txns { Ok(txns) => { - if let Err(_) = tx.send(txns).await { + if tx_batches.send(txns).await.is_err() { // channel is closed break; } @@ -90,32 +110,87 @@ async fn stream_transactions( } } } - }); + warn!("Stream of transaction from the DA-Sequencer ended unexpectedly"); + } else { + error!("Failed to stream transactions from DA-Sequencer blocks") + } +} - // Spawn a task which submits transaction batches to the validator node - tasks.spawn(async move { - while let Some(txns) = rx.recv().await { - match rest_client.submit_batch_bcs(&txns).await { - Ok(result) => { - info!("Submitted {} Aptos transaction(s)", txns.len()); - for failure in result.into_inner().transaction_failures { - let txn = &txns[failure.transaction_index]; - let hash = txn.committed_hash().to_hex_literal(); - error!("Failed to submit Aptos transaction {}: {}", hash, failure.error); - } +async fn submit_transactions( + aptos_rest_client: AptosRestClient, + mut rx_batches: mpsc::Receiver>, + tx_hashes: Option>, +) { + while let Some(txns) = rx_batches.recv().await { + match aptos_rest_client.submit_batch_bcs(&txns).await { + Ok(result) => { + debug!("Submitted {} Aptos transaction(s)", txns.len()); + let mut failed_txns = HashSet::new(); + for failure in result.into_inner().transaction_failures { + failed_txns.insert(failure.transaction_index); + let txn = &txns[failure.transaction_index]; + let hash = txn.committed_hash().to_hex_literal(); + error!("Failed to submit Aptos transaction {}: {}", hash, failure.error); } - Err(e) => { - error!("Failed to submit {} transaction(s): {}", txns.len(), e); - break; + + if let Some(ref tx_hashes) = tx_hashes { + if txns + .iter() + .enumerate() + .filter_map(|item| match item { + (idx, _) if failed_txns.contains(&idx) => None, + (_, txn) => Some(txn.committed_hash()), + }) + .try_for_each(|hash| tx_hashes.send(hash)) + .is_err() + { + // channel is closed + break; + } } } + Err(e) => { + error!("Failed to submit {} transaction(s): {}", txns.len(), e); + break; + } } - }); + } + warn!("Stream of transaction batches ended unexpectedly"); +} - // If one of the tasks has finished then something went wrong - tasks.join_next().await; - tasks.shutdown().await; +async fn validate_transactions( + aptos_rest_client: AptosRestClient, + movement_rest_client: MovementRestClient, + mut rx_hashes: mpsc::UnboundedReceiver, +) { + while let Some(hash) = rx_hashes.recv().await { + let hash_str = hash.to_hex_literal(); + let timeout = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs() + 60; + let result = tokio::join!( + movement_rest_client.wait_for_transaction_by_hash_bcs(hash, timeout, None, None), + aptos_rest_client.wait_for_transaction_by_hash_bcs(hash, timeout, None, None) + ); - error!("Broken DA stream"); - Err(anyhow::anyhow!("Broken DA stream")) + match result { + (Ok(txn_movement), Ok(txn_aptos)) => { + if compare_transaction_outputs(txn_movement.into_inner(), txn_aptos.into_inner()) { + info!("Validated transaction {}", hash_str); + } + } + (Ok(_), Err(error_aptos)) => { + error!( + "The execution of the transaction {} failed on Aptos: {}", + hash_str, error_aptos + ) + } + (Err(error_movement), Ok(_)) => error!( + "The execution of the transaction {} failed on Movement but succeeded on Aptos: {}", + hash_str, error_movement + ), + _ => { + // ignore if the execution failed on both sides??? + } + } + } + warn!("Stream of transaction hashes ended unexpectedly"); } diff --git a/networks/movement/movement-full-node/src/admin/l1_migration/replay/types/api.rs b/networks/movement/movement-full-node/src/admin/l1_migration/replay/types/api.rs index f31b0ad93..66e96b5c0 100644 --- a/networks/movement/movement-full-node/src/admin/l1_migration/replay/types/api.rs +++ b/networks/movement/movement-full-node/src/admin/l1_migration/replay/types/api.rs @@ -1,15 +1,15 @@ +use anyhow::Context; use aptos_rest_client::Client; use std::ops::Deref; +use std::sync::Arc; -pub struct AptosRestClient(Client); +#[derive(Clone)] +pub struct AptosRestClient(Arc); impl AptosRestClient { - pub fn new(url: &str) -> Result { - let client = Client::new( - url.parse() - .map_err(|e| anyhow::anyhow!("Failed to parse Aptos rest api url: {}", e))?, - ); - Ok(Self(client)) + pub async fn try_connect(url: &str) -> Result { + let client = try_connect("Aptos", url).await?; + Ok(Self(Arc::new(client))) } } @@ -20,3 +20,33 @@ impl Deref for AptosRestClient { &self.0 } } + +#[derive(Clone)] +pub struct MovementRestClient(Arc); + +impl MovementRestClient { + pub async fn try_connect(url: &str) -> Result { + let client = try_connect("Movement", url).await?; + Ok(Self(Arc::new(client))) + } +} + +impl Deref for MovementRestClient { + type Target = Client; + + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +async fn try_connect(name: &str, url: &str) -> Result { + let client = Client::new( + url.parse() + .map_err(|e| anyhow::anyhow!("Failed to parse {} rest api url: {}", name, e))?, + ); + client + .get_index_bcs() + .await + .context(format!("{} rest api unreachable.", name))?; + Ok(client) +} diff --git a/networks/movement/movement-full-node/src/admin/l1_migration/replay/types/da.rs b/networks/movement/movement-full-node/src/admin/l1_migration/replay/types/da.rs index 391fa2fa6..b543bd36b 100644 --- a/networks/movement/movement-full-node/src/admin/l1_migration/replay/types/da.rs +++ b/networks/movement/movement-full-node/src/admin/l1_migration/replay/types/da.rs @@ -6,11 +6,11 @@ use futures::TryStreamExt; use movement_da_sequencer_client::DaSequencerClient as _; use movement_da_sequencer_client::{GrpcDaSequencerClient, StreamReadBlockFromHeight}; use movement_da_sequencer_proto::StreamReadFromHeightRequest; -use std::cell::Cell; use std::path::Path; -use tracing::info; +use tokio::sync::RwLock; +use tracing::debug; -pub struct DaSequencerClient(Cell>); +pub struct DaSequencerClient(RwLock); impl DaSequencerClient { pub async fn try_connect(url: &str) -> Result { @@ -20,17 +20,16 @@ impl DaSequencerClient { 10, ) .await?; - Ok(Self(Cell::new(Some(client)))) + Ok(Self(RwLock::new(client))) } pub async fn stream_blocks_from_height( &self, block_height: u64, ) -> Result { - let Some(mut client) = self.0.take() else { unreachable!() }; + let mut client = self.0.write().await; let request = StreamReadFromHeightRequest { height: block_height }; let result = client.stream_read_from_height(request).await; - self.0.set(Some(client)); let (blocks, _) = result?; Ok(blocks) } @@ -51,7 +50,7 @@ impl DaSequencerClient { .context("Failed to deserialize Movement block")?; let txns = block.transactions(); - info!("processing block at DA height {} with {} transaction(s)", da_block.height, txns.len()); + debug!("processing block at DA height {} with {} transaction(s)", da_block.height, txns.len()); for txn in txns { let aptos_transaction = bcs::from_bytes::<'_, SignedTransaction>(txn.data()) @@ -77,3 +76,8 @@ impl DaSequencerDb { self.0.get_synced_height() } } + +pub fn get_da_block_height(path: impl AsRef) -> Result { + let db = DaSequencerDb::open(path)?; + db.get_synced_height() +} diff --git a/protocol-units/da-sequencer/client/src/lib.rs b/protocol-units/da-sequencer/client/src/lib.rs index 49ff1596a..4e1b2c8ba 100644 --- a/protocol-units/da-sequencer/client/src/lib.rs +++ b/protocol-units/da-sequencer/client/src/lib.rs @@ -163,7 +163,7 @@ impl DaSequencerClient for GrpcDaSequencerClient { match block_response.response { Some(response) => match response.block_type { Some(block_response::BlockType::Heartbeat(_)) => { - tracing::info!("Received heartbeat"); + tracing::debug!("Received heartbeat"); *last_heartbeat_time.lock().await = Instant::now(); } Some(block_response::BlockType::BlockV1(block)) => { From e3e7f2ad868d33e49253cdc927da82e7c2fd4b3e Mon Sep 17 00:00:00 2001 From: Sebastian Bach Date: Mon, 1 Sep 2025 16:23:41 +0200 Subject: [PATCH 08/13] feat(l1-migration): add diff output to the transaction comparison --- Cargo.lock | 113 ++++++++++++- Cargo.toml | 2 + .../movement/movement-full-node/Cargo.toml | 2 + .../src/admin/l1_migration/replay/compare.rs | 149 ++++++++++++------ .../src/admin/l1_migration/replay/display.rs | 44 ++++++ .../src/admin/l1_migration/replay/mod.rs | 4 + .../src/admin/l1_migration/replay/replay.rs | 19 ++- 7 files changed, 275 insertions(+), 58 deletions(-) create mode 100644 networks/movement/movement-full-node/src/admin/l1_migration/replay/display.rs diff --git a/Cargo.lock b/Cargo.lock index 0130c881d..8106654b5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8229,13 +8229,26 @@ version = "0.15.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0e1f83fc076bd6dd27517eacdf25fef6c4dfe5f1d7448bafaaf3a26f13b5e4eb" dependencies = [ - "encode_unicode", + "encode_unicode 0.3.6", "lazy_static", "libc", "unicode-width 0.1.14", "windows-sys 0.52.0", ] +[[package]] +name = "console" +version = "0.16.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2e09ced7ebbccb63b4c65413d821f2e00ce54c5ca4514ddc6b3c892fdbcbc69d" +dependencies = [ + "encode_unicode 1.0.0", + "libc", + "once_cell", + "unicode-width 0.2.0", + "windows-sys 0.60.2", +] + [[package]] name = "console-api" version = "0.7.0" @@ -9583,6 +9596,12 @@ version = "0.3.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a357d28ed41a50f9c765dbfe56cbc04a64e53e5fc58ba79fbc34c10ef3df831f" +[[package]] +name = "encode_unicode" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "34aa73646ffb006b8f5147f3dc182bd4bcb190227ce861fc4a4844bf8e3cb2c0" + [[package]] name = "encoding_rs" version = "0.8.35" @@ -11427,7 +11446,7 @@ version = "0.17.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cbf675b85ed934d3c67b5c5469701eec7db22689d0a2139d856e0925fa28b281" dependencies = [ - "console", + "console 0.15.8", "number_prefix", "portable-atomic", "unicode-width 0.2.0", @@ -14834,6 +14853,7 @@ dependencies = [ "async-stream", "bcs 0.1.6 (git+https://github.com/movementlabsxyz/bcs.git?rev=bc16d2d39cabafaabd76173dd1b04b2aa170cf0c)", "clap 4.5.21", + "console 0.16.0", "console-subscriber", "dot-movement", "ed25519-dalek 2.1.1", @@ -14866,6 +14886,7 @@ dependencies = [ "rocksdb", "serde_json", "sha2 0.10.8", + "similar", "syncador", "syncup", "tokio", @@ -19060,9 +19081,9 @@ checksum = "d66dc143e6b11c1eddc06d5c423cfc97062865baf299914ab64caa38182078fe" [[package]] name = "similar" -version = "2.6.0" +version = "2.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1de1d4f81173b03af4c0cbed3c898f6bff5b870e4a7f5d6f4057d62a7a4b686e" +checksum = "bbbb5d9659141646ae647b42fe094daf6c6192d1620870b449d9557f748b2daa" dependencies = [ "bstr", "unicode-segmentation", @@ -19074,7 +19095,7 @@ version = "1.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cfe85670573cd6f0fa97940f26e7e6601213c3b0555246c24234131f88c5709e" dependencies = [ - "console", + "console 0.15.8", "similar", ] @@ -21358,6 +21379,12 @@ dependencies = [ "windows-targets 0.52.6", ] +[[package]] +name = "windows-link" +version = "0.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5e6ad25900d524eaabdbbb96d20b4311e1e7ae1699af4fb28c17ae66c80d798a" + [[package]] name = "windows-registry" version = "0.2.0" @@ -21424,6 +21451,15 @@ dependencies = [ "windows-targets 0.52.6", ] +[[package]] +name = "windows-sys" +version = "0.60.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f2f500e4d28234f72040990ec9d39e3a6b950f9f22d3dba18416c35882612bcb" +dependencies = [ + "windows-targets 0.53.3", +] + [[package]] name = "windows-targets" version = "0.42.2" @@ -21463,13 +21499,30 @@ dependencies = [ "windows_aarch64_gnullvm 0.52.6", "windows_aarch64_msvc 0.52.6", "windows_i686_gnu 0.52.6", - "windows_i686_gnullvm", + "windows_i686_gnullvm 0.52.6", "windows_i686_msvc 0.52.6", "windows_x86_64_gnu 0.52.6", "windows_x86_64_gnullvm 0.52.6", "windows_x86_64_msvc 0.52.6", ] +[[package]] +name = "windows-targets" +version = "0.53.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d5fe6031c4041849d7c496a8ded650796e7b6ecc19df1a431c1a363342e5dc91" +dependencies = [ + "windows-link", + "windows_aarch64_gnullvm 0.53.0", + "windows_aarch64_msvc 0.53.0", + "windows_i686_gnu 0.53.0", + "windows_i686_gnullvm 0.53.0", + "windows_i686_msvc 0.53.0", + "windows_x86_64_gnu 0.53.0", + "windows_x86_64_gnullvm 0.53.0", + "windows_x86_64_msvc 0.53.0", +] + [[package]] name = "windows_aarch64_gnullvm" version = "0.42.2" @@ -21488,6 +21541,12 @@ version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "32a4622180e7a0ec044bb555404c800bc9fd9ec262ec147edd5989ccd0c02cd3" +[[package]] +name = "windows_aarch64_gnullvm" +version = "0.53.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "86b8d5f90ddd19cb4a147a5fa63ca848db3df085e25fee3cc10b39b6eebae764" + [[package]] name = "windows_aarch64_msvc" version = "0.42.2" @@ -21506,6 +21565,12 @@ version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "09ec2a7bb152e2252b53fa7803150007879548bc709c039df7627cabbd05d469" +[[package]] +name = "windows_aarch64_msvc" +version = "0.53.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c7651a1f62a11b8cbd5e0d42526e55f2c99886c77e007179efff86c2b137e66c" + [[package]] name = "windows_i686_gnu" version = "0.42.2" @@ -21524,12 +21589,24 @@ version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8e9b5ad5ab802e97eb8e295ac6720e509ee4c243f69d781394014ebfe8bbfa0b" +[[package]] +name = "windows_i686_gnu" +version = "0.53.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c1dc67659d35f387f5f6c479dc4e28f1d4bb90ddd1a5d3da2e5d97b42d6272c3" + [[package]] name = "windows_i686_gnullvm" version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0eee52d38c090b3caa76c563b86c3a4bd71ef1a819287c19d586d7334ae8ed66" +[[package]] +name = "windows_i686_gnullvm" +version = "0.53.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9ce6ccbdedbf6d6354471319e781c0dfef054c81fbc7cf83f338a4296c0cae11" + [[package]] name = "windows_i686_msvc" version = "0.42.2" @@ -21548,6 +21625,12 @@ version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "240948bc05c5e7c6dabba28bf89d89ffce3e303022809e73deaefe4f6ec56c66" +[[package]] +name = "windows_i686_msvc" +version = "0.53.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "581fee95406bb13382d2f65cd4a908ca7b1e4c2f1917f143ba16efe98a589b5d" + [[package]] name = "windows_x86_64_gnu" version = "0.42.2" @@ -21566,6 +21649,12 @@ version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "147a5c80aabfbf0c7d901cb5895d1de30ef2907eb21fbbab29ca94c5b08b1a78" +[[package]] +name = "windows_x86_64_gnu" +version = "0.53.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2e55b5ac9ea33f2fc1716d1742db15574fd6fc8dadc51caab1c16a3d3b4190ba" + [[package]] name = "windows_x86_64_gnullvm" version = "0.42.2" @@ -21584,6 +21673,12 @@ version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "24d5b23dc417412679681396f2b49f3de8c1473deb516bd34410872eff51ed0d" +[[package]] +name = "windows_x86_64_gnullvm" +version = "0.53.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0a6e035dd0599267ce1ee132e51c27dd29437f63325753051e71dd9e42406c57" + [[package]] name = "windows_x86_64_msvc" version = "0.42.2" @@ -21602,6 +21697,12 @@ version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "589f6da84c646204747d1270a2a5661ea66ed1cced2631d546fdfb155959f9ec" +[[package]] +name = "windows_x86_64_msvc" +version = "0.53.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "271414315aff87387382ec3d271b52d7ae78726f5d44ac98b4f4030c91880486" + [[package]] name = "winnow" version = "0.5.40" diff --git a/Cargo.toml b/Cargo.toml index b8199ffa2..e9b5b72b2 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -367,6 +367,8 @@ tower = { version = "0.5" } http-body-util = "0.1" tap = "1.0.1" prometheus = "0.14.0" +console = "0.16.0" +similar = "2.7.0" # trying to pin diesel diesel = { version = "2.2.7", features = ["postgres", "numeric", "r2d2"] } diff --git a/networks/movement/movement-full-node/Cargo.toml b/networks/movement/movement-full-node/Cargo.toml index 90585bed6..adc75a8ef 100644 --- a/networks/movement/movement-full-node/Cargo.toml +++ b/networks/movement/movement-full-node/Cargo.toml @@ -65,6 +65,8 @@ movement-da-sequencer-config = { workspace = true } ed25519-dalek = { workspace = true } mcr-settlement-setup = { workspace = true } async-stream = { workspace = true } +console = { workspace = true } +similar = { workspace = true } url = { workspace = true } diff --git a/networks/movement/movement-full-node/src/admin/l1_migration/replay/compare.rs b/networks/movement/movement-full-node/src/admin/l1_migration/replay/compare.rs index aba5e03f4..a01fed7f0 100644 --- a/networks/movement/movement-full-node/src/admin/l1_migration/replay/compare.rs +++ b/networks/movement/movement-full-node/src/admin/l1_migration/replay/compare.rs @@ -1,82 +1,135 @@ -use aptos_api_types::TransactionOnChainData; -use aptos_types::contract_event::{ContractEvent, ContractEventV1, ContractEventV2}; +use anyhow::Context; +use aptos_api_types::transaction::UserTransaction; +use aptos_api_types::Event; use tracing::error; pub fn compare_transaction_outputs( - movement_txn: TransactionOnChainData, - aptos_txn: TransactionOnChainData, -) -> bool { - let txn_hash = movement_txn.info.transaction_hash().to_hex_literal(); + movement_txn: UserTransaction, + aptos_txn: UserTransaction, +) -> anyhow::Result { + let txn_hash = movement_txn.info.hash.0.to_hex_literal(); - if movement_txn.info.transaction_hash() != aptos_txn.info.transaction_hash() { + if movement_txn.info.hash != aptos_txn.info.hash { error!( "Transaction hash mismatch:\nMovement transaction hash:{}\nAptos transaction hash:{}", txn_hash, - aptos_txn.info.transaction_hash().to_hex_literal() + aptos_txn.info.hash.0.to_hex_literal() ); - return false; + return Ok(false); } - let movement_events = movement_txn.events.iter().map(Into::::into).collect::>(); - let aptos_events = movement_txn.events.iter().map(Into::::into).collect::>(); + let movement_events = + movement_txn.events.iter().map(Into::::into).collect::>(); + let aptos_events = aptos_txn.events.iter().map(Into::::into).collect::>(); if movement_events != aptos_events { + let movement_values = movement_events + .iter() + .map(|event| event.to_json()) + .collect::, _>>() + .context("Failed to serialize Movement events to json")?; + let aptos_values = aptos_events + .iter() + .map(|event| event.to_json()) + .collect::, _>>() + .context("Failed to serialize Aptes events to json")?; error!( - "Transaction events mismatch ({})\nMovement events:\n{}\nAptos events:\n{}", + "Transaction events mismatch ({})\n{}", txn_hash, - display_events(&movement_txn.events), - display_events(&aptos_txn.events) + display_diff(movement_values, aptos_values)? ); - return false; + return Ok(false); } - if movement_txn.changes != aptos_txn.changes { - error!("Transaction write-set mismatch ({})", txn_hash); - return false; + if movement_txn.info.changes != aptos_txn.info.changes { + let movement_values = movement_txn + .info + .changes + .iter() + .map(|change| serde_json::to_value(change)) + .collect::, _>>() + .context("Failed to serialize Movement write-set changes to json")?; + let aptos_values = aptos_txn + .info + .changes + .iter() + .map(|change| serde_json::to_value(change)) + .collect::, _>>() + .context("Failed to serialize Aptos write-set changes to json")?; + error!( + "Transaction write-set mismatch ({})\n{}", + txn_hash, + display_diff(movement_values, aptos_values)? + ); + return Ok(false); } - true + Ok(true) } -fn display_events(events: &[ContractEvent]) -> String { - format!("[\n {}\n]", events.iter().map(|e| e.to_string()).collect::>().join(",\n ")) -} +struct EventCompare<'a>(&'a Event); -#[derive(PartialEq)] -enum Event<'a> { - V1(EventV1<'a>), - V2(EventV2<'a>), +impl<'a> EventCompare<'a> { + pub fn to_json(&self) -> anyhow::Result { + let mut event = serde_json::Map::with_capacity(4); + event.insert("sequence_number".to_owned(), serde_json::to_value(&self.0.sequence_number)?); + event.insert("type".to_owned(), serde_json::to_value(&self.0.typ)?); + event.insert("data".to_owned(), self.0.data.to_owned()); + Ok(serde_json::Value::Object(event)) + } } -impl<'a> From<&'a ContractEvent> for Event<'a> { - fn from(value: &'a ContractEvent) -> Self { - match value { - ContractEvent::V1(e) => Event::V1(EventV1::from(e)), - ContractEvent::V2(e) => Event::V2(EventV2::from(e)), - } +impl<'a> PartialEq for EventCompare<'a> { + fn eq(&self, other: &Self) -> bool { + self.0.typ == other.0.typ && self.0.data == other.0.data } } -struct EventV1<'a>(&'a ContractEventV1); - -impl<'a> From<&'a ContractEventV1> for EventV1<'a> { - fn from(value: &'a ContractEventV1) -> Self { - EventV1(value) +impl<'a> From<&'a Event> for EventCompare<'a> { + fn from(value: &'a Event) -> Self { + Self(value) } } -impl<'a> PartialEq for EventV1<'a> { - fn eq(&self, other: &Self) -> bool { - self.0.key() == other.0.key() - && self.0.type_tag() == other.0.type_tag() - && self.0.event_data() == other.0.event_data() - } +fn display_diff( + movement_values: Vec, + aptos_values: Vec, +) -> anyhow::Result { + let movement_json = serde_json::to_string_pretty(&serde_json::Value::Array(movement_values))?; + let aptos_json = serde_json::to_string_pretty(&serde_json::Value::Array(aptos_values))?; + Ok(create_diff(&movement_json, &aptos_json)?) } -#[derive(PartialEq)] -struct EventV2<'a>(&'a ContractEventV2); +fn create_diff(movement: &str, aptos: &str) -> anyhow::Result { + use console::Style; + use similar::{ChangeTag, TextDiff}; + use std::fmt::Write; + + let mut out = String::with_capacity(movement.len() + aptos.len()); + let diff = TextDiff::from_lines(movement, aptos); + let hunks = diff + .grouped_ops(3) + .into_iter() + .filter(|ops| !ops.is_empty()) + .collect::>(); + let last_hunk_idx = hunks.len() - 1; -impl<'a> From<&'a ContractEventV2> for EventV2<'a> { - fn from(value: &'a ContractEventV2) -> Self { - EventV2(value) + writeln!(out, "--- Movement full-node")?; + writeln!(out, "+++ Aptos validator-node")?; + for (idx, hunk) in hunks.iter().enumerate() { + for op in hunk.iter() { + for change in diff.iter_changes(op) { + let (sign, style) = match change.tag() { + ChangeTag::Delete => ("-", Style::new().red()), + ChangeTag::Insert => ("+", Style::new().green()), + ChangeTag::Equal => (" ", Style::new()), + }; + write!(out, "{}{}", style.apply_to(sign).bold(), style.apply_to(change))?; + } + } + if idx < last_hunk_idx { + writeln!(out, "===")?; + } } + + Ok(out) } diff --git a/networks/movement/movement-full-node/src/admin/l1_migration/replay/display.rs b/networks/movement/movement-full-node/src/admin/l1_migration/replay/display.rs new file mode 100644 index 000000000..d69b7c704 --- /dev/null +++ b/networks/movement/movement-full-node/src/admin/l1_migration/replay/display.rs @@ -0,0 +1,44 @@ +use crate::admin::l1_migration::replay::types::api::AptosRestClient; +use aptos_api_types::Transaction; +use aptos_crypto::HashValue; +use clap::Parser; + +#[derive(Parser, Debug)] +#[clap(name = "da-height", about = "Extract synced block height from the DA-sequencer database")] +pub struct DisplayTransactionOutputs { + #[clap(long = "api", help = "The url of an Aptos api endpoint")] + pub api_url: String, + #[arg(help = "Transaction hash")] + hash: String, +} + +impl DisplayTransactionOutputs { + pub async fn run(&self) -> anyhow::Result<()> { + let hash = HashValue::from_hex(self.hash.trim_start_matches("0x"))?; + let aptos_rest_client = AptosRestClient::try_connect(&self.api_url).await?; + display_txn_outputs(aptos_rest_client, hash).await?; + Ok(()) + } +} + +#[test] +fn verify_tool() { + use clap::CommandFactory; + DisplayTransactionOutputs::command().debug_assert() +} + +async fn display_txn_outputs( + aptos_rest_client: AptosRestClient, + hash: HashValue, +) -> anyhow::Result<()> { + let txn = aptos_rest_client.get_transaction_by_hash(hash).await?.into_inner(); + + if let Transaction::UserTransaction(txn) = txn { + println!("Events:\n{}", serde_json::to_string_pretty(&txn.events)?); + println!("Write-Set:\n{}", serde_json::to_string_pretty(&txn.info.changes)?); + } else { + println!("Transaction is pending"); + } + + Ok(()) +} diff --git a/networks/movement/movement-full-node/src/admin/l1_migration/replay/mod.rs b/networks/movement/movement-full-node/src/admin/l1_migration/replay/mod.rs index a85bf19ec..2184c5859 100644 --- a/networks/movement/movement-full-node/src/admin/l1_migration/replay/mod.rs +++ b/networks/movement/movement-full-node/src/admin/l1_migration/replay/mod.rs @@ -1,9 +1,11 @@ use crate::admin::l1_migration::replay::da_height::DaHeight; +use crate::admin::l1_migration::replay::display::DisplayTransactionOutputs; use crate::admin::l1_migration::replay::replay::DaReplayTransactions; use clap::Parser; mod compare; mod da_height; +mod display; mod replay; mod types; @@ -12,6 +14,7 @@ mod types; pub enum ValidationTool { Replay(DaReplayTransactions), ExtractDaHeight(DaHeight), + Display(DisplayTransactionOutputs), } impl ValidationTool { @@ -19,6 +22,7 @@ impl ValidationTool { match self { ValidationTool::Replay(cmd) => cmd.run().await, ValidationTool::ExtractDaHeight(cmd) => cmd.run(), + ValidationTool::Display(cmd) => cmd.run().await, } } } diff --git a/networks/movement/movement-full-node/src/admin/l1_migration/replay/replay.rs b/networks/movement/movement-full-node/src/admin/l1_migration/replay/replay.rs index b7019b57b..a5fa4c7bc 100644 --- a/networks/movement/movement-full-node/src/admin/l1_migration/replay/replay.rs +++ b/networks/movement/movement-full-node/src/admin/l1_migration/replay/replay.rs @@ -163,18 +163,29 @@ async fn validate_transactions( movement_rest_client: MovementRestClient, mut rx_hashes: mpsc::UnboundedReceiver, ) { + use aptos_api_types::transaction::Transaction; + while let Some(hash) = rx_hashes.recv().await { let hash_str = hash.to_hex_literal(); let timeout = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs() + 60; let result = tokio::join!( - movement_rest_client.wait_for_transaction_by_hash_bcs(hash, timeout, None, None), - aptos_rest_client.wait_for_transaction_by_hash_bcs(hash, timeout, None, None) + movement_rest_client.wait_for_transaction_by_hash(hash, timeout, None, None), + aptos_rest_client.wait_for_transaction_by_hash(hash, timeout, None, None) ); match result { (Ok(txn_movement), Ok(txn_aptos)) => { - if compare_transaction_outputs(txn_movement.into_inner(), txn_aptos.into_inner()) { - info!("Validated transaction {}", hash_str); + let Transaction::UserTransaction(txn_movement) = txn_movement.into_inner() else { + unreachable!() + }; + let Transaction::UserTransaction(txn_aptos) = txn_aptos.into_inner() else { + unreachable!() + }; + + match compare_transaction_outputs(*txn_movement, *txn_aptos) { + Ok(valid) if valid => info!("Validated transaction {}", hash_str), + Ok(_) => {} // invalid, errors logged elsewhere + Err(e) => error!("Failed to validate transaction {}: {}", hash_str, e), } } (Ok(_), Err(error_aptos)) => { From 3e4bb3e24c7cc937f3d4725b03602b8f9792ad68 Mon Sep 17 00:00:00 2001 From: Sebastian Bach Date: Mon, 1 Sep 2025 16:55:23 +0200 Subject: [PATCH 09/13] feat(l1-migration): exclude the event sequence number from json serialization --- .../movement-full-node/src/admin/l1_migration/replay/compare.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/networks/movement/movement-full-node/src/admin/l1_migration/replay/compare.rs b/networks/movement/movement-full-node/src/admin/l1_migration/replay/compare.rs index a01fed7f0..e04bcd549 100644 --- a/networks/movement/movement-full-node/src/admin/l1_migration/replay/compare.rs +++ b/networks/movement/movement-full-node/src/admin/l1_migration/replay/compare.rs @@ -71,7 +71,6 @@ struct EventCompare<'a>(&'a Event); impl<'a> EventCompare<'a> { pub fn to_json(&self) -> anyhow::Result { let mut event = serde_json::Map::with_capacity(4); - event.insert("sequence_number".to_owned(), serde_json::to_value(&self.0.sequence_number)?); event.insert("type".to_owned(), serde_json::to_value(&self.0.typ)?); event.insert("data".to_owned(), self.0.data.to_owned()); Ok(serde_json::Value::Object(event)) From 3dfb5d0edb127ae193ea1fdc31b8e464d2c4b8e0 Mon Sep 17 00:00:00 2001 From: Sebastian Bach Date: Tue, 2 Sep 2025 13:44:55 +0200 Subject: [PATCH 10/13] feat(l1-migration): modify comparison in replay validation --- Cargo.lock | 1 + .../movement/movement-full-node/Cargo.toml | 1 + .../src/admin/l1_migration/replay/compare.rs | 161 +++++++++++------- .../src/admin/l1_migration/replay/replay.rs | 6 +- 4 files changed, 102 insertions(+), 67 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 8106654b5..34865ed09 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -14884,6 +14884,7 @@ dependencies = [ "movement-types", "prost 0.13.5", "rocksdb", + "serde", "serde_json", "sha2 0.10.8", "similar", diff --git a/networks/movement/movement-full-node/Cargo.toml b/networks/movement/movement-full-node/Cargo.toml index adc75a8ef..1a39faf70 100644 --- a/networks/movement/movement-full-node/Cargo.toml +++ b/networks/movement/movement-full-node/Cargo.toml @@ -18,6 +18,7 @@ movement-da-util = { workspace = true } maptos-execution-util = { workspace = true } mcr-settlement-client = { workspace = true, features = ["eth"] } mcr-settlement-manager = { workspace = true } +serde = { workspace = true } serde_json = { workspace = true } anyhow = { workspace = true } futures = { workspace = true } diff --git a/networks/movement/movement-full-node/src/admin/l1_migration/replay/compare.rs b/networks/movement/movement-full-node/src/admin/l1_migration/replay/compare.rs index e04bcd549..2aaedc9f8 100644 --- a/networks/movement/movement-full-node/src/admin/l1_migration/replay/compare.rs +++ b/networks/movement/movement-full-node/src/admin/l1_migration/replay/compare.rs @@ -1,11 +1,11 @@ -use anyhow::Context; use aptos_api_types::transaction::UserTransaction; -use aptos_api_types::Event; +use aptos_api_types::{Event, WriteSetChange}; use tracing::error; pub fn compare_transaction_outputs( movement_txn: UserTransaction, aptos_txn: UserTransaction, + show_diff: bool, ) -> anyhow::Result { let txn_hash = movement_txn.info.hash.0.to_hex_literal(); @@ -22,79 +22,52 @@ pub fn compare_transaction_outputs( movement_txn.events.iter().map(Into::::into).collect::>(); let aptos_events = aptos_txn.events.iter().map(Into::::into).collect::>(); if movement_events != aptos_events { - let movement_values = movement_events - .iter() - .map(|event| event.to_json()) - .collect::, _>>() - .context("Failed to serialize Movement events to json")?; - let aptos_values = aptos_events - .iter() - .map(|event| event.to_json()) - .collect::, _>>() - .context("Failed to serialize Aptes events to json")?; - error!( - "Transaction events mismatch ({})\n{}", - txn_hash, - display_diff(movement_values, aptos_values)? - ); + if show_diff { + error!( + "Transaction events mismatch ({})\n{}", + txn_hash, + display_diff(&movement_txn.events, &aptos_txn.events)? + ); + } else { + error!("Transaction events mismatch ({})", txn_hash,); + } return Ok(false); } - if movement_txn.info.changes != aptos_txn.info.changes { - let movement_values = movement_txn - .info - .changes - .iter() - .map(|change| serde_json::to_value(change)) - .collect::, _>>() - .context("Failed to serialize Movement write-set changes to json")?; - let aptos_values = aptos_txn - .info - .changes - .iter() - .map(|change| serde_json::to_value(change)) - .collect::, _>>() - .context("Failed to serialize Aptos write-set changes to json")?; - error!( - "Transaction write-set mismatch ({})\n{}", - txn_hash, - display_diff(movement_values, aptos_values)? - ); + let movement_changes = movement_txn + .info + .changes + .iter() + .map(Into::::into) + .collect::>(); + let aptos_changes = aptos_txn + .info + .changes + .iter() + .map(Into::::into) + .collect::>(); + if movement_changes != aptos_changes { + if show_diff { + error!( + "Transaction write-set mismatch ({})\n{}", + txn_hash, + display_diff(&movement_txn.info.changes, &aptos_txn.info.changes)? + ); + } else { + error!("Transaction write-set mismatch ({})", txn_hash,); + } return Ok(false); } Ok(true) } -struct EventCompare<'a>(&'a Event); - -impl<'a> EventCompare<'a> { - pub fn to_json(&self) -> anyhow::Result { - let mut event = serde_json::Map::with_capacity(4); - event.insert("type".to_owned(), serde_json::to_value(&self.0.typ)?); - event.insert("data".to_owned(), self.0.data.to_owned()); - Ok(serde_json::Value::Object(event)) - } -} - -impl<'a> PartialEq for EventCompare<'a> { - fn eq(&self, other: &Self) -> bool { - self.0.typ == other.0.typ && self.0.data == other.0.data - } -} - -impl<'a> From<&'a Event> for EventCompare<'a> { - fn from(value: &'a Event) -> Self { - Self(value) - } -} - -fn display_diff( - movement_values: Vec, - aptos_values: Vec, -) -> anyhow::Result { - let movement_json = serde_json::to_string_pretty(&serde_json::Value::Array(movement_values))?; - let aptos_json = serde_json::to_string_pretty(&serde_json::Value::Array(aptos_values))?; +fn display_diff(movement_values: &[T], aptos_values: &[T]) -> anyhow::Result +where + T: serde::Serialize, +{ + let movement_json = serde_json::to_string_pretty(movement_values)?; + let aptos_json = serde_json::to_string_pretty(aptos_values)?; Ok(create_diff(&movement_json, &aptos_json)?) } @@ -132,3 +105,59 @@ fn create_diff(movement: &str, aptos: &str) -> anyhow::Result { Ok(out) } + +struct EventCompare<'a>(&'a Event); + +impl<'a> PartialEq for EventCompare<'a> { + fn eq(&self, other: &Self) -> bool { + self.0.typ == self.0.typ && self.0.guid == other.0.guid + } +} + +impl<'a> From<&'a Event> for EventCompare<'a> { + fn from(value: &'a Event) -> Self { + Self(value) + } +} + +struct WriteSetChangeCompare<'a>(&'a WriteSetChange); + +impl<'a> PartialEq for WriteSetChangeCompare<'a> { + fn eq(&self, other: &Self) -> bool { + match (self.0, other.0) { + (WriteSetChange::DeleteModule(value1), WriteSetChange::DeleteModule(value2)) => { + // Ignored fields: state_key_hash + value1.address == value2.address && value1.module == value2.module + } + (WriteSetChange::DeleteResource(value1), WriteSetChange::DeleteResource(value2)) => { + // Ignored fields: state_key_hash + value1.address == value2.address && value1.resource == value2.resource + } + (WriteSetChange::DeleteTableItem(value1), WriteSetChange::DeleteTableItem(value2)) => { + // Ignored fields: state_key_hash, data + value1.key == value2.key && value1.handle == value2.handle + } + (WriteSetChange::WriteModule(value1), WriteSetChange::WriteModule(value2)) => { + // Ignored fields: state_key_hash + value1.address == value2.address && value1.data == value2.data + } + (WriteSetChange::WriteResource(value1), WriteSetChange::WriteResource(value2)) => { + // Ignored fields: state_key_hash, data.data.0.values + value1.address == value2.address + && value1.data.typ == value2.data.typ + && value1.data.data.0.keys().eq(value2.data.data.0.keys()) + } + (WriteSetChange::WriteTableItem(value1), WriteSetChange::WriteTableItem(value2)) => { + // Ignored fields: state_key_hash, value, data + value1.key == value2.key && value1.handle == value2.handle + } + _ => false, + } + } +} + +impl<'a> From<&'a WriteSetChange> for WriteSetChangeCompare<'a> { + fn from(value: &'a WriteSetChange) -> Self { + Self(value) + } +} diff --git a/networks/movement/movement-full-node/src/admin/l1_migration/replay/replay.rs b/networks/movement/movement-full-node/src/admin/l1_migration/replay/replay.rs index a5fa4c7bc..7c30bb742 100644 --- a/networks/movement/movement-full-node/src/admin/l1_migration/replay/replay.rs +++ b/networks/movement/movement-full-node/src/admin/l1_migration/replay/replay.rs @@ -25,6 +25,8 @@ pub struct DaReplayTransactions { pub da_sequencer_url: String, #[command(flatten)] da_sequencer_db: DaBlockHeight, + #[clap(long = "diff", help = "Show diff on transaction output mismatch")] + pub show_diff: bool, } #[derive(Args, Debug)] @@ -56,6 +58,7 @@ impl DaReplayTransactions { aptos_rest_client.clone(), movement_rest_client, rx_hashes, + self.show_diff, )); Some(tx_hashes) } else { @@ -162,6 +165,7 @@ async fn validate_transactions( aptos_rest_client: AptosRestClient, movement_rest_client: MovementRestClient, mut rx_hashes: mpsc::UnboundedReceiver, + show_diff: bool, ) { use aptos_api_types::transaction::Transaction; @@ -182,7 +186,7 @@ async fn validate_transactions( unreachable!() }; - match compare_transaction_outputs(*txn_movement, *txn_aptos) { + match compare_transaction_outputs(*txn_movement, *txn_aptos, show_diff) { Ok(valid) if valid => info!("Validated transaction {}", hash_str), Ok(_) => {} // invalid, errors logged elsewhere Err(e) => error!("Failed to validate transaction {}: {}", hash_str, e), From 9bacf4455bb99a2a9ccea9023195deb0e531bd79 Mon Sep 17 00:00:00 2001 From: Sebastian Bach Date: Tue, 2 Sep 2025 13:58:22 +0200 Subject: [PATCH 11/13] feat(l1-migration): rename replay to validate --- .../movement-full-node/src/admin/l1_migration/mod.rs | 4 ++-- .../src/admin/l1_migration/{replay => validate}/compare.rs | 0 .../admin/l1_migration/{replay => validate}/da_height.rs | 2 +- .../src/admin/l1_migration/{replay => validate}/display.rs | 2 +- .../src/admin/l1_migration/{replay => validate}/mod.rs | 6 +++--- .../src/admin/l1_migration/{replay => validate}/replay.rs | 6 +++--- .../src/admin/l1_migration/{replay => validate}/types.rs | 0 .../admin/l1_migration/{replay => validate}/types/api.rs | 0 .../src/admin/l1_migration/{replay => validate}/types/da.rs | 0 9 files changed, 10 insertions(+), 10 deletions(-) rename networks/movement/movement-full-node/src/admin/l1_migration/{replay => validate}/compare.rs (100%) rename networks/movement/movement-full-node/src/admin/l1_migration/{replay => validate}/da_height.rs (87%) rename networks/movement/movement-full-node/src/admin/l1_migration/{replay => validate}/display.rs (94%) rename networks/movement/movement-full-node/src/admin/l1_migration/{replay => validate}/mod.rs (76%) rename networks/movement/movement-full-node/src/admin/l1_migration/{replay => validate}/replay.rs (96%) rename networks/movement/movement-full-node/src/admin/l1_migration/{replay => validate}/types.rs (100%) rename networks/movement/movement-full-node/src/admin/l1_migration/{replay => validate}/types/api.rs (100%) rename networks/movement/movement-full-node/src/admin/l1_migration/{replay => validate}/types/da.rs (100%) diff --git a/networks/movement/movement-full-node/src/admin/l1_migration/mod.rs b/networks/movement/movement-full-node/src/admin/l1_migration/mod.rs index a490670cd..7e3de2a07 100644 --- a/networks/movement/movement-full-node/src/admin/l1_migration/mod.rs +++ b/networks/movement/movement-full-node/src/admin/l1_migration/mod.rs @@ -1,7 +1,7 @@ -use crate::admin::l1_migration::replay::ValidationTool; +use crate::admin::l1_migration::validate::ValidationTool; use clap::Subcommand; -mod replay; +mod validate; #[derive(Subcommand, Debug)] #[clap(rename_all = "kebab-case", about = "Commands for rotating keys")] diff --git a/networks/movement/movement-full-node/src/admin/l1_migration/replay/compare.rs b/networks/movement/movement-full-node/src/admin/l1_migration/validate/compare.rs similarity index 100% rename from networks/movement/movement-full-node/src/admin/l1_migration/replay/compare.rs rename to networks/movement/movement-full-node/src/admin/l1_migration/validate/compare.rs diff --git a/networks/movement/movement-full-node/src/admin/l1_migration/replay/da_height.rs b/networks/movement/movement-full-node/src/admin/l1_migration/validate/da_height.rs similarity index 87% rename from networks/movement/movement-full-node/src/admin/l1_migration/replay/da_height.rs rename to networks/movement/movement-full-node/src/admin/l1_migration/validate/da_height.rs index a9c62f055..d6a3d5c32 100644 --- a/networks/movement/movement-full-node/src/admin/l1_migration/replay/da_height.rs +++ b/networks/movement/movement-full-node/src/admin/l1_migration/validate/da_height.rs @@ -1,4 +1,4 @@ -use crate::admin::l1_migration::replay::types::da::get_da_block_height; +use crate::admin::l1_migration::validate::types::da::get_da_block_height; use clap::Parser; use std::path::PathBuf; diff --git a/networks/movement/movement-full-node/src/admin/l1_migration/replay/display.rs b/networks/movement/movement-full-node/src/admin/l1_migration/validate/display.rs similarity index 94% rename from networks/movement/movement-full-node/src/admin/l1_migration/replay/display.rs rename to networks/movement/movement-full-node/src/admin/l1_migration/validate/display.rs index d69b7c704..6721cbf33 100644 --- a/networks/movement/movement-full-node/src/admin/l1_migration/replay/display.rs +++ b/networks/movement/movement-full-node/src/admin/l1_migration/validate/display.rs @@ -1,4 +1,4 @@ -use crate::admin::l1_migration::replay::types::api::AptosRestClient; +use crate::admin::l1_migration::validate::types::api::AptosRestClient; use aptos_api_types::Transaction; use aptos_crypto::HashValue; use clap::Parser; diff --git a/networks/movement/movement-full-node/src/admin/l1_migration/replay/mod.rs b/networks/movement/movement-full-node/src/admin/l1_migration/validate/mod.rs similarity index 76% rename from networks/movement/movement-full-node/src/admin/l1_migration/replay/mod.rs rename to networks/movement/movement-full-node/src/admin/l1_migration/validate/mod.rs index 2184c5859..7073dd516 100644 --- a/networks/movement/movement-full-node/src/admin/l1_migration/replay/mod.rs +++ b/networks/movement/movement-full-node/src/admin/l1_migration/validate/mod.rs @@ -1,6 +1,6 @@ -use crate::admin::l1_migration::replay::da_height::DaHeight; -use crate::admin::l1_migration::replay::display::DisplayTransactionOutputs; -use crate::admin::l1_migration::replay::replay::DaReplayTransactions; +use crate::admin::l1_migration::validate::da_height::DaHeight; +use crate::admin::l1_migration::validate::display::DisplayTransactionOutputs; +use crate::admin::l1_migration::validate::replay::DaReplayTransactions; use clap::Parser; mod compare; diff --git a/networks/movement/movement-full-node/src/admin/l1_migration/replay/replay.rs b/networks/movement/movement-full-node/src/admin/l1_migration/validate/replay.rs similarity index 96% rename from networks/movement/movement-full-node/src/admin/l1_migration/replay/replay.rs rename to networks/movement/movement-full-node/src/admin/l1_migration/validate/replay.rs index 7c30bb742..fe9e84b2c 100644 --- a/networks/movement/movement-full-node/src/admin/l1_migration/replay/replay.rs +++ b/networks/movement/movement-full-node/src/admin/l1_migration/validate/replay.rs @@ -1,6 +1,6 @@ -use crate::admin::l1_migration::replay::compare::compare_transaction_outputs; -use crate::admin::l1_migration::replay::types::api::{AptosRestClient, MovementRestClient}; -use crate::admin::l1_migration::replay::types::da::{get_da_block_height, DaSequencerClient}; +use crate::admin::l1_migration::validate::compare::compare_transaction_outputs; +use crate::admin::l1_migration::validate::types::api::{AptosRestClient, MovementRestClient}; +use crate::admin::l1_migration::validate::types::da::{get_da_block_height, DaSequencerClient}; use anyhow::Context; use aptos_crypto::HashValue; use aptos_types::transaction::SignedTransaction; diff --git a/networks/movement/movement-full-node/src/admin/l1_migration/replay/types.rs b/networks/movement/movement-full-node/src/admin/l1_migration/validate/types.rs similarity index 100% rename from networks/movement/movement-full-node/src/admin/l1_migration/replay/types.rs rename to networks/movement/movement-full-node/src/admin/l1_migration/validate/types.rs diff --git a/networks/movement/movement-full-node/src/admin/l1_migration/replay/types/api.rs b/networks/movement/movement-full-node/src/admin/l1_migration/validate/types/api.rs similarity index 100% rename from networks/movement/movement-full-node/src/admin/l1_migration/replay/types/api.rs rename to networks/movement/movement-full-node/src/admin/l1_migration/validate/types/api.rs diff --git a/networks/movement/movement-full-node/src/admin/l1_migration/replay/types/da.rs b/networks/movement/movement-full-node/src/admin/l1_migration/validate/types/da.rs similarity index 100% rename from networks/movement/movement-full-node/src/admin/l1_migration/replay/types/da.rs rename to networks/movement/movement-full-node/src/admin/l1_migration/validate/types/da.rs From 4cb8e15eef683e2f7cb4a6ce0a37779f5c7b954e Mon Sep 17 00:00:00 2001 From: Sebastian Bach Date: Tue, 2 Sep 2025 16:08:53 +0200 Subject: [PATCH 12/13] feat(l1-migration): reduce logging --- .../src/admin/l1_migration/validate/replay.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/networks/movement/movement-full-node/src/admin/l1_migration/validate/replay.rs b/networks/movement/movement-full-node/src/admin/l1_migration/validate/replay.rs index fe9e84b2c..c29e615b6 100644 --- a/networks/movement/movement-full-node/src/admin/l1_migration/validate/replay.rs +++ b/networks/movement/movement-full-node/src/admin/l1_migration/validate/replay.rs @@ -11,7 +11,7 @@ use std::time::{Duration, SystemTime, UNIX_EPOCH}; use tokio::sync::mpsc; use tokio::task::JoinSet; use tokio_stream::StreamExt; -use tracing::{debug, error, info, warn}; +use tracing::{debug, error, warn}; #[derive(Parser, Debug)] #[clap(name = "replay", about = "Stream transactions from DA-sequencer blocks")] @@ -187,7 +187,7 @@ async fn validate_transactions( }; match compare_transaction_outputs(*txn_movement, *txn_aptos, show_diff) { - Ok(valid) if valid => info!("Validated transaction {}", hash_str), + Ok(valid) if valid => debug!("Validated transaction {}", hash_str), Ok(_) => {} // invalid, errors logged elsewhere Err(e) => error!("Failed to validate transaction {}: {}", hash_str, e), } From ebf87ebeb92a20ff9f099ccc23df45cbac233cf6 Mon Sep 17 00:00:00 2001 From: Sebastian Bach Date: Wed, 3 Sep 2025 10:21:01 +0200 Subject: [PATCH 13/13] feat(l1-migration): log extracted da-height --- .../src/admin/l1_migration/validate/replay.rs | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/networks/movement/movement-full-node/src/admin/l1_migration/validate/replay.rs b/networks/movement/movement-full-node/src/admin/l1_migration/validate/replay.rs index c29e615b6..952924644 100644 --- a/networks/movement/movement-full-node/src/admin/l1_migration/validate/replay.rs +++ b/networks/movement/movement-full-node/src/admin/l1_migration/validate/replay.rs @@ -11,7 +11,7 @@ use std::time::{Duration, SystemTime, UNIX_EPOCH}; use tokio::sync::mpsc; use tokio::task::JoinSet; use tokio_stream::StreamExt; -use tracing::{debug, error, warn}; +use tracing::{debug, error, info, warn}; #[derive(Parser, Debug)] #[clap(name = "replay", about = "Stream transactions from DA-sequencer blocks")] @@ -42,7 +42,11 @@ impl DaReplayTransactions { pub async fn run(&self) -> anyhow::Result<()> { let block_height = match (self.da_sequencer_db.height, &self.da_sequencer_db.path) { (Some(height), _) => height, - (_, Some(path)) => get_da_block_height(path)?, + (_, Some(path)) => { + let da_block_height = get_da_block_height(path)?; + info!("Extracted DA-Sequencer block height: {}", da_block_height); + da_block_height + } _ => unreachable!(), }; let (tx_batches, rx_batches) = mpsc::channel::>(10);