-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathfake_bus.py
133 lines (115 loc) · 3.6 KB
/
fake_bus.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
import itertools
import json
import random
import sys
from dataclasses import asdict
from itertools import cycle, islice
import asyncclick as click
import trio
from loguru import logger
from trio_websocket import open_websocket_url
from schemas import Bus
from utils import generate_bus_id, load_routes, relaunch_on_disconnect
@click.command()
@click.option(
    "--server",
    default="ws://127.0.0.1:8080",
    show_default=True,
    type=str,
    help="Server address",
)
@click.option(
    "--routes_number",
    default=5,
    show_default=True,
    # A negative value would make itertools.islice raise ValueError.
    type=click.IntRange(min=0),
    help="Amount of routes. There are 963 routes available",
)
@click.option(
    "--buses_per_route",
    default=3,
    show_default=True,
    type=int,
    help="Amount of busses for each route.",
)
@click.option(
    "--websockets_number",
    default=3,
    show_default=True,
    # At least one sender is required: buses pick their channel with
    # random.choice, which raises IndexError on an empty list.
    type=click.IntRange(min=1),
    help="Amount of open websockets",
)
@click.option(
    "--emulator_id",
    default="",
    show_default=True,
    type=str,
    help="Prefix for bus_id in case of running several instances of emulator",
)
@click.option(
    "--refresh_timeout",
    default=0.1,
    show_default=True,
    # trio.sleep rejects negative durations with ValueError.
    type=click.FloatRange(min=0),
    help="Server coordinates refresh rate",
)
@click.option(
    "--verbose",
    "-v",
    is_flag=True,
    help="Display verbose log output",
)
async def main(
    server: str,
    routes_number: int,
    buses_per_route: int,
    websockets_number: int,
    emulator_id: str,
    refresh_timeout: float,
    verbose: bool,
):
    """Emulate buses moving along their routes and stream them to the server.

    Starts ``websockets_number`` sender tasks, each fed by its own memory
    channel, then starts ``buses_per_route`` bus tasks for each of the first
    ``routes_number`` routes. Every bus is attached to a randomly chosen
    sender channel.
    """
    if not verbose:
        # Drop the existing log sinks and report only errors on stderr.
        logger.remove()
        logger.add(sys.stderr, level="ERROR")

    # Load the routes once instead of once per bus number; all buses of a
    # route share the same route dict.
    routes = list(itertools.islice(load_routes(), routes_number))

    send_channels = []
    async with trio.open_nursery() as nursery:
        for _ in range(websockets_number):
            # Zero-capacity channel: a bus blocks on send until the sender
            # task is ready to receive.
            send_channel, receive_channel = trio.open_memory_channel(0)
            send_channels.append(send_channel)
            nursery.start_soon(send_updates, server, receive_channel, refresh_timeout)

        for bus_number in range(1, buses_per_route + 1):
            for route in routes:
                bus_id = generate_bus_id(emulator_id, route["name"], bus_number)
                send_channel = random.choice(send_channels)
                nursery.start_soon(run_bus, send_channel, route, bus_id)
@relaunch_on_disconnect
async def send_updates(
    server_address: str,
    receive_channel: trio.MemoryReceiveChannel,
    refresh_timeout: float,
):
    """Forward messages from ``receive_channel`` to the server over a websocket.

    Args:
        server_address: Websocket URL to connect to.
        receive_channel: Source of the messages to send.
        refresh_timeout: Seconds to sleep after each sent message.

    An ``OSError`` raised while connecting or sending is logged and
    swallowed. The ``relaunch_on_disconnect`` decorator comes from ``utils``;
    presumably it restarts this coroutine after a disconnect (verify there).
    """
    try:
        async with open_websocket_url(server_address) as connection:
            while True:
                async for payload in receive_channel:
                    await connection.send_message(payload)
                    # Throttle: pause before pulling the next message.
                    await trio.sleep(refresh_timeout)
    except OSError as error:
        logger.error(f"Connection attempt failed: {error}")
async def create_bus(bus_id: str, route: dict, send_channel: trio.MemorySendChannel):
    """Emit the positions of one bus along ``route`` into ``send_channel``.

    The bus starts at a random point of the route, runs to the route's end,
    and then loops over the whole route indefinitely. Every position is sent
    as the JSON dump of a ``Bus`` record.

    Args:
        bus_id: Value for the ``busId`` field of every message.
        route: Mapping with a ``"name"`` entry and a ``"coordinates"``
            sequence of ``(latitude, longitude)`` pairs.
        send_channel: Channel the JSON strings are sent to.

    A route without coordinates is logged and skipped.
    """
    points = route["coordinates"]
    total = len(points)
    if not total:
        # random.randint(1, 0) would raise ValueError, which the caller's
        # OSError handler does not catch; there is nothing to emit anyway.
        logger.error(f"Route {route['name']} has no coordinates; bus {bus_id} skipped")
        return

    # start == total leaves the first leg empty, so the bus begins at point 0.
    start = random.randint(1, total)
    path = itertools.chain(islice(points, start, total), cycle(points))
    for latitude, longitude in path:
        bus = Bus(busId=bus_id, lat=latitude, lng=longitude, route=route["name"])
        await send_channel.send(json.dumps(asdict(bus), ensure_ascii=True))
async def run_bus(send_channel: trio.MemorySendChannel, route: dict, bus_id: str):
    """Run a single bus via ``create_bus``, logging ``OSError`` instead of raising.

    Args:
        send_channel: Channel handed to ``create_bus`` for outgoing messages.
        route: Route description handed to ``create_bus``.
        bus_id: Identifier of the bus.
    """
    try:
        await create_bus(bus_id, route, send_channel)
    except OSError as error:
        logger.error(f"Connection attempt failed: {error}")
# Script entry point: invoke the click command. The ``_anyio_backend``
# keyword asks asyncclick to run the async command on trio.
if __name__ == "__main__":
    main(_anyio_backend="trio")