diff --git a/src/bson_compat.rs b/src/bson_compat.rs index f16503a38..dc8263db0 100644 --- a/src/bson_compat.rs +++ b/src/bson_compat.rs @@ -36,6 +36,8 @@ pub(crate) trait RawDocumentBufExt: Sized { key: impl AsRef, value: impl Into> + 'a, ); + + fn to_document(&self) -> crate::error::Result; } #[cfg(feature = "bson-3")] @@ -47,6 +49,10 @@ impl RawDocumentBufExt for crate::bson::RawDocumentBuf { ) { self.append(key, value); } + + fn to_document(&self) -> crate::error::Result { + self.try_into().map_err(Into::into) + } } #[cfg(feature = "bson-3")] diff --git a/src/client.rs b/src/client.rs index 6b425e235..ff4591981 100644 --- a/src/client.rs +++ b/src/client.rs @@ -496,6 +496,7 @@ impl Client { operation_name, start_time, timeout, + self.options().tracing_max_document_length_bytes, ); #[cfg(feature = "tracing-unstable")] event_emitter.emit_started_event(self.inner.topology.latest().description.clone()); diff --git a/src/client/csfle/state_machine.rs b/src/client/csfle/state_machine.rs index 1d85b4a17..f2644b97f 100644 --- a/src/client/csfle/state_machine.rs +++ b/src/client/csfle/state_machine.rs @@ -17,7 +17,7 @@ use tokio::{ use crate::{ client::{csfle::options::KmsProvidersTlsOptions, options::ServerAddress, WeakClient}, error::{Error, Result}, - operation::{run_command::RunCommand, RawOutput}, + operation::{raw_output::RawOutput, run_command::RunCommand}, options::ReadConcern, runtime::{process::Process, AsyncStream, TlsConfig}, Client, diff --git a/src/client/executor.rs b/src/client/executor.rs index 77410c664..6b68a78e8 100644 --- a/src/client/executor.rs +++ b/src/client/executor.rs @@ -532,9 +532,13 @@ impl Client { let start_time = Instant::now(); let command_result = match connection.send_message(message).await { Ok(response) => { - let is_sharded = - connection.stream_description()?.initial_server_type == ServerType::Mongos; - self.parse_response(op, session, is_sharded, response).await + match self + .parse_response(op, session, connection.is_sharded()?, &response) + .await + { + Ok(()) => Ok(response), + Err(error) => Err(error.with_server_response(&response)), + } } Err(err) => Err(err), }; @@ -612,7 +616,7 @@ impl Client { effective_criteria: effective_criteria.clone(), }; - match op.handle_response(response, context).await { + match op.handle_response(&response, context).await { Ok(response) => Ok(response), Err(mut err) => { err.add_labels_and_update_pin( @@ -620,7 +624,7 @@ impl Client { session, Some(retryability), ); - Err(err) + Err(err.with_server_response(&response)) } } } @@ -820,8 +824,8 @@ impl Client { op: &T, session: &mut Option<&mut ClientSession>, is_sharded: bool, - response: RawCommandResponse, - ) -> Result { + response: &RawCommandResponse, + ) -> Result<()> { let raw_doc = RawDocument::from_bytes(response.as_bytes())?; let ok = match raw_doc.get("ok")? { @@ -870,7 +874,7 @@ impl Client { } } - Ok(response) + Ok(()) } else { Err(response .body::() diff --git a/src/cmap.rs b/src/cmap.rs index b0a02f8ed..98dc2b199 100644 --- a/src/cmap.rs +++ b/src/cmap.rs @@ -69,14 +69,23 @@ impl ConnectionPool { address: ServerAddress, connection_establisher: ConnectionEstablisher, server_updater: TopologyUpdater, - topology_id: ObjectId, options: Option, + #[cfg(feature = "tracing-unstable")] topology_id: ObjectId, ) -> Self { let event_handler = options .as_ref() .and_then(|opts| opts.cmap_event_handler.clone()); - let event_emitter = CmapEventEmitter::new(event_handler, topology_id); + #[cfg(feature = "tracing-unstable")] + let event_emitter = CmapEventEmitter::new( + event_handler, + topology_id, + options + .as_ref() + .and_then(|options| options.max_document_length_bytes), + ); + #[cfg(not(feature = "tracing-unstable"))] + let event_emitter = CmapEventEmitter::new(event_handler); let (manager, connection_requester, generation_subscriber) = ConnectionPoolWorker::start( address.clone(), @@ -114,7 +123,10 @@ impl ConnectionPool { manager, connection_requester, generation_subscriber, - event_emitter: CmapEventEmitter::new(None, ObjectId::new()), + #[cfg(feature = "tracing-unstable")] + event_emitter: CmapEventEmitter::new(None, ObjectId::new(), None), + #[cfg(not(feature = "tracing-unstable"))] + event_emitter: CmapEventEmitter::new(None), } } diff --git a/src/cmap/conn/pooled.rs b/src/cmap/conn/pooled.rs index acf61fb55..59649ba05 100644 --- a/src/cmap/conn/pooled.rs +++ b/src/cmap/conn/pooled.rs @@ -30,6 +30,7 @@ use crate::{ ConnectionReadyEvent, }, runtime::AsyncStream, + ServerType, }; /// A wrapper around the [`Connection`] type that represents a connection within a connection pool. @@ -205,6 +206,11 @@ impl PooledConnection { Instant::now().duration_since(available_time) >= max_idle_time } + /// Whether this connection is to a mongos. + pub(crate) fn is_sharded(&self) -> Result { + Ok(self.stream_description()?.initial_server_type == ServerType::Mongos) + } + /// Nullifies the internal state of this connection and returns it in a new [PooledConnection] /// with the given state. fn take(&mut self, new_state: PooledConnectionState) -> Self { diff --git a/src/cmap/options.rs b/src/cmap/options.rs index 47baa2e8f..ab76fd6bb 100644 --- a/src/cmap/options.rs +++ b/src/cmap/options.rs @@ -70,6 +70,10 @@ pub(crate) struct ConnectionPoolOptions { /// /// The default is 2. pub(crate) max_connecting: Option, + + /// The maximum length for documents in tracing events. + #[cfg(feature = "tracing-unstable")] + pub(crate) max_document_length_bytes: Option, } impl ConnectionPoolOptions { @@ -86,6 +90,8 @@ impl ConnectionPoolOptions { load_balanced: options.load_balanced, credential: options.credential.clone(), max_connecting: options.max_connecting, + #[cfg(feature = "tracing-unstable")] + max_document_length_bytes: options.tracing_max_document_length_bytes, } } diff --git a/src/cmap/test.rs b/src/cmap/test.rs index 7cc3b7c0f..d63254d42 100644 --- a/src/cmap/test.rs +++ b/src/cmap/test.rs @@ -165,8 +165,9 @@ impl Executor { ConnectionEstablisher::new(EstablisherOptions::from(get_client_options().await)) .unwrap(), updater, - crate::bson::oid::ObjectId::new(), Some(self.pool_options), + #[cfg(feature = "tracing-unstable")] + crate::bson::oid::ObjectId::new(), ); // Mock a monitoring task responding to errors reported by the pool. diff --git a/src/cmap/test/integration.rs b/src/cmap/test/integration.rs index 686dc94d8..2fd3e5af4 100644 --- a/src/cmap/test/integration.rs +++ b/src/cmap/test/integration.rs @@ -49,8 +49,9 @@ async fn acquire_connection_and_send_command() { client_options.hosts[0].clone(), ConnectionEstablisher::new(EstablisherOptions::from(&client_options)).unwrap(), TopologyUpdater::channel().0, - crate::bson::oid::ObjectId::new(), Some(pool_options), + #[cfg(feature = "tracing-unstable")] + crate::bson::oid::ObjectId::new(), ); let mut connection = pool.check_out().await.unwrap(); @@ -116,8 +117,9 @@ async fn concurrent_connections() { get_client_options().await.hosts[0].clone(), ConnectionEstablisher::new(EstablisherOptions::from(&client_options)).unwrap(), TopologyUpdater::channel().0, - crate::bson::oid::ObjectId::new(), Some(options), + #[cfg(feature = "tracing-unstable")] + crate::bson::oid::ObjectId::new(), ); let tasks = (0..2).map(|_| { @@ -191,8 +193,9 @@ async fn connection_error_during_establishment() { client_options.hosts[0].clone(), ConnectionEstablisher::new(EstablisherOptions::from(&client_options)).unwrap(), TopologyUpdater::channel().0, - crate::bson::oid::ObjectId::new(), Some(options), + #[cfg(feature = "tracing-unstable")] + crate::bson::oid::ObjectId::new(), ); pool.check_out().await.expect_err("check out should fail"); diff --git a/src/error.rs b/src/error.rs index 5e389f81d..6601a397b 100644 --- a/src/error.rs +++ b/src/error.rs @@ -13,7 +13,8 @@ use serde::{Deserialize, Serialize}; use thiserror::Error; use crate::{ - bson::{Bson, Document}, + bson::{doc, rawdoc, Bson, Document, RawDocumentBuf}, + cmap::RawCommandResponse, options::ServerAddress, sdam::{ServerType, TopologyVersion}, }; @@ -52,20 +53,32 @@ pub type Result = std::result::Result; #[derive(Clone, Debug, Error)] #[cfg_attr( test, - error("Kind: {kind}, labels: {labels:?}, source: {source:?}, backtrace: {bt}") + error( + "Kind: {kind}, labels: {labels:?}, source: {source:?}, backtrace: {bt}, server response: \ + {server_response:?}" + ) )] #[cfg_attr( not(test), - error("Kind: {kind}, labels: {labels:?}, source: {source:?}") + error( + "Kind: {kind}, labels: {labels:?}, source: {source:?}, server response: \ + {server_response:?}" + ) )] #[non_exhaustive] pub struct Error { /// The type of error that occurred. pub kind: Box, + labels: HashSet, + pub(crate) wire_version: Option, + #[source] pub(crate) source: Option>, + + pub(crate) server_response: Option>, + #[cfg(test)] bt: Arc, } @@ -99,6 +112,7 @@ impl Error { labels, wire_version: None, source: None, + server_response: None, #[cfg(test)] bt: Arc::new(std::backtrace::Backtrace::capture()), } @@ -288,6 +302,20 @@ impl Error { self.labels.insert(label); } + /// The full response returned from the server. This can be used to inspect error fields that + /// are not represented in the `Error` type. + pub fn server_response(&self) -> Option<&RawDocumentBuf> { + self.server_response.as_deref() + } + + /// Adds the server's response to this error if it is not already present. + pub(crate) fn with_server_response(mut self, response: &RawCommandResponse) -> Self { + if self.server_response.is_none() { + self.server_response = Some(Box::new(response.raw_body().to_owned())); + } + self + } + #[cfg(feature = "dns-resolver")] pub(crate) fn from_resolve_error(error: hickory_resolver::error::ResolveError) -> Self { ErrorKind::DnsResolve { @@ -496,6 +524,10 @@ impl Error { source.redact(); } + if self.server_response.is_some() { + self.server_response = Some(Box::new(rawdoc! { "redacted": true })); + } + // This is intentionally written without a catch-all branch so that if new error // kinds are added we remember to reason about whether they need to be redacted. match *self.kind { diff --git a/src/event/cmap.rs b/src/event/cmap.rs index f77bbdcd8..f033ef910 100644 --- a/src/event/cmap.rs +++ b/src/event/cmap.rs @@ -427,16 +427,18 @@ pub(crate) struct CmapEventEmitter { } impl CmapEventEmitter { - // the topology ID is only used when the tracing feature is on. - #[allow(unused_variables)] pub(crate) fn new( user_handler: Option>, - topology_id: ObjectId, + #[cfg(feature = "tracing-unstable")] topology_id: ObjectId, + #[cfg(feature = "tracing-unstable")] max_document_length_bytes: Option, ) -> CmapEventEmitter { Self { user_handler, #[cfg(feature = "tracing-unstable")] - tracing_emitter: ConnectionTracingEventEmitter::new(topology_id), + tracing_emitter: ConnectionTracingEventEmitter::new( + topology_id, + max_document_length_bytes, + ), } } diff --git a/src/operation.rs b/src/operation.rs index d5fdeb0e1..174c71d19 100644 --- a/src/operation.rs +++ b/src/operation.rs @@ -19,7 +19,7 @@ pub(crate) mod list_collections; pub(crate) mod list_databases; mod list_indexes; #[cfg(feature = "in-use-encryption")] -mod raw_output; +pub(crate) mod raw_output; pub(crate) mod run_command; pub(crate) mod run_cursor_command; mod search_index; @@ -70,8 +70,6 @@ pub(crate) use find_and_modify::FindAndModify; pub(crate) use get_more::GetMore; pub(crate) use insert::Insert; pub(crate) use list_indexes::ListIndexes; -#[cfg(feature = "in-use-encryption")] -pub(crate) use raw_output::RawOutput; pub(crate) use search_index::{CreateSearchIndexes, DropSearchIndex, UpdateSearchIndex}; pub(crate) use update::{Update, UpdateOrReplace}; @@ -141,7 +139,7 @@ pub(crate) trait Operation { /// Interprets the server response to the command. fn handle_response<'a>( &'a self, - response: RawCommandResponse, + response: &'a RawCommandResponse, context: ExecutionContext<'a>, ) -> BoxFuture<'a, Result>; @@ -205,7 +203,7 @@ pub(crate) trait OperationWithDefaults: Send + Sync { /// Interprets the server response to the command. fn handle_response<'a>( &'a self, - _response: RawCommandResponse, + _response: &'a RawCommandResponse, _context: ExecutionContext<'a>, ) -> Result { Err(ErrorKind::Internal { @@ -218,7 +216,7 @@ pub(crate) trait OperationWithDefaults: Send + Sync { /// async code is required to handle the response. fn handle_response_async<'a>( &'a self, - response: RawCommandResponse, + response: &'a RawCommandResponse, context: ExecutionContext<'a>, ) -> BoxFuture<'a, Result> { async move { self.handle_response(response, context) }.boxed() @@ -295,7 +293,7 @@ where } fn handle_response<'a>( &'a self, - response: RawCommandResponse, + response: &'a RawCommandResponse, context: ExecutionContext<'a>, ) -> BoxFuture<'a, Result> { self.handle_response_async(response, context) diff --git a/src/operation/abort_transaction.rs b/src/operation/abort_transaction.rs index 802633e4e..326a7a5e4 100644 --- a/src/operation/abort_transaction.rs +++ b/src/operation/abort_transaction.rs @@ -47,7 +47,7 @@ impl OperationWithDefaults for AbortTransaction { fn handle_response<'a>( &'a self, - response: RawCommandResponse, + response: &RawCommandResponse, _context: ExecutionContext<'a>, ) -> Result { let response: WriteConcernOnlyBody = response.body()?; diff --git a/src/operation/aggregate.rs b/src/operation/aggregate.rs index cd7697d03..13a8932a1 100644 --- a/src/operation/aggregate.rs +++ b/src/operation/aggregate.rs @@ -80,7 +80,7 @@ impl OperationWithDefaults for Aggregate { fn handle_response<'a>( &'a self, - response: RawCommandResponse, + response: &RawCommandResponse, context: ExecutionContext<'a>, ) -> Result { let cursor_response: CursorBody = response.body()?; diff --git a/src/operation/aggregate/change_stream.rs b/src/operation/aggregate/change_stream.rs index 8b8bab53c..d2b384269 100644 --- a/src/operation/aggregate/change_stream.rs +++ b/src/operation/aggregate/change_stream.rs @@ -83,7 +83,7 @@ impl OperationWithDefaults for ChangeStreamAggregate { fn handle_response<'a>( &'a self, - response: RawCommandResponse, + response: &RawCommandResponse, mut context: ExecutionContext<'a>, ) -> Result { let op_time = response diff --git a/src/operation/bulk_write.rs b/src/operation/bulk_write.rs index 8227f24bf..a1829f1f0 100644 --- a/src/operation/bulk_write.rs +++ b/src/operation/bulk_write.rs @@ -389,7 +389,7 @@ where fn handle_response_async<'b>( &'b self, - response: RawCommandResponse, + response: &'b RawCommandResponse, mut context: ExecutionContext<'b>, ) -> BoxFuture<'b, Result> { async move { diff --git a/src/operation/commit_transaction.rs b/src/operation/commit_transaction.rs index ccb417cfc..a920ea68c 100644 --- a/src/operation/commit_transaction.rs +++ b/src/operation/commit_transaction.rs @@ -39,7 +39,7 @@ impl OperationWithDefaults for CommitTransaction { fn handle_response<'a>( &'a self, - response: RawCommandResponse, + response: &RawCommandResponse, _context: ExecutionContext<'a>, ) -> Result { let response: WriteConcernOnlyBody = response.body()?; diff --git a/src/operation/count.rs b/src/operation/count.rs index bf3757611..ab03656ee 100644 --- a/src/operation/count.rs +++ b/src/operation/count.rs @@ -46,7 +46,7 @@ impl OperationWithDefaults for Count { fn handle_response<'a>( &'a self, - response: RawCommandResponse, + response: &'a RawCommandResponse, _context: ExecutionContext<'a>, ) -> Result { let response_body: ResponseBody = response.body()?; diff --git a/src/operation/count_documents.rs b/src/operation/count_documents.rs index f907eb177..0fe048fb9 100644 --- a/src/operation/count_documents.rs +++ b/src/operation/count_documents.rs @@ -92,7 +92,7 @@ impl OperationWithDefaults for CountDocuments { fn handle_response<'a>( &'a self, - response: RawCommandResponse, + response: &'a RawCommandResponse, _context: ExecutionContext<'a>, ) -> Result { let response: SingleCursorResult = response.body()?; diff --git a/src/operation/create.rs b/src/operation/create.rs index 56b849ec0..40aacaf65 100644 --- a/src/operation/create.rs +++ b/src/operation/create.rs @@ -40,7 +40,7 @@ impl OperationWithDefaults for Create { fn handle_response<'a>( &'a self, - response: RawCommandResponse, + response: &'a RawCommandResponse, _context: ExecutionContext<'a>, ) -> Result { let response: WriteConcernOnlyBody = response.body()?; diff --git a/src/operation/create_indexes.rs b/src/operation/create_indexes.rs index 621b0d332..1c46a7c05 100644 --- a/src/operation/create_indexes.rs +++ b/src/operation/create_indexes.rs @@ -69,7 +69,7 @@ impl OperationWithDefaults for CreateIndexes { fn handle_response<'a>( &'a self, - response: RawCommandResponse, + response: &'a RawCommandResponse, _context: ExecutionContext<'a>, ) -> Result { let response: WriteConcernOnlyBody = response.body()?; diff --git a/src/operation/delete.rs b/src/operation/delete.rs index 5a388c1d2..f15bb184f 100644 --- a/src/operation/delete.rs +++ b/src/operation/delete.rs @@ -75,7 +75,7 @@ impl OperationWithDefaults for Delete { fn handle_response<'a>( &'a self, - response: RawCommandResponse, + response: &'a RawCommandResponse, _context: ExecutionContext<'a>, ) -> Result { let response: WriteResponseBody = response.body()?; diff --git a/src/operation/distinct.rs b/src/operation/distinct.rs index 0e9d4e4c1..8a0f45307 100644 --- a/src/operation/distinct.rs +++ b/src/operation/distinct.rs @@ -68,7 +68,7 @@ impl OperationWithDefaults for Distinct { fn handle_response<'a>( &'a self, - response: RawCommandResponse, + response: &'a RawCommandResponse, _context: ExecutionContext<'a>, ) -> Result { let response: Response = response.body()?; diff --git a/src/operation/drop_collection.rs b/src/operation/drop_collection.rs index 48fc24edc..562b086fd 100644 --- a/src/operation/drop_collection.rs +++ b/src/operation/drop_collection.rs @@ -40,7 +40,7 @@ impl OperationWithDefaults for DropCollection { fn handle_response<'a>( &'a self, - response: RawCommandResponse, + response: &'a RawCommandResponse, _context: ExecutionContext<'a>, ) -> Result { let response: WriteConcernOnlyBody = response.body()?; diff --git a/src/operation/drop_database.rs b/src/operation/drop_database.rs index f0009f3c6..1273e615f 100644 --- a/src/operation/drop_database.rs +++ b/src/operation/drop_database.rs @@ -40,7 +40,7 @@ impl OperationWithDefaults for DropDatabase { fn handle_response<'a>( &'a self, - response: RawCommandResponse, + response: &'a RawCommandResponse, _context: ExecutionContext<'a>, ) -> Result { let response: WriteConcernOnlyBody = response.body()?; diff --git a/src/operation/drop_indexes.rs b/src/operation/drop_indexes.rs index a9f7c0b4a..de9a06d3f 100644 --- a/src/operation/drop_indexes.rs +++ b/src/operation/drop_indexes.rs @@ -40,7 +40,7 @@ impl OperationWithDefaults for DropIndexes { fn handle_response<'a>( &'a self, - _response: RawCommandResponse, + _response: &'a RawCommandResponse, _context: ExecutionContext<'a>, ) -> Result { Ok(()) diff --git a/src/operation/find.rs b/src/operation/find.rs index fafffd1be..efd8eb32f 100644 --- a/src/operation/find.rs +++ b/src/operation/find.rs @@ -92,7 +92,7 @@ impl OperationWithDefaults for Find { fn handle_response<'a>( &'a self, - response: RawCommandResponse, + response: &'a RawCommandResponse, context: ExecutionContext<'a>, ) -> Result { let response: CursorBody = response.body()?; diff --git a/src/operation/find_and_modify.rs b/src/operation/find_and_modify.rs index 123966202..208795da9 100644 --- a/src/operation/find_and_modify.rs +++ b/src/operation/find_and_modify.rs @@ -89,7 +89,7 @@ impl OperationWithDefaults for FindAndModify { fn handle_response<'a>( &'a self, - response: RawCommandResponse, + response: &'a RawCommandResponse, _context: ExecutionContext<'a>, ) -> Result { #[derive(Debug, Deserialize)] diff --git a/src/operation/get_more.rs b/src/operation/get_more.rs index 046cccbde..49aa7ac0d 100644 --- a/src/operation/get_more.rs +++ b/src/operation/get_more.rs @@ -84,7 +84,7 @@ impl OperationWithDefaults for GetMore<'_> { fn handle_response<'a>( &'a self, - response: RawCommandResponse, + response: &'a RawCommandResponse, _context: ExecutionContext<'a>, ) -> Result { let response: GetMoreResponseBody = response.body()?; diff --git a/src/operation/insert.rs b/src/operation/insert.rs index bdb7645c3..b79a1c42b 100644 --- a/src/operation/insert.rs +++ b/src/operation/insert.rs @@ -130,7 +130,7 @@ impl OperationWithDefaults for Insert<'_> { fn handle_response<'b>( &'b self, - response: RawCommandResponse, + response: &'b RawCommandResponse, _context: ExecutionContext<'b>, ) -> Result { let response: WriteResponseBody = response.body()?; diff --git a/src/operation/list_collections.rs b/src/operation/list_collections.rs index eaad8a0a9..c4b76a3b1 100644 --- a/src/operation/list_collections.rs +++ b/src/operation/list_collections.rs @@ -57,7 +57,7 @@ impl OperationWithDefaults for ListCollections { fn handle_response<'a>( &'a self, - response: RawCommandResponse, + response: &'a RawCommandResponse, context: ExecutionContext<'a>, ) -> Result { let response: CursorBody = response.body()?; diff --git a/src/operation/list_databases.rs b/src/operation/list_databases.rs index cf5b67d02..9897d4047 100644 --- a/src/operation/list_databases.rs +++ b/src/operation/list_databases.rs @@ -43,7 +43,7 @@ impl OperationWithDefaults for ListDatabases { fn handle_response<'a>( &'a self, - response: RawCommandResponse, + response: &'a RawCommandResponse, _context: ExecutionContext<'a>, ) -> Result { let response: Response = response.body()?; diff --git a/src/operation/list_indexes.rs b/src/operation/list_indexes.rs index 591c32d18..2b7c28867 100644 --- a/src/operation/list_indexes.rs +++ b/src/operation/list_indexes.rs @@ -45,7 +45,7 @@ impl OperationWithDefaults for ListIndexes { fn handle_response<'a>( &'a self, - response: RawCommandResponse, + response: &'a RawCommandResponse, context: ExecutionContext<'a>, ) -> Result { let response: CursorBody = response.body()?; diff --git a/src/operation/raw_output.rs b/src/operation/raw_output.rs index e7dec57d6..5bffe4f4e 100644 --- a/src/operation/raw_output.rs +++ b/src/operation/raw_output.rs @@ -31,10 +31,10 @@ impl Operation for RawOutput { fn handle_response<'a>( &'a self, - response: RawCommandResponse, + response: &'a RawCommandResponse, _context: ExecutionContext<'a>, ) -> BoxFuture<'a, Result> { - async move { Ok(response) }.boxed() + async move { Ok(response.clone()) }.boxed() } fn handle_error(&self, error: crate::error::Error) -> Result { diff --git a/src/operation/run_command.rs b/src/operation/run_command.rs index fe5a5a869..06ecac047 100644 --- a/src/operation/run_command.rs +++ b/src/operation/run_command.rs @@ -73,10 +73,10 @@ impl OperationWithDefaults for RunCommand<'_> { fn handle_response<'a>( &'a self, - response: RawCommandResponse, + response: &'a RawCommandResponse, _context: ExecutionContext<'a>, ) -> Result { - Ok(response.into_raw_document_buf().try_into()?) + Ok(response.raw_body().try_into()?) } fn selection_criteria(&self) -> Option<&SelectionCriteria> { diff --git a/src/operation/run_cursor_command.rs b/src/operation/run_cursor_command.rs index ad96eca79..b467f52ed 100644 --- a/src/operation/run_cursor_command.rs +++ b/src/operation/run_cursor_command.rs @@ -94,7 +94,7 @@ impl Operation for RunCursorCommand<'_> { fn handle_response<'a>( &'a self, - response: RawCommandResponse, + response: &'a RawCommandResponse, context: ExecutionContext<'a>, ) -> BoxFuture<'a, Result> { async move { diff --git a/src/operation/search_index.rs b/src/operation/search_index.rs index 2248cbf95..46cc5a977 100644 --- a/src/operation/search_index.rs +++ b/src/operation/search_index.rs @@ -42,7 +42,7 @@ impl OperationWithDefaults for CreateSearchIndexes { fn handle_response<'a>( &'a self, - response: RawCommandResponse, + response: &'a RawCommandResponse, _context: ExecutionContext<'a>, ) -> Result { #[derive(Debug, Deserialize)] @@ -114,7 +114,7 @@ impl OperationWithDefaults for UpdateSearchIndex { fn handle_response<'a>( &'a self, - _response: RawCommandResponse, + _response: &'a RawCommandResponse, _context: ExecutionContext<'a>, ) -> Result { Ok(()) @@ -158,7 +158,7 @@ impl OperationWithDefaults for DropSearchIndex { fn handle_response<'a>( &'a self, - _response: RawCommandResponse, + _response: &'a RawCommandResponse, _context: ExecutionContext<'a>, ) -> Result { Ok(()) diff --git a/src/operation/update.rs b/src/operation/update.rs index e62cf8991..01341e87e 100644 --- a/src/operation/update.rs +++ b/src/operation/update.rs @@ -172,7 +172,7 @@ impl OperationWithDefaults for Update { fn handle_response<'a>( &'a self, - response: RawCommandResponse, + response: &'a RawCommandResponse, _context: ExecutionContext<'a>, ) -> Result { let response: WriteResponseBody = response.body()?; diff --git a/src/sdam/server.rs b/src/sdam/server.rs index a956716b7..854396028 100644 --- a/src/sdam/server.rs +++ b/src/sdam/server.rs @@ -37,15 +37,16 @@ impl Server { options: ClientOptions, connection_establisher: ConnectionEstablisher, topology_updater: TopologyUpdater, - topology_id: crate::bson::oid::ObjectId, + #[cfg(feature = "tracing-unstable")] topology_id: crate::bson::oid::ObjectId, ) -> Arc { Arc::new(Self { pool: ConnectionPool::new( address.clone(), connection_establisher, topology_updater, - topology_id, Some(ConnectionPoolOptions::from_client_options(&options)), + #[cfg(feature = "tracing-unstable")] + topology_id, ), address, operation_count: AtomicU32::new(0), diff --git a/src/sdam/topology.rs b/src/sdam/topology.rs index 043cf5a48..a5de1ca15 100644 --- a/src/sdam/topology.rs +++ b/src/sdam/topology.rs @@ -531,6 +531,7 @@ impl TopologyWorker { self.options.clone(), self.connection_establisher.clone(), self.topology_updater.clone(), + #[cfg(feature = "tracing-unstable")] self.id, ); diff --git a/src/test/spec/crud.rs b/src/test/spec/crud.rs index dcbd52dff..9c0559260 100644 --- a/src/test/spec/crud.rs +++ b/src/test/spec/crud.rs @@ -22,14 +22,8 @@ async fn run_unified() { "replaceOne-hint-unacknowledged.json", "updateMany-hint-unacknowledged.json", "updateOne-hint-unacknowledged.json", - // TODO RUST-1405: unskip the errorResponse tests - "client-bulkWrite-errorResponse.json", + // The Rust driver does not support the collection-level bulk write method. "bulkWrite-errorResponse.json", - "updateOne-errorResponse.json", - "insertOne-errorResponse.json", - "deleteOne-errorResponse.json", - "aggregate-merge-errorResponse.json", - "findOneAndUpdate-errorResponse.json", ]; let skipped_tests = vec![ diff --git a/src/test/spec/handshake.rs b/src/test/spec/handshake.rs index 4d343624d..1ffae27a1 100644 --- a/src/test/spec/handshake.rs +++ b/src/test/spec/handshake.rs @@ -4,21 +4,15 @@ use std::{ }; use crate::{ - bson::{doc, oid::ObjectId, Bson, Document}, - cmap::Command, - event::EventHandler, - options::ClientOptions, - test::{spec::unified_runner::run_unified_tests, topology_is_sharded}, -}; - -use crate::{ + bson::{doc, Bson, Document}, cmap::{ conn::PendingConnection, establish::{ConnectionEstablisher, EstablisherOptions}, + Command, }, - event::cmap::CmapEventEmitter, - options::DriverInfo, - test::get_client_options, + event::{cmap::CmapEventEmitter, EventHandler}, + options::{ClientOptions, DriverInfo}, + test::{get_client_options, spec::unified_runner::run_unified_tests, topology_is_sharded}, Client, }; @@ -56,7 +50,10 @@ async fn arbitrary_auth_mechanism() { id: 0, address: client_options.hosts[0].clone(), generation: crate::cmap::PoolGeneration::normal(), - event_emitter: CmapEventEmitter::new(None, ObjectId::new()), + #[cfg(feature = "tracing-unstable")] + event_emitter: CmapEventEmitter::new(None, crate::bson::oid::ObjectId::new(), None), + #[cfg(not(feature = "tracing-unstable"))] + event_emitter: CmapEventEmitter::new(None), time_created: Instant::now(), cancellation_receiver: None, }; diff --git a/src/test/spec/trace.rs b/src/test/spec/trace.rs index d7b03ef2e..0b150e7d2 100644 --- a/src/test/spec/trace.rs +++ b/src/test/spec/trace.rs @@ -123,6 +123,8 @@ async fn command_logging_truncation_explicit_limit() { client_opts.tracing_max_document_length_bytes = Some(5); let client = Client::for_test().options(client_opts).await; + let expected_truncated_len = 5 + 3; // max len + 3 for trailing "..." + let _levels_guard = DEFAULT_GLOBAL_TRACING_HANDLER.set_levels(HashMap::from([( COMMAND_TRACING_EVENT_TARGET.to_string(), tracing::Level::DEBUG, @@ -142,14 +144,29 @@ async fn command_logging_truncation_explicit_limit() { let started = &events[0]; let command = started.get_value_as_string("command"); - assert_eq!(command.len(), 8); // 5 + 3 for trailing "..." + assert_eq!(command.len(), expected_truncated_len); // 5 + 3 for trailing "..." let succeeded = &events[1]; let reply = succeeded.get_value_as_string("reply"); - assert_eq!(reply.len(), 8); // 5 + 3 for trailing "..." + assert_eq!(reply.len(), expected_truncated_len); // 5 + 3 for trailing "..." + + client + .database("tracing_test") + .run_command(doc! { "invalidOp": 1 }) + .await + .unwrap_err(); - // TODO RUST-1405: when we expose the full server reply for command errors, we should confirm - // that gets correctly truncated in command failed events here as well. + let events = tracing_stream + .collect(Duration::from_millis(500), |_| true) + .await; + assert_eq!(events.len(), 2); // started + failed + + let failed = &events[1]; + let failure = failed.get_value_as_string("failure"); + let (_, server_response) = failure + .split_once("server response: ") + .expect("no server response logged"); + assert_eq!(server_response.len(), expected_truncated_len); } /// Prose test 3: mid-codepoint truncation diff --git a/src/test/spec/unified_runner.rs b/src/test/spec/unified_runner.rs index 0651869c7..fbb265bb1 100644 --- a/src/test/spec/unified_runner.rs +++ b/src/test/spec/unified_runner.rs @@ -126,8 +126,6 @@ async fn valid_pass() { let mut skipped_files = vec![ // TODO RUST-1570: unskip this file "collectionData-createOptions.json", - // TODO RUST-1405: unskip this file - "expectedError-errorResponse.json", // TODO RUST-582: unskip these files "entity-cursor-iterateOnce.json", "matches-lte-operator.json", diff --git a/src/test/spec/unified_runner/operation.rs b/src/test/spec/unified_runner/operation.rs index 3de966d02..d6357094f 100644 --- a/src/test/spec/unified_runner/operation.rs +++ b/src/test/spec/unified_runner/operation.rs @@ -291,7 +291,7 @@ pub(crate) enum Expectation { expected_value: Option, save_as_entity: Option, }, - Error(ExpectError), + Error(Box), Ignore, } @@ -460,7 +460,7 @@ impl<'de> Deserialize<'de> for Operation { "expectError is mutually exclusive with expectResult and saveResultAsEntity", )); } - Expectation::Error(err) + Expectation::Error(Box::new(err)) } else { Expectation::Result { expected_value: definition.expect_result, diff --git a/src/test/spec/unified_runner/operation/index.rs b/src/test/spec/unified_runner/operation/index.rs index d5ad6cba2..0c6b3140e 100644 --- a/src/test/spec/unified_runner/operation/index.rs +++ b/src/test/spec/unified_runner/operation/index.rs @@ -19,6 +19,7 @@ pub(super) struct CreateIndex { session: Option, keys: Document, name: Option, + unique: Option, } impl TestOperation for CreateIndex { @@ -28,7 +29,10 @@ impl TestOperation for CreateIndex { test_runner: &'a TestRunner, ) -> BoxFuture<'a, Result>> { async move { - let options = IndexOptions::builder().name(self.name.clone()).build(); + let options = IndexOptions::builder() + .name(self.name.clone()) + .unique(self.unique) + .build(); let index = IndexModel::builder() .keys(self.keys.clone()) .options(options) diff --git a/src/test/spec/unified_runner/test_file.rs b/src/test/spec/unified_runner/test_file.rs index 048ed364c..3dd1de74a 100644 --- a/src/test/spec/unified_runner/test_file.rs +++ b/src/test/spec/unified_runner/test_file.rs @@ -8,6 +8,8 @@ use tokio::sync::oneshot; use super::{results_match, ExpectedEvent, ObserveEvent, Operation}; +#[cfg(feature = "bson-3")] +use crate::bson_compat::RawDocumentBufExt; #[cfg(feature = "tracing-unstable")] use crate::trace; use crate::{ @@ -553,6 +555,7 @@ pub(crate) struct ExpectError { #[serde(default, deserialize_with = "serde_util::deserialize_indexed_map")] pub(crate) write_errors: Option>, pub(crate) write_concern_errors: Option>, + pub(crate) error_response: Option, } impl ExpectError { @@ -658,6 +661,12 @@ impl ExpectError { results_match(Some(&actual), expected, true, None).expect(&context); } } + + if let Some(ref expected) = self.error_response { + let actual_raw = error.server_response().expect(&context); + let actual = actual_raw.to_document().expect(&context); + results_match(Some(&actual.into()), &expected.into(), true, None).expect(&context); + } } } diff --git a/src/test/spec/unified_runner/test_runner.rs b/src/test/spec/unified_runner/test_runner.rs index 04dd002a4..b3f089a29 100644 --- a/src/test/spec/unified_runner/test_runner.rs +++ b/src/test/spec/unified_runner/test_runner.rs @@ -64,6 +64,7 @@ const SKIPPED_OPERATIONS: &[&str] = &[ "mapReduce", "watch", "rewrapManyDataKey", + "modifyCollection", ]; static MIN_SPEC_VERSION: Version = Version::new(1, 0, 0); diff --git a/src/trace.rs b/src/trace.rs index 4bdbf9317..f3841f1ea 100644 --- a/src/trace.rs +++ b/src/trace.rs @@ -1,3 +1,5 @@ +#[cfg(feature = "bson-3")] +use crate::bson_compat::RawDocumentBufExt; use crate::{ bson::Bson, client::options::{ServerAddress, DEFAULT_PORT}, @@ -29,11 +31,27 @@ impl TracingRepresentation for crate::bson::oid::ObjectId { } } -impl TracingRepresentation for crate::error::Error { - type Representation = String; - - fn tracing_representation(&self) -> String { - self.to_string() +impl crate::error::Error { + fn tracing_representation(&self, max_document_length: usize) -> String { + let mut error_string = format!( + "Kind: {}, labels: {:?}, source: {:?}", + self.kind, + self.labels(), + self.source + ); + if let Some(server_response) = self.server_response() { + let server_response_string = match server_response.to_document() { + Ok(document) => serialize_command_or_reply(document, max_document_length), + Err(_) => { + let mut hex_string = hex::encode(server_response.as_bytes()); + truncate_on_char_boundary(&mut hex_string, max_document_length); + hex_string + } + }; + error_string.push_str(", server response: "); + error_string.push_str(&server_response_string); + } + error_string } } diff --git a/src/trace/command.rs b/src/trace/command.rs index 57d1f1d69..9e6bcd973 100644 --- a/src/trace/command.rs +++ b/src/trace/command.rs @@ -64,7 +64,7 @@ impl CommandTracingEventEmitter { tracing::debug!( target: COMMAND_TRACING_EVENT_TARGET, topologyId = self.topology_id.tracing_representation(), - failure = event.failure.tracing_representation(), + failure = event.failure.tracing_representation(self.max_document_length_bytes), commandName = event.command_name, requestId = event.request_id, driverConnectionId = event.connection.id, diff --git a/src/trace/connection.rs b/src/trace/connection.rs index 6b8d6f811..321fe9a7e 100644 --- a/src/trace/connection.rs +++ b/src/trace/connection.rs @@ -2,17 +2,29 @@ use crate::bson::oid::ObjectId; use crate::{ event::cmap::{CmapEvent, ConnectionCheckoutFailedReason, ConnectionClosedReason}, - trace::{TracingRepresentation, CONNECTION_TRACING_EVENT_TARGET}, + trace::{ + TracingRepresentation, + CONNECTION_TRACING_EVENT_TARGET, + DEFAULT_MAX_DOCUMENT_LENGTH_BYTES, + }, }; #[derive(Clone)] pub(crate) struct ConnectionTracingEventEmitter { topology_id: ObjectId, + max_document_length_bytes: usize, } impl ConnectionTracingEventEmitter { - pub(crate) fn new(topology_id: ObjectId) -> ConnectionTracingEventEmitter { - Self { topology_id } + pub(crate) fn new( + topology_id: ObjectId, + max_document_length_bytes: Option, + ) -> ConnectionTracingEventEmitter { + Self { + topology_id, + max_document_length_bytes: max_document_length_bytes + .unwrap_or(DEFAULT_MAX_DOCUMENT_LENGTH_BYTES), + } } pub(crate) fn handle(&self, event: CmapEvent) { @@ -88,7 +100,7 @@ impl ConnectionTracingEventEmitter { serverPort = event.address.port_tracing_representation(), driverConnectionId = event.connection_id, reason = event.reason.tracing_representation(), - error = event.error.map(|e| e.tracing_representation()), + error = event.error.map(|e| e.tracing_representation(self.max_document_length_bytes)), "Connection closed", ); } @@ -108,7 +120,7 @@ impl ConnectionTracingEventEmitter { serverHost = event.address.host().as_ref(), serverPort = event.address.port_tracing_representation(), reason = event.reason.tracing_representation(), - error = event.error.map(|e| e.tracing_representation()), + error = event.error.map(|e| e.tracing_representation(self.max_document_length_bytes)), durationMS = event.duration.as_millis(), "Connection checkout failed", ); diff --git a/src/trace/server_selection.rs b/src/trace/server_selection.rs index 16a2f0258..a3a568a57 100644 --- a/src/trace/server_selection.rs +++ b/src/trace/server_selection.rs @@ -9,6 +9,7 @@ use crate::{ error::Error, sdam::{SelectedServer, TopologyDescription}, selection_criteria::SelectionCriteria, + trace::DEFAULT_MAX_DOCUMENT_LENGTH_BYTES, }; use std::time::{Duration, Instant}; @@ -35,6 +36,7 @@ pub(crate) struct ServerSelectionTracingEventEmitter<'a> { operation_name: &'a str, start_time: Instant, timeout: Duration, + max_document_length_bytes: usize, } impl ServerSelectionTracingEventEmitter<'_> { @@ -44,6 +46,7 @@ impl ServerSelectionTracingEventEmitter<'_> { operation_name: &'a str, start_time: Instant, timeout: Duration, + max_document_length_bytes: Option, ) -> ServerSelectionTracingEventEmitter<'a> { ServerSelectionTracingEventEmitter::<'a> { topology_id, @@ -51,6 +54,8 @@ impl ServerSelectionTracingEventEmitter<'_> { operation_name, start_time, timeout, + max_document_length_bytes: max_document_length_bytes + .unwrap_or(DEFAULT_MAX_DOCUMENT_LENGTH_BYTES), } } @@ -85,7 +90,7 @@ impl ServerSelectionTracingEventEmitter<'_> { operation = self.operation_name, selector = self.criteria.tracing_representation(), topologyDescription = topology_description.tracing_representation(), - failure = error.tracing_representation(), + failure = error.tracing_representation(self.max_document_length_bytes), "Server selection failed" ); } diff --git a/src/trace/topology.rs b/src/trace/topology.rs index 19cebdb00..2ee67e111 100644 --- a/src/trace/topology.rs +++ b/src/trace/topology.rs @@ -195,7 +195,7 @@ impl TopologyTracingEventEmitter { driverConnectionId = event.driver_connection_id, serverConnectionId = event.server_connection_id, awaited = event.awaited, - failure = event.failure.tracing_representation(), + failure = event.failure.tracing_representation(self.max_document_length_bytes), durationMS = event.duration.as_millis(), "Server heartbeat failed" )