Skip to content

Commit

Permalink
modified homeplus consumers
Browse files Browse the repository at this point in the history
  • Loading branch information
HyejiYu committed Sep 23, 2024
1 parent 48ea4d2 commit d600b8f
Showing 1 changed file with 36 additions and 13 deletions.
49 changes: 36 additions & 13 deletions messagequeue/consumer/mq_consumer.py → messagequeue/consumer/homeplus_consumer.py
100755 → 100644
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import pika
import pika, threading, sys
import os, time, json
from dotenv import load_dotenv
import datetime, logging
Expand All @@ -12,44 +12,67 @@
hostname = os.getenv('RABBITMQ_HOSTNAME')
port = os.getenv('RABBITMQ_PORT')
vhost = os.getenv('RABBITMQ_VHOST')
queue = os.getenv('RABBITMQ_QUEUE')
queue = sys.argv[1]

# 사용자 이름과 비밀번호 설정
credentials = pika.PlainCredentials(username, password)

# RabbitMQ 연결 설정
connection = pika.BlockingConnection(pika.ConnectionParameters(hostname, port, vhost, credentials))
channel = connection.channel()
channel.basic_qos(prefetch_count=10)

# 타임아웃 시간 설정 (단위: 초, 5분 = 300초)
TIMEOUT_SECONDS = 180

def timeout_handler():
print("Timeout reached, no messages received. Shutting down.")
connection.close() # Connection을 닫아서 종료
sys.exit(0) # 프로그램 종료

# 타이머를 설정하는 함수
def reset_timer():
global timer
if timer:
timer.cancel() # 기존 타이머를 취소
timer = threading.Timer(TIMEOUT_SECONDS, timeout_handler)
timer.start()

start = time.time()
current_date = datetime.datetime.now().strftime('%Y-%m-%d')

timer = None
# 콜백 함수 정의 (메시지 수신 시 호출됨)
def callback(ch, method, properties, body):
start = time.time()
global cnt
global start
cnt += 1
# now = datetime.datetime.now()
reset_timer()
now_hour = datetime.datetime.now().hour
# formatted_date = now.strftime("%Y-%m-%d %H:%M:%S")
message_str = body.decode('utf-8')
message_json = json.loads(message_str)
category_name = message_json['category_name']
product_id = message_json['product_id']
price = message_json['price']

if not os.path.exists(f"./homeplus/{category_name}"):
os.makedirs(f"./homeplus/{category_name}/")
logging.info(f"Directory ./homeplus/{category_name}/ created")
if not os.path.exists(f"/mnt/patturning/HomePlus/{category_name}"):
os.makedirs(f"/mnt/patturning/HomePlus/{category_name}/")
logging.info(f"Directory /mnt/patturning/HomePlus/{category_name}/ created")
# else:
# print(f"Directory ./homeplus/{category_name}/ already exists")
with open(f'./homeplus/{category_name}/{product_id}.txt', 'a') as f:
f.write(f"{current_date},{price}\n")
end = time.time()
time_spent = datetime.timedelta(seconds=(end-start))
logging.info(f"{cnt}, ProductId[{product_id}] saving spent {time_spent}")
with open(f'/mnt/patturning/HomePlus/{category_name}/{product_id}.txt', 'a') as f:
f.write(f"{current_date},{now_hour},{price}\n")
if cnt % 10000 == 0:
end = time.time()
time_spent = datetime.timedelta(seconds=(end-start))
logging.info(f"Saving Product Per 10000s spent {time_spent}")
start = time.time()

# 메시지를 수신하도록 큐에 연결
channel.basic_consume(queue=queue, on_message_callback=callback, auto_ack=True)


logging.info('Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
reset_timer()
channel.start_consuming()

0 comments on commit d600b8f

Please sign in to comment.