From 42a8c059894673f9adfe602b198b2cb18f38436a Mon Sep 17 00:00:00 2001
From: Junegunn Choi
Date: Mon, 3 Nov 2025 14:44:36 +0900
Subject: [PATCH] HBASE-29699 Scan#setLimit ignored in MapReduce jobs

---
 .../hbase/shaded/protobuf/ProtobufUtil.java   |  6 ++
 .../TableSnapshotInputFormatImpl.java         |  5 +-
 .../hbase/mapreduce/TestTableInputFormat.java | 23 ++++++-
 .../mapreduce/TestTableMapReduceUtil.java     | 66 +++++++++++++++++++
 .../TestTableSnapshotInputFormat.java         | 30 +++++++--
 .../src/main/protobuf/client/Client.proto     |  1 +
 6 files changed, 124 insertions(+), 7 deletions(-)

diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java
index 60175137ad2c..9257b1d2ea28 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java
@@ -1099,6 +1099,9 @@ public static ClientProtos.Scan toScan(final Scan scan) throws IOException {
       scanBuilder.setNeedCursorResult(true);
     }
     scanBuilder.setQueryMetricsEnabled(scan.isQueryMetricsEnabled());
+    if (scan.getLimit() > 0) {
+      scanBuilder.setLimit(scan.getLimit());
+    }
     return scanBuilder.build();
   }
 
@@ -1204,6 +1207,9 @@ public static Scan toScan(final ClientProtos.Scan proto) throws IOException {
       scan.setNeedCursorResult(true);
     }
     scan.setQueryMetricsEnabled(proto.getQueryMetricsEnabled());
+    if (proto.hasLimit()) {
+      scan.setLimit(proto.getLimit());
+    }
     return scan;
   }
 
diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormatImpl.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormatImpl.java
index 501209f1c902..1a19988a6409 100644
--- a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormatImpl.java
+++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormatImpl.java
@@ -257,7 +257,10 @@ public ClientSideRegionScanner getScanner() {
     public void initialize(InputSplit split, Configuration conf) throws IOException {
       this.scan = TableMapReduceUtil.convertStringToScan(split.getScan());
       this.split = split;
-      this.rowLimitPerSplit = conf.getInt(SNAPSHOT_INPUTFORMAT_ROW_LIMIT_PER_INPUTSPLIT, 0);
+      int confLimit = conf.getInt(SNAPSHOT_INPUTFORMAT_ROW_LIMIT_PER_INPUTSPLIT, 0);
+      int scanLimit = Math.max(scan.getLimit(), 0);
+      this.rowLimitPerSplit =
+        confLimit == 0 ? scanLimit : scanLimit == 0 ? confLimit : Math.min(confLimit, scanLimit);
       TableDescriptor htd = split.htd;
       RegionInfo hri = this.split.getRegionInfo();
       FileSystem fs = CommonFSUtils.getCurrentFileSystem(conf);
diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableInputFormat.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableInputFormat.java
index c12a7e817bb7..5e8ea5a3c233 100644
--- a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableInputFormat.java
+++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableInputFormat.java
@@ -151,9 +151,12 @@ static boolean checkResult(Result r, ImmutableBytesWritable key, byte[] expected
    * Create table data and run tests on specified htable using the o.a.h.hbase.mapreduce API.
    */
   static void runTestMapreduce(Table table) throws IOException, InterruptedException {
+    runTestMapreduce(table, new Scan());
+  }
+
+  static void runTestMapreduce(Table table, Scan s) throws IOException, InterruptedException {
     org.apache.hadoop.hbase.mapreduce.TableRecordReaderImpl trr =
       new org.apache.hadoop.hbase.mapreduce.TableRecordReaderImpl();
-    Scan s = new Scan();
     s.withStartRow(Bytes.toBytes("aaa"));
     s.withStopRow(Bytes.toBytes("zzz"));
     s.addFamily(FAMILY);
@@ -171,6 +174,10 @@ static void runTestMapreduce(Table table) throws IOException, InterruptedExcepti
     checkResult(r, key, Bytes.toBytes("aaa"), Bytes.toBytes("value aaa"));
 
     more = trr.nextKeyValue();
+    if (s.getLimit() == 1) {
+      assertFalse(more);
+      return;
+    }
     assertTrue(more);
     key = trr.getCurrentKey();
     r = trr.getCurrentValue();
@@ -254,6 +261,20 @@ public void testTableRecordReaderMapreduce() throws IOException, InterruptedExce
     runTestMapreduce(table);
   }
 
+  /**
+   * Run test assuming no errors using newer mapreduce api with a Scan object with limit set
+   */
+  @Test
+  public void testTableRecordReaderMapreduceLimit1() throws IOException, InterruptedException {
+    Table table = createTable(Bytes.toBytes("table1-mr-limit-1"));
+    Scan scan = new Scan();
+    scan.setLimit(1);
+
+    // Serialize and deserialize the Scan to mimic actual usage.
+    runTestMapreduce(table,
+      TableMapReduceUtil.convertStringToScan(TableMapReduceUtil.convertScanToString(scan)));
+  }
+
   /**
    * Run test assuming Scanner IOException failure using newer mapreduce api
    */
diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableMapReduceUtil.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableMapReduceUtil.java
index 17a94c999566..5c45d81375c6 100644
--- a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableMapReduceUtil.java
+++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableMapReduceUtil.java
@@ -24,12 +24,15 @@
 
 import java.io.Closeable;
 import java.io.File;
+import java.io.IOException;
 import java.net.URI;
 import java.util.Collection;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.HBaseTestingUtil;
+import org.apache.hadoop.hbase.client.Consistency;
 import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.filter.KeyOnlyFilter;
 import org.apache.hadoop.hbase.security.provider.SaslClientAuthenticationProviders;
 import org.apache.hadoop.hbase.security.token.AuthenticationTokenIdentifier;
 import org.apache.hadoop.hbase.testclassification.MapReduceTests;
@@ -295,4 +298,67 @@ public void testInitCredentialsForClusterUri() throws Exception {
       kdc.stop();
     }
   }
+
+  @Test
+  public void testScanSerialization() throws IOException {
+    final byte[] cf = "cf".getBytes();
+    final Scan scan = new Scan();
+    scan.setLimit(1);
+    scan.setBatch(1);
+    scan.setMaxResultSize(1);
+    scan.setAllowPartialResults(true);
+    scan.setLoadColumnFamiliesOnDemand(true);
+    scan.readVersions(1);
+    scan.setColumnFamilyTimeRange(cf, 0, 1);
+    scan.setTimeRange(0, 1);
+    scan.setAttribute("cf", cf);
+    scan.withStartRow("0".getBytes(), false);
+    scan.withStopRow("1".getBytes(), true);
+    scan.setFilter(new KeyOnlyFilter());
+    scan.addColumn(cf, cf);
+    scan.setMaxResultsPerColumnFamily(1);
+    scan.setRowOffsetPerColumnFamily(1);
+    scan.setReversed(true);
+    scan.setConsistency(Consistency.TIMELINE);
+    scan.setCaching(1);
+    scan.setReadType(Scan.ReadType.STREAM);
+    scan.setNeedCursorResult(true);
+    scan.setQueryMetricsEnabled(true);
+
+    final String serialized = TableMapReduceUtil.convertScanToString(scan);
+    final Scan deserialized = TableMapReduceUtil.convertStringToScan(serialized);
+    final String reserialized = TableMapReduceUtil.convertScanToString(deserialized);
+
+    // Verify that serialization is symmetric
+    assertEquals(serialized, reserialized);
+
+    // Verify individual fields to catch potential omissions
+    assertEquals(scan.getLimit(), deserialized.getLimit());
+    assertEquals(scan.getBatch(), deserialized.getBatch());
+    assertEquals(scan.getMaxResultSize(), deserialized.getMaxResultSize());
+    assertEquals(scan.getAllowPartialResults(), deserialized.getAllowPartialResults());
+    assertEquals(scan.getLoadColumnFamiliesOnDemandValue(),
+      deserialized.getLoadColumnFamiliesOnDemandValue());
+    assertEquals(scan.getMaxVersions(), deserialized.getMaxVersions());
+    assertEquals(scan.getColumnFamilyTimeRange().get(cf).toString(),
+      deserialized.getColumnFamilyTimeRange().get(cf).toString());
+    assertEquals(scan.getTimeRange().toString(), deserialized.getTimeRange().toString());
+    assertEquals(Bytes.toString(scan.getAttribute("cf")),
+      Bytes.toString(deserialized.getAttribute("cf")));
+    assertEquals(0, Bytes.compareTo(scan.getStartRow(), deserialized.getStartRow()));
+    assertEquals(scan.includeStartRow(), deserialized.includeStartRow());
+    assertEquals(0, Bytes.compareTo(scan.getStopRow(), deserialized.getStopRow()));
+    assertEquals(scan.includeStopRow(), deserialized.includeStopRow());
+    assertEquals(scan.getFilter().getClass().getName(),
+      deserialized.getFilter().getClass().getName());
+    assertEquals(scan.getFamilyMap().size(), deserialized.getFamilyMap().size());
+    assertEquals(scan.getMaxResultsPerColumnFamily(), deserialized.getMaxResultsPerColumnFamily());
+    assertEquals(scan.getRowOffsetPerColumnFamily(), deserialized.getRowOffsetPerColumnFamily());
+    assertEquals(scan.isReversed(), deserialized.isReversed());
+    assertEquals(scan.getConsistency(), deserialized.getConsistency());
+    assertEquals(scan.getCaching(), deserialized.getCaching());
+    assertEquals(scan.getReadType(), deserialized.getReadType());
+    assertEquals(scan.isNeedCursorResult(), deserialized.isNeedCursorResult());
+    assertEquals(scan.isQueryMetricsEnabled(), deserialized.isQueryMetricsEnabled());
+  }
 }
diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableSnapshotInputFormat.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableSnapshotInputFormat.java
index edd2da4129ac..ebebc2d2b6b7 100644
--- a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableSnapshotInputFormat.java
+++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableSnapshotInputFormat.java
@@ -304,13 +304,14 @@ public void testWithMockedMapReduceWithNoStartRowStopRow() throws Exception {
     }
   }
 
-  @Test
-  public void testScanLimit() throws Exception {
+  private void testScanLimit(int confLimit, Scan scan, int expectedLimit) throws Exception {
     final TableName tableName = TableName.valueOf(name.getMethodName());
     final String snapshotName = tableName + "Snapshot";
     Table table = null;
    try {
-      UTIL.getConfiguration().setInt(SNAPSHOT_INPUTFORMAT_ROW_LIMIT_PER_INPUTSPLIT, 10);
+      if (confLimit > 0) {
+        UTIL.getConfiguration().setInt(SNAPSHOT_INPUTFORMAT_ROW_LIMIT_PER_INPUTSPLIT, confLimit);
+      }
       if (UTIL.getAdmin().tableExists(tableName)) {
         UTIL.deleteTable(tableName);
       }
@@ -332,7 +333,6 @@ public void testScanLimit() throws Exception {
 
       Job job = new Job(UTIL.getConfiguration());
       Path tmpTableDir = UTIL.getDataTestDirOnTestFS(snapshotName);
-      Scan scan = new Scan();
 
       TableMapReduceUtil.addDependencyJarsForClasses(job.getConfiguration(),
         TestTableSnapshotInputFormat.class);
@@ -340,7 +340,7 @@ public void testScanLimit() throws Exception {
         RowCounter.RowCounterMapper.class, NullWritable.class, NullWritable.class, job, true,
         tmpTableDir);
       Assert.assertTrue(job.waitForCompletion(true));
-      Assert.assertEquals(10 * regionNum,
+      Assert.assertEquals(expectedLimit * regionNum,
         job.getCounters().findCounter(RowCounter.RowCounterMapper.Counters.ROWS).getValue());
     } finally {
       if (table != null) {
@@ -352,6 +352,26 @@ public void testScanLimit() throws Exception {
     }
   }
 
+  @Test
+  public void testScanLimitOnConfig() throws Exception {
+    testScanLimit(10, new Scan(), 10);
+  }
+
+  @Test
+  public void testScanLimitOnScan() throws Exception {
+    testScanLimit(0, new Scan().setLimit(10), 10);
+  }
+
+  @Test
+  public void testScanLimitOnBothButScanWins() throws Exception {
+    testScanLimit(10, new Scan().setLimit(5), 5);
+  }
+
+  @Test
+  public void testScanLimitOnBothButConfigWins() throws Exception {
+    testScanLimit(5, new Scan().setLimit(10), 5);
+  }
+
   @Test
   public void testNoDuplicateResultsWhenSplitting() throws Exception {
     TableName tableName = TableName.valueOf("testNoDuplicateResultsWhenSplitting");
diff --git a/hbase-protocol-shaded/src/main/protobuf/client/Client.proto b/hbase-protocol-shaded/src/main/protobuf/client/Client.proto
index 874df527fc87..cd9136760daa 100644
--- a/hbase-protocol-shaded/src/main/protobuf/client/Client.proto
+++ b/hbase-protocol-shaded/src/main/protobuf/client/Client.proto
@@ -281,6 +281,7 @@ message Scan {
   optional ReadType readType = 23 [default = DEFAULT];
   optional bool need_cursor_result = 24 [default = false];
   optional bool query_metrics_enabled = 25 [default = false];
+  optional uint32 limit = 26;
 }
 
 /**
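
Usage note (illustrative, not part of the patch): with the limit carried through the
protobuf round trip, a snapshot-backed MapReduce job can cap rows per input split
directly on the Scan. Below is a minimal driver sketch; the snapshot name
"exampleSnapshot", the restore path, and the ScanLimitExample/CountMapper class names
are placeholders of mine, not from the patch. As the testScanLimit assertions show,
the limit applies per split, so a job over N regions sees at most N * limit rows; and
per the TableSnapshotInputFormatImpl change, when both the Scan limit and
SNAPSHOT_INPUTFORMAT_ROW_LIMIT_PER_INPUTSPLIT are set, the smaller nonzero value wins.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;

public class ScanLimitExample {

  /** Map-only pass that counts the rows each capped split hands to the mapper. */
  public static class CountMapper extends TableMapper<NullWritable, NullWritable> {
    @Override
    protected void map(ImmutableBytesWritable key, Result value, Context context) {
      context.getCounter("example", "rows").increment(1);
    }
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    Job job = Job.getInstance(conf, "scan-limit-example");

    // Before HBASE-29699 this limit was dropped when the Scan was serialized
    // into the job configuration; it now survives the protobuf round trip and
    // caps the rows returned by each input split.
    Scan scan = new Scan().setLimit(10);

    TableMapReduceUtil.initTableSnapshotMapperJob("exampleSnapshot", scan,
      CountMapper.class, NullWritable.class, NullWritable.class, job, true,
      new Path("/tmp/snapshot-restore"));
    job.setNumReduceTasks(0);

    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}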