
Commit 22fe3a7

modified DAGs and rearranged import modules and functions
1 parent 4d21f43 commit 22fe3a7

File tree

4 files changed: +82 -94 lines changed


airflow/HomePlus_dag.py (+4 -52)

@@ -1,29 +1,5 @@
-from airflow.operators.python import PythonOperator, get_current_context
-from airflow.exceptions import AirflowSkipException
-from airflow import DAG
-from airflow.decorators import task
-from dotenv import load_dotenv
-import os
-from airflow.operators.bash import BashOperator
-from airflow.operators.email import EmailOperator
-from datetime import datetime, timedelta
-import json, requests
-
-# HTTP POST request function
-def send_post_request(categoryId):
-    load_dotenv()
-    url = os.getenv("HOMEPLUS_SERVICE_URL")
-    headers = {"Content-Type": "application/json"}
-    data = json.dumps({"category_id": categoryId})
-    response = requests.post(url, headers=headers, data=data)
-
-    if response.status_code == 200:
-        print(f"Success: {response.json()}")
-    elif "500 Internal Server Error" in response.text:
-        context = get_current_context()
-        raise AirflowSkipException(f"Skip task {context['ti'].task_id}")
-    else:
-        response.raise_for_status()
+from modules import collect_task_results, send_post_request
+from packages import *
 
 # DAG default settings
 default_args = {
@@ -59,35 +35,11 @@ def generate_queue_values():
 # Use the values received through expand in the BashOperator
 run_consumer_task = BashOperator.partial(
     task_id="run-consumer-task",
-    bash_command="python3 /home/patturning1/HomePlus_consumer.py {{ params.consumer }}",
+    bash_command="python3 /home/patturning2/HomePlus_consumer.py {{ params.consumer }}",
 ).expand(params=generate_queue_values())
 
 category_ids = list(range(100001, 100078))
 
-# Collect the state of every task and store the result in XCom
-def collect_task_results(**context):
-    task_instances = context['dag_run'].get_task_instances()
-    task_states = {task_instance.task_id: task_instance.state for task_instance in task_instances}
-
-    # Check whether any task failed
-    if any(state == 'failed' for state in task_states.values()):
-        email_subject = "Task Failure Alert"
-        email_body = f"""
-        <h3>One or more tasks have failed!</h3>
-        <p>Task States:</p>
-        <pre>{task_states}</pre>
-        """
-    else:
-        email_subject = "Task Success Alert"
-        email_body = f"""
-        <h3>All tasks have completed successfully!</h3>
-        <p>Task States:</p>
-        <pre>{task_states}</pre>
-        """
-
-    # Store the result in XCom
-    context['ti'].xcom_push(key='email_subject', value=email_subject)
-    context['ti'].xcom_push(key='email_body', value=email_body)
 
 # Collect the task states after all tasks complete
 collect_task_results_task = PythonOperator(
@@ -100,7 +52,7 @@ def collect_task_results(**context):
 # Send the summary email
 send_summary_email = EmailOperator(
     task_id="send_summary_email",
-    to="patturning1@gmail.com",
+    to="patturning2@gmail.com",
     subject="{{ task_instance.xcom_pull(task_ids='collect_task_results', key='email_subject') }}",
     html_content="{{ task_instance.xcom_pull(task_ids='collect_task_results', key='email_body') }}",
     trigger_rule="all_done",
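
Both DAGs lean on Airflow's dynamic task mapping: BashOperator.partial() pins the shared arguments and .expand() spawns one mapped task instance per element returned by generate_queue_values. A minimal sketch of the pattern, assuming Airflow 2.4+ and a generate_queue_values task that returns one {"consumer": ...} dict per mapped instance (that function's body is not shown in this diff, so the return shape is an assumption):

from airflow import DAG
from airflow.decorators import task
from airflow.operators.bash import BashOperator
from datetime import datetime

with DAG(
    dag_id="mapping_sketch",  # hypothetical id, for illustration only
    start_date=datetime(2024, 1, 1),
    schedule=None,
    catchup=False,
):
    @task
    def generate_queue_values():
        # Assumed return shape: one params dict per mapped task instance.
        return [{"consumer": str(i)} for i in range(3)]

    # partial() fixes the common arguments; expand() creates one mapped
    # task instance per element of the returned list at run time.
    BashOperator.partial(
        task_id="run-consumer-task",
        bash_command="echo {{ params.consumer }}",  # placeholder command
    ).expand(params=generate_queue_values())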

airflow/Oasis_dag.py (+27 -42)

@@ -1,45 +1,12 @@
-from airflow import DAG
-from airflow.decorators import task
-from dotenv import load_dotenv
-import os
-from airflow.operators.bash import BashOperator
-from airflow.exceptions import AirflowSkipException
-from airflow.operators.python import get_current_context
-from datetime import datetime, timedelta
-import json, requests
-
-
-# Function definition: HTTP POST request
-def send_post_request():
-    load_dotenv()
-    url = os.getenv("OASIS_SERVICE_URL")
-
-    response = requests.post(url)
-
-    # Log the response code and body
-    if response.status_code == 200:
-        print(f"Success: {response.json()}")
-    elif "500 Internal Server Error" in response.text:
-        context = get_current_context()
-        print(
-            f"Failed: {response.status_code}, NO PRODUCT EXISTS, detail shows below\n{response.text}"
-        )
-        raise AirflowSkipException(f"Skip task {context['ti'].task_id}")
-    else:
-        print(f"Failed: {response.status_code}, {response.text}")
-        response.raise_for_status()
-
+from modules import collect_task_results, send_post_request
+from packages import *
 
 # DAG default settings
 default_args = {
-    "owner": "khuda",  # DAG owner
-    "depends_on_past": False,  # do not depend on failures of past runs
-    # 'email': ['[email protected]'],  # recipient email
-    # "email_on_success": True,  # send email on success
-    # 'email_on_failure': True,  # send email on failure
-    # 'email_on_retry': True,  # send email on retry
-    "retries": 1,  # number of retries on failure
-    "retry_delay": timedelta(minutes=5),  # interval between retries
+    "owner": "khuda",
+    "depends_on_past": False,
+    "retries": 1,
+    "retry_delay": timedelta(minutes=5),
 }
 
 # DAG definition
@@ -64,8 +31,26 @@ def generate_queue_values():
 # Use the values received through expand in the BashOperator
 run_consumer_task = BashOperator.partial(
     task_id="run-consumer-task",
-    bash_command="python3 /home/patturning1/Oasis_consumer.py {{ params.consumer }}",  # use the mapped value via templating
+    bash_command="python3 /home/patturning2/Oasis_consumer.py {{ params.consumer }}",  # use the mapped value via templating
 ).expand(params=generate_queue_values())
 
-send_post_request_OASIS_task()
-run_consumer_task
+# Collect the task states after all tasks complete
+collect_task_results_task = PythonOperator(
+    task_id="collect_task_results",
+    python_callable=collect_task_results,
+    provide_context=True,
+    trigger_rule="all_done",
+)
+
+# Send the summary email
+send_summary_email = EmailOperator(
+    task_id="send_summary_email",
+
+    subject="{{ task_instance.xcom_pull(task_ids='collect_task_results', key='email_subject') }}",
+    html_content="{{ task_instance.xcom_pull(task_ids='collect_task_results', key='email_body') }}",
+    trigger_rule="all_done",
+)
+[
+    send_post_request_OASIS_task(),
+    run_consumer_task,
+] >> collect_task_results_task >> send_summary_email
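
The new fan-in at the bottom of Oasis_dag.py relies on two Airflow behaviors: a Python list on the left of >> makes every member an upstream of the right-hand task, and trigger_rule="all_done" lets a task run once all upstreams reach a terminal state, failed or skipped included. A minimal sketch with illustrative task names (not from the repo):

from airflow import DAG
from airflow.operators.empty import EmptyOperator
from datetime import datetime

with DAG(dag_id="fan_in_sketch", start_date=datetime(2024, 1, 1), schedule=None, catchup=False):
    producer = EmptyOperator(task_id="producer")
    consumer = EmptyOperator(task_id="consumer")
    # all_done: run once every upstream has finished, whatever its state.
    collect = EmptyOperator(task_id="collect", trigger_rule="all_done")
    notify = EmptyOperator(task_id="notify", trigger_rule="all_done")

    [producer, consumer] >> collect >> notify  # list >> task fans in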

airflow/modules.py (+45 -0)

@@ -0,0 +1,45 @@
+from dotenv import load_dotenv
+import os, json, requests
+from airflow.operators.python import get_current_context
+from airflow.exceptions import AirflowSkipException
+
+# Collect the state of every task and store the result in XCom
+def collect_task_results(**context):
+    task_instances = context['dag_run'].get_task_instances()
+    task_states = {task_instance.task_id: task_instance.state for task_instance in task_instances}
+
+    # Check whether any task failed
+    if any(state == 'failed' for state in task_states.values()):
+        email_subject = "Task Failure Alert"
+        email_body = f"""
+        <h3>One or more tasks have failed!</h3>
+        <p>Task States:</p>
+        <pre>{task_states}</pre>
+        """
+    else:
+        email_subject = "Task Success Alert"
+        email_body = f"""
+        <h3>All tasks have completed successfully!</h3>
+        <p>Task States:</p>
+        <pre>{task_states}</pre>
+        """
+
+    # Store the result in XCom
+    context['ti'].xcom_push(key='email_subject', value=email_subject)
+    context['ti'].xcom_push(key='email_body', value=email_body)
+
+# HTTP POST request function
+def send_post_request(categoryId):
+    load_dotenv()
+    url = os.getenv("HOMEPLUS_SERVICE_URL")
+    headers = {"Content-Type": "application/json"}
+    data = json.dumps({"category_id": categoryId})
+    response = requests.post(url, headers=headers, data=data)
+
+    if response.status_code == 200:
+        print(f"Success: {response.json()}")
+    elif "500 Internal Server Error" in response.text:
+        context = get_current_context()
+        raise AirflowSkipException(f"Skip task {context['ti'].task_id}")
+    else:
+        response.raise_for_status()
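
Both helpers assume an active task execution context: get_current_context() raises outside a running task, and collect_task_results expects dag_run and ti among its kwargs. A sketch of how the module might be exercised from a DAG (the DAG id and sample categoryId are illustrative):

from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime

from modules import collect_task_results, send_post_request

with DAG(dag_id="modules_sketch", start_date=datetime(2024, 1, 1), schedule=None, catchup=False):
    # op_kwargs supplies the categoryId argument at run time.
    request = PythonOperator(
        task_id="send_post_request",
        python_callable=send_post_request,
        op_kwargs={"categoryId": 100001},
    )
    # Airflow 2 injects the context kwargs (dag_run, ti, ...) automatically
    # when the callable accepts **kwargs; provide_context is a 1.x holdover.
    collect = PythonOperator(
        task_id="collect_task_results",
        python_callable=collect_task_results,
        trigger_rule="all_done",
    )
    request >> collect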

airflow/packages.py (+6 -0)

@@ -0,0 +1,6 @@
+from airflow.operators.bash import BashOperator
+from airflow.operators.email import EmailOperator
+from airflow.operators.python import PythonOperator
+from datetime import datetime, timedelta
+from airflow import DAG
+from airflow.decorators import task
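
Because both DAG files consume this module via from packages import *, every top-level name here lands in their namespaces. If tighter control were wanted, an __all__ list would pin exactly what the star import exports; a possible follow-up sketch, not part of this commit:

# packages.py -- hypothetical explicit export list (not in this commit)
from airflow.operators.bash import BashOperator
from airflow.operators.email import EmailOperator
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta
from airflow import DAG
from airflow.decorators import task

# Restricts what `from packages import *` re-exports.
__all__ = ["BashOperator", "EmailOperator", "PythonOperator",
           "datetime", "timedelta", "DAG", "task"]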
