Skip to content
This repository has been archived by the owner on Feb 20, 2025. It is now read-only.

Commit

Permalink
feat: add overwrite with retries using tenacity
Browse files Browse the repository at this point in the history
  • Loading branch information
JasperHG90 committed Oct 19, 2024
1 parent 7baa560 commit 6f596f6
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 2 deletions.
39 changes: 38 additions & 1 deletion packages/dagster_pyiceberg/src/dagster_pyiceberg/handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,10 @@
from pyiceberg import types as T
from pyiceberg.catalog.rest import RestCatalog
from pyiceberg.catalog.sql import SqlCatalog
from pyiceberg.exceptions import CommitFailedException
from pyiceberg.partitioning import PartitionSpec
from pyiceberg.schema import Schema
from tenacity import RetryError, Retrying, stop_after_attempt, wait_random

U = TypeVar("U")

Expand Down Expand Up @@ -289,12 +291,47 @@ def _table_writer(
# TODO: use some sort of retry mechanism here
# See: https://github.com/apache/iceberg-python/pull/330
# See: https://github.com/apache/iceberg-python/issues/269
table.overwrite(
_overwrite_table_with_retries(
table=table,
df=data,
overwrite_filter=row_filter,
)


def _overwrite_table_with_retries(
table: table.Table,
df: pa.Table,
overwrite_filter: Union[E.BooleanExpression, str],
retries: int = 4,
):
"""Overwrites an iceberg table and retries on failure
Args:
table (table.Table): Iceberg table
df (pa.Table): Data to write to the table
overwrite_filter (Union[E.BooleanExpression, str]): Filter to apply to the overwrite operation
retries (int, optional): Max number of retries. Defaults to 4.
Raises:
RetryError: Raised when the commit fails after the maximum number of retries
"""
try:
for retry in Retrying(
stop=stop_after_attempt(retries), reraise=True, wait=wait_random(0.1, 0.99)
):
with retry:
try:
with table.transaction() as tx:
tx.overwrite(df=df, overwrite_filter=overwrite_filter)
tx.commit_transaction()
except CommitFailedException:
# Do not refresh on the final try
if retry.retry_state.attempt_number < retries:
table.refresh()
except RetryError as e:
raise RetryError(f"Commit failed after {retries} retries") from e


def _time_window_partition_filter(
table_partition: TablePartitionDimension,
iceberg_partition_spec_field_type: Union[
Expand Down
4 changes: 3 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@ authors = [
]
requires-python = ">=3.10"
readme = "README.md"
dependencies = []
dependencies = [
"tenacity>=8.5.0",
]

[tool.uv]
dev-dependencies = [
Expand Down
4 changes: 4 additions & 0 deletions uv.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit 6f596f6

Please sign in to comment.