-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathlasertarget.py
197 lines (174 loc) · 7.1 KB
/
lasertarget.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
#! /usr/bin/env python
"""
Purpose: Detect a red laser pointer on a surface and send the picture to a Chromecast device on change.
Uses:
* Modified version of: https://github.com/bradmontgomery/python-laser-tracker for detecting the laser pointer
(scalable window; trigger on change; only show first laser hit, not trail)
* HTTPServer to serve the latest image regardless of client caching settings
* PyChromecast to display the latest image on a Chromecast device
Example:
lasertarget.py --address 192.168.0.2 --port 8080 --index 2 --castto "Target Monitor"
* address and port of local webserver on same network as Chromecast device
* index of camera device in OpenCV list, trial and error for now
* castto the Chromecast device with this name
"""
# built-in
import argparse
import time # to sleep
import threading
import socket # to get IP address
import queue
from http.server import BaseHTTPRequestHandler, HTTPServer
# 3rd party
import pychromecast
# local
from detection import LaserTracker
# WebServer stuff
class NoCacheServer(BaseHTTPRequestHandler):
    """HTTP handler that answers every GET with the current ``latest.jpg``.

    The request path is ignored on purpose: callers can request a different
    URL for every refresh so the client cannot answer from its own cache.
    """

    def do_GET(self):
        """Send ``latest.jpg`` from the working directory, or a 404 if it cannot be read."""
        # No matter what the URL is, serve the same file, this helps get around client caching settings
        print("Got request for file [{}]...".format(self.path))
        # Read the image before committing to a status line, so a missing or
        # unreadable file produces a real 404 instead of a "200" with no body.
        try:
            with open('latest.jpg', 'rb') as image_file:
                payload = image_file.read()
        except OSError:
            self.send_error(404, "latest.jpg is not available")
            return
        self.send_response(200)
        # 'image/jpeg' is the registered media type for JPEG data.
        self.send_header('Content-type', 'image/jpeg')
        self.send_header('Content-Length', str(len(payload)))
        # Ask clients not to keep a copy; the picture changes between requests.
        self.send_header('Cache-Control', 'no-store')
        self.end_headers()
        self.wfile.write(payload)
def WebServerThread(address='127.0.0.1', port=8080):
    """Run the image web server on ``address``:``port``; intended as a thread target.

    address: interface to bind to.
    port: TCP port to listen on.
    Does not return while the server loop is running.
    """
    myServer = HTTPServer((address, port), NoCacheServer)
    try:
        myServer.serve_forever()  # ...forever ever?
    finally:
        # Release the listening socket if the serve loop ever exits (e.g. on an error).
        myServer.server_close()
def StartWebServer(params):
    """Start the image web server on a background daemon thread.

    params: parsed options; ``params.address`` and ``params.port`` are read.
    """
    print("Starting web server for Chromecast on {}:{}...".format(params.address, params.port))
    server_thread = threading.Thread(
        name='chromecastserver',
        target=WebServerThread,
        args=(params.address, params.port),
        # Daemon thread, so it will be killed once the main thread is dead.
        # Passed to the constructor because Thread.setDaemon() is deprecated.
        daemon=True,
    )
    server_thread.start()
# LaserTracker stuff
def LaserTrackerThread(params):
    """Build a LaserTracker from the parsed options and call its ``run()``.

    params: parsed options; the camera, threshold, display and queue
    attributes listed below are forwarded as keyword arguments.
    """
    tracker_options = {
        'cam_width': params.width,
        'cam_height': params.height,
        'cam_zoom': params.zoom,
        'hue_min': params.huemin,
        'hue_max': params.huemax,
        'sat_min': params.satmin,
        'sat_max': params.satmax,
        'val_min': params.valmin,
        'val_max': params.valmax,
        'display_thresholds': params.display,
        'cam_index': params.index,
        'detection_queue': params.queue,
    }
    LaserTracker(**tracker_options).run()
def StartLaserTracker(params):
    """Start the laser tracker on a background daemon thread.

    params: parsed options, handed unchanged to ``LaserTrackerThread``.
    """
    print("Starting LaserTracker...")
    tracker_thread = threading.Thread(
        name='lasertracker',
        target=LaserTrackerThread,
        args=(params, ),
        # Daemon thread, so it will be killed once the main thread is dead.
        # Passed to the constructor because Thread.setDaemon() is deprecated.
        daemon=True,
    )
    tracker_thread.start()
