Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat/2372 extend dataset querying API for incremental load #2386

Open
wants to merge 13 commits into
base: devel
Choose a base branch
from
18 changes: 18 additions & 0 deletions dlt/common/schema/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -837,6 +837,24 @@ def get_root_table(tables: TSchemaTables, table_name: str) -> TTableSchema:
return table


def get_root_to_table_chain(tables: TSchemaTables, table_name: str) -> List[TTableSchema]:
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i don't think you need this anymore, do you?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll remove it for now, but we might want it if we want to support these joins when root_key=True is missing

"""Return a list of tables ordered from root to child using the (row_key - parent_key) references.

Similar functions:
- `get_root_table()` returns the root of the specified child instead of the full chain
- `get_nested_tables()` returns all children of the specified table as a chain
"""
chain: List[TTableSchema] = []

def _parent(t: TTableSchema) -> None:
chain.append(t)
if t.get("parent"):
_parent(tables[t["parent"]])

_parent(tables[table_name])
return chain[::-1]


def get_nested_tables(tables: TSchemaTables, table_name: str) -> List[TTableSchema]:
"""Get nested tables for table name and return a list of tables ordered by ancestry so the nested tables are always after their parents

Expand Down
27 changes: 27 additions & 0 deletions dlt/destinations/dataset/dataset.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import textwrap
from typing import Any, Union, TYPE_CHECKING, List

from dlt.common.json import json
Expand Down Expand Up @@ -128,11 +129,14 @@ def table(self, table_name: str) -> SupportsReadableRelation:
from dlt.helpers.ibis import create_unbound_ibis_table
from dlt.destinations.dataset.ibis_relation import ReadableIbisRelation

# TODO use the self.ibis() connection instead of `self.sql_client` and work against the Ibis Schema
# creating unbound tables would no longer be required and queries could be evaluated lazily
unbound_table = create_unbound_ibis_table(self.sql_client, self.schema, table_name)
return ReadableIbisRelation( # type: ignore[abstract]
readable_dataset=self,
ibis_object=unbound_table,
columns_schema=self.schema.tables[table_name]["columns"],
table_name=table_name,
)
except MissingDependencyException:
# if ibis is explicitly requested, reraise
Expand Down Expand Up @@ -171,6 +175,29 @@ def row_counts(
# Execute query and build result dict
return self(query)

def list_load_ids(self, status: Union[int, list[int]] = 0, limit: int = 10) -> list[str]:
"""Return the list most recent `load_id`s in descending order.

If no `load_id` is found, return empty list.
"""
status_value = (status,) if isinstance(status, int) else tuple(status)
# TODO protect from SQL injection
query = textwrap.dedent(f"""SELECT load_id
FROM {self.schema.loads_table_name}
WHERE status IN {status_value}
ORDER BY load_id DESC
LIMIT {limit}""")
results = self.__call__(query=query).fetchall()
return [row[0] for row in results]

def latest_load_id(self, status: Union[int, list[int]] = 0) -> Union[str, None]:
"""Return the latest `load_id`.

If no `load_id` is found, return None
"""
results = self.list_load_ids(status=status, limit=1)
return results[0] if len(results) > 0 else None

def __getitem__(self, table_name: str) -> SupportsReadableRelation:
"""access of table via dict notation"""
return self.table(table_name)
Expand Down
162 changes: 161 additions & 1 deletion dlt/destinations/dataset/ibis_relation.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
from collections.abc import Sequence
from typing import TYPE_CHECKING, Any, Union, Sequence

from functools import partial

from dlt.common.exceptions import MissingDependencyException
from dlt.destinations.dataset.relation import BaseReadableDBAPIRelation
from dlt.common.schema import Schema
from dlt.common.schema.utils import get_root_table, get_root_to_table_chain
from dlt.common.schema.typing import TTableSchemaColumns
from dlt.destinations.dataset.relation import BaseReadableDBAPIRelation


if TYPE_CHECKING:
Expand Down Expand Up @@ -54,11 +57,13 @@ def __init__(
readable_dataset: ReadableDBAPIDataset,
ibis_object: Any = None,
columns_schema: TTableSchemaColumns = None,
table_name: str = None,
) -> None:
"""Create a lazy evaluated relation to for the dataset of a destination"""
super().__init__(readable_dataset=readable_dataset)
self._ibis_object = ibis_object
self._columns_schema = columns_schema
self._table_name = table_name

def query(self) -> Any:
"""build the query"""
Expand All @@ -85,6 +90,10 @@ def columns_schema(self) -> TTableSchemaColumns:
def columns_schema(self, new_value: TTableSchemaColumns) -> None:
raise NotImplementedError("columns schema in ReadableDBAPIRelation can only be computed")

@property
def table_name(self) -> str:
return self._table_name

def compute_columns_schema(self) -> TTableSchemaColumns:
"""provide schema columns for the cursor, may be filtered by selected columns"""
# TODO: provide column lineage tracing with sqlglot lineage
Expand Down Expand Up @@ -173,6 +182,64 @@ def _get_filtered_columns_schema(self, columns: Sequence[str]) -> TTableSchemaCo
# here we just break the column schema inheritance chain
return None

def _filter_nested_table(self, filtered_root_table: Any) -> Any: # ibis.expr.types.Table
"""Filter the current table based on a filtered root table.

