diff --git a/.gitmodules b/.gitmodules index 0e6a418d..883353ad 100644 --- a/.gitmodules +++ b/.gitmodules @@ -1,3 +1,6 @@ [submodule "offchain-modules/eth-proof"] path = offchain-modules/eth-proof url = https://github.com/LeonLi000/eth-proof +[submodule "offchain-modules/sign-server"] + path = offchain-modules/sign-server + url = https://github.com/LeonLi000/sign-server.git diff --git a/Makefile b/Makefile index e78b2c01..48d5a49e 100644 --- a/Makefile +++ b/Makefile @@ -66,7 +66,7 @@ init-light-client: ${FORCE_CLI} init-ckb-light-contract -k 0 -f 500 -c 40000 --wait init-multi-address: - ${FORCE_CLI} init-multi-sign-address -k 1 --multi-address ckt1qyqyph8v9mclls35p6snlaxajeca97tc062sa5gahk ckt1qyqvsv5240xeh85wvnau2eky8pwrhh4jr8ts8vyj37 + ${FORCE_CLI} init-multi-sign-address -k 1 --multi-address ckt1qyqyph8v9mclls35p6snlaxajeca97tc062sa5gahk ckt1qyqywrwdchjyqeysjegpzw38fvandtktdhrs0zaxl4 ckt1qyq2f0uwf3lk7e0nthfucvxgl3zu36v6zuwq6mlzps --threshold 2 --hosts http://127.0.0.1:3031 http://127.0.0.1:3032 http://127.0.0.1:3033 ckb2eth-relay: pm2 start --name ckb2eth-relay "${FORCE_CLI} ckb-relay -k 1 --per-amount 5" @@ -126,12 +126,14 @@ local-ci: make close-dev-env rm -rf ~/.force-bridge/eth-rocksdb rm -rf ~/.force-bridge/ckb-rocksdb + rm -rf ~/.force-bridge/ckb-rocksdb test -f ~/.force-bridge/config.toml && mv ~/.force-bridge/config.toml ~/.force-bridge/config_bak_`date "+%Y%m%d-%H%M%S"`.toml || echo 'config not exist' cd offchain-modules && cargo build make init-config make integration-ci github-ci: + git submodule update --init rm -rf ~/.force-bridge/eth-rocksdb rm -rf ~/.force-bridge/ckb-rocksdb rm -rf ~/.force-bridge/dapp-lib/eth-rocksdb diff --git a/eth-contracts/scripts/deploy.js b/eth-contracts/scripts/deploy.js index f5e8f47b..a8134a36 100644 --- a/eth-contracts/scripts/deploy.js +++ b/eth-contracts/scripts/deploy.js @@ -57,13 +57,13 @@ async function deploy() { // deploy ckbChain const validators = network_config.ethereum_private_keys - .slice(0, 2) + .slice(0, 4) .map((privateKey) => { let publicKey = EthUtil.privateToPublic(Buffer.from(privateKey, 'hex')); return '0x' + EthUtil.publicToAddress(publicKey).toString('hex'); }); console.error('validator validator: ', validators); - const multisigThreshold = 1; + const multisigThreshold = 2; let eth_network = await provider.getNetwork(); const chainId = eth_network.chainId; console.error('chain id :', chainId); diff --git a/offchain-modules/Cargo.lock b/offchain-modules/Cargo.lock index 9d0baaf1..a89de48a 100644 --- a/offchain-modules/Cargo.lock +++ b/offchain-modules/Cargo.lock @@ -753,6 +753,15 @@ dependencies = [ "libc", ] +[[package]] +name = "bstr" +version = "0.2.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a40b47ad93e1a5404e6c18dec46b628214fee441c70f4ab5d6942142cc268a3d" +dependencies = [ + "memchr", +] + [[package]] name = "build_const" version = "0.2.1" @@ -970,7 +979,7 @@ source = "git+https://github.com/nervosnetwork/ckb?tag=v0.36.0-rc2#38aacb54991e0 dependencies = [ "ckb-types", "faster-hex 0.4.1", - "jsonrpc-core", + "jsonrpc-core 14.2.0", "serde 1.0.118", "serde_json", ] @@ -1951,9 +1960,10 @@ dependencies = [ "config", "env_logger", "failure", + "futures 0.3.8", "hex 0.4.2", "int-enum", - "jsonrpc-core", + "jsonrpc-core 14.2.0", "log", "molecule", "reqwest 0.9.24", @@ -2212,6 +2222,19 @@ version = "0.2.11" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8be18de09a56b60ed0edf84bc9df007e30040691af7acd1c41874faac5895bfb" +[[package]] +name = "globset" +version = "0.4.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c152169ef1e421390738366d2f796655fec62621dabbd0fd476f905934061e4a" +dependencies = [ + "aho-corasick", + "bstr", + "fnv", + "log", + "regex", +] + [[package]] name = "gloo-timers" version = "0.2.1" @@ -2652,6 +2675,50 @@ dependencies = [ "serde_json", ] +[[package]] +name = "jsonrpc-core" +version = "15.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0745a6379e3edc893c84ec203589790774e4247420033e71a76d3ab4687991fa" +dependencies = [ + "futures 0.1.30", + "log", + "serde 1.0.118", + "serde_derive", + "serde_json", +] + +[[package]] +name = "jsonrpc-http-server" +version = "15.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4fb5c4513b7b542f42da107942b7b759f27120b5cc894729f88254b28dff44b7" +dependencies = [ + "hyper 0.12.35", + "jsonrpc-core 15.1.0", + "jsonrpc-server-utils", + "log", + "net2", + "parking_lot 0.10.2", + "unicase", +] + +[[package]] +name = "jsonrpc-server-utils" +version = "15.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "72f1f3990650c033bd8f6bd46deac76d990f9bbfb5f8dc8c4767bf0a00392176" +dependencies = [ + "bytes 0.4.12", + "globset", + "jsonrpc-core 15.1.0", + "lazy_static", + "log", + "tokio 0.1.22", + "tokio-codec", + "unicase", +] + [[package]] name = "keccak" version = "0.1.0" @@ -3252,6 +3319,16 @@ dependencies = [ "rustc_version", ] +[[package]] +name = "parking_lot" +version = "0.10.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d3a704eb390aafdc107b0e392f56a82b668e3a71366993b5340f5833fd62505e" +dependencies = [ + "lock_api 0.3.4", + "parking_lot_core 0.7.2", +] + [[package]] name = "parking_lot" version = "0.11.1" @@ -3290,6 +3367,20 @@ dependencies = [ "winapi 0.3.9", ] +[[package]] +name = "parking_lot_core" +version = "0.7.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d58c7c768d4ba344e3e8d72518ac13e259d7c7ade24167003b8488e10b6740a3" +dependencies = [ + "cfg-if 0.1.10", + "cloudabi", + "libc", + "redox_syscall", + "smallvec 1.5.1", + "winapi 0.3.9", +] + [[package]] name = "parking_lot_core" version = "0.8.1" @@ -4335,6 +4426,40 @@ dependencies = [ "dirs", ] +[[package]] +name = "sign-server" +version = "0.1.0" +dependencies = [ + "anyhow", + "blake2b-ref", + "ckb-hash", + "ckb-jsonrpc-types", + "ckb-rocksdb", + "ckb-sdk", + "ckb-types", + "clap", + "cmd_lib", + "config", + "env_logger", + "failure", + "futures 0.3.8", + "hex 0.4.2", + "jsonrpc-http-server", + "log", + "merkle-cbt", + "molecule", + "reqwest 0.9.24", + "secp256k1 0.17.2", + "serde 1.0.118", + "serde_derive", + "serde_json", + "shellexpand", + "sparse-merkle-tree", + "tokio 0.2.24", + "toml", + "web3", +] + [[package]] name = "signal-hook-registry" version = "1.2.2" @@ -4902,13 +5027,18 @@ dependencies = [ "futures 0.1.30", "mio", "num_cpus", + "tokio-codec", "tokio-current-thread", "tokio-executor", + "tokio-fs", "tokio-io", "tokio-reactor", + "tokio-sync", "tokio-tcp", "tokio-threadpool", "tokio-timer", + "tokio-udp", + "tokio-uds", ] [[package]] @@ -4946,6 +5076,17 @@ dependencies = [ "futures 0.1.30", ] +[[package]] +name = "tokio-codec" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "25b2998660ba0e70d18684de5d06b70b70a3a747469af9dea7618cc59e75976b" +dependencies = [ + "bytes 0.4.12", + "futures 0.1.30", + "tokio-io", +] + [[package]] name = "tokio-current-thread" version = "0.1.7" @@ -4966,6 +5107,17 @@ dependencies = [ "futures 0.1.30", ] +[[package]] +name = "tokio-fs" +version = "0.1.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "297a1206e0ca6302a0eed35b700d292b275256f596e2f3fea7729d5e629b6ff4" +dependencies = [ + "futures 0.1.30", + "tokio-io", + "tokio-threadpool", +] + [[package]] name = "tokio-io" version = "0.1.13" @@ -5080,6 +5232,39 @@ dependencies = [ "tokio 0.2.24", ] +[[package]] +name = "tokio-udp" +version = "0.1.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e2a0b10e610b39c38b031a2fcab08e4b82f16ece36504988dcbd81dbba650d82" +dependencies = [ + "bytes 0.4.12", + "futures 0.1.30", + "log", + "mio", + "tokio-codec", + "tokio-io", + "tokio-reactor", +] + +[[package]] +name = "tokio-uds" +version = "0.2.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ab57a4ac4111c8c9dbcf70779f6fc8bc35ae4b2454809febac840ad19bd7e4e0" +dependencies = [ + "bytes 0.4.12", + "futures 0.1.30", + "iovec", + "libc", + "log", + "mio", + "mio-uds", + "tokio-codec", + "tokio-io", + "tokio-reactor", +] + [[package]] name = "tokio-util" version = "0.3.1" @@ -5477,7 +5662,7 @@ dependencies = [ "futures-timer", "hyper 0.13.9", "hyper-tls 0.4.3", - "jsonrpc-core", + "jsonrpc-core 14.2.0", "log", "native-tls", "parking_lot 0.11.1", diff --git a/offchain-modules/cli/src/commands/mod.rs b/offchain-modules/cli/src/commands/mod.rs index f5b0fd12..42c9006f 100755 --- a/offchain-modules/cli/src/commands/mod.rs +++ b/offchain-modules/cli/src/commands/mod.rs @@ -93,8 +93,12 @@ pub async fn init_config(args: InitConfigArgs) -> Result<()> { } pub async fn init_multisig_address_handler(args: InitMultiSignAddressArgs) -> Result<()> { + if args.multi_address.len() != args.hosts.len() { + anyhow::bail!("failed to init multisig address. the length of multi_address and the length of hosts are not equal.") + } let multi_sign_address = init_multi_sign_address( args.multi_address, + args.hosts, args.require_first_n, args.threshold, args.config_path, @@ -433,7 +437,7 @@ pub async fn eth_relay_handler(args: EthRelayArgs) -> Result<()> { config_path, args.network, args.private_key_path, - args.multisig_privkeys, + // args.multisig_privkeys, args.confirm, args.delay, )?; @@ -453,8 +457,8 @@ pub async fn ckb_relay_handler(args: CkbRelayArgs) -> Result<()> { config_path, args.network, args.private_key_path, - args.mutlisig_privkeys, args.gas_price, + args.hosts, args.confirm, )?; diff --git a/offchain-modules/cli/src/commands/types.rs b/offchain-modules/cli/src/commands/types.rs index bff96232..1cedb739 100755 --- a/offchain-modules/cli/src/commands/types.rs +++ b/offchain-modules/cli/src/commands/types.rs @@ -132,6 +132,8 @@ pub struct InitConfigArgs { pub struct InitMultiSignAddressArgs { #[clap(long)] pub multi_address: Vec, + #[clap(long)] + pub hosts: Vec, #[clap(long, default_value = "2")] pub threshold: u8, #[clap(long, default_value = "0")] @@ -329,8 +331,8 @@ pub struct EthRelayArgs { pub network: Option, #[clap(short = 'k', long)] pub private_key_path: String, - #[clap(long)] - pub multisig_privkeys: Vec, + // #[clap(long)] + // pub multisig_privkeys: Vec, #[clap(long, default_value = "15")] pub confirm: u64, #[clap(long, default_value = "300")] @@ -351,8 +353,10 @@ pub struct CkbRelayArgs { pub max_tx_count: u64, #[clap(short, long, default_value = "0")] pub gas_price: u64, + // #[clap(long)] + // pub mutlisig_privkeys: Vec, #[clap(long)] - pub mutlisig_privkeys: Vec, + pub hosts: Vec, #[clap(long, default_value = "15")] pub confirm: u64, } diff --git a/offchain-modules/lib/src/header_relay/ckb_relay.rs b/offchain-modules/lib/src/header_relay/ckb_relay.rs index e2969930..9674b836 100644 --- a/offchain-modules/lib/src/header_relay/ckb_relay.rs +++ b/offchain-modules/lib/src/header_relay/ckb_relay.rs @@ -1,10 +1,10 @@ -use crate::transfer::to_eth::{get_add_ckb_headers_func, get_msg_hash, get_msg_signature}; +use crate::transfer::to_eth::{get_add_ckb_headers_func, get_msg_hash}; use crate::util::ckb_proof_helper::CBMT; use crate::util::ckb_tx_generator::Generator; use crate::util::ckb_util::covert_to_h256; use crate::util::config::ForceConfig; use crate::util::eth_util::{ - convert_eth_address, parse_private_key, parse_secret_key, relay_header_transaction, Web3Client, + convert_eth_address, parse_private_key, relay_header_transaction, Web3Client, }; use crate::util::rocksdb::open_rocksdb; use anyhow::{anyhow, bail, Result}; @@ -16,9 +16,10 @@ use force_sdk::constants::{ BURN_TX_MAX_NUM, BURN_TX_MAX_WAITING_BLOCKS, MAINNET_CKB_WAITING_BLOCKS, TESTNET_CKB_WAITING_BLOCKS, }; +use force_sdk::indexer::SignServerRpcClient; +use futures::future::join_all; use log::info; use rocksdb::ops::{Get, Put}; -use secp256k1::SecretKey; use std::time::Instant; use web3::types::{H160, H256}; @@ -29,7 +30,8 @@ pub struct CKBRelayer { pub ckb_client: Generator, pub web3_client: Web3Client, pub gas_price: U256, - pub multisig_privkeys: Vec, + // pub multisig_privkeys: Vec, + pub hosts: Vec, pub ckb_rpc_url: String, pub eth_rpc_url: String, pub ckb_init_height: u64, @@ -37,6 +39,7 @@ pub struct CKBRelayer { pub last_burn_tx_height: u64, pub last_submit_height: u64, pub waiting_burn_txs_count: u64, + pub threshold: usize, pub confirm: u64, } @@ -45,8 +48,8 @@ impl CKBRelayer { config_path: String, network: Option, priv_key_path: String, - multisig_privkeys: Vec, gas_price: u64, + hosts: Vec, confirm: u64, ) -> Result { let force_config = ForceConfig::new(config_path.as_str())?; @@ -55,11 +58,11 @@ impl CKBRelayer { .as_ref() .ok_or_else(|| anyhow!("contracts should be deployed"))?; - if multisig_privkeys.len() < deployed_contracts.ckb_relay_mutlisig_threshold.threshold { + if hosts.len() < deployed_contracts.ckb_relay_mutlisig_threshold.threshold { bail!( - "the mutlisig privkeys number is less. expect {}, actual {} ", + "the mutlisig number is less. expect {}, actual {} ", deployed_contracts.ckb_relay_mutlisig_threshold.threshold, - multisig_privkeys.len() + hosts.len() ); } @@ -72,10 +75,10 @@ impl CKBRelayer { let ckb_rpc_url = force_config.get_ckb_rpc_url(&network)?; let ckb_indexer_url = force_config.get_ckb_indexer_url(&network)?; let priv_key = parse_private_key(&priv_key_path, &force_config, &network)?; - let multisig_privkeys = multisig_privkeys - .into_iter() - .map(|k| parse_private_key(&k, &force_config, &network)) - .collect::>>()?; + // let multisig_privkeys = multisig_privkeys + // .into_iter() + // .map(|k| parse_private_key(&k, &force_config, &network)) + // .collect::>>()?; let contract_addr = convert_eth_address(&deployed_contracts.eth_ckb_chain_addr)?; let mut ckb_client = @@ -111,12 +114,10 @@ impl CKBRelayer { ckb_client, web3_client, gas_price, - confirm, + hosts, network: net, - multisig_privkeys: multisig_privkeys - .iter() - .map(|&privkey| parse_secret_key(privkey)) - .collect::>>()?, + threshold: deployed_contracts.ckb_relay_mutlisig_threshold.threshold, + confirm, }) } @@ -204,12 +205,75 @@ impl CKBRelayer { history_tx_root, )?; + // This can be optimized to collect signatures through multiple threads to improve efficiency. let mut signatures: Vec = vec![]; - for &privkey in self.multisig_privkeys.iter() { - let mut signature = get_msg_signature(&headers_msg_hash, privkey)?; - signatures.append(&mut signature); + let mut signature_number = 0; + let mut signature_futures = vec![]; + for host in self.hosts.clone() { + signature_futures.push(sign_eth_tx(host, hex::encode(&headers_msg_hash))); + } + if !signature_futures.is_empty() { + let now = Instant::now(); + let count = signature_futures.len(); + let timeout_future = tokio::time::delay_for(std::time::Duration::from_secs(30)); + let task_future = join_all(signature_futures); + tokio::select! { + v = task_future => { + for res in v.iter() { + match res { + Ok(res) => + { + if signature_number >= self.threshold { + break; + } + if res.len() != 130 { + log::error!("wrong signature: {:?}", res.clone()); + continue; + } + signatures.append(&mut hex::decode(res).map_err(|err| anyhow!(err))?); + signature_number += 1; + log::info!("collect eth signature success. index: {:?}", signature_number); + }, + Err(error) => log::error!("collect eth signature error : {:?}", error), + } + } + log::info!("collect {:?} eth signatures elapsed {:?}", count, now.elapsed()); + } + _ = timeout_future => { + log::error!("collect eth signatures timeout"); + } + } } - info!("msg signatures {}", hex::encode(&signatures)); + if signature_number < self.threshold { + log::error!( + "did not collect enough eth signatures. expect: {:?}, actual: {:?}", + self.threshold, + signature_number + ); + anyhow::bail!("did not collect enough eth signatures"); + } + // let m = Arc::new(Mutex::new(vec![])); + // for host in self.hosts.clone() { + // thread::spawn(move || { + // let mut signatures = m.lock().unwrap(); + // let signature = sign_eth_tx(host, hex::encode(&headers_msg_hash)) + // .await + // .unwrap(); + // signatures.push(signature); + // }); + // } + // loop { + // let signatures = m.lock().unwrap(); + // if signatures.len() >= 2 { + // break; + // } + // info!( + // "waiting to collect more signatures. expect: {:?}, current: {:?}", + // 2, + // signatures.len() + // ); + // tokio::time::delay_for(std::time::Duration::from_secs(1)).await; + // } let add_headers_abi = add_headers_func.encode_input(&[ Token::Uint(init_block_number.into()), @@ -348,3 +412,11 @@ impl CKBRelayer { Ok(()) } } + +async fn sign_eth_tx(host: String, raw_tx_str: String) -> Result { + let mut client = SignServerRpcClient::new(host); + Ok(client + .sign_eth_tx(raw_tx_str) + .map_err(|err| anyhow!(err))? + .ok_or_else(|| anyhow!(""))?) +} diff --git a/offchain-modules/lib/src/header_relay/eth_relay.rs b/offchain-modules/lib/src/header_relay/eth_relay.rs index 4c84fc10..d06a783f 100644 --- a/offchain-modules/lib/src/header_relay/eth_relay.rs +++ b/offchain-modules/lib/src/header_relay/eth_relay.rs @@ -1,7 +1,5 @@ use crate::util::ckb_tx_generator::Generator; -use crate::util::ckb_util::{ - parse_cell, parse_main_chain_headers, parse_merkle_cell_data, parse_privkey_path, -}; +use crate::util::ckb_util::{parse_cell, parse_merkle_cell_data, parse_privkey_path}; use crate::util::config::ForceConfig; use crate::util::eth_util::Web3Client; use crate::util::rocksdb; @@ -13,7 +11,7 @@ use ethereum_types::H256; use force_eth_types::generated::eth_header_cell::ETHHeaderCellMerkleDataReader; use force_sdk::cell_collector::get_live_cell_by_typescript; use force_sdk::indexer::{Cell, IndexerRpcClient}; -use force_sdk::tx_helper::{sign_with_multi_key, MultisigConfig}; +use force_sdk::tx_helper::{sign_from_multi_server, MultisigConfig}; use force_sdk::util::send_tx_sync_with_response; use log::{debug, info}; use molecule::prelude::Reader; @@ -21,7 +19,6 @@ use secp256k1::SecretKey; // use serde::export::Clone; use shellexpand::tilde; use sparse_merkle_tree::traits::Value; -use std::ops::Add; use std::str::FromStr; use web3::types::{Block, BlockHeader, U64}; @@ -33,7 +30,7 @@ pub struct ETHRelayer { pub config_path: String, pub config: ForceConfig, pub multisig_config: MultisigConfig, - pub multisig_privkeys: Vec, + pub hosts: Vec, pub secret_key: SecretKey, pub confirm: u64, pub delay: u64, @@ -44,7 +41,6 @@ impl ETHRelayer { config_path: String, network: Option, priv_key_path: String, - multisig_privkeys: Vec, confirm: u64, delay: u64, ) -> Result { @@ -86,10 +82,7 @@ impl ETHRelayer { config_path, multisig_config, secret_key, - multisig_privkeys: multisig_privkeys - .into_iter() - .map(|k| parse_privkey_path(&k, &force_config, &network)) - .collect::>>()?, + hosts: deployed_contracts.multisig_address.hosts.clone(), config: force_config, confirm, delay, @@ -194,6 +187,14 @@ impl ETHRelayer { } let force_config = ForceConfig::new(self.config_path.as_str())?; + let mut multisig_address = force_config + .deployed_contracts + .ok_or_else(|| anyhow!("deployed_contracts should be exist."))? + .multisig_address + .clone(); + multisig_address.hosts = vec![]; + let multisig_conf_json = + serde_json::to_string(&multisig_address.clone()).map_err(|err| anyhow!(err))?; let db_path = force_config.eth_rocksdb_path; // make tx let cell = @@ -269,15 +270,23 @@ impl ETHRelayer { start_height, new_latest_height, )?; + log::info!( + "tx: \n{}", + serde_json::to_string_pretty(&ckb_jsonrpc_types::TransactionView::from( + unsigned_tx.clone() + )) + .unwrap() + ); - let mut privkeys = vec![&self.secret_key]; - privkeys.extend(self.multisig_privkeys.iter()); - let tx = sign_with_multi_key( + let tx = sign_from_multi_server( unsigned_tx, &mut self.generator.rpc_client, - privkeys, + &self.secret_key, + self.hosts.clone(), self.multisig_config.clone(), + multisig_conf_json, ) + .await .map_err(|err| anyhow::anyhow!(err))?; let send_tx_res = send_tx_sync_with_response(&mut self.generator.rpc_client, &tx, 180).await; @@ -328,123 +337,123 @@ impl ETHRelayer { } } - pub async fn do_relay_loop(&mut self, mut cell: Cell) -> Result<()> { - let ckb_cell_data = cell.clone().output_data.as_bytes().to_vec(); - let mut un_confirmed_headers = vec![]; - let mut index: isize = 0; - if !ckb_cell_data.is_empty() { - let (headers, _) = parse_main_chain_headers(ckb_cell_data)?; - un_confirmed_headers = headers; - index = (un_confirmed_headers.len() - 1) as isize; - } - let mut number: U64; - let mut current_block: Block; - if index == 0 { - // first relay - number = self.eth_client.client().eth().block_number().await?; - current_block = self.eth_client.get_block(number.into()).await?; - } else { - // Determine whether the latest_header is on the Ethereum main chain - // If it is in the main chain, the new header currently needs to be added current_height = latest_height + 1 - // If it is not in the main chain, it means that reorg has occurred, and you need to trace back from latest_height until the back traced header is on the main chain - current_block = self - .lookup_common_ancestor(&un_confirmed_headers, index) - .await?; - number = current_block - .number - .ok_or_else(|| anyhow!("the block number is not exist."))?; - } - - loop { - let witnesses = vec![]; - let start = number.add(1 as u64); - let mut latest_number = self.eth_client.client().eth().block_number().await?; - if latest_number <= start { - info!("current block is newest, waiting for new header on ethereum."); - tokio::time::delay_for(std::time::Duration::from_secs(1)).await; - continue; - } - if latest_number.as_u64() - start.as_u64() > HEADER_LIMIT_IN_TX as u64 { - latest_number = start.add(HEADER_LIMIT_IN_TX as u64); - } - info!( - "try to relay eth light client, block height start: {:?}, end: {:?}", - start.as_u64(), - latest_number.as_u64() - ); - let headers = self - .eth_client - .get_blocks(start.as_u64(), latest_number.as_u64()) - .await?; - if headers[0].parent_hash - == current_block - .hash - .ok_or_else(|| anyhow!("the block hash is not exist."))? - { - // No reorg - // don't remove it, it will be used in later. - - // for item in headers.clone() { - // let witness = self.generate_witness(item.number.unwrap().as_u64())?; - // witnesses.push(witness); - // } - } else { - // Reorg occurred, need to go back - info!("reorg occurred, ready to go back"); - let index: isize = (un_confirmed_headers.len() - 1) as isize; - current_block = self - .lookup_common_ancestor(&un_confirmed_headers, index) - .await?; - info!( - "reorg occurred, found the common ancestor. {:?}", - current_block - ); - number = current_block - .number - .ok_or_else(|| anyhow!("the block number is not exist."))?; - continue; - } - - let from_privkey = self.secret_key; - let from_lockscript = self.generate_from_lockscript(from_privkey)?; - - let unsigned_tx = self.generator.generate_eth_light_client_tx( - &headers, - &cell, - &witnesses, - &un_confirmed_headers, - from_lockscript, - )?; - // FIXME: waiting for sign server. - let secret_key_a = parse_privkey_path("0", &self.config, &Option::None)?; - let secret_key_b = parse_privkey_path("1", &self.config, &Option::None)?; - let tx = sign_with_multi_key( - unsigned_tx, - &mut self.generator.rpc_client, - vec![&self.secret_key, &secret_key_a, &secret_key_b], - self.multisig_config.clone(), - ) - .map_err(|err| anyhow::anyhow!(err))?; - self.generator - .rpc_client - .send_transaction(tx.data()) - .map_err(|err| anyhow!(err))?; - - // update cell current_block and number. - update_cell_sync(&mut self.generator.indexer_client, &tx, 600, &mut cell) - .await - .map_err(|err| anyhow::anyhow!(err))?; - current_block = headers[headers.len() - 1].clone(); - number = current_block.number.unwrap(); - let ckb_cell_data = cell.clone().output_data.as_bytes().to_vec(); - let (un_confirmed, _) = parse_main_chain_headers(ckb_cell_data)?; - un_confirmed_headers = un_confirmed; - info!( - "Successfully relayed the headers, ready to relay the next one. next number: {:?}", - number - ); - } - } + // pub async fn do_relay_loop(&mut self, mut cell: Cell) -> Result<()> { + // let ckb_cell_data = cell.clone().output_data.as_bytes().to_vec(); + // let mut un_confirmed_headers = vec![]; + // let mut index: isize = 0; + // if !ckb_cell_data.is_empty() { + // let (headers, _) = parse_main_chain_headers(ckb_cell_data)?; + // un_confirmed_headers = headers; + // index = (un_confirm ed_headers.len() - 1) as isize; + // } + // let mut number: U64; + // let mut current_block: Block; + // if index == 0 { + // // first relay + // number = self.eth_client.client().eth().block_number().await?; + // current_block = self.eth_client.get_block(number.into()).await?; + // } else { + // // Determine whether the latest_header is on the Ethereum main chain + // // If it is in the main chain, the new header currently needs to be added current_height = latest_height + 1 + // // If it is not in the main chain, it means that reorg has occurred, and you need to trace back from latest_height until the back traced header is on the main chain + // current_block = self + // .lookup_common_ancestor(&un_confirmed_headers, index) + // .await?; + // number = current_block + // .number + // .ok_or_else(|| anyhow!("the block number is not exist."))?; + // } + // + // loop { + // let witnesses = vec![]; + // let start = number.add(1 as u64); + // let mut latest_number = self.eth_client.client().eth().block_number().await?; + // if latest_number <= start { + // info!("current block is newest, waiting for new header on ethereum."); + // tokio::time::delay_for(std::time::Duration::from_secs(1)).await; + // continue; + // } + // if latest_number.as_u64() - start.as_u64() > HEADER_LIMIT_IN_TX as u64 { + // latest_number = start.add(HEADER_LIMIT_IN_TX as u64); + // } + // info!( + // "try to relay eth light client, block height start: {:?}, end: {:?}", + // start.as_u64(), + // latest_number.as_u64() + // ); + // let headers = self + // .eth_client + // .get_blocks(start.as_u64(), latest_number.as_u64()) + // .await?; + // if headers[0].parent_hash + // == current_block + // .hash + // .ok_or_else(|| anyhow!("the block hash is not exist."))? + // { + // // No reorg + // // don't remove it, it will be used in later. + // + // // for item in headers.clone() { + // // let witness = self.generate_witness(item.number.unwrap().as_u64())?; + // // witnesses.push(witness); + // // } + // } else { + // // Reorg occurred, need to go back + // info!("reorg occurred, ready to go back"); + // let index: isize = (un_confirmed_headers.len() - 1) as isize; + // current_block = self + // .lookup_common_ancestor(&un_confirmed_headers, index) + // .await?; + // info!( + // "reorg occurred, found the common ancestor. {:?}", + // current_block + // ); + // number = current_block + // .number + // .ok_or_else(|| anyhow!("the block number is not exist."))?; + // continue; + // } + // + // let from_privkey = self.secret_key; + // let from_lockscript = self.generate_from_lockscript(from_privkey)?; + // + // let unsigned_tx = self.generator.generate_eth_light_client_tx( + // &headers, + // &cell, + // &witnesses, + // &un_confirmed_headers, + // from_lockscript, + // )?; + // // FIXME: waiting for sign server. + // let secret_key_a = parse_privkey_path("0", &self.config, &Option::None)?; + // let secret_key_b = parse_privkey_path("1", &self.config, &Option::None)?; + // let tx = sign_with_multi_key( + // unsigned_tx, + // &mut self.generator.rpc_client, + // vec![&self.secret_key, &secret_key_a, &secret_key_b], + // self.multisig_config.clone(), + // ) + // .map_err(|err| anyhow::anyhow!(err))?; + // self.generator + // .rpc_client + // .send_transaction(tx.data()) + // .map_err(|err| anyhow!(err))?; + // + // // update cell current_block and number. + // update_cell_sync(&mut self.generator.indexer_client, &tx, 600, &mut cell) + // .await + // .map_err(|err| anyhow::anyhow!(err))?; + // current_block = headers[headers.len() - 1].clone(); + // number = current_block.number.unwrap(); + // let ckb_cell_data = cell.clone().output_data.as_bytes().to_vec(); + // let (un_confirmed, _) = parse_main_chain_headers(ckb_cell_data)?; + // un_confirmed_headers = un_confirmed; + // info!( + // "Successfully relayed the headers, ready to relay the next one. next number: {:?}", + // number + // ); + // } + // } } pub async fn update_cell_sync( diff --git a/offchain-modules/lib/src/transfer/to_ckb.rs b/offchain-modules/lib/src/transfer/to_ckb.rs index 84cab465..69a7853a 100644 --- a/offchain-modules/lib/src/transfer/to_ckb.rs +++ b/offchain-modules/lib/src/transfer/to_ckb.rs @@ -497,6 +497,7 @@ pub async fn deploy_ckb( pub async fn init_multi_sign_address( multisig_address: Vec, + hosts: Vec, require_first_n: u8, threshold: u8, config_path: String, @@ -523,6 +524,7 @@ pub async fn init_multi_sign_address( .ok_or_else(|| anyhow!("contracts should be deployed"))?; deployed_contracts.multisig_address = MultisigConf { addresses: multisig_address, + hosts, require_first_n, threshold, }; diff --git a/offchain-modules/lib/src/util/config.rs b/offchain-modules/lib/src/util/config.rs index 5052c9eb..026be158 100644 --- a/offchain-modules/lib/src/util/config.rs +++ b/offchain-modules/lib/src/util/config.rs @@ -129,6 +129,7 @@ pub struct DeployedContracts { #[derive(Deserialize, Serialize, Default, Debug, Clone)] pub struct MultisigConf { pub addresses: Vec, + pub hosts: Vec, pub require_first_n: u8, pub threshold: u8, } diff --git a/offchain-modules/sdk/Cargo.toml b/offchain-modules/sdk/Cargo.toml index 12ee0c54..3a31edc9 100644 --- a/offchain-modules/sdk/Cargo.toml +++ b/offchain-modules/sdk/Cargo.toml @@ -27,6 +27,7 @@ serde = { version = "1.0", features = ["rc"] } serde_derive = "1.0" serde_json = "1.0" tokio = { version = "0.2.9", features = ["full"]} +futures = "0.3.4" [dev-dependencies] env_logger = "0.7.1" diff --git a/offchain-modules/sdk/src/indexer.rs b/offchain-modules/sdk/src/indexer.rs index b1d0beff..318b88e4 100644 --- a/offchain-modules/sdk/src/indexer.rs +++ b/offchain-modules/sdk/src/indexer.rs @@ -58,6 +58,49 @@ macro_rules! serialize_parameters { ($($arg_name:ident,)+) => ( serde_json::to_value(($($arg_name,)+))?) } +jsonrpc!(pub struct SignServerRawRpc{ +pub fn sign_eth_tx(&mut self, raw_tx_str: String) -> Option; +pub fn sign_ckb_tx(&mut self, multisig_conf: String, raw_tx_str: String) -> Option>; +}); + +pub struct SignServerRpcClient { + url: String, + client: SignServerRawRpc, +} + +impl SignServerRpcClient { + pub fn new(url: String) -> SignServerRpcClient { + let client = SignServerRawRpc::new(url.as_str()); + SignServerRpcClient { url, client } + } + + pub fn url(&self) -> &str { + self.url.as_str() + } + + pub fn client(&mut self) -> &mut SignServerRawRpc { + &mut self.client + } + + pub fn sign_eth_tx(&mut self, raw_tx_str: String) -> Result, String> { + self.client + .sign_eth_tx(raw_tx_str) + .map(|opt| opt.map(Into::into)) + .map_err(|err| err.to_string()) + } + + pub fn sign_ckb_tx( + &mut self, + multisig_conf: String, + raw_tx_str: String, + ) -> Result>, String> { + self.client + .sign_ckb_tx(multisig_conf, raw_tx_str) + .map(|opt| opt.map(Into::into)) + .map_err(|err| err.to_string()) + } +} + jsonrpc!(pub struct RawHttpRpcClient { pub fn get_tip(&mut self) -> Option; diff --git a/offchain-modules/sdk/src/tx_helper.rs b/offchain-modules/sdk/src/tx_helper.rs index ab1c9d67..a0aec70d 100644 --- a/offchain-modules/sdk/src/tx_helper.rs +++ b/offchain-modules/sdk/src/tx_helper.rs @@ -13,14 +13,16 @@ use std::convert::TryInto; use crate::cell_collector::{collect_sudt_cells_by_amout, get_live_cells_by_lock_and_capacity}; use crate::constants::XT_CELL_CAPACITY; -use crate::indexer::IndexerRpcClient; +use crate::indexer::{IndexerRpcClient, SignServerRpcClient}; use crate::util::{get_live_cell_with_cache, get_privkey_signer}; use ckb_sdk::constants::{ MIN_SECP_CELL_CAPACITY, MULTISIG_TYPE_HASH, ONE_CKB, SECP_SIGNATURE_SIZE, SIGHASH_TYPE_HASH, }; use ckb_sdk::HttpRpcClient; use ckb_sdk::{AddressPayload, AddressType, CodeHashIndex, GenesisInfo, Since}; +use futures::future::join_all; use secp256k1::SecretKey; +use std::time::Instant; pub const CKB_UNITS: u64 = 100_000_000; pub const PUBLIC_BRIDGE_CELL: u64 = 1000 * CKB_UNITS; @@ -72,11 +74,13 @@ pub fn sign( } #[allow(clippy::mutable_key_type)] -pub fn sign_with_multi_key( +pub async fn sign_from_multi_server( tx: TransactionView, rpc_client: &mut HttpRpcClient, - privkeys: Vec<&SecretKey>, + privkey: &SecretKey, + hosts: Vec, multisig_config: MultisigConfig, + multisig_config_json: String, ) -> Result { let mut live_cell_cache: HashMap<(OutPoint, bool), (CellOutput, Bytes)> = Default::default(); let get_live_cell_fn = |out_point: OutPoint, with_data: bool| { @@ -84,8 +88,16 @@ pub fn sign_with_multi_key( .map(|(output, _)| output) }; let mut tx_helper = TxHelper::new(tx); - tx_helper.add_multisig_config(multisig_config); - tx_helper.sign_with_multi_key(get_live_cell_fn, privkeys) + tx_helper.add_multisig_config(multisig_config.clone()); + tx_helper + .sign_from_multi_server( + get_live_cell_fn, + privkey, + hosts, + multisig_config_json, + multisig_config.threshold, + ) + .await } /// A transaction helper handle input/output with secp256k1(sighash/multisg) lock @@ -644,19 +656,101 @@ impl TxHelper { self.build_tx(&mut get_live_cell_fn, true) } - pub fn sign_with_multi_key Result>( + pub async fn sign_from_multi_server Result>( &mut self, mut get_live_cell_fn: F, - privkeys: Vec<&SecretKey>, + privkey: &SecretKey, + hosts: Vec, + multisig_config_json: String, + threshold: u8, ) -> Result { - for key in privkeys { - let signer = get_privkey_signer(*key); - for (lock_arg, signature) in self.sign_inputs(signer, &mut get_live_cell_fn, true)? { - self.add_signature(lock_arg, signature)?; + let mut signature_number = 0; + let raw_tx_str = hex::encode(self.transaction.data().as_bytes().to_vec()); + let mut collect_signature_futures = vec![]; + for host in hosts.clone() { + let result = collect_signatures( + host.clone(), + multisig_config_json.clone(), + raw_tx_str.clone(), + ); + collect_signature_futures.push(result); + } + if !collect_signature_futures.is_empty() { + let now = Instant::now(); + let count = collect_signature_futures.len(); + let timeout_future = tokio::time::delay_for(std::time::Duration::from_secs(30)); + let task_future = join_all(collect_signature_futures); + tokio::select! { + v = task_future => { + for res in v.iter() { + match res { + Ok(res) => + { + if signature_number >= threshold { + break; + } + for i in 0..res.len() / 2 { + if res[i * 2].clone().len() != 40 || res[i * 2 + 1].clone().len() != 130 { + log::error!("wrong signature: {:?}", res[i * 2 + 1].clone()); + continue; + } + let lock_arg: Bytes = + Bytes::from(hex::decode(res[i * 2].clone()).map_err(|err| err.to_string())?); + let signature = + Bytes::from(hex::decode(res[i * 2 + 1].clone()).map_err(|err| err.to_string())?); + self.add_signature(lock_arg, signature)?; + } + log::info!("collect signature success. index: {:?}", signature_number); + signature_number += 1; + }, + Err(error) => log::error!("collect signature error : {:?}", error), + } + } + log::info!("collect {:?} signatures elapsed {:?}", count, now.elapsed()); + } + _ = timeout_future => { + log::error!("collect signatures timeout"); + } } } - self.build_tx(&mut get_live_cell_fn, true) + if signature_number < threshold { + return Err(String::from("did not collect enough ckb signatures")); + } + let signer = get_privkey_signer(*privkey); + for (lock_arg, signature) in self.sign_inputs(signer, &mut get_live_cell_fn, true)? { + self.add_signature(lock_arg, signature)?; + } + Ok(self + .build_tx(&mut get_live_cell_fn, true) + .map_err(|err| err)?) + } +} + +async fn collect_signatures( + host: String, + multisig_conf: String, + raw_tx_str: String, +) -> Result, String> { + let res = sign_ckb_tx(host.clone(), multisig_conf.clone(), raw_tx_str) + .await + .map_err(|err| err)?; + log::info!("get signature from {:?} : {:?}", host, res); + if res.len() < 2 { + return Err(String::from("invalid signatures")); } + Ok(res) +} + +async fn sign_ckb_tx( + host: String, + multisig_conf: String, + raw_tx_str: String, +) -> Result, String> { + let mut client = SignServerRpcClient::new(host); + client + .sign_ckb_tx(multisig_conf, raw_tx_str) + .map_err(|err| err)? + .ok_or_else(|| String::from("the signature is not exist.")) } pub type SignerFn = Box< diff --git a/offchain-modules/sign-server b/offchain-modules/sign-server new file mode 160000 index 00000000..5f1af7fa --- /dev/null +++ b/offchain-modules/sign-server @@ -0,0 +1 @@ +Subproject commit 5f1af7fa0b21771b86427bbec09ac85b0222cb0e diff --git a/offchain-modules/start-services.sh b/offchain-modules/start-services.sh index 5fb86ed0..0d42cc65 100644 --- a/offchain-modules/start-services.sh +++ b/offchain-modules/start-services.sh @@ -8,9 +8,11 @@ set -o xtrace export RUST_BACKTRACE=1 export RUST_LOG=info,force=debug +export FORCE_CONFIG_PATH=~/.force-bridge/config.toml PROJECT_DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" >/dev/null 2>&1 && cd .. && pwd )" FORCE_CLI=${PROJECT_DIR}/offchain-modules/target/debug/force-eth-cli +SIGN_CLI=${PROJECT_DIR}/offchain-modules/target/debug/sign-server FORCE_LOG_PATH=~/.force-bridge/logs SQL_PATH="$PROJECT_DIR"/offchain-modules/lib/src/dapp/db/source/ DB_NAME=forcedb @@ -45,6 +47,19 @@ start_indexer() { ${FORCE_CLI} dapp eth-indexer --db-path ${DB_PATH} > ${FORCE_LOG_PATH}/eth-indexer.log 2>&1 & } +start_sign_server() { + cell_script=`cat ${FORCE_CONFIG_PATH}|grep "cell_script" | awk '{print $3}' | sed 's/\"//g'` + echo ${cell_script} + cd ${PROJECT_DIR}/offchain-modules/sign-server + cp ${PROJECT_DIR}/offchain-modules/target/debug/sign-server . + ${PROJECT_DIR}/offchain-modules/sign-server/sign-server run --cell-script ${cell_script}> ${FORCE_LOG_PATH}/sign-server-0.log 2>&1 & + ${PROJECT_DIR}/offchain-modules/sign-server/sign-server run --config-path conf_1/config.toml --listen-url 0.0.0.0:3032 --ckb-key-path conf_1/ckb_key --eth-key-path conf_1/eth_key --cell-script ${cell_script}> ${FORCE_LOG_PATH}/sign-server-1.log 2>&1 & + ${PROJECT_DIR}/offchain-modules/sign-server/sign-server run --config-path conf_2/config.toml --listen-url 0.0.0.0:3033 --ckb-key-path conf_2/ckb_key --eth-key-path conf_2/eth_key --cell-script ${cell_script}> ${FORCE_LOG_PATH}/sign-server-2.log 2>&1 & + sleep 5 +} + +start_sign_server + while [[ $# -gt 0 ]] do key="$1" @@ -66,9 +81,9 @@ cd ${PROJECT_DIR}/offchain-modules if [ "${FORCE_NETWORK}" = "" ] then # ${FORCE_CLI} init-ckb-light-contract -k 0 -f 500 -c 40000 --wait - ${FORCE_CLI} ckb-relay -k 1 --per-amount 5 --max-tx-count 10 --mutlisig-privkeys 0 > ${FORCE_LOG_PATH}/ckb-relayer.log 2>&1 & + ${FORCE_CLI} ckb-relay -k 1 --per-amount 5 --max-tx-count 10 --hosts http://127.0.0.1:3031 http://127.0.0.1:3032 http://127.0.0.1:3033 > ${FORCE_LOG_PATH}/ckb-relayer.log 2>&1 & # ${FORCE_CLI} init-multi-sign-address -k 1 --multi-address ckt1qyqyph8v9mclls35p6snlaxajeca97tc062sa5gahk ckt1qyqvsv5240xeh85wvnau2eky8pwrhh4jr8ts8vyj37 - ${FORCE_CLI} eth-relay -k 1 --multisig-privkeys 0 1 --confirm 5 --delay 30 > ${FORCE_LOG_PATH}/eth-relayer.log 2>&1 & + ${FORCE_CLI} eth-relay -k 1 --confirm 5 --delay 30 > ${FORCE_LOG_PATH}/eth-relayer.log 2>&1 & else # ${FORCE_CLI} init-ckb-light-contract --network "${FORCE_NETWORK}" -k 0 -f 500 -c 40000 --wait ${FORCE_CLI} ckb-relay --network "${FORCE_NETWORK}" -k 1 --per-amount 5 --max-tx-count 10 -mutlisig-privkeys 0 > ${FORCE_LOG_PATH}/ckb-relayer.log 2>&1 & @@ -76,6 +91,10 @@ else ${FORCE_CLI} eth-relay --network "${FORCE_NETWORK}" -k 1 > ${FORCE_LOG_PATH}/eth-relayer.log 2>&1 & fi +#sleep 10 +#cat ${FORCE_LOG_PATH}/eth-relayer.log +##echo ${eth_relayer_log} +#cat ${FORCE_LOG_PATH}/sign-server.log start_mysql sleep 10 start_indexer diff --git a/offchain-modules/stop-services.sh b/offchain-modules/stop-services.sh index 6110c7cf..a1fdf7e9 100644 --- a/offchain-modules/stop-services.sh +++ b/offchain-modules/stop-services.sh @@ -10,5 +10,6 @@ ps aux | grep 'dapp ckb-indexer' | grep -v grep | awk '{print $2}' | xargs kill ps aux | grep 'dapp ckb-tx-relayer' | grep -v grep | awk '{print $2}' | xargs kill -9 ps aux | grep 'dapp eth-tx-relayer' | grep -v grep | awk '{print $2}' | xargs kill -9 ps aux | grep 'dapp server' | grep -v grep | awk '{print $2}' | xargs kill -9 +ps aux | grep 'sign-server run' | grep -v grep | awk '{print $2}' | xargs kill -9 ps aux | grep 'dapp eth-header-indexer' | grep -v grep | awk '{print $2}' | xargs kill -9 ps aux | grep 'dapp ckb-header-indexer' | grep -v grep | awk '{print $2}' | xargs kill -9