
Conversation

@zikangh (Collaborator) commented Nov 14, 2025

## 🥞 Stacked PR

Use this [link](https://github.com/delta-io/delta/pull/5499/files) to review incremental changes.


#### Which Delta project/connector is this regarding?

- [x] Spark
- [ ] Standalone
- [ ] Flink
- [ ] Kernel
- [ ] Other (fill in here)

## Description

In this PR, we implement the DSv2 methods `planInputPartitions()` and `createReaderFactory()`. These are DSv2-only APIs that will replace DSv1's `getBatch()`.

- `planInputPartitions()`: returns the physical partitions describing how to read the data; called once per micro-batch during planning.
- `createReaderFactory()`: returns a factory that creates readers for the partitions; each executor uses this factory to read its assigned partitions.
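For illustration, here is a minimal, hedged sketch of how these two methods fit the DSv2 `MicroBatchStream` contract. The helpers `planFilesBetween` and `openReader` are hypothetical placeholders, not this PR's actual code.

```java
import java.util.List;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.connector.read.InputPartition;
import org.apache.spark.sql.connector.read.PartitionReader;
import org.apache.spark.sql.connector.read.PartitionReaderFactory;
import org.apache.spark.sql.connector.read.streaming.MicroBatchStream;
import org.apache.spark.sql.connector.read.streaming.Offset;

public abstract class MicroBatchStreamSketch implements MicroBatchStream {

  @Override
  public InputPartition[] planInputPartitions(Offset start, Offset end) {
    // Runs on the driver once per micro-batch: enumerate the files committed in
    // (start, end] and pack them into read partitions.
    List<InputPartition> partitions = planFilesBetween(start, end); // hypothetical helper
    return partitions.toArray(new InputPartition[0]);
  }

  @Override
  public PartitionReaderFactory createReaderFactory() {
    // The factory is serialized and shipped to executors; each task asks it for a
    // reader over its assigned InputPartition. (A real factory is a standalone
    // serializable class; an anonymous class is shown here only for brevity.)
    return new PartitionReaderFactory() {
      @Override
      public PartitionReader<InternalRow> createReader(InputPartition partition) {
        return openReader(partition); // hypothetical helper
      }
    };
  }

  /** Hypothetical: splits the files added between the two offsets into partitions. */
  protected abstract List<InputPartition> planFilesBetween(Offset start, Offset end);

  /** Hypothetical: builds a row reader for one partition (e.g. a Parquet reader). */
  protected abstract PartitionReader<InternalRow> openReader(InputPartition partition);
}
```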

## How was this patch tested?

## Does this PR introduce _any_ user-facing changes?

@zikangh zikangh changed the title minor change [kernel-spark] planInputPartitions Nov 17, 2025
@zikangh zikangh changed the title [kernel-spark] planInputPartitions [WIP] [kernel-spark] planInputPartitions Nov 17, 2025
@zikangh zikangh force-pushed the stack/plan1 branch 7 times, most recently from 5382311 to 90345c7 Compare December 1, 2025 19:11
@zikangh zikangh changed the title [WIP] [kernel-spark] planInputPartitions [kernel-spark] Implement planInputPartitions and createReaderFactory for dsv2 streaming Dec 1, 2025
huan233usc pushed a commit that referenced this pull request Dec 1, 2025
…treaming (#5409)

## 🥞 Stacked PR

Use this [link](https://github.com/delta-io/delta/pull/5409/files) to review incremental changes.

- [**stack/latestsnapshot2**](#5409) [[Files changed](https://github.com/delta-io/delta/pull/5409/files)]
- [stack/initialoffset2](#5498) [[Files changed](https://github.com/delta-io/delta/pull/5498/files/1718356813a6b39c80585d36e7aac6c8abc3a6a0..9833eaf816ee2f1dcf94d5d9a47136e69fd26336)]
- [stack/plan1](#5499) [[Files changed](https://github.com/delta-io/delta/pull/5499/files/9833eaf816ee2f1dcf94d5d9a47136e69fd26336..90345c732c6bd182c51648a4b875fdce2c14fc63)]
- [stack/integration](#5572) [[Files changed](https://github.com/delta-io/delta/pull/5572/files/90345c732c6bd182c51648a4b875fdce2c14fc63..813a49a41719ef4b773caf5438c975c8f77c646b)]
  - stack/reader


#### Which Delta project/connector is this regarding?

- [x] Spark
- [ ] Standalone
- [ ] Flink
- [ ] Kernel
- [ ] Other (fill in here)

## Description

We add implementations for `latestOffset(startOffset, limit)` and `getDefaultReadLimit()` for a complete `SupportsAdmissionControl` implementation. We also refactored a few `DeltaSource.scala` methods, making them static so they can be called from `SparkMicroBatchStream.java`.

## How was this patch tested?

Parameterized tests verifying parity between DSv1 (DeltaSource) and DSv2
(SparkMicroBatchStream).



## Does this PR introduce _any_ user-facing changes?
No

---------

Signed-off-by: TimothyW553 <[email protected]>
Signed-off-by: Timothy Wang <[email protected]>
Co-authored-by: Claude <[email protected]>
Co-authored-by: Timothy Wang <[email protected]>
TimothyW553 added a commit to TimothyW553/delta that referenced this pull request Dec 2, 2025

…treaming (delta-io#5409)

List<PartitionedFile> partitionedFiles = new ArrayList<>();
long totalBytesToRead = 0;
try (CloseableIterator<IndexedFile> fileChanges =
Collaborator:

QQ: is it possible to push down dataFilters when getting the file list?

Collaborator:

I think we need to support data skipping in kernel's getChange API

@zikangh (Author):

Yes, but we don't need to add this support for now, because the DSv1 connector doesn't actually enable data skipping.

Comment on lines 286 to 303
InternalRow partitionRow =
    PartitionUtils.getPartitionRow(
        addFile.getPartitionValues(),
        partitionSchema,
        ZoneId.of(sqlConf.sessionLocalTimeZone()));
// Preferred node locations are not used.
String[] preferredLocations = new String[0];
// Constant metadata columns are not used.
scala.collection.immutable.Map<String, Object> otherConstantMetadataColumnValues =
    scala.collection.immutable.Map$.MODULE$.empty();
Collaborator:

Can this code be shared between batch and streaming? For example:

`PartitionedFile buildPartitionFile(AddFile, partitionSchema)`

@zikangh (Author):

Done.
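For reference, a hedged sketch of what such a shared helper could look like, assembled from the fragments quoted above. The class name `PartitionedFileBuilder`, the extra `tablePath`/`sessionTimeZone` parameters, and the `PartitionedFile` constructor arity (shown here as in Spark 3.5) are assumptions, not the merged code; `PartitionUtils` and `AddFile` are the connector-internal and Kernel types referenced in the quoted diff, with their imports omitted.

```java
import java.time.ZoneId;
import org.apache.spark.paths.SparkPath;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.execution.datasources.PartitionedFile;
import org.apache.spark.sql.types.StructType;
// PartitionUtils and the Kernel AddFile imports are omitted; they are the
// project-internal types referenced in the quoted diff above.

public final class PartitionedFileBuilder { // hypothetical class name
  private PartitionedFileBuilder() {}

  /** Builds one PartitionedFile from a Kernel AddFile, shared by batch and streaming scans. */
  public static PartitionedFile buildPartitionedFile(
      AddFile addFile, StructType partitionSchema, String tablePath, String sessionTimeZone) {
    InternalRow partitionRow =
        PartitionUtils.getPartitionRow(
            addFile.getPartitionValues(), partitionSchema, ZoneId.of(sessionTimeZone));
    // Preferred node locations and constant metadata columns are not used,
    // mirroring the quoted diff.
    String[] preferredLocations = new String[0];
    scala.collection.immutable.Map<String, Object> otherConstantMetadataColumnValues =
        scala.collection.immutable.Map$.MODULE$.empty();
    return new PartitionedFile(
        partitionRow,
        SparkPath.fromUrlString(tablePath + addFile.getPath()),
        /* start= */ 0L, // a single AddFile is never split mid-file here
        /* length= */ addFile.getSize(),
        preferredLocations,
        /* modificationTime= */ addFile.getModificationTime(),
        /* fileSize= */ addFile.getSize(),
        otherConstantMetadataColumnValues);
  }
}
```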

@zikangh zikangh force-pushed the stack/plan1 branch 5 times, most recently from 154897c to 35731eb Compare December 5, 2025 00:59
huan233usc pushed a commit that referenced this pull request Dec 5, 2025
## 🥞 Stacked PR

Use this [link](https://github.com/delta-io/delta/pull/5498/files) to review incremental changes.

- [**stack/initialoffset2**](#5498) [[Files changed](https://github.com/delta-io/delta/pull/5498/files)]
- [stack/plan1](#5499) [[Files changed](https://github.com/delta-io/delta/pull/5499/files/90e1d9ba4b26d039bfa1b870e693e73204201750..35731eb6ffcb10f85ed97b04058e3bf49de771d8)]
- [stack/integration](#5572) [[Files changed](https://github.com/delta-io/delta/pull/5572/files/35731eb6ffcb10f85ed97b04058e3bf49de771d8..9c2e743cff0c1fcb8cf6ddf8efa3a1b98fddba3c)]
  - stack/snapshot1
- [stack/reader](#5638) [[Files changed](https://github.com/delta-io/delta/pull/5638/files/35731eb6ffcb10f85ed97b04058e3bf49de771d8..35731eb6ffcb10f85ed97b04058e3bf49de771d8)]


#### Which Delta project/connector is this regarding?

- [x] Spark
- [ ] Standalone
- [ ] Flink
- [ ] Kernel
- [ ] Other (fill in here)

## Description

We finish implementing `initialOffset()` in
`SparkMicroBatchStream.java`.

The `initialOffset()` method determines where a streaming query should
start reading when there's no checkpointed offset. This is a DSv2-only
API.

Details:

- Added an `isFirstBatch` tracking field: a boolean flag, set to true in `initialOffset()`, that tracks whether we're processing the first batch.
- Updated `latestOffset(startOffset, limit)`: it now handles the first batch differently by returning null (not `previousOffset`) when no data is available, matching DSv1's `getStartingOffsetFromSpecificDeltaVersion` behavior.
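A minimal, hedged sketch of the first-batch handling described above; the field name and behavior follow the bullet points, while `computeStartingOffset` and `admitFilesUpTo` are hypothetical placeholders rather than the actual implementation.

```java
import org.apache.spark.sql.connector.read.streaming.Offset;
import org.apache.spark.sql.connector.read.streaming.ReadLimit;

// Sketch of the relevant members of a SupportsAdmissionControl micro-batch stream.
public abstract class FirstBatchSketch {

  // True once initialOffset() has run, i.e. the query has no checkpointed offset.
  private boolean isFirstBatch = false;

  public Offset initialOffset() {
    // No checkpointed offset exists, so the stream is starting from scratch.
    isFirstBatch = true;
    return computeStartingOffset(); // hypothetical: from startingVersion/startingTimestamp
  }

  public Offset latestOffset(Offset startOffset, ReadLimit limit) {
    Offset admitted = admitFilesUpTo(startOffset, limit); // hypothetical admission control
    if (admitted == null) {
      // On the first batch, return null rather than the previous offset when no data is
      // available, matching DSv1's getStartingOffsetFromSpecificDeltaVersion behavior.
      return isFirstBatch ? null : startOffset;
    }
    isFirstBatch = false;
    return admitted;
  }

  protected abstract Offset computeStartingOffset();                       // hypothetical
  protected abstract Offset admitFilesUpTo(Offset start, ReadLimit limit); // hypothetical
}
```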


## How was this patch tested?
Parameterized tests verifying parity between DSv1 (DeltaSource) and DSv2
(SparkMicroBatchStream).

## Does this PR introduce _any_ user-facing changes?

@zikangh zikangh requested a review from gengliangwang December 5, 2025 19:25
@zikangh zikangh requested a review from huan233usc December 5, 2025 19:25
}

@Test
public void testGetFileChanges_StartingVersionAfterCheckpointAndLogCleanup(@TempDir File tempDir)
Collaborator:

is this test case removed by mistake?

@zikangh (Author):

Reverted. thanks!

this.scalaOptions = Objects.requireNonNull(scalaOptions, "scalaOptions is null");

// Initialize snapshot at source init to get table ID, similar to DeltaSource.scala
Snapshot snapshotAtSourceInit = snapshotManager.loadLatestSnapshot();
Collaborator:

SparkScan already has initialSnapshot available. The snapshot might be loaded redundantly.

@zikangh (Author):

Removed. This was brought in by mistake after resolving merge conflicts with master.

@zikangh zikangh requested a review from gengliangwang December 6, 2025 03:14
harperjiang pushed a commit to harperjiang/delta that referenced this pull request Dec 8, 2025

…#5498)

@ParameterizedTest
@MethodSource("planInputPartitionsParameters")
public void testPlanInputPartitions_DataParity(
Collaborator:

nit: naming testPlanInputPartitions_dataParity

@gengliangwang gengliangwang merged commit fd1fbbe into delta-io:master Dec 9, 2025
19 checks passed
TimothyW553 pushed a commit to TimothyW553/delta that referenced this pull request Dec 10, 2025

…#5498)
TimothyW553 pushed a commit to TimothyW553/delta that referenced this pull request Dec 10, 2025

…for dsv2 streaming (delta-io#5499)

* Calculate the maximum split bytes for file partitioning, considering total bytes and file
* count. This is used for optimal file splitting in both batch and streaming read.
*/
public static long calculateMaxSplitBytes(
Collaborator:

Why can we not re-use FilePartition.maxSplitBytes()?
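For context, a hedged sketch of the split-size math such a method would typically perform. It mirrors the formula in Spark's `FilePartition.maxSplitBytes` (the reviewer's suggested alternative) and is consistent with the test assertions quoted later in this thread; the parameter names are illustrative, not the PR's actual signature.

```java
public final class MaxSplitBytesSketch { // hypothetical class name
  private MaxSplitBytesSketch() {}

  /** Illustrative only: mirrors Spark's FilePartition.maxSplitBytes formula. */
  public static long calculateMaxSplitBytes(
      long filesMaxPartitionBytes, // spark.sql.files.maxPartitionBytes
      long filesOpenCostInBytes,   // spark.sql.files.openCostInBytes
      int minPartitionNum,         // spark.sql.files.minPartitionNum or default parallelism
      long totalFileBytes,         // sum of the sizes of the files to read
      int fileCount) {
    // Every file is charged an extra openCostInBytes so that many tiny files are not
    // packed into one huge partition.
    long totalBytes = totalFileBytes + (long) fileCount * filesOpenCostInBytes;
    long bytesPerCore = totalBytes / Math.max(1, minPartitionNum);
    // Clamp between the open cost (lower bound) and the configured max partition size.
    return Math.min(filesMaxPartitionBytes, Math.max(filesOpenCostInBytes, bytesPerCore));
  }
}
```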

partitionRow,
SparkPath.fromUrlString(tablePath + addFile.getPath()),
/* start= */ 0L,
/* length= */ addFile.getSize(),
Collaborator:

Why do we always read to the end?

Collaborator:

Oh I see, we never really split a single add file, right?

assertTrue(result > 0);
assertTrue(result >= sqlConf.filesOpenCostInBytes());
assertTrue(result <= sqlConf.filesMaxPartitionBytes());
long calculatedTotalBytes = totalBytes + (long) fileCount * sqlConf.filesOpenCostInBytes();
Collaborator:

Why don't we compare with the results from FilePartition.maxSplitBytes()?
