Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

### Added

- **Databricks Delta Lake destination** ([#167](https://github.com/drt-hub/drt/issues/167)): the third DWH destination alongside Snowflake ([#353](https://github.com/drt-hub/drt/pull/353)) and BigQuery ([#584](https://github.com/drt-hub/drt/pull/584), in flight) — completes the major-DWH lineup. Supports **INSERT** (append), **MERGE** (upsert via Delta Lake's native `MERGE INTO`), and **`sync.mode: mirror`** (upsert + end-of-sync DELETE-missing — joins the Postgres / MySQL / ClickHouse / Snowflake mirror family from [#340](https://github.com/drt-hub/drt/issues/340)). Auth via the **Databricks SQL Connector**: `host_env` resolves to the workspace hostname (e.g. `dbc-abc12345-1234.cloud.databricks.com`), `http_path_env` to the SQL warehouse HTTP path (e.g. `/sql/1.0/warehouses/abc123def456`), `token_env` to a personal access token (PAT, `dapi*`). The token-bearing principal needs `USAGE` on the catalog + schema, `MODIFY` on the target table, and `CREATE` on the schema for the merge-path staging. Unity Catalog three-part names (`catalog.schema.table`) are the default; legacy workspaces use `catalog: hive_metastore`. **Merge implementation**: because Delta Lake doesn't have session-local temp tables (no `CREATE TEMP TABLE` syntax), drt creates a uniquely-named Delta scratch table `catalog.schema.__drt_staging_<table>` cloned from the target's schema, stages rows via per-row `INSERT`, executes the `MERGE INTO target USING staging ON <upsert_key>` statement, and `DROP TABLE`s the staging at the end. The `__drt_staging_*` prefix makes the scratch table identifiable in audit logs; a `CREATE OR REPLACE TABLE` on the next run cleanly overwrites any interrupted-mid-sync remnant. Composite keys are supported (`upsert_key: [tenant_id, user_id]`) — the `ON` clause becomes AND-joined. When every column is in `upsert_key`, the MERGE skips the `UPDATE` clause (effectively INSERT-IF-NOT-EXISTS). **Mirror semantics**: `sync.mode: mirror` forces the MERGE write path regardless of `config.mode`, then issues `DELETE FROM <table> WHERE upsert_key NOT IN (observed)` at end-of-sync. Composite keys use the `WHERE (c1, c2) NOT IN ((v1a, v1b), (v2a, v2b), ...)` form (same shape as Snowflake's #599 leg). Safety guard: if no batch ever produced records, the DELETE is skipped entirely so a transient empty source doesn't wipe the destination. Empty batches short-circuit before any `databricks.sql` import or warehouse call — the same implicit "no driver was imported" contract used by the other SQL destinations ([#595](https://github.com/drt-hub/drt/pull/595)). Row-level errors during INSERT are captured in `result.row_errors`; `on_error: skip` (default) continues the sync, `on_error: fail` raises. Requires `pip install drt-core[databricks]` (depends on `databricks-sql-connector>=3.0`). New `docs/connectors/databricks.md` covers all three modes, the auth flow with PAT generation steps, the Unity Catalog vs Hive Metastore catalog field, the merge-path staging design (why a Delta scratch table and not `CREATE TEMP TABLE`), and a sync-mode compatibility table. 22 unit tests in `tests/unit/test_databricks_destination.py` cover config validation (including the `schema:` YAML alias for the mypy-strict `schema_` field, three-part FQN in `describe()`, and Hive Metastore catalog), the empty-batch short-circuit, the `databricks.sql.connect()` kwargs shape (protects against silent template-copy drift from the Snowflake destination, since the two share most other code), INSERT / MERGE happy paths, the `upsert_key`-required ValueError, on_error skip vs fail behaviour, composite-key MERGE `ON` clause, all-columns-are-key MERGE (no UPDATE clause), all mirror invariants (`upsert_key` validation, MERGE-write-path forcing, single-column DELETE, composite-key DELETE tuple form, skip-when-no-records safety guard, no-op `finalize_sync` for non-mirror modes), and the `test_connection` round-trip. `databricks.sql` is mocked via `sys.modules` injection (no real Databricks workspace or `databricks-sql-connector` install required). Closes [#167](https://github.com/drt-hub/drt/issues/167). BigQuery destination ([#584](https://github.com/drt-hub/drt/pull/584), contributor PR) follows the same DWH MERGE shape and remains in flight.

- **Amazon S3 destination** ([#168](https://github.com/drt-hub/drt/issues/168)): new file-style destination that uploads each sync batch as a single file to an S3 bucket, in any of four formats — **csv** / **json** / **jsonl** / **parquet** — with optional **gzip** compression for the three text formats (parquet uses its own column-level compression via `parquet_compression`). Authentication defers to boto3's standard credential chain (env vars → `~/.aws/credentials` → instance profile → IAM role), which is the right shape for most real deployments; explicit overrides are available via `aws_profile`, `aws_access_key_id_env` / `aws_secret_access_key_env` / `aws_session_token_env`, or by setting `endpoint_url` for S3-compatible services (MinIO, LocalStack, Cloudflare R2, DigitalOcean Spaces). Default S3 key is `<prefix><UTC ISO8601 basic>.<ext>` — timestamped so re-runs land at a fresh key rather than overwriting, matching the Census / Hightouch convention and making downstream "new files since last check" polling trivial. Override the file name via `key_template` (supports the `{timestamp}` placeholder); for per-sync routing, the recommended pattern is a sync-specific `prefix`. Empty batches short-circuit before any boto3 import or AWS call — the same implicit "no driver was imported" contract used by the SQL destinations ([#595](https://github.com/drt-hub/drt/pull/595)), so a run with zero source rows produces zero S3 objects. Failure semantics distinguish missing-extras (`ImportError` bubbles up so the engine surfaces the deployment mistake once at the top) from network / auth / permission errors (recorded as `result.failed = len(records)` so other batches keep going) — see `drt/destinations/s3.py`. Requires `pip install drt-core[s3]` (and additionally `drt-core[parquet]` for `format: parquet`). New `docs/connectors/s3.md` covers all four formats, the authentication flow, the file-naming convention with `key_template` examples, and the MinIO / R2 endpoint override pattern. 21 unit tests in `tests/unit/test_s3_destination.py` cover config validation across all four formats and both compressions, the implicit no-boto3-import empty-batch path, put_object call shape per format, gzip → `ContentEncoding: gzip` header + `.gz` extension, key-naming default + template overrides (with and without an explicit extension), the credential threading (profile / env vars / endpoint URL), and both failure paths (serialisation error → row failures, missing extras → ImportError bubble-up). boto3 is mocked via `sys.modules` injection (no real AWS account or boto3 install required for the non-parquet tests). Closes [#168](https://github.com/drt-hub/drt/issues/168). GCS ([#169](https://github.com/drt-hub/drt/issues/169)) and Azure Blob ([#170](https://github.com/drt-hub/drt/issues/170)) are natural follow-ups — the serialiser / key-naming / credential-threading layer here is reusable.

### Documentation
Expand Down
1 change: 1 addition & 0 deletions README.ja.md
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,7 @@ Claude Codeの公式スキルをインストールすると、チャットイン
| Salesforce Bulk API 2.0 | ✅ v0.6 | (core) | OAuth2(username-password) |
| Staged Upload | ✅ v0.6 | (core) | プロバイダーごとに設定 |
| Snowflake | ✅ v0.7 | `pip install drt-core[snowflake]` | パスワード(環境変数) |
| Databricks Delta Lake | ✅ v0.7.9 | `pip install drt-core[databricks]` | Personal Access Token(環境変数) |

### インテグレーション

Expand Down
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -299,6 +299,7 @@ Copy the files from `.claude/commands/` into your drt project's `.claude/command
| Salesforce Bulk API 2.0 | ✅ v0.6 | (core) | OAuth2 (username-password) |
| Staged Upload | ✅ v0.6 | (core) | Configurable per provider |
| Snowflake | ✅ v0.7 | `pip install drt-core[snowflake]` | Password (env var) |
| Databricks Delta Lake | ✅ v0.7.9 | `pip install drt-core[databricks]` | Personal Access Token (env var) |

### Integrations

Expand Down
174 changes: 174 additions & 0 deletions docs/connectors/databricks.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,174 @@
# Databricks Destination

> Write data back to Databricks Delta Lake tables via the Databricks
> SQL Connector. Supports **INSERT**, **MERGE** (upsert), and
> **`sync.mode: mirror`** (upsert + DELETE-missing) — the same modes
> as the [Snowflake destination](snowflake.md).

## YAML Example — INSERT (append)

```yaml
destination:
type: databricks
host_env: DATABRICKS_HOST
http_path_env: DATABRICKS_HTTP_PATH
token_env: DATABRICKS_TOKEN
catalog: main
schema: default
table: user_events
mode: insert
```

## YAML Example — MERGE (upsert)

```yaml
destination:
type: databricks
host_env: DATABRICKS_HOST
http_path_env: DATABRICKS_HTTP_PATH
token_env: DATABRICKS_TOKEN
catalog: main
schema: default
table: user_scores
mode: merge
upsert_key: [user_id]
```

## YAML Example — Mirror (upsert + delete-missing)

```yaml
destination:
type: databricks
host_env: DATABRICKS_HOST
http_path_env: DATABRICKS_HTTP_PATH
token_env: DATABRICKS_TOKEN
catalog: main
schema: analytics
table: active_users
upsert_key: [user_id] # required for mirror

sync:
mode: mirror # forces MERGE write path + end-of-sync DELETE
```

## Configuration

| Field | Type | Default | Description |
|---|---|---|---|
| `type` | `"databricks"` | — | Required |
| `host_env` | string | — | Env var name holding the workspace hostname (e.g. `dbc-abc12345-1234.cloud.databricks.com`) |
| `http_path_env` | string | — | Env var name holding the SQL warehouse HTTP path (e.g. `/sql/1.0/warehouses/abc123def456`) |
| `token_env` | string | — | Env var name holding a Databricks personal access token (PAT, starts with `dapi`) |
| `catalog` | string | — | Unity Catalog catalog name. Use `hive_metastore` for legacy workspaces. |
| `schema` | string | — | Database/schema name within the catalog |
| `table` | string | — | Target Delta Lake table name |
| `mode` | `insert` \| `merge` | `insert` | Write mode. `merge` requires `upsert_key`. |
| `upsert_key` | list[str] \| null | null | Column list that uniquely identifies a row. Required for `mode: merge` and for `sync.mode: mirror`. Composite keys supported (`[tenant_id, user_id]`). |

## Authentication

Databricks SQL Connector authenticates via a **personal access token**
(PAT). Generate one in the Databricks workspace at User Settings →
Developer → Access tokens.

```bash
export DATABRICKS_HOST=dbc-abc12345-1234.cloud.databricks.com
export DATABRICKS_HTTP_PATH=/sql/1.0/warehouses/abc123def456
export DATABRICKS_TOKEN=dapi-xxxxxxxxxxxxxxxxxxxx
```

The token-bearing principal needs:
- `USAGE` on the catalog and schema
- `MODIFY` on the target table
- `CREATE` on the schema (for the merge-path staging table — see below)

> **OAuth M2M / service principal flows** are not yet supported.
> Track [#634](https://github.com/drt-hub/drt/issues) if you need
> them — for now use a PAT on a service-principal-scoped account.

## Write modes

### `mode: insert`

Issues one `INSERT INTO catalog.schema.table (...) VALUES (...)` per
record. Best for **append-only** workloads — event streams, audit
logs, telemetry.

Errors at the row level are tracked in `result.row_errors`; the
sync continues for the rest of the batch unless `on_error: fail`.

### `mode: merge`

Creates a uniquely-named **Delta scratch table**
(`catalog.schema.__drt_staging_<table>`), stages rows into it via
per-row `INSERT`, then issues `MERGE INTO target USING staging ON
<upsert_key> WHEN MATCHED THEN UPDATE WHEN NOT MATCHED THEN INSERT`,
and finally `DROP TABLE` the staging table.

**Why a Delta scratch table and not `CREATE TEMP TABLE`?** Databricks
Delta Lake doesn't have session-local tables — the standard
`CREATE TEMP TABLE` syntax isn't supported. A uniquely-named scratch
table in the same catalog.schema is the idiomatic shape. The
`__drt_staging_*` prefix makes it identifiable in audit logs.

**Composite keys** are supported (`upsert_key: [tenant_id, user_id]`)
— the `ON` clause becomes `target.tenant_id = source.tenant_id AND
target.user_id = source.user_id`.

If every column is in `upsert_key`, the MERGE skips the `UPDATE` clause
(no non-key columns to update); the resulting statement is effectively
`INSERT-IF-NOT-EXISTS`.

### `sync.mode: mirror`

Mirrors the source table to the destination: upserts source rows AND
deletes destination rows whose `upsert_key` was not observed in the
source. Forces the MERGE write path regardless of `config.mode`, so
the only YAML changes needed are:

```yaml
destination:
...
upsert_key: [user_id] # required
sync:
mode: mirror
```

End-of-sync, drt issues a single
`DELETE FROM catalog.schema.table WHERE upsert_key NOT IN (observed)`
against the destination. Composite keys use the
`WHERE (c1, c2) NOT IN ((v1a, v1b), (v2a, v2b), ...)` form.

**Safety guard**: if no batch ever produced records (source returned
zero rows), the DELETE is skipped entirely — protects against wiping
the destination when the source is transiently empty.

Mirror semantics fit the same shape as Postgres / MySQL / ClickHouse /
Snowflake mirror destinations (see #340) — same `upsert_key` contract,
same source-key-cardinality memory bound on `_mirror_keys`.

## Sync modes

| `sync.mode` | Behaviour on Databricks |
|-------------|--------------------------|
| `full` | Re-extracts every run + writes via `config.mode` (insert / merge). |
| `incremental` | Watermark-based — extracts rows with `cursor_field > last_value`, writes via `config.mode`. |
| `upsert` | Same as `incremental` but with `upsert_key` enforced. |
| `mirror` | Forces the MERGE write path + end-of-sync DELETE. `upsert_key` required. |
| `replace` | Not yet supported on Databricks (use `mirror` for "everything in source = everything in dest" semantics). |

## Notes

- Requires `pip install drt-core[databricks]` (depends on `databricks-sql-connector>=3.0`).
- Target table must be a **Delta Lake table**. Non-Delta formats fail at `MERGE INTO` time with a Databricks server error.
- Empty batches short-circuit before any `databricks.sql` import or warehouse call — the same "no driver was imported" contract used by the SQL destinations (#595). A run with zero source rows produces zero warehouse statements.
- Errors during the connect step (missing env vars, bad token, network) raise immediately; errors during row INSERT are captured per-row and surface in `result.row_errors`.
- The `__drt_staging_*` scratch table is dropped at the end of every merge run. If a sync is interrupted mid-merge, the next run's `CREATE OR REPLACE TABLE` overwrites it cleanly.
- Unity Catalog three-part names are the default. Workspaces still on Hive Metastore should use `catalog: hive_metastore`.

## References

- [Databricks SQL Connector for Python](https://docs.databricks.com/dev-tools/python-sql-connector.html)
- [Delta Lake MERGE INTO](https://docs.databricks.com/delta/merge.html)
- [Personal access tokens](https://docs.databricks.com/dev-tools/auth/pat.html)
- Sibling SQL destinations with the same shape: [Snowflake](snowflake.md), [PostgreSQL](postgres.md), [MySQL](mysql.md), [ClickHouse](clickhouse.md)
41 changes: 40 additions & 1 deletion drt/config/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -408,6 +408,44 @@ def describe(self) -> str:
return f"{self.type} ({self.database}.{self.schema_}.{self.table})"


class DatabricksDestinationConfig(BaseModel):
"""Databricks Delta Lake destination — write data back to Databricks tables.

Auth via the Databricks SQL Connector: a SQL warehouse HTTP path
plus a personal access token (PAT). The token-bearing user needs
USAGE on the catalog + schema and INSERT/MODIFY on the target
table.
"""

type: Literal["databricks"]

# Workspace hostname (env-var resolved), e.g.
# ``dbc-abc12345-1234.cloud.databricks.com``.
host_env: str
# SQL warehouse HTTP path (env-var resolved), e.g.
# ``/sql/1.0/warehouses/abc123def456``.
http_path_env: str
# Databricks personal access token (env-var resolved). Starts with ``dapi``.
token_env: str

# Three-part name (Unity Catalog). For Hive Metastore deployments
# use catalog="hive_metastore".
catalog: str
# Field alias because BaseModel.schema() shadows a plain `schema`
# attribute under mypy strict mode; YAML key stays `schema:`.
schema_: str = Field(alias="schema")
table: str

mode: Literal["insert", "merge"] = "insert"

# Required for merge mode and for ``sync.mode: mirror``. Composite
# keys supported (`upsert_key: [tenant_id, user_id]`).
upsert_key: list[str] | None = None

def describe(self) -> str:
return f"{self.type} ({self.catalog}.{self.schema_}.{self.table})"


class LinearDestinationConfig(BaseModel):
type: Literal["linear"]
team_id: str | None = None
Expand Down Expand Up @@ -807,7 +845,8 @@ def _check_instance_url(self) -> SalesforceBulkDestinationConfig:
| StagedUploadDestinationConfig
| SalesforceBulkDestinationConfig
| TwilioDestinationConfig
| SnowflakeDestinationConfig,
| SnowflakeDestinationConfig
| DatabricksDestinationConfig,
Field(discriminator="type"),
]

Expand Down
3 changes: 3 additions & 0 deletions drt/connectors/registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,7 @@ def _register_all_connectors() -> None:
from drt.config.models import (
AmplitudeDestinationConfig,
ClickHouseDestinationConfig,
DatabricksDestinationConfig,
DiscordDestinationConfig,
EmailSmtpDestinationConfig,
FileDestinationConfig,
Expand Down Expand Up @@ -170,6 +171,7 @@ def _register_all_connectors() -> None:
# Import destination classes
from drt.destinations.amplitude import AmplitudeDestination
from drt.destinations.clickhouse import ClickHouseDestination
from drt.destinations.databricks import DatabricksDestination
from drt.destinations.discord import DiscordDestination
from drt.destinations.email_smtp import EmailSmtpDestination
from drt.destinations.file import FileDestination
Expand Down Expand Up @@ -239,6 +241,7 @@ def _register_all_connectors() -> None:
register_destination("staged_upload", StagedUploadDestinationConfig, StagedUploadDestination)
register_destination("intercom", IntercomDestinationConfig, IntercomDestination)
register_destination("snowflake", SnowflakeDestinationConfig, SnowflakeDestination)
register_destination("databricks", DatabricksDestinationConfig, DatabricksDestination)

# Register all sources
register_source("bigquery", BigQueryProfile, BigQuerySource)
Expand Down
Loading
Loading