Skip to content

Commit a06cef3

Browse files
committed
Implement Jakarta Messaging 3.1 feature, async send with CompletionListener. Added unit tests to cover behaviour required by specs.
1 parent 52a9aec commit a06cef3

File tree

7 files changed

+870
-32
lines changed

7 files changed

+870
-32
lines changed

activemq-client/src/main/java/org/apache/activemq/ActiveMQConnection.java

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
import java.util.concurrent.atomic.AtomicInteger;
3838
import java.util.concurrent.atomic.AtomicLong;
3939

40+
import jakarta.jms.CompletionListener;
4041
import jakarta.jms.Connection;
4142
import jakarta.jms.ConnectionConsumer;
4243
import jakarta.jms.ConnectionMetaData;
@@ -1439,6 +1440,68 @@ public void onCompletion(FutureResponse resp) {
14391440
}
14401441
}
14411442

1443+
/**
1444+
* Send a packet through a Connection - for internal use only
1445+
*
1446+
* @param command
1447+
*
1448+
* @throws JMSException
1449+
*/
1450+
public void syncSendPacket(final Command command, final CompletionListener completionListener) throws JMSException {
1451+
if(completionListener==null) {
1452+
syncSendPacket(command);
1453+
} else {
1454+
if (isClosed()) {
1455+
throw new ConnectionClosedException();
1456+
}
1457+
try {
1458+
this.transport.asyncRequest(command, resp -> {
1459+
Response response;
1460+
Throwable exception = null;
1461+
try {
1462+
response = resp.getResult();
1463+
if (response.isException()) {
1464+
ExceptionResponse er = (ExceptionResponse)response;
1465+
exception = er.getException();
1466+
}
1467+
} catch (Exception e) {
1468+
exception = e;
1469+
}
1470+
if (exception != null) {
1471+
if ( exception instanceof JMSException) {
1472+
completionListener.onException((jakarta.jms.Message) command, (JMSException) exception);
1473+
} else {
1474+
if (isClosed() || closing.get()) {
1475+
LOG.debug("Received an exception but connection is closing");
1476+
}
1477+
JMSException jmsEx = null;
1478+
try {
1479+
jmsEx = JMSExceptionSupport.create(exception);
1480+
} catch(Throwable e) {
1481+
LOG.error("Caught an exception trying to create a JMSException for " +exception,e);
1482+
}
1483+
// dispose of transport for security exceptions on connection initiation
1484+
if (exception instanceof SecurityException && command instanceof ConnectionInfo){
1485+
try {
1486+
forceCloseOnSecurityException(exception);
1487+
} catch (Throwable t) {
1488+
// We throw the original error from the ExceptionResponse instead.
1489+
}
1490+
}
1491+
if (jmsEx != null) {
1492+
completionListener.onException((jakarta.jms.Message) command, jmsEx);
1493+
}
1494+
}
1495+
} else {
1496+
completionListener.onCompletion((jakarta.jms.Message) command);
1497+
}
1498+
});
1499+
} catch (IOException e) {
1500+
throw JMSExceptionSupport.create(e);
1501+
}
1502+
}
1503+
}
1504+
14421505
private void forceCloseOnSecurityException(Throwable exception) {
14431506
LOG.trace("force close on security exception:{}, transport={}", this, transport, exception);
14441507
onException(new IOException("Force close due to SecurityException on connect", exception));

activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageProducer.java

Lines changed: 72 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -23,10 +23,10 @@
2323
import jakarta.jms.CompletionListener;
2424
import jakarta.jms.Destination;
2525
import jakarta.jms.IllegalStateException;
26+
import jakarta.jms.IllegalStateRuntimeException;
2627
import jakarta.jms.InvalidDestinationException;
2728
import jakarta.jms.JMSException;
2829
import jakarta.jms.Message;
29-
3030
import org.apache.activemq.command.ActiveMQDestination;
3131
import org.apache.activemq.command.ProducerAck;
3232
import org.apache.activemq.command.ProducerId;
@@ -83,11 +83,13 @@ public class ActiveMQMessageProducer extends ActiveMQMessageProducerSupport impl
8383
private final long startTime;
8484
private MessageTransformer transformer;
8585
private MemoryUsage producerWindow;
86+
private final ThreadLocal<Boolean> inCompletionListenerCallback = new ThreadLocal<>();
8687

8788
protected ActiveMQMessageProducer(ActiveMQSession session, ProducerId producerId, ActiveMQDestination destination, int sendTimeout) throws JMSException {
8889
super(session);
8990
this.info = new ProducerInfo(producerId);
9091
this.info.setWindowSize(session.connection.getProducerWindowSize());
92+
inCompletionListenerCallback.set(false);
9193
// Allows the options on the destination to configure the producerInfo
9294
if (destination != null && destination.getOptions() != null) {
9395
Map<String, Object> options = IntrospectionSupport.extractProperties(
@@ -168,6 +170,9 @@ public Destination getDestination() throws JMSException {
168170
*/
169171
@Override
170172
public void close() throws JMSException {
173+
if (inCompletionListenerCallback.get()) {
174+
throw new IllegalStateRuntimeException("Can't close message producer within CompletionListener");
175+
}
171176
if (!closed) {
172177
dispose();
173178
this.session.asyncSendPacket(info.createRemoveCommand());
@@ -239,27 +244,88 @@ public void send(Destination destination, Message message, int deliveryMode, int
239244
*/
240245
@Override
241246
public void send(Message message, CompletionListener completionListener) throws JMSException {
242-
throw new UnsupportedOperationException("send(Message, CompletionListener) is not supported");
247+
this.send(getDestination(),
248+
message,
249+
defaultDeliveryMode,
250+
defaultPriority,
251+
defaultTimeToLive,
252+
completionListener);
243253
}
244254

255+
245256
@Override
246257
public void send(Message message, int deliveryMode, int priority, long timeToLive,
247258
CompletionListener completionListener) throws JMSException {
248-
throw new UnsupportedOperationException("send(Message, deliveryMode, priority, timetoLive, CompletionListener) is not supported");
259+
this.send(this.getDestination(),
260+
message,
261+
deliveryMode,
262+
priority,
263+
timeToLive,
264+
completionListener);
249265
}
250266

251267
@Override
252268
public void send(Destination destination, Message message, CompletionListener completionListener) throws JMSException {
253-
throw new UnsupportedOperationException("send(Destination, Message, CompletionListener) is not supported");
269+
this.send(destination,
270+
message,
271+
defaultDeliveryMode,
272+
defaultPriority,
273+
defaultTimeToLive,
274+
completionListener);
254275
}
255276

256277
@Override
257278
public void send(Destination destination, Message message, int deliveryMode, int priority, long timeToLive,
258279
CompletionListener completionListener) throws JMSException {
259-
throw new UnsupportedOperationException("send(Destination, Message, deliveryMode, priority, timetoLive, CompletionListener) is not supported");
280+
this.send(destination, message, deliveryMode, priority, timeToLive,
281+
getDisableMessageID(), getDisableMessageTimestamp(), completionListener);
282+
}
283+
284+
public void send(Destination destination, Message message, int deliveryMode, int priority, long timeToLive,
285+
boolean disableMessageID, boolean disableMessageTimestamp, CompletionListener completionListener) throws JMSException {
286+
checkClosed();
287+
if (destination == null) {
288+
if (info.getDestination() == null) {
289+
throw new UnsupportedOperationException("A destination must be specified.");
290+
}
291+
throw new InvalidDestinationException("Don't understand null destinations");
292+
}
293+
294+
ActiveMQDestination dest;
295+
if (destination.equals(info.getDestination())) {
296+
dest = (ActiveMQDestination)destination;
297+
} else if (info.getDestination() == null) {
298+
dest = ActiveMQDestination.transform(destination);
299+
} else {
300+
throw new UnsupportedOperationException("This producer can only send messages to: " + this.info.getDestination().getPhysicalName());
301+
}
302+
if (dest == null) {
303+
throw new JMSException("No destination specified");
304+
}
305+
306+
if (transformer != null) {
307+
Message transformedMessage = transformer.producerTransform(session, this, message);
308+
if (transformedMessage != null) {
309+
message = transformedMessage;
310+
}
311+
}
312+
313+
if (producerWindow != null) {
314+
try {
315+
producerWindow.waitForSpace();
316+
} catch (InterruptedException e) {
317+
throw new JMSException("Send aborted due to thread interrupt.");
318+
}
319+
}
320+
321+
this.session.send(this, dest, message, deliveryMode, priority, timeToLive, disableMessageID,
322+
disableMessageTimestamp, producerWindow, sendTimeout, completionListener, inCompletionListenerCallback);
323+
324+
stats.onMessage();
260325
}
261326

262-
public void send(Message message, AsyncCallback onComplete) throws JMSException {
327+
328+
public void send(Message message, AsyncCallback onComplete) throws JMSException {
263329
this.send(this.getDestination(),
264330
message,
265331
this.defaultDeliveryMode,

activemq-client/src/main/java/org/apache/activemq/ActiveMQProducer.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@ public class ActiveMQProducer implements JMSProducer {
5656

5757
// Properties applied to all messages on a per-JMS producer instance basis
5858
private Map<String, Object> messageProperties = null;
59+
private CompletionListener completionListener = null;
5960

6061
ActiveMQProducer(ActiveMQContext activemqContext, ActiveMQMessageProducer activemqMessageProducer) {
6162
this.activemqContext = activemqContext;
@@ -86,8 +87,7 @@ public JMSProducer send(Destination destination, Message message) {
8687
message.setObjectProperty(propertyEntry.getKey(), propertyEntry.getValue());
8788
}
8889
}
89-
90-
activemqMessageProducer.send(destination, message, getDeliveryMode(), getPriority(), getTimeToLive(), getDisableMessageID(), getDisableMessageTimestamp(), null);
90+
activemqMessageProducer.send(destination, message, getDeliveryMode(), getPriority(), getTimeToLive(), getDisableMessageID(), getDisableMessageTimestamp(), getAsync());
9191
} catch (JMSException e) {
9292
throw JMSExceptionSupport.convertToJMSRuntimeException(e);
9393
}
@@ -253,12 +253,13 @@ public long getDeliveryDelay() {
253253

254254
@Override
255255
public JMSProducer setAsync(CompletionListener completionListener) {
256-
throw new UnsupportedOperationException("setAsync(CompletionListener) is not supported");
256+
this.completionListener = completionListener;
257+
return this;
257258
}
258259

259260
@Override
260261
public CompletionListener getAsync() {
261-
throw new UnsupportedOperationException("getAsync() is not supported");
262+
return this.completionListener;
262263
}
263264

264265
@Override

0 commit comments

Comments
 (0)