Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

AIP-83 Restore uniqueness constraint on logical date and make logical date nullable #46232

Closed
Show file tree
Hide file tree
Changes from 12 commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
d0e22ad
restore unique constraint on logical date and make it nullable
vatsrahul1001 Jan 29, 2025
2b11aed
restore unique constraint on logical date and make it nullable
vatsrahul1001 Jan 29, 2025
237360d
fix migration file
vatsrahul1001 Jan 29, 2025
6f423c1
Merge branch 'main' into restore-unique-constraint-logical-date
vatsrahul1001 Jan 29, 2025
18efafb
fix migration file
vatsrahul1001 Jan 29, 2025
8f9e076
Merge branch 'restore-unique-constraint-logical-date' of github.com:a…
vatsrahul1001 Jan 29, 2025
1c47c75
Merge branch 'main' into restore-unique-constraint-logical-date
vatsrahul1001 Jan 29, 2025
d3bfe20
Merge branch 'main' of github.com:astronomer/airflow into restore-uni…
vatsrahul1001 Jan 30, 2025
0fa2477
Merge branch 'restore-unique-constraint-logical-date' of github.com:a…
vatsrahul1001 Jan 30, 2025
75a684a
fixing tests
vatsrahul1001 Jan 30, 2025
53d329b
resolve conflict
vatsrahul1001 Feb 3, 2025
c3e29c7
remove default date from logical date in dag run model
vatsrahul1001 Feb 3, 2025
0830d58
Merge branch 'main' of github.com:astronomer/airflow into restore-uni…
vatsrahul1001 Feb 3, 2025
2b4251d
fix task_filter
vatsrahul1001 Feb 3, 2025
3ec1ecd
merge main
vatsrahul1001 Feb 3, 2025
5680a76
Merge branch 'main' into restore-unique-constraint-logical-date
vatsrahul1001 Feb 3, 2025
16cd717
Merge branch 'main' into restore-unique-constraint-logical-date
vatsrahul1001 Feb 3, 2025
7d6c9f4
fix tests
vatsrahul1001 Feb 3, 2025
02a82a0
Merge branch 'restore-unique-constraint-logical-date' of github.com:a…
vatsrahul1001 Feb 3, 2025
83b62bb
implement review comments
vatsrahul1001 Feb 4, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@
# under the License.

"""
Drop ``execution_date`` unique constraint on DagRun.
Make logical_date nullable.

The column has also been renamed to logical_date, although the Python model is
The column has been renamed to logical_date, although the Python model is
not changed. This allows us to not need to fix all the Python code at once, but
still do the two changes in one migration instead of two.

Expand Down Expand Up @@ -49,10 +49,15 @@ def upgrade():
"execution_date",
new_column_name="logical_date",
existing_type=TIMESTAMP(timezone=True),
existing_nullable=False,
existing_nullable=True,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
existing_nullable=True,
nullable=True,

existing_nullable is for specifying the nullable property we don’t want to change. Here we want to change nullability.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, I Implemented this change.

)

with op.batch_alter_table("dag_run", schema=None) as batch_op:
batch_op.drop_constraint("dag_run_dag_id_execution_date_key", type_="unique")
batch_op.create_unique_constraint(
"dag_run_dag_id_logical_date_key",
columns=["dag_id", "logical_date"],
)


def downgrade():
Expand All @@ -63,7 +68,9 @@ def downgrade():
existing_type=TIMESTAMP(timezone=True),
existing_nullable=False,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
existing_nullable=False,
nullable=False,

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, I Implemented this change.

)

with op.batch_alter_table("dag_run", schema=None) as batch_op:
batch_op.drop_constraint("dag_run_dag_id_logical_date_key", type_="unique")
batch_op.create_unique_constraint(
"dag_run_dag_id_execution_date_key",
columns=["dag_id", "execution_date"],
Expand Down
5 changes: 3 additions & 2 deletions airflow/models/dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -1719,7 +1719,7 @@ def create_dagrun(
self,
*,
run_id: str,
logical_date: datetime,
logical_date: datetime | None,
data_interval: tuple[datetime, datetime],
conf: dict | None = None,
run_type: DagRunType,
Expand All @@ -1743,7 +1743,8 @@ def create_dagrun(

:meta private:
"""
logical_date = timezone.coerce_datetime(logical_date)
if logical_date is not None:
logical_date = timezone.coerce_datetime(logical_date)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
if logical_date is not None:
logical_date = timezone.coerce_datetime(logical_date)
logical_date = timezone.coerce_datetime(logical_date)

This function handles None on its own

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, I Implemented this change.


