Skip to content

Commit fefd3b5

Browse files
authored
Merge branch 'apache:main' into runAsNonRoot
2 parents 4d15090 + 526e83b commit fefd3b5

File tree

12 files changed

+58
-59
lines changed

12 files changed

+58
-59
lines changed

activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpNioSslTransport.java

Lines changed: 17 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,22 @@ public AmqpNioSslTransport(WireFormat wireFormat, Socket socket,
5454
@Override
5555
protected void initializeStreams() throws IOException {
5656
super.initializeStreams();
57+
if (initBuffer != null) {
58+
initBuffer.buffer.flip();
59+
// If we are processing the initial buffer from the auto transport,
60+
// then process first. This generally should only have 8 bytes from
61+
// the initial read
62+
if (initBuffer.buffer.hasRemaining()) {
63+
receiveCounter.addAndGet(initBuffer.readSize);
64+
try {
65+
// one call is all that is needed to consume all data
66+
processCommand(initBuffer.buffer);
67+
} catch (Exception e) {
68+
throw new IOException(e);
69+
}
70+
initBuffer.buffer.clear();
71+
}
72+
}
5773
if (inputBuffer.position() != 0 && inputBuffer.hasRemaining()) {
5874
serviceRead();
5975
}
@@ -76,23 +92,4 @@ protected void doInit() throws Exception {
7692
super.doInit();
7793
}
7894

79-
@Override
80-
protected int secureRead(ByteBuffer plain) throws Exception {
81-
if (initBuffer != null) {
82-
initBuffer.buffer.flip();
83-
if (initBuffer.buffer.hasRemaining()) {
84-
plain.flip();
85-
for (int i =0; i < 8; i++) {
86-
plain.put(initBuffer.buffer.get());
87-
}
88-
plain.flip();
89-
processCommand(plain);
90-
initBuffer.buffer.clear();
91-
return 8;
92-
}
93-
}
94-
return super.secureRead(plain);
95-
}
96-
97-
98-
}
95+
}

activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpNioTransport.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -140,7 +140,7 @@ private void serviceRead() {
140140
}
141141

142142
protected void processBuffer(ByteBuffer buffer, int readSize) throws Exception {
143-
receiveCounter += readSize;
143+
receiveCounter.addAndGet(readSize);
144144

145145
buffer.flip();
146146
frameReader.parse(buffer);
@@ -164,4 +164,4 @@ protected void doStop(ServiceStopper stopper) throws Exception {
164164
super.doStop(stopper);
165165
}
166166
}
167-
}
167+
}

activemq-broker/src/main/java/org/apache/activemq/transport/auto/AutoTcpTransportServer.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -290,7 +290,7 @@ public void run() {
290290

291291
try {
292292
//If this fails and throws an exception and the socket will be closed
293-
waitForProtocolDetectionFinish(future, readBytes);
293+
waitForProtocolDetectionFinish(future, readBytes.get());
294294
} finally {
295295
//call cancel in case task didn't complete
296296
future.cancel(true);
@@ -311,7 +311,7 @@ public void run() {
311311
return new TransportInfo(format, transport, protocolInfo.detectedTransportFactory);
312312
}
313313

314-
protected void waitForProtocolDetectionFinish(final Future<?> future, final AtomicInteger readBytes) throws Exception {
314+
protected void waitForProtocolDetectionFinish(final Future<?> future, final int readBytes) throws Exception {
315315
try {
316316
//Wait for protocolDetectionTimeOut if defined
317317
if (protocolDetectionTimeOut > 0) {
@@ -321,7 +321,7 @@ protected void waitForProtocolDetectionFinish(final Future<?> future, final Atom
321321
}
322322
} catch (TimeoutException e) {
323323
throw new InactivityIOException("Client timed out before wire format could be detected. " +
324-
" 8 bytes are required to detect the protocol but only: " + readBytes.get() + " byte(s) were sent.");
324+
" 8 bytes are required to detect the protocol but only: " + readBytes + " byte(s) were sent.");
325325
}
326326
}
327327

activemq-broker/src/main/java/org/apache/activemq/transport/auto/nio/AutoNIOSSLTransportServer.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -145,20 +145,20 @@ public void run() {
145145
//to be told when bytes are ready
146146
in.serviceRead();
147147
attempts++;
148-
} while(in.getReadSize().get() < 8 && !Thread.interrupted());
148+
} while(in.getReceiveCounter() < 8 && !Thread.interrupted());
149149
}
150150
});
151151

