From d52fe03a2b14e14f406d12a51346f0a89d710c5f Mon Sep 17 00:00:00 2001 From: hyejiyu Date: Tue, 24 Sep 2024 00:33:19 +0900 Subject: [PATCH] the number of consumers: 3 --- airflow/dag.py | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/airflow/dag.py b/airflow/dag.py index ae491c0..c49356c 100644 --- a/airflow/dag.py +++ b/airflow/dag.py @@ -76,14 +76,13 @@ def send_post_request_HOMEPLUS_task(category_id): # TaskFlow API로 task 정의 @task def generate_queue_values(): - return [{"consumer": "HomePlus.product.queue.1"}, {"consumer": "HomePlus.product.queue.1"}, - {"consumer": "HomePlus.product.queue.2"}, {"consumer": "HomePlus.product.queue.2"}, - {"consumer": "HomePlus.product.queue.3"}, {"consumer": "HomePlus.product.queue.3"}] + return [{"consumer": "HomePlus.product.queue"}, {"consumer": "HomePlus.product.queue"}, + {"consumer": "HomePlus.product.queue"}] # BashOperator에서 expand로 받은 값을 사용 run_consumer_task = BashOperator.partial( task_id="run-consumer-task", - bash_command="python3 /home/patturning1/consumer_test.py {{ params.param1 }}", # 템플릿을 사용하여 매핑된 값 사용 + bash_command="python3 /home/patturning1/consumer_mq.py {{ params.param1 }}", # 템플릿을 사용하여 매핑된 값 사용 ).expand(params=generate_queue_values()) category_ids = list(range(100001, 100078))