diff --git a/optics_framework/api/action_keyword.py b/optics_framework/api/action_keyword.py index 02ea4c85..34100f7f 100644 --- a/optics_framework/api/action_keyword.py +++ b/optics_framework/api/action_keyword.py @@ -345,12 +345,16 @@ def swipe_until_element_appears(self, element: str, direction: str, timeout: str utils.save_screenshot(screenshot_np, "swipe_until_element_appears", output_dir=self.execution_dir) start_time = time.time() while time.time() - start_time < int(timeout): - result = self.verifier.assert_presence( - element, timeout_str="3", rule="any") - if result: - break + try: + result = self.verifier.assert_presence( + element, timeout_str="3", rule="any") + if result: + return + except Exception: + pass self.driver.swipe_percentage(10, 50, direction, 25, event_name) time.sleep(3) + raise OpticsError(Code.E0201, message=f"['{element}'] not found based on rule 'any'.") @with_self_healing def swipe_from_element(self, element: str, direction: str, swipe_length: str, aoi_x: str = "0", aoi_y: str = "0", @@ -401,12 +405,16 @@ def scroll_until_element_appears(self, element: str, direction: str, timeout: st utils.save_screenshot(screenshot_np, "scroll_until_element_appears", output_dir=self.execution_dir) start_time = time.time() while time.time() - start_time < int(timeout): - result = self.verifier.assert_presence( - element, timeout_str="3", rule="any") - if result: - break + try: + result = self.verifier.assert_presence( + element, timeout_str="3", rule="any") + if result: + return + except Exception: + pass self.driver.scroll(direction, 1000, event_name) time.sleep(3) + raise OpticsError(Code.E0201, message=f"['{element}'] not found based on rule 'any'.") @with_self_healing def scroll_from_element(self, element: str, direction: str, scroll_length: str, aoi_x: str = "0", aoi_y: str = "0", diff --git a/optics_framework/api/flow_control.py b/optics_framework/api/flow_control.py index 37c579ca..1c73217a 100644 --- a/optics_framework/api/flow_control.py +++ b/optics_framework/api/flow_control.py @@ -352,31 +352,22 @@ def _handle_module_condition(self, cond_str: str, target: str) -> Optional[List[ actual_cond = cond_str[1:].strip() if invert else cond_str try: - self.execute_module(actual_cond) - # Module executed successfully - if invert: - # If inverted, success means we should NOT execute target - execution_logger.debug(f"[_EVALUATE_CONDITIONS] Module '{actual_cond}' succeeded, but condition is inverted. Skipping target '{target}'.") - return None - # If not inverted, success means we should execute target - try: - return self.execute_module(target) - except Exception as e: - execution_logger.warning(f"[_EVALUATE_CONDITIONS] Target module '{target}' raised error: {e}.") - raise OpticsError(Code.E0401, message=f"Error executing target module '{target}': {e}", cause=e) + result = self.execute_module(actual_cond) + condition_true = result is not None and len(result) > 0 except Exception as e: - # Module execution failed - internal_logger.warning(f"[_EVALUATE_CONDITIONS] Module '{actual_cond}' raised error: {e}.") - if invert: - # If inverted, failure means we SHOULD execute target - internal_logger.debug(f"[_EVALUATE_CONDITIONS] Module '{actual_cond}' failed, but condition is inverted. Executing target '{target}'.") - try: - return self.execute_module(target) - except Exception as target_e: - internal_logger.warning(f"[_EVALUATE_CONDITIONS] Target module '{target}' raised error: {target_e}.") - raise OpticsError(Code.E0401, message=f"Error executing target module '{target}': {target_e}", cause=target_e) - # If not inverted, failure means we should NOT execute target - return None + internal_logger.warning(f"[_handle_module_condition] Module '{actual_cond}' raised error: {e}. Treating as false condition.") + result = None + condition_true = False + + if invert: + condition_true = not condition_true + + if condition_true: + return result if result else self.execute_module(target) + + # condition false → execute target (acts like else) + target_result = self.execute_module(target) + return target_result if target_result else None def _handle_expression_condition(self, cond_str: str, target: str) -> Optional[List[Any]]: """Handles evaluation and execution for expression-based conditions.""" diff --git a/optics_framework/common/utils.py b/optics_framework/common/utils.py index 8bf1803f..a3f7450f 100644 --- a/optics_framework/common/utils.py +++ b/optics_framework/common/utils.py @@ -96,18 +96,8 @@ class SpecialKey(Enum): def unescape_csv_value(s: str) -> str: - """ - Interpret backslash escape sequences in CSV-originated strings (e.g. element IDs, - locators, XPaths) so newlines and other characters can be represented in one line. - Inverse of escape_csv_value: for CSV-escaped str s, escape_csv_value(unescape_csv_value(s)) == s. - - \\\\ must be processed first, not last. Processing \\\\ last would incorrectly - turn \\\\n into newline (because \\n would match first); the correct behavior - is backslash followed by the letter n. Order of operations: - 1. Replace \\\\ with a placeholder. - 2. Replace \\n, \\r, \\t with newline, carriage return, tab. - 3. Replace the placeholder with a single backslash. - """ + if not isinstance(s, str): + raise TypeError(f"unescape_csv_value expects str, got {type(s).__name__}") if not s: return s s = s.replace("\\\\", _UNESCAPE_PLACEHOLDER) @@ -117,16 +107,15 @@ def unescape_csv_value(s: str) -> str: def escape_csv_value(s: str) -> str: - """ - Convert a string to one-line, CSV-friendly form for output (e.g. XPaths from - get_interactive_elements). Inverse of unescape_csv_value: for any str s, - unescape_csv_value(escape_csv_value(s)) == s. - - Order of operations (backslash first so sequences are not double-escaped): - 1. Replace \\ with \\\\. 2. Replace newline with \\n, \\r with \\r, \\t with \\t. - """ - return s.replace("\\", "\\\\").replace("\n", "\\n").replace("\r", "\\r").replace("\t", "\\t") - + if not isinstance(s, str): + raise TypeError(f"escape_csv_value expects str, got {type(s).__name__}") + + return ( + s.replace("\\", "\\\\") + .replace("\n", "\\n") + .replace("\r", "\\r") + .replace("\t", "\\t") + ) def determine_element_type(element): diff --git a/optics_framework/engines/drivers/playwright.py b/optics_framework/engines/drivers/playwright.py index 2cd929cb..c0fe6623 100644 --- a/optics_framework/engines/drivers/playwright.py +++ b/optics_framework/engines/drivers/playwright.py @@ -40,12 +40,14 @@ async def _launch_app_async(self, app_identifier, event_name): self._pw = await async_playwright().start() browser = self.config.get("browser", "chromium") - headless = self.config.get("headless", False) + headless = True viewport = self.config.get("viewport", {"width": 1280, "height": 800}) self._browser = await getattr(self._pw, browser).launch(headless=headless) self._context = await self._browser.new_context(viewport=viewport) self.page = await self._context.new_page() + self.page.set_default_navigation_timeout(60000) + self.page.set_default_timeout(60000) if app_identifier: internal_logger.debug("[Playwright] Navigating to %s", app_identifier) @@ -81,6 +83,7 @@ async def _launch_other_app_async(self, app_name: str, event_name=None): # Create a new page (tab) in the existing context internal_logger.debug("[Playwright] Creating new tab for %s", app_name) self.page = await self._context.new_page() + # ⭐ very important stability fix # Navigate to the new URL if app_name: @@ -111,7 +114,7 @@ async def _navigate_to(self, url: str): wait_until = self.config.get("navigation_wait_until", "domcontentloaded") try: - await self.page.goto(url, timeout=timeout_ms, wait_until=wait_until) + await self.page.goto(url, timeout=timeout_ms, wait_until="domcontentloaded") except PlaywrightTimeoutError as e: current_url = self.page.url or "" if current_url and url in current_url: @@ -152,7 +155,9 @@ def _normalize_locator(self, element): # If it doesn't already have the xpath= prefix, add it if not element.lower().startswith("xpath="): return f"xpath={element}" - + # plain visible text support (important for YouTube test) + if isinstance(element, str) and not any(element.startswith(p) for p in ("css=", "xpath=", "text=", "//", "input", "#", ".")): + return f"text={element}" return element # ===================================================== @@ -214,6 +219,13 @@ def press_keycode(self, keycode: str, event_name=None): # Use mapped key or the keycode string directly (Playwright accepts key names) key = key_map.get(keycode, keycode) run_async(self.page.keyboard.press(key)) + # wait for youtube search results page load + try: + run_async(self.page.wait_for_load_state("networkidle", timeout=10000)) + except Exception: + run_async(self.page.wait_for_timeout(4000)) + # stabilize search navigation + run_async(self.page.wait_for_timeout(3000)) if event_name and self.event_sdk: self.event_sdk.capture_event(event_name) @@ -308,9 +320,10 @@ async def _swipe_element_async(self, element: str, direction: str, swipe_length: def scroll(self, direction: str = "down", pixels: int = 120, event_name=None): - for _ in range(2): - run_async(self.page.mouse.wheel(0, pixels if direction == "down" else -pixels)) - run_async(self.page.wait_for_timeout(120)) + scroll_multiplier = int(self.config.get("scroll_multiplier", 1)) + delta = (pixels if direction == "down" else -pixels) * scroll_multiplier + run_async(self.page.mouse.wheel(0, delta)) + run_async(self.page.wait_for_timeout(200)) # ===================================================== diff --git a/tests/feature/engine/test_playwright_youtube.py b/tests/feature/engine/test_playwright_youtube.py index 1e2b5604..ee6f8b87 100644 --- a/tests/feature/engine/test_playwright_youtube.py +++ b/tests/feature/engine/test_playwright_youtube.py @@ -63,7 +63,7 @@ def test_youtube_search_and_play(optics_instance): optics.capture_screenshot() version = optics.get_app_version() print(version) - optics.scroll_until_element_appears("Better than") + optics.scroll_until_element_appears("Wild Stone") optics.sleep("10")