Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Retry on certain query failures for managed transactions #186

Merged
merged 6 commits into from
Aug 7, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .github/workflows/checks.yml
Original file line number Diff line number Diff line change
@@ -23,8 +23,8 @@ on:
env:
RUST_LOG: debug
CARGO_TERM_COLOR: always
MSRV: ${{ (github.ref == 'refs/heads/0.7' || startsWith(github.ref, 'refs/tags/v0.7.')) && '1.63.0' || '1.75.0' }}
HACK: hack --package neo4rs --each-feature ${{ (github.ref == 'refs/heads/0.7' || startsWith(github.ref, 'refs/tags/v0.7.')) && '' || '--exclude-features unstable-serde-packstream-format,unstable-bolt-protocol-impl-v2,unstable-streaming-summary' }}
MSRV: ${{ (github.base_ref == '0.7' || github.ref == 'refs/heads/0.7' || startsWith(github.ref, 'refs/tags/v0.7.')) && '1.63.0' || '1.75.0' }}
HACK: hack --package neo4rs ${{ (github.base_ref == '0.7' || github.ref == 'refs/heads/0.7' || startsWith(github.ref, 'refs/tags/v0.7.')) && '--each-feature' || '--each-feature --exclude-features unstable-serde-packstream-format,unstable-bolt-protocol-impl-v2,unstable-streaming-summary' }}


jobs:
2 changes: 1 addition & 1 deletion .github/workflows/create-release-pr.yml
Original file line number Diff line number Diff line change
@@ -20,7 +20,7 @@ on:

env:
RUSTUP_TOOLCHAIN: stable
MSRV: ${{ (github.ref == 'refs/heads/0.7' || startsWith(github.ref, 'refs/tags/v0.7.')) && '1.63.0' || '1.75.0' }}
MSRV: ${{ (github.base_ref == '0.7' || github.ref == 'refs/heads/0.7' || startsWith(github.ref, 'refs/tags/v0.7.')) && '1.63.0' || '1.75.0' }}

jobs:
make-release-pr:
15 changes: 15 additions & 0 deletions ci/Cargo.lock.min

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

75 changes: 54 additions & 21 deletions ci/Cargo.lock.msrv

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions lib/Cargo.toml
Original file line number Diff line number Diff line change
@@ -21,6 +21,7 @@ json = ["serde_json"]

[dependencies]
async-trait = "0.1.0"
backoff = { version = "0.4.0", features = ["tokio"] }
bytes = { version = "1.5.0", features = ["serde"] }
chrono-tz = "0.8.3"
delegate = "0.10.0"
103 changes: 103 additions & 0 deletions lib/examples/concurrent_writes.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
use futures::stream::{self, StreamExt, TryStreamExt};
use neo4rs::{query, ConfigBuilder, Graph};

#[tokio::main]
async fn main() {
pretty_env_logger::init();

let neo4j_uri = std::env::var("NEO4J_URI").unwrap();
let neo4j_username = std::env::var("NEO4J_USERNAME").unwrap();
let neo4j_password = std::env::var("NEO4J_PASSWORD").unwrap();

let graph = Graph::connect(
ConfigBuilder::new()
.uri(neo4j_uri)
.user(neo4j_username)
.password(neo4j_password)
.max_connections(420)
.build()
.unwrap(),
)
.await
.unwrap();

stream::iter(1..=1337)
.map(|i| work(i, graph.clone()))
.buffer_unordered(420)
.map(|(i, node_count, rel_count)| {
if i % 100 == 0 || i == 1337 {
println!("iteration: {i}, node count: {node_count}, rel count: {rel_count}");
}
})
.collect::<()>()
.await;
}

