Skip to content

Commit

Permalink
- release 0.1.65
Browse files Browse the repository at this point in the history
- schema supports subscriber
  • Loading branch information
julfikar committed Jun 14, 2023
1 parent 9bc0ce3 commit 6b44315
Show file tree
Hide file tree
Showing 5 changed files with 76 additions and 14 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "flinch"
version = "0.1.64"
version = "0.1.65"
edition = "2021"
authors = ["Mohammad Julfikar <[email protected]>", "Mohammad Julfikar <[email protected]>"]
categories = ["parser-implementations","caching","database-implementations"]
Expand Down
2 changes: 1 addition & 1 deletion src/collection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ impl<D> Collection<D>
}

let query = NotificationType::Insert(k.to_string(), v);
let _ = self.watchman.notify(PubSubEvent::Data(query));
let _ = self.watchman.notify(PubSubEvent::Data(query)).await;

Ok(exec.done())
}
Expand Down
39 changes: 37 additions & 2 deletions src/schemas.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,12 @@ use log::{error, LevelFilter, trace};
use simplelog::{ColorChoice, CombinedLogger, Config, TerminalMode, TermLogger, WriteLogger};
use size::{Base, Size};
use sled::Db;
use tokio::sync::mpsc::Sender;

use crate::authenticate::Authenticate;
use crate::doc::QueryBased;
use crate::errors::DbError;
use crate::headers::{DbName, DbUser, FlinchCnf, FlinchError, QueryResult, SessionId};
use crate::headers::{DbName, DbUser, FlinchCnf, FlinchError, PubSubEvent, QueryResult, SessionId};
use crate::persistent::Persistent;
use crate::pri_headers::{FLINCH, INTERNAL_TREE, MAGIC_DB, MAX_DBNAME_LEN, MAX_USERNAME_LEN, MIN_DBNAME_LEN, MIN_PW_LEN, MIN_USERNAME_LEN, PermissionTypes};
use crate::query::Query;
Expand Down Expand Up @@ -52,7 +54,7 @@ impl Schemas {
CombinedLogger::init(
vec![
TermLogger::new(LevelFilter::Info, Config::default(), TerminalMode::Mixed, ColorChoice::Auto),
WriteLogger::new(LevelFilter::Error, Config::default(), writer),
WriteLogger::new(LevelFilter::Warn, Config::default(), writer),
]
).unwrap();
}
Expand Down Expand Up @@ -318,4 +320,37 @@ impl Schemas {

Ok(())
}

pub async fn subscribe(&self, session_id: SessionId, collection_name: &str, sx: Sender<PubSubEvent<String, QueryBased>>) -> anyhow::Result<(), QueryResult> {
let ttk = ExecTime::new();
let user = self.auth.user(session_id.clone());
if user.is_none() {
return Err(QueryResult {
data: vec![],
error: FlinchError::SchemaError(DbError::NoSession),
time_taken: ttk.done(),
});
}
let user = user.unwrap();

let db = self.dbs.get(trim_apos(&user.db).as_str());
if db.is_none() {
return Err(QueryResult {
data: vec![],
error: FlinchError::SchemaError(DbError::DbNotExists(user.clone().db)),
time_taken: ttk.done(),
});
}
let db = db.unwrap();
let db = db.value();
let res = db.subscribe(collection_name,sx).await;
if res.is_err() {
return Err(QueryResult {
data: vec![],
error: res.err().unwrap(),
time_taken: ttk.done(),
});
}
Ok(())
}
}
14 changes: 7 additions & 7 deletions tests/library.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,11 @@ mod tests {
let instance = instance.unwrap();
let collection = instance.value();

/*let (sx, mut rx) = tokio::sync::mpsc::channel(30000);
collection.sub(sx).await.expect("subscribe to channel");*/
let (sx, mut rx) = tokio::sync::mpsc::channel(30000);
collection.sub(sx).await.expect("subscribe to channel");

let insert = Instant::now();
let record_size = 1000;
let record_size = 7000;
for k in 0..record_size {
let v = serde_json::to_string(
&User {
Expand Down Expand Up @@ -96,16 +96,16 @@ mod tests {
collection.empty().await;
assert_eq!(collection.len(),0,"after::drop");

/*let mut i = 0;
let mut i = 0;
loop {
let event = rx.recv().await.unwrap();
match event {
PubSubEvent::Data(d) => {
match d {
ActionType::Insert(k, _v) => {
NotificationType::Insert(k, _v) => {
println!("inserted :pub/sub: {}",k);
}
ActionType::Remove(k) => {
NotificationType::Remove(k) => {
println!("removed :: {}",k);
}
};
Expand All @@ -118,6 +118,6 @@ mod tests {
if i == 10 { // for demo, listen till 10 message only
break;
}
}*/
}
}
}
33 changes: 30 additions & 3 deletions tests/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@
mod tests {
use serde::{Serialize, Deserialize};
use std::path::Path;
use std::sync::Arc;
use flinch::database::CollectionOptions;
use flinch::doc::QueryBased;
use flinch::doc_trait::ViewConfig;
use flinch::headers::{DbUser, FlinchError};
use flinch::headers::{DbUser, FlinchError, NotificationType, PubSubEvent};
use flinch::schemas::Schemas;

const COLLECTION: &str = "demo";
Expand All @@ -16,13 +18,13 @@ mod tests {
}

#[tokio::test]
async fn test() {
async fn schema() {
let _ = std::fs::remove_dir_all(Path::new(".").join("data").as_path());

let schema = Schemas::init().await;
assert!(schema.is_ok());

let schema = schema.unwrap();
let schema = Arc::new(schema.unwrap());

let login = schema.login("root","flinch","*");
assert!(login.is_ok(), "{}", login.err().unwrap());
Expand Down Expand Up @@ -96,6 +98,31 @@ mod tests {
let col_created = schema.flql(format!("new({});",options.as_str()).as_str(),session_id.clone()).await;
assert!(col_created.error.eq(&FlinchError::None),"{:?}",col_created);

let schm = Arc::clone(&schema);
let (sx, mut rx) = tokio::sync::mpsc::channel(300);
let sid = session_id.clone();
let _ = tokio::task::spawn(async move {
let _ = schm.subscribe(sid,COLLECTION,sx).await;
let mut i = 0;
while let Some(ev) = rx.recv().await {
match ev {
PubSubEvent::Data(d) => {
match d {
NotificationType::Insert(k, _v) => {
println!("{:?}",k);
}
NotificationType::Remove(_) => {}
}
}
PubSubEvent::Subscribed(_) => {}
}
i += 1;
if i == 10 {
break;
}
}
});

let record_size = 7402;
for k in 0..record_size {
let v = serde_json::to_string(
Expand Down

0 comments on commit 6b44315

Please sign in to comment.