Skip to content

Commit

Permalink
Increase the number of consumers and queues
Browse files Browse the repository at this point in the history
  • Loading branch information
HyejiYu committed Sep 23, 2024
1 parent 3c2acce commit c6b5afe
Showing 1 changed file with 36 additions and 25 deletions.
61 changes: 36 additions & 25 deletions airflow/dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,15 @@ def send_post_request(categoryId):
print(f"Failed: {response.status_code}, {response.text}")
response.raise_for_status()


# DAG의 기본 설정
default_args = {
'owner': 'khuda', # DAG 소유자
'depends_on_past': False, # 이전 DAG 실패 여부에 의존하지 않음
'email': ['[email protected]'], # 수신자 이메일
"email_on_success": True, # 성공 시 이메일 전송
'email_on_failure': True, # 실패 시 이메일 전송
'email_on_retry': True, # 재시도 시 이메일 전송
# 'email': ['[email protected]'], # 수신자 이메일
# "email_on_success": True, # 성공 시 이메일 전송
# 'email_on_failure': True, # 실패 시 이메일 전송
# 'email_on_retry': True, # 재시도 시 이메일 전송
'retries': 1, # 실패 시 재시도 횟수
'retry_delay': timedelta(minutes=5) # 재시도 간격
}
Expand All @@ -45,36 +46,46 @@ def send_post_request(categoryId):
'HomePlus_Crawling_DAG', # DAG의 이름
default_args=default_args, # 기본 인자 설정
description='HomePlus Crawling', # DAG 설명
schedule_interval=timedelta(days=1), # 실행 주기 (매일 1회)
schedule_interval='0 9,16,22 * * *', # 실행 주기 (매일 09:00, 16:00, 22:00 정각)
start_date=datetime(2024, 9, 20), # 시작 날짜
catchup=False # 시작 날짜부터 현재까지의 미실행 작업 실행 여부
) as dag:

run_consumer_task1 = BashOperator(
task_id='run-consumer-1', # Task 이름
bash_command="python3 /home/patturning1/mq_consumer1.py &",
do_xcom_push=False
)
# run_consumer_task1 = BashOperator(
# task_id='run-consumer-1', # Task 이름
# bash_command="python3 /home/patturning1/mq_consumer1.py &",
# do_xcom_push=False
# )

run_consumer_task2 = BashOperator(
task_id='run-consumer-2', # Task 이름
bash_command="python3 /home/patturning1/mq_consumer2.py &",
do_xcom_push=False
)
# run_consumer_task2 = BashOperator(
# task_id='run-consumer-2', # Task 이름
# bash_command="python3 /home/patturning1/mq_consumer2.py &",
# do_xcom_push=False
# )

run_consumer_task3 = BashOperator(
task_id='run-consumer-3', # Task 이름
bash_command="python3 /home/patturning1/mq_consumer3.py &",
do_xcom_push=False
)
# run_consumer_task3 = BashOperator(
# task_id='run-consumer-3', # Task 이름
# bash_command="python3 /home/patturning1/mq_consumer3.py &",
# do_xcom_push=False
# )

@task
def send_post_request_HOMEPLUS_task(category_id):
return send_post_request(category_id)

category_ids = list(range(100001, 100078))

[run_consumer_task1, run_consumer_task2, run_consumer_task3, send_post_request_HOMEPLUS_task.expand(category_id=category_ids)]
return send_post_request(category_id)

# TaskFlow API로 task 정의
@task
def generate_queue_values():
return [{"consumer": "HomePlus.product.queue.1"}, {"consumer": "HomePlus.product.queue.1"},
{"consumer": "HomePlus.product.queue.2"}, {"consumer": "HomePlus.product.queue.2"},
{"consumer": "HomePlus.product.queue.3"}, {"consumer": "HomePlus.product.queue.3"}]

# BashOperator에서 expand로 받은 값을 사용
run_consumer_task = BashOperator.partial(
task_id="run-consumer-task",
bash_command="python3 /home/patturning1/consumer_test.py {{ params.param1 }}", # 템플릿을 사용하여 매핑된 값 사용
).expand(params=generate_queue_values())

category_ids = list(range(100001, 100078))

[run_consumer_task, send_post_request_HOMEPLUS_task.expand(category_id=category_ids)]

0 comments on commit c6b5afe

Please sign in to comment.