-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathavrconnector.py
144 lines (126 loc) · 4.62 KB
/
avrconnector.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
import struct
import serial
from blinkconfig import *
from sys import platform
import logging
from enum import Enum
class ConnectionType(Enum):
usb = 1
bluetooth = 2
ethernet = 3
class AVRConnector:
def __init__(self, baud_rate=38400):
self.baud_rate = baud_rate
self.write = None
self.close = None
self.active = False
def _connect_linux(self):
dev = getstring("serial_device_linux")
for x in [""] + list(range(0, 10)):
try:
logging.info("Trying to connect to %s%s" % (dev, str(x)))
ser = serial.Serial("%s%s" % (dev, str(x)), baudrate=self.baud_rate)
return ser.write, ser.close
except serial.serialutil.SerialException:
pass
return False
def _connect_bluetooth_linux(self):
try:
import socket
server_mac = getstring("bluetooth_mac")
s = socket.socket(socket.AF_BLUETOOTH, socket.SOCK_STREAM, socket.BTPROTO_RFCOMM)
s.settimeout(getint("connection_timeout"))
s.connect((server_mac, 1))
return s.send, s.close
except:
return False
def _connect_bluetooth_windows(self):
try:
ser = serial.Serial(port=getstring("bluetooth_device_windows"), baudrate=self.baud_rate)
return ser.write, ser.close
except:
return False
def _connect_macos(self, dev):
try:
logging.info("Trying to connect to %s" % dev)
ser = serial.Serial(dev, baudrate=self.baud_rate)
return ser.write, ser.close
except serial.serialutil.SerialException:
return False
def _connect_windows(self):
try:
ser = serial.Serial(port=getstring("serial_device_windows"), baudrate=self.baud_rate)
return ser.write, ser.close
except:
return False
def _connect_ethernet(self):
try:
import socket
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.connect((getstring("ethernet_host"), getint("ethernet_port")))
return s.send, s.close
except:
return False
def connect(self, connection_type):
"""
:param connection_type: usb, ethernet, bluetooth
:return:
"""
if connection_type == ConnectionType.ethernet:
ret = self._connect_ethernet()
elif platform == "win32":
if connection_type == ConnectionType.bluetooth:
ret = self._connect_bluetooth_windows()
elif connection_type == ConnectionType.usb:
ret = self._connect_windows()
elif platform == "darwin":
if connection_type == ConnectionType.bluetooth:
ret = self._connect_macos(getstring("bluetooth_device_macos"))
elif connection_type == ConnectionType.usb:
ret = self._connect_macos(getstring("serial_device_macos"))
else:
if connection_type == ConnectionType.bluetooth:
ret = self._connect_bluetooth_linux()
elif connection_type == ConnectionType.usb:
ret = self._connect_linux()
if not ret:
return False
else:
self.write, self.close = ret
self.active = True
return True
@staticmethod
def pack_mcuf(dat, size, even_line_starts_left):
if not even_line_starts_left:
copy = list(dat)
chunks = [copy[x:x+size.width] for x in range(0, len(copy), size.width)]
dat = []
for idx, c in enumerate(chunks):
if idx % 2 == 1:
c.reverse()
dat.extend(c)
dat = [item for sub in dat for item in sub]
data_len = len(dat)
return struct.pack("!HHHHHH" + "B"*data_len, 13, 37, size.height, size.width, 3, 255, *dat)
def write_frame(self, colors, grid_size, even_line_starts_left=False):
c = list(reversed(colors))
d = self.pack_mcuf(c, grid_size, even_line_starts_left)
self.write(d)
def write_single_color(self, color, grid_size):
colors = [color]*grid_size.height*grid_size.width
d = self.pack_mcuf(colors, grid_size, True)
self.write(d)
def is_active(self):
return self.active
def close(self):
self.close()
self.active = False
# TESTS
if __name__ == "__main__":
conn = AVRConnector()
conn.connect()
while True:
data = input("> ").split(" ")
data = [int(d) for d in data]
print(data*15)
conn.write_colors(data*15)