-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathharmful_client.py
51 lines (43 loc) · 1.19 KB
/
harmful_client.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
import json
from itertools import cycle
import asyncclick as click
import trio
from loguru import logger
from trio_websocket import open_websocket_url
ERRORS = [
json.dumps(
{
"data": {
"south_lat": 1,
"north_lat": 1,
"west_lng": 2,
"east_lng": 2,
}
}
),
json.dumps({"msgType": "Error Msg Type"}),
json.dumps({"msgType": "newBounds", "data": {"lat": 123, "north_lat": 565}}),
"abc",
'{ "busId": [] }',
]
@click.command()
@click.option(
"--server",
default="ws://127.0.0.1:8080",
show_default=True,
type=str,
help="Server address",
)
async def main(server: str):
async with open_websocket_url(server) as ws:
for error in cycle(ERRORS):
try:
await ws.send_message(error)
logger.info(f"Send msg {error}")
message = await ws.get_message()
logger.error(f"Received message: {message}")
except Exception as e:
logger.error(f"Connection attempt failed: {e}")
await trio.sleep(5)
if __name__ == "__main__":
main(_anyio_backend="trio")