Merge pull request #1934 from dlt-hub/devel
master merge for 1.2.0 release
rudolfix authored Oct 7, 2024
2 parents d2b6d05 + 2d07a43 commit 8798c17
Showing 111 changed files with 2,733 additions and 1,888 deletions.
2 changes: 1 addition & 1 deletion dlt/common/runtime/anon_tracker.py
@@ -155,7 +155,7 @@ def _default_context_fields() -> TExecutionContext:
    global _TRACKER_CONTEXT

    if not _TRACKER_CONTEXT:
-        # Make sure to update the example in docs/docs/telemetry/telemetry.mdx
+        # Make sure to update the example in docs/reference/telemetry.md
        # if you change / add context
        _TRACKER_CONTEXT = get_execution_context()
9 changes: 7 additions & 2 deletions dlt/common/schema/typing.py
@@ -232,15 +232,20 @@ class TWriteDispositionDict(TypedDict):
    disposition: TWriteDisposition


-class TMergeDispositionDict(TWriteDispositionDict, total=False):
+class TMergeDispositionDict(TWriteDispositionDict):
    strategy: Optional[TLoaderMergeStrategy]


+class TScd2StrategyDict(TMergeDispositionDict, total=False):
    validity_column_names: Optional[List[str]]
    active_record_timestamp: Optional[TAnyDateTime]
    boundary_timestamp: Optional[TAnyDateTime]
    row_version_column_name: Optional[str]


-TWriteDispositionConfig = Union[TWriteDisposition, TWriteDispositionDict, TMergeDispositionDict]
+TWriteDispositionConfig = Union[
+    TWriteDisposition, TWriteDispositionDict, TMergeDispositionDict, TScd2StrategyDict
+]


class _TTableSchemaBase(TTableProcessingHints, total=False):
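The scd2-only options now live in their own TScd2StrategyDict, and the widened TWriteDispositionConfig union accepts it anywhere a write disposition is configured. A minimal sketch of the documented dict form of write_disposition that this typing covers (the resource, rows and column names are invented):

import dlt

@dlt.resource(
    write_disposition={
        "disposition": "merge",
        "strategy": "scd2",
        # keys below are the ones typed in TScd2StrategyDict
        "validity_column_names": ["valid_from", "valid_to"],
    }
)
def customers():
    yield [{"id": 1, "name": "alice"}, {"id": 2, "name": "bob"}]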
6 changes: 6 additions & 0 deletions dlt/destinations/impl/athena/athena.py
@@ -149,6 +149,12 @@ def gen_delete_temp_table_sql(
        sql.insert(0, f"""DROP TABLE IF EXISTS {temp_table_name.replace('"', '`')};""")
        return sql, temp_table_name

+    @classmethod
+    def gen_concat_sql(cls, columns: Sequence[str]) -> str:
+        # Athena requires explicit casting
+        columns = [f"CAST({c} AS VARCHAR)" for c in columns]
+        return f"CONCAT({', '.join(columns)})"
+
    @classmethod
    def requires_temp_table_for_delete(cls) -> bool:
        return True
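The new override wraps every column in an explicit CAST because Athena's CONCAT does not coerce non-string arguments. A standalone sketch of the same logic, showing the SQL it emits (column names invented):

def gen_concat_sql(columns):
    # mirror of the method above: cast each column to VARCHAR before concatenating
    columns = [f"CAST({c} AS VARCHAR)" for c in columns]
    return f"CONCAT({', '.join(columns)})"

print(gen_concat_sql(["first_name", "last_name"]))
# CONCAT(CAST(first_name AS VARCHAR), CAST(last_name AS VARCHAR))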
16 changes: 12 additions & 4 deletions dlt/destinations/impl/databricks/databricks.py
@@ -33,6 +33,7 @@
from dlt.destinations.job_impl import ReferenceFollowupJobRequest

AZURE_BLOB_STORAGE_PROTOCOLS = ["az", "abfss", "abfs"]
+SUPPORTED_BLOB_STORAGE_PROTOCOLS = AZURE_BLOB_STORAGE_PROTOCOLS + ["s3", "gs", "gcs"]


class DatabricksLoadJob(RunnableLoadJob, HasFollowupJobs):
@@ -69,11 +70,12 @@ def run(self) -> None:
            bucket_url = urlparse(bucket_path)
            bucket_scheme = bucket_url.scheme

-            if bucket_scheme not in AZURE_BLOB_STORAGE_PROTOCOLS + ["s3"]:
+            if bucket_scheme not in SUPPORTED_BLOB_STORAGE_PROTOCOLS:
                raise LoadJobTerminalException(
                    self._file_path,
-                    f"Databricks cannot load data from staging bucket {bucket_path}. Only s3 and"
-                    " azure buckets are supported",
+                    f"Databricks cannot load data from staging bucket {bucket_path}. Only s3, azure"
+                    " and gcs buckets are supported. Please note that gcs buckets are supported"
+                    " only via named credential",
                )

            if self._job_client.config.is_staging_external_location:
@@ -106,6 +108,12 @@ def run(self) -> None:
                bucket_path = self.ensure_databricks_abfss_url(
                    bucket_path, staging_credentials.azure_storage_account_name
                )
+            else:
+                raise LoadJobTerminalException(
+                    self._file_path,
+                    "You need to use Databricks named credential to use google storage."
+                    " Passing explicit Google credentials is not supported by Databricks.",
+                )

            if bucket_scheme in AZURE_BLOB_STORAGE_PROTOCOLS:
                assert isinstance(
@@ -125,7 +133,7 @@ def run(self) -> None:
            raise LoadJobTerminalException(
                self._file_path,
                "Cannot load from local file. Databricks does not support loading from local files."
-                " Configure staging with an s3 or azure storage bucket.",
+                " Configure staging with an s3, azure or google storage bucket.",
            )

        # decide on source format, stage_file_path will either be a local file or a bucket path
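The scheme check now accepts gs/gcs URLs in addition to s3 and Azure, while local files are still rejected; GCS buckets only work when a Databricks named credential is configured. A minimal sketch of the same validation using only the constants from the diff (the bucket URLs are invented):

from urllib.parse import urlparse

AZURE_BLOB_STORAGE_PROTOCOLS = ["az", "abfss", "abfs"]
SUPPORTED_BLOB_STORAGE_PROTOCOLS = AZURE_BLOB_STORAGE_PROTOCOLS + ["s3", "gs", "gcs"]

for url in ("s3://staging-bucket/load", "gs://staging-bucket/load", "file:///tmp/load"):
    scheme = urlparse(url).scheme
    # s3 and gs pass; the file:// path fails the scheme check
    print(url, scheme in SUPPORTED_BLOB_STORAGE_PROTOCOLS)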
10 changes: 5 additions & 5 deletions dlt/destinations/impl/sqlalchemy/db_api_client.py
@@ -238,16 +238,16 @@ def _sqlite_create_dataset(self, dataset_name: str) -> None:
        """Mimic multiple schemas in sqlite using ATTACH DATABASE to
        attach a new database file to the current connection.
        """
-        if dataset_name == "main":
-            # main always exists
-            return
        if self._sqlite_is_memory_db():
            new_db_fn = ":memory:"
        else:
            new_db_fn = self._sqlite_dataset_filename(dataset_name)

-        statement = "ATTACH DATABASE :fn AS :name"
-        self.execute_sql(statement, fn=new_db_fn, name=dataset_name)
+        if dataset_name != "main":  # main is the current file, it is always attached
+            statement = "ATTACH DATABASE :fn AS :name"
+            self.execute_sql(statement, fn=new_db_fn, name=dataset_name)
+        # WAL mode is applied to all currently attached databases
+        self.execute_sql("PRAGMA journal_mode=WAL")
        self._sqlite_attached_datasets.add(dataset_name)

    def _sqlite_drop_dataset(self, dataset_name: str) -> None:
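After the change, only non-"main" datasets are ATTACHed, and WAL journaling is then enabled for every database on the connection, including "main". A raw sqlite3 sketch of the same sequence (file names invented):

import sqlite3

conn = sqlite3.connect("main.db")  # "main" is always present, no ATTACH needed
conn.execute("ATTACH DATABASE ? AS other_dataset", ("other_dataset.db",))
# journal_mode without a schema prefix applies to all currently attached databases
conn.execute("PRAGMA journal_mode=WAL")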
12 changes: 12 additions & 0 deletions dlt/destinations/impl/sqlalchemy/factory.py
@@ -1,5 +1,6 @@
import typing as t

+from dlt.common import pendulum
from dlt.common.destination import Destination, DestinationCapabilitiesContext
from dlt.common.destination.capabilities import DataTypeMapper
from dlt.common.arithmetics import DEFAULT_NUMERIC_PRECISION, DEFAULT_NUMERIC_SCALE
@@ -9,6 +10,7 @@
    SqlalchemyCredentials,
    SqlalchemyClientConfiguration,
)
+from dlt.common.data_writers.escape import format_datetime_literal

SqlalchemyTypeMapper: t.Type[DataTypeMapper]

@@ -24,6 +26,13 @@
    from sqlalchemy.engine import Engine


+def _format_mysql_datetime_literal(
+    v: pendulum.DateTime, precision: int = 6, no_tz: bool = False
+) -> str:
+    # Format without timezone to prevent tz conversion in SELECT
+    return format_datetime_literal(v, precision, no_tz=True)
+
+
class sqlalchemy(Destination[SqlalchemyClientConfiguration, "SqlalchemyJobClient"]):
    spec = SqlalchemyClientConfiguration

@@ -50,6 +59,7 @@ def _raw_capabilities(self) -> DestinationCapabilitiesContext:
        caps.supports_multiple_statements = False
        caps.type_mapper = SqlalchemyTypeMapper
        caps.supported_replace_strategies = ["truncate-and-insert", "insert-from-staging"]
+        caps.supported_merge_strategies = ["delete-insert", "scd2"]

        return caps

@@ -67,6 +77,8 @@ def adjust_capabilities(
        caps.max_identifier_length = dialect.max_identifier_length
        caps.max_column_identifier_length = dialect.max_identifier_length
        caps.supports_native_boolean = dialect.supports_native_boolean
+        if dialect.name == "mysql":
+            caps.format_datetime_literal = _format_mysql_datetime_literal

        return caps

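MySQL's DATETIME type stores no timezone, so the new capability hook always formats literals without a UTC offset, no matter what no_tz value the caller passes; this keeps values from coming back shifted on SELECT. A small sketch of the effect, assuming format_datetime_literal keeps the signature the wrapper above mirrors (the timestamp is invented):

from dlt.common import pendulum
from dlt.common.data_writers.escape import format_datetime_literal

ts = pendulum.datetime(2024, 10, 7, 12, 30, 45, tz="UTC")
# default formatting may carry timezone information ...
print(format_datetime_literal(ts, 6, no_tz=False))
# ... while the MySQL hook always delegates with no_tz=True
print(format_datetime_literal(ts, 6, no_tz=True))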
9 changes: 9 additions & 0 deletions dlt/destinations/impl/sqlalchemy/load_jobs.py
@@ -13,6 +13,7 @@
from dlt.destinations.sql_jobs import SqlFollowupJob, SqlJobParams

from dlt.destinations.impl.sqlalchemy.db_api_client import SqlalchemyClient
+from dlt.destinations.impl.sqlalchemy.merge_job import SqlalchemyMergeFollowupJob

if TYPE_CHECKING:
    from dlt.destinations.impl.sqlalchemy.sqlalchemy_job_client import SqlalchemyJobClient
@@ -134,3 +135,11 @@ def generate_sql(
            statements.append(stmt)

        return statements


+__all__ = [
+    "SqlalchemyJsonLInsertJob",
+    "SqlalchemyParquetInsertJob",
+    "SqlalchemyStagingCopyJob",
+    "SqlalchemyMergeFollowupJob",
+]
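The import plus the __all__ entry re-export the merge follow-up job (now implemented in merge_job.py) from load_jobs, so code importing it from the old module keeps working. A quick check of that, as a sketch to run against a dlt build containing this change:

# both paths resolve to the same class after the re-export
from dlt.destinations.impl.sqlalchemy.load_jobs import SqlalchemyMergeFollowupJob
from dlt.destinations.impl.sqlalchemy.merge_job import SqlalchemyMergeFollowupJob as Direct

assert SqlalchemyMergeFollowupJob is Direct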