Skip to content

Commit

Permalink
change examples
Browse files Browse the repository at this point in the history
  • Loading branch information
mond77 authored and GTwhy committed Mar 4, 2023
1 parent d5656e3 commit d445d69
Show file tree
Hide file tree
Showing 5 changed files with 50 additions and 50 deletions.
52 changes: 26 additions & 26 deletions examples/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,13 @@ use tokio::net::ToSocketAddrs;

#[derive(Clone, Debug)]
enum Request {
Echo { msg: String },
Add { arg1: u32, arg2: u32 },
Sync,
}

#[derive(Clone, Debug)]
enum Response {
Echo { msg: String },
Add { res: u32 },
Sync,
}

Expand Down Expand Up @@ -108,25 +108,25 @@ impl Server {
.unwrap();
}
}
/// Process `Echo` rpc
fn echo(msg: String) -> Response {
Response::Echo { msg }
/// Process `Add` rpc
fn add(arg1: &u32, arg2: &u32) -> Response {
Response::Add { res: *arg1 + *arg2 }
}

/// Process request and return the response
fn process_request(req: &Request) -> Response {
match req {
Request::Echo { msg } => Self::echo(msg.to_string()),
Request::Add { arg1, arg2 } => Self::add(arg1, arg2),
Request::Sync => Response::Sync,
}
}
}

