Skip to content

Commit

Permalink
ringhash: fix a couple of flakes in e2e style tests (#7784)
Browse files Browse the repository at this point in the history
  • Loading branch information
easwars authored Oct 29, 2024
1 parent 52d7f6a commit 2e3f547
Showing 1 changed file with 44 additions and 57 deletions.
101 changes: 44 additions & 57 deletions xds/internal/balancer/ringhash/e2e/ringhash_balancer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,28 +220,29 @@ func checkRPCSendOK(ctx context.Context, t *testing.T, client testgrpc.TestServi
return backendCount
}

// makeNonExistentBackends returns a slice of strings with num listeners, each
// of which is closed immediately. Useful to simulate servers that are
// unreachable.
func makeNonExistentBackends(t *testing.T, num int) []string {
// makeUnreachableBackends returns a slice of addresses of backends that close
// connections as soon as they are established. Useful to simulate servers that
// are unreachable.
func makeUnreachableBackends(t *testing.T, num int) []string {
t.Helper()

closedListeners := make([]net.Listener, 0, num)
addrs := make([]string, 0, num)
for i := 0; i < num; i++ {
lis, err := testutils.LocalTCPListener()
l, err := testutils.LocalTCPListener()
if err != nil {
t.Fatalf("testutils.LocalTCPListener() failed: %v", err)
}
closedListeners = append(closedListeners, lis)
}

// Stop the servers that we want to be unreachable and collect their
// addresses. We don't close them in the loop above to make sure ports are
// not reused across them.
addrs := make([]string, 0, num)
for _, lis := range closedListeners {
lis := testutils.NewRestartableListener(l)
addrs = append(addrs, lis.Addr().String())
lis.Close()

// It is enough to fail the first connection attempt to put the subchannel
// in TRANSIENT_FAILURE.
go func() { lis.Accept() }()

// We don't close these listeners here, to make sure ports are
// not reused across them, and across tests.
lis.Stop()
t.Cleanup(func() { lis.Close() })
}
return addrs
}
Expand Down Expand Up @@ -304,7 +305,7 @@ func (s) TestRingHash_AggregateClusterFallBackFromRingHashAtStartup(t *testing.T
Localities: []e2e.LocalityOptions{{
Name: "locality0",
Weight: 1,
Backends: backendOptions(t, makeNonExistentBackends(t, 2)),
Backends: backendOptions(t, makeUnreachableBackends(t, 2)),
}},
})
ep2 := e2e.EndpointResourceWithOptions(e2e.EndpointOptions{
Expand Down Expand Up @@ -403,7 +404,7 @@ func (s) TestRingHash_AggregateClusterFallBackFromRingHashToLogicalDnsAtStartup(
Localities: []e2e.LocalityOptions{{
Name: "locality0",
Weight: 1,
Backends: backendOptions(t, makeNonExistentBackends(t, 1)),
Backends: backendOptions(t, makeUnreachableBackends(t, 1)),
Priority: 0,
}},
})
Expand Down Expand Up @@ -480,7 +481,7 @@ func (s) TestRingHash_AggregateClusterFallBackFromRingHashToLogicalDnsAtStartupN
Localities: []e2e.LocalityOptions{{
Name: "locality0",
Weight: 1,
Backends: backendOptions(t, makeNonExistentBackends(t, 1)),
Backends: backendOptions(t, makeUnreachableBackends(t, 1)),
Priority: 0,
}},
})
Expand Down Expand Up @@ -1369,17 +1370,11 @@ func (s) TestRingHash_IdleToReady(t *testing.T) {
t.Fatalf("Failed to create client: %s", err)
}
defer conn.Close()
client := testgrpc.NewTestServiceClient(conn)

if got, want := conn.GetState(), connectivity.Idle; got != want {
t.Errorf("conn.GetState(): got %v, want %v", got, want)
}
testutils.AwaitState(ctx, t, conn, connectivity.Idle)

client := testgrpc.NewTestServiceClient(conn)
checkRPCSendOK(ctx, t, client, 1)

if got, want := conn.GetState(), connectivity.Ready; got != want {
t.Errorf("conn.GetState(): got %v, want %v", got, want)
}
testutils.AwaitState(ctx, t, conn, connectivity.Ready)
}

// Test that the channel will transition to READY once it starts
Expand All @@ -1395,10 +1390,10 @@ func (s) TestRingHash_ContinuesConnectingWithoutPicks(t *testing.T) {
})
defer backend.Stop()

nonExistentServerAddr := makeNonExistentBackends(t, 1)[0]
unReachableServerAddr := makeUnreachableBackends(t, 1)[0]

