
"execute cannot be called outside TaskInstance" warning fired despite being in a TaskInstance #41470

Closed
SKalide opened this issue Aug 14, 2024 · 16 comments
Labels
area:core · area:providers · kind:bug (This is clearly a bug) · needs-triage (label for new issues that we didn't triage yet) · provider:google (Google (including GCP) related issues)

Comments

@SKalide

SKalide commented Aug 14, 2024

Apache Airflow version

2.9.3

If "Other Airflow 2 version" selected, which one?

No response

What happened?

BigQueryTableExistenceSensor fires an "execute cannot be called outside TaskInstance" warning on execute.

My observation: the ExecutorSafeguard is invoked twice for a single sensor execution.

The reason is that the first invocation comes through the TaskInstance's _execute_callable (as intended): the decorator checks that it is being executed within a TaskInstance and then calls the "real" execute function. That function is also decorated, so the check runs again, this time without a sentinel, which makes it fail and fires a warning (or raises an exception if allow_nested_operators is disabled).
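
To make the double check concrete, here is a minimal standalone sketch (not the actual Airflow code; the sentinel handling is simplified and the names are made up) showing how a decorator applied to both a base and a subclass execute() runs twice for one task run, with the inner call no longer seeing the sentinel:

from functools import wraps

_SENTINEL = object()


def safeguard(func):
    """Simplified stand-in for Airflow's ExecutorSafeguard decorator (illustration only)."""

    @wraps(func)
    def wrapper(self, *args, **kwargs):
        sentinel = kwargs.pop("sentinel", None)
        if sentinel is not _SENTINEL:
            print(f"WARNING: {type(self).__name__}.{func.__name__} called without sentinel")
        return func(self, *args, **kwargs)

    return wrapper


class Base:
    @safeguard
    def execute(self, context):
        return "done"


class Sensor(Base):
    @safeguard
    def execute(self, context):
        # super().execute is also wrapped, so the check runs a second time,
        # this time without the sentinel kwarg, and the warning fires.
        return super().execute(context)


# The TaskInstance passes the sentinel exactly once, to the outermost call:
Sensor().execute({}, sentinel=_SENTINEL)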

See also: #41426

An issue with the same problem had already been opened but was never resolved; see #39413.

What you think should happen instead?

The decorator should not fire a warning. Maybe it should check whether the decorator has already been invoked and has therefore already performed the check.

How to reproduce

from airflow import DAG
from airflow.providers.google.cloud.sensors.bigquery import BigQueryTableExistenceSensor
from airflow.utils.dates import days_ago
from airflow.models.baseoperator import BaseOperator


class LoggingBaseOperator(BaseOperator):
    def execute(self, context):
        self.log.info(f"Executing {self.__class__.__name__}")
        return super().execute(context)


class LoggingBigQueryTableExistenceSensor(
    BigQueryTableExistenceSensor, LoggingBaseOperator
):
    def poke(self, context):
        self.log.info(f"Poking {self.__class__.__name__}")
        return True  


dag = DAG(
    "test_bigquery_sensor_double_execution",
    default_args={
        "start_date": days_ago(1),
    },
    description="A simple DAG to test BigQueryTableExistenceSensor execution",
    schedule_interval=None,
)

sensor_task = LoggingBigQueryTableExistenceSensor(
    task_id="test_sensor",
    project_id="your-project-id",
    dataset_id="your-dataset-id",
    table_id="your-table-id",
    poke_interval=60,
    dag=dag,
)

log_bg.txt

Operating System

Ubuntu 22.04.4 LTS

Versions of Apache Airflow Providers

No response

Deployment

Docker-Compose

Deployment details

Docker image: apache/airflow:2.9.3-python3.11

Anything else?

No response

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct

  • I agree to follow this project's Code of Conduct

@SKalide added the area:core, kind:bug and needs-triage labels on Aug 14, 2024

boring-cyborg bot commented Aug 14, 2024

Thanks for opening your first issue here! Be sure to follow the issue template! If you are willing to raise PR to address this issue please do so, no need to wait for approval.

@dosubot bot added the provider:google label on Aug 14, 2024
@SKalide
Author

SKalide commented Aug 21, 2024

I've looked into this issue more deeply and I believe I can provide a solution. I could use some guidance on whether I should submit a PR for this.

The problem, in my opinion, is that after _execute_callable is called, it enters the ExecutorSafeguard wrapper. There it checks whether it's called inside a TaskInstance (hence the _execute_callable) and then calls the wrapped execute function itself. Because the execute method of BigQueryTableExistenceSensor is also decorated, it automatically re-enters the ExecutorSafeguard wrapper, but this time it appears as if it was called outside a TaskInstance, causing the check to fail. With allow_nested_operators, it only fires a warning instead of failing.

My proposed solution is to check if the ExecutorSafeguard has already been called in the current execution context. If so, we can skip the check for nested calls. This can be achieved by setting a flag using thread-local storage to ensure thread safety.

class ExecutorSafeguard:
    """
    The ExecutorSafeguard decorator.

    Checks if the execute method of an operator isn't manually called outside
    the TaskInstance as we want to avoid bad mixing between decorated and
    classic operators.
    """

    test_mode = conf.getboolean("core", "unit_test_mode")
    _local = local()

    @classmethod
    def decorator(cls, func):
        @wraps(func)
        def wrapper(self, *args, **kwargs):
            if (
                getattr(ExecutorSafeguard._local, "in_executor_safeguard", False)
                and self.allow_nested_operators
            ):
                # If already in ExecutorSafeguard, call execution function - recursive call
                return func(self, *args, **kwargs)
            ExecutorSafeguard._local.in_executor_safeguard = True

            try:
                from airflow.decorators.base import DecoratedOperator

                sentinel = kwargs.pop(f"{self.__class__.__name__}__sentinel", None)

                if (
                    not cls.test_mode
                    and sentinel != _sentinel
                    and not isinstance(self, DecoratedOperator)
                ):
                    message = (
                        f"{self.__class__.__name__}.{func.__name__} cannot be called outside TaskInstance!"
                    )
                    raise AirflowException(message)
                return func(self, *args, **kwargs)
            finally:
                ExecutorSafeguard._local.in_executor_safeguard = False

        return wrapper

This solution would suppress the warning for legitimate nested calls while still protecting against unintended external calls. The use of local() ensures that this check is thread-safe.
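
For readers unfamiliar with threading.local, a tiny standalone demo (not Airflow code) of the per-thread isolation the flag relies on:

from threading import Thread, local

_local = local()


def set_flag():
    # The attribute set here exists only in this worker thread.
    _local.in_executor_safeguard = True
    print("worker sees:", getattr(_local, "in_executor_safeguard", False))


t = Thread(target=set_flag)
t.start()
t.join()
# The main thread never sees the worker's flag.
print("main sees:", getattr(_local, "in_executor_safeguard", False))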

I'm willing to create a Pull Request with this solution if the maintainers think this approach is appropriate. What do you think about this solution? Are there any aspects I should consider or modify before proceeding with a PR?

@SKalide SKalide closed this as completed Aug 21, 2024
@SKalide SKalide reopened this Aug 21, 2024
@zachliu
Contributor

zachliu commented Oct 8, 2024

@SKalide i think you can simplify your example:

from airflow import DAG
from airflow.utils.dates import days_ago
from airflow.operators.bash import BashOperator


class MyBashOperator(BashOperator):
    def execute(self, context):
        self.log.info(f"Executing {self.__class__.__name__}")
        return super().execute(context)


dag = DAG(
    "test_bash_execution",
    default_args={
        "start_date": days_ago(1),
    },
    description="A simple DAG to test MyBashOperator execution",
    schedule_interval=None,
)

task = MyBashOperator(
    task_id="my_bash_operator",
    bash_command="ls -lah",
    dag=dag,
)

this produces the same warning message
by using this simpler example, i believe it's more convincing that this issue is "serious" 😁

@dabla
Contributor

dabla commented Oct 9, 2024

(quoting @zachliu's simplified MyBashOperator example above)

I think I understand the issue behind the warning, and some refactoring will be needed to avoid it when you extend an existing operator, as that should still be allowed. I will have to create a test case for it and then check how we can prevent the warning in that case. The warning is there to prevent people from calling other operators from Python operators, but in your example that's not the case, since it's inheritance.
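
A sketch of the contrast (the @task function below is hypothetical and requires an Airflow 2.x environment): the safeguard targets manual execute() calls made from inside another task's callable, while subclassing and delegating to super() is plain inheritance, as in @zachliu's example.

from airflow.decorators import task
from airflow.operators.bash import BashOperator


# The anti-pattern the safeguard is meant to catch: manually invoking another
# operator's execute() from inside a task callable, outside the TaskInstance
# machinery, so no sentinel is passed and the warning (or exception) triggers.
@task
def bad_nested_call():
    BashOperator(task_id="nested", bash_command="echo hi").execute(context={})


# The inheritance pattern from this issue, which should remain allowed:
class MyBashOperator(BashOperator):
    def execute(self, context):
        self.log.info(f"Executing {self.__class__.__name__}")
        return super().execute(context)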

@dabla
Contributor

dabla commented Oct 9, 2024

(quoting @SKalide's Aug 21 comment with the proposed thread-local ExecutorSafeguard fix)

I like the approach. You can open a PR and add your example as a test case; that way we are sure this case is not only fixed but also protected against regression.

@dabla
Contributor

dabla commented Oct 9, 2024

class LoggingBaseOperator(BaseOperator):
    def execute(self, context):
        self.log.info(f"Executing {self.__class__.__name__}")
        return super().execute(context)

I've been able to reproduce the issue in a test case and will open a PR to fix it; I will try the solution proposed by @SKalide.

@dabla
Contributor

dabla commented Oct 9, 2024

(quoting @SKalide's Aug 21 comment with the proposed thread-local ExecutorSafeguard fix)

The proposed solution above doesn't work for all test cases, especially the mixed case where a task-decorated function calls execute methods on a native operator. The final working solution was inspired by it, and it should now also support the case you presented.

@dabla
Contributor

dabla commented Oct 9, 2024

I've created a PR which should fix this issue.

@dabla
Contributor

dabla commented Oct 9, 2024

The build of the PR passed so once it's merged I think we can close this issue.

@SKalide SKalide closed this as completed Oct 19, 2024
@sam-gen-cop

sam-gen-cop commented Oct 31, 2024

Hi @dabla, I'm using AWS MWAA 2.10.1 and I still get the same warning. Just wondering whether the issue is fixed in 2.10.1 or only in 2.9.3

Script,

from airflow import DAG
from airflow.utils.dates import days_ago
from airflow.operators.bash import BashOperator
from airflow.utils.log.logging_mixin import LoggingMixin

class MyBashOperator(BashOperator, LoggingMixin):
    def execute(self, context):
        self.log.info("Executing MyBashOperator directly in the DAG definition")
        return super().execute(context)

dag = DAG(
    "test_bash_execution",
    default_args={
        "start_date": days_ago(1),
    },
    schedule_interval=None,
)

# Instantiate the operator
my_bash_task = MyBashOperator(
    task_id="my_bash_operator",
    bash_command="echo 'Testing direct execution'",
    dag=dag,
)

Logs: (screenshot of the task log attached)

@dabla
Contributor

dabla commented Oct 31, 2024

(quoting @sam-gen-cop's MWAA 2.10.1 report above)

The fix is in neither Airflow 2.10.1 nor 2.10.2; I just checked the source code. I suppose it will be in 2.10.3, @potiuk @uranusjr?

Hmm, it seems I can't find it in rc1 of 2.10.3?

@dabla
Contributor

dabla commented Nov 6, 2024

(quoting @sam-gen-cop's MWAA 2.10.1 report above)

@sam-gen-cop The fix is now available in Airflow 2.10.3

@npatel44

npatel44 commented Jan 15, 2025

This warning still happens for custom operators written on top of provider operators (multiple levels of nesting). The logic of this PR should work for any level of nesting, so I'm not sure what the issue is, but I still see it with Airflow 2.10.2 and 2.10.4.
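
For context, a sketch (hypothetical class names, not taken from the report) of the multi-level nesting described here, where each level overrides execute() and delegates to super(), so the safeguard check runs once per level:

from airflow.providers.google.cloud.sensors.bigquery import BigQueryTableExistenceSensor


class CompanyBigQuerySensor(BigQueryTableExistenceSensor):
    def execute(self, context):
        self.log.info("company-level wrapper")
        return super().execute(context)


class TeamBigQuerySensor(CompanyBigQuerySensor):
    def execute(self, context):
        self.log.info("team-level wrapper")
        return super().execute(context)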

@potiuk
Member

potiuk commented Jan 16, 2025

(quoting @npatel44's comment above)

Can you open a new issue for that with the details of your case, please? Or even better - contribute a fix; following the one here should be relatively simple, so maybe you or your company would like to contribute it? Just to put it in perspective - this is an open-source project that a lot of people contribute to, so when you create such an issue, you will have to wait until someone picks it up and fixes it. The most certain way is to implement and contribute a fix yourself. The second best is to find someone who can do it - maybe even paying them. The third best is to open a detailed issue that makes it easy for anyone to reproduce; we can then mark it as "good first issue", for example, and hopefully sooner or later one of the contributors will fix it.

Following one of those routes is the best way to help solve your issue, @npatel44.

@dabla
Contributor

dabla commented Jan 16, 2025

I'm wondering whether there wasn't an additional fix for this issue that only made it into the main/Airflow 3 codebase, but I could be mistaken, of course.

@adam-aczel-towa

adam-aczel-towa commented Feb 4, 2025

I'm getting the same warning for a ShortCircuitOperator in Version 2.10.4
{baseoperator.py:421} WARNING - ShortCircuitOperator.execute cannot be called outside TaskInstance!

The DAG configuration looks like this:

  # Create Airflow Operators for each task dynamically
  task_objects = {}
  for task_id, python_callable in tasks:
      if task_id.startswith('get_'):
          task_objects[task_id] = ShortCircuitOperator(
              task_id=task_id,
              python_callable=python_callable,
              op_kwargs={
                  'credentials': credentials
              },
              dag=dag,
          )
      else:
          task_objects[task_id] = PythonOperator(
              task_id=task_id,
              python_callable=python_callable,
              op_kwargs={
                  'credentials': credentials
              },
              dag=dag,
          )
