-
Notifications
You must be signed in to change notification settings - Fork 13
Fix FlowControl condition logic and Playwright text locator handling #266
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
f966d79
5bd4638
1c99063
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -345,12 +345,16 @@ def swipe_until_element_appears(self, element: str, direction: str, timeout: str | |
| utils.save_screenshot(screenshot_np, "swipe_until_element_appears", output_dir=self.execution_dir) | ||
| start_time = time.time() | ||
| while time.time() - start_time < int(timeout): | ||
| result = self.verifier.assert_presence( | ||
| element, timeout_str="3", rule="any") | ||
| if result: | ||
| break | ||
| try: | ||
| result = self.verifier.assert_presence( | ||
| element, timeout_str="3", rule="any") | ||
| if result: | ||
| return | ||
| except Exception: | ||
| pass | ||
| self.driver.swipe_percentage(10, 50, direction, 25, event_name) | ||
| time.sleep(3) | ||
| raise OpticsError(Code.E0201, message=f"['{element}'] not found based on rule 'any'.") | ||
|
|
||
| @with_self_healing | ||
| def swipe_from_element(self, element: str, direction: str, swipe_length: str, aoi_x: str = "0", aoi_y: str = "0", | ||
|
|
@@ -401,12 +405,16 @@ def scroll_until_element_appears(self, element: str, direction: str, timeout: st | |
| utils.save_screenshot(screenshot_np, "scroll_until_element_appears", output_dir=self.execution_dir) | ||
| start_time = time.time() | ||
| while time.time() - start_time < int(timeout): | ||
| result = self.verifier.assert_presence( | ||
| element, timeout_str="3", rule="any") | ||
| if result: | ||
| break | ||
| try: | ||
| result = self.verifier.assert_presence( | ||
| element, timeout_str="3", rule="any") | ||
| if result: | ||
| return | ||
| except Exception: | ||
| pass | ||
|
Comment on lines
+408
to
+414
|
||
| self.driver.scroll(direction, 1000, event_name) | ||
| time.sleep(3) | ||
| raise OpticsError(Code.E0201, message=f"['{element}'] not found based on rule 'any'.") | ||
|
|
||
| @with_self_healing | ||
| def scroll_from_element(self, element: str, direction: str, scroll_length: str, aoi_x: str = "0", aoi_y: str = "0", | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -352,31 +352,22 @@ def _handle_module_condition(self, cond_str: str, target: str) -> Optional[List[ | |
| actual_cond = cond_str[1:].strip() if invert else cond_str | ||
|
|
||
| try: | ||
| self.execute_module(actual_cond) | ||
| # Module executed successfully | ||
| if invert: | ||
| # If inverted, success means we should NOT execute target | ||
| execution_logger.debug(f"[_EVALUATE_CONDITIONS] Module '{actual_cond}' succeeded, but condition is inverted. Skipping target '{target}'.") | ||
| return None | ||
| # If not inverted, success means we should execute target | ||
| try: | ||
| return self.execute_module(target) | ||
| except Exception as e: | ||
| execution_logger.warning(f"[_EVALUATE_CONDITIONS] Target module '{target}' raised error: {e}.") | ||
| raise OpticsError(Code.E0401, message=f"Error executing target module '{target}': {e}", cause=e) | ||
| result = self.execute_module(actual_cond) | ||
| condition_true = result is not None and len(result) > 0 | ||
| except Exception as e: | ||
| # Module execution failed | ||
| internal_logger.warning(f"[_EVALUATE_CONDITIONS] Module '{actual_cond}' raised error: {e}.") | ||
| if invert: | ||
| # If inverted, failure means we SHOULD execute target | ||
| internal_logger.debug(f"[_EVALUATE_CONDITIONS] Module '{actual_cond}' failed, but condition is inverted. Executing target '{target}'.") | ||
| try: | ||
| return self.execute_module(target) | ||
| except Exception as target_e: | ||
| internal_logger.warning(f"[_EVALUATE_CONDITIONS] Target module '{target}' raised error: {target_e}.") | ||
| raise OpticsError(Code.E0401, message=f"Error executing target module '{target}': {target_e}", cause=target_e) | ||
| # If not inverted, failure means we should NOT execute target | ||
| return None | ||
| internal_logger.warning(f"[_handle_module_condition] Module '{actual_cond}' raised error: {e}. Treating as false condition.") | ||
| result = None | ||
| condition_true = False | ||
|
Comment on lines
+356
to
+360
|
||
|
|
||
| if invert: | ||
| condition_true = not condition_true | ||
|
|
||
| if condition_true: | ||
| return result if result else self.execute_module(target) | ||
|
|
||
| # condition false → execute target (acts like else) | ||
| target_result = self.execute_module(target) | ||
| return target_result if target_result else None | ||
|
|
||
| def _handle_expression_condition(self, cond_str: str, target: str) -> Optional[List[Any]]: | ||
| """Handles evaluation and execution for expression-based conditions.""" | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -40,12 +40,14 @@ async def _launch_app_async(self, app_identifier, event_name): | |
| self._pw = await async_playwright().start() | ||
|
|
||
| browser = self.config.get("browser", "chromium") | ||
| headless = self.config.get("headless", False) | ||
| headless = True | ||
| viewport = self.config.get("viewport", {"width": 1280, "height": 800}) | ||
|
|
||
| self._browser = await getattr(self._pw, browser).launch(headless=headless) | ||
|
Comment on lines
42
to
46
|
||
| self._context = await self._browser.new_context(viewport=viewport) | ||
| self.page = await self._context.new_page() | ||
| self.page.set_default_navigation_timeout(60000) | ||
| self.page.set_default_timeout(60000) | ||
|
Comment on lines
48
to
+50
|
||
|
|
||
| if app_identifier: | ||
| internal_logger.debug("[Playwright] Navigating to %s", app_identifier) | ||
|
|
@@ -81,6 +83,7 @@ async def _launch_other_app_async(self, app_name: str, event_name=None): | |
| # Create a new page (tab) in the existing context | ||
| internal_logger.debug("[Playwright] Creating new tab for %s", app_name) | ||
| self.page = await self._context.new_page() | ||
| # ⭐ very important stability fix | ||
|
|
||
|
Comment on lines
84
to
87
|
||
| # Navigate to the new URL | ||
| if app_name: | ||
|
|
@@ -111,7 +114,7 @@ async def _navigate_to(self, url: str): | |
| wait_until = self.config.get("navigation_wait_until", "domcontentloaded") | ||
|
|
||
| try: | ||
| await self.page.goto(url, timeout=timeout_ms, wait_until=wait_until) | ||
| await self.page.goto(url, timeout=timeout_ms, wait_until="domcontentloaded") | ||
| except PlaywrightTimeoutError as e: | ||
| current_url = self.page.url or "" | ||
| if current_url and url in current_url: | ||
|
|
@@ -152,7 +155,9 @@ def _normalize_locator(self, element): | |
| # If it doesn't already have the xpath= prefix, add it | ||
| if not element.lower().startswith("xpath="): | ||
| return f"xpath={element}" | ||
|
|
||
| # plain visible text support (important for YouTube test) | ||
| if isinstance(element, str) and not any(element.startswith(p) for p in ("css=", "xpath=", "text=", "//", "input", "#", ".")): | ||
| return f"text={element}" | ||
| return element | ||
|
|
||
| # ===================================================== | ||
|
|
@@ -214,6 +219,13 @@ def press_keycode(self, keycode: str, event_name=None): | |
| # Use mapped key or the keycode string directly (Playwright accepts key names) | ||
| key = key_map.get(keycode, keycode) | ||
| run_async(self.page.keyboard.press(key)) | ||
| # wait for youtube search results page load | ||
| try: | ||
| run_async(self.page.wait_for_load_state("networkidle", timeout=10000)) | ||
| except Exception: | ||
| run_async(self.page.wait_for_timeout(4000)) | ||
| # stabilize search navigation | ||
| run_async(self.page.wait_for_timeout(3000)) | ||
|
Comment on lines
221
to
+228
|
||
|
|
||
| if event_name and self.event_sdk: | ||
| self.event_sdk.capture_event(event_name) | ||
|
|
@@ -308,9 +320,10 @@ async def _swipe_element_async(self, element: str, direction: str, swipe_length: | |
|
|
||
|
|
||
| def scroll(self, direction: str = "down", pixels: int = 120, event_name=None): | ||
| for _ in range(2): | ||
| run_async(self.page.mouse.wheel(0, pixels if direction == "down" else -pixels)) | ||
| run_async(self.page.wait_for_timeout(120)) | ||
| scroll_multiplier = int(self.config.get("scroll_multiplier", 1)) | ||
| delta = (pixels if direction == "down" else -pixels) * scroll_multiplier | ||
| run_async(self.page.mouse.wheel(0, delta)) | ||
| run_async(self.page.wait_for_timeout(200)) | ||
|
|
||
|
|
||
| # ===================================================== | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
These loops swallow all exceptions from
verifier.assert_presence(), which can hide unexpected errors (not just “not present” failures). Sinceassert_presenceraisesAssertionErroron a failed presence check, consider catchingAssertionErrorspecifically and letting other exception types surface.