async fn work(i: u64, graph: Graph) -> (u64, u64, u64) {
graph
.run(query(
"
CREATE
(dan:Person {name: 'Dan'}),
(annie:Person {name: 'Annie'}),
(matt:Person {name: 'Matt'}),
(jeff:Person {name: 'Jeff'}),
(brie:Person {name: 'Brie'}),
(elsa:Person {name: 'Elsa'}),

(cookies:Product {name: 'Cookies'}),
(tomatoes:Product {name: 'Tomatoes'}),
(cucumber:Product {name: 'Cucumber'}),
(celery:Product {name: 'Celery'}),
(kale:Product {name: 'Kale'}),
(milk:Product {name: 'Milk'}),
(chocolate:Product {name: 'Chocolate'}),

(dan)-[:BUYS {amount: 1.2}]->(cookies),
(dan)-[:BUYS {amount: 3.2}]->(milk),
(dan)-[:BUYS {amount: 2.2}]->(chocolate),

(annie)-[:BUYS {amount: 1.2}]->(cucumber),
(annie)-[:BUYS {amount: 3.2}]->(milk),
(annie)-[:BUYS {amount: 3.2}]->(tomatoes),

(matt)-[:BUYS {amount: 3}]->(tomatoes),
(matt)-[:BUYS {amount: 2}]->(kale),
(matt)-[:BUYS {amount: 1}]->(cucumber),

(jeff)-[:BUYS {amount: 3}]->(cookies),
(jeff)-[:BUYS {amount: 2}]->(milk),

(brie)-[:BUYS {amount: 1}]->(tomatoes),
(brie)-[:BUYS {amount: 2}]->(milk),
(brie)-[:BUYS {amount: 2}]->(kale),
(brie)-[:BUYS {amount: 3}]->(cucumber),
(brie)-[:BUYS {amount: 0.3}]->(celery),

(elsa)-[:BUYS {amount: 3}]->(chocolate),
(elsa)-[:BUYS {amount: 3}]->(milk)
",
))
.await
.unwrap();

let node_count = graph
.execute(query("MATCH (n) RETURN count(n) AS count"))
.await
.unwrap()
.column_into_stream::<u64>("count")
.try_fold(0_u64, |acc, x| async move { Ok(acc + x) })
.await
.unwrap();

let rel_count = graph
.execute(query("MATCH ()-[r]->() RETURN count(r) AS count"))
.await
.unwrap()
.column_into_stream::<u64>("count")
.try_fold(0_u64, |acc, x| async move { Ok(acc + x) })
.await
.unwrap();

(i, node_count, rel_count)
}
106 changes: 106 additions & 0 deletions lib/src/errors.rs
Original file line number Diff line number Diff line change
@@ -62,6 +62,112 @@ pub enum Error {
DeserializationError(DeError),
}

#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub enum Neo4jErrorKind {
Client(Neo4jClientErrorKind),
Transient,
Database,
Unknown,
}

#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub enum Neo4jClientErrorKind {
Security(Neo4jSecurityErrorKind),
SessionExpired,
FatalDiscovery,
TransactionTerminated,
ProtocolViolation,
Other,
Unknown,
}

#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub enum Neo4jSecurityErrorKind {
Authentication,
AuthorizationExpired,
TokenExpired,
Other,
Unknown,
}

impl Neo4jErrorKind {
pub(crate) fn new(code: &str) -> Self {
let code = Self::adjust_code(code).unwrap_or(code);
Self::classify(code)
}

fn adjust_code(code: &str) -> Option<&str> {
match code {
"Neo.TransientError.Transaction.LockClientStopped" => {
Some("Neo.ClientError.Transaction.LockClientStopped")
}
"Neo.TransientError.Transaction.Terminated" => {
Some("Neo.ClientError.Transaction.Terminated")
}
_ => None,
}
}

fn classify(code: &str) -> Self {
let mut parts = code.split('.').skip(1);
let [class, subclass, kind] = [parts.next(), parts.next(), parts.next()];

match class {
Some("ClientError") => match (subclass, kind) {
(Some("Security"), Some("Unauthorized")) => Self::Client(
Neo4jClientErrorKind::Security(Neo4jSecurityErrorKind::Authentication),
),
(Some("Security"), Some("AuthorizationExpired")) => Self::Client(
Neo4jClientErrorKind::Security(Neo4jSecurityErrorKind::AuthorizationExpired),
),
(Some("Security"), Some("TokenExpired")) => Self::Client(
Neo4jClientErrorKind::Security(Neo4jSecurityErrorKind::TokenExpired),
),
(Some("Database"), Some("DatabaseNotFound")) => {
Self::Client(Neo4jClientErrorKind::FatalDiscovery)
}
(Some("Transaction"), Some("Terminated")) => {
Self::Client(Neo4jClientErrorKind::TransactionTerminated)
}
(Some("Security"), Some(_)) => Self::Client(Neo4jClientErrorKind::Security(
Neo4jSecurityErrorKind::Other,
)),
(Some("Security"), _) => Self::Client(Neo4jClientErrorKind::Security(
Neo4jSecurityErrorKind::Unknown,
)),
(Some("Request"), _) => Self::Client(Neo4jClientErrorKind::ProtocolViolation),
(Some("Cluster"), Some("NotALeader")) => {
Self::Client(Neo4jClientErrorKind::SessionExpired)
}
(Some("General"), Some("ForbiddenOnReadOnlyDatabase")) => {
Self::Client(Neo4jClientErrorKind::SessionExpired)
}
(Some(_), _) => Self::Client(Neo4jClientErrorKind::Other),
_ => Self::Client(Neo4jClientErrorKind::Unknown),
},
Some("TransientError") => Self::Transient,
Some(_) => Self::Database,
None => Self::Unknown,
}
}

pub(crate) fn can_retry(&self) -> bool {
matches!(
self,
Self::Client(
Neo4jClientErrorKind::Security(Neo4jSecurityErrorKind::AuthorizationExpired)
| Neo4jClientErrorKind::SessionExpired
) | Self::Transient
)
}
}

