diff --git a/CHANGELOG.md b/CHANGELOG.md
index 646ebfa..bf67155 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -47,6 +47,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Added
+- **Databricks Delta Lake destination** ([#167](https://github.com/drt-hub/drt/issues/167)): the third DWH destination alongside Snowflake ([#353](https://github.com/drt-hub/drt/pull/353)) and BigQuery ([#584](https://github.com/drt-hub/drt/pull/584), in flight) — completes the major-DWH lineup. Supports **INSERT** (append), **MERGE** (upsert via Delta Lake's native `MERGE INTO`), and **`sync.mode: mirror`** (upsert + end-of-sync DELETE-missing — joins the Postgres / MySQL / ClickHouse / Snowflake mirror family from [#340](https://github.com/drt-hub/drt/issues/340)). Auth via the **Databricks SQL Connector**: `host_env` resolves to the workspace hostname (e.g. `dbc-abc12345-1234.cloud.databricks.com`), `http_path_env` to the SQL warehouse HTTP path (e.g. `/sql/1.0/warehouses/abc123def456`), `token_env` to a personal access token (PAT, `dapi*`). The token-bearing principal needs `USAGE` on the catalog + schema, `MODIFY` on the target table, and `CREATE` on the schema for the merge-path staging. Unity Catalog three-part names (`catalog.schema.table`) are the default; legacy workspaces use `catalog: hive_metastore`. **Merge implementation**: because Delta Lake doesn't have session-local temp tables (no `CREATE TEMP TABLE` syntax), drt creates a uniquely-named Delta scratch table `catalog.schema.__drt_staging_
` cloned from the target's schema, stages rows via per-row `INSERT`, executes the `MERGE INTO target USING staging ON ` statement, and `DROP TABLE`s the staging at the end. The `__drt_staging_*` prefix makes the scratch table identifiable in audit logs; a `CREATE OR REPLACE TABLE` on the next run cleanly overwrites any interrupted-mid-sync remnant. Composite keys are supported (`upsert_key: [tenant_id, user_id]`) — the `ON` clause becomes AND-joined. When every column is in `upsert_key`, the MERGE skips the `UPDATE` clause (effectively INSERT-IF-NOT-EXISTS). **Mirror semantics**: `sync.mode: mirror` forces the MERGE write path regardless of `config.mode`, then issues `DELETE FROM WHERE upsert_key NOT IN (observed)` at end-of-sync. Composite keys use the `WHERE (c1, c2) NOT IN ((v1a, v1b), (v2a, v2b), ...)` form (same shape as Snowflake's #599 leg). Safety guard: if no batch ever produced records, the DELETE is skipped entirely so a transient empty source doesn't wipe the destination. Empty batches short-circuit before any `databricks.sql` import or warehouse call — the same implicit "no driver was imported" contract used by the other SQL destinations ([#595](https://github.com/drt-hub/drt/pull/595)). Row-level errors during INSERT are captured in `result.row_errors`; `on_error: skip` (default) continues the sync, `on_error: fail` raises. Requires `pip install drt-core[databricks]` (depends on `databricks-sql-connector>=3.0`). New `docs/connectors/databricks.md` covers all three modes, the auth flow with PAT generation steps, the Unity Catalog vs Hive Metastore catalog field, the merge-path staging design (why a Delta scratch table and not `CREATE TEMP TABLE`), and a sync-mode compatibility table. 22 unit tests in `tests/unit/test_databricks_destination.py` cover config validation (including the `schema:` YAML alias for the mypy-strict `schema_` field, three-part FQN in `describe()`, and Hive Metastore catalog), the empty-batch short-circuit, the `databricks.sql.connect()` kwargs shape (protects against silent template-copy drift from the Snowflake destination, since the two share most other code), INSERT / MERGE happy paths, the `upsert_key`-required ValueError, on_error skip vs fail behaviour, composite-key MERGE `ON` clause, all-columns-are-key MERGE (no UPDATE clause), all mirror invariants (`upsert_key` validation, MERGE-write-path forcing, single-column DELETE, composite-key DELETE tuple form, skip-when-no-records safety guard, no-op `finalize_sync` for non-mirror modes), and the `test_connection` round-trip. `databricks.sql` is mocked via `sys.modules` injection (no real Databricks workspace or `databricks-sql-connector` install required). Closes [#167](https://github.com/drt-hub/drt/issues/167). BigQuery destination ([#584](https://github.com/drt-hub/drt/pull/584), contributor PR) follows the same DWH MERGE shape and remains in flight.
+
- **Amazon S3 destination** ([#168](https://github.com/drt-hub/drt/issues/168)): new file-style destination that uploads each sync batch as a single file to an S3 bucket, in any of four formats — **csv** / **json** / **jsonl** / **parquet** — with optional **gzip** compression for the three text formats (parquet uses its own column-level compression via `parquet_compression`). Authentication defers to boto3's standard credential chain (env vars → `~/.aws/credentials` → instance profile → IAM role), which is the right shape for most real deployments; explicit overrides are available via `aws_profile`, `aws_access_key_id_env` / `aws_secret_access_key_env` / `aws_session_token_env`, or by setting `endpoint_url` for S3-compatible services (MinIO, LocalStack, Cloudflare R2, DigitalOcean Spaces). Default S3 key is `.` — timestamped so re-runs land at a fresh key rather than overwriting, matching the Census / Hightouch convention and making downstream "new files since last check" polling trivial. Override the file name via `key_template` (supports the `{timestamp}` placeholder); for per-sync routing, the recommended pattern is a sync-specific `prefix`. Empty batches short-circuit before any boto3 import or AWS call — the same implicit "no driver was imported" contract used by the SQL destinations ([#595](https://github.com/drt-hub/drt/pull/595)), so a run with zero source rows produces zero S3 objects. Failure semantics distinguish missing-extras (`ImportError` bubbles up so the engine surfaces the deployment mistake once at the top) from network / auth / permission errors (recorded as `result.failed = len(records)` so other batches keep going) — see `drt/destinations/s3.py`. Requires `pip install drt-core[s3]` (and additionally `drt-core[parquet]` for `format: parquet`). New `docs/connectors/s3.md` covers all four formats, the authentication flow, the file-naming convention with `key_template` examples, and the MinIO / R2 endpoint override pattern. 21 unit tests in `tests/unit/test_s3_destination.py` cover config validation across all four formats and both compressions, the implicit no-boto3-import empty-batch path, put_object call shape per format, gzip → `ContentEncoding: gzip` header + `.gz` extension, key-naming default + template overrides (with and without an explicit extension), the credential threading (profile / env vars / endpoint URL), and both failure paths (serialisation error → row failures, missing extras → ImportError bubble-up). boto3 is mocked via `sys.modules` injection (no real AWS account or boto3 install required for the non-parquet tests). Closes [#168](https://github.com/drt-hub/drt/issues/168). GCS ([#169](https://github.com/drt-hub/drt/issues/169)) and Azure Blob ([#170](https://github.com/drt-hub/drt/issues/170)) are natural follow-ups — the serialiser / key-naming / credential-threading layer here is reusable.
### Documentation
diff --git a/README.ja.md b/README.ja.md
index 93af303..8390dae 100644
--- a/README.ja.md
+++ b/README.ja.md
@@ -272,6 +272,7 @@ Claude Codeの公式スキルをインストールすると、チャットイン
| Salesforce Bulk API 2.0 | ✅ v0.6 | (core) | OAuth2(username-password) |
| Staged Upload | ✅ v0.6 | (core) | プロバイダーごとに設定 |
| Snowflake | ✅ v0.7 | `pip install drt-core[snowflake]` | パスワード(環境変数) |
+| Databricks Delta Lake | ✅ v0.7.9 | `pip install drt-core[databricks]` | Personal Access Token(環境変数) |
### インテグレーション
diff --git a/README.md b/README.md
index 6c85989..fab063c 100644
--- a/README.md
+++ b/README.md
@@ -299,6 +299,7 @@ Copy the files from `.claude/commands/` into your drt project's `.claude/command
| Salesforce Bulk API 2.0 | ✅ v0.6 | (core) | OAuth2 (username-password) |
| Staged Upload | ✅ v0.6 | (core) | Configurable per provider |
| Snowflake | ✅ v0.7 | `pip install drt-core[snowflake]` | Password (env var) |
+| Databricks Delta Lake | ✅ v0.7.9 | `pip install drt-core[databricks]` | Personal Access Token (env var) |
### Integrations
diff --git a/docs/connectors/databricks.md b/docs/connectors/databricks.md
new file mode 100644
index 0000000..0b9df9a
--- /dev/null
+++ b/docs/connectors/databricks.md
@@ -0,0 +1,174 @@
+# Databricks Destination
+
+> Write data back to Databricks Delta Lake tables via the Databricks
+> SQL Connector. Supports **INSERT**, **MERGE** (upsert), and
+> **`sync.mode: mirror`** (upsert + DELETE-missing) — the same modes
+> as the [Snowflake destination](snowflake.md).
+
+## YAML Example — INSERT (append)
+
+```yaml
+destination:
+ type: databricks
+ host_env: DATABRICKS_HOST
+ http_path_env: DATABRICKS_HTTP_PATH
+ token_env: DATABRICKS_TOKEN
+ catalog: main
+ schema: default
+ table: user_events
+ mode: insert
+```
+
+## YAML Example — MERGE (upsert)
+
+```yaml
+destination:
+ type: databricks
+ host_env: DATABRICKS_HOST
+ http_path_env: DATABRICKS_HTTP_PATH
+ token_env: DATABRICKS_TOKEN
+ catalog: main
+ schema: default
+ table: user_scores
+ mode: merge
+ upsert_key: [user_id]
+```
+
+## YAML Example — Mirror (upsert + delete-missing)
+
+```yaml
+destination:
+ type: databricks
+ host_env: DATABRICKS_HOST
+ http_path_env: DATABRICKS_HTTP_PATH
+ token_env: DATABRICKS_TOKEN
+ catalog: main
+ schema: analytics
+ table: active_users
+ upsert_key: [user_id] # required for mirror
+
+sync:
+ mode: mirror # forces MERGE write path + end-of-sync DELETE
+```
+
+## Configuration
+
+| Field | Type | Default | Description |
+|---|---|---|---|
+| `type` | `"databricks"` | — | Required |
+| `host_env` | string | — | Env var name holding the workspace hostname (e.g. `dbc-abc12345-1234.cloud.databricks.com`) |
+| `http_path_env` | string | — | Env var name holding the SQL warehouse HTTP path (e.g. `/sql/1.0/warehouses/abc123def456`) |
+| `token_env` | string | — | Env var name holding a Databricks personal access token (PAT, starts with `dapi`) |
+| `catalog` | string | — | Unity Catalog catalog name. Use `hive_metastore` for legacy workspaces. |
+| `schema` | string | — | Database/schema name within the catalog |
+| `table` | string | — | Target Delta Lake table name |
+| `mode` | `insert` \| `merge` | `insert` | Write mode. `merge` requires `upsert_key`. |
+| `upsert_key` | list[str] \| null | null | Column list that uniquely identifies a row. Required for `mode: merge` and for `sync.mode: mirror`. Composite keys supported (`[tenant_id, user_id]`). |
+
+## Authentication
+
+Databricks SQL Connector authenticates via a **personal access token**
+(PAT). Generate one in the Databricks workspace at User Settings →
+Developer → Access tokens.
+
+```bash
+export DATABRICKS_HOST=dbc-abc12345-1234.cloud.databricks.com
+export DATABRICKS_HTTP_PATH=/sql/1.0/warehouses/abc123def456
+export DATABRICKS_TOKEN=dapi-xxxxxxxxxxxxxxxxxxxx
+```
+
+The token-bearing principal needs:
+- `USAGE` on the catalog and schema
+- `MODIFY` on the target table
+- `CREATE` on the schema (for the merge-path staging table — see below)
+
+> **OAuth M2M / service principal flows** are not yet supported.
+> Track [#634](https://github.com/drt-hub/drt/issues) if you need
+> them — for now use a PAT on a service-principal-scoped account.
+
+## Write modes
+
+### `mode: insert`
+
+Issues one `INSERT INTO catalog.schema.table (...) VALUES (...)` per
+record. Best for **append-only** workloads — event streams, audit
+logs, telemetry.
+
+Errors at the row level are tracked in `result.row_errors`; the
+sync continues for the rest of the batch unless `on_error: fail`.
+
+### `mode: merge`
+
+Creates a uniquely-named **Delta scratch table**
+(`catalog.schema.__drt_staging_`), stages rows into it via
+per-row `INSERT`, then issues `MERGE INTO target USING staging ON
+ WHEN MATCHED THEN UPDATE WHEN NOT MATCHED THEN INSERT`,
+and finally `DROP TABLE` the staging table.
+
+**Why a Delta scratch table and not `CREATE TEMP TABLE`?** Databricks
+Delta Lake doesn't have session-local tables — the standard
+`CREATE TEMP TABLE` syntax isn't supported. A uniquely-named scratch
+table in the same catalog.schema is the idiomatic shape. The
+`__drt_staging_*` prefix makes it identifiable in audit logs.
+
+**Composite keys** are supported (`upsert_key: [tenant_id, user_id]`)
+— the `ON` clause becomes `target.tenant_id = source.tenant_id AND
+target.user_id = source.user_id`.
+
+If every column is in `upsert_key`, the MERGE skips the `UPDATE` clause
+(no non-key columns to update); the resulting statement is effectively
+`INSERT-IF-NOT-EXISTS`.
+
+### `sync.mode: mirror`
+
+Mirrors the source table to the destination: upserts source rows AND
+deletes destination rows whose `upsert_key` was not observed in the
+source. Forces the MERGE write path regardless of `config.mode`, so
+the only YAML changes needed are:
+
+```yaml
+destination:
+ ...
+ upsert_key: [user_id] # required
+sync:
+ mode: mirror
+```
+
+End-of-sync, drt issues a single
+`DELETE FROM catalog.schema.table WHERE upsert_key NOT IN (observed)`
+against the destination. Composite keys use the
+`WHERE (c1, c2) NOT IN ((v1a, v1b), (v2a, v2b), ...)` form.
+
+**Safety guard**: if no batch ever produced records (source returned
+zero rows), the DELETE is skipped entirely — protects against wiping
+the destination when the source is transiently empty.
+
+Mirror semantics fit the same shape as Postgres / MySQL / ClickHouse /
+Snowflake mirror destinations (see #340) — same `upsert_key` contract,
+same source-key-cardinality memory bound on `_mirror_keys`.
+
+## Sync modes
+
+| `sync.mode` | Behaviour on Databricks |
+|-------------|--------------------------|
+| `full` | Re-extracts every run + writes via `config.mode` (insert / merge). |
+| `incremental` | Watermark-based — extracts rows with `cursor_field > last_value`, writes via `config.mode`. |
+| `upsert` | Same as `incremental` but with `upsert_key` enforced. |
+| `mirror` | Forces the MERGE write path + end-of-sync DELETE. `upsert_key` required. |
+| `replace` | Not yet supported on Databricks (use `mirror` for "everything in source = everything in dest" semantics). |
+
+## Notes
+
+- Requires `pip install drt-core[databricks]` (depends on `databricks-sql-connector>=3.0`).
+- Target table must be a **Delta Lake table**. Non-Delta formats fail at `MERGE INTO` time with a Databricks server error.
+- Empty batches short-circuit before any `databricks.sql` import or warehouse call — the same "no driver was imported" contract used by the SQL destinations (#595). A run with zero source rows produces zero warehouse statements.
+- Errors during the connect step (missing env vars, bad token, network) raise immediately; errors during row INSERT are captured per-row and surface in `result.row_errors`.
+- The `__drt_staging_*` scratch table is dropped at the end of every merge run. If a sync is interrupted mid-merge, the next run's `CREATE OR REPLACE TABLE` overwrites it cleanly.
+- Unity Catalog three-part names are the default. Workspaces still on Hive Metastore should use `catalog: hive_metastore`.
+
+## References
+
+- [Databricks SQL Connector for Python](https://docs.databricks.com/dev-tools/python-sql-connector.html)
+- [Delta Lake MERGE INTO](https://docs.databricks.com/delta/merge.html)
+- [Personal access tokens](https://docs.databricks.com/dev-tools/auth/pat.html)
+- Sibling SQL destinations with the same shape: [Snowflake](snowflake.md), [PostgreSQL](postgres.md), [MySQL](mysql.md), [ClickHouse](clickhouse.md)
diff --git a/drt/config/models.py b/drt/config/models.py
index a1aac2c..4bd6999 100644
--- a/drt/config/models.py
+++ b/drt/config/models.py
@@ -408,6 +408,44 @@ def describe(self) -> str:
return f"{self.type} ({self.database}.{self.schema_}.{self.table})"
+class DatabricksDestinationConfig(BaseModel):
+ """Databricks Delta Lake destination — write data back to Databricks tables.
+
+ Auth via the Databricks SQL Connector: a SQL warehouse HTTP path
+ plus a personal access token (PAT). The token-bearing user needs
+ USAGE on the catalog + schema and INSERT/MODIFY on the target
+ table.
+ """
+
+ type: Literal["databricks"]
+
+ # Workspace hostname (env-var resolved), e.g.
+ # ``dbc-abc12345-1234.cloud.databricks.com``.
+ host_env: str
+ # SQL warehouse HTTP path (env-var resolved), e.g.
+ # ``/sql/1.0/warehouses/abc123def456``.
+ http_path_env: str
+ # Databricks personal access token (env-var resolved). Starts with ``dapi``.
+ token_env: str
+
+ # Three-part name (Unity Catalog). For Hive Metastore deployments
+ # use catalog="hive_metastore".
+ catalog: str
+ # Field alias because BaseModel.schema() shadows a plain `schema`
+ # attribute under mypy strict mode; YAML key stays `schema:`.
+ schema_: str = Field(alias="schema")
+ table: str
+
+ mode: Literal["insert", "merge"] = "insert"
+
+ # Required for merge mode and for ``sync.mode: mirror``. Composite
+ # keys supported (`upsert_key: [tenant_id, user_id]`).
+ upsert_key: list[str] | None = None
+
+ def describe(self) -> str:
+ return f"{self.type} ({self.catalog}.{self.schema_}.{self.table})"
+
+
class LinearDestinationConfig(BaseModel):
type: Literal["linear"]
team_id: str | None = None
@@ -807,7 +845,8 @@ def _check_instance_url(self) -> SalesforceBulkDestinationConfig:
| StagedUploadDestinationConfig
| SalesforceBulkDestinationConfig
| TwilioDestinationConfig
- | SnowflakeDestinationConfig,
+ | SnowflakeDestinationConfig
+ | DatabricksDestinationConfig,
Field(discriminator="type"),
]
diff --git a/drt/connectors/registry.py b/drt/connectors/registry.py
index d048000..63bc68d 100644
--- a/drt/connectors/registry.py
+++ b/drt/connectors/registry.py
@@ -140,6 +140,7 @@ def _register_all_connectors() -> None:
from drt.config.models import (
AmplitudeDestinationConfig,
ClickHouseDestinationConfig,
+ DatabricksDestinationConfig,
DiscordDestinationConfig,
EmailSmtpDestinationConfig,
FileDestinationConfig,
@@ -170,6 +171,7 @@ def _register_all_connectors() -> None:
# Import destination classes
from drt.destinations.amplitude import AmplitudeDestination
from drt.destinations.clickhouse import ClickHouseDestination
+ from drt.destinations.databricks import DatabricksDestination
from drt.destinations.discord import DiscordDestination
from drt.destinations.email_smtp import EmailSmtpDestination
from drt.destinations.file import FileDestination
@@ -239,6 +241,7 @@ def _register_all_connectors() -> None:
register_destination("staged_upload", StagedUploadDestinationConfig, StagedUploadDestination)
register_destination("intercom", IntercomDestinationConfig, IntercomDestination)
register_destination("snowflake", SnowflakeDestinationConfig, SnowflakeDestination)
+ register_destination("databricks", DatabricksDestinationConfig, DatabricksDestination)
# Register all sources
register_source("bigquery", BigQueryProfile, BigQuerySource)
diff --git a/drt/destinations/databricks.py b/drt/destinations/databricks.py
new file mode 100644
index 0000000..71a776a
--- /dev/null
+++ b/drt/destinations/databricks.py
@@ -0,0 +1,307 @@
+"""Databricks Delta Lake destination — write data back to Databricks tables.
+
+Supports:
+
+- INSERT (append, ``config.mode: insert``)
+- MERGE (upsert via Delta Lake's native ``MERGE INTO``, ``config.mode: merge``)
+- ``sync.mode: mirror`` (#340 family — Databricks leg) — MERGE upsert,
+ then end-of-sync ``DELETE FROM ... WHERE upsert_key NOT IN (observed)``
+ from :meth:`finalize_sync`. Mirror mode forces the MERGE write path
+ regardless of ``config.mode``, so users only need to set
+ ``destination.upsert_key`` and ``sync.mode: mirror``.
+
+Naming: Unity Catalog three-part name ``catalog.schema.table``. For
+workspaces still on Hive Metastore, set ``catalog: hive_metastore``.
+
+Auth: Databricks SQL Connector — ``host_env`` / ``http_path_env`` /
+``token_env`` resolved at runtime. The token-bearing principal needs
+USAGE on the catalog and schema plus ``MODIFY`` on the target table
+(plus ``CREATE`` on the schema for the merge-path Delta scratch table).
+
+Install: ``pip install drt-core[databricks]`` (depends on
+``databricks-sql-connector>=3.0``). The target table MUST be a Delta
+Lake table for MERGE / mirror to work — non-Delta tables will fail at
+``MERGE INTO`` time with a Databricks error.
+
+Example sync YAML:
+
+ destination:
+ type: databricks
+ host_env: DATABRICKS_HOST
+ http_path_env: DATABRICKS_HTTP_PATH
+ token_env: DATABRICKS_TOKEN
+ catalog: main
+ schema: default
+ table: user_scores
+ mode: merge
+ upsert_key: [user_id]
+"""
+
+from __future__ import annotations
+
+from typing import Any
+
+from drt.config.credentials import resolve_env
+from drt.config.models import DatabricksDestinationConfig, DestinationConfig, SyncOptions
+from drt.destinations.base import SyncResult
+from drt.destinations.row_errors import RowError
+
+
+class DatabricksDestination:
+ """Write records into Databricks Delta Lake tables."""
+
+ def __init__(self) -> None:
+ # sync.mode: mirror — accumulates upsert_key tuples seen across
+ # batches so finalize_sync can DELETE missing rows. ``None`` means
+ # mirror mode hasn't engaged yet (no batch with records); finalize
+ # treats that as "skip DELETE" — safety against deleting
+ # everything when the source produced no data.
+ self._mirror_keys: list[tuple[Any, ...]] | None = None
+
+ def load(
+ self,
+ records: list[dict[str, Any]],
+ config: DestinationConfig,
+ sync_options: SyncOptions,
+ ) -> SyncResult:
+ assert isinstance(config, DatabricksDestinationConfig)
+ if not records:
+ # Empty-source short-circuit — no databricks import, no
+ # warehouse call. Same shape as the other registered
+ # destinations (empty-batch contract suite, #604-#606).
+ return SyncResult()
+
+ result = SyncResult()
+ conn = self._connect(config)
+
+ # sync.mode: mirror forces the MERGE write path regardless of
+ # config.mode — mirror semantics require upsert. Validate
+ # upsert_key here so the misconfiguration is surfaced before any
+ # row touches Databricks.
+ is_mirror = sync_options.mode == "mirror"
+ if is_mirror and not config.upsert_key:
+ conn.close()
+ raise ValueError(
+ "sync.mode: mirror requires destination.upsert_key "
+ "(needed to identify which rows to DELETE)."
+ )
+ effective_mode = "merge" if is_mirror else config.mode
+
+ try:
+ with conn.cursor() as cur:
+ columns = list(records[0].keys())
+ col_list = ", ".join(columns)
+ placeholders = ", ".join(["%s"] * len(columns))
+ table_fq = f"{config.catalog}.{config.schema_}.{config.table}"
+
+ if effective_mode == "insert":
+ sql = f"INSERT INTO {table_fq} ({col_list}) VALUES ({placeholders})"
+
+ for i, row in enumerate(records):
+ try:
+ cur.execute(sql, list(row.values()))
+ result.success += 1
+ except Exception as e:
+ result.failed += 1
+ result.row_errors.append(
+ RowError(
+ batch_index=i,
+ record_preview=str(row)[:200],
+ http_status=None,
+ error_message=str(e),
+ )
+ )
+ if sync_options.on_error == "fail":
+ raise
+
+ elif effective_mode == "merge":
+ if not config.upsert_key:
+ raise ValueError("upsert_key is required for merge mode")
+
+ key_clause = " AND ".join(
+ [f"target.{k} = source.{k}" for k in config.upsert_key]
+ )
+ update_cols = [c for c in columns if c not in config.upsert_key]
+ update_clause = ", ".join([f"{c} = source.{c}" for c in update_cols])
+ insert_cols = col_list
+ insert_vals = ", ".join([f"source.{c}" for c in columns])
+
+ # Databricks Delta needs a relation on the USING
+ # side of MERGE. Delta doesn't have session-local
+ # temp tables (no `CREATE TEMP TABLE`), so we stage
+ # into a uniquely-named Delta scratch table in the
+ # target catalog.schema, then DROP it at the end.
+ # The token-bearing principal needs ``CREATE`` on
+ # the schema in addition to ``MODIFY`` on the
+ # target.
+ staging_table = (
+ f"{config.catalog}.{config.schema_}.__drt_staging_{config.table}"
+ )
+
+ cur.execute(
+ f"CREATE OR REPLACE TABLE {staging_table} "
+ f"AS SELECT * FROM {table_fq} WHERE 1=0"
+ )
+
+ for i, row in enumerate(records):
+ try:
+ cur.execute(
+ f"INSERT INTO {staging_table} ({col_list}) VALUES ({placeholders})",
+ list(row.values()),
+ )
+ except Exception as e:
+ result.failed += 1
+ result.row_errors.append(
+ RowError(
+ batch_index=i,
+ record_preview=str(row)[:200],
+ http_status=None,
+ error_message=str(e),
+ )
+ )
+ if sync_options.on_error == "fail":
+ raise
+
+ matched_clause = (
+ f"WHEN MATCHED THEN UPDATE SET {update_clause}" if update_cols else ""
+ )
+
+ merge_sql = (
+ f"MERGE INTO {table_fq} target "
+ f"USING {staging_table} source "
+ f"ON {key_clause} "
+ f"{matched_clause} "
+ f"WHEN NOT MATCHED THEN INSERT ({insert_cols}) "
+ f"VALUES ({insert_vals})"
+ )
+ cur.execute(merge_sql)
+ result.success += len(records) - result.failed
+
+ # Clean up the staging Delta table so subsequent
+ # syncs don't trip over it (and so storage doesn't
+ # accumulate).
+ cur.execute(f"DROP TABLE IF EXISTS {staging_table}")
+
+ # sync.mode: mirror — accumulate upsert_key tuples
+ # for the finalize_sync DELETE pass. Only keys from
+ # records that survived the staging INSERT count as
+ # "source state" — failed records are skipped.
+ if is_mirror:
+ assert config.upsert_key
+ if self._mirror_keys is None:
+ self._mirror_keys = []
+ failed_indices = {re.batch_index for re in result.row_errors}
+ for idx, record in enumerate(records):
+ if idx in failed_indices:
+ continue
+ self._mirror_keys.append(
+ tuple(record.get(k) for k in config.upsert_key)
+ )
+
+ else:
+ raise ValueError(f"Unsupported mode: {config.mode}")
+
+ finally:
+ conn.close()
+
+ return result
+
+ def finalize_sync(
+ self,
+ config: DestinationConfig,
+ sync_options: SyncOptions,
+ ) -> SyncResult | None:
+ """End-of-sync hook: DELETE-missing for ``sync.mode: mirror``.
+
+ Databricks has no swap-replace finalize path in this destination
+ (no shadow tables), so this hook is mirror-only. Resets
+ ``_mirror_keys`` after dispatch so a re-run starts fresh.
+ """
+ if sync_options.mode != "mirror":
+ return None
+ result = self._finalize_mirror(config, sync_options)
+ self._mirror_keys = None
+ return result
+
+ def _finalize_mirror(
+ self,
+ config: DestinationConfig,
+ sync_options: SyncOptions,
+ ) -> SyncResult | None:
+ """``sync.mode: mirror`` end-of-sync DELETE pass.
+
+ Issues ``DELETE FROM .. WHERE key NOT IN
+ ()`` against Databricks Delta. The connector uses
+ ``pyformat`` placeholders, but Databricks SQL does not auto-expand
+ a tuple-of-tuples — so the placeholder list is built explicitly,
+ mirroring the Snowflake leg of #340.
+
+ Returns ``None`` when ``_mirror_keys`` is empty or ``None`` —
+ treats "no batch with records was ever observed" as a signal to
+ skip the DELETE entirely, so a transient empty source doesn't
+ wipe the destination.
+ """
+ assert isinstance(config, DatabricksDestinationConfig)
+ if not self._mirror_keys:
+ return None
+
+ upsert_cols = config.upsert_key
+ assert upsert_cols # guarded in load()
+
+ # Dedupe to keep the IN list compact when batches overlap.
+ keys = list({tuple(k) for k in self._mirror_keys})
+ table_fq = f"{config.catalog}.{config.schema_}.{config.table}"
+
+ conn = self._connect(config)
+ try:
+ with conn.cursor() as cur:
+ if len(upsert_cols) == 1:
+ placeholders = ", ".join(["%s"] * len(keys))
+ stmt = f"DELETE FROM {table_fq} WHERE {upsert_cols[0]} NOT IN ({placeholders})"
+ params: list[Any] = [k[0] for k in keys]
+ else:
+ col_tuple = "(" + ", ".join(upsert_cols) + ")"
+ row_placeholder = "(" + ", ".join(["%s"] * len(upsert_cols)) + ")"
+ placeholders = ", ".join([row_placeholder] * len(keys))
+ stmt = f"DELETE FROM {table_fq} WHERE {col_tuple} NOT IN ({placeholders})"
+ params = [v for key in keys for v in key]
+ cur.execute(stmt, params)
+ finally:
+ conn.close()
+
+ return SyncResult()
+
+ def test_connection(self, config: DestinationConfig) -> None:
+ """Test connectivity by establishing a connection and running ``SELECT 1``."""
+ assert isinstance(config, DatabricksDestinationConfig)
+ conn = self._connect(config)
+ try:
+ with conn.cursor() as cur:
+ cur.execute("SELECT 1")
+ finally:
+ conn.close()
+
+ def _connect(self, config: DatabricksDestinationConfig) -> Any:
+ """Establish a connection to Databricks via SQL Connector."""
+ try:
+ from databricks import sql # type: ignore[import-untyped]
+ except ImportError as e:
+ raise ImportError(
+ "Databricks destination requires: pip install drt-core[databricks]"
+ ) from e
+
+ host = resolve_env(None, config.host_env)
+ http_path = resolve_env(None, config.http_path_env)
+ token = resolve_env(None, config.token_env)
+
+ if not host or not http_path or not token:
+ raise ValueError(
+ "Missing Databricks credentials. Check environment variables "
+ f"({config.host_env}, {config.http_path_env}, {config.token_env})."
+ )
+
+ return sql.connect(
+ server_hostname=host,
+ http_path=http_path,
+ access_token=token,
+ )
diff --git a/tests/unit/test_databricks_destination.py b/tests/unit/test_databricks_destination.py
new file mode 100644
index 0000000..9d3bfa6
--- /dev/null
+++ b/tests/unit/test_databricks_destination.py
@@ -0,0 +1,534 @@
+"""Unit tests for the Databricks destination.
+
+Uses ``sys.modules`` injection to mock ``databricks.sql`` — no real
+Databricks workspace or databricks-sql-connector install required
+(matches the pattern in test_snowflake_destination.py).
+"""
+
+from __future__ import annotations
+
+from typing import Any
+from unittest.mock import MagicMock, patch
+
+import pytest
+
+from drt.config.models import DatabricksDestinationConfig, SyncOptions
+from drt.destinations.databricks import DatabricksDestination
+
+# ---------------------------------------------------------------------------
+# Helpers
+# ---------------------------------------------------------------------------
+
+
+def _options(**kwargs: Any) -> SyncOptions:
+ return SyncOptions(**kwargs)
+
+
+def _config(**overrides: Any) -> DatabricksDestinationConfig:
+ defaults: dict[str, Any] = {
+ "type": "databricks",
+ "host_env": "DB_HOST",
+ "http_path_env": "DB_HTTP_PATH",
+ "token_env": "DB_TOKEN",
+ "catalog": "main",
+ "schema": "default", # alias form — populated into schema_
+ "table": "user_scores",
+ }
+ defaults.update(overrides)
+ return DatabricksDestinationConfig.model_validate(defaults)
+
+
+def _set_creds(monkeypatch: pytest.MonkeyPatch) -> None:
+ monkeypatch.setenv("DB_HOST", "dbc-abc123.cloud.databricks.com")
+ monkeypatch.setenv("DB_HTTP_PATH", "/sql/1.0/warehouses/xyz789")
+ monkeypatch.setenv("DB_TOKEN", "dapi-test-token")
+
+
+def _fake_conn() -> MagicMock:
+ """Fake databricks.sql connection with a context-managed cursor."""
+ conn = MagicMock()
+ cur = MagicMock()
+ conn.cursor.return_value.__enter__.return_value = cur
+ conn.cursor.return_value.__exit__.return_value = False
+ conn._cur = cur # for assertions
+ return conn
+
+
+def _mocked_databricks_modules(conn: MagicMock | None = None) -> dict[str, MagicMock]:
+ """Build sys.modules entries that satisfy ``from databricks import sql``."""
+ mock_sql = MagicMock()
+ if conn is not None:
+ mock_sql.connect.return_value = conn
+
+ mock_databricks = MagicMock()
+ mock_databricks.sql = mock_sql
+
+ return {"databricks": mock_databricks, "databricks.sql": mock_sql}
+
+
+# ---------------------------------------------------------------------------
+# Config validation
+# ---------------------------------------------------------------------------
+
+
+class TestDatabricksDestinationConfig:
+ def test_valid_config(self) -> None:
+ config = _config()
+ assert config.catalog == "main"
+ assert config.schema_ == "default"
+ assert config.table == "user_scores"
+ assert config.mode == "insert"
+ assert config.upsert_key is None
+
+ def test_yaml_uses_schema_alias(self) -> None:
+ """YAML key `schema:` populates the `schema_` field (mypy-strict workaround)."""
+ config = DatabricksDestinationConfig.model_validate(
+ {
+ "type": "databricks",
+ "host_env": "DB_HOST",
+ "http_path_env": "DB_HTTP_PATH",
+ "token_env": "DB_TOKEN",
+ "catalog": "main",
+ "schema": "analytics",
+ "table": "users",
+ }
+ )
+ assert config.schema_ == "analytics"
+
+ def test_describe_uses_three_part_name(self) -> None:
+ assert _config().describe() == "databricks (main.default.user_scores)"
+
+ def test_hive_metastore_catalog_is_valid(self) -> None:
+ """Workspaces on Hive Metastore use ``catalog: hive_metastore``."""
+ config = _config(catalog="hive_metastore")
+ assert config.describe() == "databricks (hive_metastore.default.user_scores)"
+
+
+# ---------------------------------------------------------------------------
+# Load behavior
+# ---------------------------------------------------------------------------
+
+
+class TestDatabricksDestinationLoad:
+ def test_empty_records_short_circuits_before_import(self) -> None:
+ """No records → returns early before even attempting the databricks import.
+
+ Mirrors the empty-batch contract (#604–#606): if ``load([])``
+ ever reaches the import, this test crashes with
+ ``ModuleNotFoundError`` on CI's minimal install (no [databricks]).
+ """
+ result = DatabricksDestination().load([], _config(), _options())
+ assert result.success == 0
+ assert result.failed == 0
+
+ def test_missing_credentials_raises(
+ self, monkeypatch: pytest.MonkeyPatch, tmp_path: Any
+ ) -> None:
+ monkeypatch.delenv("DB_HOST", raising=False)
+ monkeypatch.delenv("DB_HTTP_PATH", raising=False)
+ monkeypatch.delenv("DB_TOKEN", raising=False)
+ monkeypatch.chdir(tmp_path)
+ with patch.dict("sys.modules", _mocked_databricks_modules()):
+ with pytest.raises(ValueError, match="Missing Databricks credentials"):
+ DatabricksDestination().load([{"id": 1}], _config(), _options())
+
+ def test_import_error_when_extras_missing(self) -> None:
+ """No [databricks] extras → ImportError with the install hint."""
+ with patch("builtins.__import__", side_effect=ImportError):
+ with pytest.raises(ImportError, match=r"drt-core\[databricks\]"):
+ DatabricksDestination().load([{"id": 1}], _config(), _options())
+
+ def test_connect_uses_databricks_sql_kwargs(self, monkeypatch: pytest.MonkeyPatch) -> None:
+ """Confirm the connect() call uses the Databricks SQL Connector
+ kwargs (``server_hostname``, ``http_path``, ``access_token``)
+ rather than e.g. the Snowflake shape — protects against
+ silent template-copy drift between SQL destinations."""
+ _set_creds(monkeypatch)
+ conn = _fake_conn()
+ modules = _mocked_databricks_modules(conn)
+
+ with patch.dict("sys.modules", modules):
+ DatabricksDestination().load([{"id": 1}], _config(), _options())
+
+ conn_kwargs = modules["databricks.sql"].connect.call_args[1]
+ assert conn_kwargs["server_hostname"] == "dbc-abc123.cloud.databricks.com"
+ assert conn_kwargs["http_path"] == "/sql/1.0/warehouses/xyz789"
+ assert conn_kwargs["access_token"] == "dapi-test-token"
+
+ def test_insert_mode_success(self, monkeypatch: pytest.MonkeyPatch) -> None:
+ _set_creds(monkeypatch)
+ conn = _fake_conn()
+ modules = _mocked_databricks_modules(conn)
+
+ records = [
+ {"id": 1, "score": 0.95},
+ {"id": 2, "score": 0.80},
+ ]
+ with patch.dict("sys.modules", modules):
+ result = DatabricksDestination().load(records, _config(), _options())
+
+ assert result.success == 2
+ assert result.failed == 0
+ cur = conn._cur
+ assert cur.execute.call_count == 2
+ first_sql = cur.execute.call_args_list[0][0][0]
+ assert "INSERT INTO main.default.user_scores" in first_sql
+ assert "id, score" in first_sql
+ conn.close.assert_called_once()
+
+ def test_merge_mode_success(self, monkeypatch: pytest.MonkeyPatch) -> None:
+ _set_creds(monkeypatch)
+ conn = _fake_conn()
+ modules = _mocked_databricks_modules(conn)
+
+ records = [
+ {"id": 1, "score": 0.95},
+ {"id": 2, "score": 0.80},
+ ]
+ config = _config(mode="merge", upsert_key=["id"])
+ with patch.dict("sys.modules", modules):
+ result = DatabricksDestination().load(records, config, _options())
+
+ assert result.success == 2
+ sqls = [(call.args[0] if call.args else "") for call in conn._cur.execute.call_args_list]
+ # Staging Delta table created from the target table's schema
+ assert any(
+ "CREATE OR REPLACE TABLE main.default.__drt_staging_user_scores" in s for s in sqls
+ )
+ # Staging gets INSERTed before MERGE
+ assert any("INSERT INTO main.default.__drt_staging_user_scores" in s for s in sqls)
+ # MERGE INTO target FROM staging
+ assert any("MERGE INTO main.default.user_scores" in s for s in sqls)
+ assert any("WHEN MATCHED THEN UPDATE" in s for s in sqls)
+ assert any("WHEN NOT MATCHED THEN INSERT" in s for s in sqls)
+ # Staging table is dropped at the end so subsequent syncs don't trip
+ assert any("DROP TABLE IF EXISTS main.default.__drt_staging_user_scores" in s for s in sqls)
+
+ def test_merge_mode_requires_upsert_key(self, monkeypatch: pytest.MonkeyPatch) -> None:
+ _set_creds(monkeypatch)
+ modules = _mocked_databricks_modules(_fake_conn())
+ config = _config(mode="merge", upsert_key=None)
+ with patch.dict("sys.modules", modules):
+ with pytest.raises(ValueError, match="upsert_key is required"):
+ DatabricksDestination().load([{"id": 1}], config, _options())
+
+ def test_insert_row_error_on_error_skip(self, monkeypatch: pytest.MonkeyPatch) -> None:
+ _set_creds(monkeypatch)
+ conn = _fake_conn()
+ conn._cur.execute.side_effect = [Exception("type mismatch"), None]
+ modules = _mocked_databricks_modules(conn)
+
+ records = [
+ {"id": 1, "score": 0.5},
+ {"id": 2, "score": 0.9},
+ ]
+ with patch.dict("sys.modules", modules):
+ result = DatabricksDestination().load(records, _config(), _options(on_error="skip"))
+ assert result.failed == 1
+ assert result.success == 1
+ assert len(result.row_errors) == 1
+ assert "type mismatch" in result.row_errors[0].error_message
+
+ def test_insert_row_error_on_error_fail_raises(self, monkeypatch: pytest.MonkeyPatch) -> None:
+ _set_creds(monkeypatch)
+ conn = _fake_conn()
+ conn._cur.execute.side_effect = Exception("type mismatch")
+ modules = _mocked_databricks_modules(conn)
+
+ with patch.dict("sys.modules", modules):
+ with pytest.raises(Exception, match="type mismatch"):
+ DatabricksDestination().load([{"id": 1}], _config(), _options(on_error="fail"))
+ # Connection still closed via the try/finally
+ conn.close.assert_called_once()
+
+ def test_merge_composite_key(self, monkeypatch: pytest.MonkeyPatch) -> None:
+ """Composite ``upsert_key`` builds an AND-joined ON clause."""
+ _set_creds(monkeypatch)
+ conn = _fake_conn()
+ modules = _mocked_databricks_modules(conn)
+
+ records = [{"tenant_id": "a", "user_id": 1, "score": 0.95}]
+ config = _config(mode="merge", upsert_key=["tenant_id", "user_id"])
+ with patch.dict("sys.modules", modules):
+ DatabricksDestination().load(records, config, _options())
+
+ sqls = [(call.args[0] if call.args else "") for call in conn._cur.execute.call_args_list]
+ merge_sql = next(s for s in sqls if "MERGE INTO" in s)
+ assert (
+ "target.tenant_id = source.tenant_id AND target.user_id = source.user_id" in merge_sql
+ )
+
+ def test_merge_staging_insert_failure_on_error_skip(
+ self, monkeypatch: pytest.MonkeyPatch
+ ) -> None:
+ """Row failure during the staging INSERT lands in row_errors,
+ the sync continues, and the MERGE still runs against whatever
+ the staging table holds."""
+ _set_creds(monkeypatch)
+ conn = _fake_conn()
+ cur = conn._cur
+
+ # Fail the FIRST staging INSERT (row 0), let everything else through.
+ # The CREATE OR REPLACE TABLE statement runs first, then per-row
+ # INSERT INTO __drt_staging_..., then MERGE INTO, then DROP TABLE.
+ insert_call_count = {"n": 0}
+
+ def execute_side_effect(sql: str, *args: Any) -> None:
+ if "INSERT INTO main.default.__drt_staging_user_scores" in sql:
+ insert_call_count["n"] += 1
+ if insert_call_count["n"] == 1:
+ raise Exception("staging type mismatch")
+ return None
+
+ cur.execute.side_effect = execute_side_effect
+ modules = _mocked_databricks_modules(conn)
+
+ records = [
+ {"id": 1, "score": 0.5},
+ {"id": 2, "score": 0.9},
+ ]
+ config = _config(mode="merge", upsert_key=["id"])
+ with patch.dict("sys.modules", modules):
+ result = DatabricksDestination().load(
+ records, config, _options(on_error="skip")
+ )
+
+ assert result.failed == 1
+ assert result.success == 1
+ assert len(result.row_errors) == 1
+ assert "staging type mismatch" in result.row_errors[0].error_message
+ # The MERGE statement still ran (against the staging table that
+ # ended up with one row).
+ sqls = [(call.args[0] if call.args else "") for call in cur.execute.call_args_list]
+ assert any("MERGE INTO main.default.user_scores" in s for s in sqls)
+
+ def test_merge_staging_insert_failure_on_error_fail_raises(
+ self, monkeypatch: pytest.MonkeyPatch
+ ) -> None:
+ """``on_error=fail`` re-raises the staging-INSERT exception
+ immediately. The connection is still closed via try/finally."""
+ _set_creds(monkeypatch)
+ conn = _fake_conn()
+ cur = conn._cur
+
+ def execute_side_effect(sql: str, *args: Any) -> None:
+ if "INSERT INTO main.default.__drt_staging_user_scores" in sql:
+ raise Exception("staging type mismatch")
+ return None
+
+ cur.execute.side_effect = execute_side_effect
+ modules = _mocked_databricks_modules(conn)
+
+ config = _config(mode="merge", upsert_key=["id"])
+ with patch.dict("sys.modules", modules):
+ with pytest.raises(Exception, match="staging type mismatch"):
+ DatabricksDestination().load(
+ [{"id": 1, "score": 0.5}], config, _options(on_error="fail")
+ )
+ conn.close.assert_called_once()
+
+ def test_unsupported_mode_raises(self, monkeypatch: pytest.MonkeyPatch) -> None:
+ """An invalid ``config.mode`` raises ``ValueError``. Pydantic
+ prevents this at config-load time so the path is defensive —
+ this test bypasses Pydantic to exercise the fallthrough branch."""
+ _set_creds(monkeypatch)
+ conn = _fake_conn()
+ modules = _mocked_databricks_modules(conn)
+
+ config = _config(mode="insert")
+ # Bypass Pydantic Literal validation by mutating after construction.
+ object.__setattr__(config, "mode", "garbage") # type: ignore[arg-type]
+
+ with patch.dict("sys.modules", modules):
+ with pytest.raises(ValueError, match="Unsupported mode: garbage"):
+ DatabricksDestination().load([{"id": 1}], config, _options())
+
+ def test_merge_all_columns_are_key(self, monkeypatch: pytest.MonkeyPatch) -> None:
+ """When every column is in upsert_key, the MERGE skips the
+ UPDATE clause (no non-key columns to update)."""
+ _set_creds(monkeypatch)
+ conn = _fake_conn()
+ modules = _mocked_databricks_modules(conn)
+
+ records = [{"id": 1, "score": 0.95}]
+ config = _config(mode="merge", upsert_key=["id", "score"])
+ with patch.dict("sys.modules", modules):
+ DatabricksDestination().load(records, config, _options())
+
+ sqls = [(call.args[0] if call.args else "") for call in conn._cur.execute.call_args_list]
+ merge_sql = next(s for s in sqls if "MERGE INTO" in s)
+ assert "WHEN NOT MATCHED THEN INSERT" in merge_sql
+ assert "WHEN MATCHED THEN UPDATE" not in merge_sql
+
+
+# ---------------------------------------------------------------------------
+# sync.mode: mirror (#340 family — Databricks leg)
+# ---------------------------------------------------------------------------
+
+
+class TestDatabricksMirrorMode:
+ def test_mirror_requires_upsert_key(self, monkeypatch: pytest.MonkeyPatch) -> None:
+ _set_creds(monkeypatch)
+ modules = _mocked_databricks_modules(_fake_conn())
+ config = _config(upsert_key=None)
+ with patch.dict("sys.modules", modules):
+ with pytest.raises(ValueError, match="mirror requires destination.upsert_key"):
+ DatabricksDestination().load([{"id": 1}], config, _options(mode="mirror"))
+
+ def test_mirror_forces_merge_path_regardless_of_config_mode(
+ self, monkeypatch: pytest.MonkeyPatch
+ ) -> None:
+ """``sync.mode: mirror`` forces MERGE write path even when
+ ``config.mode: insert``. Matches Snowflake's leg of #340."""
+ _set_creds(monkeypatch)
+ conn = _fake_conn()
+ modules = _mocked_databricks_modules(conn)
+
+ config = _config(mode="insert", upsert_key=["id"])
+ with patch.dict("sys.modules", modules):
+ DatabricksDestination().load(
+ [{"id": 1, "score": 0.95}], config, _options(mode="mirror")
+ )
+
+ sqls = [(call.args[0] if call.args else "") for call in conn._cur.execute.call_args_list]
+ assert any("MERGE INTO main.default.user_scores" in s for s in sqls)
+
+ def test_mirror_finalize_issues_delete_not_in(self, monkeypatch: pytest.MonkeyPatch) -> None:
+ """End-of-sync DELETE removes destination rows whose
+ upsert_key was not observed in the source."""
+ _set_creds(monkeypatch)
+ conn = _fake_conn()
+ modules = _mocked_databricks_modules(conn)
+
+ config = _config(mode="merge", upsert_key=["id"])
+ dest = DatabricksDestination()
+ with patch.dict("sys.modules", modules):
+ dest.load(
+ [{"id": 1, "score": 0.5}, {"id": 2, "score": 0.9}],
+ config,
+ _options(mode="mirror"),
+ )
+ dest.finalize_sync(config, _options(mode="mirror"))
+
+ # The DELETE was issued (in a separate connection cycle)
+ sqls = [(call.args[0] if call.args else "") for call in conn._cur.execute.call_args_list]
+ delete_sql = next(s for s in sqls if s.startswith("DELETE FROM"))
+ assert "DELETE FROM main.default.user_scores" in delete_sql
+ assert "WHERE id NOT IN" in delete_sql
+
+ def test_mirror_finalize_composite_key_uses_tuple_form(
+ self, monkeypatch: pytest.MonkeyPatch
+ ) -> None:
+ """Composite upsert_key uses ``WHERE (c1, c2) NOT IN ((..., ...))``."""
+ _set_creds(monkeypatch)
+ conn = _fake_conn()
+ modules = _mocked_databricks_modules(conn)
+
+ config = _config(mode="merge", upsert_key=["tenant_id", "user_id"])
+ dest = DatabricksDestination()
+ with patch.dict("sys.modules", modules):
+ dest.load(
+ [{"tenant_id": "a", "user_id": 1, "score": 0.5}],
+ config,
+ _options(mode="mirror"),
+ )
+ dest.finalize_sync(config, _options(mode="mirror"))
+
+ sqls = [(call.args[0] if call.args else "") for call in conn._cur.execute.call_args_list]
+ delete_sql = next(s for s in sqls if s.startswith("DELETE FROM"))
+ assert "WHERE (tenant_id, user_id) NOT IN" in delete_sql
+
+ def test_mirror_skips_failed_keys_from_delete_observed_set(
+ self, monkeypatch: pytest.MonkeyPatch
+ ) -> None:
+ """Mirror's ``_mirror_keys`` accumulator must skip records that
+ failed the staging INSERT — those rows never made it into the
+ destination, so they shouldn't count as "observed in source"
+ for the end-of-sync DELETE."""
+ _set_creds(monkeypatch)
+ conn = _fake_conn()
+ cur = conn._cur
+
+ # First record's staging INSERT fails; second succeeds.
+ insert_call_count = {"n": 0}
+
+ def execute_side_effect(sql: str, *args: Any) -> None:
+ if "INSERT INTO main.default.__drt_staging_user_scores" in sql:
+ insert_call_count["n"] += 1
+ if insert_call_count["n"] == 1:
+ raise Exception("staging type mismatch")
+ return None
+
+ cur.execute.side_effect = execute_side_effect
+ modules = _mocked_databricks_modules(conn)
+
+ config = _config(mode="merge", upsert_key=["id"])
+ dest = DatabricksDestination()
+ with patch.dict("sys.modules", modules):
+ dest.load(
+ [{"id": 1, "score": 0.5}, {"id": 2, "score": 0.9}],
+ config,
+ _options(mode="mirror", on_error="skip"),
+ )
+ dest.finalize_sync(config, _options(mode="mirror"))
+
+ # The DELETE was issued and includes only id=2 (the survivor),
+ # not id=1 (which failed staging).
+ sqls = [(call.args[0] if call.args else "") for call in cur.execute.call_args_list]
+ delete_call = next(
+ call
+ for call in cur.execute.call_args_list
+ if call.args and call.args[0].startswith("DELETE FROM")
+ )
+ delete_params = delete_call.args[1] if len(delete_call.args) > 1 else []
+ assert delete_params == [2]
+ # And the DELETE was actually issued (not skipped — at least one
+ # record made it through).
+ assert any(s.startswith("DELETE FROM") for s in sqls)
+
+ def test_mirror_finalize_skipped_when_no_records_observed(
+ self, monkeypatch: pytest.MonkeyPatch
+ ) -> None:
+ """No batch ever produced records → ``finalize_sync`` skips the
+ DELETE entirely. Safety guard against wiping the destination
+ when the source is transiently empty."""
+ _set_creds(monkeypatch)
+ modules = _mocked_databricks_modules(_fake_conn())
+ config = _config(mode="merge", upsert_key=["id"])
+ dest = DatabricksDestination()
+ with patch.dict("sys.modules", modules):
+ # No load() call — _mirror_keys stays None
+ result = dest.finalize_sync(config, _options(mode="mirror"))
+
+ assert result is None
+
+ def test_finalize_sync_skipped_for_non_mirror_modes(
+ self, monkeypatch: pytest.MonkeyPatch
+ ) -> None:
+ """``finalize_sync`` is a no-op for any mode that isn't mirror."""
+ _set_creds(monkeypatch)
+ modules = _mocked_databricks_modules(_fake_conn())
+ config = _config(mode="merge", upsert_key=["id"])
+ dest = DatabricksDestination()
+ with patch.dict("sys.modules", modules):
+ assert dest.finalize_sync(config, _options(mode="full")) is None
+ assert dest.finalize_sync(config, _options(mode="upsert")) is None
+ assert dest.finalize_sync(config, _options(mode="replace")) is None
+
+
+# ---------------------------------------------------------------------------
+# test_connection
+# ---------------------------------------------------------------------------
+
+
+class TestDatabricksConnection:
+ def test_test_connection_runs_select_1(self, monkeypatch: pytest.MonkeyPatch) -> None:
+ _set_creds(monkeypatch)
+ conn = _fake_conn()
+ modules = _mocked_databricks_modules(conn)
+
+ with patch.dict("sys.modules", modules):
+ DatabricksDestination().test_connection(_config())
+
+ conn.close.assert_called_once()
+ assert any("SELECT 1" in str(call.args[0]) for call in conn._cur.execute.call_args_list)