diff --git a/airflow/dag.py b/airflow/dag.py index ae491c0..c49356c 100644 --- a/airflow/dag.py +++ b/airflow/dag.py @@ -76,14 +76,13 @@ def send_post_request_HOMEPLUS_task(category_id): # TaskFlow API로 task 정의 @task def generate_queue_values(): - return [{"consumer": "HomePlus.product.queue.1"}, {"consumer": "HomePlus.product.queue.1"}, - {"consumer": "HomePlus.product.queue.2"}, {"consumer": "HomePlus.product.queue.2"}, - {"consumer": "HomePlus.product.queue.3"}, {"consumer": "HomePlus.product.queue.3"}] + return [{"consumer": "HomePlus.product.queue"}, {"consumer": "HomePlus.product.queue"}, + {"consumer": "HomePlus.product.queue"}] # BashOperator에서 expand로 받은 값을 사용 run_consumer_task = BashOperator.partial( task_id="run-consumer-task", - bash_command="python3 /home/patturning1/consumer_test.py {{ params.param1 }}", # 템플릿을 사용하여 매핑된 값 사용 + bash_command="python3 /home/patturning1/consumer_mq.py {{ params.param1 }}", # 템플릿을 사용하여 매핑된 값 사용 ).expand(params=generate_queue_values()) category_ids = list(range(100001, 100078))