Skip to content

Commit 34ae89b

Browse files
authored
RUST-1871 Convert gridfs to a fluent API (mongodb#1051)
1 parent ccfeb72 commit 34ae89b

17 files changed

+715
-655
lines changed

src/action.rs

+1
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ mod drop;
1212
mod drop_index;
1313
mod find;
1414
mod find_and_modify;
15+
pub mod gridfs;
1516
mod insert_many;
1617
mod insert_one;
1718
mod list_collections;

src/action/gridfs.rs

+15
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
//! Action builders for gridfs.
2+
3+
mod delete;
4+
mod download;
5+
mod drop;
6+
mod find;
7+
mod rename;
8+
mod upload;
9+
10+
pub use delete::Delete;
11+
pub use download::{OpenDownloadStream, OpenDownloadStreamByName};
12+
pub use drop::Drop;
13+
pub use find::{Find, FindOne};
14+
pub use rename::Rename;
15+
pub use upload::OpenUploadStream;

src/action/gridfs/delete.rs

+65
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
use bson::{doc, Bson};
2+
3+
#[cfg(docsrs)]
4+
use crate::gridfs::FilesCollectionDocument;
5+
use crate::{
6+
action::action_impl,
7+
error::{ErrorKind, GridFsErrorKind, GridFsFileIdentifier, Result},
8+
GridFsBucket,
9+
};
10+
11+
impl GridFsBucket {
12+
/// Deletes the [`FilesCollectionDocument`] with the given `id` and its associated chunks from
13+
/// this bucket. This method returns an error if the `id` does not match any files in the
14+
/// bucket.
15+
///
16+
/// `await` will return [`Result<()>`].
17+
pub fn delete(&self, id: Bson) -> Delete {
18+
Delete { bucket: self, id }
19+
}
20+
}
21+
22+
#[cfg(feature = "sync")]
23+
impl crate::sync::gridfs::GridFsBucket {
24+
/// Deletes the [`FilesCollectionDocument`] with the given `id` and its associated chunks from
25+
/// this bucket. This method returns an error if the `id` does not match any files in the
26+
/// bucket.
27+
///
28+
/// [`run`](Delete::run) will return [`Result<()>`].
29+
pub fn delete(&self, id: Bson) -> Delete {
30+
self.async_bucket.delete(id)
31+
}
32+
}
33+
34+
/// Deletes a specific [`FilesCollectionDocument`] and its associated chunks. Construct with
35+
/// [`GridFsBucket::delete`].
36+
#[must_use]
37+
pub struct Delete<'a> {
38+
bucket: &'a GridFsBucket,
39+
id: Bson,
40+
}
41+
42+
action_impl! {
43+
impl<'a> Action for Delete<'a> {
44+
type Future = DeleteFuture;
45+
46+
async fn execute(self) -> Result<()> {
47+
let delete_result = self.bucket.files().delete_one(doc! { "_id": self.id.clone() }).await?;
48+
// Delete chunks regardless of whether a file was found. This will remove any possibly
49+
// orphaned chunks.
50+
self.bucket
51+
.chunks()
52+
.delete_many(doc! { "files_id": self.id.clone() })
53+
.await?;
54+
55+
if delete_result.deleted_count == 0 {
56+
return Err(ErrorKind::GridFs(GridFsErrorKind::FileNotFound {
57+
identifier: GridFsFileIdentifier::Id(self.id),
58+
})
59+
.into());
60+
}
61+
62+
Ok(())
63+
}
64+
}
65+
}

src/action/gridfs/download.rs

