From edd8d3ea5940e5976f8aea6d6452a00effe09a25 Mon Sep 17 00:00:00 2001 From: nas Date: Sat, 1 Mar 2025 16:11:46 +0100 Subject: [PATCH 1/4] synchronise SupportPipeline and SupportPipelineRun protocols with current implementation and fix type hints for optional fields --- dlt/common/pipeline.py | 53 ++++++++++++++++++++--------------- dlt/common/schema/__init__.py | 4 +++ dlt/pipeline/pipeline.py | 26 ++++++++--------- 3 files changed, 47 insertions(+), 36 deletions(-) diff --git a/dlt/common/pipeline.py b/dlt/common/pipeline.py index 4f0e4f368d..6f5b576438 100644 --- a/dlt/common/pipeline.py +++ b/dlt/common/pipeline.py @@ -15,7 +15,6 @@ NamedTuple, Optional, Protocol, - Sequence, Tuple, TypeVar, Mapping, @@ -23,7 +22,7 @@ ) from typing_extensions import NotRequired -from dlt.common.typing import TypedDict +from dlt.common.typing import TypedDict, Unpack from dlt.common.configuration import configspec from dlt.common.configuration import known_sections from dlt.common.configuration.container import Container @@ -46,7 +45,7 @@ NormalizeMetrics, StepMetrics, ) -from dlt.common.schema import Schema +from dlt.common.schema import Schema, TAnySchemaColumns, TTableFormat from dlt.common.schema.typing import ( TColumnSchema, TWriteDispositionConfig, @@ -510,18 +509,21 @@ def get_local_state_val(self, key: str) -> Any: def run( self, - data: Any = None, + data: Optional[Any] = None, *, - destination: TDestinationReferenceArg = None, - dataset_name: str = None, - credentials: Any = None, - table_name: str = None, - write_disposition: TWriteDispositionConfig = None, - columns: Sequence[TColumnSchema] = None, - primary_key: TColumnNames = None, - schema: Schema = None, - loader_file_format: TLoaderFileFormat = None, - schema_contract: TSchemaContract = None, + destination: Optional[TDestinationReferenceArg] = None, + staging: Optional[TDestinationReferenceArg] = None, + dataset_name: Optional[str] = None, + credentials: Optional[Any] = None, + table_name: Optional[str] = None, + write_disposition: Optional[TWriteDispositionConfig] = None, + columns: Optional[TAnySchemaColumns] = None, + primary_key: Optional[TColumnNames] = None, + schema: Optional[Schema] = None, + loader_file_format: Optional[TLoaderFileFormat] = None, + table_format: Optional[TTableFormat] = None, + schema_contract: Optional[TSchemaContract] = None, + refresh: Optional[TRefreshMode] = None, ) -> LoadInfo: ... 
def _set_context(self, is_active: bool) -> None: @@ -534,16 +536,21 @@ def _make_schema_with_default_name(self) -> Schema: class SupportsPipelineRun(Protocol): def __call__( self, + data: Optional[Any] = None, *, - destination: TDestinationReferenceArg = None, - dataset_name: str = None, - credentials: Any = None, - table_name: str = None, - write_disposition: TWriteDispositionConfig = None, - columns: Sequence[TColumnSchema] = None, - schema: Schema = None, - loader_file_format: TLoaderFileFormat = None, - schema_contract: TSchemaContract = None, + destination: Optional[TDestinationReferenceArg] = None, + staging: Optional[TDestinationReferenceArg] = None, + dataset_name: Optional[str] = None, + credentials: Optional[Any] = None, + table_name: Optional[str] = None, + write_disposition: Optional[TWriteDispositionConfig] = None, + columns: Optional[TAnySchemaColumns] = None, + primary_key: Optional[TColumnNames] = None, + schema: Optional[Schema] = None, + loader_file_format: Optional[TLoaderFileFormat] = None, + table_format: Optional[TTableFormat] = None, + schema_contract: Optional[TSchemaContract] = None, + refresh: Optional[TRefreshMode] = None, ) -> LoadInfo: ... diff --git a/dlt/common/schema/__init__.py b/dlt/common/schema/__init__.py index 9cb5e2ab76..8533962aa0 100644 --- a/dlt/common/schema/__init__.py +++ b/dlt/common/schema/__init__.py @@ -8,6 +8,8 @@ TColumnHint, TColumnSchema, TColumnSchemaBase, + TAnySchemaColumns, + TTableFormat, ) from dlt.common.schema.typing import COLUMN_HINTS from dlt.common.schema.schema import Schema, DEFAULT_SCHEMA_CONTRACT_MODE @@ -23,6 +25,8 @@ "TColumnHint", "TColumnSchema", "TColumnSchemaBase", + "TAnySchemaColumns", + "TTableFormat", "COLUMN_HINTS", "Schema", "verify_schema_hash", diff --git a/dlt/pipeline/pipeline.py b/dlt/pipeline/pipeline.py index 84d8c2c65b..6f1456c23b 100644 --- a/dlt/pipeline/pipeline.py +++ b/dlt/pipeline/pipeline.py @@ -617,20 +617,20 @@ def load( @with_config_section(("run",)) def run( self, - data: Any = None, + data: Optional[Any] = None, *, - destination: TDestinationReferenceArg = None, - staging: TDestinationReferenceArg = None, - dataset_name: str = None, - credentials: Any = None, - table_name: str = None, - write_disposition: TWriteDispositionConfig = None, - columns: TAnySchemaColumns = None, - primary_key: TColumnNames = None, - schema: Schema = None, - loader_file_format: TLoaderFileFormat = None, - table_format: TTableFormat = None, - schema_contract: TSchemaContract = None, + destination: Optional[TDestinationReferenceArg] = None, + staging: Optional[TDestinationReferenceArg] = None, + dataset_name: Optional[str] = None, + credentials: Optional[Any] = None, + table_name: Optional[str] = None, + write_disposition: Optional[TWriteDispositionConfig] = None, + columns: Optional[TAnySchemaColumns] = None, + primary_key: Optional[TColumnNames] = None, + schema: Optional[Schema] = None, + loader_file_format: Optional[TLoaderFileFormat] = None, + table_format: Optional[TTableFormat] = None, + schema_contract: Optional[TSchemaContract] = None, refresh: Optional[TRefreshMode] = None, ) -> LoadInfo: """Loads the data from `data` argument into the destination specified in `destination` and dataset specified in `dataset_name`. 
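The change above replaces implicit-Optional defaults such as `dataset_name: str = None` with explicit `Optional[...]` annotations. A minimal sketch of why this matters, assuming mypy with its default `no_implicit_optional` behavior (mypy >= 0.990); the function names below are illustrative and not part of the patch:

    from typing import Optional

    # Implicit Optional: a plain `str` annotation with a `None` default.
    # Under mypy's default settings this is rejected with
    # "Incompatible default for argument".
    def run_implicit(dataset_name: str = None) -> None:
        ...

    # Explicit Optional: the annotation admits `None`, so strict checking passes.
    # This is the form the protocol signatures are switched to in patch 1.
    def run_explicit(dataset_name: Optional[str] = None) -> None:
        ...
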
From 05109a77e2eaa929fa6f4f55506c32fa444f3ff3 Mon Sep 17 00:00:00 2001 From: nas Date: Sat, 1 Mar 2025 18:40:06 +0100 Subject: [PATCH 2/4] extract run arguments into a typed dictionary to always keep implementation and protocols in sync --- dlt/common/pipeline.py | 50 +++++++++++++++------------------------- dlt/pipeline/pipeline.py | 40 ++++++++++++++------------------ 2 files changed, 35 insertions(+), 55 deletions(-) diff --git a/dlt/common/pipeline.py b/dlt/common/pipeline.py index 6f5b576438..285bbfe22e 100644 --- a/dlt/common/pipeline.py +++ b/dlt/common/pipeline.py @@ -473,6 +473,22 @@ class TSourceState(TPipelineState): sources: Dict[str, Dict[str, Any]] # type: ignore[misc] +class SupportsPipelineRunArgs(TypedDict, total=False): + destination: Optional[TDestinationReferenceArg] + staging: Optional[TDestinationReferenceArg] + dataset_name: Optional[str] + credentials: Optional[Any] + table_name: Optional[str] + write_disposition: Optional[TWriteDispositionConfig] + columns: Optional[TAnySchemaColumns] + primary_key: Optional[TColumnNames] + schema: Optional[Schema] + loader_file_format: Optional[TLoaderFileFormat] + table_format: Optional[TTableFormat] + schema_contract: Optional[TSchemaContract] + refresh: Optional[TRefreshMode] + + class SupportsPipeline(Protocol): """A protocol with core pipeline operations that lets high level abstractions ie. sources to access pipeline methods and properties""" @@ -508,22 +524,7 @@ def get_local_state_val(self, key: str) -> Any: """Gets value from local state. Local state is not synchronized with destination.""" def run( - self, - data: Optional[Any] = None, - *, - destination: Optional[TDestinationReferenceArg] = None, - staging: Optional[TDestinationReferenceArg] = None, - dataset_name: Optional[str] = None, - credentials: Optional[Any] = None, - table_name: Optional[str] = None, - write_disposition: Optional[TWriteDispositionConfig] = None, - columns: Optional[TAnySchemaColumns] = None, - primary_key: Optional[TColumnNames] = None, - schema: Optional[Schema] = None, - loader_file_format: Optional[TLoaderFileFormat] = None, - table_format: Optional[TTableFormat] = None, - schema_contract: Optional[TSchemaContract] = None, - refresh: Optional[TRefreshMode] = None, + self, data: Optional[Any] = None, **kwargs: Unpack[SupportsPipelineRunArgs] ) -> LoadInfo: ... def _set_context(self, is_active: bool) -> None: @@ -535,22 +536,7 @@ def _make_schema_with_default_name(self) -> Schema: class SupportsPipelineRun(Protocol): def __call__( - self, - data: Optional[Any] = None, - *, - destination: Optional[TDestinationReferenceArg] = None, - staging: Optional[TDestinationReferenceArg] = None, - dataset_name: Optional[str] = None, - credentials: Optional[Any] = None, - table_name: Optional[str] = None, - write_disposition: Optional[TWriteDispositionConfig] = None, - columns: Optional[TAnySchemaColumns] = None, - primary_key: Optional[TColumnNames] = None, - schema: Optional[Schema] = None, - loader_file_format: Optional[TLoaderFileFormat] = None, - table_format: Optional[TTableFormat] = None, - schema_contract: Optional[TSchemaContract] = None, - refresh: Optional[TRefreshMode] = None, + self, data: Optional[Any] = None, **kwargs: Unpack[SupportsPipelineRunArgs] ) -> LoadInfo: ... 
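Patch 2 keeps `SupportsPipeline.run`, `SupportsPipelineRun.__call__`, and the `Pipeline.run` implementation in sync by declaring the keyword arguments once in a `total=False` TypedDict and unpacking it into `**kwargs`. A minimal, self-contained sketch of that pattern; the class and field names below are illustrative, not dlt's:

    from typing import Any, Optional, Protocol
    from typing_extensions import TypedDict, Unpack


    class RunArgs(TypedDict, total=False):
        # total=False makes every key optional, so callers may pass any subset.
        dataset_name: Optional[str]
        table_name: Optional[str]


    class SupportsRun(Protocol):
        # Protocol and implementation reference the same TypedDict, so adding
        # a new run argument means editing RunArgs in exactly one place.
        def run(self, data: Any = None, **kwargs: Unpack[RunArgs]) -> None: ...


    class ConcretePipeline:
        def run(self, data: Any = None, **kwargs: Unpack[RunArgs]) -> None:
            # Omitted keys are simply absent, so `.get` falls back to None,
            # mirroring the explicit `= None` defaults the TypedDict replaces.
            print(kwargs.get("dataset_name"), kwargs.get("table_name"))


    p: SupportsRun = ConcretePipeline()
    p.run([1, 2, 3], dataset_name="my_dataset")

With `Unpack[RunArgs]` a type checker flags unknown keyword names and wrongly typed values at the call site, which a plain `**kwargs: Any` signature would not.
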
diff --git a/dlt/pipeline/pipeline.py b/dlt/pipeline/pipeline.py index 6f1456c23b..775987490b 100644 --- a/dlt/pipeline/pipeline.py +++ b/dlt/pipeline/pipeline.py @@ -17,7 +17,8 @@ ContextManager, Union, ) - +from typing_extensions import Unpack +from types import SimpleNamespace import dlt from dlt.common import logger from dlt.common.json import json @@ -84,6 +85,7 @@ LoadInfo, NormalizeInfo, PipelineContext, + SupportsPipelineRunArgs, TStepInfo, SupportsPipeline, TPipelineLocalState, @@ -618,20 +620,7 @@ def load( def run( self, data: Optional[Any] = None, - *, - destination: Optional[TDestinationReferenceArg] = None, - staging: Optional[TDestinationReferenceArg] = None, - dataset_name: Optional[str] = None, - credentials: Optional[Any] = None, - table_name: Optional[str] = None, - write_disposition: Optional[TWriteDispositionConfig] = None, - columns: Optional[TAnySchemaColumns] = None, - primary_key: Optional[TColumnNames] = None, - schema: Optional[Schema] = None, - loader_file_format: Optional[TLoaderFileFormat] = None, - table_format: Optional[TTableFormat] = None, - schema_contract: Optional[TSchemaContract] = None, - refresh: Optional[TRefreshMode] = None, + **kwargs: Unpack[SupportsPipelineRunArgs], ) -> LoadInfo: """Loads the data from `data` argument into the destination specified in `destination` and dataset specified in `dataset_name`. @@ -696,6 +685,11 @@ def run( Returns: LoadInfo: Information on loaded data including the list of package ids and failed job statuses. Please not that `dlt` will not raise if a single job terminally fails. Such information is provided via LoadInfo. """ + destination = kwargs.get("destination", None) + credentials = kwargs.get("credentials", None) + staging = kwargs.get("staging", None) + dataset_name = kwargs.get("dataset_name", None) + loader_file_format = kwargs.get("loader_file_format", None) signals.raise_if_signalled() self.activate() @@ -731,14 +725,14 @@ def run( if data is not None: self.extract( data, - table_name=table_name, - write_disposition=write_disposition, - columns=columns, - primary_key=primary_key, - schema=schema, - table_format=table_format, - schema_contract=schema_contract, - refresh=refresh or self.refresh, + table_name=kwargs.get("table_name", None), + write_disposition=kwargs.get("write_disposition", None), + columns=kwargs.get("columns", None), + primary_key=kwargs.get("primary_key", None), + schema=kwargs.get("schema", None), + table_format=kwargs.get("table_format", None), + schema_contract=kwargs.get("schema_contract", None), + refresh=kwargs.get("refresh", self.refresh), ) self.normalize(loader_file_format=loader_file_format) return self.load(destination, dataset_name, credentials=credentials) From e40360af0a85e42ff9c506a18e38e466289bf743 Mon Sep 17 00:00:00 2001 From: nas Date: Sat, 1 Mar 2025 18:55:15 +0100 Subject: [PATCH 3/4] remove Optional wrapper from data argument in run and __call__ --- dlt/common/pipeline.py | 8 ++------ dlt/pipeline/pipeline.py | 4 ++-- 2 files changed, 4 insertions(+), 8 deletions(-) diff --git a/dlt/common/pipeline.py b/dlt/common/pipeline.py index 285bbfe22e..2a3872c859 100644 --- a/dlt/common/pipeline.py +++ b/dlt/common/pipeline.py @@ -523,9 +523,7 @@ def set_local_state_val(self, key: str, value: Any) -> None: def get_local_state_val(self, key: str) -> Any: """Gets value from local state. Local state is not synchronized with destination.""" - def run( - self, data: Optional[Any] = None, **kwargs: Unpack[SupportsPipelineRunArgs] - ) -> LoadInfo: ... 
+ def run(self, data: Any = None, **kwargs: Unpack[SupportsPipelineRunArgs]) -> LoadInfo: ... def _set_context(self, is_active: bool) -> None: """Called when pipeline context activated or deactivate""" @@ -535,9 +533,7 @@ def _make_schema_with_default_name(self) -> Schema: class SupportsPipelineRun(Protocol): - def __call__( - self, data: Optional[Any] = None, **kwargs: Unpack[SupportsPipelineRunArgs] - ) -> LoadInfo: ... + def __call__(self, data: Any = None, **kwargs: Unpack[SupportsPipelineRunArgs]) -> LoadInfo: ... @configspec diff --git a/dlt/pipeline/pipeline.py b/dlt/pipeline/pipeline.py index 775987490b..5bebc8cb49 100644 --- a/dlt/pipeline/pipeline.py +++ b/dlt/pipeline/pipeline.py @@ -18,7 +18,7 @@ Union, ) from typing_extensions import Unpack -from types import SimpleNamespace + import dlt from dlt.common import logger from dlt.common.json import json @@ -619,7 +619,7 @@ def load( @with_config_section(("run",)) def run( self, - data: Optional[Any] = None, + data: Any = None, **kwargs: Unpack[SupportsPipelineRunArgs], ) -> LoadInfo: """Loads the data from `data` argument into the destination specified in `destination` and dataset specified in `dataset_name`. From 4824e0f11e8aa8e4b35214f6b65d9558345dd152 Mon Sep 17 00:00:00 2001 From: nas Date: Sun, 2 Mar 2025 20:18:55 +0100 Subject: [PATCH 4/4] add docstrings to SupportsPipelineRunArgs and extend run in __init__ to use those as well --- dlt/common/pipeline.py | 26 ++++++++++++++ dlt/pipeline/__init__.py | 76 +++++++--------------------------------- dlt/pipeline/pipeline.py | 40 --------------------- 3 files changed, 38 insertions(+), 104 deletions(-) diff --git a/dlt/common/pipeline.py b/dlt/common/pipeline.py index 2a3872c859..2c258ec8a6 100644 --- a/dlt/common/pipeline.py +++ b/dlt/common/pipeline.py @@ -475,18 +475,44 @@ class TSourceState(TPipelineState): class SupportsPipelineRunArgs(TypedDict, total=False): destination: Optional[TDestinationReferenceArg] + """A name of the destination to which dlt will load the data, or a destination module imported from `dlt.destination`. + If not provided, the value passed to `dlt.pipeline` will be used.""" staging: Optional[TDestinationReferenceArg] + """A name of the staging destination dlt will use to stage the data before loading it into `destination`, or a destination module imported from `dlt.destination`.""" dataset_name: Optional[str] + """A name of the dataset to which the data will be loaded. A dataset is a logical group of tables ie. `schema` in relational databases or folder grouping many files. + If not provided, the value passed to `dlt.pipeline` will be used. If not provided at all then defaults to the `pipeline_name`.""" credentials: Optional[Any] + """Credentials for the `destination` ie. database connection string or a dictionary with google cloud credentials. + In most cases should be set to None, which lets `dlt` use `secrets.toml` or environment variables to infer the right credentials values.""" table_name: Optional[str] + """The name of the table to which the data should be loaded within the `dataset`. This argument is required for a `data` that is a list/Iterable or Iterator without `__name__` attribute. + The behavior of this argument depends on the type of the `data`: + * generator functions - the function name is used as table name, `table_name` overrides this default + * `@dlt.resource` - resource contains the full table schema and that includes the table name. `table_name` will override this property. Use with care! + * `@dlt.source` - source contains several resources each with a table schema.
`table_name` will override all table names within the source and load the data into a single table.""" write_disposition: Optional[TWriteDispositionConfig] + """Controls how to write data to a table. Accepts a shorthand string literal or configuration dictionary. + Allowed shorthand string literals: `append` will always add new data at the end of the table. `replace` will replace existing data with new data. `skip` will prevent data from loading. "merge" will deduplicate and merge data based on "primary_key" and "merge_key" hints. Defaults to "append". + Write behaviour can be further customized through a configuration dictionary. For example, to obtain an SCD2 table provide `write_disposition={"disposition": "merge", "strategy": "scd2"}`. + Please note that in case of `dlt.resource` the table schema value will be overwritten and in case of `dlt.source`, the values in all resources will be overwritten.""" columns: Optional[TAnySchemaColumns] + """A list of column schemas. Typed dictionary describing column names, data types, write disposition and performance hints that gives you full control over the created table schema.""" primary_key: Optional[TColumnNames] + """A column name or a list of column names that comprise a primary key. Typically used with "merge" write disposition to deduplicate loaded data.""" schema: Optional[Schema] + """An explicit `Schema` object in which all table schemas will be grouped. By default `dlt` takes the schema from the source (if passed in `data` argument) or creates a default one itself.""" loader_file_format: Optional[TLoaderFileFormat] + """The file format the loader will use to create the load package. Not all file_formats are compatible with all destinations. Defaults to the preferred file format of the selected destination.""" table_format: Optional[TTableFormat] + """The table format used by the destination to store tables. Currently you can select table format on filesystem and Athena destinations.""" schema_contract: Optional[TSchemaContract] + """An override for the schema contract settings; this will replace the schema contract settings for all tables in the schema. Defaults to None.""" refresh: Optional[TRefreshMode] + """Fully or partially reset sources before loading new data in this run. The following refresh modes are supported: + * `drop_sources` - Drop tables and source and resource state for all sources currently being processed in `run` or `extract` methods of the pipeline. (Note: schema history is erased) + * `drop_resources` - Drop tables and resource state for all resources being processed. Source level state is not modified. (Note: schema history is erased) + * `drop_data` - Wipe all data and resource state for all resources being processed.
Schema is not modified.""" class SupportsPipeline(Protocol): diff --git a/dlt/pipeline/__init__.py b/dlt/pipeline/__init__.py index 175bee33b3..37afe832ae 100644 --- a/dlt/pipeline/__init__.py +++ b/dlt/pipeline/__init__.py @@ -10,12 +10,18 @@ TSchemaContract, ) -from dlt.common.typing import TSecretStrValue, Any +from dlt.common.typing import TSecretStrValue, Any, Unpack from dlt.common.configuration import with_config from dlt.common.configuration.container import Container from dlt.common.configuration.inject import get_orig_args, last_config from dlt.common.destination import TLoaderFileFormat, Destination, TDestinationReferenceArg -from dlt.common.pipeline import LoadInfo, PipelineContext, get_dlt_pipelines_dir, TRefreshMode +from dlt.common.pipeline import ( + LoadInfo, + PipelineContext, + get_dlt_pipelines_dir, + TRefreshMode, + SupportsPipelineRunArgs, +) from dlt.common.runtime import apply_runtime_config, init_telemetry from dlt.pipeline.configuration import PipelineConfiguration, ensure_correct_pipeline_kwargs @@ -235,18 +241,7 @@ def attach( def run( data: Any, - *, - destination: TDestinationReferenceArg = None, - staging: TDestinationReferenceArg = None, - dataset_name: str = None, - table_name: str = None, - write_disposition: TWriteDispositionConfig = None, - columns: Sequence[TColumnSchema] = None, - schema: Schema = None, - loader_file_format: TLoaderFileFormat = None, - table_format: TTableFormat = None, - schema_contract: TSchemaContract = None, - refresh: Optional[TRefreshMode] = None, + **kwargs: Unpack[SupportsPipelineRunArgs], ) -> LoadInfo: """Loads the data in `data` argument into the destination specified in `destination` and dataset specified in `dataset_name`. @@ -266,61 +261,14 @@ def run( Next it will make sure that data from the previous is fully processed. If not, `run` method normalizes and loads pending data items. Only then the new data from `data` argument is extracted, normalized and loaded. - Args: - data (Any): Data to be loaded to destination - - destination (str | DestinationReference, optional): A name of the destination to which dlt will load the data, or a destination module imported from `dlt.destination`. - If not provided, the value passed to `dlt.pipeline` will be used. - - dataset_name (str, optional): A name of the dataset to which the data will be loaded. A dataset is a logical group of tables ie. `schema` in relational databases or folder grouping many files. - If not provided, the value passed to `dlt.pipeline` will be used. If not provided at all then defaults to the `pipeline_name` - - table_name (str, optional): The name of the table to which the data should be loaded within the `dataset`. This argument is required for a `data` that is a list/Iterable or Iterator without `__name__` attribute. - The behavior of this argument depends on the type of the `data`: - * generator functions: the function name is used as table name, `table_name` overrides this default - * `@dlt.resource`: resource contains the full table schema and that includes the table name. `table_name` will override this property. Use with care! - * `@dlt.source`: source contains several resources each with a table schema. `table_name` will override all table names within the source and load the data into single table. - - write_disposition (TWriteDispositionConfig, optional): Controls how to write data to a table. Accepts a shorthand string literal or configuration dictionary. 
- Allowed shorthand string literals: `append` will always add new data at the end of the table. `replace` will replace existing data with new data. `skip` will prevent data from loading. "merge" will deduplicate and merge data based on "primary_key" and "merge_key" hints. Defaults to "append". - Write behaviour can be further customized through a configuration dictionary. For example, to obtain an SCD2 table provide `write_disposition={"disposition": "merge", "strategy": "scd2"}`. - Please note that in case of `dlt.resource` the table schema value will be overwritten and in case of `dlt.source`, the values in all resources will be overwritten. - - columns (Sequence[TColumnSchema], optional): A list of column schemas. Typed dictionary describing column names, data types, write disposition and performance hints that gives you full control over the created table schema. - - schema (Schema, optional): An explicit `Schema` object in which all table schemas will be grouped. By default `dlt` takes the schema from the source (if passed in `data` argument) or creates a default one itself. - - loader_file_format (Literal["jsonl", "insert_values", "parquet"], optional): The file format the loader will use to create the load package. Not all file_formats are compatible with all destinations. Defaults to the preferred file format of the selected destination. - - table_format (Literal["delta", "iceberg"], optional): The table format used by the destination to store tables. Currently you can select table format on filesystem and Athena destinations. - - schema_contract (TSchemaContract, optional): On override for the schema contract settings, this will replace the schema contract settings for all tables in the schema. Defaults to None. - - refresh (str | TRefreshMode): Fully or partially reset sources before loading new data in this run. The following refresh modes are supported: - * `drop_sources`: Drop tables and source and resource state for all sources currently being processed in `run` or `extract` methods of the pipeline. (Note: schema history is erased) - * `drop_resources`: Drop tables and resource state for all resources being processed. Source level state is not modified. (Note: schema history is erased) - * `drop_data`: Wipe all data and resource state for all resources being processed. Schema is not modified. - Raises: PipelineStepFailed: when a problem happened during `extract`, `normalize` or `load` steps. Returns: LoadInfo: Information on loaded data including the list of package ids and failed job statuses. Please not that `dlt` will not raise if a single job terminally fails. Such information is provided via LoadInfo. """ - destination = Destination.from_reference(destination) - return pipeline().run( - data, - destination=destination, - staging=staging, - dataset_name=dataset_name, - table_name=table_name, - write_disposition=write_disposition, - columns=columns, - schema=schema, - loader_file_format=loader_file_format, - table_format=table_format, - schema_contract=schema_contract, - refresh=refresh, - ) + destination = Destination.from_reference(kwargs.pop("destination", None)) + kwargs["destination"] = destination + return pipeline().run(data, **kwargs) # plug default tracking module diff --git a/dlt/pipeline/pipeline.py b/dlt/pipeline/pipeline.py index 5bebc8cb49..b38169c097 100644 --- a/dlt/pipeline/pipeline.py +++ b/dlt/pipeline/pipeline.py @@ -640,46 +640,6 @@ def run( Next it will make sure that data from the previous is fully processed. 
If not, `run` method normalizes, loads pending data items and **exits** If there was no pending data, new data from `data` argument is extracted, normalized and loaded. - Args: - data (Any): Data to be loaded to destination - - destination (str | DestinationReference, optional): A name of the destination to which dlt will load the data, or a destination module imported from `dlt.destination`. - If not provided, the value passed to `dlt.pipeline` will be used. - - dataset_name (str, optional): A name of the dataset to which the data will be loaded. A dataset is a logical group of tables ie. `schema` in relational databases or folder grouping many files. - If not provided, the value passed to `dlt.pipeline` will be used. If not provided at all then defaults to the `pipeline_name` - - credentials (Any, optional): Credentials for the `destination` ie. database connection string or a dictionary with google cloud credentials. - In most cases should be set to None, which lets `dlt` to use `secrets.toml` or environment variables to infer right credentials values. - - table_name (str, optional): The name of the table to which the data should be loaded within the `dataset`. This argument is required for a `data` that is a list/Iterable or Iterator without `__name__` attribute. - The behavior of this argument depends on the type of the `data`: - * generator functions - the function name is used as table name, `table_name` overrides this default - * `@dlt.resource` - resource contains the full table schema and that includes the table name. `table_name` will override this property. Use with care! - * `@dlt.source` - source contains several resources each with a table schema. `table_name` will override all table names within the source and load the data into single table. - - write_disposition (TWriteDispositionConfig, optional): Controls how to write data to a table. Accepts a shorthand string literal or configuration dictionary. - Allowed shorthand string literals: `append` will always add new data at the end of the table. `replace` will replace existing data with new data. `skip` will prevent data from loading. "merge" will deduplicate and merge data based on "primary_key" and "merge_key" hints. Defaults to "append". - Write behaviour can be further customized through a configuration dictionary. For example, to obtain an SCD2 table provide `write_disposition={"disposition": "merge", "strategy": "scd2"}`. - Please note that in case of `dlt.resource` the table schema value will be overwritten and in case of `dlt.source`, the values in all resources will be overwritten. - - columns (Sequence[TColumnSchema], optional): A list of column schemas. Typed dictionary describing column names, data types, write disposition and performance hints that gives you full control over the created table schema. - - primary_key (str | Sequence[str]): A column name or a list of column names that comprise a private key. Typically used with "merge" write disposition to deduplicate loaded data. - - schema (Schema, optional): An explicit `Schema` object in which all table schemas will be grouped. By default `dlt` takes the schema from the source (if passed in `data` argument) or creates a default one itself. - - loader_file_format (Literal["jsonl", "insert_values", "parquet"], optional): The file format the loader will use to create the load package. Not all file_formats are compatible with all destinations. Defaults to the preferred file format of the selected destination. 
- - table_format (Literal["delta", "iceberg"], optional): The table format used by the destination to store tables. Currently you can select table format on filesystem and Athena destinations. - - schema_contract (TSchemaContract, optional): On override for the schema contract settings, this will replace the schema contract settings for all tables in the schema. Defaults to None. - - refresh (str | TRefreshMode): Fully or partially reset sources before loading new data in this run. The following refresh modes are supported: - * `drop_sources` - Drop tables and source and resource state for all sources currently being processed in `run` or `extract` methods of the pipeline. (Note: schema history is erased) - * `drop_resources`- Drop tables and resource state for all resources being processed. Source level state is not modified. (Note: schema history is erased) - * `drop_data` - Wipe all data and resource state for all resources being processed. Schema is not modified. - Raises: PipelineStepFailed: when a problem happened during `extract`, `normalize` or `load` steps. Returns: