Skip to content

Commit

Permalink
Adds raw JSON KV read and improves election example (jmgilman#10)
Browse files Browse the repository at this point in the history
* Adds raw JSON kv function and improves README

* Cleans up the leader election example
  • Loading branch information
jmgilman authored Oct 1, 2021
1 parent e5d6720 commit 2b78df2
Show file tree
Hide file tree
Showing 5 changed files with 184 additions and 85 deletions.
3 changes: 3 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,9 @@ let mut res = kv::read(&client, "mykey", None).await.unwrap();
// the response back into a UTF-8 encoded string.
let mykey: String = res.response.pop().unwrap().value.unwrap().try_into().unwrap();

// In most cases, it's easier to just read the raw value
let mykey = std::str::from_utf8(&kv::read_raw(&client, "mykey", None).unwrap()).unwrap()

assert_eq!(mykey, "myvalue".to_string());
```

Expand Down
221 changes: 141 additions & 80 deletions examples/election.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,22 +6,121 @@ use consulrs::{
api::{
features::Blocking,
kv::requests::{ReadKeyRequest, SetKeyRequest},
Features,
ApiResponse, Features,
},
client::ConsulClient,
error::ClientError,
kv, session,
};
use futures::future::*;
use serde::{Deserialize, Serialize};
use tokio::time::{sleep, Duration};

/// Represents a service in our fictitious example.
struct Node {
pub client: ConsulClient,
pub info: NodeInfo,
pub key: String,
pub session: Option<String>,
}

/// The data structure stored in the `lead` key. In this case only the node name
// is being stored so followers can see who the leader is - more advanced usage
// would put all relevant information here (i.e. the lead's IP address).
/// is being stored so followers can see who the leader is - more advanced usage
/// would put all relevant information here (i.e. the lead's IP address).
#[derive(Debug, Deserialize, Serialize)]
struct Node {
struct NodeInfo {
pub name: String,
}

impl Node {
/// Returns a new [Node].
pub fn new(client: ConsulClient, name: String, key: String) -> Self {
Node {
client,
info: NodeInfo { name },
key,
session: None,
}
}

/// Attempts to acquire leadership for this node.
///
/// Leadership is gained by passsing the `acquire` parameter when writing to
/// the leader key. The write will only be successful if the key has not
/// already been acquired by another entity. If successful, `true` is
/// returned and the this node's `info` is serialized and written to the
/// key's value.
pub async fn acquire(&self) -> Result<bool, ClientError> {
Ok(kv::set_json(
&self.client,
&self.key,
&self.info,
Some(SetKeyRequest::builder().acquire(self.session.as_ref().unwrap())),
)
.await?
.response)
}

/// Reads the current leader information from the leader key.
pub async fn read_leader(&self) -> Result<ApiResponse<NodeInfo>, ClientError> {
kv::read_json_raw::<NodeInfo, _>(&self.client, &self.key, None).await
}

/// If this node is the leader, releases the leader key to allow other nodes
/// to attempt to acquire it.
///
/// Releasing is done by passing the `release` parameter when writing to the
/// leader key. The value of the leader key is also cleared out to match the
/// fact that there is now no elected leader.
pub async fn release(&self) -> Result<bool, ClientError> {
Ok(kv::set(
&self.client,
&self.key,
b"",
Some(SetKeyRequest::builder().release(self.session.as_ref().unwrap())),
)
.await?
.response)
}

/// Acquires a new session for this node.
///
/// Sessions can be roughly considered equal to locks. Each node must hold
/// a unique session which it uses in order to acquire and release locks on
/// the leader key.
pub async fn session(&mut self) -> Result<(), ClientError> {
self.session = Some(session::create(&self.client, None).await?.response.id);
Ok(())
}

/// Watches the leader key for changes or until the timeout is reached.
///
/// Blocking is accomplished by passing a modify index along with the read
/// request to the leader key. The modify index must match the last
/// index change, otherwise this request will return immediately. The HTTP
/// request will hang until a change in the key is detected or the given
/// timeout is reached.
pub async fn watch(&self, index: u64, timeout: &str) {
kv::read(
&self.client,
&self.key,
Some(
ReadKeyRequest::builder().features(
Features::builder()
.blocking(Blocking {
index: index,
wait: Some(timeout.into()),
})
.build()
.unwrap(),
),
),
)
.await
.unwrap();
}
}

fn main() {
// Setup a test environment. The `test.run()` method is responsible for
// bringing up a Consul Docker container to use in the example.
Expand All @@ -36,125 +135,87 @@ fn main() {
// become the leader when it initializes.
let a = (1..4)
.map(|i| {
// This is the key we will use for handling elections.
const LEADER_KEY: &str = "leader";

let server = &server;
async move {
let node = Node {
name: format!("Node {}", i),
};
println!("{}: starting up", node.name);

// The `server.client()` method is just a simple wrapper
// that returns a `ConsulClient` already configured to talk
// to our Docker container.
let client = server.client();
let mut node = Node::new(
server.client(),
format!("Node {}", i),
LEADER_KEY.to_string(),
);
println!("{}: starting up", node.info.name);

// A session is roughly equivalent to a lock. Each node
// must have its own lock in order to uniquely hold the
// `leader` key.
let session = session::create(&client, None).await.unwrap().response.id;
println!("{}: created session {}", node.name, session);

// Here is where election happens. By specifying the
// `acquire` parameter with our write request we ask Consul
// to attempt to lock this key for our unique session. The
// actual content we're writing is the `Node` serialized
// into a JSON string.
let res = kv::set_json(
&client,
"lead",
&node,
Some(SetKeyRequest::builder().acquire(&session)),
)
.await
.unwrap();
node.session().await.unwrap();
println!(
"{}: created session {}",
node.info.name,
node.session.as_ref().unwrap()
);

// Here is where election happens. The `acquire()` method
// will attempt to acquire a lock on the leader key and
// return a `bool` reflecting the status of the attempt. A
// successful acquire means that this node is now the
// leader.
let is_lead = node.acquire().await.unwrap();

// Consul returns `true` is the lock was successfully
// acquired and `false` if the key was already locked.
let is_lead = res.response;
if is_lead {
println!("{}: elected the leader", node.name);
println!("{}: elected the leader", node.info.name);
} else {
println!("{}: following", node.name);
println!("{}: following", node.info.name);
}

// We can confirm who the leader is by querying the `lead`
// key.
let res = kv::read_json::<Node, _>(&client, "lead", None)
.await
.unwrap();
let res = node.read_leader().await.unwrap();
println!(
"{}: the current leader is {}",
node.name, res.response.value.name
node.info.name, res.response.name
);

// To simulate a dropped service the leader will now drop
// its lock.
if is_lead {
sleep(Duration::from_secs(2)).await;
println!("{}: dropping lead", node.name);
println!("{}: dropping lead", node.info.name);

// Releasing the lock is as simple as writing to the key
// and specifying the `release` parameter with the
// session.
kv::set(
&client,
"lead",
b"",
Some(SetKeyRequest::builder().release(&session)),
)
.await
.unwrap();
println!("{}: dropped!", node.name);
node.release().await.unwrap();
println!("{}: dropped!", node.info.name);
} else {
// All nodes should be watching the `lead` key for
// changes in order to determine if a new election is
// needed. This includes the leader, but for the sake of
// this example, only the followers will watch the key.
println!("{} is watching for changes...", node.name);
println!("{} is watching for changes...", node.info.name);

// Watching is done through using the blocking feature
// of the KV endpoint. When the key changes the index
// value also changes. Below we pass the index we got
// from our last read which will cause the HTP request
// to hang until either a new index is created (because
// something happened to the key) or our timeout of five
// seconds is reached.
kv::read(
&client,
"lead",
Some(
ReadKeyRequest::builder().features(
Features::builder()
.blocking(Blocking {
index: res.index.unwrap().parse::<u64>().unwrap(),
wait: Some("5s".into()),
})
.build()
.unwrap(),
),
),
)
.await
.unwrap();
// of the KV endpoint.
let index = res.index.unwrap().parse::<u64>().unwrap();
node.watch(index, "5s").await;

// In our example we can assume that if we reached this
// point the leader has dropped its lock. Real use-cases
// would have all of this logic on loop.
println!("{}: attempting to become leader", node.name);
let res = kv::set_json(
&client,
"lead",
&node,
Some(SetKeyRequest::builder().acquire(session)),
)
.await
.unwrap();

let is_lead = res.response;
println!("{}: attempting to become leader", node.info.name);
let is_lead = node.acquire().await.unwrap();

if is_lead {
println!("{}: elected the leader", node.name);
println!("{}: elected the leader", node.info.name);
} else {
println!("{}: following", node.name);
println!("{}: following", node.info.name);
}
}
}
Expand Down
35 changes: 33 additions & 2 deletions src/kv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ pub async fn delete(
api::exec_with_result(client, endpoint).await
}

/// Reads the value at the given key.
/// Lists all keys at the given path.
///
/// See [ReadKeysRequest]
#[instrument(skip(client, opts), err)]
Expand All @@ -50,7 +50,7 @@ pub async fn keys(
///
/// See [ReadKeyRequest]
#[instrument(skip(client, opts), err)]
pub async fn raw(
pub async fn read_raw(
client: &impl Client,
key: &str,
opts: Option<&mut ReadRawKeyRequestBuilder>,
Expand Down Expand Up @@ -121,6 +121,37 @@ pub async fn read_json<T: DeserializeOwned, C: Client>(
}
}

/// Reads the raw JSON value at the given key and deserializes it into an object.
///
/// See [ReadRawKeyRequest]
#[instrument(skip(client, opts), err)]
pub async fn read_json_raw<T: DeserializeOwned, C: Client>(
client: &C,
key: &str,
opts: Option<&mut ReadRawKeyRequestBuilder>,
) -> Result<ApiResponse<T>, ClientError> {
let mut t = ReadRawKeyRequest::builder();
let endpoint = opts.unwrap_or(&mut t).key(key).build().unwrap();
let res = api::exec_with_raw(client, endpoint).await?;

if !res.response.is_empty() {
let t = serde_json::from_slice(&res.response)
.map_err(|e| ClientError::JsonDeserializeError { source: e })?;
Ok(ApiResponse {
response: t,
cache: res.cache,
content_hash: res.content_hash,
default_acl_policy: res.default_acl_policy,
index: res.index,
known_leader: res.known_leader,
last_contact: res.last_contact,
query_backend: res.query_backend,
})
} else {
Err(ClientError::EmptyResponseError)
}
}

/// Sets the value at the given key.
///
/// See [SetKeyRequest]
Expand Down
4 changes: 4 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,10 @@
//! // the response back into a UTF-8 encoded string.
//! let mykey: String = res.response.pop().unwrap().value.unwrap().try_into().unwrap();
//!
//! // In most cases, it's easier to just read the raw value
//! let res = kv::read_raw(&client, "mykey", None).await.unwrap().response;
//! let mykey = std::str::from_utf8(&res).unwrap();
//!
//! assert_eq!(mykey, "myvalue".to_string());
//! # })
//! ```
Expand Down
6 changes: 3 additions & 3 deletions tests/kv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ fn test() {
test_set(&client, key).await;
test_keys(&client).await;
test_read(&client, key).await;
test_raw(&client, key).await;
test_read_raw(&client, key).await;
test_delete(&client, key).await;
test_json(&client, key).await;
});
Expand Down Expand Up @@ -51,8 +51,8 @@ async fn test_keys(client: &impl Client) {
assert!(res.is_ok());
}

async fn test_raw(client: &impl Client, key: &str) {
let res = kv::raw(client, key, None).await;
async fn test_read_raw(client: &impl Client, key: &str) {
let res = kv::read_raw(client, key, None).await;
assert!(res.is_ok());
}

Expand Down

0 comments on commit 2b78df2

Please sign in to comment.