Skip to content

Commit f705bf5

Browse files
authored
Plug cli-protonj2 into selftests (#287)
1 parent 03940c0 commit f705bf5

File tree

17 files changed

+1154
-503
lines changed

17 files changed

+1154
-503
lines changed

cli-protonj2/pom.xml

+5
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,11 @@
9090
<version>4.6.1</version>
9191
</dependency>
9292

93+
<dependency>
94+
<groupId>com.redhat.cli-java</groupId>
95+
<artifactId>cli</artifactId>
96+
<scope>test</scope>
97+
</dependency>
9398
<dependency>
9499
<groupId>com.redhat.cli-java</groupId>
95100
<artifactId>tests</artifactId>
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,251 @@
1+
/*
2+
* Copyright (c) 2021 Red Hat, Inc.
3+
*
4+
* Licensed to the Apache Software Foundation (ASF) under one or more
5+
* contributor license agreements. See the NOTICE file distributed with
6+
* this work for additional information regarding copyright ownership.
7+
* The ASF licenses this file to You under the Apache License, Version 2.0
8+
* (the "License"); you may not use this file except in compliance with
9+
* the License. You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
*/
19+
20+
package com.redhat.mqe;
21+
22+
import com.redhat.mqe.lib.Utils;
23+
import org.apache.qpid.protonj2.client.Client;
24+
import org.apache.qpid.protonj2.client.ClientOptions;
25+
import org.apache.qpid.protonj2.client.Connection;
26+
import org.apache.qpid.protonj2.client.ConnectionOptions;
27+
import org.apache.qpid.protonj2.client.Delivery;
28+
import org.apache.qpid.protonj2.client.DistributionMode;
29+
import org.apache.qpid.protonj2.client.Message;
30+
import org.apache.qpid.protonj2.client.Receiver;
31+
import org.apache.qpid.protonj2.client.ReceiverOptions;
32+
import org.apache.qpid.protonj2.client.Sender;
33+
import picocli.CommandLine;
34+
35+
import java.net.URI;
36+
import java.util.ArrayList;
37+
import java.util.List;
38+
import java.util.Map;
39+
import java.util.concurrent.Callable;
40+
import java.util.concurrent.TimeUnit;
41+
42+
import static com.redhat.mqe.lib.ClientOptionManager.QUEUE_PREFIX;
43+
import static com.redhat.mqe.lib.ClientOptionManager.TOPIC_PREFIX;
44+
45+
@CommandLine.Command(
46+
name = "receiver",
47+
mixinStandardHelpOptions = true,
48+
version = "1.0.0",
49+
description = "Opens AMQP connections"
50+
)
51+
public class CliProtonJ2Receiver extends CliProtonJ2SenderReceiver implements Callable<Integer> {
52+
53+
private final ProtonJ2MessageFormatter messageFormatter;
54+
55+
@CommandLine.Option(names = {"--log-msgs"}, description = "MD5, SHA-1, SHA-256, ...")
56+
private LogMsgs logMsgs = LogMsgs.dict;
57+
58+
@CommandLine.Option(names = {"--msg-content-hashed"})
59+
private String msgContentHashedString = "false";
60+
61+
@CommandLine.Option(names = {"-b", "--broker"}, description = "MD5, SHA-1, SHA-256, ...")
62+
private String broker = "MD5";
63+
64+
@CommandLine.Option(names = {"--conn-username"}, description = "MD5, SHA-1, SHA-256, ...")
65+
private String connUsername = "MD5";
66+
67+
@CommandLine.Option(names = {"--conn-password"}, description = "MD5, SHA-1, SHA-256, ...")
68+
private String connPassword = "MD5";
69+
70+
@CommandLine.Option(names = {"--conn-clientid"})
71+
private String connClientId;
72+
73+
@CommandLine.Option(names = {"--durable-subscriber"})
74+
private String durableSubscriberString = "false";
75+
76+
@CommandLine.Option(names = {"--durable-subscriber-name"})
77+
private String durableSubscriberName;
78+
79+
// TODO not implemented
80+
@CommandLine.Option(names = {"--subscriber-unsubscribe"})
81+
private String subscriberUnsubscribeString;
82+
83+
@CommandLine.Option(names = {"-a", "--address"}, description = "MD5, SHA-1, SHA-256, ...")
84+
private String address = "MD5";
85+
86+
@CommandLine.Option(names = {"--recv-browse"}, description = "browse queued messages instead of receiving them")
87+
private String recvBrowseString = "false";
88+
89+
@CommandLine.Option(names = {"--count"}, description = "MD5, SHA-1, SHA-256, ...")
90+
private int count = 1;
91+
92+
@CommandLine.Option(names = {"--timeout"}, description = "MD5, SHA-1, SHA-256, ...")
93+
private int timeout;
94+
95+
@CommandLine.Option(names = {"--conn-auth-mechanisms"}, description = "MD5, SHA-1, SHA-256, ...")
96+
// todo, want to accept comma-separated lists; there is https://picocli.info/#_split_regex
97+
private List<AuthMechanism> connAuthMechanisms = new ArrayList<>();
98+
99+
@CommandLine.Option(names = {"--process-reply-to"})
100+
private boolean processReplyTo = false;
101+
102+
@CommandLine.Option(names = {"--duration"}) // todo
103+
private Integer duration;
104+
105+
@CommandLine.Option(names = {"--duration-mode"}) // todo
106+
private DurationMode durationMode;
107+
108+
@CommandLine.Option(names = {"--ssn-ack-mode"})
109+
private SsnAckMode ssnAckMode;
110+
111+
public CliProtonJ2Receiver() {
112+
this.messageFormatter = new ProtonJ2MessageFormatter();
113+
}
114+
115+
public CliProtonJ2Receiver(ProtonJ2MessageFormatter messageFormatter) {
116+
this.messageFormatter = messageFormatter;
117+
}
118+
119+
@Override
120+
public Integer call() throws Exception { // your business logic goes here...
121+
String prefix = "";
122+
if (!broker.startsWith("amqp://") && !broker.startsWith("amqps://")) {
123+
prefix = "amqp://";
124+
}
125+
final URI url = new URI(prefix + broker);
126+
final String serverHost = url.getHost();
127+
int serverPort = url.getPort();
128+
serverPort = (serverPort == -1) ? 5672 : serverPort;
129+
130+
String destinationCapability = "queue";
131+
if (address.startsWith(TOPIC_PREFIX)) {
132+
address = address.substring((TOPIC_PREFIX.length()));
133+
destinationCapability = "topic";
134+
}
135+
if (address.startsWith(QUEUE_PREFIX)) {
136+
address = address.substring((QUEUE_PREFIX.length()));
137+
}
138+
139+
ClientOptions clientOptions = new ClientOptions();
140+
// TODO api usability I had to hunt for this a bit; the idea is to have durable subscription: need specify connection id and subscriber name
141+
if (connClientId != null) {
142+
clientOptions.id(connClientId);
143+
}
144+
145+
// TODO api usability; If I use the w/ clientOptions variant of Client.create, then .id defaults to null, and I get exception;
146+
// ok, that just cannot be true ^^^; but it looks to be true; what!?!
147+
// aha, right; constructor does not check, factory method does check for null
148+
// proposed solution: allow null there, and let it mean autoassign; or tell us method to generate ID ourselves if we don't care
149+
Client client;
150+
if (clientOptions.id() != null) {
151+
client = Client.create(clientOptions);
152+
} else {
153+
client = Client.create();
154+
}
155+
156+
final ConnectionOptions options = new ConnectionOptions();
157+
options.user(connUsername);
158+
options.password(connPassword);
159+
for (AuthMechanism mech : connAuthMechanisms) {
160+
options.saslOptions().addAllowedMechanism(mech.name());
161+
}
162+
163+
/*
164+
TODO API usability, hard to ask for queue when dealing with broker that likes to autocreate topics
165+
*/
166+
ReceiverOptions receiverOptions = new ReceiverOptions();
167+
// is it target or source? target.
168+
receiverOptions.sourceOptions().capabilities(destinationCapability);
169+
170+
// todo: another usability, little hard to figure out this is analogue of jms to browse queues
171+
if (stringToBool(recvBrowseString)) {
172+
receiverOptions.sourceOptions().distributionMode(DistributionMode.COPY);
173+
}
174+
175+
// TODO: API question: what is difference between autoSettle and autoAccept? why I want one but not the other?
176+
if (ssnAckMode != null) {
177+
if (ssnAckMode == SsnAckMode.client) {
178+
receiverOptions.autoAccept(false);
179+
receiverOptions.autoSettle(false);
180+
}
181+
}
182+
183+
try (Connection connection = client.connect(serverHost, serverPort, options)) {
184+
Receiver receiver;
185+
if (stringToBool(durableSubscriberString)) {
186+
receiver = connection.openDurableReceiver(address, durableSubscriberName, receiverOptions);
187+
} else {
188+
receiver = connection.openReceiver(address, receiverOptions);
189+
}
190+
191+
double initialTimestamp = Utils.getTime();
192+
for (int i = 0; i < count; i++) {
193+
194+
// if (durationMode == DurationMode.sleepBeforeReceive) {
195+
// LOG.trace("Sleeping before receive");
196+
// Utils.sleepUntilNextIteration(initialTimestamp, msgCount, duration, i + 1);
197+
// }
198+
199+
final Delivery delivery;
200+
if (timeout == 0) {
201+
delivery = receiver.receive(); // todo: can default it to -1
202+
} else {
203+
delivery = receiver.receive(timeout, TimeUnit.SECONDS);
204+
}
205+
206+
if (delivery == null) {
207+
break;
208+
}
209+
210+
if (durationMode == DurationMode.afterReceive) {
211+
// LOG.trace("Sleeping after receive");
212+
Utils.sleepUntilNextIteration(initialTimestamp, count, duration, i + 1); // todo possibly it is i, different loop here
213+
}
214+
215+
if (processReplyTo && delivery.message().replyTo() != null) {
216+
String replyTo = delivery.message().replyTo();
217+
Message<Object> message = delivery.message();
218+
message.replyTo(null);
219+
try (Sender sender = connection.openSender(replyTo)) {
220+
sender.send(message);
221+
}
222+
}
223+
224+
int messageFormat = delivery.messageFormat();
225+
Message<Object> message = delivery.message();
226+
227+
// todo, is this what we mean?
228+
if (ssnAckMode != null && ssnAckMode == SsnAckMode.client) {
229+
delivery.accept();
230+
}
231+
232+
Map<String, Object> messageDict = messageFormatter.formatMessage(address, message, stringToBool(msgContentHashedString));
233+
switch (logMsgs) {
234+
case dict:
235+
messageFormatter.printMessageAsPython(messageDict);
236+
break;
237+
case interop:
238+
messageFormatter.printMessageAsJson(messageDict);
239+
break;
240+
}
241+
}
242+
243+
// TODO API usability, how do I do durable subscription with detach, resume, etc; no mention of unsubscribe in the client anywhere
244+
receiver.close(); // TODO want to do autoclosable, need helper func, that's all
245+
// receiver.detach();
246+
}
247+
248+
return 0;
249+
}
250+
251+
}

0 commit comments

Comments
 (0)