Skip to content

Commit 682c672

Browse files
committed
suggested changes
1 parent b7fedc6 commit 682c672

File tree

8 files changed

+96
-75
lines changed

8 files changed

+96
-75
lines changed

binder/src/main/java/io/grpc/binder/internal/BinderClientTransport.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -304,7 +304,7 @@ public synchronized void shutdownNow(Status reason) {
304304
@Override
305305
@GuardedBy("this")
306306
void notifyShutdown(Status status) {
307-
clientTransportListener.transportShutdown(status, SimpleDisconnectError.SUBCHANNEL_SHUTDOWN);
307+
clientTransportListener.transportShutdown(status, SimpleDisconnectError.UNKNOWN);
308308
}
309309

310310
@Override

core/src/main/java/io/grpc/internal/ClientTransportWithDisconnectReason.java

Lines changed: 0 additions & 34 deletions
This file was deleted.

core/src/main/java/io/grpc/internal/KeepAliveManager.java

Lines changed: 28 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import java.util.concurrent.ScheduledExecutorService;
2828
import java.util.concurrent.ScheduledFuture;
2929
import java.util.concurrent.TimeUnit;
30+
import javax.annotation.concurrent.ThreadSafe;
3031

3132
/**
3233
* Manages keepalive pings.
@@ -262,9 +263,34 @@ public interface KeepAlivePinger {
262263
* Default client side {@link KeepAlivePinger}.
263264
*/
264265
public static final class ClientKeepAlivePinger implements KeepAlivePinger {
265-
private final ClientTransportWithDisconnectReason transport;
266266

267-
public ClientKeepAlivePinger(ClientTransportWithDisconnectReason transport) {
267+
268+
/**
269+
* A {@link ClientTransport} that has life-cycle management.
270+
*
271+
*/
272+
@ThreadSafe
273+
public interface TransportWithDisconnectReason extends ClientTransport {
274+
275+
/**
276+
* Initiates an orderly shutdown of the transport. Existing streams continue, but the
277+
* transport will not own any new streams. New streams will either fail (once
278+
* {@link ManagedClientTransport.Listener#transportShutdown} callback called), or be
279+
* transferred off this transport (in which case they may succeed). This method may only be
280+
* called once.
281+
*/
282+
void shutdown(Status reason, DisconnectError disconnectError);
283+
284+
/**
285+
* Initiates a forceful shutdown in which preexisting and new calls are closed. Existing calls
286+
* should be closed with the provided {@code reason} and {@code disconnectError}.
287+
*/
288+
void shutdownNow(Status reason, DisconnectError disconnectError);
289+
}
290+
291+
private final TransportWithDisconnectReason transport;
292+
293+
public ClientKeepAlivePinger(TransportWithDisconnectReason transport) {
268294
this.transport = transport;
269295
}
270296

core/src/test/java/io/grpc/internal/KeepAliveManagerTest.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -105,8 +105,8 @@ public void keepAlivePingDelayedByIncomingData() {
105105

106106
@Test
107107
public void clientKeepAlivePinger_pingTimeout() {
108-
ClientTransportWithDisconnectReason transport =
109-
mock(ClientTransportWithDisconnectReason.class);
108+
ClientKeepAlivePinger.TransportWithDisconnectReason transport =
109+
mock(ClientKeepAlivePinger.TransportWithDisconnectReason.class);
110110
ClientKeepAlivePinger pinger = new ClientKeepAlivePinger(transport);
111111

112112
pinger.onPingTimeout();
@@ -122,8 +122,8 @@ public void clientKeepAlivePinger_pingTimeout() {
122122

123123
@Test
124124
public void clientKeepAlivePinger_pingFailure() {
125-
ClientTransportWithDisconnectReason transport =
126-
mock(ClientTransportWithDisconnectReason.class);
125+
ClientKeepAlivePinger.TransportWithDisconnectReason transport =
126+
mock(ClientKeepAlivePinger.TransportWithDisconnectReason.class);
127127
ClientKeepAlivePinger pinger = new ClientKeepAlivePinger(transport);
128128
pinger.ping();
129129
ArgumentCaptor<ClientTransport.PingCallback> pingCallbackCaptor =

core/src/testFixtures/java/io/grpc/internal/AbstractTransportTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -509,7 +509,7 @@ public void shutdownNowKillsClientStream() throws Exception {
509509
client = null;
510510

511511
verify(mockClientTransportListener, timeout(TIMEOUT_MS)).transportShutdown(any(Status.class),
512-
eq(SimpleDisconnectError.SUBCHANNEL_SHUTDOWN));
512+
any(DisconnectError.class));
513513
verify(mockClientTransportListener, timeout(TIMEOUT_MS)).transportTerminated();
514514
verify(mockClientTransportListener, timeout(TIMEOUT_MS)).transportInUse(false);
515515
assertTrue(serverTransportListener.waitForTermination(TIMEOUT_MS, TimeUnit.MILLISECONDS));

netty/src/main/java/io/grpc/netty/NettyClientTransport.java

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,6 @@
3636
import io.grpc.MethodDescriptor;
3737
import io.grpc.Status;
3838
import io.grpc.internal.ClientStream;
39-
import io.grpc.internal.ClientTransportWithDisconnectReason;
4039
import io.grpc.internal.ConnectionClientTransport;
4140
import io.grpc.internal.DisconnectError;
4241
import io.grpc.internal.FailingClientStream;
@@ -72,7 +71,7 @@
7271
* A Netty-based {@link ConnectionClientTransport} implementation.
7372
*/
7473
class NettyClientTransport implements ConnectionClientTransport,
75-
ClientTransportWithDisconnectReason {
74+
ClientKeepAlivePinger.TransportWithDisconnectReason {
7675

7776
private final InternalLogId logId;
7877
private final Map<ChannelOption<?>, ?> channelOptions;
@@ -351,6 +350,11 @@ public void operationComplete(ChannelFuture future) throws Exception {
351350

352351
@Override
353352
public void shutdown(Status reason) {
353+
shutdown(reason, SimpleDisconnectError.UNKNOWN);
354+
}
355+
356+
@Override
357+
public void shutdown(Status reason, DisconnectError disconnectError) {
354358
// start() could have failed
355359
if (channel == null) {
356360
return;
@@ -373,7 +377,7 @@ public void shutdownNow(final Status reason, DisconnectError disconnectError) {
373377
handler.getWriteQueue().enqueue(new Runnable() {
374378
@Override
375379
public void run() {
376-
lifecycleManager.notifyShutdown(reason, SimpleDisconnectError.SUBCHANNEL_SHUTDOWN);
380+
lifecycleManager.notifyShutdown(reason, disconnectError);
377381
channel.write(new ForcefulCloseCommand(reason));
378382
}
379383
}, true);

okhttp/src/main/java/io/grpc/okhttp/OkHttpClientTransport.java

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,6 @@
4747
import io.grpc.TlsChannelCredentials;
4848
import io.grpc.internal.CertificateUtils;
4949
import io.grpc.internal.ClientStreamListener.RpcProgress;
50-
import io.grpc.internal.ClientTransportWithDisconnectReason;
5150
import io.grpc.internal.ConnectionClientTransport;
5251
import io.grpc.internal.DisconnectError;
5352
import io.grpc.internal.GoAwayDisconnectError;
@@ -133,7 +132,7 @@
133132
* A okhttp-based {@link ConnectionClientTransport} implementation.
134133
*/
135134
class OkHttpClientTransport implements ConnectionClientTransport, TransportExceptionHandler,
136-
OutboundFlowController.Transport, ClientTransportWithDisconnectReason {
135+
OutboundFlowController.Transport, ClientKeepAlivePinger.TransportWithDisconnectReason {
137136
private static final Map<ErrorCode, Status> ERROR_CODE_TO_STATUS = buildErrorCodeToStatusMap();
138137
private static final Logger log = Logger.getLogger(OkHttpClientTransport.class.getName());
139138
private static final String GRPC_ENABLE_PER_RPC_AUTHORITY_CHECK =
@@ -999,6 +998,19 @@ public void shutdown(Status reason) {
999998
}
1000999
}
10011000

1001+
@Override
1002+
public void shutdown(Status reason, DisconnectError disconnectError) {
1003+
synchronized (lock) {
1004+
if (goAwayStatus != null) {
1005+
return;
1006+
}
1007+
1008+
goAwayStatus = reason;
1009+
listener.transportShutdown(goAwayStatus, disconnectError);
1010+
stopIfNecessary();
1011+
}
1012+
}
1013+
10021014
@Override
10031015
public void shutdownNow(Status reason) {
10041016
shutdownNow(reason, SimpleDisconnectError.UNKNOWN);

okhttp/src/test/java/io/grpc/okhttp/OkHttpClientTransportTest.java

Lines changed: 41 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -1835,12 +1835,16 @@ public void unreachableServer() throws Exception {
18351835

18361836
ManagedClientTransport.Listener listener = mock(ManagedClientTransport.Listener.class);
18371837
clientTransport.start(listener);
1838-
ArgumentCaptor<Status> captor = ArgumentCaptor.forClass(Status.class);
1839-
verify(listener, timeout(TIME_OUT_MS)).transportShutdown(captor.capture(),
1840-
any(DisconnectError.class));
1841-
Status status = captor.getValue();
1838+
ArgumentCaptor<Status> statusCaptor = ArgumentCaptor.forClass(Status.class);
1839+
ArgumentCaptor<DisconnectError> errorCaptor = ArgumentCaptor.forClass(DisconnectError.class);
1840+
1841+
verify(listener, timeout(TIME_OUT_MS)).transportShutdown(statusCaptor.capture(),
1842+
errorCaptor.capture());
1843+
Status status = statusCaptor.getValue();
1844+
DisconnectError error = errorCaptor.getValue();
18421845
assertEquals(Status.UNAVAILABLE.getCode(), status.getCode());
18431846
assertTrue(status.getCause().toString(), status.getCause() instanceof IOException);
1847+
assertEquals(new GoAwayDisconnectError(GrpcUtil.Http2Error.INTERNAL_ERROR), error);
18441848

18451849
MockStreamListener streamListener = new MockStreamListener();
18461850
clientTransport.newStream(method, new Metadata(), CallOptions.DEFAULT, tracers)
@@ -1867,11 +1871,14 @@ public void customSocketFactory() throws Exception {
18671871

18681872
ManagedClientTransport.Listener listener = mock(ManagedClientTransport.Listener.class);
18691873
clientTransport.start(listener);
1870-
ArgumentCaptor<Status> captor = ArgumentCaptor.forClass(Status.class);
1871-
verify(listener, timeout(TIME_OUT_MS)).transportShutdown(captor.capture(),
1872-
any(DisconnectError.class));
1873-
Status status = captor.getValue();
1874+
ArgumentCaptor<Status> statusCaptor = ArgumentCaptor.forClass(Status.class);
1875+
ArgumentCaptor<DisconnectError> errorCaptor = ArgumentCaptor.forClass(DisconnectError.class);
1876+
verify(listener, timeout(TIME_OUT_MS)).transportShutdown(statusCaptor.capture(),
1877+
errorCaptor.capture());
1878+
Status status = statusCaptor.getValue();
1879+
DisconnectError error = errorCaptor.getValue();
18741880
assertEquals(Status.UNAVAILABLE.getCode(), status.getCode());
1881+
assertEquals(new GoAwayDisconnectError(GrpcUtil.Http2Error.INTERNAL_ERROR), error);
18751882
assertSame(exception, status.getCause());
18761883
}
18771884

@@ -1960,18 +1967,21 @@ public void proxy_500() throws Exception {
19601967

19611968
assertEquals(-1, sock.getInputStream().read());
19621969

1963-
ArgumentCaptor<Status> captor = ArgumentCaptor.forClass(Status.class);
1964-
verify(transportListener, timeout(TIME_OUT_MS)).transportShutdown(captor.capture(),
1965-
any(DisconnectError.class));
1966-
Status error = captor.getValue();
1967-
assertTrue("Status didn't contain error code: " + captor.getValue(),
1968-
error.getDescription().contains("500"));
1969-
assertTrue("Status didn't contain error description: " + captor.getValue(),
1970-
error.getDescription().contains("OH NO"));
1971-
assertTrue("Status didn't contain error text: " + captor.getValue(),
1972-
error.getDescription().contains(errorText));
1973-
assertEquals("Not UNAVAILABLE: " + captor.getValue(),
1974-
Status.UNAVAILABLE.getCode(), error.getCode());
1970+
ArgumentCaptor<Status> statusCaptor = ArgumentCaptor.forClass(Status.class);
1971+
ArgumentCaptor<DisconnectError> errorCaptor = ArgumentCaptor.forClass(DisconnectError.class);
1972+
verify(transportListener, timeout(TIME_OUT_MS)).transportShutdown(statusCaptor.capture(),
1973+
errorCaptor.capture());
1974+
Status status = statusCaptor.getValue();
1975+
DisconnectError error = errorCaptor.getValue();
1976+
assertTrue("Status didn't contain error code: " + statusCaptor.getValue(),
1977+
status.getDescription().contains("500"));
1978+
assertTrue("Status didn't contain error description: " + statusCaptor.getValue(),
1979+
status.getDescription().contains("OH NO"));
1980+
assertTrue("Status didn't contain error text: " + statusCaptor.getValue(),
1981+
status.getDescription().contains(errorText));
1982+
assertEquals("Not UNAVAILABLE: " + statusCaptor.getValue(),
1983+
Status.UNAVAILABLE.getCode(), status.getCode());
1984+
assertEquals(new GoAwayDisconnectError(GrpcUtil.Http2Error.INTERNAL_ERROR), error);
19751985
sock.close();
19761986
verify(transportListener, timeout(TIME_OUT_MS)).transportTerminated();
19771987
}
@@ -1998,14 +2008,17 @@ public void proxy_immediateServerClose() throws Exception {
19982008
serverSocket.close();
19992009
sock.close();
20002010

2001-
ArgumentCaptor<Status> captor = ArgumentCaptor.forClass(Status.class);
2002-
verify(transportListener, timeout(TIME_OUT_MS)).transportShutdown(captor.capture(),
2003-
any(DisconnectError.class));
2004-
Status error = captor.getValue();
2005-
assertTrue("Status didn't contain proxy: " + captor.getValue(),
2006-
error.getDescription().contains("proxy"));
2007-
assertEquals("Not UNAVAILABLE: " + captor.getValue(),
2008-
Status.UNAVAILABLE.getCode(), error.getCode());
2011+
ArgumentCaptor<Status> statusCaptor = ArgumentCaptor.forClass(Status.class);
2012+
ArgumentCaptor<DisconnectError> errorCaptor = ArgumentCaptor.forClass(DisconnectError.class);
2013+
verify(transportListener, timeout(TIME_OUT_MS)).transportShutdown(statusCaptor.capture(),
2014+
errorCaptor.capture());
2015+
Status status = statusCaptor.getValue();
2016+
DisconnectError error = errorCaptor.getValue();
2017+
assertTrue("Status didn't contain proxy: " + statusCaptor.getValue(),
2018+
status.getDescription().contains("proxy"));
2019+
assertEquals("Not UNAVAILABLE: " + statusCaptor.getValue(),
2020+
Status.UNAVAILABLE.getCode(), status.getCode());
2021+
assertEquals(SimpleDisconnectError.SUBCHANNEL_SHUTDOWN, error);
20092022
verify(transportListener, timeout(TIME_OUT_MS)).transportTerminated();
20102023
}
20112024

0 commit comments

Comments
 (0)