152152
try {
153153
//If this fails and throws an exception and the socket will be closed
154-
waitForProtocolDetectionFinish(future, in.getReadSize());
154+
waitForProtocolDetectionFinish(future, in.getReceiveCounter());
155155
} finally {
156156
//call cancel in case task didn't complete which will interrupt the task
157157
future.cancel(true);
158158
}
159159
in.stop();
160160

161-
InitBuffer initBuffer = new InitBuffer(in.getReadSize().get(), ByteBuffer.allocate(in.getReadData().length));
161+
InitBuffer initBuffer = new InitBuffer(in.getReceiveCounter(), ByteBuffer.allocate(in.getReadData().length));
162162
initBuffer.buffer.put(in.getReadData());
163163

164164
ProtocolInfo protocolInfo = detectProtocol(in.getReadData());

activemq-broker/src/main/java/org/apache/activemq/transport/nio/AutoInitNioSSLTransport.java

Lines changed: 3 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -148,16 +148,10 @@ public SSLEngine getSslSession() {
148148

149149
private volatile byte[] readData;
150150

151-
private final AtomicInteger readSize = new AtomicInteger();
152-
153151
public byte[] getReadData() {
154152
return readData != null ? readData : new byte[0];
155153
}
156154

157-
public AtomicInteger getReadSize() {
158-
return readSize;
159-
}
160-
161155
@Override
162156
public void serviceRead() {
163157
try {
@@ -187,14 +181,13 @@ public void serviceRead() {
187181
break;
188182
}
189183

190-
receiveCounter += readCount;
191-
readSize.addAndGet(readCount);
184+
receiveCounter.addAndGet(readCount);
192185
}
193186

194187
if (status == SSLEngineResult.Status.OK && handshakeStatus != SSLEngineResult.HandshakeStatus.NEED_UNWRAP) {
195188
processCommand(plain);
196189
//we have received enough bytes to detect the protocol
197-
if (receiveCounter >= 8) {
190+
if (receiveCounter.get() >= 8) {
198191
break;
199192
}
200193
}
@@ -208,7 +201,7 @@ public void serviceRead() {
208201

209202
@Override
210203
protected void processCommand(ByteBuffer plain) throws Exception {
211-
ByteBuffer newBuffer = ByteBuffer.allocate(receiveCounter);
204+
ByteBuffer newBuffer = ByteBuffer.allocate(receiveCounter.get());
212205
if (readData != null) {
213206
newBuffer.put(readData);
214207
}

activemq-client/src/main/java/org/apache/activemq/transport/nio/NIOSSLTransport.java

Lines changed: 15 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -215,19 +215,27 @@ public void run() {
215215
}
216216

217217
//Only used for the auto transport to abort the openwire init method early if already initialized
218-
boolean openWireInititialized = false;
218+
boolean openWireInitialized = false;
219219

220220
protected void doOpenWireInit() throws Exception {
221221
//Do this later to let wire format negotiation happen
222-
if (initBuffer != null && !openWireInititialized && this.wireFormat instanceof OpenWireFormat) {
222+
if (initBuffer != null && !openWireInitialized && this.wireFormat instanceof OpenWireFormat) {
223223
initBuffer.buffer.flip();
224224
if (initBuffer.buffer.hasRemaining()) {
225225
nextFrameSize = -1;
226-
receiveCounter += initBuffer.readSize;
227-
processCommand(initBuffer.buffer);
228-
processCommand(initBuffer.buffer);
226+
receiveCounter.addAndGet(initBuffer.readSize);
227+
do {
228+
// This should almost always just be called 2 times, the first call reads
229+
// the size and allocates space for the frame. The second call reads
230+
// in the frame to process. This is enough to read in the initial WireFormatInfo
231+
// frame that will be sent. However, it's technically possible for
232+
// there to be extra data after that if more bytes came in during the initial
233+
// socket read if a client sends more, so keep calling until we process the
234+
// entire initial buffer before we continue so we do not miss any bytes.
235+
processCommand(initBuffer.buffer);
236+
} while (initBuffer.buffer.hasRemaining());
229237
initBuffer.buffer.clear();
230-
openWireInititialized = true;
238+
openWireInitialized = true;
231239
}
232240
}
233241
}
@@ -277,7 +285,7 @@ public void serviceRead() {
277285
break;
278286
}
279287

280-
receiveCounter += readCount;
288+
receiveCounter.addAndGet(readCount);
281289
}
282290

283291
if (status == SSLEngineResult.Status.OK && handshakeStatus != SSLEngineResult.HandshakeStatus.NEED_UNWRAP) {

activemq-client/src/main/java/org/apache/activemq/transport/nio/NIOTransport.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -124,7 +124,7 @@ protected void serviceRead() {
124124
break;
125125
}
126126

127-
this.receiveCounter += readSize;
127+
this.receiveCounter.addAndGet(readSize);
128128
if (currentBuffer.hasRemaining()) {
129129
continue;
130130
}

activemq-client/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
import java.util.Map;
3535
import java.util.concurrent.CountDownLatch;
3636
import java.util.concurrent.TimeUnit;
37+
import java.util.concurrent.atomic.AtomicInteger;
3738
import java.util.concurrent.atomic.AtomicReference;
3839

3940
import javax.net.SocketFactory;
@@ -130,8 +131,8 @@ public class TcpTransport extends TransportThreadSupport implements Transport, S
130131
protected boolean useLocalHost = false;
131132
protected int minmumWireFormatVersion;
132133
protected SocketFactory socketFactory;
133-
protected final AtomicReference<CountDownLatch> stoppedLatch = new AtomicReference<CountDownLatch>();
134-
protected volatile int receiveCounter;
134+
protected final AtomicReference<CountDownLatch> stoppedLatch = new AtomicReference<>();
135+
protected final AtomicInteger receiveCounter = new AtomicInteger();
135136

136137
protected Map<String, Object> socketOptions;
137138
private int soLinger = Integer.MIN_VALUE;
@@ -615,22 +616,22 @@ protected void initializeStreams() throws Exception {
615616
TcpBufferedInputStream buffIn = new TcpBufferedInputStream(socket.getInputStream(), ioBufferSize) {
616617
@Override
617618
public int read() throws IOException {
618-
receiveCounter++;
619+
receiveCounter.incrementAndGet();
619620
return super.read();
620621
}
621622
@Override
622623
public int read(byte[] b, int off, int len) throws IOException {
623-
receiveCounter++;
624+
receiveCounter.incrementAndGet();
624625
return super.read(b, off, len);
625626
}
626627
@Override
627628
public long skip(long n) throws IOException {
628-
receiveCounter++;
629+
receiveCounter.incrementAndGet();
629630
return super.skip(n);
630631
}
631632
@Override
632633
protected void fill() throws IOException {
633-
receiveCounter++;
634+
receiveCounter.incrementAndGet();
634635
super.fill();
635636
}
636637
};
@@ -684,7 +685,7 @@ public <T> T narrow(Class<T> target) {
684685

685686
@Override
686687
public int getReceiveCounter() {
687-
return receiveCounter;
688+
return receiveCounter.get();
688689
}
689690

690691
public static class InitBuffer {

activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTNIOSSLTransport.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -70,12 +70,12 @@ protected void processCommand(ByteBuffer plain) throws Exception {
7070
protected void doInit() throws Exception {
7171
if (initBuffer != null) {
7272
nextFrameSize = -1;
73-
receiveCounter += initBuffer.readSize;
73+
receiveCounter.addAndGet(initBuffer.readSize);
7474
initBuffer.buffer.flip();
7575
processCommand(initBuffer.buffer);
7676
}
7777
super.doInit();
7878
}
7979

8080

81-
}
81+
}

activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTNIOTransport.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -131,7 +131,7 @@ protected void processBuffer(ByteBuffer buffer, int readSize) throws Exception {
131131
DataByteArrayInputStream dis = new DataByteArrayInputStream(buffer.array());
132132
codec.parse(dis, readSize);
133133

134-
receiveCounter += readSize;
134+
receiveCounter.addAndGet(readSize);
135135

136136
// clear the buffer
137137
buffer.clear();
@@ -154,4 +154,4 @@ protected void doStop(ServiceStopper stopper) throws Exception {
154154
super.doStop(stopper);
155155
}
156156
}
157-
}
157+
}

0 commit comments

Comments
 (0)