Skip to content

Commit

Permalink
chore: update examples
Browse files Browse the repository at this point in the history
  • Loading branch information
JasperHG90 committed Oct 20, 2024
1 parent d96a999 commit 51b299d
Show file tree
Hide file tree
Showing 3 changed files with 67 additions and 8 deletions.
54 changes: 53 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,53 @@ This implementation is based on the [dagster-deltalake](https://github.com/dagst

## Usage

See 'examples' directory.
This library allows you to read from and write to Iceberg tables using PyIceberg.

You need to configure an Iceberg Catalog backend for this. See the [PyIceberg documentation](https://py.iceberg.apache.org/configuration/#catalogs) for more information.

Then, you can define the IO manager resource as follows:

```python
from dagster_pyiceberg import IcebergPyarrowIOManager, IcebergSqlCatalogConfig

CATALOG_URI = "sqlite:////home/vscode/workspace/.tmp/dag/warehouse/catalog.db"
CATALOG_WAREHOUSE = "file:///home/vscode/workspace/.tmp/dag/warehouse"

resources = {
"io_manager": IcebergPyarrowIOManager(
name="test",
config=IcebergSqlCatalogConfig(
properties={"uri": CATALOG_URI, "warehouse": CATALOG_WAREHOUSE}
),
schema="dagster",
)
}
```

You can also use the IO manager with partitioned assets:

```python
from dagster import DailyPartitionsDefinition, Definitions, asset

partition = DailyPartitionsDefinition(
start_date=dt.datetime(2024, 10, 1, 0, tzinfo=dt.timezone.utc),
end_date=dt.datetime(2024, 10, 30, 0, tzinfo=dt.timezone.utc),
)


@asset(
partitions_def=partition,
metadata={"partition_expr": "date"},
)
def asset_1():
data = {
"date": [dt.datetime(2024, 10, i + 1, 0) for i in range(20)],
"values": np.random.normal(0, 1, 20).tolist(),
}
return pa.Table.from_pydict(data)
```

For full examples, see 'examples' directory.

## Limitations

Expand Down Expand Up @@ -36,6 +82,12 @@ The following engines are currently implemented.
- arrow
- pandas

## Development

1. Clone repo
2. Set up the devcontainer
3. Run `just s` to install dependencies

## To do

- Add additional configuration options
Expand Down
13 changes: 13 additions & 0 deletions examples/howto.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
# How to run these examples?

1. Create a folder called '.dagster' in your home directory

```
mkdir -p .dagster
```

1. Launch dagster with one of the examples

```
DAGSTER_HOME=.dagster uv run dagster dev -f simple_dag.py
```
8 changes: 1 addition & 7 deletions examples/partitioned_dag.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
import datetime as dt
import time

import numpy as np
import pyarrow as pa
from dagster import DailyPartitionsDefinition, Definitions, Jitter, RetryPolicy, asset
from dagster import DailyPartitionsDefinition, Definitions, asset
from dagster_pyiceberg import IcebergPyarrowIOManager, IcebergSqlCatalogConfig
from pyiceberg.catalog.sql import SqlCatalog

Expand Down Expand Up @@ -37,17 +36,12 @@
@asset(
partitions_def=partition,
metadata={"partition_expr": "date"},
# Concurrent writes to commit log with raise CommitFailedException
# See: https://github.com/apache/iceberg-python/issues/1084
# Workaround: Retry the commit operation with some jitter
retry_policy=RetryPolicy(max_retries=3, delay=1, jitter=Jitter.PLUS_MINUS),
)
def asset_1():
data = {
"date": [dt.datetime(2024, 10, i + 1, 0) for i in range(20)],
"values": np.random.normal(0, 1, 20).tolist(),
}
time.sleep(30)
return pa.Table.from_pydict(data)


Expand Down

0 comments on commit 51b299d

Please sign in to comment.