Skip to content

Commit

Permalink
the number of consumers: 3
Browse files Browse the repository at this point in the history
  • Loading branch information
HyejiYu committed Sep 23, 2024
1 parent c6b5afe commit d52fe03
Showing 1 changed file with 3 additions and 4 deletions.
7 changes: 3 additions & 4 deletions airflow/dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,14 +76,13 @@ def send_post_request_HOMEPLUS_task(category_id):
# TaskFlow API로 task 정의
@task
def generate_queue_values():
return [{"consumer": "HomePlus.product.queue.1"}, {"consumer": "HomePlus.product.queue.1"},
{"consumer": "HomePlus.product.queue.2"}, {"consumer": "HomePlus.product.queue.2"},
{"consumer": "HomePlus.product.queue.3"}, {"consumer": "HomePlus.product.queue.3"}]
return [{"consumer": "HomePlus.product.queue"}, {"consumer": "HomePlus.product.queue"},
{"consumer": "HomePlus.product.queue"}]

# BashOperator에서 expand로 받은 값을 사용
run_consumer_task = BashOperator.partial(
task_id="run-consumer-task",
bash_command="python3 /home/patturning1/consumer_test.py {{ params.param1 }}", # 템플릿을 사용하여 매핑된 값 사용
bash_command="python3 /home/patturning1/consumer_mq.py {{ params.param1 }}", # 템플릿을 사용하여 매핑된 값 사용
).expand(params=generate_queue_values())

category_ids = list(range(100001, 100078))
Expand Down

0 comments on commit d52fe03

Please sign in to comment.