synchronise pipeline protocol definitions and fix signature to use proper typing #2364

Open · wants to merge 4 commits into base: devel
77 changes: 46 additions & 31 deletions dlt/common/pipeline.py
@@ -15,15 +15,14 @@
NamedTuple,
Optional,
Protocol,
Sequence,
Tuple,
TypeVar,
Mapping,
Literal,
)
from typing_extensions import NotRequired

from dlt.common.typing import TypedDict
from dlt.common.typing import TypedDict, Unpack
from dlt.common.configuration import configspec
from dlt.common.configuration import known_sections
from dlt.common.configuration.container import Container
@@ -46,7 +45,7 @@
NormalizeMetrics,
StepMetrics,
)
from dlt.common.schema import Schema
from dlt.common.schema import Schema, TAnySchemaColumns, TTableFormat
from dlt.common.schema.typing import (
TColumnSchema,
TWriteDispositionConfig,
@@ -474,6 +473,48 @@ class TSourceState(TPipelineState):
sources: Dict[str, Dict[str, Any]] # type: ignore[misc]


class SupportsPipelineRunArgs(TypedDict, total=False):
destination: Optional[TDestinationReferenceArg]
"""A name of the destination to which dlt will load the data, or a destination module imported from `dlt.destination`.
If not provided, the value passed to `dlt.pipeline` will be used."""
staging: Optional[TDestinationReferenceArg]
"""The staging dataset."""
dataset_name: Optional[str]
"""A name of the dataset to which the data will be loaded. A dataset is a logical group of tables ie. `schema` in relational databases or folder grouping many files.
If not provided, the value passed to `dlt.pipeline` will be used. If not provided at all then defaults to the `pipeline_name`."""
credentials: Optional[Any]
"""Credentials for the `destination` ie. database connection string or a dictionary with google cloud credentials.
In most cases should be set to None, which lets `dlt` to use `secrets.toml` or environment variables to infer right credentials values."""
table_name: Optional[str]
"""The name of the table to which the data should be loaded within the `dataset`. This argument is required for a `data` that is a list/Iterable or Iterator without `__name__` attribute.
The behavior of this argument depends on the type of the `data`:
* generator functions - the function name is used as table name, `table_name` overrides this default
* `@dlt.resource` - resource contains the full table schema and that includes the table name. `table_name` will override this property. Use with care!
* `@dlt.source` - source contains several resources each with a table schema. `table_name` will override all table names within the source and load the data into single table."""
write_disposition: Optional[TWriteDispositionConfig]
"""Controls how to write data to a table. Accepts a shorthand string literal or configuration dictionary.
Allowed shorthand string literals: `append` will always add new data at the end of the table. `replace` will replace existing data with new data. `skip` will prevent data from loading. `merge` will deduplicate and merge data based on `primary_key` and `merge_key` hints. Defaults to `append`.
Write behaviour can be further customized through a configuration dictionary. For example, to obtain an SCD2 table provide `write_disposition={"disposition": "merge", "strategy": "scd2"}`.
Please note that in case of `dlt.resource` the table schema value will be overwritten and in case of `dlt.source`, the values in all resources will be overwritten."""
columns: Optional[TAnySchemaColumns]
"""A list of column schemas. Typed dictionary describing column names, data types, write disposition and performance hints that gives you full control over the created table schema."""
primary_key: Optional[TColumnNames]
"""A column name or a list of column names that comprise a private key. Typically used with "merge" write disposition to deduplicate loaded data."""
schema: Optional[Schema]
"""An explicit `Schema` object in which all table schemas will be grouped. By default `dlt` takes the schema from the source (if passed in `data` argument) or creates a default one itself."""
loader_file_format: Optional[TLoaderFileFormat]
"""The file format the loader will use to create the load package. Not all file_formats are compatible with all destinations. Defaults to the preferred file format of the selected destination."""
table_format: Optional[TTableFormat]
"""The table format used by the destination to store tables. Currently you can select table format on filesystem and Athena destinations."""
schema_contract: Optional[TSchemaContract]
"""On override for the schema contract settings, this will replace the schema contract settings for all tables in the schema. Defaults to None."""
refresh: Optional[TRefreshMode]
"""Fully or partially reset sources before loading new data in this run. The following refresh modes are supported:
* `drop_sources` - Drop tables and source and resource state for all sources currently being processed in `run` or `extract` methods of the pipeline. (Note: schema history is erased)
* `drop_resources` - Drop tables and resource state for all resources being processed. Source level state is not modified. (Note: schema history is erased)
* `drop_data` - Wipe all data and resource state for all resources being processed. Schema is not modified."""


class SupportsPipeline(Protocol):
"""A protocol with core pipeline operations that lets high level abstractions ie. sources to access pipeline methods and properties"""

@@ -508,21 +549,7 @@ def set_local_state_val(self, key: str, value: Any) -> None:
def get_local_state_val(self, key: str) -> Any:
"""Gets value from local state. Local state is not synchronized with destination."""

def run(
self,
data: Any = None,
*,
destination: TDestinationReferenceArg = None,
dataset_name: str = None,
credentials: Any = None,
table_name: str = None,
write_disposition: TWriteDispositionConfig = None,
columns: Sequence[TColumnSchema] = None,
primary_key: TColumnNames = None,
schema: Schema = None,
loader_file_format: TLoaderFileFormat = None,
schema_contract: TSchemaContract = None,
) -> LoadInfo: ...
def run(self, data: Any = None, **kwargs: Unpack[SupportsPipelineRunArgs]) -> LoadInfo: ...

def _set_context(self, is_active: bool) -> None:
"""Called when pipeline context activated or deactivate"""
@@ -532,19 +559,7 @@ def _make_schema_with_default_name(self) -> Schema:


class SupportsPipelineRun(Protocol):
def __call__(
self,
*,
destination: TDestinationReferenceArg = None,
dataset_name: str = None,
credentials: Any = None,
table_name: str = None,
write_disposition: TWriteDispositionConfig = None,
columns: Sequence[TColumnSchema] = None,
schema: Schema = None,
loader_file_format: TLoaderFileFormat = None,
schema_contract: TSchemaContract = None,
) -> LoadInfo: ...
def __call__(self, data: Any = None, **kwargs: Unpack[SupportsPipelineRunArgs]) -> LoadInfo: ...


@configspec
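The heart of this file's change is collapsing the long keyword-only `run` signature into `**kwargs: Unpack[SupportsPipelineRunArgs]` (PEP 692): the keyword arguments are described once by a `total=False` TypedDict, and type checkers still validate key names and value types at every call site. Below is a minimal, standalone sketch of that pattern, not dlt code; `RunArgs`, `SupportsRun` and `Pipeline` are simplified stand-ins, and it assumes `typing_extensions` is available.

```python
from typing import Any, Optional, Protocol

from typing_extensions import TypedDict, Unpack


class RunArgs(TypedDict, total=False):
    # total=False keeps every key optional, mirroring keyword-only parameters with defaults.
    destination: Optional[str]
    dataset_name: Optional[str]
    table_name: Optional[str]
    write_disposition: Optional[str]


class SupportsRun(Protocol):
    # **kwargs is typed as a whole: mypy/pyright flag unknown keys and wrong value types.
    def run(self, data: Any = None, **kwargs: Unpack[RunArgs]) -> None: ...


class Pipeline:
    def run(self, data: Any = None, **kwargs: Unpack[RunArgs]) -> None:
        print(f"run({data!r}) with {kwargs}")


def load(p: SupportsRun) -> None:
    p.run([1, 2, 3], table_name="numbers", write_disposition="append")
    # p.run([1, 2, 3], tablename="numbers")  # a type checker rejects the unknown key


load(Pipeline())
```

Keeping the implementation and the protocol on the same TypedDict is what keeps the two `run` signatures from drifting apart again.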
4 changes: 4 additions & 0 deletions dlt/common/schema/__init__.py
@@ -8,6 +8,8 @@
TColumnHint,
TColumnSchema,
TColumnSchemaBase,
TAnySchemaColumns,
TTableFormat,
)
from dlt.common.schema.typing import COLUMN_HINTS
from dlt.common.schema.schema import Schema, DEFAULT_SCHEMA_CONTRACT_MODE
@@ -23,6 +25,8 @@
"TColumnHint",
"TColumnSchema",
"TColumnSchemaBase",
"TAnySchemaColumns",
"TTableFormat",
"COLUMN_HINTS",
"Schema",
"verify_schema_hash",
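This re-export is what allows `dlt/common/pipeline.py` above to import `TAnySchemaColumns` and `TTableFormat` from `dlt.common.schema` rather than reaching into `dlt.common.schema.typing`. A short illustrative snippet of what the widened `__all__` enables, assuming a dlt build with this change installed:

```python
# Illustrative only: with the widened __all__, both aliases resolve from the subpackage root.
from typing import Optional

from dlt.common.schema import TAnySchemaColumns, TTableFormat

columns: Optional[TAnySchemaColumns] = None        # e.g. a list of column schemas or a columns mapping
table_format: Optional[TTableFormat] = "iceberg"   # per the docstrings above: "delta" or "iceberg"
```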
76 changes: 12 additions & 64 deletions dlt/pipeline/__init__.py
@@ -10,12 +10,18 @@
TSchemaContract,
)

from dlt.common.typing import TSecretStrValue, Any
from dlt.common.typing import TSecretStrValue, Any, Unpack
from dlt.common.configuration import with_config
from dlt.common.configuration.container import Container
from dlt.common.configuration.inject import get_orig_args, last_config
from dlt.common.destination import TLoaderFileFormat, Destination, TDestinationReferenceArg
from dlt.common.pipeline import LoadInfo, PipelineContext, get_dlt_pipelines_dir, TRefreshMode
from dlt.common.pipeline import (
LoadInfo,
PipelineContext,
get_dlt_pipelines_dir,
TRefreshMode,
SupportsPipelineRunArgs,
)
from dlt.common.runtime import apply_runtime_config, init_telemetry

from dlt.pipeline.configuration import PipelineConfiguration, ensure_correct_pipeline_kwargs
@@ -235,18 +241,7 @@ def attach(

def run(
data: Any,
*,
destination: TDestinationReferenceArg = None,
staging: TDestinationReferenceArg = None,
dataset_name: str = None,
table_name: str = None,
write_disposition: TWriteDispositionConfig = None,
columns: Sequence[TColumnSchema] = None,
schema: Schema = None,
loader_file_format: TLoaderFileFormat = None,
table_format: TTableFormat = None,
schema_contract: TSchemaContract = None,
refresh: Optional[TRefreshMode] = None,
**kwargs: Unpack[SupportsPipelineRunArgs],
) -> LoadInfo:
"""Loads the data in `data` argument into the destination specified in `destination` and dataset specified in `dataset_name`.

@@ -266,61 +261,14 @@ def run(
Next it will make sure that data from the previous run is fully processed. If not, the `run` method normalizes and loads pending data items.
Only then is the new data from the `data` argument extracted, normalized and loaded.

Args:
data (Any): Data to be loaded to destination

destination (str | DestinationReference, optional): A name of the destination to which dlt will load the data, or a destination module imported from `dlt.destination`.
If not provided, the value passed to `dlt.pipeline` will be used.

dataset_name (str, optional): A name of the dataset to which the data will be loaded. A dataset is a logical group of tables ie. `schema` in relational databases or folder grouping many files.
If not provided, the value passed to `dlt.pipeline` will be used. If not provided at all then defaults to the `pipeline_name`

table_name (str, optional): The name of the table to which the data should be loaded within the `dataset`. This argument is required for a `data` that is a list/Iterable or Iterator without `__name__` attribute.
The behavior of this argument depends on the type of the `data`:
* generator functions: the function name is used as table name, `table_name` overrides this default
* `@dlt.resource`: resource contains the full table schema and that includes the table name. `table_name` will override this property. Use with care!
* `@dlt.source`: source contains several resources each with a table schema. `table_name` will override all table names within the source and load the data into single table.

write_disposition (TWriteDispositionConfig, optional): Controls how to write data to a table. Accepts a shorthand string literal or configuration dictionary.
Allowed shorthand string literals: `append` will always add new data at the end of the table. `replace` will replace existing data with new data. `skip` will prevent data from loading. "merge" will deduplicate and merge data based on "primary_key" and "merge_key" hints. Defaults to "append".
Write behaviour can be further customized through a configuration dictionary. For example, to obtain an SCD2 table provide `write_disposition={"disposition": "merge", "strategy": "scd2"}`.
Please note that in case of `dlt.resource` the table schema value will be overwritten and in case of `dlt.source`, the values in all resources will be overwritten.

columns (Sequence[TColumnSchema], optional): A list of column schemas. Typed dictionary describing column names, data types, write disposition and performance hints that gives you full control over the created table schema.

schema (Schema, optional): An explicit `Schema` object in which all table schemas will be grouped. By default `dlt` takes the schema from the source (if passed in `data` argument) or creates a default one itself.

loader_file_format (Literal["jsonl", "insert_values", "parquet"], optional): The file format the loader will use to create the load package. Not all file_formats are compatible with all destinations. Defaults to the preferred file format of the selected destination.

table_format (Literal["delta", "iceberg"], optional): The table format used by the destination to store tables. Currently you can select table format on filesystem and Athena destinations.

schema_contract (TSchemaContract, optional): On override for the schema contract settings, this will replace the schema contract settings for all tables in the schema. Defaults to None.

refresh (str | TRefreshMode): Fully or partially reset sources before loading new data in this run. The following refresh modes are supported:
* `drop_sources`: Drop tables and source and resource state for all sources currently being processed in `run` or `extract` methods of the pipeline. (Note: schema history is erased)
* `drop_resources`: Drop tables and resource state for all resources being processed. Source level state is not modified. (Note: schema history is erased)
* `drop_data`: Wipe all data and resource state for all resources being processed. Schema is not modified.

Raises:
PipelineStepFailed: when a problem happened during `extract`, `normalize` or `load` steps.
Returns:
LoadInfo: Information on loaded data including the list of package ids and failed job statuses. Please note that `dlt` will not raise if a single job terminally fails. Such information is provided via LoadInfo.
"""
destination = Destination.from_reference(destination)
return pipeline().run(
data,
destination=destination,
staging=staging,
dataset_name=dataset_name,
table_name=table_name,
write_disposition=write_disposition,
columns=columns,
schema=schema,
loader_file_format=loader_file_format,
table_format=table_format,
schema_contract=schema_contract,
refresh=refresh,
)
destination = Destination.from_reference(kwargs.pop("destination", None))
kwargs["destination"] = destination
return pipeline().run(data, **kwargs)


# plug default tracking module
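The new body of the module-level `run` reduces to one pattern: pop the single kwarg that needs normalization (`destination`), run it through `Destination.from_reference`, write it back, and forward the rest untouched to `pipeline().run`. A standalone sketch of that forwarding pattern follows; `normalize_destination` and `DemoPipeline` are hypothetical stand-ins, not dlt APIs.

```python
from typing import Any, Optional


def normalize_destination(ref: Optional[Any]) -> Optional[str]:
    # Hypothetical stand-in for Destination.from_reference(): turn a name or object
    # into a canonical value; None stays None.
    return None if ref is None else str(ref)


class DemoPipeline:
    def run(self, data: Any = None, **kwargs: Any) -> None:
        print("pipeline.run:", data, kwargs)


def run(data: Any, **kwargs: Any) -> None:
    # Same shape as the diff: `destination` may be absent, so pop with a default,
    # normalize, write the result back, then delegate the whole kwargs dict.
    kwargs["destination"] = normalize_destination(kwargs.pop("destination", None))
    DemoPipeline().run(data, **kwargs)


run([{"id": 1}], destination="duckdb", table_name="items")
```

Popping with a default keeps the call valid whether or not the caller supplied `destination`, matching the `total=False` TypedDict in which every key is optional.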