diff --git a/lib/src/bolt/mod.rs b/lib/src/bolt/mod.rs index bfd11720..f3f21527 100644 --- a/lib/src/bolt/mod.rs +++ b/lib/src/bolt/mod.rs @@ -13,7 +13,9 @@ mod request; mod structs; mod summary; -pub use request::{Commit, Discard, Goodbye, Hello, HelloBuilder, Reset, Rollback, WrapExtra}; +pub use request::{ + Commit, Discard, Goodbye, Hello, HelloBuilder, Pull, Reset, Rollback, WrapExtra, +}; pub use structs::{ Bolt, BoltRef, Date, DateDuration, DateTime, DateTimeZoneId, DateTimeZoneIdRef, Duration, LegacyDateTime, LegacyDateTimeZoneId, LegacyDateTimeZoneIdRef, LocalDateTime, LocalTime, Node, diff --git a/lib/src/messages.rs b/lib/src/messages.rs index 7e4795ad..af469dac 100644 --- a/lib/src/messages.rs +++ b/lib/src/messages.rs @@ -20,7 +20,6 @@ use crate::{ use begin::Begin; use bytes::Bytes; use failure::Failure; -use pull::Pull; use record::Record; use run::Run; pub(crate) use success::Success; @@ -37,30 +36,34 @@ pub enum BoltResponse { pub enum BoltRequest { #[cfg_attr( feature = "unstable-bolt-protocol-impl-v2", - deprecated(since = "0.8.0", note = "Use `crate::bolt::Hello` instead.") + deprecated(since = "0.9.0", note = "Use `crate::bolt::Hello` instead.") )] Hello(hello::Hello), Run(Run), - Pull(Pull), #[cfg_attr( feature = "unstable-bolt-protocol-impl-v2", - deprecated(since = "0.8.0", note = "Use `crate::bolt::Discard` instead.") + deprecated(since = "0.9.0", note = "Use `crate::bolt::Pull` instead.") + )] + Pull(pull::Pull), + #[cfg_attr( + feature = "unstable-bolt-protocol-impl-v2", + deprecated(since = "0.9.0", note = "Use `crate::bolt::Discard` instead.") )] Discard(discard::Discard), Begin(Begin), #[cfg_attr( feature = "unstable-bolt-protocol-impl-v2", - deprecated(since = "0.8.0", note = "Use `crate::bolt::Commit` instead.") + deprecated(since = "0.9.0", note = "Use `crate::bolt::Commit` instead.") )] Commit(commit::Commit), #[cfg_attr( feature = "unstable-bolt-protocol-impl-v2", - deprecated(since = "0.8.0", note = "Use `crate::bolt::Rollback` instead.") + deprecated(since = "0.9.0", note = "Use `crate::bolt::Rollback` instead.") )] Rollback(rollback::Rollback), #[cfg_attr( feature = "unstable-bolt-protocol-impl-v2", - deprecated(since = "0.8.0", note = "Use `crate::bolt::Reset` instead.") + deprecated(since = "0.9.0", note = "Use `crate::bolt::Reset` instead.") )] Reset(reset::Reset), } @@ -107,7 +110,7 @@ impl HelloBuilder { impl BoltRequest { #[cfg_attr( feature = "unstable-bolt-protocol-impl-v2", - deprecated(since = "0.8.0", note = "Use `crate::bolt::Hello` instead.") + deprecated(since = "0.9.0", note = "Use `crate::bolt::Hello` instead.") )] pub fn hello( agent: BoltString, @@ -133,13 +136,17 @@ impl BoltRequest { BoltRequest::Run(Run::new(db.into(), query.into(), params)) } + #[cfg_attr( + feature = "unstable-bolt-protocol-impl-v2", + deprecated(since = "0.9.0", note = "Use `crate::bolt::Pull` instead.") + )] pub fn pull(n: usize, qid: i64) -> BoltRequest { - BoltRequest::Pull(Pull::new(n as i64, qid)) + BoltRequest::Pull(pull::Pull::new(n as i64, qid)) } #[cfg_attr( feature = "unstable-bolt-protocol-impl-v2", - deprecated(since = "0.8.0", note = "Use `crate::bolt::Discard` instead.") + deprecated(since = "0.9.0", note = "Use `crate::bolt::Discard` instead.") )] pub fn discard() -> BoltRequest { BoltRequest::Discard(discard::Discard::default()) @@ -152,7 +159,7 @@ impl BoltRequest { #[cfg_attr( feature = "unstable-bolt-protocol-impl-v2", - deprecated(since = "0.8.0", note = "Use `crate::bolt::Commit` instead.") + deprecated(since = "0.9.0", note = "Use `crate::bolt::Commit` instead.") )] pub fn commit() -> BoltRequest { BoltRequest::Commit(commit::Commit::new()) @@ -160,7 +167,7 @@ impl BoltRequest { #[cfg_attr( feature = "unstable-bolt-protocol-impl-v2", - deprecated(since = "0.8.0", note = "Use `crate::bolt::Rollback` instead.") + deprecated(since = "0.9.0", note = "Use `crate::bolt::Rollback` instead.") )] pub fn rollback() -> BoltRequest { BoltRequest::Rollback(rollback::Rollback::new()) @@ -168,7 +175,7 @@ impl BoltRequest { #[cfg_attr( feature = "unstable-bolt-protocol-impl-v2", - deprecated(since = "0.8.0", note = "Use `crate::bolt::Reset` instead.") + deprecated(since = "0.9.0", note = "Use `crate::bolt::Reset` instead.") )] pub fn reset() -> BoltRequest { BoltRequest::Reset(reset::Reset::new()) diff --git a/lib/src/messages/bye.rs b/lib/src/messages/bye.rs index 94b50a20..f4c8a191 100644 --- a/lib/src/messages/bye.rs +++ b/lib/src/messages/bye.rs @@ -6,7 +6,7 @@ use neo4rs_macros::BoltStruct; #[signature(0xB0, 0x02)] #[cfg_attr( feature = "unstable-bolt-protocol-impl-v2", - deprecated(since = "0.8.0", note = "Use `crate::bolt::Bye` instead.") + deprecated(since = "0.9.0", note = "Use `crate::bolt::Bye` instead.") )] pub struct Bye; diff --git a/lib/src/messages/commit.rs b/lib/src/messages/commit.rs index 1d7f8d05..03e2aad1 100644 --- a/lib/src/messages/commit.rs +++ b/lib/src/messages/commit.rs @@ -6,7 +6,7 @@ use neo4rs_macros::BoltStruct; #[signature(0xB0, 0x12)] #[cfg_attr( feature = "unstable-bolt-protocol-impl-v2", - deprecated(since = "0.8.0", note = "Use `crate::bolt::Commit` instead.") + deprecated(since = "0.9.0", note = "Use `crate::bolt::Commit` instead.") )] pub struct Commit; diff --git a/lib/src/messages/discard.rs b/lib/src/messages/discard.rs index 75be6631..c7377da0 100644 --- a/lib/src/messages/discard.rs +++ b/lib/src/messages/discard.rs @@ -7,7 +7,7 @@ use neo4rs_macros::BoltStruct; #[signature(0xB1, 0x2F)] #[cfg_attr( feature = "unstable-bolt-protocol-impl-v2", - deprecated(since = "0.8.0", note = "Use `crate::bolt::Discard` instead.") + deprecated(since = "0.9.0", note = "Use `crate::bolt::Discard` instead.") )] pub struct Discard { extra: BoltMap, diff --git a/lib/src/messages/hello.rs b/lib/src/messages/hello.rs index 8b41f90b..86315fc5 100644 --- a/lib/src/messages/hello.rs +++ b/lib/src/messages/hello.rs @@ -7,7 +7,7 @@ use neo4rs_macros::BoltStruct; #[signature(0xB1, 0x01)] #[cfg_attr( feature = "unstable-bolt-protocol-impl-v2", - deprecated(since = "0.8.0", note = "Use `crate::bolt::Hello` instead.") + deprecated(since = "0.9.0", note = "Use `crate::bolt::Hello` instead.") )] pub struct Hello { extra: BoltMap, diff --git a/lib/src/messages/pull.rs b/lib/src/messages/pull.rs index c2614a9a..4d857f28 100644 --- a/lib/src/messages/pull.rs +++ b/lib/src/messages/pull.rs @@ -1,8 +1,14 @@ +#![cfg_attr(feature = "unstable-bolt-protocol-impl-v2", allow(deprecated))] + use crate::types::BoltMap; use neo4rs_macros::BoltStruct; #[derive(Debug, PartialEq, Clone, BoltStruct)] #[signature(0xB1, 0x3F)] +#[cfg_attr( + feature = "unstable-bolt-protocol-impl-v2", + deprecated(since = "0.9.0", note = "Use `crate::bolt::Pull` instead.") +)] pub struct Pull { extra: BoltMap, } @@ -14,6 +20,7 @@ impl Default for Pull { } impl Pull { + #[cfg_attr(feature = "unstable-bolt-protocol-impl-v2", allow(dead_code))] pub fn new(n: i64, qid: i64) -> Pull { let mut extra = BoltMap::default(); extra.put("n".into(), n.into()); diff --git a/lib/src/messages/reset.rs b/lib/src/messages/reset.rs index c11af6bd..a5569cca 100644 --- a/lib/src/messages/reset.rs +++ b/lib/src/messages/reset.rs @@ -6,7 +6,7 @@ use neo4rs_macros::BoltStruct; #[signature(0xB0, 0x0F)] #[cfg_attr( feature = "unstable-bolt-protocol-impl-v2", - deprecated(since = "0.8.0", note = "Use `crate::bolt::Reset` instead.") + deprecated(since = "0.9.0", note = "Use `crate::bolt::Reset` instead.") )] pub struct Reset; diff --git a/lib/src/messages/rollback.rs b/lib/src/messages/rollback.rs index 84507620..1b129aa0 100644 --- a/lib/src/messages/rollback.rs +++ b/lib/src/messages/rollback.rs @@ -6,7 +6,7 @@ use neo4rs_macros::BoltStruct; #[signature(0xB0, 0x13)] #[cfg_attr( feature = "unstable-bolt-protocol-impl-v2", - deprecated(since = "0.8.0", note = "Use `crate::bolt::Rollback` instead.") + deprecated(since = "0.9.0", note = "Use `crate::bolt::Rollback` instead.") )] pub struct Rollback; diff --git a/lib/src/stream.rs b/lib/src/stream.rs index d69bfb6c..29b5253f 100644 --- a/lib/src/stream.rs +++ b/lib/src/stream.rs @@ -1,9 +1,16 @@ +#[cfg(not(feature = "unstable-bolt-protocol-impl-v2"))] +use crate::messages::{BoltRequest, BoltResponse}; #[cfg(feature = "unstable-streaming-summary")] use crate::summary::StreamingSummary; +#[cfg(feature = "unstable-bolt-protocol-impl-v2")] +use crate::{ + bolt::{Bolt, Pull, Response, WrapExtra as _}, + summary::Streaming, + BoltType, +}; use crate::{ errors::{Error, Result}, - messages::{BoltRequest, BoltResponse}, pool::ManagedConnection, row::Row, txn::TransactionHandle, @@ -118,27 +125,60 @@ impl RowStream { match self.state { State::Ready => { - let pull = BoltRequest::pull(self.fetch_size, self.qid); - let connection = handle.connection(); - connection.send(pull).await?; + #[cfg(feature = "unstable-bolt-protocol-impl-v2")] + { + let pull = Pull::some(self.fetch_size as i64).for_query(self.qid); + let connection = handle.connection(); + connection.send_as(pull).await?; - self.state = loop { - match connection.recv().await { - Ok(BoltResponse::Success(s)) => { - break if s.get("has_more").unwrap_or(false) { - State::Ready - } else { - State::Complete(None) - }; + self.state = loop { + let response = connection + .recv_as::, Streaming>>() + .await?; + match response { + Response::Detail(record) => { + let record = BoltList::from( + record + .into_iter() + .map(BoltType::from) + .collect::>(), + ); + let row = Row::new(self.fields.clone(), record); + self.buffer.push_back(row); + } + Response::Success(Streaming::HasMore) => break State::Ready, + Response::Success(Streaming::Done(s)) => { + break State::Complete(Some(s)) + } + otherwise => return Err(otherwise.into_error("PULL")), } - Ok(BoltResponse::Record(record)) => { - let row = Row::new(self.fields.clone(), record.data); - self.buffer.push_back(row); + }; + } + + #[cfg(not(feature = "unstable-bolt-protocol-impl-v2"))] + { + let pull = BoltRequest::pull(self.fetch_size, self.qid); + let connection = handle.connection(); + connection.send(pull).await?; + + self.state = loop { + match connection.recv().await { + Ok(BoltResponse::Success(s)) => { + break if s.get("has_more").unwrap_or(false) { + State::Ready + } else { + State::Complete(None) + }; + } + Ok(BoltResponse::Record(record)) => { + let row = Row::new(self.fields.clone(), record.data); + self.buffer.push_back(row); + } + Ok(msg) => return Err(msg.into_error("PULL")), + Err(e) => return Err(e), } - Ok(msg) => return Err(msg.into_error("PULL")), - Err(e) => return Err(e), - } - }; + }; + } } State::Complete(ref mut _summary) => { #[cfg(feature = "unstable-streaming-summary")]