Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions grpclb/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ java_library(
"//context",
"//core:internal",
"//stub",
"//util",
"@com_google_protobuf//:protobuf_java_util",
"@io_grpc_grpc_proto//:grpclb_load_balancer_java_proto",
artifact("com.google.code.findbugs:jsr305"),
Expand Down
1 change: 1 addition & 0 deletions grpclb/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ dependencies {
implementation project(':grpc-core'),
project(':grpc-protobuf'),
project(':grpc-stub'),
project(':grpc-util'),
libraries.guava,
libraries.protobuf.java,
libraries.protobuf.java.util
Expand Down
231 changes: 158 additions & 73 deletions grpclb/src/main/java/io/grpc/grpclb/GrpclbState.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,16 @@
import io.grpc.ConnectivityStateInfo;
import io.grpc.Context;
import io.grpc.EquivalentAddressGroup;
import io.grpc.LoadBalancer.CreateSubchannelArgs;
import io.grpc.LoadBalancer;
import io.grpc.LoadBalancer.FixedResultPicker;
import io.grpc.LoadBalancer.Helper;
import io.grpc.LoadBalancer.PickResult;
import io.grpc.LoadBalancer.PickSubchannelArgs;
import io.grpc.LoadBalancer.ResolvedAddresses;
import io.grpc.LoadBalancer.Subchannel;
import io.grpc.LoadBalancer.SubchannelPicker;
import io.grpc.LoadBalancer.SubchannelStateListener;
import io.grpc.LoadBalancerProvider;
import io.grpc.LoadBalancerRegistry;
import io.grpc.ManagedChannel;
import io.grpc.Metadata;
import io.grpc.Status;
Expand All @@ -62,6 +65,7 @@
import io.grpc.lb.v1.Server;
import io.grpc.lb.v1.ServerList;
import io.grpc.stub.StreamObserver;
import io.grpc.util.ForwardingLoadBalancerHelper;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
Expand Down Expand Up @@ -119,7 +123,7 @@ final class GrpclbState {
@VisibleForTesting
static final RoundRobinEntry BUFFER_ENTRY = new RoundRobinEntry() {
@Override
public PickResult picked(Metadata headers) {
public PickResult picked(PickSubchannelArgs args) {
return PickResult.withNoResult();
}

Expand Down Expand Up @@ -187,6 +191,15 @@ enum Mode {
new RoundRobinPicker(Collections.<DropEntry>emptyList(), Arrays.asList(BUFFER_ENTRY));
private boolean requestConnectionPending;

// Child LoadBalancer and state for PICK_FIRST mode delegation.
private final LoadBalancerProvider pickFirstLbProvider;
@Nullable
private LoadBalancer pickFirstLb;
private ConnectivityState pickFirstLbState = CONNECTING;
private SubchannelPicker pickFirstLbPicker = new FixedResultPicker(PickResult.withNoResult());
@Nullable
private GrpclbClientLoadRecorder currentPickFirstLoadRecorder;

GrpclbState(
GrpclbConfig config,
Helper helper,
Expand All @@ -212,6 +225,9 @@ public void onSubchannelState(
} else {
this.subchannelPool = null;
}
this.pickFirstLbProvider = checkNotNull(
LoadBalancerRegistry.getDefaultRegistry().getProvider("pick_first"),
"pick_first balancer not available");
this.time = checkNotNull(time, "time provider");
this.stopwatch = checkNotNull(stopwatch, "stopwatch");
this.timerService = checkNotNull(helper.getScheduledExecutorService(), "timerService");
Expand Down Expand Up @@ -309,6 +325,12 @@ void handleAddresses(

void requestConnection() {
requestConnectionPending = true;
// For PICK_FIRST mode with delegation, forward to the child LB.
if (config.getMode() == Mode.PICK_FIRST && pickFirstLb != null) {
pickFirstLb.requestConnection();
requestConnectionPending = false;
return;
}
for (RoundRobinEntry entry : currentPicker.pickList) {
if (entry instanceof IdleSubchannelEntry) {
((IdleSubchannelEntry) entry).subchannel.requestConnection();
Expand All @@ -323,15 +345,23 @@ private void maybeUseFallbackBackends() {
}
// Balancer RPC should have either been broken or timed out.
checkState(fallbackReason != null, "no reason to fallback");
for (Subchannel subchannel : subchannels.values()) {
ConnectivityStateInfo stateInfo = subchannel.getAttributes().get(STATE_INFO).get();
if (stateInfo.getState() == READY) {
// For PICK_FIRST mode with delegation, check the child LB's state.
if (config.getMode() == Mode.PICK_FIRST) {
if (pickFirstLb != null && pickFirstLbState == READY) {
return;
}
// If we do have balancer-provided backends, use one of its error in the error message if
// fail to fallback.
if (stateInfo.getState() == TRANSIENT_FAILURE) {
fallbackReason = stateInfo.getStatus();
// For PICK_FIRST, we don't have individual subchannel states to use as fallback reason.
} else {
for (Subchannel subchannel : subchannels.values()) {
ConnectivityStateInfo stateInfo = subchannel.getAttributes().get(STATE_INFO).get();
if (stateInfo.getState() == READY) {
return;
}
// If we do have balancer-provided backends, use one of its error in the error message if
// fail to fallback.
if (stateInfo.getState() == TRANSIENT_FAILURE) {
fallbackReason = stateInfo.getStatus();
}
}
}
// Fallback conditions met
Expand Down Expand Up @@ -438,9 +468,10 @@ void shutdown() {
subchannelPool.clear();
break;
case PICK_FIRST:
if (!subchannels.isEmpty()) {
checkState(subchannels.size() == 1, "Excessive Subchannels: %s", subchannels);
subchannels.values().iterator().next().shutdown();
// Shutdown the child pick_first LB which manages its own subchannels.
if (pickFirstLb != null) {
pickFirstLb.shutdown();
pickFirstLb = null;
}
break;
default:
Expand Down Expand Up @@ -517,22 +548,17 @@ private void updateServerList(
subchannels = Collections.unmodifiableMap(newSubchannelMap);
break;
case PICK_FIRST:
checkState(subchannels.size() <= 1, "Unexpected Subchannel count: %s", subchannels);
final Subchannel subchannel;
// Delegate to child pick_first LB for address management.
// Shutdown existing child LB if addresses become empty.
if (newBackendAddrList.isEmpty()) {
if (subchannels.size() == 1) {
subchannel = subchannels.values().iterator().next();
subchannel.shutdown();
subchannels = Collections.emptyMap();
if (pickFirstLb != null) {
pickFirstLb.shutdown();
pickFirstLb = null;
}
break;
}
List<EquivalentAddressGroup> eagList = new ArrayList<>();
// Because for PICK_FIRST, we create a single Subchannel for all addresses, we have to
// attach the tokens to the EAG attributes and use TokenAttachingLoadRecorder to put them on
// headers.
//
// The PICK_FIRST code path doesn't cache Subchannels.
// Attach tokens to EAG attributes for TokenAttachingTracerFactory to retrieve.
for (BackendAddressGroup bag : newBackendAddrList) {
EquivalentAddressGroup origEag = bag.getAddresses();
Attributes eagAttrs = origEag.getAttributes();
Expand All @@ -542,30 +568,22 @@ private void updateServerList(
}
eagList.add(new EquivalentAddressGroup(origEag.getAddresses(), eagAttrs));
}
if (subchannels.isEmpty()) {
subchannel =
helper.createSubchannel(
CreateSubchannelArgs.newBuilder()
.setAddresses(eagList)
.setAttributes(createSubchannelAttrs())
.build());
subchannel.start(new SubchannelStateListener() {
@Override
public void onSubchannelState(ConnectivityStateInfo newState) {
handleSubchannelState(subchannel, newState);
}
});
if (requestConnectionPending) {
subchannel.requestConnection();
requestConnectionPending = false;
}
} else {
subchannel = subchannels.values().iterator().next();
subchannel.updateAddresses(eagList);

if (pickFirstLb == null) {
pickFirstLb = pickFirstLbProvider.newLoadBalancer(new PickFirstLbHelper());
}
subchannels = Collections.singletonMap(eagList, subchannel);
newBackendList.add(
new BackendEntry(subchannel, new TokenAttachingTracerFactory(loadRecorder)));

// Pass addresses to child LB.
pickFirstLb.acceptResolvedAddresses(
ResolvedAddresses.newBuilder()
.setAddresses(eagList)
.build());
if (requestConnectionPending) {
pickFirstLb.requestConnection();
requestConnectionPending = false;
}
// Store the load recorder for token attachment.
currentPickFirstLoadRecorder = loadRecorder;
break;
default:
throw new AssertionError("Missing case for " + config.getMode());
Expand Down Expand Up @@ -842,7 +860,11 @@ private void cleanUp() {
private void maybeUpdatePicker() {
List<RoundRobinEntry> pickList;
ConnectivityState state;
if (backendList.isEmpty()) {
// For PICK_FIRST mode with delegation, check if child LB exists instead of backendList.
boolean hasBackends = config.getMode() == Mode.PICK_FIRST
? pickFirstLb != null
: !backendList.isEmpty();
if (!hasBackends) {
// Note balancer (is working) may enforce using fallback backends, and that fallback may
// fail. So we should check if currently in fallback first.
if (usingFallbackBackends) {
Expand Down Expand Up @@ -894,26 +916,12 @@ private void maybeUpdatePicker() {
}
break;
case PICK_FIRST: {
checkState(backendList.size() == 1, "Excessive backend entries: %s", backendList);
BackendEntry onlyEntry = backendList.get(0);
ConnectivityStateInfo stateInfo =
onlyEntry.subchannel.getAttributes().get(STATE_INFO).get();
state = stateInfo.getState();
switch (state) {
case READY:
pickList = Collections.<RoundRobinEntry>singletonList(onlyEntry);
break;
case TRANSIENT_FAILURE:
pickList =
Collections.<RoundRobinEntry>singletonList(new ErrorEntry(stateInfo.getStatus()));
break;
case CONNECTING:
pickList = Collections.singletonList(BUFFER_ENTRY);
break;
default:
pickList = Collections.<RoundRobinEntry>singletonList(
new IdleSubchannelEntry(onlyEntry.subchannel, syncContext));
}
// Use child LB's state and picker. Wrap the picker for token attachment.
state = pickFirstLbState;
TokenAttachingTracerFactory tracerFactory =
new TokenAttachingTracerFactory(currentPickFirstLoadRecorder);
pickList = Collections.<RoundRobinEntry>singletonList(
new ChildLbPickerEntry(pickFirstLbPicker, tracerFactory));
break;
}
default:
Expand Down Expand Up @@ -983,7 +991,7 @@ public boolean equals(Object other) {

@VisibleForTesting
interface RoundRobinEntry {
PickResult picked(Metadata headers);
PickResult picked(PickSubchannelArgs args);
}

@VisibleForTesting
Expand Down Expand Up @@ -1024,7 +1032,8 @@ static final class BackendEntry implements RoundRobinEntry {
}

@Override
public PickResult picked(Metadata headers) {
public PickResult picked(PickSubchannelArgs args) {
Metadata headers = args.getHeaders();
headers.discardAll(GrpclbConstants.TOKEN_METADATA_KEY);
if (token != null) {
headers.put(GrpclbConstants.TOKEN_METADATA_KEY, token);
Expand Down Expand Up @@ -1065,7 +1074,7 @@ static final class IdleSubchannelEntry implements RoundRobinEntry {
}

@Override
public PickResult picked(Metadata headers) {
public PickResult picked(PickSubchannelArgs args) {
if (connectionRequested.compareAndSet(false, true)) {
syncContext.execute(new Runnable() {
@Override
Expand Down Expand Up @@ -1108,7 +1117,7 @@ static final class ErrorEntry implements RoundRobinEntry {
}

@Override
public PickResult picked(Metadata headers) {
public PickResult picked(PickSubchannelArgs args) {
return result;
}

Expand All @@ -1132,6 +1141,58 @@ public String toString() {
}
}

/**
* Entry that wraps a child LB's picker for PICK_FIRST mode delegation.
* Attaches TokenAttachingTracerFactory to the pick result for token propagation.
*/
@VisibleForTesting
static final class ChildLbPickerEntry implements RoundRobinEntry {
private final SubchannelPicker childPicker;
private final TokenAttachingTracerFactory tracerFactory;

ChildLbPickerEntry(SubchannelPicker childPicker, TokenAttachingTracerFactory tracerFactory) {
this.childPicker = checkNotNull(childPicker, "childPicker");
this.tracerFactory = checkNotNull(tracerFactory, "tracerFactory");
}

@Override
public PickResult picked(PickSubchannelArgs args) {
PickResult childResult = childPicker.pickSubchannel(args);
if (childResult.getSubchannel() == null) {
// No subchannel (e.g., buffer, error), return as-is.
return childResult;
}
// Wrap the pick result to attach tokens via the tracer factory.
return PickResult.withSubchannel(
childResult.getSubchannel(), tracerFactory, childResult.getAuthorityOverride());
}

@Override
public int hashCode() {
return Objects.hashCode(childPicker, tracerFactory);
}

@Override
public boolean equals(Object other) {
if (!(other instanceof ChildLbPickerEntry)) {
return false;
}
ChildLbPickerEntry that = (ChildLbPickerEntry) other;
return Objects.equal(childPicker, that.childPicker)
&& Objects.equal(tracerFactory, that.tracerFactory);
}

@Override
public String toString() {
return "ChildLbPickerEntry(" + childPicker + ")";
}

@VisibleForTesting
SubchannelPicker getChildPicker() {
return childPicker;
}
}

@VisibleForTesting
static final class RoundRobinPicker extends SubchannelPicker {
@VisibleForTesting
Expand Down Expand Up @@ -1174,7 +1235,7 @@ public PickResult pickSubchannel(PickSubchannelArgs args) {
if (pickIndex == pickList.size()) {
pickIndex = 0;
}
return pick.picked(args.getHeaders());
return pick.picked(args);
}
}

Expand All @@ -1189,4 +1250,28 @@ public String toString() {
return MoreObjects.toStringHelper(RoundRobinPicker.class).toString();
}
}

/**
* Helper for the child pick_first LB in PICK_FIRST mode. Intercepts updateBalancingState()
* to store state and trigger the grpclb picker update with drops and token attachment.
*/
private final class PickFirstLbHelper extends ForwardingLoadBalancerHelper {

@Override
protected Helper delegate() {
return helper;
}

@Override
public void updateBalancingState(ConnectivityState newState, SubchannelPicker newPicker) {
pickFirstLbState = newState;
pickFirstLbPicker = newPicker;
// Trigger name resolution refresh on TRANSIENT_FAILURE or IDLE, similar to ROUND_ROBIN.
if (newState == TRANSIENT_FAILURE || newState == IDLE) {
helper.refreshNameResolution();
}
maybeUseFallbackBackends();
maybeUpdatePicker();
}
}
}
Loading