diff --git a/packages/dagster_pyiceberg/src/dagster_pyiceberg/handler.py b/packages/dagster_pyiceberg/src/dagster_pyiceberg/handler.py index e02a06c..ec2a526 100644 --- a/packages/dagster_pyiceberg/src/dagster_pyiceberg/handler.py +++ b/packages/dagster_pyiceberg/src/dagster_pyiceberg/handler.py @@ -371,7 +371,7 @@ def update_table_spec(self, table: table.Table): and self.number_of_table_spec_changes > 0 ): raise ValueError( - "Schema update mode is set to 'error' but there are schema changes to the Iceberg table" + "Partition spec update mode is set to 'error' but there are partition spec changes to the Iceberg table" ) # If there are no changes, do nothing elif self.number_of_table_spec_changes == 0: @@ -505,12 +505,6 @@ def _table_writer( else: row_filter = iceberg_table.ALWAYS_TRUE - # An overwrite may produce zero or more snapshots based on the operation: - - # DELETE: In case existing Parquet files can be dropped completely. - # REPLACE: In case existing Parquet files need to be rewritten. - # APPEND: In case new data is being inserted into the table. - # TODO: use some sort of retry mechanism here # See: https://github.com/apache/iceberg-python/pull/330 # See: https://github.com/apache/iceberg-python/issues/269 @@ -548,6 +542,11 @@ def _overwrite_table_with_retries( with retry: try: with table.transaction() as tx: + # An overwrite may produce zero or more snapshots based on the operation: + + # DELETE: In case existing Parquet files can be dropped completely. + # REPLACE: In case existing Parquet files need to be rewritten. + # APPEND: In case new data is being inserted into the table. tx.overwrite(df=df, overwrite_filter=overwrite_filter) tx.commit_transaction() except CommitFailedException: