Support something like smol::Async #2558

Closed
yihuang opened this issue May 22, 2020 · 23 comments
Labels
A-tokio Area: The main tokio crate C-feature-request Category: A feature request. M-io Module: tokio/io M-net Module: tokio/net

Comments

@yihuang

yihuang commented May 22, 2020

https://github.com/stjepang/smol/blob/master/src/async_io.rs#L107

@yihuang yihuang added A-tokio Area: The main tokio crate C-feature-request Category: A feature request. labels May 22, 2020
@Darksonn Darksonn added M-io Module: tokio/io M-net Module: tokio/net labels May 22, 2020
@Darksonn
Contributor

Darksonn commented May 22, 2020

Tokio already has types such as TcpStream and UdpSocket that allow use of various types of sockets. Can you elaborate on what you feel that Tokio is missing?

For more low level access to the underlying IO loop, there's also PollEvented.

@carllerche
Member

Closing as there is not enough information to support the request (use case, motivation, ....).

Feel free to comment if you disagree. I can reopen it.

@yihuang
Author

yihuang commented May 22, 2020

> Tokio already has types such as TcpStream and UdpSocket that allow use of various types of sockets. Can you elaborate on what you feel that Tokio is missing?
>
> For more low level access to the underlying IO loop, there's also PollEvented.

I was trying to find an async version of the zeromq library, and my environment needs Tokio 0.2.
I found several options:

  • async-zmq
    • Cons: many lines of code, and it runs its own reactor in a background thread.
  • zmq-tokio
    • Cons: built on Tokio 0.1.

And this is my experiment result with smol's Async: erickt/rust-zmq@b33e08e. You can see that it's trivial to integrate the existing zmq library into async world with this approach. So I hope to find a Tokio 0.2 equivalent of this Async type.

@Darksonn
Contributor

The example you provided doesn't compile because zmq::Socket doesn't implement AsRawFd. Even if it did, making sockets return WouldBlock errors inside libraries that normally expect their sockets to be in blocking mode sounds like a recipe for disaster.

For example, if the operation required multiple writes to the underlying IO resource (which zmq's send definitely does), then returning an IO error in the middle would likely put the connection in an invalid state, and if you tried sending again, it would start over from the start of the message, even though some of the message was already sent.
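The hazard described above can be illustrated without any real socket. This is a minimal std-only sketch, not zmq's or Tokio's actual behavior: `Throttled` is a hypothetical writer that accepts a few bytes and then reports `WouldBlock`, `send_naive` stands in for blocking-style library code that restarts from the beginning of the message, and `send_resumable` remembers how much has already been written.

```rust
use std::io::{self, ErrorKind, Write};

/// Hypothetical sink: accepts at most `budget` bytes, then reports WouldBlock.
struct Throttled {
    received: Vec<u8>,
    budget: usize,
}

impl Write for Throttled {
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        if self.budget == 0 {
            return Err(ErrorKind::WouldBlock.into());
        }
        let n = buf.len().min(self.budget);
        self.received.extend_from_slice(&buf[..n]);
        self.budget -= n;
        Ok(n)
    }

    fn flush(&mut self) -> io::Result<()> {
        Ok(())
    }
}

/// Blocking-style send: every call starts again from byte 0 of the message.
fn send_naive<W: Write>(w: &mut W, msg: &[u8]) -> io::Result<()> {
    w.write_all(msg)
}

/// Resumable send: `sent` records progress across WouldBlock errors.
fn send_resumable<W: Write>(w: &mut W, msg: &[u8], sent: &mut usize) -> io::Result<()> {
    while *sent < msg.len() {
        match w.write(&msg[*sent..]) {
            Ok(0) => return Err(ErrorKind::WriteZero.into()),
            Ok(n) => *sent += n,
            Err(e) => return Err(e),
        }
    }
    Ok(())
}

/// Returns the bytes the peer ends up with for the naive and resumable senders.
fn demo_partial_write() -> (Vec<u8>, Vec<u8>) {
    // Naive: the first attempt fails after 3 bytes, the retry resends everything.
    let mut a = Throttled { received: Vec::new(), budget: 3 };
    let _ = send_naive(&mut a, b"hello");
    a.budget = 100;
    send_naive(&mut a, b"hello").unwrap();

    // Resumable: the retry continues where the first attempt stopped.
    let mut b = Throttled { received: Vec::new(), budget: 3 };
    let mut sent = 0;
    let _ = send_resumable(&mut b, b"hello", &mut sent);
    b.budget = 100;
    send_resumable(&mut b, b"hello", &mut sent).unwrap();

    (a.received, b.received)
}
```

In this sketch the naive sender leaves the peer with a duplicated prefix, while the resumable one delivers the message exactly once.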

@hawkw
Member

hawkw commented May 22, 2020

> And this is my experiment result with smol's Async: erickt/rust-zmq@b33e08e. You can see that it's trivial to integrate the existing zmq library into async world with this approach.

Does the linked code actually work? My understanding is that smol::Async::new requires the wrapped type implement the standard library's AsRawFd trait, which (as far as I can tell) zmq::Socket does not implement. Am I missing something?

As a side note, tokio already provides the PollEvented type for associating an I/O resource with the Tokio event loop. Using PollEvented in conjunction with mio's EventedFd is similar to what smol::Async does (as I understand it). Would this work in your use-case?

@yihuang
Author

yihuang commented May 22, 2020

> > And this is my experiment result with smol's Async: erickt/rust-zmq@b33e08e. You can see that it's trivial to integrate the existing zmq library into async world with this approach.
>
> Does the linked code actually work? My understanding is that smol::Async::new requires the wrapped type implement the standard library's AsRawFd trait, which (as far as I can tell) zmq::Socket does not implement. Am I missing something?

AsRawFd is implemented in the preceding commit of the same PR.

> As a side note, tokio already provides the PollEvented type for associating an I/O resource with the Tokio event loop. Using PollEvented in conjunction with mio's EventedFd is similar to what smol::Async does (as I understand it). Would this work in your use-case?

Thanks, I'll try this, maybe it'll come out similar.

@yihuang
Author

yihuang commented May 22, 2020

> The example you provided doesn't compile because zmq::Socket doesn't implement AsRawFd. Even if it did, making sockets return WouldBlock errors inside libraries that normally expect their sockets to be in blocking mode sounds like a recipe for disaster.
>
> For example, if the operation required multiple writes to the underlying IO resource (which zmq's send definitely does), then returning an IO error in the middle would likely put the connection in an invalid state, and if you tried sending again, it would start over from the start of the message, even though some of the message was already sent.

AsRawFd is implemented in the preceding commit of the same PR.

I think the zmq API is designed to work with both blocking and non-blocking style:

And:

  • First of all, as I understand it, what get_fd returns is not the actual communication socket but an event-notification fd.
  • send actually just puts the message into a queue; it only blocks if the queue is full.
  • The actual network IO is done in background threads.
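As a rough analogy for the queueing model in these bullets (this is not zmq code, and the names are made up for illustration), a bounded std channel behaves similarly: a non-blocking send succeeds while there is room, reports "full" instead of blocking when there is not, and a background thread does the actual delivery.

```rust
use std::sync::mpsc::{sync_channel, TrySendError};
use std::thread;

/// Returns (first send accepted, second send rejected as full, messages delivered).
fn demo_bounded_queue() -> (bool, bool, Vec<String>) {
    // Queue with room for exactly one pending message.
    let (tx, rx) = sync_channel::<String>(1);

    // Room available: the message is queued without blocking.
    let first_accepted = tx.try_send("a".to_string()).is_ok();

    // Queue full and nobody draining yet: the non-blocking send is refused.
    let second_full = matches!(
        tx.try_send("b".to_string()),
        Err(TrySendError::Full(_))
    );

    // The background thread plays the role of the IO thread draining the queue.
    let worker = thread::spawn(move || rx.into_iter().collect::<Vec<String>>());

    // A blocking send waits only until the worker has made room.
    tx.send("c".to_string()).unwrap();
    drop(tx); // closing the sender lets the worker finish

    let delivered = worker.join().unwrap();
    (first_accepted, second_full, delivered)
}
```

The refused message "b" is handed back to the caller inside the error rather than being half-sent, which is the property that makes this style safe to retry.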

@Darksonn
Contributor

Interesting. I would still be super worried about the Rust shim around the C library doing the right thing. But yes, PollEvented would be the tool here.

@yihuang
Author

yihuang commented May 22, 2020

> Interesting. I would still be super worried about the Rust shim around the C library doing the right thing. But yes, PollEvented would be the tool here.

I did a bit more research on this and found this, a summary by one of the authors of tmq ("Rust ZeroMQ bindings for Tokio"). It looks decently written, works with Tokio 0.2, and uses PollEvented<EventedFd> under the hood. So I guess the problem is solved for me ;D

@hawkw
Member

hawkw commented May 22, 2020

Maybe something we can do on Tokio's side to help prevent similar confusion in the future is highlighting the use of PollEvented to add new evented primitives to Tokio's event loop more clearly in the documentation?

@yihuang
Author

yihuang commented May 23, 2020

> Maybe something we can do on Tokio's side to help prevent similar confusion in the future is highlighting the use of PollEvented to add new evented primitives to Tokio's event loop more clearly in the documentation?

I just implemented Async on top of PollEvented and EventedFd: tokio-async-io.
The example with rust-zmq runs OK, but I guess its correctness still needs review by experts.
In particular, I don't understand why this code is necessary. @stjepang
One caveat: EventedFd only supports Unix platforms.

@WGH-

WGH- commented Sep 13, 2020

> Maybe something we can do on Tokio's side to help prevent similar confusion in the future is highlighting the use of PollEvented to add new evented primitives to Tokio's event loop more clearly in the documentation?

Is there an easy way to wrap an existing non-blocking FD to provide AsyncRead, though? Well, of course one could just copy-paste some code from there -

    #[derive(Debug)]
    pub(crate) struct Fd<T> {
        inner: T,
    }

    impl<T> io::Read for Fd<T>
    where
        T: io::Read,
    {
        fn read(&mut self, bytes: &mut [u8]) -> io::Result<usize> {
            self.inner.read(bytes)
        }
    }

    impl<T> io::Write for Fd<T>
    where
        T: io::Write,
    {
        fn write(&mut self, bytes: &[u8]) -> io::Result<usize> {
            self.inner.write(bytes)
        }

        fn flush(&mut self) -> io::Result<()> {
            self.inner.flush()
        }
    }

    impl<T> AsRawFd for Fd<T>
    where
        T: AsRawFd,
    {
        fn as_raw_fd(&self) -> RawFd {
            self.inner.as_raw_fd()
        }
    }

    impl<T> Evented for Fd<T>
    where
        T: AsRawFd,
    {
        fn register(
            &self,
            poll: &MioPoll,
            token: Token,
            interest: Ready,
            opts: PollOpt,
        ) -> io::Result<()> {
            EventedFd(&self.as_raw_fd()).register(poll, token, interest | UnixReady::hup(), opts)
        }

        fn reregister(
            &self,
            poll: &MioPoll,
            token: Token,
            interest: Ready,
            opts: PollOpt,
        ) -> io::Result<()> {
            EventedFd(&self.as_raw_fd()).reregister(poll, token, interest | UnixReady::hup(), opts)
        }

        fn deregister(&self, poll: &MioPoll) -> io::Result<()> {
            EventedFd(&self.as_raw_fd()).deregister(poll)
        }
    }

    pub(crate) type ChildStdin = PollEvented<Fd<std::process::ChildStdin>>;
    pub(crate) type ChildStdout = PollEvented<Fd<std::process::ChildStdout>>;
    pub(crate) type ChildStderr = PollEvented<Fd<std::process::ChildStderr>>;

    fn stdio<T>(option: Option<T>) -> io::Result<Option<PollEvented<Fd<T>>>>
    where
        T: AsRawFd,
    {
        let io = match option {
            Some(io) => io,
            None => return Ok(None),
        };

        // Set the fd to nonblocking before we pass it to the event loop
        unsafe {
            let fd = io.as_raw_fd();
            let r = libc::fcntl(fd, libc::F_GETFL);
            if r == -1 {
                return Err(io::Error::last_os_error());
            }
            let r = libc::fcntl(fd, libc::F_SETFL, r | libc::O_NONBLOCK);
            if r == -1 {
                return Err(io::Error::last_os_error());
            }
        }

        Ok(Some(PollEvented::new(Fd { inner: io })?))
    }

- but that's no fun.

@Darksonn
Contributor

@WGH- The mio crate has several implementations of the Evented trait, which can be used directly in PollEvented.

@WGH-

WGH- commented Sep 14, 2020

@Darksonn are you referring to EventedFd? Although it provides quite a bit of value, you still need to implement io::Write/io::Read manually (like the linked fragment from tokio's process does), which is a considerable amount of boilerplate.

Doing something like let f: tokio::fs::File = unsafe { std::fs::File::from_raw_fd(fd) }.into(); could work; however, tokio::fs::File doesn't handle WouldBlock, returning it to the caller instead of waiting. For example, Go fixed this a while ago: golang/go@ea5825b (there's a typo in the commit title, but the description is accurate).
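To make concrete what "returning WouldBlock to the caller" looks like, here is a small std-only sketch on Unix. It assumes a socket pair as a stand-in for the fd in question, and `try_read` is a hypothetical helper, not a Tokio API. A reactor-integrated type would wait for readiness and retry at the point where this code returns `None`.

```rust
use std::io::{self, ErrorKind, Read, Write};
use std::os::unix::net::UnixStream;

/// One non-blocking read attempt.
/// `Ok(None)` means "not ready yet"; `Ok(Some(bytes))` with empty bytes means EOF.
fn try_read(stream: &mut UnixStream) -> io::Result<Option<Vec<u8>>> {
    let mut buf = [0u8; 64];
    match stream.read(&mut buf) {
        Ok(n) => Ok(Some(buf[..n].to_vec())),
        // The fd is non-blocking and has no data: the kernel reports EAGAIN.
        Err(e) if e.kind() == ErrorKind::WouldBlock => Ok(None),
        Err(e) => Err(e),
    }
}

/// Returns the result of reading before and after the peer writes.
fn demo_would_block() -> io::Result<(Option<Vec<u8>>, Option<Vec<u8>>)> {
    let (mut rx, mut tx) = UnixStream::pair()?;
    rx.set_nonblocking(true)?;

    // Nothing has been written yet, so the read cannot make progress.
    let before = try_read(&mut rx)?;

    // Once the peer writes, the same call succeeds.
    tx.write_all(b"ping")?;
    let after = try_read(&mut rx)?;

    Ok((before, after))
}
```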

@Darksonn
Contributor

Darksonn commented Sep 14, 2020

Well converting it to a File is a bad idea, as files are incompatible with APIs such as epoll and use blocking IO on a separate threadpool. You would have to convert it to e.g. a TcpStream to get something that actually ties it in with epoll (and handles WouldBlock correctly).

In any case, we are currently working on Tokio v0.3, which will rework this part of the API to remove the public dependency on mio. Designing this part of the API is still a work-in-progress.

Edit: It is correct that, with the current API, you are forced to write some boilerplate yourself if none of the IO types in Tokio fit your needs.

@WGH-

WGH- commented Sep 14, 2020

FWIW, the FD in question is an anonymous pipe. Unfortunately, std::process (and tokio::process) doesn't have an easy way to pipe stdout and stderr into a single stream (a la 2>&1), so the only option is to create a pipe manually and pass its write end as both stdout and stderr. This is the easy part. Wrapping the other end of the pipe into something nonblocking and tokio-compatible is the hard part (it requires writing the boilerplate). See also rust-lang/rfcs#871

Although pipes are probably the main thing here, I believe this problem is not limited to pipes. There are quite a few Linux-specific whateverfd APIs that are 1) pollable and 2) provide a read/write interface (eventfd, timerfd), and it would be cool if one could use them with Tokio without writing extra code.
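The "easy part" mentioned above, handing one write end to the child as both stdout and stderr, can be sketched with std alone on Unix. Note the substitution: this sketch uses a Unix socket pair from `UnixStream::pair` rather than an anonymous pipe, because that is available in std without extra crates. `run_merged` is a hypothetical helper, `sh` is assumed to be on the PATH, and the read here is a plain blocking read, so the "hard part" of making it non-blocking and Tokio-compatible is not addressed.

```rust
use std::io::{self, Read};
use std::os::unix::io::OwnedFd;
use std::os::unix::net::UnixStream;
use std::process::{Command, Stdio};

/// Runs `script` under `sh -c` with stdout and stderr merged into one stream.
fn run_merged(script: &str) -> io::Result<String> {
    // `reader` stays in the parent; both copies of `writer` go to the child.
    let (mut reader, writer) = UnixStream::pair()?;
    let writer_for_stderr = writer.try_clone()?;

    let mut cmd = Command::new("sh");
    cmd.arg("-c")
        .arg(script)
        .stdout(Stdio::from(OwnedFd::from(writer)))
        .stderr(Stdio::from(OwnedFd::from(writer_for_stderr)));

    let mut child = cmd.spawn()?;
    // Drop the parent's copies of the write end, otherwise EOF never arrives.
    drop(cmd);

    let mut output = String::new();
    reader.read_to_string(&mut output)?;
    child.wait()?;
    Ok(output)
}
```

Calling `run_merged("echo out; echo err 1>&2")` should yield both lines on the single stream, in the order the shell wrote them.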

@tobz
Member

tobz commented Sep 14, 2020

These are likely best written as utilities or utility crates, so that Tokio doesn't have to be coupled to specific Linux-only subsystems, etc. We've traditionally shied away from maintaining OS-specific facilities because it adds to the support/testing burden.

Personally, writing <20 lines of code for Evented seems like a very low bar if someone needs it. I'm not sure why tokio-process does it the way that it does, but the normal case absolutely doesn't involve writing Read/Write implementations, as seen in the examples here: https://docs.rs/mio/0.6.22/mio/unix/struct.EventedFd.html#examples

@WGH-

WGH- commented Sep 14, 2020

https://docs.rs/tokio/0.2.22/tokio/io/trait.AsyncRead.html#impl-AsyncRead-11

impl<E> AsyncRead for PollEvented<E> where
    E: Evented + Read + Unpin, 

If you want to read from that file descriptor using AsyncRead/AsyncReadExt, plain EventedFd is not sufficient, because it doesn't provide Read on its own. You need to wrap it with something that provides it, and that's exactly what tokio/tokio/src/process/unix/mod.rs does.
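The missing piece is small but unavoidable: something has to supply `Read` for the bare descriptor. A minimal std-only sketch of such a wrapper on Unix follows. `FdReader` is a hypothetical name, it only borrows the fd (it never closes it), and the mio/Tokio registration half is left out.

```rust
use std::fs::File;
use std::io::{self, Read, Write};
use std::mem::ManuallyDrop;
use std::os::unix::io::{AsRawFd, FromRawFd, RawFd};
use std::os::unix::net::UnixStream;

/// Hypothetical newtype that gives a borrowed raw fd a `Read` implementation.
/// The caller must keep the fd open for as long as the reader is used.
struct FdReader(RawFd);

impl Read for FdReader {
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        // SAFETY: the fd is assumed to be open; ManuallyDrop prevents the
        // temporary File from closing a descriptor this type does not own.
        let mut file = ManuallyDrop::new(unsafe { File::from_raw_fd(self.0) });
        file.read(buf)
    }
}

/// Writes two bytes into one end of a socket pair and reads them via `FdReader`.
fn demo_fd_reader() -> io::Result<Vec<u8>> {
    let (rx, mut tx) = UnixStream::pair()?;
    tx.write_all(b"hi")?;

    // `rx` stays alive until the end of this function, so the fd is valid.
    let mut reader = FdReader(rx.as_raw_fd());
    let mut buf = [0u8; 8];
    let n = reader.read(&mut buf)?;
    Ok(buf[..n].to_vec())
}
```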

@tobz
Member

tobz commented Sep 14, 2020

Ah, you're right. I was looking at some similar code I had written that only used Evented for readiness, not actual reading/writing.

At any rate, everything else I said still stands. :P

@WGH-

WGH- commented Dec 9, 2020

Hmm, given that tokio 0.3 has tokio::io::unix::AsyncFd now, I suppose this issue was silently addressed?

@Darksonn
Contributor

Darksonn commented Dec 9, 2020

The situation has changed since we are moving away from having mio in the public API, yes. It is still not equivalent to smol's Async type.

@WGH-

WGH- commented Dec 9, 2020

True, but the amount of boilerplate code required to wrap with AsyncFd and add required operations is much smaller than before.

@Darksonn
Contributor

Darksonn commented Dec 9, 2020

I am happy to hear you like the new API :)

6 participants