Skip to content

Commit

Permalink
Feat : Get or create Key Value store
Browse files Browse the repository at this point in the history
  • Loading branch information
Callum-A authored and Jarema committed Dec 27, 2024
1 parent 23bb978 commit 0c9bc79
Show file tree
Hide file tree
Showing 2 changed files with 91 additions and 0 deletions.
40 changes: 40 additions & 0 deletions async-nats/src/jetstream/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -821,6 +821,46 @@ impl Context {
Ok(store)
}

/// Tries to get an existing key-value bucket, if one cannot be found it will create a new key-value bucket.
///
/// Note: This does not validate if the key-value on the server is compatible with the configuration passed in.
///
/// # Examples
///
/// ```no_run
/// # #[tokio::main]
/// # async fn main() -> Result<(), async_nats::Error> {
/// let client = async_nats::connect("demo.nats.io:4222").await?;
/// let jetstream = async_nats::jetstream::new(client);
/// let kv = jetstream
/// .get_or_create_key_value("kv".to_string(), async_nats::jetstream::kv::Config {
/// bucket: "kv".to_string(),
/// history: 10,
/// ..Default::default()
/// })
/// .await?;
/// # Ok(())
/// # }
/// ```
pub async fn get_or_create_key_value<T: Into<String>>(
&self,
bucket: T,
config: crate::jetstream::kv::Config,
) -> Result<Store, CreateKeyValueError> {
match self.get_key_value(bucket).await {
Ok(kv) => Ok(kv),
Err(e) => match e.kind() {
KeyValueErrorKind::GetBucket => self.create_key_value(config).await,
KeyValueErrorKind::InvalidStoreName => Err(CreateKeyValueError::new(
CreateKeyValueErrorKind::InvalidStoreName,
)),
KeyValueErrorKind::JetStream => {
Err(CreateKeyValueError::new(CreateKeyValueErrorKind::JetStream))
}
},
}
}

/// Deletes given key-value bucket.
///
/// # Examples
Expand Down
51 changes: 51 additions & 0 deletions async-nats/tests/kv_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,57 @@ mod kv {
assert!(info.config.allow_direct);
}

#[tokio::test]
async fn get_or_create_bucket() {
let server = nats_server::run_server("tests/configs/jetstream.conf");
let client = ConnectOptions::new()
.event_callback(|event| async move { println!("event: {event:?}") })
.connect(server.client_url())
.await
.unwrap();

let context = async_nats::jetstream::new(client);

let mut kv = context
.get_or_create_key_value(
"test",
async_nats::jetstream::kv::Config {
bucket: "test".into(),
description: "test_description".into(),
history: 10,
storage: StorageType::File,
num_replicas: 1,
..Default::default()
},
)
.await
.unwrap();
let info = kv.stream.info().await.unwrap();
assert_eq!("KV_test", kv.stream_name);
assert_eq!(info.config.discard, DiscardPolicy::New);
assert!(info.config.allow_direct);

let mut kv2 = context
.get_or_create_key_value(
"test",
async_nats::jetstream::kv::Config {
bucket: "test".into(),
description: "test_description".into(),
history: 10,
storage: StorageType::File,
num_replicas: 1,
..Default::default()
},
)
.await
.unwrap();

let info2 = kv2.stream.info().await.unwrap();
assert_eq!(kv2.stream_name, kv.stream_name);
assert_eq!(info2.config.discard, info.config.discard);
assert_eq!(info2.config.allow_direct, info.config.allow_direct);
}

#[tokio::test]
async fn create() {
let server = nats_server::run_server("tests/configs/jetstream.conf");
Expand Down

0 comments on commit 0c9bc79

Please sign in to comment.