# main
def _detect_local_address():
    """Return the local IP the OS would use for outbound traffic, or '127.0.0.1'."""
    # https://stackoverflow.com/questions/166506/finding-local-ip-addresses-using-pythons-stdlib
    probe = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    try:
        # doesn't even have to be reachable
        probe.connect(('10.255.255.255', 1))
        return probe.getsockname()[0]
    except OSError:
        # Only network errors mean "fall back"; Ctrl-C and friends must propagate.
        return '127.0.0.1'
    finally:
        probe.close()


def _cast_friendly_name(cc):
    """Return the friendly name of a discovered Chromecast object."""
    device = getattr(cc, 'device', None)
    if device is not None:
        return device.friendly_name
    # NOTE(review): pychromecast releases without ``device`` are expected to
    # expose the friendly name as ``name`` - confirm against the installed version.
    return cc.name


def _select_chromecast(name):
    """Return the Chromecast called ``name`` (the first one found if ``name`` is ''), or None."""
    discovered = pychromecast.get_chromecasts()
    # Newer pychromecast releases return (chromecasts, browser) rather than a bare list.
    if isinstance(discovered, tuple):
        discovered = discovered[0]
    if not discovered:
        return None
    if name == '':
        return discovered[0]
    for cc in discovered:
        if _cast_friendly_name(cc) == name:
            return cc
    return None


def main(params):
    """Run the laser target monitor until the process is interrupted.

    params: parsed command-line options. ``params.queue`` is created here and
    ``params.address`` is filled in when it was left empty.

    The web server is only started when a Chromecast was found. Every item the
    tracker puts on the queue triggers a cast of a freshly numbered image URL.
    """
    params.queue = queue.Queue()
    # try get the local IP if it wasn't provided
    if params.address == '':
        params.address = _detect_local_address()
    # try get the Chromecast if specified, or just use the first one if not
    cast = _select_chromecast(params.castto)
    mc = None
    if cast:
        cast.wait()
        mc = cast.media_controller
        StartWebServer(params)
    else:
        print("! WARNING: could not find Chromecast !")
    StartLaserTracker(params)
    index = 0
    print("Running...")
    while True:
        if not params.queue.empty():
            params.queue.get()
            print("File changed, refreshing...")
            index = index + 1  # Ensure it requests a new file that isn't in its cache
            url = "http://{}:{}/latest{}.jpg".format(params.address, params.port, index)
            print("New URL: {}".format(url))
            if cast:
                mc.play_media(url, content_type="image/jpg")
                mc.block_until_active()
                mc.play()
        # Poll at a modest rate instead of spinning on the queue.
        time.sleep(0.2)
# parameters and bootstrap
if __name__ == '__main__':
    # Command-line options as (flags, add_argument settings), in the order
    # they are registered and therefore shown by --help.
    option_table = [
        (('-I', '--index'),
         dict(default=0, type=int, help='Device Index of camera in OpenCV')),
        (('-W', '--width'),
         dict(default=640, type=int, help='Camera Width')),
        (('-H', '--height'),
         dict(default=480, type=int, help='Camera Height')),
        (('-u', '--huemin'),
         dict(default=20, type=int, help='Hue Minimum Threshold')),
        (('-U', '--huemax'),
         dict(default=160, type=int, help='Hue Maximum Threshold')),
        (('-s', '--satmin'),
         dict(default=100, type=int, help='Saturation Minimum Threshold')),
        (('-S', '--satmax'),
         dict(default=255, type=int, help='Saturation Maximum Threshold')),
        (('-v', '--valmin'),
         dict(default=200, type=int, help='Value Minimum Threshold')),
        (('-V', '--valmax'),
         dict(default=255, type=int, help='Value Maximum Threshold')),
        (('-d', '--display'),
         dict(action='store_true', help='Display Threshold Windows')),
        # An empty address is replaced by an auto-detected local IP in main().
        (('-A', '--address'),
         dict(default='', type=str, help='Local IP to serve Chromecast content on')),
        (('-P', '--port'),
         dict(default=8080, type=int, help='Local Port to serve Chromecast content on')),
        (('-C', '--castto'),
         dict(default='', type=str, help='Name of Chromecast device to cast to')),
        (('-Z', '--zoom'),
         dict(default=1, type=int, help='Zoom factor for capture window')),
    ]
    parser = argparse.ArgumentParser(description='Run the Laser Target Detector')
    for flags, settings in option_table:
        parser.add_argument(*flags, **settings)
    params = parser.parse_args()
    main(params)