diff --git a/examples/network_monitor.py b/examples/network_monitor.py index 7fb143c1..099fc39f 100644 --- a/examples/network_monitor.py +++ b/examples/network_monitor.py @@ -17,13 +17,13 @@ async def main(): tab = await browser.get("https://www.google.com/?hl=en") - reject_btn = await tab.find("reject all", best_match=True) + reject_btn = await tab.find(text="reject all") await reject_btn.click() search_inp = await tab.select("textarea") await search_inp.send_keys("undetected zendriver") - search_btn = await tab.find("google search", True) + search_btn = await tab.find(text="google search") await search_btn.click() for _ in range(10): diff --git a/zendriver/core/tab.py b/zendriver/core/tab.py index 75b4d22b..e173fb49 100644 --- a/zendriver/core/tab.py +++ b/zendriver/core/tab.py @@ -9,7 +9,9 @@ import urllib.parse import warnings import webbrowser -from typing import TYPE_CHECKING, Any, List, Literal, Optional, Tuple, Union +from typing import TYPE_CHECKING, Any, Coroutine, List, Literal, Optional, Tuple, Union + +from zendriver.cdp.dom import Node from .. import cdp from . import element, util @@ -184,24 +186,21 @@ async def open_external_inspector(self): async def find( self, - text: str, - best_match: bool = True, - return_enclosing_element=True, + tagname: Optional[str] = None, + attrs: Optional[dict[str, str]] = None, + text: Optional[str] = None, timeout: Union[int, float] = 10, ): """ find single element by text can also be used to wait for such element to appear. + :param tagname: tagname to search for. ex: div, span, input, button.. + :type tagname: str + :param attrs: attributes to search for. ex: {'class':'class1', 'name':'name1', 'id':'123'} + :type attrs: dict :param text: text to search for. note: script contents are also considered text :type text: str - :param best_match: :param best_match: when True (default), it will return the element which has the most - comparable string length. this could help tremendously, when for example - you search for "login", you'd probably want the login button element, - and not thousands of scripts,meta,headings containing a string of "login". - When False, it will return naively just the first match (but is way faster). - :type best_match: bool - :param return_enclosing_element: since we deal with nodes instead of elements, the find function most often returns so called text nodes, which is actually a element of plain text, which is the somehow imaginary "child" of a "span", "p", "script" or any other elements which have text between their opening @@ -217,29 +216,38 @@ async def find( # todo, automatically determine node type # ignore the return_enclosing_element flag if the found node is NOT a text node but a # regular element (one having a tag) in which case that is exactly what we need. - :type return_enclosing_element: bool :param timeout: raise timeout exception when after this many seconds nothing is found. :type timeout: float,int """ + loop = asyncio.get_running_loop() start_time = loop.time() - text = text.strip() + tagname = tagname.strip().lower() if tagname else None + attrs = {k.strip(): v.strip() for k, v in attrs.items()} if attrs else None + text = text.strip().lower() if text else None - item = await self.find_element_by_text( - text, best_match, return_enclosing_element + if not text and not tagname and not attrs: + # raising an error in case neither text nor tagname values were provided + raise ValueError( + "You must provide either tagname, attrs, or text to find an element." + ) + + items = await self._find_elements_by_tagname_attrs_text( # items is a list that might contain either a single element if found, or None + tagname=tagname, attrs=attrs, text=text, return_after_first_match=True ) - while not item: + while not items: await self.wait() - item = await self.find_element_by_text( - text, best_match, return_enclosing_element + items = await self._find_elements_by_tagname_attrs_text( + tagname=tagname, attrs=attrs, text=text, return_after_first_match=True ) if loop.time() - start_time > timeout: raise asyncio.TimeoutError( - "time ran out while waiting for text: %s" % text + f"Time ran out while waiting for element with tagname: {tagname}, attributes: {attrs}, text:{text}" ) await self.sleep(0.5) - return item + + return items[0] # returning the first and only element of the list items async def select( self, @@ -275,33 +283,53 @@ async def select( async def find_all( self, - text: str, + tagname: Optional[str] = None, + attrs: Optional[dict[str, str]] = None, + text: Optional[str] = None, timeout: Union[int, float] = 10, ) -> List[Element]: """ find multiple elements by text can also be used to wait for such element to appear. + :param tagname: tagname to search for. ex: div, span, input, button.. + :type tagname: str + :param attrs: attributes to search for. ex: {'class':'class1', 'name':'name1', 'id':'123'} + :type attrs: dict :param text: text to search for. note: script contents are also considered text :type text: str :param timeout: raise timeout exception when after this many seconds nothing is found. :type timeout: float,int """ + loop = asyncio.get_running_loop() - now = loop.time() + start_time = loop.time() - text = text.strip() - items = await self.find_elements_by_text(text) + tagname = tagname.strip().lower() if tagname else None + attrs = {k.strip(): v.strip() for k, v in attrs.items()} if attrs else None + text = text.strip().lower() if text else None + if not text and not tagname and not attrs: + # raising an error in case neither text nor tagname values were provided + raise ValueError( + "You must provide either tagname, attrs, or text to find elements." + ) + + items = await self._find_elements_by_tagname_attrs_text( + tagname=tagname, attrs=attrs, text=text, return_after_first_match=False + ) while not items: await self.wait() - items = await self.find_elements_by_text(text) - if loop.time() - now > timeout: + items = await self._find_elements_by_tagname_attrs_text( + tagname=tagname, attrs=attrs, text=text, return_after_first_match=False + ) + if loop.time() - start_time > timeout: raise asyncio.TimeoutError( - "time ran out while waiting for text: %s" % text + f"Time ran out while waiting for elements with tagname: {tagname}, attributes: {attrs}, text: {text}" ) await self.sleep(0.5) + return items async def select_all( @@ -481,193 +509,159 @@ async def query_selector( return return element.create(node, self, doc) - async def find_elements_by_text( + async def _find_elements_by_tagname_attrs_text( self, - text: str, - tag_hint: Optional[str] = None, + tagname: Optional[str] = None, + attrs: Optional[dict[str, str]] = None, + text: Optional[str] = None, + return_after_first_match: bool = False, ) -> list[Element]: """ - returns element which match the given text. - please note: this may (or will) also return any other element (like inline scripts), - which happen to contain that text. + Finds and returns all elements matching the tagname, attributes, and optional innerText. - :param text: - :type text: - :param tag_hint: when provided, narrows down search to only elements which match given tag eg: a, div, script, span - :type tag_hint: str - :return: - :rtype: + :param tagname: The name of the HTML tag to search for (e.g., 'button', 'input'). Optional. + :type tagname: str | None + :param attrs: A dictionary of attributes and their corresponding values to match. Optional. + :type attrs: dict[str, str] | None + :param text: The expected text value of the element. Optional. + :type text: str | None + :param return_after_first_match: If True, stops traversal and returns a list containing only the first matching element. + :type return_after_first_match: bool + :return: List of matching elements. If return_after_first_match is True, the list contains at most one element. + :rtype: list[Element] """ - text = text.strip() - doc = await self.send(cdp.dom.get_document(-1, True)) - search_id, nresult = await self.send(cdp.dom.perform_search(text, True)) - if nresult: - node_ids = await self.send( - cdp.dom.get_search_results(search_id, 0, nresult) - ) - else: - node_ids = [] - await self.send(cdp.dom.discard_search_results(search_id)) + elements = [] + stop_searching = False # flag to indicate whether to stop searching - items = [] - for nid in node_ids: - node = util.filter_recurse(doc, lambda n: n.node_id == nid) - if not node: - node = await self.send(cdp.dom.resolve_node(node_id=nid)) # type: ignore - if not node: - continue - # remote_object = await self.send(cdp.dom.resolve_node(backend_node_id=node.backend_node_id)) - # node_id = await self.send(cdp.dom.request_node(object_id=remote_object.object_id)) - try: - elem = element.create(node, self, doc) - except: # noqa - continue - if elem.node_type == 3: - # if found element is a text node (which is plain text, and useless for our purpose), - # we return the parent element of the node (which is often a tag which can have text between their - # opening and closing tags (that is most tags, except for example "img" and "video", "br") - - if not elem.parent: - # check if parent actually has a parent and update it to be absolutely sure - await elem.update() - - items.append( - elem.parent or elem - ) # when it really has no parent, use the text node itself - continue - else: - # just add the element itself - items.append(elem) - - # since we already fetched the entire doc, including shadow and frames - # let's also search through the iframes - iframes = util.filter_recurse_all(doc, lambda node: node.node_name == "IFRAME") - if iframes: - iframes_elems = [ - element.create(iframe, self, iframe.content_document) + async def traverse(node: Node, parent_tree): + """Recursive traversal of the DOM and shadow DOM to collect all matching elements.""" + + nonlocal stop_searching + + if not node or stop_searching: + return + + # create an element to check for the conditions we're looking for + elem = element.create(node, self, parent_tree) + + # check for conditions + matches_tagname = ( + not tagname + or ( + elem.tag_name + and tagname.strip().lower() == elem.tag_name.strip().lower() + ) + ) # this condition evaluates to True if tagname was not provided; no filtering by tagname. Or if tagname equals our targeted element's tagname + + matches_attrs = ( + not attrs + or ( + elem.attributes + and all( + any( + elem.attributes[i] == attr + and value in elem.attributes[i + 1].split() + for i in range(0, len(elem.attributes), 2) + ) + for attr, value in attrs.items() + ) + ) + ) # this condition evaluates to True if attrs was not provided; no filtering by attrs. Or if the provided attrs are in our targeted element's attributes + + matches_text = ( + not text + or (elem.text and text.strip().lower() in elem.text.strip().lower()) + ) # this condition evaluates to True if text was not provided; no filtering by text. Or if text is in our targeted element's text + + # if all conditions match, add the element to the list of elements to return + if matches_tagname and matches_attrs and matches_text: + elements.append(elem) + if return_after_first_match: # if return_after_first_match is True then we stop searching for other elements after finding one target element + stop_searching = ( + True # set the flag to True to stop further traversal + ) + return + + # if stop_searching is True, skip further traversal + if stop_searching: + return + + tasks: list[Coroutine] = [] + + # traverse shadow roots nodes + if node.shadow_roots: + tasks.extend( + traverse(shadow_root, parent_tree) + for shadow_root in node.shadow_roots + ) + + # traverse child nodes + if node.children: + tasks.extend(traverse(child, parent_tree) for child in node.children) + + await asyncio.gather(*tasks) + + # fetch the document root + doc = await self.send(cdp.dom.get_document(depth=-1, pierce=True)) + + # start traversing the DOM tree + await traverse(doc, doc) + + # search within iframes concurrently + if not stop_searching: # only search iframes if we haven't found a match yet + iframes = util.filter_recurse_all( + doc, lambda node: node.node_name == "IFRAME" + ) + iframe_tasks = [ + traverse(iframe.content_document, iframe.content_document) for iframe in iframes + if iframe.content_document ] - for iframe_elem in iframes_elems: - if iframe_elem.content_document: - iframe_text_nodes = util.filter_recurse_all( - iframe_elem, - lambda node: node.node_type == 3 # noqa - and text.lower() in node.node_value.lower(), - ) - if iframe_text_nodes: - iframe_text_elems = [ - element.create(text_node, self, iframe_elem.tree) - for text_node in iframe_text_nodes - ] - items.extend( - text_node.parent for text_node in iframe_text_elems - ) - await self.disable_dom_agent() - return items or [] + + if iframe_tasks: + await asyncio.gather(*iframe_tasks) + + # return the appropriate result + if return_after_first_match: + return elements[ + :1 + ] # return a list containing only the first element (or empty list if no match) + else: + return elements # return all matching elements async def find_element_by_text( self, text: str, - best_match: Optional[bool] = False, - return_enclosing_element: Optional[bool] = True, ) -> Element | None: """ finds and returns the first element containing , or best match :param text: :type text: - :param best_match: when True, which is MUCH more expensive (thus much slower), - will find the closest match based on length. - this could help tremendously, when for example you search for "login", you'd probably want the login button element, - and not thousands of scripts,meta,headings containing a string of "login". - - :type best_match: bool - :param return_enclosing_element: - :type return_enclosing_element: :return: :rtype: """ - doc = await self.send(cdp.dom.get_document(-1, True)) - text = text.strip() - search_id, nresult = await self.send(cdp.dom.perform_search(text, True)) - - if nresult: - node_ids = await self.send( - cdp.dom.get_search_results(search_id, 0, nresult) - ) + if not text: + raise ValueError("You must provide a text value to find an element with.") else: - node_ids = [] - await self.send(cdp.dom.discard_search_results(search_id)) + return await self.find(text=text) - if not node_ids: - node_ids = [] - items = [] - for nid in node_ids: - node = util.filter_recurse(doc, lambda n: n.node_id == nid) - if node is None: - continue - - try: - elem = element.create(node, self, doc) - except: # noqa - continue - if elem.node_type == 3: - # if found element is a text node (which is plain text, and useless for our purpose), - # we return the parent element of the node (which is often a tag which can have text between their - # opening and closing tags (that is most tags, except for example "img" and "video", "br") - - if not elem.parent: - # check if parent actually has a parent and update it to be absolutely sure - await elem.update() - - items.append( - elem.parent or elem - ) # when it really has no parent, use the text node itself - continue - else: - # just add the element itself - items.append(elem) - - # since we already fetched the entire doc, including shadow and frames - # let's also search through the iframes - iframes = util.filter_recurse_all(doc, lambda node: node.node_name == "IFRAME") - if iframes: - iframes_elems = [ - element.create(iframe, self, iframe.content_document) - for iframe in iframes - ] - for iframe_elem in iframes_elems: - iframe_text_nodes = util.filter_recurse_all( - iframe_elem, - lambda node: node.node_type == 3 # noqa - and text.lower() in node.node_value.lower(), - ) - if iframe_text_nodes: - iframe_text_elems = [ - element.create(text_node, self, iframe_elem.tree) - for text_node in iframe_text_nodes - ] - items.extend(text_node.parent for text_node in iframe_text_elems) - try: - if not items: - return None - if best_match: - closest_by_length = min( - items, key=lambda el: abs(len(text) - len(el.text_all)) - ) - elem = closest_by_length or items[0] - - return elem - else: - # naively just return the first result - for elem in items: - if elem: - return elem - finally: - await self.disable_dom_agent() + async def find_elements_by_text(self, text: str) -> list[Element]: + """ + returns element which match the given text. + please note: this may (or will) also return any other element (like inline scripts), + which happen to contain that text. - return None + :param text: + :type text: + :return: + :rtype: + """ + if not text: + raise ValueError("You must provide a text value to find elements with.") + else: + return await self.find_all(text=text) async def back(self): """ @@ -1122,8 +1116,10 @@ async def scroll_up(self, amount=25): async def wait_for( self, - selector: str | None = None, - text: str | None = None, + tagname: Optional[str] = None, + attrs: Optional[dict[str, str]] = None, + selector: Optional[str] = None, + text: Optional[str] = None, timeout: int | float = 10, ) -> element.Element: """ @@ -1134,6 +1130,10 @@ async def wait_for( it will block for a maximum of seconds, after which an TimeoutError will be raised + :param tagname: element tagname + :type tagname: str + :param attrs: dictionary of attributes + :type attrs: dictionary :param selector: css selector :type selector: :param text: text @@ -1146,24 +1146,35 @@ async def wait_for( """ loop = asyncio.get_running_loop() start_time = loop.time() - if selector: - item = await self.query_selector(selector) + + if ( + tagname or attrs or text + ): # waiting for an element using either their tagname, attributes, text, or all. + if not tagname: + tagname = None + if not attrs: + attrs = None + if not text: + text = None + + item = await self.find(tagname=tagname, attrs=attrs, text=text) while not item and loop.time() - start_time < timeout: - item = await self.query_selector(selector) + item = await self.find(tagname=tagname, attrs=attrs, text=text) await self.sleep(0.5) if item: return item - if text: - item = await self.find_element_by_text(text) + + if selector: + item = await self.query_selector(selector) while not item and loop.time() - start_time < timeout: - item = await self.find_element_by_text(text) + item = await self.query_selector(selector) await self.sleep(0.5) if item: return item - raise asyncio.TimeoutError("time ran out while waiting") + raise asyncio.TimeoutError("Time ran out while waiting.") async def wait_for_ready_state( self, @@ -1530,6 +1541,8 @@ async def set_user_agent( def __call__( self, + tagname: str | None = None, + attrs: dict[str, str] | None = None, text: str | None = None, selector: str | None = None, timeout: int | float = 10, @@ -1543,7 +1556,9 @@ def __call__( :return: :rtype: """ - return self.wait_for(text, selector, timeout) + return self.wait_for( + tagname=tagname, attrs=attrs, text=text, selector=selector, timeout=timeout + ) def __eq__(self, other: Any) -> bool: if not isinstance(other, Tab):