This takes the Ibis Table expression `filtered_root_table`, which has filtered rows,
and propagate the selection of `_dlt_id` on the root table via `row_key -> parent_key`
recursively.

To visualize it:
```
filtered_root_table = root.filter(root._dlt_load_id.isin(
["foo", "bar"]
))._dlt_id # root row_key

child2.filter(child2._dlt_parent_id.isin( # child2 parent_key
child1.filter(child1._dlt_parent_id.isin( # child1 parent_key
filtered_root_table
))._dlt_id # child1 row_key
))
```
Takes as input and returns a `ibis.expr.types.Table`
"""
from dlt.helpers.ibis import create_unbound_ibis_table

filtered_table = filtered_root_table
parent_row_key = None
for table in get_root_to_table_chain(self.schema.tables, self.table_name):
# the root table is already filtered, only set the parent_row_key
if parent_row_key is None:
parent_row_key = next(
col_name
for col_name, col in table["columns"].items()
if col.get("row_key") is True
)
continue

parent_key = None
row_key = None
for col_name, col in table["columns"].items():
if col.get("parent_key") is True:
parent_key = col_name
if col.get("row_key") is True:
row_key = col_name

# should always match a column because `get_root_to_table_chain()` returns tables based on parent_key / row_key
assert parent_key is not None
assert row_key is not None

ibis_table = create_unbound_ibis_table(
self.sql_client, schema=self.schema, table_name=table["name"]
)
filter_clause = ibis_table[parent_key].isin(filtered_table[parent_row_key])
# TODO the only operation to proxy if required
filtered_table = ibis_table.filter(filter_clause)

parent_row_key = row_key

return filtered_table

# forward ibis methods defined on interface
def limit(self, limit: int, **kwargs: Any) -> "ReadableIbisRelation":
"""limit the result to 'limit' items"""
Expand All @@ -186,6 +253,99 @@ def select(self, *columns: str) -> "ReadableIbisRelation":
"""set which columns will be selected"""
return self._proxy_expression_method("select", *columns) # type: ignore

def filter_by_load_ids(self, load_ids: Union[str, list[str]]) -> "ReadableIbisRelation":
"""Filter on matching `load_ids`."""
from dlt.helpers.ibis import create_unbound_ibis_table

load_ids = (
[
load_ids,
]
if isinstance(load_ids, str)
else load_ids
)

root_table = get_root_table(self.schema.tables, self.table_name)
ibis_root_table = create_unbound_ibis_table(
self.sql_client, schema=self.schema, table_name=root_table["name"]
)

# filter the root table
filtered_table = ibis_root_table.filter(ibis_root_table["_dlt_load_id"].isin(load_ids))
if root_table["name"] != self.table_name:
filtered_table = self._filter_nested_table(filtered_table)

# TODO use the proxies? it's a bit hard to do all that proxying
return self.__class__(
readable_dataset=self._dataset,
ibis_object=filtered_table,
columns_schema=self.columns_schema,
)

def filter_by_latest_load_id(self, status: Union[int, list[int]] = 0) -> "ReadableIbisRelation":
"""Filter on the most recent `load_id` with a specific status."""
from dlt.helpers.ibis import create_unbound_ibis_table

status = (
[
status,
]
if isinstance(status, int)
else status
)

root_table = get_root_table(self.schema.tables, self.table_name)
ibis_root_table = create_unbound_ibis_table(
self.sql_client, schema=self.schema, table_name=root_table["name"]
)
load_table = create_unbound_ibis_table(
self.sql_client, schema=self.schema, table_name=self.schema.loads_table_name
)

latest_load_id = load_table.filter(
load_table.status.isin(status)
).load_id.max() # lazy expression
filtered_table = ibis_root_table.filter(ibis_root_table["_dlt_load_id"] == latest_load_id)
if root_table["name"] != self.table_name:
filtered_table = self._filter_nested_table(filtered_table)

return self.__class__(
readable_dataset=self._dataset,
ibis_object=filtered_table,
columns_schema=self.columns_schema,
)

def filter_by_load_status(self, status: Union[int, list[int]] = 0) -> "ReadableIbisRelation":
""""""
from dlt.helpers.ibis import create_unbound_ibis_table

status = (
[
status,
]
if isinstance(status, int)
else status
)

root_table = get_root_table(self.schema.tables, self.table_name)
ibis_root_table = create_unbound_ibis_table(
self.sql_client, schema=self.schema, table_name=root_table["name"]
)
load_table = create_unbound_ibis_table(
self.sql_client, schema=self.schema, table_name=self.schema.loads_table_name
)

load_ids = load_table.filter(load_table.status.isin(status)).load_id # lazy expression
filtered_table = ibis_root_table.filter(ibis_root_table["_dlt_load_id"].isin(load_ids))
if root_table["name"] != self.table_name:
filtered_table = self._filter_nested_table(filtered_table)

return self.__class__(
readable_dataset=self._dataset,
ibis_object=filtered_table,
columns_schema=self.columns_schema,
)

# forward ibis comparison and math operators
def __lt__(self, other: Any) -> "ReadableIbisRelation":
return self._proxy_expression_method("__lt__", other) # type: ignore
Expand Down
Loading
Loading