This repository was archived by the owner on Oct 23, 2022. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 169
/
Copy pathfetch_and_cat.rs
107 lines (92 loc) · 3.23 KB
/
fetch_and_cat.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
use futures::pin_mut;
use futures::stream::StreamExt; // needed for StreamExt::next
use ipfs::{Ipfs, IpfsOptions, IpfsPath, MultiaddrWithPeerId, TestTypes, UninitializedIpfs};
use std::env;
use std::process::exit;
use tokio::io::AsyncWriteExt;
/// Fetches the UnixFS file behind the `IPFS_PATH | CID` given as the first command line argument
/// and writes its bytes to stdout, reporting progress on stderr.
///
/// When a second argument is given it is parsed as a `MultiaddrWithPeerId` and dialed; otherwise
/// the node prints its own listening addresses and waits for another peer to connect to it.
/// Invalid arguments, connection failures, stream errors and stdout errors are reported on
/// stderr and terminate the process with exit code 1.
#[tokio::main]
async fn main() {
    tracing_subscriber::fmt::init();

    // this example will wait forever attempting to fetch a CID provided at command line. It is
    // expected to be used by connecting another ipfs peer to it and providing the blocks from that
    // peer.

    // Walk the arguments once (skipping the program name) instead of re-creating the iterator
    // for every lookup.
    let mut args = env::args().skip(1);

    let path = match args.next() {
        Some(raw) => match raw.parse::<IpfsPath>() {
            Ok(path) => path,
            Err(e) => {
                eprintln!("Failed to parse {} as IpfsPath: {}", raw, e);
                exit(1);
            }
        },
        None => {
            eprintln!("Usage: fetch_and_cat <IPFS_PATH | CID> [MULTIADDR]");
            eprintln!(
                "Example will accept connections and print all bytes of the unixfs file to \
                stdout."
            );
            eprintln!("If second argument is present, it is expected to be a Multiaddr with \
                peer_id. The given Multiaddr will be connected to instead of awaiting an incoming connection.");
            exit(0);
        }
    };

    // Only paths rooted at a CID can be fetched by this example.
    if path.root().cid().is_none() {
        eprintln!(
            "Unsupported path: ipns resolution is not available yet: {}",
            path
        );
        exit(1);
    }

    // The optional second argument is user input: report a parse failure instead of panicking.
    // `{:?}` is used because only `Debug` is known to be available for this error type.
    let target = match args.next() {
        Some(raw) => match raw.parse::<MultiaddrWithPeerId>() {
            Ok(addr) => Some(addr),
            Err(e) => {
                eprintln!("Failed to parse {} as MultiaddrWithPeerId: {:?}", raw, e);
                exit(1);
            }
        },
        None => None,
    };

    // Start daemon and initialize repo
    let mut opts = IpfsOptions::inmemory_with_generated_keys();
    opts.mdns = false;
    let (ipfs, fut): (Ipfs<TestTypes>, _) = UninitializedIpfs::new(opts, None)
        .await
        .start()
        .await
        .unwrap();
    // The returned future has to be polled for the node to make progress; run it in the
    // background.
    tokio::task::spawn(fut);

    if let Some(target) = target {
        // Dialing a user supplied address can fail for reasons outside of our control.
        if let Err(e) = ipfs.connect(target).await {
            eprintln!("Error: failed to connect: {:?}", e);
            exit(1);
        }
    } else {
        let (_, addresses, _) = ipfs.identity().await.unwrap();
        assert!(!addresses.is_empty(), "Zero listening addresses");

        eprintln!("Please connect an ipfs node having {} to:\n", path);

        for address in addresses {
            eprintln!(" - {}", address);
        }

        eprintln!();
    }

    let stream = ipfs.cat_unixfs(path, None).await.unwrap_or_else(|e| {
        eprintln!("Error: {}", e);
        exit(1);
    });
    // The stream needs to be pinned on the stack to be used with StreamExt::next
    pin_mut!(stream);

    let mut stdout = tokio::io::stdout();
    let mut total = 0;

    // This could be made more performant by polling the stream while writing to stdout.
    loop {
        match stream.next().await {
            Some(Ok(bytes)) => {
                total += bytes.len();

                // stdout may be a closed pipe (e.g. `| head`); report it instead of panicking.
                if let Err(e) = stdout.write_all(&bytes).await {
                    eprintln!("Error: failed to write to stdout: {}", e);
                    exit(1);
                }

                eprintln!(
                    "Received: {:>12} bytes, Total: {:>12} bytes",
                    bytes.len(),
                    total
                );
            }
            Some(Err(e)) => {
                // Best effort: push out what was already received before bailing out. A flush
                // failure is ignored on purpose, the stream error is the one worth reporting.
                let _ = stdout.flush().await;
                eprintln!("Error: {}", e);
                exit(1);
            }
            None => break,
        }
    }

    // `write_all` completing does not by itself mean the bytes have reached the process'
    // stdout; flush explicitly so nothing is lost when `main` returns.
    if let Err(e) = stdout.flush().await {
        eprintln!("Error: failed to flush stdout: {}", e);
        exit(1);
    }

    eprintln!("Total received: {} bytes", total);
}