From 7dbe19332d9d29be3a6e296faba9ab23cbd7c926 Mon Sep 17 00:00:00 2001 From: Sergio Urinovsky Date: Thu, 12 Mar 2026 17:58:25 -0300 Subject: [PATCH 1/3] do not try to ack a message on a exchange --- .../queue/amqp/AMQPObservableQueue.java | 18 +++++++++++------- 1 file changed, 11 insertions(+), 7 deletions(-) diff --git a/amqp/src/main/java/com/netflix/conductor/contribs/queue/amqp/AMQPObservableQueue.java b/amqp/src/main/java/com/netflix/conductor/contribs/queue/amqp/AMQPObservableQueue.java index d63d5771d4..e18b287cc8 100644 --- a/amqp/src/main/java/com/netflix/conductor/contribs/queue/amqp/AMQPObservableQueue.java +++ b/amqp/src/main/java/com/netflix/conductor/contribs/queue/amqp/AMQPObservableQueue.java @@ -189,14 +189,18 @@ public Address[] getAddresses() { public List ack(List messages) { final List failedMessages = new ArrayList<>(); - for (final Message message : messages) { - try { - ackMsg(message); - } catch (final Exception e) { - LOGGER.error("Cannot ACK message with delivery tag {}", message.getReceipt(), e); - failedMessages.add(message.getReceipt()); + if (!useExchange) { + // only attempt to ack messages when using queues. + // it make no sense to ack over exchange since the message is no longer there. + for (final Message message : messages) { + try { + ackMsg(message); + } catch (final Exception e) { + LOGGER.error("Cannot ACK message with delivery tag {}", message.getReceipt(), e); + failedMessages.add(message.getReceipt()); + } } - } + } return failedMessages; } From 93d911c8facf5701c8c9cbf5e66d399074bc372b Mon Sep 17 00:00:00 2001 From: Sergio Urinovsky Date: Thu, 12 Mar 2026 18:13:38 -0300 Subject: [PATCH 2/3] fix formatting errors --- .../conductor/contribs/queue/amqp/AMQPObservableQueue.java | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/amqp/src/main/java/com/netflix/conductor/contribs/queue/amqp/AMQPObservableQueue.java b/amqp/src/main/java/com/netflix/conductor/contribs/queue/amqp/AMQPObservableQueue.java index e18b287cc8..5e92803fa7 100644 --- a/amqp/src/main/java/com/netflix/conductor/contribs/queue/amqp/AMQPObservableQueue.java +++ b/amqp/src/main/java/com/netflix/conductor/contribs/queue/amqp/AMQPObservableQueue.java @@ -190,17 +190,18 @@ public Address[] getAddresses() { public List ack(List messages) { final List failedMessages = new ArrayList<>(); if (!useExchange) { - // only attempt to ack messages when using queues. + // only attempt to ack messages when using queues. // it make no sense to ack over exchange since the message is no longer there. for (final Message message : messages) { try { ackMsg(message); } catch (final Exception e) { - LOGGER.error("Cannot ACK message with delivery tag {}", message.getReceipt(), e); + LOGGER.error( + "Cannot ACK message with delivery tag {}", message.getReceipt(), e); failedMessages.add(message.getReceipt()); } } - } + } return failedMessages; } From 8f528f8d5a1010b6df9931b8804081caecdf8684 Mon Sep 17 00:00:00 2001 From: Sergio Urinovsky Date: Thu, 12 Mar 2026 19:23:42 -0300 Subject: [PATCH 3/3] fixed comment --- .../conductor/contribs/queue/amqp/AMQPObservableQueue.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/amqp/src/main/java/com/netflix/conductor/contribs/queue/amqp/AMQPObservableQueue.java b/amqp/src/main/java/com/netflix/conductor/contribs/queue/amqp/AMQPObservableQueue.java index 5e92803fa7..f81dd5bb16 100644 --- a/amqp/src/main/java/com/netflix/conductor/contribs/queue/amqp/AMQPObservableQueue.java +++ b/amqp/src/main/java/com/netflix/conductor/contribs/queue/amqp/AMQPObservableQueue.java @@ -191,7 +191,7 @@ public List ack(List messages) { final List failedMessages = new ArrayList<>(); if (!useExchange) { // only attempt to ack messages when using queues. - // it make no sense to ack over exchange since the message is no longer there. + // it makes no sense to ack over exchange since the messages are no longer there. for (final Message message : messages) { try { ackMsg(message);