impl From<&str> for Neo4jErrorKind {
fn from(code: &str) -> Self {
Self::new(code)
}
}

impl std::convert::From<deadpool::managed::PoolError<Error>> for Error {
fn from(e: deadpool::managed::PoolError<Error>) -> Self {
match e {
66 changes: 62 additions & 4 deletions lib/src/graph.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::time::Duration;

use crate::{
config::{Config, ConfigBuilder, Database, LiveConfig},
errors::Result,
@@ -48,13 +50,17 @@ impl Graph {
/// Starts a new transaction on the configured database.
/// All queries that needs to be run/executed within the transaction
/// should be executed using either [`Txn::run`] or [`Txn::execute`]
///
/// Transactions will not be automatically retried on any failure.
pub async fn start_txn(&self) -> Result<Txn> {
self.start_txn_on(self.config.db.clone()).await
}

/// Starts a new transaction on the provided database.
/// All queries that needs to be run/executed within the transaction
/// should be executed using either [`Txn::run`] or [`Txn::execute`]
///
/// Transactions will not be automatically retried on any failure.
pub async fn start_txn_on(&self, db: impl Into<Database>) -> Result<Txn> {
let connection = self.pool.get().await?;
Txn::new(db.into(), self.config.fetch_size, connection).await
@@ -63,6 +69,11 @@ impl Graph {
/// Runs a query on the configured database using a connection from the connection pool,
/// It doesn't return any [`RowStream`] as the `run` abstraction discards any stream.
///
/// This operation retires the query on certain failures.
/// All errors with the `Transient` error class as well as a few other error classes are considered retryable.
/// This includes errors during a leader election or when the transaction resources on the server (memory, handles, ...) are exhausted.
/// Retries happen with an exponential backoff until a retry delay exceeds 60s, at which point the query fails with the last error as it would without any retry.
///
/// Use [`Graph::run`] for cases where you just want a write operation
///
/// use [`Graph::execute`] when you are interested in the result stream
@@ -73,23 +84,70 @@ impl Graph {
/// Runs a query on the provided database using a connection from the connection pool.
/// It doesn't return any [`RowStream`] as the `run` abstraction discards any stream.
///
/// This operation retires the query on certain failures.
/// All errors with the `Transient` error class as well as a few other error classes are considered retryable.
/// This includes errors during a leader election or when the transaction resources on the server (memory, handles, ...) are exhausted.
/// Retries happen with an exponential backoff until a retry delay exceeds 60s, at which point the query fails with the last error as it would without any retry.
///
/// Use [`Graph::run`] for cases where you just want a write operation
///
/// use [`Graph::execute`] when you are interested in the result stream
pub async fn run_on(&self, db: &str, q: Query) -> Result<()> {
let mut connection = self.pool.get().await?;
q.run(db, &mut connection).await
backoff::future::retry_notify(
self.pool.manager().backoff(),
|| {
let pool = &self.pool;
let query = &q;
async move {
let mut connection = pool.get().await.map_err(crate::Error::from)?;
query.run_retryable(db, &mut connection).await
}
},
Self::log_retry,
)
.await
}

/// Executes a query on the configured database and returns a [`DetachedRowStream`]
///
/// This operation retires the query on certain failures.
/// All errors with the `Transient` error class as well as a few other error classes are considered retryable.
/// This includes errors during a leader election or when the transaction resources on the server (memory, handles, ...) are exhausted.
/// Retries happen with an exponential backoff until a retry delay exceeds 60s, at which point the query fails with the last error as it would without any retry.
pub async fn execute(&self, q: Query) -> Result<DetachedRowStream> {
self.execute_on(&self.config.db, q).await
}

/// Executes a query on the provided database and returns a [`DetaRowStream`]
///
/// This operation retires the query on certain failures.
/// All errors with the `Transient` error class as well as a few other error classes are considered retryable.
/// This includes errors during a leader election or when the transaction resources on the server (memory, handles, ...) are exhausted.
/// Retries happen with an exponential backoff until a retry delay exceeds 60s, at which point the query fails with the last error as it would without any retry.
pub async fn execute_on(&self, db: &str, q: Query) -> Result<DetachedRowStream> {
let connection = self.pool.get().await?;
q.execute(db, self.config.fetch_size, connection).await
backoff::future::retry_notify(
self.pool.manager().backoff(),
|| {
let pool = &self.pool;
let fetch_size = self.config.fetch_size;
let query = &q;
async move {
let connection = pool.get().await.map_err(crate::Error::from)?;
query.execute_retryable(db, fetch_size, connection).await
}
},
Self::log_retry,
)
.await
}

fn log_retry(e: crate::Error, delay: Duration) {
let level = match delay.as_millis() {
0..=499 => log::Level::Debug,
500..=4999 => log::Level::Info,
_ => log::Level::Warn,
};
log::log!(level, "Retrying query in {delay:?} due to error: {e}");
}
}

5 changes: 4 additions & 1 deletion lib/src/lib.rs
Original file line number Diff line number Diff line change
@@ -438,7 +438,7 @@ mod version;

pub use crate::auth::ClientCertificate;
pub use crate::config::{Config, ConfigBuilder, Database};
pub use crate::errors::*;
pub use crate::errors::{unexpected, Error, Result};
pub use crate::graph::{query, Graph};
pub use crate::query::Query;
pub use crate::row::{Node, Path, Point2D, Point3D, Relation, Row, UnboundedRelation};
@@ -454,3 +454,6 @@ pub use crate::types::{
BoltPoint2D, BoltPoint3D, BoltRelation, BoltString, BoltTime, BoltType, BoltUnboundedRelation,
};
pub use crate::version::Version;

pub(crate) use errors::Neo4jErrorKind;
pub(crate) use messages::Success;
2 changes: 1 addition & 1 deletion lib/src/messages.rs
Original file line number Diff line number Diff line change
@@ -28,7 +28,7 @@ use record::Record;
use reset::Reset;
use rollback::Rollback;
use run::Run;
use success::Success;
pub(crate) use success::Success;

#[derive(Debug, PartialEq, Clone)]
pub enum BoltResponse {
16 changes: 15 additions & 1 deletion lib/src/pool.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,21 @@
use std::time::Duration;

use crate::{
auth::ClientCertificate,
config::Config,
connection::{Connection, ConnectionInfo},
errors::{Error, Result},
};
use async_trait::async_trait;
use backoff::{ExponentialBackoff, ExponentialBackoffBuilder};
use log::info;

pub type ConnectionPool = deadpool::managed::Pool<ConnectionManager>;
pub type ManagedConnection = deadpool::managed::Object<ConnectionManager>;

pub struct ConnectionManager {
info: ConnectionInfo,
backoff: ExponentialBackoff,
}

impl ConnectionManager {
@@ -22,7 +26,17 @@ impl ConnectionManager {
client_certificate: Option<&ClientCertificate>,
) -> Result<Self> {
let info = ConnectionInfo::new(uri, user, password, client_certificate)?;
Ok(ConnectionManager { info })
let backoff = ExponentialBackoffBuilder::new()
.with_initial_interval(Duration::from_millis(1))
.with_randomization_factor(0.42)
.with_multiplier(2.0)
.with_max_elapsed_time(Some(Duration::from_secs(60)))
.build();
Ok(ConnectionManager { info, backoff })
}

pub fn backoff(&self) -> ExponentialBackoff {
self.backoff.clone()
}
}

99 changes: 79 additions & 20 deletions lib/src/query.rs
Original file line number Diff line number Diff line change
@@ -4,6 +4,7 @@ use crate::{
pool::ManagedConnection,
stream::{DetachedRowStream, RowStream},
types::{BoltList, BoltMap, BoltString, BoltType},
Error, Neo4jErrorKind, Success,
};

/// Abstracts a cypher query that is sent to neo4j server.
@@ -43,24 +44,31 @@ impl Query {
}

pub(crate) async fn run(self, db: &str, connection: &mut ManagedConnection) -> Result<()> {
let run = BoltRequest::run(db, &self.query, self.params);
match connection.send_recv(run).await? {
BoltResponse::Success(_) => match connection.send_recv(BoltRequest::discard()).await? {
BoltResponse::Success(_) => Ok(()),
msg => Err(unexpected(msg, "DISCARD")),
},
msg => Err(unexpected(msg, "RUN")),
}
let request = BoltRequest::run(db, &self.query, self.params);
Self::try_run(request, connection)
.await
.map_err(unwrap_backoff)
}

pub(crate) async fn execute(
self,
pub(crate) async fn run_retryable(
&self,
db: &str,
connection: &mut ManagedConnection,
) -> Result<(), backoff::Error<Error>> {
let request = BoltRequest::run(db, &self.query, self.params.clone());
Self::try_run(request, connection).await
}

pub(crate) async fn execute_retryable(
&self,
db: &str,
fetch_size: usize,
mut connection: ManagedConnection,
) -> Result<DetachedRowStream> {
let stream = self.execute_mut(db, fetch_size, &mut connection).await?;
Ok(DetachedRowStream::new(stream, connection))
) -> Result<DetachedRowStream, backoff::Error<Error>> {
let request = BoltRequest::run(db, &self.query, self.params.clone());
Self::try_execute(request, fetch_size, &mut connection)
.await
.map(|stream| DetachedRowStream::new(stream, connection))
}

pub(crate) async fn execute_mut<'conn>(
@@ -70,13 +78,38 @@ impl Query {
connection: &'conn mut ManagedConnection,
) -> Result<RowStream> {
let run = BoltRequest::run(db, &self.query, self.params);
match connection.send_recv(run).await {
Ok(BoltResponse::Success(success)) => {
let fields: BoltList = success.get("fields").unwrap_or_default();
let qid: i64 = success.get("qid").unwrap_or(-1);
Ok(RowStream::new(qid, fields, fetch_size))
}
msg => Err(unexpected(msg, "RUN")),
Self::try_execute(run, fetch_size, connection)
.await
.map_err(unwrap_backoff)
}

async fn try_run(request: BoltRequest, connection: &mut ManagedConnection) -> QueryResult<()> {
let _ = Self::try_request(request, connection).await?;
match connection.send_recv(BoltRequest::discard()).await {
Ok(BoltResponse::Success(_)) => Ok(()),
otherwise => wrap_error(otherwise, "DISCARD"),
}
}

async fn try_execute(
request: BoltRequest,
fetch_size: usize,
connection: &mut ManagedConnection,
) -> QueryResult<RowStream> {
Self::try_request(request, connection).await.map(|success| {
let fields: BoltList = success.get("fields").unwrap_or_default();
let qid: i64 = success.get("qid").unwrap_or(-1);
RowStream::new(qid, fields, fetch_size)
})
}

async fn try_request(
request: BoltRequest,
connection: &mut ManagedConnection,
) -> QueryResult<Success> {
match connection.send_recv(request).await {
Ok(BoltResponse::Success(success)) => Ok(success),
otherwise => wrap_error(otherwise, "RUN"),
}
}
}
@@ -93,6 +126,32 @@ impl From<&str> for Query {
}
}

type QueryResult<T> = Result<T, backoff::Error<Error>>;

fn wrap_error<T>(resp: Result<BoltResponse>, req: &str) -> QueryResult<T> {
let can_retry = if let Ok(BoltResponse::Failure(failure)) = &resp {
failure
.get::<String>("code")
.map_or(false, |code| Neo4jErrorKind::new(&code).can_retry())
} else {
false
};

let err = unexpected(resp, req);
if can_retry {
Err(backoff::Error::transient(err))
} else {
Err(backoff::Error::permanent(err))
}
}

fn unwrap_backoff(err: backoff::Error<Error>) -> Error {
match err {
backoff::Error::Permanent(e) => e,
backoff::Error::Transient { err, .. } => err,
}
}

#[cfg(test)]
mod tests {
use super::*;