diff --git a/src/client.rs b/src/client.rs index ea39d10..9c1913f 100644 --- a/src/client.rs +++ b/src/client.rs @@ -5,9 +5,12 @@ use std::net::SocketAddr; use grpcio::{Channel, ChannelBuilder, EnvBuilder, LbPolicy}; -use crate::protos::rpc_grpc::{AuthClient, KvClient, LeaseClient, WatchClient}; +use crate::protos::{ + lock_grpc::LockClient, + rpc_grpc::{AuthClient, KvClient, LeaseClient, WatchClient}, +}; use crate::watch::EtcdWatchResponse; -use crate::{Auth, KeyRange, Kv, Lease, Result, Watch}; +use crate::{Auth, KeyRange, Kv, Lease, Lock, Result, Watch}; /// Config for establishing etcd client. pub struct ClientConfig { @@ -41,11 +44,13 @@ pub struct Inner { watch_client: Watch, /// Lease client for lease operations. lease_client: Lease, + /// Lock client for lock operations. + lock_client: Lock, } impl Client { /// Get grpc channel. - fn get_channel(cfg: &ClientConfig) -> Result { + fn get_channel(cfg: &ClientConfig) -> Channel { // let mut endpoints = Vec::with_capacity(cfg.endpoints.len()); // for e in cfg.endpoints.iter() { // let c = Channel::from_shared(e.to_owned())?; @@ -97,7 +102,7 @@ impl Client { let ch = ChannelBuilder::new(env) .load_balancing_policy(LbPolicy::RoundRobin) .connect(end_points.as_str()); - Ok(ch) + ch } /// Connects to etcd generate auth token. @@ -128,7 +133,7 @@ impl Client { /// Will returns `Err` if failed to contact with given endpoints or authentication failed. #[inline] pub async fn connect(cfg: ClientConfig) -> Result { - let channel = Self::get_channel(&cfg)?; + let channel = Self::get_channel(&cfg); let etcd_watch_client = WatchClient::new(channel.clone()); Ok(Self { @@ -142,7 +147,8 @@ impl Client { cfg.cache_enable, ), watch_client: Watch::new(etcd_watch_client), - lease_client: Lease::new(LeaseClient::new(channel)), + lease_client: Lease::new(LeaseClient::new(channel.clone())), + lock_client: Lock::new(LockClient::new(channel)), }), }) } @@ -161,6 +167,13 @@ impl Client { self.inner.kv_client.clone() } + /// Get a lock client. + #[inline] + #[must_use] + pub fn lock(&self) -> Lock { + self.inner.lock_client.clone() + } + /// Gets a watch client. #[inline] #[must_use] diff --git a/src/lib.rs b/src/lib.rs index 0862581..53502a4 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -97,6 +97,8 @@ pub use lease::{ EtcdLeaseGrantRequest, EtcdLeaseGrantResponse, EtcdLeaseKeepAliveRequest, EtcdLeaseKeepAliveResponse, EtcdLeaseRevokeRequest, EtcdLeaseRevokeResponse, Lease, }; +pub use lock::Lock; +pub use lock::{EtcdLockRequest, EtcdLockResponse, EtcdUnlockRequest, EtcdUnlockResponse}; pub use response_header::ResponseHeader; pub use utilities::OverflowArithmetic; pub use watch::{EtcdWatchRequest, EtcdWatchResponse, Event, EventType, Watch}; @@ -113,6 +115,8 @@ mod kv; mod lazy; /// Lease mod for lease operations. mod lease; +/// Lock mod for lock operations. +mod lock; /// Etcd client request and response protos mod protos; /// Etcd API response header @@ -129,10 +133,13 @@ mod tests { use anyhow::Context; use async_compat::Compat; use std::collections::HashMap; + use std::time::Duration; + use std::time::SystemTime; use utilities::Cast; const DEFAULT_ETCD_ENDPOINT1_FOR_TEST: &str = "127.0.0.1:2379"; - const DEFAULT_ETCD_ENDPOINT2_FOR_TEST: &str = "127.0.0.1:2380"; + // Should not connect 2380 port, which will cause lock operation error. + //const DEFAULT_ETCD_ENDPOINT2_FOR_TEST: &str = "127.0.0.1:2380"; #[test] fn test_all() -> anyhow::Result<()> { @@ -141,11 +148,59 @@ mod tests { test_transaction() .await .context("test etcd transaction operations")?; + test_lock().await.context("test etcd lock operations")?; Ok::<(), anyhow::Error>(()) }))?; Ok(()) } + async fn test_lock() -> anyhow::Result<()> { + // 1. Lock on "ABC" + let client = build_etcd_client().await?; + let lease_id = client + .lease() + .grant(EtcdLeaseGrantRequest::new(Duration::from_secs(10))) + .await? + .id(); + let lease_id_2 = client + .lease() + .grant(EtcdLeaseGrantRequest::new(Duration::from_secs(10))) + .await? + .id(); + let key_bytes = client + .lock() + .lock(EtcdLockRequest::new("ABC".as_bytes(), lease_id)) + .await? + .take_key(); + + // 2. Wait until the first lock released automatically + let time1 = SystemTime::now(); + let key_bytes2 = client + .lock() + .lock(EtcdLockRequest::new("ABC".as_bytes(), lease_id_2)) + .await? + .take_key(); + let time2 = SystemTime::now(); + + // wait a least 5 seconds (the first lock has a 10s lease) + assert!(time2.duration_since(time1)?.as_secs() > 5); + + let key_slice = key_bytes.as_slice(); + assert_eq!(&key_slice[..3], "ABC".as_bytes()); + + // 3. Release all locks + client + .lock() + .unlock(EtcdUnlockRequest::new(key_bytes)) + .await?; + + client + .lock() + .unlock(EtcdUnlockRequest::new(key_bytes2)) + .await?; + Ok(()) + } + async fn test_transaction() -> anyhow::Result<()> { let client = build_etcd_client().await?; test_compose(&client).await?; @@ -357,7 +412,7 @@ mod tests { let client = Client::connect(ClientConfig { endpoints: vec![ DEFAULT_ETCD_ENDPOINT1_FOR_TEST.to_owned(), - DEFAULT_ETCD_ENDPOINT2_FOR_TEST.to_owned(), + //DEFAULT_ETCD_ENDPOINT2_FOR_TEST.to_owned(), ], auth: None, cache_size: 64, diff --git a/src/lock/mod.rs b/src/lock/mod.rs new file mode 100644 index 0000000..45ddb01 --- /dev/null +++ b/src/lock/mod.rs @@ -0,0 +1,47 @@ +/// The mod of lock release operations +mod release; +/// The mod of lock require operations +mod require; + +use crate::protos::lock_grpc::LockClient; +use crate::Result as Res; +pub use release::{EtcdUnlockRequest, EtcdUnlockResponse}; +pub use require::{EtcdLockRequest, EtcdLockResponse}; + +/// Lock client. +#[derive(Clone)] +pub struct Lock { + /// Etcd Lock client. + client: LockClient, +} + +impl Lock { + /// Creates a new `LockClient`. + /// + /// This method should only be called within etcd client. + pub(crate) const fn new(client: LockClient) -> Self { + Self { client } + } + + /// Performs a lock operation. + /// + /// # Errors + /// + /// Will return `Err` if RPC call is failed. + #[inline] + pub async fn lock(&mut self, req: EtcdLockRequest) -> Res { + let resp = self.client.lock_async(&req.into())?.await?; + Ok(From::from(resp)) + } + + /// Performs a unlock operation. + /// + /// # Errors + /// + /// Will return `Err` if RPC call is failed. + #[inline] + pub async fn unlock(&mut self, req: EtcdUnlockRequest) -> Res { + let resp = self.client.unlock_async(&req.into())?.await?; + Ok(From::from(resp)) + } +} diff --git a/src/lock/release.rs b/src/lock/release.rs new file mode 100644 index 0000000..844962f --- /dev/null +++ b/src/lock/release.rs @@ -0,0 +1,64 @@ +use crate::protos::lock::{UnlockRequest, UnlockResponse}; +use crate::ResponseHeader; + +/// Request for requiring a lock +pub struct EtcdUnlockRequest { + /// Etcd lock request + proto: UnlockRequest, +} + +impl EtcdUnlockRequest { + /// Creates a new `EtcdUnlockRequest` for requiring a lock + #[inline] + pub fn new(key: T) -> Self + where + T: Into>, + { + let lock_request = UnlockRequest { + key: key.into(), + ..UnlockRequest::default() + }; + + Self { + proto: lock_request, + } + } + + /// Get the name from `UnlockRequest` + #[inline] + pub fn get_key(&self) -> Vec { + self.proto.get_key().to_vec() + } +} + +impl Into for EtcdUnlockRequest { + #[inline] + fn into(self) -> UnlockRequest { + self.proto + } +} + +/// Response for requring a lock. +#[derive(Debug)] +pub struct EtcdUnlockResponse { + /// Etcd lock response + proto: UnlockResponse, +} + +impl EtcdUnlockResponse { + /// Takes the header out of response, leaving a `None` in its place. + #[inline] + pub fn take_header(&mut self) -> Option { + match self.proto.header.take() { + Some(header) => Some(From::from(header)), + None => None, + } + } +} + +impl From for EtcdUnlockResponse { + #[inline] + fn from(resp: UnlockResponse) -> Self { + Self { proto: resp } + } +} diff --git a/src/lock/require.rs b/src/lock/require.rs new file mode 100644 index 0000000..a1d4f76 --- /dev/null +++ b/src/lock/require.rs @@ -0,0 +1,78 @@ +use crate::protos::lock::{LockRequest, LockResponse}; +use crate::ResponseHeader; +use utilities::Cast; + +/// Request for requiring a lock +pub struct EtcdLockRequest { + /// Etcd lock request + proto: LockRequest, +} + +impl EtcdLockRequest { + /// Creates a new `EtcdLockRequest` for requiring a lock + #[inline] + pub fn new(name: T, lease: u64) -> Self + where + T: Into>, + { + let lock_request = LockRequest { + name: name.into(), + lease: lease.cast(), + ..LockRequest::default() + }; + + Self { + proto: lock_request, + } + } + + /// Get the name from `LockRequest` + #[inline] + pub fn get_name(&self) -> Vec { + self.proto.get_name().to_vec() + } + + /// Get the name from `LockRequest` + #[inline] + pub fn get_lease(&self) -> u64 { + self.proto.get_lease().cast() + } +} + +impl Into for EtcdLockRequest { + #[inline] + fn into(self) -> LockRequest { + self.proto + } +} + +/// Response for requring a lock. +#[derive(Debug)] +pub struct EtcdLockResponse { + /// Etcd lock response + proto: LockResponse, +} + +impl EtcdLockResponse { + /// Takes the header out of response, leaving a `None` in its place. + #[inline] + pub fn take_header(&mut self) -> Option { + match self.proto.header.take() { + Some(header) => Some(From::from(header)), + None => None, + } + } + + /// Take the key out of response, leaving a empty Vec in its place. + #[inline] + pub fn take_key(&mut self) -> Vec { + self.proto.take_key() + } +} + +impl From for EtcdLockResponse { + #[inline] + fn from(resp: LockResponse) -> Self { + Self { proto: resp } + } +}