const clusterName = "cluster"
endpoints := endpointResource(t, clusterName, []string{backend.Address, nonExistentServerAddr})
endpoints := endpointResource(t, clusterName, []string{backend.Address, unReachableServerAddr})
cluster := e2e.ClusterResourceWithOptions(e2e.ClusterOptions{
ClusterName: clusterName,
ServiceName: clusterName,
Expand Down Expand Up @@ -1432,7 +1427,7 @@ func (s) TestRingHash_ContinuesConnectingWithoutPicks(t *testing.T) {

rpcCtx, rpcCancel := context.WithCancel(ctx)
go func() {
rpcCtx = metadata.NewOutgoingContext(rpcCtx, metadata.Pairs("address_hash", nonExistentServerAddr+"_0"))
rpcCtx = metadata.NewOutgoingContext(rpcCtx, metadata.Pairs("address_hash", unReachableServerAddr+"_0"))
_, err := client.EmptyCall(rpcCtx, &testpb.Empty{})
if status.Code(err) != codes.Canceled {
t.Errorf("Expected RPC to be canceled, got error: %v", err)
Expand All @@ -1459,10 +1454,10 @@ func (s) TestRingHash_ContinuesConnectingWithoutPicks(t *testing.T) {
// will move on to the next ring hash entry.
func (s) TestRingHash_TransientFailureCheckNextOne(t *testing.T) {
backends := startTestServiceBackends(t, 1)
nonExistentBackends := makeNonExistentBackends(t, 1)
unReachableBackends := makeUnreachableBackends(t, 1)

const clusterName = "cluster"
endpoints := endpointResource(t, clusterName, append(nonExistentBackends, backends...))
endpoints := endpointResource(t, clusterName, append(unReachableBackends, backends...))
cluster := e2e.ClusterResourceWithOptions(e2e.ClusterOptions{
ClusterName: clusterName,
ServiceName: clusterName,
Expand All @@ -1489,7 +1484,7 @@ func (s) TestRingHash_TransientFailureCheckNextOne(t *testing.T) {
// Note each type of RPC contains a header value that will always be hashed
// the value that was used to place the non-existent endpoint on the ring,
// but it still gets routed to the backend that is up.
ctx = metadata.NewOutgoingContext(ctx, metadata.Pairs("address_hash", nonExistentBackends[0]+"_0"))
ctx = metadata.NewOutgoingContext(ctx, metadata.Pairs("address_hash", unReachableBackends[0]+"_0"))
reqPerBackend := checkRPCSendOK(ctx, t, client, 1)
var got string
for got = range reqPerBackend {
Expand Down Expand Up @@ -1530,24 +1525,18 @@ func (s) TestRingHash_ReattemptWhenGoingFromTransientFailureToIdle(t *testing.T)
t.Fatalf("Failed to create client: %s", err)
}
defer conn.Close()
client := testgrpc.NewTestServiceClient(conn)

if got, want := conn.GetState(), connectivity.Idle; got != want {
t.Errorf("conn.GetState(): got %v, want %v", got, want)
}
testutils.AwaitState(ctx, t, conn, connectivity.Idle)

// There are no endpoints in EDS. RPCs should fail and the channel should
// transition to transient failure.
client := testgrpc.NewTestServiceClient(conn)
if _, err = client.EmptyCall(ctx, &testpb.Empty{}); err == nil {
t.Errorf("rpc EmptyCall() succeeded, want error")
}
if got, want := conn.GetState(), connectivity.TransientFailure; got != want {
t.Errorf("conn.GetState(): got %v, want %v", got, want)
}

backends := startTestServiceBackends(t, 1)
testutils.AwaitState(ctx, t, conn, connectivity.TransientFailure)

t.Log("Updating EDS with a new backend endpoint.")
backends := startTestServiceBackends(t, 1)
endpoints = e2e.EndpointResourceWithOptions(e2e.EndpointOptions{
ClusterName: clusterName,
Localities: []e2e.LocalityOptions{{
Expand All @@ -1563,9 +1552,7 @@ func (s) TestRingHash_ReattemptWhenGoingFromTransientFailureToIdle(t *testing.T)
if _, err = client.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)); err != nil {
t.Errorf("rpc EmptyCall() failed: %v", err)
}
if got, want := conn.GetState(), connectivity.Ready; got != want {
t.Errorf("conn.GetState(): got %v, want %v", got, want)
}
testutils.AwaitState(ctx, t, conn, connectivity.Ready)
}

// Tests that when all backends are down and then up, we may pick a TF backend
Expand Down Expand Up @@ -1596,11 +1583,11 @@ func (s) TestRingHash_TransientFailureSkipToAvailableReady(t *testing.T) {
})
defer restartableServer2.Stop()

nonExistentBackends := makeNonExistentBackends(t, 2)
unReachableBackends := makeUnreachableBackends(t, 2)

const clusterName = "cluster"
backends := []string{restartableServer1.Address, restartableServer2.Address}
backends = append(backends, nonExistentBackends...)
backends = append(backends, unReachableBackends...)
endpoints := endpointResource(t, clusterName, backends)
cluster := e2e.ClusterResourceWithOptions(e2e.ClusterOptions{
ClusterName: clusterName,
Expand Down Expand Up @@ -1862,11 +1849,11 @@ func (s) TestRingHash_SwitchToLowerPriorityAndThenBack(t *testing.T) {
// so for only one subchannel at a time.
func (s) TestRingHash_ContinuesConnectingWithoutPicksOneSubchannelAtATime(t *testing.T) {
backends := startTestServiceBackends(t, 1)
nonExistentBackends := makeNonExistentBackends(t, 3)
unReachableBackends := makeUnreachableBackends(t, 3)

const clusterName = "cluster"

endpoints := endpointResource(t, clusterName, append(nonExistentBackends, backends...))
endpoints := endpointResource(t, clusterName, append(unReachableBackends, backends...))
cluster := e2e.ClusterResourceWithOptions(e2e.ClusterOptions{
ClusterName: clusterName,
ServiceName: clusterName,
Expand Down Expand Up @@ -1897,15 +1884,15 @@ func (s) TestRingHash_ContinuesConnectingWithoutPicksOneSubchannelAtATime(t *tes
defer conn.Close()
client := testgrpc.NewTestServiceClient(conn)

holdNonExistent0 := dialer.Hold(nonExistentBackends[0])
holdNonExistent1 := dialer.Hold(nonExistentBackends[1])
holdNonExistent2 := dialer.Hold(nonExistentBackends[2])
holdNonExistent0 := dialer.Hold(unReachableBackends[0])
holdNonExistent1 := dialer.Hold(unReachableBackends[1])
holdNonExistent2 := dialer.Hold(unReachableBackends[2])
holdGood := dialer.Hold(backends[0])

rpcCtx, rpcCancel := context.WithCancel(ctx)
errCh := make(chan error, 1)
go func() {
rpcCtx = metadata.NewOutgoingContext(rpcCtx, metadata.Pairs("address_hash", nonExistentBackends[0]+"_0"))
rpcCtx = metadata.NewOutgoingContext(rpcCtx, metadata.Pairs("address_hash", unReachableBackends[0]+"_0"))
_, err := client.EmptyCall(rpcCtx, &testpb.Empty{})
if status.Code(err) == codes.Canceled {
errCh <- nil
Expand Down Expand Up @@ -1939,7 +1926,7 @@ func (s) TestRingHash_ContinuesConnectingWithoutPicksOneSubchannelAtATime(t *tes
// Allow the connection attempt to the first address to resume and wait for
// the attempt for the second address. No other connection attempts should
// be started yet.
holdNonExistent0Again := dialer.Hold(nonExistentBackends[0])
holdNonExistent0Again := dialer.Hold(unReachableBackends[0])
holdNonExistent0.Resume()
if !holdNonExistent1.Wait(ctx) {
t.Fatalf("Timeout waiting for connection attempt to backend 1")
Expand All @@ -1957,7 +1944,7 @@ func (s) TestRingHash_ContinuesConnectingWithoutPicksOneSubchannelAtATime(t *tes
// Allow the connection attempt to the second address to resume and wait for
// the attempt for the third address. No other connection attempts should
// be started yet.
holdNonExistent1Again := dialer.Hold(nonExistentBackends[1])
holdNonExistent1Again := dialer.Hold(unReachableBackends[1])
holdNonExistent1.Resume()
if !holdNonExistent2.Wait(ctx) {
t.Fatalf("Timeout waiting for connection attempt to backend 2")
Expand All @@ -1975,7 +1962,7 @@ func (s) TestRingHash_ContinuesConnectingWithoutPicksOneSubchannelAtATime(t *tes
// Allow the connection attempt to the third address to resume and wait
// for the attempt for the final address. No other connection attempts
// should be started yet.
holdNonExistent2Again := dialer.Hold(nonExistentBackends[2])
holdNonExistent2Again := dialer.Hold(unReachableBackends[2])
holdNonExistent2.Resume()
if !holdGood.Wait(ctx) {
t.Fatalf("Timeout waiting for connection attempt to good backend")
Expand Down

0 comments on commit 2e3f547

Please sign in to comment.