fn transmute_lmr_to_string(lmr: &LocalMr) -> String {
fn transmute_lmr_to_response(lmr: &LocalMr) -> Response {
unsafe {
let resp = &*(*lmr.as_ptr() as *const Response);
match resp {
Response::Echo { msg } => msg.to_string(),
Response::Add { res } => Response::Add { res: *res },
_ => panic!("invalid input : {:?}", resp),
}
}
Expand All @@ -148,20 +148,20 @@ impl Client {
Client { rdma_stub }
}

/// Echo rpc request method powered by rdma 'send' and 'receive'
/// Add rpc request method powered by rdma 'send' and 'receive'
///
/// Send 'msg' to rpc server and then receive the same content from server.
/// Send 'args' to rpc server and then receive the result from server.
///
/// Show the usage of rdma 'send' and 'receive'
async fn echo_req_sr(&self, msg: String) -> String {
async fn handle_req_sr(&self, req: Request) -> Response {
// allocate a local memory region according to the `layout`
let mut lmr_req = self
.rdma_stub
.alloc_local_mr(Layout::new::<Request>())
.map_err(|err| println!("{}", &err))
.unwrap();
//write data to lmr
unsafe { *(*lmr_req.as_mut_ptr() as *mut Request) = Request::Echo { msg } };
unsafe { *(*lmr_req.as_mut_ptr() as *mut Request) = req };
// send request to server by rdma `send`
self.rdma_stub
.send(&lmr_req)
Expand All @@ -172,28 +172,28 @@ impl Client {
self.rdma_stub
.receive()
.await
.map(|lmr_resp| transmute_lmr_to_string(&lmr_resp))
.map(|lmr_resp| transmute_lmr_to_response(&lmr_resp))
.map_err(|err| println!("{}", &err))
.unwrap()
}

/// Echo rpc request method powered by rdma 'read' and 'write'
/// Add rpc request method powered by rdma 'read' and 'write'
///
/// Send 'msg' to rpc server and then receive the same content from server.
/// Send 'args' to rpc server and then receive the result from server.
///
/// Show the usage of rdma 'read' and 'write'
///
/// Server can't aware of rdma `read` or `write`, so we need sync with server
/// Before `read` and after `write`.
async fn echo_req_wr(&self, msg: String) -> String {
async fn handle_req_wr(&self, req: Request) -> Response {
// allocate a local memory region according to the `layout`
let mut lmr_req = self
.rdma_stub
.alloc_local_mr(Layout::new::<Request>())
.map_err(|err| println!("{}", &err))
.unwrap();
// put data into lmr
unsafe { *(*lmr_req.as_mut_ptr() as *mut Request) = Request::Echo { msg } };
unsafe { *(*lmr_req.as_mut_ptr() as *mut Request) = req };
// request a remote mr located in the server
let mut rmr_req = self
.rdma_stub
Expand Down Expand Up @@ -235,7 +235,7 @@ impl Client {
// convert memory region to 'String' or anything else you need in other scenarios.
.map_err(|err| println!("{}", &err))
.unwrap();
transmute_lmr_to_string(&lmr_resp)
transmute_lmr_to_response(&lmr_resp)
}

/// Server can't aware of rdma `read` or `write`, so we need sync with server
Expand Down Expand Up @@ -265,13 +265,13 @@ async fn main() {
println!("rpc server started");
//sleep for a second to wait for the server to start
tokio::time::sleep(Duration::new(1, 0)).await;
let msg_hello = String::from("hello");
let msg_world = String::from("world");
let request = Request::Add { arg1: 1, arg2: 1 };
let client = Client::new("localhost:5555").await;
println!("request: {}", msg_hello);
let res = client.echo_req_sr(msg_hello).await;
println!("response: {}", res);
println!("request: {}", msg_world);
let res = client.echo_req_wr(msg_world).await;
println!("response: {}", res);
println!("request: {:?}", request);
let res = client.handle_req_sr(request).await;
println!("response: {:?}", res);
let request = Request::Add { arg1: 2, arg2: 2 };
println!("request: {:?}", request);
let res = client.handle_req_wr(request).await;
println!("response: {:?}", res);
}
34 changes: 17 additions & 17 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1648,7 +1648,7 @@ impl Rdma {
/// *(*(*lmr.as_ptr() as *const Data)).0
/// )
/// };
/// // wait for the agent thread to send all reponses to the remote.
/// // wait for the agent thread to send all responses to the remote.
/// tokio::time::sleep(Duration::from_secs(1)).await;
/// Ok(())
/// }
Expand Down Expand Up @@ -1784,7 +1784,7 @@ impl Rdma {
/// let lmr = rdma.receive_raw(Layout::for_value(&RAW_DATA)).await?;
/// // read data from mr
/// assert_eq!(*lmr.as_slice(), RAW_DATA);
/// // wait for the agent thread to send all reponses to the remote.
/// // wait for the agent thread to send all responses to the remote.
/// tokio::time::sleep(Duration::from_secs(1)).await;
/// Ok(())
/// }
Expand Down Expand Up @@ -1849,7 +1849,7 @@ impl Rdma {
/// // read data from mr
/// assert_eq!(*lmr.as_slice(), RAW_DATA);
/// assert_eq!(imm, Some(IMM));
/// // wait for the agent thread to send all reponses to the remote.
/// // wait for the agent thread to send all responses to the remote.
/// tokio::time::sleep(Duration::from_secs(1)).await;
/// Ok(())
/// }
Expand Down Expand Up @@ -1924,7 +1924,7 @@ impl Rdma {
/// // compared to the above, using `receive` is a better choice.
/// let lmr = rdma.receive().await?;
/// unsafe { assert_eq!(MSG.to_string(), *(*(*lmr.as_ptr() as *const Data)).0) };
/// // wait for the agent thread to send all reponses to the remote.
/// // wait for the agent thread to send all responses to the remote.
/// tokio::time::sleep(Duration::from_secs(1)).await;
/// Ok(())
/// }
Expand Down Expand Up @@ -1993,7 +1993,7 @@ impl Rdma {
/// *(*(*lmr.as_ptr() as *const Data)).0
/// )
/// };
/// // wait for the agent thread to send all reponses to the remote.
/// // wait for the agent thread to send all responses to the remote.
/// tokio::time::sleep(Duration::from_secs(1)).await;
/// Ok(())
/// }
Expand Down Expand Up @@ -2051,7 +2051,7 @@ impl Rdma {
/// let lmr = rdma.receive_raw(Layout::for_value(&RAW_DATA)).await?;
/// // read data from mr
/// assert_eq!(*lmr.as_slice(), RAW_DATA);
/// // wait for the agent thread to send all reponses to the remote.
/// // wait for the agent thread to send all responses to the remote.
/// tokio::time::sleep(Duration::from_secs(1)).await;
/// Ok(())
/// }
Expand Down Expand Up @@ -2115,7 +2115,7 @@ impl Rdma {
/// // read data from mr
/// assert_eq!(*lmr.as_slice(), RAW_DATA);
/// assert_eq!(imm, Some(IMM));
/// // wait for the agent thread to send all reponses to the remote.
/// // wait for the agent thread to send all responses to the remote.
/// tokio::time::sleep(Duration::from_secs(1)).await;
/// Ok(())
/// }
Expand Down Expand Up @@ -2182,7 +2182,7 @@ impl Rdma {
/// .await?;
/// // read data from mr
/// assert_eq!(*lmr.as_slice(), RAW_DATA);
/// // wait for the agent thread to send all reponses to the remote.
/// // wait for the agent thread to send all responses to the remote.
/// tokio::time::sleep(Duration::from_secs(1)).await;
/// Ok(())
/// }
Expand Down Expand Up @@ -2257,7 +2257,7 @@ impl Rdma {
/// // read data from mr
/// assert_eq!(*lmr.as_slice(), RAW_DATA);
/// assert_eq!(imm, Some(IMM));
/// // wait for the agent thread to send all reponses to the remote.
/// // wait for the agent thread to send all responses to the remote.
/// tokio::time::sleep(Duration::from_secs(1)).await;
/// Ok(())
/// }
Expand Down Expand Up @@ -2341,7 +2341,7 @@ impl Rdma {
/// // compared to the above, using `receive` is a better choice.
/// let lmr = rdma.receive().await?;
/// unsafe { assert_eq!(MSG.to_string(), *(*(*lmr.as_ptr() as *const Data)).0) };
/// // wait for the agent thread to send all reponses to the remote.
/// // wait for the agent thread to send all responses to the remote.
/// tokio::time::sleep(Duration::from_secs(1)).await;
/// Ok(())
/// }
Expand Down Expand Up @@ -2410,7 +2410,7 @@ impl Rdma {
/// let lmr = rdma.receive_local_mr().await?;
/// // assert the content of lmr, which was `write` by client
/// unsafe { assert_eq!(MSG.to_string(), *(*(*lmr.as_ptr() as *const Data)).0) };
/// // wait for the agent thread to send all reponses to the remote.
/// // wait for the agent thread to send all responses to the remote.
/// tokio::time::sleep(Duration::from_secs(1)).await;
/// Ok(())
/// }
Expand Down Expand Up @@ -2620,7 +2620,7 @@ impl Rdma {
/// let lmr = rdma.receive_local_mr().await?;
/// // assert the content of lmr, which was `write` by client
/// unsafe { assert_eq!(MSG.to_string(), *(*(*lmr.as_ptr() as *const Data)).0) };
/// // wait for the agent thread to send all reponses to the remote.
/// // wait for the agent thread to send all responses to the remote.
/// tokio::time::sleep(Duration::from_secs(1)).await;
/// Ok(())
/// }
Expand Down Expand Up @@ -3195,7 +3195,7 @@ impl Rdma {
/// // receive the metadata of rmr sent by client
/// let _rmr = rdma.receive_remote_mr().await?;
/// // do something with lmr like getting data from it.
/// // wait for the agent thread to send all reponses to the remote.
/// // wait for the agent thread to send all responses to the remote.
/// tokio::time::sleep(Duration::from_secs(1)).await;
/// Ok(())
/// }
Expand Down Expand Up @@ -3258,7 +3258,7 @@ impl Rdma {
/// let rmr = rdma.receive_remote_mr().await?;
/// assert!(!rmr.timeout_check());
/// // do something with lmr like getting data from it.
/// // wait for the agent thread to send all reponses to the remote.
/// // wait for the agent thread to send all responses to the remote.
/// tokio::time::sleep(Duration::from_secs(1)).await;
/// Ok(())
/// }
Expand Down Expand Up @@ -3329,7 +3329,7 @@ impl Rdma {
/// // receive the metadata of the lmr that had been requested by client
/// let _lmr = rdma.receive_local_mr().await?;
/// // do something with lmr like getting data from it.
/// // wait for the agent thread to send all reponses to the remote.
/// // wait for the agent thread to send all responses to the remote.
/// tokio::time::sleep(Duration::from_secs(1)).await;
/// Ok(())
/// }
Expand Down Expand Up @@ -3412,7 +3412,7 @@ impl Rdma {
/// *(*(*lmr.as_ptr() as *const Data)).0
/// )
/// };
/// // wait for the agent thread to send all reponses to the remote.
/// // wait for the agent thread to send all responses to the remote.
/// tokio::time::sleep(Duration::from_secs(1)).await;
/// Ok(())
/// }
Expand Down Expand Up @@ -3589,7 +3589,7 @@ impl Rdma {
/// let new_rdma = rdma.listen().await?;
/// // receive the metadata of the lmr that had been requested by client
/// let _lmr = new_rdma.receive_local_mr().await?;
/// // wait for the agent thread to send all reponses to the remote.
/// // wait for the agent thread to send all responses to the remote.
/// tokio::time::sleep(Duration::from_secs(1)).await;
/// Ok(())
/// }
Expand Down
4 changes: 2 additions & 2 deletions tests/imm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ mod send_with_imm {
// compared to the above, using `receive` is a better choice.
let lmr = rdma.receive().await?;
unsafe { assert_eq!(MSG.to_string(), *(*(*lmr.as_ptr() as *const Data)).0) };
// wait for the agent thread to send all reponses to the remote.
// wait for the agent thread to send all responses to the remote.
tokio::time::sleep(Duration::from_secs(1)).await;
Ok(())
}
Expand Down Expand Up @@ -72,7 +72,7 @@ mod write_with_imm {
let lmr = rdma.receive_local_mr().await?;
// assert the content of lmr, which was `write` by client
unsafe { assert_eq!(MSG.to_string(), *(*(*lmr.as_ptr() as *const Data)).0) };
// wait for the agent thread to send all reponses to the remote.
// wait for the agent thread to send all responses to the remote.
tokio::time::sleep(Duration::from_secs(1)).await;
Ok(())
}
Expand Down
2 changes: 1 addition & 1 deletion tests/loop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ async fn server(rdma: Rdma) -> io::Result<()> {
for handle in handles {
handle.await.unwrap();
}
// wait for the agent thread to send all reponses to the remote.
// wait for the agent thread to send all responses to the remote.
tokio::time::sleep(Duration::from_secs(3)).await;
Ok(())
}
Expand Down
8 changes: 4 additions & 4 deletions tests/mr_slice.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ mod remote_mr_slice {
assert_eq!(s3.addr(), rmr.addr() + s3_start);
assert_eq!(s4.length(), 1);
assert_eq!(s4.addr(), rmr.addr() + s4_pos);
// wait for the agent thread to send all reponses to the remote.
// wait for the agent thread to send all responses to the remote.
tokio::time::sleep(Duration::from_secs(1)).await;
Ok(())
}
Expand All @@ -156,7 +156,7 @@ mod remote_mr_slice_overbound {

async fn client(rdma: Rdma) -> io::Result<()> {
let rmr = rdma.receive_remote_mr().await.unwrap();
// wait for the agent thread to send all reponses to the remote.
// wait for the agent thread to send all responses to the remote.
tokio::time::sleep(Duration::from_secs(1)).await;
assert_eq!(rmr.length(), LEN);
#[allow(clippy::reversed_empty_ranges)]
Expand All @@ -180,7 +180,7 @@ mod remote_mr_slice_overbound {

async fn client(rdma: Rdma) -> io::Result<()> {
let rmr = rdma.receive_remote_mr().await.unwrap();
// wait for the agent thread to send all reponses to the remote.
// wait for the agent thread to send all responses to the remote.
tokio::time::sleep(Duration::from_secs(1)).await;
assert_eq!(rmr.length(), LEN);
let _s1 = rmr.get(0..0).unwrap();
Expand All @@ -203,7 +203,7 @@ mod remote_mr_slice_overbound {

async fn client(rdma: Rdma) -> io::Result<()> {
let rmr = rdma.receive_remote_mr().await.unwrap();
// wait for the agent thread to send all reponses to the remote.
// wait for the agent thread to send all responses to the remote.
tokio::time::sleep(Duration::from_secs(1)).await;
assert_eq!(rmr.length(), LEN);
let _s1 = rmr.get(0..LEN + 1).unwrap();
Expand Down

0 comments on commit d445d69

Please sign in to comment.