feat/2372 extend dataset querying API for incremental load #2386

Open
wants to merge 13 commits into base: devel
45 changes: 45 additions & 0 deletions dlt/common/schema/utils.py
@@ -829,6 +829,33 @@ def table_schema_has_type_with_precision(table: TTableSchema, _typ: TDataType) -
)


def get_row_key(table: TTableSchema) -> str:
for col_name, col in table["columns"].items():
if col.get("row_key") is True:
return col_name
raise KeyError(
f"No row key found for table {table['name']}. This is likely a malformed table schema."
)


def get_parent_key(table: TTableSchema) -> Optional[str]:
for col_name, col in table["columns"].items():
if col.get("parent_key") is True:
return col_name
return None


def is_root_table(table: TTableSchema) -> bool:
return table.get("parent") is None


def get_root_key(table: TTableSchema) -> Optional[str]:
for col_name, col in table["columns"].items():
if col.get("root_key") is True:
return col_name
return None


def get_root_table(tables: TSchemaTables, table_name: str) -> TTableSchema:
"""Finds root (without parent) of a `table_name` following the nested references (row_key - parent_key)."""
table = tables[table_name]
@@ -837,6 +864,24 @@ def get_root_table(tables: TSchemaTables, table_name: str) -> TTableSchema:
return table


Collaborator: I don't think you need this anymore, do you?

Author: I'll remove it for now, but we might want it if we want to support these joins when root_key=True is missing

def get_root_to_table_chain(tables: TSchemaTables, table_name: str) -> List[TTableSchema]:
"""Return a list of tables ordered from root to child using the (row_key - parent_key) references.

Similar functions:
- `get_root_table()` returns the root of the specified child instead of the full chain
- `get_nested_tables()` returns all children of the specified table as a chain
"""
chain: List[TTableSchema] = []

def _parent(t: TTableSchema) -> None:
chain.append(t)
if t.get("parent"):
_parent(tables[t["parent"]])

_parent(tables[table_name])
return chain[::-1]


def get_nested_tables(tables: TSchemaTables, table_name: str) -> List[TTableSchema]:
"""Get nested tables for table name and return a list of tables ordered by ancestry so the nested tables are always after their parents

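Not part of the diff: a small, self-contained illustration of what the new schema helpers return. The items / items__children tables and their hints are made up for the example, and the imports assume the additions from this PR.

from dlt.common.schema.utils import (
    get_parent_key,
    get_root_key,
    get_root_to_table_chain,
    get_row_key,
    is_root_table,
)

# made-up schema fragment: a root table and one nested table
tables = {
    "items": {
        "name": "items",
        "columns": {"_dlt_id": {"name": "_dlt_id", "row_key": True}},
    },
    "items__children": {
        "name": "items__children",
        "parent": "items",
        "columns": {
            "_dlt_id": {"name": "_dlt_id", "row_key": True},
            "_dlt_parent_id": {"name": "_dlt_parent_id", "parent_key": True},
            "_dlt_root_id": {"name": "_dlt_root_id", "root_key": True},
        },
    },
}

assert is_root_table(tables["items"])
assert get_row_key(tables["items__children"]) == "_dlt_id"
assert get_parent_key(tables["items__children"]) == "_dlt_parent_id"
assert get_root_key(tables["items__children"]) == "_dlt_root_id"
# the chain is ordered from root to child
assert [t["name"] for t in get_root_to_table_chain(tables, "items__children")] == [
    "items",
    "items__children",
]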
46 changes: 39 additions & 7 deletions dlt/destinations/dataset/dataset.py
@@ -1,18 +1,15 @@
from typing import Any, Union, TYPE_CHECKING, List

from dlt.common.json import json

from dlt.common.exceptions import MissingDependencyException

from dlt.common.destination.reference import TDestinationReferenceArg, Destination
from dlt.common.destination.client import JobClientBase, WithStateSync
from dlt.common.destination.dataset import SupportsReadableRelation, SupportsReadableDataset
from dlt.common.destination.reference import TDestinationReferenceArg, Destination
from dlt.common.destination.typing import TDatasetType

from dlt.destinations.sql_client import SqlClientBase, WithSqlClient
from dlt.common.exceptions import MissingDependencyException
from dlt.common.json import json
from dlt.common.schema import Schema
from dlt.destinations.dataset.relation import ReadableDBAPIRelation
from dlt.destinations.dataset.utils import get_destination_clients
from dlt.destinations.sql_client import SqlClientBase, WithSqlClient

if TYPE_CHECKING:
try:
@@ -133,6 +130,7 @@ def table(self, table_name: str) -> SupportsReadableRelation:
readable_dataset=self,
ibis_object=unbound_table,
columns_schema=self.schema.tables[table_name]["columns"],
table_name=table_name,
)
except MissingDependencyException:
# if ibis is explicitly requested, reraise
@@ -171,6 +169,40 @@ def row_counts(
# Execute query and build result dict
return self(query)

Collaborator: see above

Collaborator: this should be implemented as ibis expression and raise if ibis is not installed

def list_load_ids(
self, status: Union[int, list[int], None] = 0, limit: Union[int, None] = 10
) -> SupportsReadableRelation:
"""Return the list most recent `load_id`s in descending order.

Executing the query returns a list of tuples [(load_id, ), ...]
If no `load_id` is found, return empty list.
"""
# TODO protect from SQL injection
query = f"SELECT load_id FROM {self.schema.loads_table_name} "

if status is not None:
status_list = [status] if isinstance(status, int) else status
query += f"WHERE status IN ({', '.join(str(s) for s in status_list)}) "

query += "ORDER BY load_id DESC "

if limit is not None:
query += f"LIMIT {limit}"

return self(query)

def latest_load_id(self, status: Union[int, list[int], None] = 0) -> SupportsReadableRelation:
"""Return the latest `load_id`.

Executing the query returns a list of length 1 containing the tuple [(latest_load_id, )]
"""
query = f"SELECT max(load_id) FROM {self.schema.loads_table_name} "
if status is not None:
status_list = [status] if isinstance(status, int) else status
query += f"WHERE status IN ({', '.join(str(s) for s in status_list)}) "

return self(query)

def __getitem__(self, table_name: str) -> SupportsReadableRelation:
"""access of table via dict notation"""
return self.table(table_name)
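Not part of the diff: a minimal usage sketch of the two new dataset methods. The pipeline name, destination, and dataset name are placeholders; both methods return readable relations, so the SQL only runs when results are fetched.

import dlt

pipeline = dlt.pipeline(pipeline_name="my_pipeline", destination="duckdb", dataset_name="my_data")
dataset = pipeline.dataset()

# [(load_id,), ...] for successfully completed loads (status 0), newest first
recent = dataset.list_load_ids(status=0, limit=10).fetchall()

# [(latest_load_id,)], or [(None,)] if no load matches the status filter
latest = dataset.latest_load_id(status=0).fetchall()[0][0]

# status accepts a single int, a list of ints, or None to ignore load status
all_ids = dataset.list_load_ids(status=None, limit=None).fetchall()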
157 changes: 147 additions & 10 deletions dlt/destinations/dataset/ibis_relation.py
@@ -1,10 +1,18 @@
from typing import TYPE_CHECKING, Any, Union, Sequence

from collections.abc import Sequence
from functools import partial
from typing import TYPE_CHECKING, Any, Optional, Union

from dlt.common.exceptions import MissingDependencyException
from dlt.destinations.dataset.relation import BaseReadableDBAPIRelation
from dlt.common.schema.utils import (
get_root_table,
get_root_to_table_chain,
get_row_key,
get_parent_key,
get_root_key,
is_root_table,
)
from dlt.common.schema.typing import TTableSchemaColumns
from dlt.destinations.dataset.relation import BaseReadableDBAPIRelation


if TYPE_CHECKING:
@@ -13,9 +21,9 @@
ReadableDBAPIDataset = Any

try:
from dlt.helpers.ibis import Expr
from dlt.helpers.ibis import Table
except MissingDependencyException:
Expr = Any
Table = Any

# map dlt destination to sqlglot dialect
DIALECT_MAP = {
@@ -54,11 +62,13 @@ def __init__(
readable_dataset: ReadableDBAPIDataset,
ibis_object: Any = None,
columns_schema: TTableSchemaColumns = None,
table_name: str = None,
) -> None:
"""Create a lazy evaluated relation to for the dataset of a destination"""
super().__init__(readable_dataset=readable_dataset)
self._ibis_object = ibis_object
self._columns_schema = columns_schema
self._table_name = table_name

def query(self) -> Any:
"""build the query"""
@@ -85,6 +95,10 @@ def columns_schema(self) -> TTableSchemaColumns:
def columns_schema(self, new_value: TTableSchemaColumns) -> None:
raise NotImplementedError("columns schema in ReadableDBAPIRelation can only be computed")

@property
def table_name(self) -> str:
return self._table_name

def compute_columns_schema(self) -> TTableSchemaColumns:
"""provide schema columns for the cursor, may be filtered by selected columns"""
# TODO: provide column lineage tracing with sqlglot lineage
@@ -152,15 +166,42 @@ def __getattr__(self, name: str) -> Any:

return partial(self._proxy_expression_method, name)

def __getitem__(self, columns: Union[str, Sequence[str]]) -> "ReadableIbisRelation":
# TODO this doesn't respect superclass with `columns: Sequence[str]`
def __getitem__(self, *columns: str) -> "ReadableIbisRelation":
"""Proxy method to select columns on an Ibis expression.

This supports 3 notations:
```
self["foo"] # Column type
self["foo", "bar"] # Table type
self[["foo", "bar"]] # Table type
```
Ibis reference: https://ibis-project.org/tutorials/ibis-for-pandas-users#selecting-columns
"""
# casefold column-names
columns = [columns] if isinstance(columns, str) else columns
columns = [self.sql_client.capabilities.casefold_identifier(col) for col in columns]
expr = self._ibis_object[columns]
# self["foo"]
if len(columns) == 1 and isinstance(columns[0], str):
col = self.sql_client.capabilities.casefold_identifier(columns[0])
cols = [col]
expr = self._ibis_object[col]
# self[["foo", "bar"]]
elif len(columns) == 1 and isinstance(columns[0], Sequence):
cols = [self.sql_client.capabilities.casefold_identifier(col) for col in columns[0]]
expr = self._ibis_object[cols]
# self["foo", "bar"]
elif all(isinstance(col, str) for col in columns):
cols = [self.sql_client.capabilities.casefold_identifier(col) for col in columns]
expr = self._ibis_object[cols]
else:
raise ValueError(
"ReadableIbisRelation can be accessed using `rel['foo']` to retrieve a column, or"
" `rel['foo', 'bar']` and `rel[['foo', 'bar']]` to access a table"
)

return self.__class__(
readable_dataset=self._dataset,
ibis_object=expr,
columns_schema=self._get_filtered_columns_schema(columns),
columns_schema=self._get_filtered_columns_schema(cols),
)

def _get_filtered_columns_schema(self, columns: Sequence[str]) -> TTableSchemaColumns:
@@ -173,6 +214,32 @@ def _get_filtered_columns_schema(self, columns: Sequence[str]) -> TTableSchemaCo
# here we just break the column schema inheritance chain
return None

def _join_to_root_table(self) -> "ReadableIbisRelation":
"""Join the current table to the root table. If the current table is root, it's a no-op."""
LOAD_ID_COL = "_dlt_load_id"

table_schema = self.schema.tables[self.table_name]
if is_root_table(table_schema):
return self

root_key = get_root_key(table_schema)
# TODO set up another case that traverses parent-row keys to join nested tables without root_key
if root_key is None:
raise KeyError(
"ReadableIbisRelation requires a `root_key` hint to join non-root tables. "
"Set `root_key=True` on the source or use `write_disposition='merge'`."
)

Collaborator: we should add a link to the transformation docs in the future for this exception.

root_table_schema = get_root_table(self.schema.tables, self.table_name)
root_row_key = get_row_key(root_table_schema)
root_table = self._dataset.table(root_table_schema["name"])

joined_table = self.inner_join(
root_table.select(LOAD_ID_COL, root_row_key),
self[root_key] == root_table[root_row_key],
)
# `self` selects all columns from the original table
return joined_table.select(self, LOAD_ID_COL) # type: ignore

# forward ibis methods defined on interface
def limit(self, limit: int, **kwargs: Any) -> "ReadableIbisRelation":
"""limit the result to 'limit' items"""
@@ -186,6 +253,76 @@ def select(self, *columns: str) -> "ReadableIbisRelation":
"""set which columns will be selected"""
return self._proxy_expression_method("select", *columns) # type: ignore

# TODO ensure same defaults in ReadableDBAPIDataset and ReadableIbisRelation; and docstrings
def list_load_ids(
self, status: Union[int, list[int], None] = 0, limit: Optional[int] = None
) -> "ReadableIbisRelation":
load_table = self._dataset.table(self.schema.loads_table_name)
if status is not None:
status = [status] if isinstance(status, int) else status
load_table = load_table.filter(load_table["status"].isin(status))

Collaborator: it would be interesting to find out whether isin with a one item list is the same speed as filter with that value. I'd hope the SQL optimizer would find this automatically. Just a thought.

Author (zilto, Mar 11, 2025): Here the code uses a filter clause. For .isin() with subqueries (what I was using before), it should be converted to EXISTS clauses (ref)

load_table = load_table.order_by(load_table["load_id"].desc())
# limit needs to be applied after sorting
if limit is not None:
load_table = load_table.limit(limit)

return load_table.load_id # type: ignore

def latest_load_id(self, status: Union[int, list[int], None] = 0) -> "ReadableIbisRelation":
"""Latest `load_id` with matching load status (0 is success). If `status` is None, match any status."""
load_table = self._dataset.table(self.schema.loads_table_name)
if status is not None:
status = [status] if isinstance(status, int) else status
load_table = load_table.filter(load_table["status"].isin(status))

return load_table.load_id.max() # type: ignore

def filter_by_load_ids(self, load_ids: Union[str, list[str]]) -> "ReadableIbisRelation":
"""Filter on matching `load_ids`."""
load_ids = [load_ids] if isinstance(load_ids, str) else load_ids
table = self._join_to_root_table()
return table.filter(table["_dlt_load_id"].isin(load_ids)) # type: ignore

def filter_by_latest_load_id(
self, status: Union[int, list[int], None] = 0
) -> "ReadableIbisRelation":
"""Filter on the most recent `load_id` with a specific load status.

If `status` is None, don't filter by status.
"""
table = self._join_to_root_table()
return table.filter(table["_dlt_load_id"] == self.latest_load_id(status=status)) # type: ignore

def filter_by_load_status(
self, status: Union[int, list[int], None] = 0
) -> "ReadableIbisRelation":
"""Filter to rows with a specific load status."""
if status is None:
return self

load_ids = self.list_load_ids(status=status)
table = self._join_to_root_table()
return table.filter(table["_dlt_load_id"].isin(load_ids)) # type: ignore

def filter_by_load_id_gt(
self, load_id: str, status: Union[int, list[int], None] = 0
) -> "ReadableIbisRelation":
load_table = self._dataset.table(self.schema.loads_table_name)

conditions = [load_table["load_id"] > load_id] # type: ignore
if status is not None:
status = [status] if isinstance(status, int) else status
conditions.append(load_table["status"].isin(status))
load_table = load_table.filter(*conditions)

table = self._join_to_root_table()
joined_table = table.inner_join(
load_table,
table["_dlt_load_id"] == load_table["load_id"],
)
return joined_table.select(table) # type: ignore

# forward ibis comparison and math operators
def __lt__(self, other: Any) -> "ReadableIbisRelation":
return self._proxy_expression_method("__lt__", other) # type: ignore
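Not part of the diff: a sketch of the incremental-read pattern these relation methods enable. It assumes ibis is installed (the filter_by_* methods live on ReadableIbisRelation) and reuses the dataset object from the earlier sketch; the items table name, watermark value, and load ids are placeholders.

items = dataset["items"]  # ReadableIbisRelation when ibis is available

# watermark persisted by the consumer between runs (placeholder value)
last_seen_load_id = "1741700000.123456"

# rows added by successful loads newer than the watermark
new_rows = items.filter_by_load_id_gt(last_seen_load_id, status=0).df()

# rows from the most recent successful load only
latest_rows = items.filter_by_latest_load_id(status=0).df()

# rows from an explicit set of load ids (placeholder ids)
selected = items.filter_by_load_ids(["1741700000.123456", "1741700100.654321"]).df()

# advance the watermark after processing
last_seen_load_id = dataset.latest_load_id(status=0).fetchall()[0][0]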
7 changes: 4 additions & 3 deletions dlt/helpers/ibis.py
@@ -7,9 +7,10 @@
from dlt.destinations.sql_client import SqlClientBase

try:
import ibis # type: ignore
import ibis
import sqlglot
from ibis import BaseBackend, Expr
from ibis import BaseBackend
from ibis.expr.types import Table
except ModuleNotFoundError:
raise MissingDependencyException("dlt ibis helpers", ["ibis-framework"])

@@ -161,7 +162,7 @@ def _ignore_hstore(conn: Any, name: Any) -> Any:

def create_unbound_ibis_table(
sql_client: SqlClientBase[Any], schema: Schema, table_name: str
) -> Expr:
) -> Table:
"""Create an unbound ibis table from a dlt schema"""

if table_name not in schema.tables: