Skip to content

Commit c2faab3

Browse files
authored
Solve file leakage and turn JDK socket support on by default (#8752)
* Add test for file descriptors * Fix closing technique * Add tests for socket and stream connection and closing * Fix closing technique again * Clean implementation and tests to match expected Socket behavior * Remove comments * Change default JDK_SOCKET_ENABLED to true * Address PR comments about socket closing and buffer exceptions
1 parent 5098c6f commit c2faab3

File tree

3 files changed

+227
-34
lines changed

3 files changed

+227
-34
lines changed

internal-api/src/main/java/datadog/trace/api/Config.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2025,7 +2025,7 @@ PROFILING_DATADOG_PROFILER_ENABLED, isDatadogProfilerSafeInCurrentEnvironment())
20252025

20262026
this.apmTracingEnabled = configProvider.getBoolean(GeneralConfig.APM_TRACING_ENABLED, true);
20272027

2028-
this.jdkSocketEnabled = configProvider.getBoolean(JDK_SOCKET_ENABLED, false);
2028+
this.jdkSocketEnabled = configProvider.getBoolean(JDK_SOCKET_ENABLED, true);
20292029

20302030
log.debug("New instance: {}", this);
20312031
}

utils/socket-utils/src/main/java17/datadog/common/socket/TunnelingJdkSocket.java

