Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 32 additions & 0 deletions __test__/index.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -1125,3 +1125,35 @@ ava_1.default.serial('should handle getting directories concurrently', async (t)
t.is(quatre.name, 'quatre');
}
});
ava_1.default.serial('should handle watch', async (t) => {
if (node_process_1.default.env.TEST_USING_MOCKS) {
t.pass("n/a for mocks");
return;
}
const sleep = async (ms) => { return new Promise((resolve) => setTimeout(resolve, ms)); };
const rootHandle = await getRootHandle();
const smbHandle = rootHandle;
let caught = { path: "", action: "" };
smbHandle.watch(async (watchEvent) => {
if (!watchEvent || watchEvent.path !== "watch_event_file" || watchEvent.action !== "create") {
console.log("watch caught something unexpected:", watchEvent, new Date());
}
caught = watchEvent;
});
await sleep(500)
.then(async () => {
// console.log("post-sleep");
const rootHandleAlt = await getRootHandle();
// console.log("obtained root handle");
const fileHandle = await rootHandleAlt.getFileHandle("watch_event_file", { create: true });
// console.log("obtained file handle");
const writable = await fileHandle.createWritable();
// console.log("obtained writable");
const writer = await writable.getWriter();
// console.log("obtained writer");
await writer.write("eventful");
// console.log("finished writing");
});
// console.log("done watching", new Date());
t.deepEqual(caught, { path: "watch_event_file", action: "create" });
});
35 changes: 35 additions & 0 deletions __test__/index.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1229,3 +1229,38 @@ test.serial('should handle getting directories concurrently', async (t) => {
}
})

test.serial('should handle watch', async (t) => {
if (process.env.TEST_USING_MOCKS) {
t.pass("n/a for mocks");
return;
}

const sleep = async (ms: number) => { return new Promise((resolve) => setTimeout(resolve, ms)); };
const rootHandle = await getRootHandle();
const smbHandle = rootHandle as SmbDirectoryHandle;
let caught: {path: string, action: string} = {path: "", action: ""};
smbHandle.watch(async (watchEvent) => {
if (!watchEvent || watchEvent.path !== "watch_event_file" || watchEvent.action !== "create") {
console.log("watch caught something unexpected:", watchEvent, new Date());
}
caught = watchEvent;
})

await sleep(500)
.then(async () => {
// console.log("post-sleep");
const rootHandleAlt = await getRootHandle();
// console.log("obtained root handle");
const fileHandle = await rootHandleAlt.getFileHandle("watch_event_file", {create: true});
// console.log("obtained file handle");
const writable = await fileHandle.createWritable();
// console.log("obtained writable");
const writer = await writable.getWriter();
// console.log("obtained writer");
await writer.write("eventful");
// console.log("finished writing");
});
// console.log("done watching", new Date());
t.deepEqual(caught, {path: "watch_event_file", action: "create"});
})

4 changes: 2 additions & 2 deletions deps.sh
Original file line number Diff line number Diff line change
Expand Up @@ -127,8 +127,8 @@ if [ "${OS}" == "Darwin" ]; then
CFLAGS="-fPIC" \
LDFLAGS="-framework GSS" \
YACC="${YACC}"
make -j${PROCS} clean all
make install
make clean
make -j${PROCS} install
popd
fi

Expand Down
73 changes: 42 additions & 31 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use send_wrapper::SendWrapper;
use std::{path::Path, sync::{Arc, RwLock, RwLockWriteGuard}, thread};

mod smb;
use smb::{VFSEntryType, VFSFileNotificationInformation, VFSFileNotification, VFSFileNotificationOperation, VFSWatchMode, VFS};
use smb::{VFSEntryType, VFSFileNotificationInformation, VFSFileNotification, VFSFileNotificationBoxed, VFSFileNotificationOperation, VFSWatchMode, VFS};

