18
18
package org .apache .phoenix .replication ;
19
19
20
20
import static org .junit .Assert .assertNotNull ;
21
+ import static org .junit .Assert .assertTrue ;
22
+ import static org .junit .Assert .assertFalse ;
21
23
import static org .mockito .ArgumentMatchers .eq ;
24
+ import static org .mockito .ArgumentMatchers .anyString ;
25
+ import static org .mockito .ArgumentMatchers .anyLong ;
26
+ import static org .mockito .ArgumentMatchers .any ;
22
27
import static org .mockito .Mockito .doAnswer ;
23
28
import static org .mockito .Mockito .doThrow ;
24
29
import static org .mockito .Mockito .mock ;
25
30
import static org .mockito .Mockito .spy ;
26
31
import static org .mockito .Mockito .timeout ;
27
32
import static org .mockito .Mockito .times ;
28
33
import static org .mockito .Mockito .when ;
34
+ import static org .mockito .Mockito .verify ;
29
35
30
36
import java .io .IOException ;
31
37
import java .util .concurrent .CompletableFuture ;
@@ -63,9 +69,14 @@ public class ReplicationLogWriterTest {
63
69
private TestableReplicationLogManager logManager ;
64
70
private LogFile .Writer internalWriter ;
65
71
72
+ private static final int TEST_RING_BUFFER_SIZE = 32 ;
73
+
66
74
@ Before
67
75
public void setUp () throws IOException {
68
76
conf = HBaseConfiguration .create ();
77
+ // Small ring buffer size for testing
78
+ conf .setInt (ReplicationLogWriter .REPLICATION_WRITER_RINGBUFFER_SIZE_KEY ,
79
+ TEST_RING_BUFFER_SIZE );
69
80
serverName = ServerName .valueOf ("test" , 60010 , EnvironmentEdgeManager .currentTimeMillis ());
70
81
// Use a temporary folder for the standby HDFS URL
71
82
standbyLogDir = new Path (testFolder .newFolder ("standby" ).toURI ());
@@ -159,30 +170,23 @@ public Void answer(InvocationOnMock invocation) throws Throwable {
159
170
});
160
171
syncThread .start ();
161
172
162
- // Happens-before ordering verification, using Mockito's inOrder.
173
+ // Wait for the sync() call itself to complete. Any issues here will cause the test to
174
+ // stall and eventually time out.
175
+ syncResultFuture .get ();
163
176
164
- // Verify that the appends happen before sync, and sync happened after appends
165
- // Use timeout with verify to wait for the async event handler to process
177
+ // Happens-before ordering verification, using Mockito's inOrder. Verify that the appends
178
+ // happen before sync, and sync happened after appends.
166
179
InOrder inOrder = Mockito .inOrder (internalWriter );
167
180
168
- // Wait for appends (these might happen very quickly)
169
- inOrder .verify (internalWriter , timeout (1000 ).times (1 )).append (eq (tableName ), eq (commitId1 ),
170
- eq (put1 ));
171
- inOrder .verify (internalWriter , timeout (1000 ).times (1 )).append (eq (tableName ), eq (commitId2 ),
172
- eq (put2 ));
173
- inOrder .verify (internalWriter , timeout (1000 ).times (1 )).append (eq (tableName ), eq (commitId3 ),
174
- eq (put3 ));
175
- inOrder .verify (internalWriter , timeout (1000 ).times (1 )).append (eq (tableName ), eq (commitId4 ),
176
- eq (put4 ));
177
- inOrder .verify (internalWriter , timeout (1000 ).times (1 )).append (eq (tableName ), eq (commitId5 ),
178
- eq (put5 ));
179
-
180
- // Wait for sync (this depends on the syncFuture completion)
181
- inOrder .verify (internalWriter , timeout (1000 ).times (1 )).sync ();
181
+ // Verify the appends happened in order.
182
+ inOrder .verify (internalWriter , times (1 )).append (eq (tableName ), eq (commitId1 ), eq (put1 ));
183
+ inOrder .verify (internalWriter , times (1 )).append (eq (tableName ), eq (commitId2 ), eq (put2 ));
184
+ inOrder .verify (internalWriter , times (1 )).append (eq (tableName ), eq (commitId3 ), eq (put3 ));
185
+ inOrder .verify (internalWriter , times (1 )).append (eq (tableName ), eq (commitId4 ), eq (put4 ));
186
+ inOrder .verify (internalWriter , times (1 )).append (eq (tableName ), eq (commitId5 ), eq (put5 ));
182
187
183
- // Wait for the sync() call itself to complete. Any issues here will cause the test to
184
- // stall and eventually time out.
185
- syncResultFuture .get ();
188
+ // Verify the sync happened after the appends.
189
+ inOrder .verify (internalWriter , times (1 )).sync ();
186
190
187
191
// Close the log writer.
188
192
logWriter .close ();
@@ -214,6 +218,9 @@ public void testSyncFailureAndRetry() throws Exception {
214
218
LogFile .Writer writerAfterRoll = mock (LogFile .Writer .class );
215
219
when (writerAfterRoll .getLength ()).thenReturn (0L ); // Needed for shouldRotate check
216
220
221
+ // Configure writerAfterRoll to succeed on sync
222
+ doAnswer (invocation -> null ).when (writerAfterRoll ).sync ();
223
+
217
224
// Configure the logManager's rotateLog method to return the second writer
218
225
doAnswer (invocation -> {
219
226
// Simulate the actual rotation behavior: close old, create new
@@ -225,14 +232,10 @@ public void testSyncFailureAndRetry() throws Exception {
225
232
return logManager .currentWriter ;
226
233
}).when (logManager ).rotateLog ();
227
234
228
- // Configure internalWriter2 to succeed on sync
229
- doAnswer (invocation -> null ).when (writerAfterRoll ).sync ();
230
-
231
235
// Append data
232
236
logWriter .append (tableName , commitId1 , put1 );
233
237
234
- // Call sync and expect it to eventually succeed after retry. Do it in another thread so we
235
- // continue to InOrder behavioral verification.
238
+ // Call sync and expect it to eventually succeed after retry.
236
239
CompletableFuture <Void > syncResultFuture = new CompletableFuture <>();
237
240
Thread syncThread = new Thread (() -> {
238
241
try {
@@ -244,25 +247,84 @@ public void testSyncFailureAndRetry() throws Exception {
244
247
});
245
248
syncThread .start ();
246
249
250
+ // Wait for the sync() call itself to complete successfully
251
+ syncResultFuture .get ();
252
+
247
253
// Verify the sequence: append, sync (fail), rotate, append (retry), sync (succeed)
248
254
InOrder inOrder = Mockito .inOrder (writerBeforeRoll , writerAfterRoll , logManager );
249
-
250
- // 1. Append to the first writer
255
+ // Append to the first writer.
251
256
inOrder .verify (writerBeforeRoll , times (1 )).append (eq (tableName ), eq (commitId1 ), eq (put1 ));
252
- // 2. First sync attempt fails
257
+ // First sync attempt fails.
253
258
inOrder .verify (writerBeforeRoll , times (1 )).sync ();
254
- // 3. Log rotation is triggered due to failure
255
- inOrder .verify (logManager , times (1 )).rotateLog (); // Verify rotation was called
256
-
257
- // Wait for the sync() call itself to complete successfully
258
- syncResultFuture .get ();
259
-
260
- // 4. Append is retried on the second writer
259
+ // Log rotation is triggered due to failure.
260
+ inOrder .verify (logManager , times (1 )).rotateLog ();
261
+ // Append is retried on the second writer.
261
262
inOrder .verify (writerAfterRoll , times (1 )).append (eq (tableName ), eq (commitId1 ), eq (put1 ));
262
- // 5. Second sync attempt succeeds
263
+ // Second sync attempt succeeds.
263
264
inOrder .verify (writerAfterRoll , times (1 )).sync ();
264
265
265
266
logWriter .close ();
266
267
}
267
268
269
+ @ Test (timeout =15000 )
270
+ public void testBlockingWhenRingFull () throws Exception {
271
+ final String tableName = "TBLBWRF" ;
272
+ final Mutation put = LogFileTestUtil .newPut ("row" , 1 , 1 );
273
+ long commitId = 0 ;
274
+
275
+ // Create the ReplicationLogWriter instance to be tested.
276
+ ReplicationLogWriter logWriter = new ReplicationLogWriter (conf , logManager );
277
+ logWriter .init ();
278
+
279
+ // Create a slow consumer to fill up the ring buffer.
280
+ doAnswer (new Answer <Void >() {
281
+ @ Override
282
+ public Void answer (InvocationOnMock invocation ) throws Throwable {
283
+ Thread .sleep (50 ); // Simulate slow processing
284
+ return null ;
285
+ }
286
+ }).when (internalWriter ).append (anyString (), anyLong (), any (Mutation .class ));
287
+
288
+ // Fill up the ring buffer by sending enough events.
289
+ for (int i = 0 ; i < TEST_RING_BUFFER_SIZE ; i ++) {
290
+ logWriter .append (tableName , commitId ++, put );
291
+ }
292
+
293
+ // Now try to append when the ring is full. This should block until space becomes
294
+ // available.
295
+ long myCommitId = commitId ++;
296
+ CompletableFuture <Void > startFuture = new CompletableFuture <>();
297
+ CompletableFuture <Void > appendFuture = new CompletableFuture <>();
298
+ Thread appendThread = new Thread (() -> {
299
+ try {
300
+ startFuture .complete (null );
301
+ logWriter .append (tableName , myCommitId , put );
302
+ appendFuture .complete (null );
303
+ } catch (IOException e ) {
304
+ appendFuture .completeExceptionally (e );
305
+ }
306
+ });
307
+ appendThread .start ();
308
+
309
+ // Wait for the append thread.
310
+ startFuture .get ();
311
+
312
+ // Verify the append is still blocked
313
+ assertFalse ("Append should be blocked when ring is full" , appendFuture .isDone ());
314
+
315
+ // Let some events process to free up space.
316
+ Thread .sleep (1000 );
317
+
318
+ // Now the append should complete. Any issues and we will time out here.
319
+ appendFuture .get ();
320
+ assertTrue ("Append should have completed" , appendFuture .isDone ());
321
+
322
+ // Verify the append eventually happens on the writer. We may have to wait for a while
323
+ // because of our slow consumer.
324
+ verify (internalWriter , timeout (10000 ).times (1 )).append (eq (tableName ), eq (myCommitId ),
325
+ eq (put ));
326
+
327
+ logWriter .close ();
328
+ }
329
+
268
330
}
0 commit comments