Skip to content

Commit b7c49e1

Browse files
committed
Add an XHR CDP Mode example
1 parent 265deb5 commit b7c49e1

File tree

1 file changed

+84
-0
lines changed

1 file changed

+84
-0
lines changed

examples/cdp_mode/raw_xhr_sb.py

+84
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
"""CDP.network.ResponseReceived with CDP.network.ResourceType.XHR."""
2+
import ast
3+
import colorama
4+
import mycdp
5+
import sys
6+
import time
7+
from seleniumbase import SB
8+
9+
xhr_requests = []
10+
last_xhr_request = None
11+
c1 = colorama.Fore.BLUE + colorama.Back.LIGHTYELLOW_EX
12+
c2 = colorama.Fore.BLUE + colorama.Back.LIGHTGREEN_EX
13+
cr = colorama.Style.RESET_ALL
14+
if "linux" in sys.platform:
15+
c1 = c2 = cr = ""
16+
17+
18+
def listenXHR(page):
19+
async def handler(evt):
20+
# Get AJAX requests
21+
if evt.type_ is mycdp.network.ResourceType.XHR:
22+
xhr_requests.append([evt.response.url, evt.request_id])
23+
global last_xhr_request
24+
last_xhr_request = time.time()
25+
page.add_handler(mycdp.network.ResponseReceived, handler)
26+
27+
28+
async def receiveXHR(page, requests):
29+
responses = []
30+
retries = 0
31+
max_retries = 5
32+
# Wait at least 2 seconds after last XHR request for more
33+
while True:
34+
if last_xhr_request is None or retries > max_retries:
35+
break
36+
if time.time() - last_xhr_request <= 2:
37+
retries = retries + 1
38+
time.sleep(2)
39+
continue
40+
else:
41+
break
42+
await page
43+
# Loop through gathered requests and get response body
44+
for request in requests:
45+
try:
46+
res = await page.send(mycdp.network.get_response_body(request[1]))
47+
if res is None:
48+
continue
49+
responses.append({
50+
"url": request[0],
51+
"body": res[0],
52+
"is_base64": res[1],
53+
})
54+
except Exception as e:
55+
print("Error getting response:", e)
56+
return responses
57+
58+
59+
with SB(uc=True, test=True, locale_code="en") as sb:
60+
sb.activate_cdp_mode("about:blank")
61+
tab = sb.cdp.page
62+
listenXHR(tab)
63+
64+
# Change url to something that makes ajax requests
65+
sb.cdp.open("https://learn.microsoft.com/en-us/")
66+
time.sleep(2)
67+
for i in range(15):
68+
sb.cdp.scroll_down(15)
69+
70+
loop = sb.cdp.get_event_loop()
71+
xhr_responses = loop.run_until_complete(receiveXHR(tab, xhr_requests))
72+
for response in xhr_responses:
73+
print(c1 + "*** ==> XHR Request URL <== ***" + cr)
74+
print(f'{response["url"]}')
75+
is_base64 = response["is_base64"]
76+
b64_data = "Base64 encoded data"
77+
try:
78+
headers = ast.literal_eval(response["body"])["headers"]
79+
print(c2 + "*** ==> XHR Response Headers <== ***" + cr)
80+
print(headers if not is_base64 else b64_data)
81+
except Exception:
82+
response_body = response["body"]
83+
print(c2 + "*** ==> XHR Response Body <== ***" + cr)
84+
print(response_body if not is_base64 else b64_data)

0 commit comments

Comments
 (0)