diff --git a/async-nats/src/jetstream/context.rs b/async-nats/src/jetstream/context.rs index ae13e8429..57c85562e 100644 --- a/async-nats/src/jetstream/context.rs +++ b/async-nats/src/jetstream/context.rs @@ -821,6 +821,46 @@ impl Context { Ok(store) } + /// Tries to get an existing key-value bucket, if one cannot be found it will create a new key-value bucket. + /// + /// Note: This does not validate if the key-value on the server is compatible with the configuration passed in. + /// + /// # Examples + /// + /// ```no_run + /// # #[tokio::main] + /// # async fn main() -> Result<(), async_nats::Error> { + /// let client = async_nats::connect("demo.nats.io:4222").await?; + /// let jetstream = async_nats::jetstream::new(client); + /// let kv = jetstream + /// .get_or_create_key_value("kv".to_string(), async_nats::jetstream::kv::Config { + /// bucket: "kv".to_string(), + /// history: 10, + /// ..Default::default() + /// }) + /// .await?; + /// # Ok(()) + /// # } + /// ``` + pub async fn get_or_create_key_value>( + &self, + bucket: T, + config: crate::jetstream::kv::Config, + ) -> Result { + match self.get_key_value(bucket).await { + Ok(kv) => Ok(kv), + Err(e) => match e.kind() { + KeyValueErrorKind::GetBucket => self.create_key_value(config).await, + KeyValueErrorKind::InvalidStoreName => Err(CreateKeyValueError::new( + CreateKeyValueErrorKind::InvalidStoreName, + )), + KeyValueErrorKind::JetStream => { + Err(CreateKeyValueError::new(CreateKeyValueErrorKind::JetStream)) + } + }, + } + } + /// Deletes given key-value bucket. /// /// # Examples diff --git a/async-nats/tests/kv_tests.rs b/async-nats/tests/kv_tests.rs index a443b5068..b74f010d1 100644 --- a/async-nats/tests/kv_tests.rs +++ b/async-nats/tests/kv_tests.rs @@ -52,6 +52,57 @@ mod kv { assert!(info.config.allow_direct); } + #[tokio::test] + async fn get_or_create_bucket() { + let server = nats_server::run_server("tests/configs/jetstream.conf"); + let client = ConnectOptions::new() + .event_callback(|event| async move { println!("event: {event:?}") }) + .connect(server.client_url()) + .await + .unwrap(); + + let context = async_nats::jetstream::new(client); + + let mut kv = context + .get_or_create_key_value( + "test", + async_nats::jetstream::kv::Config { + bucket: "test".into(), + description: "test_description".into(), + history: 10, + storage: StorageType::File, + num_replicas: 1, + ..Default::default() + }, + ) + .await + .unwrap(); + let info = kv.stream.info().await.unwrap(); + assert_eq!("KV_test", kv.stream_name); + assert_eq!(info.config.discard, DiscardPolicy::New); + assert!(info.config.allow_direct); + + let mut kv2 = context + .get_or_create_key_value( + "test", + async_nats::jetstream::kv::Config { + bucket: "test".into(), + description: "test_description".into(), + history: 10, + storage: StorageType::File, + num_replicas: 1, + ..Default::default() + }, + ) + .await + .unwrap(); + + let info2 = kv2.stream.info().await.unwrap(); + assert_eq!(kv2.stream_name, kv.stream_name); + assert_eq!(info2.config.discard, info.config.discard); + assert_eq!(info2.config.allow_direct, info.config.allow_direct); + } + #[tokio::test] async fn create() { let server = nats_server::run_server("tests/configs/jetstream.conf");