diff --git a/airflow-core/src/airflow/executors/base_executor.py b/airflow-core/src/airflow/executors/base_executor.py index 8f20f23f8da05..d21bfa4fed875 100644 --- a/airflow-core/src/airflow/executors/base_executor.py +++ b/airflow-core/src/airflow/executors/base_executor.py @@ -295,7 +295,7 @@ def _get_workloads_to_schedule(self, open_slots: int) -> list[tuple[WorkloadKey, return workloads_to_schedule - def _process_workloads(self, workloads: Sequence[ExecutorWorkload]) -> None: + def _process_workloads(self, workload_items: Sequence[ExecutorWorkload]) -> None: """ Process the given workloads. @@ -303,7 +303,7 @@ def _process_workloads(self, workloads: Sequence[ExecutorWorkload]) -> None: the execution of workloads (e.g., queuing them to workers, submitting to external systems, etc.). - :param workloads: List of workloads to process + :param workload_items: List of workloads to process """ raise NotImplementedError(f"{type(self).__name__} must implement _process_workloads()") diff --git a/providers/celery/src/airflow/providers/celery/executors/celery_executor.py b/providers/celery/src/airflow/providers/celery/executors/celery_executor.py index 16d634fbdefbb..7fc388c9608a8 100644 --- a/providers/celery/src/airflow/providers/celery/executors/celery_executor.py +++ b/providers/celery/src/airflow/providers/celery/executors/celery_executor.py @@ -173,7 +173,7 @@ def _process_tasks(self, task_tuples: Sequence[TaskTuple]) -> None: self._send_workloads(task_tuples_to_send) - def _process_workloads(self, workloads: Sequence[workloads.All]) -> None: + def _process_workloads(self, workload_items: Sequence[workloads.All]) -> None: # Airflow V3 version -- have to delay imports until we know we are on v3. from airflow.executors.workloads import ExecuteTask @@ -181,7 +181,7 @@ def _process_workloads(self, workloads: Sequence[workloads.All]) -> None: from airflow.executors.workloads import ExecuteCallback workloads_to_be_sent: list[WorkloadInCelery] = [] - for workload in workloads: + for workload in workload_items: if isinstance(workload, ExecuteTask): workloads_to_be_sent.append((workload.ti.key, workload, workload.ti.queue, self.team_name)) elif AIRFLOW_V_3_2_PLUS and isinstance(workload, ExecuteCallback):