forked from stb-tester/stb-tester
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathstbt_control_relay.py
executable file
·101 lines (77 loc) · 2.99 KB
/
stbt_control_relay.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
#!/usr/bin/python
"""
Allows using any of the stbt remote control backends remotely using the lirc
protocol.
Presents the same socket protocol as lircd but sending keypresses using any of
stbt's controls. This allows for example controlling a roku over its HTTP
interface from some software that only speaks lirc.
Example usage:
$ stbt control-relay file:example
Listens on `/var/run/lirc/lircd` for lirc clients. Keypresses sent will be
written to the file `example`. So
$ irsend SEND_ONCE stbt KEY_UP
Will write the text "KEY_UP" to the file `example`.
$ stbt control-relay --socket=lircd.sock roku:192.168.1.13
Listens on lircd.sock and will forward keypresses to the roku at 192.168.1.13
using its HTTP protocol. So
$ irsend -d lircd.sock SEND_ONCE stbt KEY_OK
Will press KEY_OK on the roku device.
"""
import argparse
import re
import signal
import socket
import sys
import _stbt.logging
from _stbt.control import uri_to_remote
# Matches a lircd "SEND_ONCE <remote> <key>" request line.  Compiled once
# because it is applied to every line received from every client.
_SEND_ONCE_RE = re.compile(r"SEND_ONCE (?P<ctrl>\w+) (?P<key>\w+)")


def main(argv):
    """Run the relay: accept lirc clients and forward their keypresses.

    Parses the command line, listens on the unix socket given by
    ``--socket`` and then serves clients one at a time, forever.  Every
    ``SEND_ONCE <remote> <key>`` line received from a client is pressed on
    the remote control built from the ``output`` URI.

    Args:
        argv: Full argument vector, including the program name at index 0.

    This function does not return normally: it loops until the process is
    terminated.  SIGTERM is translated into ``sys.exit(0)``.
    """
    parser = argparse.ArgumentParser(
        epilog=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter)
    parser.add_argument(
        "--socket", default="/var/run/lirc/lircd", help="""LIRC socket to read
        remote control presses from (defaults to %(default)s).""")
    parser.add_argument("output", help="""Remote control configuration to
        transmit on. Values are the same as stbt run's --control.""")
    _stbt.logging.argparser_add_verbose_argument(parser)
    args = parser.parse_args(argv[1:])

    # Raising SystemExit from the handler (rather than dying on the default
    # SIGTERM action) lets the ``finally`` clauses below release the sockets.
    signal.signal(signal.SIGTERM, lambda _signo, _stack_frame: sys.exit(0))

    s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
    try:
        s.bind(args.socket)
        s.listen(5)

        control = uri_to_remote(args.output)

        while True:
            conn, _ = s.accept()
            try:
                _relay_connection(conn, control)
            except socket.error as e:
                # A misbehaving client (e.g. connection reset) must not take
                # the whole relay down; drop it and wait for the next one.
                debug("Connection error: %r" % (e,))
            finally:
                conn.close()
    finally:
        s.close()


def _relay_connection(conn, control):
    """Press every valid SEND_ONCE command read from `conn` on `control`.

    Reads `conn` line by line until the client closes it.  Lines that are
    not ``SEND_ONCE <remote> <key>`` are logged and ignored.  For each valid
    line a reply is sent back on `conn` reporting whether ``control.press``
    succeeded.
    """
    lines = conn.makefile()
    try:
        for cmd in lines:
            cmd = cmd.rstrip("\n")
            m = _SEND_ONCE_RE.match(cmd)
            if not m:
                debug("Ignoring invalid command: %s" % cmd)
                continue
            key = m.group("key")
            debug("Received %s" % key)
            try:
                control.press(key)
            except Exception as e:  # pylint: disable=broad-except
                # Any backend failure is reported to the client instead of
                # terminating the relay.
                debug("Error pressing key %r: %r" % (key, e))
                send_response(conn, cmd, success=False, data=str(e))
            else:
                send_response(conn, cmd, success=True)
    finally:
        lines.close()
def send_response(sock, request, success, data=""):
    """Send a lircd-style reply packet for `request` over `sock`.

    The packet consists of ``BEGIN``, the echoed request line, ``SUCCESS``
    or ``ERROR``, an optional ``DATA`` section (the number of data lines
    followed by the lines themselves) and ``END``, each terminated by a
    newline.  See http://www.lirc.org/html/lircd.html

    Args:
        sock: Connected socket to write the reply to.
        request: The command line being answered, without trailing newline.
        success: True to report ``SUCCESS``, False to report ``ERROR``.
        data: Optional text to attach; split on newlines into data lines.

    Delivery is best-effort: socket errors (for example the client having
    already disconnected) are ignored.
    """
    message = "BEGIN\n{cmd}\n{status}\n".format(
        cmd=request,
        status="SUCCESS" if success else "ERROR")
    if data:
        data_lines = data.split("\n")
        message += "DATA\n{length}\n{data}\n".format(
            length=len(data_lines),
            data="\n".join(data_lines))
    message += "END\n"

    # Sockets carry bytes.  Passing text to sendall() raises TypeError on
    # Python 3, which previously was swallowed so no reply was ever sent.
    if not isinstance(message, bytes):
        message = message.encode("utf-8")

    try:
        sock.sendall(message)
    except socket.error:
        # The client has gone away; there is nobody left to tell.
        pass
def debug(s):
    """Pass `s` to ``_stbt.logging.debug``, prefixed with this tool's name."""
    prefixed = "stbt-control-relay: %s" % s
    _stbt.logging.debug(prefixed)
if __name__ == "__main__":
    # Executed as a script: hand the raw argument vector to main() and use
    # whatever it returns as the process exit status.
    exit_status = main(sys.argv)
    sys.exit(exit_status)