[Server-Side Planning] Add filter pushdown infrastructure #5625
Open
murali-db wants to merge 5 commits into delta-io:master from murali-db:upstream/row4-filter-infrastructure
Conversation
* Add ServerSidePlanningClient interface and test implementations

  Core interface for server-side scan planning:
  - ServerSidePlanningClient trait with a planScan method
  - ServerSidePlanningClientFactory trait with createClient and buildForCatalog
  - IcebergRESTCatalogPlanningClientFactory (uses reflection to load the Iceberg implementation)
  - MockServerSidePlanningClient for testing
  - ServerSidePlanningTestClient using Spark for file discovery

  The interface is intentionally simple, with no Iceberg dependencies, allowing it to live in the delta-spark module. The buildForCatalog method reads catalog-specific configuration from spark.sql.catalog.<name>.uri/token.

  🤖 Generated with [Claude Code](https://claude.com/claude-code)
  Co-Authored-By: Claude <[email protected]>

* Add ServerSidePlannedTable DSv2 table implementation

  Implements DSv2 Table/Scan/Batch interfaces for server-side planning:
  - ServerSidePlannedTable: DSv2 table backed by a server-side planning client
  - ServerSidePlannedScan/Batch: scan and batch implementations
  - ServerSidePlannedFilePartition: custom InputPartition for file-based reads
  - ServerSidePlanningTestClient: test client using Spark for file discovery

  This allows Delta to read tables using file lists from remote planning services (e.g., an Iceberg REST catalog) without local file discovery.

* Rename namespace to database for consistency

  Rename the 'namespace' parameter to 'database' throughout ServerSidePlannedTable and its related classes to match the naming in ServerSidePlanningClient.planScan(). Also update tests to use the new parameter names and fix references to the removed schema field in ScanPlan and the renamed factory class.

* Fix scalastyle: use Locale.ROOT for toLowerCase

  Use toLowerCase(Locale.ROOT) instead of toLowerCase() to ensure consistent behavior regardless of the default locale.

* Remove redundant mock client tests

  The mock client tests only verified DSv2 API wiring without reading actual data. The end-to-end tests with ServerSidePlanningTestClient cover the same functionality plus actual data reading, making the mock tests redundant. Removed:
  - MockServerSidePlanningClient and MockServerSidePlanningClientFactory
  - All unit tests using the mock client
  - Updated the factory registry test to use the test client instead

* Rename ServerSidePlanningTestClient to TestServerSidePlanningClient

  Renamed for consistency with naming conventions (Test prefix).

* Remove createClient method and consolidate on buildForCatalog

  Having two methods with different names (createClient vs. buildForCatalog) was confusing. Removed createClient and updated all tests to use buildForCatalog("spark_catalog") instead.

* Pass SparkSession and DeltaLog as parameters, use deltaLog.newDeltaHadoopConf()

  Following the DeltaTableV2 pattern, ServerSidePlannedTable now accepts SparkSession and DeltaLog as constructor parameters instead of calling SparkSession.active internally. This ensures proper initialization and follows Delta Lake conventions. Changed ServerSidePlannedFilePartitionReaderFactory to use deltaLog.newDeltaHadoopConf() instead of sessionState.newHadoopConf(). This properly includes DataFrame options, such as custom S3 credentials that users may pass in, matching the behavior of other Delta code. Updated tests to use Delta tables (required for DeltaLog) instead of plain Parquet tables.

* Remove .bazelbsp files

* Add comment explaining the fully qualified name in ServerSidePlannedTable.name()

* Update comment to clarify that the test client is for testing without real server-side planning

* Remove redundant unit test and consolidate into a comprehensive integration test

  The first test exercised direct instantiation of ServerSidePlannedTable, which never happens in production. Removed it and added the detailed assertions (file format check, partition count) to the integration test that goes through the actual production code path via DeltaCatalog.

* Remove unused forceServerSidePlanning config from test

  This config is not implemented or checked anywhere in the production code. The test works by setting the ServerSidePlanningClientFactory directly.

* Use checkAnswer pattern instead of manual assertions

  Replaced manual row-by-row assertions with QueryTest.checkAnswer(), the standard pattern in Spark tests for comparing query results.

* Fix line length scalastyle violation

  Split a long line in newScanBuilder to comply with the 100-character limit.

* Fix DeltaLog.forTable method call syntax

  Use the simpler overload that takes catalogTable directly instead of a path with a named parameter.

* Fix test to create the table and insert data before configuring DeltaCatalog

  ServerSidePlannedTable only supports reads (SupportsRead), not writes. Restructured the test to:
  1. Create the table and INSERT data using the default catalog (which supports writes)
  2. Configure DeltaCatalog and the test factory
  3. Execute SELECT through ServerSidePlannedTable (read-only)

* Fix try-finally block nesting syntax error

* Remove DeltaLog dependency, use Parquet tables, and consolidate tests

  - Remove the DeltaLog parameter from ServerSidePlannedTable and related classes
  - Accept SparkSession as a constructor parameter (following the DeltaTableV2 pattern)
  - Use sessionState.newHadoopConf() with a scalastyle suppression and a comment explaining the limitation
  - Change tests from Delta to Parquet tables (DeltaLog no longer needed)
  - Merge two tests into one comprehensive end-to-end test with fine-grained assertions
  - Fix a URL-decoding issue in TestServerSidePlanningClient for input_file_name() paths
  - Add detailed assertions for the factory registry, file discovery, table metadata, and reader creation

* Remove IcebergRESTCatalogPlanningClientFactory from the interface PR

  Move IcebergRESTCatalogPlanningClientFactory to PR B, where the actual Iceberg implementation lives. Update the factory registry to require explicit registration instead of defaulting to the Iceberg factory. This makes PR A self-contained, with no dependencies on unimplemented classes.

* Add explanatory comments for caselocale scalastyle suppressions
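The interface described in the first commit can be sketched roughly as follows. This is a hedged reconstruction from the commit messages, not the PR's actual code: the trait and method names (ServerSidePlanningClient, planScan, buildForCatalog, ScanPlan) appear in the commits, but ScanFile and the exact parameter lists are assumptions for illustration.

```scala
import org.apache.spark.sql.SparkSession

// Illustrative: a single data file returned by the planning service.
// The real ScanPlan fields are not shown in the commit messages.
case class ScanFile(path: String, sizeInBytes: Long)

// Result of a server-side planScan call: the list of files to read.
case class ScanPlan(files: Seq[ScanFile])

// Core interface: intentionally free of Iceberg dependencies, so it
// can live in the delta-spark module.
trait ServerSidePlanningClient {
  def planScan(database: String, table: String): ScanPlan
}

// Factory that builds a client from catalog-specific Spark configuration,
// e.g. spark.sql.catalog.<name>.uri and spark.sql.catalog.<name>.token.
// (A later commit in this PR consolidates the factory API further.)
trait ServerSidePlanningClientFactory {
  def buildForCatalog(
      spark: SparkSession,
      catalogName: String): ServerSidePlanningClient
}
```

Keeping the trait free of Iceberg types is what lets the Iceberg REST implementation be loaded by reflection from a separate module, as the IcebergRESTCatalogPlanningClientFactory commit describes.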
0f9b321 to c38c581
- Update copyright year from 2021 to 2025 in all files
- Rename 'database' parameter to 'databaseName' for consistency
- Rename 'schema' parameter to 'tableSchema' in PartitionReaderFactory
- Move ServerSidePlannedTable to the serverSidePlanning package
- Update the Hadoop config comment to clarify the intentional design
- Make factory classes private[serverSidePlanning]
- Simplify the factory API: replace getFactory() and buildForCatalog() with a single getClient() method
- Remove an unnecessary docstring line from TestServerSidePlanningClient
- Simplify getFileFormat() to return only 'parquet' for test code
- Update imports in ServerSidePlannedTableSuite
Update ServerSidePlannedTableSuite to match the API changes:
- Remove the getFactory() assertion (method made package-private)
- Replace buildForCatalog() with getClient()
- Rename the database parameter to databaseName

Addresses CI compilation errors after addressing TD's review comments.
…able and add E2E tests (#9)

* [Server-Side Planning] Integrate DeltaCatalog with ServerSidePlannedTable and add E2E tests

  Implements DeltaCatalog integration to use ServerSidePlannedTable when Unity Catalog tables lack credentials, with comprehensive end-to-end testing.

  Key changes:
  - Add loadTable() logic in DeltaCatalog to detect UC tables without credentials
  - Implement hasCredentials() to check for credential properties in table metadata
  - Add ENABLE_SERVER_SIDE_PLANNING config flag for testing
  - Add comprehensive integration tests with reflection-based credential testing
  - Add Spark source code references for the Identifier namespace structure
  - Improve the test suite by removing a redundant aggregation test
  - Revert verbose documentation comments in ServerSidePlannedTable

  Test coverage:
  - E2E: full-stack integration with DeltaCatalog
  - E2E: verify the normal path is unchanged when the feature is disabled
  - loadTable() decision logic with the ENABLE_SERVER_SIDE_PLANNING config (tests 3 scenarios, including UC without credentials via reflection)

  See Spark's LookupCatalog, CatalogAndIdentifier, and ResolveSessionCatalog for Identifier namespace structure references.

* Refactor PR9: extract decision logic and improve test quality

  - Extracted a shouldUseServerSidePlanning() method with boolean inputs
  - Replaced the reflection-based test with a clean unit test
  - Tests all input combinations without brittle reflection code
  - Improved testability and maintainability

* Clean up ServerSidePlannedTable: remove an unnecessary helper function

  - Remove the misleading "Fallback" comment that didn't apply to all cases
  - Inline the create() function into tryCreate() to reduce indirection
  - Simplify the logic: handle client creation directly in a try-catch

* Remove unnecessary logging and unused imports from ServerSidePlannedTable

  - Remove conditional logging that differentiated between forced and fallback paths
  - Remove unused imports: MDC and DeltaLogKeys

* Remove useless AutoCloseable implementation from ServerSidePlannedTable

  The close() method was never called because:
  - Spark's Table interface has no lifecycle hooks
  - No code explicitly called close() on ServerSidePlannedTable instances
  - No try-with-resources or Using() blocks wrapped it

  HTTP connection cleanup happens via connection timeouts (30s) and JVM finalization, making AutoCloseable purely ceremonial dead code.

* Simplify verbose test suite comments

  Reduced 20+ line formatted comments to simple 2-line descriptions. The bullet-pointed lists were over-documenting obvious test structure.

* Remove useless forTesting() wrapper method

  The forTesting() method was just a wrapper around 'new' that added no value. Tests now directly instantiate ServerSidePlannedTable with the constructor.

* Fix brittle test assertion for table capabilities

  Removed the assertion that the table has exactly one capability, which would break if we later add streaming support (MICRO_BATCH_READ, CONTINUOUS_READ) or other non-write capabilities. Now tests what actually matters: supports BATCH_READ, does NOT support BATCH_WRITE.

* Remove redundant SupportsWrite interface check in test

  Testing !isInstanceOf[SupportsWrite] is redundant with checking !capabilities.contains(BATCH_WRITE) because:
  - The BATCH_WRITE capability requires the SupportsWrite interface
  - Having SupportsWrite without BATCH_WRITE would violate the Spark contract

  The capability check tests the public API contract, which is sufficient.

* Remove e2e/integration terminology from ServerSidePlanningSuite

  Changed:
  - Test names: removed the "E2E:" prefix
  - Database name: integration_db → test_db
  - Table name: e2e_test → planning_test

  These tests use mock clients, not external systems, so e2e/integration terminology was misleading.

* Merge test suites: keep the existing ServerSidePlannedTableSuite, delete the new ServerSidePlanningSuite

  ServerSidePlanningSuite was added in this PR, while ServerSidePlannedTableSuite existed before. Merged them by keeping the existing file and deleting the new one, so the PR shows a modification rather than a deletion plus an addition.

* Refactor tests and improve documentation

  - Refactor ServerSidePlannedTableSuite: create the database/table once in beforeAll()
  - Use a minimal early-return pattern in DeltaCatalog with oss-only markers
  - Move current-snapshot limitation docs to the ServerSidePlanningClient interface
  - Add a UC credential injection link to the hasCredentials() documentation
  - Lowercase test names and remove redundant client verification

* Simplify current-snapshot limitation documentation

  Shorten the documentation from 4 lines to 1 concise line.

* Address PR9 review feedback

  1. Remove unnecessary afterEach() cleanup - no resource leaks to prevent
  2. Make test 2 explicit by setting config=false instead of relying on cleanup
  3. Remove the redundant "loadTable() decision logic" test - already covered by other tests
  4. Add an explanation for the deltahadoopconfiguration scalastyle suppression

  Tests: 4/4 passing, scalastyle clean

* Address PR9 review comments: improve test quality and cleanup

  - Add a withServerSidePlanningEnabled helper method to prevent test pollution
    - Encapsulates factory and config setup/teardown in one place
    - Guarantees cleanup in a finally block
    - Prevents tests from interfering with each other
  - Replace the white-box capability check with a black-box insert test
    - Tests actual insert behavior instead of inspecting capabilities
    - Verifies inserts succeed without SSP and fail with SSP enabled
    - A more realistic end-to-end test of read-only behavior
  - Remove OSS-only marker comments from DeltaCatalog
    - Clean up // oss-only-start and // oss-only-end comments
  - Remove an unused import (DeltaCatalog)

  All tests passing (4/4):
  - full query through DeltaCatalog with server-side planning
  - verify the normal path is unchanged when the feature is disabled
  - shouldUseServerSidePlanning() decision logic
  - ServerSidePlannedTable is read-only
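The extracted decision logic above can be sketched as a pure function over boolean inputs, which is what makes it unit-testable without reflection. This is an illustrative reconstruction: the method name shouldUseServerSidePlanning comes from the commit message, but the parameter names and exact predicate are assumptions inferred from the described behavior (UC tables lacking credentials, gated by the config flag).

```scala
// Hypothetical sketch of the decision method extracted in the
// "Refactor PR9" commit. Parameter names are illustrative.
object ServerSidePlanningDecision {
  def shouldUseServerSidePlanning(
      planningEnabled: Boolean,    // ENABLE_SERVER_SIDE_PLANNING config flag
      isUnityCatalogTable: Boolean, // detected in DeltaCatalog.loadTable()
      hasCredentials: Boolean       // credential properties present in metadata
  ): Boolean = {
    // Only route reads through ServerSidePlannedTable when the feature is
    // enabled and the UC table has no credentials of its own.
    planningEnabled && isUnityCatalogTable && !hasCredentials
  }
}
```

Because every input is a plain boolean, a unit test can enumerate all eight combinations directly, replacing the brittle reflection-based test the commit removed.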
Add generic filter pushdown infrastructure to ServerSidePlanningClient:
- Add a filter parameter to the planScan() interface using Spark's Filter type
- Implement SupportsPushDownFilters in ServerSidePlannedScanBuilder
- Wire filters through to ServerSidePlannedScan and the planScan() call
- Add FilterCapturingTestClient for testing the filter flow
- Add 3 tests verifying filters are passed through to the planning client

Changes:
- ServerSidePlanningClient.planScan() now accepts an optional filter parameter
- The filter uses Spark's org.apache.spark.sql.sources.Filter (catalog-agnostic)
- ServerSidePlannedScanBuilder implements SupportsPushDownFilters
- pushFilters() accepts all filters (returns Array.empty)
- Filters are combined with And and passed to planScan()
- Documentation explains that each catalog converts the filter to its native format
- Test infrastructure captures and validates the filter objects

Testing:
- Test 1: simple EqualTo filter (WHERE id = 2)
- Test 2: compound And filter (WHERE id > 1 AND value < 30)
- Test 3: no filter when there is no WHERE clause

Note: Residual filter handling (returning unsupported filters) will be added in a future PR with catalog-specific converter implementations.
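The pushdown wiring described above can be sketched as follows. This is a hedged illustration of the mechanism (accept every filter, combine with And, forward to planScan), not the PR's actual code: the ServerSidePlannedScan constructor and the planScan signature with an Option[Filter] parameter are assumptions based on the commit description.

```scala
import org.apache.spark.sql.connector.read.{Scan, ScanBuilder, SupportsPushDownFilters}
import org.apache.spark.sql.sources.{And, Filter}

// Illustrative scan builder implementing Spark's DSv2 filter pushdown.
// ServerSidePlanningClient and ServerSidePlannedScan are the PR's types;
// their exact shapes here are assumed for the sketch.
class ServerSidePlannedScanBuilder(
    client: ServerSidePlanningClient,
    database: String,
    table: String) extends ScanBuilder with SupportsPushDownFilters {

  private var pushedFilterList: Array[Filter] = Array.empty

  // Accept all filters for pushdown. Returning Array.empty tells Spark
  // there are no residual filters to re-evaluate after the scan; residual
  // handling arrives with the catalog-specific converters in a later PR.
  override def pushFilters(filters: Array[Filter]): Array[Filter] = {
    pushedFilterList = filters
    Array.empty
  }

  override def pushedFilters(): Array[Filter] = pushedFilterList

  override def build(): Scan = {
    // Fold the pushed filters into one optional predicate using And,
    // e.g. Array(GreaterThan("id", 1), LessThan("value", 30)) becomes
    // Some(And(GreaterThan("id", 1), LessThan("value", 30))).
    val combined: Option[Filter] = pushedFilterList.reduceOption(And)
    // The planning client converts the Spark Filter to the catalog's
    // native representation server-side.
    new ServerSidePlannedScan(client.planScan(database, table, combined))
  }
}
```

Using org.apache.spark.sql.sources.Filter keeps the interface catalog-agnostic: the Iceberg implementation can translate the same predicate tree into Iceberg expressions without Spark's scan builder knowing about it.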
c38c581 to e1cc187
Summary
Stack Position: This PR is stacked on top of #5622. Please review in order.
To review only this PR's changes, use this comparison:
murali-db:upstream/row2-deltacatalog-integration...murali-db:upstream/row4-filter-infrastructure
Adds generic filter pushdown infrastructure to ServerSidePlanningClient.
Key Changes
- The filter parameter uses Spark's org.apache.spark.sql.sources.Filter (catalog-agnostic)
- The filter parameter defaults to None (maintains backward compatibility)

Testing
3 new tests added (7 total):
```
spark/testOnly org.apache.spark.sql.delta.serverSidePlanning.ServerSidePlannedTableSuite
```
Design Notes