From 5f5b30c9700cd667c4b8a189ce8074b20f26df04 Mon Sep 17 00:00:00 2001
From: Jed Cunningham
Date: Fri, 3 Jan 2025 10:59:23 -0500
Subject: [PATCH] DagFilePath -> DagFileInfo

---
 airflow/dag_processing/manager.py    | 22 ++++++++++----------
 tests/dag_processing/test_manager.py | 30 ++++++++++++++--------------
 2 files changed, 26 insertions(+), 26 deletions(-)

diff --git a/airflow/dag_processing/manager.py b/airflow/dag_processing/manager.py
index 57621b09d831f..9ac8802f7d7fd 100644
--- a/airflow/dag_processing/manager.py
+++ b/airflow/dag_processing/manager.py
@@ -102,7 +102,7 @@ class DagFileStat:
 log = logging.getLogger("airflow.processor_manager")
 
 
-class DagFilePath(NamedTuple):
+class DagFileInfo(NamedTuple):
     """Information about a DAG file."""
 
     path: str
@@ -342,16 +342,16 @@ class DagFileProcessorManager:
     heartbeat: Callable[[], None] = attrs.field(default=lambda: None)
     """An overridable heartbeat called once every time around the loop"""
 
-    _file_paths: list[DagFilePath] = attrs.field(factory=list, init=False)
-    _file_path_queue: deque[DagFilePath] = attrs.field(factory=deque, init=False)
-    _file_stats: dict[DagFilePath, DagFileStat] = attrs.field(
+    _file_paths: list[DagFileInfo] = attrs.field(factory=list, init=False)
+    _file_path_queue: deque[DagFileInfo] = attrs.field(factory=deque, init=False)
+    _file_stats: dict[DagFileInfo, DagFileStat] = attrs.field(
         factory=lambda: defaultdict(DagFileStat), init=False
     )
 
     _dag_bundles: list[BaseDagBundle] = attrs.field(factory=list, init=False)
     _bundle_versions: dict[str, str] = attrs.field(factory=dict, init=False)
 
-    _processors: dict[DagFilePath, DagFileProcessorProcess] = attrs.field(factory=dict, init=False)
+    _processors: dict[DagFileInfo, DagFileProcessorProcess] = attrs.field(factory=dict, init=False)
 
     _parsing_start_time: float = attrs.field(init=False)
     _num_run: int = attrs.field(default=0, init=False)
@@ -428,7 +428,7 @@ def _scan_stale_dags(self):
     @provide_session
     def deactivate_stale_dags(
         self,
-        last_parsed: dict[DagFilePath, datetime | None],
+        last_parsed: dict[DagFileInfo, datetime | None],
         stale_dag_threshold: int,
         session: Session = NEW_SESSION,
     ):
@@ -445,7 +445,7 @@ def deactivate_stale_dags(
             # last_parsed_time is the processor_timeout. Longer than that indicates that the DAG is
             # no longer present in the file. We have a stale_dag_threshold configured to prevent a
             # significant delay in deactivation of stale dags when a large timeout is configured
-            dag_file_path = DagFilePath(path=dag.fileloc, bundle_name=dag.bundle_name)
+            dag_file_path = DagFileInfo(path=dag.fileloc, bundle_name=dag.bundle_name)
             if (
                 dag_file_path in last_parsed
                 and (dag.last_parsed_time + timedelta(seconds=stale_dag_threshold))
@@ -678,7 +678,7 @@ def _refresh_dag_bundles(self):
             # remove all files from the bundle, then add the new ones
             self._file_paths = [f for f in self._file_paths if f.bundle_name != bundle_model.name]
             self._file_paths.extend(
-                DagFilePath(path=path, bundle_name=bundle_model.name) for path in bundle_file_paths
+                DagFileInfo(path=path, bundle_name=bundle_model.name) for path in bundle_file_paths
             )
 
             try:
@@ -844,7 +844,7 @@ def _log_file_processing_stats(self, known_file_paths):
 
         self.log.info(log_str)
 
-    def set_file_paths(self, new_file_paths: list[DagFilePath]):
+    def set_file_paths(self, new_file_paths: list[DagFileInfo]):
         """
         Update this with a new set of DagFilePaths to DAG definition files.
@@ -904,7 +904,7 @@ def _collect_results(self, session: Session = NEW_SESSION):
         for dag_file in finished:
             self._processors.pop(dag_file)
 
-    def _create_process(self, dag_file: DagFilePath) -> DagFileProcessorProcess:
+    def _create_process(self, dag_file: DagFileInfo) -> DagFileProcessorProcess:
         id = uuid7()
 
         # callback_to_execute_for_file = self._callback_to_execute.pop(file_path, [])
@@ -1060,7 +1060,7 @@ def _kill_timed_out_processors(self):
         for proc in processors_to_remove:
             self._processors.pop(proc)
 
-    def _add_paths_to_queue(self, file_paths_to_enqueue: list[DagFilePath], add_at_front: bool):
+    def _add_paths_to_queue(self, file_paths_to_enqueue: list[DagFileInfo], add_at_front: bool):
         """Add stuff to the back or front of the file queue, unless it's already present."""
         new_file_paths = list(p for p in file_paths_to_enqueue if p not in self._file_path_queue)
         if add_at_front:
diff --git a/tests/dag_processing/test_manager.py b/tests/dag_processing/test_manager.py
index 4312edc5117fc..861b61b73a1ee 100644
--- a/tests/dag_processing/test_manager.py
+++ b/tests/dag_processing/test_manager.py
@@ -44,7 +44,7 @@
 from airflow.callbacks.callback_requests import CallbackRequest, DagCallbackRequest
 from airflow.config_templates.airflow_local_settings import DEFAULT_LOGGING_CONFIG
 from airflow.dag_processing.manager import (
-    DagFilePath,
+    DagFileInfo,
     DagFileProcessorAgent,
     DagFileProcessorManager,
     DagFileStat,
@@ -79,8 +79,8 @@
 DEFAULT_DATE = timezone.datetime(2016, 1, 1)
 
 
-def _get_dag_file_paths(files: list[str]) -> list[DagFilePath]:
-    return [DagFilePath(bundle_name="testing", path=f) for f in files]
+def _get_dag_file_paths(files: list[str]) -> list[DagFileInfo]:
+    return [DagFileInfo(bundle_name="testing", path=f) for f in files]
 
 
 class TestDagFileProcessorManager:
@@ -171,9 +171,9 @@ def test_start_new_processes_with_same_filepath(self):
         """
         manager = DagFileProcessorManager(max_runs=1)
 
-        file_1 = DagFilePath(bundle_name="testing", path="file_1.py")
-        file_2 = DagFilePath(bundle_name="testing", path="file_2.py")
-        file_3 = DagFilePath(bundle_name="testing", path="file_3.py")
+        file_1 = DagFileInfo(bundle_name="testing", path="file_1.py")
+        file_2 = DagFileInfo(bundle_name="testing", path="file_2.py")
+        file_3 = DagFileInfo(bundle_name="testing", path="file_3.py")
         manager._file_path_queue = deque([file_1, file_2, file_3])
 
         # Mock that only one processor exists. This processor runs with 'file_1'
@@ -194,7 +194,7 @@ def test_start_new_processes_with_same_filepath(self):
     def test_set_file_paths_when_processor_file_path_not_in_new_file_paths(self):
         """Ensure processors and file stats are removed when the file path is not in the new file paths"""
         manager = DagFileProcessorManager(max_runs=1)
-        file = DagFilePath(bundle_name="testing", path="missing_file.txt")
+        file = DagFileInfo(bundle_name="testing", path="missing_file.txt")
 
         manager._processors[file] = MagicMock()
         manager._file_stats[file] = DagFileStat()
@@ -205,7 +205,7 @@ def test_set_file_paths_when_processor_file_path_is_in_new_file_paths(self):
         manager = DagFileProcessorManager(max_runs=1)
-        file = DagFilePath(bundle_name="testing", path="abc.txt")
+        file = DagFileInfo(bundle_name="testing", path="abc.txt")
 
         mock_processor = MagicMock()
         manager._processors[file] = mock_processor
@@ -313,7 +313,7 @@ def test_add_new_file_to_parsing_queue(
         ordered_files = _get_dag_file_paths(["file_3.py", "file_2.py", "file_1.py"])
         assert manager._file_path_queue == deque(ordered_files)
 
-        manager.set_file_paths([*dag_files, DagFilePath(bundle_name="testing", path="file_4.py")])
+        manager.set_file_paths([*dag_files, DagFileInfo(bundle_name="testing", path="file_4.py")])
         manager.add_new_file_path_to_queue()
         ordered_files = _get_dag_file_paths(["file_4.py", "file_3.py", "file_2.py", "file_1.py"])
         assert manager._file_path_queue == deque(ordered_files)
@@ -326,7 +326,7 @@ def test_recently_modified_file_is_parsed_with_mtime_mode(self, mock_getmtime):
         """
         freezed_base_time = timezone.datetime(2020, 1, 5, 0, 0, 0)
         initial_file_1_mtime = (freezed_base_time - timedelta(minutes=5)).timestamp()
-        dag_file = DagFilePath(bundle_name="testing", path="file_1.py")
+        dag_file = DagFileInfo(bundle_name="testing", path="file_1.py")
         dag_files = [dag_file]
         mock_getmtime.side_effect = [initial_file_1_mtime]
@@ -401,7 +401,7 @@ def test_scan_stale_dags(self, testing_dag_bundle):
             processor_timeout=10 * 60,
         )
 
-        test_dag_path = DagFilePath(
+        test_dag_path = DagFileInfo(
             bundle_name="testing",
             path=str(TEST_DAG_FOLDER / "test_example_bash_operator.py"),
         )
@@ -456,7 +456,7 @@ def test_kill_timed_out_processors_kill(self):
         processor = self.mock_processor()
         processor._process.create_time.return_value = timezone.make_aware(datetime.min).timestamp()
 
-        manager._processors = {DagFilePath(bundle_name="testing", path="abc.txt"): processor}
+        manager._processors = {DagFileInfo(bundle_name="testing", path="abc.txt"): processor}
         with mock.patch.object(type(processor), "kill") as mock_kill:
             manager._kill_timed_out_processors()
             mock_kill.assert_called_once_with(signal.SIGKILL)
@@ -470,7 +470,7 @@ def test_kill_timed_out_processors_no_kill(self):
         processor = self.mock_processor()
         processor._process.create_time.return_value = timezone.make_aware(datetime.max).timestamp()
 
-        manager._processors = {DagFilePath(bundle_name="testing", path="abc.txt"): processor}
+        manager._processors = {DagFileInfo(bundle_name="testing", path="abc.txt"): processor}
         with mock.patch.object(type(processor), "kill") as mock_kill:
             manager._kill_timed_out_processors()
             mock_kill.assert_not_called()
@@ -741,7 +741,7 @@ def test_callback_queue(self, tmp_path):
             processor_timeout=365 * 86_400,
         )
 
-        dag1_path = DagFilePath(bundle_name="testing", path="/green_eggs/ham/file1.py")
+        dag1_path = DagFileInfo(bundle_name="testing", path="/green_eggs/ham/file1.py")
         dag1_req1 = DagCallbackRequest(
             full_filepath="/green_eggs/ham/file1.py",
             dag_id="dag1",
@@ -757,7 +757,7 @@ def test_callback_queue(self, tmp_path):
             msg=None,
         )
 
-        dag2_path = DagFilePath(bundle_name="testing", path="/green_eggs/ham/file2.py")
+        dag2_path = DagFileInfo(bundle_name="testing", path="/green_eggs/ham/file2.py")
         dag2_req1 = DagCallbackRequest(
             full_filepath="/green_eggs/ham/file2.py",
             dag_id="dag2",
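
The rename keeps the type a NamedTuple, so value-based equality and hashing still hold. Below is a minimal standalone sketch (not part of the patch) of why that matters for the manager: the path and bundle_name fields are taken from the diff above (their order in the class body is assumed), and the assertions are illustrative only.

from collections import deque
from typing import NamedTuple


class DagFileInfo(NamedTuple):
    """Information about a DAG file."""

    path: str
    bundle_name: str


# NamedTuples hash and compare by value, so two instances describing the
# same file in the same bundle are interchangeable as dict keys -- which is
# what lets the manager key _file_stats and _processors by DagFileInfo.
a = DagFileInfo(path="file_1.py", bundle_name="testing")
b = DagFileInfo(path="file_1.py", bundle_name="testing")
assert a == b and hash(a) == hash(b)

stats = {a: "parsed"}
assert stats[b] == "parsed"

# The same property makes the "p not in self._file_path_queue" guard in
# _add_paths_to_queue deduplicate the parsing queue correctly.
queue: deque[DagFileInfo] = deque([a])
assert b in queue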