Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import io.grpc.internal.ClientStream;
import io.grpc.internal.ClientStreamListener;
import io.grpc.internal.ClientTransportFactory.ClientTransportOptions;
import io.grpc.internal.DisconnectError;
import io.grpc.internal.FixedObjectPool;
import io.grpc.internal.ManagedClientTransport;
import io.grpc.internal.ObjectPool;
Expand Down Expand Up @@ -514,7 +515,7 @@ private static final class TestTransportListener implements ManagedClientTranspo
private final SettableFuture<Boolean> isTerminated = SettableFuture.create();

@Override
public void transportShutdown(Status shutdownStatus) {
public void transportShutdown(Status shutdownStatus, DisconnectError disconnectError) {
if (!this.shutdownStatus.set(shutdownStatus)) {
throw new IllegalStateException("transportShutdown() already called");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
import io.grpc.internal.GrpcUtil;
import io.grpc.internal.ManagedClientTransport;
import io.grpc.internal.ObjectPool;
import io.grpc.internal.SimpleDisconnectError;
import io.grpc.internal.StatsTraceContext;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledFuture;
Expand Down Expand Up @@ -303,7 +304,7 @@ public synchronized void shutdownNow(Status reason) {
@Override
@GuardedBy("this")
void notifyShutdown(Status status) {
clientTransportListener.transportShutdown(status);
clientTransportListener.transportShutdown(status, SimpleDisconnectError.UNKNOWN);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
import io.grpc.internal.ClientTransport;
import io.grpc.internal.ClientTransportFactory.ClientTransportOptions;
import io.grpc.internal.ConnectionClientTransport;
import io.grpc.internal.DisconnectError;
import io.grpc.internal.GrpcUtil;
import io.grpc.internal.InternalServer;
import io.grpc.internal.ManagedClientTransport;
Expand Down Expand Up @@ -336,7 +337,7 @@ public void clientIgnoresTransactionFromNonServerUids() throws Exception {
sendShutdownTransportTransactionAsUid(client, serverUid);

verify(mockClientTransportListener, timeout(TIMEOUT_MS))
.transportShutdown(statusCaptor.capture());
.transportShutdown(statusCaptor.capture(), any(DisconnectError.class));
assertThat(statusCaptor.getValue().getCode()).isEqualTo(Status.Code.UNAVAILABLE);
assertThat(statusCaptor.getValue().getDescription()).contains("shutdown");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -201,8 +201,8 @@ public ListenableFuture<SocketStats> getStats() {
}

/**
* Prevents creating any new streams. Buffered streams are not failed and may still proceed
* when {@link #reprocess} is called. The delayed transport will be terminated when there is no
* Prevents creating any new streams. Buffered streams are not failed and may still proceed
* when {@link #reprocess} is called. The delayed transport will be terminated when there is no
* more buffered streams.
*/
@Override
Expand All @@ -215,7 +215,7 @@ public final void shutdown(final Status status) {
syncContext.executeLater(new Runnable() {
@Override
public void run() {
listener.transportShutdown(status);
listener.transportShutdown(status, SimpleDisconnectError.SUBCHANNEL_SHUTDOWN);
}
});
if (!hasPendingStreams() && reportTransportTerminated != null) {
Expand Down
34 changes: 34 additions & 0 deletions core/src/main/java/io/grpc/internal/DisconnectError.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Copyright 2025 The gRPC Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.grpc.internal;

import javax.annotation.concurrent.Immutable;

/**
* Represents the reason for a subchannel disconnection.
* Implementations are either the SimpleDisconnectError enum or the GoAwayDisconnectError class for
* dynamic ones.
*/
@Immutable
public interface DisconnectError {
/**
* Returns the string representation suitable for use as an error tag.
*
* @return The formatted error tag string.
*/
String toErrorString();
}
64 changes: 64 additions & 0 deletions core/src/main/java/io/grpc/internal/GoAwayDisconnectError.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/*
* Copyright 2025 The gRPC Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.grpc.internal;


import javax.annotation.concurrent.Immutable;

/**
* Represents a dynamic disconnection due to an HTTP/2 GOAWAY frame.
* This class is immutable and holds the specific error code from the frame.
*/
@Immutable
public final class GoAwayDisconnectError implements DisconnectError {
private static final String ERROR_TAG = "GOAWAY";
private final GrpcUtil.Http2Error errorCode;

/**
* Creates a GoAway reason.
*
* @param errorCode The specific, non-null HTTP/2 error code (e.g., "NO_ERROR").
*/
public GoAwayDisconnectError(GrpcUtil.Http2Error errorCode) {
if (errorCode == null) {
throw new NullPointerException("Http2Error cannot be null for GOAWAY");
}
this.errorCode = errorCode;
}

@Override
public String toErrorString() {
return ERROR_TAG + " " + errorCode.name();
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
GoAwayDisconnectError goAwayDisconnectError = (GoAwayDisconnectError) o;
return errorCode == goAwayDisconnectError.errorCode;
}

@Override
public int hashCode() {
return errorCode.hashCode();
}
}
7 changes: 3 additions & 4 deletions core/src/main/java/io/grpc/internal/InternalSubchannel.java
Original file line number Diff line number Diff line change
Expand Up @@ -326,7 +326,7 @@ public void run() {
}

/**
* Immediately attempt to reconnect if the current state is TRANSIENT_FAILURE. Otherwise this
* Immediately attempt to reconnect if the current state is TRANSIENT_FAILURE. Otherwise, this
* method has no effect.
*/
void resetConnectBackoff() {
Expand Down Expand Up @@ -620,7 +620,7 @@ public void transportInUse(boolean inUse) {
}

@Override
public void transportShutdown(final Status s) {
public void transportShutdown(final Status s, final DisconnectError disconnectError) {
channelLogger.log(
ChannelLogLevel.INFO, "{0} SHUTDOWN with {1}", transport.getLogId(), printShortStatus(s));
shutdownInitiated = true;
Expand All @@ -639,8 +639,7 @@ public void run() {
NameResolver.ATTR_BACKEND_SERVICE),
/* locality= */ getAttributeOrDefault(addressIndex.getCurrentEagAttributes(),
EquivalentAddressGroup.ATTR_LOCALITY_NAME),
/* disconnectError= */ SubchannelMetrics.DisconnectError.UNKNOWN
.getErrorString(null),
/* disconnectError= */ disconnectError.toErrorString(),
/* securityLevel= */ extractSecurityLevel(addressIndex.getCurrentEagAttributes()
.get(GrpcAttributes.ATTR_SECURITY_LEVEL)));
} else if (pendingTransport == transport) {
Expand Down
36 changes: 32 additions & 4 deletions core/src/main/java/io/grpc/internal/KeepAliveManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import javax.annotation.concurrent.ThreadSafe;

/**
* Manages keepalive pings.
Expand Down Expand Up @@ -262,9 +263,34 @@ public interface KeepAlivePinger {
* Default client side {@link KeepAlivePinger}.
*/
public static final class ClientKeepAlivePinger implements KeepAlivePinger {
private final ConnectionClientTransport transport;

public ClientKeepAlivePinger(ConnectionClientTransport transport) {

/**
* A {@link ClientTransport} that has life-cycle management.
*
*/
@ThreadSafe
public interface TransportWithDisconnectReason extends ClientTransport {

/**
* Initiates an orderly shutdown of the transport. Existing streams continue, but the
* transport will not own any new streams. New streams will either fail (once
* {@link ManagedClientTransport.Listener#transportShutdown} callback called), or be
* transferred off this transport (in which case they may succeed). This method may only be
* called once.
*/
void shutdown(Status reason, DisconnectError disconnectError);

/**
* Initiates a forceful shutdown in which preexisting and new calls are closed. Existing calls
* should be closed with the provided {@code reason} and {@code disconnectError}.
*/
void shutdownNow(Status reason, DisconnectError disconnectError);
}

private final TransportWithDisconnectReason transport;

public ClientKeepAlivePinger(TransportWithDisconnectReason transport) {
this.transport = transport;
}

Expand All @@ -277,15 +303,17 @@ public void onSuccess(long roundTripTimeNanos) {}
@Override
public void onFailure(Status cause) {
transport.shutdownNow(Status.UNAVAILABLE.withDescription(
"Keepalive failed. The connection is likely gone"));
"Keepalive failed. The connection is likely gone"),
SimpleDisconnectError.CONNECTION_TIMED_OUT);
}
}, MoreExecutors.directExecutor());
}

@Override
public void onPingTimeout() {
transport.shutdownNow(Status.UNAVAILABLE.withDescription(
"Keepalive failed. The connection is likely gone"));
"Keepalive failed. The connection is likely gone"),
SimpleDisconnectError.CONNECTION_TIMED_OUT);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2056,7 +2056,7 @@ public String toString() {
*/
private final class DelayedTransportListener implements ManagedClientTransport.Listener {
@Override
public void transportShutdown(Status s) {
public void transportShutdown(Status s, DisconnectError e) {
checkState(shutdown.get(), "Channel must have been shut down");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,9 @@ interface Listener {
* <p>This is called exactly once, and must be called prior to {@link #transportTerminated}.
*
* @param s the reason for the shutdown.
* @param e the disconnect error.
*/
void transportShutdown(Status s);
void transportShutdown(Status s, DisconnectError e);

/**
* The transport completed shutting down. All resources have been released. All streams have
Expand Down
2 changes: 1 addition & 1 deletion core/src/main/java/io/grpc/internal/OobChannel.java
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ public ClientStream newStream(MethodDescriptor<?, ?> method,
this.channelz = Preconditions.checkNotNull(channelz);
this.delayedTransport.start(new ManagedClientTransport.Listener() {
@Override
public void transportShutdown(Status s) {
public void transportShutdown(Status s, DisconnectError e) {
// Don't care
}

Expand Down
68 changes: 68 additions & 0 deletions core/src/main/java/io/grpc/internal/SimpleDisconnectError.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/*
* Copyright 2025 The gRPC Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.grpc.internal;

import javax.annotation.concurrent.Immutable;

/**
* Represents a fixed, static reason for disconnection.
*/
@Immutable
public enum SimpleDisconnectError implements DisconnectError {
/**
* The subchannel was shut down for various reasons like parent channel shutdown,
* idleness, or load balancing policy changes.
*/
SUBCHANNEL_SHUTDOWN("subchannel shutdown"),

/**
* Connection was reset (e.g., ECONNRESET, WSAECONNERESET).
*/
CONNECTION_RESET("connection reset"),

/**
* Connection timed out (e.g., ETIMEDOUT, WSAETIMEDOUT), including closures
* from gRPC keepalives.
*/
CONNECTION_TIMED_OUT("connection timed out"),

/**
* Connection was aborted (e.g., ECONNABORTED, WSAECONNABORTED).
*/
CONNECTION_ABORTED("connection aborted"),

/**
* Any socket error not covered by other specific disconnect errors.
*/
SOCKET_ERROR("socket error"),

/**
* A catch-all for any other unclassified reason.
*/
UNKNOWN("unknown");

private final String errorTag;

SimpleDisconnectError(String errorTag) {
this.errorTag = errorTag;
}

@Override
public String toErrorString() {
return this.errorTag;
}
}
Loading