Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 1 addition & 4 deletions optics_framework/api/action_keyword.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
from typing import Callable, Optional, Any
from optics_framework.common.logging_config import internal_logger, execution_logger
from optics_framework.common.optics_builder import OpticsBuilder
from optics_framework.common.strategies import StrategyManager
from optics_framework.common import utils
from .verifier import Verifier

Expand Down Expand Up @@ -54,9 +53,7 @@ def __init__(self, builder: OpticsBuilder):
self.image_detection = builder.get_image_detection()
self.text_detection = builder.get_text_detection()
self.verifier = Verifier(builder)
self.strategy_manager = StrategyManager(
self.element_source, self.text_detection, self.image_detection)

self.strategy_manager = builder.get_strategy_manager()
# Click actions
@with_self_healing
def press_element(
Expand Down
2 changes: 1 addition & 1 deletion optics_framework/api/verifier.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
from typing import Optional, Any
from optics_framework.common.logging_config import internal_logger
from optics_framework.common import utils
from optics_framework.common.optics_builder import OpticsBuilder
from optics_framework.common.strategies import StrategyManager
from optics_framework.common.optics_builder import OpticsBuilder
from optics_framework.common.eventSDK import EventSDK

class Verifier:
Expand Down
1 change: 1 addition & 0 deletions optics_framework/common/config_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ class Config(BaseModel):
event_attributes_json: Optional[str] = None
halt_duration: float = 0.1
max_attempts: int = 3
synthetic: bool = False

def __init__(self, **data):
super().__init__(**data)
Expand Down
19 changes: 19 additions & 0 deletions optics_framework/common/optics_builder.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from typing import Union, List, Dict, Optional, Type, TypeVar
from optics_framework.common.factories import DeviceFactory, ElementSourceFactory, ImageFactory, TextFactory
from optics_framework.common.strategies import StrategyManager
from pydantic import BaseModel

T = TypeVar('T') # Generic type for the build method
Expand All @@ -20,6 +21,7 @@ class OpticsBuilder:

def __init__(self):
self.config = OpticsConfig()
self._strategy_manager = None

# Fluent methods to set configurations
def add_driver(self, config: Union[str, List[Union[str, Dict]]]) -> 'OpticsBuilder':
Expand Down Expand Up @@ -59,6 +61,23 @@ def get_text_detection(self):
return None
return TextFactory.get_driver(self.config.text_config)

def get_strategy_manager(self):
"""
Get or create the singleton StrategyManager instance.

:return: A singleton StrategyManager instance configured with the builder's dependencies.
:raises ValueError: If required configurations are missing.
"""
if self._strategy_manager is None:
element_source = self.get_element_source()
text_detection = self.get_text_detection()
image_detection = self.get_image_detection()

self._strategy_manager = StrategyManager(
element_source, text_detection, image_detection
)
return self._strategy_manager

def build(self, cls: Type[T]) -> T:
"""
Build an instance of the specified class using the stored configurations.
Expand Down
223 changes: 159 additions & 64 deletions optics_framework/common/strategies.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,22 @@
from abc import ABC, abstractmethod
import inspect
from typing import List, Union, Tuple, Generator, Set, Optional
from typing import List, Union, Tuple, Generator, Set, Optional, Callable
from optics_framework.common.base_factory import InstanceFallback
from optics_framework.common.elementsource_interface import ElementSourceInterface
from optics_framework.common import utils
from optics_framework.common.screenshot_stream import ScreenshotStream
from optics_framework.common.logging_config import internal_logger, execution_logger
from optics_framework.common.execution_tracer import execution_tracer
from optics_framework.engines.vision_models.base_methods import match_and_annotate
from optics_framework.common.config_handler import ConfigHandler
import numpy as np
import time

def get_test_mode() -> bool:
"""Check if the test mode is enabled based on the configuration."""
config_handler = ConfigHandler.get_instance()
return config_handler.config.synthetic


class LocatorStrategy(ABC):
"""Abstract base class for element location strategies."""
Expand Down Expand Up @@ -105,7 +111,108 @@ def assert_elements(self, elements: list, timeout: int = 30, rule: str = 'any')
def supports(element_type: str, element_source: ElementSourceInterface) -> bool:
return element_type == "Text" and LocatorStrategy._is_method_implemented(element_source, "locate")

class TextDetectionStrategy(LocatorStrategy):

class _AssertionLoop:
"""Shared assertion loop (streaming vs single-capture) for low complexity & reuse."""

screenshot_timeout: float = 0.2 # subclasses may override

@property
def element_source(self) -> "ElementSourceInterface":
raise NotImplementedError

def _sleep_step(self) -> float:
return getattr(self, "screenshot_timeout", 0.2)

def _save_and_return(
self,
annotated,
save_name: str,
ts: Optional[str]
) -> Tuple[bool, Optional[str]]:
if annotated is not None:
utils.save_screenshot(annotated, save_name)
return True, ts

def _process_frame_batch(
self,
frames,
frame_handler: Callable[[object, Optional[str]], Tuple[bool, Optional[object]]],
save_name: str,
) -> Tuple[bool, Optional[str]]:
for frame, ts in frames:
hit, annotated = frame_handler(frame, ts)
if hit:
return self._save_and_return(annotated, save_name, ts)
return False, None

def _stream_assertion_loop(
self,
timeout: int,
save_name: str,
frame_handler: Callable[[object, Optional[str]], Tuple[bool, Optional[object]]],
) -> Tuple[bool, Optional[str]]:
deadline = time.time() + timeout
sleep_step = self._sleep_step()
ss_stream = self.strategy_manager.capture_screenshot_stream(timeout=timeout)

try:
while time.time() < deadline:
remaining = max(0.0, deadline - time.time())
frames = ss_stream.get_all_available_screenshots(wait_time=min(1.0, remaining))
if not frames:
time.sleep(sleep_step)
continue

hit, ts = self._process_frame_batch(frames, frame_handler, save_name)
if hit:
return True, ts
finally:
try:
ss_stream.stop_capture()
except Exception as e:
execution_logger.warning(f"Failed to stop screenshot stream: {e}")

return False, None

def _single_capture_loop(
self,
timeout: int,
save_name: str,
frame_handler: Callable[[object, Optional[str]], Tuple[bool, Optional[object]]],
) -> Tuple[bool, Optional[str]]:
deadline = time.time() + timeout
sleep_step = self._sleep_step()

while time.time() < deadline:
screenshot = self.element_source.capture()
if screenshot is None:
time.sleep(sleep_step)
continue

hit, annotated = frame_handler(screenshot, None)
if hit:
ts = utils.get_current_timestamp()
return self._save_and_return(annotated, save_name, ts)

time.sleep(sleep_step)

return False, None

def _run_assertion(
self,
timeout: int,
synthetic: bool,
save_name: str,
frame_handler: Callable[[object, Optional[str]], Tuple[bool, Optional[object]]],
) -> Tuple[bool, Optional[str]]:
"""Dispatch to the appropriate low-complexity loop."""
if synthetic:
return self._stream_assertion_loop(timeout, save_name, frame_handler)
return self._single_capture_loop(timeout, save_name, frame_handler)


class TextDetectionStrategy(LocatorStrategy, _AssertionLoop):
"""Strategy for locating text elements using text detection."""

def __init__(self, element_source: ElementSourceInterface, text_detection, strategy_manager):
Expand All @@ -123,47 +230,43 @@ def locate(self, element: str) -> Union[object, Tuple[int, int]]:
_, coor, _ = self.text_detection.find_element(screenshot, element)
return coor

def assert_elements(self, elements: list, timeout: int = 30, rule: str = 'any') -> Tuple[bool, str]:
end_time = time.time() + timeout
def assert_elements(self, elements: list, timeout: int = 30, rule: str = "any") -> Tuple[bool, Optional[str]]:
"""
OCR/text assertion:
- synthetic=True: use continuous stream
- synthetic=False: single-shot capture via self.element_source.capture()
Returns:
(found: bool, timestamp: Optional[str])
"""
found_status = dict.fromkeys(elements, False)
result = False
annotated_frame = None
timestamp = None
ss_stream = self.strategy_manager.capture_screenshot_stream(timeout=timeout)
try:
while time.time() < end_time:
time.sleep(self.screenshot_timeout) # Allow some time for screenshots to be captured
frames = ss_stream.get_all_available_screenshots(wait_time=1)
if not frames:
time.sleep(self.screenshot_timeout)
continue
for frame, ts in frames:
current_frame = frame.copy()
detected_texts, ocr_results = self.text_detection.detect_text(current_frame)
execution_logger.info(f"Detected texts: {detected_texts}")
match_and_annotate(ocr_results, elements, found_status, current_frame)

if (rule == "any" and any(found_status.values())) or (rule == "all" and all(found_status.values())):
result = True
timestamp = ts
execution_logger.info(f"Elements found: {found_status} on screenshot taken at {timestamp}")
annotated_frame = current_frame
break

if result:
break
finally:
ss_stream.stop_capture()
if annotated_frame is not None:
utils.save_screenshot(annotated_frame, "assert_elements_text_detection_result")
return result, timestamp
synthetic = get_test_mode()

def satisfied() -> bool:
return (rule == "any" and any(found_status.values())) or (rule == "all" and all(found_status.values()))

def frame_handler(frame, ts) -> Tuple[bool, Optional[object]]:
current = frame.copy()
detected_texts, ocr_results = self.text_detection.detect_text(current)
execution_logger.info(f"Detected texts: {detected_texts}")

# Mutates found_status in-place and draws on 'current'
match_and_annotate(ocr_results, elements, found_status, current)

return (satisfied(), current if satisfied() else None)

return self._run_assertion(
timeout=timeout,
synthetic=synthetic,
save_name="assert_elements_text_detection_result",
frame_handler=frame_handler,
)

@staticmethod
def supports(element_type: str, element_source: ElementSourceInterface) -> bool:
return element_type == "Text" and LocatorStrategy._is_method_implemented(element_source, "capture")


class ImageDetectionStrategy(LocatorStrategy):
class ImageDetectionStrategy(LocatorStrategy, _AssertionLoop):
"""Strategy for locating image elements using image detection."""

def __init__(self, element_source: ElementSourceInterface, image_detection, strategy_manager):
Expand All @@ -181,34 +284,26 @@ def locate(self, element: str) -> Union[object, Tuple[int, int]]:
_, centre, _ = self.image_detection.find_element(screenshot, element)
return centre

def assert_elements(self, elements: list, timeout: int = 30, rule: str = 'any') -> Tuple[bool, str]:
end_time = time.time() + timeout
result = False
ss_stream = self.strategy_manager.capture_screenshot_stream(timeout=timeout)
annotated_frame = None
timestamp = None
try:
while time.time() < end_time:
time.sleep(self.screenshot_timeout) # Allow some time for screenshots to be captured
frames = ss_stream.get_all_available_screenshots(wait_time=1)
if not frames:
time.sleep(self.screenshot_timeout)
continue
for frame, ts in frames:
current_frame = frame.copy()
result, annotated = self.image_detection.assert_elements(current_frame, elements, rule)
if result:
timestamp = ts
annotated_frame = annotated # assuming assert_elements returns the annotated image
execution_logger.info(f"Image elements found on screenshot taken at {timestamp}")
break
if result:
break
finally:
ss_stream.stop_capture()
if annotated_frame is not None:
utils.save_screenshot(annotated_frame, "assert_elements_image_detection_result")
return result, timestamp
def assert_elements(self, elements: list, timeout: int = 30, rule: str = "any") -> Tuple[bool, Optional[str]]:
"""
Image assertion:
- synthetic=True: continuous stream
- synthetic=False: single-shot capture via self.element_source.capture()
Returns:
(found: bool, timestamp: Optional[str])
"""
synthetic = get_test_mode()

def frame_handler(frame, ts) -> Tuple[bool, Optional[object]]:
ok, annotated = self.image_detection.assert_elements(frame, elements, rule)
return ok, annotated if ok else None

return self._run_assertion(
timeout=timeout,
synthetic=synthetic,
save_name="assert_elements_image_detection_result",
frame_handler=frame_handler,
)

@staticmethod
def supports(element_type: str, element_source: ElementSourceInterface) -> bool:
Expand Down