diff --git a/Cargo.lock b/Cargo.lock index b2b4602f..d5daac9f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -177,19 +177,21 @@ dependencies = [ [[package]] name = "alloy-evm" -version = "0.20.1" +version = "0.21.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0dbe7c66c859b658d879b22e8aaa19546dab726b0639f4649a424ada3d99349e" +checksum = "06a5f67ee74999aa4fe576a83be1996bdf74a30fce3d248bf2007d6fc7dae8aa" dependencies = [ "alloy-consensus", "alloy-eips", "alloy-hardforks", "alloy-primitives 1.4.0", + "alloy-rpc-types-engine", "alloy-rpc-types-eth", "alloy-sol-types", "auto_impl", "derive_more 2.0.1", "op-alloy-consensus", + "op-alloy-rpc-types-engine", "op-revm", "revm", "thiserror 2.0.17", @@ -2867,6 +2869,7 @@ dependencies = [ "pin-project-lite", "rand 0.9.2", "rbuilder-primitives", + "rbuilder-utils", "redb", "reqwest", "revm-interpreter", @@ -4543,9 +4546,9 @@ checksum = "d6790f58c7ff633d8771f42965289203411a5e5c68388703c06e14f24770b41e" [[package]] name = "op-alloy-consensus" -version = "0.19.1" +version = "0.20.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d9ade20c592484ba1ea538006e0454284174447a3adf9bb59fa99ed512f95493" +checksum = "3a501241474c3118833d6195312ae7eb7cc90bbb0d5f524cbb0b06619e49ff67" dependencies = [ "alloy-consensus", "alloy-eips", @@ -4557,6 +4560,25 @@ dependencies = [ "thiserror 2.0.17", ] +[[package]] +name = "op-alloy-rpc-types-engine" +version = "0.20.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "14e50c94013a1d036a529df259151991dbbd6cf8dc215e3b68b784f95eec60e6" +dependencies = [ + "alloy-consensus", + "alloy-eips", + "alloy-primitives 1.4.0", + "alloy-rlp", + "alloy-rpc-types-engine", + "derive_more 2.0.1", + "ethereum_ssz 0.9.1", + "ethereum_ssz_derive", + "op-alloy-consensus", + "snap", + "thiserror 2.0.17", +] + [[package]] name = "op-revm" version = "10.1.1" @@ -5345,7 +5367,7 @@ dependencies = [ [[package]] name = "rbuilder-primitives" version = "0.1.0" -source = "git+https://github.com/flashbots/rbuilder?rev=a21de88#a21de88b133a5604bd6c56e75543577e9992b4b9" +source = "git+https://github.com/flashbots/rbuilder?rev=332b00ceeb960cfa27278c020f0f8b299f928982#332b00ceeb960cfa27278c020f0f8b299f928982" dependencies = [ "ahash", "alloy-consensus", @@ -5387,6 +5409,45 @@ dependencies = [ "uuid", ] +[[package]] +name = "rbuilder-utils" +version = "0.1.0" +source = "git+https://github.com/flashbots/rbuilder?rev=332b00ceeb960cfa27278c020f0f8b299f928982#332b00ceeb960cfa27278c020f0f8b299f928982" +dependencies = [ + "ahash", + "alloy-primitives 1.4.0", + "auto_impl", + "clickhouse", + "clickhouse-derive 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)", + "derivative", + "derive_more 2.0.1", + "dyn-clone", + "eyre", + "futures", + "futures-util", + "governor", + "integer-encoding 4.0.2", + "rand 0.9.2", + "redb", + "reqwest", + "reth-tasks", + "serde", + "serde_json", + "serde_with", + "sha2 0.10.9", + "strum", + "strum_macros", + "tempfile", + "thiserror 1.0.69", + "time", + "tokio", + "toml 0.8.23", + "tracing", + "tracing-futures", + "tracing-subscriber 0.3.20", + "uuid", +] + [[package]] name = "redb" version = "3.1.0" @@ -5526,8 +5587,8 @@ checksum = "6b3789b30bd25ba102de4beabd95d21ac45b69b1be7d14522bab988c526d6799" [[package]] name = "reth-chain-state" -version = "1.6.0" -source = "git+https://github.com/paradigmxyz/reth?rev=0b316160a9915ac80c4ae867f69e304aca85ec01#0b316160a9915ac80c4ae867f69e304aca85ec01" +version = "1.8.2" +source = 
"git+https://github.com/paradigmxyz/reth?rev=9c30bf7af5e0d45deaf5917375c9922c16654b28#9c30bf7af5e0d45deaf5917375c9922c16654b28" dependencies = [ "alloy-consensus", "alloy-eips", @@ -5552,8 +5613,8 @@ dependencies = [ [[package]] name = "reth-chainspec" -version = "1.6.0" -source = "git+https://github.com/paradigmxyz/reth?rev=0b316160a9915ac80c4ae867f69e304aca85ec01#0b316160a9915ac80c4ae867f69e304aca85ec01" +version = "1.8.2" +source = "git+https://github.com/paradigmxyz/reth?rev=9c30bf7af5e0d45deaf5917375c9922c16654b28#9c30bf7af5e0d45deaf5917375c9922c16654b28" dependencies = [ "alloy-chains", "alloy-consensus", @@ -5572,8 +5633,8 @@ dependencies = [ [[package]] name = "reth-codecs" -version = "1.6.0" -source = "git+https://github.com/paradigmxyz/reth?rev=0b316160a9915ac80c4ae867f69e304aca85ec01#0b316160a9915ac80c4ae867f69e304aca85ec01" +version = "1.8.2" +source = "git+https://github.com/paradigmxyz/reth?rev=9c30bf7af5e0d45deaf5917375c9922c16654b28#9c30bf7af5e0d45deaf5917375c9922c16654b28" dependencies = [ "alloy-consensus", "alloy-eips", @@ -5590,8 +5651,8 @@ dependencies = [ [[package]] name = "reth-codecs-derive" -version = "1.6.0" -source = "git+https://github.com/paradigmxyz/reth?rev=0b316160a9915ac80c4ae867f69e304aca85ec01#0b316160a9915ac80c4ae867f69e304aca85ec01" +version = "1.8.2" +source = "git+https://github.com/paradigmxyz/reth?rev=9c30bf7af5e0d45deaf5917375c9922c16654b28#9c30bf7af5e0d45deaf5917375c9922c16654b28" dependencies = [ "convert_case 0.7.1", "proc-macro2", @@ -5601,8 +5662,8 @@ dependencies = [ [[package]] name = "reth-consensus" -version = "1.6.0" -source = "git+https://github.com/paradigmxyz/reth?rev=0b316160a9915ac80c4ae867f69e304aca85ec01#0b316160a9915ac80c4ae867f69e304aca85ec01" +version = "1.8.2" +source = "git+https://github.com/paradigmxyz/reth?rev=9c30bf7af5e0d45deaf5917375c9922c16654b28#9c30bf7af5e0d45deaf5917375c9922c16654b28" dependencies = [ "alloy-consensus", "alloy-primitives 1.4.0", @@ -5614,8 +5675,8 @@ dependencies = [ [[package]] name = "reth-db-models" -version = "1.6.0" -source = "git+https://github.com/paradigmxyz/reth?rev=0b316160a9915ac80c4ae867f69e304aca85ec01#0b316160a9915ac80c4ae867f69e304aca85ec01" +version = "1.8.2" +source = "git+https://github.com/paradigmxyz/reth?rev=9c30bf7af5e0d45deaf5917375c9922c16654b28#9c30bf7af5e0d45deaf5917375c9922c16654b28" dependencies = [ "alloy-eips", "alloy-primitives 1.4.0", @@ -5624,8 +5685,8 @@ dependencies = [ [[package]] name = "reth-errors" -version = "1.6.0" -source = "git+https://github.com/paradigmxyz/reth?rev=0b316160a9915ac80c4ae867f69e304aca85ec01#0b316160a9915ac80c4ae867f69e304aca85ec01" +version = "1.8.2" +source = "git+https://github.com/paradigmxyz/reth?rev=9c30bf7af5e0d45deaf5917375c9922c16654b28#9c30bf7af5e0d45deaf5917375c9922c16654b28" dependencies = [ "reth-consensus", "reth-execution-errors", @@ -5635,8 +5696,8 @@ dependencies = [ [[package]] name = "reth-eth-wire-types" -version = "1.6.0" -source = "git+https://github.com/paradigmxyz/reth?rev=0b316160a9915ac80c4ae867f69e304aca85ec01#0b316160a9915ac80c4ae867f69e304aca85ec01" +version = "1.8.2" +source = "git+https://github.com/paradigmxyz/reth?rev=9c30bf7af5e0d45deaf5917375c9922c16654b28#9c30bf7af5e0d45deaf5917375c9922c16654b28" dependencies = [ "alloy-chains", "alloy-consensus", @@ -5656,8 +5717,8 @@ dependencies = [ [[package]] name = "reth-ethereum-forks" -version = "1.6.0" -source = "git+https://github.com/paradigmxyz/reth?rev=0b316160a9915ac80c4ae867f69e304aca85ec01#0b316160a9915ac80c4ae867f69e304aca85ec01" +version = "1.8.2" +source 
= "git+https://github.com/paradigmxyz/reth?rev=9c30bf7af5e0d45deaf5917375c9922c16654b28#9c30bf7af5e0d45deaf5917375c9922c16654b28" dependencies = [ "alloy-eip2124", "alloy-hardforks", @@ -5669,13 +5730,15 @@ dependencies = [ [[package]] name = "reth-ethereum-primitives" -version = "1.6.0" -source = "git+https://github.com/paradigmxyz/reth?rev=0b316160a9915ac80c4ae867f69e304aca85ec01#0b316160a9915ac80c4ae867f69e304aca85ec01" +version = "1.8.2" +source = "git+https://github.com/paradigmxyz/reth?rev=9c30bf7af5e0d45deaf5917375c9922c16654b28#9c30bf7af5e0d45deaf5917375c9922c16654b28" dependencies = [ "alloy-consensus", "alloy-eips", "alloy-primitives 1.4.0", "alloy-rlp", + "alloy-rpc-types-eth", + "alloy-serde", "modular-bitfield", "reth-codecs", "reth-primitives-traits", @@ -5686,8 +5749,8 @@ dependencies = [ [[package]] name = "reth-execution-errors" -version = "1.6.0" -source = "git+https://github.com/paradigmxyz/reth?rev=0b316160a9915ac80c4ae867f69e304aca85ec01#0b316160a9915ac80c4ae867f69e304aca85ec01" +version = "1.8.2" +source = "git+https://github.com/paradigmxyz/reth?rev=9c30bf7af5e0d45deaf5917375c9922c16654b28#9c30bf7af5e0d45deaf5917375c9922c16654b28" dependencies = [ "alloy-evm", "alloy-primitives 1.4.0", @@ -5699,8 +5762,8 @@ dependencies = [ [[package]] name = "reth-execution-types" -version = "1.6.0" -source = "git+https://github.com/paradigmxyz/reth?rev=0b316160a9915ac80c4ae867f69e304aca85ec01#0b316160a9915ac80c4ae867f69e304aca85ec01" +version = "1.8.2" +source = "git+https://github.com/paradigmxyz/reth?rev=9c30bf7af5e0d45deaf5917375c9922c16654b28#9c30bf7af5e0d45deaf5917375c9922c16654b28" dependencies = [ "alloy-consensus", "alloy-eips", @@ -5715,8 +5778,8 @@ dependencies = [ [[package]] name = "reth-fs-util" -version = "1.6.0" -source = "git+https://github.com/paradigmxyz/reth?rev=0b316160a9915ac80c4ae867f69e304aca85ec01#0b316160a9915ac80c4ae867f69e304aca85ec01" +version = "1.8.2" +source = "git+https://github.com/paradigmxyz/reth?rev=9c30bf7af5e0d45deaf5917375c9922c16654b28#9c30bf7af5e0d45deaf5917375c9922c16654b28" dependencies = [ "serde", "serde_json", @@ -5725,8 +5788,8 @@ dependencies = [ [[package]] name = "reth-metrics" -version = "1.6.0" -source = "git+https://github.com/paradigmxyz/reth?rev=0b316160a9915ac80c4ae867f69e304aca85ec01#0b316160a9915ac80c4ae867f69e304aca85ec01" +version = "1.8.2" +source = "git+https://github.com/paradigmxyz/reth?rev=9c30bf7af5e0d45deaf5917375c9922c16654b28#9c30bf7af5e0d45deaf5917375c9922c16654b28" dependencies = [ "metrics", "metrics-derive", @@ -5734,8 +5797,8 @@ dependencies = [ [[package]] name = "reth-network-peers" -version = "1.6.0" -source = "git+https://github.com/paradigmxyz/reth?rev=0b316160a9915ac80c4ae867f69e304aca85ec01#0b316160a9915ac80c4ae867f69e304aca85ec01" +version = "1.8.2" +source = "git+https://github.com/paradigmxyz/reth?rev=9c30bf7af5e0d45deaf5917375c9922c16654b28#9c30bf7af5e0d45deaf5917375c9922c16654b28" dependencies = [ "alloy-primitives 1.4.0", "alloy-rlp", @@ -5747,8 +5810,8 @@ dependencies = [ [[package]] name = "reth-primitives" -version = "1.6.0" -source = "git+https://github.com/paradigmxyz/reth?rev=0b316160a9915ac80c4ae867f69e304aca85ec01#0b316160a9915ac80c4ae867f69e304aca85ec01" +version = "1.8.2" +source = "git+https://github.com/paradigmxyz/reth?rev=9c30bf7af5e0d45deaf5917375c9922c16654b28#9c30bf7af5e0d45deaf5917375c9922c16654b28" dependencies = [ "alloy-consensus", "c-kzg", @@ -5761,8 +5824,8 @@ dependencies = [ [[package]] name = "reth-primitives-traits" -version = "1.6.0" -source = 
"git+https://github.com/paradigmxyz/reth?rev=0b316160a9915ac80c4ae867f69e304aca85ec01#0b316160a9915ac80c4ae867f69e304aca85ec01" +version = "1.8.2" +source = "git+https://github.com/paradigmxyz/reth?rev=9c30bf7af5e0d45deaf5917375c9922c16654b28#9c30bf7af5e0d45deaf5917375c9922c16654b28" dependencies = [ "alloy-consensus", "alloy-eips", @@ -5790,8 +5853,8 @@ dependencies = [ [[package]] name = "reth-prune-types" -version = "1.6.0" -source = "git+https://github.com/paradigmxyz/reth?rev=0b316160a9915ac80c4ae867f69e304aca85ec01#0b316160a9915ac80c4ae867f69e304aca85ec01" +version = "1.8.2" +source = "git+https://github.com/paradigmxyz/reth?rev=9c30bf7af5e0d45deaf5917375c9922c16654b28#9c30bf7af5e0d45deaf5917375c9922c16654b28" dependencies = [ "alloy-primitives 1.4.0", "derive_more 2.0.1", @@ -5800,8 +5863,8 @@ dependencies = [ [[package]] name = "reth-stages-types" -version = "1.6.0" -source = "git+https://github.com/paradigmxyz/reth?rev=0b316160a9915ac80c4ae867f69e304aca85ec01#0b316160a9915ac80c4ae867f69e304aca85ec01" +version = "1.8.2" +source = "git+https://github.com/paradigmxyz/reth?rev=9c30bf7af5e0d45deaf5917375c9922c16654b28#9c30bf7af5e0d45deaf5917375c9922c16654b28" dependencies = [ "alloy-primitives 1.4.0", "reth-trie-common", @@ -5809,8 +5872,8 @@ dependencies = [ [[package]] name = "reth-static-file-types" -version = "1.6.0" -source = "git+https://github.com/paradigmxyz/reth?rev=0b316160a9915ac80c4ae867f69e304aca85ec01#0b316160a9915ac80c4ae867f69e304aca85ec01" +version = "1.8.2" +source = "git+https://github.com/paradigmxyz/reth?rev=9c30bf7af5e0d45deaf5917375c9922c16654b28#9c30bf7af5e0d45deaf5917375c9922c16654b28" dependencies = [ "alloy-primitives 1.4.0", "derive_more 2.0.1", @@ -5820,8 +5883,8 @@ dependencies = [ [[package]] name = "reth-storage-api" -version = "1.6.0" -source = "git+https://github.com/paradigmxyz/reth?rev=0b316160a9915ac80c4ae867f69e304aca85ec01#0b316160a9915ac80c4ae867f69e304aca85ec01" +version = "1.8.2" +source = "git+https://github.com/paradigmxyz/reth?rev=9c30bf7af5e0d45deaf5917375c9922c16654b28#9c30bf7af5e0d45deaf5917375c9922c16654b28" dependencies = [ "alloy-consensus", "alloy-eips", @@ -5842,8 +5905,8 @@ dependencies = [ [[package]] name = "reth-storage-errors" -version = "1.6.0" -source = "git+https://github.com/paradigmxyz/reth?rev=0b316160a9915ac80c4ae867f69e304aca85ec01#0b316160a9915ac80c4ae867f69e304aca85ec01" +version = "1.8.2" +source = "git+https://github.com/paradigmxyz/reth?rev=9c30bf7af5e0d45deaf5917375c9922c16654b28#9c30bf7af5e0d45deaf5917375c9922c16654b28" dependencies = [ "alloy-eips", "alloy-primitives 1.4.0", @@ -5858,8 +5921,8 @@ dependencies = [ [[package]] name = "reth-tasks" -version = "1.6.0" -source = "git+https://github.com/paradigmxyz/reth?rev=0b316160a9915ac80c4ae867f69e304aca85ec01#0b316160a9915ac80c4ae867f69e304aca85ec01" +version = "1.8.2" +source = "git+https://github.com/paradigmxyz/reth?rev=9c30bf7af5e0d45deaf5917375c9922c16654b28#9c30bf7af5e0d45deaf5917375c9922c16654b28" dependencies = [ "auto_impl", "dyn-clone", @@ -5874,8 +5937,8 @@ dependencies = [ [[package]] name = "reth-transaction-pool" -version = "1.6.0" -source = "git+https://github.com/paradigmxyz/reth?rev=0b316160a9915ac80c4ae867f69e304aca85ec01#0b316160a9915ac80c4ae867f69e304aca85ec01" +version = "1.8.2" +source = "git+https://github.com/paradigmxyz/reth?rev=9c30bf7af5e0d45deaf5917375c9922c16654b28#9c30bf7af5e0d45deaf5917375c9922c16654b28" dependencies = [ "alloy-consensus", "alloy-eips", @@ -5913,8 +5976,8 @@ dependencies = [ [[package]] name = "reth-trie" -version = 
"1.6.0" -source = "git+https://github.com/paradigmxyz/reth?rev=0b316160a9915ac80c4ae867f69e304aca85ec01#0b316160a9915ac80c4ae867f69e304aca85ec01" +version = "1.8.2" +source = "git+https://github.com/paradigmxyz/reth?rev=9c30bf7af5e0d45deaf5917375c9922c16654b28#9c30bf7af5e0d45deaf5917375c9922c16654b28" dependencies = [ "alloy-consensus", "alloy-eips", @@ -5935,8 +5998,8 @@ dependencies = [ [[package]] name = "reth-trie-common" -version = "1.6.0" -source = "git+https://github.com/paradigmxyz/reth?rev=0b316160a9915ac80c4ae867f69e304aca85ec01#0b316160a9915ac80c4ae867f69e304aca85ec01" +version = "1.8.2" +source = "git+https://github.com/paradigmxyz/reth?rev=9c30bf7af5e0d45deaf5917375c9922c16654b28#9c30bf7af5e0d45deaf5917375c9922c16654b28" dependencies = [ "alloy-consensus", "alloy-primitives 1.4.0", @@ -5952,8 +6015,8 @@ dependencies = [ [[package]] name = "reth-trie-sparse" -version = "1.6.0" -source = "git+https://github.com/paradigmxyz/reth?rev=0b316160a9915ac80c4ae867f69e304aca85ec01#0b316160a9915ac80c4ae867f69e304aca85ec01" +version = "1.8.2" +source = "git+https://github.com/paradigmxyz/reth?rev=9c30bf7af5e0d45deaf5917375c9922c16654b28#9c30bf7af5e0d45deaf5917375c9922c16654b28" dependencies = [ "alloy-primitives 1.4.0", "alloy-rlp", @@ -5968,8 +6031,8 @@ dependencies = [ [[package]] name = "reth-zstd-compressors" -version = "1.6.0" -source = "git+https://github.com/paradigmxyz/reth?rev=0b316160a9915ac80c4ae867f69e304aca85ec01#0b316160a9915ac80c4ae867f69e304aca85ec01" +version = "1.8.2" +source = "git+https://github.com/paradigmxyz/reth?rev=9c30bf7af5e0d45deaf5917375c9922c16654b28#9c30bf7af5e0d45deaf5917375c9922c16654b28" dependencies = [ "zstd", ] @@ -6104,9 +6167,9 @@ dependencies = [ [[package]] name = "revm-inspectors" -version = "0.29.2" +version = "0.30.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8fdb678b03faa678a7007a7c761a78efa9ca9adcd9434ef3d1ad894aec6e43d1" +checksum = "de23199c4b6181a6539e4131cf7e31cde4df05e1192bcdce491c34a511241588" dependencies = [ "alloy-primitives 1.4.0", "alloy-rpc-types-eth", @@ -6820,6 +6883,16 @@ dependencies = [ "cfg-if", "cpufeatures", "digest 0.10.7", + "sha2-asm", +] + +[[package]] +name = "sha2-asm" +version = "0.6.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b845214d6175804686b2bd482bcffe96651bb2d1200742b712003504a2dac1ab" +dependencies = [ + "cc", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index f02f583c..1bad841a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -19,7 +19,10 @@ revm-primitives = { version = "20.2.1", default-features = false } revm-interpreter = { version = "25.0.2", default-features = false } # rbuilder -rbuilder-primitives = { git = "https://github.com/flashbots/rbuilder", rev = "a21de88" } +rbuilder-primitives = { git = "https://github.com/flashbots/rbuilder", rev = "332b00ceeb960cfa27278c020f0f8b299f928982" } +rbuilder-utils = { git = "https://github.com/flashbots/rbuilder", rev = "332b00ceeb960cfa27278c020f0f8b299f928982", features = ["test-utils"] } + + # rt tokio = { version = "1", default-features = false, features = [ diff --git a/rust-toolchain.toml b/rust-toolchain.toml index aadbe483..6ae25eea 100644 --- a/rust-toolchain.toml +++ b/rust-toolchain.toml @@ -1,3 +1,3 @@ [toolchain] channel = "stable" -version = "1.89.0" +version = "1.89.0" \ No newline at end of file diff --git a/src/cli.rs b/src/cli.rs index e7db4ca8..51e75e5c 100644 --- a/src/cli.rs +++ b/src/cli.rs @@ -3,15 +3,12 @@ use std::path::PathBuf; use alloy_primitives::Address; use 
alloy_signer_local::PrivateKeySigner; use clap::{Args, Parser, ValueHint}; +use rbuilder_utils::clickhouse::indexer::{ + default_disk_backup_database_path, MAX_DISK_BACKUP_SIZE_BYTES, MAX_MEMORY_BACKUP_SIZE_BYTES, +}; use crate::{ - indexer::{ - click::{ - default_disk_backup_database_path, MAX_DISK_BACKUP_SIZE_BYTES, - MAX_MEMORY_BACKUP_SIZE_BYTES, - }, - BUNDLE_RECEIPTS_TABLE_NAME, BUNDLE_TABLE_NAME, - }, + indexer::{BUNDLE_RECEIPTS_TABLE_NAME, BUNDLE_TABLE_NAME}, SystemBundleDecoder, }; diff --git a/src/forwarder.rs b/src/forwarder.rs index 4f81bcb8..96ec54b0 100644 --- a/src/forwarder.rs +++ b/src/forwarder.rs @@ -12,7 +12,6 @@ use crate::{ UtcInstant, WithEncoding, }, priority::{pchannel, Priority}, - tasks::TaskExecutor, utils::UtcDateTimeHeader as _, }; use alloy_primitives::Address; @@ -22,6 +21,7 @@ use axum::http::HeaderValue; use dashmap::DashMap; use futures::{stream::FuturesUnordered, StreamExt}; use hyper::{header::CONTENT_TYPE, HeaderMap, StatusCode}; +use rbuilder_utils::tasks::TaskExecutor; use reqwest::Url; use revm_primitives::keccak256; use serde_json::json; @@ -68,7 +68,7 @@ impl IngressForwarders { } /// Broadcast bundle to all forwarders. - pub fn broadcast_bundle(&self, bundle: SystemBundle) { + pub(crate) fn broadcast_bundle(&self, bundle: SystemBundle) { let encoded_bundle = bundle.encode(); // Create local request first diff --git a/src/indexer/click/backup.rs b/src/indexer/click/backup.rs deleted file mode 100644 index 7a33e7ae..00000000 --- a/src/indexer/click/backup.rs +++ /dev/null @@ -1,798 +0,0 @@ -use std::{ - collections::VecDeque, - marker::PhantomData, - path::PathBuf, - sync::{Arc, RwLock}, - time::{Duration, Instant, SystemTime, UNIX_EPOCH}, -}; - -use clickhouse::inserter::Inserter; -use derive_more::{Deref, DerefMut}; -use redb::{ReadableDatabase, ReadableTable, ReadableTableMetadata}; -use strum::AsRefStr; -use tokio::sync::mpsc; - -use crate::{ - indexer::click::{ - default_disk_backup_database_path, primitives::ClickhouseRowExt, ClickhouseIndexableOrder, - MAX_DISK_BACKUP_SIZE_BYTES, MAX_MEMORY_BACKUP_SIZE_BYTES, - }, - metrics::IndexerMetrics, - primitives::{backoff::BackoffInterval, Quantities}, - tasks::TaskExecutor, - utils::FormatBytes, -}; - -/// Tracing target for the backup actor. -const TARGET: &str = "indexer::backup"; - -/// A type alias for disk backup keys. -type DiskBackupKey = u128; -/// A type alias for disk backup tables. -type Table<'a> = redb::TableDefinition<'a, DiskBackupKey, Vec>; - -/// The source of a backed-up failed commit. -#[derive(Debug, Clone, Copy, PartialEq, Eq)] -enum BackupSource { - Disk(DiskBackupKey), - Memory, -} - -/// Generates a new unique key for disk backup entries, based on current system time in -/// milliseconds. -fn new_disk_backup_key() -> DiskBackupKey { - SystemTime::now().duration_since(UNIX_EPOCH).expect("time went backwards").as_micros() -} - -/// Represents data we failed to commit to clickhouse, including the rows and some information -/// about the size of such data. -#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] -pub(crate) struct FailedCommit { - /// The actual rows we were trying to commit. - rows: Vec, - /// The quantities related to such commit, like the total size in bytes. 
- quantities: Quantities,
-}
-
-impl<T> FailedCommit<T> {
-    pub(crate) fn new(rows: Vec<T>, quantities: Quantities) -> Self {
-        Self { rows, quantities }
-    }
-}
-
-impl<T> Default for FailedCommit<T> {
-    fn default() -> Self {
-        Self { rows: Vec::new(), quantities: Quantities::ZERO }
-    }
-}
-
-/// A [`FailedCommit`] along with its source (disk or memory).
-struct RetrievedFailedCommit<T> {
-    source: BackupSource,
-    commit: FailedCommit<T>,
-}
-
-/// A wrapper over a [`VecDeque`] of [`FailedCommit`]s with added functionality.
-///
-/// Newly failed commits are pushed to the front of the queue, so the oldest are at the back.
-#[derive(Deref, DerefMut)]
-struct FailedCommits<T>(VecDeque<FailedCommit<T>>);
-
-impl<T> FailedCommits<T> {
-    /// Get the aggregated quantities of the failed commits.
-    #[inline]
-    fn quantities(&self) -> Quantities {
-        let total_size_bytes = self.iter().map(|c| c.quantities.bytes).sum::<u64>();
-        let total_rows = self.iter().map(|c| c.quantities.rows).sum::<u64>();
-        let total_transactions = self.iter().map(|c| c.quantities.transactions).sum::<u64>();
-
-        Quantities { bytes: total_size_bytes, rows: total_rows, transactions: total_transactions }
-    }
-}
-
-impl<T> Default for FailedCommits<T> {
-    fn default() -> Self {
-        Self(VecDeque::default())
-    }
-}
-
-/// Configuration for the [`DiskBackup`] of failed commits.
-#[derive(Debug)]
-pub(crate) struct DiskBackupConfig {
-    /// The path where the backup database is stored.
-    path: PathBuf,
-    /// The maximum size in bytes for holding past failed commits on disk.
-    max_size_bytes: u64,
-    /// The interval at which buffered writes are flushed to disk.
-    flush_interval: tokio::time::Interval,
-}
-
-impl DiskBackupConfig {
-    pub(crate) fn new() -> Self {
-        Self {
-            path: default_disk_backup_database_path().into(),
-            max_size_bytes: MAX_DISK_BACKUP_SIZE_BYTES,
-            flush_interval: tokio::time::interval(Duration::from_secs(30)),
-        }
-    }
-
-    pub(crate) fn with_path<P: Into<PathBuf>>(mut self, path: Option<P>) -> Self {
-        if let Some(p) = path {
-            self.path = p.into();
-        }
-        self
-    }
-
-    pub(crate) fn with_max_size_bytes(mut self, max_size_bytes: Option<u64>) -> Self {
-        if let Some(max_size_bytes) = max_size_bytes {
-            self.max_size_bytes = max_size_bytes;
-        }
-        self
-    }
-
-    #[allow(dead_code)]
-    pub(crate) fn with_immediate_commit_interval(mut self, interval: Option<Duration>) -> Self {
-        if let Some(interval) = interval {
-            self.flush_interval = tokio::time::interval(interval);
-        }
-        self
-    }
-}
-
-impl Clone for DiskBackupConfig {
-    fn clone(&self) -> Self {
-        Self {
-            path: self.path.clone(),
-            max_size_bytes: self.max_size_bytes,
-            flush_interval: tokio::time::interval(self.flush_interval.period()),
-        }
-    }
-}
-
-#[derive(Debug, Clone, Copy)]
-pub(crate) struct MemoryBackupConfig {
-    /// The maximum size in bytes for holding past failed commits in-memory. Once we go over this
-    /// threshold, pressure is applied and old commits are dropped.
-    pub max_size_bytes: u64,
-}
-
-impl MemoryBackupConfig {
-    pub(crate) fn new(max_size_bytes: u64) -> Self {
-        Self { max_size_bytes }
-    }
-}
-
-impl Default for MemoryBackupConfig {
-    fn default() -> Self {
-        Self { max_size_bytes: MAX_MEMORY_BACKUP_SIZE_BYTES }
-    }
-}
-
-/// Data retrieved from disk, along with its key and some stats.
-pub(crate) struct DiskRetrieval<K, V> {
-    pub(crate) key: K,
-    pub(crate) value: V,
-    pub(crate) stats: BackupSourceStats,
-}
-
-/// Errors that can occur during disk backup operations, mostly wrapping redb and serde errors.
-#[derive(Debug, thiserror::Error, AsRefStr)]
-pub(crate) enum DiskBackupError {
-    #[error(transparent)]
-    Database(#[from] redb::DatabaseError),
-    #[error(transparent)]
-    Transactions(#[from] redb::TransactionError),
-    #[error(transparent)]
-    Table(#[from] redb::TableError),
-    #[error(transparent)]
-    Storage(#[from] redb::StorageError),
-    #[error(transparent)]
-    Commit(#[from] redb::CommitError),
-    #[error(transparent)]
-    Durability(#[from] redb::SetDurabilityError),
-    #[error(transparent)]
-    Compaction(#[from] redb::CompactionError),
-    #[error("serialization error: {0}")]
-    Serde(#[from] serde_json::Error),
-    #[error("backup size limit exceeded: {0} bytes")]
-    SizeExceeded(u64),
-    #[error("failed to join flushing task")]
-    JoinTask,
-}
-
-/// A disk backup for failed commits. This handle to a database allows writing to only one table,
-/// for scoped access. To write to another table, clone the handle using [`Self::clone_to`].
-#[derive(Debug)]
-pub(crate) struct DiskBackup<T> {
-    db: Arc<RwLock<redb::Database>>,
-    config: DiskBackupConfig,
-
-    _marker: PhantomData<T>,
-}
-
-impl<T: ClickhouseRowExt> DiskBackup<T> {
-    pub(crate) fn new(
-        config: DiskBackupConfig,
-        task_executor: &TaskExecutor,
-    ) -> Result<Self, DiskBackupError> {
-        // Ensure all parent directories exist, so that the database can be initialized correctly.
-        if let Some(parent) = config.path.parent() {
-            std::fs::create_dir_all(parent)?;
-        }
-
-        let db = redb::Database::create(&config.path)?;
-
-        let disk_backup =
-            Self { db: Arc::new(RwLock::new(db)), config, _marker: Default::default() };
-
-        task_executor.spawn({
-            let disk_backup: Self = disk_backup.clone();
-            async move {
-                disk_backup.flush_routine().await;
-            }
-        });
-
-        Ok(disk_backup)
-    }
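// The `with_*` setters above take `Option`s so optional CLI flags can be threaded
// through unconditionally: `None` keeps the default. A minimal standalone sketch of
// the same builder pattern (the `Config` type and its values are illustrative):
use std::path::PathBuf;

#[derive(Debug)]
struct Config {
    path: PathBuf,
    max_size_bytes: u64,
}

impl Config {
    fn new() -> Self {
        Self { path: PathBuf::from("backup.db"), max_size_bytes: 10 * 1024 * 1024 * 1024 }
    }

    // `None` leaves the default untouched, so callers can pass e.g. `args.path` as-is.
    fn with_path<P: Into<PathBuf>>(mut self, path: Option<P>) -> Self {
        if let Some(p) = path {
            self.path = p.into();
        }
        self
    }

    fn with_max_size_bytes(mut self, max: Option<u64>) -> Self {
        if let Some(m) = max {
            self.max_size_bytes = m;
        }
        self
    }
}

fn main() {
    let cfg = Config::new().with_path(Some("/tmp/clickhouse_backup.db")).with_max_size_bytes(None);
    println!("{cfg:?}"); // path overridden, size kept at its default
}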
-    /// Like `clone`, but allows changing the type parameter to `U`.
-    pub(crate) fn clone_to<U>(&self) -> DiskBackup<U> {
-        DiskBackup { db: self.db.clone(), config: self.config.clone(), _marker: Default::default() }
-    }
-}
-
-impl<T> Clone for DiskBackup<T> {
-    fn clone(&self) -> Self {
-        Self { db: self.db.clone(), config: self.config.clone(), _marker: Default::default() }
-    }
-}
-
-impl<T: ClickhouseRowExt> DiskBackup<T> {
-    /// Saves a new failed commit to disk.
-    fn save(&mut self, data: &FailedCommit<T>) -> Result<BackupSourceStats, DiskBackupError> {
-        let table_def = Table::new(T::ORDER);
-        // NOTE: not efficient, but we don't expect to store a lot of data here.
-        let bytes = serde_json::to_vec(&data)?;
-
-        let writer = self.db.write().expect("not poisoned").begin_write()?;
-        let (stored_bytes, rows) = {
-            let mut table = writer.open_table(table_def)?;
-            if table.stats()?.stored_bytes() > self.config.max_size_bytes {
-                return Err(DiskBackupError::SizeExceeded(self.config.max_size_bytes));
-            }
-
-            table.insert(new_disk_backup_key(), bytes)?;
-
-            (table.stats()?.stored_bytes(), table.len()?)
-        };
-        writer.commit()?;
-
-        Ok(BackupSourceStats { size_bytes: stored_bytes, total_batches: rows as usize })
-    }
-
-    /// Retrieves the oldest failed commit from disk, if any.
-    fn retrieve_oldest(
-        &mut self,
-    ) -> Result<Option<DiskRetrieval<DiskBackupKey, FailedCommit<T>>>, DiskBackupError> {
-        let table_def = Table::new(T::ORDER);
-
-        let reader = self.db.read().expect("not poisoned").begin_read()?;
-        let table = match reader.open_table(table_def) {
-            Ok(t) => t,
-            Err(redb::TableError::TableDoesNotExist(_)) => {
-                // No table means no data.
-                return Ok(None);
-            }
-            Err(e) => {
-                return Err(e.into());
-            }
-        };
-
-        let stored_bytes = table.stats()?.stored_bytes();
-        let rows = table.len()? as usize;
-        let stats = BackupSourceStats { size_bytes: stored_bytes, total_batches: rows };
-
-        // Retrieves in sorted order.
-        let Some(entry_res) = table.iter()?.next() else {
-            return Ok(None);
-        };
-        let (key, rows_raw) = entry_res?;
-        let commit: FailedCommit<T> = serde_json::from_slice(&rows_raw.value())?;
-
-        Ok(Some(DiskRetrieval { key: key.value(), value: commit, stats }))
-    }
-
-    /// Deletes the failed commit with the given key from disk.
-    fn delete(&mut self, key: DiskBackupKey) -> Result<BackupSourceStats, DiskBackupError> {
-        let table_def = Table::new(T::ORDER);
-
-        let mut writer = self.db.write().expect("not poisoned").begin_write()?;
-        writer.set_durability(redb::Durability::Immediate)?;
-
-        let (stored_bytes, rows) = {
-            let mut table = writer.open_table(table_def)?;
-            table.remove(key)?;
-            (table.stats()?.stored_bytes(), table.len()?)
-        };
-        writer.commit()?;
-
-        Ok(BackupSourceStats { size_bytes: stored_bytes, total_batches: rows as usize })
-    }
-
-    /// Explicitly flushes any pending writes to disk. This is async to avoid blocking the main
-    /// thread.
-    async fn flush(&mut self) -> Result<(), DiskBackupError> {
-        let db = self.db.clone();
-
-        // Since this can easily block for a second or two, send it to a blocking thread.
-        tokio::task::spawn_blocking(move || {
-            let mut db = db.write().expect("not poisoned");
-            let mut writer = db.begin_write()?;
-
-            // If there is no data to flush, don't do anything.
-            if writer.stats()?.stored_bytes() == 0 {
-                return Ok(());
-            }
-
-            writer.set_durability(redb::Durability::Immediate)?;
-            writer.commit()?;
-
-            db.compact()?;
-            Ok(())
-        })
-        .await
-        .map_err(|_| DiskBackupError::JoinTask)?
-    }
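// `save`/`retrieve_oldest` above lean on two properties: keys are microseconds since
// the Unix epoch, and redb iterates a table in sorted key order, so the first entry
// is always the oldest batch. A compilable sketch of that read path, assuming a redb
// version with the API used in this module (helper names are illustrative, not part
// of the crate):
use redb::{Database, ReadableTable, TableDefinition};
use std::time::{SystemTime, UNIX_EPOCH};

const TABLE: TableDefinition<u128, Vec<u8>> = TableDefinition::new("bundle");

fn new_key() -> u128 {
    // Later writes get strictly larger keys (modulo sub-microsecond bursts).
    SystemTime::now().duration_since(UNIX_EPOCH).expect("time went backwards").as_micros()
}

fn oldest(db: &Database) -> Result<Option<(u128, Vec<u8>)>, Box<dyn std::error::Error>> {
    let read = db.begin_read()?;
    let table = match read.open_table(TABLE) {
        Ok(t) => t,
        // A missing table just means nothing was ever backed up.
        Err(redb::TableError::TableDoesNotExist(_)) => return Ok(None),
        Err(e) => return Err(e.into()),
    };
    // Sorted iteration: the first entry carries the smallest (oldest) key.
    match table.iter()?.next() {
        Some(entry) => {
            let (k, v) = entry?;
            Ok(Some((k.value(), v.value())))
        }
        None => Ok(None),
    }
}

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let db = Database::create("/tmp/example_backup.db")?;
    let write = db.begin_write()?;
    {
        let mut table = write.open_table(TABLE)?;
        table.insert(new_key(), b"batch".to_vec())?;
    }
    write.commit()?;
    println!("{:?}", oldest(&db)?);
    Ok(())
}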
-
-    /// Takes an instance of self and performs a flush each time the flush interval ticks.
-    async fn flush_routine(mut self) {
-        loop {
-            self.config.flush_interval.tick().await;
-            let start = Instant::now();
-            match self.flush().await {
-                Ok(_) => {
-                    tracing::debug!(target: TARGET, elapsed = ?start.elapsed(), "flushed backup write buffer to disk");
-                }
-                Err(e) => {
-                    tracing::error!(target: TARGET, ?e, "failed to flush backup write buffer to disk");
-                }
-            }
-        }
-    }
-}
-
-/// Statistics about the Clickhouse data stored in a certain backup source (disk or memory).
-#[derive(Debug, Clone, Copy, Default)]
-pub(crate) struct BackupSourceStats {
-    /// The total size in bytes of failed commit batches stored.
-    size_bytes: u64,
-    /// The total number of failed commit batches stored.
-    total_batches: usize,
-}
-
-/// An in-memory backup for failed commits.
-#[derive(Deref, DerefMut)]
-struct MemoryBackup<T> {
-    /// The in-memory cache of failed commits.
-    #[deref]
-    #[deref_mut]
-    failed_commits: FailedCommits<T>,
-    /// The configuration for the in-memory backup.
-    config: MemoryBackupConfig,
-    /// The statistics about the in-memory backup.
-    stats: BackupSourceStats,
-}
-
-impl<T> MemoryBackup<T> {
-    /// Updates the internal statistics and returns them.
-    fn update_stats(&mut self) -> BackupSourceStats {
-        let quantities = self.failed_commits.quantities();
-        let new_len = self.failed_commits.len();
-
-        self.stats = BackupSourceStats { size_bytes: quantities.bytes, total_batches: new_len };
-        self.stats
-    }
-
-    /// Checks whether the threshold for maximum size has been exceeded.
-    fn threshold_exceeded(&self) -> bool {
-        self.stats.size_bytes > self.config.max_size_bytes && self.failed_commits.len() > 1
-    }
-
-    /// Drops the oldest failed commit if the threshold has been exceeded, returning the updated
-    /// stats.
-    fn drop_excess(&mut self) -> Option<(BackupSourceStats, Quantities)> {
-        if self.threshold_exceeded() {
-            self.failed_commits.pop_back();
-            Some((self.update_stats(), self.failed_commits.quantities()))
-        } else {
-            None
-        }
-    }
-
-    /// Saves a new failed commit into memory, updating the stats.
-    fn save(&mut self, data: FailedCommit<T>) -> BackupSourceStats {
-        self.failed_commits.push_front(data);
-        self.update_stats()
-    }
-
-    /// Retrieves the oldest failed commit from memory, updating the stats.
-    fn retrieve_oldest(&mut self) -> Option<FailedCommit<T>> {
-        let oldest = self.failed_commits.pop_back();
-        self.update_stats();
-        oldest
-    }
-}
-
-// Needed because a derived `Default` would otherwise require `T: Default`.
-impl<T> Default for MemoryBackup<T> {
-    fn default() -> Self {
-        Self {
-            failed_commits: FailedCommits::default(),
-            config: MemoryBackupConfig::default(),
-            stats: BackupSourceStats::default(),
-        }
-    }
-}
-
-/// A backup actor for Clickhouse data. This actor receives [`FailedCommit`]s, saves them on disk
-/// (or in memory if the disk write fails), and periodically tries to commit them back to
-/// Clickhouse. Since memory is finite, there is an upper bound on how much memory this data
-/// structure holds. Once it has been hit, pressure applies: we retry a given failed commit a
-/// finite number of times, then discard it to accommodate new data.
-pub(crate) struct Backup<T: ClickhouseIndexableOrder> {
-    /// The receiver of failed commit attempts.
-    ///
-    /// Rationale for sending batches of rows instead of single rows: the backup abstraction must
-    /// periodically block to write data to the inserter and try to commit it to clickhouse. Each
-    /// attempt results in doing the previous step. This could clog a channel receiving
-    /// individual rows, leading to potential row losses.
- /// - /// By sending backup data less often, we give time gaps for these operation to be performed. - rx: mpsc::Receiver>, - /// The disk cache of failed commits. - disk_backup: DiskBackup, - /// The in-memory cache of failed commits. - memory_backup: MemoryBackup, - /// A clickhouse inserter for committing again the data. - inserter: Inserter, - /// The interval at which we try to backup data. - interval: BackoffInterval, - - /// A failed commit retrieved from either disk or memory, waiting to be retried. - last_cached: Option>, - - /// Whether to use only the in-memory backup (for testing purposes). - #[cfg(test)] - use_only_memory_backup: bool, -} - -impl Backup { - pub(crate) fn new( - rx: mpsc::Receiver>, - inserter: Inserter, - disk_backup: DiskBackup, - ) -> Self { - Self { - rx, - inserter, - interval: Default::default(), - memory_backup: MemoryBackup::default(), - disk_backup, - last_cached: None, - #[cfg(test)] - use_only_memory_backup: false, - } - } - - /// Override the default memory backup configuration. - pub(crate) fn with_memory_backup_config(mut self, config: MemoryBackupConfig) -> Self { - self.memory_backup.config = config; - self - } - - /// Backs up a failed commit, first trying to write to disk, then to memory. - fn backup(&mut self, failed_commit: FailedCommit) { - let quantities = failed_commit.quantities; - tracing::debug!(target: TARGET, order = T::ORDER, bytes = ?quantities.bytes, rows = ?quantities.rows, "backing up failed commit"); - - #[cfg(test)] - if self.use_only_memory_backup { - self.memory_backup.save(failed_commit); - self.last_cached = - self.last_cached.take().filter(|cached| cached.source != BackupSource::Memory); - return; - } - - let start = Instant::now(); - match self.disk_backup.save(&failed_commit) { - Ok(stats) => { - tracing::debug!(target: TARGET, order = T::ORDER, total_size = stats.size_bytes.format_bytes(), elapsed = ?start.elapsed(), "saved failed commit to disk"); - IndexerMetrics::set_clickhouse_disk_backup_size( - stats.size_bytes, - stats.total_batches, - T::ORDER, - ); - - return; - } - Err(e) => { - tracing::error!(target: TARGET, order = T::ORDER, ?e, "failed to write commit, trying in-memory"); - IndexerMetrics::increment_clickhouse_backup_disk_errors(T::ORDER, e.as_ref()); - } - }; - - let stats = self.memory_backup.save(failed_commit); - IndexerMetrics::set_clickhouse_memory_backup_size( - stats.size_bytes, - stats.total_batches, - T::ORDER, - ); - tracing::debug!(target: TARGET, order = T::ORDER, bytes = ?quantities.bytes, rows = ?quantities.rows, ?stats, "saved failed commit in-memory"); - - if let Some((stats, oldest_quantities)) = self.memory_backup.drop_excess() { - tracing::warn!(target: TARGET, order = T::ORDER, ?stats, "failed commits exceeded max memory backup size, dropping oldest"); - IndexerMetrics::process_clickhouse_backup_data_lost_quantities(&oldest_quantities); - // Clear the cached last commit if it was from memory and we just dropped it. - self.last_cached = - self.last_cached.take().filter(|cached| cached.source != BackupSource::Memory); - } - } - - /// Retrieves the oldest failed commit, first trying from memory, then from disk. 
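// `backup` above routes each failed batch disk-first, falling back to memory, while
// retrieval prefers memory before disk; newest batches sit at the front of the queue,
// oldest at the back. A dependency-free sketch of both policies (the types are
// stand-ins for `DiskBackup`/`MemoryBackup`, not the real ones):
use std::collections::VecDeque;

#[derive(Debug)]
struct DiskFull;

struct TwoTier<T> {
    disk: Vec<T>, // stand-in for the redb-backed store, oldest first
    disk_cap: usize,
    memory: VecDeque<T>, // newest at the front, oldest at the back
}

impl<T> TwoTier<T> {
    fn save_to_disk(&mut self, batch: T) -> Result<(), (DiskFull, T)> {
        if self.disk.len() >= self.disk_cap {
            return Err((DiskFull, batch)); // hand the batch back for re-routing
        }
        self.disk.push(batch);
        Ok(())
    }

    fn save(&mut self, batch: T) {
        if let Err((_err, batch)) = self.save_to_disk(batch) {
            self.memory.push_front(batch);
        }
    }

    fn retrieve_oldest(&mut self) -> Option<T> {
        // Memory first (cheap), then the oldest disk entry.
        self.memory.pop_back().or_else(|| {
            if self.disk.is_empty() { None } else { Some(self.disk.remove(0)) }
        })
    }
}

fn main() {
    let mut tiers = TwoTier { disk: Vec::new(), disk_cap: 1, memory: VecDeque::new() };
    tiers.save("batch-1");
    tiers.save("batch-2"); // disk is full, so this lands in memory
    assert_eq!(tiers.retrieve_oldest(), Some("batch-2")); // memory is tried first
    assert_eq!(tiers.retrieve_oldest(), Some("batch-1"));
}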
- fn retrieve_oldest(&mut self) -> Option> { - if let Some(cached) = self.last_cached.take() { - tracing::debug!(target: TARGET, order = T::ORDER, rows = cached.commit.rows.len(), "retrieved last cached failed commit"); - return Some(cached); - } - - if let Some(commit) = self.memory_backup.retrieve_oldest() { - tracing::debug!(target: TARGET, order = T::ORDER, rows = commit.rows.len(), "retrieved oldest failed commit from memory"); - return Some(RetrievedFailedCommit { source: BackupSource::Memory, commit }); - } - - match self.disk_backup.retrieve_oldest() { - Ok(maybe_commit) => { - maybe_commit.inspect(|data| { - tracing::debug!(target: TARGET, order = T::ORDER, rows = data.stats.total_batches, "retrieved oldest failed commit from disk"); - }) - .map(|data| RetrievedFailedCommit { - source: BackupSource::Disk(data.key), - commit: data.value, - }) - } - Err(e) => { - tracing::error!(target: TARGET, order = T::ORDER, ?e, "failed to retrieve oldest failed commit from disk"); - IndexerMetrics::increment_clickhouse_backup_disk_errors(T::ORDER, e.as_ref()); - None - } - } - } - - /// Populates the inserter with the rows from the given failed commit. - async fn populate_inserter(&mut self, commit: &FailedCommit) { - for row in &commit.rows { - let value_ref = T::to_row_ref(row); - - if let Err(e) = self.inserter.write(value_ref).await { - IndexerMetrics::increment_clickhouse_write_failures(e.to_string()); - tracing::error!(target: TARGET, order = T::ORDER, ?e, "failed to write to backup inserter"); - continue; - } - } - } - - /// Purges a committed failed commit from disk, if applicable. - async fn purge_commit(&mut self, retrieved: &RetrievedFailedCommit) { - if let BackupSource::Disk(key) = retrieved.source { - let start = Instant::now(); - match self.disk_backup.delete(key) { - Ok(stats) => { - tracing::debug!(target: TARGET, order = T::ORDER, total_size = stats.size_bytes.format_bytes(), elapsed = ?start.elapsed(), "deleted failed commit from disk"); - IndexerMetrics::set_clickhouse_disk_backup_size( - stats.size_bytes, - stats.total_batches, - T::ORDER, - ); - } - Err(e) => { - tracing::error!(target: TARGET, order = T::ORDER, ?e, "failed to purge failed commit from disk"); - } - } - tracing::debug!(target: TARGET, order = T::ORDER, "purged committed failed commit from disk"); - } - } - - /// Run the backup actor until it is possible to receive messages. - /// - /// If some data were stored on disk previously, they will be retried first. - pub(crate) async fn run(&mut self) { - loop { - tokio::select! { - maybe_failed_commit = self.rx.recv() => { - let Some(failed_commit) = maybe_failed_commit else { - tracing::error!(target: TARGET, order = T::ORDER, "backup channel closed"); - break; - }; - - self.backup(failed_commit); - } - _ = self.interval.tick() => { - let Some(oldest) = self.retrieve_oldest() else { - self.interval.reset(); - IndexerMetrics::set_clickhouse_backup_empty_size(T::ORDER); - continue // Nothing to do! 
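// The `run` loop above is a standard two-arm actor: one arm drains the failed-commit
// channel, the other retries the oldest backup on a (backoff) interval. A runnable
// reduction of that shape, assuming tokio with the `macros`, `time` and
// `rt-multi-thread` features:
use std::time::Duration;
use tokio::{sync::mpsc, time};

async fn run(mut rx: mpsc::Receiver<&'static str>) {
    let mut retry = time::interval(Duration::from_millis(50));
    loop {
        tokio::select! {
            maybe_batch = rx.recv() => {
                // A closed channel is the shutdown signal for the actor.
                let Some(batch) = maybe_batch else { break };
                println!("backing up {batch}");
            }
            _ = retry.tick() => {
                // Here the real actor pops the oldest commit, re-inserts it, and
                // resets the backoff on success.
            }
        }
    }
}

#[tokio::main]
async fn main() {
    let (tx, rx) = mpsc::channel(8);
    tx.send("batch-1").await.unwrap();
    drop(tx); // close the channel so `run` terminates
    run(rx).await;
}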
- }; - - self.populate_inserter(&oldest.commit).await; - - let start = Instant::now(); - match self.inserter.force_commit().await { - Ok(quantities) => { - tracing::info!(target: TARGET, order = T::ORDER, ?quantities, "successfully backed up"); - IndexerMetrics::process_clickhouse_backup_data_quantities(&quantities.into()); - IndexerMetrics::record_clickhouse_batch_commit_time(start.elapsed()); - self.interval.reset(); - self.purge_commit(&oldest).await; - } - Err(e) => { - tracing::error!(target: TARGET, order = T::ORDER, ?e, quantities = ?oldest.commit.quantities, "failed to commit bundle to clickhouse from backup"); - IndexerMetrics::increment_clickhouse_commit_failures(e.to_string()); - self.last_cached = Some(oldest); - continue; - } - } - } - } - } - } - - /// To call on shutdown, tries make a last-resort attempt to post back to Clickhouse all - /// in-memory data. - pub(crate) async fn end(mut self) { - for failed_commit in self.memory_backup.failed_commits.drain(..) { - for row in &failed_commit.rows { - let value_ref = T::to_row_ref(row); - - if let Err(e) = self.inserter.write(value_ref).await { - tracing::error!( target: TARGET, order = T::ORDER, ?e, "failed to write to backup inserter during shutdown"); - IndexerMetrics::increment_clickhouse_write_failures(e.to_string()); - continue; - } - } - if let Err(e) = self.inserter.force_commit().await { - tracing::error!(target: TARGET, order = T::ORDER, ?e, "failed to commit backup to CH during shutdown, trying disk"); - IndexerMetrics::increment_clickhouse_commit_failures(e.to_string()); - } - - if let Err(e) = self.disk_backup.save(&failed_commit) { - tracing::error!(target: TARGET, order = T::ORDER, ?e, "failed to write commit to disk backup during shutdown"); - IndexerMetrics::increment_clickhouse_backup_disk_errors(T::ORDER, e.as_ref()); - } - } - - if let Err(e) = self.disk_backup.flush().await { - tracing::error!(target: TARGET, order = T::ORDER, ?e, "failed to flush disk backup during shutdown"); - IndexerMetrics::increment_clickhouse_backup_disk_errors(T::ORDER, e.as_ref()); - } else { - tracing::info!(target: TARGET, order = T::ORDER, "flushed disk backup during shutdown"); - } - - if let Err(e) = self.inserter.end().await { - tracing::error!(target: TARGET, order = T::ORDER, ?e, "failed to end backup inserter during shutdown"); - } else { - tracing::info!(target: TARGET, order = T::ORDER, "successfully ended backup inserter during shutdown"); - } - } -} - -#[cfg(test)] -mod tests { - use std::time::Duration; - - use super::*; - - use crate::{ - indexer::{ - click::{ - models::BundleRow, - tests::{create_clickhouse_bundles_table, create_test_clickhouse_client}, - }, - tests::system_bundle_example, - BUNDLE_TABLE_NAME, - }, - spawn_clickhouse_backup, - tasks::TaskManager, - }; - - // Uncomment to enable logging during tests. - // use tracing::level_filters::LevelFilter; - // use tracing_subscriber::{layer::SubscriberExt as _, util::SubscriberInitExt as _, EnvFilter}; - - impl Backup { - fn new_test( - rx: mpsc::Receiver>, - inserter: Inserter, - disk_backup: DiskBackup, - use_only_memory_backup: bool, - ) -> Self { - Self { - rx, - inserter, - interval: Default::default(), - memory_backup: MemoryBackup::default(), - disk_backup, - last_cached: None, - use_only_memory_backup, - } - } - } - - #[tokio::test(flavor = "multi_thread")] - async fn backup_e2e_works() { - // Uncomment to toggle logs. - // let registry = tracing_subscriber::registry().with( - // EnvFilter::builder().with_default_directive(LevelFilter::DEBUG.into()). 
- // from_env_lossy(), ); - // let _ = registry.with(tracing_subscriber::fmt::layer()).try_init(); - - let memory_backup_only = [false, true]; - - let task_manager = TaskManager::new(tokio::runtime::Handle::current()); - let task_executor = task_manager.executor(); - - for use_memory_only in memory_backup_only { - println!( - "---- Running backup_memory_e2e_works with use_memory_only = {use_memory_only} ----" - ); - - // 1. Spin up Clickhouse. No validation because we're testing both receipts and bundles, - // and validation on U256 is not supported. - let (image, client, _) = create_test_clickhouse_client(false).await.unwrap(); - create_clickhouse_bundles_table(&client).await.unwrap(); - - let tempfile = tempfile::NamedTempFile::new().unwrap(); - - let disk_backup = DiskBackup::new( - DiskBackupConfig::new().with_path(tempfile.path().to_path_buf().into()), - &task_executor, - ) - .expect("could not create disk backup"); - - let (tx, rx) = mpsc::channel(128); - let mut bundle_backup = Backup::::new_test( - rx, - client - .inserter(BUNDLE_TABLE_NAME) - .with_timeouts(Some(Duration::from_secs(2)), Some(Duration::from_secs(12))), - disk_backup, - use_memory_only, - ); - - spawn_clickhouse_backup!(task_executor, bundle_backup, "bundles"); - - let quantities = Quantities { bytes: 512, rows: 1, transactions: 1 }; // approximated - let bundle_row: BundleRow = (system_bundle_example(), "buildernet".to_string()).into(); - let bundle_rows = Vec::from([bundle_row]); - let failed_commit = FailedCommit::::new(bundle_rows.clone(), quantities); - - tx.send(failed_commit).await.unwrap(); - // Wait some time to let the backup process it - tokio::time::sleep(Duration::from_millis(100)).await; - - let results = client - .query(&format!("select * from {BUNDLE_TABLE_NAME}")) - .fetch_all::() - .await - .unwrap(); - - assert_eq!(results.len(), 1); - assert_eq!(bundle_rows, results, "expected, got"); - - drop(image); - } - } -} diff --git a/src/indexer/click/macros.rs b/src/indexer/click/macros.rs deleted file mode 100644 index ea51900c..00000000 --- a/src/indexer/click/macros.rs +++ /dev/null @@ -1,54 +0,0 @@ -//! Helpful macros spawning clickhouse indexer tasks. - -// Rationale: a simple text-replacement macro was much more effective compared to fighting the -// compiler with additional trait bounds on the [`clickhouse::Row`] trait. - -#[macro_export] -macro_rules! spawn_clickhouse_inserter { - ($executor:ident, $runner:ident, $name:expr) => {{ - $executor.spawn_with_graceful_shutdown_signal(|shutdown| async move { - let mut shutdown_guard = None; - tokio::select! { - _ = $runner.run_loop() => { - tracing::info!(target: TARGET, "clickhouse {} indexer channel closed", $name); - } - guard = shutdown => { - tracing::info!(target: TARGET, "Received shutdown for {} indexer, performing cleanup", $name); - shutdown_guard = Some(guard); - }, - } - - match $runner.inserter.end().await { - Ok(quantities) => { - tracing::info!(target: TARGET, ?quantities, "finalized clickhouse {} inserter", $name); - } - Err(e) => { - tracing::error!(target: TARGET, ?e, "failed to write end insertion of {} to indexer", $name); - } - } - - drop(shutdown_guard); - }); - }}; -} - -#[macro_export] -macro_rules! spawn_clickhouse_backup { - ($executor:ident, $backup:ident, $name: expr) => {{ - $executor.spawn_with_graceful_shutdown_signal(|shutdown| async move { - let mut shutdown_guard = None; - tokio::select! 
{ - _ = $backup.run() => { - tracing::info!(target: TARGET, "clickhouse {} backup channel closed", $name); - } - guard = shutdown => { - tracing::info!(target: TARGET, "Received shutdown for {} backup, performing cleanup", $name); - shutdown_guard = Some(guard); - }, - } - - $backup.end().await; - drop(shutdown_guard); - }); - }}; -} diff --git a/src/indexer/click/mod.rs b/src/indexer/click/mod.rs index 44a4aa05..1f7765d3 100644 --- a/src/indexer/click/mod.rs +++ b/src/indexer/click/mod.rs @@ -1,223 +1,84 @@ //! Indexing functionality powered by Clickhouse. -use std::{ - fmt::Debug, - time::{Duration, Instant}, -}; +use std::{fmt::Debug, time::Duration}; -use clickhouse::{ - error::Result as ClickhouseResult, inserter::Inserter, Client as ClickhouseClient, Row, +use rbuilder_utils::{ + clickhouse::{ + backup::{Backup, DiskBackup, DiskBackupConfig, MemoryBackupConfig}, + indexer::{default_inserter, ClickhouseClientConfig, ClickhouseInserter, InserterRunner}, + Quantities, + }, + spawn_clickhouse_backup, spawn_clickhouse_inserter, + tasks::TaskExecutor, }; use tokio::sync::mpsc; use crate::{ cli::ClickhouseArgs, indexer::{ - click::{ - backup::{Backup, DiskBackup, DiskBackupConfig, FailedCommit, MemoryBackupConfig}, - models::{BundleReceiptRow, BundleRow}, - primitives::{ClickhouseIndexableOrder, ClickhouseRowExt}, - }, + click::models::{BundleReceiptRow, BundleRow}, OrderReceivers, TARGET, }, metrics::IndexerMetrics, - primitives::{Quantities, Sampler}, - spawn_clickhouse_backup, spawn_clickhouse_inserter, - tasks::TaskExecutor, }; -mod backup; -mod macros; mod models; -pub(crate) mod primitives; - -/// A default maximum size in bytes for the in-memory backup of failed commits. -pub(crate) const MAX_MEMORY_BACKUP_SIZE_BYTES: u64 = 1024 * 1024 * 1024; // 1 GiB -/// A default maximum size in bytes for the disk backup of failed commits. -pub(crate) const MAX_DISK_BACKUP_SIZE_BYTES: u64 = 10 * 1024 * 1024 * 1024; // 10 GiB - -/// The default path where the backup database is stored. For tests, a temporary file is used. -pub(crate) fn default_disk_backup_database_path() -> String { - #[cfg(test)] - return tempfile::NamedTempFile::new().unwrap().path().to_string_lossy().to_string(); - #[cfg(not(test))] - { - use std::path::PathBuf; - - let home = std::env::var("HOME").unwrap_or_else(|_| ".".to_string()); - PathBuf::from(home) - .join(".flowproxy") - .join("clickhouse_backup.db") - .to_string_lossy() - .to_string() - } -} -/// An clickhouse inserter with some sane defaults. -fn default_inserter(client: &ClickhouseClient, table_name: &str) -> Inserter { - // TODO: make this configurable. - let send_timeout = Duration::from_secs(2); - let end_timeout = Duration::from_secs(3); - - client - .inserter::(table_name) - .with_period(Some(Duration::from_secs(4))) // Dump every 4s - .with_period_bias(0.1) // 4±(0.1*4) - .with_max_bytes(128 * 1024 * 1024) // 128MiB - .with_max_rows(65_536) - .with_timeouts(Some(send_timeout), Some(end_timeout)) +fn config_from_clickhouse_args(args: &ClickhouseArgs, validation: bool) -> ClickhouseClientConfig { + ClickhouseClientConfig { + host: args.host.clone().expect("host is set"), + database: args.database.clone().expect("database is set"), + username: args.username.clone().expect("username is set"), + password: args.password.clone().expect("password is set"), + validation, + } } -/// A wrapper over a Clickhouse [`Inserter`] that supports a backup mechanism. -struct ClickhouseInserter { - /// The inner Clickhouse inserter client. 
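// The removed `default_inserter` (now provided by rbuilder-utils) bounds the insert
// buffer four ways, and a commit fires when the first limit trips: a ~4s period with
// 10% jitter, 128 MiB of buffered bytes, or 65,536 buffered rows. A sketch with the
// same tuning, mirroring the removed helper's shape; the exact `inserter` signature
// and bounds follow the `clickhouse` crate version pinned by this repo:
use std::time::Duration;
use clickhouse::{inserter::Inserter, Client, Row};

fn tuned_inserter<T: Row>(client: &Client, table_name: &str) -> Inserter<T> {
    client
        .inserter::<T>(table_name)
        .with_period(Some(Duration::from_secs(4))) // dump every 4s...
        .with_period_bias(0.1)                     // ...jittered to 4s ± 0.4s
        .with_max_bytes(128 * 1024 * 1024)         // or once 128 MiB are buffered
        .with_max_rows(65_536)                     // or once 65,536 rows are buffered
        .with_timeouts(Some(Duration::from_secs(2)), Some(Duration::from_secs(3)))
}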
- inner: Inserter, - /// A small in-memory backup of the current data we're trying to commit. In case this fails to - /// be inserted into Clickhouse, it is sent to the backup actor. - rows_backup: Vec, - /// The channel where to send data to be backed up. - backup_tx: mpsc::Sender>, -} +struct Metrics {} -impl ClickhouseInserter { - fn new(inner: Inserter, backup_tx: mpsc::Sender>) -> Self { - let rows_backup = Vec::new(); - Self { inner, rows_backup, backup_tx } +impl rbuilder_utils::clickhouse::backup::metrics::Metrics for Metrics { + fn increment_write_failures(err: String) { + IndexerMetrics::increment_clickhouse_write_failures(err); } - /// Writes the provided order into the inner Clickhouse writer buffer. - async fn write(&mut self, row: T) { - let hash = row.hash(); - let value_ref = ClickhouseRowExt::to_row_ref(&row); - - if let Err(e) = self.inner.write(value_ref).await { - IndexerMetrics::increment_clickhouse_write_failures(e.to_string()); - tracing::error!(target: TARGET, order = T::ORDER, ?e, %hash, "failed to write to clickhouse inserter"); - return; - } - - // NOTE: we don't backup if writing failes. The reason is that if this fails, then the same - // writing to the backup inserter should fail. - self.rows_backup.push(row); + fn process_quantities(quantities: &Quantities) { + IndexerMetrics::process_clickhouse_quantities(quantities); } - /// Tries to commit to Clickhouse if the conditions are met. In case of failures, data is sent - /// to the backup actor for retries. - async fn commit(&mut self) { - let pending = self.inner.pending().clone().into(); // This is cheap to clone. - - let start = Instant::now(); - match self.inner.commit().await { - Ok(quantities) => { - if quantities == Quantities::ZERO.into() { - tracing::trace!(target: TARGET, order = T::ORDER, "committed to inserter"); - } else { - tracing::debug!(target: TARGET, order = T::ORDER, ?quantities, "inserted batch to clickhouse"); - IndexerMetrics::process_clickhouse_quantities(&quantities.into()); - IndexerMetrics::record_clickhouse_batch_commit_time(start.elapsed()); - // Clear the backup rows. - self.rows_backup.clear(); - } - } - Err(e) => { - IndexerMetrics::increment_clickhouse_commit_failures(e.to_string()); - tracing::error!(target: TARGET, order = T::ORDER, ?e, "failed to commit bundle to clickhouse"); - - let rows = std::mem::take(&mut self.rows_backup); - let failed_commit = FailedCommit::new(rows, pending); - - if let Err(e) = self.backup_tx.try_send(failed_commit) { - tracing::error!(target: TARGET, order = T::ORDER, ?e, "failed to send rows backup"); - } - } - } + fn record_batch_commit_time(duration: Duration) { + IndexerMetrics::record_clickhouse_batch_commit_time(duration); } - /// Ends the current `INSERT` and whole `Inserter` unconditionally. - async fn end(self) -> ClickhouseResult { - self.inner.end().await.map(Into::into) + fn increment_commit_failures(err: String) { + IndexerMetrics::increment_clickhouse_commit_failures(err); } -} -impl std::fmt::Debug for ClickhouseInserter { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - f.debug_struct("ClickhouseInserter") - .field("inserter", &T::ORDER.to_string()) - .field("rows_backup_len", &self.rows_backup.len()) - .finish() + fn set_queue_size(size: usize, order: &'static str) { + IndexerMetrics::set_clickhouse_queue_size(size, order); } -} -/// A long-lived actor to run a [`ClickhouseIndexer`] until it possible to receive new order to -/// index. 
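// On a failed commit, the inserter above moves its mirrored rows into a
// `FailedCommit` and hands the whole batch to the backup actor with `try_send`, so
// the ingest path never blocks on a slow backup. A reduction of that hand-off, with
// the channel payload simplified to a bare `Vec` (types here are illustrative):
use tokio::sync::mpsc;

struct PendingBatch<T> {
    rows: Vec<T>,                    // local mirror of what the inserter buffered
    backup_tx: mpsc::Sender<Vec<T>>, // bounded channel to the backup actor
}

impl<T> PendingBatch<T> {
    fn on_commit_result(&mut self, committed: bool) {
        if committed {
            self.rows.clear(); // the mirror is no longer needed
        } else {
            // Move, don't clone: the batch may be large.
            let batch = std::mem::take(&mut self.rows);
            if let Err(e) = self.backup_tx.try_send(batch) {
                // Backup actor clogged or gone; dropping beats stalling ingest.
                eprintln!("failed to send rows backup: {e}");
            }
        }
    }
}

fn main() {
    let (tx, mut rx) = mpsc::channel(4);
    let mut pending = PendingBatch { rows: vec![1u32, 2, 3], backup_tx: tx };
    pending.on_commit_result(false);
    assert!(pending.rows.is_empty());
    assert_eq!(rx.try_recv().unwrap(), vec![1, 2, 3]);
}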
-struct InserterRunner { - /// The channel from which we can receive new orders to index. - rx: mpsc::Receiver, - /// The underlying Clickhouse inserter. - inserter: ClickhouseInserter, - /// The name of the local operator to use when adding data to clickhouse. - builder_name: String, -} + fn set_disk_backup_size(size_bytes: u64, batches: usize, order: &'static str) { + IndexerMetrics::set_clickhouse_disk_backup_size(size_bytes, batches, order); + } -impl InserterRunner { - fn new( - rx: mpsc::Receiver, - inserter: ClickhouseInserter, - builder_name: String, - ) -> Self { - Self { rx, inserter, builder_name } + fn increment_backup_disk_errors(order: &'static str, error: &str) { + IndexerMetrics::increment_clickhouse_backup_disk_errors(order, error); } - /// Run the inserter until it is possible to receive new orders. - async fn run_loop(&mut self) { - let mut sampler = Sampler::default() - .with_sample_size(self.rx.capacity() / 2) - .with_interval(Duration::from_secs(4)); - - while let Some(order) = self.rx.recv().await { - tracing::trace!(target: TARGET, order = T::ORDER, hash = %order.hash(), "received data to index"); - sampler.sample(|| { - IndexerMetrics::set_clickhouse_queue_size(self.rx.len(), T::ORDER); - }); - - let row = order.to_row(self.builder_name.clone()); - self.inserter.write(row).await; - self.inserter.commit().await; - } - tracing::error!(target: TARGET, order = T::ORDER, "tx channel closed, indexer will stop running"); + fn set_memory_backup_size(size_bytes: u64, batches: usize, order: &'static str) { + IndexerMetrics::set_clickhouse_memory_backup_size(size_bytes, batches, order); } -} -/// The configuration used in a [`ClickhouseClient`]. -#[derive(Debug, Clone)] -pub(crate) struct ClickhouseClientConfig { - host: String, - database: String, - username: String, - password: String, - validation: bool, -} + fn process_backup_data_lost_quantities(quantities: &Quantities) { + IndexerMetrics::process_clickhouse_backup_data_lost_quantities(quantities); + } -impl ClickhouseClientConfig { - fn new(args: &ClickhouseArgs, validation: bool) -> Self { - Self { - host: args.host.clone().expect("host is set"), - database: args.database.clone().expect("database is set"), - username: args.username.clone().expect("username is set"), - password: args.password.clone().expect("password is set"), - validation, - } + fn process_backup_data_quantities(quantities: &Quantities) { + IndexerMetrics::process_clickhouse_backup_data_quantities(quantities); } -} -impl From for ClickhouseClient { - fn from(config: ClickhouseClientConfig) -> Self { - ClickhouseClient::default() - .with_url(config.host) - .with_database(config.database) - .with_user(config.username) - .with_password(config.password) - .with_validation(config.validation) + fn set_backup_empty_size(order: &'static str) { + IndexerMetrics::set_clickhouse_backup_empty_size(order); } } @@ -237,7 +98,7 @@ impl ClickhouseIndexer { task_executor: TaskExecutor, validation: bool, ) { - let client = ClickhouseClientConfig::new(&args, validation).into(); + let client = config_from_clickhouse_args(&args, validation).into(); tracing::info!("Running with clickhouse indexer"); let (bundles_table_name, bundle_receipts_table_name) = @@ -256,11 +117,11 @@ impl ClickhouseIndexer { let (tx, rx) = mpsc::channel(128); let bundle_inserter = default_inserter(&client, &bundles_table_name); - let bundle_inserter = ClickhouseInserter::new(bundle_inserter, tx); + let bundle_inserter = ClickhouseInserter::<_, Metrics>::new(bundle_inserter, tx); let mut 
bundle_inserter_runner = InserterRunner::new(bundle_rx, bundle_inserter, builder_name.clone()); - let mut bundle_backup = Backup::::new( + let mut bundle_backup = Backup::::new( rx, client .inserter(&bundles_table_name) @@ -271,10 +132,11 @@ impl ClickhouseIndexer { let (tx, rx) = mpsc::channel(128); let bundle_receipt_inserter = default_inserter(&client, &bundle_receipts_table_name); - let bundle_receipt_inserter = ClickhouseInserter::new(bundle_receipt_inserter, tx); + let bundle_receipt_inserter = + ClickhouseInserter::<_, Metrics>::new(bundle_receipt_inserter, tx); let mut bundle_receipt_inserter_runner = InserterRunner::new(bundle_receipt_rx, bundle_receipt_inserter, builder_name); - let mut bundle_receipt_backup = Backup::::new( + let mut bundle_receipt_backup = Backup::::new( rx, client .inserter(&bundle_receipts_table_name) @@ -283,23 +145,15 @@ impl ClickhouseIndexer { ) .with_memory_backup_config(MemoryBackupConfig::new(memory_backup_max_size_bytes)); - spawn_clickhouse_inserter!(task_executor, bundle_inserter_runner, "bundles"); - spawn_clickhouse_backup!(task_executor, bundle_backup, "bundles"); + spawn_clickhouse_inserter!(task_executor, bundle_inserter_runner, "bundles", TARGET); + spawn_clickhouse_backup!(task_executor, bundle_backup, "bundles", TARGET); spawn_clickhouse_inserter!( task_executor, bundle_receipt_inserter_runner, - "bundle receipts" + "bundle receipts", + TARGET ); - spawn_clickhouse_backup!(task_executor, bundle_receipt_backup, "bundle receipts"); - } -} - -impl std::fmt::Debug for InserterRunner { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - f.debug_struct("InserterRunner") - .field("inserter", &T::ORDER.to_string()) - .field("rx", &self.rx) - .finish() + spawn_clickhouse_backup!(task_executor, bundle_receipt_backup, "bundle receipts", TARGET); } } @@ -311,16 +165,23 @@ pub(crate) mod tests { cli::ClickhouseArgs, indexer::{ click::{ - default_disk_backup_database_path, models::{BundleReceiptRow, BundleRow}, ClickhouseClientConfig, ClickhouseIndexer, }, tests::{bundle_receipt_example, system_bundle_example}, - OrderSenders, BUNDLE_RECEIPTS_TABLE_NAME, BUNDLE_TABLE_NAME, + OrderSenders, BUNDLE_RECEIPTS_TABLE_NAME, BUNDLE_TABLE_NAME, TARGET, }, - tasks::TaskManager, }; use clickhouse::{error::Result as ClickhouseResult, Client as ClickhouseClient}; + use rbuilder_utils::{ + clickhouse::{ + backup::{metrics::NullMetrics, Backup, DiskBackup, DiskBackupConfig, FailedCommit}, + indexer::default_disk_backup_database_path, + Quantities, + }, + spawn_clickhouse_backup, + tasks::TaskManager, + }; use testcontainers::{ core::{ error::Result as TestcontainersResult, wait::HttpWaitStrategy, ContainerPort, WaitFor, @@ -328,7 +189,7 @@ pub(crate) mod tests { runners::AsyncRunner as _, ContainerAsync, Image, }; - use tokio::runtime::Handle; + use tokio::{runtime::Handle, sync::mpsc}; // Uncomment to enable logging during tests. // use tracing::level_filters::LevelFilter; @@ -619,4 +480,73 @@ pub(crate) mod tests { drop(image); } + + // Uncomment to enable logging during tests. + // use tracing::level_filters::LevelFilter; + // use tracing_subscriber::{layer::SubscriberExt as _, util::SubscriberInitExt as _, EnvFilter}; + + #[tokio::test(flavor = "multi_thread")] + async fn backup_e2e_works() { + // Uncomment to toggle logs. + // let registry = tracing_subscriber::registry().with( + // EnvFilter::builder().with_default_directive(LevelFilter::DEBUG.into()). 
+ // from_env_lossy(), ); + // let _ = registry.with(tracing_subscriber::fmt::layer()).try_init(); + + let memory_backup_only = [false, true]; + + let task_manager = TaskManager::new(tokio::runtime::Handle::current()); + let task_executor = task_manager.executor(); + + for use_memory_only in memory_backup_only { + println!( + "---- Running backup_memory_e2e_works with use_memory_only = {use_memory_only} ----" + ); + + // 1. Spin up Clickhouse. No validation because we're testing both receipts and bundles, + // and validation on U256 is not supported. + let (image, client, _) = create_test_clickhouse_client(false).await.unwrap(); + create_clickhouse_bundles_table(&client).await.unwrap(); + + let tempfile = tempfile::NamedTempFile::new().unwrap(); + + let disk_backup = DiskBackup::new( + DiskBackupConfig::new().with_path(tempfile.path().to_path_buf().into()), + &task_executor, + ) + .expect("could not create disk backup"); + + let (tx, rx) = mpsc::channel(128); + let mut bundle_backup = Backup::::new_test( + rx, + client + .inserter(BUNDLE_TABLE_NAME) + .with_timeouts(Some(Duration::from_secs(2)), Some(Duration::from_secs(12))), + disk_backup, + use_memory_only, + ); + + spawn_clickhouse_backup!(task_executor, bundle_backup, "bundles", TARGET); + + let quantities = Quantities { bytes: 512, rows: 1, transactions: 1 }; // approximated + let bundle_row: BundleRow = (system_bundle_example(), "buildernet".to_string()).into(); + let bundle_rows = Vec::from([bundle_row]); + let failed_commit = FailedCommit::::new(bundle_rows.clone(), quantities); + + tx.send(failed_commit).await.unwrap(); + // Wait some time to let the backup process it + tokio::time::sleep(Duration::from_millis(100)).await; + + let results = client + .query(&format!("select * from {BUNDLE_TABLE_NAME}")) + .fetch_all::() + .await + .unwrap(); + + assert_eq!(results.len(), 1); + assert_eq!(bundle_rows, results, "expected, got"); + + drop(image); + } + } } diff --git a/src/indexer/click/models.rs b/src/indexer/click/models.rs index 72f2fa77..deb9b30f 100644 --- a/src/indexer/click/models.rs +++ b/src/indexer/click/models.rs @@ -8,7 +8,9 @@ use alloy_consensus::Transaction; use alloy_eips::Typed2718; use alloy_primitives::{Address, Keccak256, B256, U256}; use alloy_rlp::Encodable; +use clickhouse::Row; use rbuilder_primitives::BundleVersion; +use rbuilder_utils::clickhouse::backup::primitives::{ClickhouseIndexableOrder, ClickhouseRowExt}; use time::{OffsetDateTime, UtcDateTime}; use uuid::Uuid; @@ -19,7 +21,7 @@ use crate::primitives::{DecodedBundle, SystemBundle}; /// NOTE: Make sure the fields are in the same order as the columns in the Clickhouse table. #[derive(Clone, clickhouse::Row, Debug, serde::Serialize, serde::Deserialize)] #[cfg_attr(test, derive(PartialEq, Eq))] -pub(crate) struct BundleRow { +pub struct BundleRow { /// The timestamp at which the bundle was observed. #[serde(with = "clickhouse::serde::time::datetime64::micros")] pub received_at: OffsetDateTime, @@ -123,6 +125,18 @@ pub(crate) struct BundleRow { pub version: u8, } +impl ClickhouseRowExt for BundleRow { + const ORDER: &'static str = "bundle"; + + fn hash(&self) -> B256 { + self.hash + } + + fn to_row_ref(row: &Self) -> &::Value<'_> { + row + } +} + /// Adapted from impl From<(SystemBundle, String)> for BundleRow { fn from((bundle, builder_name): (SystemBundle, String)) -> Self { @@ -320,7 +334,7 @@ impl From<(SystemBundle, String)> for BundleRow { /// The clickhouse model representing a [`crate::primitives::BundleReceipt`]. 
#[derive(Debug, Clone, clickhouse::Row, serde::Serialize, serde::Deserialize)] #[cfg_attr(test, derive(PartialEq, Eq))] -pub(crate) struct BundleReceiptRow { +pub struct BundleReceiptRow { #[serde(with = "hash")] pub(crate) bundle_hash: B256, /// The hash of the bundle hash. @@ -337,6 +351,18 @@ pub(crate) struct BundleReceiptRow { pub(crate) priority: u8, } +impl ClickhouseRowExt for BundleReceiptRow { + const ORDER: &'static str = "bundle_receipt"; + + fn hash(&self) -> B256 { + self.bundle_hash + } + + fn to_row_ref(row: &Self) -> &::Value<'_> { + row + } +} + impl From<(BundleReceipt, String)> for BundleReceiptRow { fn from((receipt, dst_builder_name): (BundleReceipt, String)) -> Self { let mut hasher = Keccak256::new(); @@ -368,6 +394,34 @@ impl From<(BundleReceipt, String)> for BundleReceiptRow { } } +impl ClickhouseIndexableOrder for SystemBundle { + type ClickhouseRowType = BundleRow; + + const ORDER: &'static str = ::ORDER; + + fn hash(&self) -> B256 { + self.bundle_hash + } + + fn to_row(self, builder_name: String) -> Self::ClickhouseRowType { + (self, builder_name).into() + } +} + +impl ClickhouseIndexableOrder for BundleReceipt { + type ClickhouseRowType = BundleReceiptRow; + + const ORDER: &'static str = ::ORDER; + + fn hash(&self) -> B256 { + self.bundle_hash + } + + fn to_row(self, builder_name: String) -> Self::ClickhouseRowType { + (self, builder_name).into() + } +} + /// Tests to make sure round-trip conversion between raw bundle and clickhouse bundle types is /// feasible. #[cfg(test)] diff --git a/src/indexer/click/primitives.rs b/src/indexer/click/primitives.rs deleted file mode 100644 index 7616cc8a..00000000 --- a/src/indexer/click/primitives.rs +++ /dev/null @@ -1,91 +0,0 @@ -use alloy_primitives::B256; -use clickhouse::{Row, RowWrite}; -use serde::{de::DeserializeOwned, Serialize}; - -use crate::{ - indexer::click::models::{BundleReceiptRow, BundleRow}, - primitives::{BundleReceipt, SystemBundle}, -}; - -pub(crate) trait ClickhouseRowExt: - Row + RowWrite + Serialize + DeserializeOwned + Sync + Send + 'static -{ - /// The type of such row, e.g. "bundles" or "bundle_receipts". Used as backup db table name and - /// for informational purposes. - const ORDER: &'static str; - - /// An identifier of such row. - fn hash(&self) -> B256; - - /// Internal function that takes the inner row types and extracts the reference needed for - /// Clickhouse inserter functions like `Inserter::write`. While a default implementation is not - /// provided, it should suffice to simply return `row`. - fn to_row_ref(row: &Self) -> &::Value<'_>; -} - -impl ClickhouseRowExt for BundleRow { - const ORDER: &'static str = "bundle"; - - fn hash(&self) -> B256 { - self.hash - } - - fn to_row_ref(row: &Self) -> &::Value<'_> { - row - } -} - -impl ClickhouseRowExt for BundleReceiptRow { - const ORDER: &'static str = "bundle_receipt"; - - fn hash(&self) -> B256 { - self.bundle_hash - } - - fn to_row_ref(row: &Self) -> &::Value<'_> { - row - } -} - -/// An high-level order type that can be indexed in clickhouse. -pub(crate) trait ClickhouseIndexableOrder: Sized { - /// The associated inner row type that can be serialized into Clickhouse data. - type ClickhouseRowType: ClickhouseRowExt; - - /// The type of such order, e.g. "bundles" or "transactions". For informational purposes. - const ORDER: &'static str; - - /// An identifier of such order. - fn hash(&self) -> B256; - - /// Converts such order into the associated Clickhouse row type. 
- fn to_row(self, builder_name: String) -> Self::ClickhouseRowType; -} - -impl ClickhouseIndexableOrder for SystemBundle { - type ClickhouseRowType = BundleRow; - - const ORDER: &'static str = ::ORDER; - - fn hash(&self) -> B256 { - self.bundle_hash - } - - fn to_row(self, builder_name: String) -> Self::ClickhouseRowType { - (self, builder_name).into() - } -} - -impl ClickhouseIndexableOrder for BundleReceipt { - type ClickhouseRowType = BundleReceiptRow; - - const ORDER: &'static str = ::ORDER; - - fn hash(&self) -> B256 { - self.bundle_hash - } - - fn to_row(self, builder_name: String) -> Self::ClickhouseRowType { - (self, builder_name).into() - } -} diff --git a/src/indexer/mod.rs b/src/indexer/mod.rs index d431b5b7..ce55791e 100644 --- a/src/indexer/mod.rs +++ b/src/indexer/mod.rs @@ -3,6 +3,7 @@ use std::fmt::Debug; +use rbuilder_utils::tasks::TaskExecutor; use tokio::sync::mpsc; use crate::{ @@ -10,7 +11,6 @@ use crate::{ indexer::{click::ClickhouseIndexer, parq::ParquetIndexer}, metrics::IndexerMetrics, primitives::{BundleReceipt, SystemBundle}, - tasks::TaskExecutor, }; pub(crate) mod click; @@ -42,7 +42,7 @@ pub const BACKUP_DATABASE_PATH: &str = "/var/lib/buildernet-of-proxy/clickhouse- const TARGET: &str = "indexer"; /// Trait for adding order indexing functionality. -pub trait OrderIndexer: Sync + Send { +pub(crate) trait OrderIndexer: Sync + Send { fn index_bundle(&self, system_bundle: SystemBundle); fn index_bundle_receipt(&self, bundle_receipt: BundleReceipt); } diff --git a/src/indexer/parq.rs b/src/indexer/parq.rs index ddd80492..08759747 100644 --- a/src/indexer/parq.rs +++ b/src/indexer/parq.rs @@ -12,6 +12,7 @@ use arrow::{ error::Result as ArrowResult, }; use parquet::{arrow::ArrowWriter, file::properties::WriterPropertiesBuilder}; +use rbuilder_utils::{metrics::Sampler, tasks::TaskExecutor}; use tokio::{sync::mpsc, time::Instant}; use std::{ @@ -25,8 +26,7 @@ use crate::{ cli::ParquetArgs, indexer::{OrderReceivers, TARGET}, metrics::IndexerMetrics, - primitives::{BundleReceipt, Sampler}, - tasks::TaskExecutor, + primitives::BundleReceipt, }; /// The Arrow schema for bundle receipts. @@ -270,6 +270,7 @@ impl ParquetRunner { #[cfg(test)] mod tests { use alloy_primitives::{Address, B256, U32}; + use rbuilder_utils::tasks::TaskManager; use time::UtcDateTime; // Uncomment to enable logging during tests. 
@@ -281,7 +282,6 @@ mod tests { indexer::{self, parq::ParquetIndexer}, primitives::BundleReceipt, priority::Priority, - tasks::TaskManager, utils::testutils::Random, }; diff --git a/src/lib.rs b/src/lib.rs index bad56063..96bb6921 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -12,7 +12,6 @@ use crate::{ primitives::SystemBundleDecoder, runner::CliContext, statics::LOCAL_PEER_STORE, - tasks::TaskExecutor, }; use alloy_primitives::Address; use alloy_signer_local::PrivateKeySigner; @@ -29,6 +28,7 @@ use eyre::Context as _; use forwarder::{spawn_forwarder, IngressForwarders, PeerHandle}; use metrics_exporter_prometheus::PrometheusBuilder; use metrics_util::layers::{PrefixLayer, Stack}; +use rbuilder_utils::tasks::TaskExecutor; use reqwest::Url; use std::{ net::SocketAddr, @@ -61,7 +61,6 @@ pub mod priority; pub mod rate_limit; pub mod runner; pub mod statics; -pub mod tasks; pub mod utils; pub mod validation; diff --git a/src/metrics.rs b/src/metrics.rs index 1f566f3e..fb1de5d8 100644 --- a/src/metrics.rs +++ b/src/metrics.rs @@ -4,8 +4,9 @@ use std::time::Duration; use hyper::{Method, StatusCode}; use metrics::{counter, describe_counter, describe_gauge, describe_histogram, gauge, histogram}; +use rbuilder_utils::clickhouse::Quantities; -use crate::{forwarder::ForwardingDirection, primitives::Quantities, priority::Priority}; +use crate::{forwarder::ForwardingDirection, priority::Priority}; mod name { /// BuilderHub metrics. diff --git a/src/primitives/backoff.rs b/src/primitives/backoff.rs deleted file mode 100644 index 83cb3eb5..00000000 --- a/src/primitives/backoff.rs +++ /dev/null @@ -1,313 +0,0 @@ -//! Time-related utilies. - -use std::{ - future::{poll_fn, Future as _}, - iter::Iterator, - pin::Pin, - task::{Context, Poll}, - time::Duration, -}; - -/// A random number generator for applying jitter to [`std::time::Duration`]. -#[derive(Debug, Clone)] -pub(crate) struct Jitter; - -impl Jitter { - /// Apply jitter to provided duration, by multiplying it for a random number between 0 and 2. - pub(crate) fn apply_to(duration: Duration) -> Duration { - duration.mul_f64(rand::random::() * 2_f64) - } -} - -/// A retry strategy driven by exponential back-off. -/// -/// The power corresponds to the number of past attempts. -/// -/// Taken from -#[derive(Debug, Clone)] -pub(crate) struct ExponentialBackoff { - current: u64, - base: u64, - factor: u64, - max_delay: Option, -} - -#[allow(dead_code)] -impl ExponentialBackoff { - /// Constructs a new exponential back-off strategy, - /// given a base duration in milliseconds. - /// - /// The resulting duration is calculated by taking the base to the `n`-th power, - /// where `n` denotes the number of past attempts. - pub(crate) fn from_millis(base: u64) -> ExponentialBackoff { - ExponentialBackoff { current: base, base, factor: 1u64, max_delay: None } - } - - /// A multiplicative factor that will be applied to the retry delay. - /// - /// For example, using a factor of `1000` will make each delay in units of seconds. - /// - /// Default factor is `1`. - pub(crate) fn factor(mut self, factor: u64) -> ExponentialBackoff { - self.factor = factor; - self - } - - /// Apply a maximum delay. No retry delay will be longer than this `Duration`. - pub(crate) fn max_delay(mut self, duration: Duration) -> ExponentialBackoff { - self.max_delay = Some(duration); - self - } - - /// Reset the backoff to the initial state. 
- pub(crate) fn reset(&mut self) { - self.current = self.base; - } -} - -impl Iterator for ExponentialBackoff { - type Item = Duration; - - // TODO: change this logic, so that we can always multiply base by a factor. - // e.g. base = 8, factor = 2 yields to: 8ms, 16ms, 32ms, 64ms, ... - fn next(&mut self) -> Option { - // set delay duration by applying factor - let duration = if let Some(duration) = self.current.checked_mul(self.factor) { - Duration::from_millis(duration) - } else { - Duration::from_millis(u64::MAX) - }; - - // check if we reached max delay - if let Some(ref max_delay) = self.max_delay { - if duration > *max_delay { - return Some(*max_delay); - } - } - - if let Some(next) = self.current.checked_mul(self.base) { - self.current = next; - } else { - self.current = u64::MAX; - } - - Some(duration) - } -} - -/// An interval heavily inspired by [`tokio::time::Interval`], that supports exponential back-off -/// and jitter. -#[derive(Debug)] -pub(crate) struct BackoffInterval { - /// Future that completes the next time the `Interval` yields a value. - delay: Pin>, - - /// The exponential backoff configuration. - backoff: ExponentialBackoff, - - /// An optional jitter to apply to the ticks. - jitter: bool, -} - -impl BackoffInterval { - /// Creates a new interval that ticks immediately. - pub(crate) fn new(backoff: ExponentialBackoff) -> Self { - let start = tokio::time::Instant::now(); - let delay = Box::pin(tokio::time::sleep_until(start)); - Self { delay, backoff, jitter: false } - } - - pub(crate) fn with_jitter(mut self) -> Self { - self.jitter = true; - self - } - - pub(crate) fn poll_tick(&mut self, cx: &mut Context<'_>) -> Poll { - // Wait for the delay to be done - std::task::ready!(Pin::new(&mut self.delay).poll(cx)); - - // Get the time when we were schedulued to tick - let timeout = self.delay.deadline(); - - // CHANGE: use custom logic that takes into a account backoff and jitter to calculate new - // instant. - let next = self.next(); - - // CHANGE: Unfortunately, [`tokio::time::Sleep::reset_without_reregister`] isn't - // pub(crate)lic so we have to register the waker again. - self.delay.as_mut().reset(next); - - Poll::Ready(timeout) - } - - /// Completes when the next instant in the interval has been reached. - pub(crate) async fn tick(&mut self) -> tokio::time::Instant { - let instant = poll_fn(|cx| self.poll_tick(cx)); - - instant.await - } - - /// Resets backoff to the initial state, and the next tick will happen after the initial period - /// returned by [`ExponentialBackoff`]. - pub(crate) fn reset(&mut self) { - self.backoff.reset(); - let next = self.next(); - self.delay.as_mut().reset(next); - } - - /// Return the next instant at which the interval should tick. - fn next(&mut self) -> tokio::time::Instant { - let now = tokio::time::Instant::now(); - // We provide a [`tokio::time::MissedTickBehavior::Delay`] behavior but we also add backoff - // and jitter if the user configured it. - let mut period = self.backoff.next().expect("ExponentialBackoff never returns None"); - if self.jitter { - period = Jitter::apply_to(period); - } - now.checked_add(period).expect("no overflow") - } -} - -impl Default for BackoffInterval { - fn default() -> Self { - // So will return 4, 16, 64, 256, 1024, ... milliseconds with jitter. 
- Self::new(ExponentialBackoff::from_millis(4).max_delay(Duration::from_millis(8192))) - .with_jitter() - } -} - -#[cfg(test)] -mod tests { - use tokio::time::{Duration, Instant}; - - use super::*; - - #[test] - fn exp_backoff_returns_some_exponential_base_10() { - let mut s = ExponentialBackoff::from_millis(10); - - assert_eq!(s.next(), Some(Duration::from_millis(10))); - assert_eq!(s.next(), Some(Duration::from_millis(100))); - assert_eq!(s.next(), Some(Duration::from_millis(1000))); - } - - #[test] - fn exp_backoff_returns_some_exponential_base_2() { - let mut s = ExponentialBackoff::from_millis(2); - - assert_eq!(s.next(), Some(Duration::from_millis(2))); - assert_eq!(s.next(), Some(Duration::from_millis(4))); - assert_eq!(s.next(), Some(Duration::from_millis(8))); - } - - #[test] - fn exp_backoff_saturates_at_maximum_value() { - let mut s = ExponentialBackoff::from_millis(u64::MAX - 1); - - assert_eq!(s.next(), Some(Duration::from_millis(u64::MAX - 1))); - assert_eq!(s.next(), Some(Duration::from_millis(u64::MAX))); - assert_eq!(s.next(), Some(Duration::from_millis(u64::MAX))); - } - - #[test] - fn exp_backoff_can_use_factor_to_get_seconds() { - let factor = 1000; - let mut s = ExponentialBackoff::from_millis(2).factor(factor); - - assert_eq!(s.next(), Some(Duration::from_secs(2))); - assert_eq!(s.next(), Some(Duration::from_secs(4))); - assert_eq!(s.next(), Some(Duration::from_secs(8))); - } - - #[test] - fn exp_backoff_stops_increasing_at_max_delay() { - let mut s = ExponentialBackoff::from_millis(2).max_delay(Duration::from_millis(4)); - - assert_eq!(s.next(), Some(Duration::from_millis(2))); - assert_eq!(s.next(), Some(Duration::from_millis(4))); - assert_eq!(s.next(), Some(Duration::from_millis(4))); - } - - #[test] - fn exp_backoff_returns_max_when_max_less_than_base() { - let mut s = ExponentialBackoff::from_millis(20).max_delay(Duration::from_millis(10)); - - assert_eq!(s.next(), Some(Duration::from_millis(10))); - assert_eq!(s.next(), Some(Duration::from_millis(10))); - } - - // Tests with `start_paused = true` consists of tests with [`tokio::time::pause`] and - // require manual advancement of time with [`tokio::time::advance`] or with sleeps. 
- - #[tokio::test(start_paused = true)] - async fn backoff_interval_ticks_as_expected() { - let backoff = ExponentialBackoff::from_millis(2); - let mut backoff_clone = backoff.clone(); - let mut interval = BackoffInterval::new(backoff); - - let before = Instant::now(); - let t1 = interval.tick().await; - assert_eq!(t1, before); - let t2 = interval.tick().await; - assert_eq!(t2, t1 + backoff_clone.next().unwrap()); - let t3 = interval.tick().await; - assert_eq!(t3, t2 + backoff_clone.next().unwrap()); - let t4 = interval.tick().await; - assert_eq!(t4, t3 + backoff_clone.next().unwrap()); - } - - #[tokio::test(start_paused = true)] - async fn backoff_interval_resets_properly() { - let backoff = ExponentialBackoff::from_millis(2); - let mut backoff_clone = backoff.clone(); - let mut interval = BackoffInterval::new(backoff); - - interval.tick().await; - interval.tick().await; - interval.tick().await; - interval.tick().await; - - interval.reset(); - let now = Instant::now(); - let expected_delay = backoff_clone.next().unwrap(); - let actual = interval.tick().await; - - assert_eq!(now + expected_delay, actual); - } - - #[tokio::test(start_paused = true)] - async fn backoff_interval_with_jitter_works() { - // No jitter - { - let beginning = Instant::now(); - - let backoff = ExponentialBackoff::from_millis(5); - let mut backoff_clone = backoff.clone(); - let mut interval = BackoffInterval::new(backoff); - - let t1 = interval.tick().await; - assert_eq!(t1, beginning); // First tick is immediate - - let t2 = interval.tick().await; - assert_eq!(t2, t1 + backoff_clone.next().unwrap()); - - let t3 = interval.tick().await; - assert_eq!(t3, t2 + backoff_clone.next().unwrap()); - } - - // Jitter - { - let beginning = Instant::now(); - - let backoff = ExponentialBackoff::from_millis(5); - let mut backoff_clone = backoff.clone(); - let mut interval = BackoffInterval::new(backoff).with_jitter(); - let t1 = interval.tick().await; - assert_eq!(t1, beginning); // First tick is immediate - - // Next tick will be 5ms later, but jitter changes it. - let t2 = interval.tick().await; - assert_ne!(t2, t1 + backoff_clone.next().unwrap()); - } - } -} diff --git a/src/primitives/mod.rs b/src/primitives/mod.rs index 8be1308d..1a97675f 100644 --- a/src/primitives/mod.rs +++ b/src/primitives/mod.rs @@ -24,7 +24,7 @@ use rbuilder_primitives::{ Bundle, BundleReplacementData, ShareBundle, }; use revm_primitives::B256; -use serde::{Deserialize, Serialize}; +use serde::Serialize; use serde_json::json; use uuid::Uuid; @@ -33,8 +33,6 @@ use crate::{ priority::Priority, }; -pub mod backoff; - /// Metadata about a [`SystemBundle`]. #[derive(PartialEq, Eq, Clone, Debug)] pub struct SystemBundleMetadata { @@ -531,8 +529,8 @@ impl SystemTransaction { /// The receipt of a bundle received from the system endpoint, to be indexed. #[derive(Debug, Clone, PartialEq, Eq)] +/// The hash of the raw bundle. pub struct BundleReceipt { - /// The hash of the raw bundle. pub bundle_hash: B256, /// The time the bundle has been sent, according to the field provided in the JSON-RPC request /// header. `None` if the bundle was sent on the user endpoint. @@ -698,76 +696,6 @@ impl Samplable for B256 { } } -/// A simple sampler that executes a closure every `sample_size` calls, or if a certain amount of -/// time has passed since last sampling call. 
-#[derive(Debug, Clone)] -pub struct Sampler { - sample_size: usize, - counter: usize, - start: Instant, - interval: Duration, -} - -impl Default for Sampler { - fn default() -> Self { - Self { - sample_size: 4096, - counter: 0, - start: Instant::now(), - interval: Duration::from_secs(10), - } - } -} - -impl Sampler { - pub fn with_sample_size(mut self, sample_size: usize) -> Self { - self.sample_size = sample_size; - self - } - - pub fn with_interval(mut self, interval: Duration) -> Self { - self.start = Instant::now() - interval; - self - } - - /// Call this function to potentially execute the sample closure if we have reached the sample - /// size, or enough time has passed. Otherwise, it increments the internal counter. - pub fn sample(&mut self, f: impl FnOnce()) { - if self.counter >= self.sample_size || self.start.elapsed() >= self.interval { - self.counter = 0; - self.start = Instant::now(); - f(); - } else { - self.counter += 1; - } - } -} - -/// Equilalent of `clickhouse::inserter::Quantities` with more traits derived. -#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Serialize, Deserialize)] -pub struct Quantities { - pub bytes: u64, - pub rows: u64, - pub transactions: u64, -} - -impl Quantities { - /// Just zero quantities, nothing special. - pub const ZERO: Quantities = Quantities { bytes: 0, rows: 0, transactions: 0 }; -} - -impl From for Quantities { - fn from(value: clickhouse::inserter::Quantities) -> Self { - Self { bytes: value.bytes, rows: value.rows, transactions: value.transactions } - } -} - -impl From for clickhouse::inserter::Quantities { - fn from(value: Quantities) -> Self { - Self { bytes: value.bytes, rows: value.rows, transactions: value.transactions } - } -} - #[cfg(test)] mod tests { use super::*; diff --git a/src/runner/mod.rs b/src/runner/mod.rs index f266be19..e1c91373 100644 --- a/src/runner/mod.rs +++ b/src/runner/mod.rs @@ -2,7 +2,7 @@ use std::{future::Future, time::Duration}; -use crate::tasks::{self, TaskExecutor, TaskManager}; +use rbuilder_utils::tasks::{PanickedTaskError, TaskExecutor, TaskManager}; #[derive(Debug, Clone)] pub struct CliContext { @@ -39,7 +39,7 @@ impl CliRunner { ) -> Result<(), E> where F: Future>, - E: Send + Sync + From + From + 'static, + E: Send + Sync + From + From + 'static, { let tokio_runtime = self.tokio_runtime; let mut task_manager = TaskManager::new(tokio_runtime.handle().clone()); @@ -118,7 +118,7 @@ where async fn run_to_completion_or_panic(tasks: &mut TaskManager, fut: F) -> Result<(), E> where F: Future>, - E: Send + Sync + From + 'static, + E: Send + Sync + From + 'static, { { let fut = Box::pin(fut); diff --git a/src/tasks/mod.rs b/src/tasks/mod.rs deleted file mode 100644 index 840c4627..00000000 --- a/src/tasks/mod.rs +++ /dev/null @@ -1,863 +0,0 @@ -//! Task management utilities. -//! -//! Taken from `reth_tasks` crate (https://github.com/paradigmxyz/reth/blob/main/crates/tasks/src/lib.rs) and adapted. -//! -//! This crate exposes two main abstractions: a [`TaskManager`] and a [`TaskExecutor`]. The -//! [`TaskManager`] is a centralized entity responsible, as the name suggests, for managing tasks, -//! while the [`TaskExecutor`] is used to spawn tasks onto a Tokio runtime. -//! -//! ## Architecture -//! -//! The [`TaskManager`] holds a [`tokio`] runtime handle that is needed to create child executor -//! that actually spawn tasks. Other than that, it contains: -//! - a receiver for task events (like packing of critical tasks); -//! 
- a sender for task events, used by the executors to let spawned task report events; -//! - a counter which tracks how many tasks that need graceful shutdown are currently running. -//! -//! Tasks can be also spawned as "critical" and/or with "graceful shutdown" support. -//! Critical tasks when they terminate they send a message to the [`TaskManager`] which in turn -//! will terminate itself after sending a shutdown signal to all long-running tasks. It is up to -//! the application to wait enough time before closing the process to allow graceful shutdown tasks -//! to complete. -//! Graceful shutdown tasks are spawned with a [`GracefulShutdown`] signal that can be awaited, and -//! resolves when a shutdown is explicitely requested by the executor or manager. That can be -//! before a SIGINT/SIGTERM signal is received or when a critical task panics. The -//! [`GracefulShutdown`] signal resolves to a [`GracefulShutdownGuard`]. This guard is simply a -//! shared counter that when dropped, decrements. This used by tasks to notify the manager that the -//! graceful shutdown has completed. - -use dyn_clone::DynClone; -use futures_util::{ - future::{select, BoxFuture}, - Future, FutureExt, TryFutureExt, -}; -use shutdown::{signal, GracefulShutdown, GracefulShutdownGuard, Shutdown, Signal}; -use std::{ - any::Any, - fmt::{Display, Formatter}, - pin::{pin, Pin}, - sync::{ - atomic::{AtomicUsize, Ordering}, - Arc, - }, - task::{ready, Context, Poll}, -}; -use tokio::{ - runtime::Handle, - sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender}, - task::JoinHandle, -}; -use tracing_futures::Instrument; - -pub mod shutdown; - -/// A type that can spawn tasks. -/// -/// The main purpose of this type is to abstract over [`TaskExecutor`] so it's more convenient to -/// provide default impls for testing. -/// -/// -/// # Examples -/// -/// Use the [`TokioTaskExecutor`] that spawns with [`tokio::task::spawn`] -/// -/// ``` -/// # async fn t() { -/// use flowproxy::tasks::{TaskSpawner, TokioTaskExecutor}; -/// let executor = TokioTaskExecutor::default(); -/// -/// let task = executor.spawn(Box::pin(async { -/// // -- snip -- -/// })); -/// task.await.unwrap(); -/// # } -/// ``` -/// -/// Use the [`TaskExecutor`] that spawns task directly onto the tokio runtime via the [Handle]. -/// -/// ``` -/// # use flowproxy::tasks::TaskManager; -/// fn t() { -/// use flowproxy::tasks::TaskSpawner; -/// let rt = tokio::runtime::Runtime::new().unwrap(); -/// let manager = TaskManager::new(rt.handle().clone()); -/// let executor = manager.executor(); -/// let task = TaskSpawner::spawn(&executor, Box::pin(async { -/// // -- snip -- -/// })); -/// rt.block_on(task).unwrap(); -/// # } -/// ``` -/// -/// The [`TaskSpawner`] trait is [`DynClone`] so `Box` are also `Clone`. -#[auto_impl::auto_impl(&, Arc)] -pub trait TaskSpawner: Send + Sync + Unpin + std::fmt::Debug + DynClone { - /// Spawns the task onto the runtime. - /// See also [`Handle::spawn`]. - fn spawn(&self, fut: BoxFuture<'static, ()>) -> JoinHandle<()>; - - /// This spawns a critical task onto the runtime. - fn spawn_critical(&self, name: &'static str, fut: BoxFuture<'static, ()>) -> JoinHandle<()>; - - /// Spawns a blocking task onto the runtime. - fn spawn_blocking(&self, fut: BoxFuture<'static, ()>) -> JoinHandle<()>; - - /// This spawns a critical blocking task onto the runtime. 
- fn spawn_critical_blocking( - &self, - name: &'static str, - fut: BoxFuture<'static, ()>, - ) -> JoinHandle<()>; -} - -dyn_clone::clone_trait_object!(TaskSpawner); - -/// An [`TaskSpawner`] that uses [`tokio::task::spawn`] to execute tasks -#[derive(Debug, Clone, Default)] -#[non_exhaustive] -pub struct TokioTaskExecutor; - -impl TokioTaskExecutor { - /// Converts the instance to a boxed [`TaskSpawner`]. - pub fn boxed(self) -> Box { - Box::new(self) - } -} - -impl TaskSpawner for TokioTaskExecutor { - fn spawn(&self, fut: BoxFuture<'static, ()>) -> JoinHandle<()> { - tokio::task::spawn(fut) - } - - fn spawn_critical(&self, _name: &'static str, fut: BoxFuture<'static, ()>) -> JoinHandle<()> { - tokio::task::spawn(fut) - } - - fn spawn_blocking(&self, fut: BoxFuture<'static, ()>) -> JoinHandle<()> { - tokio::task::spawn_blocking(move || tokio::runtime::Handle::current().block_on(fut)) - } - - fn spawn_critical_blocking( - &self, - _name: &'static str, - fut: BoxFuture<'static, ()>, - ) -> JoinHandle<()> { - tokio::task::spawn_blocking(move || tokio::runtime::Handle::current().block_on(fut)) - } -} - -/// Many reth components require to spawn tasks for long-running jobs. For example `discovery` -/// spawns tasks to handle egress and ingress of udp traffic or `network` that spawns session tasks -/// that handle the traffic to and from a peer. -/// -/// To unify how tasks are created, the [`TaskManager`] provides access to the configured Tokio -/// runtime. A [`TaskManager`] stores the [`tokio::runtime::Handle`] it is associated with. In this -/// way it is possible to configure on which runtime a task is executed. -/// -/// The main purpose of this type is to be able to monitor if a critical task panicked, for -/// diagnostic purposes, since tokio task essentially fail silently. Therefore, this type is a -/// Stream that yields the name of panicked task, See [`TaskExecutor::spawn_critical`]. In order to -/// execute Tasks use the [`TaskExecutor`] type [`TaskManager::executor`]. -#[derive(Debug)] -#[must_use = "TaskManager must be polled to monitor critical tasks"] -pub struct TaskManager { - /// Handle to the tokio runtime this task manager is associated with. - /// - /// See [`Handle`] docs. - handle: Handle, - /// Sender half for sending task events to this type - task_events_tx: UnboundedSender, - /// Receiver for task events - task_events_rx: UnboundedReceiver, - /// The [Signal] to fire when all tasks should be shutdown. - /// - /// This is fired when dropped. - signal: Option, - /// Receiver of the shutdown signal. - on_shutdown: Shutdown, - /// How many [`GracefulShutdown`] tasks are currently active - graceful_tasks: Arc, -} - -// === impl TaskManager === - -impl TaskManager { - /// Returns a __new__ [`TaskManager`] over the currently running Runtime. - /// - /// This must be polled for the duration of the program. - /// - /// To obtain the current [`TaskExecutor`] see [`TaskExecutor::current`]. - /// - /// # Panics - /// - /// This will panic if called outside the context of a Tokio runtime. - pub fn current() -> Self { - let handle = Handle::current(); - Self::new(handle) - } - - /// Create a new instance connected to the given handle's tokio runtime. - /// - /// This also sets the global [`TaskExecutor`]. 
- pub fn new(handle: Handle) -> Self { - let (task_events_tx, task_events_rx) = unbounded_channel(); - let (signal, on_shutdown) = signal(); - Self { - handle, - task_events_tx, - task_events_rx, - signal: Some(signal), - on_shutdown, - graceful_tasks: Arc::new(AtomicUsize::new(0)), - } - } - - /// Returns a new [`TaskExecutor`] that can spawn new tasks onto the tokio runtime this type is - /// connected to. - pub fn executor(&self) -> TaskExecutor { - TaskExecutor { - handle: self.handle.clone(), - on_shutdown: self.on_shutdown.clone(), - task_events_tx: self.task_events_tx.clone(), - graceful_tasks: Arc::clone(&self.graceful_tasks), - } - } - - /// Fires the shutdown signal and awaits until all tasks are shutdown. - pub fn graceful_shutdown(self) { - let _ = self.do_graceful_shutdown(None); - } - - /// Fires the shutdown signal and awaits until all tasks are shutdown. - /// - /// Returns true if all tasks were shutdown before the timeout elapsed. - pub fn graceful_shutdown_with_timeout(self, timeout: std::time::Duration) -> bool { - self.do_graceful_shutdown(Some(timeout)) - } - - fn do_graceful_shutdown(self, timeout: Option) -> bool { - drop(self.signal); - let when = timeout.map(|t| std::time::Instant::now() + t); - while self.graceful_tasks.load(Ordering::Relaxed) > 0 { - if when.map(|when| std::time::Instant::now() > when).unwrap_or(false) { - tracing::debug!("graceful shutdown timed out"); - return false; - } - std::hint::spin_loop(); - } - - tracing::debug!("gracefully shut down"); - true - } -} - -/// An endless future that resolves if a critical task panicked. -/// -/// See [`TaskExecutor::spawn_critical`] -impl Future for TaskManager { - type Output = Result<(), PanickedTaskError>; - - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - match ready!(self.as_mut().get_mut().task_events_rx.poll_recv(cx)) { - Some(TaskEvent::Panic(err)) => Poll::Ready(Err(err)), - Some(TaskEvent::GracefulShutdown) | None => { - if let Some(signal) = self.get_mut().signal.take() { - signal.fire(); - } - Poll::Ready(Ok(())) - } - } - } -} - -/// Error with the name of the task that panicked and an error downcasted to string, if possible. -#[derive(Debug, thiserror::Error, PartialEq, Eq)] -pub struct PanickedTaskError { - task_name: &'static str, - error: Option, -} - -impl Display for PanickedTaskError { - fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - let task_name = self.task_name; - if let Some(error) = &self.error { - write!(f, "Critical task `{task_name}` panicked: `{error}`") - } else { - write!(f, "Critical task `{task_name}` panicked") - } - } -} - -impl PanickedTaskError { - fn new(task_name: &'static str, error: Box) -> Self { - let error = match error.downcast::() { - Ok(value) => Some(*value), - Err(error) => match error.downcast::<&str>() { - Ok(value) => Some(value.to_string()), - Err(_) => None, - }, - }; - - Self { task_name, error } - } -} - -/// Represents the events that the `TaskManager`'s main future can receive. -#[derive(Debug)] -enum TaskEvent { - /// Indicates that a critical task has panicked. - Panic(PanickedTaskError), - /// A signal requesting a graceful shutdown of the `TaskManager`. - GracefulShutdown, -} - -/// A type that can spawn new tokio tasks -#[derive(Debug, Clone)] -pub struct TaskExecutor { - /// Handle to the tokio runtime this task manager is associated with. - /// - /// See [`Handle`] docs. - handle: Handle, - /// Receiver of the shutdown signal. 
- on_shutdown: Shutdown, - /// Sender half for sending task events to this type - task_events_tx: UnboundedSender, - /// How many [`GracefulShutdown`] tasks are currently active - graceful_tasks: Arc, -} - -// === impl TaskExecutor === - -impl TaskExecutor { - /// Returns the [Handle] to the tokio runtime. - pub const fn handle(&self) -> &Handle { - &self.handle - } - - /// Returns the receiver of the shutdown signal. - pub const fn on_shutdown_signal(&self) -> &Shutdown { - &self.on_shutdown - } - - /// Spawns a future on the tokio runtime depending on the [`TaskKind`] - fn spawn_on_rt(&self, fut: F, task_kind: TaskKind) -> JoinHandle<()> - where - F: Future + Send + 'static, - { - match task_kind { - TaskKind::Default => self.handle.spawn(fut), - TaskKind::Blocking => { - let handle = self.handle.clone(); - self.handle.spawn_blocking(move || handle.block_on(fut)) - } - } - } - - /// Spawns a regular task depending on the given [`TaskKind`] - fn spawn_task_as(&self, fut: F, task_kind: TaskKind) -> JoinHandle<()> - where - F: Future + Send + 'static, - { - let on_shutdown = self.on_shutdown.clone(); - - // Wrap the original future to increment the finished tasks counter upon completion - let task = { - async move { - let fut = pin!(fut); - let _ = select(on_shutdown, fut).await; - } - } - .in_current_span(); - - self.spawn_on_rt(task, task_kind) - } - - /// Spawns the task onto the runtime. - /// The given future resolves as soon as the [Shutdown] signal is received. - /// - /// See also [`Handle::spawn`]. - pub fn spawn(&self, fut: F) -> JoinHandle<()> - where - F: Future + Send + 'static, - { - self.spawn_task_as(fut, TaskKind::Default) - } - - /// Spawns a blocking task onto the runtime. - /// The given future resolves as soon as the [Shutdown] signal is received. - /// - /// See also [`Handle::spawn_blocking`]. - pub fn spawn_blocking(&self, fut: F) -> JoinHandle<()> - where - F: Future + Send + 'static, - { - self.spawn_task_as(fut, TaskKind::Blocking) - } - - /// Spawns the task onto the runtime. - /// The given future resolves as soon as the [Shutdown] signal is received. - /// - /// See also [`Handle::spawn`]. - pub fn spawn_with_signal(&self, f: impl FnOnce(Shutdown) -> F) -> JoinHandle<()> - where - F: Future + Send + 'static, - { - let on_shutdown = self.on_shutdown.clone(); - let fut = f(on_shutdown); - - let task = fut.in_current_span(); - - self.handle.spawn(task) - } - - /// Spawns a critical task depending on the given [`TaskKind`] - fn spawn_critical_as( - &self, - name: &'static str, - fut: F, - task_kind: TaskKind, - ) -> JoinHandle<()> - where - F: Future + Send + 'static, - { - let panicked_tasks_tx = self.task_events_tx.clone(); - let on_shutdown = self.on_shutdown.clone(); - - // wrap the task in catch unwind - let task = std::panic::AssertUnwindSafe(fut) - .catch_unwind() - .map_err(move |error| { - let task_error = PanickedTaskError::new(name, error); - tracing::error!("{task_error}"); - let _ = panicked_tasks_tx.send(TaskEvent::Panic(task_error)); - }) - .in_current_span(); - - let task = async move { - let task = pin!(task); - let _ = select(on_shutdown, task).await; - }; - - self.spawn_on_rt(task, task_kind) - } - - /// This spawns a critical blocking task onto the runtime. - /// The given future resolves as soon as the [Shutdown] signal is received. - /// - /// If this task panics, the [`TaskManager`] is notified. 
- pub fn spawn_critical_blocking(&self, name: &'static str, fut: F) -> JoinHandle<()> - where - F: Future + Send + 'static, - { - self.spawn_critical_as(name, fut, TaskKind::Blocking) - } - - /// This spawns a critical task onto the runtime. - /// The given future resolves as soon as the [Shutdown] signal is received. - /// - /// If this task panics, the [`TaskManager`] is notified. - pub fn spawn_critical(&self, name: &'static str, fut: F) -> JoinHandle<()> - where - F: Future + Send + 'static, - { - self.spawn_critical_as(name, fut, TaskKind::Default) - } - - /// This spawns a critical task onto the runtime. - /// - /// If this task panics, the [`TaskManager`] is notified. - pub fn spawn_critical_with_shutdown_signal( - &self, - name: &'static str, - f: impl FnOnce(Shutdown) -> F, - ) -> JoinHandle<()> - where - F: Future + Send + 'static, - { - let panicked_tasks_tx = self.task_events_tx.clone(); - let on_shutdown = self.on_shutdown.clone(); - let fut = f(on_shutdown); - - // wrap the task in catch unwind - let task = std::panic::AssertUnwindSafe(fut) - .catch_unwind() - .map_err(move |error| { - let task_error = PanickedTaskError::new(name, error); - tracing::error!("{task_error}"); - let _ = panicked_tasks_tx.send(TaskEvent::Panic(task_error)); - }) - .map(drop) - .in_current_span(); - - self.handle.spawn(task) - } - - /// This spawns a critical task onto the runtime. - /// - /// If this task panics, the [`TaskManager`] is notified. - /// The [`TaskManager`] will wait until the given future has completed before shutting down. - /// - /// # Example - /// - /// ```no_run - /// # async fn t(executor: flowproxy::tasks::TaskExecutor) { - /// - /// executor.spawn_critical_with_graceful_shutdown_signal("grace", |shutdown| async move { - /// // await the shutdown signal - /// let guard = shutdown.await; - /// // do work before exiting the program - /// tokio::time::sleep(std::time::Duration::from_secs(1)).await; - /// // allow graceful shutdown - /// drop(guard); - /// }); - /// # } - /// ``` - pub fn spawn_critical_with_graceful_shutdown_signal( - &self, - name: &'static str, - f: impl FnOnce(GracefulShutdown) -> F, - ) -> JoinHandle<()> - where - F: Future + Send + 'static, - { - let panicked_tasks_tx = self.task_events_tx.clone(); - let on_shutdown = GracefulShutdown::new( - self.on_shutdown.clone(), - GracefulShutdownGuard::new(Arc::clone(&self.graceful_tasks)), - ); - let fut = f(on_shutdown); - - // wrap the task in catch unwind - let task = std::panic::AssertUnwindSafe(fut) - .catch_unwind() - .map_err(move |error| { - let task_error = PanickedTaskError::new(name, error); - tracing::error!("{task_error}"); - let _ = panicked_tasks_tx.send(TaskEvent::Panic(task_error)); - }) - .map(drop) - .in_current_span(); - - self.handle.spawn(task) - } - - /// This spawns a regular task onto the runtime. - /// - /// The [`TaskManager`] will wait until the given future has completed before shutting down. 
- /// - /// # Example - /// - /// ```no_run - /// # async fn t(executor: flowproxy::tasks::TaskExecutor) { - /// - /// executor.spawn_with_graceful_shutdown_signal(|shutdown| async move { - /// // await the shutdown signal - /// let guard = shutdown.await; - /// // do work before exiting the program - /// tokio::time::sleep(std::time::Duration::from_secs(1)).await; - /// // allow graceful shutdown - /// drop(guard); - /// }); - /// # } - /// ``` - pub fn spawn_with_graceful_shutdown_signal( - &self, - f: impl FnOnce(GracefulShutdown) -> F, - ) -> JoinHandle<()> - where - F: Future + Send + 'static, - { - let on_shutdown = GracefulShutdown::new( - self.on_shutdown.clone(), - GracefulShutdownGuard::new(Arc::clone(&self.graceful_tasks)), - ); - let fut = f(on_shutdown); - - self.handle.spawn(fut) - } - - /// Sends a request to the `TaskManager` to initiate a graceful shutdown. - /// - /// Caution: This will terminate the entire program. - /// - /// The [`TaskManager`] upon receiving this event, will terminate and initiate the shutdown that - /// can be handled via the returned [`GracefulShutdown`]. - pub fn initiate_graceful_shutdown( - &self, - ) -> Result> { - self.task_events_tx - .send(TaskEvent::GracefulShutdown) - .map_err(|_send_error_with_task_event| tokio::sync::mpsc::error::SendError(()))?; - - Ok(GracefulShutdown::new( - self.on_shutdown.clone(), - GracefulShutdownGuard::new(Arc::clone(&self.graceful_tasks)), - )) - } -} - -impl TaskSpawner for TaskExecutor { - fn spawn(&self, fut: BoxFuture<'static, ()>) -> JoinHandle<()> { - self.spawn(fut) - } - - fn spawn_critical(&self, name: &'static str, fut: BoxFuture<'static, ()>) -> JoinHandle<()> { - Self::spawn_critical(self, name, fut) - } - - fn spawn_blocking(&self, fut: BoxFuture<'static, ()>) -> JoinHandle<()> { - self.spawn_blocking(fut) - } - - fn spawn_critical_blocking( - &self, - name: &'static str, - fut: BoxFuture<'static, ()>, - ) -> JoinHandle<()> { - Self::spawn_critical_blocking(self, name, fut) - } -} - -/// `TaskSpawner` with extended behaviour -#[auto_impl::auto_impl(&, Arc)] -pub trait TaskSpawnerExt: Send + Sync + Unpin + std::fmt::Debug + DynClone { - /// This spawns a critical task onto the runtime. - /// - /// If this task panics, the [`TaskManager`] is notified. - /// The [`TaskManager`] will wait until the given future has completed before shutting down. - fn spawn_critical_with_graceful_shutdown_signal( - &self, - name: &'static str, - f: impl FnOnce(GracefulShutdown) -> F, - ) -> JoinHandle<()> - where - F: Future + Send + 'static; - - /// This spawns a regular task onto the runtime. - /// - /// The [`TaskManager`] will wait until the given future has completed before shutting down. 
- fn spawn_with_graceful_shutdown_signal( - &self, - f: impl FnOnce(GracefulShutdown) -> F, - ) -> JoinHandle<()> - where - F: Future + Send + 'static; -} - -impl TaskSpawnerExt for TaskExecutor { - fn spawn_critical_with_graceful_shutdown_signal( - &self, - name: &'static str, - f: impl FnOnce(GracefulShutdown) -> F, - ) -> JoinHandle<()> - where - F: Future + Send + 'static, - { - Self::spawn_critical_with_graceful_shutdown_signal(self, name, f) - } - - fn spawn_with_graceful_shutdown_signal( - &self, - f: impl FnOnce(GracefulShutdown) -> F, - ) -> JoinHandle<()> - where - F: Future + Send + 'static, - { - Self::spawn_with_graceful_shutdown_signal(self, f) - } -} - -/// Determines how a task is spawned -enum TaskKind { - /// Spawn the task to the default executor [`Handle::spawn`] - Default, - /// Spawn the task to the blocking executor [`Handle::spawn_blocking`] - Blocking, -} - -/// Error returned by `try_current` when no task executor has been configured. -#[derive(Debug, Default, thiserror::Error)] -#[error("No current task executor available.")] -#[non_exhaustive] -pub struct NoCurrentTaskExecutorError; - -#[cfg(test)] -mod tests { - use super::*; - use std::{sync::atomic::AtomicBool, time::Duration}; - - #[test] - fn test_cloneable() { - #[derive(Clone)] - struct ExecutorWrapper { - _e: Box, - } - - let executor: Box = Box::::default(); - let _e = dyn_clone::clone_box(&*executor); - - let e = ExecutorWrapper { _e }; - let _e2 = e; - } - - #[test] - fn test_critical() { - let runtime = tokio::runtime::Runtime::new().unwrap(); - let handle = runtime.handle().clone(); - let manager = TaskManager::new(handle); - let executor = manager.executor(); - - executor.spawn_critical("this is a critical task", async { panic!("intentionally panic") }); - - runtime.block_on(async move { - let err_result = manager.await; - assert!(err_result.is_err(), "Expected TaskManager to return an error due to panic"); - let panicked_err = err_result.unwrap_err(); - - assert_eq!(panicked_err.task_name, "this is a critical task"); - assert_eq!(panicked_err.error, Some("intentionally panic".to_string())); - }) - } - - // Tests that spawned tasks are terminated if the `TaskManager` drops - #[test] - fn test_manager_shutdown_critical() { - let runtime = tokio::runtime::Runtime::new().unwrap(); - let handle = runtime.handle().clone(); - let manager = TaskManager::new(handle.clone()); - let executor = manager.executor(); - - let (signal, shutdown) = signal(); - - executor.spawn_critical("this is a critical task", async move { - tokio::time::sleep(Duration::from_millis(200)).await; - drop(signal); - }); - - drop(manager); - - handle.block_on(shutdown); - } - - // Tests that spawned tasks are terminated if the `TaskManager` drops - #[test] - fn test_manager_shutdown() { - let runtime = tokio::runtime::Runtime::new().unwrap(); - let handle = runtime.handle().clone(); - let manager = TaskManager::new(handle.clone()); - let executor = manager.executor(); - - let (signal, shutdown) = signal(); - - executor.spawn(Box::pin(async move { - tokio::time::sleep(Duration::from_millis(200)).await; - drop(signal); - })); - - drop(manager); - - handle.block_on(shutdown); - } - - #[test] - fn test_manager_graceful_shutdown() { - let runtime = tokio::runtime::Runtime::new().unwrap(); - let handle = runtime.handle().clone(); - let manager = TaskManager::new(handle); - let executor = manager.executor(); - - let val = Arc::new(AtomicBool::new(false)); - let c = val.clone(); - executor.spawn_critical_with_graceful_shutdown_signal("grace", 
|shutdown| async move { - let _guard = shutdown.await; - tokio::time::sleep(Duration::from_millis(200)).await; - c.store(true, Ordering::Relaxed); - }); - - manager.graceful_shutdown(); - assert!(val.load(Ordering::Relaxed)); - } - - #[test] - fn test_manager_graceful_shutdown_many() { - let runtime = tokio::runtime::Runtime::new().unwrap(); - let handle = runtime.handle().clone(); - let manager = TaskManager::new(handle); - let executor = manager.executor(); - - let counter = Arc::new(AtomicUsize::new(0)); - let num = 10; - for _ in 0..num { - let c = counter.clone(); - executor.spawn_critical_with_graceful_shutdown_signal( - "grace", - move |shutdown| async move { - let _guard = shutdown.await; - tokio::time::sleep(Duration::from_millis(200)).await; - c.fetch_add(1, Ordering::SeqCst); - }, - ); - } - - manager.graceful_shutdown(); - assert_eq!(counter.load(Ordering::Relaxed), num); - } - - #[test] - fn test_manager_graceful_shutdown_timeout() { - let runtime = tokio::runtime::Runtime::new().unwrap(); - let handle = runtime.handle().clone(); - let manager = TaskManager::new(handle); - let executor = manager.executor(); - - let timeout = Duration::from_millis(500); - let val = Arc::new(AtomicBool::new(false)); - let val2 = val.clone(); - executor.spawn_critical_with_graceful_shutdown_signal("grace", |shutdown| async move { - let _guard = shutdown.await; - tokio::time::sleep(timeout * 3).await; - val2.store(true, Ordering::Relaxed); - unreachable!("should not be reached"); - }); - - manager.graceful_shutdown_with_timeout(timeout); - assert!(!val.load(Ordering::Relaxed)); - } - - #[test] - fn test_graceful_shutdown_triggered_by_executor() { - let runtime = tokio::runtime::Runtime::new().unwrap(); - let task_manager = TaskManager::new(runtime.handle().clone()); - let executor = task_manager.executor(); - - let task_did_shutdown_flag = Arc::new(AtomicBool::new(false)); - let flag_clone = task_did_shutdown_flag.clone(); - - let spawned_task_handle = executor.spawn_with_signal(|shutdown_signal| async move { - shutdown_signal.await; - flag_clone.store(true, Ordering::SeqCst); - }); - - let manager_future_handle = runtime.spawn(task_manager); - - let send_result = executor.initiate_graceful_shutdown(); - assert!(send_result.is_ok(), "Sending the graceful shutdown signal should succeed and return a GracefulShutdown future"); - - let manager_final_result = runtime.block_on(manager_future_handle); - - assert!(manager_final_result.is_ok(), "TaskManager task should not panic"); - assert_eq!( - manager_final_result.unwrap(), - Ok(()), - "TaskManager should resolve cleanly with Ok(()) after graceful shutdown request" - ); - - let task_join_result = runtime.block_on(spawned_task_handle); - assert!(task_join_result.is_ok(), "Spawned task should complete without panic"); - - assert!( - task_did_shutdown_flag.load(Ordering::Relaxed), - "Task should have received the shutdown signal and set the flag" - ); - } -} diff --git a/src/tasks/shutdown.rs b/src/tasks/shutdown.rs deleted file mode 100644 index e108c31c..00000000 --- a/src/tasks/shutdown.rs +++ /dev/null @@ -1,162 +0,0 @@ -//! Helper for shutdown signals - -use futures_util::{ - future::{FusedFuture, Shared}, - FutureExt, -}; -use std::{ - future::Future, - pin::Pin, - sync::{atomic::AtomicUsize, Arc}, - task::{ready, Context, Poll}, -}; -use tokio::sync::oneshot; - -/// A [`Future`] that resolves when the shutdown event has been fired. 
-/// -/// Compared to [`Shutdown`] it is "graceful", meaning that when it resolves it returns a -/// [`GracefulShutdownGuard`]. -#[derive(Debug)] -pub struct GracefulShutdown { - shutdown: Shutdown, - guard: Option, -} - -impl GracefulShutdown { - /// Creates a new instance of `Self`. To do so, it requires a [`Shutdown`] future, that will - /// drive `Self` to resolution, and the [`GracefulShutdownGuard`] used to notify the completion - /// of the graceful shutdown produre. - pub(crate) const fn new(shutdown: Shutdown, guard: GracefulShutdownGuard) -> Self { - Self { shutdown, guard: Some(guard) } - } -} - -impl Future for GracefulShutdown { - type Output = GracefulShutdownGuard; - - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - ready!(self.shutdown.poll_unpin(cx)); - Poll::Ready(self.get_mut().guard.take().expect("Future polled after completion")) - } -} - -impl Clone for GracefulShutdown { - fn clone(&self) -> Self { - Self { - shutdown: self.shutdown.clone(), - guard: self.guard.as_ref().map(|g| GracefulShutdownGuard::new(Arc::clone(&g.0))), - } - } -} - -/// A guard that fires once dropped to signal the [`TaskManager`](crate::TaskManager) that the -/// [`GracefulShutdown`] has completed. -#[derive(Debug)] -#[must_use = "if unused the task will not be gracefully shutdown"] -pub struct GracefulShutdownGuard(Arc); - -impl GracefulShutdownGuard { - pub(crate) fn new(counter: Arc) -> Self { - counter.fetch_add(1, std::sync::atomic::Ordering::SeqCst); - Self(counter) - } -} - -impl Drop for GracefulShutdownGuard { - fn drop(&mut self) { - self.0.fetch_sub(1, std::sync::atomic::Ordering::SeqCst); - } -} - -/// A [`Future`] that resolves when a shutdown event is fired. -#[derive(Debug, Clone)] -pub struct Shutdown( - /// The internal [`oneshot`] channel receiver, wrapped in a - /// [`futures_util::FutureExt::shared`] so that it can be cloned and polled from multiple - /// tasks. - Shared>, -); - -impl Future for Shutdown { - type Output = (); - - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let pin = self.get_mut(); - if pin.0.is_terminated() || pin.0.poll_unpin(cx).is_ready() { - Poll::Ready(()) - } else { - Poll::Pending - } - } -} - -/// Shutdown signal that fires either manually or on drop by closing the channel -#[derive(Debug)] -pub struct Signal(oneshot::Sender<()>); - -impl Signal { - /// Fire the signal manually. 
- pub fn fire(self) { - let _ = self.0.send(()); - } -} - -/// Create a channel pair that's used to propagate shutdown event -pub fn signal() -> (Signal, Shutdown) { - let (sender, receiver) = oneshot::channel(); - (Signal(sender), Shutdown(receiver.shared())) -} - -#[cfg(test)] -mod tests { - use super::*; - use futures_util::future::join_all; - use std::time::Duration; - - #[tokio::test(flavor = "multi_thread")] - async fn test_shutdown() { - let (_signal, _shutdown) = signal(); - } - - #[tokio::test(flavor = "multi_thread")] - async fn test_drop_signal() { - let (signal, shutdown) = signal(); - - tokio::task::spawn(async move { - tokio::time::sleep(Duration::from_millis(500)).await; - drop(signal) - }); - - shutdown.await; - } - - #[tokio::test(flavor = "multi_thread")] - async fn test_multi_shutdowns() { - let (signal, shutdown) = signal(); - - let mut tasks = Vec::with_capacity(100); - for _ in 0..100 { - let shutdown = shutdown.clone(); - let task = tokio::task::spawn(async move { - shutdown.await; - }); - tasks.push(task); - } - - drop(signal); - - join_all(tasks).await; - } - - #[tokio::test(flavor = "multi_thread")] - async fn test_drop_signal_from_thread() { - let (signal, shutdown) = signal(); - - let _thread = std::thread::spawn(|| { - std::thread::sleep(Duration::from_millis(500)); - drop(signal) - }); - - shutdown.await; - } -} diff --git a/tests/common/mod.rs b/tests/common/mod.rs index f3f47768..c592c0fc 100644 --- a/tests/common/mod.rs +++ b/tests/common/mod.rs @@ -36,7 +36,7 @@ pub(crate) async fn spawn_ingress(builder_url: Option) -> IngressClient< let builder_listener = None; let address = user_listener.local_addr().unwrap(); - let task_manager = flowproxy::tasks::TaskManager::current(); + let task_manager = rbuilder_utils::tasks::TaskManager::current(); tokio::spawn( async move {