diff --git a/Cargo.toml b/Cargo.toml index 31b8839..ee92af4 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -14,6 +14,5 @@ rayon = "1.7.0" serde = { version="1.0.159", features=["derive"] } serde_json = "1.0.95" chrono = "0.4.24" -lazy_static = "1.4.0" thiserror = "1.0.40" uuid = { version="1.3.1", features = ["v4","fast-rng","macro-diagnostics"] } \ No newline at end of file diff --git a/README.md b/README.md index b3f7154..b035be3 100644 --- a/README.md +++ b/README.md @@ -1,8 +1,8 @@ -Flinch + [![Rust](https://github.com/mjm918/flinch/actions/workflows/rust.yml/badge.svg)](https://github.com/mjm918/flinch/actions/workflows/rust.yml) -Flinch is an in-memory, real-time document database designed for fast and efficient full-text search. It comes with a built-in full-text search engine that enables both "search-as-you-type" and wildcard search capabilities. Flinch was created with the goal of providing a high-performance search solution that can be integrated into various applications. +Flinch is an in-memory, real-time document database designed for fast, efficient full-text search and querying. It comes with a built-in full-text search engine that enables both "search-as-you-type" and like search capabilities. Flinch was created with the goal of providing a high-performance search solution that can be integrated into various applications. # Features - In-memory database: Flinch stores documents in memory, allowing for ultra-fast search performance. @@ -10,5 +10,43 @@ Flinch is an in-memory, real-time document database designed for fast and effici - Full-text search: Flinch has a built-in full-text search engine that provides powerful search capabilities, including search-as-you-type and wildcard search. - Lightweight and easy to use: Flinch is designed to be lightweight and easy to use, with a simple API that allows developers to quickly integrate it into their applications. - Document-oriented: Flinch is document-oriented, allowing users to store and retrieve documents as JSON objects. -- Highly scalable: Flinch is designed to be highly scalable, allowing users to handle large volumes of documents and queries efficiently. +- Highly scalable: Flinch is designed to be highly scalable, allowing users to handle large volumes of documents and queries efficiently.\ +- Query: Document query faster than ⚡️ - Open source: Flinch is an open-source project, allowing users to contribute to its development and customize it to suit their needs. + +# How to use + +Refer to [lib.rs](src%2Flib.rs) + +# Query Example + + ``` + { + product_status: { gt: 0 }, + cust_code: { like: "Abs-001" }, + salesperson_id: { eq: "19982" }, + uom_price: { + prop: "price", + lt: 10 + } + } + ``` +assume `document` is something like below: + ``` + { + product_code: "", + product_name: "", + product_status: 1 OR 0, + cust_code: [ + "A12390", + "A89900" + ], + salesperson_id: [ + 19928, + 18320 + ], + uom_price:[ + {id: 1, price: 9} + ] + } + ``` \ No newline at end of file diff --git a/assets/flinch.png b/assets/flinch.png index 413bcf0..100d713 100644 Binary files a/assets/flinch.png and b/assets/flinch.png differ diff --git a/src/col.rs b/src/col.rs index 3a7a5ce..4ec4869 100644 --- a/src/col.rs +++ b/src/col.rs @@ -18,6 +18,7 @@ use crate::err::QueryError; use crate::hdrs::{Event, ActionType, SessionRes, QueryResult, QueryType}; use crate::hidx::HashIndex; use crate::ividx::InvertedIndex; +use crate::qry::Query; use crate::range::Range; use crate::sess::Session; use crate::utils::ExecTime; @@ -305,11 +306,24 @@ where K: Serialize self.kv.clear(); } - pub fn query(&self, cmd: &str) -> Result>, QueryError> { + pub fn query(&self, cmd: &str) -> Result>, QueryError> { + let exec = ExecTime::new(); + let qry = Query::new(cmd); + if qry.is_err() { + return Err(qry.err().unwrap()); + } + let qry = qry.unwrap(); let mut res = SegQueue::new(); self.iter().for_each(|it|{ - let chk = it.value().document().pointer(cmd); + let is_ok = qry.filter(it.value()); + if is_ok { + res.push((it.key().clone(), it.value().clone())); + } }); - Ok(res) + Ok(QueryResult { + query: QueryType::Query(cmd.to_string()), + data: res, + time_taken: exec.done(), + }) } } diff --git a/src/ividx.rs b/src/ividx.rs index cc434f9..bd05b4a 100644 --- a/src/ividx.rs +++ b/src/ividx.rs @@ -8,6 +8,7 @@ use serde::de::DeserializeOwned; use serde::Serialize; use tokio::io::AsyncReadExt; use tokio::task::JoinHandle; +use crate::utils::tokenize; pub struct InvertedIndex { pub kv: Arc>> @@ -35,7 +36,8 @@ impl InvertedIndex pub fn put(&self, k: K, v: String) -> JoinHandle<()> { let rf = self.kv.clone(); tokio::spawn(async move { - for w in v.split_whitespace() { + let separated = tokenize(&v); + for w in separated { let token = w.to_lowercase(); let key = &k; match rf.get_mut(&token) { diff --git a/src/lib.rs b/src/lib.rs index bbb9306..dbdec70 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -13,14 +13,23 @@ pub mod db; mod qry; mod utils; +/// +/// Examples +/// 1M records +/// insert:: 61.118093292s +/// single:: 3.25µs +/// multi:: 6.458µs +/// search index:: 27.209µs result count 1 +/// search:: 2.494042417s result count 402130 +/// view:: 45.084µs result count 1 +/// query:: 438.283791ms result count 10 with query {"age":{"$lt":10}} +/// #[cfg(test)] mod tests { use std::fmt::format; use std::sync::Arc; use std::time::{Instant}; - use lazy_static::lazy_static; use serde::{Deserialize, Serialize}; - use tokio::sync::mpsc::channel; use crate::doc::{Document, FromRawString, ViewConfig}; use crate::db::{Database, CollectionOptions}; use crate::hdrs::{Event, ActionType}; @@ -54,8 +63,11 @@ mod tests { let collection = instance.value(); assert_eq!(collection.len(),0); + let (sx, mut rx) = tokio::sync::mpsc::channel(30000); + collection.sub(sx).await.expect("subscribe to channel"); + let insert = Instant::now(); - let record_size = 100_000; + let record_size = 10_000; for i in 0..record_size { collection.put(format!("P_{}",&i), FromRawString::new( serde_json::to_string( @@ -73,20 +85,49 @@ mod tests { assert!(single.data.is_some()); println!("single:: {:?}", single.time_taken); - let multi = collection.multi_get(vec![&format!("P_100"),&format!("P_1999")]); - assert_eq!(multi.data.len(),2); + let multi = collection.multi_get(vec![&format!("P_1"),&format!("P_0")]); + assert_ne!(multi.data.len(),0); println!("multi:: {:?}",multi.time_taken); - let search = collection.search("Julfikar9999"); + let search = collection.search("Julfikar0"); assert_ne!(search.data.len(),0); - println!("search:: {} res {}",search.time_taken, search.data.len()); + println!("search index:: {} res {}",search.time_taken, search.data.len()); - let like_search = collection.like_search("Julfikar 99 11"); + let like_search = collection.like_search("Julfikar 0"); assert_ne!(like_search.data.len(),0); println!("search:: {} res {}",like_search.time_taken, like_search.data.len()); let view = collection.fetch_view("ADULT"); assert_ne!(view.data.len(),0); println!("view:: {} res {}",view.time_taken, view.data.len()); + + let query = collection.query(r#"{"age":{"$lt":10}}"#); + assert!(query.is_ok()); + let query = query.unwrap(); + println!("query:: {} {}", query.time_taken,query.data.len()); + + let mut i = 0; + loop { + let event = rx.recv().await.unwrap(); + match event { + Event::Data(d) => { + match d { + ActionType::Insert(k, v) => { + println!("inserted :watcher: {}",k) + } + ActionType::Remove(k) => { + + } + }; + } + Event::Subscribed(s) => { + + } + }; + i += 1; + if i == 10 { // for demo, listen till 10 message only + break; + } + } } } diff --git a/src/qry.rs b/src/qry.rs index 8e5a9cf..628ae45 100644 --- a/src/qry.rs +++ b/src/qry.rs @@ -1,5 +1,7 @@ +use std::collections::HashMap; +use std::sync::{Arc, Mutex}; use anyhow::{Result}; -use dashmap::DashMap; +use rayon::prelude::*; use serde::de::DeserializeOwned; use serde::Serialize; use serde_json::{Value, map::Map}; @@ -7,16 +9,36 @@ use crate::doc::Document; use crate::err::QueryError; pub struct Query { - cmd: DashMap> + stmt: HashMap } +#[derive(PartialEq)] +enum DataTypes { + String, + Number, +} +struct CompareFactors { + lt: Option, + gt: Option, + eq: Option, + like: Option, + prop: Option, + data_type: DataTypes, +} + +const LT: &str = "$lt"; +const GT: &str = "$gt"; +const EQ: &str = "$eq"; +const LIKE: &str = "$like"; +const PROP: &str = "$prop"; + impl Query { pub fn new(cmd: &str) -> Result { let json: serde_json::error::Result = serde_json::from_str(cmd); if json.is_ok() { let json = json.unwrap(); return if json.is_object() { - let mut cmd = DashMap::new(); + let mut cmd = Map::new(); let jobj = json.as_object().unwrap(); for kv in jobj { if !kv.1.is_object() { @@ -25,9 +47,13 @@ impl Query { if cmd.contains_key(kv.0.as_str()) { cmd.remove(kv.0.as_str()); } - cmd.insert(kv.0.clone(), kv.1.as_object().unwrap().clone()); + cmd.insert(kv.0.clone(), kv.1.clone()); + } + let mut stmt = HashMap::new(); + for kv in cmd { + stmt.insert(kv.0, kv.1); } - Ok(Self { cmd }) + Ok(Self { stmt }) } else { Err(QueryError::QueryIsNotObject) } @@ -35,10 +61,247 @@ impl Query { Err(QueryError::ParseError(json.err().unwrap().to_string())) } - pub fn filter(&self, value: D) + pub fn filter(&self, row: &D) -> bool where D: Serialize + DeserializeOwned + Clone + Send + Sync + Document + 'static { - let obj = value.document(); + let doc = row.object(); + let mut is_ok_count = 0; + let mut filter_count = 0; + for (key, value) in &self.stmt { + let mut is_ok = false; + if doc.contains_key(key.as_str()) && value.is_object() { + let dval = doc.get(key.as_str()).unwrap(); + let fmap = value.as_object().unwrap(); + if dval.is_object() { + is_ok = self.compare_by_filter_obj(&fmap, &doc); + if is_ok { + is_ok_count = is_ok_count + 1; + } + } + if dval.is_string() { + let as_str = dval.as_str().unwrap(); + is_ok = self.compare_by_filter_str(&fmap, as_str); + if is_ok { + is_ok_count = is_ok_count + 1; + } + } + if dval.is_array() { + let as_array = dval.as_array().unwrap(); + is_ok = self.compare_by_filter_array(&fmap, as_array); + if is_ok { + is_ok_count = is_ok_count + 1; + } + } + if dval.is_f64() { + let as_f64 = dval.as_f64().unwrap(); + is_ok = self.compare_by_filter_f64(&fmap, &as_f64); + if is_ok { + is_ok_count = is_ok_count + 1; + } + } + if dval.is_i64() { + let as_i64 = dval.as_i64().unwrap(); + is_ok = self.compare_by_filter_i64(&fmap, &as_i64); + if is_ok { + is_ok_count = is_ok_count + 1; + } + } + } + filter_count = filter_count + 1; + } + filter_count == is_ok_count + } + fn factors(&self, fv: &Map) -> CompareFactors { + let mut data_type = DataTypes::String; + let mut cf = CompareFactors { + lt: if fv.contains_key(LT) { + Some(fv.contains_key(LT)) + } else { + None + }, + gt: if fv.contains_key(GT) { + Some(fv.contains_key(GT)) + } else { + None + }, + eq: if fv.contains_key(EQ) { + Some(fv.contains_key(EQ)) + } else { + None + }, + like: if fv.contains_key(LIKE) { + Some(fv.contains_key(LIKE)) + } else { + None + }, + prop: if fv.contains_key(PROP) { + Some(fv.get(PROP).unwrap().to_string()) + } else { + None + }, + data_type: DataTypes::String, + }; + if cf.lt.is_some() { + let v = fv.get(LT).unwrap(); + data_type = if v.is_f64() || v.is_i64() { + DataTypes::Number + } else { + DataTypes::String + }; + } + if cf.gt.is_some() { + let v = fv.get(GT).unwrap(); + data_type = if v.is_f64() || v.is_i64() { + DataTypes::Number + } else { + DataTypes::String + }; + } + if cf.eq.is_some() { + let v = fv.get(EQ).unwrap(); + data_type = if v.is_f64() || v.is_i64() { + DataTypes::Number + } else { + DataTypes::String + }; + } + if cf.like.is_some() { + let v = fv.get(LIKE).unwrap(); + data_type = if v.is_f64() || v.is_i64() { + DataTypes::Number + } else { + DataTypes::String + }; + } + cf.data_type = data_type; + cf + } + + fn compare_by_filter_i64(&self, fv: &Map, value: &i64) -> bool { + let comparator = self.factors(fv); + let mut is_ok = false; + if (comparator.data_type == DataTypes::String || comparator.data_type == DataTypes::Number) + && comparator.eq.is_some() + { + let fv = fv.get(EQ).unwrap(); + if fv.is_i64() && value == &fv.as_i64().unwrap() { + is_ok = true; + } + } + if comparator.data_type == DataTypes::Number && comparator.lt.is_some() { + let fv = fv.get(LT).unwrap(); + if fv.is_i64() && value < &fv.as_i64().unwrap() { + is_ok = true; + } + } + if comparator.data_type == DataTypes::Number && comparator.gt.is_some() { + let fv = fv.get(GT).unwrap(); + if fv.is_i64() && value > &fv.as_i64().unwrap() { + is_ok = true; + } + } + is_ok + } + + fn compare_by_filter_f64(&self, fv: &Map, value: &f64) -> bool { + let comparator = self.factors(fv); + let mut is_ok = false; + if comparator.data_type == DataTypes::String && comparator.eq.is_some() { + let fv = fv.get(EQ).unwrap(); + if fv.is_f64() && value == &fv.as_f64().unwrap() { + is_ok = true; + } + } + if comparator.data_type == DataTypes::String && comparator.lt.is_some() { + let fv = fv.get(LT).unwrap(); + if fv.is_f64() && value < &fv.as_f64().unwrap() { + is_ok = true; + } + } + if comparator.data_type == DataTypes::String && comparator.gt.is_some() { + let fv = fv.get(GT).unwrap(); + if fv.is_f64() && value > &fv.as_f64().unwrap() { + is_ok = true; + } + } + is_ok + } + + fn compare_by_filter_array(&self, fv: &Map, value: &Vec) -> bool { + let mut is_ok = false; + for obj in value { + if obj.is_string() && self.compare_by_filter_str(fv, obj.as_str().unwrap()) { + is_ok = true; + } + if obj.is_i64() && self.compare_by_filter_i64(fv, &obj.as_i64().unwrap()) { + is_ok = true; + } + if obj.is_f64() && self.compare_by_filter_f64(fv, &obj.as_f64().unwrap()) { + is_ok = true; + } + if obj.is_object() { + let mut hmp: Map = Map::new(); + for (key, value) in obj.as_object().unwrap() { + hmp.insert(key.clone(), value.clone()); + } + if self.compare_by_filter_obj(fv, &hmp) { + is_ok = true; + } + } + } + is_ok + } + + fn compare_by_filter_str(&self, fv: &Map, value: &str) -> bool { + let comparator = self.factors(fv); + let mut is_ok = false; + if comparator.data_type == DataTypes::String && comparator.eq.is_some() && !value.is_empty() + { + let fv = fv.get(EQ).unwrap(); + if fv.is_string() && value.eq_ignore_ascii_case(fv.as_str().unwrap()) { + is_ok = true; + } + } + if comparator.data_type == DataTypes::String + && comparator.like.is_some() + && !value.is_empty() + { + let fv = fv.get(LIKE).unwrap(); + if fv.is_string() && value.to_string().contains(&fv.as_str().unwrap()) { + is_ok = true; + } + } + is_ok + } + + fn compare_by_filter_obj(&self, fv: &Map, value: &Map) -> bool { + let comparator = self.factors(fv); + if comparator.prop.is_some() { + let mut is_ok = false; + let val: Option<&Value> = value.get(comparator.prop.unwrap().as_str()); + if comparator.data_type == DataTypes::String + && val.is_some() + && val.unwrap().is_string() + { + is_ok = self.compare_by_filter_str(fv, &val.unwrap().as_str().unwrap()); + } + if comparator.data_type == DataTypes::Number + && val.is_some() + && val.unwrap().is_number() + { + if val.unwrap().is_i64() { + is_ok = + self.compare_by_filter_i64(fv, &val.unwrap().as_i64().unwrap()); + } + if val.unwrap().is_f64() { + is_ok = + self.compare_by_filter_f64(fv, &val.unwrap().as_f64().unwrap()); + } + } + is_ok + } else { + false + } } } \ No newline at end of file diff --git a/src/utils.rs b/src/utils.rs index 7ba4131..a077676 100644 --- a/src/utils.rs +++ b/src/utils.rs @@ -22,4 +22,24 @@ pub fn set_view_name(name: &str) -> String { } pub fn get_view_name(name: &str) -> String { name.replace(":view:","") +} +pub fn tokenize(query: &String) -> Vec { + query + .clone() + .trim() + .to_lowercase() + .replace("(", " ") + .replace(")", " ") + .replace("+", " ") + .replace("-", " ") + .replace("/", " ") + .replace("\\", " ") + .replace("_", " ") + .replace("[", " ") + .replace("]", " ") + .split_whitespace() + .into_iter() + .filter(|&char| !char.is_empty()) + .map(|char| char.to_string()) + .collect::>() } \ No newline at end of file diff --git a/src/wtch.rs b/src/wtch.rs index 947ef75..0dc4199 100644 --- a/src/wtch.rs +++ b/src/wtch.rs @@ -69,9 +69,14 @@ impl Watchman where M: Clone + Send + 'static { } async fn broadcast(&mut self, msg: M) { - for index in 0..=(self.chans.len() - 2) { - let msg = msg.clone(); - let _ = self.chans[index].send(msg).await; + if self.chans.len() == 0 { + return; + } + if self.chans.len() > 1 { + for index in 0..=(self.chans.len() - 2) { + let msg = msg.clone(); + let _ = self.chans[index].send(msg).await; + } } let _ = self.chans[self.chans.len() - 1].send(msg).await; }