-
Notifications
You must be signed in to change notification settings - Fork 0
Feat/55/press-fault-data-simulator-service #64
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
21de7f6
d165221
21a52e2
b3c70e1
d26302c
9946cc2
2c1b264
c24dce2
a94f337
3ec5a64
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -205,16 +205,29 @@ def _clear_cache(self): | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| self.current_file_name = None | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| self.current_row_index = 0 | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| async def close(self): | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| async def reset_connection(self): | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| """연결 상태 초기화 (재시작 시 사용)""" | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| try: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| if hasattr(self, '_current_client'): | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| await self._current_client.close() | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| delattr(self, '_current_client') | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| self.current_file_name = None | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| self.current_row_index = 0 | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| system_log.info("Azure Storage 연결 상태 초기화 완료") | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| except Exception as e: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| system_log.warning(f"연결 초기화 중 오류 (무시됨): {str(e)}") | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| async def close(self): | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| """연결 종료 (선택적)""" | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| try: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| if hasattr(self, "blob_service_client") and self.blob_service_client: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| await self.blob_service_client.close() | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| system_log.info("Azure Storage 연결 종료 완료") | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| if hasattr(self, '_current_client'): | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| await self._current_client.close() | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| delattr(self, '_current_client') | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| system_log.info("Azure Storage 연결 종료 완료") | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| except Exception as e: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| system_log.error(f"Azure Storage 연결 종료 중 오류: {str(e)}") | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| finally: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| self.is_connected = False | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| system_log.warning(f"연결 종료 중 오류 (무시됨): {str(e)}") | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
Comment on lines
+222
to
231
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. close() no longer closes BlobServiceClient; potential resource leak and failing tests close() checks only _current_client and never closes blob_service_client. Existing tests (e.g., test_close_closes_underlying_client) expect blob_service_client.close() to be awaited. This is both a functional leak and a test regression. Apply this minimal, backward-compatible fix: - async def close(self):
+ async def close(self) -> None:
"""연결 종료 (선택적)"""
try:
- if hasattr(self, '_current_client'):
- await self._current_client.close()
- delattr(self, '_current_client')
- system_log.info("Azure Storage 연결 종료 완료")
+ # 우선 실제 BlobServiceClient를 닫음
+ if hasattr(self, 'blob_service_client') and self.blob_service_client:
+ await self.blob_service_client.close()
+ # 호환성: 개별 클라이언트가 있다면 닫음
+ if hasattr(self, '_current_client'):
+ await self._current_client.close()
+ delattr(self, '_current_client')
+ system_log.info("Azure Storage 연결 종료 완료")
except Exception as e:
- system_log.warning(f"연결 종료 중 오류 (무시됨): {str(e)}")
+ system_log.warning(f"연결 종료 중 오류 (무시됨): {str(e)}")
+ finally:
+ self.is_connected = FalseThis will also restore the test expectation that the underlying client is closed. 📝 Committable suggestion
Suggested change
🤖 Prompt for AI Agents |
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| azure_storage = AzureStorageService() | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
This file was deleted.
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -3,10 +3,11 @@ | |
| from datetime import datetime | ||
|
|
||
| from app.services.azure_storage_service import AzureStorageService | ||
| from app.services.prediction_api_service import PredictAPIService | ||
| # from app.services.prediction_api_service import PredictAPIService | ||
| from app.services.spring_boot_service import SpringBootService | ||
| from app.models.data_models import PredictionRequest | ||
| from app.config.settings import settings | ||
| from app.utils.logger import system_log, prediction_log | ||
| from app.utils.logger import system_log | ||
|
|
||
|
|
||
| class SchedulerService: | ||
|
|
@@ -18,13 +19,17 @@ def __init__(self): | |
|
|
||
| # 서비스 인스턴스들 | ||
| self.storage_service = AzureStorageService() | ||
| self.api_service = PredictAPIService() | ||
| # self.api_service = PredictAPIService() | ||
| self.spring_boot_service = SpringBootService() | ||
|
|
||
| # 통계 | ||
| self.total_predictions = 0 | ||
| self.fault_detections = 0 | ||
| # self.total_predictions = 0 | ||
| # self.fault_detections = 0 | ||
| self.start_time: Optional[datetime] = None | ||
|
|
||
| self.total_transmissions = 0 | ||
| self.successful_transmissions = 0 | ||
|
|
||
|
Comment on lines
+26
to
+32
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 💡 Verification agent 🧩 Analysis chainCounters renamed; ensure references/tests are updated You replaced total_predictions/fault_detections with total_transmissions/successful_transmissions. Ensure no stale references remain in tests or other modules (e.g., test_scheduler_service). Run this to locate legacy identifiers and logger usage that may now be dead: 🏁 Script executed: #!/bin/bash
set -euo pipefail
# Find old counters/loggers that should be removed/updated
rg -nP -C2 '\b(total_predictions|fault_detections|prediction_log)\b'Length of output: 21033 🏁 Script executed: #!/bin/bash
set -euo pipefail
sed -n '140,200p' services/press-fault-data-simulator-service/app/services/scheduler_service.pyLength of output: 2067 🏁 Script executed: #!/bin/bash
set -euo pipefail
sed -n '180,260p' services/press-fault-data-simulator-service/app/services/scheduler_service.pyLength of output: 1367 Update test references after renaming counters The scheduler service now uses • Around lines 154–155:
• Around lines 228–229 and 243–244 (after
• Around lines 260–261 and 272–273 (error cases):
• Around lines 338–339 (in
Make sure all references to the old counters in tests are updated to the new names. 🤖 Prompt for AI Agents |
||
| async def start_simulation(self) -> bool: | ||
| """시뮬레이션 시작!""" | ||
|
|
||
|
|
@@ -34,28 +39,30 @@ async def start_simulation(self) -> bool: | |
|
|
||
| system_log.info("🚀 시뮬레이션 시작 중...") | ||
|
|
||
| # API 서버 상태 확인 | ||
| if not await self.api_service.health_check(): | ||
| system_log.error("API 서버 연결 실패. 시뮬레이션을 시작할 수 없습니다.") | ||
| await self.storage_service.reset_connection() | ||
| # Spring Boot 서버 상태 확인 | ||
| if not await self.spring_boot_service.health_check(): | ||
| system_log.error("Spring Boot 서버 연결 실패. 시뮬레이션을 시작할 수 없습니다.") | ||
| return False | ||
|
|
||
| # Azure Storage 연결 확인 | ||
| if not await self.storage_service.connect(): | ||
| system_log.error("Azure Storage 연결 실패.") | ||
| return False | ||
|
|
||
| self.is_running = True | ||
| self.start_time = datetime.now() | ||
| self.total_predictions = 0 | ||
| self.fault_detections = 0 | ||
| # self.total_predictions = 0 | ||
| # self.fault_detections = 0 | ||
| self.total_transmissions = 0 | ||
| self.successful_transmissions = 0 | ||
|
|
||
| self.loop = asyncio.get_event_loop() | ||
| self.task = self.loop.create_task(self._run_simulation_loop()) | ||
|
|
||
| system_log.info( | ||
| f"✅ 시뮬레이션 시작됨 - 간격: {settings.SIMULATOR_INTERVAL_MINUTES}분" | ||
| ) | ||
| system_log.info(f"📊 API URL: {settings.PREDICTION_API_FULL_URL}") | ||
|
|
||
| return True | ||
|
|
||
|
|
@@ -83,7 +90,7 @@ async def stop_simulation(self) -> bool: | |
| if self.start_time: | ||
| duration = datetime.now() - self.start_time | ||
| system_log.info(f"실행 시간: {duration}") | ||
| system_log.info(f"총 예측 횟수: {self.total_predictions}") | ||
| system_log.info(f" └─ 총 전송 횟수: {self.total_transmissions}") | ||
|
|
||
| system_log.info("✅ 시뮬레이션 종료 완료") | ||
| return True | ||
|
|
@@ -131,24 +138,25 @@ async def _run_single_simulation(self) -> bool: | |
|
|
||
| minute_data, file_name, is_end_of_file = data_result | ||
|
|
||
| # 2. 예측 요청 데이터 생성 | ||
| prediction_request = PredictionRequest.from_csv_data(minute_data) | ||
| # 3. API 호출 | ||
| prediction_result = await self.api_service.call_predict_api( | ||
| prediction_request | ||
| # 2. Spring Boot 전송용 데이터 생성 | ||
| sensor_data_request = PredictionRequest.from_csv_data(minute_data) | ||
| # 3. Spring Boot로 데이터 전송 | ||
| transmission_success = await self.spring_boot_service.send_sensor_data( | ||
| sensor_data_request, | ||
| data_source=file_name | ||
| ) | ||
|
|
||
| if prediction_result is None: | ||
| system_log.error("API 호출 실패") | ||
| if not transmission_success: | ||
| system_log.error("Spring Boot 데이터 전송 실패") | ||
| return False | ||
|
|
||
| # 4. 결과 로그 처리 | ||
| self._handle_prediction_result(prediction_result, file_name) | ||
|
|
||
| # 4. 전송 성공 로그 | ||
| system_log.info(f"✅ 데이터 전송 성공 - Source: {file_name}, " | ||
| f"Size: {len(minute_data)}행") | ||
| # 5. 통계 업데이트 | ||
| self.total_predictions += 1 | ||
| if prediction_result.is_fault: | ||
| self.fault_detections += 1 | ||
| self.total_transmissions += 1 | ||
| if transmission_success: | ||
| self.successful_transmissions += 1 | ||
|
|
||
| if is_end_of_file: | ||
| system_log.info(f"파일 '{file_name}' 처리 완료") | ||
|
|
@@ -159,7 +167,7 @@ async def _run_single_simulation(self) -> bool: | |
| system_log.error(f"시뮬레이션 실행 오류: {str(e)}") | ||
| return False | ||
|
|
||
| def _handle_prediction_result(self, result, data_source: str): | ||
| # def _handle_prediction_result(self, result, data_source: str): | ||
| """예측 결과 처리 (로그 기록)""" | ||
| status = "FAULT DETECTED" if result.is_fault else "✅ NORMAL" | ||
|
|
||
|
|
@@ -200,10 +208,9 @@ def get_simulation_status(self) -> dict: | |
| "is_running": True, | ||
| "start_time": self.start_time.isoformat() if self.start_time else None, | ||
| "runtime": runtime, | ||
| "total_predictions": self.total_predictions, | ||
| "fault_detections": self.fault_detections, | ||
| "total_transmissions": self.total_transmissions, | ||
| "successful_transmissions": self.successful_transmissions, | ||
| "interval_minutes": settings.SIMULATOR_INTERVAL_MINUTES, | ||
| "api_url": settings.PREDICTION_API_FULL_URL, | ||
| "storage_status": storage_status, | ||
| } | ||
|
|
||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
reset_connection uses an undefined _current_client and doesn’t ensure the main client is closed
_reset_connection() currently targets self._current_client, which is never initialized elsewhere in this class, and it doesn’t close blob_service_client. This can leave connections hanging and diverges from existing tests that expect blob_service_client.close() to be awaited.
Proposed fix: delegate to close(), reset state, and mark disconnected.
This aligns behavior with expectations and avoids leaking the underlying client.
📝 Committable suggestion