Lines changed: 71 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ final class TunnelingJdkSocket extends Socket {
2929
private InetSocketAddress inetSocketAddress;
3030

3131
private SocketChannel unixSocketChannel;
32+
private Selector selector;
3233

3334
private int timeout;
3435
private boolean shutIn;
@@ -90,6 +91,9 @@ public synchronized int getSoTimeout() throws SocketException {
9091

9192
@Override
9293
public void connect(final SocketAddress endpoint) throws IOException {
94+
if (endpoint == null) {
95+
throw new IllegalArgumentException("Endpoint cannot be null");
96+
}
9397
if (isClosed()) {
9498
throw new SocketException("Socket is closed");
9599
}
@@ -105,6 +109,12 @@ public void connect(final SocketAddress endpoint) throws IOException {
105109
// https://github.com/jnr/jnr-unixsocket/blob/master/src/main/java/jnr/unixsocket/UnixSocket.java#L89-L97
106110
@Override
107111
public void connect(final SocketAddress endpoint, final int timeout) throws IOException {
112+
if (endpoint == null) {
113+
throw new IllegalArgumentException("Endpoint cannot be null");
114+
}
115+
if (timeout < 0) {
116+
throw new IllegalArgumentException("Timeout cannot be negative");
117+
}
108118
if (isClosed()) {
109119
throw new SocketException("Socket is closed");
110120
}
@@ -122,17 +132,19 @@ public SocketChannel getChannel() {
122132

123133
@Override
124134
public void setSendBufferSize(int size) throws SocketException {
135+
if (size <= 0) {
136+
throw new IllegalArgumentException("Invalid send buffer size");
137+
}
125138
if (isClosed()) {
126139
throw new SocketException("Socket is closed");
127140
}
128-
if (size < 0) {
129-
throw new IllegalArgumentException("Invalid send buffer size");
130-
}
141+
sendBufferSize = size;
131142
try {
132143
unixSocketChannel.setOption(java.net.StandardSocketOptions.SO_SNDBUF, size);
133-
sendBufferSize = size;
134144
} catch (IOException e) {
135-
throw new SocketException("Failed to set send buffer size");
145+
SocketException se = new SocketException("Failed to set send buffer size socket option");
146+
se.initCause(e);
147+
throw se;
136148
}
137149
}
138150

@@ -149,17 +161,19 @@ public int getSendBufferSize() throws SocketException {
149161

150162
@Override
151163
public void setReceiveBufferSize(int size) throws SocketException {
164+
if (size <= 0) {
165+
throw new IllegalArgumentException("Invalid receive buffer size");
166+
}
152167
if (isClosed()) {
153168
throw new SocketException("Socket is closed");
154169
}
155-
if (size < 0) {
156-
throw new IllegalArgumentException("Invalid receive buffer size");
157-
}
170+
receiveBufferSize = size;
158171
try {
159172
unixSocketChannel.setOption(java.net.StandardSocketOptions.SO_RCVBUF, size);
160-
receiveBufferSize = size;
161173
} catch (IOException e) {
162-
throw new SocketException("Failed to set receive buffer size");
174+
SocketException se = new SocketException("Failed to set receive buffer size socket option");
175+
se.initCause(e);
176+
throw se;
163177
}
164178
}
165179

@@ -196,14 +210,14 @@ public InputStream getInputStream() throws IOException {
196210
throw new SocketException("Socket input is shutdown");
197211
}
198212

213+
if (selector == null) {
214+
selector = Selector.open();
215+
unixSocketChannel.configureBlocking(false);
216+
unixSocketChannel.register(selector, SelectionKey.OP_READ);
217+
}
218+
199219
return new InputStream() {
200220
private final ByteBuffer buffer = ByteBuffer.allocate(getStreamBufferSize());
201-
private final Selector selector = Selector.open();
202-
203-
{
204-
unixSocketChannel.configureBlocking(false);
205-
unixSocketChannel.register(selector, SelectionKey.OP_READ);
206-
}
207221

208222
@Override
209223
public int read() throws IOException {
@@ -213,6 +227,9 @@ public int read() throws IOException {
213227

214228
@Override
215229
public int read(byte[] b, int off, int len) throws IOException {
230+
if (isInputShutdown()) {
231+
return -1;
232+
}
216233
buffer.clear();
217234

218235
int readyChannels = selector.select(timeout);
@@ -241,7 +258,7 @@ public int read(byte[] b, int off, int len) throws IOException {
241258

242259
@Override
243260
public void close() throws IOException {
244-
selector.close();
261+
TunnelingJdkSocket.this.close();
245262
}
246263
};
247264
}
@@ -254,7 +271,7 @@ public OutputStream getOutputStream() throws IOException {
254271
if (!isConnected()) {
255272
throw new SocketException("Socket is not connected");
256273
}
257-
if (isInputShutdown()) {
274+
if (isOutputShutdown()) {
258275
throw new SocketException("Socket output is shutdown");
259276
}
260277

@@ -267,12 +284,19 @@ public void write(int b) throws IOException {
267284

268285
@Override
269286
public void write(byte[] b, int off, int len) throws IOException {
287+
if (isOutputShutdown()) {
288+
throw new IOException("Stream closed");
289+
}
270290
ByteBuffer buffer = ByteBuffer.wrap(b, off, len);
271-
272291
while (buffer.hasRemaining()) {
273292
unixSocketChannel.write(buffer);
274293
}
275294
}
295+
296+
@Override
297+
public void close() throws IOException {
298+
TunnelingJdkSocket.this.close();
299+
}
276300
};
277301
}
278302

@@ -308,6 +332,9 @@ public void shutdownOutput() throws IOException {
308332

309333
@Override
310334
public InetAddress getInetAddress() {
335+
if (!isConnected()) {
336+
return null;
337+
}
311338
return inetSocketAddress.getAddress();
312339
}
313340

@@ -316,8 +343,31 @@ public void close() throws IOException {
316343
if (isClosed()) {
317344
return;
318345
}
319-
if (null != unixSocketChannel) {
320-
unixSocketChannel.close();
346+
// Ignore possible exceptions so that we continue closing the socket
347+
try {
348+
if (!isInputShutdown()) {
349+
shutdownInput();
350+
}
351+
} catch (IOException e) {
352+
}
353+
try {
354+
if (!isOutputShutdown()) {
355+
shutdownOutput();
356+
}
357+
} catch (IOException e) {
358+
}
359+
try {
360+
if (selector != null) {
361+
selector.close();
362+
selector = null;
363+
}
364+
} catch (IOException e) {
365+
}
366+
try {
367+
if (unixSocketChannel != null) {
368+
unixSocketChannel.close();
369+
}
370+
} catch (IOException e) {
321371
}
322372
closed = true;
323373
}

0 commit comments

Comments
 (0)