-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathsailenv_manager.py
More file actions
77 lines (59 loc) · 2.3 KB
/
Copy pathsailenv_manager.py
File metadata and controls
77 lines (59 loc) · 2.3 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
import os
import platform
import subprocess
from sailenv.agent import Agent
executables_per_platform = {
"Windows": "SAILenv.exe",
"Linux": "SAILenv.run",
"Darwin": os.path.join("SAILenv.app", "Contents", "MacOS", "SAILenv")
}
class SAILenvException(Exception):
def __init__(self, stdout, stderr, *args: object) -> None:
super().__init__(*args)
self.stdout = stdout
self.stderr = stderr
def __str__(self):
return "STDOUT: \n" + \
"\n".join(self.stdout) + "\n" + \
"STDERR: \n" + \
"\n".join(self.stderr)
class SAILenvManager:
# process: Union[Popen[bytes], Popen[Any], None]
def __init__(self, port="8085", prefix=None, sailenv_home="./"):
"""
Inits the SAILenv Manager
:param port: port on which to run sailenv
:param prefix: optional prefix (can be used to set environment variables, such as DISPLAY in linux systems)
:param sailenv_home: The path where the SAILenv executable is installed
"""
curr_platform = platform.system()
self.sailenv_exe = os.path.join(sailenv_home, executables_per_platform[curr_platform])
self.port = port
self.sailenv_home = sailenv_home
self.prefix = prefix
self.process = None
def start(self):
command = []
if self.prefix is not None:
command.append(self.prefix)
command.extend([self.sailenv_exe, f"--port={self.port}"])
self.process = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, cwd=self.sailenv_home)
# wait a second to be sure the executable has started
try:
self.process.wait(timeout=1)
# if timeout is not expired, SAILenv has died without notice
stdout = self.process.stderr.readlines()
stderr = self.process.stderr.readlines()
raise SAILenvException(stdout, stderr)
except subprocess.TimeoutExpired:
# if timeout expired, it means that SAILenv is still running
return True
def change_scene(self, scene):
agent = Agent(port=int(self.port))
agent.register()
agent.change_scene(scene)
def stop(self):
self.process.kill()
def restart(self):
self.stop()
self.start()