if data_interval and not isinstance(data_interval, DataInterval):
data_interval = DataInterval(*map(timezone.coerce_datetime, data_interval))
Expand Down
13 changes: 10 additions & 3 deletions airflow/models/dagrun.py
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ class DagRun(Base, LoggingMixin):
id = Column(Integer, primary_key=True)
dag_id = Column(StringID(), nullable=False)
queued_at = Column(UtcDateTime)
logical_date = Column(UtcDateTime, default=timezone.utcnow, nullable=False)
logical_date = Column(UtcDateTime, nullable=True)
start_date = Column(UtcDateTime)
end_date = Column(UtcDateTime)
_state = Column("state", String(50), default=DagRunState.QUEUED)
Expand Down Expand Up @@ -179,6 +179,7 @@ class DagRun(Base, LoggingMixin):
__table_args__ = (
Index("dag_id_state", dag_id, _state),
UniqueConstraint("dag_id", "run_id", name="dag_run_dag_id_run_id_key"),
UniqueConstraint("dag_id", "logical_date", name="dag_run_dag_id_logical_date_key"),
Index("idx_dag_run_dag_id", dag_id),
Index(
"idx_dag_run_running_dags",
Expand Down Expand Up @@ -1304,8 +1305,14 @@ def verify_integrity(self, *, session: Session = NEW_SESSION) -> None:
def task_filter(task: Operator) -> bool:
return task.task_id not in task_ids and (
self.run_type == DagRunType.BACKFILL_JOB
or (task.start_date is None or task.start_date <= self.logical_date)
and (task.end_date is None or self.logical_date <= task.end_date)
or (
task.start_date is None
or (self.logical_date is not None and task.start_date <= self.logical_date)
)
and (
task.end_date is None
or (self.logical_date is not None and self.logical_date <= task.end_date)
)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
or (
task.start_date is None
or (self.logical_date is not None and task.start_date <= self.logical_date)
)
and (
task.end_date is None
or (self.logical_date is not None and self.logical_date <= task.end_date)
)
or (
task.start_date is None
or self.logical_date is None
or task.start_date <= self.logical_date
)
and (task.end_date is None or self.logical_date is None or self.logical_date <= task.end_date)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I suspect this "might" be the correct condition.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes this one is correct. Thanks @Lee-W

)

created_counts: dict[str, int] = defaultdict(int)
Expand Down
2 changes: 1 addition & 1 deletion docs/apache-airflow/img/airflow_erd.sha256
Original file line number Diff line number Diff line change
@@ -1 +1 @@
0f041321ecbb5d059f7a9c85fd5cfba745732eea205dc2e4f5e3b6dd76a9468d
051d91640c4b2c3daa69ed48eb523b42d8149f43c07c8b6e6f36e0a97c56651b
1 change: 0 additions & 1 deletion docs/apache-airflow/img/airflow_erd.svg
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
2 changes: 1 addition & 1 deletion docs/apache-airflow/migrations-ref.rst
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ Here's the list of all the Database Migrations that are executed via when you ru
+-------------------------+------------------+-------------------+--------------------------------------------------------------+
| ``522625f6d606`` | ``1cdc775ca98f`` | ``3.0.0`` | Add tables for backfill. |
+-------------------------+------------------+-------------------+--------------------------------------------------------------+
| ``1cdc775ca98f`` | ``a2c32e6c7729`` | ``3.0.0`` | Drop ``execution_date`` unique constraint on DagRun. |
| ``1cdc775ca98f`` | ``a2c32e6c7729`` | ``3.0.0`` | Make logical_date nullable. |
+-------------------------+------------------+-------------------+--------------------------------------------------------------+
| ``a2c32e6c7729`` | ``0bfc26bc256e`` | ``3.0.0`` | Add triggered_by field to DagRun. |
+-------------------------+------------------+-------------------+--------------------------------------------------------------+
Expand Down
5 changes: 3 additions & 2 deletions tests/models/test_dagrun.py
Original file line number Diff line number Diff line change
Expand Up @@ -355,10 +355,11 @@ def test_dagrun_no_deadlock_with_depends_on_past(self, dag_maker, session):
run_id="test_dagrun_no_deadlock_1",
start_date=DEFAULT_DATE,
)
dr2 = dag_maker.create_dagrun_after(
dr,
next_date = DEFAULT_DATE + datetime.timedelta(days=1)
dr2 = dag_maker.create_dagrun(
run_id="test_dagrun_no_deadlock_2",
start_date=DEFAULT_DATE + datetime.timedelta(days=1),
logical_date=next_date,
)
Comment on lines -354 to 359
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, this shouldn’t need to be changed. How does this fail originally?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@uranusjr When we pass the dr object to create_dagrun_after, it fails with a unique constraint error on the dag_run table. In dag_maker, we set logical_date to start_date if it's None here, and we are passing the same logical_date in create_dagrun_after due to which it fails.

ti1_op1 = dr.get_task_instance(task_id="dop")
dr2.get_task_instance(task_id="dop")
Expand Down
Loading