16
16
17
17
18
18
class WokwiClient :
19
+ """
20
+ Asynchronous client for the Wokwi Simulation API.
21
+
22
+ This class provides methods to connect to the Wokwi simulator, upload files, control simulations,
23
+ and monitor serial output. It is designed to be asyncio-friendly and easy to use in Python scripts
24
+ and applications.
25
+ """
26
+
19
27
version : str
20
28
last_pause_nanos : int
21
29
22
30
def __init__ (self , token : str , server : Optional [str ] = None ):
31
+ """
32
+ Initialize the WokwiClient.
33
+
34
+ Args:
35
+ token: API token for authentication (get from https://wokwi.com/dashboard/ci).
36
+ server: Optional custom server URL. Defaults to the public Wokwi server.
37
+ """
23
38
self .version = get_version ()
24
39
self ._transport = Transport (token , server or DEFAULT_WS_URL )
25
40
self .last_pause_nanos = 0
26
41
self ._transport .add_event_listener ("sim:pause" , self ._on_pause )
27
42
self ._pause_queue = EventQueue (self ._transport , "sim:pause" )
28
43
29
44
async def connect (self ) -> dict [str , Any ]:
45
+ """
46
+ Connect to the Wokwi simulator server.
47
+
48
+ Returns:
49
+ A dictionary with server information (e.g., version).
50
+ """
30
51
return await self ._transport .connect ()
31
52
32
53
async def disconnect (self ) -> None :
54
+ """
55
+ Disconnect from the Wokwi simulator server.
56
+ """
33
57
await self ._transport .close ()
34
58
35
59
async def upload (self , name : str , content : bytes ) -> ResponseMessage :
60
+ """
61
+ Upload a file to the simulator from bytes content.
62
+
63
+ Args:
64
+ name: The name to use for the uploaded file.
65
+ content: The file content as bytes.
66
+
67
+ Returns:
68
+ The response message from the server.
69
+ """
36
70
return await upload (self ._transport , name , content )
37
71
38
72
async def upload_file (
39
73
self , filename : str , local_path : Optional [Path ] = None
40
74
) -> ResponseMessage :
75
+ """
76
+ Upload a local file to the simulator.
77
+
78
+ Args:
79
+ filename: The name to use for the uploaded file.
80
+ local_path: Optional path to the local file. If not provided, uses filename as the path.
81
+
82
+ Returns:
83
+ The response message from the server.
84
+ """
41
85
return await upload_file (self ._transport , filename , local_path )
42
86
43
- async def start_simulation (self , ** kwargs : Any ) -> ResponseMessage :
44
- return await start (self ._transport , ** kwargs )
87
+ async def start_simulation (
88
+ self ,
89
+ firmware : str ,
90
+ elf : str ,
91
+ pause : bool = False ,
92
+ chips : list [str ] = [],
93
+ ) -> ResponseMessage :
94
+ """
95
+ Start a new simulation with the given parameters.
96
+
97
+ The firmware and ELF files must be uploaded to the simulator first using the
98
+ `upload()` or `upload_file()` methods. The firmware and ELF files are required
99
+ for the simulation to run.
100
+
101
+ The optional `chips` parameter can be used to load custom chips into the simulation.
102
+ For each custom chip, you need to upload two files:
103
+ - A JSON file with the chip definition, called `<chip_name>.chip.json`.
104
+ - A binary file with the chip firmware, called `<chip_name>.chip.bin`.
105
+
106
+ For example, to load the `inverter` chip, you need to upload the `inverter.chip.json`
107
+ and `inverter.chip.bin` files. Then you can pass `["inverter"]` to the `chips` parameter,
108
+ and reference it in your diagram.json file by adding a part with the type `chip-inverter`.
109
+
110
+ Args:
111
+ firmware: The firmware binary filename.
112
+ elf: The ELF file filename.
113
+ pause: Whether to start the simulation paused (default: False).
114
+ chips: List of custom chips to load into the simulation (default: empty list).
115
+
116
+ Returns:
117
+ The response message from the server.
118
+ """
119
+ return await start (
120
+ self ._transport ,
121
+ firmware = firmware ,
122
+ elf = elf ,
123
+ pause = pause ,
124
+ chips = chips ,
125
+ )
45
126
46
127
async def pause_simulation (self ) -> ResponseMessage :
128
+ """
129
+ Pause the running simulation.
130
+
131
+ Returns:
132
+ The response message from the server.
133
+ """
47
134
return await pause (self ._transport )
48
135
49
136
async def resume_simulation (self , pause_after : Optional [int ] = None ) -> ResponseMessage :
137
+ """
138
+ Resume the simulation, optionally pausing after a given number of nanoseconds.
139
+
140
+ Args:
141
+ pause_after: Number of nanoseconds to run before pausing again (optional).
142
+
143
+ Returns:
144
+ The response message from the server.
145
+ """
50
146
return await resume (self ._transport , pause_after )
51
147
52
148
async def wait_until_simulation_time (self , seconds : float ) -> None :
149
+ """
150
+ Pause and resume the simulation until the given simulation time (in seconds) is reached.
151
+
152
+ Args:
153
+ seconds: The simulation time to wait for, in seconds.
154
+ """
53
155
await pause (self ._transport )
54
156
remaining_nanos = seconds * 1e9 - self .last_pause_nanos
55
157
if remaining_nanos > 0 :
@@ -58,9 +160,21 @@ async def wait_until_simulation_time(self, seconds: float) -> None:
58
160
await self ._pause_queue .get ()
59
161
60
162
async def restart_simulation (self , pause : bool = False ) -> ResponseMessage :
163
+ """
164
+ Restart the simulation, optionally starting paused.
165
+
166
+ Args:
167
+ pause: Whether to start the simulation paused (default: False).
168
+
169
+ Returns:
170
+ The response message from the server.
171
+ """
61
172
return await restart (self ._transport , pause )
62
173
63
174
async def serial_monitor_cat (self ) -> None :
175
+ """
176
+ Print serial monitor output to stdout as it is received from the simulation.
177
+ """
64
178
async for line in monitor_lines (self ._transport ):
65
179
print (line .decode ("utf-8" ), end = "" , flush = True )
66
180
0 commit comments