diff --git a/README.md b/README.md index 24642f4..5f8e299 100644 --- a/README.md +++ b/README.md @@ -61,7 +61,7 @@ The table below shows which PyIceberg features are currently available. | Feature | Supported | Link | Comment | |---|---|---|---| | Add existing files | ❌ | https://py.iceberg.apache.org/api/#add-files | Useful for existing partitions that users don't want to re-materialize/re-compute. | -| Schema evolution | ❌ | https://py.iceberg.apache.org/api/#schema-evolution | More complicated than e.g. delta lake since updates require diffing input table with existing Iceberg table. This is implemented by checking the schema of incoming data, dropping any columns that no longer exist in the data schema, and then using the `union_by_name()` method to merge the current schema with the table schema. | +| Schema evolution | ✅ | https://py.iceberg.apache.org/api/#schema-evolution | More complicated than e.g. delta lake since updates require diffing input table with existing Iceberg table. This is implemented by checking the schema of incoming data, dropping any columns that no longer exist in the data schema, and then using the `union_by_name()` method to merge the current schema with the table schema. Current implementation has a chance of creating a race condition when e.g. partition A tries to write to a table that has not yet processed a schema update | | Sort order | ❌ | https://shorturl.at/TycZN | These can be partitions but that's not necessary. Also, they require a transform. Easiest thing to do is to allow end-users to set this in metadata. | | PyIceberg commit retries | ✅ | https://github.com/apache/iceberg-python/pull/330 https://github.com/apache/iceberg-python/issues/269 | PR to add this to PyIceberg is open. Will probably be merged for an upcoming release. Added a custom retry function using Tenacity for the time being. | | Partition evolution | ✅ | https://py.iceberg.apache.org/api/#partition-evolution | Create, Update, Delete partitions by updating the Dagster partitions definition | diff --git a/examples/partitioned_dag.py b/examples/partitioned_dag.py index 427e798..d92b1cb 100644 --- a/examples/partitioned_dag.py +++ b/examples/partitioned_dag.py @@ -23,6 +23,7 @@ properties={"uri": CATALOG_URI, "warehouse": CATALOG_WAREHOUSE} ), schema="dagster", + schema_update_mode="update", ) } @@ -37,10 +38,14 @@ partitions_def=partition, metadata={"partition_expr": "date"}, ) -def asset_1(): +def asset_1(context): + data = { - "date": [dt.datetime(2024, 10, i + 1, 0) for i in range(20)], + "date": [ + dt.datetime.strptime(context.partition_key, "%Y-%m-%d") for _ in range(20) + ], "values": np.random.normal(0, 1, 20).tolist(), + "category": ["A"] * 10 + ["B"] * 10, } return pa.Table.from_pydict(data)