Skip to content

Commit e0ec22a

Browse files
Support more operations on stream publishers and consumers
1 parent 122f4aa commit e0ec22a

File tree

2 files changed

+70
-2
lines changed

2 files changed

+70
-2
lines changed

src/api.rs

Lines changed: 37 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -386,7 +386,7 @@ where
386386
Ok(response)
387387
}
388388

389-
/// Lists stream publishers of the given stream.
389+
/// Lists stream publishers publishing to the given stream.
390390
pub async fn list_stream_publishers_of(
391391
&self,
392392
virtual_host: &str,
@@ -403,6 +403,24 @@ where
403403
Ok(response)
404404
}
405405

406+
/// Lists stream publishers on the given stream connection.
407+
pub async fn list_stream_publishers_on_connection(
408+
&self,
409+
virtual_host: &str,
410+
name: &str,
411+
) -> Result<Vec<responses::StreamPublisher>> {
412+
let response = self
413+
.http_get(
414+
path!("stream", "connections", virtual_host, name, "publishers"),
415+
None,
416+
None,
417+
)
418+
.await?;
419+
420+
let response = response.json().await?;
421+
Ok(response)
422+
}
423+
406424
/// Lists all stream consumers across the cluster.
407425
pub async fn list_stream_consumers(&self) -> Result<Vec<responses::StreamConsumer>> {
408426
let response = self
@@ -424,6 +442,24 @@ where
424442
Ok(response)
425443
}
426444

445+
/// Lists stream consumers on the given stream connection.
446+
pub async fn list_stream_consumers_on_connection(
447+
&self,
448+
virtual_host: &str,
449+
name: &str,
450+
) -> Result<Vec<responses::StreamConsumer>> {
451+
let response = self
452+
.http_get(
453+
path!("stream", "connections", virtual_host, name, "consumers"),
454+
None,
455+
None,
456+
)
457+
.await?;
458+
459+
let response = response.json().await?;
460+
Ok(response)
461+
}
462+
427463
/// Lists all queues and streams across the cluster.
428464
pub async fn list_queues(&self) -> Result<Vec<responses::QueueInfo>> {
429465
let response = self.http_get("queues", None, None).await?;

src/blocking_api.rs

Lines changed: 33 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -350,7 +350,7 @@ where
350350
Ok(response)
351351
}
352352

353-
/// Lists stream publishers on connections in the given virtual host.
353+
/// Lists stream publishers publishing to the given stream.
354354
pub fn list_stream_publishers_in(
355355
&self,
356356
virtual_host: &str,
@@ -377,6 +377,22 @@ where
377377
Ok(response)
378378
}
379379

380+
/// Lists stream publishers on the given stream connection.
381+
pub fn list_stream_publishers_on_connection(
382+
&self,
383+
virtual_host: &str,
384+
name: &str,
385+
) -> Result<Vec<responses::StreamPublisher>> {
386+
let response = self.http_get(
387+
path!("stream", "connections", virtual_host, name, "publishers"),
388+
None,
389+
None,
390+
)?;
391+
392+
let response = response.json()?;
393+
Ok(response)
394+
}
395+
380396
/// Lists all stream consumers across the cluster.
381397
pub fn list_stream_consumers(&self) -> Result<Vec<responses::StreamConsumer>> {
382398
let response = self.http_get(path!("stream", "consumers"), None, None)?;
@@ -396,6 +412,22 @@ where
396412
Ok(response)
397413
}
398414

415+
/// Lists stream consumers on the given stream connection.
416+
pub fn list_stream_consumers_on_connection(
417+
&self,
418+
virtual_host: &str,
419+
name: &str,
420+
) -> Result<Vec<responses::StreamConsumer>> {
421+
let response = self.http_get(
422+
path!("stream", "connections", virtual_host, name, "consumers"),
423+
None,
424+
None,
425+
)?;
426+
427+
let response = response.json()?;
428+
Ok(response)
429+
}
430+
399431
/// Lists all queues and streams across the cluster.
400432
pub fn list_queues(&self) -> Result<Vec<responses::QueueInfo>> {
401433
let response = self.http_get("queues", None, None)?;

0 commit comments

Comments
 (0)