diff --git a/app/routers/events_router.py b/app/routers/events_router.py index c10e153..5924a41 100644 --- a/app/routers/events_router.py +++ b/app/routers/events_router.py @@ -1,6 +1,6 @@ # app/routers/events_router.py -from fastapi import APIRouter, status +from fastapi import APIRouter, status, BackgroundTasks # Added BackgroundTasks from app.schemas.event import EventChunk from app.services.event_service import EventService from typing import Dict, Any @@ -18,7 +18,7 @@ summary="이벤트 청크 전송", description="SDK로부터 사용자 행동 이벤트 청크를 수신합니다." ) -async def receive_event_chunk(chunk: EventChunk) -> Dict[str, Any]: +async def receive_event_chunk(chunk: EventChunk, background_tasks: BackgroundTasks) -> Dict[str, Any]: # Added background_tasks """ 사용자 행동 이벤트 데이터 청크를 받아 처리합니다. 수신된 청크는 KS3에 저장됩니다. @@ -30,5 +30,5 @@ async def receive_event_chunk(chunk: EventChunk) -> Dict[str, Any]: Dict[str, Any]: 처리 결과. """ event_service = EventService() - result = event_service.process_event_chunk(chunk) - return result \ No newline at end of file + background_tasks.add_task(event_service.process_event_chunk, chunk) # Changed to add_task + return {"status": "success", "message": "이벤트 청크 수신 완료, 백그라운드에서 처리 중"} # Immediate response \ No newline at end of file diff --git a/app/services/captcha_service.py b/app/services/captcha_service.py index 9d5e78d..56cd1c8 100644 --- a/app/services/captcha_service.py +++ b/app/services/captcha_service.py @@ -163,7 +163,7 @@ def verifyCaptchaAnswer( CaptchaVerificationResponse: 캡챠 검증 결과 (성공, 실패, 시간 초과). """ # 순환 참조를 방지하기 위해 함수 내에서 task를 임포트합니다. - from app.tasks.captcha_tasks import uploadBehaviorDataTask + from app.tasks.captcha_tasks import uploadBehaviorDataTask, processBehaviorVerificationTask try: confidence = None # Initialize confidence @@ -232,15 +232,12 @@ def verifyCaptchaAnswer( confidence = device_type_check_result.confidence skip_behavior_verification = True - behavior_result = None if not skip_behavior_verification and session_meta and full_events_from_chunks: - behavior_result = behavior_service.run_behavior_verification( - session_meta, full_events_from_chunks) - # logger.info(f"[디버그] 행동 분석 결과: {behavior_result}") - if behavior_result and behavior_result.get("ok"): - confidence = behavior_result.get("bot_prob") - if verdict is None: # Only update verdict if not already set by rule checker - verdict = behavior_result.get("verdict") + # 비동기적으로 행동 검증 모델 실행 + processBehaviorVerificationTask.delay(clientToken, session_meta, full_events_from_chunks) + logger.info(f"[행동 검증 모델] 비동기 작업 시작: clientToken={clientToken}") + else: + logger.info(f"[행동 검증 모델] 실행 건너뜀: clientToken={clientToken}, skip_behavior_verification={skip_behavior_verification}, session_meta_exists={bool(session_meta)}, full_events_from_chunks_exists={bool(full_events_from_chunks)}") logger.info(f"[행동 검증 모델] 신뢰도: {confidence}, 판정: {verdict}") # 7. 세션에 연결된 캡챠 문제의 정답을 가져옵니다. diff --git a/app/services/event_service.py b/app/services/event_service.py index 28fe2eb..0dd626f 100644 --- a/app/services/event_service.py +++ b/app/services/event_service.py @@ -4,7 +4,8 @@ from app.schemas.event import EventChunk from app.core.ks3 import upload_behavior_chunk from typing import Dict, Any -from fastapi import HTTPException, status # Import HTTPException and status +from fastapi import HTTPException, status +from fastapi.concurrency import run_in_threadpool # Added import logger = logging.getLogger(__name__) @@ -13,7 +14,7 @@ class EventService: def __init__(self): pass - def process_event_chunk(self, chunk: EventChunk) -> Dict[str, Any]: + async def process_event_chunk(self, chunk: EventChunk) -> Dict[str, Any]: """ 수신된 이벤트 청크를 처리하고 KS3에 업로드합니다. @@ -27,7 +28,7 @@ def process_event_chunk(self, chunk: EventChunk) -> Dict[str, Any]: try: # KS3에 청크 업로드 - upload_behavior_chunk(chunk) + await run_in_threadpool(upload_behavior_chunk, chunk) # Wrapped with run_in_threadpool logger.info(f"세션 {chunk.client_token}의 청크 {chunk.chunk_index} KS3 업로드 성공.") except Exception as e: logger.error(f"세션 {chunk.client_token}의 청크 {chunk.chunk_index} KS3 업로드 중 오류 발생: {e}", exc_info=True) diff --git a/app/tasks/captcha_tasks.py b/app/tasks/captcha_tasks.py index 19d5d18..1dcacbc 100644 --- a/app/tasks/captcha_tasks.py +++ b/app/tasks/captcha_tasks.py @@ -106,6 +106,52 @@ def uploadBehaviorDataTask(clientToken: str): f"클라이언트 토큰 {clientToken}에 대한 행동 데이터 업로드 오류: {e}") +@celery_app.task(bind=True) +def processBehaviorVerificationTask(self, clientToken: str, session_meta: Dict[str, Any], full_events_from_chunks: List[Dict[str, Any]]): + """ + 행동 데이터를 기반으로 봇 여부를 비동기적으로 검증하고 CaptchaLog를 업데이트하는 Celery 작업입니다. + """ + from app.services import behavior_service + from app.repositories.captcha_repo import CaptchaRepository + from db.session import SessionLocal + + db = SessionLocal() + try: + captcha_repo = CaptchaRepository(db) + session = captcha_repo.getCaptchaSessionByClientToken(clientToken) + if not session: + logger.warning(f"processBehaviorVerificationTask: CaptchaSession not found for clientToken {clientToken}") + return + + behavior_result = behavior_service.run_behavior_verification( + session_meta, full_events_from_chunks) + logger.info(f"[비동기 행동 분석] clientToken: {clientToken}, 결과: {behavior_result}") + + ml_confidence = None + ml_is_bot = None + if behavior_result and behavior_result.get("ok"): + ml_confidence = behavior_result.get("bot_prob") + verdict_from_ml = behavior_result.get("verdict") + ml_is_bot = True if verdict_from_ml == "bot" else False + + # CaptchaLog 업데이트 + captcha_log = captcha_repo.getCaptchaLogBySessionId(session.id) + if captcha_log: + captcha_log.ml_confidence = ml_confidence + captcha_log.ml_is_bot = ml_is_bot + db.add(captcha_log) + db.commit() + logger.info(f"clientToken {clientToken}에 대한 CaptchaLog ML 결과 업데이트 성공.") + else: + logger.warning(f"clientToken {clientToken}에 대한 CaptchaLog를 찾을 수 없어 ML 결과 업데이트 실패.") + + except Exception as e: + db.rollback() + logger.error(f"clientToken {clientToken}에 대한 행동 검증 작업 중 오류 발생: {e}", exc_info=True) + finally: + db.close() + + @celery_app.task def cleanupExpiredSessionsTask(): """