diff --git a/amqp/src/main/java/com/netflix/conductor/contribs/queue/amqp/AMQPObservableQueue.java b/amqp/src/main/java/com/netflix/conductor/contribs/queue/amqp/AMQPObservableQueue.java index d63d5771d4..f81dd5bb16 100644 --- a/amqp/src/main/java/com/netflix/conductor/contribs/queue/amqp/AMQPObservableQueue.java +++ b/amqp/src/main/java/com/netflix/conductor/contribs/queue/amqp/AMQPObservableQueue.java @@ -189,12 +189,17 @@ public Address[] getAddresses() { public List ack(List messages) { final List failedMessages = new ArrayList<>(); - for (final Message message : messages) { - try { - ackMsg(message); - } catch (final Exception e) { - LOGGER.error("Cannot ACK message with delivery tag {}", message.getReceipt(), e); - failedMessages.add(message.getReceipt()); + if (!useExchange) { + // only attempt to ack messages when using queues. + // it makes no sense to ack over exchange since the messages are no longer there. + for (final Message message : messages) { + try { + ackMsg(message); + } catch (final Exception e) { + LOGGER.error( + "Cannot ACK message with delivery tag {}", message.getReceipt(), e); + failedMessages.add(message.getReceipt()); + } } } return failedMessages;