You signed in with another tab or window. Reload to refresh your session.You signed out in another tab or window. Reload to refresh your session.You switched accounts on another tab or window. Reload to refresh your session.Dismiss alert
Suitable for situations with more reading and less writing
use aqueue::RwModel;use std::sync::Arc;use std::time::Instant;use tokio::try_join;#[derive(Default)]structFoo{count:u64,i:i128,}implFoo{pubfnadd(&mutself,x:i32) -> i128{self.count += 1;self.i += x asi128;self.i}fnreset(&mutself){self.count = 0;self.i = 0;}pubfnget(&self) -> i128{self.i}pubfnget_count(&self) -> u64{self.count}}traitFooRunner{asyncfnadd(&self,x:i32) -> i128;asyncfnreset(&self);asyncfnget(&self) -> i128;asyncfnget_count(&self) -> u64;}implFooRunnerforRwModel<Foo>{asyncfnadd(&self,x:i32) -> i128{self.call_mut(|mut inner| asyncmove{ inner.add(x)}).await}asyncfnreset(&self){self.call_mut(|mut inner| asyncmove{ inner.reset()}).await}asyncfnget(&self) -> i128{self.call(|inner| asyncmove{ inner.get()}).await}asyncfnget_count(&self) -> u64{self.call(|inner| asyncmove{ inner.get_count()}).await}}#[tokio::main]asyncfnmain() -> anyhow::Result<()>{{// Single thread testlet tf = RwModel::new(Foo::default());
tf.add(100).await;assert_eq!(100, tf.get().await);
tf.add(-100).await;assert_eq!(0, tf.get().await);
tf.reset().await;let start = Instant::now();for i in0..100000000{
tf.add(i).await;}println!("test rw a count:{} value:{} time:{} qps:{}",
tf.get_count().await,
tf.get().await,
start.elapsed().as_secs_f32(),
tf.get_count().await / start.elapsed().as_millis()asu64*1000);}{//Multithreading testlet tf = Arc::new(RwModel::new(Foo::default()));let start = Instant::now();let a_tf = tf.clone();let a = tokio::spawn(asyncmove{for i in0..25000000{
a_tf.add(i).await;}});let b_tf = tf.clone();let b = tokio::spawn(asyncmove{for i in25000000..50000000{
b_tf.add(i).await;}});let c_tf = tf.clone();let c = tokio::spawn(asyncmove{for i in50000000..75000000{
c_tf.add(i).await;}});let d_tf = tf.clone();let d = tokio::spawn(asyncmove{for i in75000000..100000000{
d_tf.add(i).await;}});try_join!(a, b, c, d)?;println!("test rw b count:{} value:{} time:{} qps:{}",
tf.get_count().await,
tf.get().await,
start.elapsed().as_secs_f32(),
tf.get_count().await / start.elapsed().as_millis()asu64*1000);}Ok(())}
test rw a count:100000000 value:4999999950000000 time:5.1791396 qps:19308000
test rw b count:100000000 value:4999999950000000 time:5.293417 qps:18892000
Example Actor Database
Suitable for high-performance environments
(use Actor Trait and Sqlx Sqlite)
use anyhow::{anyhow,Result};use aqueue::{inner_wait,Actor};use sqlx::sqlite::SqlitePoolOptions;use sqlx::SqlitePool;use std::env;use tokio::task::JoinHandle;#[derive(sqlx::FromRow,Debug)]#[allow(dead_code)]pubstructUser{id:i64,name:String,gold:f64,}pubstructDataBases{auto_id:u32,pool:SqlitePool,}unsafeimplSendforDataBases{}unsafeimplSyncforDataBases{}implDataBases{pubfnnew(sqlite_max_connections:u32) -> Result<Actor<DataBases>>{let pool = SqlitePoolOptions::new().max_connections(sqlite_max_connections).connect_lazy("sqlite://:memory:")?;Ok(Actor::new(DataBases{auto_id:0, pool }))}/// create user table from table.sqlasyncfncreate_table(&self) -> Result<()>{
sqlx::query(r#" CREATE TABLE "user" ( "id" integer NOT NULL PRIMARY KEY, "name" text, "gold" real ); "#).execute(&self.pool).await?;Ok(())}/// insert user dataasyncfninsert_user(&mutself,name:&str,gold:f64) -> Result<bool>{self.auto_id += 1;let row = sqlx::query(r#" insert into `user`(`id`,`name`,`gold`) values(?,?,?) "#,).bind(&self.auto_id).bind(name).bind(gold).execute(&self.pool).await?
.rows_affected();Ok(row == 1)}/// insert user dataasyncfnselect_all_users(&self) -> Result<Vec<User>>{Ok(sqlx::query_as::<_,User>("select * from `user`").fetch_all(&self.pool).await?)}}traitIDatabase{/// create user table from table.sqlasyncfncreate_table(&self) -> Result<()>;/// insert user dataasyncfninsert_user(&self,name:String,gold:f64) -> Result<bool>;/// insert user dataasyncfninsert_user_ref_name(&self,name:&str,gold:f64) -> Result<bool>;/// select all users tableasyncfnselect_all_users(&self) -> Result<Vec<User>>;}implIDatabaseforActor<DataBases>{asyncfncreate_table(&self) -> Result<()>{self.inner_call(|inner| asyncmove{ inner.get().create_table().await}).await}asyncfninsert_user(&self,name:String,gold:f64) -> Result<bool>{self.inner_call(|inner| asyncmove{ inner.get_mut().insert_user(&name, gold).await}).await}asyncfninsert_user_ref_name(&self,name:&str,gold:f64) -> Result<bool>{self.inner_call(|inner| asyncmove{ inner.get_mut().insert_user(name, gold).await}).await}asyncfnselect_all_users(&self) -> Result<Vec<User>>{unsafe{self.deref_inner().select_all_users().await}}}
lazy_static::lazy_static! {/// default global static database actor objstatic ref DB:Actor<DataBases>={DataBases::new(50).expect("install db error")};}#[tokio::main]asyncfnmain() -> Result<()>{DB.create_table().await?;letmut join_vec = Vec::with_capacity(100);// create 100 tokio task run it.for i in0..100{let join:JoinHandle<Result<()>> = tokio::spawn(asyncmove{//each task runs 1000 timesfor j in0..1000{DB.insert_user(i.to_string(), j asf64).await?;}Ok(())});
join_vec.push(join);}//wait all task finishfor join in join_vec {
join.await??;}// print all usersfor user inDB.select_all_users().await? {println!("{:?}", user);}Ok(())}
User { id: 1, name: "0", gold: 0.0 }
User { id: 2, name: "0", gold: 0.0 }
User { id: 3, name: "0", gold: 0.0 }
User { id: 4, name: "10", gold: 0.0 }
User { id: 5, name: "10", gold: 0.0 }
User { id: 6, name: "16", gold: 0.0 }
User { id: 7, name: "10", gold: 0.0 }
...
User { id: 99996, name: "2", gold: 999.0 }
User { id: 99997, name: "8", gold: 999.0 }
User { id: 99998, name: "5", gold: 999.0 }
User { id: 99999, name: "9", gold: 999.0 }
User { id: 100000, name: "10", gold: 999.0 }