Skip to content

Commit 703f516

Browse files
authored
refactor: spooled local file and cleanup (#85)
I refactored the code to extract `StreamingAndLocal` into its own file and add a `SpooledTemporaryFile`. The `SpooledTemporaryFile` keeps data written to it in memory until it exceeds a certain limit (5MB in this case). This PR is in preparation to refactor wheel caching.
1 parent e4c1a49 commit 703f516

File tree

7 files changed

+131
-91
lines changed

7 files changed

+131
-91
lines changed

.codespell-whitelist.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1 +1,2 @@
11
crate
2+
ser

crates/rattler_installs_packages/src/http.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ use std::str::FromStr;
1616
use std::sync::Arc;
1717
use std::time::SystemTime;
1818
use thiserror::Error;
19+
use tokio_util::compat::FuturesAsyncReadCompatExt;
1920
use url::Url;
2021

2122
// Attached to HTTP responses, to make testing easier
@@ -359,6 +360,7 @@ fn body_to_streaming_or_local(
359360
StreamingOrLocal::Streaming(Box::new(
360361
stream
361362
.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))
362-
.into_async_read(),
363+
.into_async_read()
364+
.compat(),
363365
))
364366
}

crates/rattler_installs_packages/src/package_database.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -267,7 +267,7 @@ impl PackageDb {
267267
)
268268
.await?;
269269

270-
let mut bytes = response.into_body().force_local().await.into_diagnostic()?;
270+
let mut bytes = response.into_body().into_local().await.into_diagnostic()?;
271271
let mut source = String::new();
272272
bytes.read_to_string(&mut source).into_diagnostic()?;
273273
html::parse_package_names_html(&source)
@@ -301,7 +301,7 @@ impl PackageDb {
301301
// Turn the response into a seekable response.
302302
let bytes = artifact_bytes
303303
.into_body()
304-
.force_local()
304+
.into_local()
305305
.await
306306
.into_diagnostic()?;
307307
A::new(name.clone(), bytes)

crates/rattler_installs_packages/src/utils.rs

Lines changed: 0 additions & 88 deletions
This file was deleted.
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
mod read_and_seek;
2+
mod streaming_or_local;
3+
4+
use include_dir::{include_dir, Dir};
5+
use url::Url;
6+
7+
pub use read_and_seek::ReadAndSeek;
8+
pub use streaming_or_local::StreamingOrLocal;
9+
10+
/// Keep retrying a certain IO function until it either succeeds or until it doesn't return
11+
/// [`std::io::ErrorKind::Interrupted`].
12+
pub fn retry_interrupted<F, T>(mut f: F) -> std::io::Result<T>
13+
where
14+
F: FnMut() -> std::io::Result<T>,
15+
{
16+
loop {
17+
match f() {
18+
Ok(result) => return Ok(result),
19+
Err(err) if err.kind() != std::io::ErrorKind::Interrupted => {
20+
return Err(err);
21+
}
22+
_ => {
23+
// Otherwise keep looping!
24+
}
25+
}
26+
}
27+
}
28+
29+
/// Normalize url according to pip standards
30+
pub fn normalize_index_url(mut url: Url) -> Url {
31+
let path = url.path();
32+
if !path.ends_with('/') {
33+
url.set_path(&format!("{path}/"));
34+
}
35+
url
36+
}
37+
38+
pub(crate) static VENDORED_PACKAGING_DIR: Dir<'_> =
39+
include_dir!("$CARGO_MANIFEST_DIR/vendor/packaging/");
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
use std::io::{Read, Seek};
2+
3+
/// Defines that a type can be read and seeked. This trait has a blanket implementation for any type
4+
/// that implements both [`Read`] and [`Seek`].
5+
pub trait ReadAndSeek: Read + Seek {}
6+
7+
impl<T> ReadAndSeek for T where T: Read + Seek {}
Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
use crate::utils::ReadAndSeek;
2+
use futures::TryFutureExt;
3+
use std::{
4+
io,
5+
io::{Read, Seek, Write},
6+
};
7+
use tempfile::SpooledTempFile;
8+
use tokio::io::{AsyncRead, AsyncReadExt};
9+
use tokio::task::JoinError;
10+
11+
/// Represents a stream of data that is either coming in asynchronously from a remote source or from
12+
/// a synchronous location (like the filesystem).
13+
///
14+
/// It is often useful to make this distinction because reading from a remote source is often slower
15+
/// than reading synchronously (from disk or memory).
16+
pub enum StreamingOrLocal {
17+
/// Represents an asynchronous stream of data.
18+
Streaming(Box<dyn AsyncRead + Unpin + Send>),
19+
20+
/// Represents a synchronous stream of data.
21+
Local(Box<dyn ReadAndSeek + Send>),
22+
}
23+
24+
impl StreamingOrLocal {
25+
/// Stream in the contents of the stream and make sure we have a fast locally accessible stream.
26+
///
27+
/// If the stream is already local this will simply return that stream. If however the file is
28+
/// remote it will first be read to a temporary spooled file.
29+
pub async fn into_local(self) -> io::Result<Box<dyn ReadAndSeek + Send>> {
30+
match self {
31+
StreamingOrLocal::Streaming(mut stream) => {
32+
// Create a [`SpooledTempFile`] which is a blob of memory that is kept in memory if
33+
// it does not grow beyond 5MB, otherwise it is written to disk.
34+
let mut local_file = SpooledTempFile::new(5 * 1024 * 1024);
35+
36+
// Stream in the bytes and copy them to the temporary file.
37+
let mut buf = [0u8; 1024 * 8];
38+
loop {
39+
let bytes_read = stream.read(&mut buf).await?;
40+
if bytes_read == 0 {
41+
break;
42+
}
43+
local_file.write_all(&buf)?;
44+
}
45+
46+
// Restart the file from the start so we can start reading from it.
47+
local_file.rewind()?;
48+
49+
Ok(Box::new(local_file))
50+
}
51+
StreamingOrLocal::Local(stream) => Ok(stream),
52+
}
53+
}
54+
55+
/// Asynchronously read the contents of the stream into a vector of bytes.
56+
pub async fn read_to_end(self, bytes: &mut Vec<u8>) -> std::io::Result<usize> {
57+
match self {
58+
StreamingOrLocal::Streaming(mut streaming) => streaming.read_to_end(bytes).await,
59+
StreamingOrLocal::Local(mut local) => {
60+
match tokio::task::spawn_blocking(move || {
61+
let mut bytes = Vec::new();
62+
local.read_to_end(&mut bytes).map(|_| bytes)
63+
})
64+
.map_err(JoinError::try_into_panic)
65+
.await
66+
{
67+
Ok(Ok(result)) => {
68+
*bytes = result;
69+
Ok(bytes.len())
70+
}
71+
Ok(Err(err)) => Err(err),
72+
// Resume the panic on the main task
73+
Err(Ok(panic)) => std::panic::resume_unwind(panic),
74+
Err(Err(_)) => Err(std::io::ErrorKind::Interrupted.into()),
75+
}
76+
}
77+
}
78+
}
79+
}

0 commit comments

Comments
 (0)