feat(destinations): add Databricks Delta Lake destination (insert/merge/mirror) (closes #167)#629
Merged
Merged
Conversation
…ge/mirror) (closes #167) Third DWH destination alongside Snowflake (#353) and BigQuery (#584 in flight) — completes the major-DWH lineup. Supports the same three modes as Snowflake's leg: - INSERT (append, `config.mode: insert`) - MERGE (upsert via Delta Lake's native MERGE INTO, `config.mode: merge`) - sync.mode: mirror (#340 family — Databricks leg) — MERGE upsert + end-of-sync DELETE-missing Auth via Databricks SQL Connector: - `host_env` — workspace hostname (dbc-*.cloud.databricks.com) - `http_path_env` — SQL warehouse HTTP path (/sql/1.0/warehouses/*) - `token_env` — personal access token (PAT, dapi*) Unity Catalog three-part names (catalog.schema.table) are the default; legacy workspaces use `catalog: hive_metastore`. Merge implementation note: Databricks Delta Lake doesn't have session-local temp tables (no `CREATE TEMP TABLE` syntax), so the merge path creates a uniquely-named scratch Delta table `catalog.schema.__drt_staging_<table>` cloned from the target's schema, stages rows via per-row INSERT, executes MERGE INTO, and DROP TABLEs the staging at the end. The `__drt_staging_*` prefix makes it identifiable in audit logs. The token-bearing principal needs CREATE on the schema in addition to MODIFY on the target. Mirror semantics match the Snowflake leg of #340: - `sync.mode: mirror` forces the MERGE write path regardless of `config.mode` - End-of-sync issues `DELETE FROM <table> WHERE upsert_key NOT IN (observed)` - Composite keys use `WHERE (c1, c2) NOT IN ((v1a, v1b), ...)` form - Safety guard: skips DELETE entirely when no batch produced records 22 unit tests in tests/unit/test_databricks_destination.py cover: - Config validation (schema: YAML alias, three-part FQN in describe(), Hive Metastore catalog) - Empty-batch short-circuit (#595 contract) - databricks.sql.connect() kwargs shape — protects against silent template-copy drift from the Snowflake destination - INSERT happy path + on_error=skip / on_error=fail - MERGE happy path + upsert_key required + composite key ON clause + all-columns-are-key (no UPDATE clause) - Mirror invariants: upsert_key validation, MERGE-write-path forcing, single-column DELETE, composite-key DELETE tuple form, skip-when-no-records safety guard, no-op finalize_sync for non-mirror modes - test_connection round-trip databricks.sql is mocked via sys.modules injection — no real Databricks workspace or databricks-sql-connector install required. Requires `pip install drt-core[databricks]` (depends on databricks-sql-connector>=3.0, already in pyproject extras). New `docs/connectors/databricks.md` covers all three modes, auth flow with PAT generation steps, Unity Catalog vs Hive Metastore, the merge-path staging design (why Delta scratch table and not CREATE TEMP TABLE), and a sync-mode compatibility table. README destination table updated on both English and Japanese sides (Databricks Delta Lake row added after Snowflake, v0.7.9). i18n marker bump for README.ja.md follows the established post-merge housekeeping pattern (#618-style). Co-authored-by: Claude Opus 4.7 <noreply@anthropic.com>
Codecov Report✅ All modified and coverable lines are covered by tests. 📢 Thoughts on this report? Let us know! |
…e to 100% codecov/patch on the prior commit hit 94.73% (target 86.72%) — the gate passed but not at 100%. Uncovered slice was the 3 branches not exercised by the happy-path tests: - **Lines 152-163** — MERGE-path staging INSERT failure handler (the per-row try/except inside the staging-table INSERT loop) - **Line 196** — mirror's ``failed_indices`` skip path inside the ``_mirror_keys`` accumulator (skip rows that didn't make it into the destination so they don't count as "observed in source" for the end-of-sync DELETE) - **Line 202** — the ``Unsupported mode`` defensive fallthrough ValueError (unreachable in normal flow because Pydantic Literal validates at config-load time, but tracked by coverage) 4 new tests: 1. `test_merge_staging_insert_failure_on_error_skip` — first staging INSERT fails, second succeeds; verifies result.failed=1 + row_errors recorded + MERGE still runs against whatever made it into staging. 2. `test_merge_staging_insert_failure_on_error_fail_raises` — same failure scenario but with on_error=fail; verifies the exception re-raises and the connection is still closed via try/finally. 3. `test_unsupported_mode_raises` — manually corrupts ``config.mode`` to "garbage" after Pydantic construction (bypasses Literal validation via ``object.__setattr__``) and verifies the defensive ValueError fires. 4. `test_mirror_skips_failed_keys_from_delete_observed_set` — mirror load with a staging failure on row 1, then finalize_sync; verifies the DELETE's NOT-IN list contains only the survivor's key (id=2), not the failed row's key (id=1). This catches the semantic bug where a row that failed to load would be deleted from the destination on next mirror run. drt/destinations/databricks.py file coverage: 94% → 100% (119/119 stmts). Coverage now matches the S3 / GCS / Azure Blob destinations from #613 / #623 / #624. Co-authored-by: Claude Opus 4.7 <noreply@anthropic.com>
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to subscribe to this conversation on GitHub.
Already have an account?
Sign in.
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Summary
Databricks Delta Lake destination — the third DWH destination alongside Snowflake (#353) and BigQuery (#584 in flight). Completes the major-DWH lineup. Same three-mode shape as the Snowflake leg.
config.mode: insertINSERT INTOper recordconfig.mode: mergeMERGE INTOvia staging scratch table (requiresupsert_key)sync.mode: mirrorDELETE WHERE upsert_key NOT IN (observed)Implementation
Mirrors the Snowflake destination's structure (#599 / mirror leg of #340 family) but with Databricks-specific shape:
host_env(workspace hostname) +http_path_env(SQL warehouse) +token_env(PAT,dapi*)catalog.schema.table(orcatalog: hive_metastorefor legacy workspaces)CREATE TEMP TABLEsyntax), so the merge path creates a uniquely-named scratch Delta tablecatalog.schema.__drt_staging_<table>cloned from the target's schema, stages rows, MERGEs, thenDROP TABLEs the scratchWHERE (c1, c2) NOT IN ((v1a, v1b), ...); safety guard skips DELETE entirely when no batch produced recordsExample
Mirror mode (Census "Full Sync with Deletion" equivalent):
Tests
22 unit tests in
tests/unit/test_databricks_destination.py:schema:YAML alias for the mypy-strictschema_field, three-part FQN indescribe(), Hive Metastore catalogdatabricks.sqlimport (#595 contract)databricks.sql.connect()kwargs shape (server_hostname/http_path/access_token) — protects against silent template-copy drift from the Snowflake destinationupsert_keyrequired, composite-keyONclause, all-columns-are-key (skips UPDATE clause)upsert_keyvalidation, MERGE-write-path forcing, single-column DELETE, composite-key DELETE tuple form, skip-when-no-records safety guard, no-opfinalize_syncfor non-mirror modestest_connectionround-tripdatabricks.sqlis mocked viasys.modulesinjection — no real Databricks workspace ordatabricks-sql-connectorinstall required.Why a Delta scratch table instead of CREATE TEMP TABLE?
Databricks Delta Lake doesn't have session-local temp tables — the standard
CREATE TEMP TABLEsyntax isn't supported by Delta. A uniquely-named Delta scratch table in the samecatalog.schemais the idiomatic shape. The__drt_staging_*prefix makes it identifiable in audit logs, and aCREATE OR REPLACE TABLEon the next run cleanly overwrites any interrupted-mid-sync remnant. Documented indocs/connectors/databricks.md.Test plan
pytest tests/unit/test_databricks_destination.py— 22 passedpytest tests/unit/test_databricks_destination.py tests/unit/test_snowflake_destination.py tests/unit/test_connector_registry.py tests/contracts/— 125 passed (no regression on sibling destinations or contracts)ruff check drt tests— cleanDocs
docs/connectors/databricks.md— full reference (all three modes, auth with PAT generation, Unity Catalog vs Hive Metastore, merge-path staging design, sync-mode compatibility table)i18n marker bump for README.ja.md follows the established post-merge housekeeping pattern (same as #618 for #613 S3 etc.).
CHANGELOG
[Unreleased] → Addedentry above the S3 entry.Out of scope
replace_strategy: swapzero-downtime replace (Snowflake also lacks this — separate follow-up issue)Related
🤖 Generated with Claude Code