Prevent using trigger_rule=TriggerRule.ALWAYS in a task-generated mapping within mapped task groups #43368

Merged
shahar1 merged 1 commit into apache:main from prevent-trigger-rule-always-in-dtm
Nov 8, 2024

Contributor

@shahar1 shahar1 commented Oct 24, 2024

closes: #30334

I started to stabilize the dynamic task mapping area, and specifically its interactions with the different trigger rules.
In the case of TriggerRule.ALWAYS, it is currently impractical to use it along with dynamic task mapping. That is due to the fact that the expanded task is triggered immediately, while the expanded parameters from the upstream are still undefined, which will always result in the immediate failure of the task (upstream_failed).
I'm not entirely sure what the purpose of the original issue was in putting an "ALWAYS" task within a dynamic task group (they mentioned using a "watcher" pattern). However, I think that for now it makes sense to assume that upon the expansion of a task, its expanded parameter(s) should be well-defined.
If at some point we find a good reason to make this definition more flexible, I'm up for it, but in the current implementation it's rather a misuse that we should prevent.

I will cherry-pick into v2-10-test upon approval.
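
The failure mode described above can be sketched with a toy model. This is not Airflow's scheduler code: the names State, is_ready and try_expand are hypothetical and exist only for this illustration. The assumption is that an "always" task counts as ready regardless of upstream state, while the values it maps over only exist once the upstream has finished.

```python
from enum import Enum
from typing import List, Optional, Union


class State(Enum):
    PENDING = "pending"
    SUCCESS = "success"
    UPSTREAM_FAILED = "upstream_failed"


def is_ready(trigger_rule: str, upstream_state: State) -> bool:
    """Toy readiness check: 'always' ignores the upstream state entirely."""
    if trigger_rule == "always":
        return True
    # Simplified 'all_success': wait until the upstream has succeeded.
    return upstream_state is State.SUCCESS


def try_expand(
    trigger_rule: str,
    upstream_state: State,
    upstream_value: Optional[List[str]],
) -> Union[State, List[str]]:
    """Return the mapped inputs, PENDING while waiting, or UPSTREAM_FAILED."""
    if not is_ready(trigger_rule, upstream_state):
        return State.PENDING
    if upstream_value is None:
        # The upstream has not produced its result yet: nothing to map over.
        return State.UPSTREAM_FAILED
    return list(upstream_value)


# Upstream still running: 'always' fires at once and fails, the default waits.
always_result = try_expand("always", State.PENDING, None)
default_result = try_expand("all_success", State.PENDING, None)
# Upstream finished: the default rule expands over the real values.
finished_result = try_expand("all_success", State.SUCCESS, ["world", "moon"])
```

In this model the "always" task can never see the upstream values unless the upstream happens to finish first, which is why the combination is treated as a misuse here.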



@shahar1 shahar1 added type:bug-fix Changelog: Bug Fixes area:dynamic-task-mapping AIP-42 labels Oct 24, 2024
@shahar1 shahar1 added this to the Airflow 2.10.3 milestone Oct 24, 2024
@shahar1 shahar1 requested a review from potiuk as a code owner October 24, 2024 21:34
@shahar1 shahar1 force-pushed the prevent-trigger-rule-always-in-dtm branch from 1ef1aee to ada1edd Compare October 24, 2024 21:35
@shahar1 shahar1 changed the title Prevent using trigger_rule="always" in a dynamic mapped task Prevent using trigger_rule=TriggerRule.ALWAYS in a dynamic mapped task Oct 24, 2024
@potiuk
Member

potiuk commented Oct 24, 2024

cc: @uranusjr -> I think it will need your input.

@shahar1
Contributor Author

shahar1 commented Oct 24, 2024

I need some help with fixing the tests. They give strange errors when run all together:

FAILED tests/decorators/test_task_group.py::test_expand_create_mapped - RuntimeError: ('Cannot mix TaskGroups from different DAGs: %s and %s', <DAG: pipeline>, <DAG: test_executor>)
FAILED tests/decorators/test_task_group.py::test_expand_kwargs_create_mapped - RuntimeError: ('Cannot mix TaskGroups from different DAGs: %s and %s', <DAG: pipeline>, <DAG: test_executor>)
FAILED tests/decorators/test_task_group.py::test_task_group_expand_kwargs_with_upstream - airflow.exceptions.DuplicateTaskIdFound: Task id 'tg.t1' has already been added to the DAG
FAILED tests/decorators/test_task_group.py::test_task_group_expand_with_upstream - airflow.exceptions.DuplicateTaskIdFound: Task id 'tg.t1' has already been added to the DAG
FAILED tests/decorators/test_task_group.py::test_override_dag_default_args - RuntimeError: ('Cannot mix TaskGroups from different DAGs: %s and %s', <DAG: test_dag>, <DAG: test_executor>)
FAILED tests/decorators/test_task_group.py::test_override_dag_default_args_nested_tg - RuntimeError: ('Cannot mix TaskGroups from different DAGs: %s and %s', <DAG: test_dag>, <DAG: test_executor>)

Thanks in advance :)

Update: I've managed to fix it by moving the pytest.raises to just above tg.expand.

@shahar1 shahar1 force-pushed the prevent-trigger-rule-always-in-dtm branch 5 times, most recently from 1bd0fe9 to 22f6bd0 Compare October 25, 2024 09:30
@shahar1 shahar1 force-pushed the prevent-trigger-rule-always-in-dtm branch from 22f6bd0 to c753ca2 Compare October 25, 2024 10:48
@shahar1 shahar1 requested a review from ephraimbuddy October 25, 2024 13:05
Member

@potiuk potiuk left a comment


Nice! @uranusjr - can you take a look as well if that makes sense for you?

@potiuk
Member

potiuk commented Oct 31, 2024

Can you resolve conflicts @shahar1 and back-port it? We are going to have 2.10.3 RC2, so we might include it there.

@shahar1 shahar1 force-pushed the prevent-trigger-rule-always-in-dtm branch from c753ca2 to e057029 Compare November 7, 2024 18:29
@shahar1 shahar1 merged commit f6e0900 into apache:main Nov 8, 2024
53 checks passed
@shahar1 shahar1 deleted the prevent-trigger-rule-always-in-dtm branch November 8, 2024 06:48
@jedcunningham
Member

So it does work if you use a static list to expand:

    from airflow.decorators import task

    @task(trigger_rule="always")
    def hello(input):
        print(f"Hello, {input}")

    # Static (literal) list: the mapping is known at parse time.
    hello.expand(input=["world", "moon"])

Granted, not sure it matters. I question the value of "always" anyway - there is no point in having any upstream if it's just going to run right away anyways, right? Just use a root task...

@eladkal
Contributor

eladkal commented Dec 4, 2024

there is no point in having any upstream if it's just going to run right away anyways, right? Just use a root task...

It does matter if the upstream is a ShortCircuitOperator. A short circuit can cascade the skip signal through trigger rules.

@shahar1
Contributor Author

shahar1 commented Dec 6, 2024

So it does work if you use a static list to expand:

    from airflow.decorators import task

    @task(trigger_rule="always")
    def hello(input):
        print(f"Hello, {input}")

    # Static (literal) list: the mapping is known at parse time.
    hello.expand(input=["world", "moon"])

Granted, not sure it matters. I question the value of "always" anyway - there is no point in having any upstream if it's just going to run right away anyways, right? Just use a root task...

That's correct - it only applies to dynamic task mappings where the input value comes from another operator (that's the actual check that I added).
As Elad stated, there could be some cases where always might be useful. However, there's plenty of work to do in the Trigger Rule area, and I'll try to lead the efforts when I have the time.

@jedcunningham
Member

jedcunningham commented Dec 6, 2024

Sorry, I should have spent more time digging into this - I leaned too much on the title/newsfrag/docs changes.

I will say, however, that the docs/newsfragment on this are misleading/wrong. It does still work for static expansions. This also only blocks task group expansions: expanding bare tasks with dynamic input from another task still parses and ultimately fails when running:

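    from airflow.decorators import task

    @task
    def get_input():
        return ["world", "moon"]

    @task(trigger_rule="always")
    def hello(input):
        print(f"Hello, {input}")

    # Task-generated mapping on a bare task: parses, but fails at run time.
    hello.expand(input=get_input())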

Screenshot 2024-12-06 at 11 14 50 AM

I'd be in favor of removing this from 2.10.4 for now, then going in and blocking both bare tasks and task groups, and documenting it more accurately too. (To be clear, I'm just saying we pull it based on timing; if we can get it fixed before we merge the 2.10.4 changes, I'm completely happy with that too.)
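
One way to picture a check that covers both bare tasks and task groups is the sketch below. It is a plain-Python illustration, not the code merged in this PR and not Airflow's API: UpstreamOutput, TaskSpec, GroupSpec, validate_expand and is_rejected are all hypothetical names. The assumption is that static lists stay allowed and only task-generated inputs combined with "always" are rejected.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, List, Union


@dataclass
class UpstreamOutput:
    """Hypothetical stand-in for a value produced by another task at run time."""
    task_id: str


@dataclass
class TaskSpec:
    """Hypothetical stand-in for a bare task definition."""
    task_id: str
    trigger_rule: str = "all_success"


@dataclass
class GroupSpec:
    """Hypothetical stand-in for a task group holding several tasks."""
    group_id: str
    tasks: List[TaskSpec] = field(default_factory=list)


def uses_task_generated_mapping(expand_kwargs: Dict[str, Any]) -> bool:
    """True if any expanded argument comes from another task's output."""
    return any(isinstance(v, UpstreamOutput) for v in expand_kwargs.values())


def validate_expand(target: Union[TaskSpec, GroupSpec], **expand_kwargs: Any) -> None:
    """Reject 'always' only when the mapping is generated by another task."""
    if not uses_task_generated_mapping(expand_kwargs):
        return  # static lists are known at parse time, so they are accepted
    tasks = target.tasks if isinstance(target, GroupSpec) else [target]
    offenders = [t.task_id for t in tasks if t.trigger_rule == "always"]
    if offenders:
        raise ValueError(
            f"trigger_rule='always' with a task-generated mapping: {offenders}"
        )


def is_rejected(target: Union[TaskSpec, GroupSpec], **expand_kwargs: Any) -> bool:
    """Helper for the examples: report whether validation raises."""
    try:
        validate_expand(target, **expand_kwargs)
    except ValueError:
        return True
    return False


hello = TaskSpec("hello", trigger_rule="always")
group = GroupSpec("tg", [TaskSpec("t1"), hello])

static_ok = not is_rejected(hello, input=["world", "moon"])
bare_blocked = is_rejected(hello, input=UpstreamOutput("get_input"))
group_blocked = is_rejected(group, input=UpstreamOutput("get_input"))
```

Treating a bare task as a one-element group keeps a single code path for both cases, which is the gap pointed out in the comment above.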

@jedcunningham
Member

@eladkal I'm curious, can you provide a simple example for the short circuit scenario you are talking about? In my experimenting, that feels like a bug on short circuit, not a useful feature of always.

e.g. if I have this:

    from time import sleep

    from airflow.decorators import task

    @task.short_circuit()
    def short():
        sleep(20)  # widen the race window
        return False

    @task(trigger_rule="always")
    def hello(input):
        print(f"Hello, {input}")

    short() >> hello.expand(input=["world", "moon"])

(Note: I added a sleep to expand the race condition a bit to make it easier to see the problem.)

When it runs, the ALWAYS means those tasks run right away. So those hello tasks have run before the short task finishes:
Screenshot 2024-12-06 at 11 34 13 AM

Then, when short eventually finishes, it goes in and retroactively skips... but that's wrong, those have already run!
Screenshot 2024-12-06 at 11 36 24 AM
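
The ordering problem described in this comment can be replayed with a small toy timeline. This is a simplified model under stated assumptions, not Airflow code: simulate is a hypothetical function, one loop iteration stands for one second, and the short-circuit task is assumed to overwrite the downstream state with "skipped" when it returns False.

```python
from typing import List, Tuple


def simulate(short_duration: int) -> List[Tuple[int, str, str]]:
    """Return (tick, task_id, new_state) events for short() >> hello."""
    events: List[Tuple[int, str, str]] = []
    states = {"short": "running", "hello": "none"}
    for tick in range(short_duration + 1):
        # 'always' ignores the upstream state, so hello runs on the first tick.
        if states["hello"] == "none":
            states["hello"] = "success"
            events.append((tick, "hello", "success"))
        if tick == short_duration:
            # short finishes and returns False, so it skips its downstream,
            # even though the downstream task has already run.
            states["short"] = "success"
            events.append((tick, "short", "success"))
            states["hello"] = "skipped"
            events.append((tick, "hello", "skipped"))
    return events


events = simulate(short_duration=20)
for tick, task_id, state in events:
    print(f"t={tick:>2}s {task_id}: {state}")
```

In this model hello is recorded as successful at t=0 and then relabelled as skipped at t=20, which matches the sequence shown in the two screenshots.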

@eladkal
Contributor

eladkal commented Dec 6, 2024

Then, when short eventually finishes, it goes in and retroactively skips... but that's wrong, those have already run!

Then this is a bug. I assume you hit this because you set a direct dependency between them. I think that if there is a middle task between them, you won't hit this bug.

I do agree that the value of always is very minor. We can consider removing it for Airflow 3.

@shahar1 shahar1 changed the title Prevent using trigger_rule=TriggerRule.ALWAYS in a dynamic mapped task Prevent using trigger_rule=TriggerRule.ALWAYS in a task-generated mapping within mapped task Dec 6, 2024
@shahar1 shahar1 changed the title Prevent using trigger_rule=TriggerRule.ALWAYS in a task-generated mapping within mapped task Prevent using trigger_rule=TriggerRule.ALWAYS in a task-generated mapping within mapped task groups Dec 6, 2024
@shahar1 shahar1 added type:improvement Changelog: Improvements type:bug-fix Changelog: Bug Fixes kind:documentation and removed type:bug-fix Changelog: Bug Fixes kind:documentation type:improvement Changelog: Improvements labels Dec 6, 2024
Successfully merging this pull request may close these issues.

trigger_rule=TriggerRule.ALWAYS doesn't work properly with task_groups
5 participants