Skip to content

Commit

Permalink
Trying out conditional version of slack notification on failure
Browse files Browse the repository at this point in the history
  • Loading branch information
cpcundill committed Dec 10, 2024
1 parent 01e44e3 commit c843784
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 27 deletions.
32 changes: 5 additions & 27 deletions dags/collection_generator.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,14 @@
EcsRegisterTaskDefinitionOperator,
EcsRunTaskOperator,
)
from airflow.providers.slack.notifications.slack import send_slack_notification

from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
from airflow.models.param import Param
from airflow.models import Variable
from airflow.models.connection import Connection

from utils import dag_default_args, get_config, load_specification_datasets, setup_configure_dag_callable
from utils import dag_default_args, get_config, load_specification_datasets, setup_configure_dag_callable, send_slack_notification

# read config from file and environment
config = get_config()
Expand Down Expand Up @@ -102,28 +102,13 @@

configure_dag_task >> collection_ecs_task


slack_default_connection = Connection(
conn_id="slack_api_default",
conn_type="slack",
password=Variable.get("slack-api-key"),
extra={
# Specify extra parameters here
"timeout": "42",
},
)

with DAG(
"slack-notifcation-tester",
is_paused_upon_creation=False,
schedule=None,
on_success_callback=[
send_slack_notification(
text="The DAG {{ dag.dag_id }} succeeded",
channel="#dl-developers",
username="Airflow",
)
],
on_failure_callback=[
send_slack_notification(config, "The DAG {{ dag.dag_id }} failed")
]
) as dag:
configure_dag_task = PythonOperator(
task_id="configure-dag",
Expand All @@ -132,12 +117,5 @@
)
BashOperator(
task_id="mytask",
on_failure_callback=[
send_slack_notification(
text="The task {{ ti.task_id }} failed",
channel="#planning-data-platform",
username="Airflow",
)
],
bash_command="fail",
)
11 changes: 11 additions & 0 deletions dags/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
from datetime import datetime, timedelta
from pathlib import Path

from airflow.providers.slack.notifications.slack import send_slack_notification

import boto3
import logging

Expand Down Expand Up @@ -116,3 +118,12 @@ def configure_dag(**kwargs):
ti.xcom_push(key='incremental-loading-override', value=incremental_loading_override)

return configure_dag


def send_slack_notification(config, message):
if config['env'] == 'production':
send_slack_notification(
text=message,
channel="#planning-data-platform",
username="Airflow"
)

0 comments on commit c843784

Please sign in to comment.