-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathplot_emg.py
85 lines (63 loc) · 2.23 KB
/
plot_emg.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
from matplotlib import pyplot as plt
import numpy as np
from myo_api import Myo, emg_mode
import time
from collections import Counter
from multiprocessing import Queue, Process, Manager
class MultichannelPlot(object):
    """Live stacked line plot with one subplot per signal channel.

    The figure holds ``nchan`` axes in a single column. Each axes owns one
    line of ``xlen`` points whose y-values are overwritten in place on
    every call to :meth:`update_plot`.
    """

    def __init__(self, nchan=8, xlen=512):
        """Create the figure, the per-channel axes and their line artists.

        Args:
            nchan: Number of channels, i.e. subplots stacked vertically.
            xlen: Number of samples displayed per channel.
        """
        self.nchan = nchan
        self.xlen = xlen
        self.fig = plt.figure(figsize=(10, 8))
        # Use the explicit (rows, cols, index) form. The packed three-digit
        # form (e.g. 811) cannot express ten or more rows or indices.
        self.axes = [
            self.fig.add_subplot(self.nchan, 1, i + 1) for i in range(self.nchan)
        ]
        for i, ax in enumerate(self.axes):
            ax.set_ylabel(f"Ch.{i + 1}")
        self.set_ylim([-128, 128])
        # Start every channel as a flat zero line; keep only the Line2D
        # handle so later updates can rewrite its y-data.
        self.graphs = [
            ax.plot(np.arange(self.xlen), np.zeros(self.xlen))[0] for ax in self.axes
        ]

    def set_ylim(self, lims):
        """Apply the same y-axis limits ``lims`` to every channel's axes."""
        for ax in self.axes:
            ax.set_ylim(lims)

    def update_plot(self, sig):
        """Redraw every channel from ``sig`` and briefly yield to the GUI.

        Args:
            sig: Iterable of per-channel sample sequences, paired in order
                with the channel lines. Pairing stops at the shorter of
                the two, so an empty ``sig`` leaves the lines unchanged.
        """
        for line, data in zip(self.graphs, sig):
            count = len(data)
            if count < self.xlen:
                # Left-pad with zeros so the newest sample stays at the
                # right edge of the plot.
                data = np.concatenate([np.zeros(self.xlen - count), data])
            elif count > self.xlen:
                # Keep only the most recent ``xlen`` samples.
                data = data[-self.xlen:]
            line.set_ydata(data)
        plt.draw()
        # pause() lets the GUI event loop run so the figure repaints.
        plt.pause(0.04)
def update_plot(emg_data):
    """Plot the newest samples from ``emg_data`` in an endless loop.

    Used as the target of the plotting ``Process``; it never returns.

    Args:
        emg_data: Shared, sliceable list of samples that another process
            appends to. Each entry is presumably one sample holding a
            value per channel (confirm against the EMG handler); the
            array is transposed so each row is one channel.
    """
    plotter = MultichannelPlot()
    while True:
        # Only the last ``xlen`` samples are ever drawn, so request just
        # that tail in a single slice instead of converting the whole,
        # ever-growing shared list on every frame.
        recent = emg_data[-plotter.xlen:]
        plotter.update_plot(np.array(recent).T)
def main():
    """Stream raw EMG from a Myo device and plot it live in a child process.

    Samples received by the EMG handler are appended to a manager-backed
    list that the plotting process reads. About once per second the
    handler prints how many samples arrived in that window. The
    acquisition loop runs until an exception (e.g. Ctrl-C) interrupts it,
    at which point the plotting process is stopped.
    """
    with Manager() as manager:
        # NOTE(review): the first argument is passed as None; presumably the
        # library then locates the device itself. Confirm against myo_api.
        m = Myo(None, mode=emg_mode.RAW)
        emg_data = manager.list()
        plotter_process = Process(target=update_plot, args=(emg_data,))
        counter = Counter()
        # Monotonic clock: the rate window must not jump if the wall clock
        # is adjusted while streaming.
        state = {"start_time": time.monotonic()}

        def proc_emg(emg, moving):
            """Store one EMG sample and report the sample rate each second.

            ``moving`` is unused here; it is accepted because the handler
            is called with two arguments (presumably by myo_api).
            """
            emg_data.append(emg)
            state["curr_time"] = time.monotonic()
            counter["samples"] += 1
            if state["curr_time"] - state["start_time"] > 1:
                counter["samples_per_sec"] = counter["samples"]
                counter["samples"] = 0
                sps = counter["samples_per_sec"]
                print(f"samples per second {sps}")
                state["start_time"] = time.monotonic()

        m.connect()
        m.add_emg_handler(proc_emg)
        plotter_process.start()
        try:
            while True:
                m.run()
        finally:
            # Stop the plotter before the manager shuts down so it is not
            # left running against a dead shared list.
            plotter_process.terminate()
            plotter_process.join()
# Run only when executed as a script. The guard also keeps main() from
# re-running if a multiprocessing child re-imports this module.
if __name__ == "__main__":
    main()