diff --git a/Cargo.toml b/Cargo.toml index 4227aa1..1255813 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -14,6 +14,10 @@ edition = '2018' [dependencies] +futures = "0.1.25" libc = "~0.2" libnfs-sys = "~0.2" +log = "~0.4" +mio = "~0.6" nix = "~0.11" +tokio = "~0.1" \ No newline at end of file diff --git a/examples/basic.rs b/examples/basic.rs index 2659fc1..2c0663a 100644 --- a/examples/basic.rs +++ b/examples/basic.rs @@ -6,10 +6,19 @@ use nix::{fcntl::OFlag, sys::stat::Mode}; fn main() -> Result<()> { let mut nfs = Nfs::new()?; - nfs.set_uid(1000)?; - nfs.set_gid(1000)?; + //nfs.set_uid(0)?; + //nfs.set_gid(0)?; nfs.set_debug(9)?; - nfs.mount("0.0.0.0", "/srv/nfs")?; + println!("mounting"); + nfs.mount("192.168.122.78", "/var/nfs")?; + nfs.stat64_async(&Path::new("foo"), |result|{ + println!("async stat result: {:?}", result); + + // Pass it on + result + })?; + nfs.run_async()?; + /* let dir = nfs.opendir(&Path::new("/"))?; for f in dir { @@ -29,5 +38,6 @@ fn main() -> Result<()> { let file = nfs.open(&Path::new("/rust"), OFlag::O_RDONLY)?; let buff = file.read(1024)?; println!("read file: {}", String::from_utf8_lossy(&buff)); + */ Ok(()) } diff --git a/src/lib.rs b/src/lib.rs index 1fc4ad7..3a2c1ca 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -3,19 +3,27 @@ //! version=4 or programatically calling nfs_set_version(nfs, NFS_V4) before //! connecting to the server/share. //! +use futures::future::Future; +use futures::unsync::oneshot; use libnfs_sys::*; +use log::error; +use mio::event::Evented; +use mio::unix::EventedFd; use nix::fcntl::OFlag; +use nix::poll::poll; +use nix::poll::{EventFlags, PollFd}; use nix::sys::stat::Mode; +use tokio::prelude::*; -use std::ffi::{CStr, CString}; +use std::ffi::{c_void, CStr, CString}; use std::io::{Error, ErrorKind, Result}; -use std::mem::zeroed; +use std::mem::{transmute, zeroed}; use std::os::unix::ffi::OsStrExt; use std::path::{Path, PathBuf}; use std::ptr; use std::rc::Rc; -#[derive(Clone)] +#[derive(Clone, Debug)] struct NfsPtr(*mut nfs_context); impl Drop for NfsPtr { @@ -48,7 +56,95 @@ fn check_retcode(ctx: *mut nfs_context, code: i32) -> Result<()> { } } -#[derive(Clone)] +#[no_mangle] +pub extern "C" fn nfs_mount_callback( + result: i32, + context: *mut nfs_context, + // Data is null for mount + _data: *mut c_void, + private_data: *mut c_void, +) { + println!("Callback called"); + unsafe { + // box the callback function + let callback: Box) -> Result<()>> = + Box::from_raw(private_data as *mut fn(Result<()>) -> Result<()>); + // Check the async result from libnfs + let result: Result<()> = if result < 0 { + let err_str = nfs_get_error(context); + let e = CStr::from_ptr(err_str).to_string_lossy().into_owned(); + Err(Error::new(ErrorKind::Other, e)) + } else { + Ok(()) + }; + if let Err(e) = callback(result) { + error!("nfs_mount callback error: {:?}", e); + }; + } +} + +#[no_mangle] +pub extern "C" fn nfs_open_callback( + result: i32, + context: *mut nfs_context, + // data is an *mut nfsfh + data: *mut c_void, + private_data: *mut c_void, +) { + println!("open callback called"); + unsafe { + // box the callback function + let callback: Box) -> Result> = + Box::from_raw(private_data as *mut fn(Result) -> Result); + // Check the async result from libnfs + let result: Result = if result < 0 { + let err_str = nfs_get_error(context); + let e = CStr::from_ptr(err_str).to_string_lossy().into_owned(); + Err(Error::new(ErrorKind::Other, e)) + } else { + let nfsfh = NfsFile { + nfs: Rc::new(NfsPtr(context)), + handle: data as *mut nfsfh, + }; + Ok(nfsfh) + }; + if let Err(e) = callback(result) { + error!("nfs_mount callback error: {:?}", e); + }; + } +} + +#[no_mangle] +pub extern "C" fn stat64_callback( + result: i32, + context: *mut nfs_context, + // data is an *mut nfs_stat_64 + data: *mut c_void, + private_data: *mut c_void, +) { + unsafe { + // box the callback function + println!("callback called with private_data: {:p}", &private_data); + let callback: &mut Box) -> Result<()>> = transmute(private_data); + println!("boxed callback: {:p}", &callback); + // Check the async result from libnfs + let result: Result<*mut nfs_stat_64> = if result < 0 { + let err_str = nfs_get_error(context); + let e = CStr::from_ptr(err_str).to_string_lossy().into_owned(); + Err(Error::new(ErrorKind::Other, e)) + } else { + let stat = data as *mut nfs_stat_64; + Ok(stat) + }; + println!("Calling user callback with: {:?}", result); + if let Err(e) = callback(result) { + error!("nfs_mount callback error: {:?}", e); + }; + println!("Done"); + } +} + +#[derive(Clone, Debug)] pub struct Nfs { context: Rc, } @@ -105,7 +201,7 @@ pub struct DirEntry { pub ctime_nsec: u32, } -#[derive(Clone)] +#[derive(Clone, Debug)] pub struct NfsDirectory { nfs: Rc, handle: *mut nfsdir, @@ -121,7 +217,7 @@ impl Drop for NfsDirectory { } } -#[derive(Clone)] +#[derive(Clone, Debug)] pub struct NfsFile { nfs: Rc, handle: *mut nfsfh, @@ -332,6 +428,32 @@ impl Nfs { } } + /// Accepts an optional callback function pointer + pub fn mount_async(&self, server: &str, export_name: &str, callback: F) -> Result<()> + where + F: FnOnce(Result<()>) -> Result<()> + 'static, + // This should accept a Nfs and return a Future + // how does the async chain get created? + // mount -> stat -> open -> etc etc etc all needs to be chained + { + let server = CString::new(server.as_bytes()).unwrap(); + let export = CString::new(export_name.as_bytes()).unwrap(); + let callback_ptr = Box::into_raw(Box::new(callback)); + unsafe { + check_retcode( + self.context.0, + nfs_mount_async( + self.context.0, + server.as_ptr(), + export.as_ptr(), + Some(nfs_mount_callback), + callback_ptr as *mut c_void, + ), + )?; + } + Ok(()) + } + /// Supported flags are /// O_APPEND /// O_RDONLY @@ -359,6 +481,31 @@ impl Nfs { } } + /// Accepts an optional callback function pointer + pub fn open_async(&self, path: &Path, flags: OFlag, callback: F) -> Result<()> + where + F: FnOnce(Result) -> Result + 'static, + // This should accept a Nfs and return a Future + // how does the async chain get created? + // mount -> stat -> open -> etc etc etc all needs to be chained + { + let path = CString::new(path.as_os_str().as_bytes())?; + let callback_ptr = Box::into_raw(Box::new(callback)); + unsafe { + check_retcode( + self.context.0, + nfs_open_async( + self.context.0, + path.as_ptr(), + flags.bits(), + Some(nfs_open_callback), + callback_ptr as *mut c_void, + ), + )?; + } + Ok(()) + } + pub fn opendir(&mut self, path: &Path) -> Result { let path = CString::new(path.as_os_str().as_bytes())?; unsafe { @@ -525,6 +672,55 @@ impl Nfs { Ok(()) } + pub fn run_async(&mut self) -> Result<()> { + loop { + let mut file_desc: Vec = Vec::new(); + let fd = unsafe { nfs_get_fd(self.context.0) }; + let events_needed = unsafe { nfs_which_events(self.context.0) }; + println!("events_needed: {:?}", events_needed); + let nfs_fd = PollFd::new(fd, EventFlags::from_bits_truncate(events_needed as i16)); + file_desc.push(nfs_fd); + if poll(&mut file_desc, -1).expect("poll failed") < 0 { + println!("poll failed"); + break; + } else { + println!("polled. Got revents: {:?}", file_desc[0].revents()); + if unsafe { + nfs_service( + self.context.0, + file_desc[0].revents().expect("nfs_service failed").bits() as i32, + ) < 0 + } { + println!("nfs_service failed"); + break; + } + } + } + Ok(()) + } + + pub fn setup_async(&mut self) -> Result<()> { + let register = tokio::reactor::Registration::new(); + let fd = unsafe { nfs_get_fd(self.context.0) }; + let events_needed = unsafe { nfs_which_events(self.context.0) }; + println!("nsf_get_fd: {}", fd); + println!("nfs_which_events: {}", events_needed); + let registered = register.register(&EventedFd(&fd))?; + if registered { + println!("registered"); + register.poll_read_ready()?; + println!("read ready"); + if unsafe { nfs_service(self.context.0, events_needed) } < 0 { + println!("nfs_service failed"); + } + } else { + // I/O resource has been previously registered + + } + + Ok(()) + } + pub fn stat64(&self, path: &Path) -> Result { let path = CString::new(path.as_os_str().as_bytes())?; unsafe { @@ -537,6 +733,27 @@ impl Nfs { } } + pub fn stat64_async(&self, path: &Path, callback: F) -> Result<()> + where + F: FnMut(Result) -> Result, F: 'static, + { + let path = CString::new(path.as_os_str().as_bytes())?; + let cb: Box) -> Result>> = Box::new(Box::new(callback)); + println!("stat64_async callback_ptr: {:p}", &cb); + unsafe { + check_retcode( + self.context.0, + nfs_stat64_async( + self.context.0, + path.as_ptr(), + Some(stat64_callback), + Box::into_raw(cb) as _ + ), + )?; + Ok(()) + } + } + pub fn statvfs(&self, path: &Path) -> Result { let path = CString::new(path.as_os_str().as_bytes())?; unsafe {