From f606a112cab9adde3b90a3bd096a97522a12c042 Mon Sep 17 00:00:00 2001 From: Paul Horn Date: Wed, 7 Aug 2024 19:53:02 +0200 Subject: [PATCH 1/6] Retry on certain query failures for managed transactions When using `Graph::run`, `Graph::run_on`, `Graph::execute`, or `Graph::execute_on`, the query might be retries for a while. The logic is that all errors with the `Transient` error class as well as a few other error classes are considered retryable. This catches errors during a leader election or when the transaction resources on the server (memory, handles, ...) are exhausted. Retries happen with an exponential backoff until a retry delay exceeds 60s, at which point the query fails. Transaction created with `Graph::start_txn` or `Graph::start_txn` are *not* retried automatically, only managed transactions are. --- ci/Cargo.lock.min | 15 +++++ ci/Cargo.lock.msrv | 75 +++++++++++++++------ lib/Cargo.toml | 1 + lib/examples/concurrent_writes.rs | 103 +++++++++++++++++++++++++++++ lib/src/errors.rs | 106 ++++++++++++++++++++++++++++++ lib/src/graph.rs | 66 +++++++++++++++++-- lib/src/lib.rs | 5 +- lib/src/messages.rs | 2 +- lib/src/pool.rs | 16 ++++- lib/src/query.rs | 99 ++++++++++++++++++++++------ 10 files changed, 440 insertions(+), 48 deletions(-) create mode 100644 lib/examples/concurrent_writes.rs diff --git a/ci/Cargo.lock.min b/ci/Cargo.lock.min index c8f566c0..fde1b4f1 100644 --- a/ci/Cargo.lock.min +++ b/ci/Cargo.lock.min @@ -54,6 +54,20 @@ version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f8aac770f1885fd7e387acedd76065302551364496e46b3dd00860b2f8359b9d" +[[package]] +name = "backoff" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b62ddb9cb1ec0a098ad4bbf9344d0713fa193ae1a80af55febcff2627b6a00c1" +dependencies = [ + "futures-core", + "getrandom", + "instant", + "pin-project-lite", + "rand", + "tokio", +] + [[package]] name = "base64" version = "0.21.0" @@ -709,6 +723,7 @@ name = "neo4rs" version = "0.7.3" dependencies = [ "async-trait", + "backoff", "bytes", "chrono", "chrono-tz", diff --git a/ci/Cargo.lock.msrv b/ci/Cargo.lock.msrv index 0fa516f6..7fbc9ad5 100644 --- a/ci/Cargo.lock.msrv +++ b/ci/Cargo.lock.msrv @@ -63,6 +63,20 @@ version = "1.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0c4b4d0bd25bd0b74681c0ad21497610ce1b7c91b1022cd21c80c6fbdd9476b0" +[[package]] +name = "backoff" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b62ddb9cb1ec0a098ad4bbf9344d0713fa193ae1a80af55febcff2627b6a00c1" +dependencies = [ + "futures-core", + "getrandom", + "instant", + "pin-project-lite", + "rand", + "tokio", +] + [[package]] name = "backtrace" version = "0.3.73" @@ -129,18 +143,18 @@ checksum = "1fd0f2584146f6f2ef48085050886acf353beff7305ebd1ae69500e27c67f64b" [[package]] name = "bytes" -version = "1.7.0" +version = "1.7.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fca2be1d5c43812bae364ee3f30b3afcb7877cf59f4aeb94c66f313a41d2fac9" +checksum = "8318a53db07bb3f8dca91a600466bdb3f2eaadeedfdbcf02e1accbad9271ba50" dependencies = [ "serde", ] [[package]] name = "cc" -version = "1.1.7" +version = "1.1.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "26a5c3fd7bfa1ce3897a3a3501d362b2d87b7f2583ebcb4a949ec25911025cbc" +checksum = "504bdec147f2cc13c8b57ed9401fd8a147cc66b67ad5cb241394244f2c947549" [[package]] name = "cfg-if" @@ -568,6 +582,15 @@ dependencies = [ "serde", ] +[[package]] +name = "instant" +version = "0.1.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e0242819d153cba4b4b05a5a8f2a7e9bbf97b6055b2a002b395c96b5ff3c0222" +dependencies = [ + "cfg-if", +] + [[package]] name = "itoa" version = "1.0.11" @@ -671,6 +694,7 @@ name = "neo4rs" version = "0.7.3" dependencies = [ "async-trait", + "backoff", "bytes", "chrono", "chrono-tz", @@ -739,9 +763,9 @@ dependencies = [ [[package]] name = "object" -version = "0.36.2" +version = "0.36.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3f203fa8daa7bb185f760ae12bd8e097f63d17041dcdcaf675ac54cdf863170e" +checksum = "27b64972346851a39438c60b341ebc01bba47464ae329e55cf343eb93964efd9" dependencies = [ "memchr", ] @@ -854,9 +878,9 @@ checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" [[package]] name = "ppv-lite86" -version = "0.2.18" +version = "0.2.20" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dee4364d9f3b902ef14fab8a1ddffb783a1cb6b4bba3bfc1fa3922732c7de97f" +checksum = "77957b295656769bb8ad2b6a6b09d897d94f05c41b069aede1fcdaa675eaea04" dependencies = [ "zerocopy", ] @@ -1019,9 +1043,9 @@ dependencies = [ [[package]] name = "rustls-pemfile" -version = "2.1.2" +version = "2.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "29993a25686778eb88d4189742cd713c9bce943bc54251a33509dc63cbacf73d" +checksum = "196fe16b00e106300d3e45ecfcb764fa292a535d7326a29a5875c579c7417425" dependencies = [ "base64 0.22.1", "rustls-pki-types", @@ -1029,9 +1053,9 @@ dependencies = [ [[package]] name = "rustls-pki-types" -version = "1.7.0" +version = "1.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "976295e77ce332211c0d24d92c0e83e50f5c5f046d11082cea19f3df13a3562d" +checksum = "fc0a2ce646f8655401bb81e7927b812614bd5d91dbc968696be50603510fcaf0" [[package]] name = "rustls-webpki" @@ -1119,9 +1143,9 @@ dependencies = [ [[package]] name = "serde_json" -version = "1.0.121" +version = "1.0.122" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4ab380d7d9f22ef3f21ad3e6c1ebe8e4fc7a2000ccba2e4d71fc96f15b2cb609" +checksum = "784b6203951c57ff748476b126ccb5e8e2959a5c19e5c617ab1956be3dbc68da" dependencies = [ "itoa", "memchr", @@ -1566,11 +1590,11 @@ checksum = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6" [[package]] name = "winapi-util" -version = "0.1.8" +version = "0.1.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4d4cc384e1e73b93bafa6fb4f1df8c41695c8a91cf9c4c64358067d15a7b6c6b" +checksum = "cf221c93e13a30d793f7645a0e7762c55d169dbb0a49671918a2319d289b10bb" dependencies = [ - "windows-sys 0.52.0", + "windows-sys 0.59.0", ] [[package]] @@ -1606,6 +1630,15 @@ dependencies = [ "windows-targets 0.52.6", ] +[[package]] +name = "windows-sys" +version = "0.59.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1e38bc4d79ed67fd075bcc251a1c39b32a1776bbe92e5bef1f0bf1f8c531853b" +dependencies = [ + "windows-targets 0.52.6", +] + [[package]] name = "windows-targets" version = "0.48.5" @@ -1751,9 +1784,9 @@ dependencies = [ [[package]] name = "zerocopy" -version = "0.6.6" +version = "0.7.35" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "854e949ac82d619ee9a14c66a1b674ac730422372ccb759ce0c39cabcf2bf8e6" +checksum = "1b9b4fd18abc82b8136838da5d50bae7bdea537c574d8dc1a34ed098d6c166f0" dependencies = [ "byteorder", "zerocopy-derive", @@ -1761,9 +1794,9 @@ dependencies = [ [[package]] name = "zerocopy-derive" -version = "0.6.6" +version = "0.7.35" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "125139de3f6b9d625c39e2efdd73d41bdac468ccd556556440e322be0e1bbd91" +checksum = "fa4f8080344d4671fb4e831a13ad1e68092748387dfc4f55e356242fae12ce3e" dependencies = [ "proc-macro2", "quote", diff --git a/lib/Cargo.toml b/lib/Cargo.toml index 9f308c1c..9e76ca69 100644 --- a/lib/Cargo.toml +++ b/lib/Cargo.toml @@ -21,6 +21,7 @@ json = ["serde_json"] [dependencies] async-trait = "0.1.0" +backoff = { version = "0.4.0", features = ["tokio"] } bytes = { version = "1.5.0", features = ["serde"] } chrono-tz = "0.8.3" delegate = "0.10.0" diff --git a/lib/examples/concurrent_writes.rs b/lib/examples/concurrent_writes.rs new file mode 100644 index 00000000..0c9438d9 --- /dev/null +++ b/lib/examples/concurrent_writes.rs @@ -0,0 +1,103 @@ +use futures::stream::{self, StreamExt, TryStreamExt}; +use neo4rs::{query, ConfigBuilder, Graph}; + +#[tokio::main] +async fn main() { + pretty_env_logger::init(); + + let neo4j_uri = std::env::var("NEO4J_URI").unwrap(); + let neo4j_username = std::env::var("NEO4J_USERNAME").unwrap(); + let neo4j_password = std::env::var("NEO4J_PASSWORD").unwrap(); + + let graph = Graph::connect( + ConfigBuilder::new() + .uri(neo4j_uri) + .user(neo4j_username) + .password(neo4j_password) + .max_connections(420) + .build() + .unwrap(), + ) + .await + .unwrap(); + + stream::iter(1..=1337) + .map(|i| work(i, graph.clone())) + .buffer_unordered(420) + .map(|(i, node_count, rel_count)| { + if i % 100 == 0 || i == 1337 { + println!("iteration: {i}, node count: {node_count}, rel count: {rel_count}"); + } + }) + .collect::<()>() + .await; +} + +async fn work(i: u64, graph: Graph) -> (u64, u64, u64) { + graph + .run(query( + " +CREATE + (dan:Person {name: 'Dan'}), + (annie:Person {name: 'Annie'}), + (matt:Person {name: 'Matt'}), + (jeff:Person {name: 'Jeff'}), + (brie:Person {name: 'Brie'}), + (elsa:Person {name: 'Elsa'}), + + (cookies:Product {name: 'Cookies'}), + (tomatoes:Product {name: 'Tomatoes'}), + (cucumber:Product {name: 'Cucumber'}), + (celery:Product {name: 'Celery'}), + (kale:Product {name: 'Kale'}), + (milk:Product {name: 'Milk'}), + (chocolate:Product {name: 'Chocolate'}), + + (dan)-[:BUYS {amount: 1.2}]->(cookies), + (dan)-[:BUYS {amount: 3.2}]->(milk), + (dan)-[:BUYS {amount: 2.2}]->(chocolate), + + (annie)-[:BUYS {amount: 1.2}]->(cucumber), + (annie)-[:BUYS {amount: 3.2}]->(milk), + (annie)-[:BUYS {amount: 3.2}]->(tomatoes), + + (matt)-[:BUYS {amount: 3}]->(tomatoes), + (matt)-[:BUYS {amount: 2}]->(kale), + (matt)-[:BUYS {amount: 1}]->(cucumber), + + (jeff)-[:BUYS {amount: 3}]->(cookies), + (jeff)-[:BUYS {amount: 2}]->(milk), + + (brie)-[:BUYS {amount: 1}]->(tomatoes), + (brie)-[:BUYS {amount: 2}]->(milk), + (brie)-[:BUYS {amount: 2}]->(kale), + (brie)-[:BUYS {amount: 3}]->(cucumber), + (brie)-[:BUYS {amount: 0.3}]->(celery), + + (elsa)-[:BUYS {amount: 3}]->(chocolate), + (elsa)-[:BUYS {amount: 3}]->(milk) +", + )) + .await + .unwrap(); + + let node_count = graph + .execute(query("MATCH (n) RETURN count(n) AS count")) + .await + .unwrap() + .column_into_stream::("count") + .try_fold(0_u64, |acc, x| async move { Ok(acc + x) }) + .await + .unwrap(); + + let rel_count = graph + .execute(query("MATCH ()-[r]->() RETURN count(r) AS count")) + .await + .unwrap() + .column_into_stream::("count") + .try_fold(0_u64, |acc, x| async move { Ok(acc + x) }) + .await + .unwrap(); + + (i, node_count, rel_count) +} diff --git a/lib/src/errors.rs b/lib/src/errors.rs index 44adc28f..65547ab2 100644 --- a/lib/src/errors.rs +++ b/lib/src/errors.rs @@ -62,6 +62,112 @@ pub enum Error { DeserializationError(DeError), } +#[derive(Copy, Clone, Debug, PartialEq, Eq)] +pub enum Neo4jErrorKind { + Client(Neo4jClientErrorKind), + Transient, + Database, + Unknown, +} + +#[derive(Copy, Clone, Debug, PartialEq, Eq)] +pub enum Neo4jClientErrorKind { + Security(Neo4jSecurityErrorKind), + SessionExpired, + FatalDiscovery, + TransactionTerminated, + ProtocolViolation, + Other, + Unknown, +} + +#[derive(Copy, Clone, Debug, PartialEq, Eq)] +pub enum Neo4jSecurityErrorKind { + Authentication, + AuthorizationExpired, + TokenExpired, + Other, + Unknown, +} + +impl Neo4jErrorKind { + pub(crate) fn new(code: &str) -> Self { + let code = Self::adjust_code(code).unwrap_or(code); + Self::classify(code) + } + + fn adjust_code(code: &str) -> Option<&str> { + match code { + "Neo.TransientError.Transaction.LockClientStopped" => { + Some("Neo.ClientError.Transaction.LockClientStopped") + } + "Neo.TransientError.Transaction.Terminated" => { + Some("Neo.ClientError.Transaction.Terminated") + } + _ => None, + } + } + + fn classify(code: &str) -> Self { + let mut parts = code.split('.').skip(1); + let [class, subclass, kind] = [parts.next(), parts.next(), parts.next()]; + + match class { + Some("ClientError") => match (subclass, kind) { + (Some("Security"), Some("Unauthorized")) => Self::Client( + Neo4jClientErrorKind::Security(Neo4jSecurityErrorKind::Authentication), + ), + (Some("Security"), Some("AuthorizationExpired")) => Self::Client( + Neo4jClientErrorKind::Security(Neo4jSecurityErrorKind::AuthorizationExpired), + ), + (Some("Security"), Some("TokenExpired")) => Self::Client( + Neo4jClientErrorKind::Security(Neo4jSecurityErrorKind::TokenExpired), + ), + (Some("Database"), Some("DatabaseNotFound")) => { + Self::Client(Neo4jClientErrorKind::FatalDiscovery) + } + (Some("Transaction"), Some("Terminated")) => { + Self::Client(Neo4jClientErrorKind::TransactionTerminated) + } + (Some("Security"), Some(_)) => Self::Client(Neo4jClientErrorKind::Security( + Neo4jSecurityErrorKind::Other, + )), + (Some("Security"), _) => Self::Client(Neo4jClientErrorKind::Security( + Neo4jSecurityErrorKind::Unknown, + )), + (Some("Request"), _) => Self::Client(Neo4jClientErrorKind::ProtocolViolation), + (Some("Cluster"), Some("NotALeader")) => { + Self::Client(Neo4jClientErrorKind::SessionExpired) + } + (Some("General"), Some("ForbiddenOnReadOnlyDatabase")) => { + Self::Client(Neo4jClientErrorKind::SessionExpired) + } + (Some(_), _) => Self::Client(Neo4jClientErrorKind::Other), + _ => Self::Client(Neo4jClientErrorKind::Unknown), + }, + Some("TransientError") => Self::Transient, + Some(_) => Self::Database, + None => Self::Unknown, + } + } + + pub(crate) fn can_retry(&self) -> bool { + matches!( + self, + Self::Client( + Neo4jClientErrorKind::Security(Neo4jSecurityErrorKind::AuthorizationExpired) + | Neo4jClientErrorKind::SessionExpired + ) | Self::Transient + ) + } +} + +impl From<&str> for Neo4jErrorKind { + fn from(code: &str) -> Self { + Self::new(code) + } +} + impl std::convert::From> for Error { fn from(e: deadpool::managed::PoolError) -> Self { match e { diff --git a/lib/src/graph.rs b/lib/src/graph.rs index 8124478d..8956c7fa 100644 --- a/lib/src/graph.rs +++ b/lib/src/graph.rs @@ -1,3 +1,5 @@ +use std::time::Duration; + use crate::{ config::{Config, ConfigBuilder, Database, LiveConfig}, errors::Result, @@ -48,6 +50,8 @@ impl Graph { /// Starts a new transaction on the configured database. /// All queries that needs to be run/executed within the transaction /// should be executed using either [`Txn::run`] or [`Txn::execute`] + /// + /// Transactions will not be automatically retried on any failure. pub async fn start_txn(&self) -> Result { self.start_txn_on(self.config.db.clone()).await } @@ -55,6 +59,8 @@ impl Graph { /// Starts a new transaction on the provided database. /// All queries that needs to be run/executed within the transaction /// should be executed using either [`Txn::run`] or [`Txn::execute`] + /// + /// Transactions will not be automatically retried on any failure. pub async fn start_txn_on(&self, db: impl Into) -> Result { let connection = self.pool.get().await?; Txn::new(db.into(), self.config.fetch_size, connection).await @@ -63,6 +69,11 @@ impl Graph { /// Runs a query on the configured database using a connection from the connection pool, /// It doesn't return any [`RowStream`] as the `run` abstraction discards any stream. /// + /// This operation retires the query on certain failures. + /// All errors with the `Transient` error class as well as a few other error classes are considered retryable. + /// This includes errors during a leader election or when the transaction resources on the server (memory, handles, ...) are exhausted. + /// Retries happen with an exponential backoff until a retry delay exceeds 60s, at which point the query fails with the last error as it would without any retry. + /// /// Use [`Graph::run`] for cases where you just want a write operation /// /// use [`Graph::execute`] when you are interested in the result stream @@ -73,23 +84,70 @@ impl Graph { /// Runs a query on the provided database using a connection from the connection pool. /// It doesn't return any [`RowStream`] as the `run` abstraction discards any stream. /// + /// This operation retires the query on certain failures. + /// All errors with the `Transient` error class as well as a few other error classes are considered retryable. + /// This includes errors during a leader election or when the transaction resources on the server (memory, handles, ...) are exhausted. + /// Retries happen with an exponential backoff until a retry delay exceeds 60s, at which point the query fails with the last error as it would without any retry. + /// /// Use [`Graph::run`] for cases where you just want a write operation /// /// use [`Graph::execute`] when you are interested in the result stream pub async fn run_on(&self, db: &str, q: Query) -> Result<()> { - let mut connection = self.pool.get().await?; - q.run(db, &mut connection).await + backoff::future::retry_notify( + self.pool.manager().backoff(), + || { + let pool = &self.pool; + let query = &q; + async move { + let mut connection = pool.get().await.map_err(crate::Error::from)?; + query.run_retryable(db, &mut connection).await + } + }, + Self::log_retry, + ) + .await } /// Executes a query on the configured database and returns a [`DetachedRowStream`] + /// + /// This operation retires the query on certain failures. + /// All errors with the `Transient` error class as well as a few other error classes are considered retryable. + /// This includes errors during a leader election or when the transaction resources on the server (memory, handles, ...) are exhausted. + /// Retries happen with an exponential backoff until a retry delay exceeds 60s, at which point the query fails with the last error as it would without any retry. pub async fn execute(&self, q: Query) -> Result { self.execute_on(&self.config.db, q).await } /// Executes a query on the provided database and returns a [`DetaRowStream`] + /// + /// This operation retires the query on certain failures. + /// All errors with the `Transient` error class as well as a few other error classes are considered retryable. + /// This includes errors during a leader election or when the transaction resources on the server (memory, handles, ...) are exhausted. + /// Retries happen with an exponential backoff until a retry delay exceeds 60s, at which point the query fails with the last error as it would without any retry. pub async fn execute_on(&self, db: &str, q: Query) -> Result { - let connection = self.pool.get().await?; - q.execute(db, self.config.fetch_size, connection).await + backoff::future::retry_notify( + self.pool.manager().backoff(), + || { + let pool = &self.pool; + let fetch_size = self.config.fetch_size; + let query = &q; + async move { + let connection = pool.get().await.map_err(crate::Error::from)?; + query.execute_retryable(db, fetch_size, connection).await + } + }, + Self::log_retry, + ) + .await + } + + fn log_retry(e: crate::Error, delay: Duration) { + let level = match delay.as_millis() { + ..500 => log::Level::Debug, + 500..5000 => log::Level::Info, + _ => log::Level::Warn, + }; + log::log!(level, "Retrying query in {delay:?} due to error: {e}"); } } diff --git a/lib/src/lib.rs b/lib/src/lib.rs index 1b61eba8..3d6fe047 100644 --- a/lib/src/lib.rs +++ b/lib/src/lib.rs @@ -438,7 +438,7 @@ mod version; pub use crate::auth::ClientCertificate; pub use crate::config::{Config, ConfigBuilder, Database}; -pub use crate::errors::*; +pub use crate::errors::{unexpected, Error, Result}; pub use crate::graph::{query, Graph}; pub use crate::query::Query; pub use crate::row::{Node, Path, Point2D, Point3D, Relation, Row, UnboundedRelation}; @@ -454,3 +454,6 @@ pub use crate::types::{ BoltPoint2D, BoltPoint3D, BoltRelation, BoltString, BoltTime, BoltType, BoltUnboundedRelation, }; pub use crate::version::Version; + +pub(crate) use errors::Neo4jErrorKind; +pub(crate) use messages::Success; diff --git a/lib/src/messages.rs b/lib/src/messages.rs index 4fc8f84e..384985a7 100644 --- a/lib/src/messages.rs +++ b/lib/src/messages.rs @@ -28,7 +28,7 @@ use record::Record; use reset::Reset; use rollback::Rollback; use run::Run; -use success::Success; +pub(crate) use success::Success; #[derive(Debug, PartialEq, Clone)] pub enum BoltResponse { diff --git a/lib/src/pool.rs b/lib/src/pool.rs index ee7165c2..4fe48248 100644 --- a/lib/src/pool.rs +++ b/lib/src/pool.rs @@ -1,3 +1,5 @@ +use std::time::Duration; + use crate::{ auth::ClientCertificate, config::Config, @@ -5,6 +7,7 @@ use crate::{ errors::{Error, Result}, }; use async_trait::async_trait; +use backoff::{ExponentialBackoff, ExponentialBackoffBuilder}; use log::info; pub type ConnectionPool = deadpool::managed::Pool; @@ -12,6 +15,7 @@ pub type ManagedConnection = deadpool::managed::Object; pub struct ConnectionManager { info: ConnectionInfo, + backoff: ExponentialBackoff, } impl ConnectionManager { @@ -22,7 +26,17 @@ impl ConnectionManager { client_certificate: Option<&ClientCertificate>, ) -> Result { let info = ConnectionInfo::new(uri, user, password, client_certificate)?; - Ok(ConnectionManager { info }) + let backoff = ExponentialBackoffBuilder::new() + .with_initial_interval(Duration::from_millis(1)) + .with_randomization_factor(0.42) + .with_multiplier(2.0) + .with_max_elapsed_time(Some(Duration::from_secs(60))) + .build(); + Ok(ConnectionManager { info, backoff }) + } + + pub fn backoff(&self) -> ExponentialBackoff { + self.backoff.clone() } } diff --git a/lib/src/query.rs b/lib/src/query.rs index 285d8ead..c78dfca6 100644 --- a/lib/src/query.rs +++ b/lib/src/query.rs @@ -4,6 +4,7 @@ use crate::{ pool::ManagedConnection, stream::{DetachedRowStream, RowStream}, types::{BoltList, BoltMap, BoltString, BoltType}, + Error, Neo4jErrorKind, Success, }; /// Abstracts a cypher query that is sent to neo4j server. @@ -43,24 +44,31 @@ impl Query { } pub(crate) async fn run(self, db: &str, connection: &mut ManagedConnection) -> Result<()> { - let run = BoltRequest::run(db, &self.query, self.params); - match connection.send_recv(run).await? { - BoltResponse::Success(_) => match connection.send_recv(BoltRequest::discard()).await? { - BoltResponse::Success(_) => Ok(()), - msg => Err(unexpected(msg, "DISCARD")), - }, - msg => Err(unexpected(msg, "RUN")), - } + let request = BoltRequest::run(db, &self.query, self.params); + Self::try_run(request, connection) + .await + .map_err(unwrap_backoff) } - pub(crate) async fn execute( - self, + pub(crate) async fn run_retryable( + &self, + db: &str, + connection: &mut ManagedConnection, + ) -> Result<(), backoff::Error> { + let request = BoltRequest::run(db, &self.query, self.params.clone()); + Self::try_run(request, connection).await + } + + pub(crate) async fn execute_retryable( + &self, db: &str, fetch_size: usize, mut connection: ManagedConnection, - ) -> Result { - let stream = self.execute_mut(db, fetch_size, &mut connection).await?; - Ok(DetachedRowStream::new(stream, connection)) + ) -> Result> { + let request = BoltRequest::run(db, &self.query, self.params.clone()); + Self::try_execute(request, fetch_size, &mut connection) + .await + .map(|stream| DetachedRowStream::new(stream, connection)) } pub(crate) async fn execute_mut<'conn>( @@ -70,13 +78,38 @@ impl Query { connection: &'conn mut ManagedConnection, ) -> Result { let run = BoltRequest::run(db, &self.query, self.params); - match connection.send_recv(run).await { - Ok(BoltResponse::Success(success)) => { - let fields: BoltList = success.get("fields").unwrap_or_default(); - let qid: i64 = success.get("qid").unwrap_or(-1); - Ok(RowStream::new(qid, fields, fetch_size)) - } - msg => Err(unexpected(msg, "RUN")), + Self::try_execute(run, fetch_size, connection) + .await + .map_err(unwrap_backoff) + } + + async fn try_run(request: BoltRequest, connection: &mut ManagedConnection) -> QueryResult<()> { + let _ = Self::try_request(request, connection).await?; + match connection.send_recv(BoltRequest::discard()).await { + Ok(BoltResponse::Success(_)) => Ok(()), + otherwise => wrap_error(otherwise, "DISCARD"), + } + } + + async fn try_execute( + request: BoltRequest, + fetch_size: usize, + connection: &mut ManagedConnection, + ) -> QueryResult { + Self::try_request(request, connection).await.map(|success| { + let fields: BoltList = success.get("fields").unwrap_or_default(); + let qid: i64 = success.get("qid").unwrap_or(-1); + RowStream::new(qid, fields, fetch_size) + }) + } + + async fn try_request( + request: BoltRequest, + connection: &mut ManagedConnection, + ) -> QueryResult { + match connection.send_recv(request).await { + Ok(BoltResponse::Success(success)) => Ok(success), + otherwise => wrap_error(otherwise, "RUN"), } } } @@ -93,6 +126,32 @@ impl From<&str> for Query { } } +type QueryResult = Result>; + +fn wrap_error(resp: Result, req: &str) -> QueryResult { + let can_retry = if let Ok(BoltResponse::Failure(failure)) = &resp { + failure + .get::("code") + .map_or(false, |code| Neo4jErrorKind::new(&code).can_retry()) + } else { + false + }; + + let err = unexpected(resp, req); + if can_retry { + Err(backoff::Error::transient(err)) + } else { + Err(backoff::Error::permanent(err)) + } +} + +fn unwrap_backoff(err: backoff::Error) -> Error { + match err { + backoff::Error::Permanent(e) => e, + backoff::Error::Transient { err, .. } => err, + } +} + #[cfg(test)] mod tests { use super::*; From 4a356fcbbd15cef062af47c83125d67f88ee86f1 Mon Sep 17 00:00:00 2001 From: Paul Horn Date: Wed, 7 Aug 2024 19:59:42 +0200 Subject: [PATCH 2/6] Don't use exclusive range pattern --- lib/src/graph.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/lib/src/graph.rs b/lib/src/graph.rs index 8956c7fa..08637fad 100644 --- a/lib/src/graph.rs +++ b/lib/src/graph.rs @@ -143,8 +143,8 @@ impl Graph { fn log_retry(e: crate::Error, delay: Duration) { let level = match delay.as_millis() { - ..500 => log::Level::Debug, - 500..5000 => log::Level::Info, + ..=499 => log::Level::Debug, + 500..=4999 => log::Level::Info, _ => log::Level::Warn, }; log::log!(level, "Retrying query in {delay:?} due to error: {e}"); From 83a75974e768cc61062985e6db003f48e81d87fa Mon Sep 17 00:00:00 2001 From: Paul Horn Date: Wed, 7 Aug 2024 20:10:37 +0200 Subject: [PATCH 3/6] Fix build settings for 0.7 base PRs --- .github/workflows/checks.yml | 4 ++-- .github/workflows/create-release-pr.yml | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/.github/workflows/checks.yml b/.github/workflows/checks.yml index 556d6641..7e9995fb 100644 --- a/.github/workflows/checks.yml +++ b/.github/workflows/checks.yml @@ -23,8 +23,8 @@ on: env: RUST_LOG: debug CARGO_TERM_COLOR: always - MSRV: ${{ (github.ref == 'refs/heads/0.7' || startsWith(github.ref, 'refs/tags/v0.7.')) && '1.63.0' || '1.75.0' }} - HACK: hack --package neo4rs --each-feature ${{ (github.ref == 'refs/heads/0.7' || startsWith(github.ref, 'refs/tags/v0.7.')) && '' || '--exclude-features unstable-serde-packstream-format,unstable-bolt-protocol-impl-v2,unstable-streaming-summary' }} + MSRV: ${{ ((github.base_ref || github.ref) == 'refs/heads/0.7' || startsWith(github.ref, 'refs/tags/v0.7.')) && '1.63.0' || '1.75.0' }} + HACK: hack --package neo4rs --each-feature ${{ ((github.base_ref || github.ref) == 'refs/heads/0.7' || startsWith(github.ref, 'refs/tags/v0.7.')) && '' || '--exclude-features unstable-serde-packstream-format,unstable-bolt-protocol-impl-v2,unstable-streaming-summary' }} jobs: diff --git a/.github/workflows/create-release-pr.yml b/.github/workflows/create-release-pr.yml index eeb6be78..075adcb2 100644 --- a/.github/workflows/create-release-pr.yml +++ b/.github/workflows/create-release-pr.yml @@ -20,7 +20,7 @@ on: env: RUSTUP_TOOLCHAIN: stable - MSRV: ${{ (github.ref == 'refs/heads/0.7' || startsWith(github.ref, 'refs/tags/v0.7.')) && '1.63.0' || '1.75.0' }} + MSRV: ${{ ((github.base_ref || github.ref) == 'refs/heads/0.7' || startsWith(github.ref, 'refs/tags/v0.7.')) && '1.63.0' || '1.75.0' }} jobs: make-release-pr: From 949dc31f3eb0376f747f74d719a15560e3861718 Mon Sep 17 00:00:00 2001 From: Paul Horn Date: Wed, 7 Aug 2024 21:26:42 +0200 Subject: [PATCH 4/6] Actually fix build settings for 0.7 base --- .github/workflows/checks.yml | 4 ++-- .github/workflows/create-release-pr.yml | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/.github/workflows/checks.yml b/.github/workflows/checks.yml index 7e9995fb..dbd674e0 100644 --- a/.github/workflows/checks.yml +++ b/.github/workflows/checks.yml @@ -23,8 +23,8 @@ on: env: RUST_LOG: debug CARGO_TERM_COLOR: always - MSRV: ${{ ((github.base_ref || github.ref) == 'refs/heads/0.7' || startsWith(github.ref, 'refs/tags/v0.7.')) && '1.63.0' || '1.75.0' }} - HACK: hack --package neo4rs --each-feature ${{ ((github.base_ref || github.ref) == 'refs/heads/0.7' || startsWith(github.ref, 'refs/tags/v0.7.')) && '' || '--exclude-features unstable-serde-packstream-format,unstable-bolt-protocol-impl-v2,unstable-streaming-summary' }} + MSRV: ${{ (github.base_ref == '0.7' || github.ref == 'refs/heads/0.7' || startsWith(github.ref, 'refs/tags/v0.7.')) && '1.63.0' || '1.75.0' }} + HACK: hack --package neo4rs --each-feature ${{ (github.base_ref == '0.7' || github.ref == 'refs/heads/0.7' || startsWith(github.ref, 'refs/tags/v0.7.')) && '' || '--exclude-features unstable-serde-packstream-format,unstable-bolt-protocol-impl-v2,unstable-streaming-summary' }} jobs: diff --git a/.github/workflows/create-release-pr.yml b/.github/workflows/create-release-pr.yml index 075adcb2..eb6f6b90 100644 --- a/.github/workflows/create-release-pr.yml +++ b/.github/workflows/create-release-pr.yml @@ -20,7 +20,7 @@ on: env: RUSTUP_TOOLCHAIN: stable - MSRV: ${{ ((github.base_ref || github.ref) == 'refs/heads/0.7' || startsWith(github.ref, 'refs/tags/v0.7.')) && '1.63.0' || '1.75.0' }} + MSRV: ${{ (github.base_ref == '0.7' || github.ref == 'refs/heads/0.7' || startsWith(github.ref, 'refs/tags/v0.7.')) && '1.63.0' || '1.75.0' }} jobs: make-release-pr: From 873556a2901417a3b0345d99da35abaf70c98129 Mon Sep 17 00:00:00 2001 From: Paul Horn Date: Wed, 7 Aug 2024 21:31:37 +0200 Subject: [PATCH 5/6] Yeah, empty strings are falsey --- .github/workflows/checks.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/checks.yml b/.github/workflows/checks.yml index dbd674e0..9a4ac273 100644 --- a/.github/workflows/checks.yml +++ b/.github/workflows/checks.yml @@ -24,7 +24,7 @@ env: RUST_LOG: debug CARGO_TERM_COLOR: always MSRV: ${{ (github.base_ref == '0.7' || github.ref == 'refs/heads/0.7' || startsWith(github.ref, 'refs/tags/v0.7.')) && '1.63.0' || '1.75.0' }} - HACK: hack --package neo4rs --each-feature ${{ (github.base_ref == '0.7' || github.ref == 'refs/heads/0.7' || startsWith(github.ref, 'refs/tags/v0.7.')) && '' || '--exclude-features unstable-serde-packstream-format,unstable-bolt-protocol-impl-v2,unstable-streaming-summary' }} + HACK: hack --package neo4rs ${{ (github.base_ref == '0.7' || github.ref == 'refs/heads/0.7' || startsWith(github.ref, 'refs/tags/v0.7.')) && '--each-feature' || '--each-feature --exclude-features unstable-serde-packstream-format,unstable-bolt-protocol-impl-v2,unstable-streaming-summary' }} jobs: From 772b1d690ebb71077648093eec0ad91dfbb16a3d Mon Sep 17 00:00:00 2001 From: Paul Horn Date: Wed, 7 Aug 2024 21:32:17 +0200 Subject: [PATCH 6/6] Also don't have half open ranges --- lib/src/graph.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/src/graph.rs b/lib/src/graph.rs index 08637fad..4b92c458 100644 --- a/lib/src/graph.rs +++ b/lib/src/graph.rs @@ -143,7 +143,7 @@ impl Graph { fn log_retry(e: crate::Error, delay: Duration) { let level = match delay.as_millis() { - ..=499 => log::Level::Debug, + 0..=499 => log::Level::Debug, 500..=4999 => log::Level::Info, _ => log::Level::Warn, };