HBASE-29699 Scan#setLimit ignored in MapReduce jobs #7432
base: master
Conversation
Pull Request Overview
This PR adds support for a limit field to the Scan protobuf message, allowing scans to specify a maximum number of rows to return. The implementation integrates this limit with the existing configuration-based limit in table snapshot input format.
Key Changes
- Added an optional `uint32 limit` field to the Scan protobuf message
- Updated serialization/deserialization logic in ProtobufUtil to handle the limit field
- Modified TableSnapshotInputFormatImpl to merge the scan limit with the configuration-based limit, using the minimum of the two when both are specified (see the usage sketch after this list)
- Added comprehensive test coverage including serialization tests and MapReduce integration tests
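A minimal usage sketch of how the two limits interact after this change. It assumes `SNAPSHOT_INPUTFORMAT_ROW_LIMIT_PER_INPUTSPLIT` is accessible from job-setup code; everything beyond HBase's public client/mapreduce API here is illustrative, not taken from the PR:

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormatImpl;

public class LimitSetupSketch {
  public static void main(String[] args) {
    Configuration conf = HBaseConfiguration.create();

    // Existing, configuration-based per-split limit for snapshot scans.
    conf.setInt(TableSnapshotInputFormatImpl.SNAPSHOT_INPUTFORMAT_ROW_LIMIT_PER_INPUTSPLIT, 100);

    // New with this PR: the limit set on the Scan is carried through the new protobuf
    // field and merged with the configuration value; the smaller non-zero value wins.
    Scan scan = new Scan();
    scan.setLimit(50); // effective per-split limit: min(100, 50) = 50
  }
}
```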
Reviewed Changes
Copilot reviewed 6 out of 6 changed files in this pull request and generated 3 comments.
| File | Description |
|---|---|
| hbase-protocol-shaded/src/main/protobuf/client/Client.proto | Added optional uint32 limit field to Scan message |
| hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java | Added serialization/deserialization logic for scan limit field |
| hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormatImpl.java | Implemented logic to merge configuration and scan-based limits |
| hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableSnapshotInputFormat.java | Refactored and added test cases for limit behavior |
| hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableMapReduceUtil.java | Added comprehensive serialization test with limit field |
| hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableInputFormat.java | Added test for limit=1 behavior with serialization |
    int confLimit = conf.getInt(SNAPSHOT_INPUTFORMAT_ROW_LIMIT_PER_INPUTSPLIT, 0);
    int scanLimit = Math.max(scan.getLimit(), 0);
    this.rowLimitPerSplit =
      confLimit == 0 ? scanLimit : scanLimit == 0 ? confLimit : Math.min(confLimit, scanLimit);
Copilot AI commented on Nov 3, 2025:
[nitpick] The logic for merging confLimit and scanLimit uses nested ternary operators which are difficult to read and maintain. Consider refactoring to use if-else statements or extracting this logic into a separate helper method with a descriptive name like resolveLimitPerSplit(int confLimit, int scanLimit).
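A minimal sketch of the suggested helper, keeping the convention the current code uses (0 or a negative value meaning "no limit"); the method name comes from the suggestion above:

```java
/** Resolves the effective per-split row limit; a return value of 0 means "unlimited". */
private static int resolveLimitPerSplit(int confLimit, int scanLimit) {
  if (confLimit <= 0) {
    // Only the scan-based limit (if any) applies.
    return Math.max(scanLimit, 0);
  }
  if (scanLimit <= 0) {
    // Only the configuration-based limit applies.
    return confLimit;
  }
  // Both are set: the stricter (smaller) limit wins.
  return Math.min(confLimit, scanLimit);
}
```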
A fair suggestion, but I feel the current code isn't complicated enough to warrant refactoring.
🎊 +1 overall
This message was automatically generated.

🎊 +1 overall
This message was automatically generated.
      scanBuilder.setNeedCursorResult(true);
    }
    scanBuilder.setQueryMetricsEnabled(scan.isQueryMetricsEnabled());
    if (scan.getLimit() > 0) {
We do not set the limit here? Strange; if that is the case, shouldn't lots of our limit-related UTs fail?
The absence of it only matters when Protobuf serialization is involved. For example, in the existing test cases in TestTableInputFormat, 1. no serialization was performed, and 2. no Scan#setLimit call was made, so the issue never surfaced. So I guess this particular case (Protobuf serialization of a Scan object with a non-default limit) wasn't previously covered by the unit tests.
OK, limit is in ScanRequest, not in Scan; a Scan is just part of a ScanRequest.
So we should not change the code here.
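For readers following the thread, the addition under discussion looks roughly like this; it is a sketch reconstructed from the excerpt above and the PR description, and the deserialization counterpart plus the generated-builder method names are assumptions:

```java
// ProtobufUtil.toScan(Scan) — client Scan to protobuf Scan: write the new field only when set.
if (scan.getLimit() > 0) {
  scanBuilder.setLimit(scan.getLimit());
}

// ProtobufUtil.toScan(ClientProtos.Scan) — protobuf Scan back to client Scan.
if (proto.hasLimit()) {
  scan.setLimit(proto.getLimit());
}
```

As the reviewer notes above, the RPC-level row limit travels on ScanRequest rather than on Scan, which is why Scan itself did not carry this field before.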
    @Test
    public void testScanLimitOnConfig() throws Exception {
      testScanLimit(10, new Scan(), 10);
    }

    @Test
    public void testScanLimitOnScan() throws Exception {
      testScanLimit(0, new Scan().setLimit(10), 10);
    }

    @Test
    public void testScanLimitOnBothButScanWins() throws Exception {
      testScanLimit(10, new Scan().setLimit(5), 5);
    }

    @Test
    public void testScanLimitOnBothButConfigWins() throws Exception {
      testScanLimit(5, new Scan().setLimit(10), 5);
    }
These verify that the smaller value from the two sources is chosen. Maybe a bit overkill for such simple logic, considering each case takes quite a while to run?
Apache9 left a comment:
I do not fully understand the problem here.
In real usage, how can we set the limit on the Scan object? When using TableInputFormat or TableSnapshotInputFormat, there is no way for us to pass a Scan object in, is there?
When we run an MR job or a Spark job, we define how we want to scan a table or a snapshot via a Scan object, serialize it into a String, and set it as a job configuration property (see hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java, lines 154 to 193 at f800a13).
TableInputFormat and TableSnapshotInputFormat then deserialize this back into a Scan.
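For context, the round trip being described looks roughly like this. `convertScanToString`/`convertStringToScan` and the `TableInputFormat.SCAN` key are existing public helpers; the surrounding scaffolding is illustrative only:

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;

public class ScanRoundTripSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();

    // Job-setup side: the Scan is serialized (protobuf + Base64) into a String
    // and stashed in the job configuration.
    Scan scan = new Scan().setCaching(500).setLimit(10);
    conf.set(TableInputFormat.SCAN, TableMapReduceUtil.convertScanToString(scan));

    // Input-format side: the String is deserialized back into a Scan for the mappers.
    Scan restored = TableMapReduceUtil.convertStringToScan(conf.get(TableInputFormat.SCAN));

    // Before this PR the protobuf Scan message had no limit field,
    // so the limit set above did not survive this round trip.
    System.out.println(restored.getLimit());
  }
}
```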
OK, so the problem is that we do not store the limit in the serialized scan object, so when deserializing we cannot get the limit back. But the design of the client API is that we do not pass the actual limit through the scan object, since it is a global limit, not per region, so we need to calculate the remaining limit for each region and pass it to the region server. Maybe a possible way is to add a new config field where we serialize the Scan object as JSON? In this way we can add all the fields we want. Thanks.
Thanks for the comment.
Correct. When you pass a Scan with a custom limit to an MR job, you would expect each mapper to return at most that number of rows, but instead you end up getting all records in the table.
I assumed that users advanced enough to run MR/Spark jobs with HBase would already understand that, in that context, each partition (region) runs its own Scan in parallel, and that any limit therefore applies per partition rather than globally. When this patch is applied, users might be surprised to see that an MR job using a Scan with a limit can still return up to that many rows from each split rather than a global total, but I consider that the expected behavior here.

As for storing the limit value in the serialized field, I don't see any problem with it. It might look a bit redundant, but it's harmless because it's not used anywhere else (please correct me if I'm wrong), and it serves as a more accurate description of the original Scan itself.
OK, I think this is the reason why we did not consider the scan limit in the past: under a parallel execution scenario, a global limit does not make sense. The rowLimitPerSplit configuration is better as it says 'per split', so maybe we should also introduce such a config in TableInputFormat? And when serializing the Scan object, if we find users setting a scan limit, we log a warning message to tell users that this will not work?
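A minimal sketch of the warning idea, assuming it would live in TableMapReduceUtil where the Scan gets serialized and that an slf4j `LOG` field is available; the exact placement and wording are assumptions:

```java
// Hypothetical check before serializing the Scan into the job configuration.
if (scan.getLimit() > 0) {
  LOG.warn("Scan limit {} is ignored by MapReduce jobs; each input split runs its own scan. "
    + "Use the per-split row limit configuration instead.", scan.getLimit());
}
```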
It will increase the message size of the Scan object and does not bring any advantages for Scan itself, so I would prefer that we do not add it if possible...
Fair enough; however small, it's still wasteful. If I'm not mistaken, though, the serialized size does not increase if we don't set the optional field. So how about overloading ...
Oh, there was some formatting problem with my reply, so maybe you missed part of it... As I explained above, the scan limit is a global limit, and under a parallel scenario a global limit does not make sense...
Thanks, that is one option. But can we update the documentation of ... ?

From a user's perspective, even having a per-split limit is still an improvement over the current behavior. As for introducing a new dedicated option, I'd personally prefer fixing and reusing the existing interface. As long as we clearly document its behavior, I think it's better than introducing yet another configuration parameter for users to discover. HBase already has too many little-known parameters buried deep in the source code.
We'd better follow the same pattern as TableSnapshotInputFormat, where we introduce a configuration to set the limit per split, instead of passing it through Scan. This is a clearer solution. Not all users will look deeply into the javadoc, and we would introduce different meanings for the Scan limit between a normal scan and a MapReduce job if we go with your solution, which may confuse our users...
I understand your point.
True, and unfortunately, those users will still complain that ...

In order to do that, we would need to introduce an internal version of ...

I thought it was acceptable, because we already have constructs that behave differently in parallel scenarios (e.g. stateful filters like ...).

So I assumed it was already well understood that a separate Scan operates per split in such cases. But maybe that's just me.
But we already have a rowLimitPerSplit for TableSnapshotInputFormat, right? So aligning the logic between TableInputFormat and TableSnapshotInputFormat is natural, and we also do not need to care about the serialization problem. The logic becomes complicated if we support both (as in this PR).
I see. Actually, when I opened this PR I was considering deprecating the ...

However, I respect your decision if you prefer not to promote ... By the way, ...
https://issues.apache.org/jira/browse/HBASE-29699