Update conf column type in dag_run table from bytes to JSON (#44533)
* remove pickled data from dag run table

* fix downgrade + add news fragment

* remove archive table if it exists after downgrade

* removing archiving of data

* fixing static check

* fixing static checks

* simplifying upgrade and downgrade as per review

* fixing failures

* removing setting conf to null

* refactor approach to migrate values in conf

* update offline warning

* resolving conflicts

* resolving conflicts

* resolving conflicts

* updating batch size

* updating conf type

---------

Co-authored-by: Jed Cunningham <[email protected]>
vatsrahul1001 and jedcunningham authored Jan 13, 2025
1 parent e229ca0 commit db132fb
Showing 7 changed files with 159 additions and 6 deletions.
@@ -0,0 +1,145 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

"""
remove pickled data from dagrun table.
Revision ID: e39a26ac59f6
Revises: 38770795785f
Create Date: 2024-12-01 08:33:15.425141
"""

from __future__ import annotations

import json
import pickle
from textwrap import dedent

import sqlalchemy as sa
from alembic import context, op
from sqlalchemy import text
from sqlalchemy.dialects import postgresql

# revision identifiers, used by Alembic.
revision = "e39a26ac59f6"
down_revision = "38770795785f"
branch_labels = None
depends_on = None
airflow_version = "3.0.0"


def upgrade():
"""Apply remove pickled data from dagrun table."""
conn = op.get_bind()
conf_type = sa.JSON().with_variant(postgresql.JSONB, "postgresql")
op.add_column("dag_run", sa.Column("conf_json", conf_type, nullable=True))

if context.is_offline_mode():
print(
dedent("""
------------
-- WARNING: Unable to migrate the data in the 'conf' column while in offline mode!
-- The 'conf' column will be set to NULL in offline mode.
-- Avoid using offline mode if you need to retain 'conf' values.
------------
""")
)
else:
BATCH_SIZE = 100
offset = 0
while True:
rows = conn.execute(
text(
f"SELECT id,conf FROM dag_run WHERE conf IS not NULL order by id LIMIT {BATCH_SIZE} OFFSET {offset}"
)
).fetchall()
if not rows:
break
for row in rows:
row_id, pickle_data = row

try:
original_data = pickle.loads(pickle_data)
json_data = json.dumps(original_data)
conn.execute(
text("""
UPDATE dag_run
SET conf_json = :json_data
WHERE id = :id
"""),
{"json_data": json_data, "id": row_id},
)
except Exception as e:
print(f"Error converting dagrun conf to json for dagrun ID {row_id}: {e}")
continue
offset += BATCH_SIZE

op.drop_column("dag_run", "conf")

op.alter_column("dag_run", "conf_json", existing_type=conf_type, new_column_name="conf")


def downgrade():
"""Unapply Remove pickled data from dagrun table."""
conn = op.get_bind()
op.add_column("dag_run", sa.Column("conf_pickle", sa.PickleType(), nullable=True))

if context.is_offline_mode():
print(
dedent("""
------------
-- WARNING: Unable to migrate the data in the 'conf' column while in offline mode!
-- The 'conf' column will be set to NULL in offline mode.
-- Avoid using offline mode if you need to retain 'conf' values.
------------
""")
)

else:
BATCH_SIZE = 100
offset = 0
while True:
rows = conn.execute(
text(
f"SELECT id,conf FROM dag_run WHERE conf IS not NULL order by id LIMIT {BATCH_SIZE} OFFSET {offset}"
)
).fetchall()
if not rows:
break
for row in rows:
row_id, json_data = row

try:
pickled_data = pickle.dumps(json_data, protocol=pickle.HIGHEST_PROTOCOL)
conn.execute(
text("""
UPDATE dag_run
SET conf_pickle = :pickle_data
WHERE id = :id
"""),
{"pickle_data": pickled_data, "id": row_id},
)
except Exception as e:
print(f"Error pickling dagrun conf for dagrun ID {row_id}: {e}")
continue
offset += BATCH_SIZE

op.drop_column("dag_run", "conf")

op.alter_column("dag_run", "conf_pickle", existing_type=sa.PickleType(), new_column_name="conf")
5 changes: 3 additions & 2 deletions airflow/models/dagrun.py
@@ -25,14 +25,14 @@

import re2
from sqlalchemy import (
JSON,
Boolean,
Column,
Enum,
ForeignKey,
ForeignKeyConstraint,
Index,
Integer,
PickleType,
PrimaryKeyConstraint,
String,
Text,
@@ -45,6 +45,7 @@
tuple_,
update,
)
from sqlalchemy.dialects import postgresql
from sqlalchemy.exc import IntegrityError
from sqlalchemy.ext.associationproxy import association_proxy
from sqlalchemy.orm import declared_attr, joinedload, relationship, synonym, validates
@@ -138,7 +139,7 @@ class DagRun(Base, LoggingMixin):
triggered_by = Column(
Enum(DagRunTriggeredByType, native_enum=False, length=50)
) # Airflow component that triggered the run.
conf = Column(PickleType)
conf = Column(JSON().with_variant(postgresql.JSONB, "postgresql"))
# These two must be either both NULL or both datetime.
data_interval_start = Column(UtcDateTime)
data_interval_end = Column(UtcDateTime)
2 changes: 1 addition & 1 deletion airflow/utils/db.py
@@ -94,7 +94,7 @@ class MappedClassProtocol(Protocol):
"2.9.2": "686269002441",
"2.10.0": "22ed7efa9da2",
"2.10.3": "5f2621c13b39",
"3.0.0": "38770795785f",
"3.0.0": "e39a26ac59f6",
}


2 changes: 1 addition & 1 deletion docs/apache-airflow/img/airflow_erd.sha256
@@ -1 +1 @@
f4ad824c8d9ff45e86002506edd83b540a88dab45bb292b1af96cd86dec5ecab
ca59d711e6304f8bfdb25f49339d455602430dd6b880e420869fc892faef0596
2 changes: 1 addition & 1 deletion docs/apache-airflow/img/airflow_erd.svg
4 changes: 3 additions & 1 deletion docs/apache-airflow/migrations-ref.rst
@@ -39,7 +39,9 @@ Here's the list of all the Database Migrations that are executed via when you ru
+-------------------------+------------------+-------------------+--------------------------------------------------------------+
| Revision ID | Revises ID | Airflow Version | Description |
+=========================+==================+===================+==============================================================+
| ``38770795785f`` (head) | ``5c9c0231baa2`` | ``3.0.0`` | Add asset reference models. |
| ``e39a26ac59f6`` (head) | ``38770795785f`` | ``3.0.0`` | remove pickled data from dagrun table. |
+-------------------------+------------------+-------------------+--------------------------------------------------------------+
| ``38770795785f`` | ``5c9c0231baa2`` | ``3.0.0`` | Add asset reference models. |
+-------------------------+------------------+-------------------+--------------------------------------------------------------+
| ``5c9c0231baa2`` | ``237cef8dfea1`` | ``3.0.0`` | Remove processor_subdir. |
+-------------------------+------------------+-------------------+--------------------------------------------------------------+
5 changes: 5 additions & 0 deletions newsfragments/44533.significant.rst
@@ -0,0 +1,5 @@
During offline migration, ``DagRun.conf`` is cleared

.. Provide additional contextual information
The ``conf`` column is changing from pickled data to JSON, so the values in that column cannot be migrated during an offline migration. If you want to retain ``conf`` values for existing DagRuns, you must run a normal (non-offline) migration.
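
A minimal sketch of the practical effect of the type change (the example values are hypothetical): conf values must now be JSON-serializable, whereas the old pickled column could hold arbitrary Python objects such as datetime.

    import json
    from datetime import datetime

    ok_conf = {"env": "prod", "retries": 3}            # plain JSON types serialize fine
    bad_conf = {"run_after": datetime(2025, 1, 13)}    # pickles fine, but is not JSON-serializable

    json.dumps(ok_conf)  # succeeds

    try:
        json.dumps(bad_conf)
    except TypeError as err:
        # Such values need converting (e.g. to ISO-format strings) before being used as conf.
        print(f"not JSON-serializable: {err}")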
