diff --git a/optics_framework/api/action_keyword.py b/optics_framework/api/action_keyword.py index dd3725bc..f721e1ea 100644 --- a/optics_framework/api/action_keyword.py +++ b/optics_framework/api/action_keyword.py @@ -3,7 +3,6 @@ from typing import Callable, Optional, Any from optics_framework.common.logging_config import internal_logger, execution_logger from optics_framework.common.optics_builder import OpticsBuilder -from optics_framework.common.strategies import StrategyManager from optics_framework.common import utils from .verifier import Verifier @@ -54,9 +53,7 @@ def __init__(self, builder: OpticsBuilder): self.image_detection = builder.get_image_detection() self.text_detection = builder.get_text_detection() self.verifier = Verifier(builder) - self.strategy_manager = StrategyManager( - self.element_source, self.text_detection, self.image_detection) - + self.strategy_manager = builder.get_strategy_manager() # Click actions @with_self_healing def press_element( diff --git a/optics_framework/api/verifier.py b/optics_framework/api/verifier.py index 9dde87e7..7ba9e7d9 100644 --- a/optics_framework/api/verifier.py +++ b/optics_framework/api/verifier.py @@ -1,8 +1,8 @@ from typing import Optional, Any from optics_framework.common.logging_config import internal_logger from optics_framework.common import utils -from optics_framework.common.optics_builder import OpticsBuilder from optics_framework.common.strategies import StrategyManager +from optics_framework.common.optics_builder import OpticsBuilder from optics_framework.common.eventSDK import EventSDK class Verifier: diff --git a/optics_framework/common/config_handler.py b/optics_framework/common/config_handler.py index a1ebd7aa..b48036cd 100644 --- a/optics_framework/common/config_handler.py +++ b/optics_framework/common/config_handler.py @@ -49,6 +49,7 @@ class Config(BaseModel): event_attributes_json: Optional[str] = None halt_duration: float = 0.1 max_attempts: int = 3 + synthetic: bool = False def __init__(self, **data): super().__init__(**data) diff --git a/optics_framework/common/optics_builder.py b/optics_framework/common/optics_builder.py index 496cb238..64df42f7 100644 --- a/optics_framework/common/optics_builder.py +++ b/optics_framework/common/optics_builder.py @@ -1,5 +1,6 @@ from typing import Union, List, Dict, Optional, Type, TypeVar from optics_framework.common.factories import DeviceFactory, ElementSourceFactory, ImageFactory, TextFactory +from optics_framework.common.strategies import StrategyManager from pydantic import BaseModel T = TypeVar('T') # Generic type for the build method @@ -20,6 +21,7 @@ class OpticsBuilder: def __init__(self): self.config = OpticsConfig() + self._strategy_manager = None # Fluent methods to set configurations def add_driver(self, config: Union[str, List[Union[str, Dict]]]) -> 'OpticsBuilder': @@ -59,6 +61,23 @@ def get_text_detection(self): return None return TextFactory.get_driver(self.config.text_config) + def get_strategy_manager(self): + """ + Get or create the singleton StrategyManager instance. + + :return: A singleton StrategyManager instance configured with the builder's dependencies. + :raises ValueError: If required configurations are missing. + """ + if self._strategy_manager is None: + element_source = self.get_element_source() + text_detection = self.get_text_detection() + image_detection = self.get_image_detection() + + self._strategy_manager = StrategyManager( + element_source, text_detection, image_detection + ) + return self._strategy_manager + def build(self, cls: Type[T]) -> T: """ Build an instance of the specified class using the stored configurations. diff --git a/optics_framework/common/strategies.py b/optics_framework/common/strategies.py index e07dbbac..e82d4fd9 100644 --- a/optics_framework/common/strategies.py +++ b/optics_framework/common/strategies.py @@ -1,6 +1,6 @@ from abc import ABC, abstractmethod import inspect -from typing import List, Union, Tuple, Generator, Set, Optional +from typing import List, Union, Tuple, Generator, Set, Optional, Callable from optics_framework.common.base_factory import InstanceFallback from optics_framework.common.elementsource_interface import ElementSourceInterface from optics_framework.common import utils @@ -8,9 +8,15 @@ from optics_framework.common.logging_config import internal_logger, execution_logger from optics_framework.common.execution_tracer import execution_tracer from optics_framework.engines.vision_models.base_methods import match_and_annotate +from optics_framework.common.config_handler import ConfigHandler import numpy as np import time +def get_test_mode() -> bool: + """Check if the test mode is enabled based on the configuration.""" + config_handler = ConfigHandler.get_instance() + return config_handler.config.synthetic + class LocatorStrategy(ABC): """Abstract base class for element location strategies.""" @@ -105,7 +111,108 @@ def assert_elements(self, elements: list, timeout: int = 30, rule: str = 'any') def supports(element_type: str, element_source: ElementSourceInterface) -> bool: return element_type == "Text" and LocatorStrategy._is_method_implemented(element_source, "locate") -class TextDetectionStrategy(LocatorStrategy): + +class _AssertionLoop: + """Shared assertion loop (streaming vs single-capture) for low complexity & reuse.""" + + screenshot_timeout: float = 0.2 # subclasses may override + + @property + def element_source(self) -> "ElementSourceInterface": + raise NotImplementedError + + def _sleep_step(self) -> float: + return getattr(self, "screenshot_timeout", 0.2) + + def _save_and_return( + self, + annotated, + save_name: str, + ts: Optional[str] + ) -> Tuple[bool, Optional[str]]: + if annotated is not None: + utils.save_screenshot(annotated, save_name) + return True, ts + + def _process_frame_batch( + self, + frames, + frame_handler: Callable[[object, Optional[str]], Tuple[bool, Optional[object]]], + save_name: str, + ) -> Tuple[bool, Optional[str]]: + for frame, ts in frames: + hit, annotated = frame_handler(frame, ts) + if hit: + return self._save_and_return(annotated, save_name, ts) + return False, None + + def _stream_assertion_loop( + self, + timeout: int, + save_name: str, + frame_handler: Callable[[object, Optional[str]], Tuple[bool, Optional[object]]], + ) -> Tuple[bool, Optional[str]]: + deadline = time.time() + timeout + sleep_step = self._sleep_step() + ss_stream = self.strategy_manager.capture_screenshot_stream(timeout=timeout) + + try: + while time.time() < deadline: + remaining = max(0.0, deadline - time.time()) + frames = ss_stream.get_all_available_screenshots(wait_time=min(1.0, remaining)) + if not frames: + time.sleep(sleep_step) + continue + + hit, ts = self._process_frame_batch(frames, frame_handler, save_name) + if hit: + return True, ts + finally: + try: + ss_stream.stop_capture() + except Exception as e: + execution_logger.warning(f"Failed to stop screenshot stream: {e}") + + return False, None + + def _single_capture_loop( + self, + timeout: int, + save_name: str, + frame_handler: Callable[[object, Optional[str]], Tuple[bool, Optional[object]]], + ) -> Tuple[bool, Optional[str]]: + deadline = time.time() + timeout + sleep_step = self._sleep_step() + + while time.time() < deadline: + screenshot = self.element_source.capture() + if screenshot is None: + time.sleep(sleep_step) + continue + + hit, annotated = frame_handler(screenshot, None) + if hit: + ts = utils.get_current_timestamp() + return self._save_and_return(annotated, save_name, ts) + + time.sleep(sleep_step) + + return False, None + + def _run_assertion( + self, + timeout: int, + synthetic: bool, + save_name: str, + frame_handler: Callable[[object, Optional[str]], Tuple[bool, Optional[object]]], + ) -> Tuple[bool, Optional[str]]: + """Dispatch to the appropriate low-complexity loop.""" + if synthetic: + return self._stream_assertion_loop(timeout, save_name, frame_handler) + return self._single_capture_loop(timeout, save_name, frame_handler) + + +class TextDetectionStrategy(LocatorStrategy, _AssertionLoop): """Strategy for locating text elements using text detection.""" def __init__(self, element_source: ElementSourceInterface, text_detection, strategy_manager): @@ -123,47 +230,43 @@ def locate(self, element: str) -> Union[object, Tuple[int, int]]: _, coor, _ = self.text_detection.find_element(screenshot, element) return coor - def assert_elements(self, elements: list, timeout: int = 30, rule: str = 'any') -> Tuple[bool, str]: - end_time = time.time() + timeout + def assert_elements(self, elements: list, timeout: int = 30, rule: str = "any") -> Tuple[bool, Optional[str]]: + """ + OCR/text assertion: + - synthetic=True: use continuous stream + - synthetic=False: single-shot capture via self.element_source.capture() + Returns: + (found: bool, timestamp: Optional[str]) + """ found_status = dict.fromkeys(elements, False) - result = False - annotated_frame = None - timestamp = None - ss_stream = self.strategy_manager.capture_screenshot_stream(timeout=timeout) - try: - while time.time() < end_time: - time.sleep(self.screenshot_timeout) # Allow some time for screenshots to be captured - frames = ss_stream.get_all_available_screenshots(wait_time=1) - if not frames: - time.sleep(self.screenshot_timeout) - continue - for frame, ts in frames: - current_frame = frame.copy() - detected_texts, ocr_results = self.text_detection.detect_text(current_frame) - execution_logger.info(f"Detected texts: {detected_texts}") - match_and_annotate(ocr_results, elements, found_status, current_frame) - - if (rule == "any" and any(found_status.values())) or (rule == "all" and all(found_status.values())): - result = True - timestamp = ts - execution_logger.info(f"Elements found: {found_status} on screenshot taken at {timestamp}") - annotated_frame = current_frame - break - - if result: - break - finally: - ss_stream.stop_capture() - if annotated_frame is not None: - utils.save_screenshot(annotated_frame, "assert_elements_text_detection_result") - return result, timestamp + synthetic = get_test_mode() + + def satisfied() -> bool: + return (rule == "any" and any(found_status.values())) or (rule == "all" and all(found_status.values())) + + def frame_handler(frame, ts) -> Tuple[bool, Optional[object]]: + current = frame.copy() + detected_texts, ocr_results = self.text_detection.detect_text(current) + execution_logger.info(f"Detected texts: {detected_texts}") + + # Mutates found_status in-place and draws on 'current' + match_and_annotate(ocr_results, elements, found_status, current) + + return (satisfied(), current if satisfied() else None) + + return self._run_assertion( + timeout=timeout, + synthetic=synthetic, + save_name="assert_elements_text_detection_result", + frame_handler=frame_handler, + ) @staticmethod def supports(element_type: str, element_source: ElementSourceInterface) -> bool: return element_type == "Text" and LocatorStrategy._is_method_implemented(element_source, "capture") -class ImageDetectionStrategy(LocatorStrategy): +class ImageDetectionStrategy(LocatorStrategy, _AssertionLoop): """Strategy for locating image elements using image detection.""" def __init__(self, element_source: ElementSourceInterface, image_detection, strategy_manager): @@ -181,34 +284,26 @@ def locate(self, element: str) -> Union[object, Tuple[int, int]]: _, centre, _ = self.image_detection.find_element(screenshot, element) return centre - def assert_elements(self, elements: list, timeout: int = 30, rule: str = 'any') -> Tuple[bool, str]: - end_time = time.time() + timeout - result = False - ss_stream = self.strategy_manager.capture_screenshot_stream(timeout=timeout) - annotated_frame = None - timestamp = None - try: - while time.time() < end_time: - time.sleep(self.screenshot_timeout) # Allow some time for screenshots to be captured - frames = ss_stream.get_all_available_screenshots(wait_time=1) - if not frames: - time.sleep(self.screenshot_timeout) - continue - for frame, ts in frames: - current_frame = frame.copy() - result, annotated = self.image_detection.assert_elements(current_frame, elements, rule) - if result: - timestamp = ts - annotated_frame = annotated # assuming assert_elements returns the annotated image - execution_logger.info(f"Image elements found on screenshot taken at {timestamp}") - break - if result: - break - finally: - ss_stream.stop_capture() - if annotated_frame is not None: - utils.save_screenshot(annotated_frame, "assert_elements_image_detection_result") - return result, timestamp + def assert_elements(self, elements: list, timeout: int = 30, rule: str = "any") -> Tuple[bool, Optional[str]]: + """ + Image assertion: + - synthetic=True: continuous stream + - synthetic=False: single-shot capture via self.element_source.capture() + Returns: + (found: bool, timestamp: Optional[str]) + """ + synthetic = get_test_mode() + + def frame_handler(frame, ts) -> Tuple[bool, Optional[object]]: + ok, annotated = self.image_detection.assert_elements(frame, elements, rule) + return ok, annotated if ok else None + + return self._run_assertion( + timeout=timeout, + synthetic=synthetic, + save_name="assert_elements_image_detection_result", + frame_handler=frame_handler, + ) @staticmethod def supports(element_type: str, element_source: ElementSourceInterface) -> bool: