diff --git a/Cargo.toml b/Cargo.toml
index 5141a9b..0f2a41c 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -1,6 +1,6 @@
 [package]
 name = "flinch"
-version = "0.1.64"
+version = "0.1.65"
 edition = "2021"
 authors = ["Mohammad Julfikar ", "Mohammad Julfikar "]
 categories = ["parser-implementations","caching","database-implementations"]
diff --git a/src/collection.rs b/src/collection.rs
index d3149ce..063acd7 100644
--- a/src/collection.rs
+++ b/src/collection.rs
@@ -163,7 +163,7 @@ impl Collection
         }
 
         let query = NotificationType::Insert(k.to_string(), v);
-        let _ = self.watchman.notify(PubSubEvent::Data(query));
+        let _ = self.watchman.notify(PubSubEvent::Data(query)).await;
 
         Ok(exec.done())
     }
diff --git a/src/schemas.rs b/src/schemas.rs
index 4d0bba5..32968f4 100644
--- a/src/schemas.rs
+++ b/src/schemas.rs
@@ -10,10 +10,12 @@ use log::{error, LevelFilter, trace};
 use simplelog::{ColorChoice, CombinedLogger, Config, TerminalMode, TermLogger, WriteLogger};
 use size::{Base, Size};
 use sled::Db;
+use tokio::sync::mpsc::Sender;
 
 use crate::authenticate::Authenticate;
+use crate::doc::QueryBased;
 use crate::errors::DbError;
-use crate::headers::{DbName, DbUser, FlinchCnf, FlinchError, QueryResult, SessionId};
+use crate::headers::{DbName, DbUser, FlinchCnf, FlinchError, PubSubEvent, QueryResult, SessionId};
 use crate::persistent::Persistent;
 use crate::pri_headers::{FLINCH, INTERNAL_TREE, MAGIC_DB, MAX_DBNAME_LEN, MAX_USERNAME_LEN, MIN_DBNAME_LEN, MIN_PW_LEN, MIN_USERNAME_LEN, PermissionTypes};
 use crate::query::Query;
@@ -52,7 +54,7 @@ impl Schemas {
         CombinedLogger::init(
             vec![
                 TermLogger::new(LevelFilter::Info, Config::default(), TerminalMode::Mixed, ColorChoice::Auto),
-                WriteLogger::new(LevelFilter::Error, Config::default(), writer),
+                WriteLogger::new(LevelFilter::Warn, Config::default(), writer),
             ]
         ).unwrap();
     }
@@ -318,4 +320,37 @@ impl Schemas {
 
         Ok(())
     }
+
+    pub async fn subscribe(&self, session_id: SessionId, collection_name: &str, sx: Sender<PubSubEvent<QueryBased>>) -> anyhow::Result<(), QueryResult> {
+        let ttk = ExecTime::new();
+        let user = self.auth.user(session_id.clone());
+        if user.is_none() {
+            return Err(QueryResult {
+                data: vec![],
+                error: FlinchError::SchemaError(DbError::NoSession),
+                time_taken: ttk.done(),
+            });
+        }
+        let user = user.unwrap();
+
+        let db = self.dbs.get(trim_apos(&user.db).as_str());
+        if db.is_none() {
+            return Err(QueryResult {
+                data: vec![],
+                error: FlinchError::SchemaError(DbError::DbNotExists(user.clone().db)),
+                time_taken: ttk.done(),
+            });
+        }
+        let db = db.unwrap();
+        let db = db.value();
+        let res = db.subscribe(collection_name,sx).await;
+        if res.is_err() {
+            return Err(QueryResult {
+                data: vec![],
+                error: res.err().unwrap(),
+                time_taken: ttk.done(),
+            });
+        }
+        Ok(())
+    }
 }
diff --git a/tests/library.rs b/tests/library.rs
index 60c68eb..2b7f8af 100644
--- a/tests/library.rs
+++ b/tests/library.rs
@@ -40,11 +40,11 @@ mod tests {
         let instance = instance.unwrap();
         let collection = instance.value();
 
-        /*let (sx, mut rx) = tokio::sync::mpsc::channel(30000);
-        collection.sub(sx).await.expect("subscribe to channel");*/
+        let (sx, mut rx) = tokio::sync::mpsc::channel(30000);
+        collection.sub(sx).await.expect("subscribe to channel");
 
         let insert = Instant::now();
-        let record_size = 1000;
+        let record_size = 7000;
         for k in 0..record_size {
             let v = serde_json::to_string(
                 &User {
@@ -96,16 +96,16 @@ mod tests {
         collection.empty().await;
         assert_eq!(collection.len(),0,"after::drop");
 
-        /*let mut i = 0;
+        let mut i = 0;
         loop {
             let event = rx.recv().await.unwrap();
             match event {
                 PubSubEvent::Data(d) => {
                     match d {
-                        ActionType::Insert(k, _v) => {
+                        NotificationType::Insert(k, _v) => {
                             println!("inserted :pub/sub: {}",k);
                         }
-                        ActionType::Remove(k) => {
+                        NotificationType::Remove(k) => {
                             println!("removed :: {}",k);
                         }
                     };
@@ -118,6 +118,6 @@ mod tests {
             if i == 10 { // for demo, listen till 10 message only
                 break;
             }
-        }*/
+        }
     }
 }
\ No newline at end of file
diff --git a/tests/schema.rs b/tests/schema.rs
index e1060ef..c467a50 100644
--- a/tests/schema.rs
+++ b/tests/schema.rs
@@ -2,9 +2,11 @@ mod tests {
     use serde::{Serialize, Deserialize};
     use std::path::Path;
+    use std::sync::Arc;
 
     use flinch::database::CollectionOptions;
+    use flinch::doc::QueryBased;
     use flinch::doc_trait::ViewConfig;
-    use flinch::headers::{DbUser, FlinchError};
+    use flinch::headers::{DbUser, FlinchError, NotificationType, PubSubEvent};
     use flinch::schemas::Schemas;
 
     const COLLECTION: &str = "demo";
@@ -16,13 +18,13 @@
     }
 
     #[tokio::test]
-    async fn test() {
+    async fn schema() {
         let _ = std::fs::remove_dir_all(Path::new(".").join("data").as_path());
 
         let schema = Schemas::init().await;
         assert!(schema.is_ok());
 
-        let schema = schema.unwrap();
+        let schema = Arc::new(schema.unwrap());
 
         let login = schema.login("root","flinch","*");
         assert!(login.is_ok(), "{}", login.err().unwrap());
@@ -96,6 +98,31 @@ mod tests {
         let col_created = schema.flql(format!("new({});",options.as_str()).as_str(),session_id.clone()).await;
         assert!(col_created.error.eq(&FlinchError::None),"{:?}",col_created);
 
+        let schm = Arc::clone(&schema);
+        let (sx, mut rx) = tokio::sync::mpsc::channel(300);
+        let sid = session_id.clone();
+        let _ = tokio::task::spawn(async move {
+            let _ = schm.subscribe(sid,COLLECTION,sx).await;
+            let mut i = 0;
+            while let Some(ev) = rx.recv().await {
+                match ev {
+                    PubSubEvent::Data(d) => {
+                        match d {
+                            NotificationType::Insert(k, _v) => {
+                                println!("{:?}",k);
+                            }
+                            NotificationType::Remove(_) => {}
+                        }
+                    }
+                    PubSubEvent::Subscribed(_) => {}
+                }
+                i += 1;
+                if i == 10 {
+                    break;
+                }
+            }
+        });
+
         let record_size = 7402;
         for k in 0..record_size {
             let v = serde_json::to_string(