+178
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,178 @@
1+
use bson::{doc, Bson};
2+
3+
use crate::{
4+
action::{action_impl, deeplink, option_setters},
5+
error::{ErrorKind, GridFsErrorKind, GridFsFileIdentifier, Result},
6+
gridfs::{FilesCollectionDocument, GridFsDownloadByNameOptions},
7+
GridFsBucket,
8+
GridFsDownloadStream,
9+
};
10+
11+
impl GridFsBucket {
12+
/// Opens and returns a [`GridFsDownloadStream`] from which the application can read
13+
/// the contents of the stored file specified by `id`.
14+
///
15+
/// `await` will return d[`Result<GridFsDownloadStream>`].
16+
#[deeplink]
17+
pub fn open_download_stream(&self, id: Bson) -> OpenDownloadStream {
18+
OpenDownloadStream { bucket: self, id }
19+
}
20+
21+
/// Opens and returns a [`GridFsDownloadStream`] from which the application can read
22+
/// the contents of the stored file specified by `filename`.
23+
///
24+
/// If there are multiple files in the bucket with the given filename, the `revision` in the
25+
/// options provided is used to determine which one to download. See the documentation for
26+
/// [`GridFsDownloadByNameOptions`] for details on how to specify a revision. If no revision is
27+
/// provided, the file with `filename` most recently uploaded will be downloaded.
28+
///
29+
/// `await` will return d[`Result<GridFsDownloadStream>`].
30+
#[deeplink]
31+
pub fn open_download_stream_by_name(
32+
&self,
33+
filename: impl AsRef<str>,
34+
) -> OpenDownloadStreamByName {
35+
OpenDownloadStreamByName {
36+
bucket: self,
37+
filename: filename.as_ref().to_owned(),
38+
options: None,
39+
}
40+
}
41+
42+
// Utility functions for finding files within the bucket.
43+
44+
async fn find_file_by_id(&self, id: &Bson) -> Result<FilesCollectionDocument> {
45+
match self.find_one(doc! { "_id": id }).await? {
46+
Some(file) => Ok(file),
47+
None => Err(ErrorKind::GridFs(GridFsErrorKind::FileNotFound {
48+
identifier: GridFsFileIdentifier::Id(id.clone()),
49+
})
50+
.into()),
51+
}
52+
}
53+
54+
async fn find_file_by_name(
55+
&self,
56+
filename: &str,
57+
options: Option<GridFsDownloadByNameOptions>,
58+
) -> Result<FilesCollectionDocument> {
59+
let revision = options.and_then(|opts| opts.revision).unwrap_or(-1);
60+
let (sort, skip) = if revision >= 0 {
61+
(1, revision)
62+
} else {
63+
(-1, -revision - 1)
64+
};
65+
66+
match self
67+
.files()
68+
.find_one(doc! { "filename": filename })
69+
.sort(doc! { "uploadDate": sort })
70+
.skip(skip as u64)
71+
.await?
72+
{
73+
Some(fcd) => Ok(fcd),
74+
None => {
75+
if self
76+
.files()
77+
.find_one(doc! { "filename": filename })
78+
.await?
79+
.is_some()
80+
{
81+
Err(ErrorKind::GridFs(GridFsErrorKind::RevisionNotFound { revision }).into())
82+
} else {
83+
Err(ErrorKind::GridFs(GridFsErrorKind::FileNotFound {
84+
identifier: GridFsFileIdentifier::Filename(filename.into()),
85+
})
86+
.into())
87+
}
88+
}
89+
}
90+
}
91+
}
92+
93+
#[cfg(feature = "sync")]
94+
impl crate::sync::gridfs::GridFsBucket {
95+
/// Opens and returns a [`GridFsDownloadStream`] from which the application can read
96+
/// the contents of the stored file specified by `id`.
97+
///
98+
/// [`run`](OpenDownloadStream::run) will return d[`Result<GridFsDownloadStream>`].
99+
#[deeplink]
100+
pub fn open_download_stream(&self, id: Bson) -> OpenDownloadStream {
101+
self.async_bucket.open_download_stream(id)
102+
}
103+
104+
/// Opens and returns a [`GridFsDownloadStream`] from which the application can read
105+
/// the contents of the stored file specified by `filename`.
106+
///
107+
/// If there are multiple files in the bucket with the given filename, the `revision` in the
108+
/// options provided is used to determine which one to download. See the documentation for
109+
/// [`GridFsDownloadByNameOptions`] for details on how to specify a revision. If no revision is
110+
/// provided, the file with `filename` most recently uploaded will be downloaded.
111+
///
112+
/// [`run`](OpenDownloadStreamByName::run) will return d[`Result<GridFsDownloadStream>`].
113+
#[deeplink]
114+
pub fn open_download_stream_by_name(
115+
&self,
116+
filename: impl AsRef<str>,
117+
) -> OpenDownloadStreamByName {
118+
self.async_bucket.open_download_stream_by_name(filename)
119+
}
120+
}
121+
122+
/// Opens and returns a [`GridFsDownloadStream`] from which the application can read
123+
/// the contents of the stored file specified by an id. Construct with
124+
/// [`GridFsBucket::open_download_stream`].
125+
#[must_use]
126+
pub struct OpenDownloadStream<'a> {
127+
bucket: &'a GridFsBucket,
128+
id: Bson,
129+
}
130+
131+
action_impl! {
132+
impl<'a> Action for OpenDownloadStream<'a> {
133+
type Future = OpenDownloadStreamFuture;
134+
135+
async fn execute(self) -> Result<GridFsDownloadStream> {
136+
let file = self.bucket.find_file_by_id(&self.id).await?;
137+
GridFsDownloadStream::new(file, self.bucket.chunks()).await
138+
}
139+
140+
fn sync_wrap(out) -> Result<crate::sync::gridfs::GridFsDownloadStream> {
141+
out.map(crate::sync::gridfs::GridFsDownloadStream::new)
142+
}
143+
}
144+
}
145+
146+
/// Opens and returns a [`GridFsDownloadStream`] from which the application can read
147+
/// the contents of the stored file specified by a filename. Construct with
148+
/// [`GridFsBucket::open_download_stream_by_name`].
149+
#[must_use]
150+
pub struct OpenDownloadStreamByName<'a> {
151+
bucket: &'a GridFsBucket,
152+
filename: String,
153+
options: Option<GridFsDownloadByNameOptions>,
154+
}
155+
156+
impl<'a> OpenDownloadStreamByName<'a> {
157+
option_setters! { options: GridFsDownloadByNameOptions;
158+
revision: i32,
159+
}
160+
}
161+
162+
action_impl! {
163+
impl<'a> Action for OpenDownloadStreamByName<'a> {
164+
type Future = OpenDownloadStreamByNameFuture;
165+
166+
async fn execute(self) -> Result<GridFsDownloadStream> {
167+
let file = self
168+
.bucket
169+
.find_file_by_name(&self.filename, self.options)
170+
.await?;
171+
GridFsDownloadStream::new(file, self.bucket.chunks()).await
172+
}
173+
174+
fn sync_wrap(out) -> Result<crate::sync::gridfs::GridFsDownloadStream> {
175+
out.map(crate::sync::gridfs::GridFsDownloadStream::new)
176+
}
177+
}
178+
}