/*

Expand Down Expand Up @@ -109,6 +109,7 @@ interface Blob {

const FIELD_KIND: &str = "kind";
const FIELD_NAME: &str = "name";
const FIELD_URL: &str = "url";
const FIELD_PATH: &str = "path";
const FIELD_DATA: &str = "data";
const FIELD_TYPE: &str = "type";
Expand Down Expand Up @@ -321,6 +322,7 @@ impl Default for JsSmbCreateWritableOptions {
#[napi]
pub struct JsSmbHandle {
smb: Option<Arc<RwLock<Box<dyn VFS>>>>,
url: String,
path: String,
#[napi(readonly, ts_type="'directory' | 'file'")]
pub kind: String,
Expand All @@ -332,17 +334,25 @@ pub struct JsSmbHandle {
impl JsSmbHandle {

pub fn open(url: String) -> Result<Self> {
let conn_res = smb::connect(url);
Self::open_path(url, DIR_ROOT.into(), KIND_DIRECTORY.into(), DIR_ROOT.into())
}

fn open_path(url: String, path: String, kind: String, name: String) -> Result<Self> {
let conn_res = smb::connect(url.to_owned());
match conn_res {
Ok(conn) => {
return Ok(Self{smb: Some(Arc::new(RwLock::new(conn))), path: DIR_ROOT.into(), kind: KIND_DIRECTORY.into(), name: DIR_ROOT.into()});
return Ok(Self{smb: Some(Arc::new(RwLock::new(conn))), url, path, kind, name});
},
Err(e) => {
return Err(e.into())
},
}
}

fn clone_with_new_connection(&self) -> Result<Self> {
Self::open_path(self.url.to_owned(), self.path.to_owned(), self.kind.to_owned(), self.name.to_owned())
}

fn is_same(&self, other: &JsSmbHandle) -> bool {
other.kind == self.kind && other.name == self.name && (other.path.is_empty() || self.path.is_empty() || other.path == self.path)
}
Expand Down Expand Up @@ -400,8 +410,9 @@ impl FromNapiValue for JsSmbHandle {
let obj = Object::from_napi_value(env, napi_val)?;
let kind = obj.get::<&str, &str>(FIELD_KIND)?.unwrap_or_default().into();
let name = obj.get::<&str, &str>(FIELD_NAME)?.unwrap_or_default().into();
let url = obj.get::<&str, &str>(FIELD_URL)?.unwrap_or_default().into();
let path = obj.get::<&str, &str>(FIELD_PATH)?.unwrap_or_default().into();
Ok(Self{smb: None, path, kind, name})
Ok(Self{smb: None, url, path, kind, name})
},
|handle| Ok(handle.to_owned())
)
Expand Down Expand Up @@ -474,7 +485,7 @@ impl JsSmbDirectoryHandle {
_ => (KIND_FILE.into(), format_file_path(&self.handle.path, &name))
};
if kind != KIND_DIRECTORY || (name != DIR_CURRENT && name != DIR_PARENT) {
entries.push(JsSmbHandle{smb: self.handle.smb.clone(), path, kind, name});
entries.push(JsSmbHandle{smb: self.handle.smb.clone(), url: self.handle.url.to_owned(), path, kind, name});
}
}
}
Expand Down Expand Up @@ -513,7 +524,7 @@ impl JsSmbDirectoryHandle {
let smb = &self.handle.smb;
let my_smb = using_rwlock!(smb);
let _ = my_smb.mkdir(path.trim_end_matches('/'), 0o775)?;
Ok(JsSmbHandle{smb: self.handle.smb.clone(), path, kind: KIND_DIRECTORY.into(), name}.into())
Ok(JsSmbHandle{smb: self.handle.smb.clone(), url: self.handle.url.to_owned(), path, kind: KIND_DIRECTORY.into(), name}.into())
}

#[napi]
Expand All @@ -533,7 +544,7 @@ impl JsSmbDirectoryHandle {
let smb = &self.handle.smb;
let mut my_smb = using_rwlock!(smb);
let _ = my_smb.create(path.as_str(), nix::fcntl::OFlag::O_SYNC.bits() as u32, (Mode::S_IRUSR | Mode::S_IWUSR | Mode::S_IRGRP | Mode::S_IWGRP | Mode::S_IROTH | Mode::S_IWOTH).bits() as u32)?; // XXX: change mode value to 0o664?
Ok(JsSmbHandle{smb: self.handle.smb.clone(), path, kind: KIND_FILE.into(), name}.into())
Ok(JsSmbHandle{smb: self.handle.smb.clone(), url: self.handle.url.to_owned(), path, kind: KIND_FILE.into(), name}.into())
}

fn smb_remove(&self, entry: &JsSmbHandle, recursive: bool) -> Result<()> {
Expand Down Expand Up @@ -595,31 +606,27 @@ impl JsSmbDirectoryHandle {

#[napi]
pub fn watch(&self, callback: JsFunction) -> Result<()> {
let tsfn: ThreadsafeFunction<Result<JsSmbNotifyChange>, ErrorStrategy::Fatal> = callback
.create_threadsafe_function(0, |ctx: ThreadSafeCallContext<std::prelude::v1::Result<JsSmbNotifyChange, Error>>| {
match ctx.value {
Ok(value) => {
Ok(vec![value])
},
Err(err) => {
Err(err)
},
}
let tsfn: ThreadsafeFunction<Result<VFSFileNotificationInformation>, ErrorStrategy::Fatal> = callback
.create_threadsafe_function(0, |ctx: ThreadSafeCallContext<std::prelude::v1::Result<VFSFileNotificationInformation, Error>>| {
ctx.value.map(|info| {
let conv: JsSmbNotifyChange = info.into();
// println!("watch callback - path={} action={}", &conv.path, &conv.action);
vec![conv]
})
})?;

let handle = self.handle.clone();
let handle = self.handle.clone_with_new_connection()?;
//spawn(async move {
thread::spawn(move || {

fn watch_file_path(handle: &JsSmbHandle) -> Result<Vec<JsSmbNotifyChange>> {
fn watch_file_path(handle: &JsSmbHandle) -> Result<VFSFileNotificationBoxed> {
let smb = &handle.smb;
let path = &handle.path;
let my_smb = using_rwlock!(smb);
let watch_mode = VFSWatchMode::Recursive;
let listen_flags = VFSFileNotificationOperation::all();

let res = my_smb.watch(path, watch_mode, listen_flags)?;
Ok(res.into())
Ok(my_smb.watch(path, watch_mode, listen_flags)?)
}

let run_in_loop = true;
Expand All @@ -628,11 +635,14 @@ impl JsSmbDirectoryHandle {
let res_contents = watch_file_path(&handle);
match res_contents {
Ok(contents) => {
for item in contents.iter() {
let item_owned = item.to_owned();
tsfn.call(Ok(item_owned), ThreadsafeFunctionCallMode::NonBlocking);
}
}
for item in contents.into_iter() {
let it = match item {
Ok(itm) => Ok(itm),
Err(err) => Err(err.into()),
};
tsfn.call(it, ThreadsafeFunctionCallMode::NonBlocking);
}
},
Err(_) => {}
}
}
Expand Down Expand Up @@ -738,11 +748,12 @@ impl From<Box<dyn VFSFileNotification>> for Vec<JsSmbNotifyChange> {

impl From<VFSFileNotificationInformation> for JsSmbNotifyChange {
fn from(value: VFSFileNotificationInformation) -> Self {
let ret = JsSmbNotifyChange{
path: value.path,
action: value.operation.into(),
};
return ret;
let action = value.operation.into();
// println!("From<VFSFileNotificationInformation> for JsSmbNotifyChange - path={} action={}", &value.path, &action);
Self{
path: value.path,
action,
}
}
}

Expand Down
Loading