Skip to content

Commit

Permalink
Add error handling for encoding the dag runs (#40222)
Browse files Browse the repository at this point in the history
  • Loading branch information
molcay authored Jul 16, 2024
1 parent ee2be50 commit a10b98e
Show file tree
Hide file tree
Showing 7 changed files with 115 additions and 23 deletions.
3 changes: 3 additions & 0 deletions .editorconfig
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,9 @@ indent_size = 4
[*.js]
indent_size = 2

[*.ts]
indent_size = 2

[*.css]
indent_size = 2

Expand Down
13 changes: 13 additions & 0 deletions airflow/www/static/js/api/useGridData.ts
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ export interface GridData {
dagRuns: DagRun[];
groups: Task;
ordering: RunOrdering;
errors: string[];
}

export const emptyGridData: GridData = {
Expand All @@ -57,6 +58,7 @@ export const emptyGridData: GridData = {
instances: [],
},
ordering: [],
errors: [],
};

const formatOrdering = (data: GridData) => ({
Expand Down Expand Up @@ -132,6 +134,17 @@ const useGridData = () => {
}
// turn off auto refresh if there are no active runs
if (!areActiveRuns(response.dagRuns)) stopRefresh();
// if any errors returned then show as toast message
if (response.errors.length > 0) {
response.errors.forEach((errorMsg) => {
const error = Error(errorMsg);
errorToast({
title: "Error",
error,
});
});
}

return response;
},
{
Expand Down
1 change: 1 addition & 0 deletions airflow/www/static/js/dag/grid/index.test.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,7 @@ const mockGridData = {
},
],
ordering: ["dataIntervalStart"],
errors: [],
} as useGridDataModule.GridData;

const EXPAND = "Expand all task groups";
Expand Down
54 changes: 34 additions & 20 deletions airflow/www/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
from __future__ import annotations

import json
import logging
import textwrap
import time
from typing import TYPE_CHECKING, Any, Callable, Sequence
Expand Down Expand Up @@ -67,6 +68,8 @@

TI = TaskInstance

logger = logging.getLogger(__name__)


def datetime_to_string(value: DateTime | None) -> str | None:
if value is None:
Expand Down Expand Up @@ -163,28 +166,39 @@ def get_dag_run_conf(

def encode_dag_run(
dag_run: DagRun | None, *, json_encoder: type[json.JSONEncoder] = json.JSONEncoder
) -> dict[str, Any] | None:
) -> tuple[dict[str, Any] | None, None | str]:
if not dag_run:
return None

dag_run_conf, conf_is_json = get_dag_run_conf(dag_run.conf, json_encoder=json_encoder)
return None, None

try:
dag_run_conf, conf_is_json = get_dag_run_conf(dag_run.conf, json_encoder=json_encoder)
encoded_dag_run = {
"run_id": dag_run.run_id,
"queued_at": datetime_to_string(dag_run.queued_at),
"start_date": datetime_to_string(dag_run.start_date),
"end_date": datetime_to_string(dag_run.end_date),
"state": dag_run.state,
"execution_date": datetime_to_string(dag_run.execution_date),
"data_interval_start": datetime_to_string(dag_run.data_interval_start),
"data_interval_end": datetime_to_string(dag_run.data_interval_end),
"run_type": dag_run.run_type,
"last_scheduling_decision": datetime_to_string(dag_run.last_scheduling_decision),
"external_trigger": dag_run.external_trigger,
"conf": dag_run_conf,
"conf_is_json": conf_is_json,
"note": dag_run.note,
}
except ValueError as e:
logger.error("Error while encoding the DAG Run!", exc_info=e)
if str(e) == "Circular reference detected":
return None, (
f"Circular reference detected in the DAG Run config (#{dag_run.run_id}). "
f"You should check your webserver logs for more details."
)
else:
raise e

return {
"run_id": dag_run.run_id,
"queued_at": datetime_to_string(dag_run.queued_at),
"start_date": datetime_to_string(dag_run.start_date),
"end_date": datetime_to_string(dag_run.end_date),
"state": dag_run.state,
"execution_date": datetime_to_string(dag_run.execution_date),
"data_interval_start": datetime_to_string(dag_run.data_interval_start),
"data_interval_end": datetime_to_string(dag_run.data_interval_end),
"run_type": dag_run.run_type,
"last_scheduling_decision": datetime_to_string(dag_run.last_scheduling_decision),
"external_trigger": dag_run.external_trigger,
"conf": dag_run_conf,
"conf_is_json": conf_is_json,
"note": dag_run.note,
}
return encoded_dag_run, None


def check_import_errors(fileloc, session):
Expand Down
11 changes: 10 additions & 1 deletion airflow/www/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -3350,15 +3350,24 @@ def grid_data(self):
if run_states:
query = query.where(DagRun.state.in_(run_states))

# Retrieve, sort and encode the previous DAG Runs
dag_runs = wwwutils.sorted_dag_runs(
query, ordering=dag.timetable.run_ordering, limit=num_runs, session=session
)
encoded_runs = []
encoding_errors = []
for dr in dag_runs:
encoded_dr, error = wwwutils.encode_dag_run(dr, json_encoder=utils_json.WebEncoder)
if error:
encoding_errors.append(error)
else:
encoded_runs.append(encoded_dr)

encoded_runs = [wwwutils.encode_dag_run(dr, json_encoder=utils_json.WebEncoder) for dr in dag_runs]
data = {
"groups": dag_to_grid(dag, dag_runs, session),
"dag_runs": encoded_runs,
"ordering": dag.timetable.run_ordering,
"errors": encoding_errors,
}
# avoid spaces to reduce payload size
return (
Expand Down
53 changes: 51 additions & 2 deletions tests/www/test_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,13 @@
from airflow.models import DagRun
from airflow.utils import json as utils_json
from airflow.www import utils
from airflow.www.utils import CustomSQLAInterface, DagRunCustomSQLAInterface, json_f, wrapped_markdown
from airflow.www.utils import (
CustomSQLAInterface,
DagRunCustomSQLAInterface,
encode_dag_run,
json_f,
wrapped_markdown,
)
from tests.test_utils.config import conf_vars


Expand Down Expand Up @@ -321,7 +327,7 @@ def test_get_dag_run_conf(self):

def test_encode_dag_run_none(self):
no_dag_run_result = utils.encode_dag_run(None)
assert no_dag_run_result is None
assert no_dag_run_result == (None, None)

def test_json_f_webencoder(self):
dag_run_conf = {
Expand Down Expand Up @@ -537,6 +543,49 @@ def test_wrapped_markdown_with_raw_html(self, allow_html):

assert escape(HTML) in rendered

@pytest.mark.parametrize(
"dag_run,expected_val",
[
[None, (None, None)],
[
DagRun(run_id="run_id_1", conf={}),
(
{
"conf": None,
"conf_is_json": False,
"data_interval_end": None,
"data_interval_start": None,
"end_date": None,
"execution_date": None,
"external_trigger": None,
"last_scheduling_decision": None,
"note": None,
"queued_at": None,
"run_id": "run_id_1",
"run_type": None,
"start_date": None,
"state": None,
},
None,
),
],
],
)
def test_encode_dag_run(self, dag_run, expected_val):
val = encode_dag_run(dag_run)
assert val == expected_val

def test_encode_dag_run_circular_reference(self):
conf = {}
conf["a"] = conf
dr = DagRun(run_id="run_id_1", conf=conf)
encoded_dr, error = encode_dag_run(dr)
assert encoded_dr is None
assert error == (
f"Circular reference detected in the DAG Run config (#{dr.run_id}). "
f"You should check your webserver logs for more details."
)


class TestFilter:
def setup_method(self):
Expand Down
3 changes: 3 additions & 0 deletions tests/www/views/test_views_grid.py
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,7 @@ def test_no_runs(admin_client, dag_without_runs):
"label": None,
},
"ordering": ["data_interval_end", "execution_date"],
"errors": [],
}


Expand Down Expand Up @@ -406,6 +407,7 @@ def test_one_run(admin_client, dag_with_runs: list[DagRun], session):
"label": None,
},
"ordering": ["data_interval_end", "execution_date"],
"errors": [],
}


Expand Down Expand Up @@ -459,6 +461,7 @@ def _expected_task_details(task_id, has_outlet_datasets):
"label": None,
},
"ordering": ["data_interval_end", "execution_date"],
"errors": [],
}


Expand Down

0 comments on commit a10b98e

Please sign in to comment.