Skip to content
Draft
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions grpclb/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ java_library(
"//context",
"//core:internal",
"//stub",
"//util",
"@com_google_protobuf//:protobuf_java_util",
"@io_grpc_grpc_proto//:grpclb_load_balancer_java_proto",
artifact("com.google.code.findbugs:jsr305"),
Expand Down
1 change: 1 addition & 0 deletions grpclb/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ dependencies {
implementation project(':grpc-core'),
project(':grpc-protobuf'),
project(':grpc-stub'),
project(':grpc-util'),
libraries.guava,
libraries.protobuf.java,
libraries.protobuf.java.util
Expand Down
226 changes: 153 additions & 73 deletions grpclb/src/main/java/io/grpc/grpclb/GrpclbState.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,16 @@
import io.grpc.ConnectivityStateInfo;
import io.grpc.Context;
import io.grpc.EquivalentAddressGroup;
import io.grpc.LoadBalancer.CreateSubchannelArgs;
import io.grpc.LoadBalancer;
import io.grpc.LoadBalancer.FixedResultPicker;
import io.grpc.LoadBalancer.Helper;
import io.grpc.LoadBalancer.PickResult;
import io.grpc.LoadBalancer.PickSubchannelArgs;
import io.grpc.LoadBalancer.ResolvedAddresses;
import io.grpc.LoadBalancer.Subchannel;
import io.grpc.LoadBalancer.SubchannelPicker;
import io.grpc.LoadBalancer.SubchannelStateListener;
import io.grpc.LoadBalancerProvider;
import io.grpc.LoadBalancerRegistry;
import io.grpc.ManagedChannel;
import io.grpc.Metadata;
import io.grpc.Status;
Expand All @@ -62,6 +65,7 @@
import io.grpc.lb.v1.Server;
import io.grpc.lb.v1.ServerList;
import io.grpc.stub.StreamObserver;
import io.grpc.util.ForwardingLoadBalancerHelper;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
Expand Down Expand Up @@ -119,7 +123,7 @@ final class GrpclbState {
@VisibleForTesting
static final RoundRobinEntry BUFFER_ENTRY = new RoundRobinEntry() {
@Override
public PickResult picked(Metadata headers) {
public PickResult picked(PickSubchannelArgs args) {
return PickResult.withNoResult();
}

Expand Down Expand Up @@ -187,6 +191,15 @@ enum Mode {
new RoundRobinPicker(Collections.<DropEntry>emptyList(), Arrays.asList(BUFFER_ENTRY));
private boolean requestConnectionPending;

// Child LoadBalancer and state for PICK_FIRST mode delegation.
private final LoadBalancerProvider pickFirstLbProvider;
@Nullable
private LoadBalancer pickFirstLb;
private ConnectivityState pickFirstLbState = CONNECTING;
private SubchannelPicker pickFirstLbPicker = new FixedResultPicker(PickResult.withNoResult());
@Nullable
private GrpclbClientLoadRecorder currentPickFirstLoadRecorder;

GrpclbState(
GrpclbConfig config,
Helper helper,
Expand All @@ -212,6 +225,9 @@ public void onSubchannelState(
} else {
this.subchannelPool = null;
}
this.pickFirstLbProvider = checkNotNull(
LoadBalancerRegistry.getDefaultRegistry().getProvider("pick_first"),
"pick_first balancer not available");
this.time = checkNotNull(time, "time provider");
this.stopwatch = checkNotNull(stopwatch, "stopwatch");
this.timerService = checkNotNull(helper.getScheduledExecutorService(), "timerService");
Expand Down Expand Up @@ -309,6 +325,12 @@ void handleAddresses(

void requestConnection() {
requestConnectionPending = true;
// For PICK_FIRST mode with delegation, forward to the child LB.
if (config.getMode() == Mode.PICK_FIRST && pickFirstLb != null) {
pickFirstLb.requestConnection();
requestConnectionPending = false;
return;
}
for (RoundRobinEntry entry : currentPicker.pickList) {
if (entry instanceof IdleSubchannelEntry) {
((IdleSubchannelEntry) entry).subchannel.requestConnection();
Expand All @@ -323,15 +345,23 @@ private void maybeUseFallbackBackends() {
}
// Balancer RPC should have either been broken or timed out.
checkState(fallbackReason != null, "no reason to fallback");
for (Subchannel subchannel : subchannels.values()) {
ConnectivityStateInfo stateInfo = subchannel.getAttributes().get(STATE_INFO).get();
if (stateInfo.getState() == READY) {
// For PICK_FIRST mode with delegation, check the child LB's state.
if (config.getMode() == Mode.PICK_FIRST) {
if (pickFirstLb != null && pickFirstLbState == READY) {
return;
}
// If we do have balancer-provided backends, use one of its error in the error message if
// fail to fallback.
if (stateInfo.getState() == TRANSIENT_FAILURE) {
fallbackReason = stateInfo.getStatus();
// For PICK_FIRST, we don't have individual subchannel states to use as fallback reason.
} else {
for (Subchannel subchannel : subchannels.values()) {
ConnectivityStateInfo stateInfo = subchannel.getAttributes().get(STATE_INFO).get();
if (stateInfo.getState() == READY) {
return;
}
// If we do have balancer-provided backends, use one of its error in the error message if
// fail to fallback.
if (stateInfo.getState() == TRANSIENT_FAILURE) {
fallbackReason = stateInfo.getStatus();
}
}
}
// Fallback conditions met
Expand Down Expand Up @@ -438,9 +468,10 @@ void shutdown() {
subchannelPool.clear();
break;
case PICK_FIRST:
if (!subchannels.isEmpty()) {
checkState(subchannels.size() == 1, "Excessive Subchannels: %s", subchannels);
subchannels.values().iterator().next().shutdown();
// Shutdown the child pick_first LB which manages its own subchannels.
if (pickFirstLb != null) {
pickFirstLb.shutdown();
pickFirstLb = null;
}
break;
default:
Expand Down Expand Up @@ -517,22 +548,17 @@ private void updateServerList(
subchannels = Collections.unmodifiableMap(newSubchannelMap);
break;
case PICK_FIRST:
checkState(subchannels.size() <= 1, "Unexpected Subchannel count: %s", subchannels);
final Subchannel subchannel;
// Delegate to child pick_first LB for address management.
// Shutdown existing child LB if addresses become empty.
if (newBackendAddrList.isEmpty()) {
if (subchannels.size() == 1) {
subchannel = subchannels.values().iterator().next();
subchannel.shutdown();
subchannels = Collections.emptyMap();
if (pickFirstLb != null) {
pickFirstLb.shutdown();
pickFirstLb = null;
}
break;
}
List<EquivalentAddressGroup> eagList = new ArrayList<>();
// Because for PICK_FIRST, we create a single Subchannel for all addresses, we have to
// attach the tokens to the EAG attributes and use TokenAttachingLoadRecorder to put them on
// headers.
//
// The PICK_FIRST code path doesn't cache Subchannels.
// Attach tokens to EAG attributes for TokenAttachingTracerFactory to retrieve.
for (BackendAddressGroup bag : newBackendAddrList) {
EquivalentAddressGroup origEag = bag.getAddresses();
Attributes eagAttrs = origEag.getAttributes();
Expand All @@ -542,30 +568,22 @@ private void updateServerList(
}
eagList.add(new EquivalentAddressGroup(origEag.getAddresses(), eagAttrs));
}
if (subchannels.isEmpty()) {
subchannel =
helper.createSubchannel(
CreateSubchannelArgs.newBuilder()
.setAddresses(eagList)
.setAttributes(createSubchannelAttrs())
.build());
subchannel.start(new SubchannelStateListener() {
@Override
public void onSubchannelState(ConnectivityStateInfo newState) {
handleSubchannelState(subchannel, newState);
}
});
if (requestConnectionPending) {
subchannel.requestConnection();
requestConnectionPending = false;
}
} else {
subchannel = subchannels.values().iterator().next();
subchannel.updateAddresses(eagList);

if (pickFirstLb == null) {
pickFirstLb = pickFirstLbProvider.newLoadBalancer(new PickFirstLbHelper());
}

// Pass addresses to child LB.
pickFirstLb.acceptResolvedAddresses(
ResolvedAddresses.newBuilder()
.setAddresses(eagList)
.build());
if (requestConnectionPending) {
pickFirstLb.requestConnection();
requestConnectionPending = false;
}
subchannels = Collections.singletonMap(eagList, subchannel);
newBackendList.add(
new BackendEntry(subchannel, new TokenAttachingTracerFactory(loadRecorder)));
// Store the load recorder for token attachment.
currentPickFirstLoadRecorder = loadRecorder;
break;
default:
throw new AssertionError("Missing case for " + config.getMode());
Expand Down Expand Up @@ -842,7 +860,11 @@ private void cleanUp() {
private void maybeUpdatePicker() {
List<RoundRobinEntry> pickList;
ConnectivityState state;
if (backendList.isEmpty()) {
// For PICK_FIRST mode with delegation, check if child LB exists instead of backendList.
boolean hasBackends = config.getMode() == Mode.PICK_FIRST
? pickFirstLb != null
: !backendList.isEmpty();
if (!hasBackends) {
// Note balancer (is working) may enforce using fallback backends, and that fallback may
// fail. So we should check if currently in fallback first.
if (usingFallbackBackends) {
Expand Down Expand Up @@ -894,26 +916,12 @@ private void maybeUpdatePicker() {
}
break;
case PICK_FIRST: {
checkState(backendList.size() == 1, "Excessive backend entries: %s", backendList);
BackendEntry onlyEntry = backendList.get(0);
ConnectivityStateInfo stateInfo =
onlyEntry.subchannel.getAttributes().get(STATE_INFO).get();
state = stateInfo.getState();
switch (state) {
case READY:
pickList = Collections.<RoundRobinEntry>singletonList(onlyEntry);
break;
case TRANSIENT_FAILURE:
pickList =
Collections.<RoundRobinEntry>singletonList(new ErrorEntry(stateInfo.getStatus()));
break;
case CONNECTING:
pickList = Collections.singletonList(BUFFER_ENTRY);
break;
default:
pickList = Collections.<RoundRobinEntry>singletonList(
new IdleSubchannelEntry(onlyEntry.subchannel, syncContext));
}
// Use child LB's state and picker. Wrap the picker for token attachment.
state = pickFirstLbState;
TokenAttachingTracerFactory tracerFactory =
new TokenAttachingTracerFactory(currentPickFirstLoadRecorder);
pickList = Collections.<RoundRobinEntry>singletonList(
new ChildLbPickerEntry(pickFirstLbPicker, tracerFactory));
break;
}
default:
Expand Down Expand Up @@ -983,7 +991,7 @@ public boolean equals(Object other) {

@VisibleForTesting
interface RoundRobinEntry {
PickResult picked(Metadata headers);
PickResult picked(PickSubchannelArgs args);
}

@VisibleForTesting
Expand Down Expand Up @@ -1024,7 +1032,8 @@ static final class BackendEntry implements RoundRobinEntry {
}

@Override
public PickResult picked(Metadata headers) {
public PickResult picked(PickSubchannelArgs args) {
Metadata headers = args.getHeaders();
headers.discardAll(GrpclbConstants.TOKEN_METADATA_KEY);
if (token != null) {
headers.put(GrpclbConstants.TOKEN_METADATA_KEY, token);
Expand Down Expand Up @@ -1065,7 +1074,7 @@ static final class IdleSubchannelEntry implements RoundRobinEntry {
}

@Override
public PickResult picked(Metadata headers) {
public PickResult picked(PickSubchannelArgs args) {
if (connectionRequested.compareAndSet(false, true)) {
syncContext.execute(new Runnable() {
@Override
Expand Down Expand Up @@ -1108,7 +1117,7 @@ static final class ErrorEntry implements RoundRobinEntry {
}

@Override
public PickResult picked(Metadata headers) {
public PickResult picked(PickSubchannelArgs args) {
return result;
}

Expand All @@ -1132,6 +1141,53 @@ public String toString() {
}
}

/**
* Entry that wraps a child LB's picker for PICK_FIRST mode delegation.
* Attaches TokenAttachingTracerFactory to the pick result for token propagation.
*/
@VisibleForTesting
static final class ChildLbPickerEntry implements RoundRobinEntry {
private final SubchannelPicker childPicker;
private final TokenAttachingTracerFactory tracerFactory;

ChildLbPickerEntry(SubchannelPicker childPicker, TokenAttachingTracerFactory tracerFactory) {
this.childPicker = checkNotNull(childPicker, "childPicker");
this.tracerFactory = checkNotNull(tracerFactory, "tracerFactory");
}

@Override
public PickResult picked(PickSubchannelArgs args) {
PickResult childResult = childPicker.pickSubchannel(args);
if (childResult.getSubchannel() == null) {
// No subchannel (e.g., buffer, error), return as-is.
return childResult;
}
// Wrap the pick result to attach tokens via the tracer factory.
return PickResult.withSubchannel(
childResult.getSubchannel(), tracerFactory, childResult.getAuthorityOverride());
}

@Override
public int hashCode() {
return Objects.hashCode(childPicker, tracerFactory);
}

@Override
public boolean equals(Object other) {
if (!(other instanceof ChildLbPickerEntry)) {
return false;
}
ChildLbPickerEntry that = (ChildLbPickerEntry) other;
return Objects.equal(childPicker, that.childPicker)
&& Objects.equal(tracerFactory, that.tracerFactory);
}

@Override
public String toString() {
return "ChildLbPickerEntry(" + childPicker + ")";
}
}

@VisibleForTesting
static final class RoundRobinPicker extends SubchannelPicker {
@VisibleForTesting
Expand Down Expand Up @@ -1174,7 +1230,7 @@ public PickResult pickSubchannel(PickSubchannelArgs args) {
if (pickIndex == pickList.size()) {
pickIndex = 0;
}
return pick.picked(args.getHeaders());
return pick.picked(args);
}
}

Expand All @@ -1189,4 +1245,28 @@ public String toString() {
return MoreObjects.toStringHelper(RoundRobinPicker.class).toString();
}
}

/**
* Helper for the child pick_first LB in PICK_FIRST mode. Intercepts updateBalancingState()
* to store state and trigger the grpclb picker update with drops and token attachment.
*/
private final class PickFirstLbHelper extends ForwardingLoadBalancerHelper {

@Override
protected Helper delegate() {
return helper;
}

@Override
public void updateBalancingState(ConnectivityState newState, SubchannelPicker newPicker) {
pickFirstLbState = newState;
pickFirstLbPicker = newPicker;
// Trigger name resolution refresh on TRANSIENT_FAILURE or IDLE, similar to ROUND_ROBIN.
if (newState == TRANSIENT_FAILURE || newState == IDLE) {
helper.refreshNameResolution();
}
maybeUseFallbackBackends();
maybeUpdatePicker();
}
}
}
Loading