Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat/2372 extend dataset querying API for incremental load #2386

Open
wants to merge 13 commits into
base: devel
Choose a base branch
from

Conversation

zilto
Copy link
Collaborator

@zilto zilto commented Mar 6, 2025

Implements #2372

Description

This follows the specs. last was replaced by latest. Which is more consistent with the dlt language?

Currently, the internal of the ibis methods don't use proxying because the added layer make readability and debugging harder. I believe it's primarily meant for user-facing methods?

Notes

  • I notice that the Ibis relationships use unbound tables. Is there a reason why we're not directly interacting with tables via the ibis connection?
  • To achieve this feature I imagined 3 solutions:
    1. Nested filters (current solution): plan a filter on the root table, then use the filtered root table to filter the next table (using WHERE). This has the benefit of narrowing the result set early (e.g., select a single load_id), but nesting filters could reduce the ability of the backend to optimize the query plan
    2. Parallel filters: using the same filters, but instead of nesting them, you join them with and clause. You run the risk of handling much larger result sets, but the conditions can be evaluated in parallel and optimized by the query planner.
    3. Use inner joins: this would allow us to join the _dlt_load_id to the child table, which could be useful for downstream users. However, we have to be careful when selecting columns while doing many joins

@zilto zilto changed the title feat/23723 extend dataset querying API for incremental load feat/2372 extend dataset querying API for incremental load Mar 6, 2025
Copy link

netlify bot commented Mar 6, 2025

Deploy Preview for dlt-hub-docs failed. Why did it fail? →

Name Link
🔨 Latest commit e28c402
🔍 Latest deploy log https://app.netlify.com/sites/dlt-hub-docs/deploys/67d05b1551f8be00087eb96b

Copy link
Collaborator

@sh-rp sh-rp left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the PR and good first approach! I think you can do it much easier, I left some comments. I understand that this proxy I have built is a bit hard to understand, but if you do it right you can just chain ibis expressions on to "self" and do not need to proxy anything. Also you don't need to process the full table chain for the derived tables.

@sh-rp
Copy link
Collaborator

sh-rp commented Mar 6, 2025

@zilto after working more on the transformations, I have a few changes to the original ticket:
we need one filter_by_load_id_gt(load_id: str, status: Union[int, List[int]] = 0) which should return all rows with a load id greater than the one provided, I need this for incremental loading.

@sh-rp
Copy link
Collaborator

sh-rp commented Mar 6, 2025

Even one more request: for all methods that filter by status, status should also be able to be "None", in that case no filtering on status happens and the loads table doesn't even get joined.

@@ -171,6 +213,78 @@ def row_counts(
# Execute query and build result dict
return self(query)

def list_load_ids(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

see above

# Case 1: if current table is root table, you can filter directly and exit early
dlt_root_table = get_root_table(self.schema.tables, self.table_name)
if dlt_root_table["name"] == self.table_name:
# doing ibis.expr.types.Table[COL_NAME] returns a column, but ReadableIbisRelation[COL_NAME] returns a table
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you can access columns with self.columns(column_name) but I am not sure. In any case it would be nice to have a simple helper method for accessing columns so it is clearer what is going on.

Copy link
Collaborator

@sh-rp sh-rp left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the further changes! I think this ticket is actually more complicated than I thought.

I think you can do something much shorter and nicer here. I will try to explain how I would do this and we can talk about it if it is unclear. In short, the resulting query that we want to have for brevity and performance reason would join the main table that we are querying to the loads table and then to ordering or filtering on the joined result. You do not want to be "joining" any table with "in" statements, this will be super slow. You can always inspect the result with self.query() to see what is going on. I would also create one function that joins the current table to the load table via any intermediary joins, you can probably make this recursive (meaning the function calls it self rather than going via a loop) but I am not sure if this will work. So the code would look something like this on the ReadableIbisRelation:

  • def _join_to_root_table(self) -> ReadableIbisRelation: This method returns self if "parent" on the schema of this table is None, or returns a relation with a join to the root table which we then can further use to join to the loads_table. This is the method that can probably be done in a recursive way.

  • def _join_to_loads_table(self) -> ReadableIbisRelation:: This method joins the current table to the loads table (it will also need to use the above methods) and return the result.

Now you can use the above methods to get an ibis relation that is joined to the loads table and do any other operations for the real task. You can filter on status, you can filter by load_id etc. and it will all operate on the joined table. The sql you want to have as an output would be something like this if you filter "table_name" by load_id greater than x via a root table. Actually you'll only need to join to the loads table if you need the status, if you don't, then joining to the root is enough.

SELECT table_name.col1, table_name.col2 FROM table_name JOIN root_table ON (table_name._dlt_root_id = root_table._dlt_load_id) JOIN _dlt_loads ON (root_table._dlt_load_id = _dlt_loads.load_id) WHERE _dlt_loads.load_id > x.

Let's not support the case where we don't have root_ids for now to make it easier, you can throw an exception with some help on how to create root ids.

@sh-rp sh-rp linked an issue Mar 10, 2025 that may be closed by this pull request
@zilto zilto requested a review from sh-rp March 11, 2025 13:16
@zilto
Copy link
Collaborator Author

zilto commented Mar 11, 2025

The change to ReadableIbisRelation.__getitem__ triggers mypy because it doesn't respect the superclass signature. Let me know how I should approach this. Test coverage is not perfect, but I didn't want this PR to be a blocker for longer

@@ -837,6 +864,24 @@ def get_root_table(tables: TSchemaTables, table_name: str) -> TTableSchema:
return table


def get_root_to_table_chain(tables: TSchemaTables, table_name: str) -> List[TTableSchema]:
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i don't think you need this anymore, do you?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll remove it for now, but we might want it if we want to support these joins when root_key=True is missing

@@ -171,6 +169,40 @@ def row_counts(
# Execute query and build result dict
return self(query)

def list_load_ids(
self, status: Union[int, list[int], None] = 0, limit: Union[int, None] = 10
) -> SupportsReadableRelation:
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this should be implemented as ibis expression and raise if ibis is not installed

# TODO setup another case that traverse parent-row keys to join nested tables without root_key
if root_key is None:
raise KeyError(
"ReadableIbisRelation requires a `root_key` hint to join non-root tables. "
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we should add a link to the transformation docs in the future for this exception.

load_table = self._dataset.table(self.schema.loads_table_name)
if status is not None:
status = [status] if isinstance(status, int) else status
load_table = load_table.filter(load_table["status"].isin(status))
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it would be interesting to find out wether isin with a one item list is the same speed as filter with that value. I'd hope the sql optimizer would find this automatically. Just a thought.

Copy link
Collaborator Author

@zilto zilto Mar 11, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here the code uses a filter clause. For .isin() with subqueries (what I was using before), it should be converted to EXIST clauses (ref)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Extensions to ReadableDBAPIDataset and ReadableIbisRelation
2 participants