diff --git a/src/dagster_pyiceberg/io_manager/arrow.py b/src/dagster_pyiceberg/io_manager/arrow.py index 881bba4..539d225 100644 --- a/src/dagster_pyiceberg/io_manager/arrow.py +++ b/src/dagster_pyiceberg/io_manager/arrow.py @@ -57,25 +57,29 @@ class IcebergPyarrowIOManager(_io_manager.IcebergIOManager): import pandas as pd import pyarrow as pa from dagster import Definitions, asset - from dagster_pyiceberg import IcebergPyarrowIOManager, IcebergSqlCatalogConfig + + from dagster_pyiceberg.config import IcebergCatalogConfig + from dagster_pyiceberg.io_manager.arrow import IcebergPyarrowIOManager CATALOG_URI = "sqlite:////home/vscode/workspace/.tmp/examples/select_columns/catalog.db" CATALOG_WAREHOUSE = ( "file:///home/vscode/workspace/.tmp/examples/select_columns/warehouse" ) + resources = { "io_manager": IcebergPyarrowIOManager( name="test", - config=IcebergSqlCatalogConfig( + config=IcebergCatalogConfig( properties={"uri": CATALOG_URI, "warehouse": CATALOG_WAREHOUSE} ), schema="dagster", ) } + @asset - def iris_dataset() -> pd.DataFrame: + def iris_dataset() -> pa.Table: pa.Table.from_pandas( pd.read_csv( "https://docs.dagster.io/assets/iris.csv", @@ -89,6 +93,7 @@ def iris_dataset() -> pd.DataFrame: ) ) + defs = Definitions(assets=[iris_dataset], resources=resources) If you do not provide a schema, Dagster will determine a schema based on the assets and ops using diff --git a/src/dagster_pyiceberg/io_manager/daft.py b/src/dagster_pyiceberg/io_manager/daft.py index eb95268..4a1bc4d 100644 --- a/src/dagster_pyiceberg/io_manager/daft.py +++ b/src/dagster_pyiceberg/io_manager/daft.py @@ -53,29 +53,33 @@ class IcebergDaftIOManager(_io_manager.IcebergIOManager): Examples: .. code-block:: python + import daft as da import pandas as pd - import pyarrow as pa from dagster import Definitions, asset - from dagster_pyiceberg import IcebergPyarrowIOManager, IcebergSqlCatalogConfig + + from dagster_pyiceberg.config import IcebergCatalogConfig + from dagster_pyiceberg.io_manager.daft import IcebergDaftIOManager CATALOG_URI = "sqlite:////home/vscode/workspace/.tmp/examples/select_columns/catalog.db" CATALOG_WAREHOUSE = ( "file:///home/vscode/workspace/.tmp/examples/select_columns/warehouse" ) + resources = { - "io_manager": IcebergPyarrowIOManager( + "io_manager": IcebergDaftIOManager( name="test", - config=IcebergSqlCatalogConfig( + config=IcebergCatalogConfig( properties={"uri": CATALOG_URI, "warehouse": CATALOG_WAREHOUSE} ), schema="dagster", ) } + @asset - def iris_dataset() -> pd.DataFrame: - pa.Table.from_pandas( + def iris_dataset() -> da.DataFrame: + return da.from_pandas( pd.read_csv( "https://docs.dagster.io/assets/iris.csv", names=[ @@ -88,6 +92,7 @@ def iris_dataset() -> pd.DataFrame: ) ) + defs = Definitions(assets=[iris_dataset], resources=resources) If you do not provide a schema, Dagster will determine a schema based on the assets and ops using diff --git a/src/dagster_pyiceberg/io_manager/polars.py b/src/dagster_pyiceberg/io_manager/polars.py index 51b89f1..018699c 100644 --- a/src/dagster_pyiceberg/io_manager/polars.py +++ b/src/dagster_pyiceberg/io_manager/polars.py @@ -63,28 +63,32 @@ class IcebergPolarsIOManager(_io_manager.IcebergIOManager): .. code-block:: python import pandas as pd - import pyarrow as pa + import polars as pl from dagster import Definitions, asset - from dagster_pyiceberg import IcebergPyarrowIOManager, IcebergSqlCatalogConfig + + from dagster_pyiceberg.config import IcebergCatalogConfig + from dagster_pyiceberg.io_manager.polars import IcebergPolarsIOManager CATALOG_URI = "sqlite:////home/vscode/workspace/.tmp/examples/select_columns/catalog.db" CATALOG_WAREHOUSE = ( "file:///home/vscode/workspace/.tmp/examples/select_columns/warehouse" ) + resources = { - "io_manager": IcebergPyarrowIOManager( + "io_manager": IcebergPolarsIOManager( name="test", - config=IcebergSqlCatalogConfig( + config=IcebergCatalogConfig( properties={"uri": CATALOG_URI, "warehouse": CATALOG_WAREHOUSE} ), schema="dagster", ) } + @asset - def iris_dataset() -> pd.DataFrame: - pa.Table.from_pandas( + def iris_dataset() -> pl.DataFrame: + return pl.from_pandas( pd.read_csv( "https://docs.dagster.io/assets/iris.csv", names=[ @@ -97,6 +101,7 @@ def iris_dataset() -> pd.DataFrame: ) ) + defs = Definitions(assets=[iris_dataset], resources=resources) If you do not provide a schema, Dagster will determine a schema based on the assets and ops using