DagFilePath -> DagFileInfo
jedcunningham committed Jan 3, 2025
1 parent 626c342 commit 5f5b30c
Showing 2 changed files with 26 additions and 26 deletions.
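The rename is mechanical: the NamedTuple keeps its fields, and every call site swaps DagFilePath for DagFileInfo. As a quick orientation before the diff, here is a minimal sketch of the renamed type and one construction call; the bundle_name annotation is an assumption, since the excerpt below truncates the class body after path:

from typing import NamedTuple


class DagFileInfo(NamedTuple):
    """Information about a DAG file."""

    path: str
    bundle_name: str  # assumed annotation; the diff excerpt cuts off after `path`


# Call sites construct it by keyword, as the updated tests do:
dag_file = DagFileInfo(path="file_1.py", bundle_name="testing")
assert dag_file.path == "file_1.py"
assert dag_file.bundle_name == "testing"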
22 changes: 11 additions & 11 deletions airflow/dag_processing/manager.py
@@ -102,7 +102,7 @@ class DagFileStat:
log = logging.getLogger("airflow.processor_manager")


-class DagFilePath(NamedTuple):
+class DagFileInfo(NamedTuple):
"""Information about a DAG file."""

path: str
@@ -342,16 +342,16 @@ class DagFileProcessorManager:
heartbeat: Callable[[], None] = attrs.field(default=lambda: None)
"""An overridable heartbeat called once every time around the loop"""

-_file_paths: list[DagFilePath] = attrs.field(factory=list, init=False)
-_file_path_queue: deque[DagFilePath] = attrs.field(factory=deque, init=False)
-_file_stats: dict[DagFilePath, DagFileStat] = attrs.field(
+_file_paths: list[DagFileInfo] = attrs.field(factory=list, init=False)
+_file_path_queue: deque[DagFileInfo] = attrs.field(factory=deque, init=False)
+_file_stats: dict[DagFileInfo, DagFileStat] = attrs.field(
factory=lambda: defaultdict(DagFileStat), init=False
)

_dag_bundles: list[BaseDagBundle] = attrs.field(factory=list, init=False)
_bundle_versions: dict[str, str] = attrs.field(factory=dict, init=False)

-_processors: dict[DagFilePath, DagFileProcessorProcess] = attrs.field(factory=dict, init=False)
+_processors: dict[DagFileInfo, DagFileProcessorProcess] = attrs.field(factory=dict, init=False)

_parsing_start_time: float = attrs.field(init=False)
_num_run: int = attrs.field(default=0, init=False)
@@ -428,7 +428,7 @@ def _scan_stale_dags(self):
@provide_session
def deactivate_stale_dags(
self,
-last_parsed: dict[DagFilePath, datetime | None],
+last_parsed: dict[DagFileInfo, datetime | None],
stale_dag_threshold: int,
session: Session = NEW_SESSION,
):
@@ -445,7 +445,7 @@ def deactivate_stale_dags(
# last_parsed_time is the processor_timeout. Longer than that indicates that the DAG is
# no longer present in the file. We have a stale_dag_threshold configured to prevent a
# significant delay in deactivation of stale dags when a large timeout is configured
-dag_file_path = DagFilePath(path=dag.fileloc, bundle_name=dag.bundle_name)
+dag_file_path = DagFileInfo(path=dag.fileloc, bundle_name=dag.bundle_name)
if (
dag_file_path in last_parsed
and (dag.last_parsed_time + timedelta(seconds=stale_dag_threshold))
@@ -678,7 +678,7 @@ def _refresh_dag_bundles(self):
# remove all files from the bundle, then add the new ones
self._file_paths = [f for f in self._file_paths if f.bundle_name != bundle_model.name]
self._file_paths.extend(
-DagFilePath(path=path, bundle_name=bundle_model.name) for path in bundle_file_paths
+DagFileInfo(path=path, bundle_name=bundle_model.name) for path in bundle_file_paths
)

try:
@@ -844,7 +844,7 @@ def _log_file_processing_stats(self, known_file_paths):

self.log.info(log_str)

-def set_file_paths(self, new_file_paths: list[DagFilePath]):
+def set_file_paths(self, new_file_paths: list[DagFileInfo]):
"""
Update this with a new set of DagFilePaths to DAG definition files.
@@ -904,7 +904,7 @@ def _collect_results(self, session: Session = NEW_SESSION):
for dag_file in finished:
self._processors.pop(dag_file)

-def _create_process(self, dag_file: DagFilePath) -> DagFileProcessorProcess:
+def _create_process(self, dag_file: DagFileInfo) -> DagFileProcessorProcess:
id = uuid7()

# callback_to_execute_for_file = self._callback_to_execute.pop(file_path, [])
@@ -1060,7 +1060,7 @@ def _kill_timed_out_processors(self):
for proc in processors_to_remove:
self._processors.pop(proc)

-def _add_paths_to_queue(self, file_paths_to_enqueue: list[DagFilePath], add_at_front: bool):
+def _add_paths_to_queue(self, file_paths_to_enqueue: list[DagFileInfo], add_at_front: bool):
"""Add stuff to the back or front of the file queue, unless it's already present."""
new_file_paths = list(p for p in file_paths_to_enqueue if p not in self._file_path_queue)
if add_at_front:
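Worth noting before the test changes: DagFileInfo is a NamedTuple, so instances compare and hash by field values. That is what lets the manager key _file_stats and _processors by it and run the `p not in self._file_path_queue` membership check in _add_paths_to_queue above. A self-contained sketch of that behavior (standalone illustration, not Airflow code):

from collections import deque
from typing import NamedTuple


class DagFileInfo(NamedTuple):
    path: str
    bundle_name: str


queue: deque[DagFileInfo] = deque([DagFileInfo(path="a.py", bundle_name="testing")])

# Value equality: a freshly constructed tuple with the same fields counts
# as "already present", so _add_paths_to_queue would skip enqueueing it.
assert DagFileInfo(path="a.py", bundle_name="testing") in queue

# Value hashing: an equal tuple retrieves the same entry, so dicts keyed by
# DagFileInfo survive the file list being rebuilt on each bundle refresh.
stats = {DagFileInfo(path="a.py", bundle_name="testing"): "stat"}
assert stats[DagFileInfo(path="a.py", bundle_name="testing")] == "stat"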
30 changes: 15 additions & 15 deletions tests/dag_processing/test_manager.py
@@ -44,7 +44,7 @@
from airflow.callbacks.callback_requests import CallbackRequest, DagCallbackRequest
from airflow.config_templates.airflow_local_settings import DEFAULT_LOGGING_CONFIG
from airflow.dag_processing.manager import (
-DagFilePath,
+DagFileInfo,
DagFileProcessorAgent,
DagFileProcessorManager,
DagFileStat,
@@ -79,8 +79,8 @@
DEFAULT_DATE = timezone.datetime(2016, 1, 1)


-def _get_dag_file_paths(files: list[str]) -> list[DagFilePath]:
-    return [DagFilePath(bundle_name="testing", path=f) for f in files]
+def _get_dag_file_paths(files: list[str]) -> list[DagFileInfo]:
+    return [DagFileInfo(bundle_name="testing", path=f) for f in files]


class TestDagFileProcessorManager:
@@ -171,9 +171,9 @@ def test_start_new_processes_with_same_filepath(self):
"""
manager = DagFileProcessorManager(max_runs=1)

-file_1 = DagFilePath(bundle_name="testing", path="file_1.py")
-file_2 = DagFilePath(bundle_name="testing", path="file_2.py")
-file_3 = DagFilePath(bundle_name="testing", path="file_3.py")
+file_1 = DagFileInfo(bundle_name="testing", path="file_1.py")
+file_2 = DagFileInfo(bundle_name="testing", path="file_2.py")
+file_3 = DagFileInfo(bundle_name="testing", path="file_3.py")
manager._file_path_queue = deque([file_1, file_2, file_3])

# Mock that only one processor exists. This processor runs with 'file_1'
@@ -194,7 +194,7 @@ def test_start_new_processes_with_same_filepath(self):
def test_set_file_paths_when_processor_file_path_not_in_new_file_paths(self):
"""Ensure processors and file stats are removed when the file path is not in the new file paths"""
manager = DagFileProcessorManager(max_runs=1)
file = DagFilePath(bundle_name="testing", path="missing_file.txt")
file = DagFileInfo(bundle_name="testing", path="missing_file.txt")

manager._processors[file] = MagicMock()
manager._file_stats[file] = DagFileStat()
@@ -205,7 +205,7 @@ def test_set_file_paths_when_processor_file_path_not_in_new_file_paths(self):

def test_set_file_paths_when_processor_file_path_is_in_new_file_paths(self):
manager = DagFileProcessorManager(max_runs=1)
file = DagFilePath(bundle_name="testing", path="abc.txt")
file = DagFileInfo(bundle_name="testing", path="abc.txt")
mock_processor = MagicMock()

manager._processors[file] = mock_processor
@@ -313,7 +313,7 @@ def test_add_new_file_to_parsing_queue(
ordered_files = _get_dag_file_paths(["file_3.py", "file_2.py", "file_1.py"])
assert manager._file_path_queue == deque(ordered_files)

-manager.set_file_paths([*dag_files, DagFilePath(bundle_name="testing", path="file_4.py")])
+manager.set_file_paths([*dag_files, DagFileInfo(bundle_name="testing", path="file_4.py")])
manager.add_new_file_path_to_queue()
ordered_files = _get_dag_file_paths(["file_4.py", "file_3.py", "file_2.py", "file_1.py"])
assert manager._file_path_queue == deque(ordered_files)
Expand All @@ -326,7 +326,7 @@ def test_recently_modified_file_is_parsed_with_mtime_mode(self, mock_getmtime):
"""
freezed_base_time = timezone.datetime(2020, 1, 5, 0, 0, 0)
initial_file_1_mtime = (freezed_base_time - timedelta(minutes=5)).timestamp()
-dag_file = DagFilePath(bundle_name="testing", path="file_1.py")
+dag_file = DagFileInfo(bundle_name="testing", path="file_1.py")
dag_files = [dag_file]
mock_getmtime.side_effect = [initial_file_1_mtime]

@@ -401,7 +401,7 @@ def test_scan_stale_dags(self, testing_dag_bundle):
processor_timeout=10 * 60,
)

-test_dag_path = DagFilePath(
+test_dag_path = DagFileInfo(
bundle_name="testing",
path=str(TEST_DAG_FOLDER / "test_example_bash_operator.py"),
)
@@ -456,7 +456,7 @@ def test_kill_timed_out_processors_kill(self):

processor = self.mock_processor()
processor._process.create_time.return_value = timezone.make_aware(datetime.min).timestamp()
-manager._processors = {DagFilePath(bundle_name="testing", path="abc.txt"): processor}
+manager._processors = {DagFileInfo(bundle_name="testing", path="abc.txt"): processor}
with mock.patch.object(type(processor), "kill") as mock_kill:
manager._kill_timed_out_processors()
mock_kill.assert_called_once_with(signal.SIGKILL)
@@ -470,7 +470,7 @@ def test_kill_timed_out_processors_no_kill(self):

processor = self.mock_processor()
processor._process.create_time.return_value = timezone.make_aware(datetime.max).timestamp()
-manager._processors = {DagFilePath(bundle_name="testing", path="abc.txt"): processor}
+manager._processors = {DagFileInfo(bundle_name="testing", path="abc.txt"): processor}
with mock.patch.object(type(processor), "kill") as mock_kill:
manager._kill_timed_out_processors()
mock_kill.assert_not_called()
@@ -741,7 +741,7 @@ def test_callback_queue(self, tmp_path):
processor_timeout=365 * 86_400,
)

-dag1_path = DagFilePath(bundle_name="testing", path="/green_eggs/ham/file1.py")
+dag1_path = DagFileInfo(bundle_name="testing", path="/green_eggs/ham/file1.py")
dag1_req1 = DagCallbackRequest(
full_filepath="/green_eggs/ham/file1.py",
dag_id="dag1",
@@ -757,7 +757,7 @@ def test_callback_queue(self, tmp_path):
msg=None,
)

-dag2_path = DagFilePath(bundle_name="testing", path="/green_eggs/ham/file2.py")
+dag2_path = DagFileInfo(bundle_name="testing", path="/green_eggs/ham/file2.py")
dag2_req1 = DagCallbackRequest(
full_filepath="/green_eggs/ham/file2.py",
dag_id="dag2",
