Skip to content
Open
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 36 additions & 3 deletions engine/src/workers/queue/queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
// See LICENSE and PATENTS files for details.

use std::{
collections::HashMap,
collections::{HashMap, HashSet},
pin::Pin,
sync::{Arc, RwLock},
};
Expand Down Expand Up @@ -434,16 +434,29 @@ impl QueueWorker {
match self.adapter.list_topics().await {
Ok(topics) => {
let mut dlq_topics = Vec::new();
let mut seen = HashSet::new();
for topic in &topics {
let (display_name, broker_type) =
if let Some(stripped) = topic.name.strip_prefix("__fn_queue::") {
(stripped.to_string(), "function_queue".to_string())
} else {
(topic.name.clone(), topic.broker_type.clone())
};
if !seen.insert(display_name.clone()) {
continue;
}
let dlq_count = self.adapter.dlq_count(&topic.name).await.unwrap_or(0);
dlq_topics.push(json!({
"topic": topic.name,
"broker_type": topic.broker_type,
"topic": display_name,
"broker_type": broker_type,
"message_count": dlq_count,
}));
}
// Also include function queue DLQs
for name in self._config.queue_configs.keys() {
if !seen.insert(name.clone()) {
continue;
}
let namespaced = format!("__fn_queue::{}", name);
let dlq_count = self.adapter.dlq_count(&namespaced).await.unwrap_or(0);
dlq_topics.push(json!({
Expand Down Expand Up @@ -2319,6 +2332,26 @@ mod tests {
}
}

#[tokio::test]
async fn console_dlq_topics_deduplicates_function_queues_reported_by_adapter() {
let (_engine, module, adapter) = setup_queue_module_with_configs();
*adapter.list_topics_result.lock().await = vec![TopicInfo {
name: "__fn_queue::default".to_string(),
broker_type: "builtin".to_string(),
subscriber_count: 0,
}];

let result = module.console_dlq_topics(json!({})).await;
match result {
FunctionResult::Success(Some(val)) => {
let topics: Vec<Value> = serde_json::from_value(val).unwrap();
let default_count = topics.iter().filter(|t| t["topic"] == "default").count();
assert_eq!(default_count, 1, "function queue topics should not be duplicated");
}
_ => panic!("Expected Success with DLQ topics"),
}
}
Comment thread
coderabbitai[bot] marked this conversation as resolved.

#[tokio::test]
async fn console_dlq_topics_adapter_error() {
let (_engine, module, adapter) = setup_queue_module();
Expand Down
Loading