diff --git a/packages/dagster_pyiceberg/src/dagster_pyiceberg/handler.py b/packages/dagster_pyiceberg/src/dagster_pyiceberg/handler.py index b13e95c..13bd9cf 100644 --- a/packages/dagster_pyiceberg/src/dagster_pyiceberg/handler.py +++ b/packages/dagster_pyiceberg/src/dagster_pyiceberg/handler.py @@ -32,8 +32,10 @@ from pyiceberg import types as T from pyiceberg.catalog.rest import RestCatalog from pyiceberg.catalog.sql import SqlCatalog +from pyiceberg.exceptions import CommitFailedException from pyiceberg.partitioning import PartitionSpec from pyiceberg.schema import Schema +from tenacity import RetryError, Retrying, stop_after_attempt, wait_random U = TypeVar("U") @@ -289,12 +291,47 @@ def _table_writer( # TODO: use some sort of retry mechanism here # See: https://github.com/apache/iceberg-python/pull/330 # See: https://github.com/apache/iceberg-python/issues/269 - table.overwrite( + _overwrite_table_with_retries( + table=table, df=data, overwrite_filter=row_filter, ) +def _overwrite_table_with_retries( + table: table.Table, + df: pa.Table, + overwrite_filter: Union[E.BooleanExpression, str], + retries: int = 4, +): + """Overwrites an iceberg table and retries on failure + + Args: + table (table.Table): Iceberg table + df (pa.Table): Data to write to the table + overwrite_filter (Union[E.BooleanExpression, str]): Filter to apply to the overwrite operation + retries (int, optional): Max number of retries. Defaults to 4. + + Raises: + RetryError: Raised when the commit fails after the maximum number of retries + """ + try: + for retry in Retrying( + stop=stop_after_attempt(retries), reraise=True, wait=wait_random(0.1, 0.99) + ): + with retry: + try: + with table.transaction() as tx: + tx.overwrite(df=df, overwrite_filter=overwrite_filter) + tx.commit_transaction() + except CommitFailedException: + # Do not refresh on the final try + if retry.retry_state.attempt_number < retries: + table.refresh() + except RetryError as e: + raise RetryError(f"Commit failed after {retries} retries") from e + + def _time_window_partition_filter( table_partition: TablePartitionDimension, iceberg_partition_spec_field_type: Union[ diff --git a/pyproject.toml b/pyproject.toml index 6145ab9..b4d66c2 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -7,7 +7,9 @@ authors = [ ] requires-python = ">=3.10" readme = "README.md" -dependencies = [] +dependencies = [ + "tenacity>=8.5.0", +] [tool.uv] dev-dependencies = [ diff --git a/uv.lock b/uv.lock index 38d086d..8abb8ec 100644 --- a/uv.lock +++ b/uv.lock @@ -470,6 +470,9 @@ requires-dist = [ name = "dagster-pyiceberg-project" version = "0.0.0" source = { virtual = "." } +dependencies = [ + { name = "tenacity" }, +] [package.dev-dependencies] dev = [ @@ -483,6 +486,7 @@ dev = [ ] [package.metadata] +requires-dist = [{ name = "tenacity", specifier = ">=8.5.0" }] [package.metadata.requires-dev] dev = [