src/action/gridfs/drop.rs

+40
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
use crate::{action::action_impl, error::Result, GridFsBucket};
2+
3+
impl GridFsBucket {
4+
/// Removes all of the files and their associated chunks from this bucket.
5+
///
6+
/// `await` will return [`Result<()>`].
7+
pub fn drop(&self) -> Drop {
8+
Drop { bucket: self }
9+
}
10+
}
11+
12+
#[cfg(feature = "sync")]
13+
impl crate::sync::gridfs::GridFsBucket {
14+
/// Removes all of the files and their associated chunks from this bucket.
15+
///
16+
/// [`run`](Drop::run) will return [`Result<()>`].
17+
pub fn drop(&self) -> Drop {
18+
self.async_bucket.drop()
19+
}
20+
}
21+
22+
/// Removes all of the files and their associated chunks from a bucket. Construct with
23+
/// [`GridFsBucket::drop`].
24+
#[must_use]
25+
pub struct Drop<'a> {
26+
bucket: &'a GridFsBucket,
27+
}
28+
29+
action_impl! {
30+
impl<'a> Action for Drop<'a> {
31+
type Future = DropFuture;
32+
33+
async fn execute(self) -> Result<()> {
34+
self.bucket.files().drop().await?;
35+
self.bucket.chunks().drop().await?;
36+
37+
Ok(())
38+
}
39+
}
40+
}

0 commit comments

Comments
 (0)