@@ -1099,6 +1099,9 @@ public static ClientProtos.Scan toScan(final Scan scan) throws IOException {
scanBuilder.setNeedCursorResult(true);
}
scanBuilder.setQueryMetricsEnabled(scan.isQueryMetricsEnabled());
if (scan.getLimit() > 0) {
Contributor:
We do not set limit here? Strange, if this is the case, lots of our limit-related UTs should fail?

junegunn (Member Author), Nov 3, 2025:
The absence of it only matters when Protobuf serialization is involved. For example, in the existing test cases in TestTableInputFormat, 1. no serialization was performed, and 2. no Scan#setLimit call was made, so the issue never surfaced. So I guess this particular case (Protobuf serialization of a Scan object with a non-default limit) wasn't previously covered by the unit tests.
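
A minimal sketch of the failure mode described here (assuming the shaded ProtobufUtil class that the hunk above touches; this is illustrative, not code from this PR):

import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;

public class ScanLimitRoundTrip {
  public static void main(String[] args) throws Exception {
    Scan scan = new Scan();
    scan.setLimit(1);

    // Round-trip through the protobuf message, which is what the MapReduce
    // helpers convertScanToString/convertStringToScan do under the hood.
    Scan roundTripped = ProtobufUtil.toScan(ProtobufUtil.toScan(scan));

    // Before this fix, toScan(Scan) never copied the limit into the protobuf
    // message, so the round-tripped Scan silently lost it.
    System.out.println("original limit: " + scan.getLimit());
    System.out.println("round-trip limit: " + roundTripped.getLimit());
  }
}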

Contributor:
OK, limit is in ScanRequest, not in Scan; Scan is just part of a ScanRequest.

So we should not change the code here.

scanBuilder.setLimit(scan.getLimit());
}
return scanBuilder.build();
}

@@ -1204,6 +1207,9 @@ public static Scan toScan(final ClientProtos.Scan proto) throws IOException {
scan.setNeedCursorResult(true);
}
scan.setQueryMetricsEnabled(proto.getQueryMetricsEnabled());
if (proto.hasLimit()) {
scan.setLimit(proto.getLimit());
}
return scan;
}

@@ -257,7 +257,10 @@ public ClientSideRegionScanner getScanner() {
public void initialize(InputSplit split, Configuration conf) throws IOException {
this.scan = TableMapReduceUtil.convertStringToScan(split.getScan());
this.split = split;
int confLimit = conf.getInt(SNAPSHOT_INPUTFORMAT_ROW_LIMIT_PER_INPUTSPLIT, 0);
int scanLimit = Math.max(scan.getLimit(), 0);
this.rowLimitPerSplit =
confLimit == 0 ? scanLimit : scanLimit == 0 ? confLimit : Math.min(confLimit, scanLimit);
Comment on lines +260 to +263 (Copilot AI, Nov 3, 2025):
[nitpick] The logic for merging confLimit and scanLimit uses nested ternary operators, which are difficult to read and maintain. Consider refactoring to if-else statements or extracting this logic into a separate helper method with a descriptive name like resolveLimitPerSplit(int confLimit, int scanLimit).
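
For reference, the suggested helper would look roughly like this (a sketch only; resolveLimitPerSplit is the name Copilot proposes, not code in this PR):

// Same semantics as the nested ternary above: 0 means "no limit from that
// source", and when both sources set a limit the stricter one wins.
private static int resolveLimitPerSplit(int confLimit, int scanLimit) {
  if (confLimit == 0) {
    return scanLimit; // only the scan-level limit applies (possibly none)
  }
  if (scanLimit == 0) {
    return confLimit; // only the config-level limit applies
  }
  return Math.min(confLimit, scanLimit);
}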

junegunn (Member Author):
A fair suggestion, but I feel the current code isn't complicated enough to warrant refactoring.

TableDescriptor htd = split.htd;
RegionInfo hri = this.split.getRegionInfo();
FileSystem fs = CommonFSUtils.getCurrentFileSystem(conf);
@@ -151,9 +151,12 @@ static boolean checkResult(Result r, ImmutableBytesWritable key, byte[] expected
* Create table data and run tests on specified htable using the o.a.h.hbase.mapreduce API.
*/
static void runTestMapreduce(Table table) throws IOException, InterruptedException {
runTestMapreduce(table, new Scan());
}

static void runTestMapreduce(Table table, Scan s) throws IOException, InterruptedException {
org.apache.hadoop.hbase.mapreduce.TableRecordReaderImpl trr =
new org.apache.hadoop.hbase.mapreduce.TableRecordReaderImpl();
s.withStartRow(Bytes.toBytes("aaa"));
s.withStopRow(Bytes.toBytes("zzz"));
s.addFamily(FAMILY);
@@ -171,6 +174,10 @@ static void runTestMapreduce(Table table) throws IOException, InterruptedException {
checkResult(r, key, Bytes.toBytes("aaa"), Bytes.toBytes("value aaa"));

more = trr.nextKeyValue();
if (s.getLimit() == 1) {
assertFalse(more);
return;
}
assertTrue(more);
key = trr.getCurrentKey();
r = trr.getCurrentValue();
@@ -254,6 +261,20 @@ public void testTableRecordReaderMapreduce() throws IOException, InterruptedException {
runTestMapreduce(table);
}

/**
 * Run test assuming no errors using newer mapreduce api with a Scan object that has a limit set
*/
@Test
public void testTableRecordReaderMapreduceLimit1() throws IOException, InterruptedException {
Table table = createTable(Bytes.toBytes("table1-mr-limit-1"));
Scan scan = new Scan();
scan.setLimit(1);

// Serialize and deserialize the Scan to mimic actual usage.
runTestMapreduce(table,
TableMapReduceUtil.convertStringToScan(TableMapReduceUtil.convertScanToString(scan)));
}

/**
* Run test assuming Scanner IOException failure using newer mapreduce api
*/
@@ -24,12 +24,15 @@

import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.util.Collection;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.client.Consistency;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.KeyOnlyFilter;
import org.apache.hadoop.hbase.security.provider.SaslClientAuthenticationProviders;
import org.apache.hadoop.hbase.security.token.AuthenticationTokenIdentifier;
import org.apache.hadoop.hbase.testclassification.MapReduceTests;
@@ -295,4 +298,67 @@ public void testInitCredentialsForClusterUri() throws Exception {
kdc.stop();
}
}

@Test
public void testScanSerialization() throws IOException {
final byte[] cf = "cf".getBytes();
final Scan scan = new Scan();
scan.setLimit(1);
scan.setBatch(1);
scan.setMaxResultSize(1);
scan.setAllowPartialResults(true);
scan.setLoadColumnFamiliesOnDemand(true);
scan.readVersions(1);
scan.setColumnFamilyTimeRange(cf, 0, 1);
scan.setTimeRange(0, 1);
scan.setAttribute("cf", cf);
scan.withStartRow("0".getBytes(), false);
scan.withStopRow("1".getBytes(), true);
scan.setFilter(new KeyOnlyFilter());
scan.addColumn(cf, cf);
scan.setMaxResultsPerColumnFamily(1);
scan.setRowOffsetPerColumnFamily(1);
scan.setReversed(true);
scan.setConsistency(Consistency.TIMELINE);
scan.setCaching(1);
scan.setReadType(Scan.ReadType.STREAM);
scan.setNeedCursorResult(true);
scan.setQueryMetricsEnabled(true);

final String serialized = TableMapReduceUtil.convertScanToString(scan);
final Scan deserialized = TableMapReduceUtil.convertStringToScan(serialized);
final String reserialized = TableMapReduceUtil.convertScanToString(deserialized);

// Verify that serialization is symmetric
assertEquals(serialized, reserialized);

// Verify individual fields to catch potential omissions
assertEquals(scan.getLimit(), deserialized.getLimit());
assertEquals(scan.getBatch(), deserialized.getBatch());
assertEquals(scan.getMaxResultSize(), deserialized.getMaxResultSize());
assertEquals(scan.getAllowPartialResults(), deserialized.getAllowPartialResults());
assertEquals(scan.getLoadColumnFamiliesOnDemandValue(),
deserialized.getLoadColumnFamiliesOnDemandValue());
assertEquals(scan.getMaxVersions(), deserialized.getMaxVersions());
assertEquals(scan.getColumnFamilyTimeRange().get(cf).toString(),
deserialized.getColumnFamilyTimeRange().get(cf).toString());
assertEquals(scan.getTimeRange().toString(), deserialized.getTimeRange().toString());
assertEquals(Bytes.toString(scan.getAttribute("cf")),
Bytes.toString(deserialized.getAttribute("cf")));
assertEquals(0, Bytes.compareTo(scan.getStartRow(), deserialized.getStartRow()));
assertEquals(scan.includeStartRow(), deserialized.includeStartRow());
assertEquals(0, Bytes.compareTo(scan.getStopRow(), deserialized.getStopRow()));
assertEquals(scan.includeStopRow(), deserialized.includeStopRow());
assertEquals(scan.getFilter().getClass().getName(),
deserialized.getFilter().getClass().getName());
assertEquals(scan.getFamilyMap().size(), deserialized.getFamilyMap().size());
assertEquals(scan.getMaxResultsPerColumnFamily(), deserialized.getMaxResultsPerColumnFamily());
assertEquals(scan.getRowOffsetPerColumnFamily(), deserialized.getRowOffsetPerColumnFamily());
assertEquals(scan.isReversed(), deserialized.isReversed());
assertEquals(scan.getConsistency(), deserialized.getConsistency());
assertEquals(scan.getCaching(), deserialized.getCaching());
assertEquals(scan.getReadType(), deserialized.getReadType());
assertEquals(scan.isNeedCursorResult(), deserialized.isNeedCursorResult());
assertEquals(scan.isQueryMetricsEnabled(), deserialized.isQueryMetricsEnabled());
}
}
@@ -304,13 +304,14 @@ public void testWithMockedMapReduceWithNoStartRowStopRow() throws Exception {
}
}

private void testScanLimit(int confLimit, Scan scan, int expectedLimit) throws Exception {
final TableName tableName = TableName.valueOf(name.getMethodName());
final String snapshotName = tableName + "Snapshot";
Table table = null;
try {
if (confLimit > 0) {
UTIL.getConfiguration().setInt(SNAPSHOT_INPUTFORMAT_ROW_LIMIT_PER_INPUTSPLIT, confLimit);
}
if (UTIL.getAdmin().tableExists(tableName)) {
UTIL.deleteTable(tableName);
}
@@ -332,15 +333,14 @@ public void testScanLimit() throws Exception {

Job job = new Job(UTIL.getConfiguration());
Path tmpTableDir = UTIL.getDataTestDirOnTestFS(snapshotName);
TableMapReduceUtil.addDependencyJarsForClasses(job.getConfiguration(),
TestTableSnapshotInputFormat.class);

TableMapReduceUtil.initTableSnapshotMapperJob(snapshotName, scan,
RowCounter.RowCounterMapper.class, NullWritable.class, NullWritable.class, job, true,
tmpTableDir);
Assert.assertTrue(job.waitForCompletion(true));
Assert.assertEquals(expectedLimit * regionNum,
job.getCounters().findCounter(RowCounter.RowCounterMapper.Counters.ROWS).getValue());
} finally {
if (table != null) {
@@ -352,6 +352,26 @@
}
}

@Test
public void testScanLimitOnConfig() throws Exception {
testScanLimit(10, new Scan(), 10);
}

@Test
public void testScanLimitOnScan() throws Exception {
testScanLimit(0, new Scan().setLimit(10), 10);
}

@Test
public void testScanLimitOnBothButScanWins() throws Exception {
testScanLimit(10, new Scan().setLimit(5), 5);
}

@Test
public void testScanLimitOnBothButConfigWins() throws Exception {
testScanLimit(5, new Scan().setLimit(10), 5);
}
Comment on lines +355 to +373 (junegunn, Member Author):
To verify that the smaller value from the two sources is chosen. Maybe a bit of an overkill for such simple logic, considering each case takes quite a while to run.
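
The four cases map out as follows (values taken from the tests above; "unset" means no limit from that source):

confLimit | scan limit | expected rows per split
10        | unset      | 10
unset     | 10         | 10
10        | 5          | 5
5         | 10         | 5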


@Test
public void testNoDuplicateResultsWhenSplitting() throws Exception {
TableName tableName = TableName.valueOf("testNoDuplicateResultsWhenSplitting");
@@ -281,6 +281,7 @@ message Scan {
optional ReadType readType = 23 [default = DEFAULT];
optional bool need_cursor_result = 24 [default = false];
optional bool query_metrics_enabled = 25 [default = false];
optional uint32 limit = 26;
}
