@@ -19,10 +19,13 @@ package newfeaturetest
 import (
 	"context"
 	"fmt"
+	"sync"
 	"testing"
+	"time"

 	"github.com/stretchr/testify/require"

+	"vitess.io/vitess/go/mysql"
 	"vitess.io/vitess/go/test/endtoend/cluster"
 	"vitess.io/vitess/go/test/endtoend/reparent/utils"
 )
@@ -156,3 +159,63 @@ func TestChangeTypeWithoutSemiSync(t *testing.T) {
 	err = clusterInstance.VtctlclientProcess.ExecuteCommand("ChangeTabletType", replica.Alias, "replica")
 	require.NoError(t, err)
 }
+
+func TestBufferingWithMultipleDisruptions(t *testing.T) {
+	defer cluster.PanicHandler(t)
+	clusterInstance := utils.SetupShardedReparentCluster(t)
+	defer utils.TeardownCluster(clusterInstance)
+
+	// Stop all VTOrc instances, so that they don't interfere with the test.
+	for _, vtorc := range clusterInstance.VTOrcProcesses {
+		err := vtorc.TearDown()
+		require.NoError(t, err)
+	}
+
+	// Start by reparenting all the shards to the first tablet.
+	keyspace := clusterInstance.Keyspaces[0]
+	shards := keyspace.Shards
+	for _, shard := range shards {
+		err := clusterInstance.VtctldClientProcess.PlannedReparentShard(keyspace.Name, shard.Name, shard.Vttablets[0].Alias)
+		require.NoError(t, err)
+	}
+
+	// We simulate the start of an external reparent, or a PRS where the healthcheck update from the tablet gets lost in transit
+	// to vtgate, by simply setting the primary read-only. This is also why we had to shut down all the VTOrc instances, so that
+	// they don't fix this.
+	utils.RunSQL(context.Background(), t, "set global read_only=1", shards[0].Vttablets[0])
+	utils.RunSQL(context.Background(), t, "set global read_only=1", shards[1].Vttablets[0])
+
+	wg := sync.WaitGroup{}
+	rowCount := 10
+	vtParams := clusterInstance.GetVTParams(keyspace.Name)
+	// We now spawn writes from a bunch of goroutines.
+	// The ones going to shard 1 and shard 2 should block, since
+	// those shards are in the midst of a reparenting operation (as seen by the buffering code).
+	for i := 1; i <= rowCount; i++ {
+		wg.Add(1)
+		go func(i int) {
+			defer wg.Done()
+			conn, err := mysql.Connect(context.Background(), &vtParams)
+			if err != nil {
+				return
+			}
+			defer conn.Close()
+			_, err = conn.ExecuteFetch(utils.GetInsertQuery(i), 0, false)
+			require.NoError(t, err)
+		}(i)
+	}
+
+	// Now, run a PRS call on the last shard. This shouldn't unbuffer the queries that are buffered for shards 1 and 2,
+	// since the disruption on those two shards hasn't stopped.
+	err := clusterInstance.VtctldClientProcess.PlannedReparentShard(keyspace.Name, shards[2].Name, shards[2].Vttablets[1].Alias)
+	require.NoError(t, err)
+	// We wait a second just to make sure the PRS changes are processed by the buffering logic in vtgate.
+	time.Sleep(1 * time.Second)
+	// Finally, we make the two disrupted shards healthy again by running PRS on them.
+	err = clusterInstance.VtctldClientProcess.PlannedReparentShard(keyspace.Name, shards[0].Name, shards[0].Vttablets[1].Alias)
+	require.NoError(t, err)
+	err = clusterInstance.VtctldClientProcess.PlannedReparentShard(keyspace.Name, shards[1].Name, shards[1].Vttablets[1].Alias)
+	require.NoError(t, err)
+	// Wait for all the writes to have succeeded.
+	wg.Wait()
+}