Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 10 additions & 4 deletions pkg/server/drain.go
Original file line number Diff line number Diff line change
Expand Up @@ -364,10 +364,13 @@ func (s *drainServer) drainInner(
return s.drainNode(ctx, reporter, verbose)
}

// isDraining returns true if either SQL client connections are being drained
// or if one of the stores on the node is not accepting replicas.
// isDraining returns true if the SQL client connections are being drained,
// if one of the stores on the node is not accepting replicas, or if the
// server controller is being drained.
func (s *drainServer) isDraining() bool {
return s.sqlServer.pgServer.IsDraining() || (s.kvServer.node != nil && s.kvServer.node.IsDraining())
return s.sqlServer.pgServer.IsDraining() ||
(s.kvServer.node != nil && s.kvServer.node.IsDraining()) ||
(s.serverCtl != nil && s.serverCtl.IsDraining())
}

// drainClients starts draining the SQL layer.
Expand All @@ -379,7 +382,10 @@ func (s *drainServer) drainClients(
var cancel context.CancelFunc
ctx, cancel = context.WithCancel(ctx)
defer cancel()
shouldDelayDraining := !s.isDraining()

// If this is the first time we are draining the clients, we delay draining
// to allow health probes to notice that the node is not ready.
shouldDelayDraining := !s.sqlServer.pgServer.IsDraining()

// Set the gRPC mode of the node to "draining" and mark the node as "not ready".
// Probes to /health?ready=1 will now notice the change in the node's readiness.
Expand Down
45 changes: 40 additions & 5 deletions pkg/server/drain_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ package server_test

import (
"context"
gosql "database/sql"
"io"
"testing"
"time"
Expand All @@ -19,6 +20,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/sqlstats"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/testutils/skip"
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
"github.com/cockroachdb/cockroach/pkg/util/grpcutil"
Expand All @@ -33,15 +35,39 @@ import (
func TestDrain(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
doTestDrain(t)
testutils.RunTrueAndFalse(t, "secondaryTenants", func(t *testing.T, secondaryTenants bool) {
doTestDrain(t, secondaryTenants)
})
}

// doTestDrain runs the drain test.
func doTestDrain(tt *testing.T) {
func doTestDrain(tt *testing.T, secondaryTenants bool) {
if secondaryTenants {
// Draining the tenant server takes a long time under stress. See #155229.
skip.UnderRace(tt)
}
var drainSleepCallCount = 0
t := newTestDrainContext(tt, &drainSleepCallCount)
defer t.Close()

var tenantConn *gosql.DB
if secondaryTenants {
_, err := t.tc.ServerConn(0).Exec("CREATE TENANT hello")
require.NoError(tt, err)
_, err = t.tc.ServerConn(0).Exec("ALTER TENANT hello START SERVICE SHARED")
require.NoError(tt, err)

// Wait for the tenant to be ready by establishing a test connection.
testutils.SucceedsSoon(tt, func() error {
tenantConn, err = t.tc.Server(0).SystemLayer().SQLConnE(
serverutils.DBName("cluster:hello/defaultdb"))
if err != nil {
return err
}
return tenantConn.Ping()
})
}

// Issue a probe. We're not draining yet, so the probe should
// reflect that.
resp := t.sendProbe()
Expand All @@ -53,7 +79,12 @@ func doTestDrain(tt *testing.T) {
resp = t.sendDrainNoShutdown()
t.assertDraining(resp, true)
t.assertRemaining(resp, true)
t.assertEqual(1, drainSleepCallCount)
if !secondaryTenants {
// If we have secondary tenants, we might not have waited before draining
// the clients at this point because we might have returned early if we are
// still draining the serverController.
t.assertEqual(1, drainSleepCallCount)
}

// Issue another probe. This checks that the server is still running
// (i.e. Shutdown: false was effective), the draining status is
Expand All @@ -63,12 +94,14 @@ func doTestDrain(tt *testing.T) {
t.assertDraining(resp, true)
// probe-only has no remaining.
t.assertRemaining(resp, false)
t.assertEqual(1, drainSleepCallCount)

if !secondaryTenants {
t.assertEqual(1, drainSleepCallCount)
}
// Repeat drain commands until we verify that there are zero remaining leases
// (i.e. complete). Also validate that the server did not sleep again.
testutils.SucceedsSoon(t, func() error {
resp = t.sendDrainNoShutdown()
t.Logf("drain response: %+v", resp)
if !resp.IsDraining {
return errors.Newf("expected draining")
}
Expand All @@ -78,6 +111,7 @@ func doTestDrain(tt *testing.T) {
}
return nil
})

t.assertEqual(1, drainSleepCallCount)

// Now issue a drain request without drain but with shutdown.
Expand Down Expand Up @@ -184,6 +218,7 @@ func newTestDrainContext(t *testing.T, drainSleepCallCount *int) *testDrainConte
// We need to start the cluster insecure in order to not
// care about TLS settings for the RPC client connection.
ServerArgs: base.TestServerArgs{
DefaultTestTenant: base.TestControlsTenantsExplicitly,
Knobs: base.TestingKnobs{
Server: &server.TestingKnobs{
DrainSleepFn: func(time.Duration) {
Expand Down
4 changes: 4 additions & 0 deletions pkg/server/server_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,10 @@ func (s *serverController) SetDraining() {
}
}

// IsDraining reports whether this server controller has been marked
// as draining (see SetDraining).
func (s *serverController) IsDraining() bool {
	isDraining := s.draining.Load()
	return isDraining
}

// start monitors changes to the service mode and updates
// the running servers accordingly.
func (c *serverController) start(ctx context.Context, ie isql.Executor) error {
Expand Down