diff --git a/.github/workflows/checks.yml b/.github/workflows/checks.yml index 336895f8..3f5316b6 100644 --- a/.github/workflows/checks.yml +++ b/.github/workflows/checks.yml @@ -26,7 +26,6 @@ env: MSRV: 1.75.0 HACK: hack --package neo4rs --each-feature --exclude-features unstable-serde-packstream-format,unstable-bolt-protocol-impl-v2,unstable-streaming-summary - jobs: check: name: Compile on MSRV diff --git a/ci/Cargo.lock.min b/ci/Cargo.lock.min index 549b2cab..cea6743e 100644 --- a/ci/Cargo.lock.min +++ b/ci/Cargo.lock.min @@ -73,6 +73,20 @@ version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f8aac770f1885fd7e387acedd76065302551364496e46b3dd00860b2f8359b9d" +[[package]] +name = "backoff" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b62ddb9cb1ec0a098ad4bbf9344d0713fa193ae1a80af55febcff2627b6a00c1" +dependencies = [ + "futures-core", + "getrandom", + "instant", + "pin-project-lite", + "rand", + "tokio", +] + [[package]] name = "backtrace" version = "0.3.58" @@ -886,6 +900,12 @@ dependencies = [ "serde", ] +[[package]] +name = "instant" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6fba4f4144401d448848bda978e2a037f002e58438d5b89195352a773e8d874b" + [[package]] name = "ipconfig" version = "0.3.0" @@ -1125,6 +1145,7 @@ dependencies = [ name = "neo4rs" version = "0.8.0-alpha.1" dependencies = [ + "backoff", "bytes", "chrono", "chrono-tz", diff --git a/ci/Cargo.lock.msrv b/ci/Cargo.lock.msrv index 9c24c612..2035d511 100644 --- a/ci/Cargo.lock.msrv +++ b/ci/Cargo.lock.msrv @@ -73,6 +73,20 @@ version = "1.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0c4b4d0bd25bd0b74681c0ad21497610ce1b7c91b1022cd21c80c6fbdd9476b0" +[[package]] +name = "backoff" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b62ddb9cb1ec0a098ad4bbf9344d0713fa193ae1a80af55febcff2627b6a00c1" +dependencies = [ + "futures-core", + "getrandom", + "instant", + "pin-project-lite", + "rand", + "tokio", +] + [[package]] name = "backtrace" version = "0.3.73" @@ -164,24 +178,30 @@ checksum = "79296716171880943b8470b5f8d03aa55eb2e645a4874bdbb28adb49162e012c" [[package]] name = "bytemuck" -version = "1.16.1" +version = "1.16.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "102087e286b4677862ea56cf8fc58bb2cdfa8725c40ffb80fe3a008eb7f2fc83" + +[[package]] +name = "byteorder" +version = "1.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b236fc92302c97ed75b38da1f4917b5cdda4984745740f153a5d3059e48d725e" +checksum = "1fd0f2584146f6f2ef48085050886acf353beff7305ebd1ae69500e27c67f64b" [[package]] name = "bytes" -version = "1.6.1" +version = "1.7.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a12916984aab3fa6e39d655a33e09c0071eb36d6ab3aea5c2d78551f1df6d952" +checksum = "8318a53db07bb3f8dca91a600466bdb3f2eaadeedfdbcf02e1accbad9271ba50" dependencies = [ "serde", ] [[package]] name = "cc" -version = "1.1.6" +version = "1.1.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2aba8f4e9906c7ce3c73463f62a7f0c65183ada1a2d47e397cc8810827f9694f" +checksum = "504bdec147f2cc13c8b57ed9401fd8a147cc66b67ad5cb241394244f2c947549" [[package]] name = "cfg-if" @@ -530,7 +550,7 @@ dependencies = [ "futures-core", "futures-sink", "http", - "indexmap 2.2.6", + "indexmap 2.3.0", "slab", "tokio", "tokio-util", @@ -753,9 +773,9 @@ dependencies = [ [[package]] name = "hyper-util" -version = "0.1.6" +version = "0.1.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3ab92f4f49ee4fb4f997c784b7a2e0fa70050211e0b6a287f898c3c9785ca956" +checksum = "cde7055719c54e36e95e8719f95883f22072a48ede39db7fc17a4e1d5281e9b9" dependencies = [ "bytes", "futures-channel", @@ -848,15 +868,24 @@ dependencies = [ [[package]] name = "indexmap" -version = "2.2.6" +version = "2.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "168fb715dda47215e360912c096649d23d58bf392ac62f73919e831745e40f26" +checksum = "de3fc2e30ba82dd1b3911c8de1ffc143c74a914a14e99514d7637e3099df5ea0" dependencies = [ "equivalent", "hashbrown 0.14.5", "serde", ] +[[package]] +name = "instant" +version = "0.1.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e0242819d153cba4b4b05a5a8f2a7e9bbf97b6055b2a002b395c96b5ff3c0222" +dependencies = [ + "cfg-if", +] + [[package]] name = "ipconfig" version = "0.3.2" @@ -991,9 +1020,9 @@ checksum = "ffbee8634e0d45d258acb448e7eaab3fce7a0a467395d4d9f228e3c1f01fb2e4" [[package]] name = "matrixmultiply" -version = "0.3.8" +version = "0.3.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7574c1cf36da4798ab73da5b215bbf444f50718207754cb522201d78d1cd0ff2" +checksum = "9380b911e3e96d10c1f415da0876389aaf1b56759054eeb0de7df940c456ba1a" dependencies = [ "autocfg", "rawpointer", @@ -1072,6 +1101,7 @@ dependencies = [ name = "neo4rs" version = "0.8.0-alpha.1" dependencies = [ + "backoff", "bytes", "chrono", "chrono-tz", @@ -1167,9 +1197,9 @@ dependencies = [ [[package]] name = "object" -version = "0.36.2" +version = "0.36.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3f203fa8daa7bb185f760ae12bd8e097f63d17041dcdcaf675ac54cdf863170e" +checksum = "27b64972346851a39438c60b341ebc01bba47464ae329e55cf343eb93964efd9" dependencies = [ "memchr", ] @@ -1339,9 +1369,12 @@ checksum = "439ee305def115ba05938db6eb1644ff94165c5ab5e9420d1c1bcedbba909391" [[package]] name = "ppv-lite86" -version = "0.2.17" +version = "0.2.20" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5b40af805b3121feab8a3c29f04d8ad262fa8e0561883e7653e024ae4479e6de" +checksum = "77957b295656769bb8ad2b6a6b09d897d94f05c41b069aede1fcdaa675eaea04" +dependencies = [ + "zerocopy", +] [[package]] name = "pretty_env_logger" @@ -1370,9 +1403,9 @@ checksum = "a1d01941d82fa2ab50be1e79e6714289dd7cde78eba4c074bc5a4374f650dfe0" [[package]] name = "quinn" -version = "0.11.2" +version = "0.11.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e4ceeeeabace7857413798eb1ffa1e9c905a9946a57d81fb69b4b71c4d8eb3ad" +checksum = "b22d8e7369034b9a7132bc2008cac12f2013c8132b45e0554e6e20e2617f2156" dependencies = [ "bytes", "pin-project-lite", @@ -1380,6 +1413,7 @@ dependencies = [ "quinn-udp", "rustc-hash", "rustls 0.23.12", + "socket2", "thiserror", "tokio", "tracing", @@ -1387,9 +1421,9 @@ dependencies = [ [[package]] name = "quinn-proto" -version = "0.11.3" +version = "0.11.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ddf517c03a109db8100448a4be38d498df8a210a99fe0e1b9eaf39e78c640efe" +checksum = "ba92fb39ec7ad06ca2582c0ca834dfeadcaf06ddfc8e635c80aa7e1c05315fdd" dependencies = [ "bytes", "rand", @@ -1411,6 +1445,7 @@ dependencies = [ "libc", "once_cell", "socket2", + "tracing", "windows-sys 0.52.0", ] @@ -1481,9 +1516,9 @@ dependencies = [ [[package]] name = "regex" -version = "1.10.5" +version = "1.10.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b91213439dad192326a0d7c6ee3955910425f441d7038e0d6933b0aec5c4517f" +checksum = "4219d74c6b67a3654a9fbebc4b419e22126d13d2f3c4a07ee0cb61ff79a79619" dependencies = [ "aho-corasick", "memchr", @@ -1587,9 +1622,9 @@ checksum = "719b953e2095829ee67db738b3bfa9fa368c94900df327b3f07fe6e794d2fe1f" [[package]] name = "rustc-hash" -version = "1.1.0" +version = "2.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "08d43f7aa6b08d49f382cde6a7982047c3426db949b1424bc4b7ec9ae12c6ce2" +checksum = "583034fd73374156e66797ed8e5b0d5690409c9226b22d87cb7f19821c05d152" [[package]] name = "rustls" @@ -1634,9 +1669,9 @@ dependencies = [ [[package]] name = "rustls-pemfile" -version = "2.1.2" +version = "2.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "29993a25686778eb88d4189742cd713c9bce943bc54251a33509dc63cbacf73d" +checksum = "196fe16b00e106300d3e45ecfcb764fa292a535d7326a29a5875c579c7417425" dependencies = [ "base64 0.22.1", "rustls-pki-types", @@ -1644,9 +1679,9 @@ dependencies = [ [[package]] name = "rustls-pki-types" -version = "1.7.0" +version = "1.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "976295e77ce332211c0d24d92c0e83e50f5c5f046d11082cea19f3df13a3562d" +checksum = "fc0a2ce646f8655401bb81e7927b812614bd5d91dbc968696be50603510fcaf0" [[package]] name = "rustls-webpki" @@ -1743,11 +1778,12 @@ dependencies = [ [[package]] name = "serde_json" -version = "1.0.120" +version = "1.0.122" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4e0d21c9a8cae1235ad58a00c11cb40d4b1e5c784f1ef2c537876ed6ffd8b7c5" +checksum = "784b6203951c57ff748476b126ccb5e8e2959a5c19e5c617ab1956be3dbc68da" dependencies = [ "itoa", + "memchr", "ryu", "serde", ] @@ -1765,9 +1801,9 @@ dependencies = [ [[package]] name = "serde_test" -version = "1.0.176" +version = "1.0.177" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5a2f49ace1498612d14f7e0b8245519584db8299541dfe31a06374a828d620ab" +checksum = "7f901ee573cab6b3060453d2d5f0bae4e6d628c23c0a962ff9b5f1d7c8d4f1ed" dependencies = [ "serde", ] @@ -1794,7 +1830,7 @@ dependencies = [ "chrono", "hex", "indexmap 1.9.3", - "indexmap 2.2.6", + "indexmap 2.3.0", "serde", "serde_derive", "serde_json", @@ -2079,9 +2115,9 @@ checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20" [[package]] name = "tokio" -version = "1.39.1" +version = "1.39.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d040ac2b29ab03b09d4129c2f5bbd012a3ac2f79d38ff506a4bf8dd34b0eac8a" +checksum = "daa4fb1bc778bd6f04cbfc4bb2d06a7396a8f299dc33ea1900cedaa316f467b1" dependencies = [ "backtrace", "bytes", @@ -2404,11 +2440,11 @@ checksum = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6" [[package]] name = "winapi-util" -version = "0.1.8" +version = "0.1.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4d4cc384e1e73b93bafa6fb4f1df8c41695c8a91cf9c4c64358067d15a7b6c6b" +checksum = "cf221c93e13a30d793f7645a0e7762c55d169dbb0a49671918a2319d289b10bb" dependencies = [ - "windows-sys 0.52.0", + "windows-sys 0.59.0", ] [[package]] @@ -2444,6 +2480,15 @@ dependencies = [ "windows-targets 0.52.6", ] +[[package]] +name = "windows-sys" +version = "0.59.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1e38bc4d79ed67fd075bcc251a1c39b32a1776bbe92e5bef1f0bf1f8c531853b" +dependencies = [ + "windows-targets 0.52.6", +] + [[package]] name = "windows-targets" version = "0.48.5" @@ -2607,6 +2652,27 @@ dependencies = [ "xshell", ] +[[package]] +name = "zerocopy" +version = "0.7.35" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1b9b4fd18abc82b8136838da5d50bae7bdea537c574d8dc1a34ed098d6c166f0" +dependencies = [ + "byteorder", + "zerocopy-derive", +] + +[[package]] +name = "zerocopy-derive" +version = "0.7.35" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fa4f8080344d4671fb4e831a13ad1e68092748387dfc4f55e356242fae12ce3e" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "zeroize" version = "1.8.1" diff --git a/lib/Cargo.toml b/lib/Cargo.toml index 607d3484..c7ca59b5 100644 --- a/lib/Cargo.toml +++ b/lib/Cargo.toml @@ -28,6 +28,7 @@ unstable-bolt-protocol-impl-v2 = [ ] [dependencies] +backoff = { version = "0.4.0", features = ["tokio"] } bytes = { version = "1.5.0", features = ["serde"] } chrono-tz = "0.9.0" delegate = "0.12.0" diff --git a/lib/examples/concurrent_writes.rs b/lib/examples/concurrent_writes.rs new file mode 100644 index 00000000..0c9438d9 --- /dev/null +++ b/lib/examples/concurrent_writes.rs @@ -0,0 +1,103 @@ +use futures::stream::{self, StreamExt, TryStreamExt}; +use neo4rs::{query, ConfigBuilder, Graph}; + +#[tokio::main] +async fn main() { + pretty_env_logger::init(); + + let neo4j_uri = std::env::var("NEO4J_URI").unwrap(); + let neo4j_username = std::env::var("NEO4J_USERNAME").unwrap(); + let neo4j_password = std::env::var("NEO4J_PASSWORD").unwrap(); + + let graph = Graph::connect( + ConfigBuilder::new() + .uri(neo4j_uri) + .user(neo4j_username) + .password(neo4j_password) + .max_connections(420) + .build() + .unwrap(), + ) + .await + .unwrap(); + + stream::iter(1..=1337) + .map(|i| work(i, graph.clone())) + .buffer_unordered(420) + .map(|(i, node_count, rel_count)| { + if i % 100 == 0 || i == 1337 { + println!("iteration: {i}, node count: {node_count}, rel count: {rel_count}"); + } + }) + .collect::<()>() + .await; +} + +async fn work(i: u64, graph: Graph) -> (u64, u64, u64) { + graph + .run(query( + " +CREATE + (dan:Person {name: 'Dan'}), + (annie:Person {name: 'Annie'}), + (matt:Person {name: 'Matt'}), + (jeff:Person {name: 'Jeff'}), + (brie:Person {name: 'Brie'}), + (elsa:Person {name: 'Elsa'}), + + (cookies:Product {name: 'Cookies'}), + (tomatoes:Product {name: 'Tomatoes'}), + (cucumber:Product {name: 'Cucumber'}), + (celery:Product {name: 'Celery'}), + (kale:Product {name: 'Kale'}), + (milk:Product {name: 'Milk'}), + (chocolate:Product {name: 'Chocolate'}), + + (dan)-[:BUYS {amount: 1.2}]->(cookies), + (dan)-[:BUYS {amount: 3.2}]->(milk), + (dan)-[:BUYS {amount: 2.2}]->(chocolate), + + (annie)-[:BUYS {amount: 1.2}]->(cucumber), + (annie)-[:BUYS {amount: 3.2}]->(milk), + (annie)-[:BUYS {amount: 3.2}]->(tomatoes), + + (matt)-[:BUYS {amount: 3}]->(tomatoes), + (matt)-[:BUYS {amount: 2}]->(kale), + (matt)-[:BUYS {amount: 1}]->(cucumber), + + (jeff)-[:BUYS {amount: 3}]->(cookies), + (jeff)-[:BUYS {amount: 2}]->(milk), + + (brie)-[:BUYS {amount: 1}]->(tomatoes), + (brie)-[:BUYS {amount: 2}]->(milk), + (brie)-[:BUYS {amount: 2}]->(kale), + (brie)-[:BUYS {amount: 3}]->(cucumber), + (brie)-[:BUYS {amount: 0.3}]->(celery), + + (elsa)-[:BUYS {amount: 3}]->(chocolate), + (elsa)-[:BUYS {amount: 3}]->(milk) +", + )) + .await + .unwrap(); + + let node_count = graph + .execute(query("MATCH (n) RETURN count(n) AS count")) + .await + .unwrap() + .column_into_stream::("count") + .try_fold(0_u64, |acc, x| async move { Ok(acc + x) }) + .await + .unwrap(); + + let rel_count = graph + .execute(query("MATCH ()-[r]->() RETURN count(r) AS count")) + .await + .unwrap() + .column_into_stream::("count") + .try_fold(0_u64, |acc, x| async move { Ok(acc + x) }) + .await + .unwrap(); + + (i, node_count, rel_count) +} diff --git a/lib/src/errors.rs b/lib/src/errors.rs index 02cae19d..cc69b1d0 100644 --- a/lib/src/errors.rs +++ b/lib/src/errors.rs @@ -95,6 +95,112 @@ pub enum Error { DeserializationError(DeError), } +#[derive(Copy, Clone, Debug, PartialEq, Eq)] +pub enum Neo4jErrorKind { + Client(Neo4jClientErrorKind), + Transient, + Database, + Unknown, +} + +#[derive(Copy, Clone, Debug, PartialEq, Eq)] +pub enum Neo4jClientErrorKind { + Security(Neo4jSecurityErrorKind), + SessionExpired, + FatalDiscovery, + TransactionTerminated, + ProtocolViolation, + Other, + Unknown, +} + +#[derive(Copy, Clone, Debug, PartialEq, Eq)] +pub enum Neo4jSecurityErrorKind { + Authentication, + AuthorizationExpired, + TokenExpired, + Other, + Unknown, +} + +impl Neo4jErrorKind { + pub(crate) fn new(code: &str) -> Self { + let code = Self::adjust_code(code).unwrap_or(code); + Self::classify(code) + } + + fn adjust_code(code: &str) -> Option<&str> { + match code { + "Neo.TransientError.Transaction.LockClientStopped" => { + Some("Neo.ClientError.Transaction.LockClientStopped") + } + "Neo.TransientError.Transaction.Terminated" => { + Some("Neo.ClientError.Transaction.Terminated") + } + _ => None, + } + } + + fn classify(code: &str) -> Self { + let mut parts = code.split('.').skip(1); + let [class, subclass, kind] = [parts.next(), parts.next(), parts.next()]; + + match class { + Some("ClientError") => match (subclass, kind) { + (Some("Security"), Some("Unauthorized")) => Self::Client( + Neo4jClientErrorKind::Security(Neo4jSecurityErrorKind::Authentication), + ), + (Some("Security"), Some("AuthorizationExpired")) => Self::Client( + Neo4jClientErrorKind::Security(Neo4jSecurityErrorKind::AuthorizationExpired), + ), + (Some("Security"), Some("TokenExpired")) => Self::Client( + Neo4jClientErrorKind::Security(Neo4jSecurityErrorKind::TokenExpired), + ), + (Some("Database"), Some("DatabaseNotFound")) => { + Self::Client(Neo4jClientErrorKind::FatalDiscovery) + } + (Some("Transaction"), Some("Terminated")) => { + Self::Client(Neo4jClientErrorKind::TransactionTerminated) + } + (Some("Security"), Some(_)) => Self::Client(Neo4jClientErrorKind::Security( + Neo4jSecurityErrorKind::Other, + )), + (Some("Security"), _) => Self::Client(Neo4jClientErrorKind::Security( + Neo4jSecurityErrorKind::Unknown, + )), + (Some("Request"), _) => Self::Client(Neo4jClientErrorKind::ProtocolViolation), + (Some("Cluster"), Some("NotALeader")) => { + Self::Client(Neo4jClientErrorKind::SessionExpired) + } + (Some("General"), Some("ForbiddenOnReadOnlyDatabase")) => { + Self::Client(Neo4jClientErrorKind::SessionExpired) + } + (Some(_), _) => Self::Client(Neo4jClientErrorKind::Other), + _ => Self::Client(Neo4jClientErrorKind::Unknown), + }, + Some("TransientError") => Self::Transient, + Some(_) => Self::Database, + None => Self::Unknown, + } + } + + pub(crate) fn can_retry(&self) -> bool { + matches!( + self, + Self::Client( + Neo4jClientErrorKind::Security(Neo4jSecurityErrorKind::AuthorizationExpired) + | Neo4jClientErrorKind::SessionExpired + ) | Self::Transient + ) + } +} + +impl From<&str> for Neo4jErrorKind { + fn from(code: &str) -> Self { + Self::new(code) + } +} + impl std::convert::From> for Error { fn from(e: deadpool::managed::PoolError) -> Self { match e { diff --git a/lib/src/graph.rs b/lib/src/graph.rs index 8124478d..4b92c458 100644 --- a/lib/src/graph.rs +++ b/lib/src/graph.rs @@ -1,3 +1,5 @@ +use std::time::Duration; + use crate::{ config::{Config, ConfigBuilder, Database, LiveConfig}, errors::Result, @@ -48,6 +50,8 @@ impl Graph { /// Starts a new transaction on the configured database. /// All queries that needs to be run/executed within the transaction /// should be executed using either [`Txn::run`] or [`Txn::execute`] + /// + /// Transactions will not be automatically retried on any failure. pub async fn start_txn(&self) -> Result { self.start_txn_on(self.config.db.clone()).await } @@ -55,6 +59,8 @@ impl Graph { /// Starts a new transaction on the provided database. /// All queries that needs to be run/executed within the transaction /// should be executed using either [`Txn::run`] or [`Txn::execute`] + /// + /// Transactions will not be automatically retried on any failure. pub async fn start_txn_on(&self, db: impl Into) -> Result { let connection = self.pool.get().await?; Txn::new(db.into(), self.config.fetch_size, connection).await @@ -63,6 +69,11 @@ impl Graph { /// Runs a query on the configured database using a connection from the connection pool, /// It doesn't return any [`RowStream`] as the `run` abstraction discards any stream. /// + /// This operation retires the query on certain failures. + /// All errors with the `Transient` error class as well as a few other error classes are considered retryable. + /// This includes errors during a leader election or when the transaction resources on the server (memory, handles, ...) are exhausted. + /// Retries happen with an exponential backoff until a retry delay exceeds 60s, at which point the query fails with the last error as it would without any retry. + /// /// Use [`Graph::run`] for cases where you just want a write operation /// /// use [`Graph::execute`] when you are interested in the result stream @@ -73,23 +84,70 @@ impl Graph { /// Runs a query on the provided database using a connection from the connection pool. /// It doesn't return any [`RowStream`] as the `run` abstraction discards any stream. /// + /// This operation retires the query on certain failures. + /// All errors with the `Transient` error class as well as a few other error classes are considered retryable. + /// This includes errors during a leader election or when the transaction resources on the server (memory, handles, ...) are exhausted. + /// Retries happen with an exponential backoff until a retry delay exceeds 60s, at which point the query fails with the last error as it would without any retry. + /// /// Use [`Graph::run`] for cases where you just want a write operation /// /// use [`Graph::execute`] when you are interested in the result stream pub async fn run_on(&self, db: &str, q: Query) -> Result<()> { - let mut connection = self.pool.get().await?; - q.run(db, &mut connection).await + backoff::future::retry_notify( + self.pool.manager().backoff(), + || { + let pool = &self.pool; + let query = &q; + async move { + let mut connection = pool.get().await.map_err(crate::Error::from)?; + query.run_retryable(db, &mut connection).await + } + }, + Self::log_retry, + ) + .await } /// Executes a query on the configured database and returns a [`DetachedRowStream`] + /// + /// This operation retires the query on certain failures. + /// All errors with the `Transient` error class as well as a few other error classes are considered retryable. + /// This includes errors during a leader election or when the transaction resources on the server (memory, handles, ...) are exhausted. + /// Retries happen with an exponential backoff until a retry delay exceeds 60s, at which point the query fails with the last error as it would without any retry. pub async fn execute(&self, q: Query) -> Result { self.execute_on(&self.config.db, q).await } /// Executes a query on the provided database and returns a [`DetaRowStream`] + /// + /// This operation retires the query on certain failures. + /// All errors with the `Transient` error class as well as a few other error classes are considered retryable. + /// This includes errors during a leader election or when the transaction resources on the server (memory, handles, ...) are exhausted. + /// Retries happen with an exponential backoff until a retry delay exceeds 60s, at which point the query fails with the last error as it would without any retry. pub async fn execute_on(&self, db: &str, q: Query) -> Result { - let connection = self.pool.get().await?; - q.execute(db, self.config.fetch_size, connection).await + backoff::future::retry_notify( + self.pool.manager().backoff(), + || { + let pool = &self.pool; + let fetch_size = self.config.fetch_size; + let query = &q; + async move { + let connection = pool.get().await.map_err(crate::Error::from)?; + query.execute_retryable(db, fetch_size, connection).await + } + }, + Self::log_retry, + ) + .await + } + + fn log_retry(e: crate::Error, delay: Duration) { + let level = match delay.as_millis() { + 0..=499 => log::Level::Debug, + 500..=4999 => log::Level::Info, + _ => log::Level::Warn, + }; + log::log!(level, "Retrying query in {delay:?} due to error: {e}"); } } diff --git a/lib/src/lib.rs b/lib/src/lib.rs index 0b2243df..85ca126b 100644 --- a/lib/src/lib.rs +++ b/lib/src/lib.rs @@ -444,7 +444,7 @@ mod version; pub use crate::auth::ClientCertificate; pub use crate::config::{Config, ConfigBuilder, Database}; -pub use crate::errors::*; +pub use crate::errors::{Error, Result}; pub use crate::graph::{query, Graph}; pub use crate::query::Query; pub use crate::row::{Node, Path, Point2D, Point3D, Relation, Row, UnboundedRelation}; @@ -460,3 +460,6 @@ pub use crate::types::{ BoltPoint2D, BoltPoint3D, BoltRelation, BoltString, BoltTime, BoltType, BoltUnboundedRelation, }; pub use crate::version::Version; + +pub(crate) use errors::Neo4jErrorKind; +pub(crate) use messages::Success; diff --git a/lib/src/messages.rs b/lib/src/messages.rs index b3df41ac..5db2d20f 100644 --- a/lib/src/messages.rs +++ b/lib/src/messages.rs @@ -28,7 +28,7 @@ use record::Record; use reset::Reset; use rollback::Rollback; use run::Run; -use success::Success; +pub(crate) use success::Success; #[derive(Debug, PartialEq, Clone)] pub enum BoltResponse { diff --git a/lib/src/pool.rs b/lib/src/pool.rs index c91a9f76..39eda8e8 100644 --- a/lib/src/pool.rs +++ b/lib/src/pool.rs @@ -1,9 +1,12 @@ -use crate::auth::ClientCertificate; +use std::time::Duration; + use crate::{ + auth::ClientCertificate, config::Config, connection::{Connection, ConnectionInfo}, errors::{Error, Result}, }; +use backoff::{ExponentialBackoff, ExponentialBackoffBuilder}; use deadpool::managed::{Manager, Metrics, Object, Pool, RecycleResult}; use log::info; @@ -12,6 +15,7 @@ pub type ManagedConnection = Object; pub struct ConnectionManager { info: ConnectionInfo, + backoff: ExponentialBackoff, } impl ConnectionManager { @@ -22,7 +26,17 @@ impl ConnectionManager { client_certificate: Option<&ClientCertificate>, ) -> Result { let info = ConnectionInfo::new(uri, user, password, client_certificate)?; - Ok(ConnectionManager { info }) + let backoff = ExponentialBackoffBuilder::new() + .with_initial_interval(Duration::from_millis(1)) + .with_randomization_factor(0.42) + .with_multiplier(2.0) + .with_max_elapsed_time(Some(Duration::from_secs(60))) + .build(); + Ok(ConnectionManager { info, backoff }) + } + + pub fn backoff(&self) -> ExponentialBackoff { + self.backoff.clone() } } diff --git a/lib/src/query.rs b/lib/src/query.rs index 19211201..80bdae84 100644 --- a/lib/src/query.rs +++ b/lib/src/query.rs @@ -4,6 +4,7 @@ use crate::{ pool::ManagedConnection, stream::{DetachedRowStream, RowStream}, types::{BoltList, BoltMap, BoltString, BoltType}, + Error, Neo4jErrorKind, Success, }; /// Abstracts a cypher query that is sent to neo4j server. @@ -43,24 +44,31 @@ impl Query { } pub(crate) async fn run(self, db: &str, connection: &mut ManagedConnection) -> Result<()> { - let run = BoltRequest::run(db, &self.query, self.params); - match connection.send_recv(run).await? { - BoltResponse::Success(_) => match connection.send_recv(BoltRequest::discard()).await? { - BoltResponse::Success(_) => Ok(()), - otherwise => Err(otherwise.into_error("DISCARD")), - }, - msg => Err(msg.into_error("RUN")), - } + let request = BoltRequest::run(db, &self.query, self.params); + Self::try_run(request, connection) + .await + .map_err(unwrap_backoff) } - pub(crate) async fn execute( - self, + pub(crate) async fn run_retryable( + &self, + db: &str, + connection: &mut ManagedConnection, + ) -> Result<(), backoff::Error> { + let request = BoltRequest::run(db, &self.query, self.params.clone()); + Self::try_run(request, connection).await + } + + pub(crate) async fn execute_retryable( + &self, db: &str, fetch_size: usize, mut connection: ManagedConnection, - ) -> Result { - let stream = self.execute_mut(db, fetch_size, &mut connection).await?; - Ok(DetachedRowStream::new(stream, connection)) + ) -> Result> { + let request = BoltRequest::run(db, &self.query, self.params.clone()); + Self::try_execute(request, fetch_size, &mut connection) + .await + .map(|stream| DetachedRowStream::new(stream, connection)) } pub(crate) async fn execute_mut<'conn>( @@ -70,14 +78,38 @@ impl Query { connection: &'conn mut ManagedConnection, ) -> Result { let run = BoltRequest::run(db, &self.query, self.params); - match connection.send_recv(run).await { - Ok(BoltResponse::Success(success)) => { - let fields: BoltList = success.get("fields").unwrap_or_default(); - let qid: i64 = success.get("qid").unwrap_or(-1); - Ok(RowStream::new(qid, fields, fetch_size)) - } - Ok(msg) => Err(msg.into_error("RUN")), - Err(e) => Err(e), + Self::try_execute(run, fetch_size, connection) + .await + .map_err(unwrap_backoff) + } + + async fn try_run(request: BoltRequest, connection: &mut ManagedConnection) -> QueryResult<()> { + let _ = Self::try_request(request, connection).await?; + match connection.send_recv(BoltRequest::discard()).await { + Ok(BoltResponse::Success(_)) => Ok(()), + otherwise => wrap_error(otherwise, "DISCARD"), + } + } + + async fn try_execute( + request: BoltRequest, + fetch_size: usize, + connection: &mut ManagedConnection, + ) -> QueryResult { + Self::try_request(request, connection).await.map(|success| { + let fields: BoltList = success.get("fields").unwrap_or_default(); + let qid: i64 = success.get("qid").unwrap_or(-1); + RowStream::new(qid, fields, fetch_size) + }) + } + + async fn try_request( + request: BoltRequest, + connection: &mut ManagedConnection, + ) -> QueryResult { + match connection.send_recv(request).await { + Ok(BoltResponse::Success(success)) => Ok(success), + otherwise => wrap_error(otherwise, "RUN"), } } } @@ -94,6 +126,36 @@ impl From<&str> for Query { } } +type QueryResult = Result>; + +fn wrap_error(resp: Result, req: &'static str) -> QueryResult { + let can_retry = if let Ok(BoltResponse::Failure(failure)) = &resp { + failure + .get::("code") + .map_or(false, |code| Neo4jErrorKind::new(&code).can_retry()) + } else { + false + }; + + let err = match resp { + Ok(resp) => resp.into_error(req), + Err(e) => e, + }; + + if can_retry { + Err(backoff::Error::transient(err)) + } else { + Err(backoff::Error::permanent(err)) + } +} + +fn unwrap_backoff(err: backoff::Error) -> Error { + match err { + backoff::Error::Permanent(e) => e, + backoff::Error::Transient { err, .. } => err, + } +} + #[cfg(test)] mod tests { use super::*;