diff --git a/README.md b/README.md index da643a2..d946894 100644 --- a/README.md +++ b/README.md @@ -6,7 +6,53 @@ This implementation is based on the [dagster-deltalake](https://github.com/dagst ## Usage -See 'examples' directory. +This library allows you to read from and write to Iceberg tables using PyIceberg. + +You need to configure an Iceberg Catalog backend for this. See the [PyIceberg documentation](https://py.iceberg.apache.org/configuration/#catalogs) for more information. + +Then, you can define the IO manager resource as follows: + +```python +from dagster_pyiceberg import IcebergPyarrowIOManager, IcebergSqlCatalogConfig + +CATALOG_URI = "sqlite:////home/vscode/workspace/.tmp/dag/warehouse/catalog.db" +CATALOG_WAREHOUSE = "file:///home/vscode/workspace/.tmp/dag/warehouse" + +resources = { + "io_manager": IcebergPyarrowIOManager( + name="test", + config=IcebergSqlCatalogConfig( + properties={"uri": CATALOG_URI, "warehouse": CATALOG_WAREHOUSE} + ), + schema="dagster", + ) +} +``` + +You can also use the IO manager with partitioned assets: + +```python +from dagster import DailyPartitionsDefinition, Definitions, asset + +partition = DailyPartitionsDefinition( + start_date=dt.datetime(2024, 10, 1, 0, tzinfo=dt.timezone.utc), + end_date=dt.datetime(2024, 10, 30, 0, tzinfo=dt.timezone.utc), +) + + +@asset( + partitions_def=partition, + metadata={"partition_expr": "date"}, +) +def asset_1(): + data = { + "date": [dt.datetime(2024, 10, i + 1, 0) for i in range(20)], + "values": np.random.normal(0, 1, 20).tolist(), + } + return pa.Table.from_pydict(data) +``` + +For full examples, see 'examples' directory. ## Limitations @@ -36,6 +82,12 @@ The following engines are currently implemented. - arrow - pandas +## Development + +1. Clone repo +2. Set up the devcontainer +3. Run `just s` to install dependencies + ## To do - Add additional configuration options diff --git a/examples/howto.md b/examples/howto.md new file mode 100644 index 0000000..8127e94 --- /dev/null +++ b/examples/howto.md @@ -0,0 +1,13 @@ +# How to run these examples? + +1. Create a folder called '.dagster' in your home directory + +``` +mkdir -p .dagster +``` + +1. Launch dagster with one of the examples + +``` +DAGSTER_HOME=.dagster uv run dagster dev -f simple_dag.py +``` diff --git a/examples/partitioned_dag.py b/examples/partitioned_dag.py index 96ae8e4..427e798 100644 --- a/examples/partitioned_dag.py +++ b/examples/partitioned_dag.py @@ -1,9 +1,8 @@ import datetime as dt -import time import numpy as np import pyarrow as pa -from dagster import DailyPartitionsDefinition, Definitions, Jitter, RetryPolicy, asset +from dagster import DailyPartitionsDefinition, Definitions, asset from dagster_pyiceberg import IcebergPyarrowIOManager, IcebergSqlCatalogConfig from pyiceberg.catalog.sql import SqlCatalog @@ -37,17 +36,12 @@ @asset( partitions_def=partition, metadata={"partition_expr": "date"}, - # Concurrent writes to commit log with raise CommitFailedException - # See: https://github.com/apache/iceberg-python/issues/1084 - # Workaround: Retry the commit operation with some jitter - retry_policy=RetryPolicy(max_retries=3, delay=1, jitter=Jitter.PLUS_MINUS), ) def asset_1(): data = { "date": [dt.datetime(2024, 10, i + 1, 0) for i in range(20)], "values": np.random.normal(0, 1, 20).tolist(), } - time.sleep(30) return pa.Table.from_pydict(data)