-
Notifications
You must be signed in to change notification settings - Fork 38
/
Copy pathmain.rs
103 lines (83 loc) · 2.85 KB
/
main.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
use std::time::Duration;
use futures::{stream::FuturesUnordered, StreamExt};
use tokio::sync::mpsc;
struct Image {
url: String,
data: Vec<u8>,
}
async fn download_image(url: String) -> Image {
println!("downloading {url}");
tokio::time::sleep(Duration::from_millis(5)).await;
Image { url, data: vec![0] }
}
async fn process_image(image: Image) -> Image {
println!("processing image {}", image.url);
tokio::time::sleep(Duration::from_millis(20)).await;
Image {
url: image.url,
data: vec![1],
}
}
async fn save_image(image: Image) {
println!("saving image {}", image.url);
tokio::time::sleep(Duration::from_millis(5)).await;
}
async fn async_pipeline_example() {
let urls = (0..32).map(|i| format!("https://example.com/image/{}", i));
let (url_sender, mut url_receiver) = mpsc::channel(100);
let (image_sender, mut image_receiver) = mpsc::channel(100);
let (processed_sender, mut processed_receiver) = mpsc::channel(100);
let (output_sender, mut output_receiver) = mpsc::channel(100);
let h1 = tokio::spawn(async move {
while let Some(url) = url_receiver.recv().await {
let image = download_image(url).await;
if let Err(err) = image_sender.send(image).await {
println!("failed to send output: {}", err);
break;
}
}
});
let h2 = tokio::spawn(async move {
// process concurrently up to 4 images
let mut futures = FuturesUnordered::new();
loop {
let in_progress_len = futures.len();
tokio::select! {
biased;
Some(image) = image_receiver.recv(), if in_progress_len < 4 => {
futures.push(process_image(image));
},
Some(processed_image) = futures.next(), if in_progress_len > 0 => {
if let Err(err) = processed_sender.send(processed_image).await {
println!("failed to send output: {}", err);
break;
}
},
else => break
}
}
});
let h3 = tokio::spawn(async move {
while let Some(image) = processed_receiver.recv().await {
let image_url = image.url.clone();
save_image(image).await;
if let Err(err) = output_sender.send(image_url).await {
println!("failed to send output: {}", err);
break;
}
}
});
for url in urls {
url_sender.send(url).await.unwrap();
}
// drop sender to make channel finite
drop(url_sender);
while let Some(url) = output_receiver.recv().await {
println!("done with {url}");
}
tokio::try_join!(h1, h2, h3).unwrap();
}
#[tokio::main()]
async fn main() {
async_pipeline_example().await;
}