diff --git a/services/welding-machine-data-simulator-service/app/config/settings.py b/services/welding-machine-data-simulator-service/app/config/settings.py index 86bf95f..89439a5 100644 --- a/services/welding-machine-data-simulator-service/app/config/settings.py +++ b/services/welding-machine-data-simulator-service/app/config/settings.py @@ -4,6 +4,9 @@ class Settings(BaseSettings): + # ✅ 환경 구분 (기본값을 local로 변경) + environment: str = "production" # 기본값을 다시 local로 변경 + # Azure Storage 설정 azure_connection_string: str azure_container_name: str = "simulator-data" @@ -15,25 +18,109 @@ class Settings(BaseSettings): scheduler_interval_minutes: int = 1 batch_size: int = 10 - # Welding Machine 모델 서비스 설정 + # ✅ 기존 호환성을 위한 필드 (deprecated but needed) welding_machine_url: str = "http://localhost:8006" + # ✅ 환경별 서비스 URL 설정 + # 로컬 개발 + local_gateway_url: str = "http://localhost:8088" + local_model_service_url: str = "http://localhost:8006" + + # Docker Compose + docker_gateway_url: str = "http://gateway:8088" + docker_model_service_url: str = "http://welding-model-service:8006" + + # AKS Kubernetes (내부 서비스 통신) + kubernetes_gateway_url: str = "http://gateway.smart-be.svc.cluster.local:8088" + kubernetes_model_service_url: str = "http://sf-welding-machine-defect-detection-model-service.sf-welding-machine-defect-detection-model-service.svc.cluster.local:80" + + # ✅ Production (현재 실제 배포된 URL) - 기본값으로 설정 + production_gateway_url: str = "http://20.249.138.42:8088" # 배포시 기본값 + production_model_service_url: str = "http://20.249.138.42:8088/api/v1/welding/defect" + + # HTTP 설정 + spring_boot_timeout: int = 30 + spring_boot_max_retries: int = 3 + http_timeout: int = 30 + max_retries: int = 3 + # 로그 설정 log_directory: str = "logs" log_filename: str = "welding_anomaly_detections.json" error_log_filename: str = "welding_errors.json" - # HTTP 클라이언트 설정 - http_timeout: int = 30 - max_retries: int = 3 + def __init__(self, **kwargs): + super().__init__(**kwargs) + # ✅ 명시적으로 environment가 설정되지 않은 경우에만 자동 감지 + if not kwargs.get('environment') and not os.getenv('ENVIRONMENT'): + # 특정 조건에서만 production으로 감지 (더 엄격한 조건) + if (hasattr(self, 'azure_connection_string') and + self.azure_connection_string and + self.azure_connection_string != "your_azure_connection_string" and + "localhost" not in self.azure_connection_string and + # 추가 조건: Kubernetes 환경 변수 존재 여부 + (os.getenv('KUBERNETES_SERVICE_HOST') or os.getenv('WEBSITE_HOSTNAME'))): + self.environment = "production" + else: + self.environment = "local" + + @property + def gateway_service_url(self) -> str: + """환경에 따른 게이트웨이 서비스 URL""" + url_map = { + "local": self.local_gateway_url, + "development": self.local_gateway_url, + "docker": self.docker_gateway_url, + "kubernetes": self.kubernetes_gateway_url, + "production": self.production_gateway_url, + } + return url_map.get(self.environment, self.local_gateway_url) + + @property + def model_service_url(self) -> str: + """환경에 따른 모델 서빙 서비스 URL""" + url_map = { + "local": self.local_model_service_url, + "development": self.local_model_service_url, + "docker": self.docker_model_service_url, + "kubernetes": self.kubernetes_model_service_url, + "production": self.production_model_service_url, + } + return url_map.get(self.environment, self.local_model_service_url) @property def model_services(self) -> Dict[str, str]: - """Welding Machine 모델 서비스 URL""" + """Welding Machine 모델 서비스 URL (기존 호환성 유지)""" return { - "welding-machine": self.welding_machine_url + "welding-machine": self.welding_machine_url # 기존 필드 사용 } + @property + def spring_boot_service_url(self) -> str: + """기존 호환성을 위한 별칭""" + return self.gateway_service_url + + @property + def spring_boot_endpoints(self) -> Dict[str, str]: + """스프링부트 서비스 엔드포인트 (게이트웨이 경유)""" + base_url = self.gateway_service_url + + return { + "welding_data": f"{base_url}/weldingMachineDefectDetectionLogs", + "health": f"{base_url}/actuator/health", + "status": f"{base_url}/weldingMachineDefectDetectionLogs" + } + + @property + def is_local_environment(self) -> bool: + """로컬 환경 여부 확인""" + return self.environment in ["local", "development"] + + @property + def is_production_environment(self) -> bool: + """프로덕션 환경 여부 확인""" + return self.environment in ["production", "kubernetes"] + model_config = { "env_file": ".env", "env_file_encoding": "utf-8" diff --git a/services/welding-machine-data-simulator-service/app/main.py b/services/welding-machine-data-simulator-service/app/main.py index 8196a29..0f4ae09 100644 --- a/services/welding-machine-data-simulator-service/app/main.py +++ b/services/welding-machine-data-simulator-service/app/main.py @@ -1,4 +1,5 @@ from fastapi import FastAPI +from fastapi.middleware.cors import CORSMiddleware from contextlib import asynccontextmanager from app.config.settings import settings from app.services.scheduler_service import simulator_scheduler @@ -9,16 +10,12 @@ @asynccontextmanager async def lifespan(app: FastAPI): - """애플리케이션 생명주기 관리""" - # 시작 시 print("🚀 Data Simulator Service 시작 중...") - # 환경 변수 체크 if not settings.azure_connection_string: print("⚠️ AZURE_CONNECTION_STRING 환경 변수가 설정되지 않았습니다.") print(" .env 파일을 생성하거나 환경 변수를 설정해주세요.") - # 로그 디렉토리 생성 os.makedirs(settings.log_directory, exist_ok=True) print(f"📁 로그 디렉토리: {settings.log_directory}") @@ -27,12 +24,11 @@ async def lifespan(app: FastAPI): yield - # 종료 시 print("🛑 Data Simulator Service 종료 중...") if simulator_scheduler.is_running: await simulator_scheduler.stop() -# FastAPI 앱 생성 + app = FastAPI( title="Welding Machine Data Simulator Service", description="용접기 결함 탐지 모델을 위한 실시간 데이터 시뮬레이터", @@ -40,17 +36,25 @@ async def lifespan(app: FastAPI): lifespan=lifespan ) -# 라우터 설정 -# 시뮬레이터 활성화/비활성화/상태확인 API 모음 +# ✅ CORS (개발 환경) +origins = ["http://localhost:3000"] +app.add_middleware( + CORSMiddleware, + allow_origins=origins, # 프론트 도메인 명시 + allow_credentials=True, # 쿠키/인증 헤더 사용 시 True + allow_methods=["*"], # 또는 ["GET","POST","OPTIONS",...] + allow_headers=["*"], +) + +# ✅ 라우터: 여기에서만 최종 prefix 부여 +# simulator_router 내부는 @router.get("/status")처럼 상대 경로만 있어야 함 app.include_router(simulator_router.router, prefix="/simulator") -# azure storage 연결, model serving 서비스 연결 확인 API 모음 -app.include_router(test_connection_router.router, prefix="/test") +app.include_router(test_connection_router.router, + prefix="/test") -# 아래는 서비스 기본 정보 확인과 서비스 헬스 체크 api 정의 @app.get("/") async def root(): - """서비스 정보""" return { "service": "Welding Machine Data Simulator Service", "version": "1.0.0", @@ -62,5 +66,4 @@ async def root(): @app.get("/health") async def health_check(): - """헬스 체크""" return {"status": "healthy"} diff --git a/services/welding-machine-data-simulator-service/app/services/scheduler_service.py b/services/welding-machine-data-simulator-service/app/services/scheduler_service.py index 992cbf2..8a09ebb 100644 --- a/services/welding-machine-data-simulator-service/app/services/scheduler_service.py +++ b/services/welding-machine-data-simulator-service/app/services/scheduler_service.py @@ -1,9 +1,10 @@ +import asyncio from datetime import datetime from apscheduler.schedulers.asyncio import AsyncIOScheduler from apscheduler.triggers.interval import IntervalTrigger from app.config.settings import settings from app.services.azure_storage import azure_storage -from app.services.model_client import model_client +from app.services.spring_client import spring_client from app.utils.logger import anomaly_logger @@ -22,7 +23,7 @@ async def start(self): # Azure Storage 연결 await azure_storage.connect() - # 헬스 체크 + # 헬스 체크 (스프링부트 포함) await self._initial_health_check() # 스케줄 작업 등록 @@ -39,7 +40,7 @@ async def start(self): self.is_running = True print(f"🚀 시뮬레이터 시작! (간격: {settings.scheduler_interval_minutes}분)") - print(f"📊 대상 서비스: {list(settings.model_services.keys())}") + print(f"📊 스프링부트 서비스: {settings.spring_boot_service_url}") print("-" * 60) except Exception as e: @@ -49,34 +50,42 @@ async def start(self): async def stop(self): """스케줄러 중지""" if not self.is_running: - print("⚠️ 스케줄러가 실행 중이 아닙니다.") + print("⚠️ 스케줄러가 이미 실행 중이 아닙니다.") return - self.scheduler.shutdown() - await azure_storage.disconnect() + print("🔄 시뮬레이터 중지 중...") + + # 실행 중인 작업들이 완료될 수 있도록 잠시 대기 + try: + self.scheduler.shutdown(wait=True) + except Exception as e: + print(f"⚠️ 스케줄러 종료 중 오류: {e}") + + try: + await azure_storage.disconnect() + except Exception as e: + print(f"⚠️ Azure Storage 연결 해제 중 오류: {e}") + self.is_running = False print("🛑 시뮬레이터 중지됨") async def _initial_health_check(self): - """초기 헬스 체크""" - print("🔍 모델 서비스 헬스 체크 중...") - health_status = await model_client.health_check_all() + """초기 헬스 체크 (스프링부트 서비스 포함)""" + print("🔍 서비스 헬스 체크 중...") - for service_name, is_healthy in health_status.items(): - status = "✅" if is_healthy else "❌" - print(f" {status} {service_name}") + # 스프링부트 서비스 헬스 체크 + spring_health = await spring_client.health_check() + status = "✅" if spring_health else "❌" + print( + f" {status} Spring Boot Service ({settings.spring_boot_service_url})") - healthy_count = sum(health_status.values()) - total_count = len(health_status) + if not spring_health: + print("⚠️ 경고: 스프링부트 서비스에 연결할 수 없습니다. 시뮬레이터는 시작되지만 데이터 전송이 실패할 수 있습니다.") - if healthy_count == 0: - raise Exception("모든 모델 서비스가 비활성 상태입니다.") - - print(f"📈 활성 서비스: {healthy_count}/{total_count}") print("-" * 60) async def _simulate_data_collection(self): - """주기적 Welding Machine 데이터 수집 및 예측 작업""" + """주기적 Welding Machine 데이터 수집 및 스프링부트 전송 작업""" try: print( f"🔄 Welding Machine 데이터 수집 시작 - {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}") @@ -94,45 +103,92 @@ async def _simulate_data_collection(self): print(f"📊 전류 데이터: {len(current_data['values'])} 포인트") print(f"📊 진동 데이터: {len(vibration_data['values'])} 포인트") - # 모델 서비스에 예측 요청 (전류 + 진동) - predictions = await model_client.predict_welding_data(current_data, vibration_data) - - if not predictions: - print("❌ 예측 결과를 받을 수 없습니다.") - return - - # 결과 처리 - combined_result = predictions.get("combined") - if not combined_result: - print("❌ 조합된 예측 결과가 없습니다.") - return + # 각 신호 타입별로 스프링부트 서비스에 전송 + results = await self._send_data_to_spring_boot(current_data, vibration_data) - # 이상 감지 여부에 따른 로깅 - if combined_result.get("status") == "anomaly": - # 전체 예측 정보와 원본 데이터 함께 로깅 - anomaly_logger.log_anomaly( - "welding-machine", - combined_result, - { - "current_data": current_data, - "vibration_data": vibration_data, - "detailed_results": predictions - } - ) - print("🚨 이상 감지!") + if results: + print("✅ 데이터 전송 완료") + for signal_type, result in results.items(): + print(f" 📡 {signal_type}: {result.get('message', 'OK')}") else: - anomaly_logger.log_normal_processing( - "welding-machine", combined_result) - print("✅ 정상 상태") + print("❌ 데이터 전송 실패") - # 상세 결과 출력 - print(f"📋 {combined_result.get('combined_logic', 'N/A')}") print("-" * 60) + except asyncio.CancelledError: + print("⚠️ 데이터 수집 작업이 취소되었습니다 (스케줄러 종료)") + # CancelledError는 다시 raise하지 않음 (정상적인 종료) + return except Exception as e: print(f"❌ 데이터 수집 중 오류 발생: {e}") + # 에러 로깅은 유지 (스프링부트 전송 실패 등의 상황을 위해) anomaly_logger.log_error("welding-machine-scheduler", str(e)) + async def _send_data_to_spring_boot(self, current_data: dict, vibration_data: dict) -> dict: + """ + 수집된 데이터를 스프링부트 서비스로 전송 + + Args: + current_data: 전류 센서 데이터 + vibration_data: 진동 센서 데이터 + + Returns: + 전송 결과 딕셔너리 + """ + results = {} + timestamp = datetime.now().isoformat() + machine_id = "WELDING_MACHINE_001" + + try: + # 전류 데이터 전송 + current_sensor_data = { + "signal_type": current_data["signal_type"], + "values": current_data["values"], + "machine_id": machine_id, + "timestamp": timestamp + } + + print( + f"📤 전류 데이터 전송 중... (signal_type: {current_data['signal_type']})") + current_result = await spring_client.send_sensor_data(current_sensor_data) + results["current"] = current_result + + # 잠시 대기 (API 호출 간격) + try: + await asyncio.sleep(0.5) + except asyncio.CancelledError: + print("⚠️ 작업이 취소되었습니다.") + return {} + + # 진동 데이터 전송 + vibration_sensor_data = { + "signal_type": vibration_data["signal_type"], + "values": vibration_data["values"], + "machine_id": machine_id, + "timestamp": timestamp + } + + print( + f"📤 진동 데이터 전송 중... (signal_type: {vibration_data['signal_type']})") + vibration_result = await spring_client.send_sensor_data(vibration_sensor_data) + results["vibration"] = vibration_result + + return results + + except asyncio.CancelledError: + print("⚠️ 스프링부트 데이터 전송이 취소되었습니다.") + return {} + except Exception as e: + print(f"❌ 스프링부트 데이터 전송 실패: {str(e)}") + # 전송 실패시 로컬 로그에 기록 (백업용) + anomaly_logger.log_error("spring-boot-transmission", { + "error": str(e), + "current_data_size": len(current_data.get("values", [])), + "vibration_data_size": len(vibration_data.get("values", [])), + "timestamp": timestamp + }) + return {} + def get_status(self) -> dict: """스케줄러 상태 정보""" jobs = self.scheduler.get_jobs() @@ -140,7 +196,8 @@ def get_status(self) -> dict: "is_running": self.is_running, "interval_minutes": settings.scheduler_interval_minutes, "next_run": str(jobs[0].next_run_time) if jobs else None, - "total_services": len(settings.model_services) + "spring_boot_url": settings.spring_boot_service_url, + "data_transmission_mode": "spring_boot_api" } diff --git a/services/welding-machine-data-simulator-service/app/services/spring_client.py b/services/welding-machine-data-simulator-service/app/services/spring_client.py new file mode 100644 index 0000000..e2afdde --- /dev/null +++ b/services/welding-machine-data-simulator-service/app/services/spring_client.py @@ -0,0 +1,202 @@ +import asyncio +import httpx +import logging +from typing import Dict, Any, Optional +from datetime import datetime +from app.config.settings import settings + +logger = logging.getLogger(__name__) + + +class SpringBootClient: + """스프링부트 서비스와 통신하는 HTTP 클라이언트 (게이트웨이 경유)""" + + def __init__(self): + # spring_boot_service_url → gateway_service_url 변경 + self.base_url = settings.gateway_service_url + self.timeout = settings.spring_boot_timeout + self.max_retries = settings.spring_boot_max_retries + + async def send_sensor_data(self, sensor_data: Dict[str, Any]) -> Dict[str, Any]: + """ + 센서 데이터를 스프링부트 서비스로 전송 (게이트웨이 경유, MSAez 스타일) + + Args: + sensor_data: 센서 데이터 (signal_type, values, machine_id, timestamp 포함) + + Returns: + 스프링부트 서비스 응답 + + Raises: + httpx.HTTPError: HTTP 요청 실패시 + """ + url = settings.spring_boot_endpoints["welding_data"] + + # ✅ SensorDataRequest DTO 형식으로 데이터 구성 (스프링부트가 기대하는 형식) + payload = { + # String 그대로 + "machineId": sensor_data.get("machine_id", "WELDING_MACHINE_001"), + # timestamp (소문자) + "timestamp": sensor_data.get("timestamp", datetime.now().isoformat()), + "signalType": sensor_data["signal_type"], # signalType 필드 추가 + "sensorValues": sensor_data["values"], # sensorValues 배열 그대로 전송 + "dataSource": "simulator" + } + + retry_count = 0 + last_exception = None + + while retry_count <= self.max_retries: + try: + async with httpx.AsyncClient(timeout=self.timeout) as client: + logger.info( + f"게이트웨이를 통해 스프링부트로 센서 데이터 전송 시도 (시도 {retry_count + 1}/{self.max_retries + 1})") + logger.info(f"URL: {url}") + # DEBUG -> INFO로 변경하여 로그에서 확인 + logger.info(f"Payload: {payload}") + + headers = { + "Content-Type": "application/json", + "X-Source": "welding-simulator", + "X-Gateway-Route": "weldingprocessmonitoring" + } + + response = await client.post(url, json=payload, headers=headers) + + # HTTP 상태 코드 확인 + response.raise_for_status() + + result = response.json() + logger.info(f"게이트웨이를 통한 스프링부트 응답 성공: {result}") + + return result + + except asyncio.CancelledError: + logger.info("HTTP 요청이 취소되었습니다.") + raise # CancelledError는 다시 raise해야 함 + + except httpx.TimeoutException as e: + last_exception = e + logger.warning( + f"게이트웨이 타임아웃 (시도 {retry_count + 1}): {str(e)}") + + except httpx.HTTPStatusError as e: + last_exception = e + logger.error( + f"게이트웨이 HTTP 오류 (시도 {retry_count + 1}): {e.response.status_code} - {e.response.text}") + + # 4xx 오류는 재시도하지 않음 + if 400 <= e.response.status_code < 500: + raise e + + except httpx.RequestError as e: + last_exception = e + logger.error( + f"게이트웨이 연결 오류 (시도 {retry_count + 1}): {str(e)}") + + except Exception as e: + last_exception = e + logger.error( + f"게이트웨이 예상치 못한 오류 (시도 {retry_count + 1}): {str(e)}") + + retry_count += 1 + + # 마지막 시도가 아니면 잠시 대기 + if retry_count <= self.max_retries: + try: + await asyncio.sleep(1) + except asyncio.CancelledError: + logger.info("재시도 대기 중 작업 취소됨") + raise + + # 모든 재시도 실패 + logger.error(f"게이트웨이를 통한 스프링부트 전송 실패 - 모든 재시도 완료") + raise last_exception + + def _extract_machine_id(self, machine_id_str: str) -> int: + """기계 ID에서 숫자만 추출""" + import re + numbers = re.findall(r'\d+', machine_id_str) + return int(numbers[0]) if numbers else 1 + + def _format_timestamp(self, timestamp_str: str) -> str: + """타임스탬프를 Java Date 형식으로 변환 (짧은 형식 사용)""" + try: + # 스프링부트 예시에서는 "2025-08-19" 같은 짧은 형식 사용 + from datetime import datetime + if isinstance(timestamp_str, str): + # 'Z' 또는 '+00:00' 제거하고 파싱 + clean_timestamp = timestamp_str.replace( + 'Z', '').replace('+00:00', '') + dt = datetime.fromisoformat(clean_timestamp) + else: + dt = datetime.now() + + # 짧은 날짜 형식으로 변환 (예시와 동일하게): "2025-08-19" + return dt.strftime('%Y-%m-%d') + except Exception as e: + logger.warning(f"타임스탬프 변환 실패: {e}, 현재 날짜 사용") + return datetime.now().strftime('%Y-%m-%d') + + def _map_sensor_values_to_db_columns(self, values: list) -> Dict[str, float]: + """센서 값 배열을 DB 컬럼에 매핑 (기존 엔티티 구조에 맞게)""" + def get_value_at_index(index: int) -> Optional[float]: + return float(values[index]) if index < len(values) else None + + return { + "sensorValue0Ms": get_value_at_index(0), + "sensorValue25Ms": get_value_at_index(25), + "sensorValue125Ms": get_value_at_index(125), + "sensorValue312Ms": get_value_at_index(312), + "sensorValue375Ms": get_value_at_index(375), + "sensorValue625Ms": get_value_at_index(625), + "sensorValue938Ms": get_value_at_index(938), + "sensorValue1562Ms": get_value_at_index(1562), + "sensorValue1875Ms": get_value_at_index(1875), + "sensorValue2188Ms": get_value_at_index(2188), + "sensorValue2812Ms": get_value_at_index(2812), + "sensorValue3125Ms": get_value_at_index(3125), + "sensorValue3438Ms": get_value_at_index(3438), + "sensorValue4062Ms": get_value_at_index(4062), + } + + async def health_check(self) -> bool: + """ + 스프링부트 서비스 헬스 체크 (게이트웨이 경유) + + Returns: + 서비스 상태 (True: 정상, False: 비정상) + """ + try: + url = settings.spring_boot_endpoints["health"] + + async with httpx.AsyncClient(timeout=5) as client: + headers = { + "X-Source": "welding-simulator", + "X-Gateway-Route": "weldingprocessmonitoring" + } + + response = await client.get(url, headers=headers) + + if response.status_code == 200: + try: + health_data = response.json() + status = health_data.get("status", "").upper() + logger.info(f"게이트웨이를 통한 스프링부트 헬스 체크 성공: {status}") + return status == "UP" + except: + # JSON 파싱 실패 시에도 200이면 정상으로 간주 + logger.info("게이트웨이를 통한 스프링부트 헬스 체크 성공 (200 OK)") + return True + else: + logger.warning( + f"게이트웨이를 통한 스프링부트 헬스 체크 실패: HTTP {response.status_code}") + return False + + except Exception as e: + logger.error(f"게이트웨이를 통한 스프링부트 헬스 체크 오류: {str(e)}") + return False + + +# 전역 클라이언트 인스턴스 +spring_client = SpringBootClient()