diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 6069d8ae56bd2..d15cf0b94b9f8 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -1160,3 +1160,17 @@ repos: language: python files: .*test.*\.py$ pass_filenames: true + - repo: local + hooks: + - id: check-missing-error-meta-in-exceptions + name: Check for missing error meta properties in exception classes that use error code mixin + entry: scripts/ci/prek/check_airflow_exceptions_error_meta.py + language: python + files: > + (?x) + ^airflow-core/src/.*\.py$| + ^task-sdk/src/.*\.py$| + ^providers/.*/src/.*\.py$ + types: [python] + additional_dependencies: ['pyyaml>=6.0'] + require_serial: true diff --git a/airflow-core/docs/.gitignore b/airflow-core/docs/.gitignore index 69e1d56a72377..30263b1747d4a 100644 --- a/airflow-core/docs/.gitignore +++ b/airflow-core/docs/.gitignore @@ -1,3 +1,4 @@ +/error-codes /dags /logs /plugins diff --git a/airflow-core/docs/index.rst b/airflow-core/docs/index.rst index 4280171e42dc9..804fde8543203 100644 --- a/airflow-core/docs/index.rst +++ b/airflow-core/docs/index.rst @@ -156,6 +156,7 @@ experience is continuously improving, but defining workflows as code is central best-practices faq troubleshooting + error-codes/index Release Policies release_notes privacy_notice diff --git a/airflow-core/newsfragments/65423.significant.rst b/airflow-core/newsfragments/65423.significant.rst new file mode 100644 index 0000000000000..9bd4d6752edd4 --- /dev/null +++ b/airflow-core/newsfragments/65423.significant.rst @@ -0,0 +1,40 @@ +Error codes with doc pages gen and static checks + +This adds a way for devs to use error codes (AERR) while raising +exceptions. Error codes are derived from the exception class names when +AirflowErrorCodeMixin is added to respective exception class. Error +metadata i.e. user-facing error message, error description, +documentation link and first steps are specified by dev inside the +exception class, which are then available to users via docs - a new doc +page is generated for each error code when breeze builds the docs. + +This also adds a static check that validates exception raises for error +code mixin usage and error meta with supporting tests. Additionally it +adds error mixin to few exception classes, as examples to get started. + +**Step by step flow** + +- dev adds AirflowErrorCodeMixin to respective exception class that + they'd want to raise with an error_code. +- dev then specifies user_facing_error_message, first_steps, + error_description, documentation_link for that exception class. +- AirflowErrorCodeMixin automatically derives an error_code for the + exception class. +- dev runs breeze build-docs that generates a new docs page + AERR-NOT-FOUND.rst +- breeze static check takes care of validating + user_facing_error_message, first_steps, error_description, +documentation_link are specified correctly within the exception class. + +On airflow users' side, they now see airflow error code as part of the +stack trace, which they can use for communicating problems instead of +pasting verbose stack traces. Error codes also improve LLM-based +discovery of airflow errors as codes are much more +deterministic/well-defined than plain stack traces. + + +**Error code as part of exception string** + +Error code will be part exceptions' string output in the logs in the +following format: ``[AERR-TIMETABLE-INVALID] schedule interval must be +positive...`` diff --git a/airflow-core/pyproject.toml b/airflow-core/pyproject.toml index 3158a36124675..0b14955876f50 100644 --- a/airflow-core/pyproject.toml +++ b/airflow-core/pyproject.toml @@ -254,6 +254,7 @@ exclude = [ "../shared/plugins_manager/src/airflow_shared/plugins_manager" = "src/airflow/_shared/plugins_manager" "../shared/providers_discovery/src/airflow_shared/providers_discovery" = "src/airflow/_shared/providers_discovery" "../shared/template_rendering/src/airflow_shared/template_rendering" = "src/airflow/_shared/template_rendering" +"../shared/exceptions/src/airflow_shared/exceptions" = "src/airflow/_shared/exceptions" [tool.hatch.build.targets.custom] path = "./hatch_build.py" @@ -330,6 +331,7 @@ apache-airflow-devel-common = { workspace = true } shared_distributions = [ "apache-airflow-shared-configuration", "apache-airflow-shared-dagnode", + "apache-airflow-shared-exceptions", "apache-airflow-shared-listeners", "apache-airflow-shared-logging", "apache-airflow-shared-module-loading", diff --git a/airflow-core/src/airflow/_shared/exceptions b/airflow-core/src/airflow/_shared/exceptions new file mode 120000 index 0000000000000..5f00a22a5f47d --- /dev/null +++ b/airflow-core/src/airflow/_shared/exceptions @@ -0,0 +1 @@ +../../../../shared/exceptions/src/airflow_shared/exceptions \ No newline at end of file diff --git a/airflow-core/src/airflow/exceptions.py b/airflow-core/src/airflow/exceptions.py index 21798efee9879..34396538eae0d 100644 --- a/airflow-core/src/airflow/exceptions.py +++ b/airflow-core/src/airflow/exceptions.py @@ -30,6 +30,9 @@ # Re exporting AirflowConfigException from shared configuration from airflow._shared.configuration.exceptions import AirflowConfigException as AirflowConfigException +# Using AirflowErrorCodeMixin from shared exceptions +from airflow._shared.exceptions import AirflowErrorCodeMixin + try: from airflow.sdk.exceptions import ( AirflowException, @@ -47,18 +50,102 @@ class AirflowException(Exception): # type: ignore[no-redef] """Base exception for Airflow errors.""" - class AirflowNotFoundException(AirflowException): # type: ignore[no-redef] + class AirflowNotFoundException(AirflowErrorCodeMixin, AirflowException): # type: ignore[no-redef] """Raise when a requested object is not found.""" - class AirflowTimetableInvalid(AirflowException): # type: ignore[no-redef] + user_facing_error_message = "Requested resource was not found" + + description = ( + "This error occurs when Airflow is unable to locate a requested object " + "during resolution. The object may not exist, may have been removed, or " + "may not be accessible through the configured sources (such as metadata " + "database, configuration, or external providers). All available lookup " + "mechanisms were checked but none returned a matching result." + ) + + first_steps = ( + "Verify that the requested identifier is correct and exists. " + "Check for typos or mismatched names. Ensure the resource is defined " + "in the expected location (e.g., Airflow metadata database, configuration, " + "or any configured external providers). If applicable, confirm that " + "external systems are reachable and properly configured. " + "Review recent changes that may have removed or renamed the resource." + ) + + documentation = "https://airflow.apache.org/docs/apache-airflow/stable/" + + class AirflowTimetableInvalid(AirflowErrorCodeMixin, AirflowException): # type: ignore[no-redef] """Raise when a DAG has an invalid timetable.""" - class TaskNotFound(AirflowException): # type: ignore[no-redef] + user_facing_error_message = "Invalid timetable configuration" + + description = ( + "This error occurs when Airflow is unable to use the provided timetable " + "definition for scheduling. The timetable may be malformed, incompatible " + "with the current context, or fail internal validation rules required for " + "generating valid schedules." + ) + + first_steps = ( + "Verify that the timetable definition is correct and follows the expected " + "interface or format. If using a custom timetable, ensure it implements all " + "required methods and returns valid schedule data. Check for recent changes " + "to Dag or Airflow version compatibility that may affect timetable behavior. " + "Review logs for specific validation errors that indicate which part of the " + "timetable is invalid." + ) + + documentation = ( + "https://airflow.apache.org/docs/apache-airflow/stable/authoring-and-scheduling/timetable.html" + ) + + class TaskNotFound(AirflowErrorCodeMixin, AirflowException): # type: ignore[no-redef] """Raise when a Task is not available in the system.""" + user_facing_error_message = "Requested task was not found" + + description = ( + "This error occurs when Airflow is unable to locate a task with the " + "specified identifier in the current context. The task may not exist, " + "may have been removed or renamed, or may not be part of the active Dag " + "or execution scope being evaluated." + ) + + first_steps = ( + "Verify that the task_id is correct and exists in the expected Dag. " + "Check whether the task has been renamed, removed, or conditionally " + "excluded. Ensure you are referencing the correct Dag version and that " + "the Dag has been successfully parsed and is active. " + "If using dynamic task mapping or generated tasks, confirm that the " + "task instance was created as expected during Dag run expansion." + ) + + documentation = "https://airflow.apache.org/docs/apache-airflow/stable/core-concepts/tasks.html" + class NodeNotFound(TaskNotFound, KeyError): # type: ignore[no-redef] """Raise when attempting to access an invalid node (task or task group) using [] notation.""" + user_facing_error_message = "Requested Node was not found" + + description = ( + "This error occurs when Airflow attempts to evaluate, render, or traverse " + "a specific structural component (Node) inside a Directed Acyclic Graph " + "(Dag), but cannot locate it. It typically triggers within visual graph layouts, " + "TaskGroup rendering, or internal serialization code when an element id " + "fails to map to the established topology." + ) + + first_steps = ( + "Verify that the targeting identifier or key for the specific node is correctly " + "spelled. Check the visual graph layout in the Airflow UI to confirm the node is active " + "and rendered. Ensure that any parent-child relationships or upstream and downstream " + "dependencies are correctly linked. Inspect the Dag serialization and parsing status to " + "confirm the structure matches the backend metadata. Confirm that the underlying workflow " + "definitions have not dynamically filtered or omitted the node." + ) + + documentation = "https://airflow.apache.org/docs/apache-airflow/stable/core-concepts/tasks.html" + def __str__(self) -> str: return str(self.args[0]) if self.args else "" @@ -132,14 +219,82 @@ class AirflowClusterPolicyError(AirflowException): class DagNotFound(AirflowNotFoundException): """Raise when a DAG is not available in the system.""" + user_facing_error_message = "Requested Dag was not found" + + description = ( + "This error occurs when Airflow is unable to locate a Dag with the " + "specified identifier within the active metadata database. It typically " + "happens when a pipeline is requested by an external trigger, API call, " + "or UI operation, but the scheduler or webserver has not yet registered " + "or has recently removed the pipeline configuration." + ) + + first_steps = ( + "Verify that the Dag ID is spelled correctly and exactly matches the " + "definition in your Python source file. Confirm that the Dag file resides " + "in the designated Dags directory or within an active Dag bundle. Check the " + "Airflow UI main dashboard to see if the Dag is visible, active, or currently " + "paused. Inspect the Dag processing and serialization logs to ensure no parsing " + "exceptions are blocking the pipeline from registering. Ensure that the scheduler " + "or dedicated Dag file processor is running and actively scanning the directory " + "for code updates." + ) + + documentation = "https://airflow.apache.org/docs/apache-airflow/stable/core-concepts/dags.html" + class DagCodeNotFound(AirflowNotFoundException): """Raise when a DAG code is not available in the system.""" + user_facing_error_message = "Requested Dag code was not found" + + description = ( + "This error occurs when Airflow tries to read, render, or display the underlying " + "Python source code for a Dag, but cannot find the matching record in the " + "dag_code database table. It typically happens when the Webserver or UI " + "requests the code view for a Dag ID that has been partially removed, renamed, " + "or hasn't been successfully synchronized by the Dag processor/scheduler." + ) + + first_steps = ( + "Check the Airflow UI dashboard to verify if the Dag ID is currently active " + "and parsed without errors. Confirm whether the underlying Python file still " + "exists in your Dags directory and hasn't been deleted or renamed. Verify that " + "the Dag processor or scheduler is running actively to ensure new and modified " + "code gets serialized into the database. Refresh the Dag structure using the UI " + "sync buttons or trigger a parsing interval to force the scheduler to regenerate " + "the missing code record. Inspect the database state to ensure that cleanup scripts " + "or metadata retention loops have not accidentally purged older Dag code versions." + ) + + documentation = "https://airflow.apache.org/docs/apache-airflow/stable/administration-and-deployment/dag-serialization.html" + class DagRunNotFound(AirflowNotFoundException): """Raise when a DAG Run is not available in the system.""" + user_facing_error_message = "Requested Dag Run was not found" + + description = ( + "This error occurs when Airflow attempts to access, execute, or evaluate " + "a specific execution instance (Dag Run) that does not exist in the metadata " + "database. It typically triggers when targeting a specific run_id or logical " + "date via the UI, API, or automated pipelines without an actively recorded " + "history for that instance." + ) + + first_steps = ( + "Check the Airflow UI to confirm if the targeted Dag exists, is active, and is unpaused. " + "Verify whether the specific execution tracking identifier (run_id or logical date) " + "accurately aligns with a historical entry saved in the database. Ensure an actual " + "pipeline run has been initialized (manually or via schedule) before attempting to " + "interact with individual task scopes. Inspect the central scheduler logs or execution " + "logs to determine if an external process, database cleanup script, or task runner " + "lifecycle failure prematurely deleted or omitted the run record." + ) + + documentation = "https://airflow.apache.org/docs/apache-airflow/stable/core-concepts/dag-run.html" + class DagRunAlreadyExists(AirflowBadRequest): """Raise when creating a DAG run for DAG which already has DAG run entry.""" @@ -174,6 +329,32 @@ class SerializationError(AirflowException): class TaskInstanceNotFound(AirflowNotFoundException): """Raise when a task instance is not available in the system.""" + user_facing_error_message = "Requested Task Instance was not found" + + description = ( + "This error occurs when Airflow searches for a specific execution slice " + "of a task, uniquely identified by its task_id, run_id (or execution_date), " + "and map_index but cannot locate the corresponding row in the task_instance " + "database table. It typically happens when querying logs, changing task " + "states, or attempting to clear an execution for a pipeline run that does " + "not contain that specific task record." + ) + + first_steps = ( + "Verify that the targeting task_id, run_id, and map_index values are completely accurate. " + "Confirm that the parent Dag run exists and is actively tracked in the metadata database. " + "Check if the task was recently added, renamed, or removed from the Python Dag file, causing " + "a discrepancy between the active code structure and old execution histories. " + "Ensure that database maintenance operations, aggressive metadata cleanup scripts, or manual " + "database purges did not clear out historical task state tables prematurely. " + "Inspect the central scheduler or webserver logs to determine if serialization lag or " + "communication errors between distributed executors temporarily broke instance tracking." + ) + + documentation = ( + "https://airflow.apache.org/docs/apache-airflow/stable/core-concepts/tasks.html#task-instances" + ) + class NotMapped(Exception): """Raise if a task is neither mapped nor has any parent mapped groups.""" @@ -182,6 +363,28 @@ class NotMapped(Exception): class PoolNotFound(AirflowNotFoundException): """Raise when a Pool is not available in the system.""" + user_facing_error_message = "Requested Pool was not found" + + description = ( + "This error occurs when a task is scheduled or triggered with a specific pool " + "assignment, but that pool does not exist in the Airflow metadata database. It " + "typically happens when a Dag references a custom slot limitation pool that has " + "not yet been created via the UI, CLI, or API, or has been accidentally deleted." + ) + + first_steps = ( + "Navigate to Admin -> Pools in the Airflow UI to check the list of available slots. " + "Verify that the pool name assigned in your task definition exactly matches an existing " + "record. Ensure that any custom initialization scripts or deployment pipelines create " + "the pool before execution. Check if the task can temporarily fall back to the " + "built-in 'default_pool' while resolving the issue. Inspect the database migration and " + "setup logs to ensure metadata tables are fully initialized." + ) + + documentation = ( + "https://airflow.apache.org/docs/apache-airflow/stable/administration-and-deployment/pools.html" + ) + class FileSyntaxError(NamedTuple): """Information about a single error in a file.""" diff --git a/airflow-core/tests/unit/models/test_pool.py b/airflow-core/tests/unit/models/test_pool.py index 2c7db9d9800e4..2ad53fc0fb988 100644 --- a/airflow-core/tests/unit/models/test_pool.py +++ b/airflow-core/tests/unit/models/test_pool.py @@ -313,7 +313,7 @@ def test_delete_pool(self, session): assert session.scalar(select(func.count()).select_from(Pool)) == self.TOTAL_POOL_COUNT - 1 def test_delete_pool_non_existing(self): - with pytest.raises(PoolNotFound, match="^Pool 'test' doesn't exist$"): + with pytest.raises(PoolNotFound, match=r"^\[AERR-POOL-NOT-FOUND\] Pool 'test' doesn't exist$"): Pool.delete_pool(name="test") def test_delete_default_pool_not_allowed(self): diff --git a/airflow-core/tests/unit/timetables/test_interval_timetable.py b/airflow-core/tests/unit/timetables/test_interval_timetable.py index d8ad62131a8a9..85d97743e18c6 100644 --- a/airflow-core/tests/unit/timetables/test_interval_timetable.py +++ b/airflow-core/tests/unit/timetables/test_interval_timetable.py @@ -174,28 +174,28 @@ def test_validate_success(timetable: Timetable) -> None: [ pytest.param( CronDataIntervalTimetable("0 0 1 13 0", utc), - "[0 0 1 13 0] is not acceptable, out of range", + "[AERR-TIMETABLE-INVALID] [0 0 1 13 0] is not acceptable, out of range", id="invalid-cron", ), pytest.param( DeltaDataIntervalTimetable(datetime.timedelta()), - "schedule interval must be positive, not datetime.timedelta(0)", + "[AERR-TIMETABLE-INVALID] schedule interval must be positive, not datetime.timedelta(0)", id="zero-timedelta", ), pytest.param( DeltaDataIntervalTimetable(dateutil.relativedelta.relativedelta()), - "schedule interval must be positive, not relativedelta()", + "[AERR-TIMETABLE-INVALID] schedule interval must be positive, not relativedelta()", id="zero-relativedelta", ), pytest.param( DeltaDataIntervalTimetable(datetime.timedelta(days=-1)), # Dynamically formatted since different Python versions display timedelta differently. - f"schedule interval must be positive, not {datetime.timedelta(days=-1)!r}", + f"[AERR-TIMETABLE-INVALID] schedule interval must be positive, not {datetime.timedelta(days=-1)!r}", id="negative-timedelta", ), pytest.param( DeltaDataIntervalTimetable(dateutil.relativedelta.relativedelta(days=-1)), - "schedule interval must be positive, not relativedelta(days=-1)", + "[AERR-TIMETABLE-INVALID] schedule interval must be positive, not relativedelta(days=-1)", id="negative-relativedelta", ), ], diff --git a/airflow-core/tests/unit/timetables/test_trigger_timetable.py b/airflow-core/tests/unit/timetables/test_trigger_timetable.py index 114080685ca5a..d5e5da610ff5f 100644 --- a/airflow-core/tests/unit/timetables/test_trigger_timetable.py +++ b/airflow-core/tests/unit/timetables/test_trigger_timetable.py @@ -365,17 +365,17 @@ def test_validate_success(timetable: Timetable) -> None: [ pytest.param( CronTriggerTimetable("0 0 1 13 0", timezone=utc), - "[0 0 1 13 0] is not acceptable, out of range", + "[AERR-TIMETABLE-INVALID] [0 0 1 13 0] is not acceptable, out of range", id="cron", ), pytest.param( DeltaTriggerTimetable(datetime.timedelta(days=-1)), - "schedule interval must be positive, not datetime.timedelta(days=-1)", + "[AERR-TIMETABLE-INVALID] schedule interval must be positive, not datetime.timedelta(days=-1)", id="timedelta", ), pytest.param( DeltaTriggerTimetable(dateutil.relativedelta.relativedelta(days=-1)), - "schedule interval must be positive, not relativedelta(days=-1)", + "[AERR-TIMETABLE-INVALID] schedule interval must be positive, not relativedelta(days=-1)", id="relativedelta", ), ], diff --git a/dev/breeze/tests/test_selective_checks.py b/dev/breeze/tests/test_selective_checks.py index d8cbd2737a9ba..74ab82cb2f1dc 100644 --- a/dev/breeze/tests/test_selective_checks.py +++ b/dev/breeze/tests/test_selective_checks.py @@ -104,7 +104,7 @@ "mypy-airflow-core,mypy-airflow-ctl,mypy-airflow-ctl-tests,mypy-airflow-e2e-tests," "mypy-dev,mypy-devel-common,mypy-docker-tests,mypy-helm-tests,mypy-kubernetes-tests," "mypy-scripts," - "mypy-shared-configuration,mypy-shared-dagnode,mypy-shared-listeners,mypy-shared-logging," + "mypy-shared-configuration,mypy-shared-dagnode,mypy-shared-exceptions,mypy-shared-listeners,mypy-shared-logging," "mypy-shared-module_loading,mypy-shared-observability,mypy-shared-plugins_manager," "mypy-shared-providers_discovery,mypy-shared-secrets_backend,mypy-shared-secrets_masker," "mypy-shared-serialization,mypy-shared-state,mypy-shared-template_rendering,mypy-shared-timezones,mypy-task-sdk,mypy-task-sdk-integration-tests," @@ -117,7 +117,7 @@ "identity,mypy-airflow-core,mypy-airflow-ctl,mypy-airflow-ctl-tests,mypy-airflow-e2e-tests," "mypy-dev,mypy-devel-common,mypy-docker-tests,mypy-helm-tests,mypy-kubernetes-tests," "mypy-scripts," - "mypy-shared-configuration,mypy-shared-dagnode,mypy-shared-listeners,mypy-shared-logging," + "mypy-shared-configuration,mypy-shared-dagnode,mypy-shared-exceptions,mypy-shared-listeners,mypy-shared-logging," "mypy-shared-module_loading,mypy-shared-observability,mypy-shared-plugins_manager," "mypy-shared-providers_discovery,mypy-shared-secrets_backend,mypy-shared-secrets_masker," "mypy-shared-serialization,mypy-shared-state,mypy-shared-template_rendering,mypy-shared-timezones,mypy-task-sdk,mypy-task-sdk-integration-tests," @@ -128,7 +128,7 @@ "mypy-airflow-core,mypy-airflow-ctl,mypy-airflow-ctl-tests,mypy-airflow-e2e-tests," "mypy-dev,mypy-devel-common,mypy-docker-tests,mypy-helm-tests,mypy-kubernetes-tests," "mypy-scripts," - "mypy-shared-configuration,mypy-shared-dagnode,mypy-shared-listeners,mypy-shared-logging," + "mypy-shared-configuration,mypy-shared-dagnode,mypy-shared-exceptions,mypy-shared-listeners,mypy-shared-logging," "mypy-shared-module_loading,mypy-shared-observability,mypy-shared-plugins_manager," "mypy-shared-providers_discovery,mypy-shared-secrets_backend,mypy-shared-secrets_masker," "mypy-shared-serialization,mypy-shared-state,mypy-shared-template_rendering,mypy-shared-timezones,mypy-task-sdk,mypy-task-sdk-integration-tests,update-uv-lock" @@ -139,7 +139,7 @@ "mypy-airflow-core,mypy-airflow-ctl,mypy-airflow-ctl-tests,mypy-airflow-e2e-tests," "mypy-dev,mypy-devel-common,mypy-docker-tests,mypy-helm-tests,mypy-kubernetes-tests," "mypy-scripts," - "mypy-shared-configuration,mypy-shared-dagnode,mypy-shared-listeners,mypy-shared-logging," + "mypy-shared-configuration,mypy-shared-dagnode,mypy-shared-exceptions,mypy-shared-listeners,mypy-shared-logging," "mypy-shared-module_loading,mypy-shared-observability,mypy-shared-plugins_manager," "mypy-shared-providers_discovery,mypy-shared-secrets_backend,mypy-shared-secrets_masker," "mypy-shared-serialization,mypy-shared-state,mypy-shared-template_rendering,mypy-shared-timezones,mypy-task-sdk,mypy-task-sdk-integration-tests," @@ -151,7 +151,7 @@ "mypy-airflow-core,mypy-airflow-ctl,mypy-airflow-ctl-tests,mypy-airflow-e2e-tests," "mypy-dev,mypy-devel-common,mypy-docker-tests,mypy-helm-tests,mypy-kubernetes-tests," "mypy-scripts," - "mypy-shared-configuration,mypy-shared-dagnode,mypy-shared-listeners,mypy-shared-logging," + "mypy-shared-configuration,mypy-shared-dagnode,mypy-shared-exceptions,mypy-shared-listeners,mypy-shared-logging," "mypy-shared-module_loading,mypy-shared-observability,mypy-shared-plugins_manager," "mypy-shared-providers_discovery,mypy-shared-secrets_backend,mypy-shared-secrets_masker," "mypy-shared-serialization,mypy-shared-state,mypy-shared-template_rendering,mypy-shared-timezones,mypy-task-sdk,mypy-task-sdk-integration-tests," @@ -163,7 +163,7 @@ "mypy-airflow-core,mypy-airflow-ctl,mypy-airflow-ctl-tests,mypy-airflow-e2e-tests," "mypy-dev,mypy-devel-common,mypy-docker-tests,mypy-helm-tests,mypy-kubernetes-tests," "mypy-scripts," - "mypy-shared-configuration,mypy-shared-dagnode,mypy-shared-listeners,mypy-shared-logging," + "mypy-shared-configuration,mypy-shared-dagnode,mypy-shared-exceptions,mypy-shared-listeners,mypy-shared-logging," "mypy-shared-module_loading,mypy-shared-observability,mypy-shared-plugins_manager," "mypy-shared-providers_discovery,mypy-shared-secrets_backend,mypy-shared-secrets_masker," "mypy-shared-serialization,mypy-shared-state,mypy-shared-template_rendering,mypy-shared-timezones,mypy-task-sdk,mypy-task-sdk-integration-tests," @@ -176,7 +176,7 @@ "mypy-airflow-core,mypy-airflow-ctl,mypy-airflow-ctl-tests,mypy-airflow-e2e-tests," "mypy-dev,mypy-devel-common,mypy-docker-tests,mypy-helm-tests,mypy-kubernetes-tests," "mypy-scripts," - "mypy-shared-configuration,mypy-shared-dagnode,mypy-shared-listeners,mypy-shared-logging," + "mypy-shared-configuration,mypy-shared-dagnode,mypy-shared-exceptions,mypy-shared-listeners,mypy-shared-logging," "mypy-shared-module_loading,mypy-shared-observability,mypy-shared-plugins_manager," "mypy-shared-providers_discovery,mypy-shared-secrets_backend,mypy-shared-secrets_masker," "mypy-shared-serialization,mypy-shared-state,mypy-shared-template_rendering,mypy-shared-timezones,mypy-task-sdk,mypy-task-sdk-integration-tests," @@ -188,7 +188,7 @@ "mypy-airflow-core,mypy-airflow-ctl,mypy-airflow-ctl-tests,mypy-airflow-e2e-tests," "mypy-dev,mypy-devel-common,mypy-docker-tests,mypy-helm-tests,mypy-kubernetes-tests," "mypy-scripts," - "mypy-shared-configuration,mypy-shared-dagnode,mypy-shared-listeners,mypy-shared-logging," + "mypy-shared-configuration,mypy-shared-dagnode,mypy-shared-exceptions,mypy-shared-listeners,mypy-shared-logging," "mypy-shared-module_loading,mypy-shared-observability,mypy-shared-plugins_manager," "mypy-shared-providers_discovery,mypy-shared-secrets_backend,mypy-shared-secrets_masker," "mypy-shared-serialization,mypy-shared-state,mypy-shared-template_rendering,mypy-shared-timezones,mypy-task-sdk,mypy-task-sdk-integration-tests,update-uv-lock" @@ -199,7 +199,7 @@ "mypy-airflow-core,mypy-airflow-ctl,mypy-airflow-ctl-tests,mypy-airflow-e2e-tests," "mypy-dev,mypy-devel-common,mypy-docker-tests,mypy-helm-tests,mypy-kubernetes-tests," "mypy-scripts," - "mypy-shared-configuration,mypy-shared-dagnode,mypy-shared-listeners,mypy-shared-logging," + "mypy-shared-configuration,mypy-shared-dagnode,mypy-shared-exceptions,mypy-shared-listeners,mypy-shared-logging," "mypy-shared-module_loading,mypy-shared-observability,mypy-shared-plugins_manager," "mypy-shared-providers_discovery,mypy-shared-secrets_backend,mypy-shared-secrets_masker," "mypy-shared-serialization,mypy-shared-state,mypy-shared-template_rendering,mypy-shared-timezones,mypy-task-sdk,mypy-task-sdk-integration-tests," @@ -402,7 +402,7 @@ def assert_outputs_are_printed(expected_outputs: dict[str, str], stderr: str): "mypy-airflow-ctl,mypy-airflow-ctl-tests,mypy-airflow-e2e-tests," "mypy-dev,mypy-devel-common,mypy-docker-tests,mypy-helm-tests,mypy-kubernetes-tests," "mypy-scripts," - "mypy-shared-configuration,mypy-shared-dagnode,mypy-shared-listeners,mypy-shared-logging," + "mypy-shared-configuration,mypy-shared-dagnode,mypy-shared-exceptions,mypy-shared-listeners,mypy-shared-logging," "mypy-shared-module_loading,mypy-shared-observability,mypy-shared-plugins_manager," "mypy-shared-providers_discovery,mypy-shared-secrets_backend,mypy-shared-secrets_masker," "mypy-shared-serialization,mypy-shared-state,mypy-shared-template_rendering,mypy-shared-timezones,mypy-task-sdk,mypy-task-sdk-integration-tests," @@ -675,7 +675,7 @@ def assert_outputs_are_printed(expected_outputs: dict[str, str], stderr: str): "mypy-airflow-core,mypy-airflow-ctl,mypy-airflow-ctl-tests,mypy-airflow-e2e-tests," "mypy-dev,mypy-devel-common,mypy-docker-tests,mypy-helm-tests,mypy-kubernetes-tests," "mypy-scripts," - "mypy-shared-configuration,mypy-shared-dagnode,mypy-shared-listeners,mypy-shared-logging," + "mypy-shared-configuration,mypy-shared-dagnode,mypy-shared-exceptions,mypy-shared-listeners,mypy-shared-logging," "mypy-shared-module_loading,mypy-shared-observability,mypy-shared-plugins_manager," "mypy-shared-providers_discovery,mypy-shared-secrets_backend,mypy-shared-secrets_masker," "mypy-shared-serialization,mypy-shared-state,mypy-shared-template_rendering,mypy-shared-timezones,mypy-task-sdk-integration-tests," @@ -713,7 +713,7 @@ def assert_outputs_are_printed(expected_outputs: dict[str, str], stderr: str): "mypy-airflow-core,mypy-airflow-ctl,mypy-airflow-ctl-tests,mypy-airflow-e2e-tests," "mypy-dev,mypy-devel-common,mypy-docker-tests,mypy-helm-tests,mypy-kubernetes-tests," "mypy-scripts," - "mypy-shared-configuration,mypy-shared-dagnode,mypy-shared-listeners,mypy-shared-logging," + "mypy-shared-configuration,mypy-shared-dagnode,mypy-shared-exceptions,mypy-shared-listeners,mypy-shared-logging," "mypy-shared-module_loading,mypy-shared-observability,mypy-shared-plugins_manager," "mypy-shared-providers_discovery,mypy-shared-secrets_backend,mypy-shared-secrets_masker," "mypy-shared-serialization,mypy-shared-state,mypy-shared-template_rendering,mypy-shared-timezones,mypy-task-sdk," @@ -750,7 +750,7 @@ def assert_outputs_are_printed(expected_outputs: dict[str, str], stderr: str): "mypy-airflow-core,mypy-airflow-ctl-tests,mypy-airflow-e2e-tests," "mypy-dev,mypy-devel-common,mypy-docker-tests,mypy-helm-tests,mypy-kubernetes-tests," "mypy-scripts," - "mypy-shared-configuration,mypy-shared-dagnode,mypy-shared-listeners,mypy-shared-logging," + "mypy-shared-configuration,mypy-shared-dagnode,mypy-shared-exceptions,mypy-shared-listeners,mypy-shared-logging," "mypy-shared-module_loading,mypy-shared-observability,mypy-shared-plugins_manager," "mypy-shared-providers_discovery,mypy-shared-secrets_backend,mypy-shared-secrets_masker," "mypy-shared-serialization,mypy-shared-state,mypy-shared-template_rendering,mypy-shared-timezones,mypy-task-sdk,mypy-task-sdk-integration-tests," @@ -786,7 +786,7 @@ def assert_outputs_are_printed(expected_outputs: dict[str, str], stderr: str): "mypy-airflow-core,mypy-airflow-ctl,mypy-airflow-e2e-tests," "mypy-dev,mypy-devel-common,mypy-docker-tests,mypy-helm-tests,mypy-kubernetes-tests," "mypy-scripts," - "mypy-shared-configuration,mypy-shared-dagnode,mypy-shared-listeners,mypy-shared-logging," + "mypy-shared-configuration,mypy-shared-dagnode,mypy-shared-exceptions,mypy-shared-listeners,mypy-shared-logging," "mypy-shared-module_loading,mypy-shared-observability,mypy-shared-plugins_manager," "mypy-shared-providers_discovery,mypy-shared-secrets_backend,mypy-shared-secrets_masker," "mypy-shared-serialization,mypy-shared-state,mypy-shared-template_rendering,mypy-shared-timezones,mypy-task-sdk,mypy-task-sdk-integration-tests," @@ -1127,7 +1127,7 @@ def assert_outputs_are_printed(expected_outputs: dict[str, str], stderr: str): "mypy-airflow-ctl,mypy-airflow-ctl-tests,mypy-airflow-e2e-tests," "mypy-dev,mypy-devel-common,mypy-docker-tests,mypy-helm-tests,mypy-kubernetes-tests," "mypy-scripts," - "mypy-shared-configuration,mypy-shared-dagnode,mypy-shared-listeners,mypy-shared-logging," + "mypy-shared-configuration,mypy-shared-dagnode,mypy-shared-exceptions,mypy-shared-listeners,mypy-shared-logging," "mypy-shared-module_loading,mypy-shared-observability,mypy-shared-plugins_manager," "mypy-shared-providers_discovery,mypy-shared-secrets_backend,mypy-shared-secrets_masker," "mypy-shared-serialization,mypy-shared-state,mypy-shared-template_rendering,mypy-shared-timezones,mypy-task-sdk,mypy-task-sdk-integration-tests," @@ -1291,7 +1291,7 @@ def assert_outputs_are_printed(expected_outputs: dict[str, str], stderr: str): "mypy-airflow-core,mypy-airflow-ctl,mypy-airflow-ctl-tests,mypy-airflow-e2e-tests," "mypy-dev,mypy-devel-common,mypy-docker-tests,mypy-helm-tests,mypy-kubernetes-tests," "mypy-scripts," - "mypy-shared-configuration,mypy-shared-dagnode,mypy-shared-listeners,mypy-shared-logging," + "mypy-shared-configuration,mypy-shared-dagnode,mypy-shared-exceptions,mypy-shared-listeners,mypy-shared-logging," "mypy-shared-module_loading,mypy-shared-observability,mypy-shared-plugins_manager," "mypy-shared-providers_discovery,mypy-shared-secrets_backend,mypy-shared-secrets_masker," "mypy-shared-serialization,mypy-shared-state,mypy-shared-template_rendering,mypy-shared-timezones,mypy-task-sdk,mypy-task-sdk-integration-tests," @@ -1450,7 +1450,7 @@ def assert_outputs_are_printed(expected_outputs: dict[str, str], stderr: str): "mypy-airflow-core,mypy-airflow-ctl,mypy-airflow-ctl-tests," "mypy-airflow-e2e-tests,mypy-dev,mypy-devel-common,mypy-docker-tests," "mypy-helm-tests,mypy-kubernetes-tests,mypy-scripts," - "mypy-shared-configuration,mypy-shared-dagnode,mypy-shared-listeners," + "mypy-shared-configuration,mypy-shared-dagnode,mypy-shared-exceptions,mypy-shared-listeners," "mypy-shared-module_loading,mypy-shared-observability," "mypy-shared-plugins_manager,mypy-shared-providers_discovery," "mypy-shared-secrets_backend,mypy-shared-secrets_masker," @@ -2188,7 +2188,7 @@ def test_expected_output_push( "mypy-airflow-ctl,mypy-airflow-ctl-tests,mypy-airflow-e2e-tests," "mypy-dev,mypy-devel-common,mypy-docker-tests,mypy-helm-tests,mypy-kubernetes-tests," "mypy-scripts," - "mypy-shared-configuration,mypy-shared-dagnode,mypy-shared-listeners,mypy-shared-logging," + "mypy-shared-configuration,mypy-shared-dagnode,mypy-shared-exceptions,mypy-shared-listeners,mypy-shared-logging," "mypy-shared-module_loading,mypy-shared-observability,mypy-shared-plugins_manager," "mypy-shared-providers_discovery,mypy-shared-secrets_backend,mypy-shared-secrets_masker," "mypy-shared-serialization,mypy-shared-state,mypy-shared-template_rendering,mypy-shared-timezones,mypy-task-sdk,mypy-task-sdk-integration-tests," @@ -2229,7 +2229,7 @@ def test_expected_output_push( "identity,mypy-airflow-ctl,mypy-airflow-ctl-tests,mypy-airflow-e2e-tests," "mypy-dev,mypy-devel-common,mypy-docker-tests,mypy-helm-tests,mypy-kubernetes-tests," "mypy-scripts," - "mypy-shared-configuration,mypy-shared-dagnode,mypy-shared-listeners,mypy-shared-logging," + "mypy-shared-configuration,mypy-shared-dagnode,mypy-shared-exceptions,mypy-shared-listeners,mypy-shared-logging," "mypy-shared-module_loading,mypy-shared-observability,mypy-shared-plugins_manager," "mypy-shared-providers_discovery,mypy-shared-secrets_backend,mypy-shared-secrets_masker," "mypy-shared-serialization,mypy-shared-state,mypy-shared-template_rendering,mypy-shared-timezones,mypy-task-sdk,mypy-task-sdk-integration-tests," @@ -2275,7 +2275,7 @@ def test_expected_output_push( "mypy-airflow-ctl,mypy-airflow-ctl-tests,mypy-airflow-e2e-tests," "mypy-dev,mypy-devel-common,mypy-docker-tests,mypy-helm-tests,mypy-kubernetes-tests," "mypy-scripts," - "mypy-shared-configuration,mypy-shared-dagnode,mypy-shared-listeners,mypy-shared-logging," + "mypy-shared-configuration,mypy-shared-dagnode,mypy-shared-exceptions,mypy-shared-listeners,mypy-shared-logging," "mypy-shared-module_loading,mypy-shared-observability,mypy-shared-plugins_manager," "mypy-shared-providers_discovery,mypy-shared-secrets_backend,mypy-shared-secrets_masker," "mypy-shared-serialization,mypy-shared-state,mypy-shared-template_rendering,mypy-shared-timezones,mypy-task-sdk,mypy-task-sdk-integration-tests," diff --git a/devel-common/src/docs/utils/conf_constants.py b/devel-common/src/docs/utils/conf_constants.py index dd2e0c31b67f2..ffc5ef51039ba 100644 --- a/devel-common/src/docs/utils/conf_constants.py +++ b/devel-common/src/docs/utils/conf_constants.py @@ -86,6 +86,7 @@ def get_rst_epilogue(package_version: str, airflow_core: bool) -> str: "removemarktransform", "sphinx_copybutton", "airflow_intersphinx", + "error_guide_generator", "sphinxcontrib.spelling", "sphinx_airflow_theme", "redirects", diff --git a/devel-common/src/sphinx_exts/error_guide_generator.py b/devel-common/src/sphinx_exts/error_guide_generator.py new file mode 100644 index 0000000000000..e0fc6fa0a1da2 --- /dev/null +++ b/devel-common/src/sphinx_exts/error_guide_generator.py @@ -0,0 +1,168 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from __future__ import annotations + +import os +from pathlib import Path + +from sphinx.util import logging + +from airflow_shared.exceptions.common import get_error_meta_list + +AIRFLOW_ROOT_PATH = Path(os.path.abspath(__file__)).parents[3] +GENERATED_PATH = AIRFLOW_ROOT_PATH / "airflow-core" / "docs" / "error-codes" +AIRFLOW_CORE_SRC_PATH = AIRFLOW_ROOT_PATH / "airflow-core" / "src" +TASK_SDK_SRC_PATH = AIRFLOW_ROOT_PATH / "task-sdk" / "src" +PROVIDERS_SRC_PATH = AIRFLOW_ROOT_PATH / "providers" +EXCEPTION_CLASS_PATHS = [AIRFLOW_CORE_SRC_PATH, TASK_SDK_SRC_PATH, PROVIDERS_SRC_PATH] + + +def get_license_header(): + return """ + .. Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + .. http://www.apache.org/licenses/LICENSE-2.0 + + .. Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + """ + + +class ErrorGuideIndexHandler: + """File handler class for error guide index file.""" + + def __init__(self, file_path): + self.file_path = file_path + self.lines = [ + get_license_header(), + "", + "Error Codes Guide", + "=================", + "", + "Quick debug guide for common problems when using Airflow, with error code as a starting point. " + "In your Airflow logs, look for the error code in beginning of exception traceback. " + "e.g: ``[AERR-TIMETABLE-INVALID] schedule interval must be positive...``", + "", + "List of documented error codes", + "------------------------------", + "", + ] + self._toc_tree_lines = [ + ".. toctree::", + " :maxdepth: 1", + "", + ] + self.error_codes = [] + + def add_error_code(self, error_code): + self.error_codes.append(error_code) + + def commit(self): + if self.error_codes: + self.lines += self._toc_tree_lines + [f" {error_code}" for error_code in self.error_codes] + else: + self.lines.append( + "There are no error codes listed as of now. If you'd like a new error code listed, " + "please raise an issue on `GitHub `_." + ) + self.lines.append("") + self.file_path.write_text( + "\n".join(self.lines), + encoding="utf-8", + ) + + +class ErrorGuideDocHandler: + """File handler class for error code rst file.""" + + def __init__(self, file_path): + self.file_path = file_path + self.lines = [] + + def prepare(self, error_code_info: dict[str, str]): + code = error_code_info["error_code"] + title = f"{code}: {error_code_info['user_facing_error_message']}" + underline = "=" * len(title) + self.lines = [ + get_license_header(), + "", + title, + underline, + "", + f"**Exception:** ``{error_code_info['exception_type']}``", + "", + "Description", + "-----------", + error_code_info["description"], + "", + "First Steps", + "-----------", + error_code_info["first_steps"], + "", + "Documentation to refer", + "-----------------------", + error_code_info["documentation"], + "", + ] + + def commit(self): + self.file_path.write_text( + "\n".join(self.lines) + "\n", + encoding="utf-8", + ) + + +def generate_error_docs(app): + """Generates .rst files for each error code in the YAML mapping.""" + + GENERATED_PATH.mkdir(parents=True, exist_ok=True) + index_handler = ErrorGuideIndexHandler(GENERATED_PATH / "index.rst") + logger = logging.getLogger(__name__) + + error_list: list[dict[str, str]] = get_error_meta_list(EXCEPTION_CLASS_PATHS) + if not error_list: + logger.error("Error mapping is empty.") + index_handler.commit() + return + + for entry in error_list: + code = entry["error_code"] + + # add error code to index (toctree) page + index_handler.add_error_code(code) + + # prep and commit error page doc to disk + error_doc = ErrorGuideDocHandler(GENERATED_PATH / f"{code}.rst") + error_doc.prepare(entry) + error_doc.commit() + + index_handler.commit() + + +def setup(app): + app.connect("builder-inited", generate_error_docs) diff --git a/providers/telegram/tests/unit/telegram/hooks/test_telegram.py b/providers/telegram/tests/unit/telegram/hooks/test_telegram.py index 38b1f1841ea3c..87fb22808a431 100644 --- a/providers/telegram/tests/unit/telegram/hooks/test_telegram.py +++ b/providers/telegram/tests/unit/telegram/hooks/test_telegram.py @@ -75,7 +75,7 @@ def test_should_raise_exception_if_conn_id_doesnt_exist(self, sdk_connection_not with pytest.raises(airflow.exceptions.AirflowNotFoundException) as ctx: TelegramHook(telegram_conn_id="telegram-webhook-non-existent") - assert str(ctx.value) == "The conn_id `telegram-webhook-non-existent` isn't defined" + assert str(ctx.value) == "[AERR-NOT-FOUND] The conn_id `telegram-webhook-non-existent` isn't defined" def test_should_raise_exception_if_conn_id_doesnt_contain_token(self): with pytest.raises(airflow.exceptions.AirflowException) as ctx: diff --git a/pyproject.toml b/pyproject.toml index 8d2c55b9e9774..35a88e9fdc10d 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1333,6 +1333,7 @@ dev = [ "apache-airflow-ctl-tests", "apache-airflow-shared-configuration", "apache-airflow-shared-dagnode", + "apache-airflow-shared-exceptions", "apache-airflow-shared-listeners", "apache-airflow-shared-logging", "apache-airflow-shared-module-loading", @@ -1500,6 +1501,7 @@ apache-airflow-providers-zendesk = false apache-airflow-scripts = false apache-airflow-shared-configuration = false apache-airflow-shared-dagnode = false +apache-airflow-shared-exceptions = false apache-airflow-shared-listeners = false apache-airflow-shared-logging = false apache-airflow-shared-module-loading = false @@ -1651,6 +1653,7 @@ apache-airflow-providers-zendesk = false apache-airflow-scripts = false apache-airflow-shared-configuration = false apache-airflow-shared-dagnode = false +apache-airflow-shared-exceptions = false apache-airflow-shared-listeners = false apache-airflow-shared-logging = false apache-airflow-shared-module-loading = false @@ -1696,6 +1699,7 @@ apache-airflow-providers = { workspace = true } apache-aurflow-docker-stack = { workspace = true } apache-airflow-shared-configuration = { workspace = true } apache-airflow-shared-dagnode = { workspace = true } +apache-airflow-shared-exceptions = { workspace = true } apache-airflow-shared-listeners = { workspace = true } apache-airflow-shared-logging = { workspace = true } apache-airflow-shared-module-loading = { workspace = true } @@ -1833,6 +1837,7 @@ members = [ "docker-stack-docs", "shared/configuration", "shared/dagnode", + "shared/exceptions", "shared/listeners", "shared/logging", "shared/module_loading", diff --git a/scripts/ci/prek/check_airflow_exceptions_error_meta.py b/scripts/ci/prek/check_airflow_exceptions_error_meta.py new file mode 100755 index 0000000000000..37b186f7978f5 --- /dev/null +++ b/scripts/ci/prek/check_airflow_exceptions_error_meta.py @@ -0,0 +1,183 @@ +#!/usr/bin/env python3 +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from __future__ import annotations + +import ast +import sys +from pathlib import Path + + +def class_extends_error_mixin( + class_def: ast.ClassDef, + class_registry: dict[str, list[tuple[str, ast.ClassDef]]], + current_package: str, + visited: set[str] | None = None, +) -> bool: + """Return True if class def found to extend AirflowErrorCodeMixin, False otherwise.""" + if visited is None: + visited = set() + + if class_def.name in visited: + return False + visited.add(class_def.name) + + for base in class_def.bases: + node = base.value if isinstance(base, ast.Subscript) else base + if isinstance(node, ast.Name) and node.id == "AirflowErrorCodeMixin": + return True + + for base in class_def.bases: + node = base.value if isinstance(base, ast.Subscript) else base + if isinstance(node, ast.Name): + parent_class_name = node.id + parent_candidates: list[tuple[str, ast.ClassDef]] = class_registry.get(parent_class_name, []) + parent_def = None + for package, candidate in parent_candidates: + if package == current_package: + parent_def = candidate + break + if parent_def: + if class_extends_error_mixin( + class_def=parent_def, + class_registry=class_registry, + current_package=current_package, + visited=visited, + ): + return True + + return False + + +def extract_static_properties(class_def: ast.ClassDef, required_props: list[str]) -> dict: + """Extract class level static properties.""" + properties = {} + for stmt in class_def.body: + if not isinstance(stmt, ast.Assign): + continue + for target in stmt.targets: + if isinstance(target, ast.Name) and target.id in required_props: + try: + properties[target.id] = ast.literal_eval(stmt.value) + except Exception: + properties[target.id] = "" + return properties + + +def check_raise_statement( + node: ast.Raise, + filename: str, + line_no: int, + class_registry: dict[str, list[tuple[str, ast.ClassDef]]], + current_package: str, +) -> list[str]: + """Gather raise statements and check respective exception classes for missing properties.""" + errors: list[str] = [] + + if node.exc is None or not isinstance(node.exc, ast.Call): + return errors + + func_name = getattr(node.exc.func, "id", getattr(node.exc.func, "attr", None)) + if not func_name: + return errors + + matching_classes: list[tuple[str, ast.ClassDef]] = class_registry.get(func_name, []) + class_def: ast.ClassDef | None = None + for package, package_class_def in matching_classes: + if package == current_package: + class_def = package_class_def + break + + if class_def and class_extends_error_mixin(class_def, class_registry, current_package): + required_props = ["user_facing_error_message", "first_steps", "description", "documentation"] + properties = extract_static_properties(class_def, required_props) + + for prop in required_props: + if not properties.get(prop): + errors.append( + f"{filename}:{line_no} [{current_package}] " + f"{prop} is missing in exception class {func_name}." + ) + + return errors + + +def get_package_name(filepath: Path) -> str: + """Identify if a file belongs to tasksdk or airflow-core.""" + parts = filepath.parts + if "task-sdk" in parts or "sdk" in parts: + return "task-sdk" + if "airflow-core" in parts or "core" in parts: + return "airflow-core" + if "providers" in parts: + return "providers" + raise ValueError("Unsupported package name") + + +def validate_files(paths: list[Path]) -> list[str]: + """This acts as the entrypoint function for this static check.""" + errors = [] + + class_registry: dict[str, list[tuple[str, ast.ClassDef]]] = {} + raise_statements: list[tuple[ast.Raise, str, str]] = [] + + for filepath in paths: + if not filepath.suffix == ".py": + continue + try: + tree = ast.parse(filepath.read_text(encoding="utf-8"), filename=str(filepath)) + package_name: str = get_package_name(filepath) + + for node in ast.walk(tree): + if isinstance(node, ast.ClassDef): + if node.name not in class_registry: + class_registry[node.name] = [] + class_registry[node.name].append((package_name, node)) + + elif isinstance(node, ast.Raise): + raise_statements.append((node, str(filepath), package_name)) + + except Exception as e: + errors.append(f"{filepath}: Error parsing file: {e}") + + for raise_node, filename, current_package in raise_statements: + errors.extend( + check_raise_statement( + node=raise_node, + filename=filename, + line_no=raise_node.lineno, + class_registry=class_registry, + current_package=current_package, + ) + ) + + return errors + + +if __name__ == "__main__": + if len(sys.argv) < 2: + print("Usage: check_airflow_error_codes.py [file2.py ...]", file=sys.stderr) + sys.exit(1) + + validation_errors = validate_files([Path(arg) for arg in sys.argv[1:]]) + if validation_errors: + for error in validation_errors: + print(error) + sys.exit(1) + + sys.exit(0) diff --git a/scripts/tests/ci/prek/test_check_airflow_exceptions_error_meta.py b/scripts/tests/ci/prek/test_check_airflow_exceptions_error_meta.py new file mode 100644 index 0000000000000..9744d2c6bcb80 --- /dev/null +++ b/scripts/tests/ci/prek/test_check_airflow_exceptions_error_meta.py @@ -0,0 +1,113 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +from unittest import mock + +import pytest +from check_airflow_exceptions_error_meta import get_package_name, validate_files + + +class TestCheckAirflowExcErrorMeta: + def setup_method(self): + self.exc_without_err_meta = """ +from airflow._shared.exceptions import AirflowErrorCodeMixin + +class ExcTestWithoutErrMetaException(AirflowErrorCodeMixin, Exception): + pass + +raise ExcTestWithoutErrMetaException""" + + self.exc_with_err_meta = """ +from airflow._shared.exceptions import AirflowErrorCodeMixin + +class ExcTestWithErrMetaException(AirflowErrorCodeMixin, Exception): + description = "test description" + documentation = "https://airflow.apache.org/docs/apache-airflow/stable/" + first_steps = "test first steps" + user_facing_error_message = "test user facing message" + +raise ExcTestWithErrMetaException""" + + @pytest.mark.parametrize( + "package_name, expected_errors", + [ + ( + "airflow-core", + [ + "[airflow-core] description is missing in exception class ExcTestWithoutErrMetaException.", + "[airflow-core] documentation is missing in exception class ExcTestWithoutErrMetaException.", + "[airflow-core] first_steps is missing in exception class ExcTestWithoutErrMetaException.", + "[airflow-core] user_facing_error_message is missing in exception class ExcTestWithoutErrMetaException.", + ], + ), + ( + "task-sdk", + [ + "[task-sdk] description is missing in exception class ExcTestWithoutErrMetaException.", + "[task-sdk] documentation is missing in exception class ExcTestWithoutErrMetaException.", + "[task-sdk] first_steps is missing in exception class ExcTestWithoutErrMetaException.", + "[task-sdk] user_facing_error_message is missing in exception class ExcTestWithoutErrMetaException.", + ], + ), + ], + ) + @mock.patch("check_airflow_exceptions_error_meta.get_package_name") + def test_raise_with_exc_without_err_meta( + self, + mock_get_package_name, + tmp_path, + package_name, + expected_errors, + ): + mock_get_package_name.return_value = package_name + f = tmp_path / "fake_module.py" + f.write_text(f"{self.exc_without_err_meta}('this is a test')") + errors = validate_files(paths=[f]) + assert len(errors) == len(expected_errors) + actual_errors = list(sorted(errors)) + for idx in range(len(actual_errors)): + assert expected_errors[idx] in actual_errors[idx] + + @pytest.mark.parametrize("package_name, num_errors", [("airflow-core", 0), ("task-sdk", 0)]) + @mock.patch("check_airflow_exceptions_error_meta.get_package_name") + def test_raise_with_exc_with_err_meta(self, mock_get_package_name, tmp_path, package_name, num_errors): + mock_get_package_name.return_value = package_name + f = tmp_path / "fake_module.py" + f.write_text(f"{self.exc_with_err_meta}('this is a test')") + errors = validate_files(paths=[f]) + assert len(errors) == num_errors + + @pytest.mark.parametrize( + "path_suffix, exc_type, raise_match, expected_result", + [ + ("some-package", ValueError, "Unsupported package name", None), + ("core", None, "", "airflow-core"), + ("airflow-core", None, "", "airflow-core"), + ("sdk", None, "", "task-sdk"), + ("task-sdk", None, "", "task-sdk"), + ("providers", None, "", "providers"), + ], + ) + def test_get_package_name(self, tmp_path, path_suffix, exc_type, raise_match, expected_result): + package_path = tmp_path / path_suffix + if exc_type is None: + actual_result = get_package_name(package_path) + assert actual_result == expected_result + else: + with pytest.raises(exc_type, match=raise_match): + get_package_name(package_path) diff --git a/shared/exceptions/.gitignore b/shared/exceptions/.gitignore new file mode 100644 index 0000000000000..bff2d7629604d --- /dev/null +++ b/shared/exceptions/.gitignore @@ -0,0 +1 @@ +*.iml diff --git a/shared/exceptions/.pre-commit-config.yaml b/shared/exceptions/.pre-commit-config.yaml new file mode 100644 index 0000000000000..ed2ffb39109fd --- /dev/null +++ b/shared/exceptions/.pre-commit-config.yaml @@ -0,0 +1,31 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +--- +default_stages: [pre-commit, pre-push] +minimum_prek_version: '0.3.4' +default_language_version: + python: python3 +repos: + - repo: local + hooks: + - id: mypy-shared-exceptions + name: Run mypy for shared-exceptions + language: python + entry: ../../scripts/ci/prek/run_mypy_full_dist_local_venv_or_breeze_in_ci.py shared/exceptions + pass_filenames: false + files: ^.*\.py$ + require_serial: true diff --git a/shared/exceptions/pyproject.toml b/shared/exceptions/pyproject.toml new file mode 100644 index 0000000000000..e973368886410 --- /dev/null +++ b/shared/exceptions/pyproject.toml @@ -0,0 +1,55 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +[project] +name = "apache-airflow-shared-exceptions" +description = "Shared exception code for Airflow exceptions" +version = "0.0" +classifiers = [ + "Private :: Do Not Upload", +] + +[dependency-groups] +dev = [ + "apache-airflow-devel-common" +] +mypy = [ + "apache-airflow-devel-common[mypy]", +] + + +[build-system] +requires = [ + "hatchling==1.29.0", + "packaging==26.2", + "pathspec==1.1.1", + "pluggy==1.6.0", + "tomli==2.4.1; python_version < '3.11'", + "trove-classifiers==2026.4.28.13", +] +build-backend = "hatchling.build" + +[tool.hatch.build.targets.wheel] +packages = ["src/airflow_shared"] + +[tool.ruff] +extend = "../../pyproject.toml" +src = ["src"] + +[tool.ruff.lint.per-file-ignores] +# Ignore Doc rules et al for anything outside of tests +"!src/*" = ["D", "S101", "TRY002"] diff --git a/shared/exceptions/src/airflow_shared/exceptions/__init__.py b/shared/exceptions/src/airflow_shared/exceptions/__init__.py new file mode 100644 index 0000000000000..a5562845a7c84 --- /dev/null +++ b/shared/exceptions/src/airflow_shared/exceptions/__init__.py @@ -0,0 +1,21 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +__all__ = ["AirflowErrorCodeMixin"] + +from .common import AirflowErrorCodeMixin diff --git a/shared/exceptions/src/airflow_shared/exceptions/common.py b/shared/exceptions/src/airflow_shared/exceptions/common.py new file mode 100644 index 0000000000000..eecc443b3862f --- /dev/null +++ b/shared/exceptions/src/airflow_shared/exceptions/common.py @@ -0,0 +1,191 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +import ast +import re +from pathlib import Path +from typing import Any + + +class AirflowErrorCodeMixin: + """Mixin class providing ability to pass error_code to an exception.""" + + user_facing_error_message: str | None = None + first_steps: str | None = None + description: str | None = None + documentation: str | None = None + + @staticmethod + def get_error_code(class_name): + class_name = re.sub(r"^(Airflow)|Exception$", "", class_name) + parts = re.findall(r"[A-Z]+(?=[A-Z][a-z]|$)|[A-Z][a-z0-9]*", class_name) + return f"AERR-{'-'.join(part.upper() for part in parts)}" + + @property + def error_code(self) -> str: + class_name = self.__class__.__name__ + return self.get_error_code(class_name) + + def __str__(self): + base = super().__str__() + error_code_str = f"[{self.error_code}]" + return f"{error_code_str} {base}" if base else error_code_str + + def __repr__(self): + args_repr = ", ".join(repr(a) for a in self.args) + return f"{type(self).__name__}({args_repr}, error_code={self.error_code!r})" + + +def _get_literal_value(node: ast.expr) -> Any: + try: + return ast.literal_eval(node) + except ValueError: + return f"" + + +def _is_exception_subclass(inheritance_dict: dict, class_name: str, visited: set) -> bool: + """Return True if class is subclass of an exception class, False otherwise.""" + if "Exception" in class_name or "Error" in class_name: + return True + if class_name in visited: + return False + + visited.add(class_name) + parents = inheritance_dict.get(class_name, []) + for parent in parents: + if _is_exception_subclass(inheritance_dict, parent, visited): + return True + return False + + +def _has_error_code_mixin( + inheritance_dict: dict, imported_names: dict, class_name: str, visited: set[str] | None = None +) -> bool: + """Return True if class inherits from AirflowErrorCodeMixin, False otherwise.""" + if visited is None: + visited = set() + + # check for current class name + if class_name == "AirflowErrorCodeMixin" or class_name.endswith(".AirflowErrorCodeMixin"): + return True + resolved_current = imported_names.get(class_name, class_name) + if resolved_current == "AirflowErrorCodeMixin": + return True + + if class_name in visited: + return False + visited.add(class_name) + + # recursively check for parent class names + parents = inheritance_dict.get(class_name, []) + for parent in parents: + if parent == "AirflowErrorCodeMixin" or parent.endswith(".AirflowErrorCodeMixin"): + return True + if imported_names.get(parent) == "AirflowErrorCodeMixin": + return True + resolved_parent = imported_names.get(parent, parent) + if _has_error_code_mixin(inheritance_dict, imported_names, resolved_parent, visited): + return True + + return False + + +def extract_error_metadata(base_path: Path, error_meta_props: list[str]) -> list[dict[str, Any]]: + """Fetch exception classes using AirflowErrorCodeMixin, extract error meta properties and return as a dict.""" + error_meta_list = [] + file_paths = base_path.rglob("*.py") + for file_path in file_paths: + try: + with open(file_path, encoding="utf-8") as f: + root = ast.parse(f.read(), filename=str(file_path)) + except (SyntaxError, UnicodeDecodeError): + continue + + imported_names = {} + inheritance_map = {} + class_nodes: list[ast.ClassDef] = [] + + for node in ast.walk(root): + # fill imported names + if isinstance(node, ast.Import): + for alias in node.names: + imported_names[alias.asname or alias.name] = alias.name + elif isinstance(node, ast.ImportFrom): + for alias in node.names: + imported_names[alias.asname or alias.name] = alias.name + + # fill inheritance map + if isinstance(node, ast.ClassDef): + class_nodes.append(node) + bases = [] + for base in node.bases: + if isinstance(base, ast.Name): + bases.append(base.id) + elif isinstance(base, ast.Attribute) and isinstance(base.value, ast.Name): + bases.append(f"{base.value.id}.{base.attr}") + inheritance_map[node.name] = bases + + # fill error_meta_list based on available props + for node in class_nodes: + if _is_exception_subclass(inheritance_map, node.name, set()) and _has_error_code_mixin( + inheritance_map, imported_names, node.name + ): + error_meta = {} + for body_node in node.body: + if isinstance(body_node, ast.Assign): + # multiple targets possible for ast.Assign + for target in body_node.targets: + if isinstance(target, ast.Name) and target.id in error_meta_props: + error_meta[target.id] = _get_literal_value(body_node.value) + elif isinstance(body_node, ast.AnnAssign): + # only single target for ast.AnnAssign, always + if isinstance(body_node.target, ast.Name) and body_node.target.id in error_meta_props: + if body_node.value: + error_meta[body_node.target.id] = _get_literal_value(body_node.value) + error_meta["exception_type"] = node.name + error_meta["error_code"] = AirflowErrorCodeMixin.get_error_code(node.name) + error_meta_list.append(error_meta) + + return error_meta_list + + +def get_error_meta_list(paths: list[Path]): + """Return unique list of error code metadata extracted from relevant exception classes.""" + error_meta_dict = {} + required_props = [ + "user_facing_error_message", + "first_steps", + "description", + "documentation", + "exception_type", + "error_code", + ] + for class_path in paths: + error_meta_list = extract_error_metadata( + class_path, + required_props, + ) + # gather items into error meta dict with error code as key for uniqueness + for item in error_meta_list: + missing_props = set(required_props) - set(item) + if missing_props: + raise ValueError(f"Missing error meta properties {', '.join(missing_props)} in {item}") + error_code = item["error_code"] + error_meta_dict[error_code] = item + return list(error_meta_dict.values()) diff --git a/shared/exceptions/tests/conftest.py b/shared/exceptions/tests/conftest.py new file mode 100644 index 0000000000000..93aecf261843a --- /dev/null +++ b/shared/exceptions/tests/conftest.py @@ -0,0 +1,21 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +import os + +os.environ["_AIRFLOW__AS_LIBRARY"] = "true" diff --git a/shared/exceptions/tests/exceptions/__init__.py b/shared/exceptions/tests/exceptions/__init__.py new file mode 100644 index 0000000000000..13a83393a9124 --- /dev/null +++ b/shared/exceptions/tests/exceptions/__init__.py @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. diff --git a/shared/exceptions/tests/exceptions/test_error_mixin.py b/shared/exceptions/tests/exceptions/test_error_mixin.py new file mode 100644 index 0000000000000..e854262be4515 --- /dev/null +++ b/shared/exceptions/tests/exceptions/test_error_mixin.py @@ -0,0 +1,86 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +from airflow_shared.exceptions import AirflowErrorCodeMixin + + +class SampleErrorCaseException(AirflowErrorCodeMixin, Exception): + """Sample class 1 using AirflowErrorCodeMixin for unit tests.""" + + pass + + +class SampleHTTPErrorCaseException(AirflowErrorCodeMixin, Exception): + """Sample class 2 using AirflowErrorCodeMixin for unit tests.""" + + pass + + +class AirflowSampleAPIErrorCaseException(AirflowErrorCodeMixin, Exception): + """Sample class 2 using AirflowErrorCodeMixin for unit tests.""" + + pass + + +class AirflowSomeParameterInvalid(AirflowErrorCodeMixin, Exception): + """Sample class 3 using AirflowErrorCodeMixin for unit tests.""" + + pass + + +class TestAirflowErrorCodeMixin: + """Tests class for AirflowErrorCodeMixin.""" + + def test_str_with_both_code_and_message(self): + """Check exception string when both message and error code are present in exception.""" + + exc = SampleErrorCaseException("Provided cron expression is invalid") + assert str(exc) == "[AERR-SAMPLE-ERROR-CASE] Provided cron expression is invalid" + + def test_str_without_message(self): + """Check exception string when no exception message is passed.""" + + exc = SampleErrorCaseException() + assert str(exc) == "[AERR-SAMPLE-ERROR-CASE]" + + def test_str_with_message(self): + """Check exception string with exception message passed.""" + + exc = SampleErrorCaseException("This is a test message") + assert str(exc) == "[AERR-SAMPLE-ERROR-CASE] This is a test message" + + def test_repr_output(self): + """Check if repr correctly includes error code.""" + + exc = SampleErrorCaseException("fail") + assert repr(exc), "MyCustomException('fail', error_code=AERR-SAMPLE-ERROR-CASE)" + + def test_str_with_other_excs(self): + """Check exception string when both message and error code are present in exception.""" + + exc = SampleHTTPErrorCaseException("Some http exception") + assert exc.error_code == "AERR-SAMPLE-HTTP-ERROR-CASE" + assert str(exc) == "[AERR-SAMPLE-HTTP-ERROR-CASE] Some http exception" + + exc = AirflowSampleAPIErrorCaseException("Some api exception") + assert exc.error_code == "AERR-SAMPLE-API-ERROR-CASE" + assert str(exc) == "[AERR-SAMPLE-API-ERROR-CASE] Some api exception" + + exc = AirflowSomeParameterInvalid("Some parameter is invalid") + assert exc.error_code == "AERR-SOME-PARAMETER-INVALID" + assert str(exc) == "[AERR-SOME-PARAMETER-INVALID] Some parameter is invalid" diff --git a/task-sdk/pyproject.toml b/task-sdk/pyproject.toml index 89a17fb52c3c9..9de0cd2cf4fe5 100644 --- a/task-sdk/pyproject.toml +++ b/task-sdk/pyproject.toml @@ -148,6 +148,7 @@ path = "src/airflow/sdk/__init__.py" "../shared/plugins_manager/src/airflow_shared/plugins_manager" = "src/airflow/sdk/_shared/plugins_manager" "../shared/providers_discovery/src/airflow_shared/providers_discovery" = "src/airflow/sdk/_shared/providers_discovery" "../shared/template_rendering/src/airflow_shared/template_rendering" = "src/airflow/sdk/_shared/template_rendering" +"../shared/exceptions/src/airflow_shared/exceptions" = "src/airflow/sdk/_shared/exceptions" [tool.hatch.build.targets.wheel] packages = ["src/airflow"] @@ -310,6 +311,7 @@ tmp_path_retention_policy = "failed" shared_distributions = [ "apache-airflow-shared-configuration", "apache-airflow-shared-dagnode", + "apache-airflow-shared-exceptions", "apache-airflow-shared-listeners", "apache-airflow-shared-logging", "apache-airflow-shared-module-loading", diff --git a/task-sdk/src/airflow/sdk/_shared/exceptions b/task-sdk/src/airflow/sdk/_shared/exceptions new file mode 120000 index 0000000000000..2f584a357b67b --- /dev/null +++ b/task-sdk/src/airflow/sdk/_shared/exceptions @@ -0,0 +1 @@ +../../../../../shared/exceptions/src/airflow_shared/exceptions \ No newline at end of file diff --git a/task-sdk/src/airflow/sdk/exceptions.py b/task-sdk/src/airflow/sdk/exceptions.py index b0ff82be293e7..ae10df0552bcb 100644 --- a/task-sdk/src/airflow/sdk/exceptions.py +++ b/task-sdk/src/airflow/sdk/exceptions.py @@ -26,6 +26,9 @@ # Re exporting AirflowConfigException from shared configuration from airflow.sdk._shared.configuration.exceptions import AirflowConfigException as AirflowConfigException +# Using AirflowErrorCodeMixin from shared exceptions +from airflow.sdk._shared.exceptions import AirflowErrorCodeMixin + if TYPE_CHECKING: from collections.abc import Collection @@ -51,11 +54,32 @@ class AirflowOptionalProviderFeatureException(AirflowException): """Raise by providers when imports are missing for optional provider features.""" -class AirflowNotFoundException(AirflowException): +class AirflowNotFoundException(AirflowErrorCodeMixin, AirflowException): """Raise when the requested object/resource is not available in the system.""" status_code = HTTPStatus.NOT_FOUND + user_facing_error_message = "Requested resource was not found" + + description = ( + "This error occurs when Airflow is unable to locate a requested object " + "during resolution. The object may not exist, may have been removed, or " + "may not be accessible through the configured sources (such as metadata " + "database, configuration, or external providers). All available lookup " + "mechanisms were checked but none returned a matching result." + ) + + first_steps = ( + "Verify that the requested identifier is correct and exists. " + "Check for typos or mismatched names. Ensure the resource is defined " + "in the expected location (e.g., Airflow metadata database, configuration, " + "or any configured external providers). If applicable, confirm that " + "external systems are reachable and properly configured. " + "Review recent changes that may have removed or renamed the resource." + ) + + documentation = "https://airflow.apache.org/docs/apache-airflow/stable/" + class AirflowDagCycleException(AirflowException): """Raise when there is a cycle in Dag definition.""" @@ -69,9 +93,31 @@ def __init__(self, error: ErrorResponse): super().__init__(f"{error.error.value}: {error.detail}") -class AirflowTimetableInvalid(AirflowException): +class AirflowTimetableInvalid(AirflowErrorCodeMixin, AirflowException): """Raise when a DAG has an invalid timetable.""" + user_facing_error_message = "Invalid timetable configuration" + + description = ( + "This error occurs when Airflow is unable to use the provided timetable " + "definition for scheduling. The timetable may be malformed, incompatible " + "with the current context, or fail internal validation rules required for " + "generating valid schedules." + ) + + first_steps = ( + "Verify that the timetable definition is correct and follows the expected " + "interface or format. If using a custom timetable, ensure it implements all " + "required methods and returns valid schedule data. Check for recent changes " + "to DAG or Airflow version compatibility that may affect timetable behavior. " + "Review logs for specific validation errors that indicate which part of the " + "timetable is invalid." + ) + + documentation = ( + "https://airflow.apache.org/docs/apache-airflow/stable/authoring-and-scheduling/timetable.html" + ) + class ErrorType(enum.Enum): """Error types used in the API client.""" @@ -330,13 +376,54 @@ def __str__(self) -> str: return f"cannot add {self.task_id!r} to {self.new_group_id!r} (already in {existing_group})" -class TaskNotFound(AirflowException): +class TaskNotFound(AirflowErrorCodeMixin, AirflowException): """Raise when a Task is not available in the system.""" + user_facing_error_message = "Requested task was not found" + + description = ( + "This error occurs when Airflow is unable to locate a task with the " + "specified identifier in the current context. The task may not exist, " + "may have been removed or renamed, or may not be part of the active DAG " + "or execution scope being evaluated." + ) + + first_steps = ( + "Verify that the task_id is correct and exists in the expected DAG. " + "Check whether the task has been renamed, removed, or conditionally " + "excluded. Ensure you are referencing the correct DAG version and that " + "the DAG has been successfully parsed and is active. " + "If using dynamic task mapping or generated tasks, confirm that the " + "task instance was created as expected during DAG run expansion." + ) + + documentation = "https://airflow.apache.org/docs/apache-airflow/stable/core-concepts/tasks.html" + class NodeNotFound(TaskNotFound, KeyError): """Raise when attempting to access an invalid node (task or task group) using [] notation.""" + user_facing_error_message = "Requested Node was not found" + + description = ( + "This error occurs when Airflow attempts to evaluate, render, or traverse " + "a specific structural component (Node) inside a Directed Acyclic Graph " + "(Dag), but cannot locate it. It typically triggers within visual graph layouts, " + "TaskGroup rendering, or internal serialization code when an element id " + "fails to map to the established topology." + ) + + first_steps = ( + "Verify that the targeting identifier or key for the specific node is correctly " + "spelled. Check the visual graph layout in the Airflow UI to confirm the node is active " + "and rendered. Ensure that any parent-child relationships or upstream and downstream " + "dependencies are correctly linked. Inspect the Dag serialization and parsing status to " + "confirm the structure matches the backend metadata. Confirm that the underlying workflow " + "definitions have not dynamically filtered or omitted the node." + ) + + documentation = "https://airflow.apache.org/docs/apache-airflow/stable/core-concepts/tasks.html" + def __str__(self) -> str: return str(self.args[0]) if self.args else "" diff --git a/uv.lock b/uv.lock index 4e5e7a6f79f80..257e7159c0fdb 100644 --- a/uv.lock +++ b/uv.lock @@ -52,6 +52,7 @@ apache-airflow-providers-celery = false apache-airflow-providers-docker = false apache-airflow-providers-sendgrid = false apache-airflow-providers-common-ai = false +apache-airflow-shared-exceptions = false apache-airflow = false apache-airflow-shared-observability = false apache-airflow-dev = false @@ -273,6 +274,7 @@ members = [ "apache-airflow-scripts", "apache-airflow-shared-configuration", "apache-airflow-shared-dagnode", + "apache-airflow-shared-exceptions", "apache-airflow-shared-listeners", "apache-airflow-shared-logging", "apache-airflow-shared-module-loading", @@ -1427,6 +1429,7 @@ dev = [ { name = "apache-airflow-scripts" }, { name = "apache-airflow-shared-configuration" }, { name = "apache-airflow-shared-dagnode" }, + { name = "apache-airflow-shared-exceptions" }, { name = "apache-airflow-shared-listeners" }, { name = "apache-airflow-shared-logging" }, { name = "apache-airflow-shared-module-loading" }, @@ -1700,6 +1703,7 @@ dev = [ { name = "apache-airflow-scripts", editable = "scripts" }, { name = "apache-airflow-shared-configuration", editable = "shared/configuration" }, { name = "apache-airflow-shared-dagnode", editable = "shared/dagnode" }, + { name = "apache-airflow-shared-exceptions", editable = "shared/exceptions" }, { name = "apache-airflow-shared-listeners", editable = "shared/listeners" }, { name = "apache-airflow-shared-logging", editable = "shared/logging" }, { name = "apache-airflow-shared-module-loading", editable = "shared/module_loading" }, @@ -8103,6 +8107,25 @@ requires-dist = [{ name = "structlog", specifier = ">=25.4.0" }] dev = [{ name = "apache-airflow-devel-common", editable = "devel-common" }] mypy = [{ name = "apache-airflow-devel-common", extras = ["mypy"], editable = "devel-common" }] +[[package]] +name = "apache-airflow-shared-exceptions" +version = "0.0" +source = { editable = "shared/exceptions" } + +[package.dev-dependencies] +dev = [ + { name = "apache-airflow-devel-common" }, +] +mypy = [ + { name = "apache-airflow-devel-common", extra = ["mypy"] }, +] + +[package.metadata] + +[package.metadata.requires-dev] +dev = [{ name = "apache-airflow-devel-common", editable = "devel-common" }] +mypy = [{ name = "apache-airflow-devel-common", extras = ["mypy"], editable = "devel-common" }] + [[package]] name = "apache-airflow-shared-listeners" version = "0.0"