Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions app/routers/events_router.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# app/routers/events_router.py

from fastapi import APIRouter, status
from fastapi import APIRouter, status, BackgroundTasks # Added BackgroundTasks
from app.schemas.event import EventChunk
from app.services.event_service import EventService
from typing import Dict, Any
Expand All @@ -18,7 +18,7 @@
summary="이벤트 청크 전송",
description="SDK로부터 사용자 행동 이벤트 청크를 수신합니다."
)
async def receive_event_chunk(chunk: EventChunk) -> Dict[str, Any]:
async def receive_event_chunk(chunk: EventChunk, background_tasks: BackgroundTasks) -> Dict[str, Any]: # Added background_tasks
"""
사용자 행동 이벤트 데이터 청크를 받아 처리합니다.
수신된 청크는 KS3에 저장됩니다.
Expand All @@ -30,5 +30,5 @@ async def receive_event_chunk(chunk: EventChunk) -> Dict[str, Any]:
Dict[str, Any]: 처리 결과.
"""
event_service = EventService()
result = event_service.process_event_chunk(chunk)
return result
background_tasks.add_task(event_service.process_event_chunk, chunk) # Changed to add_task
return {"status": "success", "message": "이벤트 청크 수신 완료, 백그라운드에서 처리 중"} # Immediate response
15 changes: 6 additions & 9 deletions app/services/captcha_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ def verifyCaptchaAnswer(
CaptchaVerificationResponse: 캡챠 검증 결과 (성공, 실패, 시간 초과).
"""
# 순환 참조를 방지하기 위해 함수 내에서 task를 임포트합니다.
from app.tasks.captcha_tasks import uploadBehaviorDataTask
from app.tasks.captcha_tasks import uploadBehaviorDataTask, processBehaviorVerificationTask

try:
confidence = None # Initialize confidence
Expand Down Expand Up @@ -232,15 +232,12 @@ def verifyCaptchaAnswer(
confidence = device_type_check_result.confidence
skip_behavior_verification = True

behavior_result = None
if not skip_behavior_verification and session_meta and full_events_from_chunks:
behavior_result = behavior_service.run_behavior_verification(
session_meta, full_events_from_chunks)
# logger.info(f"[디버그] 행동 분석 결과: {behavior_result}")
if behavior_result and behavior_result.get("ok"):
confidence = behavior_result.get("bot_prob")
if verdict is None: # Only update verdict if not already set by rule checker
verdict = behavior_result.get("verdict")
# 비동기적으로 행동 검증 모델 실행
processBehaviorVerificationTask.delay(clientToken, session_meta, full_events_from_chunks)
logger.info(f"[행동 검증 모델] 비동기 작업 시작: clientToken={clientToken}")
else:
logger.info(f"[행동 검증 모델] 실행 건너뜀: clientToken={clientToken}, skip_behavior_verification={skip_behavior_verification}, session_meta_exists={bool(session_meta)}, full_events_from_chunks_exists={bool(full_events_from_chunks)}")
logger.info(f"[행동 검증 모델] 신뢰도: {confidence}, 판정: {verdict}")

# 7. 세션에 연결된 캡챠 문제의 정답을 가져옵니다.
Expand Down
7 changes: 4 additions & 3 deletions app/services/event_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@
from app.schemas.event import EventChunk
from app.core.ks3 import upload_behavior_chunk
from typing import Dict, Any
from fastapi import HTTPException, status # Import HTTPException and status
from fastapi import HTTPException, status
from fastapi.concurrency import run_in_threadpool # Added import

logger = logging.getLogger(__name__)

Expand All @@ -13,7 +14,7 @@ class EventService:
def __init__(self):
pass

def process_event_chunk(self, chunk: EventChunk) -> Dict[str, Any]:
async def process_event_chunk(self, chunk: EventChunk) -> Dict[str, Any]:
"""
수신된 이벤트 청크를 처리하고 KS3에 업로드합니다.

Expand All @@ -27,7 +28,7 @@ def process_event_chunk(self, chunk: EventChunk) -> Dict[str, Any]:

try:
# KS3에 청크 업로드
upload_behavior_chunk(chunk)
await run_in_threadpool(upload_behavior_chunk, chunk) # Wrapped with run_in_threadpool
logger.info(f"세션 {chunk.client_token}의 청크 {chunk.chunk_index} KS3 업로드 성공.")
except Exception as e:
logger.error(f"세션 {chunk.client_token}의 청크 {chunk.chunk_index} KS3 업로드 중 오류 발생: {e}", exc_info=True)
Expand Down
46 changes: 46 additions & 0 deletions app/tasks/captcha_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,52 @@ def uploadBehaviorDataTask(clientToken: str):
f"클라이언트 토큰 {clientToken}에 대한 행동 데이터 업로드 오류: {e}")


@celery_app.task(bind=True)
def processBehaviorVerificationTask(self, clientToken: str, session_meta: Dict[str, Any], full_events_from_chunks: List[Dict[str, Any]]):
"""
행동 데이터를 기반으로 봇 여부를 비동기적으로 검증하고 CaptchaLog를 업데이트하는 Celery 작업입니다.
"""
from app.services import behavior_service
from app.repositories.captcha_repo import CaptchaRepository
from db.session import SessionLocal

db = SessionLocal()
try:
captcha_repo = CaptchaRepository(db)
session = captcha_repo.getCaptchaSessionByClientToken(clientToken)
if not session:
logger.warning(f"processBehaviorVerificationTask: CaptchaSession not found for clientToken {clientToken}")
return

behavior_result = behavior_service.run_behavior_verification(
session_meta, full_events_from_chunks)
logger.info(f"[비동기 행동 분석] clientToken: {clientToken}, 결과: {behavior_result}")

ml_confidence = None
ml_is_bot = None
if behavior_result and behavior_result.get("ok"):
ml_confidence = behavior_result.get("bot_prob")
verdict_from_ml = behavior_result.get("verdict")
ml_is_bot = True if verdict_from_ml == "bot" else False

# CaptchaLog 업데이트
captcha_log = captcha_repo.getCaptchaLogBySessionId(session.id)
if captcha_log:
captcha_log.ml_confidence = ml_confidence
captcha_log.ml_is_bot = ml_is_bot
db.add(captcha_log)
db.commit()
logger.info(f"clientToken {clientToken}에 대한 CaptchaLog ML 결과 업데이트 성공.")
else:
logger.warning(f"clientToken {clientToken}에 대한 CaptchaLog를 찾을 수 없어 ML 결과 업데이트 실패.")

except Exception as e:
db.rollback()
logger.error(f"clientToken {clientToken}에 대한 행동 검증 작업 중 오류 발생: {e}", exc_info=True)
finally:
db.close()


@celery_app.task
def cleanupExpiredSessionsTask():
"""
Expand Down