Provide an alternative OpenTelemetry implementation for traces that follows standard otel practices #43941

Status: Open. Wants to merge 54 commits into base: main.
Showing changes from 22 commits.

Commits (54):
e1f721d
trace and span creation with context propagation
xBis7 Nov 12, 2024
fdb92df
cleanup
xBis7 Nov 12, 2024
4b3c93e
revert the metrics name length change
xBis7 Nov 14, 2024
939511b
cleanup
xBis7 Nov 15, 2024
bae8bca
move the active span dictionaries to the scheduler class
xBis7 Nov 16, 2024
c841f17
fix variable name + comment
xBis7 Nov 16, 2024
0266f27
initial impl: handling scheduler ha
xBis7 Nov 26, 2024
9af9c6b
fix span timings
xBis7 Nov 26, 2024
526d50e
refactor test_otel.py
xBis7 Nov 26, 2024
07076ee
handle scheduler forceful exit
xBis7 Dec 2, 2024
cf290d6
cleanup old dagrun and ti spans
xBis7 Dec 2, 2024
691865b
remove CTX_PROP_SUFFIX
xBis7 Dec 2, 2024
87d6efb
cleaning up comments
xBis7 Dec 2, 2024
8329733
remove context propagation config flag
xBis7 Dec 2, 2024
2af6df5
resolve conflicts and finish merging with main
xBis7 Dec 2, 2024
2ad643b
add attributes while cleaning up ended spans
xBis7 Dec 2, 2024
03e1383
fix integration and system test failures
xBis7 Dec 3, 2024
2042334
fix unit test failures
xBis7 Dec 4, 2024
5178e14
cleanup
xBis7 Dec 4, 2024
8f8d212
Merge remote-tracking branch 'origin/main' into ctx_prop_final
xBis7 Dec 4, 2024
ea3fbc1
fix sqlite migration failure
xBis7 Dec 4, 2024
ce0c356
Merge branch 'main' into ctx_prop_final
xBis7 Dec 4, 2024
e3be7e3
task output not captured by tests - fixed
xBis7 Dec 13, 2024
975f533
cleanup inaccurate comment
xBis7 Dec 13, 2024
e7c4cfd
use one common dictionary for active spans
xBis7 Dec 13, 2024
48928f9
rephrase comment about otel spans
xBis7 Dec 13, 2024
3943ffc
scheduler cleanup
xBis7 Dec 13, 2024
7f122f6
scheduler unit tests
xBis7 Dec 15, 2024
8511bd7
revert changes in old migration file
xBis7 Dec 15, 2024
08de46f
move passing reference to active_spans dict out of the scheduler loop
xBis7 Dec 15, 2024
ec32152
make set_dagrun_span_attrs not a static method
xBis7 Dec 15, 2024
5a234be
remove set methods from dagrun and ti
xBis7 Dec 15, 2024
54161f9
finish merging with main
xBis7 Dec 16, 2024
b3f544a
add a migration file
xBis7 Dec 16, 2024
0c5e45d
fix failing tests
xBis7 Dec 16, 2024
db481a1
fix discrepancies between model and migration file
xBis7 Dec 16, 2024
6744115
fix include_dag_run on ti refresh_from_db
xBis7 Dec 17, 2024
8434b02
fix issue with recreated spans + fix integration tests
xBis7 Dec 18, 2024
e96806b
resolve conflicts with main
xBis7 Dec 18, 2024
b562d87
trigger er diagram generation
xBis7 Dec 18, 2024
1d19534
trigger er diagram generation
xBis7 Dec 18, 2024
67e85af
refactor dagrun to improve readability
xBis7 Dec 18, 2024
82c159b
unit tests for dag_run span changes
xBis7 Dec 18, 2024
537dec0
fix refresh issue with loading dag_run when task_instance isn't bound…
xBis7 Dec 18, 2024
48374d5
resolve merge conflicts + new migration file
xBis7 Dec 30, 2024
11d5a6c
fix test_scheduler_job failures after merge
xBis7 Dec 31, 2024
09ed2e9
fix test_dagrun failures after merge
xBis7 Dec 31, 2024
cbfd68a
resolve conflicts after merge with main
xBis7 Jan 15, 2025
c5a71b6
fix test failures after merge
xBis7 Jan 15, 2025
6d5d361
fix test_exceptions.py
xBis7 Jan 15, 2025
cdc760d
merge with main + resolve conflicts
xBis7 Jan 17, 2025
cb96bf7
trigger ER diagram creation
xBis7 Jan 17, 2025
86b3cc4
trigger ER diagram creation
xBis7 Jan 17, 2025
4004ca5
trigger ER diagram creation
xBis7 Jan 17, 2025
4 changes: 4 additions & 0 deletions airflow/cli/cli_config.py
@@ -551,6 +551,9 @@ def string_lower_type(val):
    action="store_true",
)
ARG_RAW = Arg(("-r", "--raw"), argparse.SUPPRESS, "store_true")
ARG_CARRIER = Arg(
    ("-c", "--carrier"), help="Context Carrier, containing the injected context for the task span", nargs="?"
)
ARG_IGNORE_ALL_DEPENDENCIES = Arg(
    ("-A", "--ignore-all-dependencies"),
    help="Ignores all non-critical dependencies, including ignore_ti_state and ignore_task_deps",
@@ -1311,6 +1314,7 @@ class GroupCommand(NamedTuple):
ARG_CFG_PATH,
ARG_LOCAL,
ARG_RAW,
ARG_CARRIER,
ARG_IGNORE_ALL_DEPENDENCIES,
ARG_IGNORE_DEPENDENCIES,
ARG_DEPENDS_ON_PAST,
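
A minimal sketch of how a `--carrier` flag declared with `nargs="?"` behaves, using plain `argparse` rather than Airflow's `Arg` wrapper. `build_parser` and `parse_carrier` are hypothetical names used only for illustration; decoding with `json.loads` mirrors what this PR does in `task_run`.

```python
import argparse
import json
from typing import Optional


def build_parser() -> argparse.ArgumentParser:
    """Build a stand-in parser with only the --carrier option (hypothetical helper)."""
    parser = argparse.ArgumentParser(prog="task-run-sketch")
    parser.add_argument(
        "-c",
        "--carrier",
        nargs="?",  # the value is optional; a bare flag yields None
        help="Context Carrier, containing the injected context for the task span",
    )
    return parser


def parse_carrier(argv: list) -> Optional[dict]:
    """Return the decoded carrier dict, or None when no carrier value was passed."""
    args = build_parser().parse_args(argv)
    if args.carrier is None:
        return None
    # The value arrives as a JSON string and has to be converted back to a dict.
    return json.loads(args.carrier)
```

With `nargs="?"`, both an absent flag and a bare `--carrier` leave `args.carrier` as `None`, so the `is not None` check on the consuming side covers both cases.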
8 changes: 8 additions & 0 deletions airflow/cli/commands/remote_commands/task_command.py
@@ -66,6 +66,7 @@
from airflow.utils.net import get_hostname
from airflow.utils.providers_configuration_loader import providers_configuration_loaded
from airflow.utils.session import NEW_SESSION, create_session, provide_session
from airflow.utils.span_status import SpanStatus
from airflow.utils.state import DagRunState
from airflow.utils.task_instance_session import set_current_task_instance_session
from airflow.utils.types import DagRunTriggeredByType
@@ -462,6 +463,13 @@ def task_run(args, dag: DAG | None = None) -> TaskReturnCode | None:

    log.info("Running %s on host %s", ti, hostname)

    if args.carrier is not None:
        log.info("Found args.carrier: %s. Setting the value on the ti instance.", args.carrier)
        # The arg value is a dict string, and it needs to be converted back to a dict.
        carrier_dict = json.loads(args.carrier)
        ti.set_context_carrier(context_carrier=carrier_dict, with_commit=True)
        ti.set_span_status(status=SpanStatus.ACTIVE, with_commit=True)

    task_return_code = None
    try:
        if args.interactive:
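
For context on what the decoded carrier may contain: assuming the W3C Trace Context propagator is in use (an assumption, since the diff does not show which propagator `Trace.inject()` relies on), the carrier is a small dict with a `traceparent` entry of the form `version-traceid-spanid-flags`. The sketch below builds and reads such a carrier with the standard library only. `make_carrier`, `read_carrier`, and `through_cli` are hypothetical helpers, not Airflow or OpenTelemetry APIs.

```python
import json
import re
from typing import Optional, Tuple

# W3C traceparent: 2-hex version, 32-hex trace id, 16-hex parent span id, 2-hex flags.
TRACEPARENT_RE = re.compile(r"^00-([0-9a-f]{32})-([0-9a-f]{16})-([0-9a-f]{2})$")


def make_carrier(trace_id: int, span_id: int, sampled: bool = True) -> dict:
    """Build a carrier dict holding a version-00 traceparent header (hypothetical helper)."""
    flags = "01" if sampled else "00"
    return {"traceparent": f"00-{trace_id:032x}-{span_id:016x}-{flags}"}


def read_carrier(carrier: dict) -> Optional[Tuple[int, int, bool]]:
    """Return (trace_id, span_id, sampled), or None if the carrier has no valid traceparent."""
    match = TRACEPARENT_RE.match(carrier.get("traceparent", ""))
    if match is None:
        return None
    trace_hex, span_hex, flags_hex = match.groups()
    sampled = bool(int(flags_hex, 16) & 0x01)  # lowest bit is the "sampled" flag
    return int(trace_hex, 16), int(span_hex, 16), sampled


def through_cli(carrier: dict) -> dict:
    """Simulate the CLI hop: the carrier is serialized to a JSON string, then decoded."""
    return json.loads(json.dumps(carrier))
```

Because the carrier only holds strings, the JSON round trip through the command line is lossless, which is what lets the task process continue the same trace.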
37 changes: 37 additions & 0 deletions airflow/executors/base_executor.py
@@ -18,6 +18,7 @@

from __future__ import annotations

import json
import logging
import sys
from collections import defaultdict, deque
@@ -39,6 +40,7 @@
from airflow.traces.utils import gen_span_id_from_ti_key, gen_trace_id
from airflow.utils.log.logging_mixin import LoggingMixin
from airflow.utils.state import TaskInstanceState
from airflow.utils.thread_safe_dict import ThreadSafeDict

PARALLELISM: int = conf.getint("core", "PARALLELISM")

@@ -115,6 +117,8 @@ class BaseExecutor(LoggingMixin):
    :param parallelism: how many jobs should run at one time. Set to ``0`` for infinity.
    """

    active_spans = ThreadSafeDict()

    supports_ad_hoc_ti_run: bool = False
    supports_sentry: bool = False

@@ -150,6 +154,10 @@ def __init__(self, parallelism: int = PARALLELISM):
    def __repr__(self):
        return f"{self.__class__.__name__}(parallelism={self.parallelism})"

    @classmethod
    def set_active_spans(cls, active_spans: ThreadSafeDict):
        cls.active_spans = active_spans

    def start(self):  # pragma: no cover
        """Executors may need to get things started."""
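
The `ThreadSafeDict` class itself is not part of the hunks shown here; only its `get(key)` and `set(key, value)` calls are visible. A minimal sketch of what such a wrapper could look like, assuming a single lock around a plain dict, follows. The class name `ThreadSafeDictSketch` and the `delete` and `__len__` methods are assumptions made for illustration, not the PR's implementation.

```python
import threading
from typing import Any, Hashable, Optional


class ThreadSafeDictSketch:
    """A dict guarded by one lock (illustrative stand-in, not Airflow's class)."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._data: dict = {}

    def set(self, key: Hashable, value: Any) -> None:
        with self._lock:
            self._data[key] = value

    def get(self, key: Hashable, default: Optional[Any] = None) -> Any:
        with self._lock:
            return self._data.get(key, default)

    def delete(self, key: Hashable) -> None:
        # Assumed method: remove the entry if present, ignore missing keys.
        with self._lock:
            self._data.pop(key, None)

    def __len__(self) -> int:
        with self._lock:
            return len(self._data)
```

Note that `active_spans` is a class attribute on `BaseExecutor`, so `set_active_spans` rebinds it for every executor instance at once; that is how the scheduler can share one span registry with its executors.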

@@ -337,6 +345,35 @@ def trigger_tasks(self, open_slots: int) -> None:
        for _ in range(min((open_slots, len(self.queued_tasks)))):
            key, (command, _, queue, ti) = sorted_queue.pop(0)

            # If it's None, then the span for the current TaskInstanceKey hasn't been started.
            if self.active_spans is not None and self.active_spans.get(key) is None:
                from airflow.models.taskinstance import SimpleTaskInstance

                if isinstance(ti, SimpleTaskInstance):
                    parent_context = Trace.extract(ti.parent_context_carrier)
                else:
                    parent_context = Trace.extract(ti.dag_run.context_carrier)
                # Start a new span using the context from the parent.
                # Attributes will be set once the task has finished so that all
                # values will be available (end_time, duration, etc.).
                span = Trace.start_child_span(
                    span_name=f"{ti.task_id}",
                    parent_context=parent_context,
                    component="task",
                    start_time=ti.queued_dttm,
                    start_as_current=False,
                )
                self.active_spans.set(key, span)
                # Inject the current context into the carrier.
                carrier = Trace.inject()
                # The carrier needs to be set on the ti, but it can't happen here because db calls are expensive.
                # By the time the db update has finished, another heartbeat will have started
                # and the tasks will have been triggered again.
                # So set the carrier as an argument to the command.
                # The command execution will set it on the ti, and it will be propagated to the task itself.
                command.append("--carrier")
                command.append(json.dumps(carrier))

            # If a task makes it here but is still understood by the executor
            # to be running, it generally means that the task has been killed
            # externally and not yet been marked as failed.
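
The guard on `active_spans` in `trigger_tasks` means a span is started, and a carrier appended to the command, at most once per task instance key, even if the same task is seen again on a later heartbeat. A minimal sketch of that pattern with stand-ins: `attach_carrier` is a hypothetical function, a plain dict replaces `ThreadSafeDict`, and `start_span` is an assumed callback that returns a dict with a `traceparent` string instead of a real OpenTelemetry span.

```python
from __future__ import annotations

import json
from typing import Callable, Hashable


def attach_carrier(
    command: list[str],
    key: Hashable,
    active_spans: dict,
    start_span: Callable[[Hashable], dict],
) -> bool:
    """Start a span for `key` if none is active and append its carrier to `command`.

    Returns True when a new span was started, False when one already existed.
    """
    if active_spans.get(key) is not None:
        # A span for this key is already active; leave the command untouched.
        return False
    span = start_span(key)
    active_spans[key] = span
    # Stand-in for Trace.inject(): copy the span's context into a string-only dict.
    carrier = {"traceparent": span["traceparent"]}
    # Pass the carrier through the CLI rather than writing it to the database here.
    command.append("--carrier")
    command.append(json.dumps(carrier))
    return True
```

Passing the carrier as a command argument keeps the scheduler loop free of an extra database write; the task process persists it when it starts, as the `task_run` change in this PR does.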