diff --git a/README.md b/README.md index fe306b6..3ed78fa 100644 --- a/README.md +++ b/README.md @@ -75,6 +75,9 @@ let mut res = kv::read(&client, "mykey", None).await.unwrap(); // the response back into a UTF-8 encoded string. let mykey: String = res.response.pop().unwrap().value.unwrap().try_into().unwrap(); +// In most cases, it's easier to just read the raw value +let mykey = std::str::from_utf8(&kv::read_raw(&client, "mykey", None).unwrap()).unwrap() + assert_eq!(mykey, "myvalue".to_string()); ``` diff --git a/examples/election.rs b/examples/election.rs index a30005a..ecd96af 100644 --- a/examples/election.rs +++ b/examples/election.rs @@ -6,22 +6,121 @@ use consulrs::{ api::{ features::Blocking, kv::requests::{ReadKeyRequest, SetKeyRequest}, - Features, + ApiResponse, Features, }, + client::ConsulClient, + error::ClientError, kv, session, }; use futures::future::*; use serde::{Deserialize, Serialize}; use tokio::time::{sleep, Duration}; +/// Represents a service in our fictitious example. +struct Node { + pub client: ConsulClient, + pub info: NodeInfo, + pub key: String, + pub session: Option, +} + /// The data structure stored in the `lead` key. In this case only the node name -// is being stored so followers can see who the leader is - more advanced usage -// would put all relevant information here (i.e. the lead's IP address). +/// is being stored so followers can see who the leader is - more advanced usage +/// would put all relevant information here (i.e. the lead's IP address). #[derive(Debug, Deserialize, Serialize)] -struct Node { +struct NodeInfo { pub name: String, } +impl Node { + /// Returns a new [Node]. + pub fn new(client: ConsulClient, name: String, key: String) -> Self { + Node { + client, + info: NodeInfo { name }, + key, + session: None, + } + } + + /// Attempts to acquire leadership for this node. + /// + /// Leadership is gained by passsing the `acquire` parameter when writing to + /// the leader key. The write will only be successful if the key has not + /// already been acquired by another entity. If successful, `true` is + /// returned and the this node's `info` is serialized and written to the + /// key's value. + pub async fn acquire(&self) -> Result { + Ok(kv::set_json( + &self.client, + &self.key, + &self.info, + Some(SetKeyRequest::builder().acquire(self.session.as_ref().unwrap())), + ) + .await? + .response) + } + + /// Reads the current leader information from the leader key. + pub async fn read_leader(&self) -> Result, ClientError> { + kv::read_json_raw::(&self.client, &self.key, None).await + } + + /// If this node is the leader, releases the leader key to allow other nodes + /// to attempt to acquire it. + /// + /// Releasing is done by passing the `release` parameter when writing to the + /// leader key. The value of the leader key is also cleared out to match the + /// fact that there is now no elected leader. + pub async fn release(&self) -> Result { + Ok(kv::set( + &self.client, + &self.key, + b"", + Some(SetKeyRequest::builder().release(self.session.as_ref().unwrap())), + ) + .await? + .response) + } + + /// Acquires a new session for this node. + /// + /// Sessions can be roughly considered equal to locks. Each node must hold + /// a unique session which it uses in order to acquire and release locks on + /// the leader key. + pub async fn session(&mut self) -> Result<(), ClientError> { + self.session = Some(session::create(&self.client, None).await?.response.id); + Ok(()) + } + + /// Watches the leader key for changes or until the timeout is reached. + /// + /// Blocking is accomplished by passing a modify index along with the read + /// request to the leader key. The modify index must match the last + /// index change, otherwise this request will return immediately. The HTTP + /// request will hang until a change in the key is detected or the given + /// timeout is reached. + pub async fn watch(&self, index: u64, timeout: &str) { + kv::read( + &self.client, + &self.key, + Some( + ReadKeyRequest::builder().features( + Features::builder() + .blocking(Blocking { + index: index, + wait: Some(timeout.into()), + }) + .build() + .unwrap(), + ), + ), + ) + .await + .unwrap(); + } +} + fn main() { // Setup a test environment. The `test.run()` method is responsible for // bringing up a Consul Docker container to use in the example. @@ -36,125 +135,87 @@ fn main() { // become the leader when it initializes. let a = (1..4) .map(|i| { + // This is the key we will use for handling elections. + const LEADER_KEY: &str = "leader"; + let server = &server; async move { - let node = Node { - name: format!("Node {}", i), - }; - println!("{}: starting up", node.name); - // The `server.client()` method is just a simple wrapper // that returns a `ConsulClient` already configured to talk // to our Docker container. - let client = server.client(); + let mut node = Node::new( + server.client(), + format!("Node {}", i), + LEADER_KEY.to_string(), + ); + println!("{}: starting up", node.info.name); // A session is roughly equivalent to a lock. Each node // must have its own lock in order to uniquely hold the // `leader` key. - let session = session::create(&client, None).await.unwrap().response.id; - println!("{}: created session {}", node.name, session); - - // Here is where election happens. By specifying the - // `acquire` parameter with our write request we ask Consul - // to attempt to lock this key for our unique session. The - // actual content we're writing is the `Node` serialized - // into a JSON string. - let res = kv::set_json( - &client, - "lead", - &node, - Some(SetKeyRequest::builder().acquire(&session)), - ) - .await - .unwrap(); + node.session().await.unwrap(); + println!( + "{}: created session {}", + node.info.name, + node.session.as_ref().unwrap() + ); + + // Here is where election happens. The `acquire()` method + // will attempt to acquire a lock on the leader key and + // return a `bool` reflecting the status of the attempt. A + // successful acquire means that this node is now the + // leader. + let is_lead = node.acquire().await.unwrap(); // Consul returns `true` is the lock was successfully // acquired and `false` if the key was already locked. - let is_lead = res.response; if is_lead { - println!("{}: elected the leader", node.name); + println!("{}: elected the leader", node.info.name); } else { - println!("{}: following", node.name); + println!("{}: following", node.info.name); } // We can confirm who the leader is by querying the `lead` // key. - let res = kv::read_json::(&client, "lead", None) - .await - .unwrap(); + let res = node.read_leader().await.unwrap(); println!( "{}: the current leader is {}", - node.name, res.response.value.name + node.info.name, res.response.name ); // To simulate a dropped service the leader will now drop // its lock. if is_lead { sleep(Duration::from_secs(2)).await; - println!("{}: dropping lead", node.name); + println!("{}: dropping lead", node.info.name); // Releasing the lock is as simple as writing to the key // and specifying the `release` parameter with the // session. - kv::set( - &client, - "lead", - b"", - Some(SetKeyRequest::builder().release(&session)), - ) - .await - .unwrap(); - println!("{}: dropped!", node.name); + node.release().await.unwrap(); + println!("{}: dropped!", node.info.name); } else { // All nodes should be watching the `lead` key for // changes in order to determine if a new election is // needed. This includes the leader, but for the sake of // this example, only the followers will watch the key. - println!("{} is watching for changes...", node.name); + println!("{} is watching for changes...", node.info.name); // Watching is done through using the blocking feature - // of the KV endpoint. When the key changes the index - // value also changes. Below we pass the index we got - // from our last read which will cause the HTP request - // to hang until either a new index is created (because - // something happened to the key) or our timeout of five - // seconds is reached. - kv::read( - &client, - "lead", - Some( - ReadKeyRequest::builder().features( - Features::builder() - .blocking(Blocking { - index: res.index.unwrap().parse::().unwrap(), - wait: Some("5s".into()), - }) - .build() - .unwrap(), - ), - ), - ) - .await - .unwrap(); + // of the KV endpoint. + let index = res.index.unwrap().parse::().unwrap(); + node.watch(index, "5s").await; // In our example we can assume that if we reached this // point the leader has dropped its lock. Real use-cases // would have all of this logic on loop. - println!("{}: attempting to become leader", node.name); - let res = kv::set_json( - &client, - "lead", - &node, - Some(SetKeyRequest::builder().acquire(session)), - ) - .await - .unwrap(); - - let is_lead = res.response; + println!("{}: attempting to become leader", node.info.name); + let is_lead = node.acquire().await.unwrap(); + if is_lead { - println!("{}: elected the leader", node.name); + println!("{}: elected the leader", node.info.name); } else { - println!("{}: following", node.name); + println!("{}: following", node.info.name); } } } diff --git a/src/kv.rs b/src/kv.rs index d34921e..ad624e6 100644 --- a/src/kv.rs +++ b/src/kv.rs @@ -32,7 +32,7 @@ pub async fn delete( api::exec_with_result(client, endpoint).await } -/// Reads the value at the given key. +/// Lists all keys at the given path. /// /// See [ReadKeysRequest] #[instrument(skip(client, opts), err)] @@ -50,7 +50,7 @@ pub async fn keys( /// /// See [ReadKeyRequest] #[instrument(skip(client, opts), err)] -pub async fn raw( +pub async fn read_raw( client: &impl Client, key: &str, opts: Option<&mut ReadRawKeyRequestBuilder>, @@ -121,6 +121,37 @@ pub async fn read_json( } } +/// Reads the raw JSON value at the given key and deserializes it into an object. +/// +/// See [ReadRawKeyRequest] +#[instrument(skip(client, opts), err)] +pub async fn read_json_raw( + client: &C, + key: &str, + opts: Option<&mut ReadRawKeyRequestBuilder>, +) -> Result, ClientError> { + let mut t = ReadRawKeyRequest::builder(); + let endpoint = opts.unwrap_or(&mut t).key(key).build().unwrap(); + let res = api::exec_with_raw(client, endpoint).await?; + + if !res.response.is_empty() { + let t = serde_json::from_slice(&res.response) + .map_err(|e| ClientError::JsonDeserializeError { source: e })?; + Ok(ApiResponse { + response: t, + cache: res.cache, + content_hash: res.content_hash, + default_acl_policy: res.default_acl_policy, + index: res.index, + known_leader: res.known_leader, + last_contact: res.last_contact, + query_backend: res.query_backend, + }) + } else { + Err(ClientError::EmptyResponseError) + } +} + /// Sets the value at the given key. /// /// See [SetKeyRequest] diff --git a/src/lib.rs b/src/lib.rs index d29fc02..39029ac 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -83,6 +83,10 @@ //! // the response back into a UTF-8 encoded string. //! let mykey: String = res.response.pop().unwrap().value.unwrap().try_into().unwrap(); //! +//! // In most cases, it's easier to just read the raw value +//! let res = kv::read_raw(&client, "mykey", None).await.unwrap().response; +//! let mykey = std::str::from_utf8(&res).unwrap(); +//! //! assert_eq!(mykey, "myvalue".to_string()); //! # }) //! ``` diff --git a/tests/kv.rs b/tests/kv.rs index c2f9666..a29561a 100644 --- a/tests/kv.rs +++ b/tests/kv.rs @@ -21,7 +21,7 @@ fn test() { test_set(&client, key).await; test_keys(&client).await; test_read(&client, key).await; - test_raw(&client, key).await; + test_read_raw(&client, key).await; test_delete(&client, key).await; test_json(&client, key).await; }); @@ -51,8 +51,8 @@ async fn test_keys(client: &impl Client) { assert!(res.is_ok()); } -async fn test_raw(client: &impl Client, key: &str) { - let res = kv::raw(client, key, None).await; +async fn test_read_raw(client: &impl Client, key: &str) { + let res = kv::read_raw(client, key, None).await; assert!(res.is_ok()); }