@@ -507,6 +507,106 @@ public void testRegionReplicaReplicationIgnores(boolean dropTable, boolean disab
507507 }
508508 }
509509
510+ @ Test
511+ public void testMetaCacheMissTriggersRefresh () throws Exception {
512+ TableName tableName = TableName .valueOf (name .getMethodName ());
513+ int regionReplication = 3 ;
514+ HTableDescriptor htd = HTU .createTableDescriptor (tableName );
515+ htd .setRegionReplication (regionReplication );
516+ createOrEnableTableWithRetries (htd , true );
517+
518+ Connection connection = ConnectionFactory .createConnection (HTU .getConfiguration ());
519+ Table table = connection .getTable (tableName );
520+
521+ try {
522+ HTU .loadNumericRows (table , HBaseTestingUtility .fam1 , 0 , 100 );
523+
524+ RegionLocator rl = connection .getRegionLocator (tableName );
525+ HRegionLocation hrl = rl .getRegionLocation (HConstants .EMPTY_BYTE_ARRAY );
526+ byte [] encodedRegionName = hrl .getRegionInfo ().getEncodedNameAsBytes ();
527+ rl .close ();
528+
529+ AtomicLong skippedEdits = new AtomicLong ();
530+ RegionReplicaReplicationEndpoint .RegionReplicaOutputSink sink =
531+ mock (RegionReplicaReplicationEndpoint .RegionReplicaOutputSink .class );
532+ when (sink .getSkippedEditsCounter ()).thenReturn (skippedEdits );
533+
534+ FSTableDescriptors fstd =
535+ new FSTableDescriptors (FileSystem .get (HTU .getConfiguration ()), HTU .getDefaultRootDirPath ());
536+
537+ RegionReplicaReplicationEndpoint .RegionReplicaSinkWriter sinkWriter =
538+ new RegionReplicaReplicationEndpoint .RegionReplicaSinkWriter (sink ,
539+ (ClusterConnection ) connection , Executors .newSingleThreadExecutor (), Integer .MAX_VALUE ,
540+ fstd );
541+
542+ Cell cell = CellBuilderFactory .create (CellBuilderType .DEEP_COPY )
543+ .setRow (Bytes .toBytes ("testRow" )).setFamily (HBaseTestingUtility .fam1 )
544+ .setValue (Bytes .toBytes ("testValue" )).setType (Type .Put ).build ();
545+
546+ Entry entry =
547+ new Entry (new WALKeyImpl (encodedRegionName , tableName , 1 ), new WALEdit ().add (cell ));
548+
549+ sinkWriter .append (tableName , encodedRegionName , Bytes .toBytes ("testRow" ),
550+ Lists .newArrayList (entry ));
551+
552+ assertEquals ("No entries should be skipped for valid table" , 0 , skippedEdits .get ());
553+
554+ } finally {
555+ table .close ();
556+ connection .close ();
557+ }
558+ }
559+
560+ @ Test
561+ public void testMetaCacheSkippedForSingleReplicaTable () throws Exception {
562+ TableName tableName = TableName .valueOf (name .getMethodName ());
563+ int regionReplication = 1 ;
564+ HTableDescriptor htd = HTU .createTableDescriptor (tableName );
565+ htd .setRegionReplication (regionReplication );
566+ createOrEnableTableWithRetries (htd , true );
567+
568+ Connection connection = ConnectionFactory .createConnection (HTU .getConfiguration ());
569+ Table table = connection .getTable (tableName );
570+
571+ try {
572+ HTU .loadNumericRows (table , HBaseTestingUtility .fam1 , 0 , 100 );
573+
574+ RegionLocator rl = connection .getRegionLocator (tableName );
575+ HRegionLocation hrl = rl .getRegionLocation (HConstants .EMPTY_BYTE_ARRAY );
576+ byte [] encodedRegionName = hrl .getRegionInfo ().getEncodedNameAsBytes ();
577+ rl .close ();
578+
579+ AtomicLong skippedEdits = new AtomicLong ();
580+ RegionReplicaReplicationEndpoint .RegionReplicaOutputSink sink =
581+ mock (RegionReplicaReplicationEndpoint .RegionReplicaOutputSink .class );
582+ when (sink .getSkippedEditsCounter ()).thenReturn (skippedEdits );
583+
584+ FSTableDescriptors fstd =
585+ new FSTableDescriptors (FileSystem .get (HTU .getConfiguration ()), HTU .getDefaultRootDirPath ());
586+
587+ RegionReplicaReplicationEndpoint .RegionReplicaSinkWriter sinkWriter =
588+ new RegionReplicaReplicationEndpoint .RegionReplicaSinkWriter (sink ,
589+ (ClusterConnection ) connection , Executors .newSingleThreadExecutor (), Integer .MAX_VALUE ,
590+ fstd );
591+
592+ Cell cell = CellBuilderFactory .create (CellBuilderType .DEEP_COPY )
593+ .setRow (Bytes .toBytes ("testRow" )).setFamily (HBaseTestingUtility .fam1 )
594+ .setValue (Bytes .toBytes ("testValue" )).setType (Type .Put ).build ();
595+
596+ Entry entry =
597+ new Entry (new WALKeyImpl (encodedRegionName , tableName , 1 ), new WALEdit ().add (cell ));
598+
599+ sinkWriter .append (tableName , encodedRegionName , Bytes .toBytes ("testRow" ),
600+ Lists .newArrayList (entry ));
601+
602+ assertEquals ("No entries should be skipped for single replica table" , 0 , skippedEdits .get ());
603+
604+ } finally {
605+ table .close ();
606+ connection .close ();
607+ }
608+ }
609+
510610 private void createOrEnableTableWithRetries (TableDescriptor htd , boolean createTableOperation ) {
511611 // Helper function to run create/enable table operations with a retry feature
512612 boolean continueToRetry = true ;
0 commit comments